Skip to content
Snippets Groups Projects

Add export and import as tar file

Merged Floreal Cabanettes requested to merge exportzip into master
@@ -21,6 +21,7 @@ from jinja2 import Template
import traceback
from pathlib import Path
from urllib import request, parse
import tarfile
from dgenies.bin.split_fa import Splitter
from dgenies.bin.index import index_file, Index
from dgenies.bin.filter_contigs import Filter
@@ -40,7 +41,7 @@ if MODE == "webserver":
class JobManager:
def __init__(self, id_job: str, email: str=None, query: Fasta=None, target: Fasta=None, mailer=None,
tool="minimap2", align: Fasta=None):
tool="minimap2", align: Fasta=None, backup: Fasta=None):
self.id_job = id_job
self.email = email
self.query = query
@@ -48,6 +49,7 @@ class JobManager:
self.align = align
if align is not None:
self.aln_format = os.path.splitext(align.get_path())[1][1:]
self.backup = backup
self.error = ""
self.id_process = "-1"
# Get configs:
@@ -99,7 +101,8 @@ class JobManager:
with open(query_file) as q_f:
file_path = q_f.readline()
self.query = Fasta(
name=os.path.splitext(os.path.basename(file_path.replace(".gz", "")).split("_", 1)[1])[0],
name="target" if file_path.endswith(".idx") else
os.path.splitext(os.path.basename(file_path.replace(".gz", "")).split("_", 1)[1])[0],
path=file_path,
type_f="local"
)
@@ -108,7 +111,8 @@ class JobManager:
with open(target_file) as t_f:
file_path = t_f.readline()
self.target = Fasta(
name=os.path.splitext(os.path.basename(file_path.replace(".gz", "")).split("_", 1)[1])[0],
name="query" if file_path.endswith(".idx") else
os.path.splitext(os.path.basename(file_path.replace(".gz", "")).split("_", 1)[1])[0],
path=file_path,
type_f="local"
)
@@ -151,11 +155,12 @@ class JobManager:
else:
message += "Your job %s has failed. You can try again. " \
"If the problem persists, please contact the support.\n\n" % self.id_job
message += "Sequences compared in this analysis:\n"
if query_name is not None:
message += "Target: %s\nQuery: %s\n\n" % (target_name, query_name)
else:
message += "Target: %s\n\n" % target_name
if target_name is not None:
message += "Sequences compared in this analysis:\n"
if query_name is not None:
message += "Target: %s\nQuery: %s\n\n" % (target_name, query_name)
else:
message += "Target: %s\n\n" % target_name
if status == "success":
if self.is_target_filtered():
message += str("Note: target fasta has been filtered because it contains too small contigs."
@@ -175,7 +180,7 @@ class JobManager:
template = Template(t_file.read())
return template.render(job_name=self.id_job, status=status, url_base=self.config.web_url,
query_name=query_name if query_name is not None else "",
target_name=target_name,
target_name=target_name if target_name is not None else "",
error=self.error,
target_filtered=self.is_target_filtered(), query_filtered=self.is_query_filtered())
@@ -194,12 +199,16 @@ class JobManager:
status = job.status
self.error = job.error
with open(self.idx_t, "r") as idxt:
target_name = idxt.readline().rstrip()
with open(self.idx_q, "r") as idxq:
query_name = idxq.readline().rstrip()
if query_name == target_name:
query_name = None
target_name = None
if os.path.exists(self.idx_t):
with open(self.idx_t, "r") as idxt:
target_name = idxt.readline().rstrip()
query_name = None
if os.path.exists(self.idx_q):
with open(self.idx_q, "r") as idxq:
query_name = idxq.readline().rstrip()
if query_name == target_name:
query_name = None
# Send:
self.mailer.send_mail(recipients=[self.email],
@@ -499,7 +508,7 @@ class JobManager:
save_file.write(finale_path)
return True, False, finale_path, name
def __check_url(self, fasta: Fasta):
def __check_url(self, fasta: Fasta, formats: tuple):
url = fasta.get_path()
try:
filename = self.__get_filename_from_url(url)
@@ -517,11 +526,28 @@ class JobManager:
self.set_status_standalone(status, error)
return False
if filename is not None:
allowed = Functions.allowed_file(filename)
allowed = Functions.allowed_file(filename, formats)
if not allowed:
status = "fail"
error = "<p>File <b>%s</b> downloaded from <b>%s</b> is not a Fasta file!</p>" \
"<p>If this is unattended, please contact the support.</p>" % (filename, url)
format_txt = ""
if len(formats) == 1:
if formats[0] == "fasta":
format_txt = "a Fasta file"
elif formats[0] == "idx":
format_txt = "an index file"
elif formats[0] == "map":
format_txt = "an alignment file"
elif formats[0] == "backup":
format_txt = "a backup file"
else:
format_txt = "a valid file"
else:
if "fasta" in formats and "idx" in formats:
format_txt = "a Fasta file or an index file"
else:
format_txt = "a valid file"
error = "<p>File <b>%s</b> downloaded from <b>%s</b> is not %s!</p>" \
"<p>If this is unattended, please contact the support.</p>" % (filename, url, format_txt)
if MODE == "webserver":
with Job.connect():
job = Job.get(Job.id_job == self.id_job)
@@ -600,7 +626,7 @@ class JobManager:
if not getattr(validators, self.aln_format)(self.align.get_path()):
self.set_job_status("fail", "Alignment file is invalid. Please check your file.")
return False, True, None
else:
elif input_type != "backup":
if self.config.batch_system_type != "local" and file_size >= getattr(self.config,
"min_%s_size" % input_type):
should_be_local = False
@@ -692,7 +718,7 @@ class JobManager:
max_upload_size_readable)
if not correct:
return False, error_set, True
elif self.__check_url(self.query):
elif self.__check_url(self.query, ("fasta",) if self.align is None else ("fasta", "idx")):
files_to_download.append([self.query, "query"])
else:
return False, True, True
@@ -703,7 +729,7 @@ class JobManager:
max_upload_size_readable)
if not correct:
return False, error_set, True
elif self.__check_url(self.target):
elif self.__check_url(self.target, ("fasta",) if self.align is None else ("fasta", "idx")):
files_to_download.append([self.target, "target"])
else:
return False, True, True
@@ -712,10 +738,19 @@ class JobManager:
self.align.set_path(self.__getting_local_file(self.align, "align"))
correct, error_set, should_be_local = self.check_file("align", should_be_local,
max_upload_size_readable)
elif self.__check_url(self.align):
elif self.__check_url(self.align, ("map",)):
files_to_download.append([self.align, "align"])
else:
return False, True, True
if correct and self.backup is not None:
if self.backup.get_type() == "local":
self.backup.set_path(self.__getting_local_file(self.backup, "backup"))
correct, error_set, should_be_local = self.check_file("backup", should_be_local,
max_upload_size_readable)
elif self.__check_url(self.backup, ("backup",)):
files_to_download.append([self.backup, "backup"])
else:
return False, True, True
all_downloaded = True
if correct :
@@ -870,7 +905,8 @@ class JobManager:
sorter = Sorter(self.paf_raw, self.paf)
sorter.sort()
os.remove(self.paf_raw)
if self.target is not None and os.path.exists(self.target.get_path()):
if self.target is not None and os.path.exists(self.target.get_path()) and not \
self.target.get_path().endswith(".idx"):
os.remove(self.target.get_path())
self.align.set_path(self.paf)
@@ -1076,9 +1112,48 @@ class JobManager:
batch_type=job.batch_type)
log.save()
def unpack_backup(self):
    """
    Unpack the backup archive of the job into its output directory and
    register the extracted files as the job's alignment, target and query.

    The archive must contain exactly three regular files named ``map.paf``,
    ``query.idx`` and ``target.idx``. Once extracted, the PAF file is moved
    to ``self.paf_raw`` and validated, the ``.align``, ``.target`` and
    ``.query`` marker files are written with the path of the matching file,
    and the backup archive is deleted.

    :return: True if the backup was valid and has been unpacked, else False
    :rtype: bool
    """
    expected_names = {"map.paf", "query.idx", "target.idx"}
    try:
        with tarfile.open(self.backup.get_path(), "r") as tar:
            members = tar.getmembers()
            # Exactly the three expected entries, nothing more, nothing less
            if len(members) != len(expected_names) or \
                    {member.name for member in members} != expected_names:
                return False
            # The archive comes from the user: refuse links, devices and
            # directories so that extraction can only create plain files
            # inside the output directory.
            if not all(member.isfile() for member in members):
                return False
            tar.extractall(self.output_dir, members=members)
        # NOTE(review): assumes self.paf is the extracted map.paf inside
        # self.output_dir - confirm against the attribute definition.
        shutil.move(self.paf, self.paf_raw)
        if not validators.paf(self.paf_raw):
            return False
        self.align = Fasta(name="map", path=self.paf_raw, type_f="local")
        self.aln_format = "paf"
        with open(os.path.join(self.output_dir, ".align"), "w") as aln:
            aln.write(self.paf_raw)
        target_path = os.path.join(self.output_dir, "target.idx")
        self.target = Fasta(name="target", path=target_path, type_f="local")
        with open(os.path.join(self.output_dir, ".target"), "w") as trgt:
            trgt.write(target_path)
        query_path = os.path.join(self.output_dir, "query.idx")
        self.query = Fasta(name="query", path=query_path, type_f="local")
        with open(os.path.join(self.output_dir, ".query"), "w") as qr:
            qr.write(query_path)
        # The archive is no longer needed once its content is in place
        os.remove(self.backup.get_path())
        return True
    except Exception:
        # Any failure (unreadable archive, I/O error...) means the backup is
        # unusable: log it and report the failure to the caller. Interpreter
        # exit signals (KeyboardInterrupt, SystemExit) are left to propagate.
        traceback.print_exc()
        return False
def _after_start(self, success, error_set):
with Job.connect():
if success:
if self.backup is not None:
success = self.unpack_backup()
if not success:
self.set_job_status("fail", "Backup file is not valid. If it is unattended, please contact the "
"support.")
if MODE == "webserver" and self.config.send_mail_status:
self.send_mail()
return False
status = "waiting"
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
@@ -1137,7 +1212,7 @@ class JobManager:
print("Old job found without result dir existing: delete it from BDD!")
for j11 in j1:
j11.delete_instance()
if self.target is not None:
if self.target is not None or self.backup is not None:
job = Job.create(id_job=self.id_job, email=self.email, batch_type=self.config.batch_system_type,
date_created=datetime.now(), tool=self.tool.name if self.tool is not None else None)
job.save()
Loading