Maintenance - Mise à jour mensuelle Lundi 6 Avril 2020 entre 7h00 et 9h00

Commit 0bb7b52e authored by Penom Nom's avatar Penom Nom

single analysis workflows

parent d47603be
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import argparse
import time
try:
import _preamble
except ImportError:
sys.exc_clear()
from ng6.analyzes_manager import AnalyzesManager
from ng6_cli import JflowArgumentParser
from jflow.workflow import Workflow
import jflow.utils as utils
if __name__ == '__main__':
# Create a workflow manager to get access to our workflows
wfmanager = AnalyzesManager()
# Create the top-level parser
parser = JflowArgumentParser()
subparsers = parser.add_subparsers(title='Available sub commands')
# Add available pipelines
wf_instances, wf_methodes = wfmanager.get_available_workflows()
wf_classes = []
for instance in wf_instances:
wf_classes.append(instance.get_classname())
# create the subparser for each applications
sub_parser = subparsers.add_parser(instance.name, help=instance.description, fromfile_prefix_chars='@')
sub_parser.convert_arg_line_to_args = instance.__class__.config_parser
[parameters_groups, parameters_order] = instance.get_parameters_per_groups()
for group in parameters_order:
if group == "default":
for param in parameters_groups[group]:
sub_parser.add_argument(param.flag, **param.export_to_argparse())
elif group.startswith("exclude-"):
is_required = False
for param in parameters_groups[group]:
if param.required:
is_required = True
# an exlcusive parameter cannot be required, the require is at the group level
param.required = False
pgroup = sub_parser.add_mutually_exclusive_group(required=is_required)
for param in parameters_groups[group]:
pgroup.add_argument(param.flag, **param.export_to_argparse())
else:
pgroup = sub_parser.add_argument_group(group)
for param in parameters_groups[group]:
pgroup.add_argument(param.flag, **param.export_to_argparse())
sub_parser.set_defaults(cmd_object=instance.get_classname())
args = vars(parser.parse_args())
if args["cmd_object"] in wf_classes:
wfmanager.run_workflow(args["cmd_object"], args)
\ No newline at end of file
......@@ -287,7 +287,7 @@ class JFlowServer (object):
groups.append(param.group)
workflows.append({"name": instance.name,
"help": instance.description,
"class": instance.__class__.__name__,
"class": instance.get_classname(),
"parameters": parameters,
"parameters_per_groups": parameters_per_groups,
"groups": list(set(groups))})
......
......@@ -35,6 +35,7 @@ from inspect import getcallargs
from datetime import date as ddate
import jflow
import workflows
import jflow.utils as utils
from jflow.utils import validate_email
from pygraph.classes.digraph import digraph
......@@ -1062,7 +1063,7 @@ class Workflow(threading.Thread):
except Exception as e:
self._log("Component <{0}> cannot be loaded: {1}".format(modname, e), level="debug", traceback=traceback.format_exc())
# finally import workflows shared packages
workflows_dir = os.path.dirname(os.path.dirname(inspect.getfile(self.__class__)))
workflows_dir = os.path.dirname(workflows.__file__)
for importer, modname, ispkg in pkgutil.iter_modules([os.path.join(workflows_dir, "components")], "workflows.components."):
try:
m = __import__(modname)
......@@ -1077,7 +1078,7 @@ class Workflow(threading.Thread):
pckge = {}
parsers = []
# get exparsers
extparsers_dir = os.path.join( os.path.dirname(os.path.dirname(inspect.getfile(self.__class__))), 'extparsers' )
extparsers_dir = os.path.join( os.path.dirname(workflows.__file__), 'extparsers' )
for importer, modname, ispkg in pkgutil.iter_modules([extparsers_dir], "workflows.extparsers.") :
try :
m = __import__(modname)
......@@ -1111,4 +1112,65 @@ class Workflow(threading.Thread):
# Import symbols from module into global namespace, which we store as
# an attribute for later use (i.e. during compile)
for symbol in symbols:
self.globals[symbol] = getattr(m, symbol)
\ No newline at end of file
self.globals[symbol] = getattr(m, symbol)
def get_classname(self):
return self.__class__.__name__
class ComponentWorkflow(Workflow):
"""
A workflow with a single component
"""
def __init__(self, comp_object, *args, **kwargs):
self._comp_object = comp_object
Workflow.__init__(self, *args, **kwargs)
def get_description(self):
return "%sWorkflow"%self._comp_object.__name__
def get_name(self):
return self._comp_object.__name__.lower()
def process(self):
argspec = inspect.getargspec(self._comp_object.define_parameters)
args = argspec[0][1:]
kwargs = {}
for param in args:
for attribute_value in self.__dict__.values():
if (issubclass(attribute_value.__class__, AbstractParameter)) and param == attribute_value.name:
kwargs[param] = attribute_value
self.add_component(self._comp_object.__name__, [], kwargs)
def get_classname(self):
return self._comp_object.__name__
def define_parameters(self, function="process"):
argspec = inspect.getargspec(self._comp_object.define_parameters)
args = argspec[0][1:]
varargs = argspec[1] or []
keywords = argspec[2] or {}
defaults = argspec[3]
if defaults :
for i,e in enumerate(defaults) :
args[-1*(i+1)] = e
for i,e in enumerate(args) :
if defaults and e not in defaults:
args[i] = None
args.extend( varargs )
try :
self._comp_object.__dict__['define_parameters'](self, *args , **keywords)
except AttributeError, e: pass
def _serialize(self):
self.dump_path = os.path.join(self.directory, self.DUMP_FILE_NAME)
workflow_dump = open(self.dump_path, "wb")
pickle.dump(self, workflow_dump)
workflow_dump.close()
def get_comp_object(self):
return self._comp_object
\ No newline at end of file
......@@ -17,6 +17,7 @@
import pkgutil
import workflows
import workflows.components
import inspect
import jflow
import sys
......@@ -228,3 +229,44 @@ class WorkflowsManager(object):
finally:
WorkflowsManager.lock_ids_file.release()
return cid
class ComponentsManager(WorkflowsManager):
def __init__(self):
WorkflowsManager.__init__(self)
def get_available_workflows(self, function="process"):
if function.__class__.__name__ == "str":
functions = [function]
else:
functions = set(function)
wf_instances, wf_methodes = [], []
# Creates workflows with one component
for importer, modname, ispkg in pkgutil.iter_modules(workflows.components.__path__, workflows.components.__name__ + "."):
__import__(modname)
for class_name, comp_obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
if issubclass(comp_obj, jflow.component.Component) and comp_obj.__name__ != jflow.component.Component.__name__:
for function in functions:
for ifunction in inspect.getmembers(jflow.workflow.ComponentWorkflow, predicate=inspect.ismethod):
if ifunction[0] == function:
# try to build the workflow
try:
instance = jflow.workflow.ComponentWorkflow(comp_obj,function=function)
wf_instances.append(instance)
wf_methodes.append(function)
except: pass
return [wf_instances, wf_methodes]
def run_workflow(self, workflow_class, args, function="process"):
# load all analyzes from workflows.components
for importer, modname, ispkg in pkgutil.iter_modules(workflows.components.__path__, workflows.components.__name__ + "."):
__import__(modname)
for class_name, comp_obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
if issubclass(comp_obj, jflow.component.Component) and comp_obj.__name__ != jflow.component.Component.__name__:
if class_name == workflow_class:
workflow = jflow.workflow.ComponentWorkflow(comp_obj, args, self.get_next_id(), function)
workflow.start()
# Add the workflow dump path to the workflows dump
self._dump_workflows([workflow])
return workflow
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import pkgutil
import inspect
import workflows.components
import sys
import analysis
import ng6workflow
from jflow.workflows_manager import ComponentsManager
class AnalyzesManager(ComponentsManager):
def __init__(self):
ComponentsManager.__init__(self)
def get_available_workflows(self, function="process"):
instances, methodes = ComponentsManager.get_available_workflows(self)
wf_instances, wf_methodes = [], []
for i, wfinstance in enumerate(instances) :
if issubclass(wfinstance.get_comp_object(), analysis.Analysis):
wf_instances.append(ng6workflow.AnalysisWorkflow(wfinstance.get_comp_object(), function=methodes[i]))
wf_methodes.append(methodes[i])
return [wf_instances, wf_methodes]
def run_workflow(self, workflow_class, args, function="process"):
# load all analyzes from workflows.components
for importer, modname, ispkg in pkgutil.iter_modules(workflows.components.__path__, workflows.components.__name__ + "."):
__import__(modname)
for class_name, comp_obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
if issubclass(comp_obj, analysis.Analysis) and comp_obj.__name__ != analysis.Analysis.__name__:
if class_name == workflow_class:
workflow = ng6workflow.AnalysisWorkflow(comp_obj, args, self.get_next_id(), function)
workflow.start()
# Add the workflow dump path to the workflows dump
self._dump_workflows([workflow])
return workflow
\ No newline at end of file
......@@ -24,8 +24,9 @@ import datetime
import argparse
import xml.etree.ElementTree as ET
from jflow.workflow import Workflow
from jflow.workflow import Workflow, ComponentWorkflow
from jflow.utils import display_error_message
from jflow.parameter import *
from ng6.t3MySQLdb import t3MySQLdb
from ng6.project import Project
......@@ -115,6 +116,63 @@ class BasicNG6Workflow (Workflow):
if infos['email'] :
self.set_to_address(infos['email'])
class AnalysisWorkflow(BasicNG6Workflow, ComponentWorkflow):
"""
A workflow with a single analysis
"""
def __init__(self, comp_object, args={}, id=None, function= "process"):
self._comp_object = comp_object
ComponentWorkflow.__init__(self, comp_object, args, id, function)
BasicNG6Workflow.__init__(self, args, id, function)
self.add_parameter("project_id", "The project id the analysis belongs to", type = 'int', required = True)
self.add_parameter("run_id", "The run id the analysis belongs to", type = 'int', required = True)
self.add_parameter("parent_analysis", "The id of an analysis to be used as a parent analysis", type= 'int')
self.add_exclusion_rule( "project_id", "run_id" )
def process(self):
self.project = None
self.runobj = None
# Check if user provided a project or a run
if self.project_id :
self.project = Project.get_from_id(self.project_id)
self.metadata.append("project_id="+str(self.project_id))
# if user is not allowed to add data on project (is not admin)
if self.project is not None and not self.project.is_admin(self.admin_login):
display_error_message( "The user login '" + self.admin_login + "' is not allowed to add data on project '" + self.project.name + "'.\n" )
elif self.run_id :
self.runobj = Run.get_from_id(self.run_id)
self.metadata.append("run_id="+str(self.run_id))
addto = "project" if self.project else "run"
parent = None
if self.parent_analysis :
parent = Analysis.get_from_id(self.parent_analysis)
if self.project :
analyses_ids = [ o.id for o in self.project.get_analysis()]
else :
analyses_ids = [ o.id for o in self.runobj.get_analysis()]
if parent.id not in analyses_ids :
display_error_message("The parent analysis associated with id '%s' does not belong to the %s id '%s' " % (parent.id, addto, self.project_id or self.run_id) )
argspec = inspect.getargspec(self._comp_object.define_parameters)
args = argspec[0][1:]
kwargs = {}
for param in args:
for attribute_value in self.__dict__.values():
if (issubclass(attribute_value.__class__, AbstractParameter)) and param == attribute_value.name:
kwargs[param] = attribute_value
self.add_component(self._comp_object.__name__, [], kwargs, addto=addto, parent = parent)
def post_process(self):
if self.runobj:
self.runobj.sync()
elif self.project:
self.project.sync()
class NG6Workflow (BasicNG6Workflow):
def __init__(self, args={}, id=None, function= "process"):
......
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from jflow.component import Component
from ng6.analysis import Analysis
class BasicAnalyse (Analysis):
def define_parameters(self, files, name, description, software, options, version, compression="none",
delete=False, archive_name=None):
#self.add_input_file_list( "files", "The files to be archived.", default=files, required=True )
self.add_parameter_list( "input_files", "The files to be archived.", default=files, required=True )
self.add_parameter( "analysis_name", "The analysis name", default=name, required = True )
self.add_parameter( "analysis_description", "Analysis description", default=description, required = True )
self.add_parameter( "analysis_software", "The analysis software", default=software, required = True )
self.add_parameter( "analysis_software_options", "The analysis software options", default=options, required = True )
self.add_parameter( "analysis_software_version", "The analysis software version", default=version, required = True )
self.add_parameter( "data_compression", "File compression once archived", default=compression)
self.add_parameter( "delete", "Should the input data be deleted once the process is over", type = bool, default=delete)
self.add_parameter( "archive_name", "Give a name to the final archive", default=archive_name)
self.is_editable = True
def define_analysis(self):
self.name = self.analysis_name
self.description = self.analysis_description
self.software = self.analysis_software
self.options = self.analysis_software_options
def process(self): pass
def get_version(self):
return self.analysis_software_version
def post_process(self):
self._archive_files(self.files, self.compression, self.archive_name, self.delete)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment