Commit f79a0774 authored by Audrey Gibert

[Sequel_qc]

Beginning the transformation
parent 8645bb68
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
from ng6.ng6workflow import NG6Workflow
from ng6.utils import Utils
class SequelQualityCheck (NG6Workflow):
    """Sequel II QC workflow; class inheriting from NG6Workflow."""
def get_name(self):
return 'sequel_qc'
def get_description(self):
return "Sequel II PacBio data loading and quality check"
def define_parameters(self, function="process"):
logging.getLogger("jflow").debug("SequelQC | SequelQualityCheck.define_parameters!")
self.add_parameter("nb_threads", "Number of threads to use for fastqc. Each thread will be allocated 250MB of memory.", default=3)
self.add_parameter("min_subreads_length", "Subreads shorter than this value (in base pairs) are filtered out and excluded from analysis", default=0, type='int')
self.add_parameter("polymerase_read_qual", "Polymerase reads with lower quality than this value are filtered out and excluded from analysis", default=0, type='float')
self.add_parameter("polymerase_read_length", "Polymerase reads shorter than this value (in base pairs) are filtered out and excluded from analysis", default=0, type='int')
self.add_input_file( "barcode_file", "Input barcode file", default=None)
self.add_parameter("barcode_score", "Min identical base for barcode", default=22, type='int')
def process(self):
logging.getLogger("jflow").debug("SequelQC | SequelQualityCheck.process started!")
sample_names = []
infiles = []
        for sample in self.samples:
            sample_names.append(sample.name)
            infiles.append(sample.reads1[0])
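        # Component chain: archive the raw *.h5 inputs, convert them to
        # fastq.gz, quality-check the conversions with FastQC, then run
        # the RS_Subreads.1 filtering protocol on the original files.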
add_pacbio_raw_file = self.add_component("AddPacBioRawFiles", [self.runobj, self.get_all_reads()])
h5tofastq = self.add_component("H5toFastq", [sample_names, infiles])
fastqc = self.add_component("FastQC", [h5tofastq.output_fastqs, False, False, "fastqc.tar.gz", self.nb_threads], parent = h5tofastq)
self.add_component("RS_Subreads", [sample_names, infiles,self.min_subreads_length,self.polymerase_read_qual,self.polymerase_read_length,self.barcode_file,self.barcode_score ])
logging.getLogger("jflow").debug("SequelQC | SequelQualityCheck.process ended!")
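# A minimal usage sketch (assuming a standard jflow/NG6 install where
# workflows are launched through the jflow command line; the option names
# below are illustrative, derived from the parameters declared above):
#
#   python bin/jflow_cli.py sequel_qc --min-subreads-length 500 \
#       --polymerase-read-qual 0.75 --polymerase-read-length 50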
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import pickle
from jflow.component import Component
from workflows.pacbio_qc.lib.pacbiolib import h5file
from weaver.function import PythonFunction
def add_pacbio_raw_files(run_dump_path, tempdir, stdoutfile):
    import pickle
    import os
    from workflows.pacbio_qc.lib.pacbiolib import PacbioH5Reader
    from jflow.utils import display_info_message
    with open(stdoutfile, 'w') as fhout:
        # --- add_pacbio_raw_files ---
        with open(run_dump_path, "rb") as fh:
            my_run = pickle.load(fh)
        files_to_save = []
        nb_seq, full_size = 0, 0
        for ifile in my_run.raw_files:
            if ifile not in files_to_save:
                # total sequence length
                reader = PacbioH5Reader(ifile)
                h5 = reader.bash5
                for name, description, sequence, qualities in reader:
                    nb_seq += 1
                    full_size += len(sequence)
                # a bas.h5 container can reference several bax.h5 part
                # files; both the parts and the container are archived
                for partfile in h5.parts:
                    if partfile.filename not in files_to_save:
                        files_to_save.append(partfile.filename)
# it's a bas.h5
if h5.filename and h5.filename not in files_to_save:
files_to_save.append(h5.filename)
# copy .metadata.xml
if reader.metadata :
if reader.metadata not in files_to_save :
files_to_save.append(reader.metadata)
                else:
                    display_info_message("Warning: no metadata file found for input file: %s" % ifile)
fhout.write("nb_seq : ")
fhout.write(str(nb_seq)+"\n")
fhout.write("full_size : ")
fhout.write(str(full_size)+"\n")
fhout.write("Files to save : \n")
fhout.write("\n".join(files_to_save) )
my_run.set_nb_sequences(nb_seq)
my_run.set_full_size(full_size)
my_run.archive_files(files_to_save, "none")
my_run.sync()
class AddPacBioRawFiles (Component):
def define_parameters(self, runobj, input_files):
self.runobj = runobj
self.add_input_file_list( "input_files", "File to be saved as raw files", default=input_files, file_format = h5file, required=True)
self.add_output_file("stdout", "AddPacBioRawFiles stdout", filename="AddPacBioRawFiles.stdout")
def process(self):
self.runobj.raw_files = self.input_files
        run_dump_path = self.get_temporary_file(".dump")
        with open(run_dump_path, "wb") as fh:
            pickle.dump(self.runobj, fh)
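        # the run object is pickled so the detached PythonFunction process
        # (add_pacbio_raw_files above) can reload it, update the sequence
        # counts, archive the raw files and sync the updated run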
addraw = PythonFunction(add_pacbio_raw_files, cmd_format='{EXE} {ARG} {OUT}')
addraw(outputs=self.stdout, includes=self.input_files, arguments=[run_dump_path, self.config_reader.get_tmp_directory()])
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from workflows.pacbio_qc.lib.pacbiolib import h5file
from jflow.abstraction import MultiMap
from ng6.analysis import Analysis
from weaver.function import PythonFunction
def h5_to_fastq(input_file, fastqfile):
    import gzip
    from workflows.pacbio_qc.lib.pacbiolib import PacbioH5Reader
    # generate the fastq.gz file; the reader is assumed to yield text
    # strings, so each line is encoded before the gzipped binary write
    reader = PacbioH5Reader(input_file)
    with gzip.open(fastqfile, 'wb') as f_out:
        for name, description, sequence, qualities in reader:
            f_out.write(("@" + name + "\n").encode())
            f_out.write((sequence + "\n").encode())
            f_out.write("+\n".encode())
            f_out.write((qualities + "\n").encode())
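# Each record written above follows the standard four-line FASTQ layout:
#   @<read name>
#   <sequence>
#   +
#   <quality string (ASCII-encoded Phred scores)>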
class H5toFastq (Analysis):
def define_parameters(self, sample_names, input_files):
self.add_parameter_list("sample_names", "sample names, each sample name must correspond to an input file", default=sample_names, required=True)
self.add_input_file_list( "input_files", "Input pacbio bas.h5 files", default=input_files, file_format = h5file, required=True)
self.add_output_file_list( 'output_fastqs', "Output fastq files", pattern="{basename_woext}.fastq.gz", file_format="fastq", items=self.sample_names)
def process(self):
        conversion = PythonFunction(h5_to_fastq, cmd_format="{EXE} {IN} {OUT}")
        MultiMap(conversion, inputs=self.input_files, outputs=self.output_fastqs)
def define_analysis(self):
self.name = "H5toFastq"
self.description = "Extract subreads of pacbio bas.h5 files to fastq files"
self.software = "Python"
self.options = ""
def post_process(self):
self._save_files(self.output_fastqs)
def get_version(self):
return "1.0"
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import json
import logging
from workflows.pacbio_qc.lib.pacbiolib import h5file
from ng6.analysis import Analysis
from weaver.function import PythonFunction
def rs_subreads(inputfile, stdout, componentdir, smrtpipe, fofnToSmrtpipeInput, min_subreads_length, polymerase_read_qual, polymerase_read_length, barcode_file, barcode_score ):
import subprocess
import os
import logging
from workflows.pacbio_qc.lib.pacbiolib import PacbioH5Reader
logging.getLogger("jflow").debug("Begin rs_subreads!")
    # build the per-input output paths
    filebase = os.path.basename(os.path.splitext(os.path.splitext(inputfile)[0])[0])
    outputdir = os.path.join(componentdir, filebase)
    settings_xml = os.path.join(componentdir, filebase + "_settings.xml")
    inputs_fofn = os.path.join(componentdir, filebase + "_inputs.fofn")
    inputs_xml = os.path.join(componentdir, filebase + "_inputs.xml")
    # create the output directory
    if not os.path.exists(outputdir):
        os.makedirs(outputdir)
    # barcode_file travels through the command line, so a missing file
    # arrives as the string 'None' rather than the None object
    barcode_string = ""
    if barcode_file not in (None, 'None'):
        barcode_string = """
    <module id="P_Barcode">
        <param name='mode'><value>symmetric</value></param>
        <param name='barcode.fasta'>
            <value>{0}</value>
        </param>
        <param name='score'><value>{1}</value></param>
    </module>""".format(barcode_file, barcode_score)
    # write settings (barcode_string stays empty when no barcode is given)
    with open(settings_xml, "w") as fh:
        fh.write(("""<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<smrtpipeSettings>
    <module id="P_Fetch" />
    <module id="P_Filter" >
        <param name="minSubReadLength">
            <value>{0}</value>
        </param>
        <param name="readScore">
            <value>{1}</value>
        </param>
        <param name="minLength">
            <value>{2}</value>
        </param>
    </module>
    <module id="P_FilterReports"/>
""" + barcode_string + """
</smrtpipeSettings>
""").format(min_subreads_length, polymerase_read_qual, polymerase_read_length))
# write fofn and xml
reader = PacbioH5Reader(inputfile)
with open( inputs_fofn, "w") as fh:
for baxh5 in reader.bash5.parts :
fh.write(baxh5.filename + "\n")
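    # fofnToSmrtpipeInput.py (shipped with SMRT Analysis) turns the plain
    # file-of-filenames into the input.xml document smrtpipe consumes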
subprocess.check_call("%s %s > %s"%(fofnToSmrtpipeInput, inputs_fofn, inputs_xml), shell = True)
# run smrtpipe
logging.getLogger("jflow").debug("Begin rs_subreads! smrtpipe ")
logging.getLogger("jflow").debug("-------> %s --output=%s --params=%s xml:%s >> %s"%(smrtpipe, outputdir, settings_xml, inputs_xml,stdout))
subprocess.check_call("%s --output=%s --params=%s xml:%s >> %s"%(smrtpipe, outputdir, settings_xml, inputs_xml,stdout), shell = True)
logging.getLogger("jflow").debug("End rs_subreads! smrtpipe")
def extract_metrics(inputfile):
    # helper kept for reference: collects the per-ZMW metrics group of
    # each bax.h5 part; it is not wired into the workflow at this stage
    import h5py
    from workflows.pacbio_qc.lib.pacbiolib import PacbioH5Reader
    reader = PacbioH5Reader(inputfile)
    metrics = []
    for baxh5 in reader.bash5.parts:
        h5 = h5py.File(baxh5.filename, 'r')
        metrics.append(h5['PulseData']['BaseCalls']['ZMWMetrics'])
    return metrics
class RS_Subreads (Analysis):
    """
    Implementation of the PacBio RS_Subreads.1 protocol.
    This module filters reads on minimum subread length, polymerase read
    quality and polymerase read length.
    """
def define_parameters(self, sample_names, input_files, min_subreads_length = 0, polymerase_read_qual = 0, polymerase_read_length = 0, barcode_file=None, barcode_score=22):
logging.getLogger("jflow").debug("Begin RS_Subreads.define_parameters!")
self.add_parameter_list("sample_names", "sample names, each sample name must correspond to an input file", default=sample_names, required=True)
self.add_input_file_list( "input_files", "Input pacbio bas.h5 files", default=input_files, file_format = h5file, required=True)
self.add_parameter("min_subreads_length", "Subreads shorter than this value (in base pairs) are filtered out and excluded from analysis", default=min_subreads_length, type='int')
self.add_parameter("polymerase_read_qual", "Polymerase reads with lower quality than this value are filtered out and excluded from analysis", default=polymerase_read_qual, type='float')
self.add_parameter("polymerase_read_length", "Polymerase reads shorter than this value (in base pairs) are filtered out and excluded from analysis", default=polymerase_read_length, type='int')
self.add_input_file( "barcode_file", "Input barcode file", default=barcode_file)
self.add_parameter("barcode_score", "Min identical base for barcode", default=barcode_score, type='int')
items = [os.path.basename(os.path.splitext(os.path.splitext(filename)[0])[0]) for filename in self.input_files]
self.add_output_file_list( 'stdouts', "logs", pattern="{basename}.stdout", items=items)
    def process(self):
        logging.getLogger("jflow").debug("Begin RS_Subreads.process! pacbio_qc")
        subreads = PythonFunction(rs_subreads, cmd_format="{EXE} {IN} {OUT} {ARG}")
        # one execution per input file: each run gets its own stdout log
        for i, e in enumerate(self.input_files):
            logging.getLogger("jflow").debug("Begin RS_Subreads.process! " + e)
            subreads(inputs=[self.input_files[i]],
                     outputs=[self.stdouts[i]],
                     arguments=[self.output_directory, self.get_exec_path("smrtpipe"), self.get_exec_path("fofnToSmrtpipeInput.py"),
                                self.min_subreads_length, self.polymerase_read_qual, self.polymerase_read_length, self.barcode_file, self.barcode_score])
        logging.getLogger("jflow").debug("End RS_Subreads.process!")
def get_version(self):
return "1.0"
def define_analysis(self):
self.name = "RS_Subreads"
self.description = "Run SMRTAnalysis RS_Subreads.1"
self.software = "Python"
if self.barcode_file :
self.options = "minSubReadLength %s readScore %s minLength %s barcode_file %s barcode_score %s"%(self.min_subreads_length, self.polymerase_read_qual, self.polymerase_read_length, self.barcode_file, self.barcode_score)
else :
self.options = "minSubReadLength %s readScore %s minLength %s"%(self.min_subreads_length, self.polymerase_read_qual, self.polymerase_read_length)
def post_process(self):
logging.getLogger("jflow").debug("Begin RS_Subreads.post_process!")
metrics = []
metrics2 = []
results_files = []
for i,samplefile in enumerate(self.input_files) :
sample = self.sample_names[i]
sdir = os.path.basename(os.path.splitext(os.path.splitext(samplefile)[0])[0])
sample_outdir = os.path.join(self.output_directory, sdir)
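            # scrape the report JSONs smrtpipe produced for this sample;
            # each block below is skipped when its report file is absent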
# loading
jsonfile = os.path.join(sample_outdir, 'results', 'filter_reports_loading.json')
if os.path.isfile(jsonfile):
group = 'loading'
if group not in metrics :
metrics.append(group)
self._add_result_element("metrics", "headers", ','.join(['Productive ZMWs', 'Productivity 0', 'Productivity 1', 'Productivity 2']), group)
stats = self.parse_loading_json(jsonfile)
self._add_result_element(sample, 'Productive ZMWs', stats['Productive ZMWs'] , group)
self._add_result_element(sample, 'Productivity 0', stats['Productivity 0'] , group)
self._add_result_element(sample, 'Productivity 1', stats['Productivity 1'] , group)
self._add_result_element(sample, 'Productivity 2', stats['Productivity 2'] , group)
# adapters
jsonfile = os.path.join(sample_outdir, 'results', 'filter_reports_adapters.json')
if os.path.isfile(jsonfile):
group = 'adapter_stats'
if group not in metrics :
metrics.append(group)
self._add_result_element("metrics", "headers", ','.join(['Adapter Dimers', 'Medium Inserts', 'Short Inserts', "insert"]), group)
stats = self.parse_adapter_json(jsonfile)
self._add_result_element(sample, 'Adapter Dimers', stats['Adapter Dimers'] , group)
self._add_result_element(sample, 'Medium Inserts', stats['Medium Inserts'] , group)
self._add_result_element(sample, 'Short Inserts', stats['Short Inserts'] , group)
image = os.path.join(sample_outdir, 'results', 'adapter_observed_insert_length_distribution.png')
self._add_result_element(sample, "insert", self._save_file(image, sample + ".adapter_observed_insert_length_distribution.png"), group)
# filter stats
jsonfile = os.path.join(sample_outdir, 'results', 'filter_reports_filter_stats.json')
if os.path.isfile(jsonfile):
stats = self.parse_filter_json(jsonfile)
groups = ['prefilter_stats', 'postfilter_stats']
for group in groups :
if group not in metrics :
metrics.append(group)
self._add_result_element("metrics", "headers", ','.join(['Mean Read Score', 'Number of Reads', 'Number of Bases',
'N50', 'Mean Read Length', 'readlen', 'readscore']), group)
self._add_result_element(sample, 'Mean Read Score', stats[group]['Mean Read Score'] , group)
self._add_result_element(sample, 'Number of Reads', stats[group]['Number of Reads'] , group)
self._add_result_element(sample, 'Number of Bases', stats[group]['Number of Bases'] , group)
self._add_result_element(sample, 'N50', stats[group]['N50'] , group)
self._add_result_element(sample, 'Mean Read Length', stats[group]['Mean Read Length'] , group)
if group == 'prefilter_stats' :
image = os.path.join(sample_outdir, 'results', 'pre_filter_readlength_histogram.png')
self._add_result_element(sample, "readlen", self._save_file(image, sample + ".pre_filter_readlength_histogram.png"), group)
image = os.path.join(sample_outdir, 'results', 'pre_filterread_score_histogram.png')
self._add_result_element(sample, "readscore", self._save_file(image, sample + ".pre_filterread_score_histogram.png"), group)
if group == 'postfilter_stats' :
image = os.path.join(sample_outdir, 'results', 'post_filter_readlength_histogram.png')
self._add_result_element(sample, "readlen", self._save_file(image, sample + ".post_filter_readlength_histogram.png"), group)
image = os.path.join(sample_outdir, 'results', 'post_filterread_score_histogram.png')
self._add_result_element(sample, "readscore", self._save_file(image, sample + ".post_filterread_score_histogram.png"), group)
# subreads filter stats
jsonfile = os.path.join(sample_outdir, 'results', 'filter_reports_filter_subread_stats.json')
if os.path.isfile(jsonfile):
group = 'subreads_stats'
if group not in metrics :
metrics.append(group)
self._add_result_element("metrics", "headers", ','.join(['Mean Subread length', 'N50', 'Number of Bases', "Number of Reads", 'report']), group)
stats = self.parse_subreads_filter_json(jsonfile)
self._add_result_element(sample, 'Mean Subread length', stats['Mean Subread length'] , group)
self._add_result_element(sample, 'N50', stats['N50'] , group)
self._add_result_element(sample, 'Number of Bases', stats['Number of Bases'] , group)
self._add_result_element(sample, 'Number of Reads', stats['Number of Reads'] , group)
image = os.path.join(sample_outdir, 'results', 'filtered_subread_report.png')
self._add_result_element(sample, "report", self._save_file(image, sample + ".filtered_subread_report.png"), group)
# Barcode report
jsonfile = os.path.join(sample_outdir, 'results', 'barcode_report.json')
if os.path.isfile(jsonfile):
logging.getLogger("jflow").debug("Begin RS_Subreads.post_process Barcode report")
group = 'barcode_results'
                # Finally, create and add the archive to the analysis
logging.getLogger("jflow").debug("Begin RS_Subreads.post_process Barcode report START archive")
results_files.append(self.barcode_file)
logging.getLogger("jflow").debug("barcode_file : " + self.barcode_file)
demultiplex_fasta_tar = os.path.join(sample_outdir,'data','barcoded-fastqs.tgz')
logging.getLogger("jflow").debug("demultiplex_fasta_tar : " + demultiplex_fasta_tar)
results_files.append(demultiplex_fasta_tar)
self._create_and_archive(results_files,"Archive_Barcode.tar")
if group not in metrics2 :
metrics2.append(group)
self._add_result_element("metrics2", "headers", ','.join(['Reads', 'Bases']), group)
stats = self.parse_barcode_report_json(jsonfile)
barcode_sample = []
                for j, barcode in enumerate(stats['Barcode Name']):
                    barcode_sample.append(barcode)
                    self._add_result_element(sample, 'Reads', stats['Reads'][j], barcode)
                    self._add_result_element(sample, 'Bases', stats['Bases'][j], barcode)
self._add_result_element("metrics2", "barcode_sample", ','.join(barcode_sample), group)
# ------ parsing of json files ------
    def load_json(self, filename):
        # read the whole document: smrtpipe reports may span several lines
        with open(filename) as fh:
            return json.loads(fh.read())
def parse_loading_json(self,jsonfile):
obj = self.load_json(jsonfile)
res = {}
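        # the loading report labels its attributes 'ZMW Loading
        # Productivity 2..4'; they are remapped below to the conventional
        # P0/P1/P2 productivity bins shown in the result tables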
for e in obj['attributes'] :
if e['name'] == 'Productive ZMWs' :
res["Productive ZMWs"] = e['value']
elif e['name'] == 'ZMW Loading Productivity 2' :
res["Productivity 0"] = e['value']
elif e['name'] == 'ZMW Loading Productivity 3' :
res["Productivity 1"] = e['value']
elif e['name'] == 'ZMW Loading Productivity 4' :
res["Productivity 2"] = e['value']
return res
def parse_adapter_json(self,jsonfile):
obj = self.load_json(jsonfile)
res = {}
for e in obj['attributes'] :
if e['id'] == 'adapter.adapter_dimers' :
res["Adapter Dimers"] = e['value']
elif e['id'] == 'adapter.short_inserts' :
res["Short Inserts"] = e['value']
elif e['id'] == 'adapter.medium_inserts' :
res["Medium Inserts"] = e['value']
return res
def parse_filter_json(self, jsonfile):
obj = self.load_json(jsonfile)
res = {
'prefilter_stats' : {},
'postfilter_stats': {}
}
for e in obj['attributes'] :
if e['id'] == 'filtering_report.mean_read_score_pre_filter' :
res['prefilter_stats']["Mean Read Score"] = e['value']
elif e['id'] == 'filtering_report.reads_n_pre_filter' :
res['prefilter_stats']["Number of Reads"] = e['value']
elif e['id'] == 'filtering_report.base_n_pre_filter' :
res['prefilter_stats']["Number of Bases"] = e['value']
elif e['id'] == 'filtering_report.n50_read_length_pre_filter' :
res['prefilter_stats']["N50"] = e['value']
elif e['id'] == 'filtering_report.mean_read_length_pre_filter' :
res['prefilter_stats']["Mean Read Length"] = e['value']
elif e['id'] == 'filtering_report.mean_read_score_post_filter' :
res['postfilter_stats']["Mean Read Score"] = e['value']
elif e['id'] == 'filtering_report.reads_n_post_filter' :
res['postfilter_stats']["Number of Reads"] = e['value']
elif e['id'] == 'filtering_report.base_n_post_filter' :
res['postfilter_stats']["Number of Bases"] = e['value']
elif e['id'] == 'filtering_report.n50_read_length_post_filter' :
res['postfilter_stats']["N50"] = e['value']
elif e['id'] == 'filtering_report.mean_read_length_post_filter' :
res['postfilter_stats']["Mean Read Length"] = e['value']
return res
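    # Illustrative shape of the smrtpipe report JSON consumed by the
    # parsers above (values are made up; only the 'attributes' layout
    # matters):
    #   {"attributes": [
    #       {"id": "filtering_report.reads_n_pre_filter",
    #        "name": "Number of Reads", "value": 53842},
    #       {"id": "filtering_report.n50_read_length_pre_filter",
    #        "name": "N50", "value": 11320}
    #   ]}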