ng6workflow.py 38.5 KB
Newer Older
Jerome Mariette's avatar
Jerome Mariette committed
1
#
Penom Nom's avatar
Penom Nom committed
2
# Copyright (C) 2015 INRA
3
#
Jerome Mariette's avatar
Jerome Mariette committed
4
5
6
7
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
8
#
Jerome Mariette's avatar
Jerome Mariette committed
9
10
11
12
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
13
#
Jerome Mariette's avatar
Jerome Mariette committed
14
15
16
17
18
19
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import inspect
import os
20
import re
Jerome Mariette's avatar
Jerome Mariette committed
21
import sys
Jerome Mariette's avatar
Jerome Mariette committed
22
import pickle
Jerome Mariette's avatar
Jerome Mariette committed
23
import datetime
24
import argparse
25
import xml.etree.ElementTree as ET
26
import logging
Penom Nom's avatar
Penom Nom committed
27
from jflow.workflow import Workflow
28
from jflow.utils import display_error_message
Penom Nom's avatar
Penom Nom committed
29
from jflow.parameter import *
Jerome Mariette's avatar
Jerome Mariette committed
30

31
from ng6.t3MySQLdb import t3MySQLdb
Jerome Mariette's avatar
Jerome Mariette committed
32
33
from ng6.project import Project
from ng6.run import Run
34
35
from ng6.sample import Sample
from ng6.utils import Utils
ckuchly's avatar
ckuchly committed
36
from ng6.config_reader import NG6ConfigReader
Jerome Mariette's avatar
Jerome Mariette committed
37

38
39
class BasicNG6Workflow (Workflow):

Penom Nom's avatar
Penom Nom committed
40
41
    def __init__(self, args={}, id=None, function= "process"):
        Workflow.__init__(self, args, id, function)
Penom Nom's avatar
Penom Nom committed
42
        self.add_parameter("admin_login", "Who is the project administrator", required = True, type = 'ng6userlogin', display_name="Admin login")
Celine Noirot's avatar
Celine Noirot committed
43

44
    def add_component(self, component_name, args=[], kwargs={}, component_prefix="default", parent=None, addto="run"):
Romain Therville's avatar
Romain Therville committed
45
        logging.getLogger("ng6").debug("addto, logging test 1")
Jerome Mariette's avatar
Jerome Mariette committed
46
        # first build and check if this component is OK
Penom Nom's avatar
Penom Nom committed
47
        if component_name in self.internal_components or component_name in self.external_components:
48

Penom Nom's avatar
Penom Nom committed
49
            if component_name in self.internal_components :
Penom Nom's avatar
Penom Nom committed
50
                my_pckge = __import__(self.internal_components[component_name], globals(), locals(), [component_name])
Penom Nom's avatar
Penom Nom committed
51
52
53
                # build the object and define required field
                cmpt_object = getattr(my_pckge, component_name)()
                cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
54
                cmpt_object.set_prefix(component_prefix)
Penom Nom's avatar
Penom Nom committed
55
56
57
58
59
60
                if kwargs: cmpt_object.define_parameters(**kwargs)
                else: cmpt_object.define_parameters(*args)
            # external components
            else :
                cmpt_object = self.external_components[component_name]()
                cmpt_object.output_directory = self.get_component_output_directory(component_name, component_prefix)
61
                cmpt_object.set_prefix(component_prefix)
Penom Nom's avatar
Penom Nom committed
62
63
                # can't use positional arguments with external components
                cmpt_object.define_parameters(**kwargs)
64
65
66
67
68
            # if the built object is an analysis
            for derived_class in cmpt_object.__class__.__bases__:
                if derived_class.__name__ == "Analysis":
                    # add parent and project/run information
                    cmpt_object.parent = parent
Romain Therville's avatar
Romain Therville committed
69
70
71
72
73
                    if addto == "project":
                        cmpt_object.project = self.project
                        cmpt_object.space_id = cmpt_object.project.space_id
                    elif addto == "run":
                        cmpt_object.run = self.runobj
Romain Therville's avatar
Romain Therville committed
74
                        #We replace the default space_id 
Romain Therville's avatar
Romain Therville committed
75
76
                        if cmpt_object.space_id == "default":
                            cmpt_object.space_id = cmpt_object.run.space_id
77
                    
Celine Noirot's avatar
Celine Noirot committed
78
                    # link analysis with ots create user
Penom Nom's avatar
Penom Nom committed
79
                    cmpt_object.admin_login = self.admin_login
80
            # there is a dynamic component
81
            if cmpt_object.is_dynamic():
82
83
84
85
                self.dynamic_component_present = True
                # if already init, add the component to the list and check if weaver should be executed
                if self.component_nameids_is_init:
                    # add the component
Penom Nom's avatar
Penom Nom committed
86
                    self.components_to_exec.append(cmpt_object)
87
88
89
90
91
92
93
                    self.components.append(cmpt_object)
                    self._execute_weaver()
                    # update outputs
                    for output in cmpt_object.get_dynamic_outputs():
                        output.update()
                else:
                    if self._component_is_duplicated(cmpt_object):
94
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
95
96
                                            cmpt_object.prefix + " already exist in this pipeline!")
                    self.component_nameids[cmpt_object.get_nameid()] = None
Penom Nom's avatar
Penom Nom committed
97
                    self.components_to_exec = []
98
99
100
101
                    self.components = []
            else:
                if self.component_nameids_is_init:
                    # add the component
Penom Nom's avatar
Penom Nom committed
102
                    self.components_to_exec.append(cmpt_object)
103
104
105
                    self.components.append(cmpt_object)
                elif not self.component_nameids_is_init and not self.dynamic_component_present:
                    if self._component_is_duplicated(cmpt_object):
106
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
107
                                            cmpt_object.prefix + " already exist in this pipeline!")
Penom Nom's avatar
Penom Nom committed
108
                    self.components_to_exec.append(cmpt_object)
109
110
111
                    self.components.append(cmpt_object)
                else:
                    if self._component_is_duplicated(cmpt_object):
112
                        raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
113
114
                                            cmpt_object.prefix + " already exist in this pipeline!")
                    self.component_nameids[cmpt_object.get_nameid()] = None
ckuchly's avatar
ckuchly committed
115

116
            return cmpt_object
Jerome Mariette's avatar
Jerome Mariette committed
117
        else:
Penom Nom's avatar
Penom Nom committed
118
            raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(
Penom Nom's avatar
Penom Nom committed
119
                                           ", ".join(list(self.internal_components.keys()) + list(self.external_components.keys()))))
120

121
122
123
124
125
    def pre_process(self):
        t3mysql = t3MySQLdb()
        infos = t3mysql.get_user_info(self.admin_login)
        if infos['email'] :
            self.set_to_address(infos['email'])
126

Penom Nom's avatar
Penom Nom committed
127

Penom Nom's avatar
Penom Nom committed
128
class DownloadWorkflow(Workflow):
129
130
131
132
    """
        Base class for download workflows classes
    """
    def __init__(self, args={}, id=None, function= "process"):
Penom Nom's avatar
Penom Nom committed
133
134
        Workflow.__init__(self, args, id, function)
        self.add_parameter("admin_login", "Who is the project administrator", type = 'ng6userlogin', display_name="Admin login")
135
136
137
        self.add_parameter_list('data_id', 'Ids of a run from which rawdata will be retrieved', type = 'existingrun')
        self.add_parameter_list('run_id', 'Ids of run from which all data will be retrieved', type = 'existingrun')
        self.add_parameter_list('analysis_id', 'Ids of analysis to retrieve', type = 'existinganalysis')
Celine Noirot's avatar
Celine Noirot committed
138

139
class NG6Workflow (BasicNG6Workflow):
140

Penom Nom's avatar
Penom Nom committed
141
142
143
    def __init__(self, args={}, id=None, function= "process"):
        BasicNG6Workflow.__init__(self, args, id, function)
        self._add_run_parameters()
144
        self.__add_sample_parameters__()
145

Penom Nom's avatar
Penom Nom committed
146
147
148
        self.samples = []
        self.reads1 = []
        self.reads2 = []
149
        self.readsi = []
Penom Nom's avatar
Penom Nom committed
150
        self.samples_names = []
151
152
        self.reads1_indexes = []
        self.reads2_indexes = []
153

Penom Nom's avatar
Penom Nom committed
154
    def _add_run_parameters(self):
Penom Nom's avatar
Penom Nom committed
155
        self.add_parameter("project_name", "The project name the run belongs to", required = True, type = 'existingproject', group="Run information")
Penom Nom's avatar
Penom Nom committed
156
157
        self.add_parameter("run_name", "Give a name to your run", flag = "--name", required = True, display_name="Name", group="Run information")
        self.add_parameter("run_description", "Give a description to your run", flag = "--description", required = True, display_name="Description", group="Run information")
Penom Nom's avatar
Penom Nom committed
158
        self.add_parameter("run_date", "When were the data produced", flag = "--date", required = True, type = 'date', display_name="Date", group="Run information")
Penom Nom's avatar
Penom Nom committed
159
160
161
162
        self.add_parameter("data_nature", "Are Sequences cDNA, genomique, RNA, ...", required = True, display_name="Data nature", group="Run information")
        self.add_parameter("sequencer", "Which sequencer produced the data", required = True, display_name="Sequencer", group="Run information")
        self.add_parameter("species", "Which species has been sequenced", required = True, display_name="Species", group="Run information")
        self.add_parameter("run_type", "What type of data is it (1 lane, 1 region)", flag = "--type", required = True, display_name="Type", group="Run information")
163
        self.add_parameter("align_subset_reads", "Align only on subset reads", type=bool, default = False)
164

165
    def __add_sample_parameters__(self):
Celine Noirot's avatar
Celine Noirot committed
166
        self.add_multiple_parameter_list("input_sample", "Definition of a sample", flag="--sample",  group="Sample description") # required = True, # TO CHECK casavaWorkflow required not if casava dir
Penom Nom's avatar
Penom Nom committed
167
168
        self.add_parameter("sample_id", "The uniq identifier of the sample", type="nospacestr", add_to = "input_sample")
        self.add_parameter("sample_name", "A descriptive name for the sample", type="nospacestr", add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
169
        self.add_parameter("sample_description", "A brief description of the sample", add_to = "input_sample")
170
        self.add_parameter("type", "Read orientation and type", choices = Sample.AVAILABLE_TYPES, default='se', add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
171
172
        self.add_parameter("insert_size", "Insert size for paired end reads", type = int, add_to = "input_sample")
        self.add_parameter("species", "Species related to this sample", add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
173
        self.add_parameter_list("metadata", "Add metadata to the sample", type='samplemetadata'  ,add_to = "input_sample")
Penom Nom's avatar
Penom Nom committed
174
175
        self.add_input_file_list("read1", "Read 1 data file path", required = True, add_to = "input_sample")
        self.add_input_file_list("read2", "Read 2 data file path", add_to = "input_sample")
ckuchly's avatar
ckuchly committed
176
        self.add_input_file_list("index",  "Index data file path", add_to = "input_sample")
177

Penom Nom's avatar
Penom Nom committed
178
    def __create_samples__(self):
Penom Nom's avatar
Penom Nom committed
179
        for sd in self.input_sample :
180
            sp_object = Sample( sd['sample_id'], sd['read1'], sd['read2'], sd['index'], name = sd['sample_name'], description = sd['sample_description'], type = sd['type'],
Penom Nom's avatar
Penom Nom committed
181
                               insert_size = sd['insert_size'], species = sd['species'] )
182

183
184
185
            for metadata in sd['metadata'] :
                k, v = metadata.split(':', 2)
                sp_object.add_metadata(k, v)
186
            self.samples.append(sp_object)
187

Penom Nom's avatar
Penom Nom committed
188
    def __preprocess_samples__(self):
Penom Nom's avatar
Penom Nom committed
189
190
191
        samples_ids = []
        pidx = 1
        nidx = 1
192
        for rang, sample in enumerate(self.samples) :
Penom Nom's avatar
Penom Nom committed
193
194
195
196
197
            if sample.name :
                self.samples_names.append(sample.name)
            else :
                sample.name = 'SAMPLE_' + str(nidx)
                nidx += 1
198

Penom Nom's avatar
Penom Nom committed
199
200
            if sample.sample_id :
                if sample.sample_id in samples_ids :
201
202
                    display_error_message( "Sample identifier %s must be uniq ( sample %s ).\n" % sample.sample_id, sample.name )
            # add an automatic id for samples
Penom Nom's avatar
Penom Nom committed
203
204
205
206
            else :
                sample.sample_id = 'sample_' + str(pidx)
                pidx += 1
            samples_ids.append(sample.sample_id)
207

208
209
210
            for rfile in sample.reads1 :
                self.reads1_indexes.append(sample.sample_id)
                self.reads1.append(rfile)
211

212
213
214
            for rfile in sample.reads2 :
                self.reads2_indexes.append(sample.sample_id)
                self.reads2.append(rfile)
215
216
217
            
            for rfile in sample.readsi :
                self.readsi.append(rfile)
218

Penom Nom's avatar
Penom Nom committed
219
220
        if len(self.samples_names) != 0 :
            if len(self.samples_names) != len (self.samples) :
221
                display_error_message( "All samples must have a defined sample name" )
222

Penom Nom's avatar
Penom Nom committed
223
    def get_all_reads(self, type = None):
Penom Nom's avatar
Penom Nom committed
224
        if type == 'read1' :
Penom Nom's avatar
Penom Nom committed
225
            return self.reads1
Penom Nom's avatar
Penom Nom committed
226
        elif type == 'read2' :
Penom Nom's avatar
Penom Nom committed
227
            return self.reads2
ckuchly's avatar
ckuchly committed
228
        elif type == 'index' : 
229
230
            return self.readsi
        return self.reads1 + self.reads2 + self.readsi
231

232
233
234
235
236
237
    def get_files_index(self, type = None):
        if type == 'read1' :
            return self.reads1_indexes
        elif type == 'read2' :
            return self.reads2_indexes
        return self.reads1_indexes + self.reads2_indexes
238

239
    def is_paired_end(self):
Penom Nom's avatar
Penom Nom committed
240
        return len(self.reads2) > 0
241

242
    def pre_process(self):
243
        BasicNG6Workflow.pre_process(self)
244
        # start from an existing project
Penom Nom's avatar
Penom Nom committed
245
246
        self.project = Project.get_from_name(self.project_name)
        self.metadata.append("project_id="+str(self.project.id))
247
        # if user is not allowed to add data on project (is not admin)
Penom Nom's avatar
Penom Nom committed
248
        if self.project is not None and not self.project.is_admin(self.admin_login):
249
            display_error_message( "The user login '" + self.admin_login + "' is not allowed to add data on project '" + self.project.name + "'.\n" )
250

251
        # build the run
252
        self.runobj = Run(self.run_name, self.run_date, self.species, self.data_nature,
253
                          self.run_type, self.run_description, self.sequencer, self.project.space_id)
Penom Nom's avatar
Penom Nom committed
254
        self.runobj.admin_login=self.admin_login
255
256
257
        # then add the run to the project
        self.project.add_run(self.runobj)
        self.metadata.append("run_id="+str(self.runobj.id))
258

Penom Nom's avatar
Penom Nom committed
259
        self.__create_samples__()
Penom Nom's avatar
Penom Nom committed
260
        self.__preprocess_samples__()
Penom Nom's avatar
Penom Nom committed
261
262
        # add samples to run
        self.runobj.add_samples(self.samples)
263

264
    def post_process(self):
Jerome Mariette's avatar
Jerome Mariette committed
265
        # once everything done, sync directories
266
        logging.getLogger("ng6").debug("post_process enter")
Jerome Mariette's avatar
Jerome Mariette committed
267
        if self.runobj:
268
            logging.getLogger("ng6").debug("post_process enter self.runobj.sync")
Jerome Mariette's avatar
Jerome Mariette committed
269
            self.runobj.sync()
Jerome Mariette's avatar
Jerome Mariette committed
270
        elif self.project:
271
            logging.getLogger("ng6").debug("post_process enter self.project.sync")
Jerome Mariette's avatar
Jerome Mariette committed
272
            self.project.sync()
273

274

275
def get_files_from_casava(casava_directory, project_name, lane_number):
Penom Nom's avatar
Penom Nom committed
276
277
278
279
280
281
    """
        Retrieve all fastq files of a specific project and lane number from a given casava directory
        @param casava_directory : path to CASAVA output directory
        @param project_name : project name
        @param lane_number : lane number
    """
282

283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
    def bcl2fastq_18(directory, pname, lane):
        """bcl2fastq <= 1.8"""
        files = []
        with open(os.path.join(directory, "SampleSheet.mk")) as fh :
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")
            # parse samples
            for subdir in subdirs_list:
                # filter on project name
                if re.match("Project_" + pname + "/Sample_.+", subdir) or subdir.startswith("Undetermined_indices"):
                    for file in os.listdir(directory + "/" + subdir):
                        filepath = directory + "/" + subdir + "/" + file
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane) + "_.*", file):
                            files.append(filepath);
        return files
ckuchly's avatar
ckuchly committed
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
        
    def bcl2fastq_10X(directory, pname, lane):
        """longranger"""
        files = []
        with open(os.path.join(directory, "SampleSheet_10X.mk")) as fh :
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")
            # parse samples
            for subdir in subdirs_list:
                # filter on project name
                if re.match("Project_" + pname + "/Sample_.+", subdir) or subdir.startswith("Undetermined_indices"):
                    for file in os.listdir(directory + "/" + subdir):
                        filepath = directory + "/" + subdir + "/" + file
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane) + "_.*", file):
                            files.append(filepath);
        return files
320

321
322
323
324
325
326
    def bcl2fastq_216(directory, pname, lane):
        """bcl2fastq >= 1.9"""
        files = []
        tree = ET.parse( os.path.join( directory, 'Stats', 'DemultiplexingStats.xml'))
        root = tree.getroot()
        project = root.find(".//Project[@name='%s']"%pname)
327

328
        if project is not None :
329
330
            project_files = os.listdir(directory + "/" + pname)

331
332
333
334
335
336
337
338
339
340
341
            for sample in project.findall("./Sample") :
                if sample.get('name') != 'all' :
                    for barcode in sample.findall('./Barcode'):
                        if barcode.get('name') != 'all':
                            lnum = int(barcode.find('Lane').get('number'))
                            if lnum == lane :
                                fileregexp = '%s_S\d_L%03d_'%(sample.get('name'), lnum)
                                for pfile in project_files :
                                    if re.match(fileregexp,pfile) :
                                        files.append(os.path.join(directory,pname,pfile))
        return files
342

343
344
345
346
    if os.path.exists(os.path.join(casava_directory, "SampleSheet.mk")) :
        return bcl2fastq_18(casava_directory, project_name, lane_number)
    elif os.path.exists(os.path.join( casava_directory, 'Stats', 'DemultiplexingStats.xml')) :
        return bcl2fastq_216(casava_directory, project_name, lane_number)
ckuchly's avatar
ckuchly committed
347
348
    elif os.path.exists(os.path.join(casava_directory, "SampleSheet_10X.mk")) :
        return bcl2fastq_10X(casava_directory, project_name, lane_number)
349
350
351



352
class CasavaNG6Workflow(NG6Workflow):
353

Penom Nom's avatar
Penom Nom committed
354
355
    def __init__(self, args={}, id=None, function= "process"):
        NG6Workflow.__init__(self, args, id, function)
356

Penom Nom's avatar
Penom Nom committed
357
358
359
        self.group_prefix = None
        self.undetermined_reads1 = []
        self.undetermined_reads2 = []
ckuchly's avatar
ckuchly committed
360
        self.undetermined_index = []
361
        self.log_files = []
362
        self.is_casava = False
ckuchly's avatar
ckuchly committed
363
        self.is_10Xcasava = False
364

365
    def __add_sample_parameters__(self):
366
367
        self.add_multiple_parameter('casava', 'Provide the options to retrieve samples from a CASAVA directory', group="Sample description")
        self.add_input_directory("directory", "Path to the CASAVA directory to use", required=True, get_files_fn=get_files_from_casava, add_to="casava" )
Celine Noirot's avatar
Celine Noirot committed
368
        self.add_parameter("lane", "The lane number to be retrieved from the casava directory",  type='int', add_to="casava") #required=True,
369
        self.add_parameter('project', 'The name of the project to retrieve in casava directory. The default name is the name of the nG6  project',add_to="casava")
370
        self.add_parameter('mismatch_index', 'Set this value to true if the index sequence in the sample fastq files allows at least 1 mismatch',
371
                           type ='bool', add_to="casava")
372
        self.add_parameter_list('select_sample_id', 'The ids of the sample that will be selected in the SampleSheet.mk file. By default all samples are selected.', add_to="casava",
373
                           display_name = 'Selected samples')
374

375
        NG6Workflow.__add_sample_parameters__(self)
376

Celine Noirot's avatar
Celine Noirot committed
377
        #TODO exclude self.add_exclusion_rule("casava", "input_sample")
378

379
        self.add_parameter("compression", "How should the data be compressed once archived", choices= [ "none", "gz", "bz2"], default = "none")
380
        self.add_parameter("keep_reads", "Keep or discard reads which pass the illumina filter. 'all' option will keep all reads", flag = "--keep",
381
382
383
                           choices=[ "pass_illumina_filters", "not_pass_illumina_filters", "all"], default = "pass_illumina_filters")
        self.add_input_file_list("contamination_databank", "Which databank should be used to seek contamination (as to be phiX databank indexed for bwa)")
        self.add_parameter("no_group", "Disables grouping of bases for reads >50bp", type=bool, default = True)
384
385


Penom Nom's avatar
Penom Nom committed
386
    def __create_samples__(self):
387
        """
388
            Create samples object from a casava directory if provided
389
390
391
            @param casava_directory : path to CASAVA output directory
            @param lane_number : files in each sample are sequenced on this lane
        """
392
        logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ entering")
393

394
        if self.casava and self.casava["directory"] and self.casava["lane"] :
395
            logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ self.casava")
396
397
398
399
            self.is_casava = True
            casava_directory = self.casava["directory"]
            lane_number = self.casava["lane"]
            all_samples, all_samples_id = [], []
400
401

            # get the casava project_name
402
403
404
405
            if self.casava["project"] :
                project_name = self.casava["project"]
            else :
                project_name = self.project_name
406

407
408
            project_name = project_name.replace(" ", "_")
            input_files = casava_directory.get_files( project_name, lane_number)
409
            logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ input_files = " + ",".join(input_files))
410
411
            if len(input_files) == 0 :
                raise Exception("Error while parsing casava directory %s, invalid project name '%s' for lane %s"% (casava_directory, project_name, lane_number))
412

413
414
            all_samples, all_samples_id = [], []
            if os.path.exists(os.path.join(casava_directory, "SampleSheet.mk")) :
415

416
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_18")
417
                all_samples, all_samples_id = self._process_casava_18(casava_directory, project_name, lane_number, input_files)
418
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_18")
ckuchly's avatar
ckuchly committed
419

420
421
            elif os.path.exists(os.path.join( casava_directory, 'Stats', 'DemultiplexingStats.xml')) :
                all_samples, all_samples_id = self._process_casava_216(casava_directory, project_name, lane_number, input_files)
422

ckuchly's avatar
ckuchly committed
423
424
425
426
427
428
            elif os.path.exists(os.path.join( casava_directory, "SampleSheet_10X.mk")):
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_10X")
                all_samples, all_samples_id = self._process_casava_10X(casava_directory, project_name, lane_number, input_files)
                self.is_10Xcasava = True
                logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ after self._process_casava_10X")

429
            selected_samples = self.casava['select_sample_id']
ckuchly's avatar
ckuchly committed
430
            logging.getLogger("CasavaNG6Workflow").debug("__create_samples__. all_samples_id = "+", ".join(all_samples_id))
431
432
433
            if selected_samples :
                for sid in selected_samples :
                    assert sid in all_samples_id , "The sample id %s is not in the SampleSheet.mk" % sid
434

Penom Nom's avatar
Penom Nom committed
435
436
            for sample in all_samples :
                if selected_samples :
437
                    if sample.name in selected_samples :
Penom Nom's avatar
Penom Nom committed
438
439
440
                        self.samples.append(sample)
                else :
                    self.samples.append(sample)
441
442
        else :
            NG6Workflow.__create_samples__(self)
443

Penom Nom's avatar
Penom Nom committed
444
445
    def __preprocess_samples__(self):
        NG6Workflow.__preprocess_samples__(self)
ckuchly's avatar
ckuchly committed
446

447
        if self.is_casava:
Penom Nom's avatar
Penom Nom committed
448
            self.group_prefix = list((Utils.get_group_basenames(self.get_all_reads(), "read")).keys())
449
450

    def _process_casava_18(self, casava_directory, project_name, lane_number, input_files):
451
452
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 enter")
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 casava_directory = " + casava_directory + ", project_name = " + str(project_name))
453
454
        """
            Creates samples from casavadir (<=1.8) using input files
455
456
457
458
            @param casava_directory:
            @param project_name:
            @param lane_number:
            @param input_files:
459
460
461
        """
        all_samples = []
        all_samples_id = []
462

463
464
        # open casava samplesheet again to associate our files with a sample
        with open(os.path.join(casava_directory, "SampleSheet.mk")) as fh :
465
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 SampleSheet.mk exists")
466
467
468
469
470
471
472
473
474
475
476
477
478
            barcodes_list = []
            sample_ids_list = []
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane_number) + "_BARCODES"):
                    parts = line.strip().split(":=")
                    barcodes_list = [ re.sub( r"[-_\s]+", "", x) for x in  parts[1].split() ]
                elif line.startswith("l" + str(lane_number) + "_SAMPLEIDS" ):
                    parts = line.strip().split(":=")
                    sample_ids_list = parts[1].split(" ")
                elif line.startswith("l" + str(lane_number) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")
479

480
            assert  len(barcodes_list) == len(sample_ids_list) == len(subdirs_list), "Invalid lane {0} in SampleSheet.mk".format(lane_number)
481
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 SampleSheet.mk parsed")
482
483
484
485
486
487
488
            # parse samples
            for i in range(len(barcodes_list)):
                sample = {
                    'barcode'                   : barcodes_list[i],
                    'sample_id'                 : sample_ids_list[i],
                    'subdir'                    : subdirs_list[i],
                    'reads1'                    : [],
489
490
                    'reads2'                    : [],
                    'readsi'                    : []
491
                }
492

493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
                # filter on project name
                if re.match("Project_" + project_name + "/Sample_.+", sample['subdir']) or sample['subdir'].startswith("Undetermined_indices"):
                    for file in os.listdir(casava_directory + "/" + sample['subdir']):
                        filepath = casava_directory + "/" + sample['subdir'] + "/" + file
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane_number) + "_.*", file):
                            for idx, iofile in enumerate(input_files) :
                                if iofile == filepath :
                                    if re.search(".*_R1_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads1'].append(iofile)
                                        else:
                                            self.undetermined_reads1.append(iofile)
                                    if re.search(".*_R2_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads2'].append(iofile)
                                        else:
                                            self.undetermined_reads2.append(iofile)
                                    input_files.pop(idx)
                                    break

                    if not sample['subdir'].startswith("Undetermined_indices") :
514
                        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 create sample " + sample['sample_id'])
515
                        sp_object = Sample(sample['barcode'], sample['reads1'], reads2 = sample['reads2'], readsi = [], name=sample['sample_id'])
516
517
                        sp_object.add_metadata('barcode', sample['barcode'])
                        sp_object.add_metadata('is_casava', True)
518

519
520
                        all_samples.append(sp_object)
                        all_samples_id.append(sample['sample_id'])
521
522
523
524
            for file in os.listdir(casava_directory):
                filepath = casava_directory + "/" + file
                if file.endswith(".log"):
                    self.log_files.append(filepath)
Gerald Salin's avatar
Gerald Salin committed
525
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 self.log_files = " + ",".join(self.log_files))
526
527
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 all_samples_id = " + ",".join(all_samples_id))
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 exiting")
ckuchly's avatar
ckuchly committed
528
529
            
        return all_samples, all_samples_id
530

ckuchly's avatar
ckuchly committed
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
    def _process_casava_10X(self,casava_directory, project_name, lane_number, input_files):
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X enter")
        logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X casava_directory = " + casava_directory + ", project_name = " + str(project_name))
        """
            Creates samples from casavadir from longranger demultiplexing
            @param casava_directory:
            @param project_name:
            @param lane_number:
            @param input_files:
        """
        all_samples = []
        all_samples_id = []

        # open casava samplesheet again to associate our files with a sample
        with open(os.path.join(casava_directory, "SampleSheet_10X.mk")) as fh :
            barcodes_list = []
            sample_ids_list = []
            subdirs_list = []
            for line in fh :
                if line.startswith("l" + str(lane_number) + "_BARCODES"):
                    parts = line.strip().split(":=")
                    barcodes_list = [ re.sub( r"[_\s]+", "", x) for x in  parts[1].split() ]
                elif line.startswith("l" + str(lane_number) + "_SAMPLEIDS" ):
                    parts = line.strip().split(":=")
                    sample_ids_list = parts[1].split(" ")
                elif line.startswith("l" + str(lane_number) + "_SUBDIRS"):
                    parts = line.strip().split(":=")
                    subdirs_list = parts[1].split(" ")

ckuchly's avatar
ckuchly committed
560
            assert  len(barcodes_list) == len(sample_ids_list) == len(subdirs_list), "Invalid lane {0} in SampleSheet_10X.mk".format(lane_number)
ckuchly's avatar
ckuchly committed
561
562
563
564
565
566

            cfg_reader = NG6ConfigReader()
            indexs = cfg_reader.get_10X_indexs()
            
            # parse samples
            for i in range(len(barcodes_list)):
ckuchly's avatar
ckuchly committed
567
                
ckuchly's avatar
ckuchly committed
568
569
570
571
                if barcodes_list[i] == 'Undetermined' :
                    barcode = 'Undetermined'
                else : 
                    barcode = indexs[barcodes_list[i]]
ckuchly's avatar
ckuchly committed
572

ckuchly's avatar
ckuchly committed
573
574
575
576
577
578
                sample = {
                    'barcode'                   : barcode,
                    'sample_id'                 : sample_ids_list[i],
                    'subdir'                    : subdirs_list[i],
                    'reads1'                    : [],
                    'reads2'                    : [],
579
                    'readsi'                    : []
ckuchly's avatar
ckuchly committed
580
                }
ckuchly's avatar
ckuchly committed
581
                
ckuchly's avatar
ckuchly committed
582
                # filter on project name
ckuchly's avatar
ckuchly committed
583

ckuchly's avatar
ckuchly committed
584
                if re.match("Project_" + project_name + "/Sample_.+", sample['subdir']) or sample['subdir'].startswith("Undetermined_indices"):
ckuchly's avatar
ckuchly committed
585

ckuchly's avatar
ckuchly committed
586
                    for file in os.listdir(casava_directory + "/" + sample['subdir']):
ckuchly's avatar
ckuchly committed
587

ckuchly's avatar
ckuchly committed
588
                        filepath = casava_directory + "/" + sample['subdir'] + "/" + file
ckuchly's avatar
ckuchly committed
589
                        
ckuchly's avatar
ckuchly committed
590
591
592
                        if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane_number) + "_.*", file):
                            for idx, iofile in enumerate(input_files) :
                                if iofile == filepath :
ckuchly's avatar
ckuchly committed
593

ckuchly's avatar
ckuchly committed
594
                                    if re.search(".*_R1_.*", file):
ckuchly's avatar
ckuchly committed
595

ckuchly's avatar
ckuchly committed
596
597
598
599
600
601
602
603
604
605
606
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads1'].append(iofile)
                                        else:
                                            self.undetermined_reads1.append(iofile)
                                    if re.search(".*_R2_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
                                            sample['reads2'].append(iofile)
                                        else:
                                            self.undetermined_reads2.append(iofile)
                                    if re.search(".*_I1_.*", file):
                                        if not sample['subdir'].startswith("Undetermined_indices"):
607
                                            sample['readsi'].append(iofile)
ckuchly's avatar
ckuchly committed
608
609
610
611
                                        else:
                                            self.undetermined_index.append(iofile)
                                    input_files.pop(idx)
                                    break
612

ckuchly's avatar
ckuchly committed
613
                    if not sample['subdir'].startswith("Undetermined_indices") :
614
                        sp_object = Sample(sample['barcode'], sample['reads1'], reads2 = sample['reads2'], readsi = sample['readsi'], name=sample['sample_id'])
ckuchly's avatar
ckuchly committed
615
616
617
618
619
620
621
                        sp_object.add_metadata('barcode', sample['barcode'])
                        sp_object.add_metadata('is_casava', True)

                        all_samples.append(sp_object)
                        all_samples_id.append(sample['sample_id'])
            for file in os.listdir(casava_directory):
                filepath = casava_directory + "/" + file
ckuchly's avatar
ckuchly committed
622

ckuchly's avatar
ckuchly committed
623
624
625
626
627
                if file.endswith(".log"):
                    self.log_files.append(filepath)
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X all_samples_id = " + ",".join(all_samples_id))
            logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_10X exiting")
            
628
        return all_samples, all_samples_id
629

ckuchly's avatar
ckuchly committed
630

631
632
633
    def _process_casava_216(self,casava_directory, project_name, lane_number, input_files):
        """
            Creates samples from casavadir (>=1.9) using input files
634
635
636
637
            @param casava_directory:
            @param project_name:
            @param lane_number:
            @param input_files:
638
639
        """
        raise NotImplementedError
640

641
    def illumina_process(self):
642
        fastqilluminafilter = None
643
        concatenatefastq = None
644
645
646
        filtered_read1_files = []
        filtered_read2_files = []
        saved_files = []
647
        logging.getLogger("ng6").debug("illumina_process entering")
648
        if self.is_casava :
649
            logging.getLogger("ng6").debug("illumina_process self.is_casava")
650
651
            if len(self.log_files) > 0 :
                add_log = self.add_component("BasicAnalysis", [self.log_files,"Log Files","Log files generated during primary analysis","-","-","-","gz", "","log.gz"])
ckuchly's avatar
ckuchly committed
652
            
653
654
655
            if len(self.undetermined_reads1) > 0 :
                if self.casava['mismatch_index'] :
                    demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1, self.get_files_index('read1')])
ckuchly's avatar
ckuchly committed
656
                elif self.is_10Xcasava :
ckuchly's avatar
ckuchly committed
657
                    demultiplex_stats = self.add_component("Demultiplex10XStats", [self.get_all_reads("read1"), self.undetermined_reads1, self.get_files_index("read1")])
658
                else :
ckuchly's avatar
ckuchly committed
659
660
                    demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1])                 

661

662
            if self.keep_reads != "all" :
663
                logging.getLogger("ng6").debug("illumina_process self.keep_reads != all")
ckuchly's avatar
ckuchly committed
664
                logging.getLogger("ng6").debug("illumina_process BEFORE FASTQILLUMINAFILTER self.get_all_reads() = " + ",".join(self.get_all_reads()))
665
                logging.getLogger("ng6").debug("illumina_process self.group_prefix = " + ",".join(self.group_prefix))
666
                # fastq illumina filter
667
                fastqilluminafilter = self.add_component("FastqIlluminaFilter", [self.runobj,self.get_all_reads(), self.keep_reads, self.group_prefix])
668

ckuchly's avatar
ckuchly committed
669
                
670
671
672
673
674
675
676
677
                # list filtered files
                if self.is_paired_end() :
                    # split read 1 and read 2 from filtered files list
                    [filtered_read1_files, filtered_read2_files] = Utils.split_pair(fastqilluminafilter.fastq_files_filtered, (self.group_prefix is not None))
                else:
                    filtered_read1_files = fastqilluminafilter.fastq_files_filtered
                    filtered_read2_files = []
                filtered_read1_files = sorted(filtered_read1_files)
678
                filtered_read2_files = sorted(filtered_read2_files)
679
680
681
682
            else:
                fastqilluminafilter = None
                filtered_read1_files = self.get_all_reads("read1")
                filtered_read2_files = self.get_all_reads("read2")
683

684
            # archive the files
685
            #TODO : if self.group_prefix == None, the create the output of fastqilluminafilter in the run.get_work_directory()
ckuchly's avatar
ckuchly committed
686

687
            saved_files = filtered_read1_files + filtered_read2_files + self.get_all_reads("index")
688
            logging.getLogger("CasavaNG6Workflow").debug("illumina_process saved_files = " + ",".join(saved_files))
689
690
691
            reads_prefixes = None
            if self.group_prefix != None :
                # concatenate fastq
Penom Nom's avatar
Penom Nom committed
692
                reads_prefixes = list((Utils.get_group_basenames(saved_files, "read")).keys())
693
                logging.getLogger("CasavaNG6Workflow").debug("illumina_process saved_files = " + ",".join(saved_files))
ckuchly's avatar
ckuchly committed
694
                concatenatefastq = self.add_component("ConcatenateFilesGroups", [self.runobj,saved_files,reads_prefixes])
695
                saved_files = concatenatefastq.concat_files
696
                logging.getLogger("CasavaNG6Workflow").debug("illumina_process after concatenatefastq, saved_files = " + ",".join(saved_files))
697

698
        else :
Penom Nom's avatar
Penom Nom committed
699
            reads_prefixes = None
700
701
702
703
            fastqilluminafilter = None
            filtered_read1_files = self.get_all_reads("read1")
            filtered_read2_files = self.get_all_reads("read2")
            saved_files = self.get_all_reads()
704

Penom Nom's avatar
Penom Nom committed
705
706
707
708
709
710
711
712
        # add raw
        addrawfiles = self.add_component("AddRawFiles", [self.runobj, saved_files, self.compression])
        contam = []
        try :
            contam.append(self.get_resource("phix_bwa"))
            contam.append(self.get_resource("ecoli_bwa"))
            contam.append(self.get_resource("yeast_bwa"))
        except : pass
713

Penom Nom's avatar
Penom Nom committed
714
        # contamination_search
715
716
        if contam :
            if self.contamination_databank:  contam.extend(self.contamination_databank)
ckuchly's avatar
ckuchly committed
717
            contamination_search = self.add_component("ContaminationSearch", [filtered_read1_files+filtered_read2_files, contam, reads_prefixes], parent = fastqilluminafilter)
718

719
720
        # make some statistics on raw file
        fastqc = self.add_component("FastQC", [filtered_read1_files+filtered_read2_files, (self.group_prefix is not None), self.no_group, "fastqc.tar.gz"], parent = fastqilluminafilter)
721
        return fastqilluminafilter, filtered_read1_files, filtered_read2_files, saved_files, concatenatefastq