workflows_manager.py 6.11 KB
Newer Older
Jerome Mariette's avatar
Jerome Mariette committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#
# Copyright (C) 2012 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import pkgutil
import workflows
import inspect
import jflow
import sys
import imp
import os
import pickle
Jerome Mariette's avatar
Jerome Mariette committed
26
import threading
Jerome Mariette's avatar
Jerome Mariette committed
27
28
29
30
31
32
33
34
35

import jflow.utils as utils
from jflow.config_reader import JFlowConfigReader

class WorkflowsManager(object):
    """
    """
    
    IDS_FILE_NAME = "jflowIDs.txt"
36
    WORKFLOWS_DUMP_FILE_NAME = ".workflows.dump"
Jerome Mariette's avatar
Jerome Mariette committed
37
38
    WF_DIRECTORY_PREFIX = "wf"
    
Jerome Mariette's avatar
Jerome Mariette committed
39
    lock_ids_file = threading.Lock()
40
    lock_workflows_file = threading.Lock()
Jerome Mariette's avatar
Jerome Mariette committed
41
    
Jerome Mariette's avatar
Jerome Mariette committed
42
43
44
45
46
47
    def __init__(self):
        """ 
        """
        self.config_reader = JFlowConfigReader()
        if not os.path.isdir(self.get_output_directory()):
            os.makedirs(self.get_output_directory(), 0751)
48
        self.dump_file = os.path.join(self.get_output_directory(), self.WORKFLOWS_DUMP_FILE_NAME)
Jerome Mariette's avatar
Jerome Mariette committed
49
50
        self.ids_file = os.path.join(self.get_output_directory(), self.IDS_FILE_NAME)
    
51
52
    def _add_workflow(self, workflow):
        WorkflowsManager.lock_workflows_file.acquire()
53
54
55
56
57
58
59
60
61
        # first load the existing workflows
        try:
            workflows_dump = pickle.load(open(self.dump_file))
        except:
            workflows_dump = {}
        # then add the new one
        workflows_dump[utils.get_nb_string(workflow.id)] = workflow.dump_path
        # and save them
        pickle.dump(workflows_dump, open(self.dump_file, "w"))
62
63
        WorkflowsManager.lock_workflows_file.release()
    
64
    def get_available_workflows(self, parameters_section="parameters"):
Jerome Mariette's avatar
Jerome Mariette committed
65
66
67
68
69
70
71
72
        wf_instances = []
        # Load all modules within the workflow module
        for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
            __import__(modname)
            # Search for Workflow classes
            for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
                if issubclass(obj, jflow.workflow.Workflow) and obj.__name__ != jflow.workflow.Workflow.__name__:
                    # try to build the workflow
73
                    try: wf_instances.append(obj(parameters_section=parameters_section))
Jerome Mariette's avatar
Jerome Mariette committed
74
75
76
                    except: pass
        return wf_instances
    
77
    def rerun_workflow(self, workflow_id):
Jerome Mariette's avatar
Jerome Mariette committed
78
        workflow = self.get_workflow(workflow_id)
79
80
        workflow.start()
    
81
    def run_workflow(self, workflow_class, args, function="process", parameters_section="parameters"):
Jerome Mariette's avatar
Jerome Mariette committed
82
83
84
85
86
        # Load all modules within the workflow module
        for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
            __import__(modname)
            # Search for Workflow classes
            for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
87
                if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function, parameters_section)
88
        workflow.start()
89
90
        # Add the workflow dump path to the workflows dump
        self._add_workflow(workflow)
Jerome Mariette's avatar
Jerome Mariette committed
91
        return workflow
92
93
94
95
96
    
    def get_workflow_ouputs(self, workflow_id):
        workflow = self.get_workflow(workflow_id)
        return workflow.get_outputs_per_components()
    
Jerome Mariette's avatar
Jerome Mariette committed
97
98
99
100
101
    def get_output_directory(self):
        return self.config_reader.get_work_directory()
    
    def get_workflows(self):
        workflows = []
102
103
104
105
106
        try:
            workflows_dump = pickle.load(open(self.dump_file))
        except:
            workflows_dump = {}
        for workflow_id in workflows_dump:
107
            try:
108
                workflow_dump = open(workflows_dump[workflow_id], "rb")
109
110
111
                workflows.append(pickle.load(workflow_dump))
                workflow_dump.close()
            except: pass
Jerome Mariette's avatar
Jerome Mariette committed
112
113
114
115
116
117
118
119
120
121
122
123
124
        return workflows

    def get_workflow_by_class(self, workflow_class):
        # Load all modules within the workflow module
        for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
            __import__(modname)
            # Search for Workflow classes
            for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
                if class_name == workflow_class:
                    return obj()
        return None
    
    def get_workflow(self, workflow_id):
125
        rworkflow_id = utils.get_nb_string(workflow_id)
126
127
128
129
130
131
        try:
            workflows_dump = pickle.load(open(self.dump_file))
        except:
            workflows_dump = {}
        if workflows_dump.has_key(rworkflow_id):
            workflow_dump = open(workflows_dump[rworkflow_id], "rb")
Jerome Mariette's avatar
Jerome Mariette committed
132
133
134
            workflow = pickle.load(workflow_dump)
            workflow_dump.close()
        else:
135
            raise ValueError("Workflow with id " + str(rworkflow_id) + " cannot be retrived!")
Jerome Mariette's avatar
Jerome Mariette committed
136
137
138
139
140
141
142
        return workflow

    def get_workflow_directory(self, wname, wid):
        return os.path.join(os.path.join(self.config_reader.get_work_directory(), wname), 
                                         self.WF_DIRECTORY_PREFIX + utils.get_nb_string(wid))
    
    def get_next_id(self):
Jerome Mariette's avatar
Jerome Mariette committed
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
        WorkflowsManager.lock_ids_file.acquire()
        cid = 1
        try:
            if os.path.isfile(self.ids_file):
                ifh = open(self.ids_file)
                cid = int(ifh.readline().strip())
                ifh.close()
                ifh = open(self.ids_file, "w")
                ifh.write(str(cid+1))
                ifh.close()
                return cid+1
            else:
                ifh = open(self.ids_file, "w")
                ifh.write("1")
                ifh.close()
        finally:
            WorkflowsManager.lock_ids_file.release()
        return cid