# # Copyright (C) 2012 INRA # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import inspect import os import re import sys import uuid import pkgutil import tempfile import pickle import time import threading import urllib2 import types import logging from logging import FileHandler from ConfigParser import ConfigParser, NoOptionError from inspect import getcallargs from urlparse import urlparse import jflow import jflow.utils as utils from jflow.workflows_manager import WorkflowsManager, JFlowConfigReader from jflow.parameter import * from weaver.script import ABSTRACTIONS from weaver.script import DATASETS from weaver.script import FUNCTIONS from weaver.script import NESTS from weaver.script import OPTIONS from weaver.script import STACKS from weaver.nest import Nest from weaver.options import Options from cctools.makeflow import MakeflowLog from cctools.makeflow.log import Node class Workflow(threading.Thread): """ """ PROPERTIES_FILE_NAME = "workflow.properties" MAKEFLOW_LOG_FILE_NAME = "Makeflow.makeflowlog" DUMP_FILE_NAME = ".workflow.dump" WORKING = ".working" STATUS_STARTED = "started" STATUS_COMPLETED = "completed" STATUS_FAILED = "failed" STATUS_ABORTED = "aborted" def __init__(self, args={}, id=None, function= "process", parameters_section=None): """ """ # define as a thread threading.Thread.__init__(self) self.jflow_config_reader = JFlowConfigReader() self.manager = WorkflowsManager() self.components_to_exec = [] self.components = [] self.makes = {} self.globals = {} self.options = Options() self.status = self.STATUS_STARTED self.start_time = None self.end_time = None self.step = None self.stderr = None self.dynamic_component_present = False self.function = function self.parameters_section = parameters_section self.comp_pckg = self._import_components() # intruduce --log-verbose to be able to monitor the new version of makeflow >=4.2.2 self.engine_arguments = ' --log-verbose ' self.component_nameids_is_init = False self.component_nameids = {} # try to parse engine arguments try: type, options, limit_submission = self.jflow_config_reader.get_batch() if limit_submission : self.engine_arguments += ' -J ' + str(limit_submission) if type: self.engine_arguments += ' -T ' + type if options : self.engine_arguments += ' -B "' + options + '"' except: self.engine_arguments = None self.id = id self.name = self.get_name() self.description = self.get_description() # define the parameters self.params_order = [] self.define_parameters(self.parameters_section) # add the metadata parameter self.add_parameter("metadata", "Which metadata should be linked to this workflow", is_list=True) # if some args are provided, let's fill the parameters if args: self._set_parameters(args) if self.id is not None: self.directory = self.manager.get_workflow_directory(self.name, self.id) if not os.path.isdir(self.directory): os.makedirs(self.directory, 0751) if self.stderr is None: self.set_stderr() self._serialize() def add_input_file(self, name, help, file_format="any", default=None, type=types.StringType, choices=None, required=False, flag=None, group="default", display_name=None, is_list=False, add_to=None): if is_list: if default == None: default = [] new_param = InputFileList(name, help, flag=flag, file_format=file_format, default=default, type=type, choices=choices, required=required, group=group, display_name=display_name) else: new_param = InputFile(name, help, flag=flag, file_format=file_format, default=default, type=type, choices=choices, required=required, group=group, display_name=display_name) # if this input should be added to a particular parameter if add_to: try: self.__getattribute__(add_to).add_sub_parameter(new_param) except: pass # otherwise, add it to the class itself else: self.params_order.append(name) self.__setattr__(name, new_param) def add_output_file(self, name, help, file_format="any", default=None, type=types.StringType, choices=None, required=False, flag=None, group="default", display_name=None, is_list=False, add_to=None): if is_list: if default == None: default = [] new_param = OutputFileList(name, help, flag=flag, file_format=file_format, default=default, type=type, choices=choices, required=required, group=group, display_name=display_name) else: new_param = OutputFile(name, help, flag=flag, file_format=file_format, default=default, type=type, choices=choices, required=required, group=group, display_name=display_name) # if this input should be added to a particular parameter if add_to: try: self.__getattribute__(add_to).add_sub_parameter(new_param) except: pass # otherwise, add it to the class itself else: self.params_order.append(name) self.__setattr__(name, new_param) def add_multiple_parameter(self, name, help, required=False, flag=None, group="default", display_name=None, is_list=False): self.params_order.append(name) if is_list: new_param = MultiParameterList(name, help, flag=flag, required=required, group=group, display_name=display_name) else: new_param = MultiParameter(name, help, flag=flag, required=required, group=group, display_name=display_name) self.__setattr__(name, new_param) def add_parameter(self, name, help, default=None, type=types.StringType, choices=None, required=False, flag=None, group="default", display_name=None, is_list=False, add_to=None): if is_list: if default == None: default = [] new_param = ParameterList(name, help, flag=flag, default=default, type=type, choices=choices, required=required, group=group, display_name=display_name) else: new_param = Parameter(name, help, flag=flag, default=default, type=type, choices=choices, required=required, group=group, display_name=display_name) # if this input should be added to a particular parameter if add_to: try: self.__getattribute__(add_to).add_sub_parameter(new_param) except: pass # otherwise, add it to the class itself else: self.params_order.append(name) self.__setattr__(name, new_param) def add_exclusion_rule(self, *args2exclude): # first of all, are this parameter exist params2exclude = [] for arg2exclude in args2exclude: try: params2exclude.append(self.__getattribute__(arg2exclude)) except: pass # everything is ok, let's go if len(params2exclude) == len(args2exclude): new_group = "exclude-"+uuid.uuid4().hex[:5] for paramsexclude in params2exclude: paramsexclude.group = new_group # it might be a mutliple param rule else: for attribute_value in self.__dict__.values(): if issubclass(attribute_value.__class__, MultiParameter) or issubclass(attribute_value.__class__, MultiParameterList): params2exclude = [] for sub_param in attribute_value.sub_parameters: if sub_param.name in args2exclude: params2exclude.append(sub_param) if len(params2exclude) == len(args2exclude): new_group = "exclude-"+uuid.uuid4().hex[:5] flags2exclude = [] for paramsexclude in params2exclude: paramsexclude.group = new_group flags2exclude.append(paramsexclude.flag) attribute_value.type.excludes[new_group] = flags2exclude break def _set_parameters(self, args): parameters = self.get_parameters() for param in parameters: if args[param.name]: try: args[param.name] = args[param.name].encode('ascii','ignore') except: pass if param.__class__ == Parameter: new_param = Parameter(param.name, param.help, default=args[param.name], type=param.type, choices=param.choices, required=param.required, flag=param.flag, group=param.group, display_name=param.display_name) self.__setattr__(param.name, new_param) elif param.__class__ == InputFile: new_param = InputFile(param.name, param.help, file_format=param.file_format, default=args[param.name], type=param.type, choices=param.choices, required=param.required, flag=param.flag, group=param.group, display_name=param.display_name) self.__setattr__(param.name, new_param) elif param.__class__ == ParameterList or param.__class__ == InputFileList: self.__getattribute__(param.name).extend(args[param.name]) elif param.__class__ == MultiParameter: sub_args = {} for sarg in args[param.name]: sub_args[sarg[0]] = sarg[1] for sub_param in param.sub_parameters: if sub_args.has_key(sub_param.flag): self.__getattribute__(param.name)[sub_param.name] = sub_args[sub_param.flag] else: self.__getattribute__(param.name)[sub_param.name] = self.__getattribute__(param.name).default[sub_param.name] def _download_urlfile(self, input): try: uri_object = urlparse(input) opener = urllib2.urlopen(input) block_size = 8000 jflowconf = JFlowConfigReader() tmp_directory = os.path.join(jflowconf.get_tmp_directory(), os.path.basename(tempfile.NamedTemporaryFile().name)) os.mkdir(tmp_directory) file_path = os.path.join(tmp_directory, os.path.basename(uri_object.path)) local_file = open(file_path, 'wb') if os.path.basename(uri_object.path) is not None and os.path.basename(uri_object.path) != "": metadata = opener.info() file_size = int(metadata.getheaders("Content-Length")[0]) while True: buffer = opener.read(block_size) # End of download if not buffer: break # Parts of download local_file.write(buffer) local_file.close() logging.getLogger("types.urlfile").debug("URL file '{0}' successfully downloaded as: {1}".format(input, file_path)) return [file_path, True] except: return [input, False] def _prepare_input_files(self, parameters, args): kparams = {} for param in parameters: kparams[param.name] = param new_args = args for param in args.keys(): is_uri, is_local = False, False # handle url inputs if kparams[param].type.__name__ == "inputfile" or kparams[param].type.__name__ == "urlfile": if args[param].__class__.__name__ == "list": for i, val in enumerate(args[param]): new_args[param][i], is_uri = self._download_urlfile(args[param][i]) else: new_args[param], is_uri = self._download_urlfile(args[param]) # handle upload inputs try: is_local = os.path.isfile(args[param]) except: is_local = False if not is_uri and not is_local and (kparams[param].type.__name__ == "inputfile" or kparams[param].type.__name__ == "browsefile"): if args[param].__class__.__name__ == "list": for i, val in enumerate(args[param]): if args[param][i]: new_args[param][i] = os.path.join(self.jflow_config_reader.get_tmp_directory(), (args[param][i])) else: if args[param]: new_args[param] = os.path.join(self.jflow_config_reader.get_tmp_directory(), (args[param])) return new_args def set_stderr(self): if hasattr(self, "stderr") and self.stderr is not None: os.rename( self.stderr, os.path.join(self.directory, str(time.time()) + "wf_stderr.old") ) stderr = os.path.join(self.directory, "wf_stderr.txt") logger = logging.getLogger( "wf." + str(self.id) ) logger.setLevel(logging.DEBUG) formatter = logging.Formatter( '%(asctime)s :: %(message)s' ) file_handler = FileHandler(stderr, 'a') file_handler.setLevel(logging.ERROR) file_handler.setFormatter(formatter) logger.addHandler(file_handler) self.stderr = stderr @staticmethod def config_parser(arg_lines): for arg in arg_lines: yield arg def get_errors(self): error = { "title" : "", "msg" : list(), "traceback" : list() } line_idx = 0 FH_stderr = open( self.stderr ) lines = FH_stderr.readlines() while line_idx < len(lines): if lines[line_idx].strip().endswith("RunWorflowException"): error["title"] = lines[line_idx].rstrip() error["msg"] = list() error["traceback"] = list() line_idx += 2 # skip : "Traceback (most recent call last):" # Traceback while lines[line_idx] != lines[line_idx].lstrip(): error["traceback"].append({ "location" : lines[line_idx].strip(), "line" : lines[line_idx].strip() }) line_idx += 2 # Error message while line_idx < len(lines) and not lines[line_idx].strip().endswith("RunWorflowException"): error["msg"].append( lines[line_idx].strip() ) line_idx += 1 line_idx -= 1 line_idx += 1 FH_stderr.close() last_stack_location = "" if len(error["traceback"]) > 0: last_stack_location = error["traceback"][-1]["location"].strip() return { "msg" : error["msg"], "location" : last_stack_location } else: return None def get_outputs_per_components(self, web_path): outputs_files = {} for current_components in self.components: #status = self.get_component_status(current_components.get_nameid()) outputs_files[current_components.get_nameid()] = current_components.get_component_outputs(web_path) #outputs_files["0"] = status["completed"] return outputs_files def __setstate__(self, state): self.__dict__ = state.copy() threading.Thread.__init__(self, name=self.name) def __getstate__(self): """ Threading uses Lock Object, do not consider these objects when serializing a workflow """ odict = self.__dict__.copy() del odict['_Thread__started'] del odict['_Thread__block'] del odict['_Thread__stderr'] return odict def get_parameters_per_groups(self): name = self.get_name() description = self.get_description() parameters = self.get_parameters() pgparameters, parameters_order = {}, [] for param in parameters: if param.group not in parameters_order: parameters_order.append(param.group) if pgparameters.has_key(param.group): pgparameters[param.group].append(param) else: pgparameters[param.group] = [param] return [pgparameters, parameters_order] def get_parameters(self): params = [] for param in self.params_order: for attribute_value in self.__dict__.values(): if (issubclass(attribute_value.__class__, AbstractParameter)) and param == attribute_value.name: params.append(attribute_value) return params def get_exec_path(self, software): exec_path = self.jflow_config_reader.get_exec(software) if exec_path is None and os.path.isfile(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "../bin", software)): exec_path = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "../bin", software) elif exec_path is None and os.path.isfile(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "bin", software)): exec_path = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "bin", software) elif exec_path is None and utils.which(software) == None: sys.stderr.write("Error: '" + software + "' path connot be retrieved either in the PATH and in the application.properties file!\n") sys.exit(1) elif exec_path is None and utils.which(software) != None: exec_path = software elif exec_path != None and not os.path.isfile(exec_path): sys.stderr.write("Error: '" + exec_path + "' set for '" + software + "' does not exists, please provide a valid path!\n") sys.exit(1) return exec_path def add_component(self, component_name, args=[], kwargs={}, component_prefix="default"): # first build and check if this component is OK if self.comp_pckg.has_key(component_name): my_pckge = __import__(self.comp_pckg[component_name], globals(), locals(), [component_name], -1) # build the object and define required field cmpt_object = getattr(my_pckge, component_name)() cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix) cmpt_object.prefix = component_prefix if kwargs: cmpt_object.define_parameters(**kwargs) else: cmpt_object.define_parameters(*args) # there is a dynamic component if cmpt_object.is_dynamic(): self.dynamic_component_present = True # if already init, add the component to the list and check if weaver should be executed if self.component_nameids_is_init: # add the component self.components_to_exec.append(cmpt_object) self.components.append(cmpt_object) self._execute_weaver() # update outputs for output in cmpt_object.get_dynamic_outputs(): output.update() else: if self._component_is_duplicated(cmpt_object): raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " + cmpt_object.prefix + " already exist in this pipeline!") self.component_nameids[cmpt_object.get_nameid()] = None self.components_to_exec = [] self.components = [] else: if self.component_nameids_is_init: # add the component self.components_to_exec.append(cmpt_object) self.components.append(cmpt_object) elif not self.component_nameids_is_init and not self.dynamic_component_present: if self._component_is_duplicated(cmpt_object): raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " + cmpt_object.prefix + " already exist in this pipeline!") self.components_to_exec.append(cmpt_object) self.components.append(cmpt_object) else: if self._component_is_duplicated(cmpt_object): raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " + cmpt_object.prefix + " already exist in this pipeline!") self.component_nameids[cmpt_object.get_nameid()] = None return cmpt_object else: raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(", ".join(self.comp_pckg.keys()))) def pre_process(self): pass def process(self): """ Run the workflow, has to be implemented by subclasses """ raise NotImplementedError def get_name(self): """ Return the workflow name, has to be implemented by subclasses """ raise NotImplementedErro def get_description(self): """ Return the workflow description, has to be implemented by subclasses """ raise NotImplementedErrorr def define_parameters(self, parameters_section=None): """ Define the workflow parameters, has to be implemented by subclasses """ raise NotImplementedError def post_process(self): pass def get_temporary_file(self, suffix=".txt"): tempfile_name = os.path.basename(tempfile.NamedTemporaryFile(suffix=suffix).name) return os.path.join(self.jflow_config_reader.get_tmp_directory(), tempfile_name) def get_component_output_directory(self, component_name, component_prefix): return os.path.join(self.directory, component_name + "_" + component_prefix) def get_components_nameid(self): return self.component_nameids.keys() def wf_execution_wrapper(self): getattr(self, self.function)() def run(self): """ Only require for Threading """ try: # if this is the first time the workflow run if self.step == None : self.start_time = time.time() self.step = 0 self.status = self.STATUS_STARTED self.end_time = None self._serialize() # if pre_processing has not been done yet if self.step == 0: self.pre_process() self.step = 1 self._serialize() # if collecting components and running workflow has not been done yet if self.step == 1: try: self.wf_execution_wrapper() except SystemExit, e: self.status = self.STATUS_FAILED self.end_time = time.time() self._serialize() raise self.component_nameids_is_init = True if self.dynamic_component_present: self.step = 2 else: self._execute_weaver() self.step = 3 self._serialize() # if the workflow was a dynamic one if self.step == 2: try: self.wf_execution_wrapper() except SystemExit, e: self.status = self.STATUS_FAILED self.end_time = time.time() self._serialize() raise if len(self.components_to_exec) > 0: self._execute_weaver() self.step = 3 self._serialize() # if post processing has ne been done yet if self.step == 3: self.post_process() if self.status == self.STATUS_STARTED: self.status = self.STATUS_COMPLETED self.end_time = time.time() self._serialize() except: logging.getLogger("wf." + str(self.id)).exception("RunWorflowException") raise def _update_status_from_log(self): # first update the status from weaver folders # TODO update self.end_time try: working_directory = os.path.join(self.directory, self.WORKING) make_states = [] for wdir in os.listdir(working_directory): log_path = os.path.join(working_directory, wdir, self.MAKEFLOW_LOG_FILE_NAME) log = MakeflowLog(log_path) log.parse() make_states.append(log.state) if self.STATUS_ABORTED in make_states: self.status = self.STATUS_ABORTED elif self.STATUS_FAILED in make_states: self.status = self.STATUS_FAILED elif self.status != self.STATUS_COMPLETED: self.status = self.STATUS_STARTED except: pass self._serialize() def get_status(self): self._update_status_from_log() return self.status def get_resource(self, resource): return self.jflow_config_reader.get_resource(resource) def get_component_status(self, component_nameid): status = {"time": 0.0, "tasks": 0, "waiting": 0, "running": 0, "failed":0, "aborted": 0, "completed": 0, "failed_commands": list() } try: log = MakeflowLog(self.component_nameids[component_nameid]) log.parse() symbols = set(n.symbol for n in log.nodes if n.symbol) if not symbols: return None for n in log.nodes: if not n.symbol: continue if n.symbol == component_nameid: status["tasks"] += 1 status["time"] += n.elapsed_time if n.state == Node.WAITING: status["waiting"] += 1 elif n.state == Node.RUNNING: status["running"] += 1 elif n.state == Node.FAILED: status["failed"] += 1 status["failed_commands"].append( n.command ) elif n.state == Node.ABORTED: status["aborted"] += 1 elif n.state == Node.COMPLETED: status["completed"] += 1 except: pass return status def makeflow_pretty_print_node(self, dag, node): sys.stdout.write('{0:>10} {1} {2}\n'.format('NODE', node.id, node.symbol)) for output_file in sorted(node.output_files): sys.stdout.write('{0:>10} {1:>10} {2}\n'.format('', 'OUTPUT', output_file)) for input_file in sorted(node.input_files): sys.stdout.write('{0:>10} {1:>10} {2}\n'.format('', 'INPUT', input_file)) sys.stdout.write('{0:>10} {1:>10} {2}\n'.format('', 'COMMAND', node.command)) def _execute_weaver(self, engine_wrapper=None): # Add nest path and path to script to Python module path to allow # for importing modules outside of $PYTHONPATH make_directory, new_make = self._get_current_make() current_working_directory = os.path.join(os.path.join(self.directory, self.WORKING), make_directory) sys.path.insert(0, os.path.abspath(os.path.dirname(current_working_directory))) # Load built-ins self._import('abstraction', ABSTRACTIONS) self._import('dataset', DATASETS) self._import('function', FUNCTIONS) self._import('nest', NESTS) self._import('options', OPTIONS) self._import('stack', STACKS) # Execute nest with Nest(current_working_directory, wrapper=engine_wrapper, path=self.jflow_config_reader.get_makeflow_path()) as nest: with self.options: if new_make: try: for component in self.components_to_exec: nest.symbol = component.get_nameid() nest.batch = component.batch_options self.component_nameids[component.get_nameid()] = os.path.join(current_working_directory, self.MAKEFLOW_LOG_FILE_NAME) component.execute() # create the DAG nest.compile() except Exception as e: self.status = self.STATUS_FAILED self.end_time = time.time() self._serialize() raise self.components_to_exec = [] # Once a weaver script is compiled, serialize the workflow self._serialize() try: nest.execute(self.engine_arguments, exit_on_failure=True) except: self.status = self.STATUS_FAILED self.end_time = time.time() self._serialize() raise def _get_current_make(self): current_component, make_directory, new_make = [], None, False for component in self.components_to_exec: current_component.append(component.get_nameid()) for make in self.makes: if set(current_component) == set(self.makes[make]): make_directory = make # If the components in the queue have already been compiled if make_directory is None: make_directory = uuid.uuid4().hex[:10] self.makes[make_directory] = current_component new_make = True return [make_directory, new_make] def _serialize(self): self.dump_path = os.path.join(self.directory, self.DUMP_FILE_NAME) workflow_dump = open(self.dump_path, "wb") pickle.dump(self, workflow_dump) workflow_dump.close() def _component_is_duplicated(self, component): if component.get_nameid() in self.component_nameids.keys(): return True return False def _get_property_path(self): """ Return(type:string): the path to the workflow properties file, None if does not exist """ property_file = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), self.PROPERTIES_FILE_NAME) return property_file if os.path.isfile(property_file) else None def _import_components(self): pckge = {} # then import pipeline packages pipeline_dir = os.path.dirname(inspect.getfile(self.__class__)) for importer, modname, ispkg in pkgutil.iter_modules([os.path.join(pipeline_dir, "components")], "workflows." + os.path.basename(pipeline_dir) + ".components."): try: m = __import__(modname) for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass): if issubclass(obj, jflow.component.Component) and obj.__name__ != jflow.component.Component.__name__: pckge[class_name] = modname except Exception as e: logging.getLogger("Workflow._import_components").debug("Component <{0}> cannot be loaded: {1}".format(modname, e)) # finally import workflows shared packages workflows_dir = os.path.dirname(os.path.dirname(inspect.getfile(self.__class__))) for importer, modname, ispkg in pkgutil.iter_modules([os.path.join(workflows_dir, "components")], "workflows.components."): try: m = __import__(modname) for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass): if issubclass(obj, jflow.component.Component) and obj.__name__ != jflow.component.Component.__name__: pckge[class_name] = modname except Exception as e: logging.getLogger("Workflow._import_components").debug("Component <{0}> cannot be loaded: {1}".format(modname, e)) return pckge def _import(self, module, symbols): """ Import ``symbols`` from ``module`` into global namespace. """ # Import module m = 'weaver.{0}'.format(module) m = __import__(m, self.globals, self.globals, symbols, -1) # Import symbols from module into global namespace, which we store as # an attribute for later use (i.e. during compile) for symbol in symbols: self.globals[symbol] = getattr(m, symbol)