Skip to content
Snippets Groups Projects
Commit bbefebc2 authored by mariabernard's avatar mariabernard
Browse files

Snakemake 1000RNASeq ASE: add summary scripts for Trim Galore and STAR, and a script to convert the Phred quality scale

parent fed78720
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
from Bio import SeqIO
from Bio.SeqIO.QualityIO import FastqGeneralIterator
import gzip
import os,sys
def is_gzip(file_in):
    """
    Check whether a file is gzip-compressed by trying to read one line of it.

    @param file_in: [str] Path to the processed file.
    @return: [bool] True if the file is gzipped, False otherwise.
    """
    try:
        # gzip raises OSError ("Not a gzipped file") on the first read of a
        # plain file; the original bare `except:` hid unrelated errors too.
        with gzip.open(file_in) as handle_input:
            handle_input.readline()
        return True
    except OSError:
        return False
def checkQualityEncode(file_in):
    """
    Guess the Phred quality-score encoding of a FASTQ file.

    Inspired by https://github.com/brentp/bio-playground/blob/master/reads-utils/guess-encoding.py

    @param file_in: [str] Path of the FASTQ file (plain or gzipped).
    @return: [str] "Q33" or "Q64", or "" if the encoding stays ambiguous
             over the whole file.
    """
    # Valid ASCII ranges of the quality characters for each encoding.
    RANGES = {
        'Q33': (33, 74),
        'Q64': (64, 105)
    }
    scale = ""
    gmin, gmax = 99, 0
    handle_in = gzip.open(file_in, "rt") if is_gzip(file_in) else open(file_in, "rt")
    try:
        for title_line, seq_string, quality_string in FastqGeneralIterator(handle_in):
            qual = [ord(c) for c in quality_string]
            qmin = min(qual)
            qmax = max(qual)
            # Only re-evaluate the candidates when the observed range widened.
            if qmin < gmin or qmax > gmax:
                gmin, gmax = min(qmin, gmin), max(qmax, gmax)
                valid_encodings = [encoding for encoding, (emin, emax) in RANGES.items()
                                   if gmin >= emin and gmax <= emax]
                # Stop as soon as a single encoding remains compatible.
                if len(valid_encodings) == 1:
                    scale = valid_encodings[0]
                    break
    finally:
        handle_in.close()  # fix: the original leaked this file handle
    return scale
# --- Snakemake entry point: ensure the output FASTQ is Phred+33 encoded ---
f_in = snakemake.input.fastq_in
f_out = snakemake.output.fastq_out
encoding = checkQualityEncode(f_in)
if encoding == "Q33":
    # Already Sanger-encoded: just expose the input under the output path.
    print(f_in + " is " + encoding + " and DO NOT need to be converted\n", file=sys.stderr)
    os.symlink(f_in, f_out)
elif encoding == "Q64":
    # Old Illumina (Phred+64) quality scale: rewrite as Sanger (Phred+33).
    print(f_in + " is " + encoding + " and need to be converted\n", file=sys.stderr)
    handle_in = gzip.open(f_in, "rt") if is_gzip(f_in) else open(f_in, "rt")
    try:
        with gzip.open(f_out, "wt") as handle_out:
            SeqIO.convert(handle_in, 'fastq-illumina', handle_out, 'fastq-sanger')
    finally:
        handle_in.close()
else:
    # The original converted any non-Q33 file (even an ambiguous "") and its
    # bare `except:` replaced the real error; fail explicitly instead.
    raise Exception("there is no valid encoding found")
#!/usr/bin/env python3
import os
import re
from collections import OrderedDict
def _build_lines_star(file, res_lines):
key_value_line = r"\s*(.+)\s\|\t(.+)\n"
with open(file, "r") as log_file:
lines = log_file.readlines()
for i in range(5, len(lines)): # Remove header
line = lines[i]
match = re.match(key_value_line, line)
if match:
name = match.group(1)
value = match.group(2).replace("%","")
if name not in res_lines:
res_lines[name] = []
res_lines[name].append(value)
return res_lines
def _build_star_summary(star_summary_file, logs_files):
    """Write a tab-separated summary table from several STAR final logs.

    One column per sample (name taken from '<sample>_Log.final.out'),
    one row per STAR metric.
    """
    merged = OrderedDict()
    with open(star_summary_file, "w") as sy_file:
        # Header row: one sample name per log file, tab-separated.
        for log in logs_files:
            merged = _build_lines_star(log, merged)
            sample = re.search(r"(.+)_Log\.final\.out", os.path.basename(log)).group(1)
            sy_file.write("\t" + sample)
        sy_file.write("\n")
        # Body: one row per metric, values in header order.
        for metric, metric_values in merged.items():
            sy_file.write(metric)
            if metric_values is not None:
                sy_file.write("\t" + "\t".join(metric_values))
            sy_file.write("\n")
        sy_file.write("\n")
# Get inputs and outputs from the snakemake object:
star_logs = snakemake.input.logs
starmap_summary = snakemake.output.out
# Build summary for STAR mapping:
_build_star_summary(starmap_summary, star_logs)
#!/usr/bin/env python3
import os
import re
from collections import OrderedDict
def build_single_lines(file, res_dic):
    """
    Parse a Trim Galore single-end log and append its metrics to res_dic.

    Side effect: appends the sample name (taken from '<sample>_trim[SP]e.log')
    to the module-level list `sample_single`.

    @param file: [str] Path to the Trim Galore log file.
    @param res_dic: [OrderedDict] metric name -> list of values (one per sample).
    """
    # fix: escape the dot in the filename pattern (was matching any character)
    sample_single.append(re.search(r"(.+)_trim[SP]e\.log", os.path.basename(file)).group(1))
    with open(file, "r") as log_file:
        line = log_file.readline()
        # Skip ahead to the summary section.
        while line != "=== Summary ===\n":
            line = log_file.readline()
        line = log_file.readline().strip()
        while not line.startswith("==="):
            match = re.match(r"(.+):\s*(.+)", line)
            if match:
                head = match.group(1)
                # Rename headers so columns align with the paired-end parser.
                head = head.replace("Total", "Total forward")
                head = head.replace("Reads", "Forward Reads")
                head = head.replace("Quality", "Forward quality")
                value = match.group(2)
                # fix: raw string — "\(" was an invalid escape in the original
                value = re.sub(r"\(.+\)", "", value)  # drop "(xx%)" annotations
                value = value.replace(" bp", "").replace(",", "")
                if head not in res_dic:
                    res_dic[head] = []
                res_dic[head].append(value)
            line = log_file.readline().strip()
        # Final length filter: count of reads removed as too short.
        while not "shorter than the length cutoff" in line:
            line = log_file.readline()
        head = "too short reads (pairs) removed"
        value = line.split(":")[1].split("(")[0].strip()
        if head not in res_dic:
            res_dic[head] = []
        res_dic[head].append(value)
def build_paired_lines(file, res_dic):
    """
    Parse a Trim Galore paired-end log and append its metrics to res_dic.

    The log contains two "=== Summary ===" sections (forward then reverse
    reads), followed by the pair length-filter count.

    Side effect: appends the sample name (taken from '<sample>_trim[SP]e.log')
    to the module-level list `sample_paired`.

    @param file: [str] Path to the Trim Galore log file.
    @param res_dic: [OrderedDict] metric name -> list of values (one per sample).
    """
    # fix: escape the dot in the filename pattern (was matching any character)
    sample_paired.append(re.search(r"(.+)_trim[SP]e\.log", os.path.basename(file)).group(1))
    with open(file, "r") as log_file:
        line = log_file.readline()
        # Forward reads: first summary section.
        while line != "=== Summary ===\n":
            line = log_file.readline()
        line = log_file.readline().strip()
        while not line.startswith("==="):
            match = re.match(r"(.+):\s*(.+)", line)
            if match:
                head = match.group(1)
                head = head.replace("Total", "Total forward")
                head = head.replace("Reads", "Forward Reads")
                head = head.replace("Quality", "Forward quality")
                value = match.group(2)
                # fix: raw string — "\(" was an invalid escape in the original
                value = re.sub(r"\(.+\)", "", value)  # drop "(xx%)" annotations
                value = value.replace(" bp", "").replace(",", "")
                if head not in res_dic:
                    res_dic[head] = []
                res_dic[head].append(value)
            line = log_file.readline().strip()
        # Reverse reads: second summary section.
        while line != "=== Summary ===\n":
            line = log_file.readline()
        line = log_file.readline().strip()
        while not line.startswith("==="):
            match = re.match(r"(.+):\s*(.+)", line)
            if match:
                head = match.group(1)
                head = head.replace("Total", "Total reverse")
                head = head.replace("Reads", "Reverse Reads")
                head = head.replace("Quality", "Reverse quality")
                value = match.group(2)
                value = re.sub(r"\(.+\)", "", value)
                value = value.replace(" bp", "").replace(",", "")
                if head not in res_dic:
                    res_dic[head] = []
                res_dic[head].append(value)
            line = log_file.readline().strip()
        # Pair length filter: count of pairs removed as too short.
        while not "shorter than the length cutoff" in line:
            line = log_file.readline()
        head = "too short reads (pairs) removed"
        value = line.split(":")[1].split("(")[0].strip()
        if head not in res_dic:
            res_dic[head] = []
        res_dic[head].append(value)
def build_lines(file):
    """Dispatch one Trim Galore log to the paired- or single-end parser."""
    with open(file, "r") as log_file:
        current = log_file.readline()
        # Scan for the line "Trimming mode: paired-end|single-end".
        while not current.startswith("Trimming mode:"):
            current = log_file.readline()
    trimming_mode = current.split(":")[1].strip()
    if trimming_mode == "paired-end":
        build_paired_lines(file, res_paired_lines)
    else:
        build_single_lines(file, res_single_lines)
# parse logs — the build_* parsers fill these module-level accumulators
# as a side effect (sample names and metric tables).
sample_paired = []
sample_single = []
res_paired_lines = OrderedDict()
res_single_lines = OrderedDict()
# snakemake.input.logs may be a single path or a list of paths.
if isinstance(snakemake.input.logs, list) :
    for log in snakemake.input.logs:
        build_lines(log)
else:
    build_lines(snakemake.input.logs)
# merge in one dict: paired-end columns first, then single-end columns.
sample_names = sample_paired + sample_single
res_lines = OrderedDict()
if len(res_paired_lines) > 0:
    for head in res_paired_lines:
        if head in res_single_lines:
            res_lines[head] = res_paired_lines[head] + res_single_lines[head]
        else:
            # Pad single-end columns with '0' for paired-only metrics.
            res_lines[head] = res_paired_lines[head] + ['0']*len(sample_single)
    # NOTE(review): metrics present only in res_single_lines are silently
    # dropped when paired logs exist — confirm this is intended.
else:
    for head in res_single_lines:
        res_lines[head] = res_single_lines[head]
# write results: header row of sample names, then one row per metric.
with open(snakemake.output.out, "w") as summary_file:
    summary_file.write("\t" + "\t".join(sample_names) + "\n")
    for head, value in res_lines.items():
        summary_file.write(head + "\t" + "\t".join(value) + "\n")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment