workflows_manager.py 5.75 KB
Newer Older
Jerome Mariette's avatar
Jerome Mariette committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#
# Copyright (C) 2012 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import pkgutil
import workflows
import inspect
import jflow
import sys
import imp
import os
import pickle
Jerome Mariette's avatar
Jerome Mariette committed
26
import threading
Jerome Mariette's avatar
Jerome Mariette committed
27
28
29
30
31
32
33
34
35

import jflow.utils as utils
from jflow.config_reader import JFlowConfigReader

class WorkflowsManager(object):
    """
    Entry point used to discover, launch, re-launch and retrieve workflows.

    The manager keeps two small files in the jflow work directory (as returned
    by the configuration reader):
      - IDS_FILE_NAME, holding the last workflow id that was handed out;
      - WORKFLOWS_DUMP_FILE_NAME, a pickled dictionary linking a workflow id
        string (as built by utils.get_nb_string) to the path of that
        workflow's own dump file.

    NOTE(review): the dump files are read with pickle, which executes
    arbitrary code on load; the work directory must only be writable by
    trusted users.
    """

    # name of the file storing the last attributed workflow id
    IDS_FILE_NAME = "jflowIDs.txt"
    # name of the pickled {workflow id string: workflow dump path} dictionary
    WORKFLOWS_DUMP_FILE_NAME = ".workflows.dump"
    # prefix of the per-workflow directory name
    WF_DIRECTORY_PREFIX = "wf"

    # class-level locks: shared by every manager instance of the process, they
    # serialize the accesses to the ids file and to the workflows dump file
    lock_ids_file = threading.Lock()
    lock_workflows_file = threading.Lock()

    def __init__(self):
        """
        Build the manager: create the output directory if it is missing and
        load the known workflows from the dump file when it exists.
        """
        self.config_reader = JFlowConfigReader()
        output_directory = self.get_output_directory()
        if not os.path.isdir(output_directory):
            os.makedirs(output_directory, 0o751)
        self.dump_file = os.path.join(output_directory, self.WORKFLOWS_DUMP_FILE_NAME)
        # if the file already exists, load all the workflows dump path
        if os.path.isfile(self.dump_file):
            # binary mode + context manager: the handle is always closed
            with open(self.dump_file, "rb") as dump_fh:
                self.workflows_dump = pickle.load(dump_fh)
        else:
            self.workflows_dump = {}
        self.ids_file = os.path.join(output_directory, self.IDS_FILE_NAME)

    def _iter_workflow_classes(self):
        """
        Import every module of the workflows package, one after the other, and
        yield a (class_name, class_object) tuple for each class visible in it.
        class_name is the name under which the class is bound in the module.
        """
        for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
            __import__(modname)
            for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
                yield class_name, obj

    def _add_workflow(self, workflow):
        """
        Record the dump path of the workflow and rewrite the workflows dump file.
          @param workflow: the workflow to register (its id and dump_path
                           attributes are read)
        """
        # the with statement releases the lock even if the dump fails, so a
        # write error cannot block the other threads forever
        with WorkflowsManager.lock_workflows_file:
            self.workflows_dump[utils.get_nb_string(workflow.id)] = workflow.dump_path
            with open(self.dump_file, "wb") as dump_fh:
                pickle.dump(self.workflows_dump, dump_fh)

    def get_available_workflows(self, parameters_section="parameters"):
        """
        Instantiate every Workflow subclass found in the workflows package.
          @param parameters_section: forwarded to each workflow constructor
          @return: the list of the workflows that could be built; a class
                   whose constructor raises is silently left out
        """
        wf_instances = []
        for class_name, obj in self._iter_workflow_classes():
            if issubclass(obj, jflow.workflow.Workflow) and obj.__name__ != jflow.workflow.Workflow.__name__:
                # try to build the workflow; this is a best effort, but
                # KeyboardInterrupt and SystemExit are no longer swallowed
                try:
                    wf_instances.append(obj(parameters_section=parameters_section))
                except Exception:
                    continue
        return wf_instances

    def rerun_workflow(self, workflow_id):
        """
        Load the workflow from its dump and start it again.
          @param workflow_id: the id of the workflow to restart
          @raise ValueError: if no workflow is registered under this id
        """
        workflow = self.get_workflow(workflow_id)
        workflow.start()

    def run_workflow(self, workflow_class, args, function="process", parameters_section="parameters"):
        """
        Build the workflow named workflow_class with a new id, start it and
        register its dump path.
          @param workflow_class: name of the workflow class to run
          @param args: forwarded to the workflow constructor
          @param function: forwarded to the workflow constructor
          @param parameters_section: forwarded to the workflow constructor
          @return: the started workflow
          @raise ValueError: if no class named workflow_class can be found
        """
        # All the modules are scanned and the last match is kept. The class is
        # instantiated only once, after the scan, so that a name visible from
        # several modules does not consume several ids.
        selected_class = None
        for class_name, obj in self._iter_workflow_classes():
            if class_name == workflow_class:
                selected_class = obj
        if selected_class is None:
            raise ValueError("Workflow class " + str(workflow_class) + " cannot be found!")
        workflow = selected_class(args, self.get_next_id(), function, parameters_section)
        workflow.start()
        # Add the workflow dump path to the workflows dump
        self._add_workflow(workflow)
        return workflow

    def get_output_directory(self):
        """
        @return: the work directory given by the configuration reader
        """
        return self.config_reader.get_work_directory()

    def get_workflows(self):
        """
        Load all the registered workflows from their dump files.
          @return: the list of the workflows that could be loaded; a dump that
                   is missing or cannot be unpickled is silently skipped
        """
        loaded_workflows = []
        for workflow_id in self.workflows_dump:
            try:
                with open(self.workflows_dump[workflow_id], "rb") as workflow_dump:
                    loaded_workflows.append(pickle.load(workflow_dump))
            except Exception:
                # best effort: unpickling may fail in many ways (missing file,
                # truncated dump, class that moved, ...)
                continue
        return loaded_workflows

    def get_workflow_by_class(self, workflow_class):
        """
        Build, with no argument, the first class found under the given name.
          @param workflow_class: name of the class to instantiate
          @return: the new instance, or None if no class has this name
        """
        for class_name, obj in self._iter_workflow_classes():
            if class_name == workflow_class:
                return obj()
        return None

    def get_workflow(self, workflow_id):
        """
        Load a registered workflow from its dump file.
          @param workflow_id: the id of the workflow to load
          @return: the unpickled workflow
          @raise ValueError: if no workflow is registered under this id
        """
        rworkflow_id = utils.get_nb_string(workflow_id)
        if rworkflow_id not in self.workflows_dump:
            raise ValueError("Workflow with id " + str(rworkflow_id) + " cannot be retrived!")
        with open(self.workflows_dump[rworkflow_id], "rb") as workflow_dump:
            return pickle.load(workflow_dump)

    def get_workflow_directory(self, wname, wid):
        """
        Build the path <work directory>/<wname>/<WF_DIRECTORY_PREFIX><id string>.
          @param wname: the workflow name, used as sub directory
          @param wid: the workflow id, formatted by utils.get_nb_string
          @return: the path of the workflow directory (nothing is created)
        """
        return os.path.join(self.config_reader.get_work_directory(), wname,
                            self.WF_DIRECTORY_PREFIX + utils.get_nb_string(wid))

    def get_next_id(self):
        """
        Hand out the next workflow id and store it in the ids file.
          @return: 1 when the ids file does not exist yet, otherwise the value
                   stored in the file plus one
          @raise ValueError: if the first line of the ids file is not an integer
        """
        # the lock only protects against the other threads of this process
        with WorkflowsManager.lock_ids_file:
            if os.path.isfile(self.ids_file):
                with open(self.ids_file) as ifh:
                    cid = int(ifh.readline().strip()) + 1
            else:
                cid = 1
            with open(self.ids_file, "w") as ifh:
                ifh.write(str(cid))
            return cid