Commit e4b3fa80 authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Add alignment file as input instead of query and target, Implements #126

See merge request !3
parents 0295c795 003c20cf
......@@ -121,7 +121,7 @@ def run(mode="standalone", debug=False, host="127.0.0.1", port=5000, no_crons=Fa
"app": app
})
thread.start()
app.run(host=host, port=port, debug=debug)
app.run(host=host, port=port, debug=debug, threaded=True)
def clear_crons():
......
......@@ -8,7 +8,7 @@ from filter_contigs import Filter
from index import index_file
def index_fasta(name, filepath, out_dir, type_f):
def index_fasta(name, filepath, out_dir, type_f, dofilter = True):
"""
Index and filter fasta
:param name: name of the specie
......@@ -17,23 +17,25 @@ def index_fasta(name, filepath, out_dir, type_f):
:param type_f: type of fasta (query or target)
"""
uncompressed = None
if filepath.endswith(".gz"):
if filepath.endswith(".gz") and dofilter:
uncompressed = filepath[:-3]
index = os.path.join(out_dir, type_f + ".idx")
index = os.path.join(out_dir, type_f + ".idx")
success, nb_contigs = index_file(filepath, name, index, uncompressed)
if success:
in_fasta = filepath
if uncompressed is not None:
in_fasta = uncompressed
filtered_fasta = os.path.join(os.path.dirname(in_fasta), "filtered_" + os.path.basename(in_fasta))
filter_f = Filter(fasta=in_fasta,
index_file=index,
type_f=type_f,
min_filtered=nb_contigs / 4,
split=False,
out_fasta=filtered_fasta,
replace_fa=True)
is_filtered = filter_f.filter()
is_filtered = False
if dofilter:
in_fasta = filepath
if uncompressed is not None:
in_fasta = uncompressed
filtered_fasta = os.path.join(os.path.dirname(in_fasta), "filtered_" + os.path.basename(in_fasta))
filter_f = Filter(fasta=in_fasta,
index_file=index,
type_f=type_f,
min_filtered=nb_contigs / 4,
split=False,
out_fasta=filtered_fasta,
replace_fa=True)
is_filtered = filter_f.filter()
if uncompressed is not None:
if is_filtered:
os.remove(filepath)
......@@ -41,6 +43,7 @@ def index_fasta(name, filepath, out_dir, type_f):
save_file.write(uncompressed)
else:
os.remove(uncompressed)
else:
if uncompressed is not None:
try:
......@@ -53,16 +56,21 @@ def index_fasta(name, filepath, out_dir, type_f):
parser = argparse.ArgumentParser(description="Split huge contigs")
parser.add_argument('-q', '--query', type=str, required=False, help="Query fasta file")
parser.add_argument('-u', '--query-split', type=str, required=False, help="Query fasta file split")
parser.add_argument('-t', '--target', type=str, required=True, help="Target fasta file")
parser.add_argument('-t', '--target', type=str, required=False, help="Target fasta file")
parser.add_argument('-n', '--query-name', type=str, required=False, help="Query name")
parser.add_argument('-m', '--target-name', type=str, required=True, help="Target name")
parser.add_argument('-m', '--target-name', type=str, required=False, help="Target name")
parser.add_argument('-s', '--size', type=int, required=False, default=10,
help="Max size of contigs (Mb) - for query split")
parser.add_argument('-p', '--preptime-file', type=str, required=True, help="File into save prep times")
parser.add_argument('--split', type=bool, const=True, nargs="?", required=False, default=False,
help="Split query")
parser.add_argument('--index-only', type=bool, const=True, nargs="?", required=False, default=False,
help="Index files only. No split, no filter.")
args = parser.parse_args()
if args.index_only and args.split:
raise Exception("--index-only and --split arguments are mutually exclusive")
out_dir = os.path.dirname(args.target)
with open(args.preptime_file, "w") as ptime:
......@@ -88,9 +96,13 @@ with open(args.preptime_file, "w") as ptime:
else:
exit(1)
else:
index_fasta(name=args.query_name, filepath=args.query, out_dir=out_dir, type_f="query")
print("Indexing target...")
index_fasta(name=args.target_name, filepath=args.target, out_dir=out_dir, type_f="target")
print("Indexing query...")
index_fasta(name=args.query_name, filepath=args.query, out_dir=out_dir, type_f="query",
dofilter=not args.index_only)
if args.target is not None:
print("Indexing target...")
index_fasta(name=args.target_name, filepath=args.target, out_dir=out_dir, type_f="target",
dofilter=not args.index_only)
ptime.write(str(round(time.time())) + "\n")
......
......@@ -68,7 +68,7 @@ if MODE == "webserver":
error = CharField(default="")
mem_peak = IntegerField(null=True)
time_elapsed = IntegerField(null=True)
tool = CharField(default="minimap2", max_length=50)
tool = CharField(default="minimap2", max_length=50, null=True)
class Gallery(BaseModel):
......
......@@ -3,7 +3,7 @@ class Fasta:
self._name = name
self._path = path
self._type = type_f
self._example = example
self._example = example is not False
def set_path(self, path):
self._path = path
......
......@@ -6,12 +6,17 @@ import shutil
import sys
import re
import traceback
from inspect import getmembers, isfunction
from collections import OrderedDict
from Bio import SeqIO
from jinja2 import Template
from dgenies.config_reader import AppConfigReader
import dgenies.lib.validators as validators
ALLOWED_EXTENSIONS = ['fa', 'fasta', 'fna', 'fa.gz', 'fasta.gz', 'fna.gz']
ALLOWED_EXTENSIONS = {"fasta": ['fa', 'fasta', 'fna', 'fa.gz', 'fasta.gz', 'fna.gz'],
"idx": ['idx'],
"map": [o[0] for o in getmembers(validators) if isfunction(o[1]) and not o[0].startswith("_")]}
# map: all functions of validators which does not starts with an underscore.
class Functions:
......@@ -19,10 +24,13 @@ class Functions:
config = AppConfigReader()
@staticmethod
def allowed_file(filename):
return '.' in filename and \
(filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS or ".".join(filename.rsplit('.', 2)[1:]).lower()
in ALLOWED_EXTENSIONS)
def allowed_file(filename, file_formats=("fasta",)):
for file_format in file_formats:
if '.' in filename and \
(filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS[file_format]
or ".".join(filename.rsplit('.', 2)[1:]).lower() in ALLOWED_EXTENSIONS[file_format]):
return True
return False
@staticmethod
def random_string(s_len):
......@@ -256,3 +264,8 @@ class Functions:
if "gallery" in all_jobs:
all_jobs.remove("gallery")
return sorted(all_jobs, key=lambda x: x.lower())
@staticmethod
def query_fasta_file_exists(res_dir):
fasta_file = os.path.join(res_dir, ".query")
return os.path.exists(fasta_file) and os.path.isfile(fasta_file)
......@@ -9,7 +9,8 @@ import threading
import re
from dgenies.config_reader import AppConfigReader
from dgenies.tools import Tools
import dgenies.lib.parsers
import dgenies.lib.validators as validators
import dgenies.lib.parsers as parsers
from .fasta import Fasta
from .functions import Functions
import requests
......@@ -39,17 +40,20 @@ if MODE == "webserver":
class JobManager:
def __init__(self, id_job: str, email: str=None, query: Fasta=None, target: Fasta=None, mailer=None,
tool="minimap2"):
tool="minimap2", align: Fasta=None):
self.id_job = id_job
self.email = email
self.query = query
self.target = target
self.align = align
if align is not None:
self.aln_format = os.path.splitext(align.get_path())[1][1:]
self.error = ""
self.id_process = "-1"
# Get configs:
self.config = AppConfigReader()
self.tools = Tools().tools
self.tool = self.tools[tool]
self.tool = self.tools[tool] if tool is not None else None
# Outputs:
self.output_dir = os.path.join(self.config.app_data, id_job)
self.preptime_file = os.path.join(self.output_dir, "prep_times")
......@@ -62,6 +66,12 @@ class JobManager:
self.mailer = mailer
self._filename_for_url = {}
def do_align(self):
"""
:return: True if the job is launched with an alignment file
"""
return not os.path.exists(os.path.join(self.output_dir, ".align"))
@staticmethod
def is_gz_file(filepath):
with open(filepath, 'rb') as test_f:
......@@ -102,6 +112,16 @@ class JobManager:
path=file_path,
type_f="local"
)
align_file = os.path.join(res_dir, ".align")
if os.path.exists(align_file):
with open(align_file) as a_f:
file_path = a_f.readline()
self.align = Fasta(
name="map",
path=file_path,
type_f="local"
)
self.aln_format = os.path.splitext(file_path)[1][1:]
def check_job_success(self):
if os.path.exists(self.paf_raw):
......@@ -117,7 +137,7 @@ class JobManager:
def is_target_filtered(self):
return os.path.exists(os.path.join(self.output_dir, ".filter-target"))
def get_mail_content(self, status):
def get_mail_content(self, status, target_name, query_name=None):
message = "D-Genies\n\n"
if status == "success":
message += "Your job %s was completed successfully!\n\n" % self.id_job
......@@ -132,10 +152,10 @@ class JobManager:
message += "Your job %s has failed. You can try again. " \
"If the problem persists, please contact the support.\n\n" % self.id_job
message += "Sequences compared in this analysis:\n"
if self.query is not None:
message += "Target: %s\nQuery: %s\n\n" % (self.target.get_name(), self.query.get_name())
if query_name is not None:
message += "Target: %s\nQuery: %s\n\n" % (target_name, query_name)
else:
message += "Target: %s\n\n" % self.target.get_name()
message += "Target: %s\n\n" % target_name
if status == "success":
if self.is_target_filtered():
message += str("Note: target fasta has been filtered because it contains too small contigs."
......@@ -149,13 +169,13 @@ class JobManager:
message += "The team"
return message
def get_mail_content_html(self, status):
def get_mail_content_html(self, status, target_name, query_name=None):
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), "mail_templates", "job_notification.html"))\
as t_file:
template = Template(t_file.read())
return template.render(job_name=self.id_job, status=status, url_base=self.config.web_url,
query_name=self.query.get_name() if self.query is not None else "",
target_name=self.target.get_name(),
query_name=query_name if query_name is not None else "",
target_name=target_name,
error=self.error,
target_filtered=self.is_target_filtered(), query_filtered=self.is_query_filtered())
......@@ -174,11 +194,18 @@ class JobManager:
status = job.status
self.error = job.error
with open(self.idx_t, "r") as idxt:
target_name = idxt.readline().rstrip()
with open(self.idx_q, "r") as idxq:
query_name = idxq.readline().rstrip()
if query_name == target_name:
query_name = None
# Send:
self.mailer.send_mail(recipients=[self.email],
subject=self.get_mail_subject(status),
message=self.get_mail_content(status),
message_html=self.get_mail_content_html(status))
message=self.get_mail_content(status, target_name, query_name),
message_html=self.get_mail_content_html(status, target_name, query_name))
def search_error(self):
logs = os.path.join(self.output_dir, "logs.txt")
......@@ -329,22 +356,25 @@ class JobManager:
jt.outputPath = ":" + log_out
jt.errorPath = ":" + log_err
memory = self.config.cluster_memory
if self.query is None:
memory = self.config.cluster_memory_ava
if memory > 32:
name, order, contigs, reversed_c, abs_start, c_len = Index.load(self.idx_t, False)
if c_len <= 500000000:
memory = 32
if memory > self.tool.max_memory:
memory = self.tool.max_memory
if step == "start":
memory = self.config.cluster_memory
if self.query is None:
memory = self.config.cluster_memory_ava
if memory > 32:
name, order, contigs, reversed_c, abs_start, c_len = Index.load(self.idx_t, False)
if c_len <= 500000000:
memory = 32
if memory > self.tool.max_memory:
memory = self.tool.max_memory
else:
memory = 8000
native_specs = self.config.drmaa_native_specs
if batch_system_type == "slurm":
if native_specs == "###DEFAULT###":
native_specs = "--mem-per-cpu={0} --ntasks={1} --time={2}"
if step == "prepare":
jt.nativeSpecification = native_specs.format(8000, 1, "02:00:00")
jt.nativeSpecification = native_specs.format(memory, 1, "02:00:00")
elif step == "start":
jt.nativeSpecification = native_specs.format(memory // self.tool.threads_cluster * 1000,
self.tool.threads_cluster, "02:00:00")
......@@ -527,6 +557,15 @@ class JobManager:
else:
return 0
def set_job_status(self, status, error=""):
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
job.status = status
job.error = error
job.save()
else:
self.set_status_standalone(status, error)
def check_file(self, input_type, should_be_local, max_upload_size_readable):
"""
......@@ -541,34 +580,31 @@ class JobManager:
my_input = getattr(self, input_type)
if my_input.get_path().endswith(".gz") and not self.is_gz_file(my_input.get_path()):
# Check file is correctly gzipped
status = "fail"
error = input_type + " file is not a correct gzip file"
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
job.status = status
job.error = error
job.save()
else:
self.set_status_standalone(status, error)
self.set_job_status("fail", input_type + " file is not a correct gzip file")
self.clear()
return False, True, None
# Check size:
file_size = self.get_file_size(my_input.get_path())
if -1 < (self.config.max_upload_size if (input_type == "query" or self.query is not None)
else self.config.max_upload_size_ava) < file_size:
status = "fail"
error = input_type + " file exceed size limit of %d Mb (uncompressed)" % max_upload_size_readable
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
job.status = status
job.error = error
job.save()
else:
self.set_status_standalone(status, error)
self.set_job_status("fail",
input_type +
" file exceed size limit of %d Mb (uncompressed)" % max_upload_size_readable)
self.clear()
return False, True, None
if self.config.batch_system_type != "local" and file_size >= getattr(self.config, "min_%s_size" % input_type):
should_be_local = False
if input_type == "align":
if not hasattr(validators, self.aln_format):
self.set_job_status("fail", "Alignment file format not supported")
return False, True, None
if not getattr(validators, self.aln_format)(self.align.get_path()):
self.set_job_status("fail", "Alignment file is invalid. Please check your file.")
return False, True, None
else:
if self.config.batch_system_type != "local" and file_size >= getattr(self.config,
"min_%s_size" % input_type):
should_be_local = False
return True, False, should_be_local
def download_files_with_pending(self, files_to_download, should_be_local, max_upload_size_readable):
......@@ -660,18 +696,22 @@ class JobManager:
files_to_download.append([self.query, "query"])
else:
return False, True, True
if correct:
if self.target is not None:
if self.target.get_type() == "local":
self.target.set_path(self.__getting_local_file(self.target, "target"))
correct, error_set, should_be_local = self.check_file("target", should_be_local,
max_upload_size_readable)
if not correct:
return False, error_set, True
elif self.__check_url(self.target):
files_to_download.append([self.target, "target"])
else:
return False, True, True
if correct and self.target is not None:
if self.target.get_type() == "local":
self.target.set_path(self.__getting_local_file(self.target, "target"))
correct, error_set, should_be_local = self.check_file("target", should_be_local,
max_upload_size_readable)
if not correct:
return False, error_set, True
elif self.__check_url(self.target):
files_to_download.append([self.target, "target"])
else:
return False, True, True
if correct and self.align is not None:
if self.align.get_type() == "local":
self.align.set_path(self.__getting_local_file(self.align, "align"))
correct, error_set, should_be_local = self.check_file("align", should_be_local,
max_upload_size_readable)
all_downloaded = True
if correct :
......@@ -731,14 +771,7 @@ class JobManager:
def prepare_data_local(self):
with open(self.preptime_file, "w") as ptime, Job.connect():
status = "preparing"
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
job.status = status
job.save()
else:
job = None
self.set_status_standalone(status)
self.set_job_status("preparing")
ptime.write(str(round(time.time())) + "\n")
error_tail = "Please check your input file and try again."
if self.query is not None:
......@@ -772,14 +805,7 @@ class JobManager:
replace_fa=True)
filter_f.filter()
else:
status = "fail"
error = "<br/>".join(["Query fasta file is not valid!", error_tail])
if MODE == "webserver":
job.status = status
job.error = error
job.save()
else:
self.set_status_standalone(status, error)
self.set_job_status("fail", "<br/>".join(["Query fasta file is not valid!", error_tail]))
if self.config.send_mail_status:
self.send_mail_post()
return False
......@@ -814,36 +840,134 @@ class JobManager:
os.remove(uncompressed)
except FileNotFoundError:
pass
status = "fail"
error = "<br/>".join(["Target fasta file is not valid!", error_tail])
if MODE == "webserver":
job.status = status
job.error = error
job.save()
else:
self.set_status_standalone(status, error)
self.set_job_status("fail", "<br/>".join(["Target fasta file is not valid!", error_tail]))
if self.config.send_mail_status:
self.send_mail_post()
return False
ptime.write(str(round(time.time())) + "\n")
status = "prepared"
if MODE == "webserver":
job.status = status
job.save()
self.set_job_status("prepared")
self.run_job("local")
def _end_of_prepare_dotplot(self):
# Parse alignment file:
if hasattr(parsers, self.aln_format):
getattr(parsers, self.aln_format)(self.align.get_path(), self.paf_raw)
os.remove(self.align.get_path())
elif self.aln_format == "paf":
shutil.move(self.align.get_path(), self.paf_raw)
else:
self.set_job_status("fail", "No parser found for format %s. Please contact the support." % self.aln_format)
return False
self.set_job_status("started")
# Sort paf lines:
sorter = Sorter(self.paf_raw, self.paf)
sorter.sort()
os.remove(self.paf_raw)
if self.target is not None and os.path.exists(self.target.get_path()):
os.remove(self.target.get_path())
self.align.set_path(self.paf)
self.set_job_status("success")
if MODE == "webserver" and self.config.send_mail_status:
self.send_mail_post()
def prepare_dotplot_cluster(self, batch_system_type):
"""
Prepare data if alignment already done: just index the fasta (if index not given), then parse the alignment
:param batch_system_type: type of cluster
"""
args = [self.config.cluster_prepare_script,
"-p", self.preptime_file, "--index-only"]
has_index = False
target_format = os.path.splitext(self.target.get_path())[1][1:]
if target_format == "idx":
shutil.move(self.target.get_path(), self.idx_t)
os.remove(os.path.join(self.output_dir, ".target"))
else:
has_index = True
args += ["-t", self.target.get_path(),
"-m", self.target.get_name()]
if self.query is not None:
query_format = os.path.splitext(self.query.get_path())[1][1:]
if query_format == "idx":
shutil.move(self.query.get_path(), self.idx_q)
os.remove(os.path.join(self.output_dir, ".query"))
else:
self.set_status_standalone(status)
self.run_job("local")
has_index = True
args += ["-q", self.query.get_path(),
"-n", self.query.get_name()]
success = True
if has_index:
success = self.launch_to_cluster(step="prepare",
batch_system_type=batch_system_type,
command=self.config.cluster_python_exec,
args=args,
log_out=self.logs,
log_err=self.logs)
if success:
if self.query is None:
shutil.copy(self.idx_t, self.idx_q)
self._end_of_prepare_dotplot()
elif MODE == "webserver" and self.config.send_mail_status:
self.send_mail_post()
def prepare_dotplot_local(self):
"""
Prepare data if alignment already done: just index the fasta (if index not given), then parse the alignment
file and sort it.
"""
self.set_job_status("preparing")
# Prepare target index:
target_format = os.path.splitext(self.target.get_path())[1][1:]
if target_format == "idx":
shutil.move(self.target.get_path(), self.idx_t)
os.remove(os.path.join(self.output_dir, ".target"))
else:
index_file(self.target.get_path(), self.target.get_name(), self.idx_t)
# Prepare query index:
if self.query is not None:
query_format = os.path.splitext(self.query.get_path())[1][1:]
if query_format == "idx":
shutil.move(self.query.get_path(), self.idx_q)
os.remove(os.path.join(self.output_dir, ".query"))
else:
index_file(self.query.get_path(), self.query.get_name(), self.idx_q)
else:
shutil.copy(self.idx_t, self.idx_q)
self._end_of_prepare_dotplot()
def prepare_data(self):