Commit 9de87188 authored by Jerome Mariette
Browse files

update jflow

parent 5fc1671d
......@@ -16,6 +16,7 @@
#
import os
import re
import sys
import inspect
import tempfile
......@@ -37,7 +38,9 @@ from weaver.function import PythonFunction
class Component(object):
TRACE_FILE_NAME = "trace.txt"
def __init__(self):
self.prefix = "default"
self.params_order = []
......@@ -47,6 +50,19 @@ class Component(object):
if isinstance(self.version, bytes):
self.version = self.version.decode()
self.batch_options = self.config_reader.get_component_batch_options(self.__class__.__name__)
# in case of SGE, parse the cpu and memory parameter
type, options, limit_submission = self.jflow_config_reader.get_batch()
if type.lower() == "sge":
try:
self.__cpu = int(re.match( r'.*-pe\s+(\w+)\s+(\d+)\s?.*', self.batch_options).group(2))
self.__memory = int(re.match( r'.*-l\s+mem=(\d+\S+)\s?.*', self.batch_options))
except: pass
def get_cpu(self):
    """
    @summary: Returns the number of CPUs recorded for this component.
    @return: the value held by the private __cpu attribute.
    @note: NOTE(review): in the visible constructor code __cpu is only assigned
           when the batch type is "sge" and the '-pe' option could be parsed;
           presumably this accessor raises AttributeError otherwise - confirm.
    """
    cpu = self.__cpu
    return cpu
def get_memory(self):
    """
    @summary: Returns the memory amount recorded for this component.
    @return: the value held by the private __memory attribute.
    @note: NOTE(review): the only visible assignment of __memory passes the
           re.match() result directly to int() inside a try/except that
           swallows every error, so the attribute looks like it is never set
           and this accessor would raise AttributeError - confirm and fix in
           the constructor.
    """
    memory = self.__memory
    return memory
def is_dynamic(self):
    """
    @summary: Tells whether the component declares at least one dynamic output.
    @return: [bool] True when get_dynamic_outputs() returns a non-empty collection.
    """
    dynamic_outputs = self.get_dynamic_outputs()
    return len(dynamic_outputs) > 0
......@@ -543,6 +559,8 @@ class Component(object):
else:
py_function(inputs=new_inputs, outputs=new_outputs, arguments=arguments, includes=includes+includes_in)
self.__write_trace(function.__name__, inputs, outputs, arguments, cmd_format, map, "PythonFunction")
def add_shell_execution(self, source, inputs=[], outputs=[], arguments=[], includes=[],
cmd_format=None, map=False, shell=None, collect=False, local=False):
shell_function = ShellFunction( source, shell=shell, cmd_format=cmd_format )
......@@ -562,3 +580,34 @@ class Component(object):
else :
shell_function( inputs=inputs, outputs=outputs, arguments=arguments, includes=includes )
self.__write_trace(source, inputs, outputs, arguments, cmd_format, map, "Shell")
def __write_trace(self, name, inputs, outputs, arguments, cmd_format, map, type):
    """
    @summary: Appends the description of one execution to the component trace file
              (Component.TRACE_FILE_NAME inside self.output_directory).
    @param name: [str] The name of the executed function or shell source.
    @param inputs: The inputs of the execution (see __write_element for accepted shapes).
    @param outputs: The outputs of the execution.
    @param arguments: The arguments of the execution.
    @param cmd_format: The command format; only written for "Shell" executions and
                       only when one was actually provided.
    @param map: [bool] True if the execution runs in map mode.
    @param type: [str] The kind of execution ("Shell", "PythonFunction", ...).
    """
    trace_path = os.path.join(self.output_directory, Component.TRACE_FILE_NAME)
    # The context manager guarantees the handle is closed even if a write fails.
    with open(trace_path, "a") as trace_fh:
        trace_fh.write("###\n###Execution trace of " + type + ": " + name + "\n")
        if map:
            trace_fh.write("MODE MAP\n")
        # cmd_format defaults to None for shell executions: skip it instead of
        # writing a misleading "COMMAND: None" line.
        if cmd_format and type == "Shell":
            trace_fh.write("COMMAND: " + str(cmd_format) + "\n")
        # NOTE(review): "ARGCUMENTS" is a typo of "ARGUMENTS"; it is kept as is
        # because it is part of the emitted trace format.
        self.__write_element(trace_fh, "ARGCUMENTS", arguments)
        self.__write_element(trace_fh, "INPUTS", inputs)
        self.__write_element(trace_fh, "OUTPUTS", outputs)
def __write_element(self,fh, title, element):
to_write=''
if isinstance(element, list):
if len (element)> 0 :
if isinstance(element[0], list):
for i in range(len(element)) :
to_write+="List"+str(i+1)+": \n"
to_write+="\n".join(element[i])+"\n"
else :
to_write+="\n".join(element)+"\n"
else :
to_write+= element+"\n"
if to_write != "" :
fh.write(title+" :\n")
fh.write(to_write)
\ No newline at end of file
......@@ -60,8 +60,8 @@ class UploadFieldStorage(cgi.FieldStorage):
if not os.path.exists( tmp_folder ):
try : os.mkdir(tmp_folder)
except : pass
fh = open(os.path.join(tmp_folder, self.filename), "w+")
fh_name = fh.name
fh = open(os.path.join(tmp_folder, self.filename), "wb+")
fh_name = fh.name
fh.write(self.file.getvalue())
fh.close()
return fh_name
......@@ -227,10 +227,10 @@ class JFlowServer (object):
select = False
if 'filter_groups' in kwargs : filter_groups = kwargs['filter_groups'].split(',')
if 'select' in kwargs : select = kwargs['select'] in ['True', 'true', '1', 1]
wf_instances, wf_methodes = self.wfmanager.get_available_workflows(filter_groups = filter_groups , select = select)
for instance in wf_instances:
parameters, parameters_per_groups, groups = [], {}, ["default"]
parameters, parameters_per_groups, ordered_groups = [], {}, ["default"]
for param in instance.get_parameters():
# if it's a multiple action change the action by the name
if param.action == MiltipleAction:
......@@ -289,13 +289,14 @@ class JFlowServer (object):
if param.group in parameters_per_groups:
parameters_per_groups[param.group].append(hash_param)
else: parameters_per_groups[param.group] = [hash_param]
groups.append(param.group)
if param.group not in ordered_groups:
ordered_groups.append(param.group)
workflows.append({"name": instance.name,
"help": instance.description,
"class": instance.__class__.__name__,
"parameters": parameters,
"parameters_per_groups": parameters_per_groups,
"groups": list(set(groups))})
"groups": ordered_groups})
return workflows
@cherrypy.expose
......
......@@ -63,7 +63,7 @@ class MINIWorkflow(object):
self.id = id
self.name = name
self.description = description
self.status = status
self._status = status
self.start_time = start_time
self.end_time = end_time
self.metadata = metadata
......@@ -84,7 +84,7 @@ class MINIWorkflow(object):
return self.errors
def get_status(self):
    """
    @summary: Returns the status stored on the minimized workflow.
    @return: the value of self._status.
    """
    # Only the _status attribute is read: a leftover "return self.status"
    # placed before this line made it unreachable.
    return self._status
class Workflow(threading.Thread):
......@@ -118,8 +118,8 @@ class Workflow(threading.Thread):
self.makes = {}
self.globals = {}
self.options = Options()
self.status = self.STATUS_STARTED
self.postprocess_status = self.STATUS_PENDING
self._status = self.STATUS_STARTED
self._postprocess_status = self.STATUS_PENDING
self.start_time = None
self.end_time = None
self.__step = None
......@@ -580,7 +580,6 @@ class Workflow(threading.Thread):
last_stack_location = ""
if len(error["traceback"]) > 0:
last_stack_location = error["traceback"][-1]["location"].strip()
self.status = self.STATUS_FAILED
return { "msg" : error["msg"], "location" : last_stack_location }
else:
return None
......@@ -623,11 +622,43 @@ class Workflow(threading.Thread):
def set_message(self, message):
    """
    @summary: Stores the given message on the workflow.
    @param message: The value to keep in the private __message attribute.
    """
    self.__message = message
def _get_cleaned_email_placeholders(self, text):
"""
@summary: Returns the text after replacement of placeholders by the corresponding workflow values (method or attribute).
Placeholders must be an attribute or a method of the workflow between three sharps: ###attribute### or ###method()###.
You can add "|date" after the attribute or the method to convert a timestamp in human readable date.
Examples: ###id### is replaced by wf.id ; ###get_status()### is replaced by wf.get_status() ; ###start_time|date### is rplaced by wf.start_time in date format.
@param text: [str] The text containing placeholders.
@return: [str] The text with placeholders replaced by her real value.
"""
new_text = text
placeholders = re.findall("\#\#\#([^\#]+)\#\#\#", text)
for placeholder in placeholders:
try:
placeholder_value = ""
placeholder_key = placeholder
is_date = False
if placeholder.endswith("|date"):
placeholder_key = placeholder[:-5]
is_date = True
# Get value
if placeholder_key.endswith("()"):
placeholder_value = str(getattr(self, placeholder_key[:-2])())
else:
placeholder_value = str(getattr(self, placeholder_key))
# Apply date format
if is_date:
jflow_date_format = self.jflow_config_reader.get_date_format()
placeholder_value = time.strftime(jflow_date_format + " %H:%M:%S", time.gmtime(float(placeholder_value)))
new_text = new_text.replace("###" + placeholder + "###", placeholder_value)
except:
pass
return new_text
def _send_email(self):
import smtplib
from email.mime.text import MIMEText
from email.mime.text import MIMEText
smtps, smtpp, froma, fromp, toa, subject, message = self.jflow_config_reader.get_email_options()
if self.__to_address: toa = self.__to_address
......@@ -643,10 +674,13 @@ class Workflow(threading.Thread):
# Create a text/plain message
if not message:
message = Workflow.get_status_under_text_format(self, True, True, True)
message = self._get_cleaned_email_placeholders( message )
msg = MIMEText(message, 'html')
me = froma
you = toa
if not subject: subject = "JFlow - Workflow #" + str(self.id) + " is " + self.status
if not subject:
subject = "JFlow - Workflow #" + str(self.id) + " is " + self.get_status()
subject = self._get_cleaned_email_placeholders( subject )
msg['Subject'] = subject
msg['From'] = me
msg['To'] = you
......@@ -659,20 +693,20 @@ class Workflow(threading.Thread):
s.starttls()
s.login(me, fromp)
except smtplib.SMTPHeloError:
self._log("The server didn't reply properly to the HELO greeting.", level="debug", traceback=traceback.format_exc(chain=False))
self._log("The server didn't reply properly to the HELO greeting.", level="warning", traceback=traceback.format_exc(chain=False))
except smtplib.SMTPAuthenticationError:
self._log("The server didn't accept the username/password combination.", level="debug", traceback=traceback.format_exc(chain=False))
self._log("The server didn't accept the username/password combination.", level="warning", traceback=traceback.format_exc(chain=False))
except smtplib.SMTPException:
self._log("No suitable authentication method was found, or the server does not support the STARTTLS extension.", level="debug", traceback=traceback.format_exc(chain=False))
self._log("No suitable authentication method was found, or the server does not support the STARTTLS extension.", level="warning", traceback=traceback.format_exc(chain=False))
except RuntimeError:
self._log("SSL/TLS support is not available to your Python interpreter.", level="debug", traceback=traceback.format_exc(chain=False))
self._log("SSL/TLS support is not available to your Python interpreter.", level="warning", traceback=traceback.format_exc(chain=False))
except:
self._log("Unhandled error when sending mail.", level="debug", traceback=traceback.format_exc(chain=False))
self._log("Unhandled error when sending mail.", level="warning", traceback=traceback.format_exc(chain=False))
finally:
s.sendmail(me, [you], msg.as_string())
s.close()
except:
self._log("Impossible to connect to smtp server '" + smtps + "'", level="debug", traceback=traceback.format_exc(chain=False))
self._log("Impossible to connect to smtp server '" + smtps + "'", level="warning", traceback=traceback.format_exc(chain=False))
def get_parameters_per_groups(self):
name = self.get_name()
......@@ -819,11 +853,11 @@ class Workflow(threading.Thread):
"""
try:
# if this is the first time the workflow run
if self.__step == None :
if self.__step == None:
self.start_time = time.time()
self.__step = 0
self.status = self.STATUS_STARTED
self.postprocess_status = self.STATUS_PENDING
self._status = self.STATUS_STARTED
self._postprocess_status = self.STATUS_PENDING
self.end_time = None
# if some args are provided, let's fill the parameters
self._set_parameters(self.args)
......@@ -835,39 +869,27 @@ class Workflow(threading.Thread):
self._serialize()
# if collecting components and running workflow has not been done yet
if self.__step == 1:
try:
self.reseted_components = []
self.components = []
self.status = self.STATUS_STARTED
self.postprocess_status = self.STATUS_PENDING
self.wf_execution_wrapper()
except SystemExit:
self.status = self.STATUS_FAILED
self.end_time = time.time()
self._serialize()
self._send_email()
raise
self.reseted_components = []
self.components = []
self._status = self.STATUS_STARTED
self._postprocess_status = self.STATUS_PENDING
self._serialize()
self.wf_execution_wrapper()
self.component_nameids_is_init = True
if self.dynamic_component_present:
self.__step = 2
else:
self._execute_weaver()
self.__step = 3
self._serialize()
self._serialize()
# if the workflow was a dynamic one
if self.__step == 2:
try:
self.reseted_components = []
self.components = []
self.status = self.STATUS_STARTED
self.postprocess_status = self.STATUS_PENDING
self.wf_execution_wrapper()
except SystemExit:
self.status = self.STATUS_FAILED
self.end_time = time.time()
self._serialize()
self._send_email()
raise
self.reseted_components = []
self.components = []
self._status = self.STATUS_STARTED
self._postprocess_status = self.STATUS_PENDING
self._serialize()
self.wf_execution_wrapper()
if len(self.components_to_exec) > 0:
self._execute_weaver()
self.__step = 3
......@@ -875,25 +897,43 @@ class Workflow(threading.Thread):
# if post processing has not been done yet
if self.__step == 3:
try:
self.postprocess_status = self.STATUS_STARTED
self._postprocess_status = self.STATUS_STARTED
self.post_process()
self.postprocess_status = self.STATUS_COMPLETED
self.status = self.STATUS_COMPLETED
self._postprocess_status = self.STATUS_COMPLETED
self._status = self.STATUS_COMPLETED
except:
self.postprocess_status = self.STATUS_FAILED
self._postprocess_status = self.STATUS_FAILED
raise
finally:
self.end_time = time.time()
self._serialize()
self._send_email()
self.end_time = time.time()
self._serialize()
except Exception as e:
self._status = self.STATUS_FAILED
self.end_time = time.time()
if self.__step is not None:
self._serialize()
self._log(str(e), traceback=traceback.format_exc(chain=False))
utils.display_error_message(str(e))
finally:
if self.__step is not None:
self._send_email()
def restart(self):
    """
    @summary: Reruns the incomplete steps of the workflow.
    @note: This method is asynchronous: it returns once start() has been called.
    """
    stderr_is_known = hasattr(self, "stderr")
    if stderr_is_known:
        self._set_stderr()
    # Put both statuses back to their initial values before starting again.
    self._status = self.STATUS_STARTED
    self._postprocess_status = self.STATUS_PENDING
    self.start()
def get_status(self):
# first update the status from weaver folders
# TODO update self.end_time
"""
@summary: Updates and returns self._status.
@return: [STATUS] the workflow status.
"""
try:
working_directory = os.path.join(self.directory, self.WORKING)
make_states = []
......@@ -903,17 +943,15 @@ class Workflow(threading.Thread):
log.parse()
make_states.append(log.state)
if len(self.reseted_components) > 0:
self.status = self.STATUS_RESETED
elif self.STATUS_ABORTED in make_states:
self.status = self.STATUS_ABORTED
elif self.STATUS_FAILED in make_states:
self.status = self.STATUS_FAILED
elif self.postprocess_status == self.STATUS_FAILED:
self.status = self.STATUS_FAILED
elif self.status != self.STATUS_COMPLETED:
self.status = self.STATUS_STARTED
self._status = self.STATUS_RESETED
elif self.STATUS_ABORTED in make_states: # Error in component execution
self._status = self.STATUS_ABORTED
elif self.STATUS_FAILED in make_states: # Error in component execution
self._status = self.STATUS_FAILED
elif self._postprocess_status == self.STATUS_FAILED: # Error in postprocess
self._status = self.STATUS_FAILED
except: pass
return self.status
return self._status
def get_resource(self, resource):
return self.jflow_config_reader.get_resource(resource)
......@@ -977,7 +1015,7 @@ class Workflow(threading.Thread):
if not found:
raise Exception("Impossible to reset component '" + component_name + "'! This one is not part of the workflow!")
self.reseted_components.append(component_name)
self.status = self.STATUS_RESETED
self._status = self.STATUS_RESETED
self._serialize()
def minimize(self):
......@@ -1007,14 +1045,15 @@ class Workflow(threading.Thread):
if level == "exception":
logging.getLogger("wf." + str(self.id)).exception(msg)
logh = open(self.stderr, "a")
today = ddate.today()
logh.write("## " + today.strftime("%c") + " :: " + msg + "\n")
if traceback: logh.write(traceback)
logh.close()
elif level == "debug":
logging.getLogger("wf." + str(self.id)).debug(msg)
logh = open(self.stderr, "a")
today = ddate.today()
logh.write("## " + today.strftime("%c") + " :: " + msg + "\n")
if traceback: logh.write(traceback)
logh.close()
elif level == "warning":
logging.getLogger("wf." + str(self.id)).warning(msg)
if raisee:
raise Exception(msg)
......@@ -1047,7 +1086,7 @@ class Workflow(threading.Thread):
# create the DAG
nest.compile()
except Exception as e:
self.status = self.STATUS_FAILED
self._status = self.STATUS_FAILED
self.end_time = time.time()
self._serialize()
raise
......@@ -1059,7 +1098,7 @@ class Workflow(threading.Thread):
# close dag_file after execution to avoid nfs troubles
nest.dag_file.close()
except:
self.status = self.STATUS_FAILED
self._status = self.STATUS_FAILED
self.end_time = time.time()
self._serialize()
raise
......
......@@ -98,9 +98,7 @@ class WorkflowsManager(object):
def rerun_workflow(self, workflow_id):
    """
    @summary: Reruns the workflow identified by workflow_id.
    @param workflow_id: The identifier passed to get_workflow().
    @return: The workflow object that has been restarted.
    """
    workflow = self.get_workflow(workflow_id)
    # restart() takes care of the stderr setup and of the single start() call;
    # calling start() here as well would start the same thread twice.
    workflow.restart()
    # Update the workflow in the cache
    self._dump_workflows([workflow])
    return workflow
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment