Commit 360e4460 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

Use a cache when retrieving workflows whose status is failed, aborted, or completed.

parent 63af9bec
......@@ -148,9 +148,9 @@ if __name__ == '__main__':
elif args["cmd_object"] == "status":
if args["workflow_id"]:
workflow = wfmanager.get_workflow(args["workflow_id"])
print workflow.get_status_under_text_format(True, args["display_errors"])
print Workflow.get_status_under_text_format(workflow, True, args["display_errors"])
else:
workflows = wfmanager.get_workflows()
workflows = wfmanager.get_workflows(use_cache=True)
if len(workflows) > 0:
workflows_by_id, wfids = {}, []
# first sort workflow by ID
......@@ -159,7 +159,7 @@ if __name__ == '__main__':
workflows_by_id[workflow.id] = workflow
status = "ID\tNAME\tSTATUS\tELAPSED_TIME\tSTART_TIME\tEND_TIME\n"
for i, wfid in enumerate(sorted(wfids, reverse=True)):
status += workflows_by_id[wfid].get_status_under_text_format()
status += Workflow.get_status_under_text_format(workflows_by_id[wfid])
if i<len(workflows)-1: status += "\n"
else: status = "no workflow available"
print status
......@@ -351,7 +351,7 @@ class JFlowServer (object):
@jsonify
def get_workflows_status(self, **kwargs):
status = []
workflows = self.wfmanager.get_workflows()
workflows = self.wfmanager.get_workflows(use_cache=True)
for workflow in workflows:
if kwargs.has_key("metadata_filter"):
is_ok = False
......
......@@ -54,8 +54,35 @@ from cctools.makeflow import MakeflowLog
from cctools.makeflow.log import Node
class Workflow(threading.Thread):
class MINIWorkflow(object):
    """Lightweight snapshot of a Workflow, suitable for pickling into the
    workflows dump cache.

    It exposes the same read accessors as the full Workflow object
    (get_components_nameid, get_component_status, get_errors, get_status)
    so status-display code can operate on either one interchangeably.
    """

    def __init__(self, id, name, description, status, start_time, end_time, metadata,
                 component_nameids, compts_status, errors):
        # Store every snapshot field verbatim, in constructor-argument order.
        snapshot = (("id", id), ("name", name), ("description", description),
                    ("status", status), ("start_time", start_time),
                    ("end_time", end_time), ("metadata", metadata),
                    ("component_nameids", component_nameids),
                    ("compts_status", compts_status), ("errors", errors))
        for field, value in snapshot:
            setattr(self, field, value)

    def get_components_nameid(self):
        """Return the cached list of component name ids."""
        return self.component_nameids

    def get_component_status(self, component_nameid):
        """Return the cached status of one component (KeyError if unknown)."""
        return self.compts_status[component_nameid]

    def get_errors(self):
        """Return the cached workflow errors."""
        return self.errors

    def get_status(self):
        """Return the cached workflow status."""
        return self.status
class Workflow(threading.Thread):
PROPERTIES_FILE_NAME = "workflow.properties"
MAKEFLOW_LOG_FILE_NAME = "Makeflow.makeflowlog"
......@@ -333,31 +360,32 @@ class Workflow(threading.Thread):
def config_parser(arg_lines):
for arg in arg_lines:
yield arg
def get_status_under_text_format(self, detailed=False, display_errors=False, html=False):
if self.start_time: start_time = time.asctime(time.localtime(self.start_time))
@staticmethod
def get_status_under_text_format(workflow, detailed=False, display_errors=False, html=False):
if workflow.start_time: start_time = time.asctime(time.localtime(workflow.start_time))
else: start_time = "-"
if self.start_time and self.end_time: elapsed_time = str(self.end_time-self.start_time)
elif self.start_time: elapsed_time = str(time.time()-self.start_time)
if workflow.start_time and workflow.end_time: elapsed_time = str(workflow.end_time-workflow.start_time)
elif workflow.start_time: elapsed_time = str(time.time()-workflow.start_time)
else: elapsed_time = "-"
elapsed_time = "-" if elapsed_time == "-" else str(datetime.timedelta(seconds=int(str(elapsed_time).split(".")[0])))
if self.end_time: end_time = time.asctime(time.localtime(self.end_time))
if workflow.end_time: end_time = time.asctime(time.localtime(workflow.end_time))
else: end_time = "-"
if detailed:
# Global
title = "Workflow #" + utils.get_nb_string(self.id) + " (" + self.name + ") is " + \
self.get_status() + ", time elapsed: " + str(elapsed_time) + " (from " + start_time + \
title = "Workflow #" + utils.get_nb_string(workflow.id) + " (" + workflow.name + ") is " + \
workflow.get_status() + ", time elapsed: " + str(elapsed_time) + " (from " + start_time + \
" to " + end_time + ")"
worflow_errors = ""
error = self.get_errors()
error = workflow.get_errors()
if error is not None:
if html: worflow_errors = "Workflow Error :\n <span style='color:#ff0000'>" + error["location"] + "\n " + "\n ".join(error["msg"]) + "</span>"
else: worflow_errors = "Workflow Error :\n \033[91m" + error["location"] + "\n " + "\n ".join(error["msg"]) + "\033[0m"
# By components
components_errors = ""
status = "Components Status :\n"
for i, component in enumerate(self.get_components_nameid()):
status_info = self.get_component_status(component)
for i, component in enumerate(workflow.get_components_nameid()):
status_info = workflow.get_component_status(component)
try: perc_waiting = (status_info["waiting"]*100.0)/status_info["tasks"]
except: perc_waiting = 0
try: perc_running = (status_info["running"]*100.0)/status_info["tasks"]
......@@ -397,7 +425,7 @@ class Workflow(threading.Thread):
status += " - " + component + ", time elapsed " + time_format(status_info["time"]) + \
" (total:" + str(status_info["tasks"]) + ", " + waiting + ", " + running + ", " + failed + \
", " + aborted + ", " + completed + ")"
if i<len(self.get_components_nameid())-1: status += "\n"
if i<len(workflow.get_components_nameid())-1: status += "\n"
# Format str
pretty_str = title
pretty_str += ("\n" + worflow_errors) if worflow_errors != "" else ""
......@@ -406,18 +434,18 @@ class Workflow(threading.Thread):
if html: return pretty_str.replace("\n", "<br />")
else: return pretty_str
else:
pretty_str = utils.get_nb_string(self.id) + "\t" + self.name + "\t"
if self.get_status() == self.STATUS_STARTED:
pretty_str = utils.get_nb_string(workflow.id) + "\t" + workflow.name + "\t"
if workflow.get_status() == Workflow.STATUS_STARTED:
pretty_str += "\033[94m"
elif self.get_status() == self.STATUS_COMPLETED:
elif workflow.get_status() == Workflow.STATUS_COMPLETED:
pretty_str += "\033[92m"
elif self.get_status() == self.STATUS_FAILED:
elif workflow.get_status() == Workflow.STATUS_FAILED:
pretty_str += "\033[91m"
elif self.get_status() == self.STATUS_ABORTED:
elif workflow.get_status() == Workflow.STATUS_ABORTED:
pretty_str += "\033[91m"
elif self.get_status() == self.STATUS_RESETED:
elif workflow.get_status() == Workflow.STATUS_RESETED:
pretty_str += "\033[3m"
pretty_str += self.get_status() + "\033[0m"
pretty_str += workflow.get_status() + "\033[0m"
pretty_str += "\t" + elapsed_time + "\t" + start_time + "\t" + end_time
return pretty_str
......@@ -506,7 +534,7 @@ class Workflow(threading.Thread):
# the text file contains only ASCII characters.
# Create a text/plain message
if not message:
message = self.get_status_under_text_format(True, True, True)
message = Workflow.get_status_under_text_format(self, True, True, True)
msg = MIMEText(message, 'html')
me = froma
you = toa
......@@ -798,6 +826,14 @@ class Workflow(threading.Thread):
self.status = self.STATUS_RESETED
self._serialize()
def minimize(self):
    """Return a MINIWorkflow snapshot of this workflow.

    The snapshot captures the current status of every component so it
    can be pickled into the workflows dump cache and later displayed
    without unpickling the full Workflow object.

    @return: a MINIWorkflow mirroring this workflow's display state.
    """
    # Fix: the original used enumerate() but never used the index.
    compts_status = {}
    for component in self.get_components_nameid():
        compts_status[component] = self.get_component_status(component)
    return MINIWorkflow(self.id, self.name, self.description, self.get_status(), self.start_time,
                        self.end_time, self.metadata, self.get_components_nameid(), compts_status,
                        self.get_errors())
def makeflow_pretty_print_node(self, dag, node):
sys.stdout.write('{0:>10} {1} {2}\n'.format('NODE', node.id, node.symbol))
......
......@@ -44,15 +44,17 @@ class WorkflowsManager(object):
self.dump_file = os.path.join(self.get_output_directory(), self.WORKFLOWS_DUMP_FILE_NAME)
self.ids_file = os.path.join(self.get_output_directory(), self.IDS_FILE_NAME)
def _dump_workflows(self, workflows):
    """Add or refresh the given workflows in the workflows dump file.

    Each entry maps the workflow's display id to a dict with its dump
    path and a cached MINIWorkflow snapshot ("object"). Guarded by the
    class-level lock so concurrent processes do not corrupt the file.

    @param workflows: iterable of Workflow objects to (re)cache.
    """
    WorkflowsManager.lock_workflows_file.acquire()
    try:
        # first load the existing workflows
        try:
            dump_fh = open(self.dump_file)
            workflows_dump = pickle.load(dump_fh)
            dump_fh.close()
        except Exception:
            # missing or unreadable dump file: start from scratch
            workflows_dump = {}
        # then add the new ones
        for workflow in workflows:
            workflows_dump[utils.get_nb_string(workflow.id)] = {"dump_path": workflow.dump_path,
                                                                "object": workflow.minimize()}
        # and save them (close the handle so the write is flushed)
        out_fh = open(self.dump_file, "w")
        pickle.dump(workflows_dump, out_fh)
        out_fh.close()
    finally:
        # always release the lock, even if dumping fails
        WorkflowsManager.lock_workflows_file.release()
......@@ -85,11 +87,15 @@ class WorkflowsManager(object):
if hasattr(workflow, "stderr"):
workflow.set_stderr()
workflow.start()
# Update the workflow in the cache
self._dump_workflows([workflow])
return workflow
def reset_workflow_component(self, workflow_id, component_name):
    """Reset one component of a workflow and refresh its cache entry.

    @param workflow_id: id of the workflow to modify.
    @param component_name: name of the component to reset.
    @return: the updated workflow object.
    """
    workflow = self.get_workflow(workflow_id)
    workflow.reset_component(component_name)
    # Update the workflow in the cache
    self._dump_workflows([workflow])
    return workflow
def run_workflow(self, workflow_class, args, function="process"):
......@@ -101,7 +107,7 @@ class WorkflowsManager(object):
if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function)
workflow.start()
# Add the workflow dump path to the workflows dump
self._add_workflow(workflow)
self._dump_workflows([workflow])
return workflow
def get_workflow_errors(self, workflow_id):
......@@ -111,18 +117,27 @@ class WorkflowsManager(object):
def get_output_directory(self):
return self.config_reader.get_work_directory()
def get_workflows(self, use_cache=False):
    """Load all known workflows from the workflows dump file.

    @param use_cache: if True, workflows whose status is completed,
      failed or aborted are returned as their cached MINIWorkflow
      snapshot instead of unpickling the full dump — those statuses
      are final, so the cached snapshot cannot be stale.
    @return: list of Workflow and/or MINIWorkflow objects.
    """
    from jflow.workflow import Workflow
    workflows = []
    try:
        dump_fh = open(self.dump_file)
        workflows_dump = pickle.load(dump_fh)
        dump_fh.close()
    except Exception:
        # missing or unreadable dump file: no known workflows
        workflows_dump = {}
    # workflows re-read from disk below; their cache entries get refreshed
    updated_workflows = []
    final_statuses = (Workflow.STATUS_COMPLETED, Workflow.STATUS_FAILED, Workflow.STATUS_ABORTED)
    for workflow_id in workflows_dump:
        entry = workflows_dump[workflow_id]
        # If the workflow is completed, failed or aborted, use the cached
        # MINIWorkflow: its status can no longer change.
        if use_cache and entry["object"].get_status() in final_statuses:
            workflows.append(entry["object"])
        else:
            try:
                workflow_dump = open(entry["dump_path"], "rb")
                workflow = pickle.load(workflow_dump)
                workflow_dump.close()
                workflows.append(workflow)
                updated_workflows.append(workflow)
            except Exception:
                # best effort: skip workflows whose dump cannot be read
                pass
    # only rewrite the cache when something was actually re-read
    if updated_workflows:
        self._dump_workflows(updated_workflows)
    return workflows
def get_workflow_by_class(self, workflow_class):
......@@ -142,7 +157,7 @@ class WorkflowsManager(object):
except:
workflows_dump = {}
if workflows_dump.has_key(rworkflow_id):
workflow_dump = open(workflows_dump[rworkflow_id], "rb")
workflow_dump = open(workflows_dump[rworkflow_id]["dump_path"], "rb")
workflow = pickle.load(workflow_dump)
workflow_dump.close()
else:
......@@ -171,4 +186,4 @@ class WorkflowsManager(object):
ifh.close()
finally:
WorkflowsManager.lock_ids_file.release()
return cid
\ No newline at end of file
return cid
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment