Commit e18eb015 authored by Claire Kuchly

Merge branch 'master' into 'ng6-Slurm-10X'

Master into 10X

See merge request !30
parents 0e04d755 00b365f2
__pycache__
smarty
......@@ -27,9 +27,16 @@ limit_submission = 100
server_socket_host = 127.0.0.1
# on which socket port the web server should run
server_socket_port = 8080
#server_socket_port = 8083
# protocol used to access web server (http|https)
server_socket_protocol = https
# date format
date_format = %d/%m/%Y
# optional: keys for the jflow server SSL configuration
ssl_certificate = certificate.crt
ssl_certificate_chain = certificate_chain.crt
ssl_private_key = private_key.key
[email]
# if you want an email to be sent at the end of the workflow execution
# set the smtp_server and the from_address values
......@@ -78,6 +85,19 @@ work_directory = <path>/work
# accessible by all cluster nodes
tmp_directory = <path>/tmp
# Configure different storage directories
# and associate a retention date or duration with each
#
# e.g. with a duration
[space_default]
dir_name=default
retention_period=3
#
# e.g. with a fixed date
[space_final]
dir_name=directory
retention_date=12/03/2021
[ui]
# This must be an absolute path of a directory mounted and accessible by the server
user_base_directory = /work/
......
......@@ -138,7 +138,7 @@ CREATE USER 'typo3'@'%' IDENTIFIED BY 'typo3';
GRANT ALL ON typo3.* TO 'typo3'@'%';
#Get last ng6 source code
svn checkout svn://scm.mulcyber.toulouse.inra.fr/svnroot/ng6/nG6
Download the latest version from https://forgemia.inra.fr/genotoul-bioinfo/ng6/tags/
</code></pre>
Now go to step #4.
</div>
......@@ -146,7 +146,7 @@ Now go to step #4.
<section id="download">
<h1 class="page-header">Step #2 <small>download the latest copy</small></h1>
<p>The latest version can be downloaded from the <a href="https://mulcyber.toulouse.inra.fr/frs/?group_id=102" target="_blank">download page</a>.</p>
<p>The latest version can be downloaded from the <a href="https://forgemia.inra.fr/genotoul-bioinfo/ng6/tags/" target="_blank">download page</a>.</p>
</section>
<section id="extract">
......@@ -184,6 +184,7 @@ dbname = typo3
[storage]
# where should be written the log file
log_file = /path/to/jflow.log
log_level = logging.INFO
# Where should the pipelines write results, should be accessible
# by all cluster nodes
work_directory = /path/to/work
......
......@@ -63,6 +63,7 @@ class Component(object):
# in case of SGE, parse the cpu and memory parameter
self.__cpu=None
self.__memory=None
self.__memory_per_cpu=None
type, options, limit_submission = self.config_reader.get_batch()
if type.lower() == "sge" :
try:
......@@ -80,11 +81,14 @@ class Component(object):
except: pass
elif type.lower() == "slurm" :
try:
self.__cpu = int(re.match( r'.*-c\s+(\w+)\s+(\d+)\s?.*', self.batch_options).group(2))
self.__cpu = int(re.match( r'.*-c\s+(\d+).*', self.batch_options).group(1))
except: pass
try:
self.__memory = re.match( r'.*--mem=(\d+\S+)\s?.*', self.batch_options).group(1)
except: pass
try:
self.__memory_per_cpu = re.match( r'.*--mem-per-cpu=(\d+\S+)\s?.*', self.batch_options).group(1)
except: pass
def get_prefix(self):
return self.__prefix
......@@ -99,7 +103,10 @@ class Component(object):
def get_memory(self):
return self.__memory
def get_memory_per_cpu(self):
return self.__memory_per_cpu
def is_dynamic(self):
return len(self.get_dynamic_outputs()) != 0
......
......@@ -112,9 +112,15 @@ class JFlowConfigReader(object):
def get_socket_options(self):
try:
return [self.reader.get("global", "server_socket_host"), int(self.reader.get("global", "server_socket_port"))]
return [self.reader.get("global", "server_socket_host"), int(self.reader.get("global", "server_socket_port")), self.reader.get("global", "server_socket_protocol")]
except:
return ["127.0.0.1", 8080]
return ["127.0.0.1", 8080,"http"]
def get_ssl_files(self):
try:
return [self.reader.get("global", "ssl_certificate"), self.reader.get("global", "ssl_certificate_chain"),self.reader.get("global", "ssl_private_key")]
except:
return []
def get_email_options(self):
try: smtps = self.reader.get("email", "smtp_server")
......
......@@ -140,6 +140,7 @@ class JFlowServer (object):
# define the socket host and port
jflowconf = JFlowConfigReader()
socket_opts = jflowconf.get_socket_options()
ssl_opts = jflowconf.get_ssl_files()
# add the result directory
if config is None or not '/' in config:
......@@ -162,6 +163,11 @@ class JFlowServer (object):
cherrypy.config.update({'server.socket_host': socket_opts[0],
'server.socket_port': socket_opts[1]})
if len(ssl_opts)==3:
cherrypy.config.update({'server.ssl_module': 'builtin',
'server.ssl_certificate': ssl_opts[0],
'server.ssl_certificate_chain': ssl_opts[1],
'server.ssl_private_key': ssl_opts[2]})
# start the server
cherrypy.quickstart(server_class(), config=config)
......@@ -524,7 +530,7 @@ class JFlowServer (object):
socket_opt = self.jflow_config_reader.get_socket_options()
if not os.path.isdir(path):
return {
'url':'http://' + socket_opt[0] + ':' + str(socket_opt[1]) + '/' + path.replace(work_dir, web_path),
'url':socket_opt[2]+'://' + socket_opt[0] + ':' + str(socket_opt[1]) + '/' + path.replace(work_dir, web_path),
'size': get_octet_string_representation(os.path.getsize(os.path.abspath(path))),
'extension': os.path.splitext(path)[1]
}
......
......@@ -50,33 +50,54 @@ def add_analysis(parent_id, analysis_cfg, *input_files):
from ng6.project import Project
from ng6.run import Run
import pickle
import logging
logging.getLogger("Analysis").debug("Start. Imports went good.")
logging.getLogger("Analysis").debug("Start. working for analysis " + analysis_cfg)
# get inputs from parameters
analysis_serialized_path = input_files[0]
try: parent_analysis_cfg = input_files[1]
except: parent_analysis_cfg = None
logging.getLogger("Analysis").debug("Start. parent_analysis_cfg is initialised.")
# load the analysis object
analysis_dump = open(analysis_serialized_path, "rb")
analysis = pickle.load(analysis_dump)
analysis_dump.close()
logging.getLogger("Analysis").debug("Dump. analysis_dump is closed and analysis is loaded.")
# add the parent information
logging.getLogger("Analysis").debug("parent_analysis_cfg. Starting")
if parent_analysis_cfg:
if os.path.isfile(parent_analysis_cfg):
logging.getLogger("Analysis").debug("parent_analysis_cfg. Analysis.get_from_file(parent_analysis_cfg) is about to start")
parent = Analysis.get_from_file(parent_analysis_cfg)
logging.getLogger("Analysis").debug("parent_analysis_cfg. Analysis.get_from_file(parent_analysis_cfg) is done")
analysis.parent = parent
elif parent_id != 'none' :
logging.getLogger("Analysis").debug("parent_analysis_cfg. Analysis.get_from_file(parent_analysis_cfg) is about to start")
parent = Analysis.get_from_id(int(parent_id))
analysis.parent = parent
logging.getLogger("Analysis").debug("parent_analysis_cfg. Analysis.get_from_file(parent_analysis_cfg) is done")
# process the parsing of the analysis
logging.getLogger("Analysis").debug("analysis.post_process. Starting")
analysis.post_process()
logging.getLogger("Analysis").debug("analysis.post_process. Done")
# add the analysis to the right run/project object
if analysis.project: analysis.project.add_analysis(analysis)
elif analysis.run: analysis.run.add_analysis(analysis)
logging.getLogger("Analysis").debug("analysis.project. Starting add_analysis(analysis)")
if analysis.project:
logging.getLogger("Analysis").debug("analysis.project. Starting analysis.project.add_analysis(analysis)")
analysis.project.add_analysis(analysis)
logging.getLogger("Analysis").debug("analysis.project. Finishing analysis.project.add_analysis(analysis)")
elif analysis.run:
logging.getLogger("Analysis").debug("analysis.run. Starting analysis.run.add_analysis(analysis)")
analysis.run.add_analysis(analysis)
logging.getLogger("Analysis").debug("analysis.run. Finishing analysis.run.add_analysis(analysis)")
logging.getLogger("Analysis").debug("Config File. Starting the writing of config file")
analysis.write_config_file()
logging.getLogger("Analysis").debug("Config File. Is written")
class Analysis (Component):
"""
......@@ -84,11 +105,11 @@ class Analysis (Component):
"""
# The directories structure into ng6
DIRECTORIES_STRUCTURE = "/data/analyze/"
DIRECTORIES_STRUCTURE = "analyze"
ANALYSIS_CONFIG_FILE_NAME = "analysis.cfg"
def __init__(self, name="", description="", software="",
options="", version="", id=None, parent_analysis=None):
options="", version="", id=None, parent_analysis=None, space_id="default"):
"""
Build an Analysis object
@param name : the analysis name
......@@ -111,48 +132,60 @@ class Analysis (Component):
self.results = {}
self.run = None
self.project = None
self.is_editable = False
self.space_id = space_id
self.retention_date = None
# Set the temp folder to the ng6 temp folder
ng6conf = NG6ConfigReader()
logging.getLogger("Analysis.__init__").debug("ng6conf est chargé")
tempfile.tempdir = ng6conf.get_tmp_directory()
if not os.path.isdir(tempfile.tempdir):
os.makedirs(tempfile.tempdir, 0o751)
logging.getLogger("Analysis.__init__").debug("tempfile.tempdir = " + tempfile.tempdir)
if not os.path.isdir(tempfile.tempdir):
os.makedirs(tempfile.tempdir, 0o751)
if id != None : # If not a new analysis
logging.getLogger("Analysis.__init__").debug("Connexion à la BD")
t3mysql = t3MySQLdb()
logging.getLogger("Analysis.__init__").debug("Connexion effectuée")
self.run = Run.get_from_id(self.__get_run_id())
from ng6.project import Project
self.project = Project.get_from_id(self.__get_project_id())
self.project = Project.get_from_run_id(self.__get_run_id())
self.space_id=self.project.space_id
self.directory = t3mysql.select_analysis_directory(id)
logging.getLogger("Analysis.__init__").debug("Building analysis with id=" + str(id) + " [" + str(self) + "]")
else :
self.version = self.get_version()
if isinstance(self.version, bytes):
self.version = self.version.decode()
directory_name = uuid.uuid4().hex[:9]
while True:
save_dir = os.path.join(ng6conf.get_save_directory()+self.DIRECTORIES_STRUCTURE, directory_name)
work_dir = os.path.join(ng6conf.get_work_directory()+self.DIRECTORIES_STRUCTURE, directory_name)
if not os.path.isdir(save_dir) and not os.path.isdir(work_dir):
break
directory_name = uuid.uuid4().hex[:9]
self.directory = os.path.join(self.DIRECTORIES_STRUCTURE, directory_name)
logging.getLogger("Analysis.__init__").debug("Building brand new analysis [" + str(self) + "]")
def define_analysis(self):
"""
"""
Define all analysis attributes, has to be implemented by subclasses
"""
raise NotImplementedError
def post_process(self):
"""
Process the analysis, has to be implemented in the sub class
"""
raise NotImplementedError
def execute(self):
ng6conf = NG6ConfigReader()
directory_name = uuid.uuid4().hex[:9]
while True:
save_dir = os.path.join(ng6conf.get_save_directory(), ng6conf.get_space_directory(self.space_id), self.DIRECTORIES_STRUCTURE, directory_name)
logging.getLogger("Analysis.__init__").debug("Building analysis save_dir=" + save_dir)
work_dir = os.path.join(ng6conf.get_work_directory(), ng6conf.get_space_directory(self.space_id), self.DIRECTORIES_STRUCTURE, directory_name)
logging.getLogger("Analysis.__init__").debug("Building analysis work_dir=" + save_dir)
if not os.path.isdir(save_dir) and not os.path.isdir(work_dir):
break
directory_name = uuid.uuid4().hex[:9]
self.directory = "/" + os.path.join(ng6conf.get_space_directory(self.space_id), self.DIRECTORIES_STRUCTURE, directory_name)
self.retention_date = ng6conf.get_retention_date(self.space_id)
# first create the output directory
if not os.path.isdir(self.output_directory):
os.makedirs(self.output_directory, 0o751)
......@@ -162,7 +195,7 @@ class Analysis (Component):
# then add analysis information
self.define_analysis()
inputs = []
# serialize the object
analysis_dump_path = self.get_temporary_file(".dump")
......@@ -170,7 +203,7 @@ class Analysis (Component):
pickle.dump(self, analysis_dump)
analysis_dump.close()
inputs.append(analysis_dump_path)
parent_id = 'none'
wait_for_files = []
for attr in self.__dict__:
......@@ -187,17 +220,17 @@ class Analysis (Component):
inputs.append(self.parent.get_config_file_path())
# then run the component
self.process()
# add the add_analysis command lines to the make
add = PythonFunction(add_analysis, cmd_format="{EXE} {ARG} {OUT} {IN}")
add(includes=wait_for_files, outputs=self.get_config_file_path(), inputs=inputs, arguments=parent_id)
def __str__(self):
"""
Return a full description of the analysis
"""
return "name="+str(self.name)+";description="+str(self.description)+";software="+\
str(self.software)+";version="+str(self.version)+";options="+str(self.options)
return "id="+str(self.id)+";name="+str(self.name)+";description="+str(self.description)+";software="+\
str(self.software)+";version="+str(self.version)+";options="+str(self.options)+\
";space_id="+str(self.space_id)+";project="+str(self.project)+";run="+str(self.run)
def sync(self):
"""
......@@ -227,7 +260,7 @@ class Analysis (Component):
t3mysql.update_analysis_field(self.id, "storage_size", Utils.get_storage_size(self.__get_save_directory()))
except Exception as err:
raise Exception(str(err))
def list_or_copy_analysis_files(self, outputdir, pattern):
"""
Return list of file corresponding to the pattern and copy unzipped files into output dir if it's zipped
......@@ -237,7 +270,7 @@ class Analysis (Component):
list = []
if outputdir == "":
outputdir = tempfile.mkdtemp()
for file in os.listdir(self.__get_save_directory()) :
# If it fits criteria
if pattern != "":
......@@ -280,13 +313,17 @@ class Analysis (Component):
update the database info
"""
# First add the run into the database
logging.getLogger("Analysis.save").debug("Connexion try")
t3mysql = t3MySQLdb()
logging.getLogger("Analysis.save").debug("Connexion done")
if self.parent: parent_uid = self.parent.id
else: parent_uid = 0
logging.getLogger("Analysis.save").debug("Doing the add analysis")
self.id = t3mysql.add_analysis(self.get_template(), self.name, self.description, self.admin_login,
datetime.date.today(), self.directory, self.software, self.version,
self.options, self.is_editable, parent_uid=parent_uid)
datetime.date.today(), self.directory, self.software, self.version,
self.options, self.retention_date, self.retention_date,
self.is_editable, parent_uid=parent_uid)
logging.getLogger("Analysis.save").debug("add_analysis done")
# Then add all results link to this analysis
for file in self.results:
for result in self.results[file]:
......@@ -304,7 +341,7 @@ class Analysis (Component):
@return : path to the config file
"""
t3mysql = t3MySQLdb()
# First select run
[name, date, description, software, options, version] = t3mysql.select_analysis(self.id)
config = RawConfigParser()
......@@ -316,7 +353,7 @@ class Analysis (Component):
config.set("Analysis", "software", software)
config.set("Analysis", "options", options)
config.set("Analysis", "version", version)
config_path = self.get_config_file_path()
config.write(open(config_path,'w'))
return config_path
......@@ -337,6 +374,7 @@ class Analysis (Component):
def _create_and_archive(self, files, archive_name=None, prefix="dir"):
logging.getLogger("Analysis").debug("_create_and_archive entering")
"""
return the web path to the archive files
If there are samples, data will be organised by sample
......@@ -369,6 +407,7 @@ class Analysis (Component):
run_id = self.__get_run_id()
my_run = Run.get_from_id(run_id)
samples = my_run.get_samples()
logging.getLogger("Analysis").debug("_create_and_archive with samples")
gfiles = {}
ungrouped_files = []
what_left = []
......@@ -442,14 +481,20 @@ class Analysis (Component):
for file in what_left:
copyfile(file, os.path.join(tmp_dir, os.path.basename(file)))
logging.getLogger("Analysis").debug("_create_and_archive before tarf")
tarf = Utils.tar_dir(tmp_dir, os.path.join(self.__get_work_directory(), archive_name))
logging.getLogger("Analysis").debug("_create_and_archive before targzf")
targzf = Utils.gzip(tarf, self.__get_work_directory(), delete=False)
# Then delete temporary files
logging.getLogger("Analysis").debug("_create_and_archive before os.remove(tarf)")
os.remove(tarf)
logging.getLogger("Analysis").debug("_create_and_archive before rmtree(tmp_dir)")
rmtree(tmp_dir)
logging.getLogger("Analysis").debug("_create_and_archive before return " + os.path.join(self.directory, os.path.basename(targzf)))
return 'fileadmin' + os.path.join(self.directory, os.path.basename(targzf))
except :
logging.getLogger("Analysis").debug("_create_and_archive in execpt, without samples?")
gfiles = {}
# Group files by folders
for file in files:
......@@ -470,19 +515,29 @@ class Analysis (Component):
os.makedirs(dir)
for file in gfiles[dir_name]:
copyfile(file, os.path.join(dir, os.path.basename(file)))
logging.getLogger("Analysis").debug("_create_and_archive before tarf")
tarf = Utils.tar_dir(tmp_dir, os.path.join(self.__get_work_directory(), archive_name))
logging.getLogger("Analysis").debug("_create_and_archive before targzf")
targzf = Utils.gzip(tarf, self.__get_work_directory(), delete=False)
# Then delete temporary files
logging.getLogger("Analysis").debug("_create_and_archive before os.remove(tarf)")
os.remove(tarf)
logging.getLogger("Analysis").debug("_create_and_archive before rmtree(tmp_dir)")
rmtree(tmp_dir)
logging.getLogger("Analysis").debug("_create_and_archive before return " + os.path.join(self.directory, os.path.basename(targzf)))
return 'fileadmin' + os.path.join(self.directory, os.path.basename(targzf))
else :
logging.getLogger("Analysis").debug("_create_and_archive, length differs")
logging.getLogger("Analysis").debug("_create_and_archive before tarf")
tarf = Utils.tar_files(files, os.path.join(self.__get_work_directory(), archive_name))
logging.getLogger("Analysis").debug("_create_and_archive before targzf")
targzf = Utils.gzip(tarf, self.__get_work_directory(), delete=False)
# Then delete temporary files
logging.getLogger("Analysis").debug("_create_and_archive before os.remove(tarf)")
os.remove(tarf)
logging.getLogger("Analysis").debug("_create_and_archive before return " + os.path.join(self.directory, os.path.basename(targzf)))
return 'fileadmin' + os.path.join(self.directory, os.path.basename(targzf))
......@@ -493,14 +548,15 @@ class Analysis (Component):
@param files: the files to archive
@param mode: can be none, gz, bz2, tar.gz and tar.bz2
@param archive_name: the archive name if tar is requested
@param delete: delete files
"""
# First handle if only one file
if len(files) == 1 and mode == "tar.gz":
mode = "gz"
elif len(files) == 1 and mode == "tar.bz2":
mode = "bz2"
if mode == "none":
for file in files:
if os.path.isfile(file):
......@@ -548,7 +604,7 @@ class Analysis (Component):
@param gzip : if True the file will be gziped
@return : the web path to the file
"""
if not os.path.isdir(self.__get_work_directory()):
os.makedirs(self.__get_work_directory(), 0o751)
# First copy the file into the analysis directory
......
......@@ -25,24 +25,29 @@ __status__ = 'beta'
import os
from configparser import ConfigParser
from .utils import *
import logging
from dateutil.relativedelta import relativedelta
import datetime
import time
class NG6ConfigReader(object):
"""
Class NG6ConfigReader: this object reads the config file and returns the different configuration values
"""
# The ng6 config file path
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "../../application.properties")
PID = 5
def __init__(self):
"""
Build a ConfigReader
"""
"""
self.reader = ConfigParser()
self.reader.read(self.CONFIG_FILE)
def get_typo3_db_params(self):
"""
return params needed to connect to the typo3 database
......@@ -56,7 +61,7 @@ class NG6ConfigReader(object):
db_params.append(self.reader.get('database', 'dbname'))
return db_params
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file - get_typo3_db_params !")
def get_pid(self):
......@@ -75,7 +80,7 @@ class NG6ConfigReader(object):
try:
return self.reader.get('storage', 'work_directory')
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file - get_work_directory! ")
def get_tmp_directory(self):
......@@ -86,8 +91,8 @@ class NG6ConfigReader(object):
try:
return self.reader.get('storage', 'tmp_directory')
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file get_tmp_directory !")
def get_save_directory(self):
"""
......@@ -97,9 +102,37 @@ class NG6ConfigReader(object):
try:
return self.reader.get('storage', 'save_directory')
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file get_save_directory !")
def get_space_directory(self, space_id="default"):
"""
return the directory corresponding to space_id
@return: space_dir
"""
try:
return self.reader.get('space_'+space_id, 'dir_name').strip("/")
except:
raise Exception("Failed when parsing the config file !")
def get_retention_date(self, space_id="default"):
"""
return the retention date for the given space_id
@return: the retention date as a Unix timestamp
"""
date = None
try :
nb_month=self.reader.getint('space_'+space_id,"retention_period")
retention_date = time.mktime((datetime.datetime.today()+ relativedelta(months=nb_month)).timetuple())
return retention_date
except:
try :
(d,m,y)=self.reader.get('space_'+space_id,"retention_date").split('/')
date = time.mktime(datetime.date(int(y),int(m),int(d)).timetuple())
return date
except:
raise Exception("Failed when parsing the config file !")
raise ValueError("Failed while gnerating retention date!")
def get_log_file_path(self):
"""
return the log file path
......@@ -108,7 +141,7 @@ class NG6ConfigReader(object):
try:
return self.reader.get('storage', 'log_file')
except :
raise Error("Failed when parsing the config file !")
raise Exception("Failed when parsing the config file !")
def get_log_level(self):
"""
return the log level
......@@ -117,9 +150,9 @@ class NG6ConfigReader(object):
try:
return self.reader.get('storage', 'log_level')
except :
raise Error("Failed when parsing the config file !")