# # Copyright (C) 2015 INRA # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import sys import argparse import time try: import _preamble except ImportError: sys.exc_clear() from jflow.workflows_manager import WorkflowsManager from jflow.workflow import Workflow import jflow.utils as utils class JflowArgumentParser (argparse.ArgumentParser): def _read_args_from_files(self, arg_strings): # expand arguments referencing files new_arg_strings = [] for arg_string in arg_strings: # if it's not a comment or an empty line if not arg_string.startswith("#") and arg_string: # for regular arguments, just add them back into the list if not arg_string or arg_string[0] not in self.fromfile_prefix_chars: new_arg_strings.append(arg_string) # replace arguments referencing files with the file content else: try: with open(arg_string[1:]) as args_file: arg_strings = [] # give to the convert_arg_line_to_args a table of lines instead of line per line for arg in self.convert_arg_line_to_args(args_file.read().splitlines()): arg_strings.append(arg) arg_strings = self._read_args_from_files(arg_strings) new_arg_strings.extend(arg_strings) except OSError: err = _sys.exc_info()[1] self.error(str(err)) # return the modified argument list return new_arg_strings if __name__ == '__main__': # Create a workflow manager to get access to our workflows wfmanager = WorkflowsManager() # Create the top-level parser parser = JflowArgumentParser() subparsers = parser.add_subparsers(title='Available sub commands') # Add rerun workflow availability sub_parser = subparsers.add_parser("rerun", help="Rerun a specific workflow") sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be rerun", required=True, dest="workflow_id") sub_parser.set_defaults(cmd_object="rerun") # Add rerun workflow availability sub_parser = subparsers.add_parser("reset", help="Reset a workflow component") sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be used", required=True, dest="workflow_id") sub_parser.add_argument("--component-name", type=str, help="Which component should be reseted", required=True, dest="component_name") sub_parser.set_defaults(cmd_object="reset") # Add delete workflow availability sub_parser = subparsers.add_parser("delete", help="Delete a workflow") sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be deleted", required=True, dest="workflow_id") sub_parser.set_defaults(cmd_object="delete") # Add rerun workflow availability sub_parser = subparsers.add_parser("execution-graph", help="Display the workflow execution graph") sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be considered", required=True, dest="workflow_id") sub_parser.set_defaults(cmd_object="execution_graph") # Add status workflow availability sub_parser = subparsers.add_parser("status", help="Monitor a specific workflow") sub_parser.add_argument("--workflow-id", type=str, help="Which workflow status should be displayed", default=None, dest="workflow_id") sub_parser.add_argument("--all", action="store_true", help="Display all workflows status", default=False, dest="all") sub_parser.add_argument("--errors", action="store_true", help="Display failed commands", default=False, dest="display_errors") sub_parser.set_defaults(cmd_object="status") # Add available pipelines wf_instances, wf_methodes = wfmanager.get_available_workflows() wf_classes = [] for instance in wf_instances: wf_classes.append(instance.__class__.__name__) # create the subparser for each applications sub_parser = subparsers.add_parser(instance.name, help=instance.description, fromfile_prefix_chars='@') sub_parser.convert_arg_line_to_args = instance.__class__.config_parser [parameters_groups, parameters_order] = instance.get_parameters_per_groups() for group in parameters_order: if group == "default": for param in parameters_groups[group]: sub_parser.add_argument(param.flag, **param.export_to_argparse()) elif group.startswith("exclude-"): is_required = False for param in parameters_groups[group]: if param.required: is_required = True # an exlcusive parameter cannot be required, the require is at the group level param.required = False pgroup = sub_parser.add_mutually_exclusive_group(required=is_required) for param in parameters_groups[group]: pgroup.add_argument(param.flag, **param.export_to_argparse()) else: pgroup = sub_parser.add_argument_group(group) for param in parameters_groups[group]: pgroup.add_argument(param.flag, **param.export_to_argparse()) sub_parser.set_defaults(cmd_object=instance.__class__.__name__) args = vars(parser.parse_args()) if not "cmd_object" in args: print(parser.format_help()) parser.exit(0, "") if args["cmd_object"] in wf_classes: wfmanager.run_workflow(args["cmd_object"], args) elif args["cmd_object"] == "rerun": wfmanager.rerun_workflow(args["workflow_id"]) elif args["cmd_object"] == "reset": try: wfmanager.reset_workflow_component(args["workflow_id"], args["component_name"]) except Exception as e: utils.display_error_message(str(e)) elif args["cmd_object"] == "delete": try: wfmanager.delete_workflow(args["workflow_id"]) except Exception as e: utils.display_error_message(str(e)) elif args["cmd_object"] == "execution_graph": try: workflow = wfmanager.get_workflow(args["workflow_id"]) except Exception as e: utils.display_error_message(str(e)) gr = workflow.get_execution_graph() inputs, components = [], [] for node in gr.nodes(): if Workflow.INPUTFILE_GRAPH_LABEL in gr.node_attributes(node): inputs.append(gr.node_attributes(node)[1]) elif Workflow.INPUTFILES_GRAPH_LABEL in gr.node_attributes(node): inputs.append(gr.node_attributes(node)[1]) elif Workflow.INPUTDIRECTORY_GRAPH_LABEL in gr.node_attributes(node): inputs.append(gr.node_attributes(node)[1]) elif Workflow.COMPONENT_GRAPH_LABEL in gr.node_attributes(node): components.append(gr.node_attributes(node)[1]) print(("inputs: ", inputs)) print(("components: ", components)) print(("edges: ", gr.edges())) elif args["cmd_object"] == "status": if args["workflow_id"]: try: workflow = wfmanager.get_workflow(args["workflow_id"]) except Exception as e: utils.display_error_message(str(e)) print((Workflow.get_status_under_text_format(workflow, True, args["display_errors"]))) else: try: workflows = wfmanager.get_workflows(use_cache=True) except Exception as e: utils.display_error_message(str(e)) if len(workflows) > 0: workflows_by_id, wfids = {}, [] # first sort workflow by ID for workflow in workflows: wfids.append(workflow.id) workflows_by_id[workflow.id] = workflow status = "ID\tNAME\tSTATUS\tELAPSED_TIME\tSTART_TIME\tEND_TIME\n" for i, wfid in enumerate(sorted(wfids, reverse=True)): status += Workflow.get_status_under_text_format(workflows_by_id[wfid]) if i