# workflows_manager.py
#
# Copyright (C) 2012 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import pkgutil
import workflows
import inspect
import jflow
import sys
import imp
import os
import pickle
Jerome Mariette's avatar
Jerome Mariette committed
26
import threading
Jerome Mariette's avatar
Jerome Mariette committed
27
28
29
30
31
32
33
34
35

import jflow.utils as utils
from jflow.config_reader import JFlowConfigReader

class WorkflowsManager(object):
    """Create, persist and retrieve jflow workflows.

    The manager works with two files stored in the directory returned by
    ``get_output_directory()``:

    * ``IDS_FILE_NAME`` holds the last workflow id handed out by
      ``get_next_id()``;
    * ``WORKFLOWS_DUMP_FILE_NAME`` holds a pickled dictionary mapping
      ``utils.get_nb_string(workflow.id)`` to that workflow's ``dump_path``.

    Writers of these files take the class-level locks below. Readers of the
    workflows index do not take any lock.

    NOTE: the index and the workflow dumps are read with ``pickle``; they
    must only ever come from a trusted work directory.
    """

    IDS_FILE_NAME = "jflowIDs.txt"
    WORKFLOWS_DUMP_FILE_NAME = ".workflows.dump"
    WF_DIRECTORY_PREFIX = "wf"

    # Class attributes: the same lock objects are shared by every instance
    # created in this process (threading.Lock does not span processes).
    lock_ids_file = threading.Lock()
    lock_workflows_file = threading.Lock()

    def __init__(self):
        """Read the jflow configuration and make sure the output directory exists."""
        self.config_reader = JFlowConfigReader()
        output_directory = self.get_output_directory()
        if not os.path.isdir(output_directory):
            # 0o751 is the portable spelling of the former Python-2-only 0751.
            os.makedirs(output_directory, 0o751)
        self.dump_file = os.path.join(output_directory, self.WORKFLOWS_DUMP_FILE_NAME)
        self.ids_file = os.path.join(output_directory, self.IDS_FILE_NAME)

    @staticmethod
    def _iter_workflow_classes():
        """Import each module of the ``workflows`` package and yield its classes.

        Yields ``(class_name, class_object)`` pairs. Modules are imported
        lazily, one at a time, as the generator is consumed.
        """
        module_prefix = workflows.__name__ + "."
        for _importer, modname, _ispkg in pkgutil.iter_modules(workflows.__path__, module_prefix):
            __import__(modname)
            for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
                yield class_name, obj

    @staticmethod
    def _is_method_like(member):
        """Tell whether a class member is a method or a plain function.

        Plain functions are accepted as well because, on Python 3, methods
        looked up on a class are functions and ``inspect.ismethod`` alone
        would reject them.
        """
        return inspect.ismethod(member) or inspect.isfunction(member)

    def _load_workflows_dump(self):
        """Return the pickled workflows index, or ``{}`` if it cannot be read."""
        try:
            with open(self.dump_file, "rb") as handle:
                return pickle.load(handle)
        except Exception:
            # Best effort: a missing or unreadable index is treated as empty.
            return {}

    def _add_workflow(self, workflow):
        """Record ``workflow.dump_path`` in the workflows index file.

        The read-modify-write cycle runs under ``lock_workflows_file``; the
        ``with`` statement guarantees the lock is released even when writing
        the index fails.
        """
        with WorkflowsManager.lock_workflows_file:
            # first load the existing workflows
            workflows_dump = self._load_workflows_dump()
            # then add the new one
            workflows_dump[utils.get_nb_string(workflow.id)] = workflow.dump_path
            # and save them (binary mode, as pickle requires)
            with open(self.dump_file, "wb") as handle:
                pickle.dump(workflows_dump, handle)

    def get_available_workflows(self, function="process"):
        """Instantiate every workflow class exposing the requested method(s).

        Args:
            function: a method name, or an iterable of method names
                (duplicates in the iterable are ignored).

        Returns:
            A two-item list ``[instances, method_names]`` where
            ``method_names[i]`` is the name ``instances[i]`` was built with.
            Classes whose construction fails are skipped.
        """
        if isinstance(function, str):
            functions = [function]
        else:
            functions = set(function)
        wf_instances, wf_methods = [], []
        base_class = jflow.workflow.Workflow
        for _class_name, obj in self._iter_workflow_classes():
            # Keep only Workflow subclasses that are not named like the base class.
            if not issubclass(obj, base_class) or obj.__name__ == base_class.__name__:
                continue
            method_names = set(
                name for name, _member in inspect.getmembers(obj, predicate=self._is_method_like)
            )
            for requested in functions:
                if requested not in method_names:
                    continue
                # try to build the workflow
                try:
                    instance = obj(function=requested)
                except Exception:
                    # Best effort: a workflow that cannot be built is not listed.
                    continue
                wf_instances.append(instance)
                wf_methods.append(requested)
        return [wf_instances, wf_methods]

    def rerun_workflow(self, workflow_id):
        """Reload the workflow with the given id, start it again and return it.

        ``set_stderr()`` is called first when the reloaded workflow has a
        ``stderr`` attribute.
        """
        workflow = self.get_workflow(workflow_id)
        if hasattr(workflow, "stderr"):
            workflow.set_stderr()
        workflow.start()
        return workflow

    def reset_workflow_component(self, workflow_id, component_name):
        """Call ``reset_component(component_name)`` on a stored workflow and return it."""
        workflow = self.get_workflow(workflow_id)
        workflow.reset_component(component_name)
        return workflow

    def run_workflow(self, workflow_class, args, function="process"):
        """Instantiate the workflow class named ``workflow_class`` and start it.

        Args:
            workflow_class: name of the class to look up in the ``workflows`` package.
            args: passed as first argument to the workflow constructor.
            function: passed as third argument to the workflow constructor.

        Returns:
            The started workflow, after its dump path was added to the index.

        Raises:
            ValueError: if no class with that name is found.
        """
        selected = None
        for class_name, obj in self._iter_workflow_classes():
            if class_name == workflow_class:
                # The last match wins; instantiation is deferred so that a
                # single id is consumed however many modules expose the class.
                selected = obj
        if selected is None:
            raise ValueError("Workflow class " + str(workflow_class) + " cannot be found!")
        workflow = selected(args, self.get_next_id(), function)
        workflow.start()
        # Add the workflow dump path to the workflows dump
        self._add_workflow(workflow)
        return workflow

    def get_workflow_errors(self, workflow_id):
        """Return ``get_errors()`` of the stored workflow with the given id."""
        return self.get_workflow(workflow_id).get_errors()

    def get_workflow_ouputs(self, workflow_id):
        """Return ``get_outputs_per_components()`` of the stored workflow.

        The method name (including its spelling) is kept for existing callers.
        """
        return self.get_workflow(workflow_id).get_outputs_per_components()

    def get_output_directory(self):
        """Return the work directory given by the configuration reader."""
        return self.config_reader.get_work_directory()

    def get_workflows(self):
        """Return every workflow of the index whose dump can be unpickled.

        Entries whose dump file is missing or unreadable are skipped.
        """
        loaded = []
        for dump_path in self._load_workflows_dump().values():
            try:
                with open(dump_path, "rb") as handle:
                    loaded.append(pickle.load(handle))
            except Exception:
                # Best effort: ignore dumps that cannot be read.
                continue
        return loaded

    def get_workflow_by_class(self, workflow_class):
        """Return a new, argument-less instance of the first class named ``workflow_class``.

        Returns ``None`` when no class of the ``workflows`` package has that name.
        """
        for class_name, obj in self._iter_workflow_classes():
            if class_name == workflow_class:
                return obj()
        return None

    def get_workflow(self, workflow_id):
        """Unpickle and return the workflow registered under ``workflow_id``.

        Raises:
            ValueError: if the id is not present in the workflows index.
        """
        rworkflow_id = utils.get_nb_string(workflow_id)
        workflows_dump = self._load_workflows_dump()
        if rworkflow_id not in workflows_dump:
            raise ValueError("Workflow with id " + str(rworkflow_id) + " cannot be retrived!")
        with open(workflows_dump[rworkflow_id], "rb") as handle:
            return pickle.load(handle)

    def get_workflow_directory(self, wname, wid):
        """Return ``<work directory>/<wname>/<WF_DIRECTORY_PREFIX><id string>``."""
        return os.path.join(self.config_reader.get_work_directory(), wname,
                            self.WF_DIRECTORY_PREFIX + utils.get_nb_string(wid))

    def get_next_id(self):
        """Return the next workflow id and store it in the ids file.

        The first call (no ids file yet) returns 1; each later call returns
        the stored value plus one. The whole read/write sequence runs under
        ``lock_ids_file``.

        Raises:
            ValueError: if the first line of the ids file is not an integer
                (the file is left untouched in that case).
        """
        with WorkflowsManager.lock_ids_file:
            next_id = 1
            if os.path.isfile(self.ids_file):
                with open(self.ids_file) as handle:
                    next_id = int(handle.readline().strip()) + 1
            with open(self.ids_file, "w") as handle:
                handle.write(str(next_id))
            return next_id