Commit 742f50a2 authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Remove position from sessions, use date_created instead

parent f6ead9f8
......@@ -148,13 +148,13 @@ def parse_uploads_asks():
session.remove() # We consider the user has left
nb_active_dl -= 1
# Get pending:
sessions = Session.sort_sessions(Session.get_by_status("pending"), "position")
sessions = Session.sort_sessions(Session.get_by_status("pending"), "date_created")
_printer("Pending:", len(sessions))
for session in sessions:
delay = (now - session.last_ping).total_seconds()
if delay > 30:
try:
session.reset() # Reset position, the user has probably left
session.reset() # Reset session, the user has probably left
except DoesNotExist:
pass
_printer("Reset 1 session:", session.s_id)
......
......@@ -13,6 +13,17 @@ import abc
class Database:
_ops = {
"==": operator.eq,
"!=": operator.ne,
">=": operator.ge,
">": operator.gt,
"<=": operator.le,
"<": operator.lt,
"in": lambda a, b: a in b,
"not in": lambda a, b: a not in b
}
def __init__(self, d_id, type):
self.id = d_id
self.type = type
......@@ -260,24 +271,14 @@ class Job(Database):
thread = threading.Timer(2, self.remove, kwargs={"safe": False})
thread.start()
@staticmethod
def select(properties: dict):
@classmethod
def select(cls, properties: dict):
"""
Select jobs with some properties
:param properties: dict of properties with in values the value and the operator:
{"prop1": [">", 25], "prop2": ["==", "success"], ...}
:return:
"""
ops = {
"==": operator.eq,
"!=": operator.ne,
">=": operator.ge,
">": operator.gt,
"<=": operator.le,
"<": operator.lt,
"in": lambda a, b: a in b,
"not in": lambda a, b: a not in b
}
if "id_job" in properties:
jobs = [properties["id_job"]]
......@@ -295,7 +296,7 @@ class Job(Database):
else:
match = True
for my_property, value in properties.items():
if value[0] not in ops:
if value[0] not in cls._ops:
raise ValueError("Invalid operator: %s" % value[0])
try:
j_value = job.__getattribute__(my_property)
......@@ -303,7 +304,7 @@ class Job(Database):
match = False
break
else:
if not ops[value[0]](j_value, value[1]):
if not cls._ops[value[0]](j_value, value[1]):
match = False
break
if not match:
......@@ -433,7 +434,6 @@ class Session(Database):
self._s_status = "reset"
self._s_upload_folder = None
self._s_date_last_ping = None
self._s_position = -1
self._s_keep_active = False
self._old_status = None
self._loaded = False
......@@ -483,15 +483,6 @@ class Session(Database):
self._s_date_last_ping = value
self._changes.append("date_last_ping")
@property
def position(self):
return self._s_position
@position.setter
def position(self, value):
self._s_position = value
self._changes.append("position")
@property
def keep_active(self):
return self._s_keep_active
......@@ -574,32 +565,49 @@ class Session(Database):
if save:
self.save()
@staticmethod
def get_by_status(status):
@classmethod
def get_by_status(cls, status, conditions=()):
"""
Get all Session objects with a given status
:param status:
:param conditions: list of conditions to apply to selection
:return: list of Session objects
"""
status_dir = Session._get_session_status_dir(status)
sessions = []
for file in os.listdir(status_dir):
try:
sessions.append(Session(file))
session = Session(file)
accept = True
for condition in conditions:
if condition[1] not in cls._ops:
raise ValueError("Invalid operator: %s" % condition[1])
try:
s_value = session.__getattribute__(condition[0])
except AttributeError:
accept = False
break
else:
if not cls._ops[condition[1]](s_value, condition[2]):
accept = False
break
if accept:
sessions.append(session)
except DoesNotExist:
pass
return sessions
@staticmethod
def get_by_statuses(statuses):
@classmethod
def get_by_statuses(cls, statuses, conditions=()):
"""
Get all Session objects with status in the given list
:param statuses: list of accepted statuses
:param conditions: list of conditions to apply to selection
:return: list of Session objects
"""
sessions = []
for status in statuses:
sessions += Session.get_by_status(status)
sessions += cls.get_by_status(status, conditions)
return sessions
@classmethod
......@@ -619,16 +627,10 @@ class Session(Database):
return sessions
def ask_for_upload(self, change_status=False):
all_asked = self.get_by_statuses(["pending", "active"])
all_asked = self.get_by_statuses(["pending", "active"], [["date_created", "<=", self.date_created]])
nb_asked = len(all_asked)
if self.position == -1:
if nb_asked == 0:
position = 0
else:
position = all_asked[-1].position + 1
else:
if self.status != "reset":
change_status = False
position = self.position
status = "pending"
if self.status == "active" or (change_status and nb_asked < 5):
......@@ -636,11 +638,10 @@ class Session(Database):
if status != self.status:
self.status = status
self.position = position
self.last_ping = datetime.now()
self.save()
return status == "active", position
return status == "active"
def ping(self):
self.last_ping = datetime.now()
......@@ -666,7 +667,6 @@ class Session(Database):
def reset(self):
self.status = "reset"
self.position = -1
self.save()
def enable(self):
......
......@@ -6,7 +6,7 @@ import time
import threading
import re
from dgenies.config_reader import AppConfigReader
from dgenies.database import Job, Session, DoesNotExist
from dgenies.database import Job, Session
from .fasta import Fasta
from .functions import Functions
import requests
......@@ -27,7 +27,7 @@ import binascii
class JobManager:
def __init__(self, job: Job, query: Fasta=None, target: Fasta=None, mailer=None):
def __init__(self, job: Job, query: Fasta = None, target: Fasta = None, mailer=None):
self.id_job = job.id_job
self.output_dir = job.output_dir
self.query = query
......@@ -208,7 +208,7 @@ class JobManager:
end = None
mem_peak = None
acct = subprocess.check_output("qacct -d 1 -j %s" % self.id_process,
shell=True).decode("utf-8")
shell=True).decode("utf-8")
lines = acct.split("\n")
for line in lines:
if line.startswith("failed"):
......@@ -279,7 +279,7 @@ class JobManager:
retval = s.wait(jobid, drmaa.Session.TIMEOUT_WAIT_FOREVER)
if retval.hasExited and (self.check_job_status_slurm() if batch_system_type == "slurm" else
self.check_job_status_sge()):
self.check_job_status_sge()):
if step == "start":
status = self.check_job_success()
else:
......@@ -420,11 +420,11 @@ class JobManager:
try:
correct = True
error_set = False
allowed, position = session.ask_for_upload(True)
allowed = session.ask_for_upload(True)
while not allowed:
time.sleep(15)
session = Session(s_id=s_id)
allowed, position = session.ask_for_upload(False)
allowed = session.ask_for_upload(False)
if allowed:
job.change_status("getfiles")
for file, input_type in files_to_download:
......@@ -490,7 +490,7 @@ class JobManager:
return False, True, True
all_downloaded = True
if correct :
if correct:
if len(files_to_download) > 0:
all_downloaded = False
thread = threading.Timer(0, self.download_files_with_pending,
......
......@@ -400,11 +400,10 @@ def ask_upload():
try:
s_id = request.form['s_id']
session = Session(s_id=s_id)
allowed, position = session.ask_for_upload(True)
allowed = session.ask_for_upload(True)
return jsonify({
"success": True,
"allowed": allowed,
"position": position
"allowed": allowed
})
except DoesNotExist:
return jsonify({"success": False, "message": "Session not initialized. Please refresh the page."})
......@@ -426,7 +425,7 @@ def upload():
try:
s_id = request.form['s_id']
session = Session(s_id=s_id)
if session.ask_for_upload(False)[0]:
if session.ask_for_upload(False):
folder = session.upload_folder
files = request.files[list(request.files.keys())[0]]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment