ng6workflow.py 6.33 KB
Newer Older
Jerome Mariette's avatar
Jerome Mariette committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#
# Copyright (C) 2012 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import inspect
import os
import pickle
Jerome Mariette's avatar
Jerome Mariette committed
21
import datetime
Jerome Mariette's avatar
Jerome Mariette committed
22
23

from jflow.workflow import Workflow
Jerome Mariette's avatar
Jerome Mariette committed
24
25
from jflow.parameter import Parameter, date

Jerome Mariette's avatar
Jerome Mariette committed
26
27
from ng6.project import Project
from ng6.run import Run
Jerome Mariette's avatar
Jerome Mariette committed
28
29
30
31
32


class NG6Workflow (Workflow):
    """
    jflow Workflow that is bound to an NG6 project and/or run.

    pre_process() resolves (or creates) the Project and Run objects from the
    workflow arguments, add_component() attaches them to the components that
    derive from Analysis, and post_process() synchronises the run (or, when
    there is no run, the project) once the workflow is done.
    """

    def __init__(self, args=None, id=None, function="process", parameters_section="parameters", add_run_params=True):
        """
        Build the workflow and, if requested, declare the run parameters.

        @param args: the workflow arguments (defaults to an empty dict)
        @param id: forwarded unchanged to Workflow.__init__
        @param function: forwarded unchanged to Workflow.__init__
        @param parameters_section: forwarded unchanged to Workflow.__init__
        @param add_run_params: when True, the parameters returned by
                               _add_run_parameters() are appended to
                               self.parameters and self.args is rebuilt
        """
        # a fresh dict per call: a literal {} default would be shared by
        # every instance built without explicit arguments
        if args is None:
            args = {}
        Workflow.__init__(self, args, id, function, parameters_section)
        if add_run_params:
            self.parameters.extend(self._add_run_parameters())
            self.args = self._extend_and_format_args(self.parameters, args)

    def _add_run_parameters(self):
        """
        Return the list of required Parameter objects describing a run.

        @return: a new list of Parameter, one per run attribute
        """
        return [
            Parameter("project_id", "The project id the run belongs to", "--project-id", required=True, type=int),
            Parameter("run_name", "Give a name to your run", "--name", required=True),
            Parameter("run_description", "Give a description to your run", "--description", required=True),
            Parameter("run_date", "When were the data produced", "--date", required=True, type=date),
            Parameter("data_nature", "Are Sequences cDNA, genomique, RNA, ...", "--data-nature", required=True),
            Parameter("sequencer", "Which sequencer produced the data", "--sequencer", required=True),
            Parameter("species", "Which species has been sequenced", "--species", required=True),
            Parameter("type", "What type of data is it (1 lane, 1 region)", "--type", required=True),
        ]

    def pre_process(self):
        """
        Set self.project and self.runobj from self.args.

        The project is loaded from "project_id" when present, otherwise it is
        created from "project_name", "project_description" and "admin_login".
        The run is loaded from "run_id" when present, otherwise it is created
        from the run arguments and added to the project. Every id that gets
        resolved is recorded in self.metadata as "<name>_id=<value>".
        Both attributes stay None when the arguments do not describe them.
        """
        self.project = None
        self.runobj = None

        # if starting from an existing project
        if "project_id" in self.args:
            self.project = Project.get_from_id(self.args["project_id"])
            self.metadata.append("project_id="+str(self.args["project_id"]))
        # if starting from a brand new project
        elif all(key in self.args for key in ("project_name", "project_description", "admin_login")):
            self.project = Project(self.args["project_name"], self.args["project_description"], self.args["admin_login"])
            self.metadata.append("project_id="+str(self.project.id))

        new_run_keys = ("run_date", "run_name", "species", "data_nature",
                        "type", "run_description", "sequencer", "project_id")
        # if starting from an existing run
        if "run_id" in self.args:
            self.runobj = Run.get_from_id(self.args["run_id"])
            self.metadata.append("run_id="+str(self.args["run_id"]))
        # if starting from a brand new run
        elif all(key in self.args for key in new_run_keys):
            # build the project
            # NOTE(review): "project_id" is in self.args here, so the project
            # was already loaded above; the lookup is kept as it was written.
            self.project = Project.get_from_id(self.args["project_id"])
            # and the run
            self.runobj = Run(self.args["run_name"], self.args["run_date"], self.args["species"], self.args["data_nature"],
                              self.args["type"], self.args["run_description"], self.args["sequencer"])
            # then add the run to the project
            self.project.add_run(self.runobj)
            # if it's a run with some index, let's write down description
            if "sample" in self.args and self.args["sample"]:
                try:
                    samples = {}
                    for sample in self.args["sample"]:
                        samples[sample["sample_name"]] = sample["sample_description"]
                    self.runobj.add_mids_description(samples)
                except Exception as error:
                    # the description stays best effort (a failure must not
                    # abort the run creation) but it is no longer silent
                    import logging
                    logging.getLogger(__name__).warning(
                        "Unable to write the samples description of the run: %s", error)
            self.metadata.append("run_id="+str(self.runobj.id))

    def add_component(self, component_name, args=None, kwargs=None, component_prefix="default", parent=None, addto="run"):
        """
        Instantiate a component, register it in the workflow and return it.

        @param component_name: name of the component class, used as key of
                               self.comp_pckg and as attribute of its module
        @param args: positional values given to define_parameters(), only
                     used when kwargs is empty
        @param kwargs: named values given to define_parameters()
        @param component_prefix: prefix stored on the component and used to
                                 get its output directory
        @param parent: parent set on the component when it is an Analysis
        @param addto: "project" or "run", the object the Analysis is linked
                      to; any other value links it to nothing
        @return: the component object
        @raise ImportError: if component_name is not in self.comp_pckg
        """
        if component_name not in self.comp_pckg:
            raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(", ".join(self.comp_pckg.keys())))
        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}

        # first build and check if this component is OK
        # (level -1 is the Python 2 "relative then absolute" import mode)
        my_pckge = __import__(self.comp_pckg[component_name], globals(), locals(), [component_name], -1)
        # build the object and define required field
        cmpt_object = getattr(my_pckge, component_name)()
        cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
        cmpt_object.prefix = component_prefix
        if kwargs:
            cmpt_object.define_parameters(**kwargs)
        else:
            cmpt_object.define_parameters(*args)

        # if the built object is an analysis (only the direct base classes
        # are inspected), add parent and project/run information
        if any(base.__name__ == "Analysis" for base in cmpt_object.__class__.__bases__):
            cmpt_object.parent = parent
            if addto == "project":
                cmpt_object.project = self.project
            elif addto == "run":
                cmpt_object.run = self.runobj

        # add the component
        self.components.append(cmpt_object)
        # if this one require a dynamic pipeline, execute the first part
        if cmpt_object.is_dynamic():
            self._execute_weaver()
            # update outputs
            for output in cmpt_object.get_dynamic_outputs():
                output.update()
        return cmpt_object

    def post_process(self):
        """
        Once everything is done, sync the run or, when there is no run, the project.
        """
        if self.runobj:
            self.runobj.sync()
        elif self.project:
            self.project.sync()
125