#
# Copyright (C) 2015 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import inspect
import os
20
import re
21
22
23
24
import sys
import uuid
import pkgutil
import tempfile
25
import pickle
Jerome Mariette's avatar
Jerome Mariette committed
26
import time
27
import threading
Jerome Mariette's avatar
Jerome Mariette committed
28
import types
29
import datetime
Penom Nom's avatar
Penom Nom committed
30
31
import logging
import traceback
Penom Nom's avatar
Penom Nom committed
32

Penom Nom's avatar
Penom Nom committed
33
from datetime import date as ddate
34

Celine Noirot's avatar
Celine Noirot committed
35
36
from collections import OrderedDict

Jerome Mariette's avatar
Jerome Mariette committed
37
import jflow
Jerome Mariette's avatar
Jerome Mariette committed
38
import jflow.utils as utils
Penom Nom's avatar
Penom Nom committed
39
from jflow.utils import validate_email
40
from pygraph.classes.digraph import digraph
Penom Nom's avatar
Penom Nom committed
41
42
from jflow.workflows_manager import WorkflowsManager 
from jflow.config_reader import JFlowConfigReader
Penom Nom's avatar
Penom Nom committed
43
44
from jflow.utils import get_octet_string_representation, get_nb_octet
from jflow.parameter import *
Celine Noirot's avatar
Celine Noirot committed
45
46
47
from jflow.exceptions import RuleException
import jflow.rules as j_rules

Penom Nom's avatar
Penom Nom committed
48
from cctools.util import time_format
49
50
51
52
53
54
55
56
57

from weaver.script import ABSTRACTIONS
from weaver.script import DATASETS
from weaver.script import FUNCTIONS
from weaver.script import NESTS
from weaver.script import OPTIONS
from weaver.script import STACKS
from weaver.nest import Nest
from weaver.options import Options
Jerome Mariette's avatar
Jerome Mariette committed
58
from cctools.makeflow import MakeflowLog
Jerome Mariette's avatar
Jerome Mariette committed
59
from cctools.makeflow.log import Node
60

Celine Noirot's avatar
Celine Noirot committed
61
62
from workflows import rules as wf_rules

Jerome Mariette's avatar
Jerome Mariette committed
63

64
65
66
67
68
69
70
class MINIWorkflow(object):
    """Plain value holder describing a workflow.

    Stores identity, status, timing, metadata, per-component information and
    errors exactly as given, and exposes them through simple getters. It does
    not compute or reload anything itself.
    """

    def __init__(self, id, name, description, status, start_time, end_time, metadata,
                 component_nameids, compts_status, errors):
        # identity
        self.id, self.name, self.description = id, name, description
        # execution state
        self._status = status
        self.start_time, self.end_time = start_time, end_time
        # additional information
        self.metadata = metadata
        self.component_nameids = component_nameids
        self.compts_status = compts_status
        self.errors = errors

    def get_components_nameid(self):
        """Return the component name ids given at construction time."""
        nameids = self.component_nameids
        return nameids

    def get_components_status(self):
        """Return the per-component status collection given at construction time."""
        statuses = self.compts_status
        return statuses

    def get_component_status(self, component_nameid):
        """Look up the status of a single component by its name id."""
        statuses = self.compts_status
        return statuses[component_nameid]

    def get_errors(self):
        """Return the errors given at construction time."""
        errors = self.errors
        return errors

    def get_status(self):
        """Return the workflow status given at construction time."""
        status = self._status
        return status
93

94
class Workflow(threading.Thread):
95
    
Jerome Mariette's avatar
Jerome Mariette committed
96
    MAKEFLOW_LOG_FILE_NAME = "Makeflow.makeflowlog"
97
    DUMP_FILE_NAME = ".workflow.dump"
98
    STDERR_FILE_NAME = "wf_stderr.txt"
99
    WORKING = ".working"
100
    OLD_EXTENSION = ".old"
Penom Nom's avatar
Penom Nom committed
101
    DEFAULT_GROUP = "default"
102
    
Penom Nom's avatar
Penom Nom committed
103
    STATUS_PENDING = "pending"
Jerome Mariette's avatar
Jerome Mariette committed
104
105
106
107
    STATUS_STARTED = "started"
    STATUS_COMPLETED = "completed"
    STATUS_FAILED = "failed"
    STATUS_ABORTED = "aborted"
Penom Nom's avatar
Penom Nom committed
108
109
    STATUS_RESETED = "reseted"
    
Penom Nom's avatar
Penom Nom committed
110
111
112
    INPUTFILE_GRAPH_LABEL = "inputfile"
    INPUTFILES_GRAPH_LABEL = "inputfiles"
    INPUTDIRECTORY_GRAPH_LABEL = "inputdirectory"
113
114
    COMPONENT_GRAPH_LABEL = "component"
    
Penom Nom's avatar
Penom Nom committed
115
116
    
    def __init__(self, args=None, id=None, function="process"):
        """Initialise the workflow thread, its configuration and its parameters.

        :param args: argument values, stored as ``self.args``. ``None`` (the
            default) gives each instance its own empty dict; the previous
            ``{}`` default was a single dict shared by every instance.
        :param id: workflow identifier. When not ``None``, the workflow
            directory is created if missing, ``self.stderr`` is initialised
            through ``_set_stderr()`` and the workflow is serialized.
        :param function: passed to ``define_parameters``; parameters are not
            defined when it is ``None``.
        """
        # define as a thread
        threading.Thread.__init__(self)

        self.jflow_config_reader = JFlowConfigReader()
        self.manager = WorkflowsManager()
        self.components_to_exec = []
        self.components = []
        self.makes = {}
        self.globals = {}
        self.options = Options()
        self._status = self.STATUS_STARTED
        self._postprocess_status = self.STATUS_PENDING
        self.start_time = None
        self.end_time = None
        self.__step = None
        self.stderr = None
        # never share a mutable default between instances
        self.args = {} if args is None else args
        self.dynamic_component_present = False
        self.__to_address = None
        self.__subject = None
        self.__message = None
        self.function = function
        # introduce --log-verbose to be able to monitor the new version of makeflow >=4.2.2
        self.engine_arguments = ' --log-verbose '
        self.component_nameids_is_init = False
        self.component_nameids = {}
        self.reseted_components = []

        # try to parse engine arguments from the batch configuration; on any
        # failure fall back to None (best effort, as before) without also
        # swallowing KeyboardInterrupt / SystemExit like a bare except would
        try:
            batch_type, options, limit_submission = self.jflow_config_reader.get_batch()
            if limit_submission:
                self.engine_arguments += ' -J ' + str(limit_submission)
            if batch_type:
                self.engine_arguments += ' -T ' + batch_type
            if options:
                self.engine_arguments += ' -B "' + options + '"'
        except Exception:
            self.engine_arguments = None

        self.id = id
        self.name = self.get_name()
        self.description = self.get_description()
        self.tools_description = self.get_tools_description()
        self.__group = (self.jflow_config_reader.get_workflow_group(self.__class__.__name__)
                        or Workflow.DEFAULT_GROUP)

        # define the parameters
        self.params_order = []
        if self.function is not None:
            self.define_parameters(self.function)
        # add the metadata parameter
        self.metadata = []

        if self.id is not None:
            self.directory = self.manager.get_workflow_directory(self.name, self.id)
            if not os.path.isdir(self.directory):
                os.makedirs(self.directory, 0o751)
            if self.stderr is None:
                self.stderr = self._set_stderr()
            self._serialize()

        self.internal_components = self._import_internal_components()
        self.external_components = self._import_external_components()
Penom Nom's avatar
Penom Nom committed
174

Penom Nom's avatar
Penom Nom committed
175
176
    def get_workflow_group(self):
        return self.__group
Penom Nom's avatar
Penom Nom committed
177
            
Penom Nom's avatar
Penom Nom committed
178
    def add_input_directory(self, name, help, default=None, required=False, flag=None,
                            group="default", display_name=None, get_files_fn=None, add_to=None, rules=None):
        """Declare an InputDirectory parameter.

        :param name: parameter name, also used as its link-trace name id.
        :param add_to: name of an existing workflow attribute (presumably a
            MultiParameter / MultiParameterList) to attach the new parameter to
            through ``add_sub_parameter``. When falsy, the parameter is set as
            an attribute of the workflow and appended to ``params_order``.
        The remaining arguments are forwarded to ``InputDirectory``.
        """
        new_param = InputDirectory(name, help, flag=flag, default=default, required=required,
                                   group=group, display_name=display_name, get_files_fn=get_files_fn, rules=rules)
        new_param.linkTrace_nameid = name
        # if this input should be added to a particular parameter
        if add_to:
            try:
                getattr(self, add_to).add_sub_parameter(new_param)
            except Exception:
                # still best effort, but no longer silent about the dropped parameter
                logging.getLogger(__name__).warning(
                    "Could not add parameter '%s' to '%s'", name, add_to, exc_info=True)
        # otherwise, add it to the class itself
        else:
            self.params_order.append(name)
            setattr(self, name, new_param)
    
Celine Noirot's avatar
Celine Noirot committed
193
194
195
    def add_input_file(self, name, help, file_format="any", choices=None, default=None, type="inputfile",
                       required=False, flag=None, group="default", display_name=None, size_limit="0", add_to=None,
                       rules=None):
        """Declare an InputFile parameter.

        :param name: parameter name, also used as its link-trace name id.
        :param size_limit: size limit string; replaced by "0" when
            ``get_nb_octet`` cannot turn it into an integer.
        :param add_to: name of an existing workflow attribute (presumably a
            MultiParameter / MultiParameterList) to attach the new parameter to
            through ``add_sub_parameter``. When falsy, the parameter is set as
            an attribute of the workflow and appended to ``params_order``.
        The remaining arguments are forwarded to ``InputFile``.
        """
        # check if the size provided is correct, otherwise use the default "0"
        try:
            int(get_nb_octet(size_limit))
        except Exception:
            size_limit = "0"
        new_param = InputFile(name, help, flag=flag, file_format=file_format, choices=choices, default=default,
                              type=type, required=required, group=group, display_name=display_name, size_limit=size_limit,
                              rules=rules)
        new_param.linkTrace_nameid = name
        # if this input should be added to a particular parameter
        if add_to:
            try:
                getattr(self, add_to).add_sub_parameter(new_param)
            except Exception:
                # still best effort, but no longer silent about the dropped parameter
                logging.getLogger(__name__).warning(
                    "Could not add parameter '%s' to '%s'", name, add_to, exc_info=True)
        # otherwise, add it to the class itself
        else:
            self.params_order.append(name)
            setattr(self, name, new_param)
            
Celine Noirot's avatar
Celine Noirot committed
213
214
215
    def add_input_file_list(self, name, help, file_format="any", choices=None, default=None, type="inputfile",
                            required=False, flag=None, group="default", display_name=None, size_limit="0", add_to=None,
                            rules=None):
        """Declare an InputFileList parameter.

        :param name: parameter name, also used as its link-trace name id.
        :param default: ``None`` (no file), a list of paths, or a single path;
            each path is wrapped into an ``IOFile``.
        :param size_limit: size limit string; replaced by "0" when
            ``get_nb_octet`` cannot turn it into an integer.
        :param add_to: name of an existing workflow attribute (presumably a
            MultiParameter / MultiParameterList) to attach the new parameter to
            through ``add_sub_parameter``. When falsy, the parameter is set as
            an attribute of the workflow and appended to ``params_order``.
        The remaining arguments are forwarded to ``InputFileList``.
        """
        if default is None:
            default = []
        # check if the size provided is correct, otherwise use the default "0"
        try:
            int(get_nb_octet(size_limit))
        except Exception:
            size_limit = "0"
        # wrap the default path(s) into IOFile objects (an empty list gives no inputs)
        if isinstance(default, list):
            inputs = [IOFile(path, file_format, name, None) for path in default]
        else:
            inputs = [IOFile(default, file_format, name, None)]
        new_param = InputFileList(name, help, flag=flag, file_format=file_format, default=inputs, choices=choices,
                                  type=type, required=required, group=group, display_name=display_name, size_limit=size_limit,
                                  rules=rules)
        new_param.linkTrace_nameid = name
        # if this input should be added to a particular parameter
        if add_to:
            try:
                getattr(self, add_to).add_sub_parameter(new_param)
            except Exception:
                # still best effort, but no longer silent about the dropped parameter
                logging.getLogger(__name__).warning(
                    "Could not add parameter '%s' to '%s'", name, add_to, exc_info=True)
        # otherwise, add it to the class itself
        else:
            self.params_order.append(name)
            setattr(self, name, new_param)
            
Celine Noirot's avatar
Celine Noirot committed
240
241
    def add_multiple_parameter(self, name, help, required=False, flag=None, group="default", display_name=None,
                               rules=None):
        """Declare a MultiParameter.

        The name is recorded in ``params_order`` first, then the new
        MultiParameter is set as a workflow attribute under that name, so the
        other ``add_*`` methods can target it with ``add_to=name``.
        """
        self.params_order.append(name)
        multi = MultiParameter(name, help, flag=flag, required=required, group=group,
                               display_name=display_name, rules=rules)
        setattr(self, name, multi)

Celine Noirot's avatar
Celine Noirot committed
247
248
    def add_multiple_parameter_list(self, name, help, required=False, flag=None, group="default", display_name=None,
                                    rules=None, paired_columns=None):
        """Declare a MultiParameterList.

        The name is recorded in ``params_order`` first, then the new
        MultiParameterList (forwarding ``paired_columns``) is set as a workflow
        attribute under that name, so the other ``add_*`` methods can target it
        with ``add_to=name``.
        """
        self.params_order.append(name)
        multi_list = MultiParameterList(name, help, flag=flag, required=required, group=group,
                                        display_name=display_name, rules=rules,
                                        paired_columns=paired_columns)
        setattr(self, name, multi_list)
    
Penom Nom's avatar
Penom Nom committed
254
    def add_parameter(self, name, help, default=None, type=str, choices=None,
                      required=False, flag=None, group="default", display_name=None, add_to=None, rules=None):
        """Declare a simple parameter, built through ``ParameterFactory.factory``.

        :param add_to: name of an existing workflow attribute (presumably a
            MultiParameter / MultiParameterList) to attach the new parameter to
            through ``add_sub_parameter``. When falsy, the parameter is set as
            an attribute of the workflow and appended to ``params_order``.
        The remaining arguments are forwarded to the factory.
        """
        new_param = ParameterFactory.factory(name, help, flag=flag, default=default, type=type, choices=choices,
                                             required=required, group=group, display_name=display_name, rules=rules)
        # if this input should be added to a particular parameter
        if add_to:
            try:
                getattr(self, add_to).add_sub_parameter(new_param)
            except Exception:
                # still best effort, but no longer silent about the dropped parameter
                logging.getLogger(__name__).warning(
                    "Could not add parameter '%s' to '%s'", name, add_to, exc_info=True)
        # otherwise, add it to the class itself
        else:
            self.params_order.append(name)
            setattr(self, name, new_param)
    
Penom Nom's avatar
Penom Nom committed
268
    def add_parameter_list(self, name, help, default=None, type=str, choices=None,
                           required=False, flag=None, group="default", display_name=None, add_to=None, rules=None):
        """Declare a ParameterList (``default`` ``None`` becomes an empty list).

        :param add_to: name of an existing workflow attribute (presumably a
            MultiParameter / MultiParameterList) to attach the new parameter to
            through ``add_sub_parameter``. When falsy, the parameter is set as
            an attribute of the workflow and appended to ``params_order``.
        The remaining arguments are forwarded to ``ParameterList``.
        """
        if default is None:
            default = []
        new_param = ParameterList(name, help, flag=flag, default=default, type=type, choices=choices,
                                  required=required, group=group, display_name=display_name, rules=rules)
        # if this input should be added to a particular parameter
        if add_to:
            try:
                getattr(self, add_to).add_sub_parameter(new_param)
            except Exception:
                # still best effort, but no longer silent about the dropped parameter
                logging.getLogger(__name__).warning(
                    "Could not add parameter '%s' to '%s'", name, add_to, exc_info=True)
        # otherwise, add it to the class itself
        else:
            self.params_order.append(name)
            setattr(self, name, new_param)
Penom Nom's avatar
Penom Nom committed
282
#                         break
Penom Nom's avatar
Penom Nom committed
283
284
285
286

    def _prepare_parameter(self, args, parameter, key="name"):
        """Build a new parameter of the same class holding the user value.

        :param args: mapping of user-supplied values.
        :param parameter: the declared parameter (as created by the add_* methods).
        :param key: attribute of ``parameter`` used to look its value up in
            ``args`` ("name" for top-level parameters, "flag" for sub-parameters
            of multiple parameters).
        :return: the new parameter, whose default is the user value, else the
            declared default, else ``None``. Empty strings (sent by the GUI) are
            mapped to ``None`` or ``[]`` depending on the parameter class.
        :raises Exception: if the parameter class is not supported.
        """
        # Retrieve value: user value first, then declared default
        lookup_key = getattr(parameter, key)
        if lookup_key in args:
            value = args[lookup_key]
        elif parameter.default is not None:
            value = parameter.default
        else:
            value = None

        param_class = parameter.__class__
        # Set new parameter
        if param_class in (StrParameter, IntParameter, FloatParameter, BoolParameter, DateParameter, PasswordParameter):
            if value == "" and param_class in (IntParameter, FloatParameter, BoolParameter, DateParameter):
                value = None  # from GUI
            new_param = ParameterFactory.factory(parameter.name, parameter.help, default=value, type=parameter.type,
                                                 choices=parameter.choices, required=parameter.required,
                                                 flag=parameter.flag, group=parameter.group,
                                                 display_name=parameter.display_name)
        elif param_class == ParameterList:
            if value == "":
                value = []  # from GUI
            new_param = ParameterList(parameter.name, parameter.help, default=value, type=parameter.type,
                                      choices=parameter.choices, required=parameter.required, flag=parameter.flag,
                                      sub_parameters=parameter.sub_parameters, group=parameter.group,
                                      display_name=parameter.display_name)
        elif param_class == InputFileList:
            if value == "":
                value = []  # from GUI
            prepared_files = parameter.prepare(value)
            iovalues = [IOFile(path, parameter.file_format, parameter.linkTrace_nameid, None)
                        for path in prepared_files]
            new_param = InputFileList(parameter.name, parameter.help, file_format=parameter.file_format,
                                      default=iovalues, type=parameter.type, choices=parameter.choices,
                                      required=parameter.required, flag=parameter.flag, group=parameter.group,
                                      display_name=parameter.display_name, size_limit=parameter.size_limit)
            new_param.linkTrace_nameid = parameter.linkTrace_nameid
        elif param_class == InputFile:
            if value == "":
                value = None  # from GUI
            prepared_file = parameter.prepare(value)
            # keep the declared size limit, like the InputFileList branch does
            # (it used to be silently reset to the constructor default)
            new_param = InputFile(parameter.name, parameter.help, file_format=parameter.file_format,
                                  default=prepared_file, type=parameter.type, choices=parameter.choices,
                                  required=parameter.required, flag=parameter.flag, group=parameter.group,
                                  display_name=parameter.display_name, size_limit=parameter.size_limit)
            new_param.linkTrace_nameid = parameter.linkTrace_nameid
        elif param_class == InputDirectory:
            if value == "":
                value = None  # from GUI
            prepared_directory = parameter.prepare(value)
            new_param = InputDirectory(parameter.name, parameter.help, default=prepared_directory,
                                       choices=parameter.choices, required=parameter.required, flag=parameter.flag,
                                       group=parameter.group, display_name=parameter.display_name,
                                       get_files_fn=parameter.get_files_fn)
            new_param.linkTrace_nameid = parameter.linkTrace_nameid
        else:
            raise Exception("Unknown class '" + param_class.__name__ + "' for parameter.")
        return new_param

    def _set_parameters(self, args):
        """Replace every declared parameter with a copy holding the user values.

        Simple parameters are resolved by name through ``_prepare_parameter``.
        For a MultiParameter, ``args[name]`` is a sequence of ``(flag, value)``
        items; for a MultiParameterList it is a sequence of such sequences, one
        per entry. Sub-parameters are then resolved by flag.
        """
        def pairs_to_dict(pairs):
            # {flag: value} built from (flag, value) items; later items win
            return {item[0]: item[1] for item in pairs}

        for param in self.get_parameters():
            param_class = param.__class__
            if param_class == MultiParameter:
                new_param = MultiParameter(param.name, param.help, required=param.required, flag=param.flag,
                                           group=param.group, display_name=param.display_name)
                new_param.sub_parameters = param.sub_parameters
                if param.name in args:
                    flag_values = pairs_to_dict(args[param.name])
                    for sub_param in param.sub_parameters:
                        prepared = self._prepare_parameter(flag_values, sub_param, "flag")
                        new_param[prepared.name] = prepared
            elif param_class == MultiParameterList:
                new_param = MultiParameterList(param.name, param.help, required=param.required, flag=param.flag,
                                               group=param.group, display_name=param.display_name)
                new_param.sub_parameters = param.sub_parameters
                if param.name in args:
                    for position, entry in enumerate(args[param.name]):
                        row = MultiParameter(param.name + '_' + str(position), '', required=False, flag=None,
                                             group="default", display_name=None)
                        flag_values = pairs_to_dict(entry)
                        for sub_param in param.sub_parameters:
                            prepared = self._prepare_parameter(flag_values, sub_param, "flag")
                            row[prepared.name] = prepared
                        new_param.append(row)
            else:
                new_param = self._prepare_parameter(args, param)
            setattr(self, param.name, new_param)

Celine Noirot's avatar
Celine Noirot committed
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
    def __print_execution_graph(self):
        gr = self.get_execution_graph()
        inputs, components = [], []
        for node in gr.nodes():
            if Workflow.INPUTFILE_GRAPH_LABEL in gr.node_attributes(node):
                inputs.append(gr.node_attributes(node)[1])
            elif Workflow.INPUTFILES_GRAPH_LABEL in gr.node_attributes(node):
                inputs.append(gr.node_attributes(node)[1])
            elif Workflow.INPUTDIRECTORY_GRAPH_LABEL in gr.node_attributes(node):
                inputs.append(gr.node_attributes(node)[1])
            elif Workflow.COMPONENT_GRAPH_LABEL in gr.node_attributes(node):
                components.append(gr.node_attributes(node)[1])
        print(("inputs: ", inputs))
        print(("components: ", components))
        print(("edges: ", gr.edges()))

    def __print_outputs(self, logs):
        outputs = self.get_outputs_per_components()
        descriptions = self.get_description_per_components()
        idx = 1
        for output in outputs:
            name = output[:output.index(".")]
            component = str(idx) + ". "
            if output in descriptions and descriptions[output] is not None:
                component += descriptions[output]
            else:
                component += name
            component += ":\n"
            has_outputs = False
            for output_file in sorted(outputs[output]):
                if logs or (not output_file.endswith(".stdout") and not output_file.endswith(".stderr")):
                    has_outputs = True
                    component += "  - " + outputs[output][output_file] + "\n"
            if has_outputs:
                print(component)
            idx += 1

    def __print_programs(self):
        programs = self.get_workflow_programs()
        descriptions = self.get_description_per_components()
        idx = 1
        for component, program in programs.items():
            name = component
            component_str = str(idx) + ". "
            if component in descriptions and descriptions[component] is not None:
                component_str += descriptions[component]
            else:
                component_str += name
            component_str += ":\n"
            component_str += "\t- Program: {0}\n".format(program["name"])
            component_str += "\t- Version: {0}\n".format(program["version"])
            component_str += "\t- Parameters: {0}\n".format(", ".join(program["parameters"])
                                                            if len(program["parameters"]) > 0 else "None")
            print(component_str)
            idx += 1

    def print_output(self, type_o):
        if type_o == "execution_graph":
            self.__print_execution_graph()
        elif type_o in ("outputs", "outputs_logs"):
            self.__print_outputs(type_o == "outputs_logs")
        elif type_o == "programs":
            self.__print_programs()
        else:
            print("Unknow command to print: {0}".format(type_o))

Penom Nom's avatar
Penom Nom committed
429
    def get_execution_graph(self):
        """Build the workflow execution graph.

        Nodes are the workflow inputs (InputFile, InputFileList and
        InputDirectory attributes, including those nested as sub-parameters of
        MultiParameter / MultiParameterList attributes) and the components.
        Each node gets two attributes: a ``*_GRAPH_LABEL`` type label and a
        display name (the name id for components). An edge goes from each
        parent link-trace name id to the link-trace name id of every component
        input/object that references it. Nodes left without any edge are
        removed, which drops inputs not defined by the user.

        :return: the ``digraph``.
        """
        gr = digraph()
        # every node added; connected ones are discarded at the end
        all_nodes = {}
        for ioparameter in list(self.__dict__.values()):
            if self._add_input_graph_node(gr, all_nodes, ioparameter):
                continue
            if issubclass(ioparameter.__class__, (MultiParameter, MultiParameterList)):
                for subparam in ioparameter.sub_parameters:
                    self._add_input_graph_node(gr, all_nodes, subparam)

        for cpt in self.components:
            nameid = cpt.get_nameid()
            gr.add_node(nameid)
            gr.add_node_attribute(nameid, self.COMPONENT_GRAPH_LABEL)
            gr.add_node_attribute(nameid, nameid)
            all_nodes[nameid] = None

        linked_classes = (InputFile, InputFileList, InputDirectory, InputObject, InputObjectList)
        for cpt in self.components:
            for ioparameter in list(cpt.__dict__.values()):
                if issubclass(ioparameter.__class__, linked_classes):
                    for parent in ioparameter.parent_linkTrace_nameid:
                        try:
                            gr.add_edge((parent, ioparameter.linkTrace_nameid))
                        except Exception:
                            # best effort, as before: presumably the parent is not
                            # a graph node or the edge already exists
                            pass

        # check if all nodes are connected
        for source, target in gr.edges():
            all_nodes.pop(source, None)
            all_nodes.pop(target, None)
        # then remove all unconnected nodes: to delete inputs not defined by the user
        for orphan_node in list(all_nodes):
            gr.del_node(orphan_node)
        return gr

    def _add_input_graph_node(self, gr, all_nodes, param):
        """Add ``param`` to ``gr`` when it is an InputFile, InputFileList or InputDirectory.

        The node is named after the parameter, receives the matching type label
        and the parameter display name as attributes, and is recorded in
        ``all_nodes``.

        :return: True if a node was added, False otherwise.
        """
        for input_class, label in ((InputFile, self.INPUTFILE_GRAPH_LABEL),
                                   (InputFileList, self.INPUTFILES_GRAPH_LABEL),
                                   (InputDirectory, self.INPUTDIRECTORY_GRAPH_LABEL)):
            if issubclass(param.__class__, input_class):
                gr.add_node(param.name)
                gr.add_node_attribute(param.name, label)
                gr.add_node_attribute(param.name, param.display_name)
                all_nodes[param.name] = None
                return True
        return False
Penom Nom's avatar
Penom Nom committed
508

509
510
    def delete(self):
        """
        Removes the workflow directory from disk, but only when the workflow is in a
        final state (completed, failed or aborted); otherwise does nothing.
        """
        final_states = (self.STATUS_COMPLETED, self.STATUS_FAILED, self.STATUS_ABORTED)
        if self.get_status() not in final_states:
            return
        utils.robust_rmtree(self.directory)
Penom Nom's avatar
Penom Nom committed
512

Jerome Mariette's avatar
Jerome Mariette committed
513
    @staticmethod
514
    def config_parser(arg_lines):
515
        for arg in arg_lines:
516
            yield arg
517
518
519
520
            
    @staticmethod
    def get_status_under_text_format(workflow, detailed=False, display_errors=False, html=False):
        """
        @summary: Builds a human readable status report for a workflow.
        @param workflow: [Workflow] The workflow to describe.
        @param detailed: [bool] If True, returns a multi-line report (title, last workflow error,
                         status of each component); otherwise a single tab-separated line.
        @param display_errors: [bool] In detailed mode, also lists the failed commands of each component.
        @param html: [bool] In detailed mode, use HTML spans and <br /> instead of ANSI colour codes
                     and newlines. Error texts and failed commands are HTML-escaped in this mode.
        @return: [str] The formatted status.
        """
        # imported under an alias because the "html" parameter shadows the html module name
        from html import escape as html_escape

        def protect(text):
            # tracebacks and commands may contain "<" or "&" (e.g. "in <module>", "cmd < in.txt")
            return html_escape(text) if html else text

        def colorize(label, count, highlight, html_color, ansi_code):
            # returns "label:count", coloured only when highlight is True
            text = label + ":" + str(count)
            if not highlight:
                return text
            if html:
                return "<span style='color:" + html_color + "'>" + text + "</span>"
            return "\033[" + ansi_code + "m" + text + "\033[0m"

        if workflow.start_time: start_time = time.asctime(time.localtime(workflow.start_time))
        else: start_time = "-"
        if workflow.end_time: end_time = time.asctime(time.localtime(workflow.end_time))
        else: end_time = "-"
        if workflow.start_time and workflow.end_time: elapsed_seconds = workflow.end_time - workflow.start_time
        elif workflow.start_time: elapsed_seconds = time.time() - workflow.start_time
        else: elapsed_seconds = None
        # int() truncates like the former str().split(".") did, but does not choke on values
        # whose string form uses scientific notation (e.g. 5e-06)
        elapsed_time = "-" if elapsed_seconds is None else str(datetime.timedelta(seconds=int(elapsed_seconds)))
        # read once so the whole report describes a single, consistent state
        status_label = workflow.get_status()

        if detailed:
            # Global
            title = "Workflow #" + utils.get_nb_string(workflow.id) + " (" + protect(workflow.name) + ") is " + \
                    status_label + ", time elapsed: " + elapsed_time + " (from " + start_time + \
                    " to " + end_time + ")"
            workflow_errors = ""
            error = workflow.get_errors()
            if error is not None:
                details = protect(error["location"]) + "\n    " + "\n    ".join(protect(line) for line in error["msg"])
                if html: workflow_errors = "Workflow Error :\n  <span style='color:#ff0000'>" + details + "</span>"
                else: workflow_errors = "Workflow Error :\n  \033[91m" + details + "\033[0m"
            # By components
            components_status = workflow.get_components_status()
            components_nameid = workflow.get_components_nameid()
            components_errors = ""
            status_lines = []
            for component in components_nameid:
                status_info = components_status[component]
                waiting = colorize("waiting", status_info["waiting"], status_info["waiting"] > 0, "#ffea00", "93")
                running = colorize("running", status_info["running"], status_info["running"] > 0, "#3b3bff", "94")
                failed = colorize("failed", status_info["failed"], status_info["failed"] > 0, "#ff0000", "91")
                aborted = colorize("aborted", status_info["aborted"], status_info["aborted"] > 0, "#ff01ba", "95")
                # completed is highlighted only when every task of the component is completed
                all_completed = status_info["completed"] == status_info["tasks"] and status_info["completed"] > 0
                completed = colorize("completed", status_info["completed"], all_completed, "#14ac00", "92")

                if display_errors and len(status_info["failed_commands"]) > 0:
                    if components_errors == "":
                        components_errors = "Failed Commands :\n"
                    components_errors += "  - " + component + " :\n    " + \
                        "\n    ".join(protect(command) for command in status_info["failed_commands"]) + "\n"
                status_lines.append("  - " + component + ", time elapsed " + time_format(status_info["time"]) +
                                    " (total:" + str(status_info["tasks"]) + ", " + waiting + ", " + running +
                                    ", " + failed + ", " + aborted + ", " + completed + ")")
            status = "Components Status :\n" + "\n".join(status_lines)
            # Format str
            pretty_str = title
            if workflow_errors != "":
                pretty_str += "\n" + workflow_errors
            if len(components_nameid) > 0:
                pretty_str += "\n" + status
                if components_errors != "":
                    # drop the trailing newline of the last failed-commands entry
                    pretty_str += "\n" + components_errors[:-1]
            if html: return pretty_str.replace("\n", "<br />")
            return pretty_str

        pretty_str = utils.get_nb_string(workflow.id) + "\t" + workflow.name + "\t"
        if status_label == Workflow.STATUS_STARTED:
            pretty_str += "\033[94m"
        elif status_label == Workflow.STATUS_COMPLETED:
            pretty_str += "\033[92m"
        elif status_label == Workflow.STATUS_FAILED:
            pretty_str += "\033[91m"
        elif status_label == Workflow.STATUS_ABORTED:
            pretty_str += "\033[91m"
        elif status_label == Workflow.STATUS_RESETED:
            pretty_str += "\033[3m"
        pretty_str += status_label + "\033[0m"
        pretty_str += "\t" + elapsed_time + "\t" + start_time + "\t" + end_time
        return pretty_str
Penom Nom's avatar
Penom Nom committed
607
608
    
    def get_errors(self):
        """
        @summary: Extracts the last error reported in the workflow stderr file.
                  An error block starts with a line beginning with "##", followed by a standard
                  Python traceback and the error message lines (up to the next "##" line).
        @return: [dict|None] {"msg": [message lines], "location": 'File "...", line N, in ...'} for the
                 last error block, or None when there is no stderr file, no error block, or the
                 last block has no parsable stack frame.
        """
        if not os.path.isfile(self.stderr):
            return None
        with open(self.stderr) as stderr_fh:
            lines = stderr_fh.readlines()
        nb_lines = len(lines)
        error = {"title": "", "msg": list(), "traceback": list()}
        line_idx = 0
        while line_idx < nb_lines:
            if not lines[line_idx].strip().startswith("##"):
                line_idx += 1
                continue
            # each "##" line starts a new error block: only the last one is reported
            error = {"title": lines[line_idx].rstrip(), "msg": list(), "traceback": list()}
            # skip all lines before the traceback
            while line_idx < nb_lines and not lines[line_idx].startswith("Traceback"):
                line_idx += 1
            # skip : "Traceback (most recent call last):"
            line_idx += 1
            # Stack frames are the indented lines. A frame starts with 'File "...", line N, in ...'
            # and can be followed by its source line and (Python >= 3.11) a "^^^" marker line,
            # so frames are detected by their prefix instead of assuming two lines per frame.
            while line_idx < nb_lines and lines[line_idx] != lines[line_idx].lstrip():
                current = lines[line_idx].strip()
                if current.startswith('File "'):
                    source_line = ""
                    next_idx = line_idx + 1
                    if next_idx < nb_lines and lines[next_idx] != lines[next_idx].lstrip() \
                            and not lines[next_idx].strip().startswith('File "'):
                        source_line = lines[next_idx].strip()
                    error["traceback"].append({"location": current, "line": source_line})
                line_idx += 1
            # Error message: everything up to the next "##" block
            while line_idx < nb_lines and not lines[line_idx].strip().startswith("##"):
                stripped = lines[line_idx].strip()
                # "ValueError: bad value" -> "bad value"; lines without ":" are kept whole
                _, colon, after_colon = stripped.partition(":")
                error["msg"].append(after_colon[1:] if colon else stripped)
                line_idx += 1
        if error["traceback"]:
            return {"msg": error["msg"], "location": error["traceback"][-1]["location"].strip()}
        return None
Celine Noirot's avatar
Celine Noirot committed
652
653
654
655
656
657
658

    def get_description_per_components(self):
        """
        Returns the description of every component of the workflow.
        @return: [dict] component nameid -> component description.
        """
        # the execution graph used to be built here and never used: building it was pure overhead
        descriptions = {}
        for current_component in self.components:
            descriptions[current_component.get_nameid()] = current_component.get_description()
        return descriptions
Penom Nom's avatar
Penom Nom committed
659
                
Penom Nom's avatar
Penom Nom committed
660
    def get_outputs_per_components(self):
        """
        Collects the output files of each component, keeping the components order.
        @return: [OrderedDict] component nameid -> output files of that component.
        """
        per_component = OrderedDict()
        for component in self.components:
            produced = component.get_output_files()
            per_component[component.get_nameid()] = produced
        return per_component
Celine Noirot's avatar
Celine Noirot committed
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682

    def get_workflow_programs(self):
        """
        Lists the programs (with their version and command line options) used by the workflow.
        Components without a version are ignored.
        @return: [OrderedDict] component nameid -> {"name", "version", "parameters"}.
        """
        programs = OrderedDict()
        for component in self.components:
            version = component.version
            if not version:
                continue
            entry = {
                "name": component.prg_name,
                "version": version,
                "parameters": []
            }
            for param_name in component.params_order:
                param = getattr(component, param_name)
                # options without a command-line flag, unset or falsy are not reported
                if param.cmd_format == "" or param.is_None or not param:
                    continue
                # boolean options are flags: only the switch itself is written
                suffix = "" if param.type == bool else " " + str(param)
                entry["parameters"].append(param.cmd_format + suffix)
            programs[component.get_nameid()] = entry
        return programs
Jerome Mariette's avatar
Jerome Mariette committed
683
    
684
685
    def __setstate__(self, state):
        self.__dict__ = state.copy()
Penom Nom's avatar
Penom Nom committed
686
        self.external_components = self._import_external_components()
687
688
689
690
691
692
693
        threading.Thread.__init__(self, name=self.name)
        
    def __getstate__(self):
        """
        Threading uses Lock Object, do not consider these objects when serializing a workflow
        """
        odict = self.__dict__.copy()
Penom Nom's avatar
Penom Nom committed
694
695
696
697
698
699
700
        del odict['_started']
        if '_tstate_lock' in odict: # python 3.4
            del odict['_tstate_lock']
        else: # python 3.2
            del odict['_block']
        del odict['_stderr']
        if 'external_components' in odict:
Penom Nom's avatar
Penom Nom committed
701
            del odict['external_components']
702
        return odict
703
    
Penom Nom's avatar
Penom Nom committed
704
705
706
707
708
709
710
711
    def set_to_address(self, to_address):
        """
        Overrides the recipient address configured for the notification e-mail.
        @param to_address: [str] the recipient e-mail address.
        """
        self.__to_address = to_address

    def set_subject(self, subject):
        """
        Overrides the subject configured for the notification e-mail.
        @param subject: [str] the subject, may contain ###placeholders###.
        """
        self.__subject = subject

    def set_message(self, message):
        """
        Overrides the body configured for the notification e-mail.
        @param message: [str] the message, may contain ###placeholders###.
        """
        self.__message = message
Jerome Mariette's avatar
Jerome Mariette committed
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745

    def _get_cleaned_email_placeholders(self, text):
        """
        @summary: Returns the text after replacement of placeholders by the corresponding workflow values (method or attribute).
                  Placeholders must be an attribute or a method of the workflow between three sharps: ###attribute### or ###method()###.
                  You can add "|date" after the attribute or the method to convert a timestamp in human readable date.
                  Examples: ###id### is replaced by wf.id ; ###get_status()### is replaced by wf.get_status() ; ###start_time|date### is rplaced by wf.start_time in date format.
        @param text: [str] The text containing placeholders.
        @return: [str] The text with placeholders replaced by her real value.
        """
        new_text = text
        placeholders = re.findall("\#\#\#([^\#]+)\#\#\#", text)
        for placeholder in placeholders:
            try:
                placeholder_value = ""
                placeholder_key = placeholder
                is_date = False
                if placeholder.endswith("|date"):
                    placeholder_key = placeholder[:-5]
                    is_date = True
                # Get value
                if placeholder_key.endswith("()"):
                    placeholder_value = str(getattr(self, placeholder_key[:-2])())
                else:
                    placeholder_value = str(getattr(self, placeholder_key))
                # Apply date format
                if is_date:
                    jflow_date_format = self.jflow_config_reader.get_date_format()
                    placeholder_value = time.strftime(jflow_date_format + " %H:%M:%S", time.gmtime(float(placeholder_value)))
                new_text = new_text.replace("###" + placeholder + "###", placeholder_value)
            except:
                pass
        return new_text

Penom Nom's avatar
Penom Nom committed
746
747
    def _send_email(self):
        """
        @summary: Sends the workflow notification e-mail (HTML) using the SMTP options of the jflow
                  configuration, overridden by set_to_address / set_subject / set_message.
                  Nothing is sent when the SMTP server, port, sender or password are missing, or
                  when the sender/recipient addresses are invalid. SMTP problems are logged as
                  warnings and never raised.
        """
        import smtplib
        from email.mime.text import MIMEText
        smtps, smtpp, froma, fromp, toa, subject, message = self.jflow_config_reader.get_email_options()

        if self.__to_address: toa = self.__to_address
        if self.__subject: subject = self.__subject
        if self.__message: message = self.__message

        if not (smtps and smtpp and froma and fromp):
            return
        if not toa: toa = froma
        if not (validate_email(froma) and validate_email(toa)):
            return
        try:
            # Create a text/html message: the default body is the detailed workflow status
            if not message:
                message = Workflow.get_status_under_text_format(self, True, True, True)
            message = self._get_cleaned_email_placeholders(message)
            msg = MIMEText(message, 'html')
            if not subject:
                subject = "JFlow - Workflow #" + str(self.id) + " is " + self.get_status()
            subject = self._get_cleaned_email_placeholders(subject)
            msg['Subject'] = subject
            msg['From'] = froma
            msg['To'] = toa
            # Send the message via our own SMTP server, but don't include the envelope header.
            server = smtplib.SMTP(smtps, smtpp)
            try:
                server.ehlo()
                # the SMTP server may not provide TLS or identification: warn, then send anyway
                try:
                    server.starttls()
                    server.login(froma, fromp)
                except smtplib.SMTPHeloError:
                    self._log("The server didn't reply properly to the HELO greeting.", level="warning", traceback=traceback.format_exc(chain=False))
                except smtplib.SMTPAuthenticationError:
                    self._log("The server didn't accept the username/password combination.", level="warning", traceback=traceback.format_exc(chain=False))
                except smtplib.SMTPException:
                    self._log("No suitable authentication method was found, or the server does not support the STARTTLS extension.", level="warning", traceback=traceback.format_exc(chain=False))
                except RuntimeError:
                    self._log("SSL/TLS support is not available to your Python interpreter.", level="warning", traceback=traceback.format_exc(chain=False))
                except Exception:
                    self._log("Unhandled error when sending mail.", level="warning", traceback=traceback.format_exc(chain=False))
                server.sendmail(froma, [toa], msg.as_string())
            finally:
                # always release the connection, even when ehlo/sendmail fail
                server.close()
        except Exception:
            self._log("Impossible to connect to smtp server '" + smtps + "'", level="warning", traceback=traceback.format_exc(chain=False))
797
    
Penom Nom's avatar
Penom Nom committed
798
799
800
801
    def get_parameters_per_groups(self):
        """
        Groups the workflow parameters by their `group` attribute.
        @return: [list] [groups, groups_order] where groups maps a group name to the list of its
                 parameters (in parameter order) and groups_order lists the group names in
                 order of first appearance.
        """
        pgparameters, parameters_order = {}, []
        for param in self.get_parameters():
            # the dict and the order list are filled together, so the O(1) dict test is enough
            if param.group not in pgparameters:
                pgparameters[param.group] = []
                parameters_order.append(param.group)
            pgparameters[param.group].append(param)
        return [pgparameters, parameters_order]
Jerome Mariette's avatar
Jerome Mariette committed
810
    
Penom Nom's avatar
Penom Nom committed
811
812
813
    def get_parameters(self):
        """
        Returns the workflow parameters (AbstractParameter attributes of the workflow),
        ordered as listed in self.params_order.
        """
        attribute_values = list(self.__dict__.values())
        return [value
                for param_name in self.params_order
                for value in attribute_values
                if issubclass(value.__class__, AbstractParameter) and param_name == value.name]
    
819
    def get_exec_path(self, software):
        """
        @summary: Resolves the executable to use for a software.
                  Lookup order: the path set in the jflow configuration; otherwise
                  <workflow dir>/../bin/<software>, then <workflow dir>/bin/<software>;
                  otherwise the bare name when it can be found in the PATH.
        @param software: [str] The executable name.
        @return: [str] The path (or bare name) to use to run the software.
        @raise Exception: when the configured path does not exist, or when the software cannot be found.
        """
        exec_path = self.jflow_config_reader.get_exec(software)
        if exec_path is not None:
            if not os.path.isfile(exec_path):
                raise Exception("'" + exec_path + "' set for '" + software + "' does not exist, please provide a valid path!")
            return exec_path
        workflow_dir = os.path.dirname(inspect.getfile(self.__class__))
        for bin_dir in ("../bin", "bin"):
            candidate = os.path.join(workflow_dir, bin_dir, software)
            if os.path.isfile(candidate):
                return candidate
        if utils.which(software) is None:
            raise Exception("'" + software + "' path cannot be retrieved either in the PATH or in the application.properties file!")
        return software
832
    
833
    def add_component(self, component_name, args=None, kwargs=None, component_prefix="default"):
        """
        @summary: Builds a component, configures it and registers it in the workflow.
        @param component_name: [str] Name of an internal or external component class.
        @param args: [list] Positional arguments for define_parameters (internal components
                     only, used when kwargs is empty). Defaults to no argument.
        @param kwargs: [dict] Keyword arguments for define_parameters. Defaults to no argument.
        @param component_prefix: [str] Prefix distinguishing several instances of a component.
        @return: The component object.
        @raise ImportError: if the component is unknown.
        @raise ValueError: if a component with the same class and prefix is already registered.
        """
        # None defaults instead of shared mutable [] / {} defaults
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        if component_name not in self.internal_components and component_name not in self.external_components:
            raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(
                                           ", ".join(list(self.internal_components.keys()) + list(self.external_components.keys()))))

        # first build and check if this component is OK
        if component_name in self.internal_components:
            my_pckge = __import__(self.internal_components[component_name], globals(), locals(), [component_name])
            # build the object and define required field
            cmpt_object = getattr(my_pckge, component_name)()
            cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
            cmpt_object.set_prefix(component_prefix)
            if kwargs: cmpt_object.define_parameters(**kwargs)
            else: cmpt_object.define_parameters(*args)
        # external components
        else:
            cmpt_object = self.external_components[component_name]()
            cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
            cmpt_object.set_prefix(component_prefix)
            # can't use positional arguments with external components
            cmpt_object.define_parameters(**kwargs)

        def check_not_duplicated():
            # same class and same prefix cannot be added twice
            if self._component_is_duplicated(cmpt_object):
                raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
                                 cmpt_object.get_prefix() + " already exist in this pipeline!")

        # there is a dynamic component
        if cmpt_object.is_dynamic():
            self.dynamic_component_present = True
            # if already init, add the component to the list and check if weaver should be executed
            if self.component_nameids_is_init:
                self.components_to_exec.append(cmpt_object)
                self.components.append(cmpt_object)
                self._execute_weaver()
                # update outputs
                for output in cmpt_object.get_dynamic_outputs():
                    output.update()
            else:
                check_not_duplicated()
                self.component_nameids[cmpt_object.get_nameid()] = None
                self.components_to_exec = []
                self.components = []
        elif self.component_nameids_is_init:
            self.components_to_exec.append(cmpt_object)
            self.components.append(cmpt_object)
        elif not self.dynamic_component_present:
            check_not_duplicated()
            self.components_to_exec.append(cmpt_object)
            self.components.append(cmpt_object)
        else:
            check_not_duplicated()
            self.component_nameids[cmpt_object.get_nameid()] = None
        return cmpt_object
893
    
894
895
896
897
    def pre_process(self):
        """
        Hook run once before process(); does nothing by default, subclasses may override it.
        """
    
    def process(self):
        """
        Workflow body, run when the workflow is executed: it has to be implemented by subclasses.
        """
        message = "Workflow.process() must be implemented in {0}".format(self.__class__.__name__)
        raise NotImplementedError(message)

    def get_name(self):
        """
        Returns the workflow name: the class name in lower case.
        """
        class_name = self.__class__.__name__
        return class_name.lower()
    
    def get_description(self):
        """
        Returns the workflow description: it has to be implemented by subclasses.
        """
        message = "Workflow.get_description() must be implemented in {0}".format(self.__class__.__name__)
        raise NotImplementedError(message)
Celine Noirot's avatar
Celine Noirot committed
914
915
916
917
918
919
920
921
922
923
924
925

    def get_tools_description(self):
        """
        Return a description of the tools used by the workflow, if implemented by subclasses.
        The default implementation returns an empty string.
        """
        return ""

    def get_summary(self):
        """
        Workflow summary; subclasses may override it, the default one is an empty string.
        """
        empty_summary = ""
        return empty_summary
Penom Nom's avatar
Penom Nom committed
926
927
928
929
930
931
    
    def define_parameters(self, function="process"):
        """
        Declares the workflow parameters: it has to be implemented by subclasses.
        @param function: [str] name of the workflow method the parameters are defined for.
        """
        message = "Workflow.define_parameters() must be implemented in {0}".format(self.__class__.__name__)
        raise NotImplementedError(message)
932
    
933
934
935
    def post_process(self):
        """
        Hook run after the workflow execution; does nothing by default, subclasses may override it.
        """
    
936
937
938
939
940
941
942
    def get_temporary_file(self, suffix=".txt"):
        """
        Returns a new unique file path inside the jflow temporary directory. The file is not created.
        @param suffix: [str] extension of the file name.
        @return: [str] the path.
        """
        # A random name is built directly: the former implementation created and discarded a
        # real file in the *system* temporary directory just to obtain a name, which fails when
        # that directory is not writable and leaves the deletion to garbage collection.
        import uuid
        tempfile_name = tempfile.gettempprefix() + uuid.uuid4().hex + suffix
        return os.path.join(self.jflow_config_reader.get_tmp_directory(), tempfile_name)

    def get_component_output_directory(self, component_name, component_prefix):
        """
        Returns the output directory of a component: <workflow directory>/<name>_<prefix>.
        """
        folder_name = "_".join((component_name, component_prefix))
        return os.path.join(self.directory, folder_name)
    
Jerome Mariette's avatar
Jerome Mariette committed
943
    def get_components_nameid(self):
        """
        Returns a new list holding the nameids of the registered components.
        """
        return [nameid for nameid in self.component_nameids]
Jerome Mariette's avatar
Jerome Mariette committed
945
    
Jerome Mariette's avatar
Jerome Mariette committed
946
947
    def wf_execution_wrapper(self):
        """
        Runs the workflow method whose name is stored in self.function.
        """
        entry_point = getattr(self, self.function)
        entry_point()
Celine Noirot's avatar
Celine Noirot committed
948
949
950
951
952
953
954
955
956
957

    def _write_parameters(self):
        ofile = self.directory + "/.wf-parameters.txt"
        params = self.get_parameters_per_groups()[0]
        with open(ofile, "w") as wfile:
            for group, g_params in params.items():
                wfile.write("#" + group + "\n")
                for g_param in g_params:
                    wfile.write(g_param.name + ": " + str(g_param) + "\n")
                wfile.write("\n")
Jerome Mariette's avatar
Jerome Mariette committed
958
    
959
960
961
962
    def run(self):
        """
        Only require for Threading
        """
Penom Nom's avatar
Penom Nom committed
963
964
        try:
            # if this is the first time the workflow run
Jerome Mariette's avatar
Jerome Mariette committed
965
            if self.__step == None:
Penom Nom's avatar
Penom Nom committed
966
                self.start_time = time.time()
Penom Nom's avatar
Penom Nom committed
967
                self.__step = 0
Jerome Mariette's avatar
Jerome Mariette committed
968
969
                self._status = self.STATUS_STARTED
                self._postprocess_status = self.STATUS_PENDING
Penom Nom's avatar
Penom Nom committed
970
                self.end_time = None
Celine Noirot's avatar
Celine Noirot committed
971
972
                #check rules
                self.check_parameters_rules(self.args)
Penom Nom's avatar
Penom Nom committed
973
974
                # if some args are provided, let's fill the parameters
                self._set_parameters(self.args)
Celine Noirot's avatar
Celine Noirot committed
975
                self._write_parameters()
976
                self._serialize()
Penom Nom's avatar
Penom Nom committed
977
            # if pre_processing has not been done yet
Penom Nom's avatar
Penom Nom committed
978
            if self.__step == 0:
Penom Nom's avatar
Penom Nom committed
979
                self.pre_process()
Penom Nom's avatar
Penom Nom committed
980
                self.__step = 1
Penom Nom's avatar
Penom Nom committed
981
982
                self._serialize()
            # if collecting components and running workflow has not been done yet
Penom Nom's avatar
Penom Nom committed
983
            if self.__step == 1:
Jerome Mariette's avatar
Jerome Mariette committed
984
985
986
987
988
989
                self.reseted_components = []
                self.components = []
                self._status = self.STATUS_STARTED
                self._postprocess_status = self.STATUS_PENDING
                self._serialize()
                self.wf_execution_wrapper()
Penom Nom's avatar
Penom Nom committed
990
991
                self.component_nameids_is_init = True
                if self.dynamic_component_present:
Penom Nom's avatar
Penom Nom committed
992
                    self.__step = 2
Penom Nom's avatar
Penom Nom committed
993
994
                else:
                    self._execute_weaver()
Penom Nom's avatar
Penom Nom committed
995
                    self.__step = 3
Jerome Mariette's avatar
Jerome Mariette committed
996
                self._serialize()
Penom Nom's avatar
Penom Nom committed
997
            # if the workflow was a dynamic one
Penom Nom's avatar
Penom Nom committed
998
            if self.__step == 2:
Jerome Mariette's avatar
Jerome Mariette committed
999
1000
                self.reseted_components = []
                self.components = []
For faster browsing, not all history is shown. View entire blame