Commit 94cef2b9 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

make jflow able to send emails

parent 6f470e58
......@@ -27,7 +27,6 @@ except ImportError:
from jflow.workflows_manager import WorkflowsManager
from jflow.workflow import Workflow
import jflow.utils as utils
from cctools.util import time_format
class JflowArgumentParser (argparse.ArgumentParser):
def _read_args_from_files(self, arg_strings):
......@@ -55,67 +54,6 @@ class JflowArgumentParser (argparse.ArgumentParser):
# return the modified argument list
return new_arg_strings
def display_workflow_status(workflow, detailed=False, display_errors=False):
if workflow.start_time: start_time = time.asctime(time.localtime(workflow.start_time))
else: start_time = "-"
if workflow.start_time and workflow.end_time: elapsed_time = str(workflow.end_time-workflow.start_time)
elif workflow.start_time: elapsed_time = str(time.time()-workflow.start_time)
else: elapsed_time = "-"
if workflow.end_time: end_time = time.asctime(time.localtime(workflow.end_time))
else: end_time = "-"
if detailed:
# Global
title = "Workflow #" + utils.get_nb_string(workflow.id) + " (" + workflow.name + ") is " + \
workflow.get_status() + ", time elapsed: " + str(elapsed_time) + " (from " + start_time + \
" to " + end_time + ")"
worflow_errors = ""
error = workflow.get_errors()
if error is not None:
worflow_errors = "Workflow Error :\n \033[91m" + error["location"] + "\n " + "\n ".join(error["msg"]) + "\033[0m"
# By components
components_errors = ""
status = "Components Status :\n"
for i, component in enumerate(workflow.get_components_nameid()):
status_info = workflow.get_component_status(component)
try: perc_waiting = (status_info["waiting"]*100.0)/status_info["tasks"]
except: perc_waiting = 0
try: perc_running = (status_info["running"]*100.0)/status_info["tasks"]
except: perc_running = 0
try: perc_failed = (status_info["failed"]*100.0)/status_info["tasks"]
except: perc_failed = 0
try: perc_aborted = (status_info["aborted"]*100.0)/status_info["tasks"]
except: perc_aborted = 0
try: perc_completed = (status_info["completed"]*100.0)/status_info["tasks"]
except: perc_completed = 0
if status_info["running"] > 0: running = "\033[94mrunning:" + str(status_info["running"]) + "\033[0m"
else: running = "running:" + str(status_info["running"])
if status_info["waiting"] > 0: waiting = "\033[93mwaiting:" + str(status_info["waiting"]) + "\033[0m"
else: waiting = "waiting:" + str(status_info["waiting"])
if status_info["failed"] > 0: failed = "\033[91mfailed:" + str(status_info["failed"]) + "\033[0m"
else: failed = "failed:" + str(status_info["failed"])
if status_info["aborted"] > 0: aborted = "\033[95maborted:" + str(status_info["aborted"]) + "\033[0m"
else: aborted = "aborted:" + str(status_info["aborted"])
if status_info["completed"] == status_info["tasks"] and status_info["completed"] > 0: completed = "\033[92mcompleted:" + str(status_info["completed"]) + "\033[0m"
else: completed = "completed:" + str(status_info["completed"])
if display_errors and len(status_info["failed_commands"]) > 0:
if components_errors == "" :
components_errors = "Failed Commands :\n"
components_errors += " - " + component + " :\n " + "\n ".join(status_info["failed_commands"])
status += " - " + component + ", time elapsed " + time_format(status_info["time"]) + \
" (total:" + str(status_info["tasks"]) + ", " + waiting + ", " + running + ", " + failed + \
", " + aborted + ", " + completed + ")"
if i<len(workflow.get_components_nameid())-1: status += "\n"
# Format str
pretty_str = title
pretty_str += ("\n" + worflow_errors) if worflow_errors != "" else ""
pretty_str += ("\n" + status) if status != "" else ""
pretty_str += ("\n" + components_errors) if components_errors != "" else ""
return pretty_str
else:
return utils.get_nb_string(workflow.id) + "\t" + workflow.name + "\t" + workflow.get_status() + \
"\t" + elapsed_time + "\t" + start_time + "\t" + end_time
if __name__ == '__main__':
......@@ -198,13 +136,13 @@ if __name__ == '__main__':
elif args["cmd_object"] == "status":
if args["workflow_id"]:
workflow = wfmanager.get_workflow(args["workflow_id"])
print display_workflow_status(workflow, True, args["display_errors"])
print workflow.get_status_under_text_format(True, args["display_errors"])
else:
workflows = wfmanager.get_workflows()
if len(workflows) > 0:
status = "ID\tNAME\tSTATUS\tELAPSED_TIME\tSTART_TIME\tEND_TIME\n"
for i, workflow in enumerate(workflows):
status += display_workflow_status(workflow)
status += workflow.get_status_under_text_format()
if i<len(workflows)-1: status += "\n"
else: status = "no workflow available"
print status
......@@ -88,7 +88,20 @@ class JFlowConfigReader(object):
return [self.reader.get("global", "server_socket_host"), int(self.reader.get("global", "server_socket_port"))]
except:
return ["127.0.0.1", 8080]
def get_email_options(self):
try: smtps = self.reader.get("email", "smtp_server")
except: smtps = None
try: froma = self.reader.get("email", "from_address")
except: froma = None
try: toa = self.reader.get("email", "to_address")
except: toa = None
try: subject = self.reader.get("email", "subject")
except: subject = None
try: message = self.reader.get("email", "message")
except: message = None
return [smtps, froma, toa, subject, message]
def get_component_batch_options(self, component_class):
try:
return self.reader.get("components", component_class+".batch_options")
......
......@@ -15,6 +15,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import re
import smtplib
import socket
try:
import DNS
ServerError = DNS.ServerError
except:
DNS = None
class ServerError(Exception): pass
def which(program):
"""
Return if the asked program exist in the user path
......@@ -72,4 +83,94 @@ def get_argument_pattern( list, start_number=1 ):
arg_pattern += ' ${' + str(next_number) + '}'
next_number += 1
return [arg_pattern, next_number]
\ No newline at end of file
return [arg_pattern, next_number]
# All we are really doing is comparing the input string to one
# gigantic regular expression. But building that regexp, and
# ensuring its correctness, is made much easier by assembling it
# from the "tokens" defined by the RFC. Each of these tokens is
# tested in the accompanying unit test file.
#
# The section of RFC 2822 from which each pattern component is
# derived is given in an accompanying comment.
#
# (To make things simple, every string below is given as 'raw',
# even when it's not strictly necessary. This way we don't forget
# when it is necessary.)
#
WSP = r'[ \t]' # see 2.2.2. Structured Header Field Bodies
CRLF = r'(?:\r\n)' # see 2.2.3. Long Header Fields
NO_WS_CTL = r'\x01-\x08\x0b\x0c\x0f-\x1f\x7f' # see 3.2.1. Primitive Tokens
QUOTED_PAIR = r'(?:\\.)' # see 3.2.2. Quoted characters
FWS = r'(?:(?:' + WSP + r'*' + CRLF + r')?' + \
WSP + r'+)' # see 3.2.3. Folding white space and comments
CTEXT = r'[' + NO_WS_CTL + \
r'\x21-\x27\x2a-\x5b\x5d-\x7e]' # see 3.2.3
CCONTENT = r'(?:' + CTEXT + r'|' + \
QUOTED_PAIR + r')' # see 3.2.3 (NB: The RFC includes COMMENT here
# as well, but that would be circular.)
COMMENT = r'\((?:' + FWS + r'?' + CCONTENT + \
r')*' + FWS + r'?\)' # see 3.2.3
CFWS = r'(?:' + FWS + r'?' + COMMENT + ')*(?:' + \
FWS + '?' + COMMENT + '|' + FWS + ')' # see 3.2.3
ATEXT = r'[\w!#$%&\'\*\+\-/=\?\^`\{\|\}~]' # see 3.2.4. Atom
ATOM = CFWS + r'?' + ATEXT + r'+' + CFWS + r'?' # see 3.2.4
DOT_ATOM_TEXT = ATEXT + r'+(?:\.' + ATEXT + r'+)*' # see 3.2.4
DOT_ATOM = CFWS + r'?' + DOT_ATOM_TEXT + CFWS + r'?' # see 3.2.4
QTEXT = r'[' + NO_WS_CTL + \
r'\x21\x23-\x5b\x5d-\x7e]' # see 3.2.5. Quoted strings
QCONTENT = r'(?:' + QTEXT + r'|' + \
QUOTED_PAIR + r')' # see 3.2.5
QUOTED_STRING = CFWS + r'?' + r'"(?:' + FWS + \
r'?' + QCONTENT + r')*' + FWS + \
r'?' + r'"' + CFWS + r'?'
LOCAL_PART = r'(?:' + DOT_ATOM + r'|' + \
QUOTED_STRING + r')' # see 3.4.1. Addr-spec specification
DTEXT = r'[' + NO_WS_CTL + r'\x21-\x5a\x5e-\x7e]' # see 3.4.1
DCONTENT = r'(?:' + DTEXT + r'|' + \
QUOTED_PAIR + r')' # see 3.4.1
DOMAIN_LITERAL = CFWS + r'?' + r'\[' + \
r'(?:' + FWS + r'?' + DCONTENT + \
r')*' + FWS + r'?\]' + CFWS + r'?' # see 3.4.1
DOMAIN = r'(?:' + DOT_ATOM + r'|' + \
DOMAIN_LITERAL + r')' # see 3.4.1
ADDR_SPEC = LOCAL_PART + r'@' + DOMAIN # see 3.4.1
# A valid address will match exactly the 3.4.1 addr-spec.
VALID_ADDRESS_REGEXP = '^' + ADDR_SPEC + '$'
def validate_email(email, check_mx=False,verify=False):
"""Indicate whether the given string is a valid email address
according to the 'addr-spec' portion of RFC 2822 (see section
3.4.1). Parts of the spec that are marked obsolete are *not*
included in this test, and certain arcane constructions that
depend on circular definitions in the spec may not pass, but in
general this should correctly identify any email address likely
to be in use as of 2011."""
try:
assert re.match(VALID_ADDRESS_REGEXP, email) is not None
check_mx |= verify
if check_mx:
if not DNS: raise Exception('For check the mx records or check if the email exists you must have installed pyDNS python package')
DNS.DiscoverNameServers()
hostname = email[email.find('@')+1:]
mx_hosts = DNS.mxlookup(hostname)
for mx in mx_hosts:
try:
smtp = smtplib.SMTP()
smtp.connect(mx[1])
if not verify: return True
status, _ = smtp.helo()
if status != 250: continue
smtp.mail('')
status, _ = smtp.rcpt(email)
if status != 250: return False
break
except smtplib.SMTPServerDisconnected: #Server not permits verify user
break
except smtplib.SMTPConnectError:
continue
except (AssertionError, ServerError):
return False
return True
\ No newline at end of file
......@@ -34,9 +34,11 @@ from inspect import getcallargs
import jflow
import jflow.utils as utils
from jflow.utils import validate_email
from jflow.graph import Graph
from jflow.workflows_manager import WorkflowsManager, JFlowConfigReader
from jflow.parameter import *
from cctools.util import time_format
from weaver.script import ABSTRACTIONS
from weaver.script import DATASETS
......@@ -81,6 +83,9 @@ class Workflow(threading.Thread):
self.step = None
self.stderr = None
self.dynamic_component_present = False
self.__to_address = None
self.__subject = None
self.__message = None
self.function = function
self.comp_pckg = self._import_components()
# intruduce --log-verbose to be able to monitor the new version of makeflow >=4.2.2
......@@ -212,7 +217,7 @@ class Workflow(threading.Thread):
flags2exclude.append(paramsexclude.flag)
attribute_value.type.excludes[new_group] = flags2exclude
break
def _set_parameters(self, args):
parameters = self.get_parameters()
for param in parameters:
......@@ -298,6 +303,68 @@ class Workflow(threading.Thread):
for arg in arg_lines:
yield arg
def get_status_under_text_format(self, detailed=False, display_errors=False):
if self.start_time: start_time = time.asctime(time.localtime(self.start_time))
else: start_time = "-"
if self.start_time and self.end_time: elapsed_time = str(self.end_time-self.start_time)
elif self.start_time: elapsed_time = str(time.time()-self.start_time)
else: elapsed_time = "-"
if self.end_time: end_time = time.asctime(time.localtime(self.end_time))
else: end_time = "-"
if detailed:
# Global
title = "Workflow #" + utils.get_nb_string(self.id) + " (" + self.name + ") is " + \
self.get_status() + ", time elapsed: " + str(elapsed_time) + " (from " + start_time + \
" to " + end_time + ")"
worflow_errors = ""
error = self.get_errors()
if error is not None:
worflow_errors = "Workflow Error :\n \033[91m" + error["location"] + "\n " + "\n ".join(error["msg"]) + "\033[0m"
# By components
components_errors = ""
status = "Components Status :\n"
for i, component in enumerate(self.get_components_nameid()):
status_info = self.get_component_status(component)
try: perc_waiting = (status_info["waiting"]*100.0)/status_info["tasks"]
except: perc_waiting = 0
try: perc_running = (status_info["running"]*100.0)/status_info["tasks"]
except: perc_running = 0
try: perc_failed = (status_info["failed"]*100.0)/status_info["tasks"]
except: perc_failed = 0
try: perc_aborted = (status_info["aborted"]*100.0)/status_info["tasks"]
except: perc_aborted = 0
try: perc_completed = (status_info["completed"]*100.0)/status_info["tasks"]
except: perc_completed = 0
if status_info["running"] > 0: running = "\033[94mrunning:" + str(status_info["running"]) + "\033[0m"
else: running = "running:" + str(status_info["running"])
if status_info["waiting"] > 0: waiting = "\033[93mwaiting:" + str(status_info["waiting"]) + "\033[0m"
else: waiting = "waiting:" + str(status_info["waiting"])
if status_info["failed"] > 0: failed = "\033[91mfailed:" + str(status_info["failed"]) + "\033[0m"
else: failed = "failed:" + str(status_info["failed"])
if status_info["aborted"] > 0: aborted = "\033[95maborted:" + str(status_info["aborted"]) + "\033[0m"
else: aborted = "aborted:" + str(status_info["aborted"])
if status_info["completed"] == status_info["tasks"] and status_info["completed"] > 0: completed = "\033[92mcompleted:" + str(status_info["completed"]) + "\033[0m"
else: completed = "completed:" + str(status_info["completed"])
if display_errors and len(status_info["failed_commands"]) > 0:
if components_errors == "" :
components_errors = "Failed Commands :\n"
components_errors += " - " + component + " :\n " + "\n ".join(status_info["failed_commands"])
status += " - " + component + ", time elapsed " + time_format(status_info["time"]) + \
" (total:" + str(status_info["tasks"]) + ", " + waiting + ", " + running + ", " + failed + \
", " + aborted + ", " + completed + ")"
if i<len(self.get_components_nameid())-1: status += "\n"
# Format str
pretty_str = title
pretty_str += ("\n" + worflow_errors) if worflow_errors != "" else ""
pretty_str += ("\n" + status) if status != "" else ""
pretty_str += ("\n" + components_errors) if components_errors != "" else ""
return pretty_str
else:
return utils.get_nb_string(self.id) + "\t" + self.name + "\t" + self.get_status() + \
"\t" + elapsed_time + "\t" + start_time + "\t" + end_time
def get_errors(self):
error = {
"title" : "",
......@@ -356,6 +423,46 @@ class Workflow(threading.Thread):
del odict['_Thread__stderr']
return odict
def set_to_address(self, to_address):
self.__to_address = to_address
def set_subject(self, subject):
self.__subject = subject
def set_message(self, message):
self.__message = message
def _send_email(self):
import smtplib
from email.mime.text import MIMEText
smtps, froma, toa, subject, message = self.jflow_config_reader.get_email_options()
if self.__to_address: toa = self.__to_address
if self.__subject: subject = self.__subject
if self.__message: message = self.__message
if smtps and froma:
if not toa: toa = froma
if validate_email(froma) and validate_email(toa):
# Open a plain text file for reading. For this example, assume that
# the text file contains only ASCII characters.
# Create a text/plain message
if not message:
message = self.get_status_under_text_format(True, True)
msg = MIMEText(message)
me = froma
you = toa
if not subject: subject = "JFlow - Workflow #" + str(self.id) + " is " + self.status
msg['Subject'] = subject
msg['From'] = me
msg['To'] = you
# Send the message via our own SMTP server, but don't include the
# envelope header.
s = smtplib.SMTP(smtps)
s.sendmail(me, [you], msg.as_string())
s.quit()
def get_parameters_per_groups(self):
name = self.get_name()
description = self.get_description()
......@@ -515,6 +622,7 @@ class Workflow(threading.Thread):
self.status = self.STATUS_FAILED
self.end_time = time.time()
self._serialize()
self._send_email()
raise
self.component_nameids_is_init = True
if self.dynamic_component_present:
......@@ -534,6 +642,7 @@ class Workflow(threading.Thread):
self.status = self.STATUS_FAILED
self.end_time = time.time()
self._serialize()
self._send_email()
raise
if len(self.components_to_exec) > 0:
self._execute_weaver()
......@@ -545,6 +654,7 @@ class Workflow(threading.Thread):
if self.status == self.STATUS_STARTED: self.status = self.STATUS_COMPLETED
self.end_time = time.time()
self._serialize()
self._send_email()
except:
logging.getLogger("wf." + str(self.id)).exception("RunWorflowException")
raise
......
......@@ -154,7 +154,8 @@ class WorkflowsManager(object):
workflow = pickle.load(workflow_dump)
workflow_dump.close()
else:
raise ValueError("Workflow with id " + str(rworkflow_id) + " cannot be retrived!")
sys.stderr.write("Error: Workflow with id " + str(rworkflow_id) + " cannot be retrived!\n")
sys.exit(1)
return workflow
def get_workflow_directory(self, wname, wid):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment