Commit 18f27f0c authored by Jerome Mariette

integration of Dynamic components into ng6

parent 1760cdc7
......@@ -24,6 +24,7 @@ from jflow.workflows_manager import WorkflowsManager
from jflow.config_reader import JFlowConfigReader
from jflow.dataset import ArrayList
from jflow.utils import which
from jflow.iotypes import DynamicOutput
from weaver.util import parse_string_list
......@@ -32,16 +33,25 @@ class Component(object):
"""
"""
def __init__(self, is_dynamic=False):
"""
@param is_dynamic: if true, this component has dynamic inputs/outputs
"""
self.is_dynamic = is_dynamic
def __init__(self):
self.prefix = "default"
self.output_directory = None
self.config_reader = JFlowConfigReader()
self.version = self.get_version()
self.batch_options = self.config_reader.get_component_batch_options(self.__class__.__name__)
def is_dynamic(self):
    """
    Tell whether this component is dynamic.
    @return : True if get_dynamic_outputs() returns at least one output, False otherwise.
    """
    dynamic_outputs = self.get_dynamic_outputs()
    return len(dynamic_outputs) > 0
def get_dynamic_outputs(self):
    """
    Collect the dynamic outputs declared on this component.
    @return : the list of outputs updated at the end of component execution, i.e. every
              instance attribute whose value is an instance of DynamicOutput (or of one
              of its subclasses). The list is empty when there is none.
    """
    # Only the instance attributes (self.__dict__) are inspected; class level attributes are ignored.
    return [attribute_value for attribute_value in self.__dict__.values()
            if isinstance(attribute_value, DynamicOutput)]
def _longestCommonSubstr(self, data, clean_end=True):
substr = ''
......
......@@ -96,7 +96,10 @@ class GFF3Reader(_AbstractFeatureReader):
if row[8] and row[8] != '.' :
for p in row[8].split(';') :
if p != "" :
attributes[p.split('=')[0]] = p.split('=')[1]
if len(p.partition('=')) == 3 :
attributes[p.partition('=')[0]] = p.partition('=')[2]
else :
print "Warn : Attribute "+ p +" gff3 for "+ "\t".join(row) + "\n"
return Entry(**{'seqid' : row[0], 'source' : row[1], 'type' : row[2], 'start' : int(row[3]), 'end' : int(row[4]),
'score' : row[5], 'strand' : row[6], 'phase' : row[7], 'attributes' : attributes })
......@@ -251,9 +254,11 @@ class VCFReader(_AbstractFeatureReader):
if len(variation.alt) > 1 :
variation.addattr( 'is_indel', True)
if row[7] != '.' :
variation.addattr( 'info', { p.split('=')[0] : autocast(p.split('=')[1]) for p in row[7].split(';') })
regexp_none=re.compile("\.(\/\.)*")
#if row[7] != '.' :
# variation.addattr( 'info', { p.split('=')[0] : autocast(p.split('=')[1]) for p in row[7].split(';') })
format = row[8].split(':')
for lib_infos in range (9,len(row)) :
if not regexp_none.match(row[lib_infos]):
......
......@@ -15,7 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import os, re
import abc
from _pyio import __metaclass__
class Formats(object):
ANY = "any"
......@@ -26,15 +29,8 @@ class Formats(object):
QUAL = "qual"
FLOW = "flow"
HTML = "html"
MOTHUR_NAMES = "mothur_names"
MOTHUR_OLIGOS = "mothur_oligos"
MOTHUR_GROUPS = "mothur_groups"
MOTHUR_ACCNOS = "mothur_accnos"
MOTHUR_COUNT_TABLE = "mothur_count_table"
MOTHUR_TAXONOMY = "mothur_taxonomy"
MOTHUR_TAX_SUMMARY = "mothur_tax_summary"
MOTHUR_DIST = "mothur_dist"
#
# Inputs classes
#
class InputFile(str):
......@@ -65,6 +61,10 @@ class InputFileList(list):
return list.__init__(self, inputs)
elif inputs.__class__.__name__ == "OutputFileList" and (inputs.format == self.format or self.format == Formats.ANY):
return list.__init__(self, inputs)
elif inputs.__class__.__name__ == "OutputFileEndsWith" and (inputs.format == self.format or self.format == Formats.ANY):
return list.__init__(self, inputs)
elif inputs.__class__.__name__ == "OutputFilePattern" and (inputs.format == self.format or self.format == Formats.ANY):
return list.__init__(self, inputs)
elif (inputs.__class__.__name__ == "OutputFile" or inputs.__class__.__name__ == "OutputFileList") and \
not (inputs.format == self.format or self.format == Formats.ANY):
raise IOError(inputs.format+" format is incompatible with allowed format "+self.format)
......@@ -104,6 +104,19 @@ class InputDirectoryList(list):
#
# Outputs classes
#
class DynamicOutput(list):
    """
    Base class of the list-like outputs that have to be refreshed once the component has run.
    @warning : a component owning an output of this class becomes dynamic.
    """
    # Python 2 style metaclass declaration, paired with the abstractmethod decorator below.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def update(self):
        """
        Refresh the content of the output list.
        This method is used at the end of component execution; subclasses provide the implementation.
        """
        raise NotImplementedError()
class OutputFile(str):
def __new__(self, output, format=Formats.ANY):
......@@ -115,6 +128,78 @@ class OutputFile(str):
def check(self):
return "TODO: check output file"
class OutputFileList(list):
    """
    List of output files sharing the same format.
    """

    def __init__(self, outputs, format=Formats.ANY):
        """
        @param outputs : the output file(s), either a single str or a list.
        @param format : the files format.
        @raise IOError : if outputs is neither a str nor a list.
        """
        self.format = format
        # The test is done on the exact class name, so subclasses of str/list are rejected.
        class_name = outputs.__class__.__name__
        if class_name == "str":
            # A single path is stored as a one element list.
            files = [outputs]
        elif class_name == "list":
            files = outputs
        else:
            raise IOError(class_name+" cannot be used as OutputFileList")
        list.__init__(self, files)

    def check(self):
        """
        Placeholder: the check of the output files is not implemented yet.
        """
        return "TODO: check output file list"
class OutputFilePattern(DynamicOutput):
    """
    Dynamic list of the files of a directory selected with a regular expression applied on their names.
    The list is empty until update() is called.
    """

    def __init__(self, output_directory, pattern, format=Formats.ANY, include=True):
        """
        @warning : with this class of output, the component becomes dynamic.
        @param output_directory : path to the directory where outputs will be created.
        @param pattern : the regular expression searched (re.search) in the file names.
        @param format : the files format.
        @param include : if true, the files whose name matches the pattern are added into output files.
                         If false, the files whose name does not match the pattern are added into output files.
        """
        self.output_directory = output_directory
        self.pattern = pattern
        self.format = format
        self.include = include
        list.__init__(self, [])

    def update(self):
        """
        Replace the content of the list by the paths (output_directory joined with the file name)
        of the files currently listed in output_directory and selected by pattern/include.
        """
        # Compile once instead of searching the raw pattern twice per file.
        regexp = re.compile(self.pattern)
        keep_matching = bool(self.include)
        # A file is kept when "its name matches" agrees with "include".
        self[:] = [os.path.join(self.output_directory, file_name)
                   for file_name in os.listdir(self.output_directory)
                   if (regexp.search(file_name) is not None) == keep_matching]

    def check(self):
        """
        Placeholder: the check of the output files is not implemented yet.
        """
        return "TODO: check output file list"
class OutputFileEndsWith(DynamicOutput):
    """
    Dynamic list of the files of a directory selected on the end of their names.
    The list is empty until update() is called.
    """

    def __init__(self, output_directory, end_str, format=Formats.ANY, include=True):
        """
        @warning : with this class of output, the component becomes dynamic.
        @param output_directory : path to the directory where outputs will be created.
        @param end_str : the end of the files names.
        @param format : the files format.
        @param include : if true, the files with name terminated by end_str are added into output files.
                         If false, the files with name not terminated by end_str are added into output files.
        """
        self.output_directory = output_directory
        self.end_str = end_str
        self.format = format
        self.include = include
        list.__init__(self, [])

    def update(self):
        """
        Re-initialise the list with the paths (output_directory joined with the file name)
        of the files currently listed in output_directory and selected by end_str/include.
        """
        keep_suffixed = bool(self.include)
        # A file is kept when "its name ends with end_str" agrees with "include".
        selected = [os.path.join(self.output_directory, file_name)
                    for file_name in os.listdir(self.output_directory)
                    if file_name.endswith(self.end_str) == keep_suffixed]
        list.__init__(self, selected)

    def check(self):
        """
        Placeholder: the check of the output files is not implemented yet.
        """
        return "TODO: check output file list"
class OutputFileList(list):
......
......@@ -209,8 +209,11 @@ class Workflow(threading.Thread):
# add the component
self.components.append(cmpt_object)
# if this one require a dynamic pipeline, execute the first part
if cmpt_object.is_dynamic:
if cmpt_object.is_dynamic():
self._execute_weaver()
# update outputs
for output in cmpt_object.get_dynamic_outputs():
output.update()
return cmpt_object
else:
raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(", ".join(self.comp_pckg.keys())))
......
......@@ -84,7 +84,7 @@ class Analysis (Component):
DIRECTORIES_STRUCTURE = "/data/analyze/"
ANALYSIS_CONFIG_FILE_NAME = "analysis.cfg"
def __init__(self, is_dynamic=False, name="", description="", software="",
def __init__(self, name="", description="", software="",
options="", version="", id=None, parent_analysis=None):
"""
Build an Analysis object
......@@ -95,7 +95,7 @@ class Analysis (Component):
@param id : the analysis id if not defined
@param parent_analysis : the parent analysis
"""
Component.__init__(self, is_dynamic)
Component.__init__(self)
self.name = name
self.description = description
self.software = software
......@@ -589,7 +589,7 @@ class Analysis (Component):
try:
t3mysql = t3MySQLdb()
[name, date, description, software, options, version] = t3mysql.select_analysis(id)
my_analysis = Analysis(None, name, description, software, options, version, id)
my_analysis = Analysis(name, description, software, options, version, id)
if my_analysis == None:
logging.getLogger("Analysis.get_from_id").error("The analysis id=" + str(id) + " does not exists in the database.")
raise Exception("The analysis id=" + str(id) + " does not exists in the database.\n")
......
......@@ -107,8 +107,11 @@ class NG6Workflow (Workflow):
# add the component
self.components.append(cmpt_object)
# if this one require a dynamic pipeline, execute the first part
if cmpt_object.is_dynamic:
if cmpt_object.is_dynamic():
self._execute_weaver()
# update outputs
for output in cmpt_object.get_dynamic_outputs():
output.update()
return cmpt_object
else:
raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(", ".join(self.comp_pckg.keys())))
......@@ -119,3 +122,4 @@ class NG6Workflow (Workflow):
self.runobj.sync()
elif self.project:
self.project.sync()
\ No newline at end of file
......@@ -28,11 +28,12 @@ from subprocess import call
from shutil import copyfile, rmtree
from ConfigParser import ConfigParser, RawConfigParser
from jflow import seqio
from ng6.t3MySQLdb import t3MySQLdb
from ng6.config_reader import NG6ConfigReader
from ng6.utils import Utils
from ng6.exceptions import UnsavedRunError
from ng6 import seqio
class Run(object):
"""
......
......@@ -546,24 +546,4 @@ class t3MySQLdb(object):
conn.close()
logging.getLogger("t3MySQLdb.select_analysis").debug("Returning [" + str([res[0], res[1], res[2], res[3], res[4], res[5]]) + "]")
return [res[0], res[1], res[2], res[3], res[4], res[5]]
def select_analysis_run_id(self, analysis_id):
    """
    Return the run_id the analysis belongs to
    @param analysis_id : the analysis id to select
    @return: the run_id, or None if no row is found in tx_nG6_run_analyze or if the query fails
    """
    conn = connect(self.host, self.user, self.passwd, self.db)
    curs = conn.cursor()
    try:
        # The id is given as a query parameter instead of being concatenated into the SQL text.
        # NOTE(review): assumes connect() comes from a DB-API driver using the "format"
        # paramstyle (%s), as MySQLdb does -- confirm against the imports of this module.
        req = "SELECT run_id FROM tx_nG6_run_analyze WHERE tx_nG6_run_analyze.analyze_id=%s"
        curs.execute(req, (str(analysis_id),))
        return curs.fetchall()[0][0]
    except Exception:
        # Not a run analysis (no row selected) or query error: keep the best-effort None answer.
        return None
    finally:
        # Cursor and connection are released on every path.
        curs.close()
        conn.close()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment