Commit b5884e18 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

update jflow to the latest version (improve get_workflow)

parent 88935c70
......@@ -76,7 +76,8 @@ class JFlowConfigReader(object):
try:
type = self.reader.get("global", "batch_system_type")
options = self.reader.get("global", "batch_options")
return [type, options]
limit_submission = self.reader.get("global", "limit_submission")
return [type, options, limit_submission]
except NoOptionError, e:
return None
......
......@@ -53,7 +53,7 @@ class Workflow(threading.Thread):
PROPERTIES_FILE_NAME = "workflow.properties"
MAKEFLOW_LOG_FILE_NAME = "Makeflow.makeflowlog"
DUMP_FILE_NAME = "workflow.dump"
DUMP_FILE_NAME = ".workflow.dump"
WORKING = ".working"
STATUS_STARTED = "started"
STATUS_COMPLETED = "completed"
......@@ -75,20 +75,23 @@ class Workflow(threading.Thread):
self.start_time = None
self.end_time = None
self.step = None
self.dynamic_component_present = False
self.function = function
self.parameters_section = parameters_section
self.comp_pckg = self._import_components()
self.engine_arguments = ''
self.component_nameids_is_init = False
self.component_nameids = {}
# try to parse engine arguments
try:
type, options = self.jflow_config_reader.get_batch()
type, options, limit_submission = self.jflow_config_reader.get_batch()
if limit_submission : self.engine_arguments += ' -J ' + str(limit_submission)
if type: self.engine_arguments += ' -T ' + type
if options : self.engine_arguments += ' -B "' + options + '"'
except: self.engine_arguments = None
try:
# Add config information
self.name, self.description, self.parameters = self._get_from_config()
self.name, self.description, self.parameters = self._get_from_config(self.parameters_section)
except:
raise IOError(self.__class__.__name__ + " workflow property file not found or invalid.")
......@@ -101,56 +104,6 @@ class Workflow(threading.Thread):
os.makedirs(self.directory, 0751)
self._serialize()
def init_component_nameids(self):
    """
    Pre-register every component name id used by the workflow function.

    Parses the source code of the workflow method (named by self.function)
    with inspect/re, finds each ``self.add_component(...)`` call, extracts
    the component class name and its optional ``component_prefix`` argument
    (named or 4th positional), and records ``"<name>.<prefix>"`` as a key
    of ``self.component_nameids``.

    Raises:
        ValueError: if two components share the same name and prefix.
    """
    logger = logging.getLogger("Workflow.init_component_nameids")
    logger.debug("Initializing components names ids")
    logger.debug("Name ids in the array : " + str(self.component_nameids.keys()))
    workflow_command_lines = inspect.getsourcelines(getattr(self, self.function))[0]
    for i in range(0, len(workflow_command_lines)):
        line = workflow_command_lines[i]
        logger.debug("Line : " + line)
        # Only consider non-comment lines that call add_component
        if line.find("self.add_component") != -1 and not re.search(r'^\s*\#', line):
            # Re-assemble calls split over several lines: while the line ends
            # with a backslash or a comma, concatenate the next source line.
            while re.search(r'\\\s*$', line) or re.search(r',\s*$', line):
                i += 1
                next_line = workflow_command_lines[i]
                if re.search(r'^\s*\#', next_line):
                    # Bug fix: the original assigned "nex_line" (typo), so the
                    # comment text was concatenated into the call instead of
                    # being skipped with a bare continuation marker.
                    next_line = "\\"
                if re.search(r'\\\s*$', line):
                    line = re.search(r'(.+)\\\s*$', line).groups()[0] + next_line
                elif re.search(r',\s*$', line):
                    line = re.search(r'(.+),\s*$', line).groups()[0] + next_line
            component_prefix = "default"
            component_name = line.strip().split("self.add_component")[1][1:-1].split(",")[0][1:-1]
            arguments = re.search(r"self.add_component\s*\((.+)\)", line).groups()[0]
            named_prefix = re.search(r"component_prefix\s*=\s*([^,]+)", arguments)
            if named_prefix:
                # Component prefix is given as a named argument
                component_prefix = named_prefix.groups()[0].strip().strip('"')
            else:
                # Component prefix is missing or given positionally
                without_named_args = re.sub(r",?\s*\S+\s*=\s*\S+", "", arguments)  # discard named arguments
                fields = without_named_args.strip(",").split(",")
                # Re-group arguments whose commas are nested inside brackets
                args_split_clean = []
                current_idx = 0
                current_arg = ""
                while current_idx < len(fields):
                    current_arg = current_arg + fields[current_idx] + ','
                    if current_arg.count("[") == current_arg.count("]"):
                        args_split_clean.append(current_arg[:-1])
                        current_arg = ""
                    current_idx += 1
                # The component prefix is the 4th positional argument
                if len(args_split_clean) >= 4:
                    component_prefix = args_split_clean[3].strip().strip('"')
            nameid = component_name + "." + component_prefix
            logger.debug("Component added : " + nameid)
            if self._component_is_duplicated(nameid):
                raise ValueError("Component " + component_name + " with prefix " +
                    component_prefix + " already exist in this pipeline!")
            self.component_nameids[nameid] = None
@staticmethod
def config_parser(arg_lines):
for arg in arg_lines:
......@@ -170,15 +123,16 @@ class Workflow(threading.Thread):
del odict['_Thread__stderr']
return odict
def get_parameters_per_groups(self, parameters_section="parameters"):
    """
    Group the workflow parameters by their ``group`` attribute.

    Args:
        parameters_section: name of the properties-file section to read
            the parameters from (defaults to "parameters").

    Returns:
        [pgparameters, parameters_order]: a dict mapping each group name to
        its list of parameters, and the list of group names in the order
        they were first encountered.
    """
    name, description, parameters = self._get_from_config(parameters_section)
    pgparameters, parameters_order = {}, []
    for param in parameters:
        if param.group not in parameters_order:
            parameters_order.append(param.group)
        # "key in dict" instead of the deprecated dict.has_key()
        if param.group in pgparameters:
            pgparameters[param.group].append(param)
        else:
            pgparameters[param.group] = [param]
    return [pgparameters, parameters_order]
def get_exec_path(self, software):
exec_path = self.jflow_config_reader.get_exec(software)
......@@ -206,14 +160,39 @@ class Workflow(threading.Thread):
cmpt_object.prefix = component_prefix
if kwargs: cmpt_object.define_parameters(**kwargs)
else: cmpt_object.define_parameters(*args)
# add the component
self.components.append(cmpt_object)
# if this one require a dynamic pipeline, execute the first part
# there is a dynamic component
if cmpt_object.is_dynamic():
self._execute_weaver()
# update outputs
for output in cmpt_object.get_dynamic_outputs():
output.update()
self.dynamic_component_present = True
# if already init, add the component to the list and check if weaver should be executed
if self.component_nameids_is_init:
# add the component
self.components.append(cmpt_object)
self._execute_weaver()
# update outputs
for output in cmpt_object.get_dynamic_outputs():
output.update()
else:
if self._component_is_duplicated(cmpt_object):
raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
cmpt_object.prefix + " already exist in this pipeline!")
self.component_nameids[cmpt_object.get_nameid()] = None
self.components = []
else:
if self.component_nameids_is_init:
# add the component
self.components.append(cmpt_object)
elif not self.component_nameids_is_init and not self.dynamic_component_present:
if self._component_is_duplicated(cmpt_object):
raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
cmpt_object.prefix + " already exist in this pipeline!")
self.components.append(cmpt_object)
else:
if self._component_is_duplicated(cmpt_object):
raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
cmpt_object.prefix + " already exist in this pipeline!")
self.component_nameids[cmpt_object.get_nameid()] = None
return cmpt_object
else:
raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(", ".join(self.comp_pckg.keys())))
......@@ -249,19 +228,34 @@ class Workflow(threading.Thread):
"""
# if this is the first time the workflow run
if self.step == None :
self.init_component_nameids()
self.start_time = time.time()
self.step = 0
self.status = self.STATUS_STARTED
self.end_time = None
self.end_time = None
self._serialize()
# if pre_processing has not been done yet
if self.step == 0:
self.pre_process()
self.step = 1
self._serialize()
# if running workflow has not been done yet
# if collecting components and running workflow has not been done yet
if self.step == 1:
try:
getattr(self, self.function)()
except SystemExit, e:
self.status = self.STATUS_FAILED
self.end_time = time.time()
self._serialize()
raise
self.component_nameids_is_init = True
if self.dynamic_component_present:
self.step = 2
else:
self._execute_weaver()
self.step = 3
self._serialize()
# if the workflow was a dynamic one
if self.step == 2:
try:
getattr(self, self.function)()
except SystemExit, e:
......@@ -271,10 +265,10 @@ class Workflow(threading.Thread):
raise
if len(self.components) > 0:
self._execute_weaver()
self.step = 2
self.step = 3
self._serialize()
# if post processing has not been done yet
if self.step == 2:
if self.step == 3:
self.post_process()
if self.status == self.STATUS_STARTED: self.status = self.STATUS_COMPLETED
self.end_time = time.time()
......@@ -340,15 +334,15 @@ class Workflow(threading.Thread):
extended_args[param.name] = param.default
return extended_args
def _get_from_config(self):
def _get_from_config(self, parameters_section):
reader = ConfigParser()
reader.read(self._get_property_path())
parameters, param_order = [], []
try:
params, sub_order, excludes, hash_params = {}, {}, [], {}
# first grab all parameters name
for param in reader.items(self.parameters_section):
cvalue = reader.get(self.parameters_section, param[0])
for param in reader.items(parameters_section):
cvalue = reader.get(parameters_section, param[0])
cparam = param[0].split(".")[0]
ckey = ".".join(param[0].split(".")[1:])
if len(ckey.split(".")) > 1:
......@@ -564,8 +558,8 @@ class Workflow(threading.Thread):
pickle.dump(self, workflow_dump)
workflow_dump.close()
def _component_is_duplicated(self, component_nameid):
if component_nameid in self.component_nameids.keys():
def _component_is_duplicated(self, component):
if component.get_nameid() in self.component_nameids.keys():
return True
return False
......
......@@ -33,9 +33,11 @@ class WorkflowsManager(object):
"""
IDS_FILE_NAME = "jflowIDs.txt"
WORKFLOWS_DUMP_FILE_NAME = ".workflows.dump"
WF_DIRECTORY_PREFIX = "wf"
lock_ids_file = threading.Lock()
lock_workflows_file = threading.Lock()
def __init__(self):
"""
......@@ -43,8 +45,20 @@ class WorkflowsManager(object):
self.config_reader = JFlowConfigReader()
if not os.path.isdir(self.get_output_directory()):
os.makedirs(self.get_output_directory(), 0751)
self.dump_file = os.path.join(self.get_output_directory(), self.WORKFLOWS_DUMP_FILE_NAME)
# if the file already exists, load all the workflows dump path
if os.path.isfile(self.dump_file):
self.workflows_dump = pickle.load(open(self.dump_file))
else:
self.workflows_dump = {}
self.ids_file = os.path.join(self.get_output_directory(), self.IDS_FILE_NAME)
def _add_workflow(self, workflow):
    """
    Register a workflow in the manager and persist the registry.

    Stores the workflow's dump path under its zero-padded id in
    self.workflows_dump, then rewrites the shared dump file, all under the
    class-level lock.
    """
    WorkflowsManager.lock_workflows_file.acquire()
    try:
        self.workflows_dump[utils.get_nb_string(workflow.id)] = workflow.dump_path
        workflows_dump_file = open(self.dump_file, "w")
        try:
            pickle.dump(self.workflows_dump, workflows_dump_file)
        finally:
            # Close the dump file (the original left the handle open)
            workflows_dump_file.close()
    finally:
        # Release the lock even if the dump fails, so other threads
        # are not blocked forever.
        WorkflowsManager.lock_workflows_file.release()
def get_available_workflows(self, parameters_section="parameters"):
wf_instances = []
# Load all modules within the workflow module
......@@ -70,26 +84,21 @@ class WorkflowsManager(object):
for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function, parameters_section)
workflow.start()
# Add the workflow dump path to the workflows dump
self._add_workflow(workflow)
return workflow
def get_output_directory(self):
    """Return the work directory configured for jflow outputs."""
    work_directory = self.config_reader.get_work_directory()
    return work_directory
def get_workflows(self):
    """
    Load and return every workflow registered in the manager.

    Iterates the id -> dump-path mapping and unpickles each dump file.
    Entries whose dump is missing or unreadable are silently skipped
    (best effort, as in the original implementation).

    Returns:
        list: the successfully unpickled workflow objects.
    """
    workflows = []
    for workflow_id in self.workflows_dump:
        try:
            workflow_dump = open(self.workflows_dump[workflow_id], "rb")
            try:
                workflows.append(pickle.load(workflow_dump))
            finally:
                # Always close the handle, even if unpickling fails
                workflow_dump.close()
        except Exception:
            # best effort: skip workflows that cannot be loaded
            pass
    return workflows
def get_workflow_by_class(self, workflow_class):
......@@ -103,25 +112,13 @@ class WorkflowsManager(object):
return None
def get_workflow(self, workflow_id):
    """
    Return the workflow registered under the given id.

    Args:
        workflow_id: the workflow id; it is normalized with
            utils.get_nb_string before the lookup.

    Returns:
        the unpickled workflow object.

    Raises:
        ValueError: if no workflow is registered under this id.
    """
    rworkflow_id = utils.get_nb_string(workflow_id)
    # "key in dict" instead of the deprecated dict.has_key()
    if rworkflow_id in self.workflows_dump:
        workflow_dump = open(self.workflows_dump[rworkflow_id], "rb")
        try:
            workflow = pickle.load(workflow_dump)
        finally:
            # Always close the handle, even if unpickling fails
            workflow_dump.close()
    else:
        raise ValueError("Workflow with id " + str(rworkflow_id) + " cannot be retrieved!")
    return workflow
def get_workflow_directory(self, wname, wid):
......
......@@ -45,30 +45,41 @@ class NG6Workflow (Workflow):
def __init__(self, args={}, id=None, function= "process", parameters_section="parameters", add_run_params=True):
Workflow.__init__(self, args, id, function, parameters_section)
# admin login is required for all workflow
self.parameters.append(Parameter("admin_login", "Who is the project administrator", "--admin-login", required=True, type=adminlogin, display_name="Admin login"))
if add_run_params:
self.parameters.extend(self._add_run_parameters())
self.add_run_params = add_run_params
self.parameters.extend(self._add_required_parameters())
self.args = self._extend_and_format_args(self.parameters, args)
def _add_run_parameters(self):
def _add_required_parameters(self):
orequired = []
orequired.append(Parameter("admin_login", "Who is the project administrator", "--admin-login", required=True, type=adminlogin, display_name="Admin login"))
# add run parameters
orequired.append(Parameter("project_id", "The project id the run belongs to", "--project-id", required=True, type=int, group="Run information"))
orequired.append(Parameter("run_name", "Give a name to your run", "--name", required=True, group="Run information", display_name="Name"))
orequired.append(Parameter("run_description", "Give a description to your run", "--description", required=True, group="Run information", display_name="Description"))
orequired.append(Parameter("run_date", "When were the data produced","--date", required=True, type=date, group="Run information", display_name="Date"))
orequired.append(Parameter("data_nature", "Are Sequences cDNA, genomique, RNA, ...", "--data-nature", required=True, group="Run information", display_name="Data nature"))
orequired.append(Parameter("sequencer", "Which sequencer produced the data", "--sequencer", required=True, group="Run information", display_name="Sequencer"))
orequired.append(Parameter("species", "Which species has been sequenced", "--species", required=True, group="Run information", display_name="Species"))
orequired.append(Parameter("run_type", "What type of data is it (1 lane, 1 region)", "--type", required=True, group="Run information", display_name="Type"))
if self.add_run_params:
orequired.append(Parameter("project_id", "The project id the run belongs to", "--project-id", required=True, type=int, group="Run information"))
orequired.append(Parameter("run_name", "Give a name to your run", "--name", required=True, group="Run information", display_name="Name"))
orequired.append(Parameter("run_description", "Give a description to your run", "--description", required=True, group="Run information", display_name="Description"))
orequired.append(Parameter("run_date", "When were the data produced","--date", required=True, type=date, group="Run information", display_name="Date"))
orequired.append(Parameter("data_nature", "Are Sequences cDNA, genomique, RNA, ...", "--data-nature", required=True, group="Run information", display_name="Data nature"))
orequired.append(Parameter("sequencer", "Which sequencer produced the data", "--sequencer", required=True, group="Run information", display_name="Sequencer"))
orequired.append(Parameter("species", "Which species has been sequenced", "--species", required=True, group="Run information", display_name="Species"))
orequired.append(Parameter("run_type", "What type of data is it (1 lane, 1 region)", "--type", required=True, group="Run information", display_name="Type"))
return orequired
def get_parameters_per_groups(self, parameters_section="parameters"):
    """
    Group workflow parameters by their ``group`` attribute.

    Unlike the base implementation, the required ng6 parameters (admin
    login and, optionally, the run parameters) are appended before
    grouping.

    Args:
        parameters_section: name of the properties-file section to read
            the parameters from (defaults to "parameters").

    Returns:
        [pgparameters, parameters_order]: a dict mapping each group name to
        its list of parameters, and the list of group names in the order
        they were first encountered.
    """
    name, description, parameters = self._get_from_config(parameters_section)
    parameters.extend(self._add_required_parameters())
    pgparameters, parameters_order = {}, []
    for param in parameters:
        if param.group not in parameters_order:
            parameters_order.append(param.group)
        # "key in dict" instead of the deprecated dict.has_key()
        if param.group in pgparameters:
            pgparameters[param.group].append(param)
        else:
            pgparameters[param.group] = [param]
    return [pgparameters, parameters_order]
def pre_process(self):
self.project = None
self.runobj = None
# if starting from an existing project
if self.args.has_key("project_id") and self.args["project_id"] is not None:
self.project = Project.get_from_id(self.args["project_id"])
......@@ -130,15 +141,38 @@ class NG6Workflow (Workflow):
elif addto == "run": cmpt_object.run = self.runobj
# link the analysis with its creator user
cmpt_object.admin_login = self.args["admin_login"]
# add the component
self.components.append(cmpt_object)
# if this one require a dynamic pipeline, execute the first part
# there is a dynamic component
if cmpt_object.is_dynamic():
self._execute_weaver()
# update outputs
for output in cmpt_object.get_dynamic_outputs():
output.update()
self.dynamic_component_present = True
# if already init, add the component to the list and check if weaver should be executed
if self.component_nameids_is_init:
# add the component
self.components.append(cmpt_object)
self._execute_weaver()
# update outputs
for output in cmpt_object.get_dynamic_outputs():
output.update()
else:
if self._component_is_duplicated(cmpt_object):
raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
cmpt_object.prefix + " already exist in this pipeline!")
self.component_nameids[cmpt_object.get_nameid()] = None
self.components = []
else:
if self.component_nameids_is_init:
# add the component
self.components.append(cmpt_object)
elif not self.component_nameids_is_init and not self.dynamic_component_present:
if self._component_is_duplicated(cmpt_object):
raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
cmpt_object.prefix + " already exist in this pipeline!")
self.components.append(cmpt_object)
else:
if self._component_is_duplicated(cmpt_object):
raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
cmpt_object.prefix + " already exist in this pipeline!")
self.component_nameids[cmpt_object.get_nameid()] = None
return cmpt_object
else:
raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(", ".join(self.comp_pckg.keys())))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment