job_manager.py 68.4 KB
Newer Older
1
2
from dgenies import MODE, DEBUG

3
import os
4
import shutil
5
import subprocess
6
from datetime import datetime
7
import time
8
import threading
9
import re
10
from dgenies.config_reader import AppConfigReader
11
from dgenies.tools import Tools
12
13
import dgenies.lib.validators as validators
import dgenies.lib.parsers as parsers
14
15
from .fasta import Fasta
from .functions import Functions
16
import requests
17
from requests.exceptions import ConnectionError
18
19
from urllib.request import urlretrieve
from urllib.error import URLError
20
from jinja2 import Template
21
import traceback
22
from pathlib import Path
23
from urllib import request, parse
24
import tarfile
25
from dgenies.bin.split_fa import Splitter
26
from dgenies.bin.index import index_file, Index
27
from dgenies.bin.filter_contigs import Filter
28
29
from dgenies.bin.merge_splitted_chrms import Merger
from dgenies.bin.sort_paf import Sorter
30
from dgenies.lib.paf import Paf
31
32
import gzip
import io
33
import binascii
34
35
36
from dgenies.database import Job

if MODE == "webserver":
37
    from dgenies.database import Session, Gallery
38
    from peewee import DoesNotExist
39
40
41


class JobManager:
42
43
44
    """
    Jobs management
    """
45

46
    def __init__(self, id_job, email=None, query: Fasta=None, target: Fasta=None, mailer=None,
                 tool="minimap2", align: Fasta=None, backup: Fasta=None):
        """
        Store the inputs of a job, load the configuration and compute the paths of its output files

        :param id_job: job id
        :type id_job: str
        :param email: email from user
        :type email: str
        :param query: query fasta
        :type query: Fasta
        :param target: target fasta
        :type target: Fasta
        :param mailer: mailer object (to send mail through flask app)
        :type mailer: Mailer
        :param tool: tool to use for mapping (choice from tools config). None for no tool
        :type tool: str
        :param align: alignment file (PAF, MAF, ...) as a fasta object
        :type align: Fasta
        :param backup: backup TAR file
        :type backup: Fasta
        :raises KeyError: if tool is not None and is not a key of the tools config
        """
        # Inputs:
        self.id_job = id_job
        self.email = email
        self.query = query
        self.target = target
        self.align = align
        # Always define the attribute, so reading it never raises AttributeError when there is no alignment file
        self.aln_format = None
        if align is not None:
            # Alignment format is the file extension, without the leading dot
            self.aln_format = os.path.splitext(align.get_path())[1][1:]
        self.backup = backup
        self.error = ""
        self.id_process = "-1"
        # Get configs:
        self.config = AppConfigReader()
        self.tools = Tools().tools
        self.tool = self.tools[tool] if tool is not None else None
        # Outputs:
        self.output_dir = os.path.join(self.config.app_data, id_job)
        self.preptime_file = os.path.join(self.output_dir, "prep_times")
        self.query_index_split = os.path.join(self.output_dir, "query_split.idx")
        self.paf = os.path.join(self.output_dir, "map.paf")
        self.paf_raw = os.path.join(self.output_dir, "map_raw.paf")
        self.idx_q = os.path.join(self.output_dir, "query.idx")
        self.idx_t = os.path.join(self.output_dir, "target.idx")
        self.logs = os.path.join(self.output_dir, "logs.txt")
        self.mailer = mailer
        # Cache of file names already resolved for an URL (see _get_filename_from_url)
        self._filename_for_url = {}
92

93
94
    def do_align(self):
        """
95
96
        Check if we have to make alignment

97
98
99
100
        :return: True if the job is launched with an alignment file
        """
        return not os.path.exists(os.path.join(self.output_dir, ".align"))

101
102
    @staticmethod
    def is_gz_file(filepath):
103
104
105
106
107
108
109
        """
        Check if a file is gzipped

        :param filepath: file to check
        :type filepath: str
        :return: True if gzipped, else False
        """
110
111
112
        with open(filepath, 'rb') as test_f:
            return binascii.hexlify(test_f.read(2)) == b'1f8b'

113
    def get_file_size(self, filepath: str):
114
115
116
117
118
119
120
121
        """
        Get file size

        :param filepath: file path
        :type filepath: str
        :return: file size (bytes)
        :rtype: int
        """
122
123
        file_size = os.path.getsize(filepath)
        if filepath.endswith(".gz") and file_size <= self.config.max_upload_size:
124
125
            with gzip.open(filepath, 'rb') as file_obj:
                file_size = file_obj.seek(0, io.SEEK_END)
126
127
        return file_size

128
    def get_query_split(self):
129
130
131
132
133
134
        """
        Get query split fasta file

        :return: split query fasta file
        :rtype: str
        """
135
136
        if not self.tool.split_before:
            return self.query.get_path()
137
138
139
140
141
        query_split = os.path.join(self.output_dir, "split_" + os.path.basename(self.query.get_path()))
        if query_split.endswith(".gz"):
            return query_split[:-3]
        return query_split

142
    def set_inputs_from_res_dir(self):
        """
        Sets inputs (query, target, align) from job dir

        Each input is described by a hidden file of the job directory (".query", ".target", ".align") whose
        first line is the path of the input file. An input whose hidden file does not exist is left untouched.
        """
        res_dir = os.path.join(self.config.app_data, self.id_job)

        def read_saved_path(marker):
            """Return the path saved in the given hidden file, or None if this file does not exist"""
            marker_path = os.path.join(res_dir, marker)
            if not os.path.exists(marker_path):
                return None
            with open(marker_path) as marker_f:
                # The end of line, if any, is not part of the path
                return marker_f.readline().rstrip("\r\n")

        def name_from_path(file_path, idx_name):
            """Return idx_name for an index file, else the file name without type prefix and extensions"""
            if file_path.endswith(".idx"):
                return idx_name
            # File names are "<type>_<original name>": keep what follows the first underscore
            base_name = os.path.basename(file_path.replace(".gz", ""))
            return os.path.splitext(base_name.split("_", 1)[1])[0]

        query_path = read_saved_path(".query")
        if query_path is not None:
            # NOTE(review): an index file used as query is named "target" (and "query" for the target below).
            # This looks swapped; kept as in the original code -- confirm against callers before changing
            self.query = Fasta(
                name=name_from_path(query_path, "target"),
                path=query_path,
                type_f="local"
            )

        target_path = read_saved_path(".target")
        if target_path is not None:
            self.target = Fasta(
                name=name_from_path(target_path, "query"),
                path=target_path,
                type_f="local"
            )

        align_path = read_saved_path(".align")
        if align_path is not None:
            self.align = Fasta(
                name="map",
                path=align_path,
                type_f="local"
            )
            # Alignment format is the file extension, without the leading dot
            self.aln_format = os.path.splitext(align_path)[1][1:]
177

178
    def check_job_success(self):
179
180
181
182
183
184
        """
        Check if a job succeed

        :return: status of a job: succeed, no-match or fail
        :rtype: str
        """
185
186
        if os.path.exists(self.paf_raw):
            if os.path.getsize(self.paf_raw) > 0:
187
                return "succeed"
188
189
            else:
                return "no-match"
Floreal Cabanettes's avatar
Floreal Cabanettes committed
190
        return "fail"
191

192
    def is_query_filtered(self):
193
194
195
196
197
        """
        Check if query has been filtered

        :return: True if filtered, else False
        """
198
199
200
        return os.path.exists(os.path.join(self.output_dir, ".filter-query"))

    def is_target_filtered(self):
201
202
203
204
205
206
        """
        Check if target has been filtered

        :return: True if filtered, else False
        :return:
        """
207
208
        return os.path.exists(os.path.join(self.output_dir, ".filter-target"))

209
    def get_mail_content(self, status, target_name, query_name=None):
210
211
212
213
214
215
216
217
218
219
220
221
        """
        Build mail content for status mail

        :param status: job status
        :type status: str
        :param target_name: name of target
        :type target_name: str
        :param query_name:  name of query
        :type query_name: str
        :return: mail content
        :rtype: str
        """
222
        message = "D-Genies\n\n"
223
        if status == "success":
Floreal Cabanettes's avatar
Floreal Cabanettes committed
224
            message += "Your job %s was completed successfully!\n\n" % self.id_job
225
            message += str("Your job {0} is finished. You can see  the results by clicking on the link below:\n"
Floreal Cabanettes's avatar
Floreal Cabanettes committed
226
                           "{1}/result/{0}\n\n").format(self.id_job, self.config.web_url)
227
228
        else:
            message += "Your job %s has failed!\n\n" % self.id_job
229
230
            if self.error != "":
                message += self.error.replace("#ID#", self.id_job).replace("<br/>", "\n")
231
                message += "\n\n"
232
233
234
            else:
                message += "Your job %s has failed. You can try again. " \
                           "If the problem persists, please contact the support.\n\n" % self.id_job
Floreal Cabanettes's avatar
Floreal Cabanettes committed
235
236
237
238
239
240
        if target_name is not None:
            message += "Sequences compared in this analysis:\n"
            if query_name is not None:
                message += "Target: %s\nQuery: %s\n\n" % (target_name, query_name)
            else:
                message += "Target: %s\n\n" % target_name
241
242
243
244
245
246
247
248
249
        if status == "success":
            if self.is_target_filtered():
                message += str("Note: target fasta has been filtered because it contains too small contigs."
                               "To see which contigs has been removed from the analysis, click on the link below:\n"
                               "{1}/filter-out/{0}/target\n\n").format(self.id_job, self.config.web_url)
            if self.is_query_filtered():
                message += str("Note: query fasta has been filtered because it contains too small contigs."
                               "To see which contigs has been removed from the analysis, click on the link below:\n"
                               "{1}/filter-out/{0}/query\n\n").format(self.id_job, self.config.web_url)
250
251
        message += "See you soon on D-Genies,\n"
        message += "The team"
252
        return message
253

254
    def get_mail_content_html(self, status, target_name, query_name=None):
        """
        Build mail content as HTML, by rendering the "job_notification.html" template located in the
        "mail_templates" folder next to this module

        :param status: job status
        :type status: str
        :param target_name: name of target (rendered as an empty string if None)
        :type target_name: str
        :param query_name:  name of query (rendered as an empty string if None)
        :type query_name: str
        :return: mail content (html)
        :rtype: str
        """
        module_dir = os.path.dirname(os.path.realpath(__file__))
        template_path = os.path.join(module_dir, "mail_templates", "job_notification.html")
        with open(template_path) as t_file:
            template = Template(t_file.read())
        context = {
            "job_name": self.id_job,
            "status": status,
            "url_base": self.config.web_url,
            "query_name": "" if query_name is None else query_name,
            "target_name": "" if target_name is None else target_name,
            "error": self.error,
            "target_filtered": self.is_target_filtered(),
            "query_filtered": self.is_query_filtered(),
        }
        return template.render(**context)
275

276
    def get_mail_subject(self, status):
277
278
279
280
281
282
283
284
285
        """
        Build mail subject

        :param status: job status
        :type status: str
        :return: mail subject
        :rtype: str
        """

286
        if status == "success" or status == "no-match":
Floreal Cabanettes's avatar
Floreal Cabanettes committed
287
            return "DGenies - Job completed: %s" % self.id_job
288
        else:
289
            return "DGenies - Job failed: %s" % self.id_job
290

291
    def send_mail(self):
        """
        Send the status mail of the job

        Status and error are read from the job stored in database, as well as the recipient if no email
        was given to the manager. Sequence names are the first lines of the target and query index files.
        """
        def first_line(path):
            """Return the first line of the file (right-stripped), or None if the file does not exist"""
            if not os.path.exists(path):
                return None
            with open(path, "r") as idx_file:
                return idx_file.readline().rstrip()

        with Job.connect():
            # Retrieve infos:
            job = Job.get(Job.id_job == self.id_job)
            if self.email is None:
                self.email = job.email
            status = job.status
            self.error = job.error

            target_name = first_line(self.idx_t)
            query_name = first_line(self.idx_q)
            if query_name == target_name:
                # Same name on both sides: only the target is reported
                query_name = None

            # Send:
            subject = self.get_mail_subject(status)
            text_body = self.get_mail_content(status, target_name, query_name)
            html_body = self.get_mail_content_html(status, target_name, query_name)
            self.mailer.send_mail(recipients=[self.email],
                                  subject=subject,
                                  message=text_body,
                                  message_html=html_body)
319

320
    def search_error(self):
321
322
323
324
325
326
        """
        Search for an error in the log file (for local runs). If no error found, returns a generic error message

        :return: error message to give to the user
        :rtype: str
        """
327
328
        logs = os.path.join(self.output_dir, "logs.txt")
        if os.path.exists(logs):
329
330
331
332
333
            lines = subprocess.check_output(['tail', '-2', logs]).decode("utf-8").split("\n")
            if re.match(r"\[morecore] \d+ bytes requested but not available.", lines[1]) or \
                    re.match(r"\[morecore] \d+ bytes requested but not available.", lines[1]) or \
                    re.match(r"\[morecore] insufficient memory", lines[0]) or \
                    re.match(r"\[morecore] insufficient memory", lines[1]):
334
335
336
337
                return "Your job #ID# has failed because of memory limit exceeded. May be your sequences are too big?" \
                       "<br/>You can contact the support for more information."
        return "Your job #ID# has failed. You can try again.<br/>If the problem persists, please contact the support."

338
339
340
341
342
343
344
    def _launch_local(self):
        """
        Launch a job on the current machine, and wait for its end

        The command line comes from the tool config (all-vs-all command line if there is no query), with
        its {query}, {exe}, {target}, {threads} and {out} placeholders replaced. Job status is saved in
        database in webserver mode, through set_status_standalone else.

        :return: True if job succeed, else False
        :rtype: bool
        """
        # In webserver mode, the command is run through /usr/bin/time with the "%e %M" output format
        if MODE == "webserver":
            cmd = ["/usr/bin/time", "-f", "%e %M"]
        else:
            cmd = []
        if self.query is not None:
            command_line = self.tool.command_line.replace("{query}", self.query.get_path())
        else:
            command_line = self.tool.all_vs_all
        out_file = None
        # No shell is used: a ">" redirection is cut from the command line, stdout goes to the raw PAF file instead
        if ">" in command_line:
            out_file = self.paf_raw
            command_line = command_line[:command_line.index(">")]
        command_line = command_line.replace("{exe}", self.tool.exec) \
                                   .replace("{target}", self.target.get_path()) \
                                   .replace("{threads}", str(self.tool.threads)) \
                                   .replace("{out}", self.paf_raw)

        # NOTE(review): splitting on single spaces breaks paths containing spaces, and gives an empty
        # argument when the command line ends with a space -- confirm with the tools config
        cmd += command_line.split(" ")
        if out_file is None:
            with open(self.logs, "w") as logs:
                p = subprocess.Popen(cmd, stdout=logs, stderr=logs)
        else:
            with open(self.logs, "w") as logs, open(out_file, "w") as out:
                p = subprocess.Popen(cmd, stdout=out, stderr=logs)
        with Job.connect():
            status = "started"
            if MODE == "webserver":
                job = Job.get(Job.id_job == self.id_job)
                job.id_process = p.pid
                job.status = status
                job.save()
            else:
                job = None
                self.set_status_standalone(status)
            # Blocks until the end of the process
            p.wait()
            if p.returncode == 0:
                # The process ended well: final status depends on the raw PAF file
                status = self.check_job_success()
                if MODE == "webserver":
                    job.status = status
                    job.save()
                else:
                    self.set_status_standalone(status)
                return status == "succeed"
            # Non-zero return code: look for a known error in the logs
            self.error = self.search_error()
            status = "fail"
            if MODE == "webserver":
                job.status = status
                job.error = self.error
                job.save()
            else:
                self.set_status_standalone(status, self.error)
        return False
397

398
399
400
    def check_job_status_slurm(self):
        """
        Check status of a SLURM job run

        The state, max virtual memory size and elapsed time of the batch step of the job (self.id_process)
        are asked to "sacct". If the job is COMPLETED, "<elapsed seconds> <max memory>" is appended to the
        job log file.

        :return: True if the job has successfully ended, else False
        :rtype: bool
        :raises subprocess.CalledProcessError: if sacct ends with a non-zero return code
        """
        # Arguments given as a list, without shell: the job id is never interpreted by a shell
        output = subprocess.check_output(["sacct", "-p", "-n", "--format=state,maxvmsize,elapsed",
                                          "-j", "%s.batch" % self.id_process]).decode("utf-8").strip("\n")

        # -p gives "|" separated fields
        status = output.split("|")

        success = status[0] == "COMPLETED"
        if success:
            mem_peak = int(status[1][:-1])  # Remove the K letter
            # Elapsed time is HH:MM:SS, prefixed by "DD-" for jobs of one day or more
            days, _, clock = status[2].rpartition("-")
            elapsed_full = list(map(int, clock.split(":")))
            elapsed = elapsed_full[0] * 3600 + elapsed_full[1] * 60 + elapsed_full[2]
            if days:
                elapsed += int(days) * 86400
            with open(self.logs, "a") as logs:
                logs.write("%s %d" % (elapsed, mem_peak))

        return success

    def check_job_status_sge(self):
        """
        Check status of a SGE job run

        The "failed", "start_time", "end_time" and "maxvmem" fields of the job (self.id_process) are read
        from "qacct". If the job did not fail and the three other fields are known,
        "<elapsed seconds> <max memory>" is appended to the job log file.

        :return: True if the job has successfully ended, else False
        :rtype: bool
        :raises subprocess.CalledProcessError: if qacct ends with a non-zero return code
        """
        date_format = "%a %b %d %H:%M:%S %Y"
        # Factors to convert a suffixed maxvmem value to the unit written in logs (the one of a "K" value)
        unit_factors = {"K": 1, "M": 1024, "G": 1024 * 1024, "T": 1024 * 1024 * 1024}

        status = "-1"
        start = None
        end = None
        mem_peak = None
        # Arguments given as a list, without shell: the job id is never interpreted by a shell
        acct = subprocess.check_output(["qacct", "-d", "1", "-j", str(self.id_process)]).decode("utf-8")
        for line in acct.split("\n"):
            fields = re.split(r"\s+", line, 1)
            if len(fields) < 2:
                continue
            # Strip the value: padding spaces must not break comparisons or date parsing
            value = fields[1].strip()
            if line.startswith("failed"):
                status = value
            elif line.startswith("start_time"):
                start = datetime.strptime(value, date_format)
            elif line.startswith("end_time"):
                end = datetime.strptime(value, date_format)
            elif line.startswith("maxvmem") and value:
                suffix = value[-1].upper()
                if suffix in unit_factors:
                    # The number is everything but the unit letter, and may be a decimal number
                    mem_peak = int(float(value[:-1]) * unit_factors[suffix])
                else:
                    # NOTE(review): a value without unit is assumed to be in bytes -- confirm with qacct docs
                    mem_peak = int(float(value) / 1024)

        if status == "0":
            if start is not None and end is not None and mem_peak is not None:
                # total_seconds() also counts the whole days, unlike the "seconds" attribute
                elapsed = int((end - start).total_seconds())
                with open(self.logs, "a") as logs:
                    logs.write("%s %d" % (elapsed, mem_peak))

        return status == "0"

    def update_job_status(self, status, id_process=None):
        """
        Update job status: in database in webserver mode, through set_status_standalone else

        :param status: new status
        :param id_process: system process id. Saved in webserver mode only, and only if not None
        """
        if MODE != "webserver":
            self.set_status_standalone(status)
            return
        with Job.connect():
            db_job = Job.get(Job.id_job == self.id_job)
            db_job.status = status
            if id_process is not None:
                db_job.id_process = id_process
            db_job.save()
471

472
473
    @staticmethod
    def find_error_in_log(log_file):
474
475
476
477
478
479
480
        """
        Find error in log (for cluster run)

        :param log_file: log file of the job
        :return: error (empty if no error)
        :rtype: str
        """
481
482
        error = ""
        with open(log_file, "r") as log:
Floreal Cabanettes's avatar
Floreal Cabanettes committed
483
484
485
486
            for line in log:
                if line.startswith("###ERR### "):
                    error = line[10:].rstrip()
                    break
487
488
        return error

489
    def launch_to_cluster(self, step, batch_system_type, command, args, log_out, log_err):
        """
        Launch a program to the cluster through DRMAA, and wait for its end

        The job status is updated when the job is scheduled, and when it ends.

        :param step: step (prepare, start)
        :type step: str
        :param batch_system_type: slurm or sge
        :type batch_system_type: str
        :param command: program to launch (without arguments)
        :type command: str
        :param args: arguments to use for the program
        :type args: list
        :param log_out: log file for stdout
        :type log_out: str
        :param log_err: log file for stderr
        :type log_err: str
        :return: True if succeed (final status is "succeed" or "prepared"), else False
        :rtype: bool
        """
        # Imported here: only needed for cluster runs
        import drmaa
        from dgenies.lib.drmaasession import DrmaaSession
        drmaa_session = DrmaaSession()
        s = drmaa_session.session
        jt = s.createJobTemplate()
        jt.remoteCommand = command
        jt.args = args
        # Job name: two first letters of the step, then the job id
        jt.jobName = "_".join([step[:2], self.id_job])
        if log_out == log_err:
            # Same file for both: stderr is merged into stdout
            jt.joinFiles = True
            jt.outputPath = ":" + log_out
        else:
            jt.joinFiles = False
            jt.outputPath = ":" + log_out
            jt.errorPath = ":" + log_err

        # Memory to ask for. For the start step it comes from the config, capped by the max memory of the tool
        if step == "start":
            memory = self.config.cluster_memory
            if self.query is None:
                # All-vs-all run: dedicated config value, lowered to 32 when c_len (last value
                # loaded from the target index) is at most 500000000
                memory = self.config.cluster_memory_ava
                if memory > 32:
                    name, order, contigs, reversed_c, abs_start, c_len = Index.load(self.idx_t, False)
                    if c_len <= 500000000:
                        memory = 32
            if memory > self.tool.max_memory:
                memory = self.tool.max_memory
        else:
            memory = 8000

        # Native specification of the batch system. "###DEFAULT###" means: use the default one defined below
        native_specs = self.config.drmaa_native_specs
        if batch_system_type == "slurm":
            if native_specs == "###DEFAULT###":
                # NOTE(review): "-N 1=1" does not look like a valid sbatch option -- confirm
                native_specs = "--mem-per-cpu={0} --mincpus={1} -N 1=1 --time={2}"
            if step == "prepare":
                jt.nativeSpecification = native_specs.format(memory, 1, "02:00:00")
            elif step == "start":
                # Memory is divided between the threads, then multiplied by 1000
                # (presumably a GB to MB conversion -- TODO confirm config units)
                jt.nativeSpecification = native_specs.format(memory // self.tool.threads_cluster * 1000,
                                                             self.tool.threads_cluster, "02:00:00")
        elif batch_system_type == "sge":
            if native_specs == "###DEFAULT###":
                native_specs = "-l mem={0},h_vmem={0} -pe parallel_smp {1}"
            if step == "prepare":
                jt.nativeSpecification = native_specs.format(8000, 1)
            elif step == "start":
                jt.nativeSpecification = native_specs.format(
                    memory // self.tool.threads_cluster * 1000, self.tool.threads_cluster)
        jt.workingDirectory = self.output_dir
        jobid = s.runJob(jt)
        self.id_process = jobid

        self.update_job_status("scheduled-cluster" if step == "start" else "prepare-scheduled", jobid)

        # Blocks until the end of the job, then asks the batch system accounting if it ended well
        retval = s.wait(jobid, drmaa.Session.TIMEOUT_WAIT_FOREVER)
        if retval.hasExited and (self.check_job_status_slurm() if batch_system_type == "slurm" else
        self.check_job_status_sge()):
            if step == "start":
                status = self.check_job_success()
            else:
                status = "prepared"
            # job = Job.get(id_job=self.id_job)
            # job.status = status
            # db.commit()
            self.update_job_status(status)
            s.deleteJobTemplate(jt)
            return status == "succeed" or status == "prepared"
        # The job did not end well: report the error found in the error log, if any
        error = self.find_error_in_log(log_err)
        if step == "prepare":
            error += "<br/>Please check your input file and try again."
        # NOTE(review): set_job_status is not defined in the visible part of the file -- confirm it exists
        self.set_job_status("fail", error)

        # NOTE(review): the job template is not deleted if an exception is raised above
        s.deleteJobTemplate(jt)
        return False

581
582
583
584
585
586
587
588
    def _launch_drmaa(self, batch_system_type):
        """
        Launch the mapping step to a cluster

        :param batch_system_type: slurm or sge
        :return: True if job succeed, else False
        """

589
        if self.query is not None:
590
            args = re.sub("{exe}\s?", "", self.tool.command_line).replace("{query}", self.get_query_split())
591
        else:
592
593
594
595
596
            args = re.sub("{exe}\s?", "", self.tool.all_vs_all)
        out_file = self.logs
        if ">" in args:
            out_file = self.paf_raw
            args = args[:args.index(">")]
Floreal Cabanettes's avatar
Floreal Cabanettes committed
597
598
599
        args = args.replace("{target}", self.target.get_path()) \
                   .replace("{threads}", str(self.tool.threads_cluster)) \
                   .replace("{out}", self.paf_raw)
600
601
602

        args = args.split(" ")

603
604
        return self.launch_to_cluster(step="start",
                                      batch_system_type=batch_system_type,
605
                                      command=self.tool.exec,
606
                                      args=args,
607
                                      log_out=out_file,
608
609
                                      log_err=self.logs)

610
611
612
613
614
615
616
617
618
619
620
    def _getting_local_file(self, fasta, type_f):
        """
        Copy temp file to its final location

        :param fasta: fasta file Object
        :type fasta: Fasta
        :param type_f: query or target
        :type type_f: str
        :return: final full path of the file
        :rtype: str
        """
621
        finale_path = os.path.join(self.output_dir, type_f + "_" + os.path.basename(fasta.get_path()))
622
623
624
625
        if fasta.is_example():
            shutil.copy(fasta.get_path(), finale_path)
        else:
            shutil.move(fasta.get_path(), finale_path)
626
627
        with open(os.path.join(self.output_dir, "." + type_f), "w") as save_file:
            save_file.write(finale_path)
628
629
        return finale_path

630
631
632
633
634
635
636
637
638
    def _get_filename_from_url(self, url):
        """
        Retrieve filename from an URL (http or ftp)

        :param url: url of the file to download
        :type url: str
        :return: filename
        :rtype: str
        """
639
640
641
642
643
644
645
646
647
648
        if url not in self._filename_for_url:
            if url.startswith("ftp://"):
                self._filename_for_url[url] = url.split("/")[-1]
            elif url.startswith("http://") or url.startswith("https://"):
                self._filename_for_url[url] = requests.head(url, allow_redirects=True).url.split("/")[-1]
            else:
                return None
        return self._filename_for_url[url]

    def _download_file(self, url):
        """
        Download a file from an URL to the job directory

        :param url: url of the file to download
        :type url: str
        :return: absolute path of the downloaded file
        :rtype: str
        """
        local_filename = os.path.join(self.output_dir, self._get_filename_from_url(url))
        if url.startswith("ftp://"):
            urlretrieve(url, local_filename)
        else:
            # stream=True: the content is not loaded in memory, but read chunk by chunk
            r = requests.get(url, stream=True)
            try:
                with open(local_filename, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:  # filter out keep-alive new chunks
                            f.write(chunk)
            finally:
                # A streamed response keeps its connection until closed, even if writing failed
                r.close()
        return local_filename

670
    def _getting_file_from_url(self, fasta, type_f):
        """
        Download an input file from its URL, and put it to its final location: "<type_f>_<file name>"
        in the job directory. The final path is also saved in the ".<type_f>" file of the job directory.

        If the download fails (connection or URL error), the job is set as failed.

        :param fasta: Fasta object describing the input file
        :type fasta: Fasta
        :param type_f: type of the file (query or target)
        :type type_f: str
        :return:
            * [0] True if no error happened, else False
            * [1] If an error happened, True if the error was saved for the job, else False (will be saved later)
            * [2] Finale path of the downloaded file {str}
            * [3] Name of the downloaded file {str}
        :rtype: tuple
        """
        try:
            dl_path = self._download_file(fasta.get_path())
        except (ConnectionError, URLError):
            fail_status = "fail"
            fail_error = "<p>Url <b>%s</b> is not valid!</p>" \
                         "<p>If this is unattended, please contact the support.</p>" % fasta.get_path()
            if MODE != "webserver":
                self.set_status_standalone(fail_status, fail_error)
            else:
                with Job.connect():
                    db_job = Job.get(Job.id_job == self.id_job)
                    db_job.status = fail_status
                    db_job.error = fail_error
                    db_job.save()
            return False, True, None, None

        filename = os.path.basename(dl_path)
        # Name of the file: without ".gz" nor extension
        name = os.path.splitext(filename.replace(".gz", ""))[0]
        finale_path = os.path.join(self.output_dir, type_f + "_" + filename)
        shutil.move(dl_path, finale_path)
        marker_path = os.path.join(self.output_dir, "." + type_f)
        with open(marker_path, "w") as save_file:
            save_file.write(finale_path)
        return True, False, finale_path, name
707

708
709
710
711
712
713
714
715
716
717
718
    def _check_url(self, fasta, formats):
        """
        Validate a remote input: a file name must be found from its URL, and this file name must match one of
        the allowed formats. On any failure, the job is flagged as "fail" with an explanatory error message.

        :param fasta: remote file to check
        :type fasta: Fasta
        :param formats: allowed file formats
        :type formats: tuple
        :return: result of the format check if a file name was found, else False
        :rtype: bool
        """
        def _save_failure(message):
            # Store the failure in the database (webserver mode) or through the standalone status
            if MODE == "webserver":
                with Job.connect():
                    failed_job = Job.get(Job.id_job == self.id_job)
                    failed_job.status = "fail"
                    failed_job.error = message
                    failed_job.save()
            else:
                self.set_status_standalone("fail", message)

        url = fasta.get_path()
        try:
            filename = self._get_filename_from_url(url)
        except (ConnectionError, URLError):
            _save_failure("<p>Url <b>%s</b> is not valid!</p>"
                          "<p>If this is unattended, please contact the support.</p>" % fasta.get_path())
            return False

        if filename is None:
            # No file name could be found for this URL
            _save_failure("<p>Url <b>%s</b> is not a valid URL!</p>"
                          "<p>If this is unattended, please contact the support.</p>" % url)
            return False

        allowed = Functions.allowed_file(filename, formats)
        if allowed:
            return allowed

        # Build a human readable description of what was expected
        if len(formats) == 1:
            single_format_labels = {
                "fasta": "a Fasta file",
                "idx": "an index file",
                "map": "an alignment file",
                "backup": "a backup file",
            }
            format_txt = single_format_labels.get(formats[0], "a valid file")
        elif "fasta" in formats and "idx" in formats:
            format_txt = "a Fasta file or an index file"
        else:
            format_txt = "a valid file"
        _save_failure("<p>File <b>%s</b> downloaded from <b>%s</b> is not %s!</p>"
                      "<p>If this is unattended, please contact the support.</p>" % (filename, url, format_txt))
        return allowed

781
    def clear(self):
        """
        Delete the job directory (self.output_dir) and everything it contains
        """
        job_dir = self.output_dir
        shutil.rmtree(job_dir)

    @staticmethod
    def get_pending_local_number():
        """
        Get the number of local jobs running or waiting for a run

        A job is counted when its batch type is "local" and its status is none of "success", "fail" and
        "no-match". Outside of the webserver mode, 0 is always returned.

        :return: number of jobs
        :rtype: int
        """
        if MODE != "webserver":
            return 0
        with Job.connect():
            # Let the database count the rows instead of loading all of them just to get their number
            return Job.select().where((Job.batch_type == "local") & (Job.status != "success") &
                                      (Job.status != "fail") & (Job.status != "no-match")).count()
801

802
    def set_job_status(self, status, error=""):
        """
        Set a new status (and optionally an error message) for the job

        In webserver mode, the job record is updated in the database; no database connection is opened here.
        In other modes, the change is delegated to set_status_standalone.

        :param status: new job status
        :type status: str
        :param error: error description (if any)
        :type error: str
        """
        if MODE != "webserver":
            self.set_status_standalone(status, error)
            return
        record = Job.get(Job.id_job == self.id_job)
        record.status = status
        record.error = error
        record.save()

819
820
    def check_file(self, input_type, should_be_local, max_upload_size_readable):
        """
        Check if an input file is correct: valid gzip, size and format

        On failure, the job status is set to "fail" with a message describing the problem. For gzip and size
        failures, the job directory is also removed.

        :param input_type: name of the input to check: query, target, align or backup
        :type input_type: str
        :param should_be_local: True if job should be treated locally
        :type should_be_local: bool
        :param max_upload_size_readable: max upload size in Mb (shown in the size error message). It is replaced
            by the all-vs-all limit when that limit is the one applied to the file
        :return:
            * [0] True if the file is correct, else False
            * [1] True if an error was set for the job (always the case on failure)
            * [2] True if the job should be treated locally (None on failure)
        :rtype: tuple
        """
        if input_type == "query" or self.query is not None:
            max_size = self.config.max_upload_size
        else:
            # No query: the all-vs-all limit applies, so the error message must report this limit too
            max_size = self.config.max_upload_size_ava
            max_upload_size_readable = max_size / 1024 / 1024
        with Job.connect():
            my_input = getattr(self, input_type)
            input_path = my_input.get_path()
            if input_path.endswith(".gz") and not self.is_gz_file(input_path):
                # Check file is correctly gzipped
                self.set_job_status("fail", input_type + " file is not a correct gzip file")
                self.clear()
                return False, True, None
            # Check size (a limit of -1 or less disables the check):
            file_size = self.get_file_size(input_path)
            if -1 < max_size < file_size:
                self.set_job_status("fail",
                                    input_type +
                                    " file exceed size limit of %d Mb (uncompressed)" % max_upload_size_readable)
                self.clear()
                return False, True, None

            if input_type == "align":
                # The validator is looked up by the name of the alignment format
                validator = getattr(validators, self.aln_format, None)
                if validator is None:
                    self.set_job_status("fail", "Alignment file format not supported")
                    return False, True, None
                if not validator(input_path):
                    self.set_job_status("fail", "Alignment file is invalid. Please check your file.")
                    return False, True, None
            elif input_type != "backup":
                if input_path.endswith(".idx") and not validators.idx(input_path):
                    self.set_job_status("fail",
                                        "%s index file is invalid. Please check your file." %
                                        input_type.capitalize())
                    return False, True, None
                # Files reaching the configured min size are not run locally when a batch system is available
                if self.config.batch_system_type != "local" and file_size >= getattr(self.config,
                                                                                     "min_%s_size" % input_type):
                    should_be_local = False

        return True, False, should_be_local

    def download_files_with_pending(self, files_to_download, should_be_local, max_upload_size_readable):
        """
        Download files from URLs, with pending (according to the max number of concurrent downloads)

        In webserver mode, a session is created and polled every 15 seconds until the download is allowed. Each
        file is then downloaded and checked; the first failure stops the process. At the end, _after_start is
        called with the outcome.

        :param files_to_download: files to download. For each item of the list, it's a list with 2 elements: first one
            is the Fasta object, second one the input type (query, target, align or backup)
        :type files_to_download: list of list
        :param should_be_local: True if the job should be run locally (according to input file sizes), else False
        :type should_be_local: bool
        :param max_upload_size_readable: max upload size in Mb (to show on errors)
        :type max_upload_size_readable: float
        """
        with Job.connect():
            if MODE == "webserver":
                job = Job.get(Job.id_job == self.id_job)
                job.status = "getfiles-waiting"
                job.save()
                # Create a session:
                s_id = Session.new(True)
                session = Session.get(s_id=s_id)
            else:
                session = None
                job = None
                s_id = None

            correct = True
            error_set = False
            try:
                if MODE == "webserver":
                    # Wait for our turn to download
                    allowed = session.ask_for_upload(True)
                    while not allowed:
                        time.sleep(15)
                        session = Session.get(s_id=s_id)
                        allowed = session.ask_for_upload(False)
                    job.status = "getfiles"
                    job.save()
                for fasta, input_type in files_to_download:
                    correct, error_set, finale_path, filename = self._getting_file_from_url(fasta, input_type)
                    if not correct:
                        break
                    my_input = getattr(self, input_type)
                    my_input.set_path(finale_path)
                    my_input.set_name(filename)
                    correct, error_set, should_be_local = self.check_file(input_type, should_be_local,
                                                                          max_upload_size_readable)
                    if not correct:
                        break

                if correct and MODE == "webserver" and job.batch_type != "local" and should_be_local \
                        and self.get_pending_local_number() < self.config.max_run_local:
                    job.batch_type = "local"
                    job.save()
            except Exception:  # Any error here makes the job fail (error will be saved later)
                traceback.print_exc()
                correct = False
                error_set = False

            if MODE == "webserver":
                session.delete_instance()
            self._after_start(correct, error_set)
934

935
    def getting_files(self):
        """
        Get files for the job

        Local inputs are retrieved and checked immediately. Inputs given as URLs are only validated here: their
        download is done afterwards in a separate thread (see download_files_with_pending).

        :return:
            * [0] True if getting files succeed, False else
            * [1] If error happenned, True if error already saved for the job, False else (error will be saved later)
            * [2] True if no data must be downloaded (will be downloaded with pending if False)
        :rtype: tuple
        """
        with Job.connect():
            status = "getfiles"
            if MODE == "webserver":
                job = Job.get(Job.id_job == self.id_job)
                job.status = status
                job.save()
            else:
                job = None
                self.set_status_standalone(status)

            should_be_local = True
            max_upload_size_readable = self.config.max_upload_size / 1024 / 1024  # Set it in Mb
            # Index files are accepted for query and target only when an alignment file is given
            fasta_formats = ("fasta",) if self.align is None else ("fasta", "idx")
            inputs = (("query", fasta_formats),
                      ("target", fasta_formats),
                      ("align", ("map",)),
                      ("backup", ("backup",)))
            files_to_download = []
            for input_type, formats in inputs:
                my_input = getattr(self, input_type)
                if my_input is None:
                    continue
                if my_input.get_type() == "local":
                    my_input.set_path(self._getting_local_file(my_input, input_type))
                    correct, error_set, should_be_local = self.check_file(input_type, should_be_local,
                                                                          max_upload_size_readable)
                    if not correct:
                        # Keep error_set: the error saved by check_file must not be replaced later
                        return False, error_set, True
                elif self._check_url(my_input, formats):
                    files_to_download.append([my_input, input_type])
                else:
                    # _check_url already saved the error for the job
                    return False, True, True

            if files_to_download:
                thread = threading.Timer(0, self.download_files_with_pending,
                                         kwargs={"files_to_download": files_to_download,
                                                 "should_be_local": should_be_local,
                                                 "max_upload_size_readable": max_upload_size_readable})
                thread.start()  # Start the execution
                return True, False, False

            if MODE == "webserver" and job.batch_type != "local" and should_be_local \
                    and self.get_pending_local_number() < self.config.max_run_local:
                job.batch_type = "local"
                job.save()
        return True, False, True
1014

1015
1016
    def send_mail_post(self):
        """
        Send mail using POST url (if there is no access to mailer)

        A random key is written into the ".key" file of the job, in the application data folder, and sent as
        POST data to the "/send-mail/<id_job>" URL of the web server. A message is printed if the response
        code is not 200.
        """
        key = Functions.random_string(15)
        key_file = os.path.join(self.config.app_data, self.id_job, ".key")
        with open(key_file, "w") as k_f:
            k_f.write(key)
        data = parse.urlencode({"key": key}).encode()
        req = request.Request(self.config.web_url + "/send-mail/" + self.id_job, data=data)
        # Use the response as a context manager, so the connection is always closed
        with request.urlopen(req) as resp:
            if resp.getcode() != 200:
                print("Job %s: Send mail failed!" % self.id_job)

1029
    def run_job_in_thread(self, batch_system_type="local"):
        """
        Schedule run_job in a separate timer thread, started after a delay of 1 second

        :param batch_system_type: batch system type given to run_job (default: local)
        :type batch_system_type: str
        """
        timer = threading.Timer(interval=1, function=self.run_job,
                                kwargs=dict(batch_system_type=batch_system_type))
        timer.start()  # run_job is executed asynchronously

1039
    def prepare_data_in_thread(self):
        """
        Schedule prepare_data in a separate timer thread, started after a delay of 1 second
        """
        timer = threading.Timer(interval=1, function=self.prepare_data)
        timer.start()  # prepare_data is executed asynchronously

1046
    def prepare_data_cluster(self, batch_system_type):
        """
        Submit the data preparation step to the cluster

        The prepare script is launched through the configured cluster python executable, with the target (and
        the query, if any) as arguments. If the launch fails in webserver mode and mail sending is enabled in
        the configuration, a mail is requested through send_mail_post.

        :param batch_system_type: slurm or sge
        :type batch_system_type: str
        :return: True if succeed, else False
        :rtype: bool
        """
        script_args = [self.config.cluster_prepare_script]
        script_args.extend(("-t", self.target.get_path(),
                            "-m", self.target.get_name(),
                            "-p", self.preptime_file))
        if self.query is not None:
            script_args.extend(("-q", self.query.get_path(),
                                "-u", self.get_query_split(),
                                "-n", self.query.get_name()))
            if self.tool.split_before:
                script_args.append("--split")

        launched = self.launch_to_cluster(step="prepare",
                                          batch_system_type=batch_system_type,
                                          command=self.config.cluster_python_exec,
                                          args=script_args,
                                          log_out=self.logs,
                                          log_err=self.logs)
        if launched:
            return launched
        if MODE == "webserver" and self.config.send_mail_status:
            self.send_mail_post()
        return launched
1074
1075

    def prepare_data_local(self):
1076
1077
        """
        Prepare data locally. On standalone mode, launch job after, if success.
1078
1079
        :return: True if job succeed, else False
        :rtype: bool
1080
        """
Floreal Cabanettes's avatar
Floreal Cabanettes committed
1081
        with open(self.preptime_file, "w") as ptime, Job.connect():
1082
            self.set_job_status("preparing")
1083
            ptime.write(str(round(time.time())) + "\n")