Commit 051a23cb authored by Jerome Mariette's avatar Jerome Mariette
Browse files

implement a different way to handle errors

parent 541c2ced
...@@ -25,7 +25,6 @@ from functools import wraps ...@@ -25,7 +25,6 @@ from functools import wraps
import time import time
import os import os
import argparse import argparse
import logging
from argparse import ArgumentTypeError from argparse import ArgumentTypeError
......
...@@ -26,12 +26,13 @@ import pickle ...@@ -26,12 +26,13 @@ import pickle
import time import time
import threading import threading
import types import types
import logging
import datetime import datetime
import logging
import traceback
from logging import FileHandler
from ConfigParser import ConfigParser, NoOptionError from ConfigParser import ConfigParser, NoOptionError
from inspect import getcallargs from inspect import getcallargs
from datetime import date as ddate
import jflow import jflow
import jflow.utils as utils import jflow.utils as utils
...@@ -116,7 +117,6 @@ class Workflow(threading.Thread): ...@@ -116,7 +117,6 @@ class Workflow(threading.Thread):
self.end_time = None self.end_time = None
self.step = None self.step = None
self.stderr = None self.stderr = None
self.stderrfh = None
self.args = args self.args = args
self.dynamic_component_present = False self.dynamic_component_present = False
self.__to_address = None self.__to_address = None
...@@ -269,8 +269,7 @@ class Workflow(threading.Thread): ...@@ -269,8 +269,7 @@ class Workflow(threading.Thread):
paramsexclude.group = new_group paramsexclude.group = new_group
# it might be a mutliple param rule # it might be a mutliple param rule
else: else:
logging.getLogger("wf." + str(self.__class__.__name__)).exception("Exclusion rule cannot be applied within a MultiParameter or a MultiParameterList") self._log("Exclusion rule cannot be applied within a MultiParameter or a MultiParameterList", raisee=True)
raise
# save this for MultiParameter internal exclusion rules, works on command line, not supported on gui # save this for MultiParameter internal exclusion rules, works on command line, not supported on gui
# for attribute_value in self.__dict__.values(): # for attribute_value in self.__dict__.values():
# if issubclass(attribute_value.__class__, MultiParameter) or issubclass(attribute_value.__class__, MultiParameterList): # if issubclass(attribute_value.__class__, MultiParameter) or issubclass(attribute_value.__class__, MultiParameterList):
...@@ -430,8 +429,6 @@ class Workflow(threading.Thread): ...@@ -430,8 +429,6 @@ class Workflow(threading.Thread):
def delete(self): def delete(self):
if self.get_status() in [self.STATUS_COMPLETED, self.STATUS_FAILED, self.STATUS_ABORTED]: if self.get_status() in [self.STATUS_COMPLETED, self.STATUS_FAILED, self.STATUS_ABORTED]:
# first close the open files
self.stderrfh.close()
utils.robust_rmtree(self.directory) utils.robust_rmtree(self.directory)
@staticmethod @staticmethod
...@@ -528,48 +525,51 @@ class Workflow(threading.Thread): ...@@ -528,48 +525,51 @@ class Workflow(threading.Thread):
return pretty_str return pretty_str
def get_errors(self): def get_errors(self):
error = { if os.path.isfile(self.stderr):
"title" : "", error = {
"msg" : list(), "title" : "",
"traceback" : list() "msg" : list(),
} "traceback" : list()
line_idx = 0 }
FH_stderr = open( self.stderr ) line_idx = 0
lines = FH_stderr.readlines() FH_stderr = open( self.stderr )
while line_idx < len(lines): lines = FH_stderr.readlines()
if lines[line_idx].strip().startswith("##"): while line_idx < len(lines):
error["title"] = lines[line_idx].rstrip() if lines[line_idx].strip().startswith("##"):
error["msg"] = list() error["title"] = lines[line_idx].rstrip()
error["traceback"] = list() error["msg"] = list()
# skip all lines before the traceback error["traceback"] = list()
while not lines[line_idx].startswith("Traceback"): # skip all lines before the traceback
line_idx += 1 while not lines[line_idx].startswith("Traceback"):
# skip : "Traceback (most recent call last):" line_idx += 1
line_idx += 1 # skip : "Traceback (most recent call last):"
while lines[line_idx] != lines[line_idx].lstrip(): line_idx += 1
error["traceback"].append({ while lines[line_idx] != lines[line_idx].lstrip():
"location" : lines[line_idx].strip(), error["traceback"].append({
"line" : lines[line_idx].strip() "location" : lines[line_idx].strip(),
}) "line" : lines[line_idx].strip()
line_idx += 2 })
# Error message line_idx += 2
while line_idx < len(lines) and not lines[line_idx].strip().startswith("##"): # Error message
try: while line_idx < len(lines) and not lines[line_idx].strip().startswith("##"):
error["msg"].append( lines[line_idx].strip().split(":", 1)[1][1:] ) try:
except: error["msg"].append( lines[line_idx].strip().split(":", 1)[1][1:] )
error["msg"].append( lines[line_idx].strip() ) except:
line_idx += 1 error["msg"].append( lines[line_idx].strip() )
line_idx -= 1 line_idx += 1
line_idx += 1 line_idx -= 1
FH_stderr.close() line_idx += 1
last_stack_location = "" FH_stderr.close()
if len(error["traceback"]) > 0: last_stack_location = ""
last_stack_location = error["traceback"][-1]["location"].strip() if len(error["traceback"]) > 0:
self.status = self.STATUS_FAILED last_stack_location = error["traceback"][-1]["location"].strip()
return { "msg" : error["msg"], "location" : last_stack_location } self.status = self.STATUS_FAILED
return { "msg" : error["msg"], "location" : last_stack_location }
else:
return None
else: else:
return None return None
def get_outputs_per_components(self): def get_outputs_per_components(self):
outputs_files = {} outputs_files = {}
for current_components in self.components: for current_components in self.components:
...@@ -581,8 +581,6 @@ class Workflow(threading.Thread): ...@@ -581,8 +581,6 @@ class Workflow(threading.Thread):
def __setstate__(self, state): def __setstate__(self, state):
self.__dict__ = state.copy() self.__dict__ = state.copy()
self.external_components = self._import_external_components() self.external_components = self._import_external_components()
self.stderrfh = None
self.stderr = self._set_stderr(True)
threading.Thread.__init__(self, name=self.name) threading.Thread.__init__(self, name=self.name)
def __getstate__(self): def __getstate__(self):
...@@ -595,9 +593,6 @@ class Workflow(threading.Thread): ...@@ -595,9 +593,6 @@ class Workflow(threading.Thread):
del odict['_Thread__stderr'] del odict['_Thread__stderr']
if odict.has_key('external_components') : if odict.has_key('external_components') :
del odict['external_components'] del odict['external_components']
if odict.has_key('stderrfh') :
self.stderrfh.close()
del odict['stderrfh']
return odict return odict
def set_to_address(self, to_address): def set_to_address(self, to_address):
...@@ -644,20 +639,20 @@ class Workflow(threading.Thread): ...@@ -644,20 +639,20 @@ class Workflow(threading.Thread):
s.starttls() s.starttls()
s.login(me, fromp) s.login(me, fromp)
except smtplib.SMTPHeloError: except smtplib.SMTPHeloError:
logging.getLogger("wf." + str(self.id)).debug("The server didn't reply properly to the HELO greeting.") self._log("The server didn't reply properly to the HELO greeting.", level="debug", traceback=traceback.format_exc())
except smtplib.SMTPAuthenticationError: except smtplib.SMTPAuthenticationError:
logging.getLogger("wf." + str(self.id)).debug("The server didn't accept the username/password combination.") self._log("The server didn't accept the username/password combination.", level="debug", traceback=traceback.format_exc())
except smtplib.SMTPException: except smtplib.SMTPException:
logging.getLogger("wf." + str(self.id)).debug("No suitable authentication method was found, or the server does not support the STARTTLS extension.") self._log("No suitable authentication method was found, or the server does not support the STARTTLS extension.", level="debug", traceback=traceback.format_exc())
except RuntimeError: except RuntimeError:
logging.getLogger("wf." + str(self.id)).debug("SSL/TLS support is not available to your Python interpreter.") self._log("SSL/TLS support is not available to your Python interpreter.", level="debug", traceback=traceback.format_exc())
except: except:
logging.getLogger("wf." + str(self.id)).debug("Unhandled error when sending mail.") self._log("Unhandled error when sending mail.", level="debug", traceback=traceback.format_exc())
finally: finally:
s.sendmail(me, [you], msg.as_string()) s.sendmail(me, [you], msg.as_string())
s.close() s.close()
except: except:
logging.getLogger("wf." + str(self.id)).debug("Impossible to connect to smtp server '" + smtps + "'") self._log("Impossible to connect to smtp server '" + smtps + "'", level="debug", traceback=traceback.format_exc())
def get_parameters_per_groups(self): def get_parameters_per_groups(self):
name = self.get_name() name = self.get_name()
...@@ -862,7 +857,7 @@ class Workflow(threading.Thread): ...@@ -862,7 +857,7 @@ class Workflow(threading.Thread):
self._serialize() self._serialize()
self._send_email() self._send_email()
except Exception as e: except Exception as e:
logging.getLogger("wf." + str(self.id)).exception(str(e)) self._log(str(e), traceback=traceback.format_exc())
utils.display_error_message(str(e)) utils.display_error_message(str(e))
def _update_status_from_log(self): def _update_status_from_log(self):
...@@ -961,23 +956,28 @@ class Workflow(threading.Thread): ...@@ -961,23 +956,28 @@ class Workflow(threading.Thread):
sys.stdout.write('{0:>10} {1:>10} {2}\n'.format('', 'COMMAND', node.command)) sys.stdout.write('{0:>10} {1:>10} {2}\n'.format('', 'COMMAND', node.command))
def _set_stderr(self, keep_same_error_file=False): def _set_stderr(self):
# if it is specified to keep the same error file, keep with it if hasattr(self, "stderr") and self.stderr is not None:
if not keep_same_error_file: os.rename( self.stderr, os.path.join(self.directory, str(time.time()) + self.STDERR_FILE_NAME + self.OLD_EXTENSION) )
if hasattr(self, "stderr") and self.stderr is not None:
os.rename( self.stderr, os.path.join(self.directory, str(time.time()) + self.STDERR_FILE_NAME + self.OLD_EXTENSION) )
stderr = os.path.join(self.directory, self.STDERR_FILE_NAME) stderr = os.path.join(self.directory, self.STDERR_FILE_NAME)
logger = logging.getLogger( "wf." + str(self.id) )
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter( '## %(asctime)s :: %(message)s' )
if self.stderrfh is None:
self.stderrfh = FileHandler(stderr, 'a')
self.stderrfh.setLevel(logging.ERROR)
self.stderrfh.setFormatter(formatter)
logger.addHandler(self.stderrfh)
return stderr return stderr
def _log(self, msg, level="exception", raisee=False, traceback=None):
if level == "exception":
logging.getLogger("wf." + str(self.id)).exception(msg)
elif level == "debug":
logging.getLogger("wf." + str(self.id)).debug(msg)
logh = open(self.stderr, "a")
today = ddate.today()
logh.write("## " + today.strftime("%c") + " :: " + msg + "\n")
if traceback: logh.write(traceback)
logh.close()
if raisee:
raise Exception(msg)
def _execute_weaver(self, engine_wrapper=None): def _execute_weaver(self, engine_wrapper=None):
# Add nest path and path to script to Python module path to allow # Add nest path and path to script to Python module path to allow
# for importing modules outside of $PYTHONPATH # for importing modules outside of $PYTHONPATH
...@@ -1060,7 +1060,7 @@ class Workflow(threading.Thread): ...@@ -1060,7 +1060,7 @@ class Workflow(threading.Thread):
if issubclass(obj, jflow.component.Component) and obj.__name__ != jflow.component.Component.__name__: if issubclass(obj, jflow.component.Component) and obj.__name__ != jflow.component.Component.__name__:
pckge[class_name] = modname pckge[class_name] = modname
except Exception as e: except Exception as e:
logging.getLogger("wf." + str(self.id)).debug("Component <{0}> cannot be loaded: {1}".format(modname, e)) self._log("Component <{0}> cannot be loaded: {1}".format(modname, e), level="debug", traceback=traceback.format_exc())
# finally import workflows shared packages # finally import workflows shared packages
workflows_dir = os.path.dirname(os.path.dirname(inspect.getfile(self.__class__))) workflows_dir = os.path.dirname(os.path.dirname(inspect.getfile(self.__class__)))
for importer, modname, ispkg in pkgutil.iter_modules([os.path.join(workflows_dir, "components")], "workflows.components."): for importer, modname, ispkg in pkgutil.iter_modules([os.path.join(workflows_dir, "components")], "workflows.components."):
...@@ -1070,7 +1070,7 @@ class Workflow(threading.Thread): ...@@ -1070,7 +1070,7 @@ class Workflow(threading.Thread):
if issubclass(obj, jflow.component.Component) and obj.__name__ != jflow.component.Component.__name__: if issubclass(obj, jflow.component.Component) and obj.__name__ != jflow.component.Component.__name__:
pckge[class_name] = modname pckge[class_name] = modname
except Exception as e: except Exception as e:
logging.getLogger("wf." + str(self.id)).debug("Component <{0}> cannot be loaded: {1}".format(modname, e)) self._log("Component <{0}> cannot be loaded: {1}".format(modname, e), level="debug", traceback=traceback.format_exc())
return pckge return pckge
def _import_external_components(self): def _import_external_components(self):
...@@ -1085,7 +1085,7 @@ class Workflow(threading.Thread): ...@@ -1085,7 +1085,7 @@ class Workflow(threading.Thread):
if issubclass(obj, jflow.extparser.ExternalParser) and obj.__name__ != jflow.extparser.ExternalParser.__name__: if issubclass(obj, jflow.extparser.ExternalParser) and obj.__name__ != jflow.extparser.ExternalParser.__name__:
parsers.append(obj()) parsers.append(obj())
except Exception as e: except Exception as e:
logging.getLogger("wf." + str(self.id)).debug("Parser <{0}> cannot be loaded: {1}".format(modname, e)) self._log("Parser <{0}> cannot be loaded: {1}".format(modname, e), level="debug", traceback=traceback.format_exc())
for parser in parsers : for parser in parsers :
# import from pipeline components package ... # import from pipeline components package ...
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment