Commit 5ffb3ec6 authored by Floreal Cabanettes

Ease adding a new tool and add the mashmap tool. Implements #123, #124

parent 1943b106
@@ -6,7 +6,6 @@ data_folder = ###CONFIG###/data
# batch system type: local, sge, slurm
batch_system_type = local
threads_local = 4
web_url = http://localhost:5000
# Max size of uploaded files (also for files from URL, size uncompressed):
@@ -60,7 +59,6 @@ python3_exec = python3
memory = 32
# Max memory for all-vs-all mode:
memory_ava = 32
threads = 4
[database]
......
@@ -13,3 +13,4 @@ drmaa==0.7.*
intervaltree==2.1.*
argparse==1.4
Markdown==2.6.*
pyyaml==3.12
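
pyyaml appears here because tool definitions (the Tools class used by the job manager below) are loaded from a YAML file. That file is not part of this excerpt; the entry below is a hypothetical sketch whose key names are inferred from the attributes read off self.tool in job_manager.py (exec, command_line, all_vs_all, threads, threads_cluster, max_memory, parser, split_before, help), not from the real schema:

import yaml  # provided by the new pyyaml dependency

# Hypothetical tools.yaml entry; every key name is an assumption inferred
# from job_manager.py, and the command templates are illustrative.
tools = yaml.safe_load("""
mashmap:
  exec: mashmap
  command_line: "{exe} -r {target} -q {query} -t {threads} -o {out}"
  all_vs_all: "{exe} -r {target} -q {target} -t {threads} -o {out}"
  threads: 4
  threads_cluster: 4
  max_memory: 32
  parser: mashmap2paf
  split_before: False
  help: "Faster but less sensitive than minimap2"
""")
print(tools["mashmap"]["parser"])  # -> mashmap2paf
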
@@ -7,54 +7,28 @@ from split_fa import Splitter
from filter_contigs import Filter
from index import index_file
parser = argparse.ArgumentParser(description="Split huge contigs")
parser.add_argument('-q', '--query', type=str, required=False, help="Query fasta file")
parser.add_argument('-u', '--query-split', type=str, required=False, help="Query fasta file split")
parser.add_argument('-t', '--target', type=str, required=True, help="Target fasta file")
parser.add_argument('-n', '--query-name', type=str, required=False, help="Query name")
parser.add_argument('-m', '--target-name', type=str, required=True, help="Target name")
parser.add_argument('-s', '--size', type=int, required=False, default=10,
help="Max size of contigs (Mb) - for query split")
parser.add_argument('-p', '--preptime-file', type=str, required=True, help="File in which to save preparation times")
args = parser.parse_args()
out_dir = os.path.dirname(args.target)
with open(args.preptime_file, "w") as ptime:
ptime.write(str(round(time.time())) + "\n")
if args.query is not None:
print("Splitting query...")
fasta_in = args.query
index_split = os.path.join(out_dir, "query_split.idx")
splitter = Splitter(input_f=fasta_in, name_f=args.query_name, output_f=args.query_split,
query_index=index_split)
if splitter.split():
filtered_fasta = os.path.join(os.path.dirname(args.query_split), "filtered_" +
os.path.basename(args.query_split))
filter_f = Filter(fasta=args.query_split,
index_file=index_split,
type_f="query",
min_filtered=splitter.nb_contigs / 4,
split=True,
out_fasta=filtered_fasta,
replace_fa=True)
filter_f.filter()
else:
exit(1)
print("Indexing target...")
def index_fasta(name, filepath, out_dir, type_f):
"""
Index and filter fasta
:param name: name of the species
:param filepath: full path of the fasta file
:param out_dir: output folder
:param type_f: type of fasta (query or target)
"""
uncompressed = None
if args.target.endswith(".gz"):
uncompressed = args.target[:-3]
target_index = os.path.join(out_dir, "target.idx")
success, nb_contigs = index_file(args.target, args.target_name, target_index, uncompressed)
if filepath.endswith(".gz"):
uncompressed = filepath[:-3]
index = os.path.join(out_dir, type_f + ".idx")
success, nb_contigs = index_file(filepath, name, index, uncompressed)
if success:
in_fasta = args.target
in_fasta = filepath
if uncompressed is not None:
in_fasta = uncompressed
filtered_fasta = os.path.join(os.path.dirname(in_fasta), "filtered_" + os.path.basename(in_fasta))
filter_f = Filter(fasta=in_fasta,
index_file=target_index,
type_f="target",
index_file=index,
type_f=type_f,
min_filtered=nb_contigs / 4,
split=False,
out_fasta=filtered_fasta,
@@ -62,8 +36,8 @@ with open(args.preptime_file, "w") as ptime:
is_filtered = filter_f.filter()
if uncompressed is not None:
if is_filtered:
os.remove(args.target)
with open(os.path.join(out_dir, ".target"), "w") as save_file:
os.remove(filepath)
with open(os.path.join(out_dir, "." + type_f), "w") as save_file:
save_file.write(uncompressed)
else:
os.remove(uncompressed)
@@ -74,6 +48,50 @@ with open(args.preptime_file, "w") as ptime:
except FileNotFoundError:
pass
exit(1)
parser = argparse.ArgumentParser(description="Split huge contigs")
parser.add_argument('-q', '--query', type=str, required=False, help="Query fasta file")
parser.add_argument('-u', '--query-split', type=str, required=False, help="Query fasta file split")
parser.add_argument('-t', '--target', type=str, required=True, help="Target fasta file")
parser.add_argument('-n', '--query-name', type=str, required=False, help="Query name")
parser.add_argument('-m', '--target-name', type=str, required=True, help="Target name")
parser.add_argument('-s', '--size', type=int, required=False, default=10,
help="Max size of contigs (Mb) - for query split")
parser.add_argument('-p', '--preptime-file', type=str, required=True, help="File in which to save preparation times")
parser.add_argument('--split', type=bool, const=True, nargs="?", required=False, default=False,
help="Split query")
args = parser.parse_args()
out_dir = os.path.dirname(args.target)
with open(args.preptime_file, "w") as ptime:
ptime.write(str(round(time.time())) + "\n")
if args.query is not None:
if args.split:
print("Splitting query...")
fasta_in = args.query
index_split = os.path.join(out_dir, "query_split.idx")
splitter = Splitter(input_f=fasta_in, name_f=args.query_name, output_f=args.query_split,
query_index=index_split)
if splitter.split():
filtered_fasta = os.path.join(os.path.dirname(args.query_split), "filtered_" +
os.path.basename(args.query_split))
filter_f = Filter(fasta=args.query_split,
index_file=index_split,
type_f="query",
min_filtered=splitter.nb_contigs / 4,
split=True,
out_fasta=filtered_fasta,
replace_fa=True)
filter_f.filter()
else:
exit(1)
else:
index_fasta(name=args.query_name, filepath=args.query, out_dir=out_dir, type_f="query")
print("Indexing target...")
index_fasta(name=args.target_name, filepath=args.target, out_dir=out_dir, type_f="target")
ptime.write(str(round(time.time())) + "\n")
print("DONE!")
@@ -50,7 +50,7 @@ def start_job(id_job, batch_system_type="local"):
job = Job.get(Job.id_job == id_job)
job.status = "starting"
job.save()
job_mng = JobManager(id_job=id_job, email=job.email)
job_mng = JobManager(id_job=id_job, email=job.email, tool=job.tool)
job_mng.set_inputs_from_res_dir()
job_mng.run_job_in_thread(batch_system_type)
@@ -85,7 +85,7 @@ def prepare_job(id_job):
job = Job.get(Job.id_job == id_job)
job.status = "preparing"
job.save()
job_mng = JobManager(id_job=id_job, email=job.email)
job_mng = JobManager(id_job=id_job, email=job.email, tool=job.tool)
job_mng.set_inputs_from_res_dir()
job_mng.prepare_data_in_thread()
......
@@ -33,8 +33,7 @@ class AppConfigReader:
if os.path.exists(my_config_file):
config_file.append(my_config_file)
if len(config_file) == 0:
raise FileNotFoundError("ERROR: application.properties not found. Please copy the example file and "
"check properties are correct for you!")
raise FileNotFoundError("ERROR: application.properties not found.")
self.reader = RawConfigParser()
self.reader.read(config_file)
for attr in dir(self):
@@ -80,12 +79,6 @@ class AppConfigReader:
except NoOptionError:
return "local"
def _get_nb_threads(self):
try:
return self.reader.get("global", "threads_local")
except NoOptionError:
return "4"
def _get_web_url(self):
try:
return self._replace_vars(self.reader.get("global", "web_url"))
@@ -140,20 +133,6 @@ class AppConfigReader:
except NoOptionError:
return 1024 * 1024 * 1024
def _get_minimap2_exec(self):
try:
entry = self.reader.get("softwares", "minimap2")
return entry if entry != "###DEFAULT###" else os.path.join(self.app_dir, "bin", "minimap2")
except NoOptionError:
return os.path.join(self.app_dir, "bin", "minimap2")
def _get_minimap2_cluster_exec(self):
try:
entry = self.reader.get("softwares", "minimap2_cluster")
return entry if entry != "###DEFAULT###" else "minimap2"
except NoOptionError:
return self._get_minimap2_exec()
def _get_database_type(self):
try:
return self.reader.get("database", "type")
@@ -356,8 +335,6 @@ class AppConfigReader:
def _get_cluster_memory(self):
try:
memory = int(self.reader.get("cluster", "memory"))
if memory % self._get_cluster_threads() != 0:
raise ValueError("ERROR in config: cluster memory must be divisible by the number of cluster threads!")
return memory
except (NoOptionError, NoSectionError):
return 32
@@ -365,18 +342,10 @@ class AppConfigReader:
def _get_cluster_memory_ava(self):
try:
memory = int(self.reader.get("cluster", "memory_ava"))
if memory % self._get_cluster_threads() != 0:
raise ValueError("ERROR in config: cluster memory must be divisible by the number of cluster threads!")
return memory
except (NoOptionError, NoSectionError):
return self._get_cluster_memory()
def _get_cluster_threads(self):
try:
return int(self.reader.get("cluster", "threads"))
except (NoOptionError, NoSectionError):
return 4
def _get_debug(self):
try:
return self.reader.get("debug", "enable").lower() == "true"
......
@@ -68,6 +68,7 @@ if MODE == "webserver":
error = CharField(default="")
mem_peak = IntegerField(null=True)
time_elapsed = IntegerField(null=True)
tool = CharField(default="minimap2", max_length=50)
class Gallery(BaseModel):
......
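
The new tool column means existing webserver databases need a schema update, which this excerpt does not include. A hypothetical one-off migration, assuming peewee's playhouse.migrate helpers and an SQLite backend (the database path is a placeholder):

from peewee import CharField, SqliteDatabase
from playhouse.migrate import SqliteMigrator, migrate

db = SqliteDatabase("dgenies.db")  # placeholder path
migrator = SqliteMigrator(db)
migrate(
    # Mirrors the new model field: tool = CharField(default="minimap2", max_length=50)
    migrator.add_column("job", "tool", CharField(default="minimap2", max_length=50)),
)
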
@@ -8,6 +8,8 @@ import time
import threading
import re
from dgenies.config_reader import AppConfigReader
from dgenies.tools import Tools
import dgenies.lib.parsers
from .fasta import Fasta
from .functions import Functions
import requests
@@ -36,7 +38,8 @@ if MODE == "webserver":
class JobManager:
def __init__(self, id_job: str, email: str=None, query: Fasta=None, target: Fasta=None, mailer=None):
def __init__(self, id_job: str, email: str=None, query: Fasta=None, target: Fasta=None, mailer=None,
tool="minimap2"):
self.id_job = id_job
self.email = email
self.query = query
@@ -45,6 +48,8 @@ class JobManager:
self.id_process = "-1"
# Get configs:
self.config = AppConfigReader()
self.tools = Tools().tools
self.tool = self.tools[tool]
# Outputs:
self.output_dir = os.path.join(self.config.app_data, id_job)
self.preptime_file = os.path.join(self.output_dir, "prep_times")
@@ -70,6 +75,8 @@ class JobManager:
return file_size
def get_query_split(self):
if not self.tool.split_before:
return self.query.get_path()
query_split = os.path.join(self.output_dir, "split_" + os.path.basename(self.query.get_path()))
if query_split.endswith(".gz"):
return query_split[:-3]
@@ -191,13 +198,25 @@ class JobManager:
else:
cmd = []
if self.query is not None:
cmd += [self.config.minimap2_exec, "-t", self.config.nb_threads,
self.target.get_path(), self.get_query_split()]
command_line = self.tool.command_line.replace("{query}", self.query.get_path())
else:
cmd += [self.config.minimap2_exec, "-t", self.config.nb_threads, "-X",
self.target.get_path(), self.target.get_path()]
with open(self.logs, "w") as logs, open(self.paf_raw, "w") as paf_raw:
p = subprocess.Popen(cmd, stdout=paf_raw, stderr=logs)
command_line = self.tool.all_vs_all
out_file = None
if ">" in command_line:
out_file = self.paf_raw
command_line = command_line[:command_line.index(">")]
command_line = command_line.replace("{exe}", self.tool.exec) \
.replace("{target}", self.target.get_path()) \
.replace("{threads}", str(self.tool.threads)) \
.replace("{out}", self.paf_raw)
cmd += command_line.split(" ")
if out_file is None:
with open(self.logs, "w") as logs:
p = subprocess.Popen(cmd, stdout=logs, stderr=logs)
else:
with open(self.logs, "w") as logs, open(out_file, "w") as out:
p = subprocess.Popen(cmd, stdout=out, stderr=logs)
with Job.connect():
status = "started"
if MODE == "webserver":
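
The local command is now built from the tool's template instead of hard-coded minimap2 arguments: {exe}, {target}, {query}, {threads} and {out} get substituted, and a trailing "> {out}" means the tool writes PAF to stdout, which must then be captured into the output file rather than the log. A standalone sketch of that contract (the helper and the templates are illustrative, not from this commit):

import shlex

def build_command(template, exe, target, query, threads, out):
    """Expand a tool command template into (argv, stdout_target).

    stdout_target is the PAF path when the template redirects stdout
    ("> {out}"), or None when the tool writes the file itself."""
    out_file = None
    if ">" in template:
        out_file = out
        template = template[:template.index(">")]
    cmd = template.replace("{exe}", exe) \
        .replace("{target}", target) \
        .replace("{query}", query) \
        .replace("{threads}", str(threads)) \
        .replace("{out}", out)
    return shlex.split(cmd), out_file

# minimap2-style template: results arrive on stdout
argv, out = build_command("{exe} -t {threads} {target} {query} > {out}",
                          "minimap2", "target.fa", "query.fa", 4, "map.paf")
# argv == ['minimap2', '-t', '4', 'target.fa', 'query.fa'], out == 'map.paf'
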
@@ -301,7 +320,7 @@ class JobManager:
jt = s.createJobTemplate()
jt.remoteCommand = command
jt.args = args
jt.jobName = "_".join([step, self.id_job])
jt.jobName = "_".join([step[:2], self.id_job])
if log_out == log_err:
jt.joinFiles = True
jt.outputPath = ":" + log_out
@@ -310,6 +329,16 @@ class JobManager:
jt.outputPath = ":" + log_out
jt.errorPath = ":" + log_err
memory = self.config.cluster_memory
if self.query is None:
memory = self.config.cluster_memory_ava
if memory > 32:
name, order, contigs, reversed_c, abs_start, c_len = Index.load(self.idx_t, False)
if c_len <= 500000000:
memory = 32
if memory > self.tool.max_memory:
memory = self.tool.max_memory
native_specs = self.config.drmaa_native_specs
if batch_system_type == "slurm":
if native_specs == "###DEFAULT###":
@@ -317,15 +346,8 @@ class JobManager:
if step == "prepare":
jt.nativeSpecification = native_specs.format(8000, 1, "02:00:00")
elif step == "start":
memory = self.config.cluster_memory
if self.query is None:
memory = self.config.cluster_memory_ava
if memory > 32:
name, order, contigs, reversed_c, abs_start, c_len = Index.load(self.idx_t, False)
if c_len <= 500000000:
memory = 32
jt.nativeSpecification = native_specs.format(memory // self.config.cluster_threads * 1000,
self.config.cluster_threads, "02:00:00")
jt.nativeSpecification = native_specs.format(memory // self.tool.threads_cluster * 1000,
self.tool.threads_cluster, "02:00:00")
elif batch_system_type == "sge":
if native_specs == "###DEFAULT###":
native_specs = "-l mem={0},h_vmem={0} -pe parallel_smp {1}"
@@ -333,7 +355,7 @@ class JobManager:
jt.nativeSpecification = native_specs.format(8000, 1)
elif step == "start":
jt.nativeSpecification = native_specs.format(
self.config.cluster_memory // self.config.cluster_threads * 1000, self.config.cluster_threads)
memory // self.tool.threads_cluster * 1000, self.tool.threads_cluster)
jt.workingDirectory = self.output_dir
jobid = s.runJob(jt)
self.id_process = jobid
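
The memory choice now happens once, ahead of the scheduler-specific branches: all-vs-all jobs take memory_ava, jobs whose target totals at most 500 Mb fall back to 32 GB, and the result is capped by the tool's max_memory. The native specification then converts GB per job into MB per thread. A worked sketch of that arithmetic, with illustrative numbers:

def per_thread_mb(memory_gb, threads_cluster, tool_max_gb):
    """Cap job memory at the tool's limit, then spread it over threads in MB."""
    memory_gb = min(memory_gb, tool_max_gb)
    return memory_gb // threads_cluster * 1000

# as in native_specs.format(memory // self.tool.threads_cluster * 1000, ...)
print(per_thread_mb(32, 4, 32))  # 8000 MB per thread
print(per_thread_mb(64, 4, 32))  # also 8000: capped to the tool's 32 GB max
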
@@ -359,14 +381,24 @@ class JobManager:
def __launch_drmaa(self, batch_system_type):
if self.query is not None:
args = ["-t", self.config.nb_threads, self.target.get_path(), self.get_query_split()]
args = re.sub(r"{exe}\s?", "", self.tool.command_line).replace("{query}", self.get_query_split())
else:
args = ["-t", self.config.nb_threads, "-X", self.target.get_path(), self.target.get_path()]
args = re.sub(r"{exe}\s?", "", self.tool.all_vs_all)
out_file = self.logs
if ">" in args:
out_file = self.paf_raw
args = args[:args.index(">")]
args = args.replace("{target}", self.target.get_path()) \
.replace("{threads}", str(self.tool.threads_cluster)) \
.replace("{out}", self.paf_raw)
args = args.split(" ")
return self.launch_to_cluster(step="start",
batch_system_type=batch_system_type,
command=self.config.minimap2_cluster_exec,
command=self.tool.exec,
args=args,
log_out=self.paf_raw,
log_out=out_file,
log_err=self.logs)
def __getting_local_file(self, fasta: Fasta, type_f):
@@ -688,6 +720,8 @@ class JobManager:
args += ["-q", self.query.get_path(),
"-u", self.get_query_split(),
"-n", self.query.get_name()]
if self.tool.split_before:
args.append("--split")
return self.launch_to_cluster(step="prepare",
batch_system_type=batch_system_type,
command=self.config.cluster_python_exec,
@@ -709,15 +743,30 @@ class JobManager:
error_tail = "Please check your input file and try again."
if self.query is not None:
fasta_in = self.query.get_path()
splitter = Splitter(input_f=fasta_in, name_f=self.query.get_name(), output_f=self.get_query_split(),
query_index=self.query_index_split, debug=DEBUG)
if splitter.split():
if self.tool.split_before:
split = True
splitter = Splitter(input_f=fasta_in, name_f=self.query.get_name(), output_f=self.get_query_split(),
query_index=self.query_index_split, debug=DEBUG)
success = splitter.split()
nb_contigs = splitter.nb_contigs
in_fasta = self.get_query_split()
else:
split = False
uncompressed = None
if self.query.get_path().endswith(".gz"):
uncompressed = self.query.get_path()[:-3]
success, nb_contigs = index_file(self.query.get_path(), self.query.get_name(), self.idx_q,
uncompressed)
in_fasta = self.query.get_path()
if uncompressed is not None:
in_fasta = uncompressed
if success:
filtered_fasta = os.path.join(os.path.dirname(self.get_query_split()), "filtered_" +
os.path.basename(self.get_query_split()))
filter_f = Filter(fasta=self.get_query_split(),
index_file=self.query_index_split,
filter_f = Filter(fasta=in_fasta,
index_file=self.query_index_split if split else self.idx_q,
type_f="query",
min_filtered=round(splitter.nb_contigs / 4),
min_filtered=round(nb_contigs / 4),
split=True,
out_fasta=filtered_fasta,
replace_fa=True)
@@ -824,7 +873,7 @@ class JobManager:
job.save()
else:
self.set_status_standalone(status)
if self.query is not None:
if self.tool.split_before and self.query is not None:
start = time.time()
paf_raw = self.paf_raw + ".split"
os.remove(self.get_query_split())
@@ -837,9 +886,14 @@ class JobManager:
end = time.time()
if MODE == "webserver":
job.time_elapsed += end - start
else:
elif self.query is None:
shutil.copyfile(self.idx_t, self.idx_q)
Path(os.path.join(self.output_dir, ".all-vs-all")).touch()
if self.tool.parser is not None:
paf_raw = self.paf_raw + ".parsed"
dgenies.lib.parsers.__dict__.get(self.tool.parser)(self.paf_raw, paf_raw)
os.remove(self.paf_raw)
self.paf_raw = paf_raw
sorter = Sorter(self.paf_raw, self.paf)
sorter.sort()
os.remove(self.paf_raw)
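
The parser is resolved by name at runtime from dgenies.lib.parsers, so adding a tool needs only a new function in that module plus its name in the tool definition. A minimal sketch of the dispatch, with illustrative file names:

import dgenies.lib.parsers as parsers

def run_parser(name, raw_file, paf_file):
    """Look up a parser function by name, as job_manager.py does with
    dgenies.lib.parsers.__dict__.get(self.tool.parser)."""
    parser = getattr(parsers, name, None)  # same effect as __dict__.get(name)
    if parser is None:
        raise ValueError("Unknown parser: {}".format(name))
    parser(raw_file, paf_file)

# run_parser("mashmap2paf", "map_raw.out", "map_raw.out.parsed")  # hypothetical paths
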
@@ -933,7 +987,7 @@ class JobManager:
j11.delete_instance()
if self.target is not None:
job = Job.create(id_job=self.id_job, email=self.email, batch_type=self.config.batch_system_type,
date_created=datetime.now())
date_created=datetime.now(), tool=self.tool.name)
job.save()
if not os.path.exists(self.output_dir):
os.mkdir(self.output_dir)
@@ -941,7 +995,7 @@ class JobManager:
thread.start()
else:
job = Job.create(id_job=self.id_job, email=self.email, batch_type=self.config.batch_system_type,
date_created=datetime.now(), status="fail")
date_created=datetime.now(), tool=self.tool.name, status="fail")
job.save()
def set_status_standalone(self, status, error=""):
......
"""
Define tool parsers here.
Each parser (main function) must take exactly two arguments:
- First argument: the input file, i.e. the tool's raw output
- Second argument: the final PAF file
"""
def mashmap2paf(in_paf, out_paf):
with open(in_paf, "r") as in_p, open(out_paf, "w") as out_p:
for line in in_p:
parts = line.rstrip().split(" ")
parts[9] = str(round(float(parts[9]) / 100.0 * 1000.0))
parts.append("1000")
out_p.write("\t".join(parts) + "\n")
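
MashMap emits space-separated mappings whose tenth field is a percent identity, whereas PAF expects column 10 to hold the number of matching bases and column 11 the alignment block length. The parser approximates both by projecting the identity onto a nominal block of 1000 bases. A worked example on a made-up MashMap line:

# Made-up MashMap line: query, qlen, qstart, qend, strand,
# target, tlen, tstart, tend, identity(%)
line = "q1 5000 0 4999 + t1 8000 100 5099 97.5"
parts = line.rstrip().split(" ")
parts[9] = str(round(float(parts[9]) / 100.0 * 1000.0))  # 97.5% -> 975 "matches"
parts.append("1000")                                     # nominal block length
print("\t".join(parts))
# q1  5000  0  4999  +  t1  8000  100  5099  975  1000
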
@@ -12,13 +12,16 @@ dgenies.run.allow_upload = false;
dgenies.run.ping_interval = null;
dgenies.run.target_example = "";
dgenies.run.query_example = "";
dgenies.run.tool_has_ava = {};
dgenies.run.init = function (s_id, allowed_ext, max_upload_file_size=1073741824, target_example="", query_example="") {
dgenies.run.init = function (s_id, allowed_ext, max_upload_file_size=1073741824, target_example="", query_example="",
tool_has_ava={}) {
dgenies.run.s_id = s_id;
dgenies.run.allowed_ext = allowed_ext;
dgenies.run.max_upload_file_size = max_upload_file_size;
dgenies.run.target_example = target_example;
dgenies.run.query_example = query_example;
dgenies.run.tool_has_ava = tool_has_ava;
dgenies.run.restore_form();
dgenies.run.set_events();
dgenies.run.init_fileuploads();
@@ -276,7 +279,8 @@ dgenies.run.do_submit = function () {
"query_type": $("select.query").find(":selected").text().toLowerCase(),
"target": $("input#target").val(),
"target_type": $("select.target").find(":selected").text().toLowerCase(),
"s_id": dgenies.run.s_id
"s_id": dgenies.run.s_id,
"tool": $("input[name=tool]:checked").val()
},
function (data, status) {
if (data["success"]) {
@@ -336,6 +340,14 @@ dgenies.run.valid_form = function () {
has_errors = true;
}
//Check input query:
let tool = $("input[name=tool]:checked").val();
if (!dgenies.run.tool_has_ava[tool] && $("input#query").val().length === 0) {
$("label.file-query").addClass("error");
dgenies.run.add_error("Query fasta is required!");
has_errors = true;
}
// Returns
return !has_errors;
};
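
tool_has_ava tells the client which tools support all-vs-all mode, so a missing query only raises "Query fasta is required!" for tools that need one. The dictionary comes from the template as tools_ava; a hypothetical server-side construction, assuming the Tools registry used in job_manager.py and that all_vs_all is None for tools without that mode:

import json
from dgenies.tools import Tools

# Hypothetical view-side code producing the {{ tools_ava }} value for run.html.
tools_ava = json.dumps({name: tool.all_vs_all is not None
                        for name, tool in Tools().tools.items()})
# e.g. '{"minimap2": true, "mashmap": true}' (values depend on each tool's definition)
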
......
@@ -16,7 +16,8 @@
{% endblock %}
{% block onload %}
{{ super() }}
dgenies.run.init('{{ s_id }}',{{ allowed_ext }}, {{ max_upload_file_size }}, '{{ target }}', '{{ query }}');
dgenies.run.init('{{ s_id }}',{{ allowed_ext }}, {{ max_upload_file_size }}, '{{ target }}', '{{ query }}',
{{ tools_ava }});
{% endblock %}
{% block content %}
<form id="submit_minimap" method=post action="#">
@@ -105,6 +106,26 @@
<div class="file-size query"></div>
</td>
</tr>
{% if tools | length > 1 %}
<tr>
<td>
<label class="tools">Aligner</label>
</td>
<td>
{% for tool in tools_names %}
<label style="vertical-align: bottom; line-height: 35px;" class="radio-inline">
<input type="radio" id="tool-{{ tool }}" name="tool" value="{{ tool }}"
{% if loop.index0 == 0 %}checked{% endif %} style="vertical-align: top;"/>
{{ tool | capitalize }}
<span class="glyphicon glyphicon-question-sign" style="color: #2aabd2"
title="{{ tools[tool].help }}"></span></label>
{% endfor %}
</td>
</tr>
{% else %}
<input type="radio" name="tool" id="tool" value="{{ tools[0] }}" style="display: none;"
checked/>
{% endif %}
</tbody>
</table>