Commit ecf239b6 authored by Gerald Salin
Browse files

logging + add runobj argument to components to be able to access current run information
parent b0cf01d8
......@@ -23,7 +23,7 @@ import pickle
import datetime
import argparse
import xml.etree.ElementTree as ET
import logging
from jflow.workflow import Workflow
from jflow.utils import display_error_message
from jflow.parameter import *
......@@ -353,8 +353,10 @@ class CasavaNG6Workflow(NG6Workflow):
@param casava_directory : path to CASAVA output directory
@param lane_number : files in each sample are sequenced on this lane
"""
logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ entering")
if self.casava and self.casava["directory"] and self.casava["lane"] :
logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ self.casava")
self.is_casava = True
casava_directory = self.casava["directory"]
lane_number = self.casava["lane"]
......@@ -368,13 +370,16 @@ class CasavaNG6Workflow(NG6Workflow):
project_name = project_name.replace(" ", "_")
input_files = casava_directory.get_files( project_name, lane_number)
logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ input_files = " + ",".join(input_files))
if len(input_files) == 0 :
raise Exception("Error while parsing casava directory %s, invalid project name '%s' for lane %s"% (casava_directory, project_name, lane_number))
all_samples, all_samples_id = [], []
if os.path.exists(os.path.join(casava_directory, "SampleSheet.mk")) :
logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_18")
all_samples, all_samples_id = self._process_casava_18(casava_directory, project_name, lane_number, input_files)
logging.getLogger("ng6").debug("CasavaNG6Workflow.__create_samples__ before self._process_casava_18")
elif os.path.exists(os.path.join( casava_directory, 'Stats', 'DemultiplexingStats.xml')) :
all_samples, all_samples_id = self._process_casava_216(casava_directory, project_name, lane_number, input_files)
......@@ -398,8 +403,8 @@ class CasavaNG6Workflow(NG6Workflow):
self.group_prefix = list((Utils.get_group_basenames(self.get_all_reads(), "read")).keys())
def _process_casava_18(self, casava_directory, project_name, lane_number, input_files):
logging.getLogger("ng6").debug("_process_casava_18 enter")
logging.getLogger("ng6").debug("_process_casava_18 casava_directory = " + casava_directory + ", project_name = " + str(project_name))
logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 enter")
logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 casava_directory = " + casava_directory + ", project_name = " + str(project_name))
"""
Creates samples from casavadir (<=1.8) using input files
@param casava_directory:
......@@ -469,7 +474,8 @@ class CasavaNG6Workflow(NG6Workflow):
filepath = casava_directory + "/" + file
if file.endswith(".log"):
self.log_files.append(filepath)
logging.getLogger("ng6").debug("_process_casava_18 exiting")
logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 all_samples_id = " + ",".join(all_samples_id))
logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 exiting")
return all_samples, all_samples_id
......@@ -488,8 +494,9 @@ class CasavaNG6Workflow(NG6Workflow):
filtered_read1_files = []
filtered_read2_files = []
saved_files = []
logging.getLogger("ng6").debug("illumina_process entering")
if self.is_casava :
logging.getLogger("ng6").debug("illumina_process self.is_casava")
if len(self.log_files) > 0 :
add_log = self.add_component("BasicAnalysis", [self.log_files,"Log Files","Log files generated during primary analysis","-","-","-","gz", "","log.gz"])
......@@ -500,8 +507,11 @@ class CasavaNG6Workflow(NG6Workflow):
demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1])
if self.keep_reads != "all" :
logging.getLogger("ng6").debug("illumina_process self.keep_reads != all")
logging.getLogger("ng6").debug("illumina_process self.get_all_reads() = " + ",".join(self.get_all_reads()))
logging.getLogger("ng6").debug("illumina_process self.group_prefix = " + ",".join(self.group_prefix))
# fastq illumina filter
fastqilluminafilter = self.add_component("FastqIlluminaFilter", [self.get_all_reads(), self.keep_reads, self.group_prefix])
fastqilluminafilter = self.add_component("FastqIlluminaFilter", [self.runobj,self.get_all_reads(), self.keep_reads, self.group_prefix])
# list filtered files
if self.is_paired_end() :
......@@ -518,6 +528,7 @@ class CasavaNG6Workflow(NG6Workflow):
filtered_read2_files = self.get_all_reads("read2")
# archive the files
#TODO : if self.group_prefix == None, the create the output of fastqilluminafilter in the run.get_work_directory()
saved_files = filtered_read1_files + filtered_read2_files
logging.getLogger("CasavaNG6Workflow").debug("illumina_process saved_files = " + ",".join(saved_files))
reads_prefixes = None
......@@ -526,8 +537,9 @@ class CasavaNG6Workflow(NG6Workflow):
reads_prefixes = list((Utils.get_group_basenames(saved_files, "read")).keys())
logging.getLogger("CasavaNG6Workflow").debug("illumina_process reads_prefixes = " + ",".join(reads_prefixes))
logging.getLogger("CasavaNG6Workflow").debug("illumina_process saved_files = " + ",".join(saved_files))
concatenatefastq = self.add_component("ConcatenateFilesGroups", [saved_files, reads_prefixes])
concatenatefastq = self.add_component("ConcatenateFilesGroups", [self.runobj,saved_files, reads_prefixes])
saved_files = concatenatefastq.concat_files
logging.getLogger("CasavaNG6Workflow").debug("illumina_process after concatenatefastq, saved_files = " + ",".join(saved_files))
else :
reads_prefixes = None
......
......@@ -17,7 +17,6 @@
import os
import logging
from jflow.utils import get_argument_pattern
from jflow.component import Component
......@@ -29,34 +28,48 @@ class ConcatenateFilesGroups (Component):
@summary : Concatenates files according to filename prefixes.
"""
def define_parameters(self, files_list, group_prefix):
def define_parameters(self, runobj, files_list, group_prefix):
"""
@param files_list : path of files that will be concatenated
@param group_prefix : list of filename prefixes
"""
logging.getLogger("ConcatenateFilesGroups").debug("define_parameters. group_prefix = " + ",".join(group_prefix))
self.runobj = runobj
self.add_input_file_list( "files_list", "files_list", default=files_list, required=True)
self.add_parameter_list("group_prefix", "group_prefix", default=group_prefix)
extensions = os.path.basename(self.files_list[0]).split(".")[1:]
self.add_output_file_list( "concat_files", "concat_files", pattern='{basename_woext}.'+".".join(extensions), items= self.group_prefix)
output_list = []
for prefix in self.group_prefix:
output_list.append( os.path.join(self.runobj.get_work_directory(),prefix+"."+".".join(extensions)))
logging.getLogger("ConcatenateFilesGroups").debug("define_parameters. extensions = " + ",".join(extensions))
self.add_output_file_list( "concat_files", "concat_files", pattern='{fullpath}', items= output_list)
#self.add_output_file_list( "concat_files", "concat_files", items= self.group_prefix)
def process(self):
logging.getLogger("ConcatenateFilesGroups").debug("process. entering")
logging.getLogger("ConcatenateFilesGroups").debug("process. work dir ("+self.runobj.get_work_directory()+") exist? "+ str(os.path.isdir(self.runobj.get_work_directory())))
logging.getLogger("ConcatenateFilesGroups").debug("process. self.files_list = " + ", ".join(self.files_list))
logging.getLogger("ConcatenateFilesGroups").debug("process. self.concat_files = " + ", ".join(self.concat_files))
# Create dictionary : key = prefix and value = list of files to concatenate
concat_groups = Utils.get_filepath_by_prefix(self.files_list, self.group_prefix)
files_to_concatenate = []
for file_to_concatenate in self.group_prefix:
files_to_concatenate.append(os.path.join(self.runobj.get_work_directory(),os.path.basename(file_to_concatenate)+"."+".".join(os.path.basename(self.files_list[0]).split(".")[1:])))
logging.getLogger("ConcatenateFilesGroups").debug("process. files_to_concatenate = " + ", ".join(files_to_concatenate))
self.concat_files = self.get_outputs('{fullpath}', files_to_concatenate)
# self.concat_files = files_to_concatenate
logging.getLogger("ConcatenateFilesGroups").debug("process. self.concat_files bis = " + ", ".join(self.concat_files))
# Create dictionary : key = prefix and value = the output file
outputs_path = Utils.get_filepath_by_prefix(self.concat_files, self.group_prefix)
outputs_path = Utils.get_filepath_by_prefix(files_to_concatenate, self.group_prefix)
out_files = []
for prefix in self.group_prefix:
logging.getLogger("ConcatenateFilesGroups").debug("process. working on "+prefix+", concat_groups[prefix] = "+",".join(concat_groups[prefix]) + ", outputs_path[prefix] = "+ ",".join(outputs_path[prefix]))
logging.getLogger("ConcatenateFilesGroups").debug("process. prefix = " + prefix)
logging.getLogger("ConcatenateFilesGroups").debug("process. outputs_path[prefix] = " + ", ".join(outputs_path[prefix]))
# Sort list of files to concatenate
concat_groups[prefix].sort()
[cmd_inputs_pattern, next_arg_number] = get_argument_pattern(concat_groups[prefix], 1)
logging.getLogger("ConcatenateFilesGroups").debug("process. cmd_inputs_pattern = "+cmd_inputs_pattern + ", next_arg_number = "+str(next_arg_number))
#TODO : check if a cp would be OK in case there is only one file per group
# If the file is not zip
if not self.files_list[0].endswith(".gz"):
self.add_shell_execution('cat ' + cmd_inputs_pattern + ' > ${' + str(next_arg_number) + '}',
......@@ -64,6 +77,10 @@ class ConcatenateFilesGroups (Component):
inputs = concat_groups[prefix], outputs = outputs_path[prefix])
# If the file is zip
else:
logging.getLogger("ConcatenateFilesGroups").debug("process. command = " + 'zcat ' + cmd_inputs_pattern + ' | gzip - > ${' + str(next_arg_number) + '}' + ", ".join(concat_groups[prefix]) + ", and ouput = "+", ".join(outputs_path[prefix]))
output = os.path.join(self.runobj.get_work_directory(),prefix+"."+".".join(os.path.basename(self.files_list[0]).split(".")[1:]))
out_files.append(output)
self.add_shell_execution('zcat ' + cmd_inputs_pattern + ' | gzip - > ${' + str(next_arg_number) + '}',
cmd_format='{EXE} {IN} {OUT}', map=False,
inputs = concat_groups[prefix], outputs = outputs_path[prefix])
# self.concat_files = out_files
......@@ -23,7 +23,8 @@ from ng6.utils import Utils
class FastqIlluminaFilter (Analysis):
def define_parameters(self, fastq_files, keep_reads="pass_illumina_filters", group_prefix=None):
def define_parameters(self,runobj, fastq_files, keep_reads="pass_illumina_filters", group_prefix=None):
self.runobj = runobj
self.add_input_file_list( "fastq_files", "fastq_files", default=fastq_files, required=True, file_format = 'fastq')
self.add_parameter("keep_reads", "keep_reads", default=keep_reads, choices=[ "pass_illumina_filters", "not_pass_illumina_filters", "all"])
self.add_parameter_list("group_prefix", "group_prefix", default=group_prefix)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment