job_manager.py 10.4 KB
Newer Older
1
import os
2
import shutil
3
import subprocess
4
5
import datetime
import threading
6
import gzip
7
import io
8
from config_reader import AppConfigReader
9
from pony.orm import db_session, select
10
from database import db, Job
11
from lib.Fasta import Fasta
Floreal Cabanettes's avatar
Floreal Cabanettes committed
12
from lib.functions import Functions
13
14
import requests
import wget
15
from jinja2 import Template
Floreal Cabanettes's avatar
Floreal Cabanettes committed
16
from flask_mail import Message, Mail
17
18
19
20


class JobManager:
    """Drive one D-Genies job: fetch the input FASTA files, run the mapping, index the
    FASTA files and optionally notify the user by mail.

    Progress is persisted on the ``Job`` database entity identified by ``id_job``; its
    ``status`` field moves through "getfiles", "waiting", "started", "indexing" and
    finally "success" or "error".
    """

    # Seconds to wait for the HEAD request used to validate a remote input URL.
    URL_CHECK_TIMEOUT = 30

    def __init__(self, id_job: str, email: str=None, query: Fasta=None, target: Fasta=None, app=None, mailer: Mail=None):
        """
        :param id_job: job identifier; also the name of the job's output sub-directory
        :param email: address that receives the end-of-job notification
        :param query: query FASTA descriptor (the job is created in "error" state if None)
        :param target: target FASTA descriptor; if None, "NONE" is passed to the mapping
            script and the target index is a copy of the query index
        :param app: application object whose ``app_context()`` is entered to send mail
        :param mailer: ``flask_mail.Mail`` instance used to send notifications
        """
        self.id_job = id_job
        self.email = email
        self.query = query
        self.target = target

        config_reader = AppConfigReader()
        # Get configs:
        self.batch_system_type = config_reader.get_batch_system_type()
        self.minimap2 = config_reader.get_minimap2_exec()
        self.threads = config_reader.get_nb_threads()
        self.app_data = config_reader.get_app_data()
        self.web_url = config_reader.get_web_url()
        self.mail_status = config_reader.get_mail_status_sender()
        self.mail_reply = config_reader.get_mail_reply()
        self.mail_org = config_reader.get_mail_org()
        self.do_send = config_reader.get_send_mail_status()

        # Outputs:
        self.output_dir = os.path.join(self.app_data, id_job)
        self.paf = os.path.join(self.output_dir, "map.paf")
        self.paf_raw = os.path.join(self.output_dir, "map_raw.paf")
        self.idx_q = os.path.join(self.output_dir, "query.idx")
        self.idx_t = os.path.join(self.output_dir, "target.idx")
        self.logs = os.path.join(self.output_dir, "logs.txt")
        self.mailer = mailer
        self.app = app

    def send_async_email(self, msg):
        """Send ``msg`` through the mailer inside the application context.

        The call itself is synchronous; it is made from the job's worker thread.
        """
        with self.app.app_context():
            self.mailer.send(msg)

    def __check_job_success_local(self):
        """Return "success" if the local run produced a non-empty PAF file, else "error"."""
        if os.path.exists(self.paf) and os.path.getsize(self.paf) > 0:
            return "success"
        return "error"

    def check_job_success(self):
        """Return the outcome of the mapping step ("success" or "error").

        Only the "local" batch system is handled; any other system yields None.
        """
        if self.batch_system_type == "local":
            return self.__check_job_success_local()
        return None

    def _current_status(self, status=None):
        """Return the job status string to use for notifications.

        ``status`` wins when given. Otherwise the value is read from the database via
        the ``status()`` method. A plain string stored on ``self.status`` by a caller
        is also accepted.
        """
        if status is not None:
            return status
        current = self.status
        # `status` is normally the DB-backed method defined on this class; comparing the
        # bound method itself to a string is always False, so it must be called.
        if callable(current):
            return current()[0]
        return current

    def get_mail_content(self, status=None):
        """Return the plain-text body of the notification mail.

        :param status: job status; read from the database when omitted
        """
        status = self._current_status(status)
        message = "D-Genies\n\n"
        if status == "success":
            message += "Your job %s has successfully ended!\n\n" % self.id_job
            message += str("Your job {0} is finished. You can see  the results by clicking on the link below:\n"
                           "{1}/results/{0}\n\n").format(self.id_job, self.web_url)
        else:
            message += "Your job %s has failed!\n\n" % self.id_job
            message += "Your job %s has failed. If the problem persists, please contact the support.\n\n" % self.id_job
        message += "See you soon on D-Genies,\n"
        message += "The team"
        return message

    def get_mail_content_html(self, status=None):
        """Return the HTML body of the notification mail.

        :param status: job status; read from the database when omitted
        """
        status = self._current_status(status)
        template_str = """<h1>D-Genies</h1>
<h3>{% if status == "success" %}Your job {{ job_name }} has successfully ended!{% else %}Your job {{ job_name }} has failed{% endif %}</h3>
<p>Hi,</p>
{% if status == "success" %}
<p>Your job {{ job_name }} is finished. You can <a href="{{ url_base }}/results/{{ job_name }}">click here</a> to see results.</p>
{% else %}
<p>Your job {{ job_name }} has failed. If the problem persists, please contact the support.</p>
{% endif %}
<p>See you soon on D-Genies,</p>
<p>The team</p>
        """
        template = Template(template_str)
        return template.render(job_name=self.id_job, status=status, url_base=self.web_url)

    def get_mail_subject(self, status=None):
        """Return the notification mail subject.

        :param status: job status; read from the database when omitted
        """
        status = self._current_status(status)
        if status == "success":
            return "DGenies - Job %s finished" % self.id_job
        else:
            return "DGenies - Job %s failed" % self.id_job

    def send_mail(self):
        """Build and send the end-of-job notification to ``self.email``."""
        # Resolve the status once so subject and both bodies agree.
        status = self._current_status()
        msg = Message(
            subject=self.get_mail_subject(status),
            recipients=[self.email],
            html=self.get_mail_content_html(status),
            body=self.get_mail_content(status),
            sender=(self.mail_org, self.mail_status) if self.mail_org is not None else self.mail_status,
            reply_to=self.mail_reply
        )
        self.send_async_email(msg)

    @db_session
    def __launch_local(self):
        """Run the mapping script locally and record its outcome on the Job entity.

        The child's PID is stored and the status set to "started" before waiting for it;
        its stdout/stderr go to ``self.logs``. After it exits, the status becomes the
        result of ``check_job_success()``.

        :return: True if the run succeeded
        """
        # Popen requires str/bytes/path-like arguments; the thread count may come from
        # the config as a number (TODO confirm), so convert it explicitly.
        cmd = ["run_minimap2.sh", self.minimap2, str(self.threads),
               self.target.get_path() if self.target is not None else "NONE", self.query.get_path(),
               self.query.get_name(), self.target.get_name() if self.target is not None else "NONE", self.paf,
               self.paf_raw, self.output_dir]
        with open(self.logs, "w") as logs:
            p = subprocess.Popen(cmd, stdout=logs, stderr=logs)
        job = Job.get(id_job=self.id_job)
        job.id_process = p.pid
        job.status = "started"
        db.commit()
        p.wait()
        status = self.check_job_success()
        job.status = status
        db.commit()
        return status == "success"

    def __getting_local_file(self, fasta: Fasta):
        """Move a local input file into the output directory and return its new path."""
        finale_path = os.path.join(self.output_dir, os.path.basename(fasta.get_path()))
        shutil.move(fasta.get_path(), finale_path)
        return finale_path

    def __getting_file_from_url(self, fasta: Fasta):
        """Download a remote input file into the output directory and return its path."""
        finale_path = wget.download(fasta.get_path(), self.output_dir, None)
        return finale_path

    @db_session
    def __check_url(self, fasta: Fasta):
        """Check that a remote input URL points to an allowed file.

        For http(s) URLs the file name is taken from the final URL after redirects. For
        ftp URLs it is taken from the URL itself. Any other scheme, or a failed HTTP
        request, is rejected. On rejection the Job is set to "error" with an HTML
        explanation.

        :return: True if the file may be downloaded
        """
        url = fasta.get_path()
        if url.startswith("http://") or url.startswith("https://"):
            try:
                response = requests.head(url, allow_redirects=True, timeout=self.URL_CHECK_TIMEOUT)
                filename = response.url.split("/")[-1]
            except requests.RequestException:
                # Unreachable host, timeout, too many redirects...: reject instead of
                # letting the exception kill the worker thread with the job stuck.
                filename = None
        elif url.startswith("ftp://"):
            filename = url.split("/")[-1]
        else:
            filename = None
        if filename is not None:
            allowed = Functions.allowed_file(filename)
            if not allowed:
                job = Job.get(id_job=self.id_job)
                job.status = "error"
                job.error = "<p>File <b>%s</b> downloaded from <b>%s</b> is not a Fasta file!</p>" \
                            "<p>If this is unattended, please contact the support.</p>" % (filename, url)
                db.commit()
        else:
            allowed = False
            job = Job.get(id_job=self.id_job)
            job.status = "error"
            job.error = "<p>Url <b>%s</b> is not a valid URL!</p>" \
                        "<p>If this is unattended, please contact the support.</p>" % (url)
            db.commit()
        return allowed

    @staticmethod
    def _fasta_name_from_filename(filename):
        """Derive a display name from a downloaded file name.

        A trailing ".gz" is dropped first, then the remaining extension
        ("genome.fa.gz" -> "genome").
        """
        if filename.endswith(".gz"):
            filename = filename[:-len(".gz")]
        return os.path.splitext(filename)[0]

    def _fetch_fasta(self, fasta: Fasta):
        """Bring one input FASTA into the output directory and update its descriptor.

        Local files are moved. Remote files are validated with ``__check_url``, then
        downloaded, and the descriptor's name is reset from the downloaded file name.

        :return: False if the URL was rejected (the Job is then already in "error"), else True
        """
        if fasta.get_type() == "local":
            fasta.set_path(self.__getting_local_file(fasta))
            return True
        if not self.__check_url(fasta):
            return False
        finale_path = self.__getting_file_from_url(fasta)
        fasta.set_path(finale_path)
        fasta.set_name(self._fasta_name_from_filename(os.path.basename(finale_path)))
        return True

    @db_session
    def getting_files(self):
        """Set the Job to "getfiles" and gather query then target inputs.

        :return: True if every provided input was obtained; the target is skipped if the query failed
        """
        job = Job.get(id_job=self.id_job)
        job.status = "getfiles"
        db.commit()
        correct = True
        if self.query is not None:
            correct = self._fetch_fasta(self.query)
        if correct and self.target is not None:
            correct = self._fetch_fasta(self.target)
        return correct

    @db_session
    def start_job(self):
        """Run the whole pipeline: fetch inputs, map, index, then notify by mail if enabled.

        Called from the timer thread started by ``launch``.
        """
        success = self.getting_files()
        if success:
            job = Job.get(id_job=self.id_job)
            job.status = "waiting"
            db.commit()
            # Only the local batch system is launched here; for other systems the
            # pipeline proceeds directly to indexing.
            if self.batch_system_type == "local":
                success = self.__launch_local()
            if success:
                job.status = "indexing"
                db.commit()
                self.index_file(self.query, self.idx_q)
                if self.target is not None:
                    self.index_file(self.target, self.idx_t)
                else:
                    shutil.copyfile(self.idx_q, self.idx_t)
                job.status = "success"
                db.commit()
        # Without an address there is no one to notify; Message would get [None].
        if self.do_send and self.email:
            self.send_mail()

    @staticmethod
    def index_file(fasta: Fasta, out):
        """Write a contig length index for ``fasta`` to ``out``.

        The first line is the FASTA's name. Each following line is
        ``<contig id>\\t<sequence length>``, where the contig id is the header text up to
        the first space. Gzip-compressed inputs (".gz" path) are read transparently.
        """
        compressed = fasta.get_path().endswith(".gz")
        with (gzip.open(fasta.get_path()) if compressed else open(fasta.get_path())) as in_file, \
                open(out, "w") as out_file:
            out_file.write(fasta.get_name() + "\n")
            with (io.TextIOWrapper(in_file) if compressed else in_file) as handle:
                contig = None
                len_c = 0
                for line in handle:
                    line = line.strip("\n")
                    if line.startswith(">"):
                        if contig is not None:
                            out_file.write("%s\t%d\n" % (contig, len_c))
                        contig = line[1:].split(" ")[0]
                        len_c = 0
                    elif len(line) > 0:
                        len_c += len(line)
                # NOTE(review): a zero-length last contig is dropped, while zero-length
                # contigs earlier in the file are written; confirm which is intended.
                if contig is not None and len_c > 0:
                    out_file.write("%s\t%d\n" % (contig, len_c))

    @db_session
    def launch(self):
        """Register the job in the database and start it in a background thread.

        Any stale Job row with the same id is deleted first. Without a query, the job is
        recorded directly in "error" state and nothing is started.
        """
        j1 = select(j for j in Job if j.id_job == self.id_job)
        if len(j1) > 0:
            print("Old job found without result dir existing: delete it from BDD!")
            j1.delete()
        if self.query is not None:
            Job(id_job=self.id_job, email=self.email, batch_type=self.batch_system_type,
                date_created=datetime.datetime.now())
            db.commit()
            os.makedirs(self.output_dir, exist_ok=True)
            thread = threading.Timer(1, self.start_job)
            thread.start()
        else:
            Job(id_job=self.id_job, email=self.email, batch_type=self.batch_system_type,
                date_created=datetime.datetime.now(), status="error")
            db.commit()

    @db_session
    def status(self):
        """Return ``(status, error)`` for this job, or ``("unknown", "")`` if it has no Job row."""
        job = Job.get(id_job=self.id_job)
        if job is not None:
            return job.status, job.error
        else:
            return "unknown", ""