Maintenance - Mise à jour mensuelle Lundi 7 Décembre 2021 entre 7h00 et 9h00

Commit 2272c9cd authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Add print of programs and refactor for outputs and exec graph

parent ce65abbe
......@@ -89,19 +89,13 @@ if __name__ == '__main__':
required=True, dest="workflow_id")
sub_parser.set_defaults(cmd_object="delete")
# Add outputs workflow availability
sub_parser = subparsers.add_parser("outputs", help="Print workflow outputs")
# Add print details workflow availability
sub_parser = subparsers.add_parser("print", help="Print workflow details")
sub_parser.add_argument("what", help="What to print [outputs, outputs_logs, execution_graph, programs]",
metavar="COMMAND", choices=["outputs", "outputs_logs", "execution_graph", "programs"])
sub_parser.add_argument("--workflow-id", type=str, help="Workflow for which display outputs",
required=True, dest="workflow_id")
sub_parser.add_argument("--logs", help="Include logs files", dest="logs", action="store_true", default=False)
sub_parser.set_defaults(logs=False)
sub_parser.set_defaults(cmd_object="outputs")
# Add rerun workflow availability
sub_parser = subparsers.add_parser("execution-graph", help="Display the workflow execution graph")
sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be considered",
required=True, dest="workflow_id")
sub_parser.set_defaults(cmd_object="execution_graph")
sub_parser.set_defaults(cmd_object="print")
# Add status workflow availability
sub_parser = subparsers.add_parser("status", help="Monitor a specific workflow")
......@@ -169,32 +163,35 @@ if __name__ == '__main__':
wfmanager.delete_workflow(args["workflow_id"])
except Exception as e:
utils.display_error_message(str(e))
elif args["cmd_object"] == "outputs":
try:
elif args["cmd_object"] == "print":
if args["what"] in ["outputs", "outputs_logs"]:
try:
my_workflow = wfmanager.get_workflow(args["workflow_id"])
my_workflow.print_workflow_outputs(args["what"] == "outputs_logs")
except Exception as e:
utils.display_error_message(str(e))
elif args["what"] == "execution_graph":
try:
workflow = wfmanager.get_workflow(args["workflow_id"])
except Exception as e:
utils.display_error_message(str(e))
gr = workflow.get_execution_graph()
inputs, components = [], []
for node in gr.nodes():
if Workflow.INPUTFILE_GRAPH_LABEL in gr.node_attributes(node):
inputs.append(gr.node_attributes(node)[1])
elif Workflow.INPUTFILES_GRAPH_LABEL in gr.node_attributes(node):
inputs.append(gr.node_attributes(node)[1])
elif Workflow.INPUTDIRECTORY_GRAPH_LABEL in gr.node_attributes(node):
inputs.append(gr.node_attributes(node)[1])
elif Workflow.COMPONENT_GRAPH_LABEL in gr.node_attributes(node):
components.append(gr.node_attributes(node)[1])
print(("inputs: ", inputs))
print(("components: ", components))
print(("edges: ", gr.edges()))
elif args["what"] == "programs":
my_workflow = wfmanager.get_workflow(args["workflow_id"])
my_workflow.print_workflow_outputs(args["logs"])
except Exception as e:
utils.display_error_message(str(e))
elif args["cmd_object"] == "execution_graph":
try:
workflow = wfmanager.get_workflow(args["workflow_id"])
except Exception as e:
utils.display_error_message(str(e))
gr = workflow.get_execution_graph()
inputs, components = [], []
for node in gr.nodes():
if Workflow.INPUTFILE_GRAPH_LABEL in gr.node_attributes(node):
inputs.append(gr.node_attributes(node)[1])
elif Workflow.INPUTFILES_GRAPH_LABEL in gr.node_attributes(node):
inputs.append(gr.node_attributes(node)[1])
elif Workflow.INPUTDIRECTORY_GRAPH_LABEL in gr.node_attributes(node):
inputs.append(gr.node_attributes(node)[1])
elif Workflow.COMPONENT_GRAPH_LABEL in gr.node_attributes(node):
components.append(gr.node_attributes(node)[1])
print(("inputs: ", inputs))
print(("components: ", components))
print(("edges: ", gr.edges()))
my_workflow.print_workflow_programs()
elif args["cmd_object"] == "status":
if args["workflow_id"]:
try:
......
......@@ -51,6 +51,7 @@ class Component(object):
self.output_directory = None
self.description = None
self.config_reader = JFlowConfigReader()
self.prg_name = self.get_prg_name()
self.version = self.get_version()
if isinstance(self.version, bytes):
self.version = self.version.decode()
......@@ -474,6 +475,9 @@ class Component(object):
"""
raise NotImplementedError("Either the Component.get_abstraction() function or the Component.process() function has to be implemented!")
def get_prg_name(self):
return self.__class__.__name__
def get_version(self):
"""
Return the tool version, has to be implemented by subclasses
......
......@@ -606,6 +606,25 @@ class Workflow(threading.Thread):
print(component)
idx += 1
def print_workflow_programs(self):
programs = self.get_workflow_programs()
descriptions = self.get_description_per_components()
idx = 1
for component, program in programs.items():
name = component[:component.index(".")]
component = str(idx) + ". "
if component in descriptions and descriptions[component] is not None:
component += descriptions[component]
else:
component += name
component += ":\n"
component += "\t- Program: {0}\n".format(program["name"])
component += "\t- Version: {0}\n".format(program["version"])
component += "\t- Parameters: {0}\n".format(", ".join(program["parameters"])
if len(program["parameters"]) > 0 else "None")
print(component)
idx += 1
def get_description_per_components(self):
dag = self.get_execution_graph()
descriptions = {}
......@@ -616,10 +635,25 @@ class Workflow(threading.Thread):
def get_outputs_per_components(self):
outputs_files = OrderedDict()
for current_components in self.components:
#status = self.get_component_status(current_components.get_nameid())
outputs_files[current_components.get_nameid()] = current_components.get_output_files()
#outputs_files["0"] = status["completed"]
return outputs_files
def get_workflow_programs(self):
programs = OrderedDict()
for current_component in self.components:
version = current_component.get_version()
if version:
program = {
"name": current_component.get_prg_name(),
"version": version,
"parameters": []
}
for param in current_component.params_order:
param_obj = getattr(current_component, param)
if param_obj.cmd_format != "":
program["parameters"].append(param_obj.cmd_format.format(param_obj))
programs[current_component.get_nameid()] = program
return programs
def __setstate__(self, state):
self.__dict__ = state.copy()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment