Commit b2e91dd7 authored by Ibouniyamine Nabihoudine

single component workflows

parent 2510ab3f
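
In short: with this commit, every shared component under the workflows/components package can be run on its own, wrapped in an auto-generated ComponentWorkflow. A minimal sketch of the idea, assuming a hypothetical shared component such as the one below (the class name, its parameters and the path in the comment are illustrative only, not part of this commit):

    # hypothetical shared component, e.g. workflows/components/bwamem.py
    from jflow.component import Component

    class BWAmem(Component):
        def define_parameters(self, reference, reads, mismatches=4):
            # a real component would declare its inputs/parameters here
            pass

With the changes below, such a class is exposed as its own workflow: its name is the lowercased class name ("bwamem"), its description is "component workflow for BWAmem", and the arguments of define_parameters() become the workflow parameters.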
@@ -16,7 +16,6 @@
 #
 import sys
-import argparse
 import time
 try:
@@ -26,35 +25,9 @@ except ImportError:
 from jflow.workflows_manager import WorkflowsManager
 from jflow.workflow import Workflow
+from jflow.argparser import JflowArgumentParser
 import jflow.utils as utils
-class JflowArgumentParser (argparse.ArgumentParser):
-    def _read_args_from_files(self, arg_strings):
-        # expand arguments referencing files
-        new_arg_strings = []
-        for arg_string in arg_strings:
-            # if it's not a comment or an empty line
-            if not arg_string.startswith("#") and arg_string:
-                # for regular arguments, just add them back into the list
-                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
-                    new_arg_strings.append(arg_string)
-                # replace arguments referencing files with the file content
-                else:
-                    try:
-                        with open(arg_string[1:]) as args_file:
-                            arg_strings = []
-                            # give to the convert_arg_line_to_args a table of lines instead of line per line
-                            for arg in self.convert_arg_line_to_args(args_file.read().splitlines()):
-                                arg_strings.append(arg)
-                            arg_strings = self._read_args_from_files(arg_strings)
-                            new_arg_strings.extend(arg_strings)
-                    except OSError:
-                        err = _sys.exc_info()[1]
-                        self.error(str(err))
-        # return the modified argument list
-        return new_arg_strings
 if __name__ == '__main__':
     # Create a workflow manager to get access to our workflows
@@ -104,7 +77,7 @@ if __name__ == '__main__':
     wf_instances, wf_methodes = wfmanager.get_available_workflows()
     wf_classes = []
     for instance in wf_instances:
-        wf_classes.append(instance.__class__.__name__)
+        wf_classes.append(instance.get_classname())
         # create the subparser for each applications
         sub_parser = subparsers.add_parser(instance.name, help=instance.description, fromfile_prefix_chars='@')
         sub_parser.convert_arg_line_to_args = instance.__class__.config_parser
@@ -127,7 +100,7 @@ if __name__ == '__main__':
                 pgroup = sub_parser.add_argument_group(group)
                 for param in parameters_groups[group]:
                     pgroup.add_argument(param.flag, **param.export_to_argparse())
-        sub_parser.set_defaults(cmd_object=instance.__class__.__name__)
+        sub_parser.set_defaults(cmd_object=instance.get_classname())
     args = vars(parser.parse_args())
     if args["cmd_object"] in wf_classes:
...
+#
+# Copyright (C) 2015 INRA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+import argparse
+class JflowArgumentParser (argparse.ArgumentParser):
+    def _read_args_from_files(self, arg_strings):
+        # expand arguments referencing files
+        new_arg_strings = []
+        for arg_string in arg_strings:
+            # if it's not a comment or an empty line
+            if not arg_string.startswith("#") and arg_string:
+                # for regular arguments, just add them back into the list
+                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
+                    new_arg_strings.append(arg_string)
+                # replace arguments referencing files with the file content
+                else:
+                    try:
+                        with open(arg_string[1:]) as args_file:
+                            arg_strings = []
+                            # give to the convert_arg_line_to_args a table of lines instead of line per line
+                            for arg in self.convert_arg_line_to_args(args_file.read().splitlines()):
+                                arg_strings.append(arg)
+                            arg_strings = self._read_args_from_files(arg_strings)
+                            new_arg_strings.extend(arg_strings)
+                    except OSError as err:
+                        self.error(str(err))
+        # return the modified argument list
+        return new_arg_strings
\ No newline at end of file
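
For reference, a small usage sketch of the relocated JflowArgumentParser. The converter function below is hypothetical; in the CLI script above, this role is played by the workflow's config_parser assigned to sub_parser.convert_arg_line_to_args. Unlike stock argparse, the overridden _read_args_from_files hands the converter all lines of the referenced file at once:

    from jflow.argparser import JflowArgumentParser

    def lines_to_args(lines):
        # hypothetical converter: receives every line of the @file at once
        # and must return a flat list of argparse tokens
        tokens = []
        for line in lines:
            line = line.strip()
            if line and not line.startswith("#"):
                tokens.extend(line.split())
        return tokens

    parser = JflowArgumentParser(fromfile_prefix_chars="@")
    parser.convert_arg_line_to_args = lines_to_args
    parser.add_argument("--reference")
    # args = parser.parse_args(["@my_run.cfg"])  # expands the file, skipping comments and blank lines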
@@ -285,7 +285,7 @@ class JFlowServer (object):
                 groups.append(param.group)
             workflows.append({"name": instance.name,
                               "help": instance.description,
-                              "class": instance.__class__.__name__,
+                              "class": instance.get_classname(),
                               "parameters": parameters,
                               "parameters_per_groups": parameters_per_groups,
                               "groups": list(set(groups))})
...
@@ -35,6 +35,7 @@ from inspect import getcallargs
 from datetime import date as ddate
 import jflow
+import workflows
 import jflow.utils as utils
 from jflow.utils import validate_email
 from pygraph.classes.digraph import digraph
@@ -1062,7 +1063,7 @@ class Workflow(threading.Thread):
             except Exception as e:
                 self._log("Component <{0}> cannot be loaded: {1}".format(modname, e), level="debug", traceback=traceback.format_exc())
         # finally import workflows shared packages
-        workflows_dir = os.path.dirname(os.path.dirname(inspect.getfile(self.__class__)))
+        workflows_dir = os.path.dirname(workflows.__file__)
         for importer, modname, ispkg in pkgutil.iter_modules([os.path.join(workflows_dir, "components")], "workflows.components."):
             try:
                 m = __import__(modname)
@@ -1077,7 +1078,7 @@ class Workflow(threading.Thread):
         pckge = {}
         parsers = []
         # get exparsers
-        extparsers_dir = os.path.join( os.path.dirname(os.path.dirname(inspect.getfile(self.__class__))), 'extparsers' )
+        extparsers_dir = os.path.join( os.path.dirname(workflows.__file__), 'extparsers' )
         for importer, modname, ispkg in pkgutil.iter_modules([extparsers_dir], "workflows.extparsers.") :
             try :
                 m = __import__(modname)
@@ -1112,3 +1113,63 @@ class Workflow(threading.Thread):
         # an attribute for later use (i.e. during compile)
         for symbol in symbols:
             self.globals[symbol] = getattr(m, symbol)
+
+    def get_classname(self):
+        return self.__class__.__name__
+
+
+class ComponentWorkflow(Workflow):
"""
A workflow build automatically and which will execute only one component.
The used components are the workflow shared components.
"""
+
+    def __init__(self, comp_object, *args, **kwargs):
+        self._comp_object = comp_object
+        Workflow.__init__(self, *args, **kwargs)
+
+    def get_description(self):
+        return "component workflow for %s"%self._comp_object.__name__
+
+    def get_name(self):
+        return self._comp_object.__name__.lower()
+
+    def process(self):
+        argspec = inspect.getargspec(self._comp_object.define_parameters)
+        args = argspec[0][1:]
+        kwargs = {}
+        for param in args:
+            for attribute_value in self.__dict__.values():
+                if (issubclass(attribute_value.__class__, AbstractParameter)) and param == attribute_value.name:
+                    kwargs[param] = attribute_value
+        self.add_component(self._comp_object.__name__, [], kwargs)
+
+    def get_classname(self):
+        return self._comp_object.__name__
+
+    def define_parameters(self, function="process"):
+        argspec = inspect.getargspec(self._comp_object.define_parameters)
+        args = argspec[0][1:]
+        varargs = argspec[1] or []
+        keywords = argspec[2] or {}
+        defaults = argspec[3]
+        if defaults :
+            for i,e in enumerate(defaults) :
+                args[-1*(i+1)] = e
+        for i,e in enumerate(args) :
+            if defaults and e not in defaults:
+                args[i] = None
+        args.extend( varargs )
+        try :
+            self._comp_object.__dict__['define_parameters'](self, *args , **keywords)
+        except AttributeError: pass
+
+    def _serialize(self):
+        self.dump_path = os.path.join(self.directory, self.DUMP_FILE_NAME)
+        workflow_dump = open(self.dump_path, "wb")
+        pickle.dump(self, workflow_dump)
+        workflow_dump.close()
\ No newline at end of file
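
To make the new class concrete, a hedged sketch of what ComponentWorkflow derives from a wrapped component, reusing the hypothetical BWAmem component from the top of this page and assuming a configured jflow environment (this mirrors how get_available_component_workflows() builds its instances in the next file):

    import jflow.workflow
    from jflow.component import Component

    class BWAmem(Component):  # hypothetical component, as sketched above
        def define_parameters(self, reference, reads, mismatches=4):
            pass

    wf = jflow.workflow.ComponentWorkflow(BWAmem, function="process")
    print(wf.get_name())         # "bwamem"
    print(wf.get_description())  # "component workflow for BWAmem"
    print(wf.get_classname())    # "BWAmem"
    # at run time, process() forwards the matching workflow parameters to a
    # single add_component("BWAmem", [], {...}) call

Note that _comp_object is set before Workflow.__init__ runs, presumably because the base constructor already relies on get_name() and get_description().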
@@ -17,6 +17,7 @@
 import pkgutil
 import workflows
+import workflows.components
 import inspect
 import jflow
 import sys
@@ -85,6 +86,31 @@ class WorkflowsManager(object):
                                     wf_instances.append(obj(function=function))
                                     wf_methodes.append(function)
                                 except: pass
+        comp_wf_instances, comp_wf_methods = self.get_available_component_workflows(function)
+        wf_instances.extend(comp_wf_instances)
+        wf_methodes.extend(comp_wf_methods)
+        return [wf_instances, wf_methodes]
+
+    def get_available_component_workflows(self, function="process"):
+        if function.__class__.__name__ == "str":
+            functions = [function]
+        else:
+            functions = set(function)
+        wf_instances, wf_methodes = [], []
+        # Creates workflows with one component
+        for importer, modname, ispkg in pkgutil.iter_modules(workflows.components.__path__, workflows.components.__name__ + "."):
+            __import__(modname)
+            for class_name, comp_obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
+                if issubclass(comp_obj, jflow.component.Component) and comp_obj.__name__ != jflow.component.Component.__name__:
+                    for function in functions:
+                        for ifunction in inspect.getmembers(jflow.workflow.ComponentWorkflow, predicate=inspect.ismethod):
+                            if ifunction[0] == function:
+                                # try to build the workflow
+                                try:
+                                    instance = jflow.workflow.ComponentWorkflow(comp_obj, function=function)
+                                    wf_instances.append(instance)
+                                    wf_methodes.append(function)
+                                except: pass
         return [wf_instances, wf_methodes]
 
     def rerun_workflow(self, workflow_id):
@@ -104,17 +130,36 @@ class WorkflowsManager(object):
         return workflow
 
     def run_workflow(self, workflow_class, args, function="process"):
+        workflow = None
         # Load all modules within the workflow module
         for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
             __import__(modname)
             # Search for Workflow classes
             for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
                 if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function)
+        if workflow is None:
+            return self.run_component_workflow(workflow_class, args, function)
+        workflow.start()
+        # Add the workflow dump path to the workflows dump
+        self._dump_workflows([workflow])
+        return workflow
+
+    def run_component_workflow(self, workflow_class, args, function="process"):
+        # load all components from workflows.components
+        for importer, modname, ispkg in pkgutil.iter_modules(workflows.components.__path__, workflows.components.__name__ + "."):
+            __import__(modname)
+            for class_name, comp_obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
+                if issubclass(comp_obj, jflow.component.Component) and comp_obj.__name__ != jflow.component.Component.__name__:
+                    if class_name == workflow_class:
+                        workflow = jflow.workflow.ComponentWorkflow(comp_obj, args, self.get_next_id(), function)
         workflow.start()
         # Add the workflow dump path to the workflows dump
         self._dump_workflows([workflow])
         return workflow
 
     def delete_workflow(self, workflow_id):
         from jflow.workflow import Workflow
         WorkflowsManager.lock_workflows_file.acquire()
...
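
Taken together, a hedged sketch of how the manager is now expected to behave, assuming an installed jflow with a workflows package on the path and that WorkflowsManager still takes no constructor arguments (the "BWAmem" name is again hypothetical):

    from jflow.workflows_manager import WorkflowsManager

    wfmanager = WorkflowsManager()
    instances, methods = wfmanager.get_available_workflows()
    for wf in instances:
        # component workflows are now listed alongside hand-written ones
        print(wf.get_classname(), "->", wf.name)

    # run_workflow() first looks for a regular Workflow subclass named
    # "BWAmem"; if none is found it falls back to run_component_workflow(),
    # which wraps the matching component in a ComponentWorkflow.
    # wfmanager.run_workflow("BWAmem", args, function="process")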