Commit 2c4106b1 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

handle multiple methodes for a workflow

parent a7acd848
......@@ -149,7 +149,7 @@ if __name__ == '__main__':
sub_parser.set_defaults(cmd_object="status")
# Add available pipelines
wf_instances = wfmanager.get_available_workflows()
wf_instances, wf_methodes = wfmanager.get_available_workflows()
wf_classes = []
for instance in wf_instances:
wf_classes.append(instance.__class__.__name__)
......
......@@ -64,7 +64,7 @@ class Workflow(threading.Thread):
STATUS_ABORTED = "aborted"
def __init__(self, args={}, id=None, function= "process", parameters_section=None):
def __init__(self, args={}, id=None, function= "process"):
"""
"""
# define as a thread
......@@ -83,7 +83,6 @@ class Workflow(threading.Thread):
self.stderr = None
self.dynamic_component_present = False
self.function = function
self.parameters_section = parameters_section
self.comp_pckg = self._import_components()
# intruduce --log-verbose to be able to monitor the new version of makeflow >=4.2.2
self.engine_arguments = ' --log-verbose '
......@@ -103,7 +102,7 @@ class Workflow(threading.Thread):
# define the parameters
self.params_order = []
self.define_parameters(self.parameters_section)
self.define_parameters(self.function)
# add the metadata parameter
self.add_parameter("metadata", "Which metadata should be linked to this workflow", is_list=True)
# if some args are provided, let's fill the parameters
......@@ -480,7 +479,7 @@ class Workflow(threading.Thread):
"""
raise NotImplementedError( "Workflow.get_description() must be implemented in " + self.__class__.__name__ )
def define_parameters(self, parameters_section=None):
def define_parameters(self, function="process"):
"""
Define the workflow parameters, has to be implemented by subclasses
"""
......
......@@ -61,18 +61,28 @@ class WorkflowsManager(object):
pickle.dump(workflows_dump, open(self.dump_file, "w"))
WorkflowsManager.lock_workflows_file.release()
def get_available_workflows(self, parameters_section="parameters"):
wf_instances = []
def get_available_workflows(self, function="process"):
if function.__class__.__name__ == "str":
functions = [function]
else:
functions = set(function)
wf_instances, wf_methodes = [], []
# Load all modules within the workflow module
for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
__import__(modname)
# Search for Workflow classes
for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
if issubclass(obj, jflow.workflow.Workflow) and obj.__name__ != jflow.workflow.Workflow.__name__:
# try to build the workflow
try: wf_instances.append(obj(parameters_section=parameters_section))
except: pass
return wf_instances
for function in functions:
# check if the workflow has the requested methode
for ifunction in inspect.getmembers(obj, predicate=inspect.ismethod):
if ifunction[0] == function:
# try to build the workflow
try:
wf_instances.append(obj(function=function))
wf_methodes.append(function)
except: pass
return [wf_instances, wf_methodes]
def rerun_workflow(self, workflow_id):
workflow = self.get_workflow(workflow_id)
......@@ -80,13 +90,13 @@ class WorkflowsManager(object):
workflow.set_stderr()
workflow.start()
def run_workflow(self, workflow_class, args, function="process", parameters_section="parameters"):
def run_workflow(self, workflow_class, args, function="process"):
# Load all modules within the workflow module
for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
__import__(modname)
# Search for Workflow classes
for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function, parameters_section)
if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function)
workflow.start()
# Add the workflow dump path to the workflows dump
self._add_workflow(workflow)
......
......@@ -23,7 +23,7 @@ class Alignment (Workflow):
def get_description(self):
return "align reads against a reference genome"
def define_parameters(self, parameters_section=None):
def define_parameters(self, function="process"):
self.add_input_file("read_1", "Which read1 files should be used", required=True, is_list=True)
self.add_input_file("read_2", "Which read2 files should be used (if single end, leave empty)", type="localfile", is_list=True)
self.add_input_file("reference_genome", "Which genome should the read being align on", required=True, type="localfile")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment