Commit c288fa02 authored by Philippe Bardou's avatar Philippe Bardou
Browse files

Start of get access to output files

parent 444482c2
...@@ -24,7 +24,7 @@ from jflow.workflows_manager import WorkflowsManager ...@@ -24,7 +24,7 @@ from jflow.workflows_manager import WorkflowsManager
from jflow.config_reader import JFlowConfigReader from jflow.config_reader import JFlowConfigReader
from jflow.dataset import ArrayList from jflow.dataset import ArrayList
from jflow.utils import which from jflow.utils import which
from jflow.iotypes import DynamicOutput from jflow.iotypes import DynamicOutput, OutputFile, OutputFileList
from weaver.util import parse_string_list from weaver.util import parse_string_list
...@@ -53,6 +53,16 @@ class Component(object): ...@@ -53,6 +53,16 @@ class Component(object):
dynamic_outputs.append( attribute_value ) dynamic_outputs.append( attribute_value )
return dynamic_outputs return dynamic_outputs
def get_component_outputs(self):
outputs = list()
for attribute_value in self.__dict__.values():
if ( issubclass( attribute_value.__class__, DynamicOutput ) or
issubclass( attribute_value.__class__, OutputFileList) ):
outputs.extend( attribute_value )
elif issubclass( attribute_value.__class__, OutputFile):
outputs.append( attribute_value )
return outputs
def _longestCommonSubstr(self, data, clean_end=True): def _longestCommonSubstr(self, data, clean_end=True):
substr = '' substr = ''
if len(data) > 1 and len(data[0]) > 0: if len(data) > 1 and len(data[0]) > 0:
......
...@@ -71,6 +71,7 @@ class Workflow(threading.Thread): ...@@ -71,6 +71,7 @@ class Workflow(threading.Thread):
self.jflow_config_reader = JFlowConfigReader() self.jflow_config_reader = JFlowConfigReader()
self.manager = WorkflowsManager() self.manager = WorkflowsManager()
self.components_to_exec = [] self.components_to_exec = []
self.components = []
self.makes = {} self.makes = {}
self.globals = {} self.globals = {}
self.options = Options() self.options = Options()
...@@ -113,6 +114,12 @@ class Workflow(threading.Thread): ...@@ -113,6 +114,12 @@ class Workflow(threading.Thread):
for arg in arg_lines: for arg in arg_lines:
yield arg yield arg
def get_outputs_per_components(self):
outputs_files = {}
for current_components in self.components:
outputs_files[current_components.get_nameid()] = current_components.get_component_outputs()
return outputs_files
def __setstate__(self, state): def __setstate__(self, state):
self.__dict__ = state.copy() self.__dict__ = state.copy()
threading.Thread.__init__(self, name=self.name) threading.Thread.__init__(self, name=self.name)
...@@ -172,6 +179,7 @@ class Workflow(threading.Thread): ...@@ -172,6 +179,7 @@ class Workflow(threading.Thread):
if self.component_nameids_is_init: if self.component_nameids_is_init:
# add the component # add the component
self.components_to_exec.append(cmpt_object) self.components_to_exec.append(cmpt_object)
self.components.append(cmpt_object)
self._execute_weaver() self._execute_weaver()
# update outputs # update outputs
for output in cmpt_object.get_dynamic_outputs(): for output in cmpt_object.get_dynamic_outputs():
...@@ -182,15 +190,18 @@ class Workflow(threading.Thread): ...@@ -182,15 +190,18 @@ class Workflow(threading.Thread):
cmpt_object.prefix + " already exist in this pipeline!") cmpt_object.prefix + " already exist in this pipeline!")
self.component_nameids[cmpt_object.get_nameid()] = None self.component_nameids[cmpt_object.get_nameid()] = None
self.components_to_exec = [] self.components_to_exec = []
self.components = []
else: else:
if self.component_nameids_is_init: if self.component_nameids_is_init:
# add the component # add the component
self.components_to_exec.append(cmpt_object) self.components_to_exec.append(cmpt_object)
self.components.append(cmpt_object)
elif not self.component_nameids_is_init and not self.dynamic_component_present: elif not self.component_nameids_is_init and not self.dynamic_component_present:
if self._component_is_duplicated(cmpt_object): if self._component_is_duplicated(cmpt_object):
raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " + raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
cmpt_object.prefix + " already exist in this pipeline!") cmpt_object.prefix + " already exist in this pipeline!")
self.components_to_exec.append(cmpt_object) self.components_to_exec.append(cmpt_object)
self.components.append(cmpt_object)
else: else:
if self._component_is_duplicated(cmpt_object): if self._component_is_duplicated(cmpt_object):
raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " + raise ValueError("Component " + cmpt_object.__class__.__name__ + " with prefix " +
......
...@@ -90,6 +90,10 @@ class WorkflowsManager(object): ...@@ -90,6 +90,10 @@ class WorkflowsManager(object):
self._add_workflow(workflow) self._add_workflow(workflow)
return workflow return workflow
def get_workflow_ouputs(self, workflow_id):
workflow = self.get_workflow(workflow_id)
return workflow.get_outputs_per_components()
def get_output_directory(self): def get_output_directory(self):
return self.config_reader.get_work_directory() return self.config_reader.get_work_directory()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment