Commit c8757197 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

delete variantcalling workflow

parent addabcaf
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from jflow.workflow import Workflow
import os
import sys
from io import StringIO
from configparser import ConfigParser
from jflow import seqio
class VariantCalling (Workflow):
def get_description(self):
    """Return the one-line, human-readable description of this workflow."""
    description = "Variation calling"
    return description
def define_parameters(self, function="process"):
    """
    Declare every input file and option of the variant-calling workflow.

    @param function: [str] Name of the workflow step these parameters belong to
                     (jflow convention; defaults to "process").
    """
    # Reference genome shared by alignment and calling steps.
    self.add_input_file("reference", "The reference used to align reads.", required=True)
    # Libraries
    self.add_parameter("is_rna", "The sequence in BAM are RNA (add '-recoverDanglingHeads -dontUseSoftClippedBases' to calling options). Keep in mind to change min confidence and quality when you used RNASeq data.", default=False, type="bool" )
    # One entry per sequencing library; R2 is optional (single-end libraries).
    self.add_multiple_parameter_list("library", "Library.", required=True)
    self.add_input_file("R1", "Path to R1 file.", required=True, add_to="library")
    self.add_input_file("R2", "Path to R2 file.", add_to="library")
    self.add_parameter("sample", "Sample Name.", add_to="library")
    self.add_parameter("sequencer", "The sequencer type.", choices=["HiSeq2000", "ILLUMINA","SLX","SOLEXA","SOLID","454","COMPLETE","PACBIO","IONTORRENT","CAPILLARY","HELICOS","UNKNOWN"], default="HiSeq2000", add_to="library")
    # Optional BED restricting analysis to the sequenced intervals.
    self.add_input_file("sequenced_regions", "Bed file which describe which region are sequenced. this information is used to reduce analysis region.")
    # Parameter GATK
    self.add_parameter( "global_calling", "The variants discovery is lead with all samples and not sample by sample.", default=False, type="bool" )
    self.add_parameter( "min_call_confidence", "The minimum confidence needed for a given base for it to be used in variant calling.", default=30, type="float" )
    self.add_parameter( "min_emit_confidence", "This argument allows you to emit low quality calls as filtered records.", default=10, type="float" )
    self.add_parameter( "min_qual_score", "This argument allows you to emit low quality calls as filtered records.", default=10, type="int" )
    self.add_parameter("two_steps_calling", "The SNP calling is realised in two step. The first step (recalibration, calling, filter) has hard filters. The second step (recalibration, calling, filter) has standard filters and the variants detected in the first step are used as database of known polymorphic sites.",
                       display_name="Two steps for SNP calling", type="bool", default=False, group="SNP Calling")
    self.add_input_file("known_snp", "Known snp for the reference.", group="SNP Calling")
    # A known-SNP database and the two-step bootstrap are mutually exclusive.
    self.add_exclusion_rule( "known_snp", "two_steps_calling" )
def update_sequenced_regions(self):
    """
    Build an intervals file listing every reference sequence when global
    calling is requested and no sequenced-regions file was provided.

    Side effect: may replace self.sequenced_regions with the path of a new
    temporary ".intervals" file containing one sequence id per line.
    """
    if self.sequenced_regions is None and self.global_calling:
        self.sequenced_regions = self.get_temporary_file(".intervals")
        # 'with' guarantees the handle is closed even if the reader fails
        # (the previous version leaked the handle on exceptions).
        with open(self.sequenced_regions, "w") as FH_sequenced_regions:
            reader = seqio.FastaReader(self.reference)
            for seq_id, desc, seq, qualities in reader:
                FH_sequenced_regions.write( seq_id + "\n" )
def process(self):
    """
    Wire the whole variant-calling pipeline: quality-offset normalisation,
    BWA alignment, GATK pre-processing/recalibration, haplotype calling and
    variant filtering. Components are instantiated via self.add_component;
    actual execution is deferred to the jflow engine.
    """
    # Java heap size (GB) handed to the memory-hungry GATK components.
    JAVA_HUGE_MEM = 2
    self.update_sequenced_regions()
    # sample name -> indexes of its libraries in the paired/single lists.
    samples = dict()
    lib_paired_idx = 0
    lib_single_idx = 0
    single_fastq, pair1_fastq, pair2_fastq = [], [], []
    single_librairies_sequencers, pair_librairies_sequencers = [], []
    # Split libraries into paired-end and single-end sets, normalising the
    # sequencer name (any HiSeq/MiSeq model is reported as "ILLUMINA").
    for lib_arg in self.library:
        if lib_arg["sample"] != None and not lib_arg["sample"] in samples:
            samples[lib_arg["sample"]] = { "paired": list(), "single": list() }
        if lib_arg["R2"] != None and lib_arg["R2"] != "":
            if lib_arg["sample"] != None:
                samples[lib_arg["sample"]]["paired"].append( lib_paired_idx )
            pair1_fastq.append(lib_arg["R1"])
            pair2_fastq.append(lib_arg["R2"])
            if lib_arg["sequencer"].lower().startswith("hiseq") or lib_arg["sequencer"].lower().startswith("miseq"):
                pair_librairies_sequencers.append( "ILLUMINA" )
            else:
                pair_librairies_sequencers.append( lib_arg["sequencer"] )
            lib_paired_idx += 1
        else:
            if lib_arg["sample"] != None:
                samples[lib_arg["sample"]]["single"].append( lib_single_idx )
            single_fastq.append(lib_arg["R1"])
            if lib_arg["sequencer"].lower().startswith("hiseq") or lib_arg["sequencer"].lower().startswith("miseq"):
                single_librairies_sequencers.append( "ILLUMINA" )
            else:
                single_librairies_sequencers.append( lib_arg["sequencer"] )
            lib_single_idx += 1
    # Index
    # ##############################
    # Build the fai/dict and BWA indexes only when missing on disk.
    databank_fai= self.reference
    databank_bwa= self.reference
    if not os.path.exists(self.reference + ".fai") or not os.path.exists(os.path.splitext(self.reference)[0] + ".dict"):
        index_fai_dict = self.add_component( "IndexFaiDict", [self.reference] )
        databank_fai = index_fai_dict.databank
    if not os.path.exists(self.reference + ".bwt"):
        index_bwa = self.add_component( "BWAIndex", [self.reference] )
        databank_bwa = index_bwa.databank
    # Change quality offset
    # ##############################
    # Paired-end reads
    pair1_fastq_qual = pair1_fastq
    pair2_fastq_qual = pair2_fastq
    if len(pair1_fastq) > 0:
        change_offset_paired = self.add_component( "ChangeQualOffset", [pair1_fastq, pair2_fastq], component_prefix="paired" )
        pair1_fastq_qual = change_offset_paired.R1_out_files
        pair2_fastq_qual = change_offset_paired.R2_out_files
    # Single reads
    single_fastq_qual = single_fastq
    if len(single_fastq) > 0:
        change_offset_paired = self.add_component( "ChangeQualOffset", [single_fastq], component_prefix="single" )
        single_fastq_qual = change_offset_paired.R1_out_files
    # Alignment
    # ##############################
    aln_algorithm = "mem"
    all_bams = []
    all_bais = []
    # Paired-end reads
    if len(pair2_fastq_qual)>0 and len(pair1_fastq_qual)>0 :
        align_paired = self.add_component( "BWA", [databank_bwa, pair1_fastq_qual, pair2_fastq_qual, None, aln_algorithm], component_prefix="paired" )
        index_bam_paired = self.add_component( "SamtoolsIndex", [align_paired.bam_files, 2, 2], component_prefix="paired" )
        all_bams = all_bams + index_bam_paired.sorted_bams
        all_bais = all_bais + index_bam_paired.output_files
    # Single reads
    if len(single_fastq)>0 :
        align_single = self.add_component( "BWA", [databank_bwa, single_fastq_qual, None, None, aln_algorithm], component_prefix="single" )
        index_bam_single = self.add_component( "SamtoolsIndex", [align_single.bam_files, 2, 2], component_prefix="single" )
        all_bams = all_bams + index_bam_single.sorted_bams
        all_bais = all_bais + index_bam_single.output_files
    # Variant calling and annotation
    ##############################
    # Pre-process
    spliced_aln = (aln_algorithm == "mem")
    if len(single_fastq) > 0:
        variant_preprocess_single = self.add_component("VariantPreprocess", [databank_fai, index_bam_single.sorted_bams, single_librairies_sequencers, False, 30, JAVA_HUGE_MEM], component_prefix="single" )
        gatk_preprocess_single = self.add_component("GatkPreprocess", [variant_preprocess_single.output_files, variant_preprocess_single.index_files, databank_fai, JAVA_HUGE_MEM, spliced_aln, None, self.sequenced_regions], component_prefix="single" )
    if len(pair2_fastq) > 0:
        variant_preprocess_paired = self.add_component("VariantPreprocess", [databank_fai, index_bam_paired.sorted_bams, pair_librairies_sequencers, True, 30, JAVA_HUGE_MEM], component_prefix="paired" )
        gatk_preprocess_paired = self.add_component("GatkPreprocess", [variant_preprocess_paired.output_files, variant_preprocess_paired.index_files, databank_fai, JAVA_HUGE_MEM, spliced_aln, None, self.sequenced_regions], component_prefix="paired" )
    # Merge BAM with the same sample
    # NOTE(review): this loop assumes that gatk_preprocess_paired /
    # gatk_preprocess_single exist whenever a sample lists paired/single
    # libraries; a mixed configuration with an empty category would raise
    # NameError — TODO confirm against callers.
    gatk_preprocess_bams = list()
    processed_idx_paired = dict()
    processed_idx_single = dict()
    for sample_name in list(samples.keys()):
        current_sample = samples[sample_name]
        bams = list()
        for idx_bam in current_sample['paired']:
            bams.append( gatk_preprocess_paired.output_files[idx_bam] )
            processed_idx_paired[idx_bam] = True
        for idx_bam in current_sample['single']:
            bams.append( gatk_preprocess_single.output_files[idx_bam] )
            processed_idx_single[idx_bam] = True
        merged_bam = self.add_component("SamtoolsMerge", [bams, sample_name], component_prefix=sample_name )
        gatk_preprocess_bams.append( merged_bam.merged_bam )
    # Libraries without a sample name are kept as standalone BAMs.
    if len(pair2_fastq) > 0:
        for idx, bam_path in enumerate(gatk_preprocess_paired.output_files):
            if not idx in processed_idx_paired:
                gatk_preprocess_bams.append( bam_path )
    if len(single_fastq) > 0:
        for idx, bam_path in enumerate(gatk_preprocess_single.output_files):
            if not idx in processed_idx_single:
                gatk_preprocess_bams.append( bam_path )
    # Variant calling
    pre_vcf_file = self.known_snp
    if self.two_steps_calling: # if known_snp is not provide and two_steps_calling True
        # SNP calling with hards parameters to produce a database of safe polymorphic sites
        gatk_recalibration_pre_step = self.add_component("GatkRecalibration", [gatk_preprocess_bams, databank_fai, None, self.sequenced_regions, JAVA_HUGE_MEM], component_prefix="pre_step")
        gatk_haplotype_caller_pre_step = self.add_component("GatkHaplotypeCaller", [gatk_recalibration_pre_step.output_files, databank_fai, self.min_call_confidence+10, self.min_emit_confidence+10, self.min_qual_score, self.is_rna, self.sequenced_regions, JAVA_HUGE_MEM, self.global_calling], component_prefix="pre_step")
        gatk_filter_pre_step = self.add_component("GatkVariantFilter", [gatk_haplotype_caller_pre_step.output_file, databank_fai,
                                                  '-window 35 -cluster 3 --filterExpression "QD < 2.0 || FS > 100.0 || MQ < 40.0 || HaplotypeScore > 13.0 || MappingQualityRankSum < -12.5 || ReadPosRankSum < -8.0" --filterName "hard_filter"',
                                                  '-window 35 -cluster 3 --filterExpression "QD < 2.0 || FS > 200.0 || ReadPosRankSum < -20.0" --filterName "hard_filter"',
                                                  JAVA_HUGE_MEM],
                                                  component_prefix="pre_step")
        pre_vcf_file = gatk_filter_pre_step.output_file
    # SNP calling with standards parameters to produce finals variants
    gatk_recalibration = self.add_component("GatkRecalibration", [gatk_preprocess_bams, databank_fai, pre_vcf_file, self.sequenced_regions, JAVA_HUGE_MEM], component_prefix="final")
    gatk_haplotype_caller = self.add_component("GatkHaplotypeCaller", [gatk_recalibration.output_files, databank_fai, self.min_call_confidence, self.min_emit_confidence, self.min_qual_score, self.is_rna, self.sequenced_regions, JAVA_HUGE_MEM, self.global_calling], component_prefix="final")
    gatk_filter = self.add_component("GatkVariantFilter", [gatk_haplotype_caller.output_file, databank_fai,
                                     '-window 18 -cluster 3 -filterName FS --filterExpression \"FS > 30.0\"',
                                     '-window 18 -cluster 3 -filterName FS --filterExpression \"FS > 30.0\"',
                                     JAVA_HUGE_MEM],
                                     component_prefix="final")
@staticmethod
def config_parser(arg_lines):
    """
    Translate an ini-style configuration into the equivalent command-line
    argument list for this workflow.

    @param arg_lines: [list] Lines of the configuration file.
    @return: [list] The corresponding command-line arguments.
    """
    config = ConfigParser()
    # read_file replaces readfp, deprecated since Python 3.2 and removed in 3.12.
    config.read_file(StringIO('\n'.join(arg_lines)))
    arguments = []
    section = 'General'
    if config.has_section(section):
        if config.has_option(section, 'reference'):
            arguments.extend( [ '--reference', config.get(section, 'reference') ] )
        if config.has_option(section, 'global_calling') and config.getboolean(section, 'global_calling'):
            arguments.extend( [ '--global-calling' ] )
        if config.has_option(section, 'is_rna') and config.getboolean(section, 'is_rna'):
            arguments.extend( [ '--is-rna' ] )
        if config.has_option(section, 'min_call_confidence'):
            arguments.extend( [ '--min-call-confidence', config.get(section, 'min_call_confidence') ] )
        if config.has_option(section, 'min_emit_confidence'):
            arguments.extend( [ '--min-emit-confidence', config.get(section, 'min_emit_confidence') ] )
        # Bug fix: the guard previously re-tested 'min_emit_confidence', so
        # '--min-qual-score' either raised NoOptionError (min_qual_score
        # absent) or was silently skipped (min_emit_confidence absent).
        if config.has_option(section, 'min_qual_score'):
            arguments.extend( [ '--min-qual-score', config.get(section, 'min_qual_score') ] )
    section = 'Libraries'
    if config.has_section(section):
        # Options are named '<library_id>.<param>'; collect the params of
        # each library, then emit one '--library' group per library.
        libraries = dict()
        items = config.items(section)
        for tag, value in items:
            lib_id, param = tag.split('.')
            if not lib_id in libraries:
                libraries[lib_id] = list()
            if param == 'r1':
                libraries[lib_id].append( 'R1=' + value )
            if param == 'r2':
                libraries[lib_id].append( 'R2=' + value )
            if param == 'sample':
                libraries[lib_id].append( 'sample=' + value )
            if param == 'sequencer':
                libraries[lib_id].append( 'sequencer=' + value )
            if param == 'sequenced_regions':
                libraries[lib_id].append( 'sequenced-regions=' + value )
        for current_lib in libraries:
            arguments.append('--library')
            for param in libraries[current_lib]:
                arguments.append( param )
    section = 'SNP calling'
    if config.has_section(section):
        if config.has_option(section, 'known_snp'):
            arguments.extend( [ '--known-snp', config.get(section, 'known_snp') ] )
        if config.has_option(section, 'two_steps_calling') and config.getboolean(section, 'two_steps_calling'):
            arguments.extend( [ '--two-steps-calling' ] )
    return arguments
\ No newline at end of file
#!/usr/bin/env python2.7
#
# Copyright (C) 2014 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__author__ = 'Frederic Escudie - Plateforme bioinformatique Toulouse'
__copyright__ = 'Copyright (C) 2014 INRA'
__license__ = 'GNU General Public License'
__version__ = '0.2.0'
__email__ = 'support.genopole@toulouse.inra.fr'
__status__ = 'dev'
import os
import gzip
import argparse
##################################################################################################################################################
#
# FUNCTIONS
#
##################################################################################################################################################
def is_gzip( file ):
    """
    @return: [bool] True if the file is gziped.
    @param file : [str] Path to processed file.
    """
    # Reading through gzip fails with OSError/BadGzipFile on plain files and
    # with EOFError on truncated streams; the previous version used a bare
    # 'except' (hiding e.g. KeyboardInterrupt) and closed the handle manually.
    try:
        with gzip.open( file, "rb" ) as FH_input:
            FH_input.readline()
        return True
    except (OSError, EOFError):
        return False
class Sequence:
    """Plain record holding one sequence and its optional metadata."""

    def __init__(self, id, string, description=None, quality=None):
        """
        @param id : [str] Id of the sequence.
        @param string : [str] The sequence characters themselves.
        @param description : [str] Free-text description of the sequence.
        @param quality : [str] Per-base quality string (same length as string).
        """
        self.id = id
        self.string = string
        self.description = description
        self.quality = quality
class FastqIO:
    """Minimal FASTQ reader/writer supporting plain and gzipped files."""

    def __init__( self, filepath, mode="r" ):
        """
        @param filepath : [str] The filepath.
        @param mode : [str] Mode to open the file ('r', 'w', 'a').
        """
        self.filepath = filepath
        self.mode = mode
        if (mode in ["w", "a"] and filepath.endswith('.gz')) or (mode not in ["w", "a"] and is_gzip(filepath)):
            # Bug fix: gzip.open defaults to binary mode in Python 3, while
            # the parsing/writing code below works on str; open the gzip
            # stream in text mode to keep both paths consistent.
            self.file_handle = gzip.open( filepath, mode + "t" )
        else:
            self.file_handle = open( filepath, mode )
        self.current_line_nb = 1
        self.current_line = None

    def __del__( self ):
        self.close()

    def close( self ) :
        """Close the underlying handle; safe to call several times."""
        if hasattr(self, 'file_handle') and self.file_handle is not None:
            self.file_handle.close()
            self.file_handle = None
            self.current_line_nb = None

    def __iter__( self ):
        """
        @summary : Yields one Sequence per FASTQ record (4 lines each).
        @raise IOError : On any malformed line.
        """
        seq_id = None
        seq_desc = None
        seq_str = None
        seq_qual = None
        try:
            for line in self.file_handle:
                line = line.rstrip()
                if (self.current_line_nb % 4) == 1:
                    # Header: '@id[ description]'.
                    fields = line[1:].split(None, 1)
                    seq_id = fields[0]
                    seq_desc = fields[1] if len(fields) == 2 else None
                elif (self.current_line_nb % 4) == 2:
                    seq_str = line
                elif (self.current_line_nb % 4) == 0:
                    seq_qual = line
                    yield Sequence( seq_id, seq_str, seq_desc, seq_qual )
                self.current_line_nb += 1
        except Exception:
            raise IOError( "The line " + str(self.current_line_nb) + " in '" + self.filepath + "' cannot be parsed by " + self.__class__.__name__ + "." )

    #//////////////////////
    def next_seq( self ):
        """
        @summary : Returns the next sequence.
        @return : [Sequence] The next sequence.
        @raise IOError : When the next four lines do not form a FASTQ record
                         (including end of file).
        """
        seq_record = None
        try:
            # Header
            header = self.file_handle.readline().strip()
            fields = header[1:].split(None, 1)
            seq_id = fields[0]
            seq_desc = fields[1] if len(fields) == 2 else None
            self.current_line_nb += 1
            # Sequence
            seq_str = self.file_handle.readline().strip()
            self.current_line_nb += 1
            # Separator
            separator = self.file_handle.readline()
            self.current_line_nb += 1
            # Quality
            seq_qual = self.file_handle.readline().strip()
            self.current_line_nb += 1
            # Record
            seq_record = Sequence( seq_id, seq_str, seq_desc, seq_qual )
        except Exception:
            # Bug fix: the previous message appended an undefined local
            # variable 'line', so the error path crashed with a NameError
            # instead of raising the intended IOError.
            raise IOError( "The line " + str(self.current_line_nb) + " in '" + self.filepath + "' cannot be parsed by " + self.__class__.__name__ + "." )
        return seq_record

    @staticmethod
    def is_valid(filepath):
        """
        @summary : Checks whether the beginning of the file parses as FASTQ.
        @return : [bool] True if the first records (up to 10) are well formed.
        """
        # Bug fix: the previous loop condition contained 'seq_idx != 0',
        # which is False on the very first iteration, so no record was ever
        # read and the method returned True for any file.
        try:
            FH_in = FastqIO(filepath)
            try:
                for seq_idx, record in enumerate(FH_in):
                    if seq_idx >= 9:
                        break
                return True
            finally:
                FH_in.close()
        except Exception:
            return False

    def write( self, sequence_record ):
        """Append one record to the file in FASTQ format."""
        self.file_handle.write( self.seqToFastqLine(sequence_record) + "\n" )

    def seqToFastqLine( self, sequence ):
        """
        @summary : Returns the sequence in fastq format.
        @param sequence : [Sequence] The sequence to process.
        @return : [str] The sequence.
        """
        seq = "@" + sequence.id + (" " + sequence.description if sequence.description is not None else "")
        seq += "\n" + sequence.string
        seq += "\n+"
        seq += "\n" + sequence.quality
        return seq
def get_offset( min_ascii, max_ascii ):
    """
    @summary: Returns the quality offset corresponding to an ascii interval.
    @param min_ascii: [int] The int representation of the minimum ascii find.
    @param max_ascii: [int] The int representation of the maximum ascii find.
    @return: [int] The quality offset (64 for old Solexa/Illumina encodings,
             33 for Sanger/Illumina 1.8+).
    """
    # Characters below ';' (59) only occur in the phred+33 encoding.
    return 64 if min_ascii >= 59 else 33
def get_files_offset( paths ):
    """
    @summary: Returns the quality offset deduced from the files.
    @param paths: [list] Files to parse (R1 or R1 and R2).
    @return: [int] The quality offset.
    """
    # Sentinels wide enough that any real ASCII code replaces them.
    lowest = 8000
    highest = -8000
    for current_path in paths:
        reader = FastqIO( current_path )
        for record in reader:
            codes = [ord(character) for character in record.quality]
            if codes:
                lowest = min(lowest, min(codes))
                highest = max(highest, max(codes))
        reader.close()
    return get_offset( lowest, highest )
def change_offset( in_path, out_path, offset_diff ):
    """
    @summary: Writes a file with the new quality offset from a fastq.
    @param in_path: [str] The initial fastq.
    @param out_path: [str] The output file.
    @param offset_diff: [int] The offset difference (example: -31 for 'Solexa' to 'Illumina 1.8+').
    """
    reader = FastqIO( in_path )
    writer = FastqIO( out_path, "w" )
    for record in reader:
        # Shift every quality character by the requested offset difference.
        record.quality = "".join( chr(ord(qual) + offset_diff) for qual in record.quality )
        writer.write( record )
    reader.close()
    writer.close()
##################################################################################################################################################
#
# MAIN
#
##################################################################################################################################################
if __name__ == "__main__":
    # Manage parameters
    # NOTE(review): the long option names '--ouput-R1-file'/'--ouput-R2-file'
    # contain a typo ('ouput'); renaming them would break existing command
    # lines, so they are kept as-is.
    # NOTE(review): '-nq' is both required=True and default=33 — the default
    # is never used while the option stays required.
    parser = argparse.ArgumentParser( description='Changes quality offset of the input file or the pair of input files.' )
    parser.add_argument( '-i1', '--R1-file', required=True, help='The reads_1 file (format : FASTQ).' )
    parser.add_argument( '-i2', '--R2-file', default=None, help='The reads_2 file (format : FASTQ).' )
    parser.add_argument( '-o1', '--ouput-R1-file', required=True, help='The reads_1 file after recode.' )
    parser.add_argument( '-o2', '--ouput-R2-file', default=None, help='The reads_2 file after recode.' )
    parser.add_argument( '-nq', '--new-qual-offset', type=int, default=33, required=True, help='The new quality offset (example: 33 for Illumina 1.8+).' )
    parser.add_argument( '-oq', '--old-qual-offset', type=int, default=None, help='The old quality offset (example: 64 for Solexa). By default, this value is automatically deduced from your reads files.' )
    parser.add_argument( '-v', '--version', action='version', version=__version__ )
    args = parser.parse_args()
    # Retrieve offset diff
    offset_diff = None
    if args.old_qual_offset is not None:
        offset_diff = args.new_qual_offset - args.old_qual_offset
    else:
        # Deduce the current offset by scanning the quality strings.
        files_offset = None
        if args.R2_file is None:
            files_offset = get_files_offset( [args.R1_file] )
        else:
            files_offset = get_files_offset( [args.R1_file, args.R2_file] )
        offset_diff = args.new_qual_offset - files_offset
    # Change file(s)
    if offset_diff == 0:
        # Nothing to recode: expose the inputs under the output names via symlinks.
        if os.path.exists(args.ouput_R1_file): os.unlink(args.ouput_R1_file)
        os.symlink( args.R1_file, args.ouput_R1_file )
        if args.R2_file is not None:
            if os.path.exists(args.ouput_R2_file): os.unlink(args.ouput_R2_file)
            os.symlink( args.R2_file, args.ouput_R2_file )
    else:
        change_offset( args.R1_file, args.ouput_R1_file, offset_diff )
        if args.R2_file is not None:
            change_offset( args.R2_file, args.ouput_R2_file, offset_diff )
\ No newline at end of file
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
\ No newline at end of file
#
# Copyright (C) 2015 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from subprocess import Popen, PIPE
from jflow.component import Component
from jflow.abstraction import MultiMap
from jflow.utils import get_argument_pattern
from weaver.function import ShellFunction
from weaver.abstraction import Map
class BWA (Component):
def define_parameters(self, reference_genome, read1, read2=None, group_prefix=None, algorithm="aln"):
"""
@param reference_genome : [str] Path to the reference genome (it must be indexed).
@param read1 : [list] Paths to reads 1.
@param read2 : [list] Paths to reads 2.
@param group_prefix : [list] The component produces one bam by prefix. Each bam is a merge of all files with the correct prefix.
@param algorithm : [str] Algorithm for the alignment (aln or bwasw or mem).
"""
# Parameters
self.add_parameter_list( "group_prefix", "The component produces one bam by prefix. Each bam is a merge of all files with the correct prefix.", default=group_prefix )
self.add_parameter( "algorithm", "Algorithm for the alignment : aln or bwasw or mem.", default=algorithm, choices=["aln", "bwasw", "mem"] )
# Files