run.py 20.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#
# Copyright (C) 2009 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

__author__ = 'Plateforme bioinformatique Midi Pyrenees'
__copyright__ = 'Copyright (C) 2009 INRA'
__license__ = 'GNU General Public License'
__version__ = '1.0'
22
__email__ = 'support.bioinfo.genotoul@inra.fr'
23
24
25
26
27
__status__ = 'beta'

import os, uuid, logging, tempfile, re
import datetime, time
from subprocess import call
28
from subprocess import Popen, PIPE
29
from shutil import copyfile, rmtree
Penom Nom's avatar
Penom Nom committed
30
from configparser import ConfigParser, RawConfigParser
31

32
from jflow import seqio
33
34
from ng6.t3MySQLdb import t3MySQLdb
from ng6.config_reader import NG6ConfigReader
Jerome Mariette's avatar
Jerome Mariette committed
35
36
from ng6.utils import Utils
from ng6.exceptions import UnsavedRunError
37

38
39
40
41
42
43
class Run(object):
    """
    Class Run: Define a nG6 run object.

    A run owns a directory that exists under both the work and the save
    storage roots given by NG6ConfigReader, and is identified in the database
    (accessed through t3MySQLdb) by its id once saved.
    """

    # Name of the sub-directory, inside a space directory, under which every
    # run directory is created (see how __init__ builds save_dir / work_dir).
    DIRECTORIES_STRUCTURE = "run"
    # Sequencer names; presumably the accepted values for the `sequencer`
    # constructor argument -- not referenced in the visible code, TODO confirm.
    SEQUENCERS = ["454", "454 GS FLX Titanium", "454 GS FLX+", "454 GS FLX XL+", "HiSeq 2000", "MiSeq"]
46
47

    def __init__(self, name, date, species, data_nature, type, description, sequencer, space_id="default", id = None ):
        """
        Build a Run object
          @param name        : the run name
          @param date        : the run date
          @param species     : the species used in the run
          @param data_nature : the data nature used in the run
          @param type        : the type of run
          @param description : the run description
          @param sequencer   : the sequencer used
          @param space_id    : the space identifier, handed to NG6ConfigReader to resolve
                               the space directory and the retention date of a new run
          @param id          : the id of the run, if None a new run directory is allocated
        """
        self.name = name
        self.date = date
        self.species = species
        self.data_nature = data_nature
        self.type = type
        self.description = description
        self.sequencer = sequencer
        # Always define the id: the other methods test `self.id` to detect an
        # unsaved run and must get None (-> UnsavedRunError), not AttributeError.
        self.id = id
        self.__samples = None
        self.raw_files = []
        self.admin_login = None
        self.space_id = space_id
        self.retention_date = None

        # Set the temp folder to the ng6 temp folder (process-wide setting)
        ng6conf = NG6ConfigReader()
        tempfile.tempdir = ng6conf.get_tmp_directory()
        # exist_ok avoids the check-then-create race between concurrent runs
        os.makedirs(tempfile.tempdir, 0o751, exist_ok=True)

        if id is not None:
            # Existing run: its directory is stored in the database
            t3mysql = t3MySQLdb()
            self.directory = t3mysql.select_run_directory(id)
        else:
            # New run: draw random names until one is free in both storages
            space_directory = ng6conf.get_space_directory(self.space_id)
            while True:
                directory_name = uuid.uuid4().hex[:9]
                save_dir = os.path.join(ng6conf.get_save_directory(), space_directory,
                                        self.DIRECTORIES_STRUCTURE, directory_name)
                work_dir = os.path.join(ng6conf.get_work_directory(), space_directory,
                                        self.DIRECTORIES_STRUCTURE, directory_name)
                if not os.path.isdir(save_dir) and not os.path.isdir(work_dir):
                    break

            # Only the work directory is created here; sync() creates the save one
            os.makedirs(work_dir, 0o751)
            # Stored relative to the storage roots, with a leading "/"
            self.directory = "/" + os.path.join(space_directory, self.DIRECTORIES_STRUCTURE, directory_name)
            self.retention_date = ng6conf.get_retention_date(self.space_id)
100

101
    def process_raw_files(self, total_size, total_seq):
Jerome Mariette's avatar
Jerome Mariette committed
102
103
        """
        Copy, archive and compress the files list to the run. And add info to the run
104
          @param mode: can be none, gz, bz2, tar.gz and tar.bz2
Jerome Mariette's avatar
Jerome Mariette committed
105
        """
Gerald Salin's avatar
Gerald Salin committed
106
        logging.getLogger("Run").debug("process_raw_files. set_full_size()")
107
108
        self.set_nb_sequences(total_seq)
        self.set_full_size(total_size)
109
        logging.getLogger("Run").debug("process_raw_files. set_full_size("+str(total_size) + ")")
Gerald Salin's avatar
Gerald Salin committed
110
111
        logging.getLogger("Run").debug("process_raw_files. set_nb_sequences("+str(total_seq) + ")")

112

113
    def archive_files(self, files, mode, archive_name="ng6_archive.tar", delete=False):
114

115
116
117
118
119
120
        """
        Copy, archive and compress the files list to the run. Files can then be downloaded and gave back by
        the script ng6run2ergatis.
          @param files: the files to archive
          @param mode: can be none, gz, bz2, tar.gz and tar.bz2
          @param archive_name: the archive name if tar is requested
121
          @param delete: delete files
122
        """
123
124
        # create workdir if it does not exists
        if not os.path.isdir(self.__get_work_directory()) :
Penom Nom's avatar
Penom Nom committed
125
            os.makedirs(self.__get_work_directory(), 0o751)
126

127
128
129
130
131
        # First handle if only one file
        if len(files) == 1 and mode == "tar.gz":
            mode = "gz"
        elif len(files) == 1 and mode == "tar.bz2":
            mode = "bz2"
132

133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
        if mode == "none":
            for file in files:
                if os.path.isfile(file):
                    copyfile(file, os.path.join(self.__get_work_directory(), os.path.basename(file)))
                # Finaly try to delete the original file if asked to do so
                try:
                    if delete:
                        os.remove(file)
                except:
                    pass

        elif mode == "gz":
            for file in files:
                Utils.gzip(file, self.__get_work_directory(), delete)

        elif mode == "bz2":
            for file in files:
                Utils.bz2(file, self.__get_work_directory(), delete)

        elif mode == "tar.gz":
            Utils.tar_files(files, os.path.join(self.__get_work_directory(), archive_name), delete)
            Utils.gzip(os.path.join(self.__get_work_directory(), archive_name), self.__get_work_directory(), True)

        elif mode == "tar.bz2":
            Utils.tar_files(files, os.path.join(self.__get_work_directory(), archive_name), delete)
            Utils.bz2(os.path.join(self.__get_work_directory(), archive_name), self.__get_work_directory(), True)
159
160
161



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
    def set_nb_sequences(self, nb_seqs):
        """
        Sets the number of sequences to nb_seqs
          @param nb_seqs : the run number of sequences
          @raise UnsavedRunError : if the run has no id
        """
        # Identity test: None is a singleton (PEP 8)
        if self.id is None:
            raise UnsavedRunError()
        t3mysql = t3MySQLdb()
        t3mysql.update_run_info(self.id, nb_sequences=nb_seqs)

    def set_full_size(self, full_size):
        """
        Sets the number of nucleotides in all sequences
          @param full_size : the number of nucleotides
          @raise UnsavedRunError : if the run has no id
        """
        # Identity test: None is a singleton (PEP 8)
        if self.id is None:
            raise UnsavedRunError()
        t3mysql = t3MySQLdb()
        t3mysql.update_run_info(self.id, full_seq_size=full_size)

    def get_analysis(self):
        """
        Return the list of Analysis objects that belong to the run
        """
        # Imported inside the method rather than at module level --
        # presumably to avoid a circular import, TODO confirm.
        from ng6.analysis import Analysis
        run_analysis_ids = t3MySQLdb().get_run_analysis_ids(self.id)
        # Build one Analysis object per id, in the order returned
        return [Analysis.get_from_id(current_id) for current_id in run_analysis_ids]

    def sync(self):
        """
        Synchronyze data related to the Run between temporary folder and long term storage folder.
        """
        logging.getLogger("Run.sync").debug("Synchronizing run id=" + str(self.id) + " from " + self.__get_work_directory() + " to " + self.__get_save_directory())
        if self.id == None :
            raise UnsavedRunError()
        # First sync all analysis
        try :
Jerome Mariette's avatar
Jerome Mariette committed
206
207
            for my_analysis in self.get_analysis():
                my_analysis.sync()
Penom Nom's avatar
Penom Nom committed
208
        except Exception as err:
Jerome Mariette's avatar
Jerome Mariette committed
209
210
            logging.getLogger("Run.sync").error("Error when synchronizing analysis that belongs to run id=" + str(self.id))
            raise Exception(str(err))
Jerome Mariette's avatar
Jerome Mariette committed
211
        # If a sync is required
Penom Nom's avatar
Penom Nom committed
212
        if os.path.exists(self.__get_work_directory()) and len(os.listdir(self.__get_work_directory())):
Jerome Mariette's avatar
Jerome Mariette committed
213
214
            # Finally the Run itself
            try :
215
216
217
                # not created for updates
                if not  os.path.isdir(self.__get_save_directory()) :
                    # First make the save directory
218
                    os.makedirs(self.__get_save_directory(), 0o751)
Jerome Mariette's avatar
Jerome Mariette committed
219
220
221
222
223
224
225
226
227
228
                if os.path.exists(self.__get_work_directory()) and self.__get_work_directory() != self.__get_save_directory():
                    cmd = "rsync -avh --remove-sent-files "+ self.__get_work_directory() + "/ " + self.__get_save_directory()
                    logging.getLogger("Run.sync").debug(cmd)
                    retcode = call(["rsync", "-avh", "--remove-sent-files", self.__get_work_directory() + "/", self.__get_save_directory()], shell=False)
                    if retcode < 0:
                        logging.getLogger("Run.sync").error("Error when synchronizing run id=" + str(self.id) + "(retcode=" + str(retcode) + ")")
                    else:
                        try: os.rmdir(self.__get_work_directory())
                        except: pass
                        logging.getLogger("Run.sync").debug("Synchronization done for run id=" + str(self.id) + "(retcode=" + str(retcode) + ")")
Jerome Mariette's avatar
Jerome Mariette committed
229
230
                # update the storage size
                t3mysql = t3MySQLdb()
231
                t3mysql.update_run_info(self.id, storage_size=Utils.get_storage_size(self.__get_save_directory()))
Penom Nom's avatar
Penom Nom committed
232
            except Exception as err:
Jerome Mariette's avatar
Jerome Mariette committed
233
                raise Exception(str(err))
234

Jerome Mariette's avatar
Jerome Mariette committed
235
    def add_analysis(self, my_analysis):
        """
        Add the analysis to the run
          @param my_analysis   : the analysis to add
          @raise UnsavedRunError : if the run has no id
        """
        # Identity test: None is a singleton (PEP 8)
        if self.id is None:
            raise UnsavedRunError()
        # The analysis is saved first; the id it returns is then linked to the run
        analysis_id = my_analysis.save()
        t3mysql = t3MySQLdb()
        t3mysql.add_analysis_to_run(self.id, analysis_id)

    def get_directory_path(self):
        """
        Return the full path to the Run directory
        """
        return self.__get_save_directory()

Penom Nom's avatar
Penom Nom committed
254
255
256
257
258
259
    def get_samples(self):
        '''
            return a list of samples object
        '''
        if self.__samples :
            return self.__samples
260
        else :
261
            t3mysql = t3MySQLdb()
Penom Nom's avatar
Penom Nom committed
262
263
264
            results = t3mysql.select_run_samples(self.id)
            samples = []
            for res in results :
Penom Nom's avatar
Penom Nom committed
265
266
267
268
269
270
271
272
273
274
275
276
277
278
                t = dict(reads1 = [], reads2 = [])
                for k in t.keys() :
                    for f in res[k].split(',') :
                        fpath = f
                        if os.path.isdir(self.__get_work_directory()) :
                            fpath = self.__get_work_directory() + "/" + fpath
                        elif os.path.isdir(self.__get_save_directory()) :
                            fpath = self.__get_save_directory() + "/" + fpath
                        if not os.path.isfile(fpath):
                            raise Exception("The sample read file %s cannot be retrieved either in the work or save directory for the run %s " % fpath, self.id)
                        if k == 'reads1' :
                            reads1.append(fpath)
                        else :
                            reads2.append(fapth)
279
280

                samples.append(Sample( res['sample_id'], t['reads1'], reads2 = t['reads2'], name = res['name'], description = res['description'],
Penom Nom's avatar
Penom Nom committed
281
282
                                       type = res['type'], insert_size = res['insert_size'], species = res['species'], nb_sequence = res['nb_sequence'] ))
            return samples
283
284


Penom Nom's avatar
Penom Nom committed
285
286
287
288
289
290
291
    def add_samples(self, samples):
        '''
            Add a list of samples to the run
            @param samples: the list of samples object to add
        '''
        self.__samples = samples
        if self.id != None :
292
293
            # Update database
            t3mysql = t3MySQLdb()
Penom Nom's avatar
Penom Nom committed
294
295
296
297
298
299
300
            for spo in self.__samples:
                reads1 = []
                reads2 = []
                if spo.reads1:
                    reads1 = [ os.path.basename(ff) for ff in spo.reads1 ]
                if spo.reads2 :
                    reads2 = [ os.path.basename(ff) for ff in spo.reads2 ]
301
302
303

                uid = t3mysql.add_sample_to_run(self.id, sample_id = spo.sample_id, reads1 = reads1, reads2 = reads2, name=spo.name,
                                          description = spo.description, type = spo.type, insert_size = spo.insert_size,
Penom Nom's avatar
Penom Nom committed
304
305
                                          species = spo.species, nb_sequences = spo.nb_sequences)
                spo.id = uid
306

307
308
309
310
311
312
    def get_config_file(self):
        """
        Write a Run config file in the temp folder
          @return : path to the config file (the caller is responsible for removing it)
        """
        t3mysql = t3MySQLdb()

        # First select run
        [name, date, species, data_nature, type, description, sequencer] = t3mysql.select_run(self.id)
        config = RawConfigParser()
        config.add_section("Run")
        config.set("Run", "run_id", self.id)
        config.set("Run", "name", name)
        config.set("Run", "date", date)
        config.set("Run", "species", species)
        config.set("Run", "data_nature", data_nature)
        config.set("Run", "type", type)
        config.set("Run", "description", description)
        config.set("Run", "sequencer", sequencer)

        # Then run samples
        # NOTE(review): the result is used as a mapping here whereas get_samples
        # iterates the same call as a list of rows -- confirm the returned type.
        samples = t3mysql.select_run_samples(self.id)
        if len(samples.keys()) > 0 :
            config.add_section("MIDS")
        for sample in samples.keys():
            config.set("MIDS", sample, samples[sample])

        # mkstemp keeps the file it creates: no window where the name is free again
        # (NamedTemporaryFile(...).name points to a file deleted on collection).
        # NOTE(review): the file is created readable by its owner only.
        config_fd, config_path = tempfile.mkstemp(suffix=".cfg")
        with os.fdopen(config_fd, 'w') as config_handle:
            config.write(config_handle)
        return config_path

337
    def get_run_files(self, pattern = "", unzip=True):
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
        """
        Return list of file corresponding to the pattern
          @param file  : the pattern of files to return
          @param unzip : true for unzip files returned with pattern
          @return      : list of file.
        """
        list = []
        for file in os.listdir(self.__get_save_directory()) :
            # If it fits criteria
            if pattern != "":
                if unzip:
                    if re.search("^" + pattern + "\.gz$", file) or re.search("^" + pattern + "\.zip$", file):
                        tmp_dir = tempfile.mkdtemp()
                        list.append(Utils.gunzip(os.path.join(self.__get_save_directory(), file), tmp_dir))
                    elif re.search("^" + pattern + "\.bz2$", file) or re.search("^" + pattern + "\.bz$", file):
                        tmp_dir = tempfile.mkdtemp()
                        list.append(Utils.uncompress_bz2(os.path.join(self.__get_save_directory(), file), tmp_dir))
                    elif re.search("^" + pattern + "$", file):
                        list.append(os.path.join(self.__get_save_directory(), file))
                else:
                    if re.search("^" + pattern + "$", file):
                        list.append(os.path.join(self.__get_save_directory(), file))
            else:
                list.append(file)
        return list

    def save(self):
        """
        Add the run into the database and keep the id given back
          @return : the run id
        """
        database = t3MySQLdb()
        # The two 0 are presumably the initial number of sequences and full
        # size, later set by set_nb_sequences / set_full_size -- TODO confirm.
        run_fields = (self.name, self.date, self.directory, self.species, self.data_nature,
                      self.type, 0, 0, self.description, self.sequencer, self.admin_login,
                      self.retention_date)
        self.id = database.add_run(*run_fields)
        return self.id

    def __get_work_directory(self):
        """
        Build the full path of the run directory inside the work storage
          @return : the work root given by NG6ConfigReader followed by the run directory
        """
        work_root = NG6ConfigReader().get_work_directory()
        return work_root + self.directory
380

381
382
383
384
385
    def get_work_directory(self):
        """
        Return the full path of the run directory into the work dir
        """
        ng6conf = NG6ConfigReader()
386
        return self.__get_work_directory()
387

388
389
390
391
392
    def __get_save_directory(self):
        """
        Build the full path of the run directory inside the save storage
          @return : the save root given by NG6ConfigReader followed by the run directory
        """
        save_root = NG6ConfigReader().get_save_directory()
        return save_root + self.directory
394
395
396
397
398
399
400

    @staticmethod
    def get_from_id (id):
        """
        Return a run object specified by its id
          @param id : the run id
        """
401
        my_run=None
402
403
404
        try:
            t3mysql = t3MySQLdb()
            [name, date, species, data_nature, type, description, sequencer] = t3mysql.select_run(id)
405
            my_run = Run(name, date, species, data_nature, type, description, sequencer, id=id)
Penom Nom's avatar
Penom Nom committed
406
            # reload samples
407
408
        except :
            logging.getLogger("Run.get_from_id").debug("Impossible to build run id=" + str(id))
409
410
411
412
413
414
415
        try :
            my_run.__samples = None
            my_run.__samples = my_run.get_samples()
        except :
            logging.getLogger("Run.get_from_id").debug("Impossible to get samples for run id=" + str(id))
        return my_run

416
417
418
419
420
421
422
423
424
425
426
427
428
429
430

    @staticmethod
    def get_from_config (config_file):
        """
        Return a run object specified by its config file
          @param config_file : the run config file
        """
        try:
            reader = ConfigParser()
            reader.read(config_file)
            run_id = reader.get('Run', 'run_id')
            my_run = Run.get_from_id(run_id)
            return my_run
        except :
            pass
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
    
    def set_retention(self, retention):
        """
        Updates a run's retention field
          @param retention : the new retention value, in (unix) epoch timestamp format
          @raise UnsavedRunError : if the run has no id
        """
        # Identity test: None is a singleton (PEP 8)
        if self.id is None :
            raise UnsavedRunError()
        t3mysql = t3MySQLdb()
        t3mysql.set_run_retention_from_epoch_timestamp(self.id, retention)
    
    def set_directory(self, directory):
        """
        Updates a run's directory field in the database
          @param directory : the new directory path
          @raise UnsavedRunError : if the run has no id
        """
        # Identity test: None is a singleton (PEP 8)
        if self.id is None :
            raise UnsavedRunError()
        # NOTE(review): only the database is updated, self.directory keeps its value
        t3mysql = t3MySQLdb()
        t3mysql.update_fields('tx_nG6_run', str(self.id), ['directory'], [str(directory)] )
        
    def get_creation_date(self):
        """
        Returns a run's crdate value
          @return : the first item of the result given by the database
          @raise UnsavedRunError : if the run has no id
        """
        # Identity test: None is a singleton (PEP 8)
        if self.id is None :
            raise UnsavedRunError()
        t3mysql = t3MySQLdb()
        result = t3mysql.get_run_creation_date(self.id)
        # NOTE(review): raises StopIteration if the result is empty
        return next(iter(result))
    
    def get_directory(self):
        """
        Returns a run's directory, as stored in the database
          @raise UnsavedRunError : if the run has no id
        """
        # Identity test: None is a singleton (PEP 8)
        if self.id is None :
            raise UnsavedRunError()
        t3mysql = t3MySQLdb()
        return t3mysql.select_run_directory(self.id)
        
    def change_space (self, space_id ):
        """
        Move the run save directory into another space and update the database
          @param space_id : the id of the destination space
          @return         : [retcode, str_cmd], the rsync return code and command; when source
                            and destination are the same, retcode is -1 and str_cmd a message
          @raise UnsavedRunError : if the run has no id
          @raise Exception       : if the rsync command fails
        """
        # Refuse to move anything for a run that cannot be updated in the database
        if getattr(self, "id", None) is None:
            raise UnsavedRunError()

        ng6conf = NG6ConfigReader()
        save_root = ng6conf.get_save_directory()

        old_path = save_root + self.directory
        directory_name = os.path.split(old_path)[-1]
        runs_relative_dir = os.path.join(ng6conf.get_space_directory(space_id), self.DIRECTORIES_STRUCTURE)
        new_relative_path = os.path.join(runs_relative_dir, directory_name)
        new_absolute_path = os.path.join(save_root, new_relative_path)

        # We create the /run directory if it's missing (and the space directory
        # above it: os.mkdir fails when the parent does not exist)
        os.makedirs(os.path.join(save_root, runs_relative_dir), 0o755, exist_ok=True)

        str_cmd = ""
        retcode = -1
        if str(old_path) != str(new_absolute_path):
            [retcode, str_cmd] = Utils.rsync_getcmd(old_path,new_absolute_path)
            if retcode != 0 :
                raise Exception("Error while trying to rsync " + old_path + " to " + new_absolute_path + "\n" +
                                "Command : " + str_cmd + "\n" + "Error code : " + str(retcode) + "\n")
        else:
            str_cmd = "Source and destination directories are the same : " + old_path + " , ignored.\n"

        new_retention = ng6conf.get_retention_date(space_id, self.date)

        # NOTE(review): only the database is updated, self.directory and
        # self.space_id keep their previous values on this object.
        t3mysql = t3MySQLdb()
        t3mysql.update_fields('tx_nG6_run', str(self.id), ['directory','retention_date'], [str("/"+new_relative_path),new_retention] )

        return [retcode, str_cmd]