Source code for ewoksppf.tests.test_ppf_workflow25

import itertools
import time

import pytest
from ewokscore.task import Task
from ewoksutils.import_utils import qualname

from ..bindings import execute_graph


[docs] class Required(Task, input_names=["compute_time"], output_names=["required"]):
[docs] def run(self): time.sleep(self.inputs.compute_time) self.outputs.required = True
[docs] class Optional(Task, input_names=["compute_time"], output_names=["optional"]):
[docs] def run(self): time.sleep(self.inputs.compute_time) self.outputs.optional = True
[docs] class Gather( Task, input_names=["required1", "required2"], optional_input_names=["optional1", "optional2", "retained1", "retained2"], output_names=["cached"], ):
[docs] def run(self): global _GATHER_CACHE cached = self.get_input_values() _GATHER_CACHE = cached print(f"\nDecider executed with inputs: {cached}") self.outputs.cached = cached
[docs] def workflow(): nodes = [ { "id": "required1", "task_type": "class", "task_identifier": qualname(Required), }, { "id": "required2", "task_type": "class", "task_identifier": qualname(Required), }, { "id": "optional1", "task_type": "class", "task_identifier": qualname(Optional), }, { "id": "optional2", "task_type": "class", "task_identifier": qualname(Optional), }, { "id": "retained1", "task_type": "class", "task_identifier": qualname(Optional), }, { "id": "retained2", "task_type": "class", "task_identifier": qualname(Optional), }, { "id": "gather", "task_type": "class", "task_identifier": qualname(Gather), }, ] links = [ { "source": "required1", "target": "gather", "data_mapping": [ {"source_output": "required", "target_input": "required1"} ], }, { "source": "required2", "target": "gather", "data_mapping": [ {"source_output": "required", "target_input": "required2"} ], }, { "source": "optional1", "target": "gather", "required": False, "cache_if_optional": True, "data_mapping": [ {"source_output": "optional", "target_input": "optional1"} ], }, { "source": "optional2", "target": "gather", "required": False, "cache_if_optional": True, "data_mapping": [ {"source_output": "optional", "target_input": "optional2"} ], }, { "source": "retained1", "target": "gather", "required": False, "cache_if_optional": False, "data_mapping": [ {"source_output": "optional", "target_input": "retained1"} ], }, { "source": "retained2", "target": "gather", "required": False, "cache_if_optional": False, "data_mapping": [ {"source_output": "optional", "target_input": "retained2"} ], }, ] return {"graph": {"id": "workflow"}, "nodes": nodes, "links": links}
[docs] def get_inputs(required, optional, retained): return [ {"id": "required1", "name": "compute_time", "value": required}, {"id": "required2", "name": "compute_time", "value": required}, {"id": "optional1", "name": "compute_time", "value": optional}, {"id": "optional2", "name": "compute_time", "value": optional}, {"id": "retained1", "name": "compute_time", "value": retained}, {"id": "retained2", "name": "compute_time", "value": retained}, ]
_ORDER = list(itertools.permutations(["required", "optional", "retained"]))
[docs] @pytest.mark.parametrize("order", _ORDER, ids=["-".join(keys) for keys in _ORDER]) def test_ppf_workflow25(ppf_log_config, order): """Test input caching for different types of links executed in different orders.""" global _GATHER_CACHE _GATHER_CACHE = None compute_times = [0, 0.5, 1] inputs = get_inputs(**dict(zip(order, compute_times))) # result = execute_graph(workflow(), inputs=inputs) # cached = set(result["cached"]) # # When # # order = ('retained', 'required', 'optional') # # the last two calls to "Gather" could be for example # # {'required1': True, 'required2': True, 'optional1': True, 'retained2': True} # {'required1': True, 'required2': True, 'optional1': True, 'optional2': True, 'retained2': True} # # Since these calls happen in parallel and there is nothing in the workflow # that guarantees we get one or the other as the final workflow result we # cannot use the result to test the caching. _ = execute_graph(workflow(), pool_type="thread", inputs=inputs) cached = set(_GATHER_CACHE) cached1 = {"required1", "required2", "optional1", "optional2", "retained1"} cached2 = {"required1", "required2", "optional1", "optional2", "retained2"} assert cached == cached1 or cached == cached2, cached