Commit f7002fca authored by Frédéric Escudié

GUI manages workflow.run errors.

GUI displays workflow execution errors.
Add a log file for workflow execution errors.
parent deb8268b
@@ -200,18 +200,21 @@ class JFlowServer (object):
    @cherrypy.expose
    @jsonify
    def run_workflow(self, **kwargs):
        try:
            kwargs_modified = {}
            for key in kwargs.keys():
                parts = key.split(JFlowServer.MULTIPLE_TYPE_SPLITER)
                if len(parts) == 1:
                    kwargs_modified[key] = kwargs[key]
                elif len(parts) == 2:
                    if kwargs_modified.has_key(parts[0]):
                        kwargs_modified[parts[0]].append((parts[1], kwargs[key]))
                    else:
                        kwargs_modified[parts[0]] = [(parts[1], kwargs[key])]
            workflow = self.wfmanager.run_workflow(kwargs_modified["workflow_class"], kwargs_modified)
            return { "status" : 0, "content" : self.jsonify_workflow_status(workflow, True) }
        except Exception as err:
            return { "status" : 1, "content" : str(err) }

    @cherrypy.expose
    @jsonify
@@ -300,6 +303,13 @@ class JFlowServer (object):
            status.append(self.jsonify_workflow_status(workflow))
        return status

    @cherrypy.expose
    @jsonify
    def get_workflow_errors(self, **kwargs):
        print JFlowServer.JFLOW_WDATA
        print self.wfmanager.get_workflow_errors(kwargs["workflow_id"])
        return self.wfmanager.get_workflow_errors(kwargs["workflow_id"])

    @cherrypy.expose
    @jsonify
    def get_workflow_outputs(self, **kwargs):
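To make the new status envelope concrete, here is a rough client-side sketch (Python 2, like the rest of jflow) of how these two endpoints can be consumed. The server address, the workflow parameters and the workflow id are placeholders, and it assumes the @jsonify decorator returns plain JSON when no JSONP callback parameter is passed:

    import json
    import urllib
    import urllib2

    server = "http://localhost:8080"                              # placeholder address
    params = urllib.urlencode({"workflow_class": "myworkflow"})   # illustrative parameters

    # run_workflow now always answers with a {"status", "content"} envelope.
    answer = json.loads(urllib2.urlopen(server + "/run_workflow?" + params).read())
    if answer["status"] == 0:
        print "workflow submitted:", answer["content"]            # jsonified workflow status
    else:
        print "submission failed:", answer["content"]             # str(err) raised server side

    # get_workflow_errors returns the dict built by Workflow.get_errors().
    report = json.loads(urllib2.urlopen(server + "/get_workflow_errors?workflow_id=1").read())
    print "\n".join(report["msg"])    # last line(s) of the logged traceback(s)
    print report["location"]          # deepest "File ..." frame of the last traceback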
@@ -29,6 +29,7 @@ import urllib2
import types
import logging
from logging import FileHandler
from ConfigParser import ConfigParser, NoOptionError
from inspect import getcallargs
from urlparse import urlparse
@@ -107,13 +108,46 @@ class Workflow(threading.Thread):
        self.directory = self.manager.get_workflow_directory(self.name, self.id)
        if not os.path.isdir(self.directory):
            os.makedirs(self.directory, 0751)
        if not hasattr(self, "stderr") or self.stderr is None:
            self.set_stderr()
        self._serialize()

    def set_stderr(self):
        if hasattr(self, "stderr") and self.stderr is not None:
            os.rename( self.stderr, os.path.join(self.directory, str(time.time()) + "wf_stderr.old") )
        stderr = os.path.join(self.directory, "wf_stderr.txt")
        logger = logging.getLogger( "wf." + str(self.id) )
        logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter( '%(asctime)s :: %(message)s' )
        file_handler = FileHandler(stderr, 'a')
        file_handler.setLevel(logging.ERROR)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        self.stderr = stderr
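A minimal, self-contained sketch of what this per-workflow logger produces on disk; the logger name, file name, handler level and format are taken from set_stderr above, while the workflow id, directory and failing statement are invented for the illustration:

    import logging
    import os
    import tempfile
    from logging import FileHandler

    wf_id = 42                                    # hypothetical workflow id
    directory = tempfile.mkdtemp()                # stand-in for the workflow directory
    stderr = os.path.join(directory, "wf_stderr.txt")

    logger = logging.getLogger("wf." + str(wf_id))
    logger.setLevel(logging.DEBUG)
    file_handler = FileHandler(stderr, 'a')
    file_handler.setLevel(logging.ERROR)
    file_handler.setFormatter(logging.Formatter('%(asctime)s :: %(message)s'))
    logger.addHandler(file_handler)

    try:
        raise ValueError("component 'foo' failed")     # stand-in for a workflow error
    except Exception:
        # Same call as in Workflow.run(): one marker line plus the full traceback.
        logger.exception("RunWorflowException")

    print open(stderr).read()
    # <asctime> :: RunWorflowException
    # Traceback (most recent call last):
    #   File "...", line ..., in <module>
    #     raise ValueError("component 'foo' failed")
    # ValueError: component 'foo' failed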
    @staticmethod
    def config_parser(arg_lines):
        for arg in arg_lines:
            yield arg

    def get_errors(self):
        error_msg = list()
        last_stacktrace_location = ""
        FH_stderr = open( self.stderr )
        lines = FH_stderr.readlines()
        line_idx = 0
        while line_idx < len(lines):
            if lines[line_idx].startswith("Traceback"):
                line_idx += 1
                while lines[line_idx] != lines[line_idx].lstrip():
                    last_stacktrace_location = lines[line_idx].lstrip()
                    line_idx += 2
                if not lines[line_idx].strip().endswith("RunWorflowException"):
                    error_msg.append( lines[line_idx].strip() )
            line_idx += 1
        FH_stderr.close()
        return { "msg" : error_msg, "location" : last_stacktrace_location }
    def get_outputs_per_components(self, web_path):
        outputs_files = {}
        for current_components in self.components:
@@ -243,53 +277,58 @@ class Workflow(threading.Thread):
        """
        Only required for Threading
        """
        try:
            # if this is the first time the workflow runs
            if self.step == None :
                self.start_time = time.time()
                self.step = 0
                self.status = self.STATUS_STARTED
                self.end_time = None
                self._serialize()
            # if pre_processing has not been done yet
            if self.step == 0:
                self.pre_process()
                self.step = 1
                self._serialize()
            # if collecting components and running workflow has not been done yet
            if self.step == 1:
                try:
                    self.wf_execution_wrapper()
                except SystemExit, e:
                    self.status = self.STATUS_FAILED
                    self.end_time = time.time()
                    self._serialize()
                    raise
                self.component_nameids_is_init = True
                if self.dynamic_component_present:
                    self.step = 2
                else:
                    self._execute_weaver()
                    self.step = 3
                self._serialize()
            # if the workflow was a dynamic one
            if self.step == 2:
                try:
                    self.wf_execution_wrapper()
                except SystemExit, e:
                    self.status = self.STATUS_FAILED
                    self.end_time = time.time()
                    self._serialize()
                    raise
                if len(self.components_to_exec) > 0:
                    self._execute_weaver()
                self.step = 3
                self._serialize()
            # if post processing has not been done yet
            if self.step == 3:
                self.post_process()
                if self.status == self.STATUS_STARTED: self.status = self.STATUS_COMPLETED
                self.end_time = time.time()
                self._serialize()
        except:
            # record the full traceback in the workflow's stderr log, then propagate
            logging.getLogger("wf." + str(self.id)).exception("RunWorflowException")
            raise
    def _download_urlfile(self, input):
        try:
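The outer try/except added to run() leans on the existing step checkpointing: every completed phase bumps self.step and is persisted via _serialize(), so a rerun of a failed workflow skips the phases that already succeeded, while the except block guarantees the traceback lands in wf_stderr.txt for the GUI. A stripped-down sketch of that pattern (class and method names here are illustrative, not jflow's API):

    import logging

    class ResumableJob(object):
        """Toy illustration of the checkpoint-and-log pattern used by Workflow.run()."""

        def __init__(self, log_name):
            self.step = None
            self.logger = logging.getLogger(log_name)

        def _serialize(self):
            pass                                  # jflow pickles the workflow here

        def pre_process(self):
            pass                                  # placeholder phases

        def execute(self):
            pass

        def post_process(self):
            pass

        def run(self):
            try:
                if self.step is None:             # first execution: initialise the checkpoint
                    self.step = 0
                    self._serialize()
                if self.step == 0:                # phase 1
                    self.pre_process()
                    self.step = 1
                    self._serialize()
                if self.step == 1:                # phase 2
                    self.execute()
                    self.step = 2
                    self._serialize()
                if self.step == 2:                # phase 3
                    self.post_process()
                    self._serialize()
            except:
                # as in the commit: log the traceback for the GUI, then re-raise
                self.logger.exception("RunWorflowException")
                raise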
@@ -76,6 +76,8 @@ class WorkflowsManager(object):
    def rerun_workflow(self, workflow_id):
        workflow = self.get_workflow(workflow_id)
        if hasattr(workflow, "stderr"):
            workflow.set_stderr()
        workflow.start()

    def run_workflow(self, workflow_class, args, function="process", parameters_section="parameters"):
@@ -90,6 +92,10 @@ class WorkflowsManager(object):
        self._add_workflow(workflow)
        return workflow

    def get_workflow_errors(self, workflow_id):
        workflow = self.get_workflow(workflow_id)
        return workflow.get_errors()

    def get_workflow_ouputs(self, web_path, workflow_id):
        workflow = self.get_workflow(workflow_id)
        return workflow.get_outputs_per_components(web_path)
@@ -157,5 +163,4 @@ class WorkflowsManager(object):
            ifh.close()
        finally:
            WorkflowsManager.lock_ids_file.release()
        return cid
\ No newline at end of file
@@ -224,8 +224,18 @@ jQuery.validator.addMethod("exclude_required", function(value, element, options)
        $.ajax({
            url: $this.options.serverURL + '/run_workflow?' + params + 'callback=?',
            dataType: "json",
            success: function(data) {
                // Ajax success
                if( data["status"] == 0 ) {
                    $this.$element.trigger('run', data["content"]);
                // Ajax error
                } else {
                    var alert_message = ['<div class="alert alert-error">',
                        '<strong>Error!</strong>',
                        'Jflow failed to run workflow <strong>' + data["content"] + '</strong>',
                        '</div>'].join('\n');
                    $this.$element.html(alert_message);
                }
            }
        });
    });
@@ -60,6 +60,32 @@
        return workflow;
    }

    WFStatus.prototype.errorMessage = function() {
        var $this = this,
            params = "" ;
        if (this.options.workflowID) { params = "workflow_id=" + this.options.workflowID + "&"; }
        $.ajax({
            url: this.options.serverURL + '/get_workflow_errors?'+params+'callback=?',
            dataType: "json",
            timeout: 20000,
            error: function (xhr, ajaxOptions, thrownError) {
                var alert_message = ['<div class="alert alert-error">',
                    '<strong>Error!</strong>',
                    'Jflow failed to connect to the specified server <strong>' + $this.options.serverURL + '</strong>',
                    '</div>'].join('\n');
                $this.$element.html(alert_message);
            },
            success: function(data) {
                $this.$element.find("#wfstatus_error_panel").hide();
                if( data["msg"].length != 0 ) {
                    $this.$element.find("#wfstatus_error_msg").text( data["msg"].join("<br/>") );
                    $this.$element.find("#wfstatus_error_location").text( data["location"] );
                    $this.$element.find("#wfstatus_error_panel").show();
                }
            }
        });
    }
    WFStatus.prototype.reload = function() {
        if (this.options.forceUsingWorkflow) {
            var workflow = this.reformatWorkflowJSON(this.options.forceUsingWorkflow);
@@ -97,6 +123,7 @@
                var workflow = $this.reformatWorkflowJSON(data);
                $this.$element.html("");
                $.tmpl($this.options.template, {workflow: workflow}).appendTo($this.$element);
                $this.errorMessage();
            }
        });
    }
@@ -133,6 +160,13 @@
        '<dt>Elapsed time</dt>',
        '<dd>${workflow.elapsed_time}</dd>',
        '<br/>',
        '<div id="wfstatus_error_panel" class="alert alert-error" style="display:none;">',
        '  <strong>Error message :</strong><br/>',
        '  <span id="wfstatus_error_msg"></span><br/>',
        '  <strong>Error location :</strong><br/>',
        '  <span id="wfstatus_error_location"></span>',
        '</div>',
        '<br/>',
        '{{each(index, component) workflow.components}}',
        '<dt>${component.name}</dt>',
        '<dd>',