Maintenance - Mise à jour mensuelle Lundi 6 Avril 2020 entre 7h00 et 9h00

Commit e3162b73 authored by Penom Nom's avatar Penom Nom

update jflow

parent df53f573
......@@ -88,7 +88,7 @@
</ul>
</li>
<li><a target="_blank" href="http://www.cse.nd.edu/~ccl/software/makeflow/">Makeflow</a>: a workflow
engine for executing large complex workflows on clusters, clouds, and grids. The supported versions are v4.2.2 or higher.</li>
engine for executing large complex workflows on clusters, clouds, and grids. The supported versions are v4.4.3 or higher.</li>
<li>An existing <a target="_blank" href="http://www.mysql.fr/">mysql</a> database: the database should be accessible from
the application with a user/password with create/truncate privileges.</li>
<li>an <a target="_blank" href="http://httpd.apache.org/download.cgi">apache</a> WEB server and the php-mysql module.</li>
......
......@@ -33,6 +33,8 @@ from jflow.abstraction import MultiMap
from weaver.util import parse_string_list
from weaver.function import ShellFunction
from weaver.abstraction import Map
from weaver.function import PythonFunction
class Component(object):
......@@ -137,6 +139,63 @@ class Component(object):
self.params_order.append(name)
self.__setattr__(name, new_param)
def add_input_object(self, name, help, default=None, required=False):
new_param = InputObject(name, help, default=default, required=required)
# store where the parameter is coming from
new_param.linkTrace_nameid = self.get_nameid()
if issubclass( default.__class__, list ):
for idx, val in enumerate(default):
if hasattr( val, "linkTrace_nameid" ):
if not val.linkTrace_nameid in new_param.parent_linkTrace_nameid:
new_param.parent_linkTrace_nameid.append(val.linkTrace_nameid)
new_param.default[idx].parent_linkTrace_nameid = [val.linkTrace_nameid]
new_param.default[idx].linkTrace_nameid = self.get_nameid()
elif hasattr( default, "linkTrace_nameid" ):
new_param.parent_linkTrace_nameid = [default.linkTrace_nameid]
new_param.default.parent_linkTrace_nameid = [default.linkTrace_nameid]
new_param.default.linkTrace_nameid = self.get_nameid()
# add it to the class itself
self.params_order.append(name)
self.__setattr__(name, new_param)
def add_input_object_list(self, name, help, default=None, required=False):
if default == None: default = []
new_param = InputObjectList(name, help, default=default, required=required)
# store where the parameter is coming from
new_param.linkTrace_nameid = self.get_nameid()
for idx, val in enumerate(new_param.default):
if hasattr( val, "linkTrace_nameid" ):
if not val.linkTrace_nameid in new_param.parent_linkTrace_nameid:
new_param.parent_linkTrace_nameid.append(val.linkTrace_nameid)
new_param.default[idx].parent_linkTrace_nameid = [val.linkTrace_nameid]
new_param.default[idx].linkTrace_nameid = self.get_nameid()
# add it to the class itself
self.params_order.append(name)
self.__setattr__(name, new_param)
def add_output_object(self, name, help, required=False):
new_param = OutputObject(name, help, required=required)
# store where the parameter is coming from
new_param.linkTrace_nameid = self.get_nameid()
new_param.default.linkTrace_nameid = self.get_nameid()
# add it to the class itself
self.params_order.append(name)
self.__setattr__(name, new_param)
def add_output_object_list(self, name, help, nb_items=0, required=False):
new_param = OutputObjectList(name, help, nb_items=nb_items, required=required)
# store where the parameter is coming from
new_param.linkTrace_nameid = self.get_nameid()
for idx, val in enumerate(new_param.default):
new_param.default[idx].linkTrace_nameid = self.get_nameid()
# add it to the class itself
self.params_order.append(name)
self.__setattr__(name, new_param)
def add_output_file(self, name, help, file_format="any", filename=None, group="default", display_name=None,
cmd_format="", argpos=-1):
filename = os.path.basename(filename)
......@@ -387,3 +446,105 @@ class Component(object):
def __eq__(self, other):
return self.__class__ == other.__class__ and self.prefix == other.prefix
def __getattribute__(self, attr):
# an IOobject is a specific object defined by the presence of the dump_path attribute
if hasattr(object.__getattribute__(self, attr), "default"):
if isinstance (object.__getattribute__(self, attr).default, OObject) and os.path.exists(object.__getattribute__(self, attr).default.dump_path):
object.__getattribute__(self, attr).default=object.__getattribute__(self, attr).default.load()
if hasattr(object.__getattribute__(self, attr).default, "is_ioobject"):
return object.__getattribute__(self, attr).default
elif isinstance(object.__getattribute__(self, attr).default, list) and len(object.__getattribute__(self, attr).default)>0:
if isinstance(object.__getattribute__(self, attr).default[0], OObject):
for i, val in enumerate (object.__getattribute__(self, attr).default):
if os.path.exists(val.dump_path):
object.__getattribute__(self, attr).default[i]=val.load()
if hasattr(object.__getattribute__(self, attr).default[0], "is_ioobject"):
return object.__getattribute__(self, attr).default
return object.__getattribute__(self, attr)
def __generate_iolist (self, ioparameter, map):
new_ios = []
includes = []
if map :
if len (ioparameter) >0 :
if isinstance(ioparameter[0], list):
for cin in ioparameter:
if hasattr(cin[0], "is_ioobject"):
new_ios.append([i.dump_path for i in cin])
else:
new_ios.append(cin)
else:
for cin in ioparameter:
if hasattr(cin, "is_ioobject"):
new_ios.append(cin.dump_path)
else:
new_ios.append(cin)
else :
new_ios = []
if hasattr(ioparameter, "is_ioobject"):
includes.extend(ioparameter.includes)
new_ios.append(ioparameter.dump_path)
elif isinstance(ioparameter, list):
for cin in ioparameter:
if hasattr(cin, "is_ioobject"):
includes.extend(cin.includes)
new_ios.append(cin.dump_path)
else:
new_ios.append(cin)
else:
new_ios = ioparameter
return new_ios,includes
def add_python_execution(self, function, inputs=[], outputs=[], arguments=[], includes=[],
add_path=None, collect=False, local=False, map=False, cmd_format=""):
if map:
if arguments != [] :
logging.getLogger("jflow").exception("add_python_execution: '" + function.__name__ + "' arguments parameter not allowed with map!")
raise Exception("add_python_execution: '" + function.__name__ + "' arguments parameter not allowed with map!" )
if not issubclass(inputs.__class__, list) or not issubclass(outputs.__class__, list):
logging.getLogger("jflow").exception("add_python_execution: '" + function.__name__ + "' map requires a list as inputs and output!")
raise Exception("add_python_execution: '" + function.__name__ + "' map requires a list as inputs and output!")
#Command format to build
if cmd_format == "" :
cmd_format = "{EXE} "
if len(arguments)>0:
cmd_format += " {ARG}"
if (isinstance(inputs, list) and len(inputs)>0) or (inputs is not None and inputs != []):
cmd_format += " {IN}"
if (isinstance(outputs, list) and len(outputs)>0) or (outputs is not None and outputs != []):
cmd_format += " {OUT}"
py_function = PythonFunction(function, add_path=add_path, cmd_format=cmd_format)
new_inputs,includes_in = self.__generate_iolist(inputs, map)
new_outputs,includes_out = self.__generate_iolist(outputs, map)
if not isinstance(includes, list):
includes=[includes]
if map:
MultiMap(py_function, inputs=new_inputs, outputs=new_outputs, includes=includes+includes_in, collect=collect, local=local)
else:
py_function(inputs=new_inputs, outputs=new_outputs, arguments=arguments, includes=includes+includes_in)
def add_shell_execution(self, source, inputs=[], outputs=[], arguments=[], includes=[],
cmd_format=None, map=False, shell=None, collect=False, local=False):
shell_function = ShellFunction( source, shell=shell, cmd_format=cmd_format )
# if abstraction is map or multimap
if map :
# if input and output are list or filelist
if issubclass(inputs.__class__, list) and issubclass(outputs.__class__, list) :
# arguments cannot be set with
if arguments != [] :
logging.getLogger("jflow").exception("add_shell_execution: '" + source + "' arguments parameter not allowed with map")
raise Exception("add_shell_execution: '" + source + "' arguments parameter not allowed with map" )
MultiMap(shell_function,inputs=inputs, outputs=outputs, includes=includes, collect=collect, local=local)
else :
logging.getLogger("jflow").exception("add_shell_execution: '" + source + "' map requires a list as inputs and output")
raise Exception("add_shell_execution: '" + source + "' map requires a list as inputs and output")
else :
shell_function( inputs=inputs, outputs=outputs, arguments=arguments, includes=includes )
......@@ -28,6 +28,7 @@ from argparse import _ensure_value
import urllib2
import copy as _copy
from urlparse import urlparse
import pickle
from jflow.config_reader import JFlowConfigReader
from jflow.utils import get_octet_string_representation, get_nb_octet, display_error_message
......@@ -440,8 +441,8 @@ class MultiParameter(dict, AbstractParameter):
self.help = self.global_help + self.type.get_help()
param.flag = param_flag
self.sub_parameters.append(param)
class MultiParameterList(list, AbstractParameter):
def __init__(self, name, help, required=False, flag=None, group="default", display_name=None, cmd_format="", argpos=-1):
......@@ -476,6 +477,23 @@ class MultiParameterList(list, AbstractParameter):
self.help = self.global_help + self.type.get_help()
param.flag = param_flag
self.sub_parameters.append(param)
def __getitem__(self, key):
getitem = self.__dict__.get("__getitem__", list.__getitem__)
if isinstance(key, int):
return getitem(self, key)
else :
if len(self) > 0:
if getitem(self, 0).has_key(key):
res=[]
for mparam in self :
if isinstance(getitem(self, 0)[key], list):
res.extend(mparam[key])
else:
res.append(mparam[key])
return res
else:
return []
class ParameterFactory(object):
......@@ -1090,4 +1108,155 @@ class OutputFilesPattern(DynamicOutput):
output_files.append( IOFile(os.path.join(self.output_directory, file), self.file_format, self.linkTrace_nameid, None) )
elif not self.include and re.search( self.pattern, file ) is None:
output_files.append( IOFile(os.path.join(self.output_directory, file), self.file_format, self.linkTrace_nameid, None) )
return list.__init__(self, output_files)
\ No newline at end of file
return list.__init__(self, output_files)
class IOObject(object):
IOOBJECT_EXT = ".ioobj"
@staticmethod
def add_required_attributs(obj):
jflowconf = JFlowConfigReader()
obj.is_ioobject = True
obj.dump_path = os.path.join(jflowconf.get_tmp_directory(), os.path.basename(tempfile.NamedTemporaryFile(suffix=IOObject.IOOBJECT_EXT).name))
obj.includes = []
obj.linkTrace_nameid = None
obj.parent_linkTrace_nameid = []
def __init__(self):
IOObject.add_required_attributs(self)
class IObjectList(list, IOObject):
def __new__(self, value):
val = list.__new__(self)
val.extend(value)
return val
def __init__(self, value):
IOObject.__init__(self)
for cobj in value:
if isinstance(cobj, OObject):
self.includes.append(cobj.dump_path)
if isinstance(cobj, InputFile) or isinstance(cobj, OutputFile) :
self.includes.append(cobj)
if isinstance(cobj, list): # list of list
for elt in cobj:
if issubclass(elt.__class__, AbstractIOFile) and issubclass(elt.__class__, ParameterList) :
self.includes.extend(elt)
elif issubclass(elt.__class__, AbstractIOFile) :
self.includes.append(elt)
if isinstance(cobj, dict): # list of dict
for ckey,cval in cobj.items():
if issubclass(cval.__class__, AbstractIOFile) and issubclass(cval.__class__, ParameterList) :
self.includes.extend(cval)
elif issubclass(cval.__class__, AbstractIOFile) :
self.includes.append(cval)
class IObjectDict(dict, IOObject):
def __new__(self, value):
val = dict.__new__(self, value)
for ckey in value.keys(): val[ckey] = value[ckey]
return val
def __init__(self, value):
IOObject.__init__(self)
for ckey,value in value.items():
if isinstance(value, OObject):
self.includes.append(value.dump_path)
if isinstance(value, InputFile) or isinstance(value, OutputFile) :
self.includes.append(value)
if isinstance(value, list): # list of list
if issubclass(value.__class__, AbstractIOFile) and issubclass(value.__class__, ParameterList) :
self.includes.extend(value)
elif issubclass(value.__class__, AbstractIOFile) :
self.includes.append(value)
if isinstance(value, dict): # list of dict
for k,cval in cobj.items():
if issubclass(cval.__class__, AbstractIOFile) and issubclass(cval.__class__, ParameterList) :
self.includes.extend(cval)
elif issubclass(cval.__class__, AbstractIOFile) :
self.includes.append(cval)
class IObjectFactory(object):
@staticmethod
def factory(obj):
new_obj = None
to_pickle = True
if isinstance(obj, OObject):
to_pickle = False
new_obj = obj
elif isinstance(obj, list):
new_obj = IObjectList(obj)
elif isinstance(obj, dict):
new_obj = IObjectDict(obj)
else:
new_obj = obj
IOObject.add_required_attributs(new_obj)
if to_pickle:
ioobjh = open(new_obj.dump_path, "w")
pickle.dump(new_obj, ioobjh)
ioobjh.close()
return new_obj
class OObject(IOObject):
def load(self):
if os.path.exists(self.dump_path):
fh = open(self.dump_path, 'rb')
o=pickle.load(fh)
fh.close()
return o
class InputObject(AbstractParameter, LinkTraceback):
def __init__( self, name, help, default=None, required=False):
new_object = IObjectFactory.factory(default)
AbstractParameter.__init__(self, name, help, default=new_object, type="object", required=required, action="store")
LinkTraceback.__init__(self)
class InputObjectList(ParameterList, LinkTraceback) :
def __init__ (self, name, help, default=None, required=False) :
new_default = []
if isinstance(default, list):
if len(default)>0 :
objects_class = default[0].__class__.__name__
for cobj in default:
if objects_class != cobj.__class__.__name__:
raise Exception('All object in an InputObjectList should have the same type!')
for obj in default:
new_obj = IObjectFactory.factory(obj)
new_default.append(new_obj)
else :
new_obj = IObjectFactory.factory(default)
new_default = [new_obj]
ParameterList.__init__(self, name, help, default=new_default, type="object", required=required)
LinkTraceback.__init__(self)
class OutputObject(AbstractParameter, LinkTraceback):
def __init__( self, name, help, required=False):
AbstractParameter.__init__(self, name, help, default=OObject(), type="object", required=required, action="store")
LinkTraceback.__init__(self)
class OutputObjectList(ParameterList, LinkTraceback) :
def __init__ (self, name, help, nb_items=0, required=False) :
new_default = [ OObject() for i in range(nb_items) ]
ParameterList.__init__(self, name, help, default=new_default, type="object", required=required)
LinkTraceback.__init__(self)
......@@ -246,8 +246,7 @@ class FastaReader(object):
David Benjamin Gordon).
"""
wholefile = self.fp.read()
assert '\r' not in wholefile, "Sorry, currently don't know how to deal with files that contain \\r linebreaks"
assert len(wholefile) == 0 or wholefile[0] == '>', "FASTA file must start with '>'"
assert len(wholefile) != 0 and wholefile[0] == '>', "FASTA file must start with '>'"
parts = wholefile.split('\n>')
# first part has '>' in front
parts[0] = parts[0][1:]
......
......@@ -143,7 +143,8 @@ class Workflow(threading.Thread):
# define the parameters
self.params_order = []
self.define_parameters(self.function)
if self.function != None:
self.define_parameters(self.function)
# add the metadata parameter
self.metadata = []
......@@ -417,6 +418,10 @@ class Workflow(threading.Thread):
for parent in ioparameter.parent_linkTrace_nameid:
try: gr.add_edge((parent, ioparameter.linkTrace_nameid))
except: pass
elif issubclass( ioparameter.__class__, InputObject) or issubclass( ioparameter.__class__, InputObjectList):
for parent in ioparameter.parent_linkTrace_nameid:
try: gr.add_edge((parent, ioparameter.linkTrace_nameid))
except: pass
# check if all nodes are connected
for edge in gr.edges():
if all_nodes.has_key(edge[0]):
......
......@@ -210,11 +210,55 @@ class PythonFunction(ScriptFunction):
"""
PYTHON_VERSION = 'python{0}.{1}'.format(sys.version_info[0], sys.version_info[1])
PYTHON_TEMPLATE = '''#!/usr/bin/env {0}
import pickle
import {{0}}
{{1}}
from jflow.parameter import IOObject
def prepare_arguments(*args):
ioobj_ext = IOObject.IOOBJECT_EXT
new_args = []
output_path = []
# load input object
for i, arg in enumerate(args):
if arg.endswith(ioobj_ext):
if os.path.exists(arg):
ioobjh = open(arg, 'rb')
try:
ioobj = pickle.load(ioobjh)
if isinstance(ioobj, list):
for j, cioobj in enumerate(ioobj):
if (hasattr(cioobj, "is_ioobject")):
cioobjh = open(cioobj.dump_path, 'rb')
ioobj[j] = pickle.load(cioobjh)
cioobjh.close()
finally:
ioobjh.close()
new_args.append(ioobj)
else :
output_path.append(arg)
else :
new_args.append(arg)
return (new_args, output_path)
{{2}}
if __name__ == '__main__':
{{3}}(*sys.argv[1:])
(new_args, output_path) = prepare_arguments(*sys.argv[1:])
outputs = {{3}}(*new_args)
if not isinstance(outputs, list): outputs = [outputs]
# in case the returned object is a table
if len(output_path) == 1 and len(outputs) != len(output_path):
outputs = [outputs]
# otherwise the number does not match
elif len(outputs) != len(output_path) and len (output_path) != 0:
raise Exception( "The number of object returned by the function is different from the number of outputs specified!")
if len (output_path) > 0 :
for i, obj in enumerate(outputs):
objh = open(output_path[i], "w")
pickle.dump(obj, objh)
objh.close()
'''.format(PYTHON_VERSION)
def __init__(self, function, add_path=None, executable=None, cmd_format=None):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment