#
# Copyright (C) 2012 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import pkgutil
import workflows
import inspect
import jflow
import sys
import imp
import os
import pickle
import threading

import jflow.utils as utils
from jflow.config_reader import JFlowConfigReader

class WorkflowsManager(object):
    """Create, launch and retrieve the workflows kept in the jflow work directory.

    The manager keeps its state in two files of the work directory:

    * ``IDS_FILE_NAME`` holds the last workflow id that was handed out;
    * ``WORKFLOWS_DUMP_FILE_NAME`` holds a pickled dict mapping a workflow id
      (as formatted by ``utils.get_nb_string``) to the path of the pickle
      dump of that workflow.

    Both files are protected by class-level ``threading.Lock`` objects, i.e.
    accesses are serialised between the threads of one process.

    NOTE: the dumps are read with ``pickle``; they must only ever come from
    the (trusted) jflow work directory.
    """

    IDS_FILE_NAME = "jflowIDs.txt"
    WORKFLOWS_DUMP_FILE_NAME = ".workflows.dump"
    WF_DIRECTORY_PREFIX = "wf"

    # Class-level locks: shared by every WorkflowsManager of the process.
    lock_ids_file = threading.Lock()
    lock_workflows_file = threading.Lock()

    def __init__(self):
        """Load the jflow configuration and make sure the work directory exists."""
        self.config_reader = JFlowConfigReader()
        output_directory = self.get_output_directory()
        if not os.path.isdir(output_directory):
            # 0o751 is the historical 0751 mode, spelled so that it is valid
            # on both Python 2.6+ and Python 3.
            os.makedirs(output_directory, 0o751)
        self.dump_file = os.path.join(output_directory, self.WORKFLOWS_DUMP_FILE_NAME)
        self.ids_file = os.path.join(output_directory, self.IDS_FILE_NAME)

    def _load_workflows_dump(self):
        """Return the {workflow id: dump path} index, or {} if it cannot be read."""
        try:
            with open(self.dump_file, "rb") as handle:
                return pickle.load(handle)
        except Exception:
            # Missing, empty or corrupted index: behave as if nothing was stored.
            return {}

    def _iter_workflow_classes(self):
        """Import each module of the ``workflows`` package and yield (name, class) pairs."""
        for _importer, modname, _ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
            __import__(modname)
            for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
                yield class_name, obj

    def _find_workflow_class(self, workflow_class):
        """Return the first class named ``workflow_class`` in ``workflows``, or None."""
        for class_name, obj in self._iter_workflow_classes():
            if class_name == workflow_class:
                return obj
        return None

    def _add_workflow(self, workflow):
        """Register ``workflow.dump_path`` in the workflows index file.

        The read-modify-write cycle runs under ``lock_workflows_file``; the
        ``with`` statement guarantees the lock is released even when reading
        the workflow attributes or writing the index fails.
        """
        with WorkflowsManager.lock_workflows_file:
            # first load the existing workflows
            workflows_dump = self._load_workflows_dump()
            # then add the new one
            workflows_dump[utils.get_nb_string(workflow.id)] = workflow.dump_path
            # and save them
            with open(self.dump_file, "wb") as handle:
                pickle.dump(workflows_dump, handle)

    def get_available_workflows(self, parameters_section="parameters"):
        """Instantiate every Workflow subclass found in the ``workflows`` package.

        ``parameters_section`` is forwarded as a keyword argument to each
        workflow constructor. The base ``jflow.workflow.Workflow`` class itself
        is skipped. Returns the list of created instances.
        """
        base_class = jflow.workflow.Workflow
        return [obj(parameters_section=parameters_section)
                for _class_name, obj in self._iter_workflow_classes()
                if issubclass(obj, base_class) and obj.__name__ != base_class.__name__]

    def rerun_workflow(self, workflow_id):
        """Reload the dumped workflow ``workflow_id`` and start it again.

        ``set_stderr()`` is called first when the reloaded workflow has a
        ``stderr`` attribute (presumably to reset its error output -- confirm
        against the Workflow class).
        """
        workflow = self.get_workflow(workflow_id)
        if hasattr(workflow, "stderr"):
            workflow.set_stderr()
        workflow.start()

    def run_workflow(self, workflow_class, args, function="process", parameters_section="parameters"):
        """Build, start and register a new workflow.

        ``workflow_class`` is the name of a class of the ``workflows`` package;
        the first class with that name is used, so exactly one id is consumed
        per call. The class is called with ``args``, a fresh id, ``function``
        and ``parameters_section``.

        Returns the started workflow.
        Raises ValueError when no class named ``workflow_class`` exists.
        """
        wf_class = self._find_workflow_class(workflow_class)
        if wf_class is None:
            raise ValueError("Workflow class " + str(workflow_class) + " cannot be found!")
        workflow = wf_class(args, self.get_next_id(), function, parameters_section)
        workflow.start()
        # Add the workflow dump path to the workflows dump
        self._add_workflow(workflow)
        return workflow

    def get_workflow_errors(self, workflow_id):
        """Return ``get_errors()`` of the dumped workflow ``workflow_id``."""
        return self.get_workflow(workflow_id).get_errors()

    def get_workflow_ouputs(self, web_path, workflow_id):
        """Return ``get_outputs_per_components(web_path)`` of workflow ``workflow_id``.

        The method name (sic) is kept as is: it is part of the public interface.
        """
        return self.get_workflow(workflow_id).get_outputs_per_components(web_path)

    def get_output_directory(self):
        """Return the work directory given by the configuration reader."""
        return self.config_reader.get_work_directory()

    def get_workflow_execution_graph(self, workflow_id):
        """Return ``get_execution_graph()`` of the dumped workflow ``workflow_id``."""
        return self.get_workflow(workflow_id).get_execution_graph()

    def get_workflows(self):
        """Return every workflow of the index whose dump can be unpickled.

        This is a best-effort listing: entries whose dump file is missing or
        unreadable are silently skipped.
        """
        loaded_workflows = []
        for dump_path in self._load_workflows_dump().values():
            try:
                with open(dump_path, "rb") as handle:
                    loaded_workflows.append(pickle.load(handle))
            except Exception:
                continue
        return loaded_workflows

    def get_workflow_by_class(self, workflow_class):
        """Return a new, argument-less instance of the class named ``workflow_class``.

        Returns None when the ``workflows`` package has no class of that name.
        """
        wf_class = self._find_workflow_class(workflow_class)
        if wf_class is None:
            return None
        return wf_class()

    def get_workflow(self, workflow_id):
        """Unpickle and return the workflow registered under ``workflow_id``.

        Raises ValueError when the id is not in the workflows index.
        """
        rworkflow_id = utils.get_nb_string(workflow_id)
        workflows_dump = self._load_workflows_dump()
        if rworkflow_id not in workflows_dump:
            raise ValueError("Workflow with id " + str(rworkflow_id) + " cannot be retrived!")
        with open(workflows_dump[rworkflow_id], "rb") as handle:
            return pickle.load(handle)

    def get_workflow_directory(self, wname, wid):
        """Return ``<work directory>/<wname>/<WF_DIRECTORY_PREFIX><formatted wid>``."""
        return os.path.join(self.config_reader.get_work_directory(), wname,
                            self.WF_DIRECTORY_PREFIX + utils.get_nb_string(wid))

    def get_next_id(self):
        """Return the next workflow id and persist it in the ids file.

        The ids file stores the last id handed out: the first call (no file
        yet) returns 1, every later call returns the stored value plus one.
        The whole update runs under ``lock_ids_file``.

        Raises ValueError when the ids file does not start with an integer;
        the file is left untouched in that case.
        """
        with WorkflowsManager.lock_ids_file:
            if os.path.isfile(self.ids_file):
                with open(self.ids_file) as handle:
                    next_id = int(handle.readline().strip()) + 1
            else:
                next_id = 1
            with open(self.ids_file, "w") as handle:
                handle.write(str(next_id))
            return next_id