Commit 00b365f2 authored by Celine Noirot

Merge branch 'retention_date' into 'master'

Retention date

See merge request !28
parents 11250b33 fd2e57a6
__pycache__
smarty
......@@ -32,8 +32,6 @@ server_socket_port = 8080
server_socket_protocol = https
# date format
date_format = %d/%m/%Y
# data retention time (in years)
retention_time = 2
# optional: key for jflow server ssl configuration
ssl_certificate = certificate.crt
......@@ -87,6 +85,19 @@ work_directory = <path>/work
# accessible by all cluster nodes
tmp_directory = <path>/tmp
# Configure the different storage directories
# and associate a retention date or duration with each
#
# e.g. with a duration (in months)
[space_default]
dir_name=default
retention_period=3
#
# e.g. with a fixed date (dd/mm/yyyy)
[space_final]
dir_name=directory
retention_date=12/03/2021
[ui]
# This must be an absolute path of a directory mounted and accessible by the server
user_base_directory = /work/
......
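Note: the new [space_*] sections above take either retention_period (a duration in months) or retention_date (a fixed dd/mm/yyyy date). As a minimal, illustrative sketch of how such a section resolves to a retention timestamp, mirroring the get_retention_date() logic added to NG6ConfigReader further down (the helper name and the properties path are assumptions, not part of the commit):

import datetime
import time
from configparser import ConfigParser
from dateutil.relativedelta import relativedelta

def resolve_retention(properties_path, space_id="default"):
    # Illustrative helper: read a [space_<space_id>] section and return
    # the retention limit as a Unix timestamp.
    reader = ConfigParser()
    reader.read(properties_path)
    section = "space_" + space_id
    if reader.has_option(section, "retention_period"):
        # relative retention: N months from today
        months = reader.getint(section, "retention_period")
        target = datetime.datetime.today() + relativedelta(months=months)
    else:
        # fixed retention date given as dd/mm/yyyy
        d, m, y = reader.get(section, "retention_date").split("/")
        target = datetime.datetime(int(y), int(m), int(d))
    return time.mktime(target.timetuple())

# e.g. resolve_retention("application.properties", "final") for the [space_final] section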
......@@ -53,13 +53,13 @@ def add_analysis(parent_id, analysis_cfg, *input_files):
import logging
logging.getLogger("Analysis").debug("Start. Imports went good.")
logging.getLogger("Analysis").debug("Start. working for analysis " + analysis_cfg)
# get inputs from parameters
analysis_serialized_path = input_files[0]
try: parent_analysis_cfg = input_files[1]
except: parent_analysis_cfg = None
logging.getLogger("Analysis").debug("Start. parent_analysis_cfg is initialised.")
# load the analysis object
analysis_dump = open(analysis_serialized_path, "rb")
analysis = pickle.load(analysis_dump)
......@@ -84,7 +84,7 @@ def add_analysis(parent_id, analysis_cfg, *input_files):
logging.getLogger("Analysis").debug("analysis.post_process. Starting")
analysis.post_process()
logging.getLogger("Analysis").debug("analysis.post_process. Done")
# add the analysis to the right run/project object
logging.getLogger("Analysis").debug("analysis.project. Starting add_analysis(analysis)")
if analysis.project:
......@@ -92,9 +92,9 @@ def add_analysis(parent_id, analysis_cfg, *input_files):
analysis.project.add_analysis(analysis)
logging.getLogger("Analysis").debug("analysis.project. Finishing analysis.project.add_analysis(analysis)")
elif analysis.run:
logging.getLogger("Analysis").debug("analysis.project. Starting analysis.run.add_analysis(analysis)")
logging.getLogger("Analysis").debug("analysis.run. Starting analysis.run.add_analysis(analysis)")
analysis.run.add_analysis(analysis)
logging.getLogger("Analysis").debug("analysis.project. Finishing analysis.run.add_analysis(analysis)")
logging.getLogger("Analysis").debug("analysis.run. Finishing analysis.run.add_analysis(analysis)")
logging.getLogger("Analysis").debug("Config File. Starting the writing of config file")
analysis.write_config_file()
logging.getLogger("Analysis").debug("Config File. Is written")
......@@ -105,11 +105,11 @@ class Analysis (Component):
"""
# The directories structure into ng6
DIRECTORIES_STRUCTURE = "/data/analyze/"
DIRECTORIES_STRUCTURE = "analyze"
ANALYSIS_CONFIG_FILE_NAME = "analysis.cfg"
def __init__(self, name="", description="", software="",
options="", version="", id=None, parent_analysis=None):
options="", version="", id=None, parent_analysis=None, space_id="default"):
"""
Build an Analysis object
@param name : the analysis name
......@@ -132,54 +132,60 @@ class Analysis (Component):
self.results = {}
self.run = None
self.project = None
self.is_editable = False
self.space_id = space_id
self.retention_date = None
# Set the temp folder to the ng6 temp folder
ng6conf = NG6ConfigReader()
logging.getLogger("Analysis.__init__").debug("ng6conf est chargé")
tempfile.tempdir = ng6conf.get_tmp_directory()
logging.getLogger("Analysis.__init__").debug("tempfile.tempdir = " + tempfile.tempdir)
if not os.path.isdir(tempfile.tempdir):
os.makedirs(tempfile.tempdir, 0o751)
if id != None : # If not a new analysis
logging.getLogger("Analysis.__init__").debug("Connexion à la BD")
t3mysql = t3MySQLdb()
logging.getLogger("Analysis.__init__").debug("Connexion effectuée")
self.run = Run.get_from_id(self.__get_run_id())
from ng6.project import Project
self.project = Project.get_from_id(self.__get_project_id())
self.project = Project.get_from_run_id(self.__get_run_id())
self.space_id=self.project.space_id
self.directory = t3mysql.select_analysis_directory(id)
logging.getLogger("Analysis.__init__").debug("Building analysis with id=" + str(id) + " [" + str(self) + "]")
else :
self.version = self.get_version()
if isinstance(self.version, bytes):
self.version = self.version.decode()
directory_name = uuid.uuid4().hex[:9]
while True:
save_dir = os.path.join(ng6conf.get_save_directory()+self.DIRECTORIES_STRUCTURE, directory_name)
logging.getLogger("Analysis.__init__").debug("Building analysis save_dir=" + save_dir)
work_dir = os.path.join(ng6conf.get_work_directory()+self.DIRECTORIES_STRUCTURE, directory_name)
logging.getLogger("Analysis.__init__").debug("Building analysis work_dir=" + save_dir)
if not os.path.isdir(save_dir) and not os.path.isdir(work_dir):
break
directory_name = uuid.uuid4().hex[:9]
self.directory = os.path.join(self.DIRECTORIES_STRUCTURE, directory_name)
logging.getLogger("Analysis.__init__").debug("Building brand new analysis [" + str(self) + "]")
def define_analysis(self):
"""
"""
Define all analysis attributs, has to be implemented by subclasses
"""
raise NotImplementedError
def post_process(self):
"""
Process the analysis, has to be implemented by subclasses
"""
raise NotImplementedError
def execute(self):
ng6conf = NG6ConfigReader()
directory_name = uuid.uuid4().hex[:9]
while True:
save_dir = os.path.join(ng6conf.get_save_directory(), ng6conf.get_space_directory(self.space_id), self.DIRECTORIES_STRUCTURE, directory_name)
logging.getLogger("Analysis.__init__").debug("Building analysis save_dir=" + save_dir)
work_dir = os.path.join(ng6conf.get_work_directory(), ng6conf.get_space_directory(self.space_id), self.DIRECTORIES_STRUCTURE, directory_name)
logging.getLogger("Analysis.__init__").debug("Building analysis work_dir=" + save_dir)
if not os.path.isdir(save_dir) and not os.path.isdir(work_dir):
break
directory_name = uuid.uuid4().hex[:9]
self.directory = "/" + os.path.join(ng6conf.get_space_directory(self.space_id), self.DIRECTORIES_STRUCTURE, directory_name)
self.retention_date = ng6conf.get_retention_date(self.space_id)
# first create the output directory
if not os.path.isdir(self.output_directory):
os.makedirs(self.output_directory, 0o751)
......@@ -189,7 +195,7 @@ class Analysis (Component):
# then add analysis information
self.define_analysis()
inputs = []
# serialize the object
analysis_dump_path = self.get_temporary_file(".dump")
......@@ -197,7 +203,7 @@ class Analysis (Component):
pickle.dump(self, analysis_dump)
analysis_dump.close()
inputs.append(analysis_dump_path)
parent_id = 'none'
wait_for_files = []
for attr in self.__dict__:
......@@ -214,17 +220,17 @@ class Analysis (Component):
inputs.append(self.parent.get_config_file_path())
# then run the component
self.process()
# add the add_analysis command lines to the make
add = PythonFunction(add_analysis, cmd_format="{EXE} {ARG} {OUT} {IN}")
add(includes=wait_for_files, outputs=self.get_config_file_path(), inputs=inputs, arguments=parent_id)
def __str__(self):
"""
Return a full description of the analysis
"""
return "name="+str(self.name)+";description="+str(self.description)+";software="+\
str(self.software)+";version="+str(self.version)+";options="+str(self.options)
return "id="+str(self.id)+";name="+str(self.name)+";description="+str(self.description)+";software="+\
str(self.software)+";version="+str(self.version)+";options="+str(self.options)+\
";space_id="+str(self.space_id)+";project="+str(self.project)+";run="+str(self.run)
def sync(self):
"""
......@@ -254,7 +260,7 @@ class Analysis (Component):
t3mysql.update_analysis_field(self.id, "storage_size", Utils.get_storage_size(self.__get_save_directory()))
except Exception as err:
raise Exception(str(err))
def list_or_copy_analysis_files(self, outputdir, pattern):
"""
Return the list of files corresponding to the pattern and copy unzipped versions of zipped files into the output dir
......@@ -264,7 +270,7 @@ class Analysis (Component):
list = []
if outputdir == "":
outputdir = tempfile.mkdtemp()
for file in os.listdir(self.__get_save_directory()) :
# If it fits criteria
if pattern != "":
......@@ -314,8 +320,9 @@ class Analysis (Component):
else: parent_uid = 0
logging.getLogger("Analysis.save").debug("Doing the add analysis")
self.id = t3mysql.add_analysis(self.get_template(), self.name, self.description, self.admin_login,
datetime.date.today(), self.directory, self.software, self.version,
self.options, self.is_editable, parent_uid=parent_uid)
datetime.date.today(), self.directory, self.software, self.version,
self.options, self.retention_date, self.retention_date,
self.is_editable, parent_uid=parent_uid)
logging.getLogger("Analysis.save").debug("add_analysis done")
# Then add all results linked to this analysis
for file in self.results:
......@@ -334,7 +341,7 @@ class Analysis (Component):
@return : path to the config file
"""
t3mysql = t3MySQLdb()
# First select the analysis
[name, date, description, software, options, version] = t3mysql.select_analysis(self.id)
config = RawConfigParser()
......@@ -346,7 +353,7 @@ class Analysis (Component):
config.set("Analysis", "software", software)
config.set("Analysis", "options", options)
config.set("Analysis", "version", version)
config_path = self.get_config_file_path()
config.write(open(config_path,'w'))
return config_path
......@@ -393,7 +400,7 @@ class Analysis (Component):
if len(file_basenames) != len(set(file_basenames)) :
try : # If there are samples
if self.run :
samples = self.run.get_samples()
else :
......@@ -541,14 +548,15 @@ class Analysis (Component):
@param files: the files to archive
@param mode: can be none, gz, bz2, tar.gz and tar.bz2
@param archive_name: the archive name if tar is requested
@param delete: delete files
"""
# First handle if only one file
if len(files) == 1 and mode == "tar.gz":
mode = "gz"
elif len(files) == 1 and mode == "tar.bz2":
mode = "bz2"
if mode == "none":
for file in files:
if os.path.isfile(file):
......@@ -596,7 +604,7 @@ class Analysis (Component):
@param gzip : if True the file will be gzipped
@return : the web path to the file
"""
if not os.path.isdir(self.__get_work_directory()):
os.makedirs(self.__get_work_directory(), 0o751)
# First copy the file into the analysis directory
......
......@@ -25,24 +25,29 @@ __status__ = 'beta'
import os
from configparser import ConfigParser
from .utils import *
import logging
from dateutil.relativedelta import relativedelta
import datetime
import time
class NG6ConfigReader(object):
"""
Class NG6ConfigReader: this object reads the config file and returns the different configuration values
"""
# The ng6 config file path
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "../../application.properties")
PID = 5
def __init__(self):
"""
Build a ConfigReader
"""
"""
self.reader = ConfigParser()
self.reader.read(self.CONFIG_FILE)
def get_typo3_db_params(self):
"""
return params needed to connect to the typo3 database
......@@ -56,7 +61,7 @@ class NG6ConfigReader(object):
db_params.append(self.reader.get('database', 'dbname'))
return db_params
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file - get_typo3_db_params !")
def get_pid(self):
......@@ -75,7 +80,7 @@ class NG6ConfigReader(object):
try:
return self.reader.get('storage', 'work_directory')
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file - get_work_directory! ")
def get_tmp_directory(self):
......@@ -86,8 +91,8 @@ class NG6ConfigReader(object):
try:
return self.reader.get('storage', 'tmp_directory')
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file get_tmp_directory !")
def get_save_directory(self):
"""
......@@ -97,9 +102,37 @@ class NG6ConfigReader(object):
try:
return self.reader.get('storage', 'save_directory')
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file get_save_directory !")
def get_space_directory(self, space_id="default"):
"""
return the directory corresponding to space_id
@return: space_dir
"""
try:
return self.reader.get('space_'+space_id, 'dir_name').strip("/")
except:
raise Exception("Failed when parsing the config file !")
def get_retention_date(self, space_id="default"):
"""
return the retention date for space_id, computed either from retention_period (a number of months from today) or from a fixed retention_date
@return: the retention date as a Unix timestamp
"""
date = None
try :
nb_month=self.reader.getint('space_'+space_id,"retention_period")
retention_date = time.mktime((datetime.datetime.today()+ relativedelta(months=nb_month)).timetuple())
return retention_date
except:
try :
(d,m,y)=self.reader.get('space_'+space_id,"retention_date").split('/')
date = time.mktime(datetime.date(int(y),int(m),int(d)).timetuple())
return date
except:
raise Exception("Failed when parsing the config file !")
raise ValueError("Failed while generating retention date!")
def get_log_file_path(self):
"""
return the log file path
......@@ -108,7 +141,7 @@ class NG6ConfigReader(object):
try:
return self.reader.get('storage', 'log_file')
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file !")
def get_log_level(self):
"""
return the level of log
......@@ -117,9 +150,9 @@ class NG6ConfigReader(object):
try:
return self.reader.get('storage', 'log_level')
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file !")
def get_454_mids(self):
"""
return the 454 mids list
......@@ -132,19 +165,19 @@ class NG6ConfigReader(object):
mid_array[mid[0]] = mid[1]
return mid_array
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file get_454_mids !")
def get_workflow_filters(self):
"""
Return a list of workflow class names
@return: list
"""
filters = self.reader.get("local", 'runwf_filter')
if filters :
return filters.split()
return []
def get_user_base_directory(self):
"""
Return an absolute path of a directory mounted on the web server
......@@ -153,28 +186,20 @@ class NG6ConfigReader(object):
try:
return self.reader.get('ui', 'user_base_directory')
except :
raise Error("Could not retrieve server directory !")
raise Exception("Could not retrieve server directory get_user_base_directory !")
def get_server_parameters(self):
"""
return the server parameters
@return: server_socket_host, server_socket_port, server_socket_protocol
"""
# try:
server_params = []
server_params.append(self.reader.get('global', 'server_socket_host'))
server_params.append(self.reader.get('global', 'server_socket_port'))
server_params.append(self.reader.get("global", 'server_socket_protocol'))
return server_params
# except :
# raise Error("Failed when parsing the config file !")
def get_retention_time(self):
"""
return the retention time parameter
@return: retention_time
"""
try:
return self.reader.get('global', 'retention_time')
server_params = []
server_params.append(self.reader.get('global', 'server_socket_host'))
server_params.append(self.reader.get('global', 'server_socket_port'))
server_params.append(self.reader.get("global", 'server_socket_protocol'))
return server_params
except :
raise Error("Could not retrieve retention_time !")
\ No newline at end of file
raise Exception("Failed when parsing the config file get_server_parameters !")
......@@ -67,6 +67,10 @@ class BasicNG6Workflow (Workflow):
cmpt_object.parent = parent
if addto == "project": cmpt_object.project = self.project
elif addto == "run": cmpt_object.run = self.runobj
# Replace the default space_id with the run's space_id
if cmpt_object.space_id == "default" and cmpt_object.space_id != cmpt_object.run.space_id:
cmpt_object.space_id = cmpt_object.run.space_id
# link the analysis with its creator user
cmpt_object.admin_login = self.admin_login
# there is a dynamic component
......@@ -104,7 +108,6 @@ class BasicNG6Workflow (Workflow):
raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
cmpt_object.prefix + " already exist in this pipeline!")
self.component_nameids[cmpt_object.get_nameid()] = None
return cmpt_object
else:
raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(
......@@ -237,7 +240,7 @@ class NG6Workflow (BasicNG6Workflow):
# build the run
self.runobj = Run(self.run_name, self.run_date, self.species, self.data_nature,
self.run_type, self.run_description, self.sequencer)
self.run_type, self.run_description, self.sequencer, self.project.space_id)
self.runobj.admin_login=self.admin_login
# then add the run to the project
self.project.add_run(self.runobj)
......@@ -444,10 +447,10 @@ class CasavaNG6Workflow(NG6Workflow):
'reads1' : [],
'reads2' : []
}
# filter on project name
if re.match("Project_" + project_name + "/Sample_.+", sample['subdir']) or sample['subdir'].startswith("Undetermined_indices"):
for file in os.listdir(casava_directory + "/" + sample['subdir']):
filepath = casava_directory + "/" + sample['subdir'] + "/" + file
if file.endswith(".fastq.gz") and re.search(".*_L00" + str(lane_number) + "_.*", file):
......@@ -474,7 +477,7 @@ class CasavaNG6Workflow(NG6Workflow):
all_samples.append(sp_object)
all_samples_id.append(sample['sample_id'])
for file in os.listdir(casava_directory):
filepath = casava_directory + "/" + file
if file.endswith(".log"):
......@@ -482,7 +485,7 @@ class CasavaNG6Workflow(NG6Workflow):
logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 self.log_files = " + ",".join(self.log_files))
logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 all_samples_id = " + ",".join(all_samples_id))
logging.getLogger("ng6").debug("CasavaNG6Workflow._process_casava_18 exiting")
return all_samples, all_samples_id
def _process_casava_216(self,casava_directory, project_name, lane_number, input_files):
......@@ -506,7 +509,7 @@ class CasavaNG6Workflow(NG6Workflow):
logging.getLogger("ng6").debug("illumina_process self.is_casava")
if len(self.log_files) > 0 :
add_log = self.add_component("BasicAnalysis", [self.log_files,"Log Files","Log files generated during primary analysis","-","-","-","gz", "","log.gz"])
if len(self.undetermined_reads1) > 0 :
if self.casava['mismatch_index'] :
demultiplex_stats = self.add_component("DemultiplexStats", [self.get_all_reads("read1"), self.undetermined_reads1, self.get_files_index('read1')])
......
......@@ -35,7 +35,7 @@ class Project(object):
Class Project: Define a nG6 Project object
"""
def __init__(self, name, description, admin_login=None, id=None):
def __init__(self, name, description, admin_login=None, id=None, space_id="default"):
"""
Build a Project object
@param name : the project name
......@@ -46,14 +46,16 @@ class Project(object):
if id == None : # If brand new project
if admin_login != None:
t3mysql = t3MySQLdb()
self.id = t3mysql.add_project(name, description, admin_login)
self.id = t3mysql.add_project(name, description, admin_login, space_id)
else:
raise ValueError("An admin login is required to create a new project!")
else :
self.id = id
self.name = name
self.description = description
self.space_id=space_id
def get_name(self):
"""
Return the project name
......@@ -125,16 +127,16 @@ class Project(object):
config.set("Project", "project_id", self.id)
config.set("Project", "name", self.name)
config.set("Project", "description", self.description)
config_path = tempfile.NamedTemporaryFile(suffix=".cfg").name
config.write(open(config_path,'w'))
return config_path
def is_admin(self, admin_login):
"""
Return true/false
@param admin_login: login of the user
@return : True if admin_login is one of the project's administrator users
"""
t3mysql = t3MySQLdb()
user_ids = t3mysql.get_users(self.id,"administrator")
......@@ -143,7 +145,7 @@ class Project(object):
return True
else :
return False
@staticmethod
def get_from_id (id):
"""
......@@ -152,11 +154,11 @@ class Project(object):
"""
try:
t3mysql = t3MySQLdb()
[name, description] = t3mysql.select_project(id)
return Project(name, description, None, id)
[name, description, space_id] = t3mysql.select_project(id)
return Project(name, description, None, id, space_id)
except :
return None
@staticmethod
def get_from_name (name):
"""
......@@ -165,14 +167,11 @@ class Project(object):
"""
try:
t3mysql = t3MySQLdb()
[id, description] = t3mysql.select_project_from_name(name)
return Project(name, description, None, id)