#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import inspect
import os
import sys
import uuid
import pkgutil
import tempfile
import pickle
import time
import threading
import types

from ConfigParser import ConfigParser, NoOptionError

import jflow
import jflow.utils as utils
from jflow.workflows_manager import WorkflowsManager, JFlowConfigReader
from jflow.parameter import Parameter, date

from weaver.script import ABSTRACTIONS
from weaver.script import DATASETS
from weaver.script import FUNCTIONS
from weaver.script import NESTS
from weaver.script import OPTIONS
from weaver.script import STACKS
from weaver.nest import Nest
from weaver.options import Options
from cctools.makeflow import MakeflowLog
from cctools.makeflow.log import Node


class Workflow(threading.Thread):
    """
    Base class for jflow workflows: a Thread that chains components together
    and executes them through weaver/makeflow.
    """

    PROPERTIES_FILE_NAME = "workflow.properties"
    MAKEFLOW_LOG_FILE_NAME = "Makeflow.makeflowlog"
    DUMP_FILE_NAME = "workflow.dump"
    WORKING = ".working"
    STATUS_STARTED = "started"
    STATUS_COMPLETED = "completed"
    STATUS_FAILED = "failed"
    STATUS_ABORTED = "aborted"

    def __init__(self, args={}, id=None):
        """
        @param args: the {parameter_name: value} dictionary used to set the workflow parameters
        @param id: the workflow identifier; if None, the workflow is not serialized to disk
        """
        # define as a thread
        threading.Thread.__init__(self)
        self.jflow_config_reader = JFlowConfigReader()
        self.manager = WorkflowsManager()
        self.components = []
        self.makes = {}
        self.globals = {}
        self.options = Options()
        self.status = self.STATUS_STARTED
        self.start_time = None
        self.end_time = None
        self.comp_pckg = self._import_components()
        self.engine_arguments = ''
        self.component_nameids_is_init = False
        self.component_nameids = {}
        # try to parse engine arguments
        try:
            type, options = self.jflow_config_reader.get_batch()
            if type:
                self.engine_arguments += ' -T ' + type
            if options:
                self.engine_arguments += ' -B "' + options + '"'
        except:
            self.engine_arguments = None
        try:
            # add config information
            self.name, self.description, self.parameters = self._get_from_config()
        except:
            raise IOError(self.__class__.__name__ + " workflow property file not found or invalid.")
        self.id = id
        self.args = self._extend_and_format_args(args)
        if self.id is not None:
            self.directory = self.manager.get_workflow_directory(self.name, self.id)
            if not os.path.isdir(self.directory):
                os.makedirs(self.directory, 0751)
            self._serialize()

    def __setstate__(self, state):
        self.__dict__ = state.copy()
        threading.Thread.__init__(self, name=self.name)

    def __getstate__(self):
        """
        Threading uses Lock objects; do not consider these objects when
        serializing a workflow.
        """
        odict = self.__dict__.copy()
        del odict['_Thread__started']
        del odict['_Thread__block']
        del odict['_Thread__stderr']
        return odict

    def add_component(self, component_name, args=[], kwargs={}, component_prefix="default"):
        # first build and check if this component is OK
        my_pckge = __import__(self.comp_pckg[component_name], globals(), locals(), [component_name], -1)
        # build the object and define required fields
        cmpt_object = getattr(my_pckge, component_name)()
        cmpt_object.output_directory = self.get_component_output_directory(component_name,
                                                                           component_prefix)
        cmpt_object.prefix = component_prefix
        if kwargs:
            cmpt_object.define_parameters(**kwargs)
        else:
            cmpt_object.define_parameters(*args)
        # if already init, add the component to the list and check if weaver should be executed
        if self.component_nameids_is_init:
            # add the component
            self.components.append(cmpt_object)
            # if this one requires a dynamic pipeline, execute the first part
            if cmpt_object.is_dynamic:
                self._execute_weaver()
        else:
            if self._component_is_duplicated(cmpt_object):
                raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
                                 cmpt_object.prefix + " already exists in this pipeline!")
            self.component_nameids[cmpt_object.get_nameid()] = None
        return cmpt_object

    def pre_process(self):
        pass

    def process(self):
        """
        Run the workflow; has to be implemented by subclasses.
        """
        raise NotImplementedError

    def post_process(self):
        pass

    def get_temporary_file(self, suffix=".txt"):
        # first check if the tmp directory exists
        if not os.path.isdir(self.jflow_config_reader.get_tmp_directory()):
            os.makedirs(self.jflow_config_reader.get_tmp_directory(), 0751)
        tempfile_name = os.path.basename(tempfile.NamedTemporaryFile(suffix=suffix).name)
        return os.path.join(self.jflow_config_reader.get_tmp_directory(), tempfile_name)

    def get_component_output_directory(self, component_name, component_prefix):
        return os.path.join(self.directory, component_name + "_" + component_prefix)

    def get_components_nameid(self):
        return self.component_nameids.keys()

    def run(self):
        """
        Only required for threading.
        """
        self.start_time = time.time()
        self.status = self.STATUS_STARTED
        self.end_time = None
        # serialize the workflow before pre processing
        self._serialize()
        self.pre_process()
        # serialize the workflow before running
        self._serialize()
        # if not done yet, init the component nameids list before running
        if not self.component_nameids_is_init:
            try:
                self.process()
            except SystemExit, e:
                print e
            self.component_nameids_is_init = True
            self._serialize()
        try:
            self.process()
        except SystemExit, e:
            print e
        if len(self.components) > 0:
            self._execute_weaver()
        self._serialize()
        self.post_process()
        if self.status == self.STATUS_STARTED:
            self.status = self.STATUS_COMPLETED
        self.end_time = time.time()
        self._serialize()

    def _extend_and_format_args(self, args):
        extended_args = {}
        for param in self.parameters:
            try:
                args[param.name] = args[param.name].encode('ascii', 'ignore')
            except:
                pass
            # if this parameter has been modified by the user
            if args.has_key(param.name):
                # if it's an append parameter, the result should be a list
                if param.action == "append":
                    if args[param.name]:
                        try:
                            value = args[param.name].split(",")
                        except:
                            value = args[param.name]
                    else:
                        value = []
                    extended_args[param.name] = value
                # if it's a date
                elif param.type == date:
                    if isinstance(args[param.name], str):
                        extended_args[param.name] = date(args[param.name])
                    else:
                        extended_args[param.name] = args[param.name]
                # if it's a boolean
                elif param.type == types.BooleanType:
                    extended_args[param.name] = args[param.name] in [True, 1, 'true', '1', 't', 'y', 'yes']
                else:
                    extended_args[param.name] = args[param.name]
            # otherwise use the default
            else:
                extended_args[param.name] = param.default
        return extended_args

    def _get_from_config(self):
        reader = ConfigParser()
        reader.read(self._get_property_path())
        parameters = []
        try:
            params = {}
            # first grab all parameter names
            for param in reader.items("parameters"):
                cvalue = reader.get("parameters", param[0])
                cparam = param[0].split(".")[0]
                ckey = param[0].split(".")[1]
                if not params.has_key(cparam):
                    params[cparam] = {ckey: cvalue}
                else:
                    params[cparam][ckey] = cvalue
            for param in params:
                oparam = Parameter(**params[param])
                parameters.append(oparam)
        except:
            pass
        return [reader.get("global", "name"),
                reader.get("global", "description"),
                parameters]

    def _update_status_from_log(self):
        # first update the status from the weaver folders
        # TODO: update self.end_time
        try:
            working_directory = os.path.join(self.directory, self.WORKING)
            make_states = []
            for wdir in os.listdir(working_directory):
                log_path = os.path.join(working_directory, wdir, self.MAKEFLOW_LOG_FILE_NAME)
                log = MakeflowLog(log_path)
                log.parse()
                make_states.append(log.state)
            if self.STATUS_ABORTED in make_states:
                self.status = self.STATUS_ABORTED
            elif self.STATUS_FAILED in make_states:
                self.status = self.STATUS_FAILED
            elif self.status != self.STATUS_COMPLETED:
                self.status = self.STATUS_STARTED
        except:
            pass
        self._serialize()

    def get_status(self):
        self._update_status_from_log()
        return self.status

    def get_component_status(self, component_nameid):
        status = {"time": 0.0, "tasks": 0, "waiting": 0, "running": 0,
                  "failed": 0, "aborted": 0, "completed": 0}
        try:
            log = MakeflowLog(self.component_nameids[component_nameid])
            log.parse()
            symbols = set(n.symbol for n in log.nodes if n.symbol)
            if not symbols:
                return None
            for n in log.nodes:
                if not n.symbol:
                    continue
                if n.symbol == component_nameid:
                    status["tasks"] += 1
                    status["time"] += n.elapsed_time
                    if n.state == Node.WAITING:
                        status["waiting"] += 1
                    elif n.state == Node.RUNNING:
                        status["running"] += 1
                    elif n.state == Node.FAILED:
                        status["failed"] += 1
                    elif n.state == Node.ABORTED:
                        status["aborted"] += 1
                    elif n.state == Node.COMPLETED:
                        status["completed"] += 1
        except:
            pass
        return status

    def makeflow_pretty_print_node(self, dag, node):
        sys.stdout.write('{0:>10} {1} {2}\n'.format('NODE', node.id, node.symbol))
        for output_file in sorted(node.output_files):
            sys.stdout.write('{0:>10} {1:>10} {2}\n'.format('', 'OUTPUT', output_file))
        for input_file in sorted(node.input_files):
            sys.stdout.write('{0:>10} {1:>10} {2}\n'.format('', 'INPUT', input_file))
        sys.stdout.write('{0:>10} {1:>10} {2}\n'.format('', 'COMMAND', node.command))

    def _execute_weaver(self, engine_wrapper=None):
        # Add nest path and path to script to Python module path to allow
        # for importing modules outside of $PYTHONPATH
        make_directory, new_make = self._get_current_make()
        current_working_directory = os.path.join(os.path.join(self.directory, self.WORKING),
                                                 make_directory)
        sys.path.insert(0, os.path.abspath(os.path.dirname(current_working_directory)))
        # Load built-ins
        self._import('abstraction', ABSTRACTIONS)
        self._import('dataset', DATASETS)
        self._import('function', FUNCTIONS)
        self._import('nest', NESTS)
        self._import('options', OPTIONS)
        self._import('stack', STACKS)
        # Execute nest
        with Nest(current_working_directory, wrapper=engine_wrapper,
                  path=self.jflow_config_reader.get_makeflow_path()) as nest:
            with self.options:
                if new_make:
                    try:
                        for component in self.components:
                            nest.symbol = component.get_nameid()
                            self.component_nameids[component.get_nameid()] = \
                                os.path.join(current_working_directory, self.MAKEFLOW_LOG_FILE_NAME)
                            component.execute()
                        # create the DAG
                        nest.compile()
                    except Exception as e:
                        # mark the workflow as failed before exiting
                        self.status = self.STATUS_FAILED
                        self._serialize()
                        sys.exit('Error compiling script: {0}'.format(e))
                self.components = []
                # once a weaver script is compiled, serialize the workflow
                self._serialize()
                try:
                    nest.execute(self.engine_arguments, exit_on_failure=True)
                except:
                    self.status = self.STATUS_FAILED
                    self.end_time = time.time()
                    self._serialize()
                    sys.exit()

    def _get_current_make(self):
        current_component, make_directory, new_make = [], None, False
        for component in self.components:
            current_component.append(component.get_nameid())
        for make in self.makes:
            if set(current_component) == set(self.makes[make]):
                make_directory = make
        # if the components in the queue have not already been compiled
        if make_directory is None:
            make_directory = uuid.uuid4().hex[:10]
            self.makes[make_directory] = current_component
            new_make = True
        return [make_directory, new_make]

    def _serialize(self):
        self.dump_path = os.path.join(self.directory, self.DUMP_FILE_NAME)
        workflow_dump = open(self.dump_path, "wb")
        pickle.dump(self, workflow_dump)
        workflow_dump.close()

    def _component_is_duplicated(self, component):
        if component.get_nameid() in self.component_nameids.keys():
            return True
        return False

    def _get_property_path(self):
        """
        Return (type: string): the path to the workflow properties file,
        None if it does not exist.
        """
        property_file = os.path.join(os.path.dirname(inspect.getfile(self.__class__)),
                                     self.PROPERTIES_FILE_NAME)
        return property_file if os.path.isfile(property_file) else None

    def _import_components(self):
        pckge = {}
        # first import the pipeline packages
        pipeline_dir = os.path.dirname(inspect.getfile(self.__class__))
        for importer, modname, ispkg in pkgutil.iter_modules(
                [os.path.join(pipeline_dir, "components")],
                "workflows." + os.path.basename(pipeline_dir) + ".components."):
            m = __import__(modname)
            for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
                if issubclass(obj, jflow.component.Component) \
                        and obj.__name__ != jflow.component.Component.__name__:
                    pckge[class_name] = modname
        # finally import the workflows shared packages
        workflows_dir = os.path.dirname(os.path.dirname(inspect.getfile(self.__class__)))
        for importer, modname, ispkg in pkgutil.iter_modules(
                [os.path.join(workflows_dir, "components")], "workflows.components."):
            m = __import__(modname)
            for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
                if issubclass(obj, jflow.component.Component) \
                        and obj.__name__ != jflow.component.Component.__name__:
                    pckge[class_name] = modname
        return pckge

    def _import(self, module, symbols):
        """ Import ``symbols`` from ``module`` into the global namespace. """
        # import the module
        m = 'weaver.{0}'.format(module)
        m = __import__(m, self.globals, self.globals, symbols, -1)
        # import symbols from the module into the global namespace, which we
        # store as an attribute for later use (i.e. during compile)
        for symbol in symbols:
            self.globals[symbol] = getattr(m, symbol)
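
# A minimal usage sketch, assuming a hypothetical "FastQC" component available
# in the pipeline's components/ directory and an "input_files" parameter
# declared in its workflow.properties file; these names are illustrative and
# not part of jflow itself. Only add_component(), start() (inherited from
# threading.Thread) and get_status() come from the class above.
#
#     class QualityCheck(Workflow):
#
#         def process(self):
#             # add_component() imports, checks and registers the component;
#             # weaver then compiles and runs the resulting makeflow
#             self.add_component("FastQC", [self.args["input_files"]])
#
#     wf = QualityCheck(args={"input_files": "reads.fastq"}, id="1")
#     wf.start()               # pre_process(), process(), then post_process()
#     print wf.get_status()    # "started", "completed", "failed" or "aborted"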