Commit ef9c7436 authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Add job scheduler for local jobs, Implements #83, #84

parent fce91adc
......@@ -4,3 +4,4 @@ srv/database.sqlite
application.properties
logs/*
!logs/README
srv/.local_scheduler_pid
......@@ -24,3 +24,7 @@ send_mail_status = True
menage_hour = 1h00
# Frequency (days):
menage_freq = 1
[jobs]
# Number of parallel runs for local jobs:
run_local = 1
\ No newline at end of file
#!/usr/bin/env python3
import os
from lib.crons import Crons
crons = Crons()
app_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "srv")
crons = Crons(app_folder)
crons.clear()
#!/usr/bin/env python3
import os
import threading
import time
import sys
import psutil
from pony.orm import db_session, select
from tendo import singleton
# Allow only one instance:
# NOTE(review): presumably the object must stay referenced for the whole run
# so the single-instance guard remains active - confirm against tendo docs.
me = singleton.SingleInstance()
# The application lives in the "srv" folder next to this script's parent folder.
app_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "srv")
# Put the application's bin folder first on PATH.
os.environ["PATH"] = os.path.join(app_folder, "bin") + ":" + os.environ["PATH"]
# Make the application modules importable; the imports below rely on this.
sys.path.insert(0, app_folder)
from database import Job, db
from config_reader import AppConfigReader
from lib.job_manager import JobManager
config_reader = AppConfigReader()
# Maximum number of local jobs allowed to run at the same time.
NB_RUN = config_reader.get_local_nb_runs()
@db_session
def start_job(id_job):
    """
    Flag a job as "starting" and hand it over to a JobManager.

    :param id_job: identifier of the job to start
    :return: None
    """
    print("Start job", id_job)
    job_record = Job.get(id_job=id_job)
    job_record.status = "starting"
    db.commit()
    manager = JobManager(id_job=id_job, email=job_record.email)
    # Inputs are restored from the job's result directory before running.
    manager.set_inputs_from_res_dir()
    manager.run_job_in_thread()
@db_session
def get_scheduled_jobs():
    """
    List the local jobs waiting to be run, oldest first, and flag them as scheduled.

    :return: ids of the local jobs whose status is "waiting" or "scheduled",
        sorted by creation date
    """
    pending = select(
        j for j in Job
        if j.batch_type == "local" and (j.status == "waiting" or j.status == "scheduled")
    ).sort_by(Job.date_created)
    job_ids = []
    for pending_job in pending:
        job_ids.append(pending_job.id_job)
        pending_job.status = "scheduled"
    # Persist the status changes in one go.
    db.commit()
    return job_ids
@db_session
def parse_started_jobs():
    """
    List the local jobs currently occupying a run slot and detect dead ones.

    A job whose status is "started" but whose process no longer exists is
    flagged as "error" and is not counted.

    :return: ids of the local jobs that are starting, started (process alive) or indexing
    """
    jobs_started = []
    # The status alternatives must be grouped: "and" binds tighter than "or",
    # so without the parentheses non-local jobs would be selected too.
    jobs = select(j for j in Job if j.batch_type == "local" and
                  (j.status == "started" or j.status == "starting" or j.status == "indexing"))
    for job in jobs:
        pid = job.id_process
        # Only "started" jobs are checked against their pid.
        if job.status != "started" or psutil.pid_exists(pid):
            jobs_started.append(job.id_job)
        else:
            print("Job %s (pid: %d) has died!" % (job.id_job, pid))
            job.status = "error"
            job.error = "<p>Your job has failed for an unexpected reason. Please contact the support.</p>"
            db.commit()
            # Todo: send mail about the error
    return jobs_started
if __name__ == '__main__':
    # Poll the database forever, starting queued jobs while run slots are free.
    while True:
        print("Checking jobs...")
        queue = get_scheduled_jobs()
        print("Scheduled:", len(queue))
        running = len(parse_started_jobs())
        print("Started:", running)
        # Start the oldest queued jobs, at most as many as there are free slots.
        free_slots = max(NB_RUN - running, 0)
        for next_job in queue[:free_slots]:
            start_job(next_job)
        # Wait before the next check
        print("Sleeping...")
        time.sleep(15)
#!/usr/bin/env bash
# Start the local job scheduler unless the process recorded in the pid file
# is still alive.
#
# Usage: start_local_scheduler.sh <prg_dir> <python> <pid_file>
#   prg_dir   directory containing bin/local_scheduler.py
#   python    python interpreter used to run the scheduler
#   pid_file  file in which the scheduler pid is stored

prg_dir=$1
python=$2
pid_file=$3

# Stop here if the program directory is unreachable: the relative path to
# the scheduler below would otherwise resolve against the wrong directory.
cd "${prg_dir}" || exit 1

is_started=0
if [ -f "${pid_file}" ]; then
    # Read the very file that was tested above, not a hard-coded name.
    pid=$(cat "${pid_file}")
    if [ -n "${pid}" ] && ps -p "${pid}" -o "pid=" > /dev/null 2>&1; then
        is_started=1
    fi
fi

if [ "$is_started" -eq "0" ]; then
    echo "Starting scheduler..."
    "${python}" bin/local_scheduler.py > /dev/null &
    # Remember the pid of the background scheduler for the next invocation.
    echo $! > "${pid_file}"
else
    echo "Already started!"
fi
\ No newline at end of file
import os
import sys
import getpass
import psutil
from crontab import CronTab
from config_reader import AppConfigReader
class Crons:
def __init__(self, base_dir=None):
def __init__(self, base_dir):
    """
    Prepare access to the current user's crontab and to the application config.

    :param base_dir: application folder; the scheduler pid file is located in it
    """
    self.base_dir = base_dir
    current_user = getpass.getuser()
    self.my_cron = CronTab(user=current_user)
    self.config_reader = AppConfigReader()
    pid_file_name = ".local_scheduler_pid"
    self.local_scheduler_pid_file = os.path.join(self.base_dir, pid_file_name)
def clear(self):
def clear(self, kill_scheduler=True):
# Remove old crons:
self.my_cron.remove_all(comment="dgenies")
self.my_cron.write()
if kill_scheduler:
# Kill local scheduler:
if os.path.exists(self.local_scheduler_pid_file):
with open(self.local_scheduler_pid_file) as p_f:
pid = int(p_f.readline().strip("\n"))
if psutil.pid_exists(pid):
p = psutil.Process(pid)
p.terminate()
def start_all(self):
self.clear()
self.clear(False)
self.init_menage_cron()
self.init_launch_local_cron()
def init_menage_cron(self):
"""
......@@ -26,7 +39,8 @@ class Crons:
menage_hour = self.config_reader.get_cron_menage_hour()
menage_freq = self.config_reader.get_cron_menage_freq()
if self.base_dir is not None:
job = self.my_cron.new("python3 {0}/bin/clean_jobs.py > {0}/logs/menage.log 2>&1".format(self.base_dir),
job = self.my_cron.new(sys.executable +
" {0}/bin/clean_jobs.py > {0}/logs/menage.log 2>&1".format(self.base_dir),
comment="dgenies")
job.day.every(menage_freq)
job.hour.on(menage_hour[0])
......@@ -34,3 +48,15 @@ class Crons:
self.my_cron.write()
else:
raise Exception("Crons: base_dir must not be None")
def init_launch_local_cron(self):
"""
Try to launch local scheduler (if not already launched)
:return:
"""
if self.base_dir is not None:
job = self.my_cron.new("{0}/bin/start_local_scheduler.sh {0} {1} {2} > /dev/null 2>&1 &".
format(self.base_dir, sys.executable, self.local_scheduler_pid_file),
comment="dgenies")
job.minute.every(1)
self.my_cron.write()
\ No newline at end of file
......@@ -14,6 +14,7 @@ import wget
from jinja2 import Template
import traceback
from pathlib import Path
from urllib import request, parse
class JobManager:
......@@ -44,6 +45,27 @@ class JobManager:
self.logs = os.path.join(self.output_dir, "logs.txt")
self.mailer = mailer
def set_inputs_from_res_dir(self):
res_dir = os.path.join(self.app_data, self.id_job)
query_file = os.path.join(res_dir, ".query")
if os.path.exists(query_file):
with open(query_file) as q_f:
file_path = q_f.readline()
self.query = Fasta(
name=os.path.splitext(file_path.replace(".gz", ""))[0],
path=file_path,
type_f="local"
)
target_file = os.path.join(res_dir, ".target")
if os.path.exists(target_file):
with open(target_file) as t_f:
file_path = t_f.readline()
self.target = Fasta(
name=os.path.splitext(file_path.replace(".gz", ""))[0],
path=file_path,
type_f="local"
)
def __check_job_success_local(self):
if os.path.exists(self.paf):
if os.path.getsize(self.paf) > 0:
......@@ -93,7 +115,16 @@ class JobManager:
else:
return "DGenies - Job failed: %s" % self.id_job
def send_mail(self, status):
@db_session
def send_mail(self):
# Retrieve infos:
job = Job.get(id_job=self.id_job)
if self.email is None:
self.email = job.email
status = job.status
self.error = job.error
# Send:
self.mailer.send_mail([self.email], self.get_mail_subject(status), self.get_mail_content(status),
self.get_mail_content_html(status))
......@@ -200,16 +231,28 @@ class JobManager:
correct = False
return correct
def send_mail_post(self):
    """
    Send mail using POST url (we have no access to mailer)

    A random key is written to <app_data>/<id_job>/.key and posted to
    <web_url>/send-mail/<id_job>. HTTP and connection failures are reported
    on standard output instead of being raised.

    :return: None
    """
    # Local import: only this method needs it.
    from urllib.error import URLError

    key = Functions.random_string(15)
    key_file = os.path.join(self.app_data, self.id_job, ".key")
    with open(key_file, "w") as k_f:
        k_f.write(key)
    data = parse.urlencode({"key": key}).encode()
    req = request.Request(self.web_url + "/send-mail/" + self.id_job, data=data)
    try:
        # The context manager closes the HTTP response.
        with request.urlopen(req) as resp:
            sent = resp.getcode() == 200
    except URLError:
        # urlopen raises on HTTP error statuses (HTTPError is a URLError)
        # and on connection failures.
        sent = False
    if not sent:
        print("Job %s: Send mail failed!" % self.id_job)
def run_job_in_thread(self):
    """
    Run the job as a local job in a separate timer thread, after a one second delay.

    :return: None
    """
    run_options = {"batch_system_type": "local"}
    timer = threading.Timer(interval=1, function=self.run_job, kwargs=run_options)
    timer.start()  # Start the execution
@db_session
def start_job(self):
try:
success = self.getting_files()
if success:
job = Job.get(id_job=self.id_job)
job.status = "waiting"
db.commit()
success = True
if self.batch_system_type == "local":
def run_job(self, batch_system_type):
success = False
if batch_system_type == "local":
success = self.__launch_local()
if success:
job = Job.get(id_job=self.id_job)
......@@ -226,6 +269,26 @@ class JobManager:
job = Job.get(id_job=self.id_job)
job.status = "success"
db.commit()
if self.do_send:
self.send_mail_post()
#self.send_mail(job.status)
@db_session
def start_job(self):
try:
success = self.getting_files()
if success:
job = Job.get(id_job=self.id_job)
job.status = "waiting"
db.commit()
else:
job = Job.get(id_job=self.id_job)
job.status = "error"
job.error = "<p>Error while getting input files. Please contact the support to report the bug.</p>"
db.commit()
if self.do_send:
self.send_mail()
except Exception:
print(traceback.print_exc())
job = Job.get(id_job=self.id_job)
......@@ -233,8 +296,8 @@ class JobManager:
job.error = "<p>An unexpected error has occurred. Please contact the support to report the bug.</p>"
db.commit()
if self.do_send:
job = Job.get(id_job=self.id_job)
self.send_mail(job.status)
self.send_mail()
@db_session
def launch(self):
......
......@@ -8,3 +8,5 @@ wget==3.2
requests==2.18.*
biopython==1.70
python-crontab==2.2.*
psutil==5.4.*
tendo==0.2.*
......@@ -115,3 +115,9 @@ class AppConfigReader(object):
return int(self.reader.get("cron", "menage_freq"))
except (NoOptionError, NoSectionError):
return 1
def get_local_nb_runs(self):
try:
return int(self.reader.get("jobs", "run_local"))
except (NoOptionError, NoSectionError):
return 1
......@@ -342,5 +342,27 @@ def upload():
return jsonify({"files": [], "success": "ERR", "message": "Session not initialized. Please refresh the page."})
@app.route("/send-mail/<id_res>", methods=['POST'])
def send_mail(id_res):
    """
    Send the status mail of a job, on behalf of a process without mailer access.

    The request must carry, in form field "key", the content of the ".key"
    file of the job's result directory. The key file is deleted once used.

    :param id_res: job identifier (from the URL)
    :return: "OK" when the mail has been sent; aborts with 403 otherwise
    """
    # Local import: only this view needs it.
    import hmac

    allowed = False
    key_file = None
    # id_res comes from the URL: refuse values leading outside of app_data.
    is_plain_name = id_res not in (".", "..") and os.path.basename(id_res) == id_res
    if is_plain_name and "key" in request.form:
        key = request.form["key"]
        res_dir = os.path.join(app_data, id_res)
        key_file = os.path.join(res_dir, ".key")
        if os.path.exists(key_file):
            with open(key_file) as k_f:
                true_key = k_f.readline().strip("\n")
            # An empty key file never grants access. The comparison is done
            # in constant time, on bytes so that any submitted text is accepted.
            allowed = bool(true_key) and hmac.compare_digest(key.encode("utf-8"),
                                                             true_key.encode("utf-8"))
    if allowed:
        os.remove(key_file)
        job_mng = JobManager(id_job=id_res, mailer=mailer)
        job_mng.set_inputs_from_res_dir()
        job_mng.send_mail()
        return "OK"
    else:
        abort(403)
if __name__ == '__main__':
app.run()
......@@ -5,7 +5,11 @@
<p>Your job has started.<br/>
You will receive an email when it is finished.<br/>
Alternatively, you can refresh this page to update status.</p>
{% elif status == "submitted" %}
{% elif status == "starting" %}
<p>Your job is starting...<br/>
You will receive an email when it is finished.<br/>
Alternatively, you can refresh this page to update status.</p>
{% elif status == "submitted" or status == "waiting" %}
<p>Your job has been submitted.<br/>
You will receive an email when it is finished.<br/>
Alternatively, you can refresh this page to update status.</p>
......@@ -17,7 +21,7 @@
<p>Your map is done. We are indexing fasta files for visualisation...<br/>
You will receive an email when it is finished.<br/>
Alternatively, you can refresh this page to update status.</p>
{% elif status == "waiting" %}
{% elif status == "scheduled" %}
<p>Your job has been scheduled. It will start soon...<br/>
You will receive an email when it is finished.<br/>
Alternatively, you can refresh this page to update status.</p>
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment