#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import sys
import time

# Optional bootstrap module: import it when present, carry on silently otherwise.
try:
    import _preamble
except ImportError:
    # sys.exc_clear() only exists on Python 2; calling it unconditionally
    # raises AttributeError on Python 3, where the exception state is
    # cleared automatically when the except block ends.
    if hasattr(sys, "exc_clear"):
        sys.exc_clear()

from jflow.workflows_manager import WorkflowsManager
from jflow.workflow import Workflow
from jflow.argparser import JflowArgumentParser
import jflow.utils as utils

if __name__ == '__main__':
    # Command line entry point. Builds one argparse sub-command per management
    # action (rerun, reset, delete, execution-graph, status) plus one per
    # workflow returned by the workflows manager, then dispatches on the
    # "cmd_object" default attached to the selected sub-command.

    # Create a workflow manager to get access to our workflows
    wfmanager = WorkflowsManager()

    # Create the top-level parser
    parser = JflowArgumentParser()
    subparsers = parser.add_subparsers(title='Available sub commands')

    # Add rerun workflow availability
    sub_parser = subparsers.add_parser("rerun", help="Rerun a specific workflow")
    sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be rerun",
                            required=True, dest="workflow_id")
    sub_parser.set_defaults(cmd_object="rerun")

    # Add reset workflow component availability
    sub_parser = subparsers.add_parser("reset", help="Reset a workflow component")
    sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be used",
                            required=True, dest="workflow_id")
    sub_parser.add_argument("--component-name", type=str, help="Which component should be reseted",
                            required=True, dest="component_name")
    sub_parser.set_defaults(cmd_object="reset")

    # Add delete workflow availability
    sub_parser = subparsers.add_parser("delete", help="Delete a workflow")
    sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be deleted",
                            required=True, dest="workflow_id")
    sub_parser.set_defaults(cmd_object="delete")

    # Add execution graph display availability
    sub_parser = subparsers.add_parser("execution-graph", help="Display the workflow execution graph")
    sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be considered",
                            required=True, dest="workflow_id")
    sub_parser.set_defaults(cmd_object="execution_graph")

    # Add status workflow availability
    sub_parser = subparsers.add_parser("status", help="Monitor a specific workflow")
    sub_parser.add_argument("--workflow-id", type=str, help="Which workflow status should be displayed",
                            default=None, dest="workflow_id")
    sub_parser.add_argument("--all", action="store_true", help="Display all workflows status",
                            default=False, dest="all")
    sub_parser.add_argument("--errors", action="store_true", help="Display failed commands",
                            default=False, dest="display_errors")
    sub_parser.set_defaults(cmd_object="status")

    # Add available pipelines (the second returned value is not used here)
    wf_instances, wf_methodes = wfmanager.get_available_workflows()
    wf_classes = []
    for instance in wf_instances:
        classname = instance.get_classname()
        wf_classes.append(classname)
        # create the subparser for each application; a "@file" argument is
        # expanded using the workflow class own config_parser
        sub_parser = subparsers.add_parser(instance.name, help=instance.description, fromfile_prefix_chars='@')
        sub_parser.convert_arg_line_to_args = instance.__class__.config_parser
        [parameters_groups, parameters_order] = instance.get_parameters_per_groups()
        for group in parameters_order:
            params = parameters_groups[group]
            if group == "default":
                # default parameters are attached directly to the sub-command
                target = sub_parser
            elif group.startswith("exclude-"):
                is_required = False
                for param in params:
                    if param.required:
                        is_required = True
                        # an exclusive parameter cannot be required, the
                        # requirement is carried at the group level
                        param.required = False
                target = sub_parser.add_mutually_exclusive_group(required=is_required)
            else:
                target = sub_parser.add_argument_group(group)
            for param in params:
                target.add_argument(param.flag, **param.export_to_argparse())
        sub_parser.set_defaults(cmd_object=classname)

    args = vars(parser.parse_args())
    command = args["cmd_object"]

    # NOTE: output below uses single-argument print(...) calls, which emit the
    # same text with the Python 2 print statement and the Python 3 function.
    if command in wf_classes:
        wfmanager.run_workflow(command, args)
    elif command == "rerun":
        wfmanager.rerun_workflow(args["workflow_id"])
    elif command == "reset":
        try:
            wfmanager.reset_workflow_component(args["workflow_id"], args["component_name"])
        except Exception as e:
            utils.display_error_message(str(e))
    elif command == "delete":
        try:
            wfmanager.delete_workflow(args["workflow_id"])
        except Exception as e:
            utils.display_error_message(str(e))
    elif command == "execution_graph":
        try:
            workflow = wfmanager.get_workflow(args["workflow_id"])
        except Exception as e:
            utils.display_error_message(str(e))
        else:
            # Only reached when the workflow was loaded: "workflow" would be
            # unbound here if display_error_message() returned after a failure.
            gr = workflow.get_execution_graph()
            input_labels = (Workflow.INPUTFILE_GRAPH_LABEL,
                            Workflow.INPUTFILES_GRAPH_LABEL,
                            Workflow.INPUTDIRECTORY_GRAPH_LABEL)
            inputs, components = [], []
            for node in gr.nodes():
                attributes = gr.node_attributes(node)
                if any(label in attributes for label in input_labels):
                    inputs.append(attributes[1])
                elif Workflow.COMPONENT_GRAPH_LABEL in attributes:
                    components.append(attributes[1])
            # two spaces after the colon: the label's own trailing space plus
            # the separator the original print statement inserted
            print("inputs:  %s" % (inputs,))
            print("components:  %s" % (components,))
            print("edges:  %s" % (gr.edges(),))
    elif command == "status":
        # NOTE(review): the --all flag is parsed but never read; every workflow
        # is listed whenever --workflow-id is not provided.
        if args["workflow_id"]:
            try:
                workflow = wfmanager.get_workflow(args["workflow_id"])
            except Exception as e:
                utils.display_error_message(str(e))
            else:
                print(Workflow.get_status_under_text_format(workflow, True, args["display_errors"]))
        else:
            try:
                workflows = wfmanager.get_workflows(use_cache=True)
            except Exception as e:
                utils.display_error_message(str(e))
            else:
                if len(workflows) > 0:
                    # index the workflows by ID so they can be listed from the
                    # highest ID to the lowest
                    wfids = [wf.id for wf in workflows]
                    workflows_by_id = dict((wf.id, wf) for wf in workflows)
                    status = "ID\tNAME\tSTATUS\tELAPSED_TIME\tSTART_TIME\tEND_TIME\n"
                    status += "\n".join(Workflow.get_status_under_text_format(workflows_by_id[wfid])
                                        for wfid in sorted(wfids, reverse=True))
                else:
                    status = "no workflow available"
                print(status)
Jerome Mariette's avatar
Jerome Mariette committed
164