Commit a5fb93db authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Add docstrings to job_manager module + some refactoring

parent e58053ee
......@@ -163,3 +163,5 @@ texinfo_documents = [
# -- Extension configuration -------------------------------------------------
autodoc_default_flags = ['private-members']
dgenies package
===============
Packages & Modules
------------------
Subpackages
-----------
^^^^^^^^^^^
.. toctree::
dgenies.lib
Submodules
----------
^^^^^^^^^^
.. toctree::
......
......@@ -6,12 +6,6 @@
Welcome to D-Genies's documentation!
====================================
.. toctree::
:maxdepth: 2
:caption: Contents:
Dot plots are widely used to quickly compare sequence sets. They provide a synthetic similarity overview, highlighting repetitions, breaks and inversions. Different tools have been developed to easily generated genomic alignment dot plots, but they are often limited in the input sequence size. D-GENIES is a standalone and web application performing large genome alignments using minimap2 software package and generating interactive dot plots. It enables users to sort query sequences along the reference, zoom in the plot and download several image, alignment or sequence files. D-GENIES is an easy-to-install, open-source software package (GPL) developed in Python and JavaScript. The source code is available at https://github.com/genotoul-bioinfo/dgenies and it can be tested at http://dgenies.toulouse.inra.fr/.
......
Packages & Modules
------------------
.. toctree::
.. toctree::
:maxdepth: 4
dgenies
dgenies.rst
......@@ -39,6 +39,9 @@ if MODE == "webserver":
class JobManager:
"""
Jobs management
"""
def __init__(self, id_job: str, email: str=None, query: Fasta=None, target: Fasta=None, mailer=None,
tool="minimap2", align: Fasta=None, backup: Fasta=None):
......@@ -70,16 +73,33 @@ class JobManager:
def do_align(self):
"""
Check if we have to make alignment
:return: True if the job is launched with an alignment file
"""
return not os.path.exists(os.path.join(self.output_dir, ".align"))
@staticmethod
def is_gz_file(filepath):
"""
Check if a file is gzipped
:param filepath: file to check
:type filepath: str
:return: True if gzipped, else False
"""
with open(filepath, 'rb') as test_f:
return binascii.hexlify(test_f.read(2)) == b'1f8b'
def get_file_size(self, filepath: str):
"""
Get file size
:param filepath: file path
:type filepath: str
:return: file size (bytes)
:rtype: int
"""
file_size = os.path.getsize(filepath)
if filepath.endswith(".gz") and file_size <= self.config.max_upload_size:
with gzip.open(filepath, 'rb') as file_obj:
......@@ -87,6 +107,12 @@ class JobManager:
return file_size
def get_query_split(self):
"""
Get query split fasta file
:return: split query fasta file
:rtype: str
"""
if not self.tool.split_before:
return self.query.get_path()
query_split = os.path.join(self.output_dir, "split_" + os.path.basename(self.query.get_path()))
......@@ -95,6 +121,9 @@ class JobManager:
return query_split
def set_inputs_from_res_dir(self):
"""
Sets inputs (query, target, ...) from job dir
"""
res_dir = os.path.join(self.config.app_data, self.id_job)
query_file = os.path.join(res_dir, ".query")
if os.path.exists(query_file):
......@@ -128,6 +157,12 @@ class JobManager:
self.aln_format = os.path.splitext(file_path)[1][1:]
def check_job_success(self):
"""
Check if a job succeed
:return: status of a job: succeed, no-match or fail
:rtype: str
"""
if os.path.exists(self.paf_raw):
if os.path.getsize(self.paf_raw) > 0:
return "succeed"
......@@ -136,12 +171,35 @@ class JobManager:
return "fail"
def is_query_filtered(self):
"""
Check if query has been filtered
:return: True if filtered, else False
"""
return os.path.exists(os.path.join(self.output_dir, ".filter-query"))
def is_target_filtered(self):
"""
Check if target has been filtered
:return: True if filtered, else False
:return:
"""
return os.path.exists(os.path.join(self.output_dir, ".filter-target"))
def get_mail_content(self, status, target_name, query_name=None):
"""
Build mail content for status mail
:param status: job status
:type status: str
:param target_name: name of target
:type target_name: str
:param query_name: name of query
:type query_name: str
:return: mail content
:rtype: str
"""
message = "D-Genies\n\n"
if status == "success":
message += "Your job %s was completed successfully!\n\n" % self.id_job
......@@ -175,6 +233,18 @@ class JobManager:
return message
def get_mail_content_html(self, status, target_name, query_name=None):
"""
Build mail content as HTML
:param status: job status
:type status: str
:param target_name: name of target
:type target_name: str
:param query_name: name of query
:type query_name: str
:return: mail content (html)
:rtype: str
"""
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), "mail_templates", "job_notification.html"))\
as t_file:
template = Template(t_file.read())
......@@ -185,12 +255,24 @@ class JobManager:
target_filtered=self.is_target_filtered(), query_filtered=self.is_query_filtered())
def get_mail_subject(self, status):
"""
Build mail subject
:param status: job status
:type status: str
:return: mail subject
:rtype: str
"""
if status == "success" or status == "no-match":
return "DGenies - Job completed: %s" % self.id_job
else:
return "DGenies - Job failed: %s" % self.id_job
def send_mail(self):
"""
Send mail
"""
# Retrieve infos:
with Job.connect():
job = Job.get(Job.id_job == self.id_job)
......@@ -217,6 +299,12 @@ class JobManager:
message_html=self.get_mail_content_html(status, target_name, query_name))
def search_error(self):
"""
Search for an error in the log file (for local runs). If no error found, returns a generic error message
:return: error message to give to the user
:rtype: str
"""
logs = os.path.join(self.output_dir, "logs.txt")
if os.path.exists(logs):
lines = subprocess.check_output(['tail', '-2', logs]).decode("utf-8").split("\n")
......@@ -228,7 +316,13 @@ class JobManager:
"<br/>You can contact the support for more information."
return "Your job #ID# has failed. You can try again.<br/>If the problem persists, please contact the support."
def __launch_local(self):
def _launch_local(self):
"""
Launch a job on the current machine
:return: True if job succeed, else False
:rtype: bool
"""
if MODE == "webserver":
cmd = ["/usr/bin/time", "-f", "%e %M"]
else:
......@@ -285,7 +379,8 @@ class JobManager:
def check_job_status_slurm(self):
"""
Check status of a SLURM job run
:return: True if the job has successfully ended
:return: True if the job has successfully ended, else False
"""
status = subprocess.check_output("sacct -p -n --format=state,maxvmsize,elapsed -j %s.batch" % self.id_process,
shell=True).decode("utf-8").strip("\n")
......@@ -305,7 +400,8 @@ class JobManager:
def check_job_status_sge(self):
"""
Check status of a SGE job run
:return: True if the job jas successfully ended
:return: True if the job jas successfully ended, else False
"""
status = "-1"
start = None
......@@ -338,6 +434,12 @@ class JobManager:
return status == "0"
def update_job_status(self, status, id_process=None):
"""
Update job status
:param status: new status
:param id_process: system process id
"""
if MODE == "webserver":
with Job.connect():
job = Job.get(Job.id_job == self.id_job)
......@@ -350,6 +452,13 @@ class JobManager:
@staticmethod
def find_error_in_log(log_file):
"""
Find error in log (for cluster run)
:param log_file: log file of the job
:return: error (empty if no error)
:rtype: str
"""
error = ""
with open(log_file, "r") as log:
for line in log:
......@@ -359,6 +468,24 @@ class JobManager:
return error
def launch_to_cluster(self, step, batch_system_type, command, args, log_out, log_err):
"""
Launch a program to the cluster
:param step: step (prepare, start)
:type step: str
:param batch_system_type: slurm or sge
:type batch_system_type: str
:param command: program to launch (without arguments)
:type command: str
:param args: arguments to use for the program
:type args: list
:param log_out: log file for stdout
:type log_out: str
:param log_err: log file for stderr
:type log_err: str
:return: True if succeed, else False
:rtype: bool
"""
import drmaa
from dgenies.lib.drmaasession import DrmaaSession
drmaa_session = DrmaaSession()
......@@ -432,7 +559,14 @@ class JobManager:
s.deleteJobTemplate(jt)
return False
def __launch_drmaa(self, batch_system_type):
def _launch_drmaa(self, batch_system_type):
"""
Launch the mapping step to a cluster
:param batch_system_type: slurm or sge
:return: True if job succeed, else False
"""
if self.query is not None:
args = re.sub("{exe}\s?", "", self.tool.command_line).replace("{query}", self.get_query_split())
else:
......@@ -454,7 +588,17 @@ class JobManager:
log_out=out_file,
log_err=self.logs)
def __getting_local_file(self, fasta: Fasta, type_f):
def _getting_local_file(self, fasta, type_f):
"""
Copy temp file to its final location
:param fasta: fasta file Object
:type fasta: Fasta
:param type_f: query or target
:type type_f: str
:return: final full path of the file
:rtype: str
"""
finale_path = os.path.join(self.output_dir, type_f + "_" + os.path.basename(fasta.get_path()))
if fasta.is_example():
shutil.copy(fasta.get_path(), finale_path)
......@@ -464,7 +608,15 @@ class JobManager:
save_file.write(finale_path)
return finale_path
def __get_filename_from_url(self, url):
def _get_filename_from_url(self, url):
"""
Retrieve filename from an URL (http or ftp)
:param url: url of the file to download
:type url: str
:return: filename
:rtype: str
"""
if url not in self._filename_for_url:
if url.startswith("ftp://"):
self._filename_for_url[url] = url.split("/")[-1]
......@@ -475,7 +627,15 @@ class JobManager:
return self._filename_for_url[url]
def _download_file(self, url):
local_filename = os.path.join(self.output_dir, self.__get_filename_from_url(url))
"""
Download a file from an URL
:param url: url of the file to download
:type url: str
:return: absolute path of the downloaded file
:rtype: str
"""
local_filename = os.path.join(self.output_dir, self._get_filename_from_url(url))
# NOTE the stream=True parameter
if url.startswith("ftp://"):
urlretrieve(url, local_filename)
......@@ -488,16 +648,20 @@ class JobManager:
# f.flush() commented by recommendation from J.F.Sebastian
return local_filename
def __getting_file_from_url(self, fasta: Fasta, type_f):
def _getting_file_from_url(self, fasta, type_f):
"""
Download file from URL
:param fasta: Fasta object describing the input file {Fasta}
:param type_f: type of the file (query or target) {str}
:return: Tuple:
[0] True if no error happened, else False
[1] If an error happened, True if the error was saved for the job, else False (will be saved later)
[2] Finale path of the downloaded file {str}
[3] Name of the downloaded file {str}
:param fasta: Fasta object describing the input file
:type fasta: Fasta
:param type_f: type of the file (query or target)
:type type_f: str
:return:
* [0] True if no error happened, else False
* [1] If an error happened, True if the error was saved for the job, else False (will be saved later)
* [2] Finale path of the downloaded file {str}
* [3] Name of the downloaded file {str}
:rtype: tuple
"""
try:
dl_path = self._download_file(fasta.get_path())
......@@ -522,10 +686,20 @@ class JobManager:
save_file.write(finale_path)
return True, False, finale_path, name
def __check_url(self, fasta: Fasta, formats: tuple):
def _check_url(self, fasta, formats):
"""
Check if an URL is valid, and if the file is valid too
:param fasta: fasta file object
:type fasta: Fasta
:param formats: allowed file formats
:type formats: tuple
:return: True if URL and file are valid, else False
:rtype: bool
"""
url = fasta.get_path()
try:
filename = self.__get_filename_from_url(url)
filename = self._get_filename_from_url(url)
except (ConnectionError, URLError):
status = "fail"
error = "<p>Url <b>%s</b> is not valid!</p>" \
......@@ -586,10 +760,19 @@ class JobManager:
return allowed
def clear(self):
"""
Remove job dir
"""
shutil.rmtree(self.output_dir)
@staticmethod
def get_pending_local_number():
"""
Get number of of jobs running or waiting for a run
:return: number of jobs
:rtype: int
"""
if MODE == "webserver":
with Job.connect():
return len(Job.select().where((Job.batch_type == "local") & (Job.status != "success") &
......@@ -598,6 +781,14 @@ class JobManager:
return 0
def set_job_status(self, status, error=""):
"""
Change status of a job
:param status: new job status
:type status: str
:param error: error description (if any)
:type error: str
"""
if MODE == "webserver":
job = Job.get(Job.id_job == self.id_job)
job.status = status
......@@ -608,6 +799,7 @@ class JobManager:
def check_file(self, input_type, should_be_local, max_upload_size_readable):
"""
Check if file is correct: format, size, valid gzip
:param input_type: query or target
:param should_be_local: True if job should be treated locally
......@@ -654,6 +846,17 @@ class JobManager:
return True, False, should_be_local
def download_files_with_pending(self, files_to_download, should_be_local, max_upload_size_readable):
"""
Download files from URLs, with pending (according to the max number of concurrent downloads)
:param files_to_download: files to download. For each item of the list, it's a list with 2 elements: first one
is the Fasta object, second one the input type (query or target)
:type files_to_download: list of list
:param should_be_local: True if the job should be run locally (according to input file sizes), else False
:type should_be_local: bool
:param max_upload_size_readable: Human readable max upload size (to show on errors)
:type max_upload_size_readable: str
"""
with Job.connect():
status = "getfiles-waiting"
if MODE == "webserver":
......@@ -685,7 +888,7 @@ class JobManager:
job.status = "getfiles"
job.save()
for file, input_type in files_to_download:
correct, error_set, finale_path, filename = self.__getting_file_from_url(file, input_type)
correct, error_set, finale_path, filename = self._getting_file_from_url(file, input_type)
if not correct:
break
my_input = getattr(self, input_type)
......@@ -713,10 +916,12 @@ class JobManager:
def getting_files(self):
"""
Get files for the job
:return: Tuple:
[0] True if getting files succeed, False else
[1] If error happenned, True if error already saved for the job, False else (error will be saved later)
[2] True if no data must be downloaded (will be downloaded with pending if True)
:return:
* [0] True if getting files succeed, False else
* [1] If error happenned, True if error already saved for the job, False else (error will be saved later)
* [2] True if no data must be downloaded (will be downloaded with pending if True)
:rtype: tuple
"""
with Job.connect():
status = "getfiles"
......@@ -733,41 +938,41 @@ class JobManager:
files_to_download = []
if self.query is not None:
if self.query.get_type() == "local":
self.query.set_path(self.__getting_local_file(self.query, "query"))
self.query.set_path(self._getting_local_file(self.query, "query"))
correct, error_set, should_be_local = self.check_file("query", should_be_local,
max_upload_size_readable)
if not correct:
return False, error_set, True
elif self.__check_url(self.query, ("fasta",) if self.align is None else ("fasta", "idx")):
elif self._check_url(self.query, ("fasta",) if self.align is None else ("fasta", "idx")):
files_to_download.append([self.query, "query"])
else:
return False, True, True
if correct and self.target is not None:
if self.target.get_type() == "local":
self.target.set_path(self.__getting_local_file(self.target, "target"))
self.target.set_path(self._getting_local_file(self.target, "target"))
correct, error_set, should_be_local = self.check_file("target", should_be_local,
max_upload_size_readable)
if not correct:
return False, error_set, True
elif self.__check_url(self.target, ("fasta",) if self.align is None else ("fasta", "idx")):
elif self._check_url(self.target, ("fasta",) if self.align is None else ("fasta", "idx")):
files_to_download.append([self.target, "target"])
else:
return False, True, True
if correct and self.align is not None:
if self.align.get_type() == "local":
self.align.set_path(self.__getting_local_file(self.align, "align"))
self.align.set_path(self._getting_local_file(self.align, "align"))
correct, error_set, should_be_local = self.check_file("align", should_be_local,
max_upload_size_readable)
elif self.__check_url(self.align, ("map",)):
elif self._check_url(self.align, ("map",)):
files_to_download.append([self.align, "align"])
else:
return False, True, True
if correct and self.backup is not None:
if self.backup.get_type() == "local":
self.backup.set_path(self.__getting_local_file(self.backup, "backup"))
self.backup.set_path(self._getting_local_file(self.backup, "backup"))
correct, error_set, should_be_local = self.check_file("backup", should_be_local,
max_upload_size_readable)
elif self.__check_url(self.backup, ("backup",)):
elif self._check_url(self.backup, ("backup",)):
files_to_download.append([self.backup, "backup"])
else:
return False, True, True
......@@ -790,7 +995,7 @@ class JobManager:
def send_mail_post(self):
"""
Send mail using POST url (we have no access to mailer)
Send mail using POST url (if there is no access to mailer)
"""
key = Functions.random_string(15)
key_file = os.path.join(self.config.app_data, self.id_job, ".key")
......@@ -803,14 +1008,31 @@ class JobManager:
print("Job %s: Send mail failed!" % self.id_job)
def run_job_in_thread(self, batch_system_type="local"):
"""
Run a job asynchronously into a new thread
:param batch_system_type: slurm or sge
:type batch_system_type: str
"""
thread = threading.Timer(1, self.run_job, kwargs={"batch_system_type": batch_system_type})
thread.start() # Start the execution
def prepare_data_in_thread(self):
"""
Prepare data in a new thread
"""
thread = threading.Timer(1, self.prepare_data)
thread.start() # Start the execution
def prepare_data_cluster(self, batch_system_type):
"""
Launch of prepare data on a cluster
:param batch_system_type: slurm or sge
:type batch_system_type: str
:return: True if succeed, else False
:rtype: bool
"""
args = [self.config.cluster_prepare_script,
"-t", self.target.get_path(),
"-m", self.target.get_name(),
......@@ -832,6 +1054,9 @@ class JobManager:
return success
def prepare_data_local(self):
"""
Prepare data locally. On standalone mode, launch job after, if success.
"""
with open(self.preptime_file, "w") as ptime, Job.connect():
self.set_job_status("preparing")
ptime.write(str(round(time.time())) + "\n")
......@@ -912,6 +1137,9 @@ class JobManager:
self.run_job("local")
def _end_of_prepare_dotplot(self):
"""
Tasks done after preparing dot plot data: parse & sort of alignment file
"""
# Parse alignment file:
if hasattr(parsers, self.aln_format):
getattr(parsers, self.aln_format)(self.align.get_path(), self.paf_raw)
......@@ -942,7 +1170,9 @@ class JobManager:
def prepare_dotplot_cluster(self, batch_system_type):
"""
Prepare data if alignment already done: just index the fasta (if index not given), then parse the alignment
:param batch_system_type: type of cluster
:param batch_system_type: type of cluster (slurm or sge)
:type batch_system_type: str
"""