Commit 0a9db566 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

execute the workflow only once

parent d6b5b7e4
......@@ -75,7 +75,6 @@ class Workflow(threading.Thread):
self.function = "process"
self.comp_pckg = self._import_components()
self.engine_arguments = ''
self.component_nameids_is_init = False
self.component_nameids = {}
# try to parse engine arguments
try:
......@@ -98,6 +97,21 @@ class Workflow(threading.Thread):
os.makedirs(self.directory, 0751)
self._serialize()
def init_component_nameids(self):
# TODO
# could be better with a better parsing of the add_component parameters
for line in inspect.getsourcelines(getattr(self, self.function))[0]:
if line.find("self.add_component(") != -1:
prefix = "default"
params = line.strip().split("self.add_component(")[1][:-1]
try: prefix = params.split("component_prefix=")[1][1:-1]
except: pass
component_name = params.split(",")[0][1:-1]
if self._component_is_duplicated(component_name+"."+prefix):
raise ValueError("Component " + component_name + " with prefix " +
prefix + " already exist in this pipeline!")
self.component_nameids[component_name+"."+prefix] = None
@staticmethod
def config_parser(arg_line):
for arg in arg_line.split():
......@@ -129,22 +143,12 @@ class Workflow(threading.Thread):
cmpt_object.prefix = component_prefix
if kwargs: cmpt_object.define_parameters(**kwargs)
else: cmpt_object.define_parameters(*args)
# if already init, add the component to the list and check if weaver should be executed
if self.component_nameids_is_init:
# add the component
self.components.append(cmpt_object)
# if this one require a dynamic pipeline, execute the first part
if cmpt_object.is_dynamic:
self._execute_weaver()
else:
if self._component_is_duplicated(cmpt_object):
raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
cmpt_object.prefix + " already exist in this pipeline!")
self.component_nameids[cmpt_object.get_nameid()] = None
# add the component
self.components.append(cmpt_object)
# if this one require a dynamic pipeline, execute the first part
if cmpt_object.is_dynamic:
self._execute_weaver()
return cmpt_object
else:
raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(", ".join(self.comp_pckg.keys())))
......@@ -177,6 +181,8 @@ class Workflow(threading.Thread):
"""
Only require for Threading
"""
# first init the component list within the workflow
self.init_component_nameids()
self.start_time = time.time()
self.status = self.STATUS_STARTED
self.end_time = None
......@@ -185,17 +191,6 @@ class Workflow(threading.Thread):
self.pre_process()
# serialize the workflow before running
self._serialize()
# if not done yet, init the component nameids list before to run
if not self.component_nameids_is_init:
try:
getattr(self, self.function)()
except SystemExit, e:
self.status = self.STATUS_FAILED
self.end_time = time.time()
self._serialize()
raise
self.component_nameids_is_init = True
self._serialize()
try:
getattr(self, self.function)()
except SystemExit, e:
......@@ -398,8 +393,8 @@ class Workflow(threading.Thread):
pickle.dump(self, workflow_dump)
workflow_dump.close()
def _component_is_duplicated(self, component):
if component.get_nameid() in self.component_nameids.keys():
def _component_is_duplicated(self, component_nameid):
if component_nameid in self.component_nameids.keys():
return True
return False
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment