Commit a8d1eca7 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

add DynamicOutputs to handle dynamc components

parent 5ef8d333
...@@ -24,6 +24,7 @@ from jflow.workflows_manager import WorkflowsManager ...@@ -24,6 +24,7 @@ from jflow.workflows_manager import WorkflowsManager
from jflow.config_reader import JFlowConfigReader from jflow.config_reader import JFlowConfigReader
from jflow.dataset import ArrayList from jflow.dataset import ArrayList
from jflow.utils import which from jflow.utils import which
from jflow.iotypes import DynamicOutput
from weaver.util import parse_string_list from weaver.util import parse_string_list
...@@ -32,16 +33,25 @@ class Component(object): ...@@ -32,16 +33,25 @@ class Component(object):
""" """
""" """
def __init__(self, is_dynamic=False): def __init__(self):
"""
@param is_dynamic: if true, this component has dynamic inputs/outputs
"""
self.is_dynamic = is_dynamic
self.prefix = "default" self.prefix = "default"
self.output_directory = None self.output_directory = None
self.config_reader = JFlowConfigReader() self.config_reader = JFlowConfigReader()
self.version = self.get_version() self.version = self.get_version()
self.batch_options = self.config_reader.get_component_batch_options(self.__class__.__name__) self.batch_options = self.config_reader.get_component_batch_options(self.__class__.__name__)
def is_dynamic(self):
return len(self.get_dynamic_outputs()) != 0
def get_dynamic_outputs(self):
"""
@return : the list of outputs updated at the end of component execution.
"""
dynamic_outputs = list()
for attribute_value in self.__dict__.values():
if issubclass( attribute_value.__class__, DynamicOutput ):
dynamic_outputs.append( attribute_value )
return dynamic_outputs
def _longestCommonSubstr(self, data, clean_end=True): def _longestCommonSubstr(self, data, clean_end=True):
substr = '' substr = ''
......
...@@ -15,7 +15,10 @@ ...@@ -15,7 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
import os import os, re
import abc
from _pyio import __metaclass__
class Formats(object): class Formats(object):
ANY = "any" ANY = "any"
...@@ -23,6 +26,9 @@ class Formats(object): ...@@ -23,6 +26,9 @@ class Formats(object):
FASTQ = "fastq" FASTQ = "fastq"
FASTA = "fasta" FASTA = "fasta"
SFF = "sff" SFF = "sff"
QUAL = "qual"
FLOW = "flow"
HTML = "html"
# #
# Inputs classes # Inputs classes
...@@ -55,6 +61,10 @@ class InputFileList(list): ...@@ -55,6 +61,10 @@ class InputFileList(list):
return list.__init__(self, inputs) return list.__init__(self, inputs)
elif inputs.__class__.__name__ == "OutputFileList" and (inputs.format == self.format or self.format == Formats.ANY): elif inputs.__class__.__name__ == "OutputFileList" and (inputs.format == self.format or self.format == Formats.ANY):
return list.__init__(self, inputs) return list.__init__(self, inputs)
elif inputs.__class__.__name__ == "OutputFileEndsWith" and (inputs.format == self.format or self.format == Formats.ANY):
return list.__init__(self, inputs)
elif inputs.__class__.__name__ == "OutputFilePattern" and (inputs.format == self.format or self.format == Formats.ANY):
return list.__init__(self, inputs)
elif (inputs.__class__.__name__ == "OutputFile" or inputs.__class__.__name__ == "OutputFileList") and \ elif (inputs.__class__.__name__ == "OutputFile" or inputs.__class__.__name__ == "OutputFileList") and \
not (inputs.format == self.format or self.format == Formats.ANY): not (inputs.format == self.format or self.format == Formats.ANY):
raise IOError(inputs.format+" format is incompatible with allowed format "+self.format) raise IOError(inputs.format+" format is incompatible with allowed format "+self.format)
...@@ -94,6 +104,19 @@ class InputDirectoryList(list): ...@@ -94,6 +104,19 @@ class InputDirectoryList(list):
# #
# Outputs classes # Outputs classes
# #
class DynamicOutput(list):
"""
@warning : with this class of output, the component become dynamic.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def update(self):
"""
This method is used at the end of component execution to update output list.
"""
raise NotImplementedError
class OutputFile(str): class OutputFile(str):
def __new__(self, output, format=Formats.ANY): def __new__(self, output, format=Formats.ANY):
...@@ -105,6 +128,78 @@ class OutputFile(str): ...@@ -105,6 +128,78 @@ class OutputFile(str):
def check(self): def check(self):
return "TODO: check output file" return "TODO: check output file"
class OutputFileList(list):
def __init__(self, outputs, format=Formats.ANY):
self.format = format
if outputs.__class__.__name__ == "str":
return list.__init__(self, [outputs])
elif outputs.__class__.__name__ == "list":
return list.__init__(self, outputs)
else:
raise IOError(outputs.__class__.__name__+" cannot be used as OutputFileList")
def check(self):
return "TODO: check output file list"
class OutputFilePattern(DynamicOutput):
def __init__(self, output_directory, pattern, format=Formats.ANY, include=True):
"""
@warning : with this class of output, the component become dynamic.
@param output_directory : path to the directory where outputs will be created.
@param pattern : the pattern of (a part) the file names.
@param format : the files format.
@param include : if true, the files with the pattern in file name are added into output files.
If false, the files with the pattern in file name are added into output files.
"""
self.output_directory = output_directory
self.pattern = pattern
self.format = format
self.include = include
return list.__init__(self, [])
def update(self):
output_files = list()
for file in os.listdir( self.output_directory ):
if self.include and re.search( self.pattern, file ) is not None:
output_files.append( os.path.join(self.output_directory, file) )
elif not self.include and re.search( self.pattern, file ) is None:
output_files.append( os.path.join(self.output_directory, file) )
return list.__init__(self, output_files)
def check(self):
return "TODO: check output file list"
class OutputFileEndsWith(DynamicOutput):
def __init__(self, output_directory, end_str, format=Formats.ANY, include=True):
"""
@warning : with this class of output, the component become dynamic.
@param output_directory : path to the directory where outputs will be created.
@param end_str : the end of the files names.
@param format : the files format.
@param include : if true, the files with name terminated by end_str are added into output files.
If false, the files with name not terminated by end_str are added into output files.
"""
self.output_directory = output_directory
self.end_str = end_str
self.format = format
self.include = include
return list.__init__(self, [])
def update(self):
output_files = list()
for file in os.listdir( self.output_directory ):
if file.endswith( self.end_str ) and self.include :
output_files.append( os.path.join(self.output_directory, file) )
elif not file.endswith( self.end_str ) and not self.include:
output_files.append( os.path.join(self.output_directory, file) )
return list.__init__(self, output_files)
def check(self):
return "TODO: check output file list"
class OutputFileList(list): class OutputFileList(list):
......
...@@ -209,8 +209,11 @@ class Workflow(threading.Thread): ...@@ -209,8 +209,11 @@ class Workflow(threading.Thread):
# add the component # add the component
self.components.append(cmpt_object) self.components.append(cmpt_object)
# if this one require a dynamic pipeline, execute the first part # if this one require a dynamic pipeline, execute the first part
if cmpt_object.is_dynamic: if cmpt_object.is_dynamic():
self._execute_weaver() self._execute_weaver()
# update outputs
for output in cmpt_object.get_dynamic_outputs():
output.update()
return cmpt_object return cmpt_object
else: else:
raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(", ".join(self.comp_pckg.keys()))) raise ImportError(component_name + " component cannot be loaded, available components are: {0}".format(", ".join(self.comp_pckg.keys())))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment