#
# Copyright (C) 2015 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import inspect
import os
20
import re
Jerome Mariette's avatar
Jerome Mariette committed
21
22
23
24
25
26
27
import sys
import uuid
import pkgutil
import tempfile
import pickle
import time
import threading
Jerome Mariette's avatar
Jerome Mariette committed
28
import types
Jerome Mariette's avatar
Jerome Mariette committed
29
import datetime
30
31
import logging
import traceback
Jerome Mariette's avatar
Jerome Mariette committed
32

33
from configparser import ConfigParser, NoOptionError
Jerome Mariette's avatar
Jerome Mariette committed
34
from inspect import getcallargs
35
from datetime import date as ddate
Jerome Mariette's avatar
Jerome Mariette committed
36

37
38
from collections import OrderedDict

Jerome Mariette's avatar
Jerome Mariette committed
39
import jflow
Jerome Mariette's avatar
Jerome Mariette committed
40
import jflow.utils as utils
41
from jflow.utils import validate_email
42
from pygraph.classes.digraph import digraph
Ibouniyamine Nabihoudine's avatar
Ibouniyamine Nabihoudine committed
43
44
from jflow.workflows_manager import WorkflowsManager 
from jflow.config_reader import JFlowConfigReader
45
from jflow.utils import get_octet_string_representation, get_nb_octet
46
from jflow.parameter import *
47
from cctools.util import time_format
Jerome Mariette's avatar
Jerome Mariette committed
48
49
50
51
52
53
54
55
56
57
58
59

from weaver.script import ABSTRACTIONS
from weaver.script import DATASETS
from weaver.script import FUNCTIONS
from weaver.script import NESTS
from weaver.script import OPTIONS
from weaver.script import STACKS
from weaver.nest import Nest
from weaver.options import Options
from cctools.makeflow import MakeflowLog
from cctools.makeflow.log import Node

Jerome Mariette's avatar
Jerome Mariette committed
60

61
62
63
64
65
66
67
class MINIWorkflow(object):
    """Plain value holder describing a workflow.

    It only stores the values given to the constructor and exposes some of
    them through accessor methods.
    """

    def __init__(self, id, name, description, status, start_time, end_time, metadata,
                 component_nameids, compts_status, errors):
        """Store the workflow description.

        id, name, description -- stored as public attributes
        status -- value returned by get_status()
        start_time, end_time, metadata -- stored as public attributes
        component_nameids -- value returned by get_components_nameid()
        compts_status -- container indexed by component nameid, see
                         get_component_status()
        errors -- value returned by get_errors()
        """
        self.id = id
        self.name = name
        self.description = description
        self._status = status
        self.start_time = start_time
        self.end_time = end_time
        self.metadata = metadata
        self.component_nameids = component_nameids
        self.compts_status = compts_status
        self.errors = errors

    def get_components_nameid(self):
        """Return the component nameids given at construction."""
        return self.component_nameids

    def get_components_status(self):
        """Return the status container of all the components."""
        return self.compts_status

    def get_component_status(self, component_nameid):
        """Return the status stored for component_nameid.

        The lookup is a plain index access on compts_status, so an unknown
        nameid raises whatever that container raises (KeyError for a dict).
        """
        return self.compts_status[component_nameid]

    def get_errors(self):
        """Return the errors given at construction."""
        return self.errors

    def get_status(self):
        """Return the workflow status given at construction."""
        return self._status
90

91
92
class Workflow(threading.Thread):
    
Jerome Mariette's avatar
Jerome Mariette committed
93
    MAKEFLOW_LOG_FILE_NAME = "Makeflow.makeflowlog"
94
    DUMP_FILE_NAME = ".workflow.dump"
95
    STDERR_FILE_NAME = "wf_stderr.txt"
Jerome Mariette's avatar
Jerome Mariette committed
96
    WORKING = ".working"
97
    OLD_EXTENSION = ".old"
98
    DEFAULT_GROUP = "default"
99
    
100
    STATUS_PENDING = "pending"
Jerome Mariette's avatar
Jerome Mariette committed
101
102
103
104
    STATUS_STARTED = "started"
    STATUS_COMPLETED = "completed"
    STATUS_FAILED = "failed"
    STATUS_ABORTED = "aborted"
Jerome Mariette's avatar
Jerome Mariette committed
105
    STATUS_RESETED = "reseted"
106
    
Jerome Mariette's avatar
Jerome Mariette committed
107
108
109
    INPUTFILE_GRAPH_LABEL = "inputfile"
    INPUTFILES_GRAPH_LABEL = "inputfiles"
    INPUTDIRECTORY_GRAPH_LABEL = "inputdirectory"
110
111
    COMPONENT_GRAPH_LABEL = "component"
    
112
    
113
    def __init__(self, args=None, id=None, function="process"):
        """Initialise the workflow thread, its parameters and its directory.

        args -- values of the workflow parameters, stored in self.args; a new
                empty dict is used when omitted (the former ``{}`` default
                was a single dict shared by every instance)
        id -- workflow identifier; when it is not None the workflow directory
              is created if missing, stderr is set up and the workflow is
              serialized
        function -- name given to define_parameters(); None skips the
                    parameters definition
        """
        # define as a thread
        threading.Thread.__init__(self)
        self.jflow_config_reader = JFlowConfigReader()
        self.manager = WorkflowsManager()
        self.components_to_exec = []
        self.components = []
        self.makes = {}
        self.globals = {}
        self.options = Options()
        self._status = self.STATUS_STARTED
        self._postprocess_status = self.STATUS_PENDING
        self.start_time = None
        self.end_time = None
        self.__step = None
        self.stderr = None
        self.args = {} if args is None else args
        self.dynamic_component_present = False
        self.__to_address = None
        self.__subject = None
        self.__message = None
        self.function = function
        # introduce --log-verbose to be able to monitor the new version of makeflow >=4.2.2
        self.engine_arguments = ' --log-verbose '
        self.component_nameids_is_init = False
        self.component_nameids = {}
        self.reseted_components = []
        # try to parse engine arguments
        try:
            batch_type, options, limit_submission = self.jflow_config_reader.get_batch()
            if limit_submission:
                self.engine_arguments += ' -J ' + str(limit_submission)
            if batch_type:
                self.engine_arguments += ' -T ' + batch_type
            if options:
                self.engine_arguments += ' -B "' + options + '"'
        except Exception:
            # any failure while reading the batch configuration disables the
            # engine arguments altogether
            self.engine_arguments = None

        self.id = id
        self.name = self.get_name()
        self.description = self.get_description()
        self.__group = self.jflow_config_reader.get_workflow_group(self.__class__.__name__) or Workflow.DEFAULT_GROUP

        # define the parameters
        self.params_order = []
        if self.function is not None:
            self.define_parameters(self.function)
        # add the metadata parameter
        self.metadata = []

        if self.id is not None:
            self.directory = self.manager.get_workflow_directory(self.name, self.id)
            if not os.path.isdir(self.directory):
                os.makedirs(self.directory, 0o751)
            if self.stderr is None:
                self.stderr = self._set_stderr()
            self._serialize()

        self.internal_components = self._import_internal_components()
        self.external_components = self._import_external_components()
170

171
172
    def get_workflow_group(self):
        return self.__group
173
            
Jerome Mariette's avatar
Jerome Mariette committed
174
    def add_input_directory(self, name, help, default=None, required=False, flag=None,
                            group="default", display_name=None, get_files_fn=None, add_to=None, rules=None):
        """Declare an InputDirectory parameter.

        All the arguments except add_to are forwarded to the InputDirectory
        constructor. The new parameter's linkTrace_nameid is set to name.

        add_to -- name of an existing attribute of the workflow; when given,
                  the new parameter is added to it as a sub parameter instead
                  of being registered on the workflow itself
        """
        new_param = InputDirectory(name, help, flag=flag, default=default, required=required,
                                   group=group, display_name=display_name, get_files_fn=get_files_fn, rules=rules)
        new_param.linkTrace_nameid = name
        # if this input should be added to a particular parameter
        if add_to:
            try:
                self.__getattribute__(add_to).add_sub_parameter(new_param)
            except Exception:
                # best effort: a missing or unsuitable target is ignored
                pass
        # otherwise, add it to the class itself
        else:
            self.params_order.append(name)
            setattr(self, name, new_param)
    
189
    def add_input_file(self, name, help, file_format="any", default=None, type="inputfile",
                       required=False, flag=None, group="default", display_name=None, size_limit="0", add_to=None,
                       rules=None):
        """Declare an InputFile parameter.

        All the arguments except add_to are forwarded to the InputFile
        constructor. The new parameter's linkTrace_nameid is set to name.

        size_limit -- replaced by "0" when get_nb_octet() cannot convert it
                      to an integer
        add_to -- name of an existing attribute of the workflow; when given,
                  the new parameter is added to it as a sub parameter instead
                  of being registered on the workflow itself
        """
        # check if the size provided is correct
        try:
            int(get_nb_octet(size_limit))
        except Exception:
            size_limit = "0"
        new_param = InputFile(name, help, flag=flag, file_format=file_format, default=default,
                              type=type, required=required, group=group, display_name=display_name, size_limit=size_limit,
                              rules=rules)
        new_param.linkTrace_nameid = name
        # if this input should be added to a particular parameter
        if add_to:
            try:
                self.__getattribute__(add_to).add_sub_parameter(new_param)
            except Exception:
                # best effort: a missing or unsuitable target is ignored
                pass
        # otherwise, add it to the class itself
        else:
            self.params_order.append(name)
            setattr(self, name, new_param)
            
    def add_input_file_list(self, name, help, file_format="any", default=None, type="inputfile",
                            required=False, flag=None, group="default", display_name=None, size_limit="0", add_to=None,
                            rules=None):
        """Declare an InputFileList parameter.

        default -- None (no file), a list of files or a single file; each
                   file is wrapped into an IOFile before being given to the
                   InputFileList constructor
        size_limit -- replaced by "0" when get_nb_octet() cannot convert it
                      to an integer
        add_to -- name of an existing attribute of the workflow; when given,
                  the new parameter is added to it as a sub parameter instead
                  of being registered on the workflow itself

        The other arguments are forwarded to the InputFileList constructor and
        the new parameter's linkTrace_nameid is set to name.
        """
        if default is None:
            default = []
        # check if the size provided is correct
        try:
            int(get_nb_octet(size_limit))
        except Exception:
            size_limit = "0"
        # a single value is handled as a one element list
        if isinstance(default, list):
            inputs = [IOFile(path, file_format, name, None) for path in default]
        else:
            inputs = [IOFile(default, file_format, name, None)]
        new_param = InputFileList(name, help, flag=flag, file_format=file_format, default=inputs,
                                  type=type, required=required, group=group, display_name=display_name, size_limit=size_limit,
                                  rules=rules)
        new_param.linkTrace_nameid = name
        # if this input should be added to a particular parameter
        if add_to:
            try:
                self.__getattribute__(add_to).add_sub_parameter(new_param)
            except Exception:
                # best effort: a missing or unsuitable target is ignored
                pass
        # otherwise, add it to the class itself
        else:
            self.params_order.append(name)
            setattr(self, name, new_param)
235
236
            
    def add_multiple_parameter(self, name, help, required=False, flag=None, group="default", display_name=None):
        """Declare a MultiParameter and register it on the workflow.

        The name is appended to params_order and the new MultiParameter is
        stored as the attribute called name.
        """
        self.params_order.append(name)
        new_param = MultiParameter(name, help, flag=flag, required=required, group=group, display_name=display_name)
        setattr(self, name, new_param)

    def add_multiple_parameter_list(self, name, help, required=False, flag=None, group="default", display_name=None):
        """Declare a MultiParameterList and register it on the workflow.

        The name is appended to params_order and the new MultiParameterList is
        stored as the attribute called name.
        """
        self.params_order.append(name)
        new_param = MultiParameterList(name, help, flag=flag, required=required, group=group, display_name=display_name)
        setattr(self, name, new_param)
    
246
    def add_parameter(self, name, help, default=None, type=str, choices=None,
                      required=False, flag=None, group="default", display_name=None, add_to=None, rules=None):
        """Declare a parameter built by ParameterFactory.factory().

        All the arguments except add_to are forwarded to the factory.

        add_to -- name of an existing attribute of the workflow; when given,
                  the new parameter is added to it as a sub parameter instead
                  of being registered on the workflow itself
        """
        new_param = ParameterFactory.factory(name, help, flag=flag, default=default, type=type, choices=choices,
                                             required=required, group=group, display_name=display_name, rules=rules)
        # if this input should be added to a particular parameter
        if add_to:
            try:
                self.__getattribute__(add_to).add_sub_parameter(new_param)
            except Exception:
                # best effort: a missing or unsuitable target is ignored
                pass
        # otherwise, add it to the class itself
        else:
            self.params_order.append(name)
            setattr(self, name, new_param)
    
260
    def add_parameter_list(self, name, help, default=None, type=str, choices=None,
                           required=False, flag=None, group="default", display_name=None, add_to=None, rules=None):
        """Declare a ParameterList.

        default -- None is replaced by a new empty list
        add_to -- name of an existing attribute of the workflow; when given,
                  the new parameter is added to it as a sub parameter instead
                  of being registered on the workflow itself

        The other arguments are forwarded to the ParameterList constructor.
        """
        if default is None:
            default = []
        new_param = ParameterList(name, help, flag=flag, default=default, type=type, choices=choices,
                                  required=required, group=group, display_name=display_name, rules=rules)
        # if this input should be added to a particular parameter
        if add_to:
            try:
                self.__getattribute__(add_to).add_sub_parameter(new_param)
            except Exception:
                # best effort: a missing or unsuitable target is ignored
                pass
        # otherwise, add it to the class itself
        else:
            self.params_order.append(name)
            setattr(self, name, new_param)
    
Jerome Mariette's avatar
Jerome Mariette committed
275
    def add_exclusion_rule(self, *args2exclude):
        """Put the named parameters in a common, newly created "exclude-" group.

        args2exclude -- names of attributes of the workflow holding the
                        parameters concerned by the rule

        When every name resolves to an attribute, all of them get the same
        group name "exclude-" followed by 5 random hexadecimal characters.
        Otherwise nothing is changed and the problem is reported through
        self._log(..., raisee=True).
        """
        # first of all, does this parameter exist
        params2exclude = []
        for arg2exclude in args2exclude:
            try:
                params2exclude.append(self.__getattribute__(arg2exclude))
            except Exception:
                # unresolved names are detected below by comparing the lengths
                pass
        # everything is ok, let's go
        if len(params2exclude) == len(args2exclude):
            new_group = "exclude-" + uuid.uuid4().hex[:5]
            for param in params2exclude:
                param.group = new_group
        # it might be a multiple param rule
        else:
            self._log("Exclusion rule cannot be applied within a MultiParameter or a MultiParameterList", raisee=True)
Jerome Mariette's avatar
Jerome Mariette committed
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
        # save this for MultiParameter internal exclusion rules, works on command line, not supported on gui
#             for attribute_value in self.__dict__.values():
#                 if issubclass(attribute_value.__class__, MultiParameter) or issubclass(attribute_value.__class__, MultiParameterList):
#                     params2exclude = []
#                     for sub_param in attribute_value.sub_parameters:
#                         if sub_param.name in args2exclude:
#                             params2exclude.append(sub_param)
#                     if len(params2exclude) == len(args2exclude):
#                         new_group = "exclude-"+uuid.uuid4().hex[:5]
#                         flags2exclude = []
#                         for paramsexclude in params2exclude:
#                             paramsexclude.group = new_group
#                             flags2exclude.append(paramsexclude.flag)
#                         attribute_value.type.excludes[new_group] = flags2exclude
#                         break
305

Frédéric Escudié's avatar
Frédéric Escudié committed
306
    def _prepare_parameter(self, args, parameter, key="name"):
        """Return a new parameter of the same class as parameter, holding its value.

        args -- container of the provided values
        parameter -- the declared parameter used as a template
        key -- name of the attribute of parameter used to look the value up
               in args ("name" by default, "flag" for sub parameters)

        The value is args[<key attribute>] when present, otherwise
        parameter.default when it is not None, otherwise None. Empty strings
        (marked "from GUI" in the code) are turned into None, or into an empty
        list for list parameters; they are kept for StrParameter and
        PasswordParameter.

        Raises Exception when the class of parameter is not supported.
        """
        new_param = None
        # Retrieve value
        arg_key = parameter.__getattribute__(key)
        if arg_key in args:
            value = args[arg_key]
        elif parameter.default is not None:
            value = parameter.default
        else:
            value = None
        # Set new parameter
        # NOTE(review): none of the constructor calls below forwards the rules of
        # the original parameter - confirm this is intended.
        param_class = parameter.__class__
        if param_class in (StrParameter, IntParameter, FloatParameter, BoolParameter, DateParameter, PasswordParameter):
            if value == "" and param_class in (IntParameter, FloatParameter, BoolParameter, DateParameter):
                value = None  # from GUI
            new_param = ParameterFactory.factory(parameter.name, parameter.help, default=value, type=parameter.type, choices=parameter.choices,
                                                 required=parameter.required, flag=parameter.flag, group=parameter.group,
                                                 display_name=parameter.display_name)
        elif param_class == ParameterList:
            if value == "":
                value = []  # from GUI
            new_param = ParameterList(parameter.name, parameter.help, default=value, type=parameter.type, choices=parameter.choices,
                                      required=parameter.required, flag=parameter.flag, sub_parameters=parameter.sub_parameters,
                                      group=parameter.group, display_name=parameter.display_name)
        elif param_class == InputFileList:
            if value == "":
                value = []  # from GUI
            iovalues = [IOFile(prepared_file, parameter.file_format, parameter.linkTrace_nameid, None)
                        for prepared_file in parameter.prepare(value)]
            new_param = InputFileList(parameter.name, parameter.help, file_format=parameter.file_format, default=iovalues,
                                      type=parameter.type, choices=parameter.choices, required=parameter.required, flag=parameter.flag,
                                      group=parameter.group, display_name=parameter.display_name, size_limit=parameter.size_limit)
            new_param.linkTrace_nameid = parameter.linkTrace_nameid
        elif param_class == InputFile:
            if value == "":
                value = None  # from GUI
            prepared_file = parameter.prepare(value)
            # NOTE(review): unlike the InputFileList branch, size_limit is not
            # forwarded here - confirm this is intended.
            new_param = InputFile(parameter.name, parameter.help, file_format=parameter.file_format, default=prepared_file,
                                  type=parameter.type, choices=parameter.choices, required=parameter.required, flag=parameter.flag,
                                  group=parameter.group, display_name=parameter.display_name)
            new_param.linkTrace_nameid = parameter.linkTrace_nameid
        elif param_class == InputDirectory:
            if value == "":
                value = None  # from GUI
            prepared_directory = parameter.prepare(value)
            new_param = InputDirectory(parameter.name, parameter.help, default=prepared_directory, choices=parameter.choices,
                                       required=parameter.required, flag=parameter.flag, group=parameter.group,
                                       display_name=parameter.display_name, get_files_fn=parameter.get_files_fn)
            new_param.linkTrace_nameid = parameter.linkTrace_nameid
        else:
            raise Exception("Unknown class '" + param_class.__name__ + "' for parameter.")
        return new_param

354
355
356
    def _set_parameters(self, args):
        """Replace every declared parameter by a new one holding its value.

        args -- container of the provided values, indexed by parameter name.
                For a MultiParameter the value is a sequence of
                (flag, value) pairs; for a MultiParameterList it is a
                sequence of such sequences.

        Each parameter returned by get_parameters() is rebuilt (through
        _prepare_parameter() for the simple ones and for the sub parameters)
        and stored back on the workflow under the same attribute name.
        """
        for param in self.get_parameters():
            new_param = None
            if param.__class__ == MultiParameter:
                new_param = MultiParameter(param.name, param.help, required=param.required, flag=param.flag,
                                           group=param.group, display_name=param.display_name)
                new_param.sub_parameters = param.sub_parameters
                if param.name in args:
                    # sub parameter values are given as (flag, value) pairs
                    sub_args = {sarg[0]: sarg[1] for sarg in args[param.name]}
                    for sub_param in param.sub_parameters:
                        new_sub_parameter = self._prepare_parameter(sub_args, sub_param, "flag")
                        new_param[new_sub_parameter.name] = new_sub_parameter
            elif param.__class__ == MultiParameterList:
                new_param = MultiParameterList(param.name, param.help, required=param.required, flag=param.flag,
                                               group=param.group, display_name=param.display_name)
                new_param.sub_parameters = param.sub_parameters
                if param.name in args:
                    # one MultiParameter, named <name>_<index>, per provided set of values
                    for idx, sargs in enumerate(args[param.name]):
                        new_multi_param = MultiParameter(param.name + '_' + str(idx), '', required=False, flag=None,
                                                         group="default", display_name=None)
                        sub_args = {sarg[0]: sarg[1] for sarg in sargs}
                        for sub_param in param.sub_parameters:
                            new_sub_param = self._prepare_parameter(sub_args, sub_param, "flag")
                            new_multi_param[new_sub_param.name] = new_sub_param
                        new_param.append(new_multi_param)
            else:
                new_param = self._prepare_parameter(args, param)
            setattr(self, param.name, new_param)
Frédéric Escudié's avatar
Frédéric Escudié committed
384

Jerome Mariette's avatar
Jerome Mariette committed
385
    def get_execution_graph(self):
        """Return the digraph linking the workflow inputs and its components.

        A node is created for every InputFile, InputFileList and
        InputDirectory attribute of the workflow, for every sub parameter of
        its MultiParameter and MultiParameterList attributes and for every
        component. Each node gets two attributes: a label (one of the
        *_GRAPH_LABEL constants, when one applies) and a display name.

        Edges go from each name listed in parent_linkTrace_nameid of a
        component input to the linkTrace_nameid of that input. Nodes that are
        not part of any edge are finally removed from the graph.
        """
        gr = digraph()
        # build a all_nodes table to store all nodes
        all_nodes = {}
        for ioparameter in list(self.__dict__.values()):
            if isinstance(ioparameter, InputFile):
                gr.add_node(ioparameter.name)
                gr.add_node_attribute(ioparameter.name, self.INPUTFILE_GRAPH_LABEL)
                gr.add_node_attribute(ioparameter.name, ioparameter.display_name)
                all_nodes[ioparameter.name] = None
            elif isinstance(ioparameter, InputFileList):
                gr.add_node(ioparameter.name)
                gr.add_node_attribute(ioparameter.name, self.INPUTFILES_GRAPH_LABEL)
                gr.add_node_attribute(ioparameter.name, ioparameter.display_name)
                all_nodes[ioparameter.name] = None
            elif isinstance(ioparameter, InputDirectory):
                gr.add_node(ioparameter.name)
                gr.add_node_attribute(ioparameter.name, self.INPUTDIRECTORY_GRAPH_LABEL)
                gr.add_node_attribute(ioparameter.name, ioparameter.display_name)
                all_nodes[ioparameter.name] = None
            elif isinstance(ioparameter, MultiParameter):
                for subparam in ioparameter.sub_parameters:
                    gr.add_node(subparam.name)
                    all_nodes[subparam.name] = None
                    # sub parameters that are not inputs get no label attribute
                    if isinstance(subparam, InputFile):
                        gr.add_node_attribute(subparam.name, self.INPUTFILE_GRAPH_LABEL)
                    elif isinstance(subparam, InputFileList):
                        gr.add_node_attribute(subparam.name, self.INPUTFILES_GRAPH_LABEL)
                    elif isinstance(subparam, InputDirectory):
                        gr.add_node_attribute(subparam.name, self.INPUTDIRECTORY_GRAPH_LABEL)
                    gr.add_node_attribute(subparam.name, subparam.display_name)
            elif isinstance(ioparameter, MultiParameterList):
                for subparam in ioparameter.sub_parameters:
                    gr.add_node(subparam.name)
                    all_nodes[subparam.name] = None
                    # everything that is not a directory is labelled as a list of files
                    if isinstance(subparam, InputDirectory):
                        gr.add_node_attribute(subparam.name, self.INPUTDIRECTORY_GRAPH_LABEL)
                    else:
                        gr.add_node_attribute(subparam.name, self.INPUTFILES_GRAPH_LABEL)
                    gr.add_node_attribute(subparam.name, subparam.display_name)
        # all the component nodes are created before any edge is added
        for cpt in self.components:
            nameid = cpt.get_nameid()
            gr.add_node(nameid)
            gr.add_node_attribute(nameid, self.COMPONENT_GRAPH_LABEL)
            gr.add_node_attribute(nameid, nameid)
            all_nodes[nameid] = None
        linked_types = (InputFile, InputFileList, InputDirectory, InputObject, InputObjectList)
        for cpt in self.components:
            for ioparameter in list(cpt.__dict__.values()):
                if isinstance(ioparameter, linked_types):
                    for parent in ioparameter.parent_linkTrace_nameid:
                        try:
                            gr.add_edge((parent, ioparameter.linkTrace_nameid))
                        except Exception:
                            # best effort: edges refused by the graph are skipped
                            pass
        # check if all nodes are connected
        for edge in gr.edges():
            all_nodes.pop(edge[0], None)
            all_nodes.pop(edge[1], None)
        # then remove all unconnected nodes: to delete inputs not defined by the user
        for orphan_node in list(all_nodes):
            gr.del_node(orphan_node)
        return gr
Frédéric Escudié's avatar
Frédéric Escudié committed
450

Jerome Mariette's avatar
Jerome Mariette committed
451
452
    def delete(self):
        """Remove the workflow directory, only if the workflow is over.

        Nothing is done unless get_status() is STATUS_COMPLETED,
        STATUS_FAILED or STATUS_ABORTED.
        """
        if self.get_status() in (self.STATUS_COMPLETED, self.STATUS_FAILED, self.STATUS_ABORTED):
            utils.robust_rmtree(self.directory)
Jerome Mariette's avatar
Jerome Mariette committed
454

455
    @staticmethod
456
    def config_parser(arg_lines):
457
458
        for arg in arg_lines:
            yield arg
459
460
461
462
            
    @staticmethod
    def get_status_under_text_format(workflow, detailed=False, display_errors=False, html=False):
        if workflow.start_time: start_time = time.asctime(time.localtime(workflow.start_time))
463
        else: start_time = "-"
464
465
        if workflow.start_time and workflow.end_time: elapsed_time = str(workflow.end_time-workflow.start_time)
        elif workflow.start_time: elapsed_time = str(time.time()-workflow.start_time)
466
        else: elapsed_time = "-"
Jerome Mariette's avatar
Jerome Mariette committed
467
        elapsed_time = "-" if elapsed_time == "-" else str(datetime.timedelta(seconds=int(str(elapsed_time).split(".")[0])))
468
        if workflow.end_time: end_time = time.asctime(time.localtime(workflow.end_time))
469
470
471
        else: end_time = "-"
        if detailed:
            # Global
472
473
            title = "Workflow #" + utils.get_nb_string(workflow.id) + " (" + workflow.name + ") is " + \
                    workflow.get_status() + ", time elapsed: " + str(elapsed_time) + " (from " + start_time + \
474
475
                    " to " + end_time + ")"
            worflow_errors = ""
476
            error = workflow.get_errors()
477
            if error is not None:
Jerome Mariette's avatar
Jerome Mariette committed
478
479
                if html: worflow_errors = "Workflow Error :\n  <span style='color:#ff0000'>" + error["location"] + "\n    " + "\n    ".join(error["msg"]) + "</span>"
                else: worflow_errors = "Workflow Error :\n  \033[91m" + error["location"] + "\n    " + "\n    ".join(error["msg"]) + "\033[0m"
480
481
482
            # By components
            components_errors = ""
            status = "Components Status :\n"
483
            components_status = workflow.get_components_status()
484
            for i, component in enumerate(workflow.get_components_nameid()):
485
                status_info = components_status[component]
486
487
488
489
490
491
492
493
494
495
496
                try: perc_waiting = (status_info["waiting"]*100.0)/status_info["tasks"]
                except: perc_waiting = 0
                try: perc_running = (status_info["running"]*100.0)/status_info["tasks"]
                except: perc_running = 0
                try: perc_failed = (status_info["failed"]*100.0)/status_info["tasks"]
                except: perc_failed = 0
                try: perc_aborted = (status_info["aborted"]*100.0)/status_info["tasks"]
                except: perc_aborted = 0
                try: perc_completed = (status_info["completed"]*100.0)/status_info["tasks"]
                except: perc_completed = 0
                
Jerome Mariette's avatar
Jerome Mariette committed
497
498
499
                if status_info["running"] > 0: 
                    if html: running = "<span style='color:#3b3bff'>running:" + str(status_info["running"]) + "</span>"
                    else: running = "\033[94mrunning:" + str(status_info["running"]) + "\033[0m"
500
                else: running = "running:" + str(status_info["running"])
Jerome Mariette's avatar
Jerome Mariette committed
501
502
503
                if status_info["waiting"] > 0: 
                    if html: waiting = "<span style='color:#ffea00'>waiting:" + str(status_info["waiting"]) + "</span>"
                    else: waiting = "\033[93mwaiting:" + str(status_info["waiting"]) + "\033[0m"
504
                else: waiting = "waiting:" + str(status_info["waiting"])            
Jerome Mariette's avatar
Jerome Mariette committed
505
506
507
                if status_info["failed"] > 0: 
                    if html: failed = "<span style='color:#ff0000'>failed:" + str(status_info["failed"]) + "</span>"
                    else: failed = "\033[91mfailed:" + str(status_info["failed"]) + "\033[0m"
508
                else: failed = "failed:" + str(status_info["failed"])
Jerome Mariette's avatar
Jerome Mariette committed
509
510
511
                if status_info["aborted"] > 0: 
                    if html: aborted = "<span style='color:#ff01ba'>aborted:" + str(status_info["aborted"]) + "</span>"
                    else: aborted = "\033[95maborted:" + str(status_info["aborted"]) + "\033[0m"
512
                else: aborted = "aborted:" + str(status_info["aborted"])
Jerome Mariette's avatar
Jerome Mariette committed
513
514
515
                if status_info["completed"] == status_info["tasks"] and status_info["completed"] > 0: 
                    if html: completed = "<span style='color:#14ac00'>completed:" + str(status_info["completed"]) + "</span>"
                    else: completed = "\033[92mcompleted:" + str(status_info["completed"]) + "\033[0m"
516
517
518
                else: completed = "completed:" + str(status_info["completed"])
                
                if display_errors and len(status_info["failed_commands"]) > 0:
519
520
                    if components_errors == "":
                        components_errors = "Failed Commands :\n"
521
                    components_errors += "  - " + component + " :\n    " + "\n    ".join(status_info["failed_commands"]) + "\n"
522
523
524
                status += "  - " + component + ", time elapsed " + time_format(status_info["time"]) + \
                    " (total:" + str(status_info["tasks"]) + ", " + waiting + ", " + running + ", " + failed + \
                    ", " + aborted + ", " + completed + ")"
525
                if i<len(workflow.get_components_nameid())-1: status += "\n"
526
527
528
            # Format str
            pretty_str = title
            pretty_str += ("\n" + worflow_errors) if worflow_errors != "" else ""
529
530
            if len(workflow.get_components_nameid()) > 0:
                pretty_str += ("\n" + status) if status != "" else ""
531
                pretty_str += ("\n" + components_errors[:-1]) if components_errors != "" else ""
Jerome Mariette's avatar
Jerome Mariette committed
532
533
            if html: return pretty_str.replace("\n", "<br />")
            else: return pretty_str
534
        else:
535
536
            pretty_str = utils.get_nb_string(workflow.id) + "\t" + workflow.name + "\t"
            if workflow.get_status() == Workflow.STATUS_STARTED:
Jerome Mariette's avatar
Jerome Mariette committed
537
                pretty_str += "\033[94m"
538
            elif workflow.get_status() == Workflow.STATUS_COMPLETED:
Jerome Mariette's avatar
Jerome Mariette committed
539
                pretty_str += "\033[92m"
540
            elif workflow.get_status() == Workflow.STATUS_FAILED:
Jerome Mariette's avatar
Jerome Mariette committed
541
                pretty_str += "\033[91m"
542
            elif workflow.get_status() == Workflow.STATUS_ABORTED:
Jerome Mariette's avatar
Jerome Mariette committed
543
                pretty_str += "\033[91m"
544
            elif workflow.get_status() == Workflow.STATUS_RESETED:
Jerome Mariette's avatar
Jerome Mariette committed
545
                pretty_str += "\033[3m"
546
            pretty_str += workflow.get_status() + "\033[0m"
Jerome Mariette's avatar
Jerome Mariette committed
547
548
            pretty_str += "\t" + elapsed_time + "\t" + start_time + "\t" + end_time
            return pretty_str
549
    
550
    def get_errors(self):
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
        if os.path.isfile(self.stderr):
            error = {
                "title"     : "",
                "msg"       : list(),
                "traceback" : list()
            }
            line_idx = 0
            FH_stderr = open( self.stderr )
            lines = FH_stderr.readlines()
            while line_idx < len(lines):
                if lines[line_idx].strip().startswith("##"):
                    error["title"]     = lines[line_idx].rstrip()
                    error["msg"]       = list()
                    error["traceback"] = list()
                    # skip all lines before the traceback
                    while not lines[line_idx].startswith("Traceback"):
                        line_idx += 1
                    # skip : "Traceback (most recent call last):"
                    line_idx += 1    
                    while lines[line_idx] != lines[line_idx].lstrip():
                        error["traceback"].append({ 
                                                   "location" : lines[line_idx].strip(),
                                                   "line"     : lines[line_idx].strip()
                        })
                        line_idx += 2
                    # Error message
                    while line_idx < len(lines) and not lines[line_idx].strip().startswith("##"):
                        try:
                            error["msg"].append( lines[line_idx].strip().split(":", 1)[1][1:] )
                        except:
                            error["msg"].append( lines[line_idx].strip() )
                        line_idx += 1
                    line_idx -= 1
                line_idx += 1
            FH_stderr.close()
            last_stack_location = ""
            if len(error["traceback"]) > 0:
                last_stack_location = error["traceback"][-1]["location"].strip()
                return { "msg" : error["msg"], "location" : last_stack_location }
            else:
                return None
Frédéric Escudié's avatar
Frédéric Escudié committed
592
593
        else:
            return None
594

595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
    def print_workflow_outputs(self, logs):
        outputs = self.get_outputs_per_components()
        descriptions = self.get_description_per_components()
        idx = 1
        for output in outputs:
            name = output[:output.index(".")]
            component = str(idx) + ". "
            if output in descriptions and descriptions[output] is not None:
                component += descriptions[output]
            else:
                component += name
            component += ":\n"
            has_outputs = False
            for output_file in outputs[output]:
                if logs or (not output_file.endswith(".stdout") and not output_file.endswith(".stderr")):
                    has_outputs = True
                    component += "  - " + outputs[output][output_file] + "\n"
            if has_outputs:
                print(component)
            idx += 1

616
617
618
619
620
621
    def get_description_per_components(self):
        dag = self.get_execution_graph()
        descriptions = {}
        for current_components in self.components:
            descriptions[current_components.get_nameid()] = current_components.get_description()
        return descriptions
622
                
623
    def get_outputs_per_components(self):
624
        outputs_files = OrderedDict()
625
        for current_components in self.components:
Philippe Bardou's avatar
Philippe Bardou committed
626
            #status = self.get_component_status(current_components.get_nameid())
627
            outputs_files[current_components.get_nameid()] = current_components.get_output_files()
Philippe Bardou's avatar
Philippe Bardou committed
628
            #outputs_files["0"] = status["completed"]
629
630
        return outputs_files
    
Jerome Mariette's avatar
Jerome Mariette committed
631
632
    def __setstate__(self, state):
        """
        Rebuild a workflow from its serialized state: the attributes come from a copy of
        the state, then the parts left out by __getstate__ are recreated (the external
        components are imported again and the Thread is re-initialized with the same name).
        """
        restored_attributes = state.copy()
        self.__dict__ = restored_attributes
        self.external_components = self._import_external_components()
        # Must come last: it reads self.name, which is only available once the state is restored
        threading.Thread.__init__(self, name=self.name)
        
    def __getstate__(self):
        """
        Threading uses Lock Object, do not consider these objects when serializing a workflow
        """
        odict = self.__dict__.copy()
641
        del odict['_started']
642
643
644
645
        if '_tstate_lock' in odict: # python 3.4
            del odict['_tstate_lock']
        else: # python 3.2
            del odict['_block']
646
647
        del odict['_stderr']
        if 'external_components' in odict:
648
            del odict['external_components']
Jerome Mariette's avatar
Jerome Mariette committed
649
650
        return odict
    
651
652
653
654
655
656
657
658
    def set_to_address(self, to_address):
        """
        @summary: Stores the recipient address that _send_email() uses in place of the configured one.
        @param to_address: The recipient address; a false value leaves the configured recipient in use.
        """
        self.__to_address = to_address

    def set_subject(self, subject):
        """
        @summary: Stores the email subject that _send_email() uses in place of the configured one.
        @param subject: The subject; a false value leaves the configured subject in use.
        """
        self.__subject = subject

    def set_message(self, message):
        """
        @summary: Stores the email body that _send_email() uses in place of the configured one.
        @param message: The message; a false value leaves the configured message in use.
        """
        self.__message = message
659
660
661
662

    def _get_cleaned_email_placeholders(self, text):
        """
        @summary: Returns the text after replacement of placeholders by the corresponding workflow values (method or attribute).
Frédéric Escudié's avatar
Frédéric Escudié committed
663
                  Placeholders must be an attribute or a method of the workflow between three sharps: ###attribute### or ###method()###.
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
                  You can add "|date" after the attribute or the method to convert a timestamp in human readable date.
                  Examples: ###id### is replaced by wf.id ; ###get_status()### is replaced by wf.get_status() ; ###start_time|date### is rplaced by wf.start_time in date format.
        @param text: [str] The text containing placeholders.
        @return: [str] The text with placeholders replaced by her real value.
        """
        new_text = text
        placeholders = re.findall("\#\#\#([^\#]+)\#\#\#", text)
        for placeholder in placeholders:
            try:
                placeholder_value = ""
                placeholder_key = placeholder
                is_date = False
                if placeholder.endswith("|date"):
                    placeholder_key = placeholder[:-5]
                    is_date = True
                # Get value
                if placeholder_key.endswith("()"):
                    placeholder_value = str(getattr(self, placeholder_key[:-2])())
                else:
                    placeholder_value = str(getattr(self, placeholder_key))
                # Apply date format
                if is_date:
                    jflow_date_format = self.jflow_config_reader.get_date_format()
                    placeholder_value = time.strftime(jflow_date_format + " %H:%M:%S", time.gmtime(float(placeholder_value)))
                new_text = new_text.replace("###" + placeholder + "###", placeholder_value)
            except:
                pass
        return new_text

693
694
    def _send_email(self):
        """
        @summary: Sends an HTML email about the workflow, using the email options of the jflow configuration.
                  The recipient, subject and message set on the workflow (set_to_address(), set_subject(),
                  set_message()) take precedence over the configured ones. Without message the workflow
                  status is sent, without subject a default one is built from the workflow id and status,
                  and without recipient the mail is sent to the sender address.
                  Nothing is sent if the SMTP server, port, sender address or sender password is missing,
                  or if one of the addresses is rejected by validate_email().
                  Failures are logged as warnings and never raised to the caller.
        """
        import smtplib
        from email.mime.text import MIMEText
        smtps, smtpp, froma, fromp, toa, subject, message = self.jflow_config_reader.get_email_options()

        if self.__to_address: toa = self.__to_address
        if self.__subject: subject = self.__subject
        if self.__message: message = self.__message

        if not (smtps and smtpp and froma and fromp):
            return
        if not toa: toa = froma
        if not (validate_email(froma) and validate_email(toa)):
            return

        try:
            # Create the text/html message
            if not message:
                message = Workflow.get_status_under_text_format(self, True, True, True)
            message = self._get_cleaned_email_placeholders( message )
            msg = MIMEText(message, 'html')
            me = froma
            you = toa
            if not subject:
                subject = "JFlow - Workflow #" + str(self.id) + " is " + self.get_status()
            subject = self._get_cleaned_email_placeholders( subject )
            msg['Subject'] = subject
            msg['From'] = me
            msg['To'] = you
            # Send the message via our own SMTP server, but don't include the
            # envelope header.
            s = smtplib.SMTP(smtps, smtpp)
            try:
                s.ehlo()
                # TLS and identification are optional: if the SMTP server does not provide
                # them the problem is logged and the mail is sent anyway
                try:
                    s.starttls()
                    s.login(me, fromp)
                except smtplib.SMTPHeloError:
                    self._log("The server didn't reply properly to the HELO greeting.", level="warning", traceback=traceback.format_exc(chain=False))
                except smtplib.SMTPAuthenticationError:
                    self._log("The server didn't accept the username/password combination.", level="warning", traceback=traceback.format_exc(chain=False))
                except smtplib.SMTPException:
                    self._log("No suitable authentication method was found, or the server does not support the STARTTLS extension.", level="warning", traceback=traceback.format_exc(chain=False))
                except RuntimeError:
                    self._log("SSL/TLS support is not available to your Python interpreter.", level="warning", traceback=traceback.format_exc(chain=False))
                except Exception:
                    self._log("Unhandled error when sending mail.", level="warning", traceback=traceback.format_exc(chain=False))
                s.sendmail(me, [you], msg.as_string())
            finally:
                # Always release the connection, including when ehlo() or sendmail() fails
                s.close()
        except Exception:
            self._log("Impossible to connect to smtp server '" + smtps + "'", level="warning", traceback=traceback.format_exc(chain=False))
744
    
745
746
747
748
    def get_parameters_per_groups(self):
        name = self.get_name()
        description = self.get_description()
        parameters = self.get_parameters()
749
750
        pgparameters, parameters_order = {}, []
        for param in parameters:
Jerome Mariette's avatar
Jerome Mariette committed
751
            if param.group not in parameters_order: parameters_order.append(param.group)
752
            if param.group in pgparameters:
753
                pgparameters[param.group].append(param)
Jerome Mariette's avatar
Jerome Mariette committed
754
            else:
755
756
                pgparameters[param.group] = [param]
        return [pgparameters, parameters_order]
Jerome Mariette's avatar
Jerome Mariette committed
757
    
758
759
760
    def get_parameters(self):
        """
        @summary: Returns the parameters of the workflow, i.e. the instance attributes that are
                  AbstractParameter objects and whose name is listed in self.params_order.
        @return: [list] The parameters sorted as in self.params_order.
        """
        # Scan the instance attributes once instead of copying and filtering them for each name
        declared = [value for value in self.__dict__.values() if isinstance(value, AbstractParameter)]
        return [parameter for param in self.params_order for parameter in declared if param == parameter.name]
    
766
    def get_exec_path(self, software):
767
768
769
770
771
        exec_path = self.jflow_config_reader.get_exec(software)
        if exec_path is None and os.path.isfile(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "../bin", software)):
            exec_path = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "../bin", software)
        elif exec_path is None and os.path.isfile(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "bin", software)):
            exec_path = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "bin", software)
772
        elif exec_path is None and utils.which(software) == None:
773
            raise Exception("'" + software + "' path connot be retrieved either in the PATH and in the application.properties file!")
774
775
776
        elif exec_path is None and utils.which(software) != None: 
            exec_path = software
        elif exec_path != None and not os.path.isfile(exec_path):
777
            raise Exception("'" + exec_path + "' set for '" + software + "' does not exists, please provide a valid path!")
778
        return exec_path
779
    
Jerome Mariette's avatar
Jerome Mariette committed
780
781
    def add_component(self, component_name, args=[], kwargs={}, component_prefix="default"):
        """
        @summary: Builds a component, defines its parameters and registers it in the workflow.
                  The way it is registered depends on whether the component is dynamic
                  (is_dynamic()) and on the flags self.component_nameids_is_init and
                  self.dynamic_component_present.
        @param component_name: [str] A key of self.internal_components or self.external_components.
        @param args: [list] The positional arguments for define_parameters(). Only used for internal
                     components, and only when kwargs is empty.
        @param kwargs: [dict] The keyword arguments for define_parameters().
        @param component_prefix: [str] The prefix of the component, also used to build its output directory.
        @return: The component object.
        @raise ImportError: If component_name is neither an internal nor an external component.
        @raise ValueError: If the component is already in the pipeline (same class and prefix),
                           checked while the component nameids are not initialised.
        @note: The list/dict defaults are shared between calls; they are only unpacked here, never modified.
        """
        # first build and check if this component is OK
        if component_name in self.internal_components or component_name in self.external_components:
            
            if component_name in self.internal_components:
                # the value stored for an internal component is the name of the module to import
                my_pckge = __import__(self.internal_components[component_name], globals(), locals(), [component_name])
                # build the object and define required field
                cmpt_object = getattr(my_pckge, component_name)()
                cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
                cmpt_object.prefix = component_prefix
                # keyword arguments win: args is ignored as soon as kwargs is not empty
                if kwargs: cmpt_object.define_parameters(**kwargs)
                else: cmpt_object.define_parameters(*args)
            # external components
            else :
                # the value stored for an external component is directly the callable building it
                cmpt_object = self.external_components[component_name]()
                cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
                cmpt_object.prefix = component_prefix
                # can't use positional arguments with external components
                cmpt_object.define_parameters(**kwargs)
            
            # there is a dynamic component
            if cmpt_object.is_dynamic():
                self.dynamic_component_present = True
                # if already init, add the component to the list and check if weaver should be executed
                if self.component_nameids_is_init:
                    # add the component
                    self.components_to_exec.append(cmpt_object)
                    self.components.append(cmpt_object)
                    self._execute_weaver()
                    # update outputs
                    for output in cmpt_object.get_dynamic_outputs():
                        output.update()
                else:
                    if self._component_is_duplicated(cmpt_object):
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " + 
                                            cmpt_object.prefix + " already exist in this pipeline!")
                    # only the nameid is recorded, and the components collected so far are dropped
                    self.component_nameids[cmpt_object.get_nameid()] = None
                    self.components_to_exec = []
                    self.components = []
            else:
                if self.component_nameids_is_init:
                    # add the component
                    self.components_to_exec.append(cmpt_object)
                    self.components.append(cmpt_object)
                elif not self.component_nameids_is_init and not self.dynamic_component_present:
                    # not initialised and no dynamic component met so far: the component is added after the duplicate check
                    if self._component_is_duplicated(cmpt_object):
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " + 
                                            cmpt_object.prefix + " already exist in this pipeline!")
                    self.components_to_exec.append(cmpt_object)
                    self.components.append(cmpt_object)
                else:
                    # not initialised and a dynamic component was met before: only the nameid is recorded
                    if self._component_is_duplicated(cmpt_object):
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " + 
                                            cmpt_object.prefix + " already exist in this pipeline!")
                    self.component_nameids[cmpt_object.get_nameid()] = None

            return cmpt_object
        else:
            raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(
                                           ", ".join(list(self.internal_components.keys()) + list(self.external_components.keys()))))
Jerome Mariette's avatar
Jerome Mariette committed
840
841
842
843
844
845
846
847
    
    def pre_process(self):
        """
        Does nothing in this class; it can be redefined by subclasses.
        """
    
    def process(self):
        """
        Run the workflow. There is no implementation at this level: subclasses have to
        provide it, otherwise a NotImplementedError naming the class is raised.
        """
        class_name = self.__class__.__name__
        raise NotImplementedError( "Workflow.process() must be implemented in " + class_name )
849
850
851

    def get_name(self):
        """
        Return the workflow name, i.e. the name of its class in lower case.
        """
        class_name = self.__class__.__name__
        return class_name.lower()
855
856
857
858
859
    
    def get_description(self):
        """
        Return the workflow description. There is no implementation at this level: subclasses
        have to provide it, otherwise a NotImplementedError naming the class is raised.
        """
        class_name = self.__class__.__name__
        raise NotImplementedError( "Workflow.get_description() must be implemented in " + class_name )
861
    
862
    def define_parameters(self, function="process"):
        """
        Define the workflow parameters. There is no implementation at this level: subclasses
        have to provide it, otherwise a NotImplementedError naming the class is raised.
        The function argument is not used here.
        """
        class_name = self.__class__.__name__
        raise NotImplementedError( "Workflow.define_parameters() must be implemented in " + class_name )
Jerome Mariette's avatar
Jerome Mariette committed
867
868
869
870
871
872
873
874
875
876
877
878
    
    def post_process(self):
        """
        Does nothing in this class; it can be redefined by subclasses.
        """
    
    def get_temporary_file(self, suffix=".txt"):
        tempfile_name = os.path.basename(tempfile.NamedTemporaryFile(suffix=suffix).name)
        return os.path.join(self.jflow_config_reader.get_tmp_directory(), tempfile_name)

    def get_component_output_directory(self, component_name, component_prefix):
        return os.path.join(self.directory, component_name + "_" + component_prefix)
    
    def get_components_nameid(self):
879
        return list(self.component_nameids.keys())
Jerome Mariette's avatar
Jerome Mariette committed
880
    
Jerome Mariette's avatar
Jerome Mariette committed
881
882
    def wf_execution_wrapper(self):
        getattr(self, self.function)()
Jerome Mariette's avatar
Jerome Mariette committed
883
    
Jerome Mariette's avatar
Jerome Mariette committed
884
885
886
887
    def run(self):
        """
        Only require for Threading
        """
888
889
        try:
            # if this is the first time the workflow run
890
            if self.__step == None:
891
                self.start_time = time.time()
Jerome Mariette's avatar
Jerome Mariette committed
892
                self.__step = 0
893
894
                self._status = self.STATUS_STARTED
                self._postprocess_status = self.STATUS_PENDING
895
                self.end_time = None
Jerome Mariette's avatar
Jerome Mariette committed
896
897
                # if some args are provided, let's fill the parameters
                self._set_parameters(self.args)
898
                self._serialize()