Commit 695c2bf5 authored by Floreal Cabanettes

Fix MySQL lost connection
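
When the app runs on MySQL, the single connection opened at import time is dropped by the server once it sits idle longer than wait_timeout, and the next query fails with an OperationalError ("MySQL server has gone away"). Instead of one long-lived connection, every database access is now wrapped in a "with Job.connect():" / "with Session.connect():" block, backed by a reference-counted context manager that connects on entry (ignoring the error raised when the connection is already open) and disconnects when the outermost block exits. The Session model's allow_upload/position pair is also folded into a single three-state status field along the way.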

parent 64addeff
@@ -35,22 +35,23 @@ def parse_upload_folders(upload_folder, now, max_age, fake=False):

 def parse_database(app_data, max_age, fake=False):
-    old_jobs = Job.select().where(
-        ((Job.status == "success") & (Job.date_created < datetime.now() - timedelta(days=max_age["data"])))
-        |
-        ((Job.status != "success") & (Job.date_created < datetime.now() - timedelta(days=max_age["error"])))
-    )
-    for job in old_jobs:
-        id_job = job.id_job
-        print("Removing job %s..." % id_job)
-        data_dir = os.path.join(app_data, id_job)
-        if os.path.exists(data_dir) and os.path.isdir(data_dir):
-            if not fake:
-                shutil.rmtree(data_dir)
-        else:
-            print("Job %s has no data folder!" % id_job)
-        if not fake:
-            job.delete_instance()
+    with Job.connect():
+        old_jobs = Job.select().where(
+            ((Job.status == "success") & (Job.date_created < datetime.now() - timedelta(days=max_age["data"])))
+            |
+            ((Job.status != "success") & (Job.date_created < datetime.now() - timedelta(days=max_age["error"])))
+        )
+        for job in old_jobs:
+            id_job = job.id_job
+            print("Removing job %s..." % id_job)
+            data_dir = os.path.join(app_data, id_job)
+            if os.path.exists(data_dir) and os.path.isdir(data_dir):
+                if not fake:
+                    shutil.rmtree(data_dir)
+            else:
+                print("Job %s has no data folder!" % id_job)
+            if not fake:
+                job.delete_instance()


 def parse_data_folders(app_data, now, max_age, fake=False):
......
@@ -46,140 +46,149 @@ def _printer(*messages):

 def start_job(id_job, batch_system_type="local"):
     _printer("Start job", id_job)
-    job = Job.get(Job.id_job == id_job)
-    job.status = "starting"
-    job.save()
-    job_mng = JobManager(id_job=id_job, email=job.email)
-    job_mng.set_inputs_from_res_dir()
-    job_mng.run_job_in_thread(batch_system_type)
+    with Job.connect():
+        job = Job.get(Job.id_job == id_job)
+        job.status = "starting"
+        job.save()
+        job_mng = JobManager(id_job=id_job, email=job.email)
+        job_mng.set_inputs_from_res_dir()
+        job_mng.run_job_in_thread(batch_system_type)

 def get_scheduled_local_jobs():
     all_jobs = []
-    jobs = Job.select().where((Job.batch_type == "local") & ((Job.status == "prepared") | (Job.status == "scheduled"))).\
-        order_by(Job.date_created)
-    for job in jobs:
-        all_jobs.append(job.id_job)
-        job.status = "scheduled"
-        job.save()
+    with Job.connect():
+        jobs = Job.select().where((Job.batch_type == "local") & ((Job.status == "prepared") | (Job.status == "scheduled"))).\
+            order_by(Job.date_created)
+        for job in jobs:
+            all_jobs.append(job.id_job)
+            job.status = "scheduled"
+            job.save()
     return all_jobs

 def get_scheduled_cluster_jobs():
     all_jobs = []
-    jobs = Job.select().where((Job.batch_type != "local") & ((Job.status == "prepared") | (Job.status == "scheduled"))).\
-        order_by(Job.date_created)
-    for job in jobs:
-        all_jobs.append({"job_id": job.id_job, "batch_type": job.batch_type})
-        job.status = "scheduled"
-        job.save()
+    with Job.connect():
+        jobs = Job.select().where((Job.batch_type != "local") & ((Job.status == "prepared") | (Job.status == "scheduled"))).\
+            order_by(Job.date_created)
+        for job in jobs:
+            all_jobs.append({"job_id": job.id_job, "batch_type": job.batch_type})
+            job.status = "scheduled"
+            job.save()
     return all_jobs

 def prepare_job(id_job):
     _printer("Prepare data for job:", id_job)
-    job = Job.get(Job.id_job == id_job)
-    job.status = "preparing"
-    job.save()
-    job_mng = JobManager(id_job=id_job, email=job.email)
-    job_mng.set_inputs_from_res_dir()
-    job_mng.prepare_data_in_thread()
+    with Job.connect():
+        job = Job.get(Job.id_job == id_job)
+        job.status = "preparing"
+        job.save()
+        job_mng = JobManager(id_job=id_job, email=job.email)
+        job_mng.set_inputs_from_res_dir()
+        job_mng.prepare_data_in_thread()

 def get_prep_scheduled_jobs():
-    jobs = Job.select().where(Job.status == "waiting").order_by(Job.date_created)
-    return [(j.id_job, j.batch_type) for j in jobs]
+    with Job.connect():
+        jobs = Job.select().where(Job.status == "waiting").order_by(Job.date_created)
+        return [(j.id_job, j.batch_type) for j in jobs]

 def get_preparing_jobs_nb():
-    return len(Job.select().where(Job.status == "preparing"))
+    with Job.connect():
+        return len(Job.select().where(Job.status == "preparing"))

 def get_preparing_jobs_cluster_nb():
-    return len(Job.select().where(Job.status == "preparing-cluster")), \
-           len(Job.select().where(Job.status == "prepare-scheduled"))
+    with Job.connect():
+        return len(Job.select().where(Job.status == "preparing-cluster")), \
+               len(Job.select().where(Job.status == "prepare-scheduled"))

 def parse_started_jobs():
-    jobs_started = []  # Only local jobs
-    cluster_jobs_started = []  # Only cluster jobs
-    jobs = Job.select().where((Job.status == "started") | (Job.status == "starting") | (Job.status == "merging") |
-                              (Job.status == "scheduled-cluster") | (Job.status == "prepare-scheduled") |
-                              (Job.status == "prepare-cluster"))
-    for job in jobs:
-        pid = job.id_process
-        if job.batch_type == "local":
-            if job.status != "started" or psutil.pid_exists(pid):
-                jobs_started.append(job.id_job)
-            else:
-                print("Job %s (pid: %d) has died!" % (job.id_job, job.id_process))
-                job.status = "fail"
-                job.error = "<p>Your job has failed for an unexpected reason. Please contact the support.</p>"
-                job.save()
-                # Todo: send mail about the error
-        else:
-            if job.status in ["started", "scheduled-cluster", "prepare-scheduled", "preparing-cluster"]:
-                s = DRMAA_SESSION.session
-                status = s.jobStatus(str(job.id_process))
-                if status not in [drmaa.JobState.RUNNING, drmaa.JobState.DONE, drmaa.JobState.QUEUED_ACTIVE,
-                                  drmaa.JobState.SYSTEM_ON_HOLD, drmaa.JobState.USER_ON_HOLD,
-                                  drmaa.JobState.USER_SYSTEM_ON_HOLD]:
-                    if job.batch_type == "slurm":
-                        os.system("scancel %s" % job.id_process)
-                    elif job.batch_type == "sge":
-                        os.system("qdel %s" % job.id_process)
-                    print("Job %s (id on cluster: %d) has died!" % (job.id_job, job.id_process))
-                    job.status = "fail"
-                    job.error = "<p>Your job has failed for an unexpected reason. Please contact the support.</p>"
-                    job.save()
-                    # Todo: send mail about the error
-                else:
-                    if job.status == "scheduled-cluster" and status == drmaa.JobState.RUNNING:
-                        job.status = "started"
-                        job.save()
-                        cluster_jobs_started.append(job.id_job)
-                    elif job.status == "prepare-scheduled" and status == drmaa.JobState.RUNNING:
-                        job.status = "preparing-cluster"
-                        job.save()
-            else:
-                cluster_jobs_started.append(job.id_job)
+    with Job.connect():
+        jobs_started = []  # Only local jobs
+        cluster_jobs_started = []  # Only cluster jobs
+        jobs = Job.select().where((Job.status == "started") | (Job.status == "starting") | (Job.status == "merging") |
+                                  (Job.status == "scheduled-cluster") | (Job.status == "prepare-scheduled") |
+                                  (Job.status == "prepare-cluster"))
+        for job in jobs:
+            pid = job.id_process
+            if job.batch_type == "local":
+                if job.status != "started" or psutil.pid_exists(pid):
+                    jobs_started.append(job.id_job)
+                else:
+                    print("Job %s (pid: %d) has died!" % (job.id_job, job.id_process))
+                    job.status = "fail"
+                    job.error = "<p>Your job has failed for an unexpected reason. Please contact the support.</p>"
+                    job.save()
+                    # Todo: send mail about the error
+            else:
+                if job.status in ["started", "scheduled-cluster", "prepare-scheduled", "preparing-cluster"]:
+                    s = DRMAA_SESSION.session
+                    status = s.jobStatus(str(job.id_process))
+                    if status not in [drmaa.JobState.RUNNING, drmaa.JobState.DONE, drmaa.JobState.QUEUED_ACTIVE,
+                                      drmaa.JobState.SYSTEM_ON_HOLD, drmaa.JobState.USER_ON_HOLD,
+                                      drmaa.JobState.USER_SYSTEM_ON_HOLD]:
+                        if job.batch_type == "slurm":
+                            os.system("scancel %s" % job.id_process)
+                        elif job.batch_type == "sge":
+                            os.system("qdel %s" % job.id_process)
+                        print("Job %s (id on cluster: %d) has died!" % (job.id_job, job.id_process))
+                        job.status = "fail"
+                        job.error = "<p>Your job has failed for an unexpected reason. Please contact the support.</p>"
+                        job.save()
+                        # Todo: send mail about the error
+                    else:
+                        if job.status == "scheduled-cluster" and status == drmaa.JobState.RUNNING:
+                            job.status = "started"
+                            job.save()
+                            cluster_jobs_started.append(job.id_job)
+                        elif job.status == "prepare-scheduled" and status == drmaa.JobState.RUNNING:
+                            job.status = "preparing-cluster"
+                            job.save()
+                else:
+                    cluster_jobs_started.append(job.id_job)
     return jobs_started, cluster_jobs_started

 def parse_uploads_asks():
-    now = datetime.now()
-    # Get allowed:
-    all_sessions = Session.select()
-    nb_sessions = len(all_sessions)
-    _printer("All sessions:", nb_sessions)
-    sessions = Session.select().where(Session.allow_upload)
-    nb_active_dl = len(sessions)
-    _printer("Active_dl:", nb_active_dl)
-    for session in sessions:
-        if not session.keep_active and (now - session.last_ping).total_seconds() > 30:
-            _printer("Delete 1 active session:", session.s_id)
-            session.delete_instance()  # We consider the user has left
-            nb_active_dl -= 1
-    # Get pending:
-    sessions = Session.select().where((Session.allow_upload == False) & (Session.position >= 0)).order_by(Session.position)
-    _printer("Pending:", len(sessions))
-    for session in sessions:
-        delay = (now - session.last_ping).total_seconds()
-        if delay > 30:
-            session.position = -1  # Reset position, the user has probably left
-            session.save()
-            _printer("Reset 1 session:", session.s_id)
-        elif nb_active_dl < config_reader.max_concurrent_dl:
-            session.allow_upload = True
-            session.save()
-            nb_active_dl += 1
-            _printer("Enable 1 session:", session.s_id)
-    # Remove old sessions:
-    for session in all_sessions:
-        delay = (now - session.last_ping).total_seconds()
-        if delay > 86400:  # Session is more than 1 day old
-            _printer("Delete 1 outdated session:", session.s_id)
-            session.delete_instance()  # Session has expired
+    with Session.connect():
+        now = datetime.now()
+        # Get allowed:
+        all_sessions = Session.select()
+        nb_sessions = len(all_sessions)
+        _printer("All sessions:", nb_sessions)
+        sessions = Session.select().where(Session.status == "active")
+        nb_active_dl = len(sessions)
+        _printer("Active_dl:", nb_active_dl)
+        for session in sessions:
+            if not session.keep_active and (now - session.last_ping).total_seconds() > 30:
+                _printer("Delete 1 active session:", session.s_id)
+                session.delete_instance()  # We consider the user has left
+                nb_active_dl -= 1
+        # Get pending:
+        sessions = Session.select().where(Session.status == "pending").order_by(Session.date_created)
+        _printer("Pending:", len(sessions))
+        for session in sessions:
+            delay = (now - session.last_ping).total_seconds()
+            if delay > 30:
+                session.status = "reset"  # Reset status, the user has probably left
+                session.save()
+                _printer("Reset 1 session:", session.s_id)
+            elif nb_active_dl < config_reader.max_concurrent_dl:
+                session.status = "active"
+                session.save()
+                nb_active_dl += 1
+                _printer("Enable 1 session:", session.s_id)
+        # Remove old sessions:
+        for session in all_sessions:
+            delay = (now - session.last_ping).total_seconds()
+            if delay > 86400:  # Session is more than 1 day old
+                _printer("Delete 1 outdated session:", session.s_id)
+                session.delete_instance()  # Session has expired


 @atexit.register
@@ -188,6 +197,13 @@ def cleaner():
         DRMAA_SESSION.exit()


+def move_job_to_cluster(id_job):
+    with Job.connect():
+        job = Job.get(Job.id_job == id_job)
+        job.batch_type = config_reader.batch_system_type
+        job.save()
+
+
 def parse_args():
     global DEBUG, LOG_FILE
@@ -216,12 +232,6 @@ def parse_args():
         LOG_FILE = os.path.join(config_reader.log_dir, "local_scheduler.log")


-def move_job_to_cluster(id_job):
-    job = Job.get(Job.id_job == id_job)
-    job.batch_type = config_reader.batch_system_type
-    job.save()
-
-
 if __name__ == '__main__':
     parse_args()
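
The local scheduler is the process most exposed to the timeout: it sleeps between polling rounds, so a connection opened once at startup is typically dead by the next round. Each helper above therefore guards its own queries, and thanks to the reference counting added in the models file (next section), an outer block can batch several helpers onto one short-lived connection. A minimal sketch of such a polling round, assuming a 15-second cycle and the helpers defined above (the loop itself is illustrative, not part of this commit):

import time

while True:
    with Job.connect():                          # opened fresh for this round
        local_jobs = get_scheduled_local_jobs()  # its nested connect() is a no-op
    for id_job in local_jobs:
        start_job(id_job)                        # opens its own short-lived connection
    time.sleep(15)                               # idle time no longer kills anything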
......
 import os
 from dgenies.config_reader import AppConfigReader
-from peewee import SqliteDatabase, Model, CharField, IntegerField, DateTimeField, BooleanField, MySQLDatabase
+from peewee import SqliteDatabase, Model, CharField, IntegerField, DateTimeField, BooleanField, MySQLDatabase, OperationalError
 from datetime import datetime

 config = AppConfigReader()
@@ -14,10 +14,39 @@ elif db_type == "mysql":
                        passwd=config.database_password, database=config.database_db)
 else:
     raise Exception("Unsupported database type: " + db_type)
-db.connect()


-class Job(Model):
+class Database:
+
+    nb_open = 0
+
+    def __init__(self):
+        pass
+
+    def __enter__(self):
+        Database.nb_open += 1
+        try:
+            db.connect()
+        except OperationalError:
+            pass
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        Database.nb_open -= 1
+        if Database.nb_open == 0:
+            db.close()
+
+
+class BaseModel(Model):
+    class Meta:
+        database = db
+
+    @classmethod
+    def connect(cls):
+        return Database()
+
+
+class Job(BaseModel):
     id_job = CharField(max_length=50, unique=True)
     email = CharField()
     id_process = IntegerField(null=True)
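
The nb_open counter is what makes nested with-blocks safe: any view or scheduler helper can guard itself without knowing whether a caller already holds the connection, because a redundant connect() on an open connection (peewee raises OperationalError there) is swallowed, and only the outermost __exit__ actually closes. A self-contained sketch of the same idea, with a fake connection object standing in for peewee's database handle (FakeDb and the asserts are illustrative only):

class FakeDb:
    def __init__(self):
        self.open = False

    def connect(self):
        if self.open:
            raise RuntimeError("already connected")  # peewee raises OperationalError here
        self.open = True

    def close(self):
        self.open = False


db = FakeDb()


class Database:
    nb_open = 0

    def __enter__(self):
        Database.nb_open += 1
        try:
            db.connect()
        except RuntimeError:  # already opened by an outer with-block
            pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        Database.nb_open -= 1
        if Database.nb_open == 0:
            db.close()


with Database():           # outer block: really opens
    with Database():       # nested block: connect() fails, error swallowed
        assert db.open
    assert db.open         # inner __exit__: nb_open == 1, so no close
assert not db.open         # outer __exit__: nb_open == 0, connection closed

Note that nb_open is a plain class attribute updated without a lock; since the scheduler runs jobs in threads (run_job_in_thread above), a heavily concurrent deployment would want this counter lock-protected or thread-local.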
@@ -28,17 +57,13 @@ class Job(Model):
     mem_peak = IntegerField(null=True)
     time_elapsed = IntegerField(null=True)

-    class Meta:
-        database = db
-

-class Session(Model):
+class Session(BaseModel):
     s_id = CharField(max_length=20, unique=True)
     date_created = DateTimeField()
     upload_folder = CharField(max_length=20)
-    allow_upload = BooleanField(default=False)
     last_ping = DateTimeField()
-    position = IntegerField(default=-1)
+    status = CharField(default="reset")
     keep_active = BooleanField(default=False)  # Uploads made by the server must be kept active

     @classmethod
@@ -58,27 +83,21 @@ class Session(Model):
         return my_s_id

     def ask_for_upload(self, change_status=False):
-        all_asked = Session.select().where(Session.position >= 0).order_by(Session.position)
+        all_asked = Session.select().where((Session.status == "pending") | (Session.status == "active")).\
+            order_by(Session.date_created)
         nb_asked = len(all_asked)
-        if self.position == -1:
-            if nb_asked == 0:
-                position = 0
-            else:
-                position = all_asked[-1].position + 1
-        else:
-            position = self.position
-        allow_upload = self.allow_upload
-        if not allow_upload and change_status and nb_asked < 5:
-            allow_upload = True
+        if self.status != "reset":
+            change_status = False
+        status = "pending"
+        if self.status == "active" or (change_status and nb_asked < 5):
+            status = "active"
-        self.allow_upload = allow_upload
-        self.position = position
+        self.status = status
         self.last_ping = datetime.now()
         self.save()
-        return allow_upload, position
+        return self.status == "active"

     def ping(self):
         self.last_ping = datetime.now()
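
The refactor collapses the old position/allow_upload bookkeeping into a three-state field: a session starts (or is reset to) "reset", becomes "pending" once it has asked to upload, and "active" when a slot is free; ask_for_upload() now returns a single boolean. The promotion rule can be restated as a pure function (next_status, max_slots and the asserts are illustrative; the real code hardcodes 5 slots):

def next_status(current, change_status, nb_asked, max_slots=5):
    """Mirror of the decision made in ask_for_upload() above."""
    if current != "reset":
        change_status = False  # only a freshly reset session may be promoted here
    if current == "active" or (change_status and nb_asked < max_slots):
        return "active"
    return "pending"


assert next_status("reset", True, nb_asked=0) == "active"     # free slot: promoted
assert next_status("reset", True, nb_asked=7) == "pending"    # queue full: wait
assert next_status("pending", True, nb_asked=0) == "pending"  # re-ask is not a promotion
assert next_status("active", False, nb_asked=9) == "active"   # active stays active

Promotion of a "pending" session to "active" is handled separately by the scheduler's parse_uploads_asks() shown earlier.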
......
@@ -135,8 +135,9 @@ class Functions:

     @staticmethod
     def get_mail_for_job(id_job):
-        j1 = Job.get(Job.id_job == id_job)
-        return j1.email
+        with Job.connect():
+            j1 = Job.get(Job.id_job == id_job)
+            return j1.email

     @staticmethod
     def send_fasta_ready(mailer, job_name, sample_name, compressed=False):
......
@@ -243,7 +243,7 @@ dgenies.run.enable_form = function () {
     dgenies.run.hide_loading("query");
     dgenies.run.hide_loading("target");
     dgenies.run.hide_success("query");
-    dgenies.run.hide_success("fasta");
+    dgenies.run.hide_success("target");
     dgenies.run.files = [undefined, undefined];
     dgenies.run.restore_form();
 };
......
@@ -31,16 +31,17 @@ def main():

 @app.route("/run", methods=['GET'])
 def run():
-    s_id = Session.new()
-    id_job = Functions.random_string(5) + "_" + datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d%H%M%S')
-    if "id_job" in request.args:
-        id_job = request.args["id_job"]
-    email = ""
-    if "email" in request.args:
-        email = request.args["email"]
-    return render_template("run.html", title=app_title, id_job=id_job, email=email,
-                           menu="run", allowed_ext=ALLOWED_EXTENSIONS, s_id=s_id,
-                           max_upload_file_size=config_reader.max_upload_file_size)
+    with Session.connect():
+        s_id = Session.new()
+        id_job = Functions.random_string(5) + "_" + datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d%H%M%S')
+        if "id_job" in request.args:
+            id_job = request.args["id_job"]
+        email = ""
+        if "email" in request.args:
+            email = request.args["email"]
+        return render_template("run.html", title=app_title, id_job=id_job, email=email,
+                               menu="run", allowed_ext=ALLOWED_EXTENSIONS, s_id=s_id,
+                               max_upload_file_size=config_reader.max_upload_file_size)


 @app.route("/run-test", methods=['GET'])
@@ -48,14 +49,16 @@ def run_test():
     print(config_reader.allowed_ip_tests)
     if request.remote_addr not in config_reader.allowed_ip_tests:
         return abort(404)
-    return Session.new()
+    with Session.connect():
+        return Session.new()


 # Launch analysis
 @app.route("/launch_analysis", methods=['POST'])
 def launch_analysis():
     try:
-        session = Session.get(s_id=request.form["s_id"])
+        with Session.connect():
+            session = Session.get(s_id=request.form["s_id"])
     except DoesNotExist:
         return jsonify({"success": False, "errors": ["Session has expired. Please refresh the page and try again"]})
     # Reset session upload:
@@ -401,12 +404,12 @@ def summary(id_res):
 def ask_upload():
     try:
         s_id = request.form['s_id']
-        session = Session.get(s_id=s_id)
-        allowed, position = session.ask_for_upload(True)
+        with Session.connect():
+            session = Session.get(s_id=s_id)
+            allowed = session.ask_for_upload(True)
         return jsonify({
             "success": True,
-            "allowed": allowed,
-            "position": position
+            "allowed": allowed
         })
     except DoesNotExist:
         return jsonify({"success": False, "message": "Session not initialized. Please refresh the page."})
@@ -415,8 +418,9 @@ def ask_upload():

 @app.route("/ping-upload", methods=['POST'])
 def ping_upload():
     s_id = request.form['s_id']
-    session = Session.get(s_id=s_id)
-    session.ping()
+    with Session.connect():
+        session = Session.get(s_id=s_id)
+        session.ping()
     return "OK"
@@ -424,38 +428,39 @@ def ping_upload():
 def upload():
     try:
         s_id = request.form['s_id']
-        session = Session.get(s_id=s_id)
-        if session.ask_for_upload(False)[0]:
-            folder = session.upload_folder
-            files = request.files[list(request.files.keys())[0]]
-            if files:
-                filename = files.filename
-                folder_files = os.path.join(app.config["UPLOAD_FOLDER"], folder)
-                if not os.path.exists(folder_files):
-                    os.makedirs(folder_files)
-                filename = Functions.get_valid_uploaded_filename(filename, folder_files)
-                mime_type = files.content_type
-                if not Functions.allowed_file(files.filename):
-                    result = UploadFile(name=filename, type_f=mime_type, size=0, not_allowed_msg="File type not allowed")
-                    shutil.rmtree(folder_files)
-                else:
-                    # save file to disk
-                    uploaded_file_path = os.path.join(folder_files, filename)
-                    files.save(uploaded_file_path)
-                    # get file size after saving
-                    size = os.path.getsize(uploaded_file_path)
-                    # return json for js call back
-                    result = UploadFile(name=filename, type_f=mime_type, size=size)
-                return jsonify({"files": [result.get_file()], "success": "OK"})
-            return jsonify({"files": [], "success": "404", "message": "No file provided"})
-        return jsonify({"files": [], "success": "ERR", "message": "Not allowed to upload!"})
+        with Session.connect():
+            session = Session.get(s_id=s_id)
+            if session.ask_for_upload(False):
+                folder = session.upload_folder
+                files = request.files[list(request.files.keys())[0]]
+                if files:
+                    filename = files.filename
+                    folder_files = os.path.join(app.config["UPLOAD_FOLDER"], folder)
+                    if not os.path.exists(folder_files):
+                        os.makedirs(folder_files)
+                    filename = Functions.get_valid_uploaded_filename(filename, folder_files)
+                    mime_type = files.content_type
+                    if not Functions.allowed_file(files.filename):
+                        result = UploadFile(name=filename, type_f=mime_type, size=0, not_allowed_msg="File type not allowed")
+                        shutil.rmtree(folder_files)
+                    else:
+                        # save file to disk
+                        uploaded_file_path = os.path.join(folder_files, filename)
+                        files.save(uploaded_file_path)
+                        # get file size after saving
+                        size = os.path.getsize(uploaded_file_path)
+                        # return json for js call back
+                        result = UploadFile(name=filename, type_f=mime_type, size=size)
+                    return jsonify({"files": [result.get_file()], "success": "OK"})
+                return jsonify({"files": [], "success": "404", "message": "No file provided"})
+            return jsonify({"files": [], "success": "ERR", "message": "Not allowed to upload!"})
     except DoesNotExist:
         return jsonify({"files": [], "success": "ERR", "message": "Session not initialized. Please refresh the page."})