component.py 30.6 KB
Newer Older
Jerome Mariette's avatar
Jerome Mariette committed
1
#
Jerome Mariette's avatar
Jerome Mariette committed
2
# Copyright (C) 2015 INRA
Jerome Mariette's avatar
Jerome Mariette committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
19
import re
20
import sys
Jerome Mariette's avatar
Jerome Mariette committed
21
22
import inspect
import tempfile
Jerome Mariette's avatar
Jerome Mariette committed
23
import types
Jerome Mariette's avatar
Jerome Mariette committed
24

25
26
from operator import attrgetter

Jerome Mariette's avatar
Jerome Mariette committed
27
28
29
from jflow.workflows_manager import WorkflowsManager
from jflow.config_reader import JFlowConfigReader
from jflow.dataset import ArrayList
Jerome Mariette's avatar
Jerome Mariette committed
30
from jflow.utils import which, display_error_message
Jerome Mariette's avatar
Jerome Mariette committed
31
from jflow.parameter import *
32
from jflow.abstraction import MultiMap
Jerome Mariette's avatar
Jerome Mariette committed
33

34
35
36
from weaver.util import parse_string_list
from weaver.function import ShellFunction
from weaver.abstraction import Map
37
38
from weaver.function import PythonFunction

Jerome Mariette's avatar
Jerome Mariette committed
39
40

class Component(object):
Céline Noirot's avatar
Céline Noirot committed
41
    
Jerome Mariette's avatar
Jerome Mariette committed
42
    TRACE_FILE_NAME = "trace.txt"
Céline Noirot's avatar
Céline Noirot committed
43
    
44
    def __init__(self):
Jerome Mariette's avatar
Jerome Mariette committed
45
        self.prefix = "default"
Jerome Mariette's avatar
Jerome Mariette committed
46
        self.params_order = []
Jerome Mariette's avatar
Jerome Mariette committed
47
48
        self.output_directory = None
        self.config_reader = JFlowConfigReader()
49
        self.version = self.get_version()
Jerome Mariette's avatar
Jerome Mariette committed
50
51
        if isinstance(self.version, bytes):
            self.version = self.version.decode()
52
        self.batch_options = self.config_reader.get_component_batch_options(self.__class__.__name__)
53
54
55
56
57
58
59
60
61
62
63
64
65
        # in case of SGE, parse the cpu and memory parameter
        type, options, limit_submission = self.jflow_config_reader.get_batch()
        if type.lower() == "sge":
            try:
                self.__cpu = int(re.match( r'.*-pe\s+(\w+)\s+(\d+)\s?.*', self.batch_options).group(2))
                self.__memory = int(re.match( r'.*-l\s+mem=(\d+\S+)\s?.*', self.batch_options))
            except: pass 

    def get_cpu(self):
        return self.__cpu
    
    def get_memory(self):
        return self.__memory
Frédéric Escudié's avatar
Frédéric Escudié committed
66
67

    def is_dynamic(self):
68
69
70
71
72
73
74
        return len(self.get_dynamic_outputs()) != 0

    def get_dynamic_outputs(self):
        """
         @return : the list of outputs updated at the end of component execution.
        """
        dynamic_outputs = list()
75
        for attribute_value in list(self.__dict__.values()):
76
77
78
            if issubclass( attribute_value.__class__, DynamicOutput ):
                dynamic_outputs.append( attribute_value )
        return dynamic_outputs
Frédéric Escudié's avatar
Frédéric Escudié committed
79

80
    def get_output_files(self):
Philippe Bardou's avatar
Philippe Bardou committed
81
        outputs = {}
82
        for attribute_value in list(self.__dict__.values()):
83
            if ( issubclass( attribute_value.__class__, DynamicOutput ) or
Philippe Bardou's avatar
Philippe Bardou committed
84
85
                 issubclass( attribute_value.__class__, OutputFileList) ):
                for f in attribute_value:
86
                    outputs[os.path.basename(f)] = f
87
            elif issubclass( attribute_value.__class__, OutputFile):
88
                outputs[os.path.basename(attribute_value)] = attribute_value
89
        return outputs
Frédéric Escudié's avatar
Frédéric Escudié committed
90

91
92
93
    def add_input_directory(self, name, help, default=None, required=False, flag=None, 
                            group="default",  
                            display_name=None, cmd_format="", argpos=-1):
Jerome Mariette's avatar
Jerome Mariette committed
94
95
96
97
98
99
        new_param = InputDirectory(name, help, flag=flag, default=default, required=required, group=group, 
                                   display_name=display_name, cmd_format=cmd_format, argpos=argpos)
        # store where the parameter is coming from
        new_param.linkTrace_nameid = self.get_nameid()
        if issubclass( default.__class__, LinkTraceback ):
            new_param.parent_linkTrace_nameid = [default.linkTrace_nameid]
100
101
102
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
Jerome Mariette's avatar
Jerome Mariette committed
103

104
    def add_input_file(self, name, help, file_format="any", default=None, type="inputfile", 
105
                       required=False, flag=None, group="default", display_name=None, 
106
                       cmd_format="", argpos=-1):
107
        new_param = InputFile(name, help, flag=flag, file_format=file_format, default=default, 
108
109
                              type=type, required=required, group=group, display_name=display_name, 
                              cmd_format=cmd_format, argpos=argpos)
110
        # store where the parameter is coming from
Frédéric Escudié's avatar
Frédéric Escudié committed
111
        new_param.linkTrace_nameid = self.get_nameid()
Frédéric Escudié's avatar
Frédéric Escudié committed
112
        if issubclass( default.__class__, LinkTraceback ):
Frédéric Escudié's avatar
Frédéric Escudié committed
113
            new_param.parent_linkTrace_nameid = [default.linkTrace_nameid]
114
115
116
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
117
118
119
120
121
    
    def reset(self):
        for file in os.listdir(self.output_directory):
            os.remove(os.path.join(self.output_directory, file))
    
122
    def add_input_file_list(self, name, help, file_format="any", default=None, type="inputfile", 
123
                            required=False, flag=None, group="default", display_name=None,
124
                            cmd_format="", argpos=-1):
125
126
        if default == None:
            inputs = []
127
        elif issubclass(default.__class__, list):
Frédéric Escudié's avatar
Frédéric Escudié committed
128
            inputs = [IOFile(file, file_format, self.get_nameid(), None) for file in default]
129
130
        else:
            inputs = [IOFile(default, file_format, self.get_nameid(), None)]
131
        new_param = InputFileList(name, help, flag=flag, file_format=file_format, default=inputs, 
132
133
                                  type=type, required=required, group=group, display_name=display_name,
                                  cmd_format=cmd_format, argpos=argpos)
134
        # store where the parameter is coming from
Frédéric Escudié's avatar
Frédéric Escudié committed
135
        new_param.linkTrace_nameid = self.get_nameid()
136
137
        if issubclass( default.__class__, list ):
            for idx, val in enumerate(default):
Frédéric Escudié's avatar
Frédéric Escudié committed
138
                if issubclass( val.__class__, LinkTraceback ):
Frédéric Escudié's avatar
Frédéric Escudié committed
139
                    new_param[idx].parent_linkTrace_nameid = [val.linkTrace_nameid]
Frédéric Escudié's avatar
Frédéric Escudié committed
140
141
                    if not val.linkTrace_nameid in new_param.parent_linkTrace_nameid:
                        new_param.parent_linkTrace_nameid.append( val.linkTrace_nameid )
Frédéric Escudié's avatar
Frédéric Escudié committed
142
143
        else:
            if issubclass( default.__class__, LinkTraceback ):
Frédéric Escudié's avatar
Frédéric Escudié committed
144
145
146
                new_param[0].parent_linkTrace_nameid = [default.linkTrace_nameid]
                if not default.linkTrace_nameid in new_param.parent_linkTrace_nameid:
                    new_param.parent_linkTrace_nameid.append( default.linkTrace_nameid )
147
148
149
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
Frédéric Escudié's avatar
Frédéric Escudié committed
150

151
    def add_parameter(self, name, help, default=None, type=str, choices=None, 
152
                      required=False, flag=None, group="default", display_name=None,   
153
                      cmd_format="", argpos=-1):
Frédéric Escudié's avatar
Frédéric Escudié committed
154
        new_param = ParameterFactory.factory(name, help, flag=flag, default=default, type=type, choices=choices, 
155
156
                              required=required, group=group, display_name=display_name,  
                              cmd_format=cmd_format, argpos=argpos)
157
158
159
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
160

161
    def add_parameter_list(self, name, help, default=None, type=str, choices=None, 
162
                           required=False, flag=None, group="default", display_name=None,
163
                           cmd_format="", argpos=-1):
164
165
        if default == None: default = []
        new_param = ParameterList(name, help, flag=flag, default=default, type=type, choices=choices, 
166
167
                                  required=required, group=group, display_name=display_name,
                                  cmd_format=cmd_format, argpos=argpos)
168
169
170
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
Frédéric Escudié's avatar
Frédéric Escudié committed
171

172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
    def add_input_object(self, name, help, default=None, required=False):
        new_param = InputObject(name, help, default=default, required=required)
        # store where the parameter is coming from
        new_param.linkTrace_nameid = self.get_nameid()
        
        if issubclass( default.__class__, list ):
            for idx, val in enumerate(default):
                if hasattr( val, "linkTrace_nameid" ):
                    if not val.linkTrace_nameid in new_param.parent_linkTrace_nameid:
                        new_param.parent_linkTrace_nameid.append(val.linkTrace_nameid)
                    new_param.default[idx].parent_linkTrace_nameid = [val.linkTrace_nameid]
                    new_param.default[idx].linkTrace_nameid = self.get_nameid()
        elif hasattr( default, "linkTrace_nameid" ):
            new_param.parent_linkTrace_nameid = [default.linkTrace_nameid]
            new_param.default.parent_linkTrace_nameid = [default.linkTrace_nameid]
            new_param.default.linkTrace_nameid = self.get_nameid()
        
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)        
    
    def add_input_object_list(self, name, help, default=None, required=False):
        if default == None: default = []
        new_param = InputObjectList(name, help, default=default, required=required)
        
        # store where the parameter is coming from
        new_param.linkTrace_nameid = self.get_nameid()
        for idx, val in enumerate(new_param.default):
            if hasattr( val, "linkTrace_nameid" ):
                if not val.linkTrace_nameid in new_param.parent_linkTrace_nameid:
                    new_param.parent_linkTrace_nameid.append(val.linkTrace_nameid)
                new_param.default[idx].parent_linkTrace_nameid = [val.linkTrace_nameid]
                new_param.default[idx].linkTrace_nameid = self.get_nameid()

        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
    
    def add_output_object(self, name, help, required=False):
        new_param = OutputObject(name, help, required=required)
        # store where the parameter is coming from
        new_param.linkTrace_nameid = self.get_nameid()
        new_param.default.linkTrace_nameid = self.get_nameid()
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
    
    def add_output_object_list(self, name, help, nb_items=0, required=False):
        new_param = OutputObjectList(name, help, nb_items=nb_items, required=required)
        # store where the parameter is coming from
        new_param.linkTrace_nameid = self.get_nameid()
        for idx, val in enumerate(new_param.default):
            new_param.default[idx].linkTrace_nameid = self.get_nameid()
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
        
229
    def add_output_file(self, name, help, file_format="any", filename=None, group="default", display_name=None,
230
                         cmd_format="", argpos=-1):
231
        filename = os.path.basename(filename)
Frédéric Escudié's avatar
Frédéric Escudié committed
232
        new_param = OutputFile(name, help, default=os.path.join(self.output_directory, filename),
233
234
                               file_format=file_format, group=group, display_name=display_name,
                               cmd_format=cmd_format, argpos=argpos)
Frédéric Escudié's avatar
Frédéric Escudié committed
235
        # store where the parameter is coming from
Frédéric Escudié's avatar
Frédéric Escudié committed
236
        new_param.linkTrace_nameid = self.get_nameid()
237
238
239
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
Frédéric Escudié's avatar
Frédéric Escudié committed
240

241
    def add_output_file_list(self, name, help, file_format="any", pattern='{basename_woext}.out', 
242
                             items=None, group="default", display_name=None, cmd_format="", argpos=-1):
Frédéric Escudié's avatar
Frédéric Escudié committed
243
        default = [IOFile(file, file_format, self.get_nameid(), None) for file in self.get_outputs(pattern, items)]
244
245
        new_param = OutputFileList(name, help, default=default, file_format=file_format, group=group, display_name=display_name,
                                   cmd_format=cmd_format, argpos=argpos)
Frédéric Escudié's avatar
Frédéric Escudié committed
246
        # store where the parameter is coming from
Frédéric Escudié's avatar
Frédéric Escudié committed
247
        new_param.linkTrace_nameid = self.get_nameid()
248
249
250
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
Frédéric Escudié's avatar
Frédéric Escudié committed
251

Frédéric Escudié's avatar
Frédéric Escudié committed
252
    def add_output_file_endswith(self, name, help, pattern, file_format="any", behaviour="include",
253
                               group="default", display_name=None, cmd_format="", argpos=-1):
254
        new_param = OutputFilesEndsWith(name, help, self.output_directory, pattern, include=(behaviour == "include"), 
255
256
                                       file_format=file_format, group=group, display_name=display_name,
                                       cmd_format=cmd_format, argpos=argpos)
Frédéric Escudié's avatar
Frédéric Escudié committed
257
        # store where the parameter is coming from
Frédéric Escudié's avatar
Frédéric Escudié committed
258
        new_param.linkTrace_nameid = self.get_nameid()
259
260
261
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
Frédéric Escudié's avatar
Frédéric Escudié committed
262

Frédéric Escudié's avatar
Frédéric Escudié committed
263
    def add_output_file_pattern(self, name, help, pattern, file_format="any", behaviour="include",
264
                               group="default", display_name=None, cmd_format="", argpos=-1):
265
        new_param = OutputFilesPattern(name, help, self.output_directory, pattern, include=(behaviour == "exclude"), 
266
267
                                       file_format=file_format, group=group, display_name=display_name,
                                       cmd_format=cmd_format, argpos=argpos)
Frédéric Escudié's avatar
Frédéric Escudié committed
268
        # store where the parameter is coming from
Frédéric Escudié's avatar
Frédéric Escudié committed
269
        new_param.linkTrace_nameid = self.get_nameid()
270
271
272
        # add it to the class itself
        self.params_order.append(name)
        self.__setattr__(name, new_param)
273
    
Jerome Mariette's avatar
Jerome Mariette committed
274
    def _longestCommonSubstr(self, data, clean_end=True):
Jerome Mariette's avatar
Jerome Mariette committed
275
276
277
278
279
280
281
282
        substr = ''
        if len(data) > 1 and len(data[0]) > 0:
            for i in range(len(data[0])):
                for j in range(len(data[0])-i+1):
                    if j > len(substr) and all(data[0][i:i+j] in x for x in data):
                        substr = data[0][i:i+j]
        else:
            substr = data[0]
Jerome Mariette's avatar
Jerome Mariette committed
283
284
285
        if clean_end:
            while substr.endswith("_") or substr.endswith("-") or substr.endswith("."):
                substr = substr[:-1]
Jerome Mariette's avatar
Jerome Mariette committed
286
        return substr
Frédéric Escudié's avatar
Frédéric Escudié committed
287

Jerome Mariette's avatar
Jerome Mariette committed
288
289
290
291
    def get_outputs(self, output_list=None, input_list=None):
        """
        If `output_list` is a string template, then it may have the following
        fields:
Frédéric Escudié's avatar
Frédéric Escudié committed
292

Jerome Mariette's avatar
Jerome Mariette committed
293
294
295
296
        - `{fullpath}`, `{FULL}`         -- Full input file path.
        - `{basename}`, `{BASE}`         -- Base input file name.
        - `{fullpath_woext}`, `{FULLWE}` -- Full input file path without extension
        - `{basename_woext}`, `{BASEWE}` -- Base input file name without extension
Frédéric Escudié's avatar
Frédéric Escudié committed
297
        """
Jerome Mariette's avatar
Jerome Mariette committed
298
299
        if output_list is None:
            return []
Frédéric Escudié's avatar
Frédéric Escudié committed
300

Jerome Mariette's avatar
Jerome Mariette committed
301
        if isinstance(output_list, str):
Jerome Mariette's avatar
Jerome Mariette committed
302
            ilist = []
Jerome Mariette's avatar
Jerome Mariette committed
303
            if not input_list or not '{' in str(output_list):
304
305
306
307
                if input_list is not None and len(input_list) == 0:
                    return []
                else:
                    return [output_list]
Jerome Mariette's avatar
Jerome Mariette committed
308
309
310
311
312
313
314
315
316
317
            # if multiple list of inputs is used
            elif isinstance(input_list[0], list):
                for i, val in enumerate(input_list[0]):
                    iter_values = []
                    for j, ingroup in enumerate(input_list):
                        iter_values.append(os.path.basename(input_list[j][i]))
                    ilist.append(self._longestCommonSubstr(iter_values))
            else:
                ilist = parse_string_list(input_list)
                            
Jerome Mariette's avatar
Jerome Mariette committed
318
319
320
321
322
323
324
325
326
327
328
            return [os.path.join(self.output_directory, str(output_list).format(
                        fullpath       = input,
                        FULL           = input,
                        i              = '{0:05X}'.format(i),
                        NUMBER         = '{0:05X}'.format(i),
                        fullpath_woext = os.path.splitext(input)[0],
                        FULL_WOEXT     = os.path.splitext(input)[0],
                        basename       = os.path.basename(input),
                        BASE           = os.path.basename(input),
                        basename_woext = os.path.splitext(os.path.basename(input))[0] if os.path.splitext(os.path.basename(input))[1] != ".gz" else os.path.splitext(os.path.splitext(os.path.basename(input))[0])[0],
                        BASE_WOEXT     = os.path.splitext(os.path.basename(input))[0] if os.path.splitext(os.path.basename(input))[1] != ".gz" else os.path.splitext(os.path.splitext(os.path.basename(input))[0])[0]))
Jerome Mariette's avatar
Jerome Mariette committed
329
                    for i, input in enumerate(ilist)]
Jerome Mariette's avatar
Jerome Mariette committed
330
331
332
    
    def execute(self):
        # first create the output directory
Jerome Mariette's avatar
Jerome Mariette committed
333
        if not os.path.isdir(self.output_directory):
334
            os.makedirs(self.output_directory, 0o751)
Jerome Mariette's avatar
Jerome Mariette committed
335
        # then run the component
Jerome Mariette's avatar
Jerome Mariette committed
336
        self.process()
Jerome Mariette's avatar
Jerome Mariette committed
337
    
Jerome Mariette's avatar
Jerome Mariette committed
338
    def process(self):
Jerome Mariette's avatar
Jerome Mariette committed
339
        """ 
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
        Run the component, can be implemented by subclasses for a 
        more complex process 
        """
        # get all parameters
        parameters = []
        inputs = []
        outputs = []
        for param_name in self.params_order:
            param = self.__getattribute__(param_name)
            if isinstance(param, AbstractParameter) :
                if isinstance(param, AbstractInputFile):
                    inputs.append(param)
                elif isinstance(param, AbstractOutputFile):
                    outputs.append(param)
                else :
                    parameters.append(param)
        
        # sort parameters using argpos
        parameters = sorted(parameters, key=attrgetter('argpos'))
        inputs = sorted(inputs, key=attrgetter('argpos'))
        outputs = sorted(outputs, key=attrgetter('argpos'))
        filteredparams = []
362
        commandline = self.get_exec_path(self.get_command())
363
364
365
366
367
368

        for p in parameters :
            if isinstance(p, BoolParameter) :
                if p:
                    commandline += " %s " % p.cmd_format
            else :
369
370
                if p.default :
                    commandline += " %s %s " % (p.cmd_format, p.default)
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
        
        abstraction = self.get_abstraction()
        
        if abstraction == None:
            cpt = 1
            for file in inputs + outputs :
                if isinstance(file, InputFile) or isinstance(file, OutputFile):
                    commandline +=  ' %s $%s ' % (file.cmd_format, cpt)
                    cpt+=1
                # input file list or output file list / pattern / ends with
                else :
                    for e in file :
                        commandline += ' %s $%s ' % (file.cmd_format, cpt)
                        cpt+=1
            function = ShellFunction( commandline,  cmd_format='{EXE} {IN} {OUT}')
            function(inputs=inputs, outputs=outputs) 
        # weaver map abstraction
        elif abstraction == 'map' :
            if not(len(inputs) == len(outputs) == 1) :
                display_error_message("You can only have one type of input and one type of output for the map abstraction")

            for file in inputs :
                commandline += ' %s $1 ' % file.cmd_format
                if isinstance(file, ParameterList) :
                    inputs = file
            
            for file in outputs :
                commandline += ' %s $2 ' % file.cmd_format
                if isinstance(file, ParameterList) :
                    outputs = file 
            
            function = ShellFunction( commandline,  cmd_format='{EXE} {IN} {OUT}')
            exe = Map(function, inputs=inputs, outputs=outputs)
        
        # jflow multimap
        elif abstraction == 'multimap' :
            cpt = 1
            for file in inputs + outputs:
                if not(isinstance(file, ParameterList)):
                    display_error_message("Multimap abstraction can be used only with ParameterList")
                commandline += ' %s $%s ' % (file.cmd_format, cpt)
                cpt+=1
            
            function  = ShellFunction( commandline,  cmd_format='{EXE} {IN} {OUT}')
            exe = MultiMap(function, inputs=inputs, outputs=outputs)
        # anything other than that will be considered errored
        else :
            raise Exception('Unsupported abstraction %s ' % abstraction)
        
    def get_command(self):
        """
            get a path to an executable. Has to be implemented by subclasses 
            if the process has not been implemented  
Jerome Mariette's avatar
Jerome Mariette committed
424
        """
425
        raise NotImplementedError("Either the Component.get_command() function or the Component.process() function has to be implemented!")
Jerome Mariette's avatar
Jerome Mariette committed
426
    
427
428
429
430
431
    def get_abstraction(self):
        """
            get the abstraction. Has to be implemented by subclasses 
            if the process has not been implemented  
        """ 
432
        raise NotImplementedError("Either the Component.get_abstraction() function or the Component.process() function has to be implemented!")
433

434
435
436
437
    def get_version(self):
        """ 
        Return the tool version, has to be implemented by subclasses
        """
Jerome Mariette's avatar
Jerome Mariette committed
438
        return None
439
    
Jerome Mariette's avatar
Jerome Mariette committed
440
441
442
    def get_temporary_file(self, suffix=".txt"):
        # first check if tmp directory exists
        if not os.path.isdir(self.config_reader.get_tmp_directory()):
443
            os.makedirs(self.config_reader.get_tmp_directory(), 0o751)
Jerome Mariette's avatar
Jerome Mariette committed
444
445
446
447
448
449
450
451
        tempfile_name = os.path.basename(tempfile.NamedTemporaryFile(suffix=suffix).name)
        return os.path.join(self.config_reader.get_tmp_directory(), tempfile_name)
    
    def define_parameters(self, *args):
        """ 
        Define the component parameters, has to be implemented by subclasses
        """
        raise NotImplementedError
452
453
454

    def get_resource(self, resource):
        return self.config_reader.get_resource(resource)
Jerome Mariette's avatar
Jerome Mariette committed
455
456
    
    def get_exec_path(self, software):
457
        exec_path = self.config_reader.get_exec(software)
458
459
460
        if exec_path is None and os.path.isfile(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "../../bin", software)):
            exec_path = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "../../bin", software)
        elif exec_path is None and os.path.isfile(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "../bin", software)):
461
462
463
            exec_path = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "../bin", software)
        elif exec_path is None and os.path.isfile(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "bin", software)):
            exec_path = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "bin", software)
464
        elif exec_path is None and which(software) == None:
465
466
            logging.getLogger("jflow").exception("'" + software + "' path connot be retrieved either in the PATH and in the application.properties file!")
            raise Exception("'" + software + "' path connot be retrieved either in the PATH and in the application.properties file!")
467
468
469
        elif exec_path is None and which(software) != None: 
            exec_path = software
        elif exec_path != None and not os.path.isfile(exec_path):
470
471
            logging.getLogger("jflow").exception("'" + exec_path + "' set for '" + software + "' does not exists, please provide a valid path!")
            raise Exception("'" + exec_path + "' set for '" + software + "' does not exists, please provide a valid path!")
472
        return exec_path
Jerome Mariette's avatar
Jerome Mariette committed
473
    
Jerome Mariette's avatar
Jerome Mariette committed
474
475
476
    def get_nameid(self):
        return self.__class__.__name__ + "." + self.prefix
    
Jerome Mariette's avatar
Jerome Mariette committed
477
478
    def __eq__(self, other):
        return self.__class__ == other.__class__ and self.prefix == other.prefix
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
    
    def __getattribute__(self, attr):
        # an IOobject is a specific object defined by the presence of the dump_path attribute
        if hasattr(object.__getattribute__(self, attr), "default"):
            if isinstance (object.__getattribute__(self, attr).default, OObject) and os.path.exists(object.__getattribute__(self, attr).default.dump_path):
                object.__getattribute__(self, attr).default=object.__getattribute__(self, attr).default.load()
            if hasattr(object.__getattribute__(self, attr).default, "is_ioobject"):
                return object.__getattribute__(self, attr).default
            elif isinstance(object.__getattribute__(self, attr).default, list) and len(object.__getattribute__(self, attr).default)>0:
                if isinstance(object.__getattribute__(self, attr).default[0], OObject):
                    for i, val in enumerate (object.__getattribute__(self, attr).default):
                       if os.path.exists(val.dump_path):
                           object.__getattribute__(self, attr).default[i]=val.load()
                if hasattr(object.__getattribute__(self, attr).default[0], "is_ioobject"):
                    return object.__getattribute__(self, attr).default
            
                
        return object.__getattribute__(self, attr)
    
    def __generate_iolist (self, ioparameter, map):
        new_ios = []
        includes = []
        if map :
            if len (ioparameter) >0 :
                if isinstance(ioparameter[0], list):
                    for cin in ioparameter:
                        if hasattr(cin[0], "is_ioobject"):
                            new_ios.append([i.dump_path for i in cin])
                        else:
                            new_ios.append(cin)
                else:
                    for cin in ioparameter:
                        if hasattr(cin, "is_ioobject"):
                            new_ios.append(cin.dump_path)
                        else:
                            new_ios.append(cin)
        else :
            new_ios = []
            if hasattr(ioparameter, "is_ioobject"):
                includes.extend(ioparameter.includes)
                new_ios.append(ioparameter.dump_path)
            elif isinstance(ioparameter, list):
                for cin in ioparameter:
                    if hasattr(cin, "is_ioobject"):
                        includes.extend(cin.includes)
                        new_ios.append(cin.dump_path)
                    else:
                        new_ios.append(cin)
            else:
                new_ios = ioparameter
        return new_ios,includes
    
    def add_python_execution(self, function, inputs=[], outputs=[], arguments=[], includes=[], 
                             add_path=None, collect=False, local=False, map=False, cmd_format=""):
               
        if map:
            if arguments != [] :
                logging.getLogger("jflow").exception("add_python_execution: '" + function.__name__ + "' arguments parameter not allowed with map!")
                raise Exception("add_python_execution: '" + function.__name__ + "' arguments parameter not allowed with map!" )
            if not issubclass(inputs.__class__, list) or not issubclass(outputs.__class__, list):
                logging.getLogger("jflow").exception("add_python_execution: '" + function.__name__ + "' map requires a list as inputs and output!")
                raise Exception("add_python_execution: '" + function.__name__ + "' map requires a list as inputs and output!")
        #Command format to build
        if cmd_format == "" :
            cmd_format = "{EXE} "
            if len(arguments)>0:
                cmd_format += " {ARG}"
            if (isinstance(inputs, list) and len(inputs)>0) or (inputs is not None and inputs != []):
                cmd_format += " {IN}"
            if (isinstance(outputs, list) and len(outputs)>0) or (outputs is not None and outputs != []):
                cmd_format += " {OUT}"
        py_function = PythonFunction(function, add_path=add_path, cmd_format=cmd_format)

        
        new_inputs,includes_in = self.__generate_iolist(inputs, map)
        new_outputs,includes_out = self.__generate_iolist(outputs, map)
        if not isinstance(includes, list):
            includes=[includes]
        if map:
            MultiMap(py_function, inputs=new_inputs, outputs=new_outputs, includes=includes+includes_in, collect=collect, local=local)
        else:
            py_function(inputs=new_inputs, outputs=new_outputs, arguments=arguments, includes=includes+includes_in)
        
Jerome Mariette's avatar
Jerome Mariette committed
562
        self.__write_trace(function.__name__, inputs, outputs, arguments, cmd_format, map, "PythonFunction")
Céline Noirot's avatar
Céline Noirot committed
563
        
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
    def add_shell_execution(self, source, inputs=[], outputs=[], arguments=[], includes=[], 
                            cmd_format=None, map=False, shell=None, collect=False, local=False):
        shell_function  = ShellFunction( source, shell=shell, cmd_format=cmd_format )
       
        # if abstraction is map or multimap
        if map :
            # if input and output are list or filelist
            if issubclass(inputs.__class__, list) and issubclass(outputs.__class__, list) :
                # arguments cannot be set with 
                if arguments != [] :
                    logging.getLogger("jflow").exception("add_shell_execution: '" + source + "' arguments parameter not allowed with map")
                    raise Exception("add_shell_execution: '" + source + "' arguments parameter not allowed with map" )
                MultiMap(shell_function,inputs=inputs, outputs=outputs, includes=includes, collect=collect, local=local)
            else :
                logging.getLogger("jflow").exception("add_shell_execution: '" + source + "' map requires a list as inputs and output")
                raise Exception("add_shell_execution: '" + source + "'  map requires a list as inputs and output")
            
        else :
            shell_function( inputs=inputs, outputs=outputs, arguments=arguments, includes=includes )   
Jerome Mariette's avatar
Jerome Mariette committed
583
        self.__write_trace(source, inputs, outputs, arguments, cmd_format, map, "Shell")
Céline Noirot's avatar
Céline Noirot committed
584
        
Jerome Mariette's avatar
Jerome Mariette committed
585
586
    def __write_trace(self, name,  inputs, outputs, arguments, cmd_format, map, type):
        trace_fh=open(os.path.join(self.output_directory, Component.TRACE_FILE_NAME), "a")
Céline Noirot's avatar
Céline Noirot committed
587
588
589
590
591
592
        trace_fh.write("###\n###Execution trace of "+type+": "+name+"\n")
        if map :
            trace_fh.write("MODE MAP\n")
        
        if cmd_format != "" and type == "Shell":
            trace_fh.write("COMMAND: " + str(cmd_format) + "\n")
Jerome Mariette's avatar
Jerome Mariette committed
593
594
595
        self.__write_element(trace_fh,"ARGCUMENTS",arguments)
        self.__write_element(trace_fh,"INPUTS",inputs)
        self.__write_element(trace_fh,"OUTPUTS",outputs)
Céline Noirot's avatar
Céline Noirot committed
596
597
        trace_fh.close()
        
Jerome Mariette's avatar
Jerome Mariette committed
598
    def __write_element(self,fh, title, element):
Céline Noirot's avatar
Céline Noirot committed
599
600
601
602
603
604
605
606
607
608
609
610
611
        to_write=''
        if isinstance(element, list):
            if len (element)> 0 :
                if isinstance(element[0], list):
                    for i in range(len(element)) : 
                        to_write+="List"+str(i+1)+": \n"
                        to_write+="\n".join(element[i])+"\n"
                else : 
                    to_write+="\n".join(element)+"\n"
        else :
            to_write+= element+"\n"
        if to_write != "" :
            fh.write(title+" :\n")
Jerome Mariette's avatar
Jerome Mariette committed
612
613
            fh.write(to_write)