Commit 07c77e9f authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Add limit to concurrent uploads + optimize sessions to be per-tab

parent 40fe1a88
......@@ -60,3 +60,4 @@ menage_freq = 1
[jobs]
# Number of parallel runs for local jobs:
run_local = 1
max_concurrent_dl = 5
......@@ -5,6 +5,7 @@ import time
import sys
import psutil
import atexit
from datetime import datetime
from tendo import singleton
# Allow only one instance:
......@@ -14,7 +15,7 @@ app_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__fil
os.environ["PATH"] = os.path.join(app_folder, "bin") + ":" + os.environ["PATH"]
sys.path.insert(0, app_folder)
from database import Job
from database import Job, Session
from config_reader import AppConfigReader
from lib.job_manager import JobManager
......@@ -134,6 +135,42 @@ def parse_started_jobs():
return jobs_started, cluster_jobs_started
def parse_uploads_asks():
now = datetime.now()
# Get allowed:
all_sessions = Session.select()
nb_sessions = len(all_sessions)
_printer("All sessions:", nb_sessions)
sessions = Session.select().where(Session.allow_upload)
nb_active_dl = len(sessions)
_printer("Active_dl:", nb_active_dl)
for session in sessions:
if (now - session.last_ping).total_seconds() > 30:
_printer("Delete 1 active session")
session.delete_instance() # We consider the user has left
nb_active_dl -= 1
# Get pending:
sessions = Session.select().where((Session.allow_upload == False) & (Session.position >= 0)).order_by(Session.position)
_printer("Pending:", len(sessions))
for session in sessions:
delay = (now - session.last_ping).total_seconds()
if delay > 30:
session.position = -1 # Reset position, the user has probably left
session.save()
_printer("Reset 1 session")
elif nb_active_dl < config_reader.max_concurrent_dl:
session.allow_upload = True
session.save()
nb_active_dl += 1
_printer("Enable 1 session")
# Remove old sessions:
for session in all_sessions:
delay = (now - session.last_ping).total_seconds()
if delay > 86400: # Session has more than 1 day
session.delete_instance() # Session has expired
_printer("Delete 1 outdated session")
@atexit.register
def cleaner():
if "DRMAA_SESSION" in globals():
......@@ -143,6 +180,9 @@ def cleaner():
if __name__ == '__main__':
while True:
_printer("Check uploads...")
parse_uploads_asks()
_printer("")
_printer("Checking jobs...")
scheduled_jobs_local = get_scheduled_local_jobs()
scheduled_jobs_cluster = get_scheduled_cluster_jobs()
......@@ -165,3 +205,4 @@ if __name__ == '__main__':
# Wait before return
_printer("Sleeping...")
time.sleep(15 if nb_preparing_jobs == 0 else 5)
_printer("\n")
......@@ -106,8 +106,14 @@ dgenies.ajax = function(url, data, success, error, method="POST") {
);
};
dgenies.post = function(url, data, success, error) {
dgenies.ajax(url, data, success, error, "POST")
dgenies.post = function(url, data, success, error, async=true) {
dgenies.ajax({
url: url,
data: data,
success: success,
error: error,
type: "POST",
async: async})
};
dgenies.get = function (url, data, success, error) {
......
......@@ -4,10 +4,13 @@ if (!dgenies) {
dgenies.run = {};
// Init global variables:
dgenies.run.s_id = null;
dgenies.run.allowed_ext = [];
dgenies.run.files = [undefined, undefined];
dgenies.run.allow_upload = false;
dgenies.run.init = function (allowed_ext) {
dgenies.run.init = function (s_id, allowed_ext) {
dgenies.run.s_id = s_id;
dgenies.run.allowed_ext = allowed_ext;
dgenies.run.restore_form();
dgenies.run.set_events();
......@@ -47,6 +50,9 @@ dgenies.run.allowed_file = function (filename) {
dgenies.run.init_fileuploads = function () {
$('input.file-query').fileupload({
dataType: 'json',
formData: {
"s_id": dgenies.run.s_id
},
add: function (e, data) {
let filename = data.files[0].name;
if (dgenies.run.allowed_file(filename))
......@@ -84,7 +90,9 @@ dgenies.run.init_fileuploads = function () {
});
$('input.file-target').fileupload({
dataType: 'json',
formData: {folder: dgenies.run.upload_folder},
formData: {
"s_id": dgenies.run.s_id
},
add: function (e, data) {
let filename = data.files[0].name
if (dgenies.run.allowed_file(filename))
......@@ -244,7 +252,8 @@ dgenies.run.do_submit = function () {
"query": $("input#query").val(),
"query_type": $("select.query").find(":selected").text().toLowerCase(),
"target": $("input#target").val(),
"target_type": $("select.target").find(":selected").text().toLowerCase()
"target_type": $("select.target").find(":selected").text().toLowerCase(),
"s_id": dgenies.run.s_id
},
function (data, status) {
if (data["success"]) {
......@@ -312,11 +321,48 @@ dgenies.run.reset_errors = function() {
$("div.errors-submit ul.flashes").find("li").remove();
};
dgenies.run.ask_for_upload = function () {
console.log("Ask for upload...");
dgenies.post("/ask-upload",
{
"s_id": dgenies.run.s_id
},
function (data, status) {
if (data["success"]) {
let allow_upload = data["allowed"];
if (allow_upload) {
$("div#uploading-loading").html("Uploading files...");
window.setInterval(dgenies.run.ping_upload, 15000);
dgenies.run.upload_next();
}
else {
window.setTimeout(dgenies.run.ask_for_upload, 15000);
}
}
else {
dgenies.notify("message" in data ? data["message"] : "An error has occurred. Please contact the support", "danger", 3000);
}
}, undefined, false
);
};
dgenies.run.ping_upload = function () {
dgenies.post("/ping-upload",
{
"s_id": dgenies.run.s_id
},
function (data, status) {
}
);
};
dgenies.run.start_uploads = function() {
let query_type = parseInt($("select.query").val());
let has_uploads = false;
if (query_type === 0 && $("input#query").val().length > 0) {
$("button#button-query").hide();
dgenies.run.show_loading("query");
has_uploads = true;
}
else {
dgenies.run.files[0] = undefined;
......@@ -325,11 +371,18 @@ dgenies.run.start_uploads = function() {
if (target_type === 0 && $("input#target").val().length > 0) {
$("button#button-target").hide();
dgenies.run.show_loading("target");
has_uploads = true;
}
else {
dgenies.run.files[1] = undefined;
}
if (has_uploads) {
$("div#uploading-loading").html("Asking for upload...");
dgenies.run.ask_for_upload();
}
else {
dgenies.run.upload_next();
}
};
dgenies.run.show_global_loading = function () {
......
......@@ -207,3 +207,9 @@ class AppConfigReader:
return min_size
except (NoOptionError, NoSectionError):
return 0
def get_max_concurrent_dl(self):
try:
return int(self.reader.get("jobs", "max_concurrent_dl"))
except (NoOptionError, NoSectionError):
return 5
import os
from config_reader import AppConfigReader
from peewee import SqliteDatabase, Model, CharField, IntegerField, DateTimeField
from peewee import SqliteDatabase, Model, CharField, IntegerField, DateTimeField, BooleanField
from datetime import datetime
config = AppConfigReader()
file_path = config.database
......@@ -9,7 +11,7 @@ db.connect()
class Job(Model):
id_job = CharField(max_length=50)
id_job = CharField(max_length=50, unique=True)
email = CharField()
id_process = IntegerField(null=True)
batch_type = CharField(max_length=20, default="local")
......@@ -23,5 +25,62 @@ class Job(Model):
database = db
class Session(Model):
s_id = CharField(max_length=20, unique=True)
date_created = DateTimeField()
upload_folder = CharField(max_length=20)
allow_upload = BooleanField(default=False)
last_ping = DateTimeField()
position = IntegerField(default=-1)
@classmethod
def new(cls):
from lib.functions import Functions
my_s_id = Functions.random_string(20)
while len(cls.select().where(cls.s_id == my_s_id)) > 0:
my_s_id = Functions.random_string(20)
upload_folder = Functions.random_string(20)
tmp_dir = config.upload_folder
upload_folder_path = os.path.join(tmp_dir, upload_folder)
while os.path.exists(upload_folder_path):
upload_folder = Functions.random_string(20)
upload_folder_path = os.path.join(tmp_dir, upload_folder)
cls.create(s_id=my_s_id, date_created=datetime.now(), upload_folder=upload_folder, last_ping=datetime.now())
return my_s_id
def ask_for_upload(self, change_status=False):
all_asked = Session.select().where(Session.position >= 0).order_by(Session.position)
nb_asked = len(all_asked)
if self.position == -1:
if nb_asked == 0:
position = 0
else:
position = all_asked[-1].position + 1
else:
change_status = False
position = self.position
allow_upload = self.allow_upload
if change_status and nb_asked < 5:
allow_upload = True
self.allow_upload = allow_upload
self.position = position
self.last_ping = datetime.now()
self.save()
return allow_upload, position
def ping(self):
self.last_ping = datetime.now()
self.save()
class Meta:
database = db
if not Job.table_exists():
Job.create_table()
if not Session.table_exists():
Session.create_table()
......@@ -6,7 +6,7 @@ import datetime
import shutil
import re
import threading
from flask import Flask, render_template, request, url_for, jsonify, session, Response, abort
from flask import Flask, render_template, request, url_for, jsonify, Response, abort
from pathlib import Path
from lib.paf import Paf
from config_reader import AppConfigReader
......@@ -16,6 +16,8 @@ from lib.upload_file import UploadFile
from lib.fasta import Fasta
from lib.mailer import Mailer
from lib.crons import Crons
from database import Session
from peewee import DoesNotExist
import sys
......@@ -66,8 +68,7 @@ def main():
@app.route("/run", methods=['GET'])
def run():
session["user_tmp_dir"] = Functions.random_string(5) + "_" + \
datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d%H%M%S')
s_id = Session.new()
id_job = Functions.random_string(5) + "_" + datetime.datetime.fromtimestamp(time.time()).strftime('%Y%m%d%H%M%S')
if "id_job" in request.args:
id_job = request.args["id_job"]
......@@ -75,12 +76,20 @@ def run():
if "email" in request.args:
email = request.args["email"]
return render_template("run.html", title=app_title, id_job=id_job, email=email,
menu="run", allowed_ext=ALLOWED_EXTENSIONS)
menu="run", allowed_ext=ALLOWED_EXTENSIONS, s_id=s_id)
# Launch analysis
@app.route("/launch_analysis", methods=['POST'])
def launch_analysis():
try:
session = Session.get(s_id=request.form["s_id"])
except DoesNotExist:
return jsonify({"success": False, "errors": ["Session has expired. Please refresh the page and try again"]})
# Reset session upload:
session.allow_upload = False
session.position = -1
session.save()
id_job = request.form["id_job"]
email = request.form["email"]
file_query = request.form["query"]
......@@ -117,19 +126,23 @@ def launch_analysis():
# Save files:
query = None
upload_folder = session.upload_folder
if file_query != "":
query_name = os.path.splitext(file_query.replace(".gz", ""))[0] if file_query_type == "local" else None
query_path = os.path.join(app.config["UPLOAD_FOLDER"], session["user_tmp_dir"], file_query) \
query_path = os.path.join(app.config["UPLOAD_FOLDER"], upload_folder, file_query) \
if file_query_type == "local" else file_query
query = Fasta(name=query_name, path=query_path, type_f=file_query_type)
target_name = os.path.splitext(file_target.replace(".gz", ""))[0] if file_target_type == "local" else None
target_path = os.path.join(app.config["UPLOAD_FOLDER"], session["user_tmp_dir"], file_target) \
target_path = os.path.join(app.config["UPLOAD_FOLDER"], upload_folder, file_target) \
if file_target_type == "local" else file_target
target = Fasta(name=target_name, path=target_path, type_f=file_target_type)
# Launch job:
job = JobManager(id_job, email, query, target, mailer)
job.launch()
# Delete session:
session.delete_instance()
return jsonify({"success": True, "redirect": url_for(".status", id_job=id_job)})
else:
return jsonify({"success": False, "errors": errors})
......@@ -345,10 +358,36 @@ def qt_assoc(id_res):
abort(404)
@app.route("/ask-upload", methods=['POST'])
def ask_upload():
try:
s_id = request.form['s_id']
session = Session.get(s_id=s_id)
allowed, position = session.ask_for_upload(True)
return jsonify({
"success": True,
"allowed": allowed,
"position": position
})
except DoesNotExist:
return jsonify({"success": False, "message": "Session not initialized. Please refresh the page."})
@app.route("/ping-upload", methods=['POST'])
def ping_upload():
s_id = request.form['s_id']
session = Session.get(s_id=s_id)
session.ping()
return "OK"
@app.route("/upload", methods=['POST'])
def upload():
if "user_tmp_dir" in session and session["user_tmp_dir"] != "":
folder = session["user_tmp_dir"]
try:
s_id = request.form['s_id']
session = Session.get(s_id=s_id)
if session.ask_for_upload(False)[0]:
folder = session.upload_folder
files = request.files[list(request.files.keys())[0]]
if files:
......@@ -377,7 +416,12 @@ def upload():
return jsonify({"files": [result.get_file()], "success": "OK"})
return jsonify({"files": [], "success": "404", "message": "No file provided"})
return jsonify({"files": [], "success": "ERR", "message": "Not allowed to upload!"})
except DoesNotExist:
return jsonify({"files": [], "success": "ERR", "message": "Session not initialized. Please refresh the page."})
except: # Except all possible exceptions to prevent crashes
return jsonify({"files": [], "success": "ERR", "message": "An unexpected error has occurred on upload. "
"Please contact the support."})
@app.route("/send-mail/<id_res>", methods=['POST'])
......
......@@ -9,7 +9,7 @@
<script src="{{ url_for('static', filename='js/jquery.fileupload-validate.js') }}"></script>
<script src="{{ url_for('static', filename='js/dgenies.run.js') }}" type="text/JavaScript"></script>
{% endblock %}
{% block onload %}dgenies.run.init({{ allowed_ext }});{% endblock %}
{% block onload %}dgenies.run.init('{{ s_id }}',{{ allowed_ext }});{% endblock %}
{% block content %}
<form id="submit_minimap" method=post action="#">
<h2 class="title-launch">Launch map analysis</h2>
......
logs
samples.txt
samples*.txt
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment