Commit 3536adc2 authored by Floreal Cabanettes

Add standalone mode, Implements #114

parent 6ce3b7f3
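
This commit lets D-Genies run as a plain local application: launch() now takes the run mode and a debug flag, and everything that only makes sense on a public web server (mailer, crons, the peewee-backed job database) is activated only when the mode is "webserver". A minimal sketch of the standalone entry path, using only names introduced in this diff (the CLI wiring around it is assumed, not shown):

from dgenies import launch

# Standalone mode: no mailer, no crons, and a stubbed database layer (see database.py below).
app = launch(mode="standalone", debug=True)
app.run(host="127.0.0.1", port=5000, debug=True)
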
@@ -6,11 +6,8 @@ import webbrowser
import threading
from glob import glob
import time
from dgenies.lib.crons import Crons
from dgenies.config_reader import AppConfigReader
from dgenies.bin.clean_jobs import parse_data_folders, parse_database, parse_upload_folders
from dgenies.database import Gallery, Job
from peewee import DoesNotExist
runned = False
@@ -45,6 +42,8 @@ def parse_args():
default=False)
clear.add_argument("-m", "--max-age", help="Max age for job to delete (0 for all)", type=int, required=False,
default=0)
clear.add_argument("-w", "--web", help="Add this option with -j option, if you use the webserver mode", type=bool,
const=True, nargs="?", required=False, default=False)
# Gallery:
gallery = subparsers.add_parser("gallery", help="Manage gallery")
@@ -103,11 +102,12 @@ def run(mode="standalone", debug=False, host="127.0.0.1", port=5000, no_crons=Fa
if debug:
os.environ['LOGS'] = "True"
from dgenies import launch
app = launch()
app = launch(mode=mode, debug=debug)
app.run(host=host, port=port, debug=debug)
def clear_crons():
from dgenies.lib.crons import Crons
crons = Crons(None, True)
crons.clear()
@@ -122,8 +122,7 @@ def clear_logs():
print("No log dir defined!")
def clear_jobs(max_data_age=7):
def clear_jobs(max_data_age=7, web=False):
upload_folder = config.upload_folder
app_data = config.app_data
now = time.time()
@@ -146,15 +145,18 @@ def clear_jobs(max_data_age=7):
)
print("")
print("######################")
print("# Parsing Jobs in DB #")
print("######################")
print("")
gallery_jobs = parse_database(
app_data=app_data,
max_age=max_age
)
print("")
if web:
print("######################")
print("# Parsing Jobs in DB #")
print("######################")
print("")
gallery_jobs = parse_database(
app_data=app_data,
max_age=max_age
)
print("")
else:
gallery_jobs = []
print("#######################")
print("# Parsing Data folder #")
@@ -170,6 +172,8 @@ def clear_jobs(max_data_age=7):
def add_to_gallery(id_job, name, picture, query, target):
from dgenies.database import Gallery, Job
from peewee import DoesNotExist
try:
job = Job.get(id_job=id_job)
except DoesNotExist:
@@ -185,6 +189,7 @@ def add_to_gallery(id_job, name, picture, query, target):
def del_from_gallery_by_id(id_job):
from dgenies.database import Gallery, Job
items = Gallery.select().join(Job).where(Job.id_job == id_job)
list_pictures = []
for item in items:
@@ -194,6 +199,7 @@ def del_from_gallery_by_id(id_job):
def del_from_gallery_by_name(name):
from dgenies.database import Gallery
items = Gallery.select().where(Gallery.name == name)
list_pictures = []
for item in items:
@@ -215,7 +221,7 @@ if __name__ == "__main__":
clear_logs()
if args.jobs:
print("Cleaning jobs...")
clear_jobs(args.max_age)
clear_jobs(args.max_age, args.web)
elif command == "gallery_add":
add_to_gallery(args.id_job, args.name, args.pict, args.query, args.target)
elif command == "gallery_del":
......
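
The clear command gains a -w/--web switch. It only matters together with -j/--jobs: when set, clear_jobs() also scans the job database (only present in webserver mode) so that gallery jobs can be collected before old data is removed; without it, only the data and upload folders are parsed. Hedged example of what the command ends up calling (values are illustrative):

# Standalone install: no database, only folders are scanned.
clear_jobs(max_data_age=7, web=False)

# Webserver install: the job database is scanned as well.
clear_jobs(max_data_age=7, web=True)
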
@@ -3,7 +3,6 @@
import os
from flask import Flask
from .config_reader import AppConfigReader
from .lib.mailer import Mailer
from .lib.crons import Crons
app = None
@@ -12,12 +11,16 @@ APP_DATA = None
config_reader = None
mailer = None
app_folder = None
MODE = "webserver"
DEBUG = False
def launch():
global app, app_title, app_folder, APP_DATA, config_reader, mailer
def launch(mode="webserver", debug=False):
global app, app_title, app_folder, APP_DATA, config_reader, mailer, MODE, DEBUG
app_folder = os.path.dirname(os.path.realpath(__file__))
MODE = mode
DEBUG = debug
# Init config reader:
config_reader = AppConfigReader()
@@ -34,13 +37,15 @@ def launch():
app.config['SECRET_KEY'] = 'dsqdsq-255sdA-fHfg52-25Asd5'
# Init mail:
mailer = Mailer(app)
if MODE == "webserver":
from .lib.mailer import Mailer
mailer = Mailer(app)
if config_reader.debug and config_reader.log_dir != "stdout" and not os.path.exists(config_reader.log_dir):
os.makedirs(config_reader.log_dir)
# Crons:
if os.getenv('DISABLE_CRONS') != "True":
if os.getenv('DISABLE_CRONS') != "True" and MODE == "webserver":
print("Starting crons...")
crons = Crons(app_folder, config_reader.debug or os.getenv('LOGS') == "True")
crons.start_all()
......
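
launch() stores its arguments in the module-level MODE and DEBUG constants, which the rest of the package reads. Note that modules such as dgenies.database look at MODE at import time, so they must be imported after launch() has set it; that is presumably why the database imports are deferred throughout this commit. Consumer-side sketch, as used in the files below:

from dgenies import MODE, DEBUG

if MODE == "webserver":
    from dgenies.database import Session  # peewee models, webserver only
    from peewee import DoesNotExist
# DEBUG is presumably passed on to helpers such as Splitter and Merger (see below).
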
@@ -10,7 +10,6 @@ import argparse
from dgenies.config_reader import AppConfigReader
from dgenies.lib.functions import Functions
from dgenies.database import Job, Gallery
config_reader = AppConfigReader()
@@ -35,6 +34,7 @@ def parse_upload_folders(upload_folder, now, max_age, fake=False):
def parse_database(app_data, max_age, fake=False):
from dgenies.database import Job, Gallery
gallery_jobs = []
with Job.connect():
old_jobs = Job.select().where(
......
@@ -6,23 +6,28 @@ from collections import OrderedDict
class Merger:
def __init__(self, paf_in, paf_out, query_in, query_out):
def __init__(self, paf_in, paf_out, query_in, query_out, debug=False):
self.paf_in = paf_in
self.paf_out = paf_out
self.query_in = query_in
self.query_out = query_out
self.debug = debug
def _printer(self, message):
if self.debug:
print(message)
def merge(self):
print("Loading query index...")
self._printer("Loading query index...")
contigs, contigs_split, q_name = self.load_query_index(self.query_in)
print("Merging contigs in PAF file...")
self._printer("Merging contigs in PAF file...")
self.merge_paf(self.paf_in, self.paf_out, contigs, contigs_split)
print("Writing new query index...")
self._printer("Writing new query index...")
self.write_query_index(self.query_out, contigs, q_name)
print("DONE!")
self._printer("DONE!")
@staticmethod
def _get_sorted_splits(contigs_split: dict, all_contigs: dict):
......
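
Merger keeps the same behaviour but is now silent by default: progress messages go through _printer() and are only shown when debug=True. Hedged usage sketch (file names are made up for illustration):

# Merge split contigs back together in the PAF and rewrite the query index.
merger = Merger(paf_in="map.paf", paf_out="map_merged.paf",
                query_in="query_split.idx", query_out="query.idx",
                debug=True)  # debug=False (the default) keeps it quiet
merger.merge()
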
@@ -10,7 +10,7 @@ from collections import OrderedDict
class Splitter:
def __init__(self, input_f, name_f, output_f, size_c=10000000, query_index="query_split.idx"):
def __init__(self, input_f, name_f, output_f, size_c=10000000, query_index="query_split.idx", debug=False):
self.input_f = input_f
self.name_f = name_f
self.size_c = size_c
@@ -20,6 +20,7 @@ class Splitter:
self.out_dir = os.path.dirname(output_f)
self.index_file = os.path.join(self.out_dir, query_index)
self.nb_contigs = 0
self.debug = debug
def split(self):
"""
@@ -48,7 +49,8 @@
return False
chr_name = re.split("\s", line[1:])[0]
fasta_str = ""
print("Parsing contig \"%s\"... " % chr_name, end="")
if self.debug:
print("Parsing contig \"%s\"... " % chr_name, end="")
elif len(line) > 0:
if next_header or re.match(r"^[ATGCKMRYSWBVHDXN.\-]+$", line.upper()) is None:
return False
@@ -95,10 +97,11 @@
self.write_contig(name, seq, enc)
index_f.write("%s\t%d\n" % (name, len(seq)))
nb_contigs = len(contigs)
if nb_contigs == 1:
print("Keeped!")
else:
print("Splited in %d contigs!" % nb_contigs)
if self.debug:
if nb_contigs == 1:
print("Keeped!")
else:
print("Splited in %d contigs!" % nb_contigs)
def parse_args():
......
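
Splitter gets the same treatment: the per-contig message and the final kept/split summary are only printed when the new debug flag is set. Hedged usage sketch (paths are illustrative, other arguments keep their defaults):

# Split large contigs of the query before mapping; stays quiet unless debug=True.
splitter = Splitter(input_f="query.fasta", name_f="query.fasta",
                    output_f="query_split.fasta", debug=False)
ok = splitter.split()  # returns False on a malformed FASTA, as before
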
from dgenies import MODE
import os
from dgenies.config_reader import AppConfigReader
from peewee import SqliteDatabase, Model, CharField, IntegerField, DateTimeField, BooleanField, MySQLDatabase, \
OperationalError, ForeignKeyField
from playhouse.shortcuts import RetryOperationalError
from datetime import datetime
config = AppConfigReader()
db_url = config.database_url
db_type = config.database_type
if MODE == "webserver":
from peewee import SqliteDatabase, Model, CharField, IntegerField, DateTimeField, BooleanField, MySQLDatabase, \
OperationalError, ForeignKeyField
from playhouse.shortcuts import RetryOperationalError
class MyRetryDB(RetryOperationalError, MySQLDatabase):
pass
db_url = config.database_url
db_type = config.database_type
if db_type == "sqlite":
db = SqliteDatabase(db_url)
elif db_type == "mysql":
db = MyRetryDB(host=config.database_url, port=config.database_port, user=config.database_user,
passwd=config.database_password, database=config.database_db)
else:
raise Exception("Unsupported database type: " + db_type)
class MyRetryDB(RetryOperationalError, MySQLDatabase):
pass
class Database:
if db_type == "sqlite":
db = SqliteDatabase(db_url)
elif db_type == "mysql":
db = MyRetryDB(host=config.database_url, port=config.database_port, user=config.database_user,
passwd=config.database_password, database=config.database_db)
else:
raise Exception("Unsupported database type: " + db_type)
nb_open = 0
def __init__(self):
pass
class Database:
nb_open = 0
def __enter__(self):
Database.nb_open += 1
try:
db.connect()
except OperationalError:
def __init__(self):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
Database.nb_open -= 1
if Database.nb_open == 0:
db.close()
def __enter__(self):
Database.nb_open += 1
try:
db.connect()
except OperationalError:
pass
def __exit__(self, exc_type, exc_val, exc_tb):
Database.nb_open -= 1
if Database.nb_open == 0:
db.close()
class BaseModel(Model):
class Meta:
database = db
class BaseModel(Model):
@classmethod
def connect(cls):
return Database()
class Meta:
database = db
@classmethod
def connect(cls):
return Database()
class Job(BaseModel):
id_job = CharField(max_length=50, unique=True)
email = CharField()
id_process = IntegerField(null=True)
batch_type = CharField(max_length=20, default="local")
status = CharField(max_length=20, default="submitted")
date_created = DateTimeField()
error = CharField(default="")
mem_peak = IntegerField(null=True)
time_elapsed = IntegerField(null=True)
class Job(BaseModel):
id_job = CharField(max_length=50, unique=True)
email = CharField()
id_process = IntegerField(null=True)
batch_type = CharField(max_length=20, default="local")
status = CharField(max_length=20, default="submitted")
date_created = DateTimeField()
error = CharField(default="")
mem_peak = IntegerField(null=True)
time_elapsed = IntegerField(null=True)
class Gallery(BaseModel):
job = ForeignKeyField(Job)
name = CharField()
query = CharField()
target = CharField()
picture = CharField()
class Gallery(BaseModel):
job = ForeignKeyField(Job)
name = CharField()
query = CharField()
target = CharField()
picture = CharField()
class Session(BaseModel):
s_id = CharField(max_length=20, unique=True)
date_created = DateTimeField()
upload_folder = CharField(max_length=20)
last_ping = DateTimeField()
status = CharField(default="reset")
keep_active = BooleanField(default=False) # Uploads made by the server must be keep active
@classmethod
def new(cls, keep_active=False):
from dgenies.lib.functions import Functions
my_s_id = Functions.random_string(20)
while len(cls.select().where(cls.s_id == my_s_id)) > 0:
class Session(BaseModel):
s_id = CharField(max_length=20, unique=True)
date_created = DateTimeField()
upload_folder = CharField(max_length=20)
last_ping = DateTimeField()
status = CharField(default="reset")
keep_active = BooleanField(default=False) # Uploads made by the server must be keep active
@classmethod
def new(cls, keep_active=False):
from dgenies.lib.functions import Functions
my_s_id = Functions.random_string(20)
upload_folder = Functions.random_string(20)
tmp_dir = config.upload_folder
upload_folder_path = os.path.join(tmp_dir, upload_folder)
while os.path.exists(upload_folder_path):
while len(cls.select().where(cls.s_id == my_s_id)) > 0:
my_s_id = Functions.random_string(20)
upload_folder = Functions.random_string(20)
tmp_dir = config.upload_folder
upload_folder_path = os.path.join(tmp_dir, upload_folder)
cls.create(s_id=my_s_id, date_created=datetime.now(), upload_folder=upload_folder, last_ping=datetime.now(),
keep_active=keep_active)
return my_s_id
while os.path.exists(upload_folder_path):
upload_folder = Functions.random_string(20)
upload_folder_path = os.path.join(tmp_dir, upload_folder)
cls.create(s_id=my_s_id, date_created=datetime.now(), upload_folder=upload_folder, last_ping=datetime.now(),
keep_active=keep_active)
return my_s_id
def ask_for_upload(self, change_status=False):
all_asked = Session.select().where((Session.status == "pending") | (Session.status == "active")).\
order_by(Session.date_created)
nb_asked = len(all_asked)
if self.status != "reset":
change_status = False
status = "pending"
if self.status == "active" or (change_status and nb_asked < 5):
status = "active"
self.status = status
self.last_ping = datetime.now()
self.save()
return self.status == "active"
def ask_for_upload(self, change_status=False):
all_asked = Session.select().where((Session.status == "pending") | (Session.status == "active")).\
order_by(Session.date_created)
nb_asked = len(all_asked)
if self.status != "reset":
change_status = False
def ping(self):
self.last_ping = datetime.now()
self.save()
status = "pending"
if self.status == "active" or (change_status and nb_asked < 5):
status = "active"
self.status = status
self.last_ping = datetime.now()
self.save()
return self.status == "active"
def ping(self):
self.last_ping = datetime.now()
self.save()
if not Job.table_exists():
Job.create_table()
if not Gallery.table_exists():
Gallery.create_table()
if not Job.table_exists():
Job.create_table()
if not Session.table_exists():
Session.create_table()
else:
class Database:
nb_open = 0
def __init__(self):
pass
def __enter__(self):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
pass
if not Gallery.table_exists():
Gallery.create_table()
class Job:
if not Session.table_exists():
Session.create_table()
@classmethod
def connect(cls):
return Database()
\ No newline at end of file
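
database.py is now split in two. In webserver mode the peewee models (Job, Gallery, Session) and the SQLite/MySQL connection are defined exactly as before; in standalone mode only stub Database and Job classes remain. The stub is enough because callers only rely on the "with Job.connect():" context-manager idiom, and every real query elsewhere is additionally guarded by a MODE check (see job_manager.py below). Sketch of why the stub suffices in standalone mode:

with Job.connect():  # returns the stub Database; __enter__/__exit__ do nothing
    pass             # no database is touched
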
@@ -10,7 +10,6 @@ from collections import OrderedDict
from Bio import SeqIO
from jinja2 import Template
from dgenies.config_reader import AppConfigReader
from dgenies.database import Job, Gallery
ALLOWED_EXTENSIONS = ['fa', 'fasta', 'fna', 'fa.gz', 'fasta.gz', 'fna.gz']
@@ -137,6 +136,7 @@ class Functions:
@staticmethod
def get_mail_for_job(id_job):
from dgenies.database import Job
with Job.connect():
j1 = Job.get(Job.id_job == id_job)
return j1.email
@@ -221,6 +221,7 @@ class Functions:
@staticmethod
def get_gallery_items():
from dgenies.database import Gallery
items = []
for item in Gallery.select():
items.append({
@@ -232,4 +233,4 @@ class Functions:
"mem_peak": Functions.get_readable_size(item.job.mem_peak),
"time_elapsed": Functions.get_readable_time(item.job.time_elapsed)
})
return items
return items
\ No newline at end of file
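
clean_jobs.py and functions.py follow the same pattern: the peewee model imports move from module level into the functions that actually query them, so importing these helpers in standalone mode never pulls in the database layer. Sketch of the deferred-import pattern (hypothetical helper name, simplified body):

def list_gallery_names():
    from dgenies.database import Gallery  # resolved only on webserver code paths
    return [item.name for item in Gallery.select()]
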
from dgenies import MODE, DEBUG
import os
import shutil
import subprocess
@@ -6,8 +8,6 @@ import time
import threading
import re
from dgenies.config_reader import AppConfigReader
from dgenies.database import Job, Session
from peewee import DoesNotExist
from .fasta import Fasta
from .functions import Functions
import requests
@@ -26,7 +26,11 @@ from dgenies.lib.paf import Paf
import gzip
import io
import binascii
import glob
from dgenies.database import Job
if MODE == "webserver":
from dgenies.database import Session
from peewee import DoesNotExist
class JobManager:
@@ -180,29 +184,45 @@ class JobManager:
return "Your job #ID# has failed. You can try again.<br/>If the problem persists, please contact the support."
def __launch_local(self):
if MODE == "webserver":
cmd = ["/usr/bin/time", "-f", "%e %M"]
else:
cmd = []
if self.query is not None:
cmd = ["/usr/bin/time", "-f", "%e %M", self.config.minimap2_exec, "-t", self.config.nb_threads,
cmd += [self.config.minimap2_exec, "-t", self.config.nb_threads,
self.target.get_path(), self.get_query_split()]
else:
cmd = ["/usr/bin/time", "-f", "%e %M", self.config.minimap2_exec, "-t", self.config.nb_threads, "-X",
cmd += [self.config.minimap2_exec, "-t", self.config.nb_threads, "-X",
self.target.get_path(), self.target.get_path()]
with open(self.logs, "w") as logs, open(self.paf_raw, "w") as paf_raw:
p = subprocess.Popen(cmd, stdout=paf_raw, stderr=logs)
with Job.connect():
job = Job.get(Job.id_job == self.id_job)
job.id_process = p.pid
job.status = "started"
job.save()
status = "started"
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
job.id_process = p.pid
job.status = status
job.save()
else:
job = None
self.set_status_standalone(status)
p.wait()
if p.returncode == 0:
status = self.check_job_success()
job.status = status
job.save()
if MODE == "webserver":
job.status = status
job.save()
else:
self.set_status_standalone(status)
return status == "success"
job.status = "fail"
self.error = self.search_error()
job.error = self.error
job.save()
status = "fail"
if MODE == "webserver":
job.status = status
job.error = self.error
job.save()
else:
self.set_status_standalone(status, self.error)
return False
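
In __launch_local(), the minimap2 command is wrapped in /usr/bin/time only in webserver mode (standalone installs do not record memory or runtime peaks), and every status transition is written either to the Job table or through set_status_standalone(), which is not shown in this excerpt. The recurring branch can be summarised as the sketch below; the _set_status helper is purely illustrative, the commit inlines this logic at each step:

def _set_status(self, status, error=""):
    if MODE == "webserver":
        job = Job.get(Job.id_job == self.id_job)
        job.status = status
        if error:
            job.error = error
        job.save()
    else:
        if error:
            self.set_status_standalone(status, error)
        else:
            self.set_status_standalone(status)
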
def check_job_status_slurm(self):
@@ -261,12 +281,15 @@ class JobManager:
return status == "0"
def update_job_status(self, status, id_process=None):
with Job.connect():
job = Job.get(Job.id_job == self.id_job)
job.status = status
if id_process is not None: