Commit 81cbb809 authored by Gerald Salin

subset component finalized / parallelized

parent 186b5cc6
@@ -20,56 +20,45 @@ import logging
from subprocess import Popen, PIPE
from jflow.utils import get_argument_pattern
from weaver.function import ShellFunction
from weaver.function import PythonFunction
from jflow.abstraction import MultiMap
from ng6.analysis import Analysis
from ng6.utils import Utils
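
# The subsetting is done in two steps so it can be parallelized: get_number_of_reads_to_extract
# is run once to compute the sampling step from the smallest input file, then extract_random_seq
# is run once per input file to write the corresponding subset.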
def get_number_of_reads_to_extract(extract_rate, min_nb_seq, max_nb_seq, filename, *files):
    import jflow.seqio as seqio
    import logging
    import os

    # Check parameters
    extract_rate = float(extract_rate)
    min_nb_seq = int(min_nb_seq)
    max_nb_seq = int(max_nb_seq)
    logging.getLogger("SubsetSeqFiles").debug("get_number_of_reads_to_extract. extract_rate = " + str(extract_rate))
    logging.getLogger("SubsetSeqFiles").debug("get_number_of_reads_to_extract. min_nb_seq = " + str(min_nb_seq))
    logging.getLogger("SubsetSeqFiles").debug("get_number_of_reads_to_extract. max_nb_seq = " + str(max_nb_seq))
    logging.getLogger("SubsetSeqFiles").debug("get_number_of_reads_to_extract. filename = " + filename)
    if extract_rate > 1 or extract_rate <= 0:
        raise Exception("[ERROR] : the extract_rate value is not correct! (Should be between 0 and 1)")
    if min_nb_seq > max_nb_seq:
        raise Exception("[ERROR] : the threshold values are not correct! (Minimum threshold is bigger than the maximum threshold)")
    # Find the smallest input file and count its reads: it is used as the reference for the subset size
    nb_seq = 0
    min_file_size_for_reference = 0
    reference_file = ""
    for curr_sequence_file in files:
        logging.getLogger("SubsetSeqFiles").debug("get_number_of_reads_to_extract. Working on " + curr_sequence_file + " to determine the number of reads")
        logging.getLogger("SubsetSeqFiles").debug("get_number_of_reads_to_extract. Size of " + curr_sequence_file + " = " + str(os.stat(curr_sequence_file).st_size))
        if min_file_size_for_reference == 0 or os.stat(curr_sequence_file).st_size < min_file_size_for_reference:
            min_file_size_for_reference = os.stat(curr_sequence_file).st_size
            reference_file = curr_sequence_file
    logging.getLogger("SubsetSeqFiles").debug("get_number_of_reads_to_extract. Reference file (smallest) is " + reference_file)
    reader = seqio.SequenceReader(reference_file)
    for id, desc, seq, qual in reader:
        nb_seq += 1
    nb_seq_to_extract = int(nb_seq * extract_rate)
    logging.getLogger("SubsetSeqFiles").debug("get_number_of_reads_to_extract. Total number of reads for the smallest seq file " + str(nb_seq))
    logging.getLogger("SubsetSeqFiles").debug("get_number_of_reads_to_extract. Number of reads to extract (nb_seq * extract_rate) " + str(nb_seq_to_extract))
    # Setting the thresholds if necessary
    if nb_seq_to_extract > max_nb_seq:
        nb_seq_to_extract = max_nb_seq
@@ -78,36 +67,99 @@ def extract_random_seq(extract_rate, min_nb_seq, max_nb_seq, *files):
    if nb_seq < nb_seq_to_extract:
        nb_seq_to_extract = nb_seq
    step = int(nb_seq / nb_seq_to_extract)
    logging.getLogger("SubsetSeqFiles").debug("Number of reads to extract after checking min and max reads allowed " + str(nb_seq_to_extract))
    logging.getLogger("SubsetSeqFiles").debug("Step " + str(step))
    with open(filename, 'w') as f:
        f.write(str(step) + ":" + str(nb_seq_to_extract))


def extract_random_seq(input_file, info_file, output_file):
    import jflow.seqio as seqio
    import logging
    import os

    logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. Entering")
    # Read back the sampling step and the number of reads to extract
    with open(info_file, 'r') as myfile:
        step, nb_seq_to_extract = myfile.read().split(':')
    logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. Content of " + info_file + " step = " + step + ", nb_seq_to_extract = " + nb_seq_to_extract)
    step = int(step)
    nb_seq_to_extract = int(nb_seq_to_extract)
# Check parameters
# extract_rate = float(extract_rate)
# min_nb_seq = int(min_nb_seq)
# max_nb_seq = int(max_nb_seq)
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. extract_rate = " + str(extract_rate))
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. min_nb_seq = " + str(min_nb_seq))
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. max_nb_seq = " + str(max_nb_seq))
#
# if extract_rate > 1 or extract_rate <= 0:
# raise Exception("[ERROR] : the extract_rate value is not correct! (Should be between 0 and 1)")
# if int(min_nb_seq) > int(max_nb_seq):
# raise Exception("[ERROR] : the threshold values are not correct ! (Minimum threshold is bigger than Maximum threshold)")
# nb_files_pair = len(files)/2
# if int(nb_files_pair) != nb_files_pair:
# raise Exception("[ERROR] : the number of files is not correct! (Each sequence_files should correspond to an sub_sequence_files : [file1, file2, sub_file1, sub_file2])")
# nb_files_pair = int(nb_files_pair)
# sequence_files = files[:nb_files_pair]
# outputs = files[nb_files_pair:]
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. Number of files " + str(nb_files_pair))
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. sequence_files = " + ", ".join(sequence_files))
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. outputs " + ", ".join(outputs))
# # Compute min_nb_seq, max_nb_seq, nbsubseq, totalseq
# nb_seq = 0
# min_file_size_for_reference = 0
# reference_file = ""
# for curr_sequence_file in sequence_files:
# logging.getLogger("SubsetSeqFiles").debug("Working on"+curr_sequence_file+" to determine the number of reads ")
# logging.getLogger("SubsetSeqFiles").debug("Size of "+curr_sequence_file+" = "+str(os.stat(curr_sequence_file).st_size))
# if min_file_size_for_reference == 0 or os.stat(curr_sequence_file).st_size < min_file_size_for_reference:
# min_file_size_for_reference = os.stat(curr_sequence_file).st_size
# reference_file = curr_sequence_file
# logging.getLogger("SubsetSeqFiles").debug("Reference file (smallest) is "+reference_file)
# reader = seqio.SequenceReader(reference_file)
# for id, desc, seq, qual in reader :
# nb_seq += 1
# nb_seq_to_extract = int(nb_seq * float(extract_rate))
# logging.getLogger("SubsetSeqFiles").debug("Total number of reads for the smallest seq file " + str(nb_seq))
# logging.getLogger("SubsetSeqFiles").debug("Number of reads to extract with the formula (nb_seq * float(extract_rate)) " + str(nb_seq_to_extract))
# # Setting the thresholds if necessary
# if nb_seq_to_extract > max_nb_seq:
# nb_seq_to_extract = max_nb_seq
# elif nb_seq_to_extract < min_nb_seq:
# nb_seq_to_extract = min_nb_seq
# if nb_seq < nb_seq_to_extract :
# nb_seq_to_extract = nb_seq
# step = int(nb_seq / nb_seq_to_extract)
# logging.getLogger("SubsetSeqFiles").debug("Number of reads to extract after checking min and max reads allowed " + str(nb_seq_to_extract))
# logging.getLogger("SubsetSeqFiles").debug("Step " + str(step))
    # Write the subset: keep one read every `step` reads until nb_seq_to_extract reads have been written
    count = 0
    countExtractedReads = 0
    reader = seqio.SequenceReader(input_file)
    logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. creating = " + output_file + ", from = " + input_file)
    with open(output_file, "w") as output:
        for id, desc, seq, qual in reader:
            count += 1
            if count % step == 0 and nb_seq_to_extract > countExtractedReads:
                logging.getLogger("SubsetSeqFiles").debug("Add a read in subset")
                countExtractedReads += 1
                seqio.writefastq(output, [[id, desc, seq, qual]])
            if nb_seq_to_extract <= countExtractedReads:
                break
    logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. finished")


class SubsetSeqFiles (Analysis):

    def define_parameters(self, read1, read2=None, max_target_seqs=10,
                          extract_rate=0.05, min_nb_seq=2000, max_nb_seq=3000):
        """
        @param read1 : [list] the read1 files to subset
        @param read2 : [list] the read2 files to subset (optional)
        @param max_target_seqs : [int] maximum number of target sequences
        @param extract_rate : [float] fraction of reads to extract (between 0 and 1)
        @param min_nb_seq : [int] minimum number of reads to extract
        @param max_nb_seq : [int] maximum number of reads to extract
        """
        # Parameters
        self.add_parameter("max_target_seqs", "max_target_seqs", type='int', default=max_target_seqs)
        self.add_parameter("extract_rate", "extract_rate", type='float', default=extract_rate)
        self.add_parameter("min_nb_seq", "min_nb_seq", type='int', default=min_nb_seq)
@@ -117,7 +169,7 @@ class SubsetSeqFiles (Analysis):
self.add_input_file_list( "read2", "Which read2 files should be used.", default=read2 )
self.add_output_file_list("subset_read1", "Subset of the read1 file", pattern='{basename_woext}_subset.fastq.gz', items=self.read1)
self.add_output_file_list("subset_read2", "Subset of the read1 file", pattern='{basename_woext}_subset.fastq.gz', items=self.read2)
self.add_output_file("output_file", "output_file", filename="gg.txt" )
def get_version(self):
return '-'
@@ -134,20 +186,35 @@ class SubsetSeqFiles (Analysis):
        #self._save_files(self.bam_files)

    def process(self):
        subset_read1 = self.get_outputs( '{basename_woext}_subset.fastq', self.read1 )
        subset_read1_gz = self.get_outputs( '{basename_woext}_subset.fastq.gz', self.read1 )
        logging.getLogger("SubsetSeqFiles").debug("before subset of reads")
        self.add_python_execution(get_number_of_reads_to_extract, cmd_format='{EXE} {ARG} {OUT} {IN}',
                                  map=False, outputs=self.output_file, inputs=self.read1,
                                  arguments=[self.extract_rate, self.min_nb_seq, self.max_nb_seq])
        if self.read2:
            subset_read2 = self.get_outputs( '{basename_woext}_subset.fastq', self.read2 )
            subset_read2_gz = self.get_outputs( '{basename_woext}_subset.fastq.gz', self.read2 )
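        # Step 2: one extraction job per input file; each one only depends on the info file
        # written above, so the extractions can run in parallel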
        for i, o in zip(self.read1, subset_read1):
            self.add_python_execution(extract_random_seq, cmd_format="{EXE} {IN} {OUT}",
                                      inputs=[i, self.output_file], outputs=o, map=False)
        if self.read2:
            for i, o in zip(self.read2, subset_read2):
                self.add_python_execution(extract_random_seq, cmd_format="{EXE} {IN} {OUT}",
                                          inputs=[i, self.output_file], outputs=o, map=False)
        gzip = ShellFunction( 'gzip $1', cmd_format='{EXE} {IN} {OUT}')
        MultiMap(gzip, inputs=[subset_read1], outputs=[subset_read1_gz])
@@ -57,7 +57,7 @@ class IlluminaQualityCheck (CasavaNG6Workflow):
        if self.group_prefix != None :
            sample_lane_prefixes = list((Utils.get_group_basenames(filtered_read1_files+filtered_read2_files, "lane")).keys())
        if self.align_subset_reads:
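            # Replace the filtered reads with their random subsets before alignment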
subset = self.add_component("SubsetSeqFiles", [filtered_read1_files, filtered_read2_files, self.align_subset_reads], parent = fastqilluminafilter)
subset = self.add_component("SubsetSeqFiles", [filtered_read1_files, filtered_read2_files], parent = fastqilluminafilter)
filtered_read1_files = subset.subset_read1
filtered_read2_files = subset.subset_read2
bwa = self.add_component("BWA", [indexed_ref, filtered_read1_files, filtered_read2_files, sample_lane_prefixes, "mem", not self.delete_bam], parent = subset)