Commit d25986df authored by Gerald Salin

optimization of the addrawfiles step, alignment step with

subset_seqfiles
add md5sum
parent 81cbb809
......@@ -510,7 +510,7 @@ class Workflow(threading.Thread):
logging.getLogger("wf." + str(self.id)).debug("delete. Trying to delete " + self.directory)
if self.get_status() in [self.STATUS_COMPLETED, self.STATUS_FAILED, self.STATUS_ABORTED]:
logging.getLogger("wf." + str(self.id)).debug("status is OK for deletetion" )
#utils.robust_rmtree(self.directory)
utils.robust_rmtree(self.directory)
logging.getLogger("wf." + str(self.id)).debug("workflow deleted" )
@staticmethod
......
......@@ -249,9 +249,12 @@ class NG6Workflow (BasicNG6Workflow):
def post_process(self):
# once everything done, sync directories
logging.getLogger("ng6").debug("post_process enter")
if self.runobj:
logging.getLogger("ng6").debug("post_process enter self.runobj.sync")
self.runobj.sync()
elif self.project:
logging.getLogger("ng6").debug("post_process enter self.project.sync")
self.project.sync()
......@@ -515,10 +518,13 @@ class CasavaNG6Workflow(NG6Workflow):
# archive the files
saved_files = filtered_read1_files + filtered_read2_files
logging.getLogger("CasavaNG6Workflow").debug("illumina_process saved_files = " + ",".join(saved_files))
reads_prefixes = None
if self.group_prefix != None :
# concatenate fastq
reads_prefixes = list((Utils.get_group_basenames(saved_files, "read")).keys())
logging.getLogger("CasavaNG6Workflow").debug("illumina_process reads_prefixes = " + ",".join(reads_prefixes))
logging.getLogger("CasavaNG6Workflow").debug("illumina_process saved_files = " + ",".join(saved_files))
concatenatefastq = self.add_component("ConcatenateFilesGroups", [saved_files, reads_prefixes])
saved_files = concatenatefastq.concat_files
......
......@@ -91,24 +91,26 @@ class Run(object):
os.makedirs(work_dir, 0o751)
self.directory = os.path.join(self.DIRECTORIES_STRUCTURE, directory_name)
def process_raw_files(self, mode):
def process_raw_files(self, total_size, total_seq):
"""
Copy, archive and compress the files list to the run. And add info to the run
@param mode: can be none, gz, bz2, tar.gz and tar.bz2
"""
nb_seq, full_size = 0, 0
for file in self.raw_files:
try:
# Get nb_seq and full_size values
reader = seqio.SequenceReader(file)
for id, desc, seq, qualities in reader:
nb_seq += 1
full_size += len(seq)
except:
logging.getLogger("Run.sync").debug(file + " seems to not be a seq file")
self.set_nb_sequences(nb_seq)
self.set_full_size(full_size)
self.archive_files(self.raw_files, mode)
# nb_seq, full_size = 0, 0
# for file in self.raw_files:
# try:
# # Get nb_seq and full_size values
# reader = seqio.SequenceReader(file)
# for id, desc, seq, qualities in reader:
# nb_seq += 1
# full_size += len(seq)
# except:
# logging.getLogger("Run.sync").debug(file + " seems to not be a seq file")
self.set_nb_sequences(total_seq)
self.set_full_size(total_size)
# if not os.path.isdir(self.__get_work_directory()) :
# os.makedirs(self.__get_work_directory(), 0o751)
# self.archive_files(self.raw_files, mode)
def archive_files(self, files, mode, archive_name="ng6_archive.tar", delete=False):
......@@ -120,7 +122,6 @@ class Run(object):
@param archive_name: the archive name if tar is requested
@param delete: delete files
"""
list_files_md5 = ""
# create workdir if it does not exists
if not os.path.isdir(self.__get_work_directory()) :
os.makedirs(self.__get_work_directory(), 0o751)
......@@ -135,7 +136,6 @@ class Run(object):
for file in files:
if os.path.isfile(file):
copyfile(file, os.path.join(self.__get_work_directory(), os.path.basename(file)))
list_files_md5 += os.path.join(self.__get_work_directory(), os.path.basename(file)) + " "
# Finaly try to delete the original file if asked to do so
try:
if delete:
......@@ -146,26 +146,20 @@ class Run(object):
elif mode == "gz":
for file in files:
Utils.gzip(file, self.__get_work_directory(), delete)
list_files_md5 += os.path.join(self.__get_work_directory(), os.path.basename(file))+".gz" + " "
elif mode == "bz2":
for file in files:
Utils.bz2(file, self.__get_work_directory(), delete)
list_files_md5 += os.path.join(self.__get_work_directory(), os.path.basename(file))+".bz2" + " "
elif mode == "tar.gz":
Utils.tar_files(files, os.path.join(self.__get_work_directory(), archive_name), delete)
Utils.gzip(os.path.join(self.__get_work_directory(), archive_name), self.__get_work_directory(), True)
list_files_md5 += os.path.join(self.__get_work_directory(), archive_name) + ".gz" + " "
elif mode == "tar.bz2":
Utils.tar_files(files, os.path.join(self.__get_work_directory(), archive_name), delete)
Utils.bz2(os.path.join(self.__get_work_directory(), archive_name), self.__get_work_directory(), True)
list_files_md5 += os.path.join(self.__get_work_directory(), archive_name)+".bz2" + " "
logging.getLogger("Run").debug("archive_files. list_files_md5 = " + list_files_md5)
p = Popen("md5sum " + list_files_md5+"|awk -F/ {'print $1,$NF'} > "+os.path.join(self.__get_work_directory(), "md5sum.txt") ,shell=True,stdout=PIPE,stderr=PIPE,universal_newlines=True)
stdout,stderr = p.communicate()
logging.getLogger("Run").debug("archive_files. stdout = " + stdout)
def set_nb_sequences(self, nb_seqs):
"""
......@@ -385,7 +379,14 @@ class Run(object):
"""
ng6conf = NG6ConfigReader()
return ng6conf.get_work_directory() + self.directory
def get_work_directory(self):
"""
Return the full path of the run directory into the work dir
"""
ng6conf = NG6ConfigReader()
return ng6conf.get_work_directory() + self.directory
def __get_save_directory(self):
"""
Return the full path of the run directory into the save dir
......
......@@ -17,17 +17,117 @@
import os
import pickle
import logging
from jflow.component import Component
def add_raw_files (run_dump_path, compression):
def extract_stats_from_seq_file(input_file,output_file_stat):
import jflow.seqio as seqio
import logging
import os
logging.getLogger("AddRawFiles").debug("extract_stats_from_seq_files. Entering, working on "+input_file)
nb_seq, full_size = 0, 0
try:
# Get nb_seq and full_size values
reader = seqio.SequenceReader(input_file)
for id, desc, seq, qualities in reader:
nb_seq += 1
full_size += len(seq)
f = open(output_file_stat, 'w')
f.write(str(nb_seq)+":"+str(full_size)) # stats are stored as "<nb_seq>:<full_size>"
f.close()
except:
logging.getLogger("AddRawFiles").debug(file + " seems to not be a seq file")
logging.getLogger("AddRawFiles").debug("extract_stats_from_seq_files. finished")
def concatenate_stats_file(total_file_stat, *files_to_sync):
import jflow.seqio as seqio
import logging
import os
from subprocess import Popen, PIPE
logging.getLogger("AddRawFiles").debug("concatenate_stats_file. before md5sum Does work_directory of the run ("+os.path.dirname(total_file_stat)+")exists?" + str(os.path.isdir(os.path.dirname(total_file_stat))))
logging.getLogger("AddRawFiles").debug("concatenate_stats_file. content of work_directory of the run ("+os.path.dirname(total_file_stat)+") = " + ",".join(os.listdir(os.path.dirname(total_file_stat))))
logging.getLogger("AddRawFiles").debug("concatenate_stats_file. md5sum in " + str(total_file_stat))
p = Popen("md5sum " + " ".join(files_to_sync) +"|awk -F/ {'print $1,$NF'} > "+ total_file_stat ,shell=True,stdout=PIPE,stderr=PIPE,universal_newlines=True)
stdout,stderr = p.communicate()
logging.getLogger("AddRawFiles").debug("concatenate_stats_file. after md5sum Does work_directory of the run ("+os.path.dirname(total_file_stat)+")exists?" + str(os.path.isdir(os.path.dirname(total_file_stat))))
logging.getLogger("AddRawFiles").debug("concatenate_stats_file. Content of the folder ("+os.path.dirname(total_file_stat)+" = " + ",".join(os.listdir(os.path.dirname(total_file_stat))))
logging.getLogger("AddRawFiles").debug("concatenate_stats_file. finished")
def add_raw_files (run_dump_path,*files):
import pickle
import logging
total_nb_seq = 0
total_size = 0
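# each input is a per-file stat file written by extract_stats_from_seq_file in the form "<nb_seq>:<full_size>"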
for curr_stat_file in files:
logging.getLogger("AddRawFiles").debug("add_raw_files. Content of " + curr_stat_file)
with open(curr_stat_file, 'r') as myfile:
nb_seq, size = myfile.read().split(':')
logging.getLogger("AddRawFiles").debug("add_raw_files. Content of " + curr_stat_file + " nb_seq = " + str(nb_seq) + ", size = " + str(size))
size = int(size)
nb_seq = int(nb_seq)
total_size += size
total_nb_seq += nb_seq
# load the analysis object
run_dump = open(run_dump_path, "rb")
my_run = pickle.load(run_dump)
run_dump.close()
my_run.process_raw_files(compression)
logging.getLogger("AddRawFiles").debug("add_raw_files. Before my_run.process_raw_files total_nb_seq = " + str(total_nb_seq) + ", total_size = " + str(total_size))
logging.getLogger("AddRawFiles").debug("add_raw_files. work_directory of the run "+ my_run.get_work_directory())
my_run.process_raw_files(total_size,total_nb_seq)
logging.getLogger("AddRawFiles").debug("add_raw_files. after my_run.process_raw_files, does the work dir ("+my_run.get_work_directory()+") exist? "+ str(os.path.isdir(my_run.get_work_directory())))
logging.getLogger("AddRawFiles").debug("add_raw_files. Content of the folder ("+my_run.get_work_directory()+" = " + ",".join(os.listdir(my_run.get_work_directory())))
def copy_file (file,output):
import os
import logging
from shutil import copyfile
logging.getLogger("AddRawFiles").debug("copy_file. copy " + str(file) +" to " + str(output))
try:
logging.getLogger("AddRawFiles").debug("copy_file. Does work_directory of the run ("+os.path.dirname(output)+")exists?" + str(os.path.isdir(os.path.dirname(output))))
copyfile(file, output)
logging.getLogger("AddRawFiles").debug("copy_file. Content of the folder ("+os.path.dirname(output)+" = " + ",".join(os.listdir(os.path.dirname(output))))
except:
logging.getLogger("AddRawFiles").error("copy_file. Error raised while copying "+file +" to " + output)
#raise Exception('Could not ')
def zip_file (file,output):
import os
import logging
from ng6.utils import Utils
logging.getLogger("AddRawFiles").debug("zip_file. zip " + str(file) +" to " + os.path.dirname(output))
Utils.gzip(file, os.path.dirname(output))
def bzip2_file (file,output):
import os
import logging
from ng6.utils import Utils
logging.getLogger("AddRawFiles").debug("bzip2_file. zip " + str(file) +" to " + os.path.dirname(output))
Utils.bz2(file, os.path.dirname(output))
def tar_zip_files (output,files):
import os
import logging
from ng6.utils import Utils
logging.getLogger("AddRawFiles").debug("tar_zip_files.")
# archive in the output's directory under a fixed name; keep the source files
Utils.tar_files(files, os.path.join(os.path.dirname(output), "ng6_archive.tar"), False)
Utils.gzip(os.path.join(os.path.dirname(output), "ng6_archive.tar"), os.path.dirname(output), False)
def tar_bz2_files (output, files):
import os
import logging
from ng6.utils import Utils
logging.getLogger("AddRawFiles").debug("tar_bz2_files.")
# module-level function: derive the work directory from the output path; keep the source files
Utils.tar_files(files, os.path.join(os.path.dirname(output), "ng6_archive.tar"), False)
Utils.bz2(os.path.join(os.path.dirname(output), "ng6_archive.tar"), os.path.dirname(output), False)
class AddRawFiles (Component):
def define_parameters(self, runobj, files_to_save, compression):
......@@ -35,13 +135,70 @@ class AddRawFiles (Component):
self.add_input_file_list( "files_to_save", "File to be saved as raw files", default=files_to_save, required=True)
self.add_parameter("compression", "How should data be compressed once archived", default=compression, required = True)
self.add_output_file("stdout", "AddRawfiles stdout", filename="addRawFiles.stdout")
self.add_output_file("total_stat_file", "total_stat", filename="total_stat.stdout")
self.add_output_file_list("files_to_save_stats", "Files containing stat of the corresponding read file", items=self.files_to_save)
self.add_output_file_list("files_to_sync", "Files in the work directory of the run, to be synchronized", items=self.files_to_save)
def process(self):
self.runobj.raw_files = self.files_to_save
logging.getLogger("AddRawFiles").debug("process. work_directory of the run "+ self.runobj.get_work_directory())
run_dump_path = self.get_temporary_file(".dump")
run_dump = open(run_dump_path, "wb")
pickle.dump(self.runobj, run_dump)
run_dump.close()
self.add_python_execution(add_raw_files, cmd_format='{EXE} {ARG} > {OUT}', map=False,
outputs=self.stdout, includes=self.files_to_save, arguments=[run_dump_path, self.compression] )
logging.getLogger("AddRawFiles").debug("process. begin does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
files_to_save_stats = self.get_outputs( '{basename_woext}.count', self.files_to_save)
logging.getLogger("AddRawFiles").debug("process. before extract_stats_from_seq_file. does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
for i,o in zip(self.files_to_save,files_to_save_stats ):
self.add_python_execution(extract_stats_from_seq_file,cmd_format="{EXE} {IN} {OUT}",
inputs = i, outputs = o, map=False)
logging.getLogger("AddRawFiles").debug("process. before add_raw_files. does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
self.add_python_execution(add_raw_files, cmd_format='{EXE} {ARG} {IN} > {OUT}', map=False,
outputs=self.stdout, inputs=files_to_save_stats,includes=self.files_to_save, arguments=[run_dump_path])
logging.getLogger("AddRawFiles").debug("process. before self.compression. does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
files_to_sync_ori = []
if self.compression=="none":
for file in self.files_to_save:
files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)))
files_to_sync = files_to_sync_ori
for idx, file in enumerate(self.files_to_save):
self.add_python_execution(copy_file,cmd_format="{EXE} {IN} {OUT}",
inputs = file, outputs = files_to_sync[idx], map=False)
elif self.compression=="gz":
for file in self.files_to_save:
files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)) + ".gz")
files_to_sync = files_to_sync_ori
for idx, file in enumerate(self.files_to_save):
self.add_python_execution(zip_file,cmd_format="{EXE} {IN} {OUT}",
inputs = file, outputs = files_to_sync[idx], map=False)
elif self.compression=="bz2":
for file in self.files_to_save:
files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)) + ".bz2")
files_to_sync = files_to_sync_ori
for idx, file in enumerate(self.files_to_save):
self.add_python_execution(bzip2_file,cmd_format="{EXE} {IN} {OUT}",
inputs = file, outputs = files_to_sync[idx], map=False)
elif mode == "tar.gz":
files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),"ng6_archive.tar.gz"))
files_to_sync = files_to_sync_ori
self.add_python_execution(tar_zip_files,cmd_format="{EXE} {OUT} {IN}",
inputs = self.files_to_save, outputs = files_to_sync[idx], map=False)
elif mode == "tar.bz2":
files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),"ng6_archive.tar.bz2"))
files_to_sync = files_to_sync_ori
self.add_python_execution(tar_bz2_files,cmd_format="{EXE} {OUT} {IN}",
inputs = self.files_to_save, outputs = files_to_sync[idx], map=False)
logging.getLogger("AddRawFiles").debug("process. after self.compression. does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
total_stat_file = os.path.join(self.runobj.get_work_directory(), "md5sum.txt")
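# md5sum.txt is written into the run work directory so it can be synchronized along with the archived files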
self.add_python_execution(concatenate_stats_file,cmd_format="{EXE} {OUT} {IN}",
inputs = files_to_sync, outputs = total_stat_file, map=False)
logging.getLogger("AddRawFiles").debug("process. after concatenate_stats_file. does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
def post_process(self):
logging.getLogger("AddRawFiles").debug("post_process. does the work dir ("+self.runobj.get_work_directory()+") exist? ")
\ No newline at end of file
......@@ -71,7 +71,6 @@ def get_number_of_reads_to_extract(extract_rate,min_nb_seq,max_nb_seq, filename,
f.write(str(step)+":"+str(nb_seq_to_extract)) # stored as "<step>:<nb_seq_to_extract>"
f.close()
#def extract_random_seq(output_file,input_file,output_file):
def extract_random_seq(input_file,info_file,output_file):
import jflow.seqio as seqio
import logging
......@@ -82,61 +81,9 @@ def extract_random_seq(input_file,info_file,output_file):
logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. Content of " + info_file + " step = " + step + ", nb_seq_to_extract = " + nb_seq_to_extract)
step= int(step)
nb_seq_to_extract= int(nb_seq_to_extract)
# Check parameters
# extract_rate = float(extract_rate)
# min_nb_seq = int(min_nb_seq)
# max_nb_seq = int(max_nb_seq)
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. extract_rate = " + str(extract_rate))
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. min_nb_seq = " + str(min_nb_seq))
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. max_nb_seq = " + str(max_nb_seq))
#
# if extract_rate > 1 or extract_rate <= 0:
# raise Exception("[ERROR] : the extract_rate value is not correct! (Should be between 0 and 1)")
# if int(min_nb_seq) > int(max_nb_seq):
# raise Exception("[ERROR] : the threshold values are not correct ! (Minimum threshold is bigger than Maximum threshold)")
# nb_files_pair = len(files)/2
# if int(nb_files_pair) != nb_files_pair:
# raise Exception("[ERROR] : the number of files is not correct! (Each sequence_files should correspond to an sub_sequence_files : [file1, file2, sub_file1, sub_file2])")
# nb_files_pair = int(nb_files_pair)
# sequence_files = files[:nb_files_pair]
# outputs = files[nb_files_pair:]
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. Number of files " + str(nb_files_pair))
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. sequence_files = " + ", ".join(sequence_files))
# logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. outputs " + ", ".join(outputs))
# # Compute min_nb_seq, max_nb_seq, nbsubseq, totalseq
# nb_seq = 0
# min_file_size_for_reference = 0
# reference_file = ""
# for curr_sequence_file in sequence_files:
# logging.getLogger("SubsetSeqFiles").debug("Working on"+curr_sequence_file+" to determine the number of reads ")
# logging.getLogger("SubsetSeqFiles").debug("Size of "+curr_sequence_file+" = "+str(os.stat(curr_sequence_file).st_size))
# if min_file_size_for_reference == 0 or os.stat(curr_sequence_file).st_size < min_file_size_for_reference:
# min_file_size_for_reference = os.stat(curr_sequence_file).st_size
# reference_file = curr_sequence_file
# logging.getLogger("SubsetSeqFiles").debug("Reference file (smallest) is "+reference_file)
# reader = seqio.SequenceReader(reference_file)
# for id, desc, seq, qual in reader :
# nb_seq += 1
# nb_seq_to_extract = int(nb_seq * float(extract_rate))
# logging.getLogger("SubsetSeqFiles").debug("Total number of reads for the smallest seq file " + str(nb_seq))
# logging.getLogger("SubsetSeqFiles").debug("Number of reads to extract with the formula (nb_seq * float(extract_rate)) " + str(nb_seq_to_extract))
# # Setting the thresholds if necessary
# if nb_seq_to_extract > max_nb_seq:
# nb_seq_to_extract = max_nb_seq
# elif nb_seq_to_extract < min_nb_seq:
# nb_seq_to_extract = min_nb_seq
# if nb_seq < nb_seq_to_extract :
# nb_seq_to_extract = nb_seq
# step = int(nb_seq / nb_seq_to_extract)
# logging.getLogger("SubsetSeqFiles").debug("Number of reads to extract after checking min and max reads allowed " + str(nb_seq_to_extract))
# logging.getLogger("SubsetSeqFiles").debug("Step " + str(step))
# Create sub_sequence_files
#for i in range(0, nb_files_pair):
count = 0
countExtractedReads = 0
# Writes result in fasta file
# Writes result in seq file
reader = seqio.SequenceReader(input_file)
logging.getLogger("SubsetSeqFiles").debug("extract_random_seq. creating = " + output_file + ", from = " + input_file)
with open(output_file, "w") as output:
......@@ -154,8 +101,7 @@ def extract_random_seq(input_file,info_file,output_file):
class SubsetSeqFiles (Analysis):
def define_parameters(self, read1, read2=None, max_target_seqs=10,
extract_rate=0.05, min_nb_seq=2000, max_nb_seq=3000):
#extract_rate=0.05, min_nb_seq=20000, max_nb_seq=1e6):
extract_rate=0.05, min_nb_seq=20000, max_nb_seq=1e6):
"""
"""
......