ng6workflow.py 38.8 KB
Newer Older
Jerome Mariette's avatar
Jerome Mariette committed
1
#
Penom Nom's avatar
Penom Nom committed
2
# Copyright (C) 2015 INRA
3
#
Jerome Mariette's avatar
Jerome Mariette committed
4
5
6
7
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
8
#
Jerome Mariette's avatar
Jerome Mariette committed
9
10
11
12
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
13
#
Jerome Mariette's avatar
Jerome Mariette committed
14
15
16
17
18
19
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import inspect
import os
20
import re
Jerome Mariette's avatar
Jerome Mariette committed
21
import sys
Jerome Mariette's avatar
Jerome Mariette committed
22
import pickle
Jerome Mariette's avatar
Jerome Mariette committed
23
import datetime
24
import argparse
25
import xml.etree.ElementTree as ET
26
import logging
Penom Nom's avatar
Penom Nom committed
27
from jflow.workflow import Workflow
28
from jflow.utils import display_error_message
Penom Nom's avatar
Penom Nom committed
29
from jflow.parameter import *
Jerome Mariette's avatar
Jerome Mariette committed
30

31
from ng6.t3MySQLdb import t3MySQLdb
Jerome Mariette's avatar
Jerome Mariette committed
32
33
from ng6.project import Project
from ng6.run import Run
34
35
from ng6.sample import Sample
from ng6.utils import Utils
ckuchly's avatar
ckuchly committed
36
from ng6.config_reader import NG6ConfigReader
Jerome Mariette's avatar
Jerome Mariette committed
37

38
39
class BasicNG6Workflow (Workflow):

Penom Nom's avatar
Penom Nom committed
40
41
    def __init__(self, args={}, id=None, function= "process"):
        Workflow.__init__(self, args, id, function)
Penom Nom's avatar
Penom Nom committed
42
        self.add_parameter("admin_login", "Who is the project administrator", required = True, type = 'ng6userlogin', display_name="Admin login")
Celine Noirot's avatar
Celine Noirot committed
43

44
    def add_component(self, component_name, args=[], kwargs={}, component_prefix="default", parent=None, addto="run"):
Romain Therville's avatar
Romain Therville committed
45
        logging.getLogger("ng6").debug("addto, logging test 1")
Jerome Mariette's avatar
Jerome Mariette committed
46
        # first build and check if this component is OK
Penom Nom's avatar
Penom Nom committed
47
        if component_name in self.internal_components or component_name in self.external_components:
48

Penom Nom's avatar
Penom Nom committed
49
            if component_name in self.internal_components :
Penom Nom's avatar
Penom Nom committed
50
                my_pckge = __import__(self.internal_components[component_name], globals(), locals(), [component_name])
Penom Nom's avatar
Penom Nom committed
51
52
53
                # build the object and define required field
                cmpt_object = getattr(my_pckge, component_name)()
                cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
54
                cmpt_object.set_prefix(component_prefix)
Penom Nom's avatar
Penom Nom committed
55
56
57
58
59
60
                if kwargs: cmpt_object.define_parameters(**kwargs)
                else: cmpt_object.define_parameters(*args)
            # external components
            else :
                cmpt_object = self.external_components[component_name]()
                cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
61
                cmpt_object.set_prefix(component_prefix)
Penom Nom's avatar
Penom Nom committed
62
63
                # can't use positional arguments with external components
                cmpt_object.define_parameters(**kwargs)
64
65
66
67
68
            # if the built object is an analysis
            for derived_class in cmpt_object.__class__.__bases__:
                if derived_class.__name__ == "Analysis":
                    # add parent and project/run information
                    cmpt_object.parent = parent
Romain Therville's avatar
Romain Therville committed
69
70
71
72
73
                    if addto == "project":
                        cmpt_object.project = self.project
                        cmpt_object.space_id = cmpt_object.project.space_id
                    elif addto == "run":
                        cmpt_object.run = self.runobj
Romain Therville's avatar
Romain Therville committed
74
                        #We replace the default space_id 
Romain Therville's avatar
Romain Therville committed
75
76
                        if cmpt_object.space_id == "default":
                            cmpt_object.space_id = cmpt_object.run.space_id
77
                    
Celine Noirot's avatar
Celine Noirot committed
78
                    # link analysis with ots create user
Penom Nom's avatar
Penom Nom committed
79
                    cmpt_object.admin_login = self.admin_login
80
            # there is a dynamic component
81
            if cmpt_object.is_dynamic():
82
83
84
85
                self.dynamic_component_present = True
                # if already init, add the component to the list and check if weaver should be executed
                if self.component_nameids_is_init:
                    # add the component
Penom Nom's avatar
Penom Nom committed
86
                    self.components_to_exec.append(cmpt_object)
87
88
89
90
91
92
93
                    self.components.append(cmpt_object)
                    self._execute_weaver()
                    # update outputs
                    for output in cmpt_object.get_dynamic_outputs():
                        output.update()
                else:
                    if self._component_is_duplicated(cmpt_object):
94
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
95
96
                                            cmpt_object.prefix + " already exist in this pipeline!")
                    self.component_nameids[cmpt_object.get_nameid()] = None
Penom Nom's avatar
Penom Nom committed
97
                    self.components_to_exec = []
98
99
100
101
                    self.components = []
            else:
                if self.component_nameids_is_init:
                    # add the component
Penom Nom's avatar
Penom Nom committed
102
                    self.components_to_exec.append(cmpt_object)
103
104
105
                    self.components.append(cmpt_object)
                elif not self.component_nameids_is_init and not self.dynamic_component_present:
                    if self._component_is_duplicated(cmpt_object):
106
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
107
                                            cmpt_object.prefix + " already exist in this pipeline!")
Penom Nom's avatar
Penom Nom committed
108
                    self.components_to_exec.append(cmpt_object)
109
110
111
                    self.components.append(cmpt_object)
                else:
                    if self._component_is_duplicated(cmpt_object):
112
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
113
114
                                            cmpt_object.prefix + " already exist in this pipeline!")
                    self.component_nameids[cmpt_object.get_nameid()] = None
ckuchly's avatar
ckuchly committed
115

116
            return cmpt_object
Jerome Mariette's avatar
Jerome Mariette committed
117
        else:
Penom Nom's avatar
Penom Nom committed
118
            raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(
Penom Nom's avatar
Penom Nom committed
119
                                           ", ".join(list(self.internal_components.keys()) + list(self.external_components.keys()))))
120

121
122
123
124
125
    def pre_process(self):
        t3mysql = t3MySQLdb()
        infos = t3mysql.get_user_info(self.admin_login)
        if infos['email'] :
            self.set_to_address(infos['email'])
126

Penom Nom's avatar
Penom Nom committed
127

Penom Nom's avatar
Penom Nom committed
128
class DownloadWorkflow(Workflow):
129
130
131
132
    """
        Base class for download workflows classes
    """
    def __init__(self, args={}, id=None, function= "process"):
Penom Nom's avatar
Penom Nom committed
133
134
        Workflow.__init__(self, args, id, function)
        self.add_parameter("admin_login", "Who is the project administrator", type = 'ng6userlogin', display_name="Admin login")
135
136
137
        self.add_parameter_list('data_id', 'Ids of a run from which rawdata will be retrieved', type = 'existingrun')
        self.add_parameter_list('run_id', 'Ids of run from which all data will be retrieved', type = 'existingrun')
        self.add_parameter_list('analysis_id', 'Ids of analysis to retrieve', type = 'existinganalysis')
Celine Noirot's avatar
Celine Noirot committed
138

139
class NG6Workflow (BasicNG6Workflow):
140

Penom Nom's avatar
Penom Nom committed
141
142
143
    def __init__(self, args={}, id=None, function= "process"):
        BasicNG6Workflow.__init__(self, args, id, function)
        self._add_run_parameters()
144
        self.__add_sample_parameters__()
145

Penom Nom's avatar
Penom Nom committed
146
147
148
        self.samples = []
        self.reads1 = []
        self.reads2 = []
149
        self.readsi = []
Penom Nom's avatar
Penom Nom committed
150
        self.samples_names = []
151
152
        self.reads1_indexes = []
        self.reads2_indexes = []
153

Penom Nom's avatar
Penom Nom committed
154
    def _add_run_parameters(self):
Penom Nom's avatar
Penom Nom committed
155
        self.add_parameter("project_name", "The project name the run belongs to", required = True, type = 'existingproject', group="Run information")
Penom Nom's avatar
Penom Nom committed
156
157
        self.add_parameter("run_name", "Give a name to your run", flag = "--name", required = True, display_name="Name", group="Run information")
        self.add_parameter("run_description", "Give a description to your run", flag = "--description", required = True, display_name="Description", group="Run information")
Penom Nom's avatar
Penom Nom committed
158
        self.add_parameter("run_date", "When were the data produced", flag = "--date", required = True, type = 'date', display_name="Date", group="Run information")
Penom Nom's avatar
Penom Nom committed
159
160
161
162
        self.add_parameter("data_nature", "Are Sequences cDNA, genomique, RNA, ...", required = True, display_name="Data nature", group="Run information")
        self.add_parameter("sequencer", "Which sequencer produced the data", required = True, display_name="Sequencer", group="Run information")
        self.add_parameter("species", "Which species has been sequenced", required = True, display_name="Species", group="Run information")
        self.add_parameter("run_type", "What type of data is it (1 lane, 1 region)", flag = "--type", required = True, display_name="Type", group="Run information")
163
        self.add_parameter("align_subset_reads", "Align only on subset reads", type=bool, default = False)
164

165
    def __add_sample_parameters__(self):
Celine Noirot's avatar
Celine Noirot committed
166
        self.add_multiple_parameter_list("input_sample", "Definition of a sample", flag="--sample",  group="Sample description") # required = True, # TO CHECK casavaWorkflow required not if casava dir
Penom Nom's avatar
Penom Nom committed
167
168
        self.add_parameter("sample_id", "The uniq identifier of the sample", type="nospacestr", add_to = "input_sample")
        self.add_parameter("sample_name", "A descriptive name for the sample", type="nospacestr", add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
169
        self.add_parameter("sample_description", "A brief description of the sample", add_to = "input_sample")
170
        self.add_parameter("type", "Read orientation and type", choices = Sample.AVAILABLE_TYPES, default='se', add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
171
172
        self.add_parameter("insert_size", "Insert size for paired end reads", type = int, add_to = "input_sample")
        self.add_parameter("species", "Species related to this sample", add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
173
        self.add_parameter_list("metadata", "Add metadata to the sample", type='samplemetadata'  ,add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
174
175
        self.add_input_file_list("read1", "Read 1 data file path", required = True, add_to = "input_sample")
        self.add_input_file_list("read2", "Read 2 data file path", add_to = "input_sample")
ckuchly's avatar
ckuchly committed
176
        self.add_input_file_list("index",  "Index data file path", add_to = "input_sample")
177

Penom Nom's avatar
Penom Nom committed
178
    def __create_samples__(self):
Penom Nom's avatar
Penom Nom committed
179
        for sd in self.input_sample :
180
            sp_object = Sample( sd['sample_id'], sd['read1'], sd['read2'], sd['index'], name = sd['sample_name'], description = sd['sample_description'], type = sd['type'],
Penom Nom's avatar
Penom Nom committed
181
                               insert_size = sd['insert_size'], species = sd['species'] )
182

183
184
185
            for metadata in sd['metadata'] :
                k, v = metadata.split(':', 2)
                sp_object.add_metadata(k, v)
186
            self.samples.append(sp_object)
187

Penom Nom's avatar
Penom Nom committed
188
    def __preprocess_samples__(self):
Penom Nom's avatar
Penom Nom committed
189
190
191
        samples_ids = []
        pidx = 1
        nidx = 1
192
193
194
195
        for rang, sample in enumerate(self.samples) :
            print(sample)
            print("sample readsi")
            print(sample.readsi)
Penom Nom's avatar
Penom Nom committed
196
197
198
199
200
            if sample.name :
                self.samples_names.append(sample.name)
            else :
                sample.name = 'SAMPLE_' + str(nidx)
                nidx += 1
201

Penom Nom's avatar
Penom Nom committed
202
203
            if sample.sample_id :
                if sample.sample_id in samples_ids :
204
205
                    display_error_message( "Sample identifier %s must be uniq ( sample %s ).\n" % sample.sample_id, sample.name )
            # add an automatic id for samples
Penom Nom's avatar
Penom Nom committed
206
207
208
209
            else :
                sample.sample_id = 'sample_' + str(pidx)
                pidx += 1
            samples_ids.append(sample.sample_id)
210

211
212
213
            for rfile in sample.reads1 :
                self.reads1_indexes.append(sample.sample_id)
                self.reads1.append(rfile)
214

215
216
217
            for rfile in sample.reads2 :
                self.reads2_indexes.append(sample.sample_id)
                self.reads2.append(rfile)
218
219
220
            
            for rfile in sample.readsi :
                self.readsi.append(rfile)
221

Penom Nom's avatar
Penom Nom committed
222
223
        if len(self.samples_names) != 0 :
            if len(self.samples_names) != len (self.samples) :
224
                display_error_message( "All samples must have a defined sample name" )
225

Penom Nom's avatar
Penom Nom committed
226
    def get_all_reads(self, type = None):
Penom Nom's avatar
Penom Nom committed
227
        if type == 'read1' :
Penom Nom's avatar
Penom Nom committed
228
            return self.reads1
Penom Nom's avatar
Penom Nom committed
229
        elif type == 'read2' :
Penom Nom's avatar
Penom Nom committed
230
            return self.reads2
ckuchly's avatar
ckuchly committed
231
        elif type == 'index' : 
232
233
            return self.readsi
        return self.reads1 + self.reads2 + self.readsi
234

235
236
237
238
239
240
    def get_files_index(self, type = None):
        if type == 'read1' :
            return self.reads1_indexes
        elif type == 'read2' :
            return self.reads2_indexes
        return self.reads1_indexes + self.reads2_indexes
241

242
    def is_paired_end(self):
Penom Nom's avatar
Penom Nom committed
243
        return len(self.reads2) > 0
244

245
    def pre_process(self):
246
        BasicNG6Workflow.pre_process(self)
247
        # start from an existing project
Penom Nom's avatar
Penom Nom committed
248
249
        self.project = Project.get_from_name(self.project_name)
        self.metadata.append("project_id="+str(self.project.id))
250
        # if user is not allowed to add data on project (is not admin)
Penom Nom's avatar
Penom Nom committed
251
        if self.project is not None and not self.project.is_admin(self.admin_login):
252
            display_error_message( "The user login '" + self.admin_login + "' is not allowed to add data on project '" + self.project.name + "'.\n" )
253

254
        # build the run
255
        self.runobj = Run(self.run_name, self.run_date, self.species, self.data_nature,
256
                          self.run_type, self.run_description, self.sequencer, self.project.space_id)
Penom Nom's avatar
Penom Nom committed
257
        self.runobj.admin_login=self.admin_login
258
259
260
        # then add the run to the project
        self.project.add_run(self.runobj)
        self.metadata.append("run_id="+str(self.runobj.id))
261

Penom Nom's avatar
Penom Nom committed
262
        self.__create_samples__()
Penom Nom's avatar
Penom Nom committed
263
        self.__preprocess_samples__()
Penom Nom's avatar
Penom Nom committed
264
265
        # add samples to run
        self.runobj.add_samples(self.samples)
266

267
    def post_process(self):
Jerome Mariette's avatar
Jerome Mariette committed
268
        # once everything done, sync directories
269
        logging.getLogger("ng6").debug("post_process enter")
Jerome Mariette's avatar
Jerome Mariette committed
270
        if self.runobj:
271
            logging.getLogger("ng6").debug("post_process enter self.runobj.sync")
Jerome Mariette's avatar
Jerome Mariette committed
272
            self.runobj.sync()
Jerome Mariette's avatar
Jerome Mariette committed
273
        elif self.project:
274
            logging.getLogger("ng6").debug("post_process enter self.project.sync")
Jerome Mariette's avatar
Jerome Mariette committed
275
            self.project.sync()
276

277

278
def get_files_from_casava(casava_directory, project_name, lane_number):
Penom Nom's avatar
Penom Nom committed
279
280
281
282
283
284
    """
        Retrieve all fastq files of a specific project and lane number from a given casava directory
        @param casava_directory : path to CASAVA output directory
        @param project_name : project name
        @param lane_number : lane number
    """
285

286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
    def bcl2fastq_18(directory, pname, lane):
        """bcl2fastq <= 1.8"""
        files = []
        with open(os.path.join(directory, "SampleSheet.mk")) as fh :
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")
            # parse samples
            for subdir in subdirs_list:
                # filter on project name
                if re.match("Project_" + pname + "/Sample_.+", subdir) or subdir.startswith("Undetermined_indices"):
                    for file in os.listdir(directory + "/" + subdir):
                        filepath = directory + "/" + subdir + "/" + file
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane) + "_.*", file):
                            files.append(filepath);
        return files
ckuchly's avatar
ckuchly committed
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
        
    def bcl2fastq_10X(directory, pname, lane):
        """longranger"""
        files = []
        with open(os.path.join(directory, "SampleSheet_10X.mk")) as fh :
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")
            # parse samples
            for subdir in subdirs_list:
                # filter on project name
                if re.match("Project_" + pname + "/Sample_.+", subdir) or subdir.startswith("Undetermined_indices"):
                    for file in os.listdir(directory + "/" + subdir):
                        filepath = directory + "/" + subdir + "/" + file
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane) + "_.*", file):
                            files.append(filepath);
        return files
323

324
325
326
327
328
329
    def bcl2fastq_216(directory, pname, lane):
        """bcl2fastq >= 1.9"""
        files = []
        tree = ET.parse( os.path.join( directory, 'Stats', 'DemultiplexingStats.xml'))
        root = tree.getroot()
        project = root.find(".//Project[@name='%s']"%pname)
330

331
        if project is not None :
332
333
            project_files = os.listdir(directory + "/" + pname)

334
335
336
337
338
339
340
341
342
343
344
            for sample in project.findall("./Sample") :
                if sample.get('name') != 'all' :
                    for barcode in sample.findall('./Barcode'):
                        if barcode.get('name') != 'all':
                            lnum = int(barcode.find('Lane').get('number'))
                            if lnum == lane :
                                fileregexp = '%s_S\d_L%03d_'%(sample.get('name'), lnum)
                                for pfile in project_files :
                                    if re.match(fileregexp,pfile) :
                                        files.append(os.path.join(directory,pname,pfile))
        return files
345

346
347
348
349
    if os.path.exists(os.path.join(casava_directory, "SampleSheet.mk")) :
        return bcl2fastq_18(casava_directory, project_name, lane_number)
    elif os.path.exists(os.path.join( casava_directory, 'Stats', 'DemultiplexingStats.xml')) :
        return bcl2fastq_216(casava_directory, project_name, lane_number)
ckuchly's avatar
ckuchly committed
350
351
    elif os.path.exists(os.path.join(casava_directory, "SampleSheet_10X.mk")) :
        return bcl2fastq_10X(casava_directory, project_name, lane_number)
352
353
354



355
class CasavaNG6Workflow(NG6Workflow):
356

Penom Nom's avatar
Penom Nom committed
357
358
    def __init__(self, args={}, id=None, function= "process"):
        NG6Workflow.__init__(self, args, id, function)
359

Penom Nom's avatar
Penom Nom committed
360
361
362
        self.group_prefix = None
        self.undetermined_reads1 = []
        self.undetermined_reads2 = []
ckuchly's avatar
ckuchly committed
363
        self.undetermined_index = []
364
        self.log_files = []
365
        self.is_casava = False
ckuchly's avatar
ckuchly committed
366
        self.is_10Xcasava = False
367

368
    def __add_sample_parameters__(self):
369
370
        self.add_multiple_parameter('casava', 'Provide the options to retrieve samples from a CASAVA directory', group="Sample description")
        self.add_input_directory("directory", "Path to the CASAVA directory to use", required=True, get_files_fn=get_files_from_casava, add_to="casava" )
Celine Noirot's avatar
Celine Noirot committed
371
        self.add_parameter("lane", "The lane number to be retrieved from the casava directory",  type='int', add_to="casava") #required=True,
372
        self.add_parameter('project', 'The name of the project to retrieve in casava directory. The default name is the name of the nG6  project',add_to="casava")
373
        self.add_parameter('mismatch_index', 'Set this value to true if the index sequence in the sample fastq files allows at least 1 mismatch',
374
                           type ='bool', add_to="casava")
375
        self.add_parameter_list('select_sample_id', 'The ids of the sample that will be selected in the SampleSheet.mk file. By default all samples are selected.', add_to="casava",
376
                           display_name = 'Selected samples')
377

378
        NG6Workflow.__add_sample_parameters__(self)
379

Celine Noirot's avatar
Celine Noirot committed
380
        #TODO exclude self.add_exclusion_rule("casava", "input_sample")
381

382
        self.add_parameter("compression", "How should the data be compressed once archived", choices= [ "none", "gz", "bz2"], default = "none")
383
        self.add_parameter("keep_reads", "Keep or discard reads which pass the illumina filter. 'all' option will keep all reads", flag = "--keep",
384
385
386
                           choices=[ "pass_illumina_filters", "not_pass_illumina_filters", "all"], default = "pass_illumina_filters")
        self.add_input_file_list("contamination_databank", "Which databank should be used to seek contamination (as to be phiX databank indexed for bwa)")
        self.add_parameter("no_group", "Disables grouping of bases for reads >50bp", type=bool, default = True)
387
388


Penom Nom's avatar
Penom Nom committed
389
    def __create_samples__(self):
390
        """
391
            Create samples object from a casava directory if provided
392
393
394
            @param casava_directory : path to CASAVA output directory
            @param lane_number : files in each sample are sequenced on this lane
        """
395
        logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ entering")
396

397
        if self.casava and self.casava["directory"] and self.casava["lane"] :
398
            logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ self.casava")
399
400
401
402
            self.is_casava = True
            casava_directory = self.casava["directory"]
            lane_number = self.casava["lane"]
            all_samples, all_samples_id = [], []
403
404

            # get the casava project_name
405
406
407
408
            if self.casava["project"] :
                project_name = self.casava["project"]
            else :
                project_name = self.project_name
409

410
411
            project_name = project_name.replace(" ", "_")
            input_files = casava_directory.get_files( project_name, lane_number)
412
            logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ input_files = " + ",".join(input_files))
413
414
            if len(input_files) == 0 :
                raise Exception("Error while parsing casava directory %s, invalid project name '%s' for lane %s"% (casava_directory, project_name, lane_number))
415

416
417
            all_samples, all_samples_id = [], []
            if os.path.exists(os.path.join(casava_directory, "SampleSheet.mk")) :
418

419
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_18")
420
                all_samples, all_samples_id = self._process_casava_18(casava_directory, project_name, lane_number, input_files)
421
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_18")
ckuchly's avatar
ckuchly committed
422

423
424
            elif os.path.exists(os.path.join( casava_directory, 'Stats', 'DemultiplexingStats.xml')) :
                all_samples, all_samples_id = self._process_casava_216(casava_directory, project_name, lane_number, input_files)
425

ckuchly's avatar
ckuchly committed
426
427
428
429
430
431
            elif os.path.exists(os.path.join( casava_directory, "SampleSheet_10X.mk")):
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_10X")
                all_samples, all_samples_id = self._process_casava_10X(casava_directory, project_name, lane_number, input_files)
                self.is_10Xcasava = True
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ after self._process_casava_10X")

432
            selected_samples = self.casava['select_sample_id']
ckuchly's avatar
ckuchly committed
433
            logging.getLogger("CasavaNG6Workflow").debug("__create_samples__. all_samples_id = "+", ".join(all_samples_id))
434
435
436
            if selected_samples :
                for sid in selected_samples :
                    assert sid in all_samples_id , "The sample id %s is not in the SampleSheet.mk" % sid
437

Penom Nom's avatar
Penom Nom committed
438
439
            for sample in all_samples :
                if selected_samples :
440
                    if sample.name in selected_samples :
Penom Nom's avatar
Penom Nom committed
441
442
443
                        self.samples.append(sample)
                else :
                    self.samples.append(sample)
444
445
        else :
            NG6Workflow.__create_samples__(self)
446

Penom Nom's avatar
Penom Nom committed
447
448
    def __preprocess_samples__(self):
        NG6Workflow.__preprocess_samples__(self)
ckuchly's avatar
ckuchly committed
449

450
        if self.is_casava:
Penom Nom's avatar
Penom Nom committed
451
            self.group_prefix = list((Utils.get_group_basenames(self.get_all_reads(), "read")).keys())
452
453

    def _process_casava_18(self, casava_directory, project_name, lane_number, input_files):
454
455
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 enter")
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 casava_directory = " + casava_directory + ", project_name = " + str(project_name))
456
457
        """
            Creates samples from casavadir (<=1.8) using input files
458
459
460
461
            @param casava_directory:
            @param project_name:
            @param lane_number:
            @param input_files:
462
463
464
        """
        all_samples = []
        all_samples_id = []
465

466
467
        # open casava samplesheet again to associate our files with a sample
        with open(os.path.join(casava_directory, "SampleSheet.mk")) as fh :
468
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 SampleSheet.mk exists")
469
470
471
472
473
474
475
476
477
478
479
480
481
            barcodes_list = []
            sample_ids_list = []
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane_number) + "_BARCODES"):
                    parts = line.strip().split(":=")
                    barcodes_list = [ re.sub( r"[-_\s]+", "", x) for x in  parts[1].split() ]
                elif line.startswith("l" + str(lane_number) + "_SAMPLEIDS" ):
                    parts = line.strip().split(":=")
                    sample_ids_list = parts[1].split(" ")
                elif line.startswith("l" + str(lane_number) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")
482

483
            assert  len(barcodes_list) == len(sample_ids_list) == len(subdirs_list), "Invalid lane {0} in SampleSheet.mk".format(lane_number)
484
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 SampleSheet.mk parsed")
485
486
487
488
489
490
491
            # parse samples
            for i in range(len(barcodes_list)):
                sample = {
                    'barcode'                   : barcodes_list[i],
                    'sample_id'                 : sample_ids_list[i],
                    'subdir'                    : subdirs_list[i],
                    'reads1'                    : [],
492
493
                    'reads2'                    : [],
                    'readsi'                    : []
494
                }
495

496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
                # filter on project name
                if re.match("Project_" + project_name + "/Sample_.+", sample['subdir']) or sample['subdir'].startswith("Undetermined_indices"):
                    for file in os.listdir(casava_directory + "/" + sample['subdir']):
                        filepath = casava_directory + "/" + sample['subdir'] + "/" + file
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane_number) + "_.*", file):
                            for idx, iofile in enumerate(input_files) :
                                if iofile == filepath :
                                    if re.search(".*_R1_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads1'].append(iofile)
                                        else:
                                            self.undetermined_reads1.append(iofile)
                                    if re.search(".*_R2_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads2'].append(iofile)
                                        else:
                                            self.undetermined_reads2.append(iofile)
                                    input_files.pop(idx)
                                    break

                    if not sample['subdir'].startswith("Undetermined_indices") :
517
                        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 create sample " + sample['sample_id'])
518
                        sp_object = Sample(sample['barcode'], sample['reads1'], reads2 = sample['reads2'], readsi = [], name=sample['sample_id'])
519
520
                        sp_object.add_metadata('barcode', sample['barcode'])
                        sp_object.add_metadata('is_casava', True)
521

522
523
                        all_samples.append(sp_object)
                        all_samples_id.append(sample['sample_id'])
524
525
526
527
            for file in os.listdir(casava_directory):
                filepath = casava_directory + "/" + file
                if file.endswith(".log"):
                    self.log_files.append(filepath)
Gerald Salin's avatar
Gerald Salin committed
528
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 self.log_files = " + ",".join(self.log_files))
529
530
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 all_samples_id = " + ",".join(all_samples_id))
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 exiting")
ckuchly's avatar
ckuchly committed
531
532
            
        return all_samples, all_samples_id
533

ckuchly's avatar
ckuchly committed
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
    def _process_casava_10X(self,casava_directory, project_name, lane_number, input_files):
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X enter")
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X casava_directory = " + casava_directory + ", project_name = " + str(project_name))
        """
            Creates samples from casavadir from longranger demultiplexing
            @param casava_directory:
            @param project_name:
            @param lane_number:
            @param input_files:
        """
        all_samples = []
        all_samples_id = []

        # open casava samplesheet again to associate our files with a sample
        with open(os.path.join(casava_directory, "SampleSheet_10X.mk")) as fh :
            barcodes_list = []
            sample_ids_list = []
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane_number) + "_BARCODES"):
                    parts = line.strip().split(":=")
                    barcodes_list = [ re.sub( r"[_\s]+", "", x) for x in  parts[1].split() ]
                elif line.startswith("l" + str(lane_number) + "_SAMPLEIDS" ):
                    parts = line.strip().split(":=")
                    sample_ids_list = parts[1].split(" ")
                elif line.startswith("l" + str(lane_number) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")

ckuchly's avatar
ckuchly committed
563
            assert  len(barcodes_list) == len(sample_ids_list) == len(subdirs_list), "Invalid lane {0} in SampleSheet_10X.mk".format(lane_number)
ckuchly's avatar
ckuchly committed
564
565
566
567
568
569

            cfg_reader = NG6ConfigReader()
            indexs = cfg_reader.get_10X_indexs()
            
            # parse samples
            for i in range(len(barcodes_list)):
ckuchly's avatar
ckuchly committed
570
                
ckuchly's avatar
ckuchly committed
571
572
573
574
                if barcodes_list[i] == 'Undetermined' :
                    barcode = 'Undetermined'
                else : 
                    barcode = indexs[barcodes_list[i]]
ckuchly's avatar
ckuchly committed
575

ckuchly's avatar
ckuchly committed
576
577
578
579
580
581
                sample = {
                    'barcode'                   : barcode,
                    'sample_id'                 : sample_ids_list[i],
                    'subdir'                    : subdirs_list[i],
                    'reads1'                    : [],
                    'reads2'                    : [],
582
                    'readsi'                    : []
ckuchly's avatar
ckuchly committed
583
                }
ckuchly's avatar
ckuchly committed
584
                
ckuchly's avatar
ckuchly committed
585
                # filter on project name
ckuchly's avatar
ckuchly committed
586

ckuchly's avatar
ckuchly committed
587
                if re.match("Project_" + project_name + "/Sample_.+", sample['subdir']) or sample['subdir'].startswith("Undetermined_indices"):
ckuchly's avatar
ckuchly committed
588

ckuchly's avatar
ckuchly committed
589
                    for file in os.listdir(casava_directory + "/" + sample['subdir']):
ckuchly's avatar
ckuchly committed
590

ckuchly's avatar
ckuchly committed
591
                        filepath = casava_directory + "/" + sample['subdir'] + "/" + file
ckuchly's avatar
ckuchly committed
592
                        
ckuchly's avatar
ckuchly committed
593
594
595
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane_number) + "_.*", file):
                            for idx, iofile in enumerate(input_files) :
                                if iofile == filepath :
ckuchly's avatar
ckuchly committed
596

ckuchly's avatar
ckuchly committed
597
                                    if re.search(".*_R1_.*", file):
ckuchly's avatar
ckuchly committed
598

ckuchly's avatar
ckuchly committed
599
600
601
602
603
604
605
606
607
608
609
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads1'].append(iofile)
                                        else:
                                            self.undetermined_reads1.append(iofile)
                                    if re.search(".*_R2_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads2'].append(iofile)
                                        else:
                                            self.undetermined_reads2.append(iofile)
                                    if re.search(".*_I1_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
610
                                            sample['readsi'].append(iofile)
ckuchly's avatar
ckuchly committed
611
612
613
614
                                        else:
                                            self.undetermined_index.append(iofile)
                                    input_files.pop(idx)
                                    break
615

ckuchly's avatar
ckuchly committed
616
                    if not sample['subdir'].startswith("Undetermined_indices") :
617
                        sp_object = Sample(sample['barcode'], sample['reads1'], reads2 = sample['reads2'], readsi = sample['readsi'], name=sample['sample_id'])
ckuchly's avatar
ckuchly committed
618
619
620
621
622
623
624
                        sp_object.add_metadata('barcode', sample['barcode'])
                        sp_object.add_metadata('is_casava', True)

                        all_samples.append(sp_object)
                        all_samples_id.append(sample['sample_id'])
            for file in os.listdir(casava_directory):
                filepath = casava_directory + "/" + file
ckuchly's avatar
ckuchly committed
625

ckuchly's avatar
ckuchly committed
626
627
628
629
630
                if file.endswith(".log"):
                    self.log_files.append(filepath)
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X all_samples_id = " + ",".join(all_samples_id))
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X exiting")
            
631
        return all_samples, all_samples_id
632

ckuchly's avatar
ckuchly committed
633

634
635
636
    def _process_casava_216(self,casava_directory, project_name, lane_number, input_files):
        """
            Creates samples from casavadir (>=1.9) using input files
637
638
639
640
            @param casava_directory:
            @param project_name:
            @param lane_number:
            @param input_files:
641
642
        """
        raise NotImplementedError
643

644
    def illumina_process(self):
645
        fastqilluminafilter = None
646
        concatenatefastq = None
647
648
649
        filtered_read1_files = []
        filtered_read2_files = []
        saved_files = []
650
        logging.getLogger("ng6").debug("illumina_process entering")
651
        if self.is_casava :
652
            logging.getLogger("ng6").debug("illumina_process self.is_casava")
653
654
            if len(self.log_files) > 0 :
                add_log = self.add_component("BasicAnalysis", [self.log_files,"Log Files","Log files generated during primary analysis","-","-","-","gz", "","log.gz"])
ckuchly's avatar
ckuchly committed
655
            
656
657
658
            if len(self.undetermined_reads1) > 0 :
                if self.casava['mismatch_index'] :
                    demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1, self.get_files_index('read1')])
ckuchly's avatar
ckuchly committed
659
                elif self.is_10Xcasava :
ckuchly's avatar
ckuchly committed
660
                    demultiplex_stats = self.add_component("Demultiplex10XStats", [self.get_all_reads("read1"), self.undetermined_reads1, self.get_files_index("read1")])
661
                else :
ckuchly's avatar
ckuchly committed
662
663
                    demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1])                 

664

665
            if self.keep_reads != "all" :
666
                logging.getLogger("ng6").debug("illumina_process self.keep_reads != all")
ckuchly's avatar
ckuchly committed
667
                logging.getLogger("ng6").debug("illumina_process BEFORE FASTQILLUMINAFILTER self.get_all_reads() = " + ",".join(self.get_all_reads()))
668
                logging.getLogger("ng6").debug("illumina_process self.group_prefix = " + ",".join(self.group_prefix))
669
                # fastq illumina filter
670
                fastqilluminafilter = self.add_component("FastqIlluminaFilter", [self.runobj,self.get_all_reads(), self.keep_reads, self.group_prefix])
671

ckuchly's avatar
ckuchly committed
672
                
673
674
675
676
677
678
679
680
                # list filtered files
                if self.is_paired_end() :
                    # split read 1 and read 2 from filtered files list
                    [filtered_read1_files, filtered_read2_files] = Utils.split_pair(fastqilluminafilter.fastq_files_filtered, (self.group_prefix is not None))
                else:
                    filtered_read1_files = fastqilluminafilter.fastq_files_filtered
                    filtered_read2_files = []
                filtered_read1_files = sorted(filtered_read1_files)
681
                filtered_read2_files = sorted(filtered_read2_files)
682
683
684
685
            else:
                fastqilluminafilter = None
                filtered_read1_files = self.get_all_reads("read1")
                filtered_read2_files = self.get_all_reads("read2")
686

687
            # archive the files
688
            #TODO : if self.group_prefix == None, the create the output of fastqilluminafilter in the run.get_work_directory()
689
690
691
            logging.getLogger("CasavaNG6Workflow").debug("illumina_process filter reads = " + ",".join(filtered_read1_files))
            logging.getLogger("CasavaNG6Workflow").debug("illumina_process readsi =" + ",".join(self.get_all_reads("index")))
            saved_files = filtered_read1_files + filtered_read2_files + self.get_all_reads("index")
692
            logging.getLogger("CasavaNG6Workflow").debug("illumina_process saved_files = " + ",".join(saved_files))
693
694
695
            reads_prefixes = None
            if self.group_prefix != None :
                # concatenate fastq
Penom Nom's avatar
Penom Nom committed
696
                reads_prefixes = list((Utils.get_group_basenames(saved_files, "read")).keys())
697
                logging.getLogger("CasavaNG6Workflow").debug("illumina_process saved_files = " + ",".join(saved_files))
698
                concatenatefastq = self.add_component("ConcatenateFilesGroups", [self.runobj,saved_files, reads_prefixes])
699
                saved_files = concatenatefastq.concat_files
700
                logging.getLogger("CasavaNG6Workflow").debug("illumina_process after concatenatefastq, saved_files = " + ",".join(saved_files))
701

702
        else :
Penom Nom's avatar
Penom Nom committed
703
            reads_prefixes = None
704
705
706
707
            fastqilluminafilter = None
            filtered_read1_files = self.get_all_reads("read1")
            filtered_read2_files = self.get_all_reads("read2")
            saved_files = self.get_all_reads()
708

Penom Nom's avatar
Penom Nom committed
709
710
711
712
713
714
715
716
        # add raw
        addrawfiles = self.add_component("AddRawFiles", [self.runobj, saved_files, self.compression])
        contam = []
        try :
            contam.append(self.get_resource("phix_bwa"))
            contam.append(self.get_resource("ecoli_bwa"))
            contam.append(self.get_resource("yeast_bwa"))
        except : pass
717

Penom Nom's avatar
Penom Nom committed
718
        # contamination_search
719
720
        if contam :
            if self.contamination_databank:  contam.extend(self.contamination_databank)
ckuchly's avatar
ckuchly committed
721
            contamination_search = self.add_component("ContaminationSearch", [filtered_read1_files+filtered_read2_files, contam, reads_prefixes], parent = fastqilluminafilter)
722

723
724
        # make some statistics on raw file
        fastqc = self.add_component("FastQC", [filtered_read1_files+filtered_read2_files, (self.group_prefix is not None), self.no_group, "fastqc.tar.gz"], parent = fastqilluminafilter)
725
        return fastqilluminafilter, filtered_read1_files, filtered_read2_files, saved_files, concatenatefastq