Commit b65bedb4 authored by Floreal Cabanettes

Add filtering of data before processing + some code fixes, Fixes #2

parent 6c93d608
#!/usr/bin/env python3

import argparse
import time
import os
from split_fa import Splitter
from filter_contigs import Filter
from build_index import index_file

parser = argparse.ArgumentParser(description="Split huge contigs")
parser.add_argument('-q', '--query', type=str, required=False, help="Query fasta file")
parser.add_argument('-u', '--query-split', type=str, required=False, help="Query fasta file split")
parser.add_argument('-t', '--target', type=str, required=True, help="Target fasta file")
parser.add_argument('-n', '--query-name', type=str, required=False, help="Query name")
parser.add_argument('-m', '--target-name', type=str, required=True, help="Target name")
parser.add_argument('-s', '--size', type=int, required=False, default=10,
                    help="Max size of contigs (Mb) - for query split")
parser.add_argument('-p', '--preptime-file', type=str, required=True, help="File in which to save prep times")

args = parser.parse_args()

out_dir = os.path.dirname(args.target)

with open(args.preptime_file, "w") as ptime:
    ptime.write(str(round(time.time())) + "\n")
    if args.query is not None:
        print("Splitting query...")
        fasta_in = args.query
        index_split = os.path.join(out_dir, "query_split.idx")
        splitter = Splitter(input_f=fasta_in, name_f=args.query_name, output_f=args.query_split,
                            query_index=index_split)
        if splitter.split():
            filtered_fasta = os.path.join(os.path.dirname(args.query_split), "filtered_" +
                                          os.path.basename(args.query_split))
            filter_f = Filter(fasta=args.query_split,
                              index_file=index_split,
                              type_f="query",
                              min_filtered=splitter.nb_contigs / 4,
                              split=True,
                              out_fasta=filtered_fasta,
                              replace_fa=True)
            filter_f.filter()
        else:
            exit(1)
    print("Indexing target...")
    uncompressed = None
    if args.target.endswith(".gz"):
        uncompressed = args.target[:-3]
    target_index = os.path.join(out_dir, "target.idx")
    success, nb_contigs = index_file(args.target, args.target_name, target_index, uncompressed)
    if success:
        in_fasta = args.target
        if uncompressed is not None:
            in_fasta = uncompressed
        filtered_fasta = os.path.join(os.path.dirname(in_fasta), "filtered_" + os.path.basename(in_fasta))
        filter_f = Filter(fasta=in_fasta,
                          index_file=target_index,
                          type_f="target",
                          min_filtered=nb_contigs / 4,
                          split=False,
                          out_fasta=filtered_fasta,
                          replace_fa=True)
        is_filtered = filter_f.filter()
        if uncompressed is not None:
            if is_filtered:
                # The gzipped original is obsolete: remember the path of the
                # uncompressed, filtered copy for the rest of the pipeline.
                os.remove(args.target)
                with open(os.path.join(out_dir, ".target"), "w") as save_file:
                    save_file.write(uncompressed)
            else:
                os.remove(uncompressed)
    else:
        if uncompressed is not None:
            try:
                os.remove(uncompressed)
            except FileNotFoundError:
                pass
        exit(1)
    ptime.write(str(round(time.time())) + "\n")

print("DONE!")
@@ -5,10 +5,14 @@ import re
 import gzip


-def index_file(fasta_path, fasta_name, out):
+def index_file(fasta_path, fasta_name, out, write_fa=None):
     has_header = False
     next_header = False  # True if next line must be a header line
     compressed = fasta_path.endswith(".gz")
+    nb_contigs = 0
+    write_f = None
+    if write_fa is not None:
+        write_f = open(write_fa, "w")
     with (gzip.open(fasta_path) if compressed else open(fasta_path)) as in_file, \
             open(out, "w") as out_file:
         out_file.write(fasta_name + "\n")
@@ -16,12 +20,15 @@ def index_file(fasta_path, fasta_name, out):
         contig = None
         len_c = 0
         for line in fasta:
+            if write_f is not None:
+                write_f.write(line)
             line = line.strip("\n")
             if re.match(r"^>.+", line) is not None:
                 has_header = True
                 next_header = False
                 if contig is not None:
                     if len_c > 0:
+                        nb_contigs += 1
                         out_file.write("%s\t%d\n" % (contig, len_c))
                     else:
                         return False
@@ -35,9 +42,13 @@ def index_file(fasta_path, fasta_name, out):
                     next_header = True
         if contig is not None and len_c > 0:
+            nb_contigs += 1
             out_file.write("%s\t%d\n" % (contig, len_c))
-    return has_header
+    if write_f is not None:
+        write_f.close()
+    return has_header, nb_contigs


 if __name__ == '__main__':
......
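About the write_fa parameter added above: when a path is given, index_file() also writes every line it reads to that file, so a gzipped fasta can be indexed and decompressed in a single pass. A minimal sketch, with hypothetical paths:

from build_index import index_file

# Index a gzipped target and emit an uncompressed copy at the same time:
success, nb_contigs = index_file("/tmp/job/target.fa.gz", "my_target",
                                 "/tmp/job/target.idx", write_fa="/tmp/job/target.fa")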
#!/usr/bin/env python3

import os
import re
import shutil
from dgenies.lib.paf import Index
from Bio import SeqIO


class Filter:

    def __init__(self, fasta, index_file, type_f, min_filtered=0, split=False, out_fasta=None, replace_fa=False):
        self.fasta = fasta
        self.index_file = index_file
        self.type_f = type_f
        self.min_filtered = min_filtered
        self.split = split
        if out_fasta is not None:
            self.out_fasta = out_fasta
        else:
            self.out_fasta = os.path.join(os.path.dirname(self.fasta), "filtered_" + os.path.basename(self.fasta))
        self.replace_fa = replace_fa

    def filter(self):
        f_outs = self._check_filter()
        if len(f_outs) > 0:
            self._filter_out(f_outs=f_outs)
            return True
        return False

    def _check_filter(self):
        """
        Load the index of the fasta file and determine which contigs must be removed.
        Remove them from the index only (the fasta itself is handled by _filter_out)
        :return: list of contigs which must be removed
        """
        # Load contigs:
        name, order, contigs, reversed_c, abs_start, c_len = Index.load(index_file=self.index_file,
                                                                        merge_splits=self.split)
        # Sort contigs from longest to shortest:
        contigs_order = sorted(order, key=lambda x: -contigs[x])
        # Find the contig at which 95% of the cumulated length is reached (N95):
        sum_l = 0
        n95_contig = None
        n95_value = 0.95 * c_len
        pos = -1
        for contig in contigs_order:
            pos += 1
            sum_l += contigs[contig]
            if sum_l >= n95_value:
                n95_contig = contig
                break
        # Minimal length a contig must have to be kept (5% of the N95 contig):
        min_length = 0.05 * contigs[n95_contig]
        f_outs = []
        breakpoint = None
        for contig in contigs_order[pos:]:
            if contigs[contig] < min_length:
                breakpoint = pos
                break
            pos += 1
        if breakpoint is not None:
            f_outs = contigs_order[breakpoint:]
            if len(f_outs) > self.min_filtered:
                with open(os.path.join(os.path.dirname(self.fasta), ".filter-" + self.type_f), "w") as list_f:
                    list_f.write("\n".join(f_outs) + "\n")
                kept = contigs_order[:breakpoint]
                if self.split:
                    f_outs = []
                    name, contigs_order_split, contigs, reversed_c, abs_start_split, c_len_split = \
                        Index.load(index_file=self.index_file, merge_splits=False)
                    kept_s = []
                    for contig in contigs_order_split:
                        match = re.match(r"(.+)_###_\d+", contig)
                        contig_name = contig
                        if match is not None:
                            contig_name = match.group(1)
                        if contig_name in kept:
                            kept_s.append(contig)
                        else:
                            f_outs.append(contig)
                    kept = kept_s
                else:
                    kept.sort(key=lambda k: order.index(k))
                Index.save(index_file=self.index_file,
                           name=name,
                           contigs=contigs,
                           order=kept,
                           reversed_c=reversed_c)
            else:
                f_outs = []
        return f_outs

    def _filter_out(self, f_outs):
        """
        Remove too-small contigs from the fasta file
        :param f_outs: contigs which must be filtered out
        """
        sequences = SeqIO.parse(open(self.fasta), "fasta")
        kept = (record for record in sequences if record.name not in f_outs)
        with open(self.out_fasta, "w") as out_fa:
            SeqIO.write(kept, out_fa, "fasta")
        if self.replace_fa:
            os.remove(self.fasta)
            shutil.move(self.out_fasta, self.fasta)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Filter out too-small contigs")
    parser.add_argument('-f', '--fasta', type=str, required=True, help="Input fasta file")
    parser.add_argument('-i', '--index', type=str, required=True, help="Index file for the fasta file")
    parser.add_argument('-t', '--type', type=str, required=True, choices=["query", "target"],
                        help="Type of fasta: query or target")
    parser.add_argument('-m', '--min-filtered', type=int, required=False, default=0,
                        help="Minimum number of contigs to be removed for filtering to be applied")
    parser.add_argument('-r', "--replace-fasta", action='store_const', const=True, default=False,
                        help="Replace the original fasta file")
    parser.add_argument('-s', "--split", action='store_const', const=True, default=False,
                        help="The fasta has been split")
    args = parser.parse_args()

    filter_f = Filter(fasta=args.fasta,
                      index_file=args.index,
                      type_f=args.type,
                      min_filtered=args.min_filtered,
                      split=args.split,
                      replace_fa=args.replace_fasta)
    filter_f.filter()
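To make the cutoff rule in _check_filter() concrete, here is a toy walk-through with made-up contig lengths (a standalone sketch, not part of the module):

contigs = {"c1": 500, "c2": 400, "c3": 60, "c4": 2}
order = sorted(contigs, key=lambda c: -contigs[c])   # ['c1', 'c2', 'c3', 'c4']
total = sum(contigs.values())                        # 962

cum, pos = 0, -1
for contig in order:             # walk from longest to shortest
    pos += 1
    cum += contigs[contig]
    if cum >= 0.95 * total:      # threshold: 913.9
        break                    # stops at 'c3' (900 + 60 = 960)

min_length = 0.05 * contigs[order[pos]]              # 5% of 60 -> 3.0
dropped = [c for c in order[pos:] if contigs[c] < min_length]
print(dropped)                                       # ['c4'] (length 2 < 3.0)

The real method additionally requires more than min_filtered contigs below the cutoff before it rewrites the index and the fasta.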
@@ -19,11 +19,12 @@ class Splitter:
         self.output_gz = output_f.endswith(".gz")
         self.out_dir = os.path.dirname(output_f)
         self.index_file = os.path.join(self.out_dir, query_index)
+        self.nb_contigs = 0

     def split(self):
         """
         Split contigs in smaller ones
-        :return: True if the Fasta is correct, False else
+        :return: True if the Fasta is correct, else False
         """
         has_header = False
         next_header = False  # True if next line must be a header line
@@ -41,6 +42,7 @@ class Splitter:
                 has_header = True
                 next_header = False
                 if chr_name is not None and len(fasta_str) > 0:
+                    self.nb_contigs += 1
                     self.flush_contig(fasta_str, chr_name, self.size_c, enc, index_f)
                 elif chr_name is not None:
                     return False
@@ -53,6 +55,7 @@ class Splitter:
                     fasta_str += line
                 elif len(line) == 0:
                     next_header = True
+        self.nb_contigs += 1
        self.flush_contig(fasta_str, chr_name, self.size_c, enc, index_f)
        return has_header
......
@@ -19,6 +19,7 @@ from pathlib import Path
 from urllib import request, parse
 from dgenies.bin.split_fa import Splitter
 from dgenies.bin.build_index import index_file
+from dgenies.bin.filter_contigs import Filter
 from dgenies.bin.merge_splitted_chrms import Merger
 from dgenies.bin.sort_paf import Sorter
 import gzip
@@ -95,6 +96,12 @@ class JobManager:
             return "no-match"
         return "fail"

+    def is_query_filtered(self):
+        return os.path.exists(os.path.join(self.output_dir, ".filter-query"))
+
+    def is_target_filtered(self):
+        return os.path.exists(os.path.join(self.output_dir, ".filter-target"))
+
     def get_mail_content(self, status):
         message = "D-Genies\n\n"
         if status == "success":
@@ -114,6 +121,15 @@ class JobManager:
             message += "Target: %s\nQuery: %s\n\n" % (self.target.get_name(), self.query.get_name())
         else:
             message += "Target: %s\n\n" % self.target.get_name()
+        if status == "success":
+            if self.is_target_filtered():
+                message += str("Note: the target fasta has been filtered because it contains contigs that are too "
+                               "small. To see which contigs have been removed from the analysis, click on the link "
+                               "below:\n{1}/filter-out/{0}/target\n\n").format(self.id_job, self.config.web_url)
+            if self.is_query_filtered():
+                message += str("Note: the query fasta has been filtered because it contains contigs that are too "
+                               "small. To see which contigs have been removed from the analysis, click on the link "
+                               "below:\n{1}/filter-out/{0}/query\n\n").format(self.id_job, self.config.web_url)
         message += "See you soon on D-Genies,\n"
         message += "The team"
         return message
@@ -125,7 +141,8 @@ class JobManager:
         return template.render(job_name=self.id_job, status=status, url_base=self.config.web_url,
                                query_name=self.query.get_name() if self.query is not None else "",
                                target_name=self.target.get_name(),
-                               error=self.error)
+                               error=self.error,
+                               target_filtered=self.is_target_filtered(), query_filtered=self.is_query_filtered())

     def get_mail_subject(self, status):
         if status == "success" or status == "no-match":
@@ -143,8 +160,10 @@ class JobManager:
         self.error = job.error
         # Send:
-        self.mailer.send_mail([self.email], self.get_mail_subject(status), self.get_mail_content(status),
-                              self.get_mail_content_html(status))
+        self.mailer.send_mail(recipients=[self.email],
+                              subject=self.get_mail_subject(status),
+                              message=self.get_mail_content(status),
+                              message_html=self.get_mail_content_html(status))

     def search_error(self):
         logs = os.path.join(self.output_dir, "logs.txt")
@@ -548,13 +567,17 @@ class JobManager:
         thread.start()  # Start the execution

     def prepare_data_cluster(self, batch_system_type):
-        args = [self.preptime_file, self.config.cluster_python_script, self.target.get_path(),
-                self.target.get_name(), self.idx_t]
+        args = [self.config.cluster_prepare_script,
+                "-t", self.target.get_path(),
+                "-m", self.target.get_name(),
+                "-p", self.preptime_file]
         if self.query is not None:
-            args += [self.query.get_path(), self.query.get_name(), self.get_query_split()]
+            args += ["-q", self.query.get_path(),
+                     "-u", self.get_query_split(),
+                     "-n", self.query.get_name()]
         return self.launch_to_cluster(step="prepare",
                                       batch_system_type=batch_system_type,
-                                      command=self.config.cluster_prepare_script,
+                                      command=self.config.cluster_python_script,
                                       args=args,
                                       log_out=self.logs,
                                       log_err=self.logs)
@@ -570,14 +593,55 @@ class JobManager:
             fasta_in = self.query.get_path()
             splitter = Splitter(input_f=fasta_in, name_f=self.query.get_name(), output_f=self.get_query_split(),
                                 query_index=self.query_index_split)
-            if not splitter.split():
+            if splitter.split():
+                filtered_fasta = os.path.join(os.path.dirname(self.get_query_split()), "filtered_" +
+                                              os.path.basename(self.get_query_split()))
+                filter_f = Filter(fasta=self.get_query_split(),
+                                  index_file=self.query_index_split,
+                                  type_f="query",
+                                  min_filtered=round(splitter.nb_contigs / 4),
+                                  split=True,
+                                  out_fasta=filtered_fasta,
+                                  replace_fa=True)
+                filter_f.filter()
+            else:
                 job.status = "fail"
                 job.error = "<br/>".join(["Query fasta file is not valid!", error_tail])
                 job.save()
                 if self.config.send_mail_status:
                     self.send_mail_post()
                 return False
-        if not index_file(self.target.get_path(), self.target.get_name(), self.idx_t):
+        uncompressed = None
+        if self.target.get_path().endswith(".gz"):
+            uncompressed = self.target.get_path()[:-3]
+        success, nb_contigs = index_file(self.target.get_path(), self.target.get_name(), self.idx_t, uncompressed)
+        if success:
+            in_fasta = self.target.get_path()
+            if uncompressed is not None:
+                in_fasta = uncompressed
+            filtered_fasta = os.path.join(os.path.dirname(in_fasta), "filtered_" + os.path.basename(in_fasta))
+            filter_f = Filter(fasta=in_fasta,
+                              index_file=self.idx_t,
+                              type_f="target",
+                              min_filtered=round(nb_contigs / 4),
+                              split=False,
+                              out_fasta=filtered_fasta,
+                              replace_fa=True)
+            is_filtered = filter_f.filter()
+            if uncompressed is not None:
+                if is_filtered:
+                    os.remove(self.target.get_path())
+                    self.target.set_path(uncompressed)
+                    with open(os.path.join(self.output_dir, ".target"), "w") as save_file:
+                        save_file.write(uncompressed)
+                else:
+                    os.remove(uncompressed)
+        else:
+            if uncompressed is not None:
+                try:
+                    os.remove(uncompressed)
+                except FileNotFoundError:
+                    pass
             job.status = "fail"
             job.error = "<br/>".join(["Target fasta file is not valid!", error_tail])
             job.save()
......
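The prepare step and the JobManager communicate through small marker files in the job directory rather than through return values. A sketch of that contract, with an illustrative directory path:

import os

output_dir = "/tmp/dgenies/job_42"   # illustrative job directory

# ".filter-query" / ".filter-target": written by Filter._check_filter() with
# the list of removed contigs; their mere existence flags a filtered fasta.
for type_f in ("query", "target"):
    flag = os.path.join(output_dir, ".filter-" + type_f)
    print(type_f, "filtered:", os.path.exists(flag))

# ".target": written when a gzipped target was replaced by its uncompressed,
# filtered copy; it contains the path of the fasta to use from then on.
target_save = os.path.join(output_dir, ".target")
if os.path.exists(target_save):
    with open(target_save) as f:
        print("target path is now:", f.read().strip())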
@@ -46,7 +46,20 @@
 <p>Sequences compared in this analysis:</p>
 <p><em>Target:</em> {{ target_name }}<br/>
 {% if query_name != "" %}
-    <em>Query:</em> {{ query_name }}</p>
+    <em>Query:</em> {{ query_name }}
+</p>
 {% endif %}
+{% if status == "success" %}
+    {% if target_filtered %}
+        <p>Note: the target fasta has been filtered because it contains contigs that are too small.<br/>
+            To see which contigs have been removed from the analysis,
+            <a href="{{ url_base }}/filter-out/{{ job_name }}/target">click here</a>.</p>
+    {% endif %}
+    {% if query_filtered %}
+        <p>Note: the query fasta has been filtered because it contains contigs that are too small.<br/>
+            To see which contigs have been removed from the analysis,
+            <a href="{{ url_base }}/filter-out/{{ job_name }}/query">click here</a>.</p>
+    {% endif %}
+{% endif %}
 <p>See you soon on D-Genies,</p>
......
#!/usr/bin/env python3
import os
import re
import shutil
from math import sqrt
from numpy import mean
@@ -12,6 +13,54 @@ import json
 from collections import Counter


+class Index:
+
+    def __init__(self):
+        pass
+
+    @staticmethod
+    def load(index_file, merge_splits=False):
+        with open(index_file, "r") as idx_q_f:
+            abs_start = {}
+            abs_current_start = 0
+            c_len = 0
+            name = idx_q_f.readline().strip("\n")
+            order = []
+            contigs = {}
+            reversed_c = {}
+            for line in idx_q_f:
+                parts = line.strip("\n").split("\t")
+                id_c = parts[0]
+                is_split = False
+                if merge_splits:
+                    match = re.match(r"(.+)_###_\d+", id_c)
+                    if match is not None:
+                        id_c = match.group(1)
+                        is_split = True
+                len_c = int(parts[1])
+                if len(parts) > 2:
+                    reversed_c[id_c] = parts[2] == "1"
+                else:
+                    reversed_c[id_c] = False
+                if not is_split or (is_split and id_c not in order):
+                    order.append(id_c)
+                    abs_start[id_c] = abs_current_start
+                    contigs[id_c] = len_c
+                else:
+                    contigs[id_c] += len_c
+                c_len += len_c
+                abs_current_start += len_c
+        return name, order, contigs, reversed_c, abs_start, c_len
+
+    @staticmethod
+    def save(index_file, name, contigs, order, reversed_c):
+        with open(index_file, "w") as idx:
+            idx.write(name + "\n")
+            for contig in order:
+                idx.write("\t".join([contig, str(contigs[contig]), "1" if reversed_c[contig] else "0"])
+                          + "\n")
 class Paf:
     limit_idy = [0.25, 0.5, 0.75]
     max_nb_lines = 100000
@@ -101,31 +150,6 @@ class Paf:
                 keep_lines[cls].append(line)
         return keep_lines

-    @staticmethod
-    def load_index(index):
-        with open(index, "r") as idx_q_f:
-            abs_start = {}
-            abs_current_start = 0
-            c_len = 0
-            name = idx_q_f.readline().strip("\n")
-            order = []
-            contigs = {}
-            reversed = {}
-            for line in idx_q_f:
-                parts = line.strip("\n").split("\t")
-                id_c = parts[0]
-                len_c = int(parts[1])
-                if len(parts) > 2:
-                    reversed[id_c] = parts[2] == "1"
-                else:
-                    reversed[id_c] = False
-                order.append(id_c)
-                abs_start[id_c] = abs_current_start
-                contigs[id_c] = len_c
-                c_len += len_c
-                abs_current_start += len_c
-        return name, order, contigs, reversed, abs_start, abs_current_start, c_len
-
     def parse_paf(self, merge_index=True, noise=True):
         min_idy = 10000000000
         max_idy = -10000000000
@@ -136,25 +160,21 @@ class Paf:
             "3": []  # idy > 0.75
         }
         try:
-            name_q, q_order, q_contigs, q_reversed, q_abs_start, q_abs_current_start, q_len = self.load_index(
-                self.idx_q)
+            name_q, q_order, q_contigs, q_reversed, q_abs_start, len_q = Index.load(self.idx_q)
             if merge_index:
-                q_contigs, q_order = self.parse_index(q_order, q_contigs, q_len)
+                q_contigs, q_order = self.parse_index(q_order, q_contigs, len_q)
         except IOError:
             self.error = "Index file does not exist for query!"
             return False
         try:
-            name_t, t_order, t_contigs, t_reversed, t_abs_start, t_abs_current_start, t_len = self.load_index(
-                self.idx_t)
+            name_t, t_order, t_contigs, t_reversed, t_abs_start, len_t = Index.load(self.idx_t)
             if merge_index:
-                t_contigs, t_order = self.parse_index(t_order, t_contigs, t_len)
+                t_contigs, t_order = self.parse_index(t_order, t_contigs, len_t)
         except IOError:
             self.error = "Index file does not exist for target!"
             return False
-
-        len_q = q_abs_current_start
-        len_t = t_abs_current_start

         lines_lens = []
         try:
@@ -497,7 +517,7 @@ class Paf:
         :return: content of the file
         """
         index = self.idx_q if to == "query" else self.idx_t
-        name, contigs_list, contigs, reversed, abs_start, abs_current_start, c_len = self.load_index(self.idx_t)
+        name, contigs_list, contigs, reversed, abs_start, c_len = Index.load(index)
         with open(self.paf, "r") as paf:
             for line in paf:
                 c_name = line.strip("\n").split("\t")[0 if to == "query" else 5]
......
@@ -55,6 +55,16 @@
 <p>Your job was completed successfully.<br/>