Skip to content
Snippets Groups Projects
Commit 16d1da7b authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Add wrapper script

parent fe72f650
No related branches found
No related tags found
No related merge requests found
.idea
application.properties
**/__pycache__
**/*.pyc
**/*.pyo
[global]
# batch system type: local, slurm or sge
batch_system_type = local
# list of modules to load (space separated)
modules =
# directories to prepend to the PATH environment variable (":" separated)
paths =
# number of concurrent jobs to launch
jobs = 999
# svtoolkit home:
sv_dir =
[cluster]
# Ignore these options for local batch system type
# submission mode: drmaa or cluster
submission_mode = drmaa
# cluster submission command (ignore for DRMAA submission mode)
submission_command =
# DRMAA lib (ignore for cluster submission mode)
drmaa =
# native submission options: keep the default in most cases
native_submission_options = ###DEFAULT###
# cluster config file
config = ###PROGRAM###/cluster.yaml
\ No newline at end of file
......@@ -4,6 +4,7 @@ __default__:
name: JOBNAME.{rule}
output: logs/cluster/{rule}.out
error: logs/cluster/{rule}.err
time: "24:00:00"
delly:
mem: 16
......
#!/usr/bin/env python3
import os
import pysam
from config_reader import AppConfigReader, MissingOption
# Tool names accepted on the command line (used as the argparse "choices" of -t/--tools).
TOOLS = [
    "lumpy",
    "delly",
    "pindel",
    "genomestrip",
]
def link_bams_wdir(samples_file, bams_dir, smple_file):
    """
    Link each BAM file in the data dir and write the new sample file

    Blank lines of the input file are skipped. For every other line, the BAM header must hold
    exactly one RG entry with an ID; the BAM is linked as "<ID>.bam" in bams_dir (an existing
    target is left untouched) and the ID is appended to the final sample file.

    :param samples_file: file listing full path of all bam files, given as input by the user
    :type samples_file: str
    :param bams_dir: folder which will contain final bam files (as links)
    :type bams_dir: str
    :param smple_file: final sample files listing all samples, by name (without path)
    :type smple_file: str
    :raises ValueError: if a listed file does not exist, or if its header does not contain exactly
        one RG entry carrying an ID
    """
    with open(samples_file, "r") as samples, open(smple_file, "w") as final_samples:
        for sample in samples:
            sample = sample.rstrip()
            if sample == "":
                continue
            if not os.path.isfile(sample):
                raise ValueError("Sample file '%s' does not exists" % sample)
            # Only the header is needed: close the BAM handle as soon as it has been read
            with pysam.AlignmentFile(sample) as bam:
                if "RG" not in bam.header:
                    raise ValueError("Sample file '%s' has no RG tag in headers" % sample)
                read_groups = bam.header['RG']
            if len(read_groups) > 1:
                raise ValueError("Sample file '%s' has more than 1 sample" % sample)
            elif len(read_groups) == 0:
                raise ValueError("Sample file '%s' has none sample" % sample)
            rg_headers = read_groups[0]
            if 'ID' not in rg_headers:
                raise ValueError("Sample file '%s' had not ID in RG tag header" % sample)
            id_sample = rg_headers["ID"]
            final_file = os.path.join(bams_dir, id_sample + ".bam")
            if not os.path.exists(final_file):
                # A symlink target is resolved relative to the link's own folder, so always
                # point to the absolute location of the BAM file
                os.symlink(os.path.abspath(sample), final_file)
            final_samples.write(id_sample + "\n")
def run(reference, tools, samples, out):
    """
    Run a workflow

    Validates the inputs, builds the working directory (BAM links under "data/bams", reference
    link under "data/genome", "samples.list"), then assembles the shell command line from the
    application configuration and executes it with os.system. On a validation or configuration
    error, the message is printed and the program exits with status 1.

    :param reference: reference genome file
    :type reference: str
    :param tools: list of tools
    :type tools: list
    :param samples: file containing all samples
    :type samples: str
    :param out: output folder
    :type out: str
    """
    try:
        if not os.path.isfile(reference):
            raise ValueError("Reference file does not exists")
        if not isinstance(tools, (list, tuple)):
            raise TypeError("Tools must be a list")
        for tool in tools:
            # Validate against the supported tools, not against the list being checked
            if tool not in TOOLS:
                raise ValueError("Tool not available: %s" % tool)
        if not os.path.isfile(samples):
            raise ValueError("Sample file does not exists")

        # Working directory layout
        if not os.path.isdir(out):
            if os.path.exists(out):
                raise ValueError("Output dir exists but is not a folder")
            os.makedirs(out)
        bams_dir = os.path.join(out, "data", "bams")
        if not os.path.exists(bams_dir):
            os.makedirs(bams_dir)
        final_sample_file = os.path.join(out, "samples.list")
        link_bams_wdir(samples_file=samples, bams_dir=bams_dir, smple_file=final_sample_file)
        ref_dir = os.path.join(out, "data", "genome")
        if not os.path.exists(ref_dir):
            os.makedirs(ref_dir)
        final_reference = os.path.join(ref_dir, os.path.basename(reference))
        if not os.path.exists(final_reference):
            # A symlink target is resolved relative to the link's own folder, so always
            # point to the absolute location of the reference
            os.symlink(os.path.abspath(reference), final_reference)

        # Environment set-up commands
        app_dir = os.path.dirname(os.path.realpath(__file__))
        config = AppConfigReader()
        commands = []
        if len(config.modules) > 0:
            commands.append("module load %s" % config.modules)
        if len(config.paths) > 0:
            commands.append("export PATH=%s:$PATH" % config.paths)
        commands.append("export SV_DIR=\"%s\"" % config.sv_dir)

        # Snakemake command
        snk_command = "snakemake --jobs {jobs} -s {snakefile}".format(
            jobs=config.n_jobs, snakefile=os.path.join(app_dir, "snakecnv", "Snakefile")
        )
        if config.batch_system_type != "local":
            native_options = config.cluster_native_submission_options
            if native_options is None:
                # Happens when the default options are requested for a batch system type
                # that has no built-in defaults
                raise MissingOption(
                    "Application.properties: no default native_submission_options for batch system type "
                    "'%s', please set them explicitly" % config.batch_system_type)
            if config.cluster_submission_mode == "drmaa":
                commands.append("export DRMAA_LIBRARY_PATH=\"%s\"" % config.cluster_drmaa_lib)
                cluster_options = "--drmaa \""
            else:
                cluster_options = "--cluster \"" + config.cluster_submission_command + " "
            cluster_options += native_options + "\""
            snk_command += " --cluster-config {clusterconf} {clusteroptions}".format(
                clusterconf=config.cluster_config_file, clusteroptions=cluster_options
            )
        snk_command += " --config wdir={wdir} sample_file={sample_file} genome={genome} tools={tools}".format(
            wdir=out, sample_file=final_sample_file, genome=final_reference, tools=",".join(tools)
        )
        commands.append(snk_command)

        full_command = "; ".join(commands)
        print(full_command)
        os.system(full_command)
    except (ValueError, TypeError, MissingOption) as e:
        print("\033[31m\033[1mAn error has occurred:\n%s\033[0m" % str(e))
        raise SystemExit(1)
if __name__ == "__main__":
    import argparse

    # Command line interface: one sub-command per action, each bound to its handler
    # function through set_defaults(func=...)
    parser = argparse.ArgumentParser(description="Run cnv pipelines")
    subparsers = parser.add_subparsers()

    run_parser = subparsers.add_parser("run", help="Run a workflow")
    run_parser.add_argument('-r', '--reference', type=str, required=True, help="Reference fasta file")
    # nargs="+" with choices: tools are given as separate (space separated) arguments
    run_parser.add_argument('-t', '--tools', type=str, required=True, help="Tools to launch, space separated",
                            nargs="+", choices=TOOLS)
    run_parser.add_argument('-s', '--samples', type=str, required=True, help="File containing all samples")
    run_parser.add_argument('-o', '--out', type=str, required=True, help="Output folder")
    run_parser.set_defaults(func=run)

    args = parser.parse_args()
    if not hasattr(args, "func"):
        # No sub-command given: show the usage instead of failing with an AttributeError
        parser.error("a sub-command is required (e.g. 'run')")

    # Forward only the parsed values that match a parameter of the handler; co_varnames also
    # lists local variables, so it is cut at the positional argument count
    handler = args.func
    accepted = handler.__code__.co_varnames[:handler.__code__.co_argcount]
    handler(**{k: v for k, v in vars(args).items() if k in accepted})
import os
import sys
import inspect
from pathlib import Path
from configparser import RawConfigParser, NoOptionError, NoSectionError
class Singleton:
    """
    Class decorator: the wrapped class is instantiated on the first call only, every
    later call returns that same instance (arguments of later calls are ignored)
    """

    def __init__(self, klass):
        """
        :param klass: class to wrap
        :type klass: type
        """
        self.klass = klass
        self.instance = None

    def __call__(self, *args, **kwargs):
        """
        Return the unique instance, building it from the given arguments if needed
        """
        if self.instance is not None:
            return self.instance
        self.instance = self.klass(*args, **kwargs)
        return self.instance
class MissingOption(Exception):
    """
    Raised when an option required by the current configuration is missing or empty
    """
@Singleton
class AppConfigReader:
    """
    Store all configs

    The values are read from "application.properties", looked up next to this module and in
    "~/.cnvpipelines"; every file found is given to the parser.
    """

    def __init__(self):
        """
        All "_get_*" functions results are stored in the "self.*" corresponding attribute
        Example: results of the _get_n_jobs function is stored in self.n_jobs

        :raises FileNotFoundError: if no application.properties file is found
        :raises MissingOption: if an option required by the selected batch system is missing
        :raises ValueError: if the "jobs" option is not an integer
        """
        self.app_dir = os.path.dirname(inspect.getfile(self.__class__))
        config_file_search = [os.path.join(self.app_dir, "application.properties"),
                              os.path.join(str(Path.home()), ".cnvpipelines", "application.properties")]
        config_file = [path for path in config_file_search if os.path.exists(path)]
        if not config_file:
            raise FileNotFoundError("ERROR: application.properties not found.")
        self.reader = RawConfigParser()
        self.reader.read(config_file)
        # Helpers must not be named "_get_*": every such method is called here without argument
        for attr in dir(self):
            if not attr.startswith("_get_"):
                continue
            attr_o = getattr(self, attr)
            if callable(attr_o):
                setattr(self, attr[5:], attr_o())

    def _replace_vars(self, path):
        """
        Expand the ###USER###, ###PROGRAM### and ###SYSEXEC### placeholders of a value

        :param path: raw value read from the configuration
        :type path: str
        :return: value with the placeholders replaced by the user home, the application folder
            and the folder of the python executable
        :rtype: str
        """
        return path.replace("###USER###", os.path.expanduser("~"))\
            .replace("###PROGRAM###", self.app_dir)\
            .replace("###SYSEXEC###", os.path.dirname(sys.executable))

    def _read_option(self, section, option, default=None):
        """
        Read a raw option value, falling back on a default if the section or option is absent

        :param section: section name
        :type section: str
        :param option: option name
        :type option: str
        :param default: value returned when the section or the option does not exist
        :return: the option value (may be an empty string) or the default
        """
        try:
            return self.reader.get(section, option)
        except (NoSectionError, NoOptionError):
            return default

    def _uses_submission_mode(self, mode):
        """
        Tell if jobs are submitted to a cluster (non local batch system) with the given mode

        :param mode: submission mode to test ("drmaa" or "cluster")
        :type mode: str
        :rtype: bool
        """
        return self._get_batch_system_type() != "local" and self._get_cluster_submission_mode() == mode

    def _get_batch_system_type(self):
        """Batch system type ("local" if not set)"""
        return self._read_option("global", "batch_system_type", "local")

    def _get_modules(self):
        """Modules to load (empty string if not set)"""
        return self._read_option("global", "modules", "")

    def _get_paths(self):
        """Paths to prepend to the PATH (empty string if not set)"""
        return self._read_option("global", "paths", "")

    def _get_n_jobs(self):
        """
        Number of concurrent jobs (999 if not set)

        :raises ValueError: if the value is not an integer
        """
        raw_jobs = self._read_option("global", "jobs")
        if raw_jobs is None:
            return 999
        try:
            return int(raw_jobs)
        except ValueError:
            raise ValueError("Application.properties: jobs must be an integer (found: '%s')" % raw_jobs) from None

    def _get_sv_dir(self):
        """svtoolkit home (empty string if not set)"""
        return self._read_option("global", "sv_dir", "")

    def _get_cluster_submission_mode(self):
        """Cluster submission mode ("cluster" if not set)"""
        return self._read_option("cluster", "submission_mode", "cluster")

    def _get_cluster_submission_command(self):
        """
        Cluster submission command (None if not set)

        :raises MissingOption: if missing or empty while the "cluster" submission mode is used
        """
        command = self._read_option("cluster", "submission_command")
        if command in (None, "") and self._uses_submission_mode("cluster"):
            raise MissingOption("Application.properties: With cluster submission mode, "
                                "submission_command is required")
        return command

    def _get_cluster_drmaa_lib(self):
        """
        Path of the DRMAA library (None if not set)

        :raises MissingOption: if missing or empty while the "drmaa" submission mode is used
        """
        lib = self._read_option("cluster", "drmaa")
        if lib in (None, "") and self._uses_submission_mode("drmaa"):
            raise MissingOption("Application.properties: please specify the DRMAA lib path or use the cluster "
                                "submission mode")
        return lib

    def _get_cluster_native_submission_options(self):
        """
        Native submission options

        With the ###DEFAULT### value (also used when the option is not set), built-in options are
        returned for the slurm and sge batch system types, and None for any other type.
        """
        natives = self._read_option("cluster", "native_submission_options", "###DEFAULT###")
        if natives != "###DEFAULT###":
            return natives
        defaults = {
            "slurm": " --mem-per-cpu={cluster.mem}000 --ntasks={cluster.n} --time={cluster.time}",
            "sge": " -l mem={cluster.mem},hvem={cluster.mem} -pe parallel_smp {cluster.n}",
        }
        return defaults.get(self._get_batch_system_type())

    def _get_cluster_config_file(self):
        """
        Cluster config file, placeholders expanded (None if not set)

        :raises MissingOption: if missing or empty while the batch system type is not local
        """
        raw_config = self._read_option("cluster", "config")
        config = None if raw_config is None else self._replace_vars(raw_config)
        if not config and self._get_batch_system_type() != "local":
            raise MissingOption("Application.properties: please specify a cluster config file or use a "
                                "local batch system type")
        return config
**/__pycache__
**/*.pyc
**/*.pyo
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment