job_manager.py 46.1 KB
Newer Older
1
2
from dgenies import MODE, DEBUG

3
import os
4
import shutil
5
import subprocess
6
from datetime import datetime
7
import time
8
import threading
9
import re
10
from dgenies.config_reader import AppConfigReader
11
12
from dgenies.tools import Tools
import dgenies.lib.parsers
13
14
from .fasta import Fasta
from .functions import Functions
15
import requests
16
from requests.exceptions import ConnectionError
17
18
from urllib.request import urlretrieve
from urllib.error import URLError
19
from jinja2 import Template
20
import traceback
21
from pathlib import Path
22
from urllib import request, parse
23
from dgenies.bin.split_fa import Splitter
24
from dgenies.bin.index import index_file, Index
25
from dgenies.bin.filter_contigs import Filter
26
27
from dgenies.bin.merge_splitted_chrms import Merger
from dgenies.bin.sort_paf import Sorter
28
from dgenies.lib.paf import Paf
29
30
import gzip
import io
31
import binascii
32
33
34
from dgenies.database import Job

if MODE == "webserver":
35
    from dgenies.database import Session, Gallery
36
    from peewee import DoesNotExist
37
38
39
40


class JobManager:

41
42
    def __init__(self, id_job: str, email: str=None, query: Fasta=None, target: Fasta=None, mailer=None,
                 tool="minimap2"):
        """
        Set up the manager of one job

        :param id_job: job identifier, also used as name of the job directory inside config.app_data {str}
        :param email: e-mail address attached to the job (None by default) {str}
        :param query: query input (None by default) {Fasta}
        :param target: target input (None by default) {Fasta}
        :param mailer: object whose send_mail() method is used by send_mail()
        :param tool: key of the tool to pick in Tools().tools {str}
        """
        # Job description:
        self.id_job = id_job
        self.email = email
        self.query = query
        self.target = target
        self.error = ""
        self.id_process = "-1"

        # Get configs:
        self.config = AppConfigReader()
        self.tools = Tools().tools
        self.tool = self.tools[tool]

        # Outputs (all of them live in the job directory):
        self.output_dir = os.path.join(self.config.app_data, id_job)

        def in_job_dir(file_name):
            return os.path.join(self.output_dir, file_name)

        self.preptime_file = in_job_dir("prep_times")
        self.query_index_split = in_job_dir("query_split.idx")
        self.paf = in_job_dir("map.paf")
        self.paf_raw = in_job_dir("map_raw.paf")
        self.idx_q = in_job_dir("query.idx")
        self.idx_t = in_job_dir("target.idx")
        self.logs = in_job_dir("logs.txt")

        self.mailer = mailer
        # Cache used by the URL -> file name resolution
        self._filename_for_url = {}
64

65
66
67
68
69
    @staticmethod
    def is_gz_file(filepath):
        with open(filepath, 'rb') as test_f:
            return binascii.hexlify(test_f.read(2)) == b'1f8b'

70
71
72
    def get_file_size(self, filepath: str):
        file_size = os.path.getsize(filepath)
        if filepath.endswith(".gz") and file_size <= self.config.max_upload_size:
73
74
            with gzip.open(filepath, 'rb') as file_obj:
                file_size = file_obj.seek(0, io.SEEK_END)
75
76
        return file_size

77
    def get_query_split(self):
78
79
        if not self.tool.split_before:
            return self.query.get_path()
80
81
82
83
84
        query_split = os.path.join(self.output_dir, "split_" + os.path.basename(self.query.get_path()))
        if query_split.endswith(".gz"):
            return query_split[:-3]
        return query_split

85
    def set_inputs_from_res_dir(self):
        """
        Restore self.query and self.target from the ".query" and ".target" files of the job directory

        Each of these files, when it exists, gives on its first line the path of the matching input file.
        An input whose file does not exist is left untouched.
        """
        res_dir = os.path.join(self.config.app_data, self.id_job)
        for input_type in ("query", "target"):
            pointer_file = os.path.join(res_dir, "." + input_type)
            if not os.path.exists(pointer_file):
                continue
            with open(pointer_file) as pointer:
                file_path = pointer.readline()
            # Name of the input: file name without ".gz", without what precedes its first "_" and
            # without its extension
            base_name = os.path.basename(file_path.replace(".gz", ""))
            input_name = os.path.splitext(base_name.split("_", 1)[1])[0]
            setattr(self, input_type, Fasta(name=input_name, path=file_path, type_f="local"))

106
    def check_job_success(self):
107
108
        if os.path.exists(self.paf_raw):
            if os.path.getsize(self.paf_raw) > 0:
109
                return "success"
110
111
            else:
                return "no-match"
Floreal Cabanettes's avatar
Floreal Cabanettes committed
112
        return "fail"
113

114
115
116
117
118
119
    def is_query_filtered(self):
        return os.path.exists(os.path.join(self.output_dir, ".filter-query"))

    def is_target_filtered(self):
        return os.path.exists(os.path.join(self.output_dir, ".filter-target"))

120
    def get_mail_content(self, status):
121
        message = "D-Genies\n\n"
122
        if status == "success":
Floreal Cabanettes's avatar
Floreal Cabanettes committed
123
            message += "Your job %s was completed successfully!\n\n" % self.id_job
124
            message += str("Your job {0} is finished. You can see  the results by clicking on the link below:\n"
Floreal Cabanettes's avatar
Floreal Cabanettes committed
125
                           "{1}/result/{0}\n\n").format(self.id_job, self.config.web_url)
126
127
        else:
            message += "Your job %s has failed!\n\n" % self.id_job
128
129
            if self.error != "":
                message += self.error.replace("#ID#", self.id_job).replace("<br/>", "\n")
130
                message += "\n\n"
131
132
133
            else:
                message += "Your job %s has failed. You can try again. " \
                           "If the problem persists, please contact the support.\n\n" % self.id_job
134
        message += "Sequences compared in this analysis:\n"
135
136
137
138
        if self.query is not None:
            message += "Target: %s\nQuery: %s\n\n" % (self.target.get_name(), self.query.get_name())
        else:
            message += "Target: %s\n\n" % self.target.get_name()
139
140
141
142
143
144
145
146
147
        if status == "success":
            if self.is_target_filtered():
                message += str("Note: target fasta has been filtered because it contains too small contigs."
                               "To see which contigs has been removed from the analysis, click on the link below:\n"
                               "{1}/filter-out/{0}/target\n\n").format(self.id_job, self.config.web_url)
            if self.is_query_filtered():
                message += str("Note: query fasta has been filtered because it contains too small contigs."
                               "To see which contigs has been removed from the analysis, click on the link below:\n"
                               "{1}/filter-out/{0}/query\n\n").format(self.id_job, self.config.web_url)
148
149
        message += "See you soon on D-Genies,\n"
        message += "The team"
150
        return message
151

152
    def get_mail_content_html(self, status):
        """
        Build the HTML body of the notification mail from the "mail_templates/job_notification.html" template
        located next to this module

        :param status: status of the job, given as is to the template {str}
        :return: rendered template {str}
        """
        template_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "mail_templates",
                                     "job_notification.html")
        with open(template_path) as t_file:
            template = Template(t_file.read())
            context = {
                "job_name": self.id_job,
                "status": status,
                "url_base": self.config.web_url,
                "query_name": self.query.get_name() if self.query is not None else "",
                "target_name": self.target.get_name(),
                "error": self.error,
                "target_filtered": self.is_target_filtered(),
                "query_filtered": self.is_query_filtered(),
            }
            return template.render(**context)
161

162
    def get_mail_subject(self, status):
163
        if status == "success" or status == "no-match":
Floreal Cabanettes's avatar
Floreal Cabanettes committed
164
            return "DGenies - Job completed: %s" % self.id_job
165
        else:
166
            return "DGenies - Job failed: %s" % self.id_job
167

168
169
    def send_mail(self):
        """
        Send the notification mail of the job

        The status and the error are read from the Job record; its e-mail address is used only when none was
        given to this manager.
        """
        with Job.connect():
            # Retrieve infos:
            job = Job.get(Job.id_job == self.id_job)
            if self.email is None:
                self.email = job.email
            status = job.status
            self.error = job.error

            # Send:
            mail = dict(recipients=[self.email],
                        subject=self.get_mail_subject(status),
                        message=self.get_mail_content(status),
                        message_html=self.get_mail_content_html(status))
            self.mailer.send_mail(**mail)
182

183
184
185
    def search_error(self):
        logs = os.path.join(self.output_dir, "logs.txt")
        if os.path.exists(logs):
186
187
188
189
190
            lines = subprocess.check_output(['tail', '-2', logs]).decode("utf-8").split("\n")
            if re.match(r"\[morecore] \d+ bytes requested but not available.", lines[1]) or \
                    re.match(r"\[morecore] \d+ bytes requested but not available.", lines[1]) or \
                    re.match(r"\[morecore] insufficient memory", lines[0]) or \
                    re.match(r"\[morecore] insufficient memory", lines[1]):
191
192
193
194
                return "Your job #ID# has failed because of memory limit exceeded. May be your sequences are too big?" \
                       "<br/>You can contact the support for more information."
        return "Your job #ID# has failed. You can try again.<br/>If the problem persists, please contact the support."

195
    def __launch_local(self):
        """
        Run the tool on the local machine, wait for its end and save the resulting status of the job

        :return: True if the tool exited with code 0 and the job status is "success", else False {bool}
        """
        webserver = MODE == "webserver"
        # In webserver mode, the tool is wrapped by /usr/bin/time to get "elapsed seconds" and "max memory"
        cmd = ["/usr/bin/time", "-f", "%e %M"] if webserver else []

        if self.query is None:
            command_line = self.tool.all_vs_all
        else:
            command_line = self.tool.command_line.replace("{query}", self.query.get_path())

        # A ">" in the command line means the standard output of the tool is the raw PAF file: the redirection
        # is cut from the command and done here
        out_file = None
        redirect_pos = command_line.find(">")
        if redirect_pos != -1:
            out_file = self.paf_raw
            command_line = command_line[:redirect_pos]

        placeholders = (("{exe}", self.tool.exec),
                        ("{target}", self.target.get_path()),
                        ("{threads}", str(self.tool.threads)),
                        ("{out}", self.paf_raw))
        for placeholder, value in placeholders:
            command_line = command_line.replace(placeholder, value)
        cmd += command_line.split(" ")

        with open(self.logs, "w") as logs:
            if out_file is None:
                p = subprocess.Popen(cmd, stdout=logs, stderr=logs)
            else:
                with open(out_file, "w") as out:
                    p = subprocess.Popen(cmd, stdout=out, stderr=logs)

        with Job.connect():
            job = Job.get(Job.id_job == self.id_job) if webserver else None

            def save_status(new_status, error=None):
                # Database record in webserver mode, standalone status else
                if webserver:
                    job.status = new_status
                    if error is not None:
                        job.error = error
                    job.save()
                elif error is None:
                    self.set_status_standalone(new_status)
                else:
                    self.set_status_standalone(new_status, error)

            if webserver:
                job.id_process = p.pid
            save_status("started")

            p.wait()
            if p.returncode != 0:
                self.error = self.search_error()
                save_status("fail", self.error)
                return False

            status = self.check_job_success()
            save_status(status)
            return status == "success"
248

249
250
251
252
253
    def check_job_status_slurm(self):
        """
        Check status of a SLURM job run

        On success, "<elapsed seconds> <memory peak>" is appended to the logs file.

        :return: True if the job has successfully ended
        """
        sacct_cmd = "sacct -p -n --format=state,maxvmsize,elapsed -j %s.batch" % self.id_process
        sacct_out = subprocess.check_output(sacct_cmd, shell=True).decode("utf-8").strip("\n")
        # -p gives "|" separated fields, in the order of --format
        fields = sacct_out.split("|")

        if fields[0] != "COMPLETED":
            return False

        mem_peak = int(fields[1][:-1])  # Remove the K letter
        elapsed_full = [int(part) for part in fields[2].split(":")]
        elapsed = elapsed_full[0] * 3600 + elapsed_full[1] * 60 + elapsed_full[2]
        with open(self.logs, "a") as logs:
            logs.write("%s %d" % (elapsed, mem_peak))
        return True

    def check_job_status_sge(self):
        """
        Check status of a SGE job run

        The "failed", "start_time", "end_time" and "maxvmem" lines of the qacct output are read. On success,
        and if times and memory were all found, "<elapsed seconds> <memory peak in Kb>" is appended to the
        logs file.

        :return: True if the job has successfully ended
        """
        time_format = "%a %b %d %H:%M:%S %Y"
        status = "-1"
        start = None
        end = None
        mem_peak = None
        acct = subprocess.check_output("qacct -d 1 -j %s" % self.id_process,
                                       shell=True).decode("utf-8")
        for line in acct.split("\n"):
            fields = re.split(r"\s+", line.strip(), 1)
            if len(fields) < 2:
                # Line without value (separator, blank line...)
                continue
            value = fields[1].strip()
            if line.startswith("failed"):
                status = value
            elif line.startswith("start_time"):
                start = datetime.strptime(value, time_format)
            elif line.startswith("end_time"):
                end = datetime.strptime(value, time_format)
            elif line.startswith("maxvmem"):
                mem_peak = self._sge_memory_to_kb(value)

        if status == "0":
            if start is not None and end is not None and mem_peak is not None:
                # total_seconds() keeps whole days, which timedelta.seconds silently dropped
                elapsed = int((end - start).total_seconds())
                with open(self.logs, "a") as logs:
                    logs.write("%s %d" % (elapsed, mem_peak))

        return status == "0"

    @staticmethod
    def _sge_memory_to_kb(value):
        """
        Convert a memory value of qacct (like "1.234G" or "512.000M") into a number of Kb

        The number is what precedes the unit letter; the previous code converted the unit letter itself,
        which raised a ValueError.

        :param value: memory value, with or without unit suffix {str}
        :return: memory in Kb, or None if the value can't be parsed {int}
        """
        kb_per_unit = {"G": 1024 * 1024, "M": 1024, "K": 1}
        unit = value[-1:].upper()
        try:
            if unit in kb_per_unit:
                return int(float(value[:-1]) * kb_per_unit[unit])
            # NOTE(review): a value without suffix is presumed to be in bytes - confirm against local qacct
            return int(float(value) / 1024)
        except ValueError:
            return None

    def update_job_status(self, status, id_process=None):
        """
        Save a new status for the job: in the database in webserver mode, as standalone status else

        :param status: new status of the job {str}
        :param id_process: if not None, new process id of the job (saved in webserver mode only)
        """
        if MODE != "webserver":
            self.set_status_standalone(status)
            return
        with Job.connect():
            job = Job.get(Job.id_job == self.id_job)
            job.status = status
            if id_process is not None:
                job.id_process = id_process
            job.save()
314

315
    def launch_to_cluster(self, step, batch_system_type, command, args, log_out, log_err):
        """
        Submit a command to the cluster through DRMAA, wait for its end and save the status of the job

        The job template is now deleted in a "finally" clause: it was leaked each time the submission, the
        wait or the status check raised an exception.

        :param step: step of the job: "prepare" or "start" (other values get no native specification) {str}
        :param batch_system_type: type of scheduler: "slurm" or "sge" {str}
        :param command: command to run on the cluster {str}
        :param args: arguments of the command {list}
        :param log_out: path of the file receiving the standard output {str}
        :param log_err: path of the file receiving the standard error {str}
        :return: True if the final status is "success" or "prepared", else False {bool}
        """
        import drmaa
        from dgenies.lib.drmaasession import DrmaaSession

        drmaa_session = DrmaaSession()
        s = drmaa_session.session
        jt = s.createJobTemplate()
        try:
            jt.remoteCommand = command
            jt.args = args
            jt.jobName = "_".join([step[:2], self.id_job])
            # Same file for both streams: ask DRMAA to join them
            jt.joinFiles = log_out == log_err
            jt.outputPath = ":" + log_out
            if log_out != log_err:
                jt.errorPath = ":" + log_err

            memory = self.config.cluster_memory
            if self.query is None:
                # All-vs-all run: specific memory setting, lowered to 32 when c_len is at most 500000000
                memory = self.config.cluster_memory_ava
                if memory > 32:
                    name, order, contigs, reversed_c, abs_start, c_len = Index.load(self.idx_t, False)
                    if c_len <= 500000000:
                        memory = 32
            if memory > self.tool.max_memory:
                memory = self.tool.max_memory

            native_specs = self.config.drmaa_native_specs
            threads = self.tool.threads_cluster
            if batch_system_type == "slurm":
                if native_specs == "###DEFAULT###":
                    native_specs = "--mem-per-cpu={0} --ntasks={1} --time={2}"
                if step == "prepare":
                    jt.nativeSpecification = native_specs.format(8000, 1, "02:00:00")
                elif step == "start":
                    jt.nativeSpecification = native_specs.format(memory // threads * 1000, threads, "02:00:00")
            elif batch_system_type == "sge":
                if native_specs == "###DEFAULT###":
                    native_specs = "-l mem={0},h_vmem={0} -pe parallel_smp {1}"
                if step == "prepare":
                    jt.nativeSpecification = native_specs.format(8000, 1)
                elif step == "start":
                    jt.nativeSpecification = native_specs.format(memory // threads * 1000, threads)

            jt.workingDirectory = self.output_dir
            jobid = s.runJob(jt)
            self.id_process = jobid

            self.update_job_status("scheduled-cluster" if step == "start" else "prepare-scheduled", jobid)

            retval = s.wait(jobid, drmaa.Session.TIMEOUT_WAIT_FOREVER)
            if retval.hasExited and (self.check_job_status_slurm() if batch_system_type == "slurm" else
                                     self.check_job_status_sge()):
                status = self.check_job_success() if step == "start" else "prepared"
            else:
                status = "fail"
            self.update_job_status(status)
            return status == "success" or status == "prepared"
        finally:
            s.deleteJobTemplate(jt)

382
383
    def __launch_drmaa(self, batch_system_type):
        if self.query is not None:
384
            args = re.sub("{exe}\s?", "", self.tool.command_line).replace("{query}", self.get_query_split())
385
        else:
386
387
388
389
390
            args = re.sub("{exe}\s?", "", self.tool.all_vs_all)
        out_file = self.logs
        if ">" in args:
            out_file = self.paf_raw
            args = args[:args.index(">")]
Floreal Cabanettes's avatar
Floreal Cabanettes committed
391
392
393
        args = args.replace("{target}", self.target.get_path()) \
                   .replace("{threads}", str(self.tool.threads_cluster)) \
                   .replace("{out}", self.paf_raw)
394
395
396

        args = args.split(" ")

397
398
        return self.launch_to_cluster(step="start",
                                      batch_system_type=batch_system_type,
399
                                      command=self.tool.exec,
400
                                      args=args,
401
                                      log_out=out_file,
402
403
                                      log_err=self.logs)

404
405
    def __getting_local_file(self, fasta: Fasta, type_f):
        """
        Put a local input file in the job directory, named type_f + "_" + original file name

        Example files are copied, other files are moved. The final path is also saved in the "." + type_f
        file of the job directory.

        :param fasta: Fasta object describing the input file {Fasta}
        :param type_f: type of the file (query or target) {str}
        :return: final path of the file {str}
        """
        final_name = type_f + "_" + os.path.basename(fasta.get_path())
        finale_path = os.path.join(self.output_dir, final_name)
        transfer = shutil.copy if fasta.is_example() else shutil.move
        transfer(fasta.get_path(), finale_path)
        pointer_file = os.path.join(self.output_dir, "." + type_f)
        with open(pointer_file, "w") as save_file:
            save_file.write(finale_path)
        return finale_path

414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
    def __get_filename_from_url(self, url):
        if url not in self._filename_for_url:
            if url.startswith("ftp://"):
                self._filename_for_url[url] = url.split("/")[-1]
            elif url.startswith("http://") or url.startswith("https://"):
                self._filename_for_url[url] = requests.head(url, allow_redirects=True).url.split("/")[-1]
            else:
                return None
        return self._filename_for_url[url]

    def _download_file(self, url):
        """
        Download a file into the job directory, under the name given by the URL

        :param url: URL of the file; ftp:// URLs go through urlretrieve, all others through requests {str}
        :return: path of the downloaded file {str}
        """
        local_filename = os.path.join(self.output_dir, self.__get_filename_from_url(url))
        if url.startswith("ftp://"):
            urlretrieve(url, local_filename)
            return local_filename
        # NOTE the stream=True parameter: the content is fetched chunk by chunk while it is written
        response = requests.get(url, stream=True)
        with open(local_filename, 'wb') as f:
            # filter out keep-alive new chunks (empty ones)
            f.writelines(chunk for chunk in response.iter_content(chunk_size=1024) if chunk)
        return local_filename

438
    def __getting_file_from_url(self, fasta: Fasta, type_f):
        """
        Download file from URL

        The downloaded file is renamed type_f + "_" + file name and its final path is saved in the
        "." + type_f file of the job directory.

        :param fasta: Fasta object describing the input file {Fasta}
        :param type_f: type of the file (query or target) {str}
        :return: Tuple:
            [0] True if no error happened, else False
            [1] If an error happened, True if the error was saved for the job, else False (will be saved later)
            [2] Finale path of the downloaded file {str}
            [3] Name of the downloaded file {str}
        """
        try:
            dl_path = self._download_file(fasta.get_path())
        except (ConnectionError, URLError):
            # Download failed: the error is saved right now for the job
            error = "<p>Url <b>%s</b> is not valid!</p>" \
                    "<p>If this is unattended, please contact the support.</p>" % fasta.get_path()
            if MODE != "webserver":
                self.set_status_standalone("fail", error)
            else:
                with Job.connect():
                    job = Job.get(Job.id_job == self.id_job)
                    job.status = "fail"
                    job.error = error
                    job.save()
            return False, True, None, None

        filename = os.path.basename(dl_path)
        finale_path = os.path.join(self.output_dir, type_f + "_" + filename)
        shutil.move(dl_path, finale_path)
        with open(os.path.join(self.output_dir, "." + type_f), "w") as save_file:
            save_file.write(finale_path)
        # Name: file name without ".gz" and without extension
        name = os.path.splitext(filename.replace(".gz", ""))[0]
        return True, False, finale_path, name
471
472

    def __check_url(self, fasta: Fasta):
        """
        Check the URL of an input: its file name must be resolvable and allowed by Functions.allowed_file()

        When the check fails, the job is set to "fail" with the matching error.

        :param fasta: Fasta object whose path is the URL to check {Fasta}
        :return: result of Functions.allowed_file() for the file name; False if the file name can't be
            retrieved
        """
        def save_failure(error):
            # Database record in webserver mode, standalone status else
            if MODE == "webserver":
                with Job.connect():
                    job = Job.get(Job.id_job == self.id_job)
                    job.status = "fail"
                    job.error = error
                    job.save()
            else:
                self.set_status_standalone("fail", error)

        url = fasta.get_path()
        try:
            filename = self.__get_filename_from_url(url)
        except (ConnectionError, URLError):
            save_failure("<p>Url <b>%s</b> is not valid!</p>"
                         "<p>If this is unattended, please contact the support.</p>" % fasta.get_path())
            return False

        if filename is None:
            # Not an ftp, http or https URL
            save_failure("<p>Url <b>%s</b> is not a valid URL!</p>"
                         "<p>If this is unattended, please contact the support.</p>" % url)
            return False

        allowed = Functions.allowed_file(filename)
        if not allowed:
            save_failure("<p>File <b>%s</b> downloaded from <b>%s</b> is not a Fasta file!</p>"
                         "<p>If this is unattended, please contact the support.</p>" % (filename, url))
        return allowed

518
519
520
521
522
    def clear(self):
        shutil.rmtree(self.output_dir)

    @staticmethod
    def get_pending_local_number():
        """
        Count the local jobs which are not finished

        :return: in webserver mode, number of jobs with the "local" batch type whose status is not "success",
            "fail" or "no-match"; 0 in other modes {int}
        """
        if MODE != "webserver":
            return 0
        with Job.connect():
            not_finished = (Job.status != "success") & (Job.status != "fail") & (Job.status != "no-match")
            pending_jobs = Job.select().where((Job.batch_type == "local") & not_finished)
            return len(pending_jobs)
529

530
531
532
533
534
535
536
537
    def check_file(self, input_type, should_be_local, max_upload_size_readable):
        """
        Check an input file: gzip validity (for ".gz" files) and size limit

        On failure, the job is set to "fail" with the matching error and the job directory is removed.

        :param input_type: query or target
        :param should_be_local: True if job should be treated locally
        :param max_upload_size_readable: max upload size human readable
        :return: (True if correct, True if error set [for fail], True if should be local)
        """
        def reject(error):
            # Save the failure, drop the job directory and build the returned tuple
            if MODE == "webserver":
                job = Job.get(Job.id_job == self.id_job)
                job.status = "fail"
                job.error = error
                job.save()
            else:
                self.set_status_standalone("fail", error)
            self.clear()
            return False, True, None

        all_vs_all_target = input_type == "target" and self.query is None
        if all_vs_all_target:
            max_upload_size_readable = self.config.max_upload_size_ava / 1024 / 1024

        with Job.connect():
            input_path = getattr(self, input_type).get_path()

            # Check file is correctly gzipped
            if input_path.endswith(".gz") and not self.is_gz_file(input_path):
                return reject(input_type + " file is not a correct gzip file")

            # Check size (a limit of -1 or less disables the check):
            file_size = self.get_file_size(input_path)
            if input_type == "query" or self.query is not None:
                size_limit = self.config.max_upload_size
            else:
                size_limit = self.config.max_upload_size_ava
            if -1 < size_limit < file_size:
                return reject(input_type + " file exceed size limit of %d Mb (uncompressed)"
                              % max_upload_size_readable)

            # Files reaching the min_<input_type>_size setting are not run locally when a cluster is set
            if self.config.batch_system_type != "local" \
                    and file_size >= getattr(self.config, "min_%s_size" % input_type):
                should_be_local = False

        return True, False, should_be_local

    def download_files_with_pending(self, files_to_download, should_be_local, max_upload_size_readable):
        """
        Download the inputs given by URL, once downloads are allowed, then call self._after_start()

        In webserver mode, a session is created and asked every 15 seconds until it allows the upload; in
        other modes the download starts immediately.

        :param files_to_download: pairs (Fasta object, input type) of the files to download {list}
        :param should_be_local: True if job should be treated locally {bool}
        :param max_upload_size_readable: max upload size human readable, given to check_file()
        """
        webserver = MODE == "webserver"
        with Job.connect():
            if webserver:
                job = Job.get(Job.id_job == self.id_job)
                job.status = "getfiles-waiting"
                job.save()
                # Create a session:
                s_id = Session.new(True)
                session = Session.get(s_id=s_id)
            else:
                job = None
                s_id = None
                session = None

            try:
                correct = True
                error_set = False
                allowed = session.ask_for_upload(True) if webserver else True
                while not allowed:
                    time.sleep(15)
                    session = Session.get(s_id=s_id)
                    allowed = session.ask_for_upload(False)
                if allowed:
                    if webserver:
                        job.status = "getfiles"
                        job.save()
                    for file, input_type in files_to_download:
                        correct, error_set, finale_path, filename = self.__getting_file_from_url(file, input_type)
                        if not correct:
                            break
                        downloaded = getattr(self, input_type)
                        downloaded.set_path(finale_path)
                        downloaded.set_name(filename)
                        correct, error_set, should_be_local = self.check_file(input_type, should_be_local,
                                                                              max_upload_size_readable)
                        if not correct:
                            break

                    # A cluster job small enough is run locally if there is room for one more local job
                    if correct and webserver and job.batch_type != "local" and should_be_local \
                            and self.get_pending_local_number() < self.config.max_run_local:
                        job.batch_type = "local"
                        job.save()
                else:
                    correct = False
            except:  # Except all possible exceptions: the failure is reported through _after_start()
                traceback.print_exc()
                correct = False
                error_set = False

            if webserver:
                session.delete_instance()
            self._after_start(correct, error_set)
630

631
    def getting_files(self):
        """
        Collect the input files of the job

        Local files are fetched and checked immediately. Files given as URL are handed over to
        download_files_with_pending, which is started in a separate thread.

        :return: Tuple:
            [0] True if getting files succeed, False else
            [1] If an error happened, True if the error is already saved for the job, False else (it will be
                saved later)
            [2] True if nothing remains to be downloaded, False if downloads were delegated to the
                download_files_with_pending thread
        """
        with Job.connect():
            state = "getfiles"
            if MODE == "webserver":
                job = Job.get(Job.id_job == self.id_job)
                job.status = state
                job.save()
            else:
                job = None
                self.set_status_standalone(state)

            should_be_local = True
            max_upload_size_readable = self.config.max_upload_size / 1024 / 1024  # Set it in Mb
            files_to_download = []
            # Query is handled first, then target; any failure stops the whole process immediately
            for input_type in ("query", "target"):
                fasta = getattr(self, input_type)
                if fasta is None:
                    continue
                if fasta.get_type() == "local":
                    fasta.set_path(self.__getting_local_file(fasta, input_type))
                    checked, error_set, should_be_local = self.check_file(input_type, should_be_local,
                                                                          max_upload_size_readable)
                    if not checked:
                        return False, error_set, True
                elif self.__check_url(fasta):
                    files_to_download.append([fasta, input_type])
                else:
                    return False, True, True

            all_downloaded = not files_to_download
            if not all_downloaded:
                downloader = threading.Timer(0, self.download_files_with_pending,
                                             kwargs={"files_to_download": files_to_download,
                                                     "should_be_local": should_be_local,
                                                     "max_upload_size_readable": max_upload_size_readable})
                downloader.start()  # Start the execution
            elif MODE == "webserver" and job.batch_type != "local" and should_be_local \
                    and self.get_pending_local_number() < self.config.max_run_local:
                # Switch the job to the local batch system
                job.batch_type = "local"
                job.save()
        return True, False, all_downloaded
691

692
693
694
695
696
    def send_mail_post(self):
        """
        Send mail using POST url (we have no access to mailer)

        A random key is written to the ".key" file of the job directory and posted to the "/send-mail/<id_job>"
        URL of the web server. A failure of the request is reported on standard output and does not raise.
        """
        key = Functions.random_string(15)
        key_file = os.path.join(self.config.app_data, self.id_job, ".key")
        with open(key_file, "w") as k_f:
            k_f.write(key)
        data = parse.urlencode({"key": key}).encode()
        req = request.Request(self.config.web_url + "/send-mail/" + self.id_job, data=data)
        try:
            # Context manager: the HTTP response is always closed
            with request.urlopen(req) as resp:
                failed = resp.getcode() != 200
        except URLError:
            # urlopen raises on HTTP error codes (HTTPError is a URLError) and on unreachable server
            traceback.print_exc()
            failed = True
        if failed:
            print("Job %s: Send mail failed!" % self.id_job)

706
707
    def run_job_in_thread(self, batch_system_type="local"):
        """
        Schedule run_job in a separate timer thread, with a delay of one second

        :param batch_system_type: batch system type given to run_job
        """
        timer_kwargs = dict(batch_system_type=batch_system_type)
        job_timer = threading.Timer(interval=1, function=self.run_job, kwargs=timer_kwargs)
        job_timer.start()  # Start the execution

710
711
712
713
    def prepare_data_in_thread(self):
        """
        Schedule prepare_data in a separate timer thread, with a delay of one second
        """
        prepare_timer = threading.Timer(interval=1, function=self.prepare_data)
        prepare_timer.start()  # Start the execution

714
    def prepare_data_cluster(self, batch_system_type):
        """
        Submit the data preparation step to the cluster

        Builds the command line of the cluster prepare script (target always, query only if there is one) and
        gives it to launch_to_cluster.

        :param batch_system_type: batch system type given to launch_to_cluster
        :return: what launch_to_cluster returns
        """
        script_args = [
            self.config.cluster_prepare_script,
            "-t", self.target.get_path(),
            "-m", self.target.get_name(),
            "-p", self.preptime_file,
        ]
        query = self.query
        if query is not None:
            script_args.extend(["-q", query.get_path(),
                                "-u", self.get_query_split(),
                                "-n", query.get_name()])
            # The split flag is only relevant when a query is given
            if self.tool.split_before:
                script_args.append("--split")
        launch_options = dict(step="prepare",
                              batch_system_type=batch_system_type,
                              command=self.config.cluster_python_exec,
                              args=script_args,
                              log_out=self.logs,
                              log_err=self.logs)
        return self.launch_to_cluster(**launch_options)

    def prepare_data_local(self):
        """
        Prepare input fasta files on the local machine: index (or split) them, then filter them

        The start and end timestamps of the preparation are written to the preptime file. In standalone mode,
        the job is run locally right after a successful preparation.

        :return: False if the query or the target fasta file is not valid, None else
        """
        with open(self.preptime_file, "w") as ptime, Job.connect():
            if MODE == "webserver":
                job = Job.get(Job.id_job == self.id_job)
                job.status = "preparing"
                job.save()
            else:
                job = None
                self.set_status_standalone("preparing")
            ptime.write(str(round(time.time())) + "\n")

            def save_failure(headline):
                # Save the failure (database or status file, depending on MODE), then send the mail if enabled
                message = "<br/>".join([headline, "Please check your input file and try again."])
                if MODE == "webserver":
                    job.status = "fail"
                    job.error = message
                    job.save()
                else:
                    self.set_status_standalone("fail", message)
                if self.config.send_mail_status:
                    self.send_mail_post()

            # Query (optional): split or index it, then filter it
            if self.query is not None:
                query_path = self.query.get_path()
                split = bool(self.tool.split_before)
                if split:
                    splitter = Splitter(input_f=query_path, name_f=self.query.get_name(),
                                        output_f=self.get_query_split(), query_index=self.query_index_split,
                                        debug=DEBUG)
                    success = splitter.split()
                    nb_contigs = splitter.nb_contigs
                    in_fasta = self.get_query_split()
                else:
                    query_uncompressed = None
                    if self.query.get_path().endswith(".gz"):
                        query_uncompressed = self.query.get_path()[:-3]
                    success, nb_contigs = index_file(self.query.get_path(), self.query.get_name(), self.idx_q,
                                                     query_uncompressed)
                    in_fasta = self.query.get_path()
                    if query_uncompressed is not None:
                        in_fasta = query_uncompressed
                if not success:
                    save_failure("Query fasta file is not valid!")
                    return False
                split_dir, split_name = os.path.split(self.get_query_split())
                query_filter = Filter(fasta=in_fasta,
                                      index_file=self.query_index_split if split else self.idx_q,
                                      type_f="query",
                                      min_filtered=round(nb_contigs / 4),
                                      split=True,
                                      out_fasta=os.path.join(split_dir, "filtered_" + split_name),
                                      replace_fa=True)
                query_filter.filter()

            # Target: index it, then filter it
            uncompressed = None
            if self.target.get_path().endswith(".gz"):
                uncompressed = self.target.get_path()[:-3]
            success, nb_contigs = index_file(self.target.get_path(), self.target.get_name(), self.idx_t, uncompressed)
            if not success:
                if uncompressed is not None:
                    try:
                        os.remove(uncompressed)
                    except FileNotFoundError:
                        pass
                save_failure("Target fasta file is not valid!")
                return False
            in_fasta = self.target.get_path() if uncompressed is None else uncompressed
            target_dir, target_name = os.path.split(in_fasta)
            target_filter = Filter(fasta=in_fasta,
                                   index_file=self.idx_t,
                                   type_f="target",
                                   min_filtered=round(nb_contigs / 4),
                                   split=False,
                                   out_fasta=os.path.join(target_dir, "filtered_" + target_name),
                                   replace_fa=True)
            is_filtered = target_filter.filter()
            if uncompressed is not None:
                if is_filtered:
                    # Keep the uncompressed file as target, and save its path in the ".target" file
                    os.remove(self.target.get_path())
                    self.target.set_path(uncompressed)
                    with open(os.path.join(self.output_dir, ".target"), "w") as save_file:
                        save_file.write(uncompressed)
                else:
                    os.remove(uncompressed)

            ptime.write(str(round(time.time())) + "\n")
            if MODE == "webserver":
                job.status = "prepared"
                job.save()
            else:
                self.set_status_standalone("prepared")
                self.run_job("local")
836

837
    def prepare_data(self):
        """
        Launch data preparation: locally, or on the cluster if the batch type of the job is not "local"

        In standalone mode, data are always prepared locally.
        """
        if MODE != "webserver":
            self.prepare_data_local()
            return
        with Job.connect():
            batch_type = Job.get(Job.id_job == self.id_job).batch_type
            if batch_type == "local":
                self.prepare_data_local()
            else:
                self.prepare_data_cluster(batch_type)
847

848
849
    def run_job(self, batch_system_type):
        """
        Run the mapping step, then post-process its result (merge, parse, sort) and update the job status

        :param batch_system_type: "local", "slurm" or "sge". With any other value nothing is launched and the
            job is considered as not successful
        """
        success = False
        if batch_system_type == "local":
            success = self.__launch_local()
        elif batch_system_type in ["slurm", "sge"]:
            success = self.__launch_drmaa(batch_system_type)
        if success:
            with Job.connect():
                if MODE == "webserver":
                    job = Job.get(Job.id_job == self.id_job)
                    # Last line of the logs: "<mapping elapsed time> <memory peak>"
                    with open(self.logs) as logs:
                        measures = logs.readlines()[-1].strip("\n").split(" ")
                        map_elapsed = round(float(measures[0]))
                        job.mem_peak = int(measures[1])
                    # Preptime file: start timestamp on first line, end timestamp on second line
                    with open(self.preptime_file) as ptime:
                        lines = ptime.readlines()
                        start = int(lines[0].strip("\n"))
                        end = int(lines[1].strip("\n"))
                        prep_elapsed = end - start
                        job.time_elapsed = prep_elapsed + map_elapsed
                else:
                    job = None
                status = "merging"
                if MODE == "webserver":
                    job.status = "merging"
                    job.save()
                else:
                    self.set_status_standalone(status)
                if self.tool.split_before and self.query is not None:
                    start = time.time()
                    paf_raw = self.paf_raw + ".split"
                    os.remove(self.get_query_split())
                    merger = Merger(self.paf_raw, paf_raw, self.query_index_split,
                                    self.idx_q, debug=DEBUG)
                    merger.merge()
                    os.remove(self.paf_raw)
                    os.remove(self.query_index_split)
                    self.paf_raw = paf_raw
                    end = time.time()
                    if MODE == "webserver":
                        # NOTE(review): this update is only done on the in-memory object; the job is fetched
                        # again from the database before the next save, so it looks like it is lost - confirm
                        job.time_elapsed += end - start
                elif self.query is None:
                    # No query: the target index is also used as query index
                    shutil.copyfile(self.idx_t, self.idx_q)
                    Path(os.path.join(self.output_dir, ".all-vs-all")).touch()
                if self.tool.parser is not None:
                    paf_raw = self.paf_raw + ".parsed"
                    dgenies.lib.parsers.__dict__.get(self.tool.parser)(self.paf_raw, paf_raw)
                    os.remove(self.paf_raw)
                    self.paf_raw = paf_raw
                sorter = Sorter(self.paf_raw, self.paf)
                sorter.sort()
                os.remove(self.paf_raw)
                if self.target is not None and os.path.exists(self.target.get_path()):
                    os.remove(self.target.get_path())
                if os.path.isfile(os.path.join(self.output_dir, ".do-sort")):
                    paf = Paf(paf=self.paf,
                              idx_q=self.idx_q,
                              idx_t=self.idx_t,
                              auto_parse=False)
                    paf.sort()
                    if not paf.parsed:
                        success = False
                        status = "fail"
                        error = "Error while sorting query. Please contact us to report the bug"
                        if MODE == "webserver":
                            job = Job.get(Job.id_job == self.id_job)
                            job.status = status
                            job.error = error
                            # Persist the failure: without it, the job stays with the "merging" status
                            job.save()
                        else:
                            self.set_status_standalone(status, error)
                if success:
                    status = "success"
                    if MODE == "webserver":
                        job = Job.get(Job.id_job == self.id_job)
                        job.status = "success"
                        job.save()
                    else:
                        self.set_status_standalone(status)
        # Mail is sent whatever the result of the job
        if MODE == "webserver" and self.config.send_mail_status:
            self.send_mail_post()

929
    def _after_start(self, success, error_set):
        """
        Update the job once getting files is done

        :param success: True if getting files succeed
        :param error_set: if not success, True if the error is already saved for the job
        """
        with Job.connect():
            is_webserver = MODE == "webserver"
            if not success:
                if not error_set:
                    # No error saved yet: save a generic one
                    error = "<p>Error while getting input files. Please contact the support to report the bug.</p>"
                    if is_webserver:
                        job = Job.get(Job.id_job == self.id_job)
                        job.status = "fail"
                        job.error = error
                        job.save()
                    else:
                        self.set_status_standalone("fail", error)
                if is_webserver and self.config.send_mail_status:
                    self.send_mail()
            elif is_webserver:
                job = Job.get(Job.id_job == self.id_job)
                job.status = "waiting"
                job.save()
            else:
                # Standalone mode: data preparation is launched right now
                self.set_status_standalone("waiting")
                self.prepare_data_in_thread()
953

954
    def start_job(self):
        """
        Start the job: get input files, then finalize the start if no download is pending

        When downloads are pending, _after_start is not called here. Any unexpected error sets the job as
        failed (and sends the mail in webserver mode, if enabled).
        """
        try:
            success, error_set, all_downloaded = self.getting_files()
            if not success or all_downloaded:
                self._after_start(success, error_set)

        except Exception:
            # print_exc() already writes the traceback and returns None: its result must not be printed
            traceback.print_exc()
            error = "<p>An unexpected error has occurred. Please contact the support to report the bug.</p>"
            if MODE == "webserver":
                with Job.connect():
                    job = Job.get(Job.id_job == self.id_job)
                    job.status = "fail"
                    job.error = error
                    job.save()
                    if self.config.send_mail_status:
                        self.send_mail()
            else:
                self.set_status_standalone("fail", error)

    def launch_standalone(self):
        """
        Launch the job in standalone mode

        Creates the output directory if needed, sets the status to "submitted", then schedules start_job in a
        separate timer thread (delay of one second).
        """
        out_dir = self.output_dir
        if not os.path.exists(out_dir):
            os.mkdir(out_dir)
        self.set_status_standalone("submitted")
        threading.Timer(interval=1, function=self.start_job).start()
980

981
    def launch(self):
        """
        Register the job in the database and, if a target is given, schedule its start

        Any job already saved with the same id is deleted first. Without target, the job is saved with the
        "fail" status and nothing is started.
        """
        with Job.connect():
            old_jobs = Job.select().where(Job.id_job == self.id_job)
            if len(old_jobs) > 0:
                print("Old job found without result dir existing: delete it from BDD!")
                for old_job in old_jobs:
                    old_job.delete_instance()
            fields = dict(id_job=self.id_job, email=self.email, batch_type=self.config.batch_system_type,
                          date_created=datetime.now(), tool=self.tool.name)
            if self.target is None:
                # No target: the job can't be run
                job = Job.create(status="fail", **fields)
                job.save()
                return
            job = Job.create(**fields)
            job.save()
            if not os.path.exists(self.output_dir):
                os.mkdir(self.output_dir)
            starter = threading.Timer(1, self.start_job)
            starter.start()
1000

1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
    def set_status_standalone(self, status, error=""):
        """
        Save the job status (standalone mode) in the ".status" file of the output directory

        :param status: job status
        :param error: error message, written after the status (separated by a "|")
        """
        content = "|".join((status, error))
        with open(os.path.join(self.output_dir, ".status"), "w") as s_file:
            s_file.write(content)

    def get_status_standalone(self, with_error=False):
        """
        Read the job status (standalone mode) from the ".status" file of the output directory

        :param with_error: if True, return all items of the file (split on "|"), else only the status
        :return: list of items if with_error, else the status (first item)
        """
        with open(os.path.join(self.output_dir, ".status"), "r") as s_file:
            content = s_file.read()
        items = content.strip("\n").split("|")
        return items if with_error else items[0]

1014
    def status(self):
        """
        Get the status of the job

        :return: dict with "status", "mem_peak", "time_elapsed" and "error" keys. If the job is not found,
            only "status" (set to "unknown") and "error" (empty)
        """
        if MODE != "webserver":
            # Standalone mode: status is stored in a file
            try:
                state, error = self.get_status_standalone(True)
            except FileNotFoundError:
                return {"status": "unknown", "error": ""}
            return {"status": state, "mem_peak": None, "time_elapsed": None, "error": error}
        try:
            with Job.connect():
                job = Job.get(Job.id_job == self.id_job)
                return {"status": job.status, "mem_peak": job.mem_peak, "time_elapsed": job.time_elapsed,
                        "error": job.error}
        except DoesNotExist:
            return {"status": "unknown", "error": ""}
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044

    def delete(self):
        """
        Delete the job: its entry in the database (webserver mode) and its output directory

        :return: Tuple:
            [0] True if the job has been deleted, False else
            [1] Error message if not deleted, empty string else
        """
        out_dir = self.output_dir
        if not (os.path.exists(out_dir) and os.path.isdir(out_dir)):
            return False, "Job does not exists"
        if MODE == "webserver":
            try:
                job = Job.get(id_job=self.id_job)
            except DoesNotExist:
                job = None
            if job is not None:
                # Jobs shown in the gallery are protected
                in_gallery = Gallery.select().where(Gallery.job == job)
                if in_gallery:
                    return False, "Delete a job that is in gallery is forbidden"
                job.delete_instance()
        shutil.rmtree(self.output_dir)
        return True, ""