Commit 5ec6800b authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Finish comments of all python code + some minor fixes + some refactoring

parent cbf64fab
......@@ -165,3 +165,4 @@ texinfo_documents = [
# -- Extension configuration -------------------------------------------------
autodoc_default_flags = ['private-members']
autoclass_content = "both"
dgenies.bin package
===================
Submodules
----------
dgenies.bin.clean\_jobs module
------------------------------
.. automodule:: dgenies.bin.clean_jobs
:members:
:undoc-members:
:show-inheritance:
dgenies.bin.filter\_contigs module
----------------------------------
.. automodule:: dgenies.bin.filter_contigs
:members:
:undoc-members:
:show-inheritance:
dgenies.bin.index module
------------------------
.. automodule:: dgenies.bin.index
:members:
:undoc-members:
:show-inheritance:
dgenies.bin.local\_scheduler module
-----------------------------------
.. automodule:: dgenies.bin.local_scheduler
:members:
:undoc-members:
:show-inheritance:
dgenies.bin.merge\_splitted\_chrms module
-----------------------------------------
.. automodule:: dgenies.bin.merge_splitted_chrms
:members:
:undoc-members:
:show-inheritance:
dgenies.bin.sort\_paf module
----------------------------
.. automodule:: dgenies.bin.sort_paf
:members:
:undoc-members:
:show-inheritance:
dgenies.bin.split\_fa module
----------------------------
.. automodule:: dgenies.bin.split_fa
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: dgenies.bin
:members:
:undoc-members:
:show-inheritance:
dgenies
-------
.. automodule:: dgenies
:members:
:undoc-members:
:show-inheritance:
......@@ -7,6 +7,7 @@ Subpackages
.. toctree::
dgenies.lib
dgenies.bin
Submodules
^^^^^^^^^^
......@@ -14,3 +15,10 @@ Submodules
.. toctree::
dgenies.submodules
Module contents
^^^^^^^^^^^^^^^
.. toctree::
dgenies.content
......@@ -18,6 +18,16 @@ DEBUG = False
def launch(mode="webserver", debug=False):
"""
Launch the application
:param mode: webserver or standalone
:type mode: str
:param debug: True to enable debug mode
:type debug: bool
:return: flask app object
:rtype: Flask
"""
global app, app_title, app_folder, APP_DATA, config_reader, mailer, MODE, DEBUG
app_folder = os.path.dirname(os.path.realpath(__file__))
......
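A minimal usage sketch for launch(), assuming it is exposed at the top level of the dgenies package (the import path and the run() call are illustrative, not taken from this commit):
from dgenies import launch

app = launch(mode="standalone", debug=True)  # returns the Flask app object
app.run()  # serve it with Flask's development server (illustrative only)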
......@@ -12,6 +12,7 @@ from index import index_file
def index_fasta(name, filepath, out_dir, type_f, dofilter = True):
"""
Index and filter fasta
:param name: name of the species
:param filepath: full path of the fasta file
:param out_dir: output folder
......
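A hypothetical call to index_fasta(), with placeholder paths and sample name; type_f follows the query/target convention used elsewhere in this commit:
index_fasta(name="my_sample", filepath="/path/to/my_sample.fasta",
            out_dir="/path/to/output", type_f="query", dofilter=True)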
......@@ -15,6 +15,19 @@ config_reader = AppConfigReader()
def parse_upload_folders(upload_folder, now, max_age, fake=False):
"""
Parse upload folders and remove files and folders that are too old
:param upload_folder: upload folder path
:type upload_folder: str
:param now: current timestamp
:type now: float
:param max_age: remove all files & folders older than this age, defined for each category
(uploads, data, error, ...)
:type max_age: dict
:param fake: if True, only print files that would be deleted, without deleting them
:type fake: bool
"""
for file in os.listdir(upload_folder):
file = os.path.join(upload_folder, file)
create_date = os.path.getctime(file)
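A sketch of how parse_upload_folders() might be called; the max_age values are invented and only the category keys named in the docstring are used:
import time

max_age = {"uploads": 1, "data": 7, "error": 1}  # illustrative values
parse_upload_folders(upload_folder="/path/to/uploads", now=time.time(),
                     max_age=max_age, fake=True)  # fake=True: dry run, nothing is deleted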
......@@ -34,6 +47,19 @@ def parse_upload_folders(upload_folder, now, max_age, fake=False):
def parse_database(app_data, max_age, fake=False):
"""
Parse the database and remove jobs that are too old (from the database and from disk)
:param app_data: folder where jobs are stored
:type app_data: str
:param max_age: remove all files & folders older than this age, defined for each category
(uploads, data, error, ...)
:type max_age: dict
:param fake: if True, only print files that would be deleted, without deleting them
:type fake: bool
:return: ids of jobs which are in the gallery (these are never removed, regardless of their age)
:rtype: list
"""
from dgenies.database import Job, Gallery
gallery_jobs = []
with Job.connect():
......@@ -60,7 +86,22 @@ def parse_database(app_data, max_age, fake=False):
return gallery_jobs
def parse_data_folders(app_data, now, max_age, gallery_jobs, fake=False):
def parse_data_folders(app_data, gallery_jobs, now, max_age, fake=False):
"""
Parse the data folder and remove jobs that are too old
:param app_data: folder where jobs are stored
:type app_data: str
:param gallery_jobs: ids of jobs which are inside the gallery
:type gallery_jobs: list
:param now: current timestamp
:type now: float
:param max_age: remove all files & folders older than this age, defined for each category
(uploads, data, error, ...)
:type max_age: dict
:param fake: if True, only print files that would be deleted, without deleting them
:type fake: bool
"""
for file in os.listdir(app_data):
if file not in gallery_jobs and file not in ["gallery"]:
file = os.path.join(app_data, file)
......
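A combined sketch showing how parse_database() and the reordered parse_data_folders() fit together; the paths and max_age values are placeholders:
import time

now = time.time()
max_age = {"uploads": 1, "data": 7, "error": 1}  # illustrative values
gallery_jobs = parse_database(app_data="/path/to/app_data", max_age=max_age, fake=True)
# gallery_jobs now comes right after app_data in the new signature:
parse_data_folders(app_data="/path/to/app_data", gallery_jobs=gallery_jobs,
                   now=now, max_age=max_age, fake=True)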
#!/usr/bin/env python3
import os
from dgenies.lib.crons import Crons
app_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "dgenies")
crons = Crons(app_folder)
crons.clear()
#!/usr/bin/env python3
import os
import sys
from glob import glob
from dgenies.config_reader import AppConfigReader
config = AppConfigReader()
if hasattr(config, "log_dir"):
log_files = glob(os.path.join(config.log_dir, "*.log"))
for file in log_files:
os.remove(file)
else:
print("No log dir defined!")
......@@ -13,7 +13,28 @@ from Bio import SeqIO
class Filter:
"""
Filter a fasta file: remove contigs that are too small
"""
def __init__(self, fasta, index_file, type_f, min_filtered=0, split=False, out_fasta=None, replace_fa=False):
"""
:param fasta: fasta file path
:type fasta: str
:param index_file: index file path
:type index_file: str
:param type_f: type of sample (query or target)
:type type_f: str
:param min_filtered: minimum number of large contigs to allow filtering
:type min_filtered: int
:param split: True if contigs are split
:type split: bool
:param out_fasta: output fasta file path
:type out_fasta: str
:param replace_fa: if True, replace fasta file
:type replace_fa: bool
"""
self.fasta = fasta
self.index_file = index_file
self.type_f = type_f
......@@ -26,6 +47,12 @@ class Filter:
self.replace_fa = replace_fa
def filter(self):
"""
Run the contig filtering
:return: True if success, else False
:rtype: bool
"""
f_outs = self._check_filter()
if len(f_outs) > 0:
self._filter_out(f_outs=f_outs)
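A usage sketch for the Filter class, with placeholder paths; the min_filtered value is arbitrary:
f = Filter(fasta="/path/to/query.fasta", index_file="/path/to/query.idx",
           type_f="query", min_filtered=5, split=False, replace_fa=True)
success = f.filter()  # True if success, else False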
......@@ -35,8 +62,9 @@ class Filter:
def _check_filter(self):
"""
Load the index of the fasta file and determine which contigs must be removed. Remove them only from the index
:return: list of contigs which must be removed
:rtype: list
"""
# Load contigs:
name, order, contigs, reversed_c, abs_start, c_len = Index.load(index_file=self.index_file,
......@@ -112,8 +140,9 @@ class Filter:
def _filter_out(self, f_outs):
"""
Remove contigs that are too small from the fasta file
:param f_outs: contigs which must be filtered out
:type f_outs: list
"""
sequences = SeqIO.parse(open(self.fasta), "fasta")
keeped = (record for record in sequences if record.name not in f_outs)
......
......@@ -7,11 +7,31 @@ import gzip
class Index:
"""
Manage Fasta Index
"""
def __init__(self):
pass
@staticmethod
def load(index_file, merge_splits=False):
"""
Load index
:param index_file: index file path
:type index_file: str
:param merge_splits: if True, merge split contigs together
:type merge_splits: bool
:return:
* [0] sample name
* [1] contigs order
* [2] contigs size
* [3] reversed status for each contig
* [4] absolute start position for each contig
* [5] total length of the sample
:rtype: (str, list, dict, dict, dict, int)
"""
with open(index_file, "r") as idx_q_f:
abs_start = {}
abs_current_start = 0
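A sketch of unpacking the tuple returned by Index.load(); the index path is a placeholder:
name, order, contigs, reversed_c, abs_start, c_len = Index.load(
    index_file="/path/to/query.idx", merge_splits=False)
# name: sample name, order: list of contig names, contigs: contig sizes,
# reversed_c: reversed status per contig, abs_start: absolute start per contig,
# c_len: total length of the sample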
......@@ -46,6 +66,20 @@ class Index:
@staticmethod
def save(index_file, name, contigs, order, reversed_c):
"""
Save index
:param index_file: index file path
:type index_file: str
:param name: sample name
:type name: str
:param contigs: contigs size
:type contigs: dict
:param order: contigs order
:type order: list
:param reversed_c: reversed status for each contig
:type reversed_c: dict
"""
with open(index_file, "w") as idx:
idx.write(name + "\n")
for contig in order:
......@@ -54,6 +88,23 @@ class Index:
def index_file(fasta_path, fasta_name, out, write_fa=None):
"""
Index fasta file
:param fasta_path: fasta file path
:type fasta_path: str
:param fasta_name: sample name
:type fasta_name: str
:param out: output index file
:type out: str
:param write_fa: file path of the new fasta file to write, or None to not write the fasta to a new file
:type write_fa: str
:return:
* [0] True if success, else False
* [1] Number of contigs
* [2] Error message
:rtype: (bool, int, str)
"""
has_header = False
next_header = False # True if next line must be a header line
compressed = fasta_path.endswith(".gz")
......@@ -112,7 +163,7 @@ if __name__ == '__main__':
parser.add_argument('-o', '--output', type=str, required=True, help="Output index file")
args = parser.parse_args()
success, message = index_file(args.input, args.name, args.output)
success, nb_contigs, message = index_file(args.input, args.name, args.output)
if success:
print("Success!")
else:
......
......@@ -36,6 +36,11 @@ LOG_FILE = "stdout"
def _printer(*messages):
"""
Print messages to stdout or to a file (according to the LOG_FILE global constant)
:param messages: messages to print
"""
if DEBUG:
if LOG_FILE == "stdout":
print(*messages)
......@@ -45,6 +50,14 @@ def _printer(*messages):
def start_job(id_job, batch_system_type="local"):
"""
Start a job (mapping step)
:param id_job: job id
:type id_job: str
:param batch_system_type: local, slurm or sge
:type batch_system_type: str
"""
_printer("Start job", id_job)
with Job.connect():
job = Job.get(Job.id_job == id_job)
......@@ -56,6 +69,12 @@ def start_job(id_job, batch_system_type="local"):
def get_scheduled_local_jobs():
"""
Get list of jobs ready to be started (for local runs)
:return: list of jobs
:rtype: list
"""
all_jobs = []
with Job.connect():
jobs = Job.select().where((Job.batch_type == "local") & ((Job.status == "prepared") | (Job.status == "scheduled"))).\
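An illustrative polling step for local jobs; the scheduler's real main loop is not shown in this diff, and this assumes the returned list holds job ids:
for id_job in get_scheduled_local_jobs():
    start_job(id_job)  # batch_system_type defaults to "local"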
......@@ -68,6 +87,12 @@ def get_scheduled_local_jobs():
def get_scheduled_cluster_jobs():
"""
Get list of jobs ready to be started (for cluster runs)
:return: list of jobs
:rtype: list
"""
all_jobs = []
with Job.connect():
jobs = Job.select().where((Job.batch_type != "local") & ((Job.status == "prepared") | (Job.status == "scheduled"))).\
......@@ -80,6 +105,12 @@ def get_scheduled_cluster_jobs():
def prepare_job(id_job):
"""
Launch preparation of the job's data
:param id_job: job id
:type id_job: str
"""
_printer("Prepare data for job:", id_job)
with Job.connect():
job = Job.get(Job.id_job == id_job)
......@@ -91,23 +122,47 @@ def prepare_job(id_job):
def get_prep_scheduled_jobs():
"""
Get list of jobs ready to be prepared (all data is downloaded and parsed)
:return: list of jobs
:rtype: list
"""
with Job.connect():
jobs = Job.select().where(Job.status == "waiting").order_by(Job.date_created)
return [(j.id_job, j.batch_type) for j in jobs]
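A sketch of dispatching preparation from the list returned above; the capacity check against get_preparing_jobs_nb() and its threshold are assumptions:
if get_preparing_jobs_nb() < 2:  # arbitrary local capacity
    for id_job, batch_type in get_prep_scheduled_jobs():
        if batch_type == "local":
            prepare_job(id_job)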
def get_preparing_jobs_nb():
"""
Get number of jobs in preparation step (for local runs)
:return: number of jobs
:rtype: int
"""
with Job.connect():
return len(Job.select().where(Job.status == "preparing"))
def get_preparing_jobs_cluster_nb():
"""
Get number of jobs in preparation step (for cluster runs)
:return: (number of jobs with preparing-cluster status, number of jobs with prepare-scheduled status)
:rtype: (int, int)
"""
with Job.connect():
return len(Job.select().where(Job.status == "preparing-cluster")), \
len(Job.select().where(Job.status == "prepare-scheduled"))
def parse_started_jobs():
"""
Parse all started jobs: check everything is OK, change job statuses if needed, and look for dead jobs
:return: (list of ids of jobs started locally, list of ids of jobs started on the cluster)
:rtype: (list, list)
"""
with Job.connect():
jobs_started = [] # Only local jobs
cluster_jobs_started = [] # Only cluster jobs
......@@ -157,6 +212,9 @@ def parse_started_jobs():
def parse_uploads_asks():
"""
Parse upload requests: allow new uploads when others end, remove expired sessions, ...
"""
with Session.connect():
now = datetime.now()
# Get allowed:
......@@ -195,11 +253,20 @@ def parse_uploads_asks():
@atexit.register
def cleaner():
"""
Exit DRMAA session at program exit
"""
if "DRMAA_SESSION" in globals():
DRMAA_SESSION.exit()
def move_job_to_cluster(id_job):
"""
Change a local job to run on the cluster
:param id_job: job id
:type id_job: str
"""
with Job.connect():
job = Job.get(Job.id_job == id_job)
job.batch_type = config_reader.batch_system_type
......@@ -207,6 +274,10 @@ def move_job_to_cluster(id_job):
def parse_args():
"""
Parse command line arguments and define DEBUG and LOG_FILE constants
"""
global DEBUG, LOG_FILE
parser = argparse.ArgumentParser(description="Start local scheduler")
......
......@@ -6,7 +6,24 @@ from collections import OrderedDict
class Merger:
"""
Merge split contigs together in the PAF file
"""
def __init__(self, paf_in, paf_out, query_in, query_out, debug=False):
"""
:param paf_in: input PAF file path
:type paf_in: str
:param paf_out: output PAF file path
:type paf_out: str
:param query_in: input query index file path
:type query_in: str
:param query_out: output query index file path
:type query_out: str
:param debug: True to enable debug mode
:type debug: bool
"""
self.paf_in = paf_in
self.paf_out = paf_out
self.query_in = query_in
......@@ -14,10 +31,19 @@ class Merger:
self.debug = debug
def _printer(self, message):
"""
Print debug messages if debug mode is enabled
:param message: message to print
:type message: str
"""
if self.debug:
print(message)
def merge(self):
"""
Launch the merge
"""
self._printer("Loading query index...")
contigs, contigs_split, q_name = self.load_query_index(self.query_in)
......@@ -30,14 +56,18 @@ class Merger:
self._printer("DONE!")
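A usage sketch for the Merger class, with placeholder file names:
merger = Merger(paf_in="map_split.paf", paf_out="map.paf",
                query_in="query_split.idx", query_out="query.idx", debug=True)
merger.merge()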
@staticmethod
def _get_sorted_splits(contigs_split: dict, all_contigs: dict):
def _get_sorted_splits(contigs_split, all_contigs):
"""
For each split contig, save how many bases must be added to each line of the corresponding split piece in the PAF
file.
Also, save the final merged contig size in the all-contigs dict
:param contigs_split: split contigs
:type contigs_split: dict
:param all_contigs: all and final contigs
:type all_contigs: dict
:return: all contigs and the new split contigs, with the start position of each split piece set
:rtype: (dict, dict)
"""
new_contigs = {}
for contig, splits_d in contigs_split.items():
......@@ -51,6 +81,17 @@ class Merger:
return all_contigs, new_contigs
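A toy illustration of the offset bookkeeping done here, with simplified data (the real dictionaries carry more per-split information than this sketch):
split_lengths = [100, 50]      # lengths of the pieces of one split contig
offsets, start = [], 0
for length in split_lengths:
    offsets.append(start)      # bases to add to PAF positions on this piece
    start += length
merged_length = start          # 150: size stored for the merged contig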
def load_query_index(self, index):
"""
Load query index
:param index: index file path
:type index: str
:return:
* [0] contigs length
* [1] split contigs length
* [2] sample name
:rtype: (dict, dict, str)
"""
contigs = OrderedDict()
contigs_split = {}
with open(index) as idx_f:
......@@ -75,7 +116,17 @@ class Merger:
@staticmethod
def write_query_index(index: str, contigs: dict, q_name: str):
def write_query_index(index, contigs, q_name):
"""
Save new query index
:param index: index file path
:type index: str
:param contigs: contigs size
:type contigs: dict
:param q_name: sample name
:type q_name: str
"""
with open(index, "w") as idx_f:
idx_f.write(q_name + "\n")
for contig_name, contig_len in contigs.items():
......@@ -83,6 +134,18 @@ class Merger:
@staticmethod
def merge_paf(paf_in, paf_out, contigs, contigs_split):
"""
Merge the split contigs in the PAF file
:param paf_in: path of input PAF with split contigs
:type paf_in: str
:param paf_out: path of output PAF where split contigs are now merged together
:type paf_out: str