Commit 62e909d6 authored by Gerald Salin

logging, refactoring

parent ecf239b6
@@ -26,6 +26,7 @@ import glob
 import os
 import string, gzip, bz2
 import tarfile, tempfile
+import logging
 from shutil import copyfile, rmtree
 import re
 import base64
@@ -317,12 +318,13 @@ class Utils(object):
         @param prefixes : prefix to gather
         """
         path_groups = {}
+        logging.getLogger("Utils").debug("get_filepath_by_prefix. prefixes = " + ", ".join(prefixes))
         for current_prefix in prefixes:
             path_groups[current_prefix] = []
             for file_path in path_list:
                 if os.path.basename(file_path).startswith(current_prefix):
                     path_groups[current_prefix].append(file_path)
+            logging.getLogger("Utils").debug("get_filepath_by_prefix. path_groups[current_prefix] = " + ", ".join(path_groups[current_prefix]))
         return path_groups
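For context, `get_filepath_by_prefix` groups a flat list of paths by file-name prefix. A minimal standalone sketch of that behaviour (hypothetical paths, reimplemented here rather than imported from `Utils`):

```python
import os

# Re-implementation sketch of the grouping logic shown in the hunk above.
def group_by_prefix(path_list, prefixes):
    path_groups = {}
    for prefix in prefixes:
        # keep every path whose basename starts with the current prefix
        path_groups[prefix] = [p for p in path_list
                               if os.path.basename(p).startswith(prefix)]
    return path_groups

paths = ["/data/run1/sampleA_R1.fastq", "/data/run1/sampleA_R2.fastq",
         "/data/run1/sampleB_R1.fastq"]
print(group_by_prefix(paths, ["sampleA", "sampleB"]))
# {'sampleA': ['/data/run1/sampleA_R1.fastq', '/data/run1/sampleA_R2.fastq'],
#  'sampleB': ['/data/run1/sampleB_R1.fastq']}
```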
@@ -41,30 +41,27 @@ def extract_stats_from_seq_file(input_file,output_file_stat):
     logging.getLogger("AddRawFiles").debug("extract_stats_from_seq_files. finished")
 
-def concatenate_stats_file(total_file_stat, *files_to_sync):
+def md5sum(md5_file_out, *files_to_md5sum):
     import jflow.seqio as seqio
     import logging
     import os
     from subprocess import Popen, PIPE
-    logging.getLogger("AddRawFiles").debug("concatenate_stats_file. before md5sum Does work_directory of the run ("+os.path.dirname(total_file_stat)+")exists?" + str(os.path.isdir(os.path.dirname(total_file_stat))))
-    logging.getLogger("AddRawFiles").debug("concatenate_stats_file. content of work_directory of the run ("+os.path.dirname(total_file_stat)+") = " + ",".join(os.listdir(os.path.dirname(total_file_stat))))
-    logging.getLogger("AddRawFiles").debug("concatenate_stats_file. md5sum in " + str(total_file_stat))
-    p = Popen("md5sum " + " ".join(files_to_sync) +"|awk -F/ {'print $1,$NF'} > "+ total_file_stat ,shell=True,stdout=PIPE,stderr=PIPE,universal_newlines=True)
+    logging.getLogger("AddRawFiles").debug("md5sum. entering")
+    logging.getLogger("AddRawFiles").debug("md5sum. files_to_md5sum in "+os.path.dirname(md5_file_out)+" = " + ",".join(files_to_md5sum))
+    p = Popen("md5sum " + " ".join(files_to_md5sum) +"|awk -F/ {'print $1,$NF'} > "+ md5_file_out ,shell=True,stdout=PIPE,stderr=PIPE,universal_newlines=True)
     stdout,stderr = p.communicate()
-    logging.getLogger("AddRawFiles").debug("concatenate_stats_file. after md5sum Does work_directory of the run ("+os.path.dirname(total_file_stat)+")exists?" + str(os.path.isdir(os.path.dirname(total_file_stat))))
-    logging.getLogger("AddRawFiles").debug("concatenate_stats_file. Content of the folder ("+os.path.dirname(total_file_stat)+" = " + ",".join(os.listdir(os.path.dirname(total_file_stat))))
-    logging.getLogger("AddRawFiles").debug("concatenate_stats_file. finished")
+    logging.getLogger("AddRawFiles").debug("md5sum. finished")
 
-def add_raw_files (run_dump_path,*files):
+def add_stats_to_run (run_dump_path,*files):
     import pickle
     import logging
     total_nb_seq = 0
     total_size = 0
     for curr_stat_file in files:
-        logging.getLogger("AddRawFiles").debug("add_raw_files. Content of " + curr_stat_file)
+        logging.getLogger("AddRawFiles").debug("add_stats_to_run. Content of " + curr_stat_file)
         with open(curr_stat_file, 'r') as myfile:
             nb_seq,size=myfile.read().split(':')
-            logging.getLogger("AddRawFiles").debug("add_raw_files. Content of " + curr_stat_file + " nb_seq = " + str(nb_seq) + ", size = " + str(size))
+            logging.getLogger("AddRawFiles").debug("add_stats_to_run. Content of " + curr_stat_file + " nb_seq = " + str(nb_seq) + ", size = " + str(size))
             size= int(size)
             nb_seq= int(nb_seq)
             total_size+=size
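The renamed `md5sum` helper shells out rather than hashing in Python; the `Popen` line assembles a pipeline whose `awk -F/` step keeps only the checksum (field 1, everything before the first `/` of the absolute path) and the file's basename (`$NF`). A sketch of the command string it builds, with hypothetical paths:

```python
# Sketch of the shell command assembled by md5sum() above (hypothetical paths).
files_to_md5sum = ["/work/run42/s1.fastq.gz", "/work/run42/s2.fastq.gz"]
md5_file_out = "/work/run42/md5sum.txt"

cmd = "md5sum " + " ".join(files_to_md5sum) + "|awk -F/ {'print $1,$NF'} > " + md5_file_out
print(cmd)
# md5sum /work/run42/s1.fastq.gz /work/run42/s2.fastq.gz|awk -F/ {'print $1,$NF'} > /work/run42/md5sum.txt
# Each output line ends up as "<checksum>  <basename>", e.g. "d41d8cd9...  s1.fastq.gz".
```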
@@ -73,11 +70,11 @@ def add_raw_files (run_dump_path,*files):
     run_dump = open(run_dump_path, "rb")
     my_run = pickle.load(run_dump)
     run_dump.close()
-    logging.getLogger("AddRawFiles").debug("add_raw_files. Before my_run.process_raw_files total_nb_seq = " + str(total_nb_seq) + ", total_size = " + str(total_size))
-    logging.getLogger("AddRawFiles").debug("add_raw_files. work_directory of the run "+ my_run.get_work_directory())
+    logging.getLogger("AddRawFiles").debug("add_stats_to_run. Before my_run.process_raw_files total_nb_seq = " + str(total_nb_seq) + ", total_size = " + str(total_size))
+    logging.getLogger("AddRawFiles").debug("add_stats_to_run. work_directory of the run "+ my_run.get_work_directory())
     my_run.process_raw_files(total_size,total_nb_seq)
-    logging.getLogger("AddRawFiles").debug("add_raw_files. after my_run.process_raw_files, does the work dir ("+my_run.get_work_directory()+") exist? "+ str(os.path.isdir(my_run.get_work_directory())))
-    logging.getLogger("AddRawFiles").debug("add_raw_files. Content of the folder ("+my_run.get_work_directory()+" = " + ",".join(os.listdir(my_run.get_work_directory())))
+    logging.getLogger("AddRawFiles").debug("add_stats_to_run. after my_run.process_raw_files, does the work dir ("+my_run.get_work_directory()+") exist? "+ str(os.path.isdir(my_run.get_work_directory())))
+    logging.getLogger("AddRawFiles").debug("add_stats_to_run. Content of the folder ("+my_run.get_work_directory()+" = " + ",".join(os.listdir(my_run.get_work_directory())))
 
 def copy_file (file,output):
     import os
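Upstream, `extract_stats_from_seq_file` writes one `nb_seq:size` pair per input (the `.count` files parsed above); `add_stats_to_run` then unpickles the run object and pushes the summed totals through `process_raw_files`. A minimal sketch of the aggregation step with hypothetical counts:

```python
# Hypothetical .count file contents, format "nb_seq:size" as parsed by add_stats_to_run.
stat_contents = ["25000:3750000", "31000:4650000"]

total_nb_seq = 0
total_size = 0
for content in stat_contents:
    nb_seq, size = content.split(':')  # same split(':') as in the diff above
    total_nb_seq += int(nb_seq)
    total_size += int(size)

print(total_nb_seq, total_size)  # 56000 8400000
```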
@@ -138,6 +135,8 @@ class AddRawFiles (Component):
         self.add_output_file("total_stat_file", "total_stat", filename="total_stat.stdout")
         self.add_output_file_list("files_to_save_stats", "Files containing stat of the corresponding read file", items=self.files_to_save)
+        self.add_output_file_list("files_to_sync", "Files in the work directory of the run, to be synchronized", items=self.files_to_save)
+        self.add_output_file_list("files_to_md5", "Files in the work directory for md5sum", items=self.files_to_save)
 
     def process(self):
         self.runobj.raw_files = self.files_to_save
         logging.getLogger("AddRawFiles").debug("process. work_directory of the run "+ self.runobj.get_work_directory())
@@ -148,55 +147,74 @@ class AddRawFiles (Component):
         logging.getLogger("AddRawFiles").debug("process. begin does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
         files_to_save_stats = self.get_outputs( '{basename_woext}.count', self.files_to_save)
         logging.getLogger("AddRawFiles").debug("process. before extract_stats_from_seq_file. does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
-        logging.getLogger("AddRawFiles").debug("process. Before self.add_python_execution(extract_stats_from_seq_file")
-        #count number of reads and total length in bases for each seq file
         for i,o in zip(self.files_to_save,files_to_save_stats ):
             self.add_python_execution(extract_stats_from_seq_file,cmd_format="{EXE} {IN} {OUT}",
                                       inputs = i, outputs = o, map=False)
-        logging.getLogger("AddRawFiles").debug("process. before add_raw_files. does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
-        self.add_python_execution(add_raw_files, cmd_format='{EXE} {ARG} {IN} > {OUT}', map=False,
+        logging.getLogger("AddRawFiles").debug("process. Before self.add_stats_to_run(extract_stats_from_seq_file")
+        #Add number of reads and total length in bases for each seq file and add these data to the run in the database
+        self.add_python_execution(add_stats_to_run, cmd_format='{EXE} {ARG} {IN} > {OUT}', map=False,
                                   outputs=self.stdout, inputs=files_to_save_stats,includes=self.files_to_save, arguments=[run_dump_path])
-        logging.getLogger("AddRawFiles").debug("process. before self.compression. does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
+        #archive the files in the work folder of the run to be rsynced at the end
+        logging.getLogger("AddRawFiles").debug("process. Before copying/archiving files with compression = " + self.compression)
         files_to_sync_ori = []
+        files_to_md5_ori = []
         if self.compression=="none":
             for file in self.files_to_save:
-                files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)))
+                files_to_md5_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)))
+                if os.path.dirname(file) != self.runobj.get_work_directory():
+                    files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)))
+            files_to_md5 = files_to_md5_ori
             files_to_sync = files_to_sync_ori
-            for idx, file in enumerate(self.files_to_save):
+            for idx, file in enumerate(files_to_sync):
                 self.add_python_execution(copy_file,cmd_format="{EXE} {IN} {OUT}",
-                                          inputs = file, outputs = files_to_sync[idx], map=False)
+                                          inputs = self.files_to_save[idx], outputs = file, map=False)
         #TODO possible inconsistency : if the filename ends with one extension in Utils.UNCOMPRESS_EXTENSION, the output file name won't be suffixed with gz in Utils.gzip
         elif self.compression=="gz":
             for file in self.files_to_save:
-                files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)))
+                files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)+".gz"))
+                files_to_md5_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)+".gz"))
+            files_to_md5 = files_to_md5_ori
             files_to_sync = files_to_sync_ori
             for idx, file in enumerate(self.files_to_save):
                 self.add_python_execution(zip_file,cmd_format="{EXE} {IN} {OUT}",
                                           inputs = file, outputs = files_to_sync[idx], map=False)
         #TODO possible inconsistency : if the filename ends with one extension in Utils.UNCOMPRESS_EXTENSION, the output file name won't be suffixed with bz2 in Utils.bz2
         elif self.compression=="bz2":
             for file in self.files_to_save:
+                files_to_md5_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)))
                 files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)))
+            files_to_md5 = files_to_md5_ori
             files_to_sync = files_to_sync_ori
             for idx, file in enumerate(self.files_to_save):
-                self.add_python_execution(zip_file,cmd_format="{EXE} {IN} {OUT}",
+                self.add_python_execution(bzip2_file,cmd_format="{EXE} {IN} {OUT}",
                                           inputs = file, outputs = files_to_sync[idx], map=False)
         elif self.compression=="tar.gz":
+            files_to_md5_ori.append(os.path.join(self.runobj.get_work_directory(),"ng6_archive.tar.gz"))
             files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),"ng6_archive.tar.gz"))
+            files_to_md5 = files_to_md5_ori
             files_to_sync = files_to_sync_ori
             self.add_python_execution(tar_zip_files,cmd_format="{EXE} {OUT} {IN}",
                                       inputs = self.files_to_save, outputs = files_to_sync[0], map=False)
         elif self.compression=="tar.bz2":
+            files_to_md5_ori.append(os.path.join(self.runobj.get_work_directory(),"ng6_archive.tar.bz2"))
             files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),"ng6_archive.tar.bz2"))
+            files_to_md5 = files_to_md5_ori
             files_to_sync = files_to_sync_ori
             self.add_python_execution(tar_bz2_files,cmd_format="{EXE} {OUT} {IN}",
                                       inputs = self.files_to_save, outputs = files_to_sync[0], map=False)
-        logging.getLogger("AddRawFiles").debug("process. after self.compression. does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
-        total_stat_file = os.path.join(self.runobj.get_work_directory(), "md5sum.txt")
-        self.add_python_execution(concatenate_stats_file,cmd_format="{EXE} {OUT} {IN}",
-                                  inputs = files_to_sync, outputs = total_stat_file, map=False)
-        logging.getLogger("AddRawFiles").debug("process. after concatenate_stats_file. does the work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
+        logging.getLogger("AddRawFiles").debug("process. after self.compression.")
+        if (len(files_to_md5_ori)>0):
+            md5_file = os.path.join(self.runobj.get_work_directory(), "md5sum.txt")
+            self.add_python_execution(md5sum,cmd_format="{EXE} {OUT} {IN}",
+                                      inputs = files_to_md5, outputs = md5_file, map=False)
\ No newline at end of file
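Net effect of the reworked branching: each compression mode decides which work-directory paths feed `files_to_md5` (and hence the final `md5sum.txt`). A standalone sketch of that naming scheme, mirroring the branch logic above with hypothetical inputs:

```python
import os

# Sketch of the md5-list bookkeeping per compression mode (hypothetical paths).
def planned_md5_targets(work_dir, files_to_save, compression):
    if compression in ("none", "bz2"):
        # plain basenames; the bz2 suffix gap is the TODO noted in the diff
        names = [os.path.basename(f) for f in files_to_save]
    elif compression == "gz":
        names = [os.path.basename(f) + ".gz" for f in files_to_save]
    else:  # "tar.gz" / "tar.bz2": the whole run collapses into one archive
        names = ["ng6_archive." + compression]
    return [os.path.join(work_dir, n) for n in names]

print(planned_md5_targets("/work/run42", ["/data/s1.fastq", "/data/s2.fastq"], "tar.gz"))
# ['/work/run42/ng6_archive.tar.gz']
```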