# job_manager.py 2.82 KB
# Newer Older
1
2
import os
import subprocess
3
4
import datetime
import threading
5
6
from config_reader import AppConfigReader
from pony.orm import db_session
7
from database import db, Job
8
9
10
11


class JobManager:
    """Create, run and track one mapping job.

    A job is recorded as a ``Job`` row in the database, executed through the
    ``run_minimap2.sh`` script, and its result files are written to a
    per-job directory under the configured application data directory.
    Only the ``"local"`` batch system type is handled by this class.
    """

    def __init__(self, id_job, email=None, fasta_q=None, fasta_t=None, query_name=None, target_name=None):
        """Store the job parameters and derive the configuration and output paths.

        :param id_job: job identifier; used as the database key and as the
            name of the job's output directory
        :param email: e-mail address stored with the job record
        :param fasta_q: query FASTA passed to the mapping script
            (presumably a file path -- confirm against callers)
        :param fasta_t: target FASTA passed to the mapping script
            (presumably a file path -- confirm against callers)
        :param query_name: query name passed to the mapping script
        :param target_name: target name passed to the mapping script
        """
        self.id_job = id_job
        self.email = email
        self.fasta_q = fasta_q
        self.fasta_t = fasta_t
        self.query = query_name
        self.target = target_name
        config_reader = AppConfigReader()
        # Get configs:
        self.batch_system_type = config_reader.get_batch_system_type()
        self.minimap2 = config_reader.get_minimap2_exec()
        self.samtools = config_reader.get_samtools_exec()
        self.threads = config_reader.get_nb_threads()
        self.app_data = config_reader.get_app_data()
        # Outputs:
        self.output_dir = os.path.join(self.app_data, id_job)
        self.paf = os.path.join(self.output_dir, "map.paf")
        self.idx_q = os.path.join(self.output_dir, "query.idx")
        self.idx_t = os.path.join(self.output_dir, "target.idx")
        self.logs = os.path.join(self.output_dir, "logs.txt")

    def __check_job_success_local(self):
        """Check the result files of a locally run job.

        :return: ``"success"`` when the PAF file and both index files exist
            and are non-empty, ``"error"`` otherwise
        """
        # Checked in the same order as before: PAF, query index, target index.
        for result_file in (self.paf, self.idx_q, self.idx_t):
            if not os.path.exists(result_file) or os.path.getsize(result_file) <= 0:
                return "error"
        return "success"

    def check_job_success(self):
        """Return the outcome of the job for the configured batch system.

        :return: ``"success"`` or ``"error"`` for the ``"local"`` batch
            system; ``None`` for any other batch system type, as no check
            is implemented for them here
        """
        if self.batch_system_type == "local":
            return self.__check_job_success_local()
        return None

    @db_session
    def __launch_local(self):
        """Run the mapping script as a local subprocess and record its status.

        The job row is set to ``"started"`` (with the process id) once the
        process is running, then to the value of :meth:`check_job_success`
        after it exits. If the log file cannot be opened or the process
        cannot be started, the job is marked ``"error"`` and the exception
        is re-raised, so the job is not left in a non-terminal status.
        """
        # Popen only accepts str/bytes/path-like arguments; the thread count
        # comes from the configuration and may not be a string -- TODO confirm.
        cmd = ["run_minimap2.sh", self.minimap2, self.samtools, str(self.threads), self.fasta_t, self.fasta_q,
               self.query, self.target, self.paf, self.output_dir]
        job = Job.get(id_job=self.id_job)
        try:
            # The child keeps its own handle on the log file, so closing ours
            # right after the process is spawned is safe.
            with open(self.logs, "w") as logs:
                process = subprocess.Popen(cmd, stdout=logs, stderr=logs)
        except Exception:
            job.status = "error"
            db.commit()
            raise
        job.id_process = process.pid
        job.status = "started"
        db.commit()
        process.wait()
        job.status = self.check_job_success()
        db.commit()

    @db_session
    def launch(self):
        """Register the job in the database and start it.

        Creates the ``Job`` row, makes sure the output directory exists and,
        for the ``"local"`` batch system, starts the run in a background
        timer thread. Nothing is started for other batch system types.
        """
        Job(id_job=self.id_job, email=self.email, batch_type=self.batch_system_type,
            date_created=datetime.datetime.now())
        db.commit()
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs(self.output_dir, exist_ok=True)
        if self.batch_system_type == "local":
            # The run starts one second later, in its own thread, so this
            # method returns without waiting for the mapping to finish.
            worker = threading.Timer(1, self.__launch_local)
            worker.start()

    @db_session
    def status(self):
        """Return the stored status of the job.

        :return: the ``status`` field of the job's database row, or
            ``"unknown"`` when no row exists for this job id
        """
        job = Job.get(id_job=self.id_job)
        if job is None:
            return "unknown"
        return job.status