Skip to content
Snippets Groups Projects
Commit 1bc65f06 authored by Thomas Faraut's avatar Thomas Faraut
Browse files

Merge branch 'master' of forgemia.inra.fr:genotoul-bioinfo/cnvpipelines

parents ec8cf0ac 5fc613df
No related branches found
No related tags found
No related merge requests found
...@@ -12,6 +12,7 @@ import hashlib ...@@ -12,6 +12,7 @@ import hashlib
import traceback import traceback
import math import math
import time import time
import sys
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from subprocess import run from subprocess import run
...@@ -54,6 +55,9 @@ class CnvPipeline: ...@@ -54,6 +55,9 @@ class CnvPipeline:
self._wf_type = "unknown" self._wf_type = "unknown"
self._load_wf_type() self._load_wf_type()
self._filename_for_url = {} self._filename_for_url = {}
self.isrerun = False
self.config_timestamp = None
self.config_data = None
@property @property
def wf_type(self): def wf_type(self):
...@@ -81,12 +85,10 @@ class CnvPipeline: ...@@ -81,12 +85,10 @@ class CnvPipeline:
hash_md5.update(chunk) hash_md5.update(chunk)
return hash_md5.hexdigest() return hash_md5.hexdigest()
def _write_samples(self, samples, sample_file): def _write_samples(self, samples, sample_file):
with open(sample_file, "w") as final_samples: with open(sample_file, "w") as final_samples:
final_samples.write("\n".join(samples) + "\n") final_samples.write("\n".join(samples) + "\n")
def _link_bams_wdir(self, samples_file, bams_dir, smple_file, nb_samples): def _link_bams_wdir(self, samples_file, bams_dir, smple_file, nb_samples):
""" """
Link each BAM file in the data dir and write the new sample file Link each BAM file in the data dir and write the new sample file
...@@ -167,6 +169,11 @@ class CnvPipeline: ...@@ -167,6 +169,11 @@ class CnvPipeline:
print("\033[31m\033[1mAn error has occurred:\n%s\033[0m" % message) print("\033[31m\033[1mAn error has occurred:\n%s\033[0m" % message)
exit(1) exit(1)
def _get_wdir_type(self):
with open(os.path.join(self.wdir, ".wf"), "r") as f:
wf_type = f.read().rstrip()
return wf_type
def _check_wdir(self, force=False): def _check_wdir(self, force=False):
""" """
Check working dir at run: search for an existing workflow Check working dir at run: search for an existing workflow
...@@ -176,22 +183,41 @@ class CnvPipeline: ...@@ -176,22 +183,41 @@ class CnvPipeline:
accepted_values = ["y", "n", ""] accepted_values = ["y", "n", ""]
confirm = None confirm = None
while confirm not in accepted_values: while confirm not in accepted_values:
if self.wf_type != self._get_wdir_type():
print("Error: working directory already exists and is not a " + self.wf_type +
" job. Exiting...", file=sys.stderr)
exit(1)
if confirm is not None: if confirm is not None:
print("Invalid choice!") print("Invalid choice!")
else:
message = "Working dir %s already exists." % self.wdir
if self.wf_type == "detection":
message += " We will use previous samples and genome and ignore given ones."
elif self.wf_type == "simulation":
print("Error: working directory already exists. "
"Not allowed for simulation jobs. Exiting...", file=sys.stderr)
exit(1)
print(message)
if force: if force:
print("WARN: Working dir %s already exists. We erase all files (force mode)" % self.wdir) print("WARN: Continue without confirmation (forced mode)" % self.wdir)
confirm = "y" confirm = "y"
else: else:
confirm = input("Working dir %s already exists. " confirm = input("Continue? [y/N] ").lower()
"Continue will erase previous run. Continue? [y/N] " % self.wdir).lower()
if confirm == "n" or confirm == "": if confirm == "n" or confirm == "":
print("Run canceled.") print("Run canceled.")
exit(0) exit(0)
self.isrerun = True
else: else:
if os.path.exists(self.wdir): if os.path.exists(self.wdir):
raise ValueError("Output dir exists but is not a folder") raise ValueError("Output dir exists but is not a folder")
os.makedirs(self.wdir) os.makedirs(self.wdir)
if self.isrerun:
self.config_timestamp = datetime.fromtimestamp(os.stat(self.config_file).st_ctime).\
strftime("%Y-%m-%d %H:%M:%S")
with open(self.config_file, "r") as cfg:
self.config_data = yaml.safe_load(cfg)
def _prerun_commands(self): def _prerun_commands(self):
""" """
Set commands to launch before running snakemake Set commands to launch before running snakemake
...@@ -378,14 +404,15 @@ class CnvPipeline: ...@@ -378,14 +404,15 @@ class CnvPipeline:
:param reason: if True, show reason of each executed rule :param reason: if True, show reason of each executed rule
:type reason: bool :type reason: bool
""" """
if mode == "rerun" : if self.isrerun:
if self.wf_type == "detection" and len(snake_args) == 0: run(["touch", self.config_file, "-d", self.config_timestamp])
if mode == "rerun" and self.wf_type == "detection" and len(snake_args) == 0:
local_vars = locals() local_vars = locals()
del local_vars["self"] del local_vars["self"]
del local_vars["snake_args"] del local_vars["snake_args"]
return self.run_or_rerun_detection(**local_vars) return self.run_or_rerun_detection(**local_vars)
# Fix snakemake bug: make reference files timestamp to the config.yaml timestamp value # Fix snakemake bug: make reference files timestamp to the config.yaml timestamp value
if self.wf_type == "simulation" or self.wf_type == "refbundle": if (mode == "rerun" or self.isrerun) and (self.wf_type == "simulation" or self.wf_type == "refbundle"):
reference = os.path.join(self.wdir, "reference.fasta") reference = os.path.join(self.wdir, "reference.fasta")
fixed_date = datetime.fromtimestamp(os.stat(self.config_file).st_ctime).strftime("%Y-%m-%d %H:%M:%S") fixed_date = datetime.fromtimestamp(os.stat(self.config_file).st_ctime).strftime("%Y-%m-%d %H:%M:%S")
if os.path.exists(reference): if os.path.exists(reference):
...@@ -576,6 +603,32 @@ class CnvPipeline: ...@@ -576,6 +603,32 @@ class CnvPipeline:
self._fail("No valid chromosomes found in your reference. Are all your chromosomes smaller than 500 kb?") self._fail("No valid chromosomes found in your reference. Are all your chromosomes smaller than 500 kb?")
return chromosomes return chromosomes
def _rerun_tools(self, new_tools):
"""
Remove post-parsed steps on rerun detection if tools changed
:param new_tools:
:return:
"""
old_tools = self.config_data["tools"]
tools_changed = False
if len(old_tools) == len(new_tools):
for tool in new_tools:
if tool not in old_tools:
tools_changed = True
break
else:
tools_changed = True
if tools_changed:
for folder in glob(os.path.join(self.wdir, "batch*"), recursive=False):
for fold in ("merge", "genotypes", "filtered"):
t_folder = os.path.join(folder, fold)
if os.path.exists(t_folder):
if not os.path.isdir(t_folder):
raise ValueError(t_folder + "exists but is not a folder")
shutil.rmtree(t_folder)
def run_detection(self, reference, tools, samples, variant_types, chromosomes="all", force_wdir=False, batches=-1, def run_detection(self, reference, tools, samples, variant_types, chromosomes="all", force_wdir=False, batches=-1,
batches_parallel=5, refbundle=None, force_all_chr=False, **kwargs): batches_parallel=5, refbundle=None, force_all_chr=False, **kwargs):
""" """
...@@ -631,21 +684,28 @@ class CnvPipeline: ...@@ -631,21 +684,28 @@ class CnvPipeline:
final_reference = os.path.join(self.wdir, "reference.fasta") final_reference = os.path.join(self.wdir, "reference.fasta")
final_fai = final_reference + ".fai" final_fai = final_reference + ".fai"
if not os.path.exists(final_reference):
os.symlink(reference, final_reference) if not self.isrerun:
fai = reference + ".fai" if not os.path.exists(final_reference):
if os.path.isfile(fai): os.symlink(reference, final_reference)
os.symlink(fai, final_fai) fai = reference + ".fai"
else: if os.path.isfile(fai):
print("Indexing reference fasta file...") os.symlink(fai, final_fai)
exit_code = run("samtools faidx %s" % final_reference, shell=True).returncode else:
if exit_code != 0: print("Indexing reference fasta file...")
self._fail("Indexing reference fasta file failed") exit_code = run("samtools faidx %s" % final_reference, shell=True).returncode
return False if exit_code != 0:
elif not os.path.exists(final_fai): self._fail("Indexing reference fasta file failed")
self._fail("Working directory is corrupted: reference.fasta.fai not found in the existing working " return False
"directory") elif not os.path.exists(final_fai):
return False self._fail("Working directory is corrupted: reference.fasta.fai not found in the existing working "
"directory")
return False
else:
self._rerun_tools(new_tools=tools)
if self.config_data["batches"] != batches:
print("Error: rerun with a different number of batch size not allowed. Exiting...", file=sys.stderr)
exit(1)
if chromosomes == "all": if chromosomes == "all":
chromosomes = self._chromosomes_list(final_fai, not force_all_chr) chromosomes = self._chromosomes_list(final_fai, not force_all_chr)
...@@ -679,13 +739,14 @@ class CnvPipeline: ...@@ -679,13 +739,14 @@ class CnvPipeline:
self._save_wf_type("detection") self._save_wf_type("detection")
# Prepare data: if not self.isrerun:
bams_dir = os.path.join(self.wdir, "data", "bams") # Prepare data:
if not os.path.exists(bams_dir): bams_dir = os.path.join(self.wdir, "data", "bams")
os.makedirs(bams_dir) if not os.path.exists(bams_dir):
final_sample_file = os.path.join(self.wdir, "samples.list") os.makedirs(bams_dir)
self._link_bams_wdir(samples_file=samples, bams_dir=bams_dir, final_sample_file = os.path.join(self.wdir, "samples.list")
smple_file=final_sample_file, nb_samples=size_pbatches) self._link_bams_wdir(samples_file=samples, bams_dir=bams_dir,
smple_file=final_sample_file, nb_samples=size_pbatches)
return self.run_or_rerun_detection(**kwargs) return self.run_or_rerun_detection(**kwargs)
...@@ -724,21 +785,23 @@ class CnvPipeline: ...@@ -724,21 +785,23 @@ class CnvPipeline:
final_reference = os.path.join(self.wdir, "reference_raw.fasta") final_reference = os.path.join(self.wdir, "reference_raw.fasta")
final_fai = final_reference + ".fai" final_fai = final_reference + ".fai"
if not os.path.exists(final_reference):
os.symlink(reference, final_reference) if not self.isrerun:
fai = reference + ".fai" if not os.path.exists(final_reference):
if os.path.isfile(fai): os.symlink(reference, final_reference)
os.symlink(fai, final_fai) fai = reference + ".fai"
else: if os.path.isfile(fai):
print("Indexing reference fasta file...") os.symlink(fai, final_fai)
exit_code = run("samtools faidx %s" % final_reference, shell=True).returncode else:
if exit_code != 0: print("Indexing reference fasta file...")
self._fail("Indexing reference fasta file failed") exit_code = run("samtools faidx %s" % final_reference, shell=True).returncode
return False if exit_code != 0:
elif not os.path.exists(final_fai): self._fail("Indexing reference fasta file failed")
self._fail("Working directory is corrupted: reference.fasta.fai not found in the existing working " return False
"directory") elif not os.path.exists(final_fai):
return False self._fail("Working directory is corrupted: reference.fasta.fai not found in the existing working "
"directory")
return False
if chromosomes == "all": if chromosomes == "all":
chromosomes = self._chromosomes_list(final_fai, not force_all_chr) chromosomes = self._chromosomes_list(final_fai, not force_all_chr)
...@@ -818,14 +881,15 @@ class CnvPipeline: ...@@ -818,14 +881,15 @@ class CnvPipeline:
elif not os.path.isdir(ref_dir): elif not os.path.isdir(ref_dir):
raise FileExistsError(ref_dir + " already exists but is not a folder") raise FileExistsError(ref_dir + " already exists but is not a folder")
for file in glob(reference + "*"): if not self.isrerun:
final_link = os.path.join(ref_dir, os.path.basename(file)) for file in glob(reference + "*"):
if os.path.exists(final_link): final_link = os.path.join(ref_dir, os.path.basename(file))
if os.path.islink(final_link): if os.path.exists(final_link):
os.remove(final_link) if os.path.islink(final_link):
else: os.remove(final_link)
raise FileExistsError(file + "already exists but is not a link") else:
os.symlink(file, final_link) raise FileExistsError(file + "already exists but is not a link")
os.symlink(file, final_link)
final_sample_file = os.path.join(self.wdir, "samples.yml") final_sample_file = os.path.join(self.wdir, "samples.yml")
self.parse_align_sample_file(samples, final_sample_file) self.parse_align_sample_file(samples, final_sample_file)
...@@ -998,6 +1062,7 @@ class CnvPipeline: ...@@ -998,6 +1062,7 @@ class CnvPipeline:
final_reference = os.path.join(self.wdir, "reference_raw.fasta") final_reference = os.path.join(self.wdir, "reference_raw.fasta")
final_fai = final_reference + ".fai" final_fai = final_reference + ".fai"
if not os.path.exists(final_reference): if not os.path.exists(final_reference):
os.symlink(reference, final_reference) os.symlink(reference, final_reference)
fai = reference + ".fai" fai = reference + ".fai"
...@@ -1014,15 +1079,19 @@ class CnvPipeline: ...@@ -1014,15 +1079,19 @@ class CnvPipeline:
"directory") "directory")
return False return False
self._rerun_tools(tools)
refbundle_dir = os.path.join(self.wdir, "refbundle") refbundle_dir = os.path.join(self.wdir, "refbundle")
refbundle_fasta = os.path.join(refbundle_dir, "reference_raw.fasta") refbundle_fasta = os.path.join(refbundle_dir, "reference_raw.fasta")
if not os.path.exists(refbundle_fasta):
if not os.path.exists(refbundle_dir): if "genomestrip" in tools:
os.makedirs(refbundle_dir) if not os.path.exists(refbundle_fasta):
os.symlink(final_reference, refbundle_fasta) if not os.path.exists(refbundle_dir):
refbundle_fai = refbundle_fasta + ".fai" os.makedirs(refbundle_dir)
if not os.path.exists(refbundle_fai): os.symlink(final_reference, refbundle_fasta)
os.symlink(final_fai, refbundle_fai) refbundle_fai = refbundle_fasta + ".fai"
if not os.path.exists(refbundle_fai):
os.symlink(final_fai, refbundle_fai)
if chromosomes == "all": if chromosomes == "all":
chromosomes = self._chromosomes_list(final_fai, not force_all_chr) chromosomes = self._chromosomes_list(final_fai, not force_all_chr)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment