Commit 851c30e2 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

fix a bug when reruning a workflow

parent 13b8c89b
......@@ -74,6 +74,7 @@ class Workflow(threading.Thread):
self.status = self.STATUS_STARTED
self.start_time = None
self.end_time = None
self.step = None
self.function = function
self.parameters_section = parameters_section
self.comp_pckg = self._import_components()
......@@ -101,9 +102,12 @@ class Workflow(threading.Thread):
self._serialize()
def init_component_nameids(self):
logging.getLogger("Workflow.init_component_nameids").debug("Initializing components names ids")
logging.getLogger("Workflow.init_component_nameids").debug("Name ids in the array : " + str(self.component_nameids.keys()))
workflow_command_lines = inspect.getsourcelines(getattr(self, self.function))[0]
for i in range(0, len(workflow_command_lines)):
line = workflow_command_lines[i]
logging.getLogger("Workflow.init_component_nameids").debug("Line : " + line)
if line.find("self.add_component") != -1 and not re.search('^\s*\#', line) :
while re.search('\\\s*$', line) or re.search(',\s*$', line): # While line ends with backslash
# Concatenate next line
......@@ -141,6 +145,7 @@ class Workflow(threading.Thread):
# Find component prefix
if len(args_split_clean) >= 4:
component_prefix = args_split_clean[3].strip().strip('"')
logging.getLogger("Workflow.init_component_nameids").debug("Component added : " + component_name+"."+component_prefix)
if self._component_is_duplicated(component_name+"."+component_prefix):
raise ValueError("Component " + component_name + " with prefix " +
component_prefix + " already exist in this pipeline!")
......@@ -243,30 +248,37 @@ class Workflow(threading.Thread):
Only require for Threading
"""
# if this is the first time the workflow run
if self.start_time == None :
if self.step == None :
self.init_component_nameids()
self.start_time = time.time()
self.status = self.STATUS_STARTED
self.end_time = None
# serialize the workflow before pre processing
self._serialize()
self.pre_process()
# serialize the workflow before running
self._serialize()
try:
getattr(self, self.function)()
except SystemExit, e:
self.status = self.STATUS_FAILED
self.step = 0
self.status = self.STATUS_STARTED
self.end_time = None
self._serialize()
# if pre_processing has not been done yet
if self.step == 0:
self.pre_process()
self.step = 1
self._serialize()
# if running workflow has not been done yet
if self.step == 1:
try:
getattr(self, self.function)()
except SystemExit, e:
self.status = self.STATUS_FAILED
self.end_time = time.time()
self._serialize()
raise
if len(self.components) > 0:
self._execute_weaver()
self.step = 2
self._serialize()
# if post processing has ne been done yet
if self.step == 2:
self.post_process()
if self.status == self.STATUS_STARTED: self.status = self.STATUS_COMPLETED
self.end_time = time.time()
self._serialize()
raise
if len(self.components) > 0:
self._execute_weaver()
self._serialize()
self.post_process()
if self.status == self.STATUS_STARTED: self.status = self.STATUS_COMPLETED
self.end_time = time.time()
self._serialize()
def _extend_and_format_args(self, parameters, args):
extended_args = {}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment