# workflows_manager.py
#
# Copyright (C) 2012 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import errno
import imp
import inspect
import os
import pickle
import pkgutil
import sys
import threading

import workflows
import jflow
import jflow.utils as utils
from jflow.config_reader import JFlowConfigReader

class WorkflowsManager(object):
    """
    Entry point used to discover, launch, persist and reload workflows.

    Two files are kept in the output directory (the work directory returned
    by the JFlow configuration reader):
      - IDS_FILE_NAME: a text file holding the last workflow id handed out,
      - WORKFLOWS_DUMP_FILE_NAME: a pickled dict mapping a workflow id string
        (as produced by utils.get_nb_string) to the path of that workflow's
        own pickle dump.

    NOTE: dumps are read back with pickle, so they must only ever come from
    a trusted work directory.
    """

    IDS_FILE_NAME = "jflowIDs.txt"
    WORKFLOWS_DUMP_FILE_NAME = ".workflows.dump"
    WF_DIRECTORY_PREFIX = "wf"

    # Class-level locks: shared by every WorkflowsManager instance of the
    # running process. They serialise threads only, not separate processes.
    lock_ids_file = threading.Lock()
    lock_workflows_file = threading.Lock()

    def __init__(self):
        """
        Read the configuration, make sure the output directory exists and
        compute the paths of the ids file and of the workflows registry.
        """
        self.config_reader = JFlowConfigReader()
        output_directory = self.get_output_directory()
        if not os.path.isdir(output_directory):
            try:
                os.makedirs(output_directory, 0o751)
            except OSError as error:
                # The directory may have been created by someone else between
                # the isdir() test and makedirs(); only that case is ignored.
                if error.errno != errno.EEXIST or not os.path.isdir(output_directory):
                    raise
        self.dump_file = os.path.join(output_directory, self.WORKFLOWS_DUMP_FILE_NAME)
        self.ids_file = os.path.join(output_directory, self.IDS_FILE_NAME)

    def _iter_workflow_classes(self):
        """
        Import the modules of the workflows package one by one and yield a
        (class_name, class) pair for every class found in each of them.

        Modules are imported lazily, so a caller that stops iterating early
        does not import the remaining modules.
        """
        prefix = workflows.__name__ + "."
        for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, prefix):
            __import__(modname)
            for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
                yield class_name, obj

    def _find_workflow_class(self, workflow_class):
        """
        Return the first class named workflow_class found in the workflows
        package, or None when there is no such class.
        """
        for class_name, obj in self._iter_workflow_classes():
            if class_name == workflow_class:
                return obj
        return None

    def _load_workflows_dump(self):
        """
        Return the registry dict stored in the workflows dump file.

        An empty dict is returned when the file is missing or cannot be
        unpickled, so that a first run (no registry yet) just works.
        """
        try:
            with open(self.dump_file, "rb") as handle:
                return pickle.load(handle)
        except Exception:
            # Best effort on purpose: no registry yet, or unreadable registry.
            return {}

    def _add_workflow(self, workflow):
        """
        Register workflow.dump_path in the workflows registry under the
        string form of workflow.id.

        The whole read/modify/write cycle is done while holding
        lock_workflows_file, which is released even if an error occurs.
        """
        with WorkflowsManager.lock_workflows_file:
            # first load the existing workflows
            workflows_dump = self._load_workflows_dump()
            # then add the new one
            workflows_dump[utils.get_nb_string(workflow.id)] = workflow.dump_path
            # and save them (pickle streams require a binary file)
            with open(self.dump_file, "wb") as handle:
                pickle.dump(workflows_dump, handle)

    def get_available_workflows(self, parameters_section="parameters"):
        """
        Return one instance of every jflow.workflow.Workflow subclass found
        in the workflows package.

        Each class is built with the given parameters_section; classes whose
        construction fails are silently left out of the result.
        """
        wf_instances = []
        base_class = jflow.workflow.Workflow
        for class_name, obj in self._iter_workflow_classes():
            if issubclass(obj, base_class) and obj.__name__ != base_class.__name__:
                # try to build the workflow; a workflow that cannot be built
                # is simply not reported as available
                try:
                    wf_instances.append(obj(parameters_section=parameters_section))
                except Exception:
                    pass
        return wf_instances

    def rerun_workflow(self, workflow_id):
        """
        Reload the workflow registered under workflow_id and start it again.

        Raises ValueError (from get_workflow) if the id is not registered.
        """
        workflow = self.get_workflow(workflow_id)
        # Only workflows carrying a "stderr" attribute get set_stderr() called
        # before being restarted.
        if hasattr(workflow, "stderr"):
            workflow.set_stderr()
        workflow.start()

    def run_workflow(self, workflow_class, args, function="process", parameters_section="parameters"):
        """
        Build the workflow whose class is named workflow_class, start it,
        register it in the workflows registry and return it.

        The first matching class found in the workflows package is used and
        exactly one id is drawn from get_next_id().

        Raises ValueError if no class with that name can be found.
        """
        wf_class = self._find_workflow_class(workflow_class)
        if wf_class is None:
            raise ValueError("Workflow class " + str(workflow_class) + " cannot be found!")
        workflow = wf_class(args, self.get_next_id(), function, parameters_section)
        workflow.start()
        # Add the workflow dump path to the workflows dump
        self._add_workflow(workflow)
        return workflow

    def get_workflow_errors(self, workflow_id):
        """
        Return get_errors() of the workflow registered under workflow_id.
        """
        workflow = self.get_workflow(workflow_id)
        return workflow.get_errors()

    def get_workflow_ouputs(self, web_path, workflow_id):
        """
        Return get_outputs_per_components(web_path) of the workflow
        registered under workflow_id.

        The misspelled method name is kept as is for existing callers.
        """
        workflow = self.get_workflow(workflow_id)
        return workflow.get_outputs_per_components(web_path)

    def get_output_directory(self):
        """
        Return the work directory given by the configuration reader.
        """
        return self.config_reader.get_work_directory()

    def get_workflows(self):
        """
        Return the list of all registered workflows that can be reloaded.

        Workflows whose dump file is missing or cannot be unpickled are
        skipped rather than reported as errors.
        """
        loaded_workflows = []
        for dump_path in self._load_workflows_dump().values():
            try:
                with open(dump_path, "rb") as handle:
                    loaded_workflows.append(pickle.load(handle))
            except Exception:
                # Best effort on purpose: ignore dumps that cannot be read.
                pass
        return loaded_workflows

    def get_workflow_by_class(self, workflow_class):
        """
        Return a new instance (built without arguments) of the first class
        named workflow_class found in the workflows package, or None when
        there is no such class.
        """
        wf_class = self._find_workflow_class(workflow_class)
        if wf_class is None:
            return None
        return wf_class()

    def get_workflow(self, workflow_id):
        """
        Reload and return the workflow registered under workflow_id.

        Raises ValueError if the id is not present in the registry; errors
        raised while opening or unpickling the workflow dump are propagated.
        """
        rworkflow_id = utils.get_nb_string(workflow_id)
        workflows_dump = self._load_workflows_dump()
        if rworkflow_id not in workflows_dump:
            raise ValueError("Workflow with id " + str(rworkflow_id) + " cannot be retrieved!")
        with open(workflows_dump[rworkflow_id], "rb") as handle:
            return pickle.load(handle)

    def get_workflow_directory(self, wname, wid):
        """
        Return <work directory>/<wname>/<WF_DIRECTORY_PREFIX><wid string>.
        """
        return os.path.join(self.config_reader.get_work_directory(), wname,
                            self.WF_DIRECTORY_PREFIX + utils.get_nb_string(wid))

    def get_next_id(self):
        """
        Reserve and return the next workflow id.

        The ids file stores the last id handed out: it is created with "1"
        on first use and incremented on every later call. The read/write
        cycle is done while holding lock_ids_file.

        Raises ValueError if the ids file exists but does not start with an
        integer; in that case the file is left untouched.
        """
        with WorkflowsManager.lock_ids_file:
            last_id = 0
            if os.path.isfile(self.ids_file):
                with open(self.ids_file) as handle:
                    last_id = int(handle.readline().strip())
            next_id = last_id + 1
            with open(self.ids_file, "w") as handle:
                handle.write(str(next_id))
            return next_id