Commit 59802c76 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

inputs of the workflow are parts of the graph display

parent 19ef3122
......@@ -133,8 +133,18 @@ if __name__ == '__main__':
elif args["cmd_object"] == "execution_graph":
workflow = wfmanager.get_workflow(args["workflow_id"])
gr = workflow.get_execution_graph()
print "nodes: ", gr.nodes()
inputs, components = [], []
for node in gr.nodes():
if Workflow.INPUT_GRAPH_LABEL in gr.node_attributes(node):
inputs.append(node)
elif Workflow.INPUTS_GRAPH_LABEL in gr.node_attributes(node):
inputs.append(node)
elif Workflow.COMPONENT_GRAPH_LABEL in gr.node_attributes(node):
components.append(node)
print "inputs: ", inputs
print "components: ", components
print "edges: ", gr.edges()
elif args["cmd_object"] == "status":
if args["workflow_id"]:
workflow = wfmanager.get_workflow(args["workflow_id"])
......
......@@ -30,8 +30,6 @@ from weaver.util import parse_string_list
class Component(object):
"""
"""
def __init__(self):
self.prefix = "default"
......@@ -72,11 +70,11 @@ class Component(object):
new_param.component_nameid = self.get_nameid()
# store where the parameter is coming from
if issubclass( default.__class__, AbstractOutputFile ):
if issubclass( default.__class__, AbstractIOFile ):
new_param.parent_component_nameid.append(default.component_nameid)
elif issubclass( default.__class__, list ):
for val in default:
if issubclass( val.__class__, AbstractOutputFile ):
if issubclass( val.__class__, AbstractIOFile ):
new_param.parent_component_nameid.append(val.component_nameid)
# if this input should be added to a particular parameter
......@@ -101,11 +99,11 @@ class Component(object):
new_param.component_nameid = self.get_nameid()
# store where the parameter is coming from
if issubclass( default.__class__, AbstractOutputFile ):
if issubclass( default.__class__, AbstractIOFile ):
new_param.parent_component_nameid.append(default.component_nameid)
elif issubclass( default.__class__, list ):
for val in default:
if issubclass( val.__class__, AbstractOutputFile ):
if issubclass( val.__class__, AbstractIOFile ):
new_param.parent_component_nameid.append(val.component_nameid)
# if this input should be added to a particular parameter
......
......@@ -55,19 +55,23 @@ from cctools.makeflow.log import Node
class Workflow(threading.Thread):
"""
"""
PROPERTIES_FILE_NAME = "workflow.properties"
MAKEFLOW_LOG_FILE_NAME = "Makeflow.makeflowlog"
DUMP_FILE_NAME = ".workflow.dump"
WORKING = ".working"
STATUS_STARTED = "started"
STATUS_COMPLETED = "completed"
STATUS_FAILED = "failed"
STATUS_ABORTED = "aborted"
STATUS_RESETED = "reseted"
INPUT_GRAPH_LABEL = "input"
INPUTS_GRAPH_LABEL = "inputs"
COMPONENT_GRAPH_LABEL = "component"
def __init__(self, args={}, id=None, function= "process"):
# define as a thread
......@@ -129,6 +133,7 @@ class Workflow(threading.Thread):
except: size_limit="0"
new_param = InputFile(name, help, flag=flag, file_format=file_format, default=default,
type=type, required=required, group=group, display_name=display_name, size_limit=size_limit)
new_param.component_nameid = name
# if this input should be added to a particular parameter
if add_to:
try:
......@@ -147,6 +152,7 @@ class Workflow(threading.Thread):
except: size_limit="0"
new_param = InputFileList(name, help, flag=flag, file_format=file_format, default=default,
type=type, required=required, group=group, display_name=display_name, size_limit=size_limit)
new_param.component_nameid = name
# if this input should be added to a particular parameter
if add_to:
try:
......@@ -249,11 +255,13 @@ class Workflow(threading.Thread):
new_param = InputFileList( parameter.name, parameter.help, file_format=parameter.file_format, default=parameter.prepare_input_files(value),
type=parameter.type, choices=parameter.choices, required=parameter.required, flag=parameter.flag,
group=parameter.group, display_name=parameter.display_name, size_limit=parameter.size_limit )
new_param.component_nameid = parameter.component_nameid
elif parameter.__class__ == InputFile:
if value == "" : value = None # from GUI
new_param = InputFile( parameter.name, parameter.help, file_format=parameter.file_format, default=parameter.prepare_input_file(value),
type=parameter.type, choices=parameter.choices, required=parameter.required, flag=parameter.flag,
group=parameter.group, display_name=parameter.display_name )
new_param.component_nameid = parameter.component_nameid
else:
raise Exception( "Unknown class '" + parameter.__class__.__name__ + "' for parameter.")
return new_param
......@@ -290,8 +298,16 @@ class Workflow(threading.Thread):
def get_execution_graph(self):
gr = digraph()
for ioparameter in self.__dict__.values():
if issubclass(ioparameter.__class__, InputFile):
gr.add_node(ioparameter.name)
gr.add_node_attribute(ioparameter.name, self.INPUT_GRAPH_LABEL)
elif issubclass(ioparameter.__class__, InputFileList):
gr.add_node(ioparameter.name)
gr.add_node_attribute(ioparameter.name, self.INPUTS_GRAPH_LABEL)
for cpt in self.components:
gr.add_nodes([cpt.get_nameid()])
gr.add_node(cpt.get_nameid())
gr.add_node_attribute(cpt.get_nameid(), self.COMPONENT_GRAPH_LABEL)
for cpt in self.components:
for ioparameter in cpt.__dict__.values():
if ( issubclass( ioparameter.__class__, InputFile ) or issubclass( ioparameter.__class__, InputFileList)):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment