Commit 5c7cc980 authored by Penom Nom's avatar Penom Nom
Browse files

jflow update (package version)

parent d71edb64
......@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
[global]
# uncomment and set if not in the PATH
# uncomment and set if not in the PATH, should be version >= 4.4.3
#makeflow = /usr/bin/makeflow
# batch system type: local, condor, sge, moab, cluster, wq, hadoop, mpi-queue
batch_system_type = local
......
This diff is collapsed.
......@@ -333,14 +333,14 @@ class Component(object):
get a path to an executable. Has to be implemented by subclasses
if the process has not been implemented
"""
raise NotImplementedError
raise NotImplementedError("Either the Component.get_command() function or the Component.process() function has to be implemented!")
def get_abstraction(self):
"""
get the abstraction. Has to be implemented by subclasses
if the process has not been implemented
"""
raise NotImplementedError
raise NotImplementedError("Either the Component.get_abstraction() function or the Component.process() function has to be implemented!")
def get_version(self):
"""
......
......@@ -29,7 +29,7 @@ class ExternalParser(object):
return components
def parse(self, component_file):
raise NotImplementedError
raise NotImplementedError("The ExternalParser.parser() function has to be implemented!")
def build_component(self, component_name, fn_define_parameters, **kwargs):
"""
......
#
# Copyright (C) 2014 INRA
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
\ No newline at end of file
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import re
......
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import re
......
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys, re
class GFF3Record:
......
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import re
......
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import re
......
#
# Copyright (C) 2014 INRA
#
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
......
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import re
......
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import re
......
This diff is collapsed.
......@@ -26,12 +26,13 @@ import pickle
import time
import threading
import types
import logging
import datetime
import logging
import traceback
from logging import FileHandler
from ConfigParser import ConfigParser, NoOptionError
from inspect import getcallargs
from datetime import date as ddate
import jflow
import jflow.utils as utils
......@@ -116,7 +117,6 @@ class Workflow(threading.Thread):
self.end_time = None
self.step = None
self.stderr = None
self.stderrfh = None
self.args = args
self.dynamic_component_present = False
self.__to_address = None
......@@ -269,8 +269,7 @@ class Workflow(threading.Thread):
paramsexclude.group = new_group
# it might be a mutliple param rule
else:
logging.getLogger("wf." + str(self.__class__.__name__)).exception("Exclusion rule cannot be applied within a MultiParameter or a MultiParameterList")
raise
self._log("Exclusion rule cannot be applied within a MultiParameter or a MultiParameterList", raisee=True)
# save this for MultiParameter internal exclusion rules, works on command line, not supported on gui
# for attribute_value in self.__dict__.values():
# if issubclass(attribute_value.__class__, MultiParameter) or issubclass(attribute_value.__class__, MultiParameterList):
......@@ -430,8 +429,6 @@ class Workflow(threading.Thread):
def delete(self):
if self.get_status() in [self.STATUS_COMPLETED, self.STATUS_FAILED, self.STATUS_ABORTED]:
# first close the open files
self.stderrfh.close()
utils.robust_rmtree(self.directory)
@staticmethod
......@@ -528,48 +525,51 @@ class Workflow(threading.Thread):
return pretty_str
def get_errors(self):
error = {
"title" : "",
"msg" : list(),
"traceback" : list()
}
line_idx = 0
FH_stderr = open( self.stderr )
lines = FH_stderr.readlines()
while line_idx < len(lines):
if lines[line_idx].strip().startswith("##"):
error["title"] = lines[line_idx].rstrip()
error["msg"] = list()
error["traceback"] = list()
# skip all lines before the traceback
while not lines[line_idx].startswith("Traceback"):
line_idx += 1
# skip : "Traceback (most recent call last):"
line_idx += 1
while lines[line_idx] != lines[line_idx].lstrip():
error["traceback"].append({
"location" : lines[line_idx].strip(),
"line" : lines[line_idx].strip()
})
line_idx += 2
# Error message
while line_idx < len(lines) and not lines[line_idx].strip().startswith("##"):
try:
error["msg"].append( lines[line_idx].strip().split(":", 1)[1][1:] )
except:
error["msg"].append( lines[line_idx].strip() )
line_idx += 1
line_idx -= 1
line_idx += 1
FH_stderr.close()
last_stack_location = ""
if len(error["traceback"]) > 0:
last_stack_location = error["traceback"][-1]["location"].strip()
self.status = self.STATUS_FAILED
return { "msg" : error["msg"], "location" : last_stack_location }
if os.path.isfile(self.stderr):
error = {
"title" : "",
"msg" : list(),
"traceback" : list()
}
line_idx = 0
FH_stderr = open( self.stderr )
lines = FH_stderr.readlines()
while line_idx < len(lines):
if lines[line_idx].strip().startswith("##"):
error["title"] = lines[line_idx].rstrip()
error["msg"] = list()
error["traceback"] = list()
# skip all lines before the traceback
while not lines[line_idx].startswith("Traceback"):
line_idx += 1
# skip : "Traceback (most recent call last):"
line_idx += 1
while lines[line_idx] != lines[line_idx].lstrip():
error["traceback"].append({
"location" : lines[line_idx].strip(),
"line" : lines[line_idx].strip()
})
line_idx += 2
# Error message
while line_idx < len(lines) and not lines[line_idx].strip().startswith("##"):
try:
error["msg"].append( lines[line_idx].strip().split(":", 1)[1][1:] )
except:
error["msg"].append( lines[line_idx].strip() )
line_idx += 1
line_idx -= 1
line_idx += 1
FH_stderr.close()
last_stack_location = ""
if len(error["traceback"]) > 0:
last_stack_location = error["traceback"][-1]["location"].strip()
self.status = self.STATUS_FAILED
return { "msg" : error["msg"], "location" : last_stack_location }
else:
return None
else:
return None
def get_outputs_per_components(self):
outputs_files = {}
for current_components in self.components:
......@@ -581,8 +581,6 @@ class Workflow(threading.Thread):
def __setstate__(self, state):
self.__dict__ = state.copy()
self.external_components = self._import_external_components()
self.stderrfh = None
self.stderr = self._set_stderr(True)
threading.Thread.__init__(self, name=self.name)
def __getstate__(self):
......@@ -595,9 +593,6 @@ class Workflow(threading.Thread):
del odict['_Thread__stderr']
if odict.has_key('external_components') :
del odict['external_components']
if odict.has_key('stderrfh') :
self.stderrfh.close()
del odict['stderrfh']
return odict
def set_to_address(self, to_address):
......@@ -644,20 +639,20 @@ class Workflow(threading.Thread):
s.starttls()
s.login(me, fromp)
except smtplib.SMTPHeloError:
logging.getLogger("wf." + str(self.id)).debug("The server didn't reply properly to the HELO greeting.")
self._log("The server didn't reply properly to the HELO greeting.", level="debug", traceback=traceback.format_exc())
except smtplib.SMTPAuthenticationError:
logging.getLogger("wf." + str(self.id)).debug("The server didn't accept the username/password combination.")
self._log("The server didn't accept the username/password combination.", level="debug", traceback=traceback.format_exc())
except smtplib.SMTPException:
logging.getLogger("wf." + str(self.id)).debug("No suitable authentication method was found, or the server does not support the STARTTLS extension.")
self._log("No suitable authentication method was found, or the server does not support the STARTTLS extension.", level="debug", traceback=traceback.format_exc())
except RuntimeError:
logging.getLogger("wf." + str(self.id)).debug("SSL/TLS support is not available to your Python interpreter.")
self._log("SSL/TLS support is not available to your Python interpreter.", level="debug", traceback=traceback.format_exc())
except:
logging.getLogger("wf." + str(self.id)).debug("Unhandled error when sending mail.")
self._log("Unhandled error when sending mail.", level="debug", traceback=traceback.format_exc())
finally:
s.sendmail(me, [you], msg.as_string())
s.close()
except:
logging.getLogger("wf." + str(self.id)).debug("Impossible to connect to smtp server '" + smtps + "'")
self._log("Impossible to connect to smtp server '" + smtps + "'", level="debug", traceback=traceback.format_exc())
def get_parameters_per_groups(self):
name = self.get_name()
......@@ -862,7 +857,7 @@ class Workflow(threading.Thread):
self._serialize()
self._send_email()
except Exception as e:
logging.getLogger("wf." + str(self.id)).exception(str(e))
self._log(str(e), traceback=traceback.format_exc())
utils.display_error_message(str(e))
def _update_status_from_log(self):
......@@ -911,7 +906,7 @@ class Workflow(threading.Thread):
if not symbols: return None
for n in log.nodes:
if not n.symbol: continue
if n.symbol == component_nameid:
if n.symbol.replace('"', '') == component_nameid:
status["tasks"] += 1
status["time"] += n.elapsed_time
if n.state == Node.WAITING:
......@@ -961,23 +956,28 @@ class Workflow(threading.Thread):
sys.stdout.write('{0:>10} {1:>10} {2}\n'.format('', 'COMMAND', node.command))
def _set_stderr(self, keep_same_error_file=False):
# if it is specified to keep the same error file, keep with it
if not keep_same_error_file:
if hasattr(self, "stderr") and self.stderr is not None:
os.rename( self.stderr, os.path.join(self.directory, str(time.time()) + self.STDERR_FILE_NAME + self.OLD_EXTENSION) )
def _set_stderr(self):
if hasattr(self, "stderr") and self.stderr is not None and os.path.isfile(self.stderr):
os.rename( self.stderr, os.path.join(self.directory, str(time.time()) + "_" + self.STDERR_FILE_NAME + self.OLD_EXTENSION) )
stderr = os.path.join(self.directory, self.STDERR_FILE_NAME)
logger = logging.getLogger( "wf." + str(self.id) )
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter( '## %(asctime)s :: %(message)s' )
if self.stderrfh is None:
self.stderrfh = FileHandler(stderr, 'a')
self.stderrfh.setLevel(logging.ERROR)
self.stderrfh.setFormatter(formatter)
logger.addHandler(self.stderrfh)
return stderr
def _log(self, msg, level="exception", raisee=False, traceback=None):
if level == "exception":
logging.getLogger("wf." + str(self.id)).exception(msg)
elif level == "debug":
logging.getLogger("wf." + str(self.id)).debug(msg)
logh = open(self.stderr, "a")
today = ddate.today()
logh.write("## " + today.strftime("%c") + " :: " + msg + "\n")
if traceback: logh.write(traceback)
logh.close()
if raisee:
raise Exception(msg)
def _execute_weaver(self, engine_wrapper=None):
# Add nest path and path to script to Python module path to allow
# for importing modules outside of $PYTHONPATH
......@@ -1060,7 +1060,7 @@ class Workflow(threading.Thread):
if issubclass(obj, jflow.component.Component) and obj.__name__ != jflow.component.Component.__name__:
pckge[class_name] = modname
except Exception as e:
logging.getLogger("wf." + str(self.id)).debug("Component <{0}> cannot be loaded: {1}".format(modname, e))
self._log("Component <{0}> cannot be loaded: {1}".format(modname, e), level="debug", traceback=traceback.format_exc())
# finally import workflows shared packages
workflows_dir = os.path.dirname(os.path.dirname(inspect.getfile(self.__class__)))
for importer, modname, ispkg in pkgutil.iter_modules([os.path.join(workflows_dir, "components")], "workflows.components."):
......@@ -1070,7 +1070,7 @@ class Workflow(threading.Thread):
if issubclass(obj, jflow.component.Component) and obj.__name__ != jflow.component.Component.__name__:
pckge[class_name] = modname
except Exception as e:
logging.getLogger("wf." + str(self.id)).debug("Component <{0}> cannot be loaded: {1}".format(modname, e))
self._log("Component <{0}> cannot be loaded: {1}".format(modname, e), level="debug", traceback=traceback.format_exc())
return pckge
def _import_external_components(self):
......@@ -1085,7 +1085,7 @@ class Workflow(threading.Thread):
if issubclass(obj, jflow.extparser.ExternalParser) and obj.__name__ != jflow.extparser.ExternalParser.__name__:
parsers.append(obj())
except Exception as e:
logging.getLogger("wf." + str(self.id)).debug("Parser <{0}> cannot be loaded: {1}".format(modname, e))
self._log("Parser <{0}> cannot be loaded: {1}".format(modname, e), level="debug", traceback=traceback.format_exc())
for parser in parsers :
# import from pipeline components package ...
......
......@@ -91,18 +91,18 @@ class Makeflow(Engine):
# if a symbol is provided
if symbol:
self.dag_file.write('\t@SYMBOL="' + symbol+'"\n')
self.dag_file.write('@SYMBOL="' + symbol+'"\n')
# Write environmental variables
if options.local:
self.dag_file.write('\t@BATCH_LOCAL=1\n')
self.dag_file.write('@BATCH_LOCAL=1\n')
if options.batch:
self.dag_file.write('\t@BATCH_OPTIONS={0}\n'.format(options.batch))
self.dag_file.write('@BATCH_OPTIONS={0}\n'.format(options.batch))
if options.collect:
self.dag_file.write('\t@_MAKEFLOW_COLLECT_LIST+={0}\n'.format(
self.dag_file.write('@_MAKEFLOW_COLLECT_LIST+={0}\n'.format(
' '.join(map(str, options.collect))))
for k, v in options.environment.items():
self.dag_file.write('\t@{0}={1}\n'.format(k, v))
self.dag_file.write('@{0}={1}\n'.format(k, v))
# Write task command
self.dag_file.write('\t{0}\n'.format(command))
......
This diff is collapsed.
......@@ -42,7 +42,7 @@ def inserts_metrics(bam_file, pairs_count_file, metrics_file, hist_file, log_fil
if properly_paired_nb > 0 :
# Process inserts sizes metrics
command = Popen( ["-c", "java -Xmx1g -jar " + collectinsertsizemetrics_path + options + " HISTOGRAM_FILE=" + hist_file + " INPUT=" + bam_file + " OUTPUT=" + metrics_file + " 2> " + log_file], shell=True, stdout=PIPE, stderr=PIPE )
command = Popen( ["-c", "java -Xmx1g -jar " + collectinsertsizemetrics_path + " " +options + " HISTOGRAM_FILE=" + hist_file + " INPUT=" + bam_file + " OUTPUT=" + metrics_file + " 2> " + log_file], shell=True, stdout=PIPE, stderr=PIPE )
stdout, stderr = command.communicate()
# Count nb pairs in bam file
command = Popen( ["-c", samtools_path + " view -F384 " + bam_file + " | wc -l"], shell=True, stdout=PIPE, stderr=PIPE) # First read in pair
......@@ -86,7 +86,7 @@ class InsertsSizes (Analysis):
self.name = "InsertsSizes"
self.description = "Insert size statistics"
self.software = "Picards tools - Insert size"
self.options = " HISTOGRAM_WIDTH="+str(self.histogram_width)+" VALIDATION_STRINGENCY="+self.validation_stringency+" MINIMUM_PCT="+str(self.minimum_pct)
self.options = "HISTOGRAM_WIDTH="+str(self.histogram_width)+" VALIDATION_STRINGENCY="+self.validation_stringency+" MINIMUM_PCT="+str(self.minimum_pct)
def post_process(self):
nb_omitted_samples = 0
......@@ -129,7 +129,7 @@ class InsertsSizes (Analysis):
def process(self):
for i in range(len(self.bam_files)):
insertsSizes = PythonFunction(inserts_metrics, cmd_format="{EXE} {IN} {OUT} {ARG}")
insertsSizes(inputs=self.bam_files[i], outputs=[self.pairs_count_files[i], self.info_files[i], self.hist_files[i], self.log_files[i]], arguments=[self.get_exec_path("samtools"), self.get_exec_path("CollectInsertSizeMetrics"), "'"+self.options+"'"])
insertsSizes(inputs=self.bam_files[i], outputs=[self.pairs_count_files[i], self.info_files[i], self.hist_files[i], self.log_files[i]], arguments=[self.get_exec_path("samtools"), self.get_exec_path("CollectInsertSizeMetrics"), '"'+self.options+'"'])
def parse_pairs_count_file(self, input_file):
"""
......
......@@ -85,5 +85,5 @@ class AddSamplesNames (Component):
def process(self):
# Rename files
for file_idx in range( len(self.input_fasta) ):
rename = PythonFunction( add_sample_name, cmd_format="{EXE} " + self.samples_names[file_idx] + " '" + str(self.sequence_name_sep) + "' '" + str(self.sequence_count_sep) + "' {IN} {OUT} 2>> " + self.stderr )
rename = PythonFunction( add_sample_name, cmd_format='{EXE} ' + self.samples_names[file_idx] + ' "' + str(self.sequence_name_sep) + '" "' + str(self.sequence_count_sep) + '" {IN} {OUT} 2>> ' + self.stderr )
rename( inputs=self.input_fasta[file_idx], outputs=self.output_files[file_idx] )
\ No newline at end of file
#
# Copyright (C) 2014 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from jflow.component import Component
from jflow.iotypes import OutputFile, OutputFileList, InputFile, InputFileList, Formats
from weaver.function import ShellFunction
from weaver.abstraction import Map
class Blast (Component):
"""
@summary : Launch the NCBI Blast on database(s).