Unverified Commit 71b56151 authored by Dannon's avatar Dannon Committed by GitHub
Browse files

Merge pull request #6559 from jmchilton/wf_api_legacy

[18.01] Restore the ability to track jobs for a WF invocation step.
parents 7d7a875a b878835e
......@@ -662,7 +662,16 @@ class Configuration(object):
self.containers_conf = parse_containers_config(self.containers_config_file)
if kwargs.get("log_destination", None):
log_destination = kwargs.get("log_destination", None)
if log_destination == "stdout":
LOGGING_CONFIG_DEFAULT['handlers']['console'] = {
'class': 'logging.StreamHandler',
'formatter': 'stack',
'level': 'DEBUG',
'stream': 'ext://sys.stdout',
'filters': ['stack']
}
elif log_destination:
LOGGING_CONFIG_DEFAULT['handlers']['console'] = {
'class': 'logging.FileHandler',
'formatter': 'stack',
......
......@@ -4185,7 +4185,7 @@ class WorkflowInvocation(object, UsesCreateAndUpdateTime, Dictifiable):
else:
raise Exception("Unknown output type encountered")
def to_dict(self, view='collection', value_mapper=None, step_details=False):
def to_dict(self, view='collection', value_mapper=None, step_details=False, legacy_job_state=False):
rval = super(WorkflowInvocation, self).to_dict(view=view, value_mapper=value_mapper)
if view == 'element':
steps = []
......@@ -4194,6 +4194,18 @@ class WorkflowInvocation(object, UsesCreateAndUpdateTime, Dictifiable):
v = step.to_dict(view='element')
else:
v = step.to_dict(view='collection')
if legacy_job_state:
step_jobs = step.jobs
if step_jobs:
for step_job in step_jobs:
v_clone = v.copy()
v_clone["state"] = step_job.state
v_clone["job_id"] = step_job.id
steps.append(v_clone)
else:
v["state"] = None
steps.append(v)
else:
steps.append(v)
rval['steps'] = steps
......@@ -4326,6 +4338,10 @@ class WorkflowInvocationStep(object, Dictifiable):
# Following no longer makes sense...
# rval['state'] = self.job.state if self.job is not None else None
if view == 'element':
jobs = []
for job in self.jobs:
jobs.append(job.to_dict())
outputs = {}
for output_assoc in self.output_datasets:
name = output_assoc.output_name
......@@ -4345,6 +4361,7 @@ class WorkflowInvocationStep(object, Dictifiable):
rval['outputs'] = outputs
rval['output_collections'] = output_collections
rval['jobs'] = jobs
return rval
......
......@@ -690,12 +690,30 @@ class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin, UsesAnn
:param invocation_id: the invocation id (required)
:type invocation_id: str
:param step_details: fetch details about individual invocation steps
and populate a steps attribute in the resulting
dictionary. Defaults to false.
:type step_details: bool
:param legacy_job_state: If step_details is true, and this is set to true
populate the invocation step state with the job state
instead of the invocation step state. This will also
produce one step per job in mapping jobs to mimic the
older behavior with respect to collections. Partially
scheduled steps may provide incomplete information
and the listed steps outputs are the mapped over
step outputs but the individual job outputs
when this is set - at least for now.
:type legacy_job_state: bool
:raises: exceptions.MessageException, exceptions.ObjectNotFound
"""
decoded_workflow_invocation_id = self.decode_id(invocation_id)
workflow_invocation = self.workflow_manager.get_invocation(trans, decoded_workflow_invocation_id)
if workflow_invocation:
return self.__encode_invocation(trans, workflow_invocation, step_details=kwd.get('step_details', False))
step_details = util.string_as_bool(kwd.get('step_details', 'False'))
legacy_job_state = util.string_as_bool(kwd.get('legacy_job_state', 'False'))
return self.__encode_invocation(trans, workflow_invocation, step_details=step_details, legacy_job_state=legacy_job_state)
return None
@expose_api
......@@ -785,9 +803,9 @@ class WorkflowsAPIController(BaseAPIController, UsesStoredWorkflowMixin, UsesAnn
def __get_stored_workflow(self, trans, workflow_id):
return self.workflow_manager.get_stored_workflow(trans, workflow_id)
def __encode_invocation(self, trans, invocation, view="element", step_details=False):
def __encode_invocation(self, trans, invocation, view="element", step_details=False, legacy_job_state=False):
    """Serialize *invocation* to a dict and encode every database ID it contains.

    step_details and legacy_job_state are forwarded to
    WorkflowInvocation.to_dict to control how steps are rendered.
    """
    invocation_dict = invocation.to_dict(view, step_details=step_details, legacy_job_state=legacy_job_state)
    return self.encode_all_ids(trans, invocation_dict, True)
......@@ -193,8 +193,8 @@ class BaseWorkflowsApiTestCase(api.ApiTestCase):
workflow_inputs = workflow_show_resposne.json()["inputs"]
return workflow_inputs
def _invocation_details(self, workflow_id, invocation_id):
invocation_details_response = self._get("workflows/%s/usage/%s" % (workflow_id, invocation_id))
def _invocation_details(self, workflow_id, invocation_id, **kwds):
    """GET the invocation details for *invocation_id*, asserting a 200 response.

    Extra keyword arguments are passed through as request parameters
    (e.g. step_details, legacy_job_state).
    """
    response = self._get("workflows/%s/usage/%s" % (workflow_id, invocation_id), data=kwds)
    self._assert_status_code_is(response, 200)
    return response.json()
......@@ -2868,15 +2868,108 @@ steps:
invocation_id = usage["id"]
usage_details = self._invocation_details(workflow_id, invocation_id)
# Assert some high-level things about the structure of data returned.
self._assert_has_keys(usage_details, "inputs", "steps")
self._assert_has_keys(usage_details, "inputs", "steps", "workflow_id")
# The stored_workflow vs workflow distinction makes the following assertion false.
# assert usage_details["workflow_id"] == workflow_id
# Wait for the invocation to be fully scheduled, so we have details on all steps.
self._wait_for_invocation_state(workflow_id, invocation_id, 'scheduled')
usage_details = self._invocation_details(workflow_id, invocation_id)
invocation_steps = usage_details["steps"]
invocation_input_step, invocation_tool_step = None, None
for invocation_step in invocation_steps:
self._assert_has_keys(invocation_step, "workflow_step_id", "order_index", "id")
order_index = invocation_step["order_index"]
assert order_index in [0, 1, 2], order_index
if order_index == 0:
invocation_input_step = invocation_step
elif order_index == 2:
invocation_tool_step = invocation_step
# Tool steps have non-null job_ids (deprecated though they may be)
assert invocation_input_step.get("job_id", None) is None
job_id = invocation_tool_step.get("job_id", None)
assert job_id is not None
invocation_tool_step_id = invocation_tool_step["id"]
invocation_tool_step_response = self._get("workflows/%s/invocations/%s/steps/%s" % (workflow_id, invocation_id, invocation_tool_step_id))
self._assert_status_code_is(invocation_tool_step_response, 200)
self._assert_has_keys(invocation_tool_step_response.json(), "id", "order_index", "job_id")
assert invocation_tool_step_response.json()["job_id"] == job_id
def test_invocation_with_collection_mapping(self):
    """Invocation details for a mapped-over collection step.

    The default view collapses the mapping into a single tool step with the
    invocation-step state; with legacy_job_state=true the API emits one step
    per job, carrying the job's state and id instead.
    """
    workflow_id, invocation_id = self._run_mapping_workflow()
    usage_details = self._invocation_details(workflow_id, invocation_id)
    # Assert some high-level things about the structure of data returned.
    self._assert_has_keys(usage_details, "inputs", "steps", "workflow_id")
    # Default view: exactly one input step (order 0) and one tool step (order 1).
    input_step, tool_step = None, None
    for step in usage_details["steps"]:
        self._assert_has_keys(step, "workflow_step_id", "order_index", "id")
        index = step["order_index"]
        assert index in [0, 1]
        if index == 0:
            assert input_step is None
            input_step = step
        else:
            assert tool_step is None
            tool_step = step
    # Tool steps have non-null job_ids (deprecated though they may be)
    assert input_step.get("job_id", None) is None
    assert tool_step.get("job_id", None) is None
    assert tool_step["state"] == "scheduled"

    usage_details = self._invocation_details(workflow_id, invocation_id, legacy_job_state="true")
    # Assert some high-level things about the structure of data returned.
    self._assert_has_keys(usage_details, "inputs", "steps", "workflow_id")
    steps = usage_details["steps"]
    for step in steps:
        self._assert_has_keys(step, "workflow_step_id", "order_index", "id")
    # The per-step endpoint still resolves the ids in the legacy listing.
    first_step_id = steps[0]["id"]
    step_response = self._get("workflows/%s/usage/%s/steps/%s" % (workflow_id, invocation_id, first_step_id))
    self._assert_status_code_is(step_response, 200)
    self._assert_has_keys(step_response.json(), "id", "order_index")
    # Legacy view: the mapped tool step is expanded to one entry per job.
    input_step = None
    tool_steps = []
    for step in steps:
        self._assert_has_keys(step, "workflow_step_id", "order_index", "id")
        assert step["order_index"] in [0, 1]
        if step["order_index"] == 0:
            assert input_step is None
            input_step = step
        else:
            tool_steps.append(step)
    assert len(tool_steps) == 2
    assert tool_steps[0]["state"] == "ok"
def _run_mapping_workflow(self):
    """Run a workflow that maps the cat1 tool over a two-element list collection.

    Returns a (workflow_id, invocation_id) pair for the completed run; the
    run is waited on and asserted ok before returning.
    """
    history_id = self.dataset_populator.new_history()
    # NOTE(review): the scrape stripped leading whitespace — the YAML
    # indentation below is reconstructed conventionally; confirm against the
    # original file.
    summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
  - type: collection
    label: input_c
steps:
  - label: cat1
    tool_id: cat1
    state:
      input1:
        $link: input_c
test_data:
  input_c:
    type: list
    elements:
      - identifier: i1
        content: "0"
      - identifier: i2
        content: "1"
""", history_id=history_id, wait=True, assert_ok=True)
    workflow_id = summary.workflow_id
    invocation_id = summary.invocation_id
    return workflow_id, invocation_id
@skip_without_tool("cat1")
def test_invocations_accessible_imported_workflow(self):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment