Commit 4383df24 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

add graph info

parent e099f690
...@@ -132,6 +132,12 @@ if __name__ == '__main__': ...@@ -132,6 +132,12 @@ if __name__ == '__main__':
required=True, dest="workflow_id") required=True, dest="workflow_id")
sub_parser.set_defaults(cmd_object="rerun") sub_parser.set_defaults(cmd_object="rerun")
# Add rerun workflow availability
sub_parser = subparsers.add_parser("execution-graph", help="Display the workflow execution graph")
sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be considered",
required=True, dest="workflow_id")
# Add status workflow availability # Add status workflow availability
sub_parser = subparsers.add_parser("status", help="monitor a specific workflow") sub_parser = subparsers.add_parser("status", help="monitor a specific workflow")
sub_parser.add_argument("--workflow-id", type=str, help="Which workflow status should be displayed", sub_parser.add_argument("--workflow-id", type=str, help="Which workflow status should be displayed",
...@@ -176,6 +182,8 @@ if __name__ == '__main__': ...@@ -176,6 +182,8 @@ if __name__ == '__main__':
wfmanager.run_workflow(args["cmd_object"], args) wfmanager.run_workflow(args["cmd_object"], args)
elif args["cmd_object"] == "rerun": elif args["cmd_object"] == "rerun":
wfmanager.rerun_workflow(args["workflow_id"]) wfmanager.rerun_workflow(args["workflow_id"])
elif args["cmd_object"] == "execution_graph":
elif args["cmd_object"] == "status": elif args["cmd_object"] == "status":
if args["workflow_id"]: if args["workflow_id"]:
workflow = wfmanager.get_workflow(args["workflow_id"]) workflow = wfmanager.get_workflow(args["workflow_id"])
# Copyright (C) 2012 INRA
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <>.
class Graph(object):
def __init__(self, graph_dict={}):
""" initializes a graph object """
self.__graph_dict = graph_dict
def vertices(self):
""" returns the vertices of a graph """
return list(self.__graph_dict.keys())
def edges(self):
""" returns the edges of a graph """
return self.__generate_edges()
def add_vertex(self, vertex):
""" If the vertex "vertex" is not in
self.__graph_dict, a key "vertex" with an empty
list as a value is added to the dictionary.
Otherwise nothing has to be done.
if vertex not in self.__graph_dict:
self.__graph_dict[vertex] = []
def add_edge(self, edge):
""" assumes that edge is of type set, tuple or list;
between two vertices can be multiple edges!
edge = set(edge)
(vertex1, vertex2) = tuple(edge)
if vertex1 in self.__graph_dict:
self.__graph_dict[vertex1] = [vertex2]
def __generate_edges(self):
""" A static method generating the edges of the
graph "graph". Edges are represented as sets
with one (a loop back to the vertex) or two
edges = []
for vertex in self.__graph_dict:
for neighbour in self.__graph_dict[vertex]:
if {neighbour, vertex} not in edges:
edges.append({vertex, neighbour})
return edges
def __str__(self):
res = "vertices: "
for k in self.__graph_dict:
res += str(k) + " "
res += "\nedges: "
for edge in self.__generate_edges():
res += str(edge) + " "
return res
...@@ -34,6 +34,7 @@ from inspect import getcallargs ...@@ -34,6 +34,7 @@ from inspect import getcallargs
import jflow import jflow
import jflow.utils as utils import jflow.utils as utils
from jflow.graph import Graph
from jflow.workflows_manager import WorkflowsManager, JFlowConfigReader from jflow.workflows_manager import WorkflowsManager, JFlowConfigReader
from jflow.parameter import * from jflow.parameter import *
...@@ -283,6 +284,17 @@ class Workflow(threading.Thread): ...@@ -283,6 +284,17 @@ class Workflow(threading.Thread):
final_sub_args[] = sub_param.default final_sub_args[] = sub_param.default
self.__getattribute__( self.__getattribute__(
def get_execution_graph(self):
graph = Graph()
for cpt in self.components:
for ioparameter in cpt.__dict__.values():
if ( issubclass( ioparameter.__class__, InputFile ) or issubclass( ioparameter.__class__, OutputFile)
or issubclass( ioparameter.__class__, InputFileList) or issubclass( ioparameter.__class__, OutputFileList) ):
print, ioparameter.component_nameid, ioparameter.parent_component_nameid
graph.add_edge({ioparameter.component_nameid, ioparameter.parent_component_nameid})
print graph
def set_stderr(self): def set_stderr(self):
if hasattr(self, "stderr") and self.stderr is not None: if hasattr(self, "stderr") and self.stderr is not None:
os.rename( self.stderr, os.path.join(, str(time.time()) + "wf_stderr.old") ) os.rename( self.stderr, os.path.join(, str(time.time()) + "wf_stderr.old") )
...@@ -104,6 +104,10 @@ class WorkflowsManager(object): ...@@ -104,6 +104,10 @@ class WorkflowsManager(object):
def get_output_directory(self): def get_output_directory(self):
return self.config_reader.get_work_directory() return self.config_reader.get_work_directory()
def get_workflow_execution_graph(self, workflow_id):
workflow = self.get_workflow(workflow_id)
def get_workflows(self): def get_workflows(self):
workflows = [] workflows = []
try: try:
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment