Commit da901539 authored by Penom Nom

Update jflow workflows

parent 0fc91c0d
This diff is collapsed.
......@@ -27,7 +27,6 @@ except ImportError:
from jflow.workflows_manager import WorkflowsManager
from jflow.workflow import Workflow
import jflow.utils as utils
from cctools.util import time_format
class JflowArgumentParser (argparse.ArgumentParser):
def _read_args_from_files(self, arg_strings):
......@@ -55,50 +54,6 @@ class JflowArgumentParser (argparse.ArgumentParser):
# return the modified argument list
return new_arg_strings
def display_workflow_status(workflow, detailed=False):
if workflow.start_time: start_time = time.asctime(time.localtime(workflow.start_time))
else: start_time = "-"
if workflow.start_time and workflow.end_time: elapsed_time = str(workflow.end_time-workflow.start_time)
elif workflow.start_time: elapsed_time = str(time.time()-workflow.start_time)
else: elapsed_time = "-"
if workflow.end_time: end_time = time.asctime(time.localtime(workflow.end_time))
else: end_time = "-"
if detailed:
status = "Workflow #" + utils.get_nb_string(workflow.id) + " (" + workflow.name + ") is " + \
workflow.get_status() + ", time elapsed: " + str(elapsed_time) + " (from " + start_time + \
" to " + end_time + "):\n"
for i, component in enumerate(workflow.get_components_nameid()):
status_info = workflow.get_component_status(component)
try: perc_waiting = (status_info["waiting"]*100.0)/status_info["tasks"]
except: perc_waiting = 0
try: perc_running = (status_info["running"]*100.0)/status_info["tasks"]
except: perc_running = 0
try: perc_failed = (status_info["failed"]*100.0)/status_info["tasks"]
except: perc_failed = 0
try: perc_aborted = (status_info["aborted"]*100.0)/status_info["tasks"]
except: perc_aborted = 0
try: perc_completed = (status_info["completed"]*100.0)/status_info["tasks"]
except: perc_completed = 0
if status_info["running"] > 0: running = "\033[94mrunning:" + str(status_info["running"]) + "\033[0m"
else: running = "running:" + str(status_info["running"])
if status_info["waiting"] > 0: waiting = "\033[93mwaiting:" + str(status_info["waiting"]) + "\033[0m"
else: waiting = "waiting:" + str(status_info["waiting"])
if status_info["failed"] > 0: failed = "\033[91mfailed:" + str(status_info["failed"]) + "\033[0m"
else: failed = "failed:" + str(status_info["failed"])
if status_info["aborted"] > 0: aborted = "\033[95maborted:" + str(status_info["aborted"]) + "\033[0m"
else: aborted = "aborted:" + str(status_info["aborted"])
if status_info["completed"] == status_info["tasks"] and status_info["completed"] > 0: completed = "\033[92mcompleted:" + str(status_info["completed"]) + "\033[0m"
else: completed = "completed:" + str(status_info["completed"])
status += " - " + component + ", time elapsed " + time_format(status_info["time"]) + \
" (total:" + str(status_info["tasks"]) + ", " + waiting + ", " + running + ", " + failed + \
", " + aborted + ", " + completed + ")"
if i<len(workflow.get_components_nameid())-1: status += "\n"
return status
else:
return utils.get_nb_string(workflow.id) + "\t" + workflow.name + "\t" + workflow.get_status() + \
"\t" + elapsed_time + "\t" + start_time + "\t" + end_time
if __name__ == '__main__':
......@@ -110,21 +65,37 @@ if __name__ == '__main__':
subparsers = parser.add_subparsers(title='Available sub commands')
# Add rerun workflow availability
sub_parser = subparsers.add_parser("rerun", help="rerun a specific workflow")
sub_parser = subparsers.add_parser("rerun", help="Rerun a specific workflow")
sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be rerun",
required=True, dest="workflow_id")
sub_parser.set_defaults(cmd_object="rerun")
# Add reset workflow availability
sub_parser = subparsers.add_parser("reset", help="Reset a workflow component")
sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be used",
required=True, dest="workflow_id")
sub_parser.add_argument("--component-name", type=str, help="Which component should be reseted",
required=True, dest="component_name")
sub_parser.set_defaults(cmd_object="reset")
# Add execution-graph workflow availability
sub_parser = subparsers.add_parser("execution-graph", help="Display the workflow execution graph")
sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be considered",
required=True, dest="workflow_id")
sub_parser.set_defaults(cmd_object="execution_graph")
# Add status workflow availability
sub_parser = subparsers.add_parser("status", help="monitor a specific workflow")
sub_parser = subparsers.add_parser("status", help="Monitor a specific workflow")
sub_parser.add_argument("--workflow-id", type=str, help="Which workflow status should be displayed",
default=None, dest="workflow_id")
sub_parser.add_argument("--all", action="store_true", help="Display all workflows status",
default=False, dest="all")
sub_parser.add_argument("--errors", action="store_true", help="Display failed commands",
default=False, dest="display_errors")
sub_parser.set_defaults(cmd_object="status")
# Add available pipelines
wf_instances = wfmanager.get_available_workflows()
wf_instances, wf_methods = wfmanager.get_available_workflows()
wf_classes = []
for instance in wf_instances:
wf_classes.append(instance.__class__.__name__)
......@@ -132,7 +103,7 @@ if __name__ == '__main__':
sub_parser = subparsers.add_parser(instance.name, help=instance.description, fromfile_prefix_chars='@')
sub_parser.convert_arg_line_to_args = instance.__class__.config_parser
[parameters_groups, parameters_order] = instance.get_parameters_per_groups()
for group in parameters_order:
for group in parameters_order:
if group == "default":
for param in parameters_groups[group]:
sub_parser.add_argument(param.flag, **param.export_to_argparse())
......@@ -152,21 +123,26 @@ if __name__ == '__main__':
pgroup.add_argument(param.flag, **param.export_to_argparse())
sub_parser.set_defaults(cmd_object=instance.__class__.__name__)
args = vars(parser.parse_args())
if args["cmd_object"] in wf_classes:
wfmanager.run_workflow(args["cmd_object"], args)
elif args["cmd_object"] == "rerun":
wfmanager.rerun_workflow(args["workflow_id"])
elif args["cmd_object"] == "reset":
wfmanager.reset_workflow_component(args["workflow_id"], args["component_name"])
elif args["cmd_object"] == "execution_graph":
workflow = wfmanager.get_workflow(args["workflow_id"])
print workflow.get_execution_graph()
elif args["cmd_object"] == "status":
if args["workflow_id"]:
workflow = wfmanager.get_workflow(args["workflow_id"])
print display_workflow_status(workflow, True)
print workflow.get_status_under_text_format(True, args["display_errors"])
else:
workflows = wfmanager.get_workflows()
if len(workflows) > 0:
status = "ID\tNAME\tSTATUS\tELAPSED_TIME\tSTART_TIME\tEND_TIME\n"
for i, workflow in enumerate(workflows):
status += display_workflow_status(workflow)
status += workflow.get_status_under_text_format()
if i<len(workflows)-1: status += "\n"
else: status = "no workflow available"
print status
......@@ -22,6 +22,9 @@ import os
from jflow.config_reader import JFlowConfigReader
# Define some Error classes
class InvalidFormatError(Exception): pass
jflowconf = JFlowConfigReader()
# if log file directory does not exist, create it
......
This diff is collapsed.
......@@ -37,6 +37,8 @@ class JFlowConfigReader(object):
self.reader.read(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), self.CONFIG_FILE_PATH))
def get_tmp_directory(self):
if not os.path.isdir(self.reader.get("storage", "tmp_directory")):
os.makedirs(self.reader.get("storage", "tmp_directory"), 0751)
return self.reader.get("storage", "tmp_directory")
def get_work_directory(self):
......@@ -71,7 +73,14 @@ class JFlowConfigReader(object):
sys.stderr.write("Error: 'makeflow' path connot be retrieved either in the PATH and in the application.properties file!\n")
sys.exit(1)
return exec_path
def get_date_format(self):
try:
date_format = self.reader.get("global", "date_format")
except:
raise Exception("Failed while parsing the config file: no date_format parameter!")
return date_format
def get_batch(self):
try:
type = self.reader.get("global", "batch_system_type")
......@@ -86,7 +95,24 @@ class JFlowConfigReader(object):
return [self.reader.get("global", "server_socket_host"), int(self.reader.get("global", "server_socket_port"))]
except:
return ["127.0.0.1", 8080]
def get_email_options(self):
try: smtps = self.reader.get("email", "smtp_server")
except: smtps = None
try: smtpp = self.reader.get("email", "smtp_port")
except: smtpp = None
try: froma = self.reader.get("email", "from_address")
except: froma = None
try: fromp = self.reader.get("email", "from_password")
except: fromp = None
try: toa = self.reader.get("email", "to_address")
except: toa = None
try: subject = self.reader.get("email", "subject")
except: subject = None
try: message = self.reader.get("email", "message")
except: message = None
return [smtps, smtpp, froma, fromp, toa, subject, message]
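As a hedged sketch of consuming this accessor (the receiving variable names are illustrative assumptions; only the seven [email] keys above come from the code):

    # Unpack the 7-element list returned by get_email_options().
    jflowconf = JFlowConfigReader()
    smtp_server, smtp_port, from_address, from_password, \
        to_address, subject, message = jflowconf.get_email_options()
    if smtp_server is None:
        print "email notifications are not configured"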
def get_component_batch_options(self, component_class):
try:
return self.reader.get("components", component_class+".batch_options")
......
This diff is collapsed.
......@@ -14,7 +14,7 @@ class GFF3Record:
self.strand = None
self.phase = None
self.attributes = None
def setAttribute( self, tag, value ):
"""
@summary : Create or replace an attribute tag.
......@@ -43,7 +43,7 @@ class GFF3Record:
self.attributes[cleaned_tag] = cleaned_value
else:
raise "The attibute 'Attributes' is not initialized."
def _attributesToGff( self ):
"""
@summary : Returns the attributes field in GFF3 format, built from GFF3Record.attributes.
......@@ -52,16 +52,16 @@ class GFF3Record:
gff_string = ""
for tag in self.attributes:
gff_string = gff_string + tag + "=" + str(self.attributes[tag]) + ";"
return gff_string[:-1]
def toGff( self ):
"""
@summary : Returns a string in GFF3 format from the GFF3Record object.
@return : [str] the line in GFF3 format.
"""
gff_record = "\t".join( [self.seq_id, self.source, self.type, str(self.start), str(self.end), str(self.score), self.strand, str(self.phase), self._attributesToGff()] )
return gff_record
def attributesToStr( self, tag ):
......@@ -78,8 +78,8 @@ class GFF3Record:
readable_value = readable_value.replace('%3D', '=')
return readable_value
else:
raise "The attibute 'Attributes' is not initialized."
raise "The attibute 'Attributes' is not initialized."
@staticmethod
def _getCleanedAttribute( dirty_value ):
"""
......@@ -97,7 +97,7 @@ class GFF3Record:
cleaned_value = cleaned_value.replace('\t', ' ')
cleaned_value = cleaned_value.replace("'", '')
cleaned_value = cleaned_value.replace('"', '')
return cleaned_value
@staticmethod
......@@ -120,6 +120,8 @@ class GFF3Record:
# Parse attributes
gff_record.attributes = dict()
attributes = "\t".join(line_fields[8:])
if attributes.strip().endswith(";"): # if attributes end with ';'
attributes = attributes.strip()[:-1]
attributes_array = attributes.split(";")
cleaned_attributes = list()
for attribute in attributes_array:
......@@ -144,10 +146,10 @@ class GFF3IO:
self._path = file_path
self._handle = open( file_path, mode )
self._line = 1
def __del__( self ):
self.close()
def __iter__( self ):
for line in self._handle:
line = line.rstrip()
......@@ -161,10 +163,12 @@ class GFF3IO:
"Line content : " + line )
else:
yield gff_record
def close( self ) :
if self._handle is not None:
if hasattr(self, '_handle') and self._handle is not None:
self._handle.close()
self._handle = None
self._line = None
def write( self, gff_record ):
"""
......
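A minimal usage sketch for the GFF3 classes above; the file path and the presence of an ID attribute on every record are illustrative assumptions:

    # Iterate a GFF3 file and print a few fields per record.
    reader = GFF3IO("annotations.gff3", "r")
    for record in reader:
        print record.seq_id, record.type, record.attributesToStr("ID")
    reader.close()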
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
class Graph(object):
def __init__(self, graph_dict=None):
""" initializes a graph object """
if graph_dict is None: graph_dict = {}  # avoid sharing one mutable default dict
self.__graph_dict = graph_dict
def vertices(self):
""" returns the vertices of a graph """
return list(self.__graph_dict.keys())
def edges(self):
""" returns the edges of a graph """
edges = []
for edge in self.__generate_edges():
edges.append(list(edge))
return edges
def add_vertex(self, vertex):
""" If the vertex "vertex" is not in
self.__graph_dict, a key "vertex" with an empty
list as a value is added to the dictionary.
Otherwise nothing has to be done.
"""
if vertex not in self.__graph_dict:
self.__graph_dict[vertex] = []
def add_edge(self, edge):
""" assumes that edge is of type set, tuple or list;
between two vertices can be multiple edges!
"""
edge = set(edge)
(vertex1, vertex2) = tuple(edge)
if vertex1 in self.__graph_dict:
self.__graph_dict[vertex1].append(vertex2)
else:
self.__graph_dict[vertex1] = [vertex2]
def __generate_edges(self):
""" A static method generating the edges of the
graph "graph". Edges are represented as sets
with one (a loop back to the vertex) or two
vertices
"""
edges = []
for vertex in self.__graph_dict:
for neighbour in self.__graph_dict[vertex]:
if {neighbour, vertex} not in edges:
edges.append({vertex, neighbour})
return edges
def __str__(self):
res = "vertices: "
for k in self.__graph_dict:
res += str(k) + " "
res += "\nedges: "
for edge in self.__generate_edges():
res += str(edge) + " "
return res
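A minimal sketch exercising the Graph helper above (the vertex names are illustrative):

    g = Graph()
    g.add_vertex("a")
    g.add_vertex("b")
    g.add_vertex("c")
    g.add_edge(["a", "b"])
    g.add_edge(["b", "c"])
    print g.vertices()   # ['a', 'b', 'c'] (dict key order may vary)
    print g.edges()      # e.g. [['a', 'b'], ['b', 'c']] (set order is arbitrary)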
This diff is collapsed.
......@@ -15,6 +15,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import re
import smtplib
import socket
import math
try:
import DNS
ServerError = DNS.ServerError
except:
DNS = None
class ServerError(Exception): pass
def which(program):
"""
Return whether the requested program exists in the user's PATH
......@@ -60,6 +72,34 @@ def get_nb_string(value, length=6):
s_value = zeros + s_value
return s_value
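For instance, zero-padding a workflow id, as the CLI does elsewhere in this commit (a sketch):

    print get_nb_string(7)   # "000007" with the default length of 6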
def get_nb_octet(size):
"""
Return the number of bytes; the value has to be formatted like this: 5Mb, 20Gb ...
"""
octets_link = ["bytes", "Kb", "Mb", "Gb", "Tb", "Pb", "Eb", "Zb"]
unit = size[len(size)-2:len(size)]
pow_val = int(octets_link.index(unit)) * 10
val = pow(2, pow_val)
nb_octet = int(size[:len(size)-2]) * val
return nb_octet
def get_octet_string_representation(size):
"""
Return a human-readable string representation of a size given in bytes
"""
octets_link = ["bytes", "Kb", "Mb", "Gb", "Tb", "Pb", "Eb", "Zb"]
p = int(math.ceil(float(len(str(size)))/float(3) - float(1)))
pow_needed = p * 10
pow_needed = pow(2, pow_needed)
value = str(float(size)/float(pow_needed))
tmp = value.split(".")
value = tmp[0] + "." + tmp[1][:2]
try:
value = value + " " + octets_link[p]
except:
raise TypeError("In core.common:project_id unexpected input value for size: " + str(size))
return str(value)
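A quick sketch exercising both size helpers above:

    print get_nb_octet("5Mb")                        # 5 * 2**20 = 5242880
    print get_octet_string_representation(5242880)   # "5.0 Mb"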
def get_argument_pattern( list, start_number=1 ):
"""
Return the argument pattern for arguments files. E.g. with 3 elements in the list, this function returns ["${1} ${2} ${3}", 4].
......@@ -72,4 +112,94 @@ def get_argument_pattern( list, start_number=1 ):
arg_pattern += ' ${' + str(next_number) + '}'
next_number += 1
return [arg_pattern, next_number]
\ No newline at end of file
return [arg_pattern, next_number]
# All we are really doing is comparing the input string to one
# gigantic regular expression. But building that regexp, and
# ensuring its correctness, is made much easier by assembling it
# from the "tokens" defined by the RFC. Each of these tokens is
# tested in the accompanying unit test file.
#
# The section of RFC 2822 from which each pattern component is
# derived is given in an accompanying comment.
#
# (To make things simple, every string below is given as 'raw',
# even when it's not strictly necessary. This way we don't forget
# when it is necessary.)
#
WSP = r'[ \t]' # see 2.2.2. Structured Header Field Bodies
CRLF = r'(?:\r\n)' # see 2.2.3. Long Header Fields
NO_WS_CTL = r'\x01-\x08\x0b\x0c\x0f-\x1f\x7f' # see 3.2.1. Primitive Tokens
QUOTED_PAIR = r'(?:\\.)' # see 3.2.2. Quoted characters
FWS = r'(?:(?:' + WSP + r'*' + CRLF + r')?' + \
WSP + r'+)' # see 3.2.3. Folding white space and comments
CTEXT = r'[' + NO_WS_CTL + \
r'\x21-\x27\x2a-\x5b\x5d-\x7e]' # see 3.2.3
CCONTENT = r'(?:' + CTEXT + r'|' + \
QUOTED_PAIR + r')' # see 3.2.3 (NB: The RFC includes COMMENT here
# as well, but that would be circular.)
COMMENT = r'\((?:' + FWS + r'?' + CCONTENT + \
r')*' + FWS + r'?\)' # see 3.2.3
CFWS = r'(?:' + FWS + r'?' + COMMENT + ')*(?:' + \
FWS + '?' + COMMENT + '|' + FWS + ')' # see 3.2.3
ATEXT = r'[\w!#$%&\'\*\+\-/=\?\^`\{\|\}~]' # see 3.2.4. Atom
ATOM = CFWS + r'?' + ATEXT + r'+' + CFWS + r'?' # see 3.2.4
DOT_ATOM_TEXT = ATEXT + r'+(?:\.' + ATEXT + r'+)*' # see 3.2.4
DOT_ATOM = CFWS + r'?' + DOT_ATOM_TEXT + CFWS + r'?' # see 3.2.4
QTEXT = r'[' + NO_WS_CTL + \
r'\x21\x23-\x5b\x5d-\x7e]' # see 3.2.5. Quoted strings
QCONTENT = r'(?:' + QTEXT + r'|' + \
QUOTED_PAIR + r')' # see 3.2.5
QUOTED_STRING = CFWS + r'?' + r'"(?:' + FWS + \
r'?' + QCONTENT + r')*' + FWS + \
r'?' + r'"' + CFWS + r'?'
LOCAL_PART = r'(?:' + DOT_ATOM + r'|' + \
QUOTED_STRING + r')' # see 3.4.1. Addr-spec specification
DTEXT = r'[' + NO_WS_CTL + r'\x21-\x5a\x5e-\x7e]' # see 3.4.1
DCONTENT = r'(?:' + DTEXT + r'|' + \
QUOTED_PAIR + r')' # see 3.4.1
DOMAIN_LITERAL = CFWS + r'?' + r'\[' + \
r'(?:' + FWS + r'?' + DCONTENT + \
r')*' + FWS + r'?\]' + CFWS + r'?' # see 3.4.1
DOMAIN = r'(?:' + DOT_ATOM + r'|' + \
DOMAIN_LITERAL + r')' # see 3.4.1
ADDR_SPEC = LOCAL_PART + r'@' + DOMAIN # see 3.4.1
# A valid address will match exactly the 3.4.1 addr-spec.
VALID_ADDRESS_REGEXP = '^' + ADDR_SPEC + '$'
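A direct check of the assembled pattern (the addresses are illustrative):

    print re.match(VALID_ADDRESS_REGEXP, "user@example.org") is not None       # True
    print re.match(VALID_ADDRESS_REGEXP, "no-at-sign.example.org") is not None # False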
def validate_email(email, check_mx=False, verify=False):
"""Indicate whether the given string is a valid email address
according to the 'addr-spec' portion of RFC 2822 (see section
3.4.1). Parts of the spec that are marked obsolete are *not*
included in this test, and certain arcane constructions that
depend on circular definitions in the spec may not pass, but in
general this should correctly identify any email address likely
to be in use as of 2011."""
try:
assert re.match(VALID_ADDRESS_REGEXP, email) is not None
check_mx |= verify
if check_mx:
if not DNS: raise Exception('To check the MX records or check if the email exists, you must have the pyDNS Python package installed')
DNS.DiscoverNameServers()
hostname = email[email.find('@')+1:]
mx_hosts = DNS.mxlookup(hostname)
for mx in mx_hosts:
try:
smtp = smtplib.SMTP()
smtp.connect(mx[1])
if not verify: return True
status, _ = smtp.helo()
if status != 250: continue
smtp.mail('')
status, _ = smtp.rcpt(email)
if status != 250: return False
break
except smtplib.SMTPServerDisconnected: # the server does not permit verifying the user
break
except smtplib.SMTPConnectError:
continue
except (AssertionError, ServerError):
return False
return True
\ No newline at end of file
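A hedged usage sketch for validate_email; the addresses are illustrative, and the MX/mailbox checks additionally require the optional pyDNS package:

    print validate_email("someone@example.com")   # True  (syntax check only)
    print validate_email("not-an-address")        # False
    # validate_email("someone@example.com", check_mx=True)  # needs pyDNS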
This diff is collapsed.
......@@ -61,38 +61,64 @@ class WorkflowsManager(object):
pickle.dump(workflows_dump, open(self.dump_file, "w"))
WorkflowsManager.lock_workflows_file.release()
def get_available_workflows(self, parameters_section="parameters"):
wf_instances = []
def get_available_workflows(self, function="process"):
if isinstance(function, str):
functions = [function]
else:
functions = set(function)
wf_instances, wf_methods = [], []
# Load all modules within the workflow module
for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
__import__(modname)
# Search for Workflow classes
for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
if issubclass(obj, jflow.workflow.Workflow) and obj.__name__ != jflow.workflow.Workflow.__name__:
# try to build the workflow
try: wf_instances.append(obj(parameters_section=parameters_section))
except: pass
return wf_instances
for function in functions:
# check if the workflow has the requested method
for ifunction in inspect.getmembers(obj, predicate=inspect.ismethod):
if ifunction[0] == function:
# try to build the workflow
try:
wf_instances.append(obj(function=function))
wf_methods.append(function)
except: pass
return [wf_instances, wf_methods]
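Assuming a WorkflowsManager instance named wfmanager, as in the CLI above, the new paired return value can be consumed like this (a sketch):

    # The two lists are parallel: one entry per (instance, method) pair
    # that could actually be built.
    wf_instances, wf_methods = wfmanager.get_available_workflows()
    for instance, method in zip(wf_instances, wf_methods):
        print instance.name, "->", method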
def rerun_workflow(self, workflow_id):
workflow = self.get_workflow(workflow_id)
if hasattr(workflow, "stderr"):
workflow.set_stderr()
workflow.start()
return workflow
def reset_workflow_component(self, workflow_id, component_name):
workflow = self.get_workflow(workflow_id)
workflow.reset_component(component_name)
return workflow
def run_workflow(self, workflow_class, args, function="process", parameters_section="parameters"):
def run_workflow(self, workflow_class, args, function="process"):
# Load all modules within the workflow module
for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
__import__(modname)
# Search for Workflow classes
for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function, parameters_section)
if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function)
workflow.start()
# Add the workflow dump path to the workflows dump
self._add_workflow(workflow)
return workflow
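A hedged sketch of launching a workflow through this entry point; the class name "MyWorkflow" and the args dict are assumptions:

    workflow = wfmanager.run_workflow("MyWorkflow", args)
    print workflow.id, workflow.get_status()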
def get_workflow_errors(self, workflow_id):
workflow = self.get_workflow(workflow_id)
return workflow.get_errors()
def get_workflow_outputs(self, workflow_id):
workflow = self.get_workflow(workflow_id)
return workflow.get_outputs_per_components()
def get_output_directory(self):
return self.config_reader.get_work_directory()
def get_workflows(self):
workflows = []
try:
......@@ -128,7 +154,8 @@ class WorkflowsManager(object):
workflow = pickle.load(workflow_dump)
workflow_dump.close()