Commit 0531d9d9 authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Align file: work server side for standalone mode

parent 9ea8a152
......@@ -68,7 +68,7 @@ if MODE == "webserver":
error = CharField(default="")
mem_peak = IntegerField(null=True)
time_elapsed = IntegerField(null=True)
tool = CharField(default="minimap2", max_length=50)
tool = CharField(default="minimap2", max_length=50, null=True)
class Gallery(BaseModel):
......
......@@ -3,7 +3,7 @@ class Fasta:
self._name = name
self._path = path
self._type = type_f
self._example = example
self._example = example is not False
def set_path(self, path):
self._path = path
......
......@@ -6,14 +6,17 @@ import shutil
import sys
import re
import traceback
from inspect import getmembers, isfunction
from collections import OrderedDict
from Bio import SeqIO
from jinja2 import Template
from dgenies.config_reader import AppConfigReader
import dgenies.lib.validators as validators
ALLOWED_EXTENSIONS = {"fasta": ['fa', 'fasta', 'fna', 'fa.gz', 'fasta.gz', 'fna.gz'],
"idx": ['idx'],
"map": ['paf', 'maf']}
"map": [o[0] for o in getmembers(validators) if isfunction(o[1]) and not o[0].startswith("_")]}
# map: all functions of validators which does not starts with an underscore.
class Functions:
......@@ -261,3 +264,8 @@ class Functions:
if "gallery" in all_jobs:
all_jobs.remove("gallery")
return sorted(all_jobs, key=lambda x: x.lower())
@staticmethod
def query_fasta_file_exists(res_dir):
fasta_file = os.path.join(res_dir, ".query")
return os.path.exists(fasta_file) and os.path.isfile(fasta_file)
......@@ -9,7 +9,8 @@ import threading
import re
from dgenies.config_reader import AppConfigReader
from dgenies.tools import Tools
import dgenies.lib.parsers
import dgenies.lib.validators as validators
import dgenies.lib.parsers as parsers
from .fasta import Fasta
from .functions import Functions
import requests
......@@ -39,17 +40,20 @@ if MODE == "webserver":
class JobManager:
def __init__(self, id_job: str, email: str=None, query: Fasta=None, target: Fasta=None, mailer=None,
tool="minimap2"):
tool="minimap2", align: Fasta=None):
self.id_job = id_job
self.email = email
self.query = query
self.target = target
self.align = align
if align is not None:
self.aln_format = os.path.splitext(align.get_path())[1][1:]
self.error = ""
self.id_process = "-1"
# Get configs:
self.config = AppConfigReader()
self.tools = Tools().tools
self.tool = self.tools[tool]
self.tool = self.tools[tool] if tool is not None else None
# Outputs:
self.output_dir = os.path.join(self.config.app_data, id_job)
self.preptime_file = os.path.join(self.output_dir, "prep_times")
......@@ -62,6 +66,12 @@ class JobManager:
self.mailer = mailer
self._filename_for_url = {}
def do_align(self):
"""
:return: True if the job is launched with an alignment file
"""
return not os.path.exists(os.path.join(self.output_dir, ".align"))
@staticmethod
def is_gz_file(filepath):
with open(filepath, 'rb') as test_f:
......@@ -527,6 +537,15 @@ class JobManager:
else:
return 0
def set_job_status(self, status, error=""):
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
job.status = status
job.error = error
job.save()
else:
self.set_status_standalone(status, error)
def check_file(self, input_type, should_be_local, max_upload_size_readable):
"""
......@@ -541,34 +560,31 @@ class JobManager:
my_input = getattr(self, input_type)
if my_input.get_path().endswith(".gz") and not self.is_gz_file(my_input.get_path()):
# Check file is correctly gzipped
status = "fail"
error = input_type + " file is not a correct gzip file"
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
job.status = status
job.error = error
job.save()
else:
self.set_status_standalone(status, error)
self.set_job_status("fail", input_type + " file is not a correct gzip file")
self.clear()
return False, True, None
# Check size:
file_size = self.get_file_size(my_input.get_path())
if -1 < (self.config.max_upload_size if (input_type == "query" or self.query is not None)
else self.config.max_upload_size_ava) < file_size:
status = "fail"
error = input_type + " file exceed size limit of %d Mb (uncompressed)" % max_upload_size_readable
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
job.status = status
job.error = error
job.save()
else:
self.set_status_standalone(status, error)
self.set_job_status("fail",
input_type +
" file exceed size limit of %d Mb (uncompressed)" % max_upload_size_readable)
self.clear()
return False, True, None
if self.config.batch_system_type != "local" and file_size >= getattr(self.config, "min_%s_size" % input_type):
should_be_local = False
if input_type == "align":
if not hasattr(validators, self.aln_format):
self.set_job_status("fail", "Alignment file format not supported")
return False, True, None
if not getattr(validators, self.aln_format)(self.align.get_path()):
self.set_job_status("fail", "Alignment file is invalid. Please check your file.")
return False, True, None
else:
if self.config.batch_system_type != "local" and file_size >= getattr(self.config,
"min_%s_size" % input_type):
should_be_local = False
return True, False, should_be_local
def download_files_with_pending(self, files_to_download, should_be_local, max_upload_size_readable):
......@@ -660,18 +676,24 @@ class JobManager:
files_to_download.append([self.query, "query"])
else:
return False, True, True
if correct:
if self.target is not None:
if self.target.get_type() == "local":
self.target.set_path(self.__getting_local_file(self.target, "target"))
correct, error_set, should_be_local = self.check_file("target", should_be_local,
max_upload_size_readable)
if not correct:
return False, error_set, True
elif self.__check_url(self.target):
files_to_download.append([self.target, "target"])
else:
return False, True, True
if correct and self.target is not None:
if self.target.get_type() == "local":
self.target.set_path(self.__getting_local_file(self.target, "target"))
correct, error_set, should_be_local = self.check_file("target", should_be_local,
max_upload_size_readable)
if not correct:
return False, error_set, True
elif self.__check_url(self.target):
files_to_download.append([self.target, "target"])
else:
return False, True, True
if correct and self.align is not None:
if self.align.get_type() == "local":
final_path = os.path.join(self.output_dir, "in_" + os.path.basename(self.align.get_path()))
shutil.move(self.align.get_path(), final_path)
self.align.set_path(final_path)
correct, error_set, should_be_local = self.check_file("align", should_be_local,
max_upload_size_readable)
all_downloaded = True
if correct :
......@@ -731,14 +753,7 @@ class JobManager:
def prepare_data_local(self):
with open(self.preptime_file, "w") as ptime, Job.connect():
status = "preparing"
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
job.status = status
job.save()
else:
job = None
self.set_status_standalone(status)
self.set_job_status("preparing")
ptime.write(str(round(time.time())) + "\n")
error_tail = "Please check your input file and try again."
if self.query is not None:
......@@ -772,14 +787,7 @@ class JobManager:
replace_fa=True)
filter_f.filter()
else:
status = "fail"
error = "<br/>".join(["Query fasta file is not valid!", error_tail])
if MODE == "webserver":
job.status = status
job.error = error
job.save()
else:
self.set_status_standalone(status, error)
self.set_job_status("fail", "<br/>".join(["Query fasta file is not valid!", error_tail]))
if self.config.send_mail_status:
self.send_mail_post()
return False
......@@ -814,36 +822,87 @@ class JobManager:
os.remove(uncompressed)
except FileNotFoundError:
pass
status = "fail"
error = "<br/>".join(["Target fasta file is not valid!", error_tail])
if MODE == "webserver":
job.status = status
job.error = error
job.save()
else:
self.set_status_standalone(status, error)
self.set_job_status("fail", "<br/>".join(["Target fasta file is not valid!", error_tail]))
if self.config.send_mail_status:
self.send_mail_post()
return False
ptime.write(str(round(time.time())) + "\n")
status = "prepared"
if MODE == "webserver":
job.status = status
job.save()
self.set_job_status("prepared")
self.run_job("local")
def prepare_dotplot_local(self):
"""
Prepare data if alignment already done: just index the fasta (if index not given), then parse the alignment
file and sort it.
"""
self.set_job_status("preparing")
# Prepare target index:
target_format = os.path.splitext(self.target.get_path())[1][1:]
if target_format == "idx":
shutil.move(self.target.get_path(), self.idx_t)
os.remove(os.path.join(self.output_dir, ".target"))
else:
index_file(self.target.get_path(), self.target.get_name(), self.idx_t)
# Prepare query index:
if self.query is not None:
query_format = os.path.splitext(self.query.get_path())[1][1:]
if query_format == "idx":
shutil.move(self.query.get_path(), self.idx_q)
os.remove(os.path.join(self.output_dir, ".query"))
else:
self.set_status_standalone(status)
self.run_job("local")
index_file(self.query.get_path(), self.query.get_name(), self.idx_q)
else:
shutil.copy(self.idx_t, self.idx_q)
# Parse alignment file:
if hasattr(parsers, self.aln_format):
getattr(parsers, self.aln_format)(self.align.get_path(), self.paf_raw)
os.remove(self.align.get_path())
elif self.aln_format == "paf":
shutil.move(self.align.get_path(), self.paf_raw)
else:
self.set_job_status("fail", "No parser found for format %s. Please contact the support." % self.aln_format)
return False
self.set_job_status("started")
# Sort paf lines:
sorter = Sorter(self.paf_raw, self.paf)
sorter.sort()
os.remove(self.paf_raw)
if self.target is not None and os.path.exists(self.target.get_path()):
os.remove(self.target.get_path())
self.align.set_path(self.paf)
self.set_job_status("success")
if MODE == "webserver" and self.config.send_mail_status:
self.send_mail_post()
def prepare_data(self):
if MODE == "webserver":
with Job.connect():
job = Job.get(Job.id_job == self.id_job)
if job.batch_type == "local":
self.prepare_data_local()
else:
self.prepare_data_cluster(job.batch_type)
if self.align is None:
if MODE == "webserver":
with Job.connect():
job = Job.get(Job.id_job == self.id_job)
if job.batch_type == "local":
self.prepare_data_local()
else:
self.prepare_data_cluster(job.batch_type)
else:
self.prepare_data_local()
else:
self.prepare_data_local()
if MODE == "webserver":
with Job.connect():
job = Job.get(Job.id_job == self.id_job)
if job.batch_type == "local":
self.prepare_dotplot_local()
else:
print("NOT IMPLEMENTED!")
# self.prepare_data_cluster(job.batch_type)
else:
self.prepare_dotplot_local()
def run_job(self, batch_system_type):
success = False
......@@ -891,7 +950,7 @@ class JobManager:
Path(os.path.join(self.output_dir, ".all-vs-all")).touch()
if self.tool.parser is not None:
paf_raw = self.paf_raw + ".parsed"
dgenies.lib.parsers.__dict__.get(self.tool.parser)(self.paf_raw, paf_raw)
getattr(parsers, self.tool.parser)(self.paf_raw, paf_raw)
os.remove(self.paf_raw)
self.paf_raw = paf_raw
sorter = Sorter(self.paf_raw, self.paf)
......
......@@ -113,6 +113,12 @@ class Paf:
keep_lines[cls].append(line)
return keep_lines
def keyerror_message(self, exception: KeyError, type_f):
message = "Invalid contig for %s: %s" % (type_f, exception.args[0])
if os.path.exists(os.path.join(self.data_dir, ".align")):
message += ". May be you invert query and target files?"
return message
def parse_paf(self, merge_index=True, noise=True):
min_idy = 10000000000
max_idy = -10000000000
......@@ -158,10 +164,18 @@ class Paf:
min_idy = min(min_idy, idy)
max_idy = max(max_idy, idy)
# x1, x2, y1, y2, idy
y1 = int(parts[2]) + q_abs_start[v1]
y2 = int(parts[3]) + q_abs_start[v1]
x1 = int(parts[7 if strand == 1 else 8]) + t_abs_start[v6]
x2 = int(parts[8 if strand == 1 else 7]) + t_abs_start[v6]
try:
y1 = int(parts[2]) + q_abs_start[v1]
y2 = int(parts[3]) + q_abs_start[v1]
except KeyError as e:
self.error = self.keyerror_message(e, "query")
return False
try:
x1 = int(parts[7 if strand == 1 else 8]) + t_abs_start[v6]
x2 = int(parts[8 if strand == 1 else 7]) + t_abs_start[v6]
except KeyError as e:
self.error = self.keyerror_message(e, "target")
return False
len_m = sqrt(pow(x2 - x1, 2) + pow(y2 - y1, 2))
lines_lens.append(len_m)
if idy < self.limit_idy[0]:
......
......@@ -4,5 +4,48 @@ Define tools parsers here
Each parser (main function) must have 2 and only 2 arguments:
- First argument: input file which is the tool raw output
- Second argument: finale PAF file
Returns True if parse succeed, else False
"""
import traceback
from Bio import AlignIO
def maf(in_maf, out_paf):
maf_f = None
try:
with open(out_paf, "w") as paf:
maf_f = AlignIO.parse(in_maf, "maf")
for grp in maf_f:
seqs = []
for seq in grp:
seqs.append(seq)
matches = 0
for i in range(0, len(seqs[0])):
if seqs[0][i] == seqs[1][i]:
matches += 1
tannots = seqs[0].annotations
qannots = seqs[1].annotations
paf.write("{qname}\t{qlen}\t{qstart}\t{qend}\t{strand}\t{tname}\t{tlen}\t{tstart}\t{tend}\t{matches}\t"
"{block_len}\t255\n".format(
tname=seqs[0].id,
tlen=tannots["srcSize"],
tstart=tannots["start"],
tend=tannots["start"] + tannots["size"],
qname=seqs[1].id,
qlen=qannots["srcSize"],
qstart=qannots["start"],
qend=qannots["start"] + qannots["size"],
strand="+" if tannots["strand"] == qannots["strand"] else "-",
matches=matches,
block_len=tannots["size"]
))
except:
traceback.print_exc()
if maf_f is not None:
maf_f.close()
return False
else:
maf_f.close()
return True
"""
Define formats validators here (for alignment files)
Each validator (main function) has a name which is exactly the name of the format in the aln-formats.yaml file.
Only 1 argument to this function:
- Input file to check
Secondary functions must start with _
Returns True if file is valid, else False
"""
from Bio import AlignIO
import shutil, os
def paf(in_file):
try:
with open(in_file, "r") as aln:
n = 0
for line in aln:
parts = line.rstrip().split("\t")
if len(parts) < 12:
return False
for i in (1, 2, 3, 6, 7, 8, 9, 10, 11):
if not parts[i].isdigit():
return False
if parts[4] not in ("+", "-"):
return False
n += 1
if n == 1000:
break
except:
return False
else:
return True
def _filter_maf(in_file):
new_file = in_file + ".new"
with open(in_file, "r") as inf, open(new_file, "w") as new:
for line in inf:
if len(line.rstrip()) == 0 or line[0] in ["#", "a", "s"]:
new.write(line)
os.remove(in_file)
shutil.move(new_file, in_file)
def maf(in_file):
_filter_maf(in_file)
try:
maf = AlignIO.parse(in_file, "maf")
for grp in maf:
if len(grp) != 2:
return False
except:
return False
else:
return True
\ No newline at end of file
......@@ -48,7 +48,7 @@
<option value="1">Svg</option>
<option value="2">Png</option>
<option value="3">Paf file</option>
{% if not is_gallery %}
{% if not is_gallery and fasta_file %}
<option value="4">Query Fasta</option>
<option value="8">Query assembled as reference</option>
{% endif %}
......
......@@ -89,9 +89,11 @@
<div class="tick started">
<p>Started</p>
</div>
{% if do_align %}
<div class="tick merging">
<p>Merging</p>
</div>
{% endif %}
<div class="tick success">
<p>Done</p>
</div>
......
......@@ -111,14 +111,21 @@ def launch_analysis():
file_query_type = request.form["query_type"]
file_target = request.form["target"]
file_target_type = request.form["target_type"]
tool = request.form["tool"]
tool = request.form["tool"] if "tool" in request.form else None
alignfile = request.form["alignfile"] if "alignfile" in request.form else None
alignfile_type = request.form["alignfile_type"] if "alignfile_type" in request.form else None
# Check form:
form_pass = True
errors = []
if tool not in Tools().tools:
if alignfile is not None and alignfile_type is None:
errors.append("Server error: no alignfile_type in form. Please contact the support")
form_pass = False
if tool is not None and tool not in Tools().tools:
errors.append("Tool unavailable: %s" % tool)
form_pass = False
if id_job == "":
errors.append("Id of job not given")
......@@ -180,9 +187,26 @@ def launch_analysis():
form_pass = False
target = Fasta(name=target_name, path=target_path, type_f=file_target_type, example=example)
align = None
if alignfile is not None:
Path(os.path.join(folder_files, ".align")).touch()
alignfile_name = os.path.splitext(alignfile)[0] if alignfile_type == "local" else None
alignfile_path = os.path.join(app.config["UPLOAD_FOLDER"], upload_folder, alignfile) \
if alignfile_type == "local" else alignfile
if alignfile_type == "local" and not os.path.exists(alignfile_path):
errors.append("Alignment file not correct!")
form_pass = False
align = Fasta(name=alignfile_name, path=alignfile_path, type_f=alignfile_type)
if form_pass:
# Launch job:
job = JobManager(id_job, email, query, target, mailer, tool)
job = JobManager(id_job=id_job,
email=email,
query=query,
target=target,
align=align,
mailer=mailer,
tool=tool)
if MODE == "webserver":
job.launch()
else:
......@@ -220,15 +244,17 @@ def status(id_job):
return render_template("status.html", status=j_status["status"],
error=j_status["error"].replace("#ID#", ""),
id_job=id_job, menu="results", mem_peak=mem_peak,
time_elapsed=time_e,
time_elapsed=time_e, do_align=job.do_align(),
query_filtered=job.is_query_filtered(), target_filtered=job.is_target_filtered())
# Results path
@app.route("/result/<id_res>", methods=['GET'])
def result(id_res):
res_dir = os.path.join(APP_DATA, id_res)
return render_template("result.html", id=id_res, menu="result", current_result=id_res,
is_gallery=Functions.is_in_gallery(id_res, MODE))
is_gallery=Functions.is_in_gallery(id_res, MODE),
fasta_file=Functions.query_fasta_file_exists(res_dir))
@app.route("/gallery", methods=['GET'])
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment