RAPPEL : Opération de maintenance > ForgeMIA indisponible le 20 Janvier entre 7h et 12h

ng6workflow.py 38.9 KB
Newer Older
Jerome Mariette's avatar
Jerome Mariette committed
1
#
Penom Nom's avatar
Penom Nom committed
2
# Copyright (C) 2015 INRA
3
#
Jerome Mariette's avatar
Jerome Mariette committed
4
5
6
7
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
8
#
Jerome Mariette's avatar
Jerome Mariette committed
9
10
11
12
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
13
#
Jerome Mariette's avatar
Jerome Mariette committed
14
15
16
17
18
19
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import inspect
import os
20
import re
Jerome Mariette's avatar
Jerome Mariette committed
21
import sys
Jerome Mariette's avatar
Jerome Mariette committed
22
import pickle
Jerome Mariette's avatar
Jerome Mariette committed
23
import datetime
24
import argparse
25
import xml.etree.ElementTree as ET
26
import logging
Penom Nom's avatar
Penom Nom committed
27
from jflow.workflow import Workflow
28
from jflow.utils import display_error_message
Penom Nom's avatar
Penom Nom committed
29
from jflow.parameter import *
Jerome Mariette's avatar
Jerome Mariette committed
30

31
from ng6.t3MySQLdb import t3MySQLdb
Jerome Mariette's avatar
Jerome Mariette committed
32
33
from ng6.project import Project
from ng6.run import Run
34
35
from ng6.sample import Sample
from ng6.utils import Utils
ckuchly's avatar
ckuchly committed
36
from ng6.config_reader import NG6ConfigReader
Jerome Mariette's avatar
Jerome Mariette committed
37

38
39
class BasicNG6Workflow (Workflow):

Penom Nom's avatar
Penom Nom committed
40
41
    def __init__(self, args={}, id=None, function= "process"):
        Workflow.__init__(self, args, id, function)
Penom Nom's avatar
Penom Nom committed
42
        self.add_parameter("admin_login", "Who is the project administrator", required = True, type = 'ng6userlogin', display_name="Admin login")
Celine Noirot's avatar
Celine Noirot committed
43

44
    def add_component(self, component_name, args=[], kwargs={}, component_prefix="default", parent=None, addto="run"):
Jerome Mariette's avatar
Jerome Mariette committed
45
        # first build and check if this component is OK
Penom Nom's avatar
Penom Nom committed
46
        if component_name in self.internal_components or component_name in self.external_components:
47

Penom Nom's avatar
Penom Nom committed
48
            if component_name in self.internal_components :
Penom Nom's avatar
Penom Nom committed
49
                my_pckge = __import__(self.internal_components[component_name], globals(), locals(), [component_name])
Penom Nom's avatar
Penom Nom committed
50
51
52
                # build the object and define required field
                cmpt_object = getattr(my_pckge, component_name)()
                cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
53
                cmpt_object.set_prefix(component_prefix)
Penom Nom's avatar
Penom Nom committed
54
55
56
57
58
59
                if kwargs: cmpt_object.define_parameters(**kwargs)
                else: cmpt_object.define_parameters(*args)
            # external components
            else :
                cmpt_object = self.external_components[component_name]()
                cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
60
                cmpt_object.set_prefix(component_prefix)
Penom Nom's avatar
Penom Nom committed
61
62
                # can't use positional arguments with external components
                cmpt_object.define_parameters(**kwargs)
63
64
65
66
67
68
            # if the built object is an analysis
            for derived_class in cmpt_object.__class__.__bases__:
                if derived_class.__name__ == "Analysis":
                    # add parent and project/run information
                    cmpt_object.parent = parent
                    if addto == "project": cmpt_object.project = self.project
Celine Noirot's avatar
Celine Noirot committed
69
                    elif addto == "run": cmpt_object.run = self.runobj
70
71
72
73
                    #We replace the default space_id
                    if cmpt_object.space_id == "default" and cmpt_object.space_id != cmpt_object.run.space_id:
                         cmpt_object.space_id = cmpt_object.run.space_id
                    
Celine Noirot's avatar
Celine Noirot committed
74
                    # link analysis with ots create user
Penom Nom's avatar
Penom Nom committed
75
                    cmpt_object.admin_login = self.admin_login
76
            # there is a dynamic component
77
            if cmpt_object.is_dynamic():
78
79
80
81
                self.dynamic_component_present = True
                # if already init, add the component to the list and check if weaver should be executed
                if self.component_nameids_is_init:
                    # add the component
Penom Nom's avatar
Penom Nom committed
82
                    self.components_to_exec.append(cmpt_object)
83
84
85
86
87
88
89
                    self.components.append(cmpt_object)
                    self._execute_weaver()
                    # update outputs
                    for output in cmpt_object.get_dynamic_outputs():
                        output.update()
                else:
                    if self._component_is_duplicated(cmpt_object):
90
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
91
92
                                            cmpt_object.prefix + " already exist in this pipeline!")
                    self.component_nameids[cmpt_object.get_nameid()] = None
Penom Nom's avatar
Penom Nom committed
93
                    self.components_to_exec = []
94
95
96
97
                    self.components = []
            else:
                if self.component_nameids_is_init:
                    # add the component
Penom Nom's avatar
Penom Nom committed
98
                    self.components_to_exec.append(cmpt_object)
99
100
101
                    self.components.append(cmpt_object)
                elif not self.component_nameids_is_init and not self.dynamic_component_present:
                    if self._component_is_duplicated(cmpt_object):
102
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
103
                                            cmpt_object.prefix + " already exist in this pipeline!")
Penom Nom's avatar
Penom Nom committed
104
                    self.components_to_exec.append(cmpt_object)
105
106
107
                    self.components.append(cmpt_object)
                else:
                    if self._component_is_duplicated(cmpt_object):
108
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
109
110
                                            cmpt_object.prefix + " already exist in this pipeline!")
                    self.component_nameids[cmpt_object.get_nameid()] = None
ckuchly's avatar
ckuchly committed
111

112
            return cmpt_object
Jerome Mariette's avatar
Jerome Mariette committed
113
        else:
Penom Nom's avatar
Penom Nom committed
114
            raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(
Penom Nom's avatar
Penom Nom committed
115
                                           ", ".join(list(self.internal_components.keys()) + list(self.external_components.keys()))))
116

117
118
119
120
121
    def pre_process(self):
        t3mysql = t3MySQLdb()
        infos = t3mysql.get_user_info(self.admin_login)
        if infos['email'] :
            self.set_to_address(infos['email'])
122

Penom Nom's avatar
Penom Nom committed
123

Penom Nom's avatar
Penom Nom committed
124
class DownloadWorkflow(Workflow):
125
126
127
128
    """
        Base class for download workflows classes
    """
    def __init__(self, args={}, id=None, function= "process"):
Penom Nom's avatar
Penom Nom committed
129
130
        Workflow.__init__(self, args, id, function)
        self.add_parameter("admin_login", "Who is the project administrator", type = 'ng6userlogin', display_name="Admin login")
131
132
133
        self.add_parameter_list('data_id', 'Ids of a run from which rawdata will be retrieved', type = 'existingrun')
        self.add_parameter_list('run_id', 'Ids of run from which all data will be retrieved', type = 'existingrun')
        self.add_parameter_list('analysis_id', 'Ids of analysis to retrieve', type = 'existinganalysis')
Celine Noirot's avatar
Celine Noirot committed
134

135
class NG6Workflow (BasicNG6Workflow):
136

Penom Nom's avatar
Penom Nom committed
137
138
139
    def __init__(self, args={}, id=None, function= "process"):
        BasicNG6Workflow.__init__(self, args, id, function)
        self._add_run_parameters()
140
        self.__add_sample_parameters__()
141

Penom Nom's avatar
Penom Nom committed
142
143
144
        self.samples = []
        self.reads1 = []
        self.reads2 = []
ckuchly's avatar
ckuchly committed
145
        self.index = []
Penom Nom's avatar
Penom Nom committed
146
        self.samples_names = []
147
148
        self.reads1_indexes = []
        self.reads2_indexes = []
149

Penom Nom's avatar
Penom Nom committed
150
    def _add_run_parameters(self):
Penom Nom's avatar
Penom Nom committed
151
        self.add_parameter("project_name", "The project name the run belongs to", required = True, type = 'existingproject', group="Run information")
Penom Nom's avatar
Penom Nom committed
152
153
        self.add_parameter("run_name", "Give a name to your run", flag = "--name", required = True, display_name="Name", group="Run information")
        self.add_parameter("run_description", "Give a description to your run", flag = "--description", required = True, display_name="Description", group="Run information")
Penom Nom's avatar
Penom Nom committed
154
        self.add_parameter("run_date", "When were the data produced", flag = "--date", required = True, type = 'date', display_name="Date", group="Run information")
Penom Nom's avatar
Penom Nom committed
155
156
157
158
        self.add_parameter("data_nature", "Are Sequences cDNA, genomique, RNA, ...", required = True, display_name="Data nature", group="Run information")
        self.add_parameter("sequencer", "Which sequencer produced the data", required = True, display_name="Sequencer", group="Run information")
        self.add_parameter("species", "Which species has been sequenced", required = True, display_name="Species", group="Run information")
        self.add_parameter("run_type", "What type of data is it (1 lane, 1 region)", flag = "--type", required = True, display_name="Type", group="Run information")
159
        self.add_parameter("align_subset_reads", "Align only on subset reads", type=bool, default = False)
160

161
    def __add_sample_parameters__(self):
Celine Noirot's avatar
Celine Noirot committed
162
        self.add_multiple_parameter_list("input_sample", "Definition of a sample", flag="--sample",  group="Sample description") # required = True, # TO CHECK casavaWorkflow required not if casava dir
Penom Nom's avatar
Penom Nom committed
163
164
        self.add_parameter("sample_id", "The uniq identifier of the sample", type="nospacestr", add_to = "input_sample")
        self.add_parameter("sample_name", "A descriptive name for the sample", type="nospacestr", add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
165
        self.add_parameter("sample_description", "A brief description of the sample", add_to = "input_sample")
166
        self.add_parameter("type", "Read orientation and type", choices = Sample.AVAILABLE_TYPES, default='se', add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
167
168
        self.add_parameter("insert_size", "Insert size for paired end reads", type = int, add_to = "input_sample")
        self.add_parameter("species", "Species related to this sample", add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
169
        self.add_parameter_list("metadata", "Add metadata to the sample", type='samplemetadata'  ,add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
170
171
        self.add_input_file_list("read1", "Read 1 data file path", required = True, add_to = "input_sample")
        self.add_input_file_list("read2", "Read 2 data file path", add_to = "input_sample")
ckuchly's avatar
ckuchly committed
172
        self.add_input_file_list("index",  "Index data file path", add_to = "input_sample")
173

Penom Nom's avatar
Penom Nom committed
174
    def __create_samples__(self):
Penom Nom's avatar
Penom Nom committed
175
        for sd in self.input_sample :
ckuchly's avatar
ckuchly committed
176
            sp_object = Sample( sd['sample_id'], sd['read1'], sd['read2'], sd['index'],name = sd['sample_name'], description = sd['sample_description'], type = sd['type'],
Penom Nom's avatar
Penom Nom committed
177
                               insert_size = sd['insert_size'], species = sd['species'] )
178

179
180
181
            for metadata in sd['metadata'] :
                k, v = metadata.split(':', 2)
                sp_object.add_metadata(k, v)
182
            self.samples.append(sp_object)
183

Penom Nom's avatar
Penom Nom committed
184
    def __preprocess_samples__(self):
Penom Nom's avatar
Penom Nom committed
185
186
187
188
189
190
191
192
193
        samples_ids = []
        pidx = 1
        nidx = 1
        for index, sample in enumerate(self.samples) :
            if sample.name :
                self.samples_names.append(sample.name)
            else :
                sample.name = 'SAMPLE_' + str(nidx)
                nidx += 1
194

Penom Nom's avatar
Penom Nom committed
195
196
            if sample.sample_id :
                if sample.sample_id in samples_ids :
197
198
                    display_error_message( "Sample identifier %s must be uniq ( sample %s ).\n" % sample.sample_id, sample.name )
            # add an automatic id for samples
Penom Nom's avatar
Penom Nom committed
199
200
201
202
            else :
                sample.sample_id = 'sample_' + str(pidx)
                pidx += 1
            samples_ids.append(sample.sample_id)
203

204
205
206
            for rfile in sample.reads1 :
                self.reads1_indexes.append(sample.sample_id)
                self.reads1.append(rfile)
207

208
209
210
            for rfile in sample.reads2 :
                self.reads2_indexes.append(sample.sample_id)
                self.reads2.append(rfile)
ckuchly's avatar
ckuchly committed
211
212
213
                
            for rfile in sample.index :
                self.index.append(rfile)
214

Penom Nom's avatar
Penom Nom committed
215
216
        if len(self.samples_names) != 0 :
            if len(self.samples_names) != len (self.samples) :
217
                display_error_message( "All samples must have a defined sample name" )
218

Penom Nom's avatar
Penom Nom committed
219
    def get_all_reads(self, type = None):
Penom Nom's avatar
Penom Nom committed
220
        if type == 'read1' :
Penom Nom's avatar
Penom Nom committed
221
            return self.reads1
Penom Nom's avatar
Penom Nom committed
222
        elif type == 'read2' :
Penom Nom's avatar
Penom Nom committed
223
            return self.reads2
ckuchly's avatar
ckuchly committed
224
225
226
        elif type == 'index' : 
            return self.index
        return self.reads1 + self.reads2 + self.index
227

228
229
230
231
232
233
    def get_files_index(self, type = None):
        if type == 'read1' :
            return self.reads1_indexes
        elif type == 'read2' :
            return self.reads2_indexes
        return self.reads1_indexes + self.reads2_indexes
234

235
    def is_paired_end(self):
Penom Nom's avatar
Penom Nom committed
236
        return len(self.reads2) > 0
237

238
    def pre_process(self):
239
        BasicNG6Workflow.pre_process(self)
240
        # start from an existing project
Penom Nom's avatar
Penom Nom committed
241
242
        self.project = Project.get_from_name(self.project_name)
        self.metadata.append("project_id="+str(self.project.id))
243
        # if user is not allowed to add data on project (is not admin)
Penom Nom's avatar
Penom Nom committed
244
        if self.project is not None and not self.project.is_admin(self.admin_login):
245
            display_error_message( "The user login '" + self.admin_login + "' is not allowed to add data on project '" + self.project.name + "'.\n" )
246

247
        # build the run
248
        self.runobj = Run(self.run_name, self.run_date, self.species, self.data_nature,
249
                          self.run_type, self.run_description, self.sequencer, self.project.space_id)
Penom Nom's avatar
Penom Nom committed
250
        self.runobj.admin_login=self.admin_login
251
252
253
        # then add the run to the project
        self.project.add_run(self.runobj)
        self.metadata.append("run_id="+str(self.runobj.id))
254

Penom Nom's avatar
Penom Nom committed
255
        self.__create_samples__()
Penom Nom's avatar
Penom Nom committed
256
        self.__preprocess_samples__()
Penom Nom's avatar
Penom Nom committed
257
258
        # add samples to run
        self.runobj.add_samples(self.samples)
259

260
    def post_process(self):
Jerome Mariette's avatar
Jerome Mariette committed
261
        # once everything done, sync directories
262
        logging.getLogger("ng6").debug("post_process enter")
Jerome Mariette's avatar
Jerome Mariette committed
263
        if self.runobj:
264
            logging.getLogger("ng6").debug("post_process enter self.runobj.sync")
Jerome Mariette's avatar
Jerome Mariette committed
265
            self.runobj.sync()
Jerome Mariette's avatar
Jerome Mariette committed
266
        elif self.project:
267
            logging.getLogger("ng6").debug("post_process enter self.project.sync")
Jerome Mariette's avatar
Jerome Mariette committed
268
            self.project.sync()
269

270

271
def get_files_from_casava(casava_directory, project_name, lane_number):
Penom Nom's avatar
Penom Nom committed
272
273
274
275
276
277
    """
        Retrieve all fastq files of a specific project and lane number from a given casava directory
        @param casava_directory : path to CASAVA output directory
        @param project_name : project name
        @param lane_number : lane number
    """
278

279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
    def bcl2fastq_18(directory, pname, lane):
        """bcl2fastq <= 1.8"""
        files = []
        with open(os.path.join(directory, "SampleSheet.mk")) as fh :
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")
            # parse samples
            for subdir in subdirs_list:
                # filter on project name
                if re.match("Project_" + pname + "/Sample_.+", subdir) or subdir.startswith("Undetermined_indices"):
                    for file in os.listdir(directory + "/" + subdir):
                        filepath = directory + "/" + subdir + "/" + file
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane) + "_.*", file):
                            files.append(filepath);
        return files
ckuchly's avatar
ckuchly committed
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
        
    def bcl2fastq_10X(directory, pname, lane):
        """longranger"""
        files = []
        with open(os.path.join(directory, "SampleSheet_10X.mk")) as fh :
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")
            # parse samples
            for subdir in subdirs_list:
                # filter on project name
                if re.match("Project_" + pname + "/Sample_.+", subdir) or subdir.startswith("Undetermined_indices"):
                    for file in os.listdir(directory + "/" + subdir):
                        filepath = directory + "/" + subdir + "/" + file
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane) + "_.*", file):
                            files.append(filepath);
        return files
316

317
318
319
320
321
322
    def bcl2fastq_216(directory, pname, lane):
        """bcl2fastq >= 1.9"""
        files = []
        tree = ET.parse( os.path.join( directory, 'Stats', 'DemultiplexingStats.xml'))
        root = tree.getroot()
        project = root.find(".//Project[@name='%s']"%pname)
323

324
        if project is not None :
325
326
            project_files = os.listdir(directory + "/" + pname)

327
328
329
330
331
332
333
334
335
336
337
            for sample in project.findall("./Sample") :
                if sample.get('name') != 'all' :
                    for barcode in sample.findall('./Barcode'):
                        if barcode.get('name') != 'all':
                            lnum = int(barcode.find('Lane').get('number'))
                            if lnum == lane :
                                fileregexp = '%s_S\d_L%03d_'%(sample.get('name'), lnum)
                                for pfile in project_files :
                                    if re.match(fileregexp,pfile) :
                                        files.append(os.path.join(directory,pname,pfile))
        return files
338

339
340
341
342
    if os.path.exists(os.path.join(casava_directory, "SampleSheet.mk")) :
        return bcl2fastq_18(casava_directory, project_name, lane_number)
    elif os.path.exists(os.path.join( casava_directory, 'Stats', 'DemultiplexingStats.xml')) :
        return bcl2fastq_216(casava_directory, project_name, lane_number)
ckuchly's avatar
ckuchly committed
343
344
    elif os.path.exists(os.path.join(casava_directory, "SampleSheet_10X.mk")) :
        return bcl2fastq_10X(casava_directory, project_name, lane_number)
345
346
347



348
class CasavaNG6Workflow(NG6Workflow):
349

Penom Nom's avatar
Penom Nom committed
350
351
    def __init__(self, args={}, id=None, function= "process"):
        NG6Workflow.__init__(self, args, id, function)
352

Penom Nom's avatar
Penom Nom committed
353
354
355
        self.group_prefix = None
        self.undetermined_reads1 = []
        self.undetermined_reads2 = []
ckuchly's avatar
ckuchly committed
356
        self.undetermined_index = []
357
        self.log_files = []
358
        self.is_casava = False
ckuchly's avatar
ckuchly committed
359
        self.is_10Xcasava = False
360

361
    def __add_sample_parameters__(self):
362
363
        self.add_multiple_parameter('casava', 'Provide the options to retrieve samples from a CASAVA directory', group="Sample description")
        self.add_input_directory("directory", "Path to the CASAVA directory to use", required=True, get_files_fn=get_files_from_casava, add_to="casava" )
Celine Noirot's avatar
Celine Noirot committed
364
        self.add_parameter("lane", "The lane number to be retrieved from the casava directory",  type='int', add_to="casava") #required=True,
365
        self.add_parameter('project', 'The name of the project to retrieve in casava directory. The default name is the name of the nG6  project',add_to="casava")
366
        self.add_parameter('mismatch_index', 'Set this value to true if the index sequence in the sample fastq files allows at least 1 mismatch',
367
                           type ='bool', add_to="casava")
368
        self.add_parameter_list('select_sample_id', 'The ids of the sample that will be selected in the SampleSheet.mk file. By default all samples are selected.', add_to="casava",
369
                           display_name = 'Selected samples')
370

371
        NG6Workflow.__add_sample_parameters__(self)
372

Celine Noirot's avatar
Celine Noirot committed
373
        #TODO exclude self.add_exclusion_rule("casava", "input_sample")
374

375
        self.add_parameter("compression", "How should the data be compressed once archived", choices= [ "none", "gz", "bz2"], default = "none")
376
        self.add_parameter("keep_reads", "Keep or discard reads which pass the illumina filter. 'all' option will keep all reads", flag = "--keep",
377
378
379
                           choices=[ "pass_illumina_filters", "not_pass_illumina_filters", "all"], default = "pass_illumina_filters")
        self.add_input_file_list("contamination_databank", "Which databank should be used to seek contamination (as to be phiX databank indexed for bwa)")
        self.add_parameter("no_group", "Disables grouping of bases for reads >50bp", type=bool, default = True)
380
381


Penom Nom's avatar
Penom Nom committed
382
    def __create_samples__(self):
383
        """
384
            Create samples object from a casava directory if provided
385
386
387
            @param casava_directory : path to CASAVA output directory
            @param lane_number : files in each sample are sequenced on this lane
        """
388
        logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ entering")
389

390
        if self.casava and self.casava["directory"] and self.casava["lane"] :
391
            logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ self.casava")
392
393
394
395
            self.is_casava = True
            casava_directory = self.casava["directory"]
            lane_number = self.casava["lane"]
            all_samples, all_samples_id = [], []
396
397

            # get the casava project_name
398
399
400
401
            if self.casava["project"] :
                project_name = self.casava["project"]
            else :
                project_name = self.project_name
402

403
404
            project_name = project_name.replace(" ", "_")
            input_files = casava_directory.get_files( project_name, lane_number)
405
            logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ input_files = " + ",".join(input_files))
406
407
            if len(input_files) == 0 :
                raise Exception("Error while parsing casava directory %s, invalid project name '%s' for lane %s"% (casava_directory, project_name, lane_number))
408

409
410
            all_samples, all_samples_id = [], []
            if os.path.exists(os.path.join(casava_directory, "SampleSheet.mk")) :
411

412
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_18")
413
                all_samples, all_samples_id = self._process_casava_18(casava_directory, project_name, lane_number, input_files)
414
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_18")
ckuchly's avatar
ckuchly committed
415

416
417
            elif os.path.exists(os.path.join( casava_directory, 'Stats', 'DemultiplexingStats.xml')) :
                all_samples, all_samples_id = self._process_casava_216(casava_directory, project_name, lane_number, input_files)
418

ckuchly's avatar
ckuchly committed
419
420
421
422
423
424
            elif os.path.exists(os.path.join( casava_directory, "SampleSheet_10X.mk")):
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_10X")
                all_samples, all_samples_id = self._process_casava_10X(casava_directory, project_name, lane_number, input_files)
                self.is_10Xcasava = True
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ after self._process_casava_10X")

425
            selected_samples = self.casava['select_sample_id']
ckuchly's avatar
ckuchly committed
426
            logging.getLogger("CasavaNG6Workflow").debug("__create_samples__. all_samples_id = "+", ".join(all_samples_id))
427
428
429
            if selected_samples :
                for sid in selected_samples :
                    assert sid in all_samples_id , "The sample id %s is not in the SampleSheet.mk" % sid
430

Penom Nom's avatar
Penom Nom committed
431
432
            for sample in all_samples :
                if selected_samples :
433
                    if sample.name in selected_samples :
Penom Nom's avatar
Penom Nom committed
434
435
436
                        self.samples.append(sample)
                else :
                    self.samples.append(sample)
437
438
        else :
            NG6Workflow.__create_samples__(self)
439

Penom Nom's avatar
Penom Nom committed
440
441
    def __preprocess_samples__(self):
        NG6Workflow.__preprocess_samples__(self)
ckuchly's avatar
ckuchly committed
442

443
        if self.is_casava:
Penom Nom's avatar
Penom Nom committed
444
            self.group_prefix = list((Utils.get_group_basenames(self.get_all_reads(), "read")).keys())
445
446

    def _process_casava_18(self, casava_directory, project_name, lane_number, input_files):
447
448
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 enter")
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 casava_directory = " + casava_directory + ", project_name = " + str(project_name))
449
450
        """
            Creates samples from casavadir (<=1.8) using input files
451
452
453
454
            @param casava_directory:
            @param project_name:
            @param lane_number:
            @param input_files:
455
456
457
        """
        all_samples = []
        all_samples_id = []
458

459
460
        # open casava samplesheet again to associate our files with a sample
        with open(os.path.join(casava_directory, "SampleSheet.mk")) as fh :
461
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 SampleSheet.mk exists")
462
463
464
465
466
467
468
469
470
471
472
473
474
            barcodes_list = []
            sample_ids_list = []
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane_number) + "_BARCODES"):
                    parts = line.strip().split(":=")
                    barcodes_list = [ re.sub( r"[-_\s]+", "", x) for x in  parts[1].split() ]
                elif line.startswith("l" + str(lane_number) + "_SAMPLEIDS" ):
                    parts = line.strip().split(":=")
                    sample_ids_list = parts[1].split(" ")
                elif line.startswith("l" + str(lane_number) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")
475

476
            assert  len(barcodes_list) == len(sample_ids_list) == len(subdirs_list), "Invalid lane {0} in SampleSheet.mk".format(lane_number)
477
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 SampleSheet.mk parsed")
478
479
480
481
482
483
484
485
486
            # parse samples
            for i in range(len(barcodes_list)):
                sample = {
                    'barcode'                   : barcodes_list[i],
                    'sample_id'                 : sample_ids_list[i],
                    'subdir'                    : subdirs_list[i],
                    'reads1'                    : [],
                    'reads2'                    : []
                }
487

488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
                # filter on project name
                if re.match("Project_" + project_name + "/Sample_.+", sample['subdir']) or sample['subdir'].startswith("Undetermined_indices"):
                    for file in os.listdir(casava_directory + "/" + sample['subdir']):
                        filepath = casava_directory + "/" + sample['subdir'] + "/" + file
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane_number) + "_.*", file):
                            for idx, iofile in enumerate(input_files) :
                                if iofile == filepath :
                                    if re.search(".*_R1_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads1'].append(iofile)
                                        else:
                                            self.undetermined_reads1.append(iofile)
                                    if re.search(".*_R2_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads2'].append(iofile)
                                        else:
                                            self.undetermined_reads2.append(iofile)
                                    input_files.pop(idx)
                                    break

                    if not sample['subdir'].startswith("Undetermined_indices") :
509
                        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 create sample " + sample['sample_id'])
510
511
512
                        sp_object = Sample(sample['barcode'], sample['reads1'], reads2 = sample['reads2'], name=sample['sample_id'])
                        sp_object.add_metadata('barcode', sample['barcode'])
                        sp_object.add_metadata('is_casava', True)
513

514
515
                        all_samples.append(sp_object)
                        all_samples_id.append(sample['sample_id'])
516
517
518
519
            for file in os.listdir(casava_directory):
                filepath = casava_directory + "/" + file
                if file.endswith(".log"):
                    self.log_files.append(filepath)
Gerald Salin's avatar
Gerald Salin committed
520
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 self.log_files = " + ",".join(self.log_files))
521
522
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 all_samples_id = " + ",".join(all_samples_id))
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 exiting")
ckuchly's avatar
ckuchly committed
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
            
        return all_samples, all_samples_id

    def _process_casava_10X(self,casava_directory, project_name, lane_number, input_files):
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X enter")
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X casava_directory = " + casava_directory + ", project_name = " + str(project_name))
        """
            Creates samples from casavadir from longranger demultiplexing
            @param casava_directory:
            @param project_name:
            @param lane_number:
            @param input_files:
        """
        all_samples = []
        all_samples_id = []

        # open casava samplesheet again to associate our files with a sample
        with open(os.path.join(casava_directory, "SampleSheet_10X.mk")) as fh :
            barcodes_list = []
            sample_ids_list = []
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane_number) + "_BARCODES"):
                    parts = line.strip().split(":=")
                    barcodes_list = [ re.sub( r"[_\s]+", "", x) for x in  parts[1].split() ]
                elif line.startswith("l" + str(lane_number) + "_SAMPLEIDS" ):
                    parts = line.strip().split(":=")
                    sample_ids_list = parts[1].split(" ")
                elif line.startswith("l" + str(lane_number) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")

ckuchly's avatar
ckuchly committed
555
            assert  len(barcodes_list) == len(sample_ids_list) == len(subdirs_list), "Invalid lane {0} in SampleSheet_10X.mk".format(lane_number)
ckuchly's avatar
ckuchly committed
556
557
558
559
560
561

            cfg_reader = NG6ConfigReader()
            indexs = cfg_reader.get_10X_indexs()
            
            # parse samples
            for i in range(len(barcodes_list)):
ckuchly's avatar
ckuchly committed
562
                
ckuchly's avatar
ckuchly committed
563
564
565
566
                if barcodes_list[i] == 'Undetermined' :
                    barcode = 'Undetermined'
                else : 
                    barcode = indexs[barcodes_list[i]]
ckuchly's avatar
ckuchly committed
567

ckuchly's avatar
ckuchly committed
568
569
570
571
572
573
574
575
                sample = {
                    'barcode'                   : barcode,
                    'sample_id'                 : sample_ids_list[i],
                    'subdir'                    : subdirs_list[i],
                    'reads1'                    : [],
                    'reads2'                    : [],
                    'index'                      : []
                }
ckuchly's avatar
ckuchly committed
576
                
ckuchly's avatar
ckuchly committed
577
                # filter on project name
ckuchly's avatar
ckuchly committed
578

ckuchly's avatar
ckuchly committed
579
                if re.match("Project_" + project_name + "/Sample_.+", sample['subdir']) or sample['subdir'].startswith("Undetermined_indices"):
ckuchly's avatar
ckuchly committed
580

ckuchly's avatar
ckuchly committed
581
                    for file in os.listdir(casava_directory + "/" + sample['subdir']):
ckuchly's avatar
ckuchly committed
582

ckuchly's avatar
ckuchly committed
583
                        filepath = casava_directory + "/" + sample['subdir'] + "/" + file
ckuchly's avatar
ckuchly committed
584
                        
ckuchly's avatar
ckuchly committed
585
586
587
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane_number) + "_.*", file):
                            for idx, iofile in enumerate(input_files) :
                                if iofile == filepath :
ckuchly's avatar
ckuchly committed
588

ckuchly's avatar
ckuchly committed
589
                                    if re.search(".*_R1_.*", file):
ckuchly's avatar
ckuchly committed
590

ckuchly's avatar
ckuchly committed
591
592
593
594
595
596
597
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads1'].append(iofile)
                                        else:
                                            self.undetermined_reads1.append(iofile)
                                    if re.search(".*_R2_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads2'].append(iofile)
ckuchly's avatar
ckuchly committed
598

ckuchly's avatar
ckuchly committed
599
600
601
602
                                        else:
                                            self.undetermined_reads2.append(iofile)
                                    if re.search(".*_I1_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
ckuchly's avatar
ckuchly committed
603
                                            
ckuchly's avatar
ckuchly committed
604
605
606
607
608
                                            sample['index'].append(iofile)
                                        else:
                                            self.undetermined_index.append(iofile)
                                    input_files.pop(idx)
                                    break
609

ckuchly's avatar
ckuchly committed
610
611
612
613
614
615
616
617
618
                    if not sample['subdir'].startswith("Undetermined_indices") :
                        sp_object = Sample(sample['barcode'], sample['reads1'], reads2 = sample['reads2'], index = sample['index'], name=sample['sample_id'])
                        sp_object.add_metadata('barcode', sample['barcode'])
                        sp_object.add_metadata('is_casava', True)

                        all_samples.append(sp_object)
                        all_samples_id.append(sample['sample_id'])
            for file in os.listdir(casava_directory):
                filepath = casava_directory + "/" + file
ckuchly's avatar
ckuchly committed
619

ckuchly's avatar
ckuchly committed
620
621
622
623
624
                if file.endswith(".log"):
                    self.log_files.append(filepath)
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X all_samples_id = " + ",".join(all_samples_id))
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X exiting")
            
625
        return all_samples, all_samples_id
626

ckuchly's avatar
ckuchly committed
627

628
629
630
    def _process_casava_216(self,casava_directory, project_name, lane_number, input_files):
        """
            Creates samples from casavadir (>=1.9) using input files
631
632
633
634
            @param casava_directory:
            @param project_name:
            @param lane_number:
            @param input_files:
635
636
        """
        raise NotImplementedError
637

638
    def illumina_process(self):
639
        fastqilluminafilter = None
640
        concatenatefastq = None
641
642
643
        filtered_read1_files = []
        filtered_read2_files = []
        saved_files = []
644
        logging.getLogger("ng6").debug("illumina_process entering")
645
        if self.is_casava :
646
            logging.getLogger("ng6").debug("illumina_process self.is_casava")
647
648
            if len(self.log_files) > 0 :
                add_log = self.add_component("BasicAnalysis", [self.log_files,"Log Files","Log files generated during primary analysis","-","-","-","gz", "","log.gz"])
ckuchly's avatar
ckuchly committed
649
            
650
651
652
            if len(self.undetermined_reads1) > 0 :
                if self.casava['mismatch_index'] :
                    demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1, self.get_files_index('read1')])
ckuchly's avatar
ckuchly committed
653
654
655
656
657
658
659
                elif self.is_10Xcasava :
                    logging.getLogger("ng6").debug("illumina_process self.is_10Xcasava = ")
                    logging.getLogger("ng6").debug(self.get_all_reads("read1"))
                    logging.getLogger("ng6").debug("illumina_process undetermined reads = " )
                    logging.getLogger("ng6").debug(self.undetermined_reads1)
                    logging.getLogger("ng6").debug("illumina_process file index =") 
                    logging.getLogger("ng6").debug(self.get_files_index("read1"))
ckuchly's avatar
ckuchly committed
660
                    demultiplex_stats = self.add_component("Demultiplex10XStats", [self.get_all_reads("read1"), self.undetermined_reads1, self.get_files_index("read1")])
661
                else :
ckuchly's avatar
ckuchly committed
662
663
                    demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1])                 

664

665
            if self.keep_reads != "all" :
666
                logging.getLogger("ng6").debug("illumina_process self.keep_reads != all")
ckuchly's avatar
ckuchly committed
667
                logging.getLogger("ng6").debug("illumina_process BEFORE FASTQILLUMINAFILTER self.get_all_reads() = " + ",".join(self.get_all_reads()))
668
                logging.getLogger("ng6").debug("illumina_process self.group_prefix = " + ",".join(self.group_prefix))
669
                # fastq illumina filter
670
                fastqilluminafilter = self.add_component("FastqIlluminaFilter", [self.runobj,self.get_all_reads(), self.keep_reads, self.group_prefix])
671

ckuchly's avatar
ckuchly committed
672
                
673
674
675
676
677
678
679
680
                # list filtered files
                if self.is_paired_end() :
                    # split read 1 and read 2 from filtered files list
                    [filtered_read1_files, filtered_read2_files] = Utils.split_pair(fastqilluminafilter.fastq_files_filtered, (self.group_prefix is not None))
                else:
                    filtered_read1_files = fastqilluminafilter.fastq_files_filtered
                    filtered_read2_files = []
                filtered_read1_files = sorted(filtered_read1_files)
681
                filtered_read2_files = sorted(filtered_read2_files)
682
683
684
685
            else:
                fastqilluminafilter = None
                filtered_read1_files = self.get_all_reads("read1")
                filtered_read2_files = self.get_all_reads("read2")
686

687
            # archive the files
688
            #TODO : if self.group_prefix == None, the create the output of fastqilluminafilter in the run.get_work_directory()
ckuchly's avatar
ckuchly committed
689
            saved_files = filtered_read1_files + filtered_read2_files + self.get_all_reads("index")
690
            logging.getLogger("CasavaNG6Workflow").debug("illumina_process saved_files = " + ",".join(saved_files))
691
692
693
            reads_prefixes = None
            if self.group_prefix != None :
                # concatenate fastq
Penom Nom's avatar
Penom Nom committed
694
                reads_prefixes = list((Utils.get_group_basenames(saved_files, "read")).keys())
695
696
                logging.getLogger("CasavaNG6Workflow").debug("illumina_process reads_prefixes = " + ",".join(reads_prefixes))
                logging.getLogger("CasavaNG6Workflow").debug("illumina_process saved_files = " + ",".join(saved_files))
697
                concatenatefastq = self.add_component("ConcatenateFilesGroups", [self.runobj,saved_files, reads_prefixes])
698
                saved_files = concatenatefastq.concat_files
699
                logging.getLogger("CasavaNG6Workflow").debug("illumina_process after concatenatefastq, saved_files = " + ",".join(saved_files))
700

701
        else :
Penom Nom's avatar
Penom Nom committed
702
            reads_prefixes = None
703
704
705
706
            fastqilluminafilter = None
            filtered_read1_files = self.get_all_reads("read1")
            filtered_read2_files = self.get_all_reads("read2")
            saved_files = self.get_all_reads()
707

Penom Nom's avatar
Penom Nom committed
708
709
710
711
712
713
714
715
        # add raw
        addrawfiles = self.add_component("AddRawFiles", [self.runobj, saved_files, self.compression])
        contam = []
        try :
            contam.append(self.get_resource("phix_bwa"))
            contam.append(self.get_resource("ecoli_bwa"))
            contam.append(self.get_resource("yeast_bwa"))
        except : pass
716

Penom Nom's avatar
Penom Nom committed
717
        # contamination_search
ckuchly's avatar
ckuchly committed
718
719
720
        if contam :
            if self.contamination_databank:  contam.extend(self.contamination_databank)
            contamination_search = self.add_component("ContaminationSearch", [filtered_read1_files+filtered_read2_files, contam, reads_prefixes], parent = fastqilluminafilter)
721

722
723
        # make some statistics on raw file
        fastqc = self.add_component("FastQC", [filtered_read1_files+filtered_read2_files, (self.group_prefix is not None), self.no_group, "fastqc.tar.gz"], parent = fastqilluminafilter)
724
        return fastqilluminafilter, filtered_read1_files, filtered_read2_files, saved_files, concatenatefastq