Commit d6eaacde authored by Penom Nom

update download components to properly close file handlers. Update CasavaNG6Workflow...

update download components to properly close file handlers. Update the CasavaNG6Workflow class to add a function that runs all the CASAVA-specific processing (Illumina filter, demultiplex stats, etc.). This makes the code cleaner for all CasavaNG6Workflow-based workflows. This class has also been updated to be able to use the --sample option.
parent 57e891d1
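The CasavaNG6Workflow change below moves the shared CASAVA processing steps (DemultiplexStats, FastqIlluminaFilter, FastQC, ContaminationSearch and raw-file archiving) into a single illumina_process() method, so each workflow only calls it from its process() method, as the IlluminaDiversityQC hunk at the end of this diff now does. A minimal sketch of the intended usage; the import path and workflow name are assumptions for illustration, not part of this commit:

    # import path and workflow class name are illustrative, not part of this commit
    from ng6.ng6workflow import CasavaNG6Workflow

    class MyIlluminaWorkflow (CasavaNG6Workflow):

        def process(self):
            # run all CASAVA-specific steps; returns the FastqIlluminaFilter component
            # plus the filtered read lists (or the raw reads when no CASAVA input is given)
            fastqilluminafilter, read1_files, read2_files, saved_files = self.illumina_process()
            # workflow-specific components then consume the filtered reads, e.g.
            # self.add_component("ConcatenateFilesGroups", [read1_files, prefixes])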
......@@ -803,8 +803,8 @@ class AbstractInputFile(AbstractIOFile):
tmp_directory = os.path.join(jflowconf.get_tmp_directory(), os.path.basename(tempfile.NamedTemporaryFile().name))
os.mkdir(tmp_directory)
file_path = os.path.join(tmp_directory, os.path.basename(uri_object.path))
local_file = open(file_path, 'wb')
if os.path.basename(uri_object.path) is not None and os.path.basename(uri_object.path) != "":
local_file = open(file_path, 'wb')
metadata = opener.info()
file_size = int(metadata.getheaders("Content-Length")[0])
while True:
......
......@@ -20,6 +20,7 @@ import sys
import smtplib
import socket
import math
import shutil
try:
import DNS
......@@ -28,6 +29,27 @@ except:
DNS = None
class ServerError(Exception): pass
def robust_rmtree(path, logger=None, max_retries=6):
"""Robustly tries to delete paths.
Retries several times (with increasing delays) if an OSError
occurs. If the final attempt fails, the Exception is propagated
to the caller.
"""
dt = 1
for i in range(max_retries):
try:
shutil.rmtree(path)
return
except OSError:
if logger:
logger.info('Unable to remove path: %s' % path)
logger.info('Retrying after %d seconds' % dt)
time.sleep(dt)
dt *= 2
# Final attempt, pass any Exceptions up to caller.
shutil.rmtree(path)
def display_error_message(msg):
sys.stderr.write("\033[91mError: "+msg+"\n\033[0m")
sys.exit(1)
......
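The robust_rmtree() helper added above replaces the direct shutil.rmtree() call in Workflow.delete() further down in this diff: it retries the removal with an exponentially growing delay (1, 2, 4, ... seconds) and only lets the OSError propagate if a final attempt still fails. A small usage sketch; the jflow.utils import path, logger name and directory are assumptions for illustration (the module must already import time for the sleep calls):

    import logging
    from jflow import utils   # assumed import path for the module patched above

    logger = logging.getLogger("wf.000042")   # illustrative logger name
    # retries shutil.rmtree with increasing delays, then re-raises the OSError
    # from a final attempt if the directory still cannot be removed
    utils.robust_rmtree("/work/jflow/wf000042", logger=logger)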
......@@ -28,7 +28,6 @@ import threading
import types
import logging
import datetime
import shutil
from logging import FileHandler
from ConfigParser import ConfigParser, NoOptionError
......@@ -117,6 +116,7 @@ class Workflow(threading.Thread):
self.end_time = None
self.step = None
self.stderr = None
self.stderrfh = None
self.args = args
self.dynamic_component_present = False
self.__to_address = None
......@@ -387,7 +387,10 @@ class Workflow(threading.Thread):
gr.add_node_attribute(ioparameter.name, ioparameter.display_name)
all_nodes[ioparameter.name] = None
elif issubclass(ioparameter.__class__, MultiParameter):
print ioparameter.sub_parameters
print '-----'
for subparam in ioparameter.sub_parameters:
print '>>>>',subparam.name
gr.add_node(subparam.name)
all_nodes[subparam.name] = None
if issubclass(subparam.__class__, InputFile):
......@@ -430,7 +433,9 @@ class Workflow(threading.Thread):
def delete(self):
if self.get_status() in [self.STATUS_COMPLETED, self.STATUS_FAILED, self.STATUS_ABORTED]:
shutil.rmtree(self.directory)
# first close the open files
self.stderrfh.close()
utils.robust_rmtree(self.directory)
@staticmethod
def config_parser(arg_lines):
......@@ -579,6 +584,8 @@ class Workflow(threading.Thread):
def __setstate__(self, state):
self.__dict__ = state.copy()
self.external_components = self._import_external_components()
self.stderrfh = None
self.stderr = self._set_stderr(True)
threading.Thread.__init__(self, name=self.name)
def __getstate__(self):
......@@ -591,6 +598,9 @@ class Workflow(threading.Thread):
del odict['_Thread__stderr']
if odict.has_key('external_components') :
del odict['external_components']
if odict.has_key('stderrfh') :
self.stderrfh.close()
del odict['stderrfh']
return odict
def set_to_address(self, to_address):
......@@ -632,10 +642,23 @@ class Workflow(threading.Thread):
# envelope header.
s = smtplib.SMTP(smtps, smtpp)
s.ehlo()
s.starttls()
s.login(me, fromp)
s.sendmail(me, [you], msg.as_string())
s.close()
# if the SMTP server does not provide TLS or authentication
try:
s.starttls()
s.login(me, fromp)
except smtplib.SMTPHeloError:
logging.getLogger("wf." + str(self.id)).debug("The server didn't reply properly to the HELO greeting.")
except smtplib.SMTPAuthenticationError:
logging.getLogger("wf." + str(self.id)).debug("The server didn't accept the username/password combination.")
except smtplib.SMTPException:
logging.getLogger("wf." + str(self.id)).debug("No suitable authentication method was found, or the server does not support the STARTTLS extension.")
except RuntimeError:
logging.getLogger("wf." + str(self.id)).debug("SSL/TLS support is not available to your Python interpreter.")
except:
logging.getLogger("wf." + str(self.id)).debug("Unhandled error when sending mail.")
finally:
s.sendmail(me, [you], msg.as_string())
s.close()
except:
logging.getLogger("wf." + str(self.id)).debug("Impossible to connect to smtp server '" + smtps + "'")
......@@ -941,17 +964,21 @@ class Workflow(threading.Thread):
sys.stdout.write('{0:>10} {1:>10} {2}\n'.format('', 'COMMAND', node.command))
def _set_stderr(self):
if hasattr(self, "stderr") and self.stderr is not None:
os.rename( self.stderr, os.path.join(self.directory, str(time.time()) + self.STDERR_FILE_NAME + self.OLD_EXTENSION) )
def _set_stderr(self, keep_same_error_file=False):
# if asked to keep the same error file, keep using it
if not keep_same_error_file:
if hasattr(self, "stderr") and self.stderr is not None:
os.rename( self.stderr, os.path.join(self.directory, str(time.time()) + self.STDERR_FILE_NAME + self.OLD_EXTENSION) )
stderr = os.path.join(self.directory, self.STDERR_FILE_NAME)
logger = logging.getLogger( "wf." + str(self.id) )
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter( '## %(asctime)s :: %(message)s' )
file_handler = FileHandler(stderr, 'a')
file_handler.setLevel(logging.ERROR)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
if self.stderrfh is None:
self.stderrfh = FileHandler(stderr, 'a')
self.stderrfh.setLevel(logging.ERROR)
self.stderrfh.setFormatter(formatter)
logger.addHandler(self.stderrfh)
return stderr
def _execute_weaver(self, engine_wrapper=None):
......@@ -991,6 +1018,8 @@ class Workflow(threading.Thread):
self._serialize()
try:
nest.execute(self.engine_arguments, exit_on_failure=True)
# close dag_file after execution to avoid NFS issues
nest.dag_file.close()
except:
self.status = self.STATUS_FAILED
self.end_time = time.time()
......
......@@ -49,7 +49,9 @@ class WorkflowsManager(object):
WorkflowsManager.lock_workflows_file.acquire()
# first load the existing workflows
try:
workflows_dump = pickle.load(open(self.dump_file))
wdfh = open(self.dump_file)
workflows_dump = pickle.load(wdfh)
wdfh.close()
except:
workflows_dump = {}
# then add the new ones
......@@ -57,7 +59,9 @@ class WorkflowsManager(object):
workflows_dump[utils.get_nb_string(workflow.id)] = {"dump_path": workflow.dump_path,
"object": workflow.minimize()}
# and save them
pickle.dump(workflows_dump, open(self.dump_file, "w"))
wdfh = open(self.dump_file, "w")
pickle.dump(workflows_dump, wdfh)
wdfh.close()
WorkflowsManager.lock_workflows_file.release()
def get_available_workflows(self, function="process"):
......@@ -115,7 +119,9 @@ class WorkflowsManager(object):
from jflow.workflow import Workflow
WorkflowsManager.lock_workflows_file.acquire()
try:
all_workflows_dump = pickle.load(open(self.dump_file))
awfh = open(self.dump_file)
all_workflows_dump = pickle.load(awfh)
awfh.close()
except:
all_workflows_dump = {}
rworkflow_id = utils.get_nb_string(workflow_id)
......@@ -131,7 +137,9 @@ class WorkflowsManager(object):
logging.getLogger("jflow").debug("Workflow #" + rworkflow_id + " connot be retrieved in the available workflows!")
raise Exception("Workflow #" + rworkflow_id + " connot be retrieved in the available workflows!")
# and save them
pickle.dump(all_workflows_dump, open(self.dump_file, "w"))
awfh = open(self.dump_file, "w")
pickle.dump(all_workflows_dump, awfh)
awfh.close()
WorkflowsManager.lock_workflows_file.release()
def get_workflow_errors(self, workflow_id):
......@@ -149,7 +157,9 @@ class WorkflowsManager(object):
from jflow.workflow import Workflow
workflows = []
try:
workflows_dump = pickle.load(open(self.dump_file))
awfh = open(self.dump_file)
workflows_dump = pickle.load(awfh)
awfh.close()
except:
workflows_dump = {}
updated_workflows = []
......@@ -181,7 +191,9 @@ class WorkflowsManager(object):
def get_workflow(self, workflow_id):
rworkflow_id = utils.get_nb_string(workflow_id)
try:
workflows_dump = pickle.load(open(self.dump_file))
wdfh = open(self.dump_file)
workflows_dump = pickle.load(wdfh)
wdfh.close()
except:
workflows_dump = {}
if workflows_dump.has_key(rworkflow_id):
......
......@@ -274,14 +274,28 @@ class CasavaNG6Workflow(NG6Workflow):
self.group_prefix = None
self.undetermined_reads1 = []
self.undetermined_reads2 = []
self.is_casava = True
self.is_casava = False
def __add_sample_parameters__(self):
self.add_input_directory("casava_directory", "Path to the CASAVA directory to use", required=True, get_files_fn=get_files_from_casava, group="Sample description")
self.add_parameter("casava_lane", "The lane number to be retrieved from the casava directory", required=True, type=int, group="Sample description")
self.add_parameter('casava_project', 'The name of the project to retrieve in casava directory. The default name is the name of the nG6 project', group="Sample description")
self.add_multiple_parameter('casava', 'Provide the options to retrieve samples from a CASAVA directory', group="Sample description")
self.add_input_directory("directory", "Path to the CASAVA directory to use", required=True, get_files_fn=get_files_from_casava, add_to="casava" )
self.add_parameter("lane", "The lane number to be retrieved from the casava directory", required=True, type='int', add_to="casava")
self.add_parameter('project', 'The name of the project to retrieve in casava directory. The default name is the name of the nG6 project',add_to="casava")
self.add_parameter('mismatch_index', 'Set this value to true if the index sequence in the sample fastq files allows at least 1 mismatch',
type ='bool', add_to="casava")
self.add_parameter_list('select_sample_id', 'The ids of the samples that will be selected in the SampleSheet.mk file. By default all samples are selected.', add_to="casava",
display_name = 'Selected samples')
NG6Workflow.__add_sample_parameters__(self)
self.add_exclusion_rule("casava", "input_sample")
self.add_parameter("compression", "How should the data be compressed once archived", choices= [ "none", "gz", "bz2"], default = "none")
self.add_parameter("keep_reads", "Keep or discard reads which pass the illumina filter. 'all' option will keep all reads", flag = "--keep",
choices=[ "pass_illumina_filters", "not_pass_illumina_filters", "all"], default = "pass_illumina_filters")
self.add_input_file_list("contamination_databank", "Which databank should be used to seek contamination (as to be phiX databank indexed for bwa)")
self.add_parameter("no_group", "Disables grouping of bases for reads >50bp", type=bool, default = True)
self.add_parameter('mismatch_index', 'Set this value to true if the index sequence in the sample fastq files allows at least 1 mismatch', type ='bool', group="Sample description")
def __create_samples__(self):
"""
......@@ -289,75 +303,150 @@ class CasavaNG6Workflow(NG6Workflow):
@param casava_directory : path to CASAVA output directory
@param lane_number : files in each sample are sequenced on this lane
"""
casava_directory = self.casava_directory
lane_number = self.casava_lane
# open casava samplesheet again to associate our files with a sample
with open(os.path.join(casava_directory, "SampleSheet.mk")) as fh :
barcodes_list = []
sample_ids_list = []
subdirs_list = []
for line in fh :
if line.startswith("l" + str(lane_number) + "_BARCODES"):
parts = line.strip().split(":=")
barcodes_list = [ re.sub( r"[-_\s]+", "", x) for x in parts[1].split() ]
elif line.startswith("l" + str(lane_number) + "_SAMPLEIDS" ):
parts = line.strip().split(":=")
sample_ids_list = parts[1].split(" ")
elif line.startswith("l" + str(lane_number) + "_SUBDIRS"):
parts = line.strip().split(":=")
subdirs_list = parts[1].split(" ")
assert len(barcodes_list) == len(sample_ids_list) == len(subdirs_list), "Invalid lane {0} in SampleSheet.mk".format(lane_number)
# get the casava project_name
if self.casava_project :
project_name = self.casava_project
else :
project_name = self.project_name
if self.casava and self.casava["directory"] and self.casava["lane"] :
self.is_casava = True
casava_directory = self.casava["directory"]
lane_number = self.casava["lane"]
all_samples, all_samples_id = [], []
project_name = project_name.replace(" ", "_")
input_files = casava_directory.get_files( lane_number, project_name)
if len(input_files) == 0 :
raise Exception("Error while parsing casava directory %s, invalid project name '%s' for lane %s"% (casava_directory, project_name, lane_number))
# open casava samplesheet again to associate our files with a sample
with open(os.path.join(casava_directory, "SampleSheet.mk")) as fh :
barcodes_list = []
sample_ids_list = []
subdirs_list = []
for line in fh :
if line.startswith("l" + str(lane_number) + "_BARCODES"):
parts = line.strip().split(":=")
barcodes_list = [ re.sub( r"[-_\s]+", "", x) for x in parts[1].split() ]
elif line.startswith("l" + str(lane_number) + "_SAMPLEIDS" ):
parts = line.strip().split(":=")
sample_ids_list = parts[1].split(" ")
elif line.startswith("l" + str(lane_number) + "_SUBDIRS"):
parts = line.strip().split(":=")
subdirs_list = parts[1].split(" ")
assert len(barcodes_list) == len(sample_ids_list) == len(subdirs_list), "Invalid lane {0} in SampleSheet.mk".format(lane_number)
# get the casava project_name
if self.casava["project"] :
project_name = self.casava["project"]
else :
project_name = self.project_name
project_name = project_name.replace(" ", "_")
input_files = casava_directory.get_files( lane_number, project_name)
if len(input_files) == 0 :
raise Exception("Error while parsing casava directory %s, invalid project name '%s' for lane %s"% (casava_directory, project_name, lane_number))
# parse samples
for i in range(len(barcodes_list)):
sample = {
'barcode' : barcodes_list[i],
'sample_id' : sample_ids_list[i],
'subdir' : subdirs_list[i],
'reads1' : [],
'reads2' : []
}
# filter on project name
if re.match("Project_" + project_name + "/Sample_.+", sample['subdir']) or sample['subdir'].startswith("Undetermined_indices"):
for file in os.listdir(casava_directory + "/" + sample['subdir']):
filepath = casava_directory + "/" + sample['subdir'] + "/" + file
if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane_number) + "_.*", file):
for idx, iofile in enumerate(input_files) :
if iofile == filepath :
if re.search(".*_R1_.*", file):
if not sample['subdir'].startswith("Undetermined_indices"):
sample['reads1'].append(iofile)
else:
self.undetermined_reads1.append(iofile)
if re.search(".*_R2_.*", file):
if not sample['subdir'].startswith("Undetermined_indices"):
sample['reads2'].append(iofile)
else:
self.undetermined_reads2.append(iofile)
input_files.pop(idx)
break
if not sample['subdir'].startswith("Undetermined_indices") :
sp_object = Sample(sample['barcode'], sample['reads1'], reads2 = sample['reads2'], name=sample['sample_id'])
sp_object.add_metadata('barcode', sample['barcode'])
sp_object.add_metadata('is_casava', True)
all_samples.append(sp_object)
all_samples_id.append(sample['sample_id'])
# parse samples
for i in range(len(barcodes_list)):
sample = {
'barcode' : barcodes_list[i],
'sample_id' : sample_ids_list[i],
'subdir' : subdirs_list[i],
'reads1' : [],
'reads2' : []
}
selected_samples = self.casava['select_sample_id']
if selected_samples :
for sid in selected_samples :
assert sid in all_samples_id , "The sample id %s is not in the SampleSheet.mk" % sid
# filter on project name
if re.match("Project_" + project_name + "/Sample_.+", sample['subdir']) or sample['subdir'].startswith("Undetermined_indices"):
for file in os.listdir(casava_directory + "/" + sample['subdir']):
filepath = casava_directory + "/" + sample['subdir'] + "/" + file
if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane_number) + "_.*", file):
for idx, iofile in enumerate(input_files) :
if iofile == filepath :
if re.search(".*_R1_.*", file):
if not sample['subdir'].startswith("Undetermined_indices"):
sample['reads1'].append(iofile)
else:
self.undetermined_reads1.append(iofile)
if re.search(".*_R2_.*", file):
if not sample['subdir'].startswith("Undetermined_indices"):
sample['reads2'].append(iofile)
else:
self.undetermined_reads2.append(iofile)
input_files.pop(idx)
break
if not sample['subdir'].startswith("Undetermined_indices") :
sp_object = Sample(sample['barcode'], sample['reads1'], reads2 = sample['reads2'], name=sample['sample_id'])
sp_object.add_metadata('barcode', sample['barcode'])
sp_object.add_metadata('is_casava', True)
for sample in all_samples :
if sample.name in selected_samples :
self.samples.append(sp_object)
# no filter, select all
else :
self.samples.append(sp_object)
else :
NG6Workflow.__create_samples__(self)
def __preprocess_samples__(self):
NG6Workflow.__preprocess_samples__(self)
self.group_prefix = (Utils.get_group_basenames(self.get_all_reads(), "read")).keys()
\ No newline at end of file
if self.is_casava:
self.group_prefix = (Utils.get_group_basenames(self.get_all_reads(), "read")).keys()
def illumina_process(self):
if self.is_casava :
if len(self.undetermined_reads1) > 0 :
if self.casava['mismatch_index'] :
demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1, self.get_files_index('read1')])
else :
demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1])
if self.keep_reads != "all" :
# fastq illumina filter
fastqilluminafilter = self.add_component("FastqIlluminaFilter", [self.get_all_reads(), self.keep_reads, self.group_prefix])
# list filtered files
if self.is_paired_end() :
# split read 1 and read 2 from filtered files list
[filtered_read1_files, filtered_read2_files] = Utils.split_pair(fastqilluminafilter.fastq_files_filtered, (self.group_prefix is not None))
else:
filtered_read1_files = fastqilluminafilter.fastq_files_filtered
filtered_read2_files = []
filtered_read1_files = sorted(filtered_read1_files)
filtered_read2_files = sorted(filtered_read2_files)
else:
fastqilluminafilter = None
filtered_read1_files = self.get_all_reads("read1")
filtered_read2_files = self.get_all_reads("read2")
# archive the files
saved_files = filtered_read1_files + filtered_read2_files
reads_prefixes = None
if self.group_prefix != None :
# concatenate fastq
reads_prefixes = (Utils.get_group_basenames(saved_files, "read")).keys()
concatenatefastq = self.add_component("ConcatenateFilesGroups", [saved_files, reads_prefixes])
saved_files = concatenatefastq.concat_files
addrawfiles = self.add_component("AddRawFiles", [self.runobj, saved_files, self.compression])
# make some statistics on raw file
fastqc = self.add_component("FastQC", [filtered_read1_files+filtered_read2_files, (self.group_prefix is not None), self.no_group, "fastqc.tar.gz"], parent = fastqilluminafilter)
contam = []
try :
contam.append(self.get_resource("phix_bwa"))
contam.append(self.get_resource("ecoli_bwa"))
contam.append(self.get_resource("yeast_bwa"))
except : pass
# contamination_search
if contam :
if self.contamination_databank: contam.extend(self.contamination_databank)
contamination_search = self.add_component("ContaminationSearch", [filtered_read1_files+filtered_read2_files, contam, reads_prefixes], parent = fastqilluminafilter)
return fastqilluminafilter, filtered_read1_files, filtered_read2_files, saved_files
else :
return None, self.get_all_reads("read1"), self.get_all_reads("read2"), self.get_all_reads()
\ No newline at end of file
......@@ -38,7 +38,11 @@ def create_archive( ng6_username, email, data_folder, output_folder, ids_dump_pa
t3mysql = t3MySQLdb()
user_id = t3mysql.get_user_id(ng6_username)
prefixed_ids = ";".join(pickle.load(open(ids_dump_path, "rb")))
fh = open(ids_dump_path, "rb")
prefixed_ids = pickle.load(fh)
fh.close()
prefixed_ids = ";".join(prefixed_ids)
src_directories, dest_directories = get_directories_structure_and_content(user_id, data_folder, output_folder, prefixed_ids)
for i, source_dir in enumerate(src_directories):
......@@ -86,8 +90,10 @@ class CreateArchive (Component):
analysis_ids = [ "analyse_%s"%i for i in self.analysis_ids ]
ids_dump_path = self.get_temporary_file(".dump")
pickle.dump( run_ids + data_ids + analysis_ids, open(ids_dump_path, "wb"))
fh = open(ids_dump_path, "wb")
pickle.dump( run_ids + data_ids + analysis_ids, fh)
output_folder = os.path.join(self.output_directory, 'ng6_data')
fh.close()
web_archive = self._webify_outputs(self.archive)
archive = PythonFunction(create_archive, cmd_format = "{EXE} {ARG} {IN} {OUT} ")
......
......@@ -34,7 +34,10 @@ def create_symbolik_link (username, password, ng6_username, data_folder, output_
t3mysql = t3MySQLdb()
user_id = t3mysql.get_user_id(ng6_username)
prefixed_ids = ";".join(pickle.load(open(ids_dump_path, "rb")))
fh = open(ids_dump_path, "rb")
prefixed_ids = pickle.load(fh)
fh.close()
prefixed_ids = ";".join(prefixed_ids)
src_directories, dest_directories = get_directories_structure_and_content(user_id, data_folder, output_folder, prefixed_ids)
with open(user_script, "w") as us:
......@@ -81,7 +84,9 @@ class CreateSymlink (Component):
analysis_ids = [ "analyse_%s"%i for i in self.analysis_ids ]
ids_dump_path = self.get_temporary_file(".dump")
pickle.dump( run_ids + data_ids + analysis_ids, open(ids_dump_path, "wb"))
fh = open(ids_dump_path, "wb")
pickle.dump( run_ids + data_ids + analysis_ids, fh)
fh.close()
createlinks = PythonFunction(create_symbolik_link, cmd_format = "{EXE} {ARG} {IN} {OUT} ")
createlinks(arguments = [ self.username, "'"+self.password+"'", self.login, self.data_directory, self.user_directory],
......
......@@ -33,72 +33,20 @@ class IlluminaDiversityQC (CasavaNG6Workflow):
def define_parameters(self, function="process"):
self.add_parameter("compression", "How should the data be compressed once archived", choices= [ "none", "gz", "bz2"], default = "none")
self.add_input_file("reference_genome", "Which genome should the read being align on")
self.add_input_file_list("databank", "Which databank should be used to seek contamination (as to be phiX databank indexed for bwa)")
self.add_parameter("assignation_databank", "Blast databank to classify a subset of sequences")
self.add_parameter("keep_reads", "Keep or discard reads which pass the illumina filter. all option will keep all reads", flag = "--keep",
choices=[ "pass_illumina_filters", "not_pass_illumina_filters", "all"], default = "pass_illumina_filters")
self.add_parameter("mismatch_ratio", "Maximum allowed ratio between the number of mismatched base pairs and the overlap length.", default = 0.1, type = float, group="JOIN section")
self.add_parameter("min_overlap", "The minimum required overlap length between two reads to provide a confident overlap.", default = 20, type = int, group="JOIN section")
self.add_parameter("max_overlap", "Maximum overlap length expected in approximately 90 percent of read pairs.", default = 55, type = int, group="JOIN section")
def process(self):
if len(self.undetermined_reads1) > 0 :
if self.mismatch_index :
demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1, self.get_files_index('read1')])
else :
demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1])
if self.keep_reads != "all" :
# fastq illumina filter
fastqilluminafilter = self.add_component("FastqIlluminaFilter", [self.get_all_reads(), self.keep_reads, self.group_prefix])
# list filtered files
if self.is_paired_end() :
# split read 1 and read 2 from filtered files list
[filtered_read1_files, filtered_read2_files] = Utils.split_pair(fastqilluminafilter.fastq_files_filtered, (self.group_prefix is not None))
else:
filtered_read1_files = fastqilluminafilter.fastq_files_filtered
filtered_read2_files = []
filtered_read1_files = sorted(filtered_read1_files)
filtered_read2_files = sorted(filtered_read2_files)
else:
fastqilluminafilter = None
filtered_read1_files = self.get_all_reads("read1")
filtered_read2_files = self.get_all_reads("read2")
# archive the files
saved_files = filtered_read1_files + filtered_read2_files
reads_prefixes = None
if self.group_prefix != None:
# concatenate fastq
reads_prefixes = (Utils.get_group_basenames(saved_files, "read")).keys()
concatenatefastq = self.add_component("ConcatenateFilesGroups", [saved_files, reads_prefixes])
saved_files = concatenatefastq.concat_files
addrawfiles = self.add_component("AddRawFiles", [self.runobj, saved_files, self.compression])
def process(self):
fastqilluminafilter, filtered_read1_files, filtered_read2_files, concat_files = self.illumina_process()
concatenate1 = self.add_component("ConcatenateFilesGroups", [filtered_read1_files, (Utils.get_group_basenames(self.get_all_reads("read1"), "read")).keys()],component_prefix="read1")