Commit e223c65f authored by Gerald Salin

Merge branch 'md5sum_parallel' into 'master'

Long sample list is still a problem. Please do not accept this merge request!

See merge request !14
parents f8269aea db51d9e5
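Context for reviewers: the branch replaces a single `md5sum file1 file2 ...` invocation, whose argument list grows with the number of samples and can exceed the kernel's ARG_MAX limit, with a `find <dir> -maxdepth 1 -type f -exec md5sum {} \;` pipeline driven from the directory of one representative file. A pure-Python alternative would avoid the shell, its quoting pitfalls, and the length limit altogether; a minimal sketch (`md5sum_directory` is an invented name, not part of this commit):

```python
# Sketch only: hashing in pure Python avoids both the shell's ARG_MAX limit
# and the find/awk quoting pitfalls of the command built in this commit.
import hashlib
import os

def md5sum_directory(md5_file_out, directory):
    """Write "digest  filename" lines for every regular file in `directory`,
    skipping the output file itself (mirrors find's -not -name clause)."""
    with open(md5_file_out, "w") as out:
        for name in sorted(os.listdir(directory)):
            path = os.path.join(directory, name)
            if name == os.path.basename(md5_file_out) or not os.path.isfile(path):
                continue
            digest = hashlib.md5()
            with open(path, "rb") as handle:
                # read in 1 MiB chunks so large FASTQ files are not loaded whole
                for chunk in iter(lambda: handle.read(1 << 20), b""):
                    digest.update(chunk)
            out.write("%s  %s\n" % (digest.hexdigest(), name))
```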
@@ -50,33 +50,53 @@ def add_analysis(parent_id, analysis_cfg, *input_files):
     from ng6.project import Project
     from ng6.run import Run
     import pickle
+    import logging
+    logging.getLogger("Analysis").debug("Start. Imports went well.")
     # get inputs from parameters
     analysis_serialized_path = input_files[0]
     try: parent_analysis_cfg = input_files[1]
     except: parent_analysis_cfg = None
+    logging.getLogger("Analysis").debug("Start. parent_analysis_cfg is initialised.")
     # load the analysis object
     analysis_dump = open(analysis_serialized_path, "rb")
     analysis = pickle.load(analysis_dump)
     analysis_dump.close()
+    logging.getLogger("Analysis").debug("Dump. analysis_dump is closed and analysis is loaded.")
     # add the parent information
+    logging.getLogger("Analysis").debug("parent_analysis_cfg. Starting")
     if parent_analysis_cfg:
         if os.path.isfile(parent_analysis_cfg):
+            logging.getLogger("Analysis").debug("parent_analysis_cfg. Analysis.get_from_file(parent_analysis_cfg) is about to start")
             parent = Analysis.get_from_file(parent_analysis_cfg)
+            logging.getLogger("Analysis").debug("parent_analysis_cfg. Analysis.get_from_file(parent_analysis_cfg) is done")
             analysis.parent = parent
     elif parent_id != 'none' :
+        logging.getLogger("Analysis").debug("parent_analysis_cfg. Analysis.get_from_id(parent_id) is about to start")
         parent = Analysis.get_from_id(int(parent_id))
         analysis.parent = parent
+        logging.getLogger("Analysis").debug("parent_analysis_cfg. Analysis.get_from_id(parent_id) is done")
     # process the parsing of the analysis
+    logging.getLogger("Analysis").debug("analysis.post_process. Starting")
     analysis.post_process()
+    logging.getLogger("Analysis").debug("analysis.post_process. Done")
     # add the analysis to the right run/project object
-    if analysis.project: analysis.project.add_analysis(analysis)
-    elif analysis.run: analysis.run.add_analysis(analysis)
+    logging.getLogger("Analysis").debug("analysis.project. Starting add_analysis(analysis)")
+    if analysis.project:
+        logging.getLogger("Analysis").debug("analysis.project. Starting analysis.project.add_analysis(analysis)")
+        analysis.project.add_analysis(analysis)
+        logging.getLogger("Analysis").debug("analysis.project. Finishing analysis.project.add_analysis(analysis)")
+    elif analysis.run:
+        logging.getLogger("Analysis").debug("analysis.run. Starting analysis.run.add_analysis(analysis)")
+        analysis.run.add_analysis(analysis)
+        logging.getLogger("Analysis").debug("analysis.run. Finishing analysis.run.add_analysis(analysis)")
+    logging.getLogger("Analysis").debug("Config File. Starting the writing of config file")
     analysis.write_config_file()
+    logging.getLogger("Analysis").debug("Config File. Is written")

 class Analysis (Component):
     """
@@ -115,12 +135,16 @@ class Analysis (Component):
         # Set the temp folder to the ng6 temp folder
         ng6conf = NG6ConfigReader()
+        logging.getLogger("Analysis.__init__").debug("ng6conf is loaded")
         tempfile.tempdir = ng6conf.get_tmp_directory()
+        logging.getLogger("Analysis.__init__").debug("tempfile.tempdir = " + tempfile.tempdir)
         if not os.path.isdir(tempfile.tempdir):
             os.makedirs(tempfile.tempdir, 0o751)
         if id != None : # If not a new analysis
+            logging.getLogger("Analysis.__init__").debug("Connecting to the database")
             t3mysql = t3MySQLdb()
+            logging.getLogger("Analysis.__init__").debug("Connection established")
             self.run = Run.get_from_id(self.__get_run_id())
             from ng6.project import Project
             self.project = Project.get_from_id(self.__get_project_id())
@@ -133,7 +157,9 @@ class Analysis (Component):
             directory_name = uuid.uuid4().hex[:9]
             while True:
                 save_dir = os.path.join(ng6conf.get_save_directory()+self.DIRECTORIES_STRUCTURE, directory_name)
+                logging.getLogger("Analysis.__init__").debug("Building analysis save_dir=" + save_dir)
                 work_dir = os.path.join(ng6conf.get_work_directory()+self.DIRECTORIES_STRUCTURE, directory_name)
+                logging.getLogger("Analysis.__init__").debug("Building analysis work_dir=" + work_dir)
                 if not os.path.isdir(save_dir) and not os.path.isdir(work_dir):
                     break
                 directory_name = uuid.uuid4().hex[:9]
@@ -280,13 +306,16 @@ class Analysis (Component):
         update the database info
         """
         # First add the run into the database
+        logging.getLogger("Analysis.save").debug("Connection attempt")
         t3mysql = t3MySQLdb()
+        logging.getLogger("Analysis.save").debug("Connection done")
         if self.parent: parent_uid = self.parent.id
         else: parent_uid = 0
+        logging.getLogger("Analysis.save").debug("Doing the add_analysis")
         self.id = t3mysql.add_analysis(self.get_template(), self.name, self.description, self.admin_login,
                                        datetime.date.today(), self.directory, self.software, self.version,
                                        self.options, self.is_editable, parent_uid=parent_uid)
+        logging.getLogger("Analysis.save").debug("add_analysis done")
         # Then add all results linked to this analysis
         for file in self.results:
             for result in self.results[file]:
...
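A note on all the instrumentation added in this file: `logging.getLogger("Analysis").debug(...)` emits nothing unless the process has a handler registered at DEBUG level. jflow configures logging itself in normal runs; a minimal standard-library sketch for surfacing these messages when exercising a piece in isolation:

```python
# Sketch only: generic stdlib setup to make the new debug() calls visible
# outside of jflow's own logging configuration.
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logging.getLogger("Analysis").debug("debug output is now visible")
```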
@@ -736,7 +736,9 @@ class t3MySQLdb(object):
         sql = "INSERT INTO tx_nG6_result (pid, tstamp, crdate, analyze_id, file, rkey, rvalue, rgroup) \
               VALUES('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')" % ( self.cfg_reader.get_pid(), crdate, crdate,
                                                                           analysis_id, file, key, value, group )
+        logging.getLogger("t3MySQL").debug("add_result. sql created : " + sql)
         self.execute(sql, commit = True )
+        logging.getLogger("t3MySQL").debug("add_result. executed")

     def update_analysis_field(self, analysis_id, field, value):
         """
...
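The `add_result` statement above assembles its VALUES by %-interpolation, which is what makes the new `sql created` debug line printable as one string, but it leaves all quoting to the caller. A hedged sketch of the same INSERT with DB-API parameter binding (`add_result_bound` and the `conn` argument are invented; the class's own `self.execute` wrapper may not expose a raw cursor this way):

```python
# Sketch only, assuming a PyMySQL-style DB-API connection: the driver quotes
# each value, so no manual escaping or string assembly is needed.
def add_result_bound(conn, pid, crdate, analysis_id, file, key, value, group):
    sql = ("INSERT INTO tx_nG6_result "
           "(pid, tstamp, crdate, analyze_id, file, rkey, rvalue, rgroup) "
           "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)")
    with conn.cursor() as cursor:
        cursor.execute(sql, (pid, crdate, crdate, analysis_id, file, key, value, group))
    conn.commit()
```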
@@ -21,11 +21,11 @@ import logging
 from jflow.component import Component

-def extract_stats_from_seq_file(input_file,output_file_stat):
+def extract_stats_from_seq_file(input_file, output_file_stat):
     import jflow.seqio as seqio
     import logging
     import os
-    logging.getLogger("AddRawFiles").debug("extract_stats_from_seq_files. Entering, working on "+input_file)
+    logging.getLogger("AddRawFiles").debug("extract_stats_from_seq_files. Entering, working on " + input_file)
     nb_seq, full_size = 0, 0
     try:
         # Get nb_seq and full_size values
@@ -41,25 +41,33 @@ def extract_stats_from_seq_file(input_file,output_file_stat):
     logging.getLogger("AddRawFiles").debug("extract_stats_from_seq_files. finished")

-def md5sum(md5_file_out, *files_to_md5sum):
+def md5sum(md5_file_out, files_to_md5sum):
     import jflow.seqio as seqio
     import logging
     import os
     from subprocess import Popen, PIPE
     logging.getLogger("AddRawFiles").debug("md5sum. entering")
-    logging.getLogger("AddRawFiles").debug("md5sum. files_to_md5sum in "+os.path.dirname(md5_file_out)+" = " + ",".join(files_to_md5sum))
-    p = Popen("md5sum " + " ".join(files_to_md5sum) +"|awk -F/ {'print $1,$NF'} > "+ md5_file_out ,shell=True,stdout=PIPE,stderr=PIPE,universal_newlines=True)
+    logging.getLogger("AddRawFiles").debug("md5sum. md5sum in this directory : " + os.path.dirname(md5_file_out))
+    logging.getLogger("AddRawFiles").debug("md5sum. Generating the md5sum.txt with this command : " +
+                                           "find " + os.path.dirname(files_to_md5sum) + " -maxdepth 1 -type f -not -name " + os.path.basename(md5_file_out) +
+                                           " -exec md5sum {} \; | awk -F/ {'print $1$NF'} > " + md5_file_out)
+    p = Popen("find " + os.path.dirname(files_to_md5sum) + " -maxdepth 1 -type f -not -name " + os.path.basename(md5_file_out) + " -exec md5sum {} \;" +
+              " | awk -F/ {'print $1$NF'} > " + md5_file_out, shell = True, stdout = PIPE, stderr = PIPE, universal_newlines = True)
     stdout,stderr = p.communicate()
     logging.getLogger("AddRawFiles").debug("md5sum. finished")

-def add_stats_to_run (run_dump_path,*files):
+def add_stats_to_run (run_dump_path, file):
     import pickle
     import logging
+    import os
     total_nb_seq = 0
     total_size = 0
-    for curr_stat_file in files:
-        logging.getLogger("AddRawFiles").debug("add_stats_to_run. Content of " + curr_stat_file)
-        with open(curr_stat_file, 'r') as myfile:
+    logging.getLogger("AddRawFiles").debug("add_stats_to_run. Working on files of the directory containing this one : " + file)
+    file_list = [ f for f in os.listdir(os.path.dirname(file)) if f.endswith('.count')]
+    logging.getLogger("AddRawFiles").debug("add_stats_to_run. Length of the file_list : " + str(len(file_list)))
+    logging.getLogger("AddRawFiles").debug("add_stats_to_run. Files in the file_list : " + str(file_list))
+    for curr_stat_file in file_list:
+        logging.getLogger("AddRawFiles").debug("add_stats_to_run. Content of " + os.path.dirname(file) + "/" + curr_stat_file)
+        with open(os.path.dirname(file) + "/" + curr_stat_file, 'r') as myfile:
             nb_seq,size=myfile.read().split(':')
             logging.getLogger("AddRawFiles").debug("add_stats_to_run. Content of " + curr_stat_file + " nb_seq = " + str(nb_seq) + ", size = " + str(size))
             size= int(size)
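`add_stats_to_run` now receives a single representative path and rediscovers its sibling `.count` files with `os.listdir`, so the generated command line no longer grows with the sample count (note the `import os` it needs for that, added above). A standalone sketch of the same aggregation (`sum_count_files` is an invented name; assumes each `.count` file holds one `nb_seq:size` line, as written by `extract_stats_from_seq_file`):

```python
# Sketch only: aggregate nb_seq and size over every *.count file that sits
# next to one representative file.
import glob
import os

def sum_count_files(representative_file):
    total_nb_seq = total_size = 0
    directory = os.path.dirname(representative_file)
    for path in glob.glob(os.path.join(directory, "*.count")):
        with open(path) as handle:
            nb_seq, size = handle.read().split(":")
        total_nb_seq += int(nb_seq)
        total_size += int(size)
    return total_nb_seq, total_size
```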
@@ -149,14 +157,15 @@ class AddRawFiles (Component):
         files_to_save_stats = self.get_outputs( '{basename_woext}.count', self.files_to_save)
         logging.getLogger("AddRawFiles").debug("process. Before self.add_python_execution(extract_stats_from_seq_file")
         #count number of reads and total length in base for each seq file
-        for i,o in zip(self.files_to_save,files_to_save_stats ):
+        for i,o in zip(self.files_to_save, files_to_save_stats ):
             self.add_python_execution(extract_stats_from_seq_file,cmd_format="{EXE} {IN} {OUT}",
                                       inputs = i, outputs = o, map=False)
         logging.getLogger("AddRawFiles").debug("process. Before self.add_python_execution(add_stats_to_run")
+        logging.getLogger("AddRawFiles").debug("process. Dirname of files_to_save_stats[0] : " + os.path.dirname(files_to_save_stats[0]))
         #Add number of reads and total length in base for each seq file and add these data to the run in the database
         self.add_python_execution(add_stats_to_run, cmd_format='{EXE} {ARG} {IN} > {OUT}', map=False,
-                                  outputs=self.stdout, inputs=files_to_save_stats,includes=self.files_to_save, arguments=[run_dump_path])
+                                  outputs = self.stdout, inputs = files_to_save_stats[0], includes=self.files_to_save, arguments=[run_dump_path])
         #archive the files in the work folder of the run to be rsynced at the end
         logging.getLogger("AddRawFiles").debug("process. Before copying/archiving files with compression = " + self.compression )
@@ -173,7 +182,7 @@ class AddRawFiles (Component):
             for idx, file in enumerate(files_to_sync):
                 self.add_python_execution(copy_file,cmd_format="{EXE} {IN} {OUT}",
                                           inputs = self.files_to_save[idx], outputs = file, map=False)
-        #TODO possible inconsistancy : if the filename ends with one extension in Utils.UNCOMPRESS_EXTENSION, output file name won't be suffixed with gz in Utils.gzip
+        #TODO: possible inconsistency: if the filename ends with one extension in Utils.UNCOMPRESS_EXTENSION, the output file name won't be suffixed with gz in Utils.gzip
         elif self.compression=="gz":
             for file in self.files_to_save:
                 files_to_sync_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)+".gz"))
@@ -183,7 +192,7 @@ class AddRawFiles (Component):
             for idx, file in enumerate(self.files_to_save):
                 self.add_python_execution(zip_file,cmd_format="{EXE} {IN} {OUT}",
                                           inputs = file, outputs = files_to_sync[idx], map=False)
-        #TODO possible inconsistancy : if the filename ends with one extension in Utils.UNCOMPRESS_EXTENSION, output file name won't be suffixed with bz2 in Utils.bz2
+        #TODO: possible inconsistency: if the filename ends with one extension in Utils.UNCOMPRESS_EXTENSION, the output file name won't be suffixed with bz2 in Utils.bz2
         elif self.compression=="bz2":
             for file in self.files_to_save:
                 files_to_md5_ori.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file)))
@@ -213,8 +222,7 @@ class AddRawFiles (Component):
         if (len(files_to_md5_ori)>0):
             md5_file = os.path.join(self.runobj.get_work_directory(), "md5sum.txt")
-            self.add_python_execution(md5sum,cmd_format="{EXE} {OUT} {IN}",
-                                      inputs = files_to_md5, outputs = md5_file, map=False)
+            self.add_python_execution(md5sum, cmd_format="{EXE} {OUT} {IN}",
+                                      inputs = files_to_md5[0], outputs = md5_file, map=False)
\ No newline at end of file
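Net effect on the calling convention: `md5sum` and `add_stats_to_run` are now wired with `files_to_md5[0]` and `files_to_save_stats[0]`, one representative file each, while `includes=` keeps the remaining files as dependencies. A hypothetical usage (module path and file names invented):

```python
# Hypothetical usage of the reworked md5sum; the import path is invented.
from ng6.addrawfiles import md5sum  # hypothetical module path

files_to_md5 = ["/work/run42/sample1.fastq.gz",
                "/work/run42/sample2.fastq.gz"]
# One representative file stands in for the directory: md5sum hashes every
# regular file next to it, except md5sum.txt itself.
md5sum("/work/run42/md5sum.txt", files_to_md5[0])
```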