Commit 5ccb90b5 authored by Celine Noirot's avatar Celine Noirot
Browse files

Merge branch 'nG6-Slurm-rules' into 'nG6-Slurm'

N g6 slurm rules

See merge request !10
parents 99f4eebf fcd072d4
......@@ -174,7 +174,6 @@ class Exclude(LinkRule):
if exclude in self.user_args and self.user_args[exclude] is not None and self.user_args[exclude] and \
self.user_args[exclude] != self.get_parameter(exclude).default:
# The target is found, is not None and is not False
print(self.user_args)
self.error("Parameters '" + self.parameter_name + "' and '" + exclude + "' are mutually excluded")
......
......@@ -507,11 +507,8 @@ class Workflow(threading.Thread):
return gr
def delete(self):
logging.getLogger("wf." + str(self.id)).debug("delete. Trying to delete " + self.directory)
if self.get_status() in [self.STATUS_COMPLETED, self.STATUS_FAILED, self.STATUS_ABORTED]:
logging.getLogger("wf." + str(self.id)).debug("status is OK for deletetion" )
utils.robust_rmtree(self.directory)
logging.getLogger("wf." + str(self.id)).debug("workflow deleted" )
@staticmethod
def config_parser(arg_lines):
......@@ -971,6 +968,8 @@ class Workflow(threading.Thread):
self._status = self.STATUS_STARTED
self._postprocess_status = self.STATUS_PENDING
self.end_time = None
#check rules
self.check_parameters_rules(self.args)
# if some args are provided, let's fill the parameters
self._set_parameters(self.args)
self._write_parameters()
......
#
# Copyright (C) 2012 INRA
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
......@@ -26,37 +26,37 @@ from ng6.run import Run
from ng6.analysis import Analysis
class AddAnalysis (BasicNG6Workflow):
def get_description(self):
return "Add a brand new analysis to a run or a project"
def define_parameters(self, function="process"):
self.add_input_file_list("data_file", " Which data files should be added to the analysis", required = True)
self.add_parameter("compression", "How should data be compressed once archived", choices = ['none', 'gz', 'bz2'], default = 'none')
self.add_parameter("delete", "Should the input data be deleted once the process is over", type = bool , default = False)
self.add_parameter("archive_name", "Give a name to the final archive", default = 'ng6_archive')
self.add_parameter("project_id", "The project id the analysis belongs to", type = 'int', rules="Exclude=run_id;RequiredIf?ALL[run_id=None]")
self.add_parameter("run_id", "The run id the run belongs to", type = 'int', rules = "Exclude=project_id;RequiredIf?ALL[project_id=None]")
self.add_parameter("run_id", "The run id the run belongs to", type = 'int')
self.add_parameter("analysis_name", "Give a name to your analysis", required = True)
self.add_parameter("analysis_description", "Give a description to your analysis", required = True)
self.add_parameter("analysis_software", "Which software was used for this analysis", required = True)
self.add_parameter("analysis_software_options", "Which software options were used for this analysis", required = True)
self.add_parameter("analysis_software_version", "analysis_software_version", required = True)
self.add_parameter("parent_analysis", "The id of an analysis to be used as a parent analysis", type= 'int')
def process(self):
self.runobj = None
self.project = None
# First check if files provided exists
files_to_save = []
files_to_save = []
for file in self.data_file:
if os.path.isfile(file):
files_to_save.append(file)
else:
display_error_message("error: %s file does not exists\n" % (file))
# Check if user provided a project or a run
# Check if user provided a project or a run
if self.project_id :
self.project = Project.get_from_id(self.project_id)
self.metadata.append("project_id="+str(self.project_id))
......@@ -68,9 +68,9 @@ class AddAnalysis (BasicNG6Workflow):
self.metadata.append("run_id="+str(self.run_id))
else :
display_error_message("One of run_id or project_id parameter is required!\n")
addto = "project" if self.project else "run"
parent = None
if self.parent_analysis :
parent = Analysis.get_from_id(self.parent_analysis)
......@@ -80,11 +80,11 @@ class AddAnalysis (BasicNG6Workflow):
analyses_ids = [ o.id for o in self.runobj.get_analysis()]
if parent.id not in analyses_ids :
display_error_message("The parent analysis associated with id '%s' does not belong to the %s id '%s' " % (parent.id, addto, self.project_id or self.run_id) )
basicanalysis = self.add_component("BasicAnalysis", [files_to_save, self.analysis_name,
self.analysis_description, self.analysis_software, self.analysis_software_options,
self.analysis_software_version, self.compression, self.delete,
self.archive_name], addto=addto, parent = parent)
basicanalysis = self.add_component("BasicAnalysis", [files_to_save, self.analysis_name,
self.analysis_description, self.analysis_software, self.analysis_software_options,
self.analysis_software_version, self.compression, self.delete,
self.archive_name], addto=addto, parent = parent)
def post_process(self):
if self.runobj:
......
#
# Copyright (C) 2012 INRA
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
......@@ -24,41 +24,41 @@ from ng6.run import Run
from ng6.analysis import Analysis
class AddFiles (BasicNG6Workflow):
def get_description(self):
return "Add files to an existing analysis or run"
def define_parameters(self, function="process"):
self.add_input_file_list("data_file", " Which data files should be added to the analysis or the run", required = True)
self.add_parameter("compression", "How should data be compressed once archived", choices = ['none', 'gz', 'bz2'], default = 'none')
self.add_parameter("delete", "Should the input data be deleted once the process is over", type = bool , default = False)
self.add_parameter("analysis_id", "The id of the analysis", type = int) #, required = True)
self.add_parameter("run_id", "The run id the run belongs to", type = int)#, required = True)
#TODO Exclude self.add_exclusion_rule( "analysis_id", "run_id" )
self.add_parameter("analysis_id", "The id of the analysis", type = int, rules="Exclude=run_id;RequiredIf?ALL[run_id=None]")
self.add_parameter("run_id", "The run id the run belongs to", type = 'int')
def process(self):
# check if files provided exists
files_to_save = []
files_to_save = []
for file in self.data_file:
if os.path.isfile(file):
files_to_save.append(file)
else:
sys.exit("error: %s file does not exists\n" % (file))
self.runobj = None
self.analysisobj = None
self.project = None
if self.run_id :
self.runobj = Run.get_from_id(self.run_id)
self.project = Project.get_from_run_id(self.run_id)
if self.project is not None and not self.project.is_admin(self.admin_login):
sys.stderr.write( "The user login '" + self.admin_login + "' is not allowed to add data on project '" + self.project.name + "'.\n" )
sys.exit()
self.metadata.append("run_id="+str(self.run_id))
add = self.add_component("AddRawFiles", [ self.runobj, files_to_save, self.compression ] )
elif self.analysis_id :
self.analysisobj = Analysis.get_from_id(self.analysis_id)
self.project = self.analysisobj.project
......@@ -71,11 +71,10 @@ class AddFiles (BasicNG6Workflow):
else :
sys.stderr.write("One of --run-id or --analysis-id parameter is required!\n")
sys.exit()
def post_process(self):
# once everything done, sync directories
if self.runobj:
self.runobj.sync()
elif self.analysisobj:
self.analysisobj.sync()
#
# Copyright (C) 2012 INRA
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
......@@ -23,7 +23,7 @@ from ng6.ng6workflow import NG6Workflow
from ng6.utils import Utils
class RADseq (NG6Workflow):
ENZYMES = {
'sbfI' : {
'rad' : 'CCTGCAGG',
......@@ -39,28 +39,29 @@ class RADseq (NG6Workflow):
}
}
def get_enzyme (self, name):
if name not in self.ENZYMES :
raise ValueError("The enzyme name " + str(name) + " does not exists. Accepted names are " + str (list(self.ENZYMES.keys())))
return (self.ENZYMES[name]['rad'], self.ENZYMES[name]['radtag'])
def get_name(self):
return 'radseq'
def get_description(self):
return "RADseq data quality analysis workflow"
def define_parameters(self, function="process"):
# if metadata.barcode faire splitbc. Sinon liste read1 = liste des échantillons.
# split_bc
self.add_multiple_parameter_list("pool", "Sequence pool", required = False)
self.add_parameter("id", "Pool uniq identifier", add_to = "pool")
# required = True que si pool précisé
self.add_input_file("read1", "Read 1 fastq file path", required = False, add_to = "pool")
#TODO check rules
self.add_input_file("read1", "Read 1 fastq file path", rules="RequiredIf?ALL[poll!=None]", add_to = "pool")
self.add_input_file("read2", "Read 2 fastq file path", add_to = "pool")
# options bougées dans process_radtag
#self.add_parameter("enzyme", "Restriction enzyme name", required = True , choices = list(self.ENZYMES.keys()))
......@@ -68,13 +69,12 @@ class RADseq (NG6Workflow):
# option inutile maintenant, puisque potentiel rescue et check RAD dans process_radtag
#self.add_parameter("tag_mismatch", "Max. number of mismatches allowed in the radTAG sequence",default = 0, type=int)
self.add_parameter("mismatches", "Max. number of mismatches allowed.",default = 0, type=int)
self.add_parameter("trim_barcode", "Should the barecode be trimmed",default = True, type=bool)
self.add_parameter("trim_barcode", "Should the barecode be trimmed",default = True, type=bool, rules="Exclude=trim_reads2")
self.add_parameter("trim_reads2", " Shoud the read 2 be trimmed to have the same length as the read1",default = False, type=bool)
#TODO Exclude self.add_exclusion_rule('trim_reads2', 'trim_barcode')
#clone_filter
self.add_parameter("dereplicate", "Should we remove PCR duplcat. Only for paired end - single RADSeq protocol",default = False, type=bool)
# process_radtag
self.add_parameter("enzyme", "Restriction enzyme name", required = True , choices = list(self.ENZYMES.keys()))
self.add_parameter("enzyme2", "Restriction enzyme 2 name", required = False, default="", choices = list(self.ENZYMES.keys()))
......@@ -86,7 +86,7 @@ class RADseq (NG6Workflow):
self.add_parameter("keep_discard_read", "capture discarded reads to a file.", default = True, type=bool)
self.add_parameter("window_size", "set the size of the sliding window as a fraction of the read length, between 0 and 1 (default 0.15).", default=0.15, type=float)
self.add_parameter("limit_score", "set the score limit. If the average score within the sliding window drops below this value, the read is discarded (default 10).", default=10, type=int)
# ustacks
self.add_parameter("min_cov", "stacks minimum coverage", type=int)
self.add_parameter("primary_mismatch", "stacks max mismatch", type=int)
......@@ -95,21 +95,21 @@ class RADseq (NG6Workflow):
self.add_parameter("disable_removal_algo", "removal cluster correction algo", type=bool)
self.add_parameter("disable_deleveraging_algo", "deleveraging cluster correction algo", type=bool)
self.add_parameter("max_locus", "max stacks per cluster", type=int)
# cstacks
self.add_parameter("batch_id", " batch_id of this radseq analysis",default = 1, type=int)
self.add_parameter("catalog_mismatches", "How many mismatches allowed between sample to generate the cluster catalog",default = 1, type=int)
# ng6
self.add_parameter("compression", "How should data be compressed once archived (none|gz|bz2)",default = 'gz', choices=['none', 'gz', 'bz2'])
self.add_parameter("databank", "Which databank should be used to seek contamination (as to be nr databank)")
def process(self):
read1_list=[]
read2_list= []
# plus besoins on fait le check du rad dans process_radtag, sinon ça ne fonctionne pas dans le cas des double digest.
# rad, rad_tag = self.get_enzyme(self.enzyme)
# si aucun pool, alors données démultipléxées et donc les sample read1 sont les vraies sample read1
......@@ -127,18 +127,18 @@ class RADseq (NG6Workflow):
if p["id"] in pools :
raise ValueError("Duplicated pool id." + p['id'])
pools[p["id"]] = (p, [])
for sample in self.samples :
pool_id = sample.get_metadata('pool_id')
if pool_id not in pools:
raise ValueError("The pool id " + pool_id + " does not exists in (individual " + sample.name + ")")
pools[pool_id][1].append(sample)
# prepare fastq files
# prepare fastq files
fastq_files_1 = []
fastq_files_2 = []
barcode_files = []
for pool_id, data in pools.items() :
p = data[0]
samples = data[1]
......@@ -147,18 +147,18 @@ class RADseq (NG6Workflow):
fastq_files_1.append(p['read1']);
if p['read2'] :
fastq_files_2.append(p['read2'])
# write barcode file
with open(tmp_barcode, "w") as ff:
for sample in samples :
ff.write(sample.name + "\t" + sample.get_metadata('barcode') +"\n")
#splitbc = self.add_component("Splitbc", [ fastq_files_1, barcode_files, fastq_files_2, rad, rad_tag, self.mismatches, self.tag_mismatch, self.trim_barcode, self.trim_reads2])
splitbc = self.add_component("Splitbc", [ fastq_files_1, barcode_files, fastq_files_2, self.mismatches, self.trim_barcode, self.trim_reads2])
read1_list=splitbc.output_read1
read2_list=splitbc.output_read2
# PROCESS_RADTAG
# PROCESS_RADTAG
print("before process",read1_list,read2_list)
process_radtag = self.add_component("ProcessRadtag", [read1_list, read2_list, self.dereplicate, self.enzyme, self.enzyme2, self.window_size, self.limit_score, self.quality_encode, self.max_length, self.uncall_remove, self.discard_low_qual, self.rescue_radtag, self.keep_discard_read ])
"""
......@@ -169,32 +169,32 @@ class RADseq (NG6Workflow):
# read1_files = []
# read2_files = []
# # check non empty and MAJ sample_id
# for idx, read1 in enumerate(process_radtag.output_read_1):
# for idx, read1 in enumerate(process_radtag.output_read_1):
# if os.path.getsize(read1) > 0:
# read1_files.append(read1)
# read2_files.append(process_radtag.output_read_2[idx])
# samples_id.append(idx)
#
ustacks_opt={"samples_id":samples_id,"read1_files":read1_list}
if self.min_cov :
if self.min_cov :
ustacks_opt["min_cov"] = self.min_cov
if self.primary_mismatch :
if self.primary_mismatch :
ustacks_opt["primary_mismatch"] = self.primary_mismatch
if self.secondary_mismatch :
if self.secondary_mismatch :
ustacks_opt["secondary_mismatch"] = self.secondary_mismatch
if self.uncall_hap :
if self.uncall_hap :
ustacks_opt["uncall_hap"] = self.uncall_hap
if self.disable_removal_algo :
if self.disable_removal_algo :
ustacks_opt["removal_algo"] = False
if self.disable_deleveraging_algo :
if self.disable_deleveraging_algo :
ustacks_opt["deleveraging_algo"] = False
if self.max_locus :
if self.max_locus :
ustacks_opt["max_locus"] = self.max_locus
ustacks = self.add_component("Ustacks", [],ustacks_opt)
# CSTACKS
# cstacks = self.add_component("Cstacks", [ustacks.alleles, ustacks.snps, ustacks.tags, self.batch_id, self.catalog_mismatches])
"""
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment