#
# Copyright (C) 2015 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import pkgutil
import workflows
import inspect
import jflow
import sys
import imp
import os
25
import pickle
26
import threading
27
import logging
Jerome Mariette's avatar
Jerome Mariette committed
28
29

import jflow.utils as utils
30
from jflow.config_reader import JFlowConfigReader
Jerome Mariette's avatar
Jerome Mariette committed
31

32
33
34
class WorkflowsManager(object):
    """Keep track of the jflow workflows stored in the jflow work directory.

    Two files live in the directory returned by get_output_directory():

    * IDS_FILE_NAME holds the last workflow id handed out by get_next_id();
    * WORKFLOWS_DUMP_FILE_NAME holds a pickled dict indexed by
      utils.get_nb_string(workflow.id) whose values are
      {"dump_path": workflow.dump_path, "object": workflow.minimize()}.

    NOTE: both the index and the per-workflow dumps are read with pickle, so
    these files must only ever come from a trusted location.
    """

    IDS_FILE_NAME = "jflowIDs.txt"
    WORKFLOWS_DUMP_FILE_NAME = ".workflows.dump"
    WF_DIRECTORY_PREFIX = "wf"

    # Class-level locks: they are shared by all the managers of the process
    lock_ids_file = threading.Lock()
    lock_workflows_file = threading.Lock()

    def __init__(self):
        """Read the jflow configuration and create the work directory if needed."""
        self.config_reader = JFlowConfigReader()
        output_directory = self.get_output_directory()
        if not os.path.isdir(output_directory):
            # 0o751 is the octal spelling accepted by Python 2.6+ and Python 3
            os.makedirs(output_directory, 0o751)
        self.dump_file = os.path.join(output_directory, self.WORKFLOWS_DUMP_FILE_NAME)
        self.ids_file = os.path.join(output_directory, self.IDS_FILE_NAME)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _load_workflows_dump(self):
        """Return the dict stored in the workflows dump file ({} if it cannot be read)."""
        try:
            with open(self.dump_file, "rb") as handle:
                return pickle.load(handle)
        except Exception:
            # best effort: a missing or unreadable index is handled as an empty one
            return {}

    def _save_workflows_dump(self, workflows_dump):
        """Pickle workflows_dump into the workflows dump file."""
        with open(self.dump_file, "wb") as handle:
            pickle.dump(workflows_dump, handle)

    @staticmethod
    def _read_workflow_pickle(dump_path):
        """Return the object pickled in dump_path."""
        with open(dump_path, "rb") as handle:
            return pickle.load(handle)

    @staticmethod
    def _iter_workflow_classes():
        """Lazily import the modules of the workflows package and yield
        a (class_name, class) pair for each class found in them."""
        for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
            __import__(modname)
            for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
                yield class_name, obj

    def _dump_workflows(self, workflows):
        """Add or refresh the given workflows in the workflows dump file.

        @param workflows: iterable of workflow objects providing id, dump_path
                          and minimize() (the name hides the workflows module
                          inside this method only)
        """
        # the with statement releases the lock even if loading/saving fails
        with WorkflowsManager.lock_workflows_file:
            # first load the existing workflows
            workflows_dump = self._load_workflows_dump()
            # then add the new ones
            for workflow in workflows:
                workflows_dump[utils.get_nb_string(workflow.id)] = {"dump_path": workflow.dump_path,
                                                                    "object": workflow.minimize()}
            # and save them
            self._save_workflows_dump(workflows_dump)

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def get_available_workflows(self, function="process"):
        """Instantiate every Workflow subclass of the workflows package that
        provides the requested method(s).

        @param function: a method name or an iterable of method names
        @return: [instances, methods], two lists of the same length where
                 methods[i] is the name used to build instances[i]
        """
        if isinstance(function, str):
            functions = [function]
        else:
            functions = set(function)
        wf_instances, wf_methodes = [], []
        base_class = jflow.workflow.Workflow
        for class_name, obj in self._iter_workflow_classes():
            # only keep the Workflow subclasses, not the base class itself
            if not issubclass(obj, base_class) or obj.__name__ == base_class.__name__:
                continue
            method_names = [name for name, member in inspect.getmembers(obj, predicate=inspect.ismethod)]
            for function_name in functions:
                # check if the workflow has the requested methode
                if function_name not in method_names:
                    continue
                # try to build the workflow
                try:
                    wf_instances.append(obj(function=function_name))
                    wf_methodes.append(function_name)
                except (Exception, SystemExit):
                    # Workflows that cannot be built are skipped on purpose.
                    # SystemExit stays caught, as the previous bare except did,
                    # so a constructor calling sys.exit() cannot stop the scan.
                    logging.getLogger("jflow").debug("Workflow " + obj.__name__ + " cannot be built with function " + str(function_name))
        return [wf_instances, wf_methodes]

    def rerun_workflow(self, workflow_id):
        """Reload the workflow workflow_id, start it again and return it."""
        workflow = self.get_workflow(workflow_id)
        if hasattr(workflow, "stderr"):
            workflow._set_stderr()
        workflow.start()
        # Update the workflow in the cache
        self._dump_workflows([workflow])
        return workflow

    def reset_workflow_component(self, workflow_id, component_name):
        """Reset the component component_name of the workflow workflow_id and
        return the workflow."""
        workflow = self.get_workflow(workflow_id)
        workflow.reset_component(component_name)
        # Update the workflow in the cache
        self._dump_workflows([workflow])
        return workflow

    def run_workflow(self, workflow_class, args, function="process"):
        """Build the workflow named workflow_class with a new id, start it,
        register it in the workflows dump and return it.

        @param workflow_class: name of the class to instantiate
        @param args: first argument given to the workflow constructor
        @param function: third argument given to the workflow constructor
        @raise Exception: if no class is named workflow_class
        """
        selected_class = None
        for class_name, obj in self._iter_workflow_classes():
            if class_name == workflow_class:
                # The last match wins, like the previous implementation,
                # but the class is only instantiated once (below).
                selected_class = obj
        if selected_class is None:
            raise Exception("Workflow class " + str(workflow_class) + " cannot be found in the workflows package!")
        workflow = selected_class(args, self.get_next_id(), function)
        workflow.start()
        # Add the workflow dump path to the workflows dump
        self._dump_workflows([workflow])
        return workflow

    def delete_workflow(self, workflow_id):
        """Delete the workflow workflow_id when it is completed, failed or
        aborted; a workflow in any other status is left untouched.

        @raise Exception: if the workflow cannot be loaded or deleted
        """
        from jflow.workflow import Workflow
        finished_status = (Workflow.STATUS_COMPLETED, Workflow.STATUS_FAILED, Workflow.STATUS_ABORTED)
        rworkflow_id = utils.get_nb_string(workflow_id)
        # the lock is released on every exit path, including the raise below
        with WorkflowsManager.lock_workflows_file:
            all_workflows_dump = self._load_workflows_dump()
            try:
                workflow = self._read_workflow_pickle(all_workflows_dump[rworkflow_id]["dump_path"])
                # if workflow is not in a running status
                if workflow.get_status() in finished_status:
                    workflow.delete()
                    del all_workflows_dump[rworkflow_id]
            except Exception:
                message = "Workflow #" + rworkflow_id + " connot be retrieved in the available workflows!"
                logging.getLogger("jflow").debug(message)
                raise Exception(message)
            # and save them
            self._save_workflows_dump(all_workflows_dump)

    def get_workflow_errors(self, workflow_id):
        """Return get_errors() of the workflow workflow_id."""
        return self.get_workflow(workflow_id).get_errors()

    def get_output_directory(self):
        """Return the work directory given by the configuration reader."""
        return self.config_reader.get_work_directory()

    def get_workflow_outputs(self, workflow_id):
        """Return get_outputs_per_components() of the workflow workflow_id."""
        return self.get_workflow(workflow_id).get_outputs_per_components()

    def get_workflows(self, use_cache=False):
        """Return the workflows registered in the workflows dump file.

        @param use_cache: if True, the minimized object stored in the dump
                          file is returned for the workflows whose status is
                          completed, failed or aborted instead of reloading
                          their own dump
        @return: list of workflows; the ones whose dump cannot be loaded are
                 skipped
        """
        from jflow.workflow import Workflow
        finished_status = (Workflow.STATUS_COMPLETED, Workflow.STATUS_FAILED, Workflow.STATUS_ABORTED)
        all_workflows, updated_workflows = [], []
        for workflow_id, entry in self._load_workflows_dump().items():
            # is the workflow completed, failed or aborted use the miniworkflow cached
            if use_cache and entry["object"].get_status() in finished_status:
                all_workflows.append(entry["object"])
                continue
            try:
                workflow = self._read_workflow_pickle(entry["dump_path"])
            except Exception:
                # unreadable dumps are ignored on purpose
                logging.getLogger("jflow").debug("The dump of workflow #" + str(workflow_id) + " cannot be loaded")
                continue
            all_workflows.append(workflow)
            updated_workflows.append(workflow)
        # refresh the cached version of the workflows that have been reloaded
        self._dump_workflows(updated_workflows)
        return all_workflows

    def get_workflow_by_class(self, workflow_class):
        """Return an instance, built without argument, of the first class
        named workflow_class found in the workflows package (None if no class
        matches)."""
        for class_name, obj in self._iter_workflow_classes():
            if class_name == workflow_class:
                return obj()
        return None

    def get_workflow(self, workflow_id):
        """Load and return the workflow workflow_id from its own dump file.

        @raise Exception: if the id is not registered in the workflows dump
        """
        rworkflow_id = utils.get_nb_string(workflow_id)
        workflows_dump = self._load_workflows_dump()
        if rworkflow_id not in workflows_dump:
            message = "Workflow #" + str(rworkflow_id) + " connot be retrieved in the available workflows!"
            logging.getLogger("jflow").debug(message)
            raise Exception(message)
        return self._read_workflow_pickle(workflows_dump[rworkflow_id]["dump_path"])

    def get_workflow_directory(self, wname, wid):
        """Return <work directory>/<wname>/<WF_DIRECTORY_PREFIX + formatted wid>."""
        return os.path.join(self.config_reader.get_work_directory(), wname,
                            self.WF_DIRECTORY_PREFIX + utils.get_nb_string(wid))

    def get_next_id(self):
        """Return the next workflow id and store it in the ids file.

        The first call (no ids file yet) returns 1, then each call returns the
        stored value plus one.

        @raise ValueError: if the first line of the ids file is not an integer
        """
        with WorkflowsManager.lock_ids_file:
            next_id = 1
            if os.path.isfile(self.ids_file):
                with open(self.ids_file) as handle:
                    next_id = int(handle.readline().strip()) + 1
            with open(self.ids_file, "w") as handle:
                handle.write(str(next_id))
            return next_id