Source code for ewoksppf.tests.test_ppf_workflow10
import pytest
from ewokscore.tests.utils.results import assert_execute_graph_values
from ewoksppf import execute_graph
[docs]
def workflow10(inputs):
    """Build the ``workflow10`` graph description and its expected result.

    The graph has two ``ppfmethod`` nodes, ``addWithoutSleep`` and ``check``,
    joined by two links: one from ``addWithoutSleep`` to ``check`` and one
    back from ``check`` to ``addWithoutSleep``. The return link carries a
    condition on the ``doContinue`` output having the value ``"true"``.

    :param inputs: mapping of input name to value; it must contain the key
        ``"limit"``. Every item becomes a default input of the
        ``addWithoutSleep`` node.
    :returns: tuple ``(graph, expected_result)`` where ``graph`` is the
        dictionary describing the workflow and ``expected_result`` is the
        ``_ppfdict`` payload in which both ``limit`` and ``value`` equal
        ``inputs["limit"]`` and ``doContinue`` is ``"false"``.
    :raises KeyError: if ``inputs`` has no ``"limit"`` entry.
    """
    # Every caller-supplied input is seeded on the start node.
    start_inputs = [{"name": key, "value": val} for key, val in inputs.items()]

    add_node = {
        "id": "addWithoutSleep",
        "default_inputs": start_inputs,
        "force_start_node": True,
        "task_type": "ppfmethod",
        "task_identifier": "ewoksppf.tests.test_ppf_actors.pythonActorAddWithoutSleep.run",
    }
    check_node = {
        "id": "check",
        "task_type": "ppfmethod",
        "task_identifier": "ewoksppf.tests.test_ppf_actors.pythonActorCheck.run",
    }

    forward_link = {
        "source": "addWithoutSleep",
        "target": "check",
        "map_all_data": True,
    }
    # Return edge of the cycle; it is conditional on the "doContinue" output.
    loop_back_link = {
        "source": "check",
        "target": "addWithoutSleep",
        "conditions": [{"source_output": "doContinue", "value": "true"}],
        "map_all_data": True,
    }

    workflow = {
        "graph": {"id": "workflow10"},
        "links": [forward_link, loop_back_link],
        "nodes": [add_node, check_node],
    }

    stop_at = inputs["limit"]
    outcome = {
        "_ppfdict": {"doContinue": "false", "limit": stop_at, "value": stop_at},
    }
    return workflow, outcome
[docs]
@pytest.mark.parametrize("limit", [10])
@pytest.mark.parametrize("scheme", [None, "json"])
def test_workflow10(limit, scheme, ppf_log_config, tmpdir):
    """Execute ``workflow10`` and compare the outcome with the expected result.

    The test runs once per ``scheme``. With a scheme, ``varinfo`` points at
    ``tmpdir`` and the check is delegated to ``assert_execute_graph_values``.
    Without one, ``varinfo`` is empty, ``tmpdir`` must remain empty and the
    returned mapping is compared key by key with the expected result.

    :param limit: value passed as the ``"limit"`` workflow input.
    :param scheme: ``None`` or the scheme name placed in ``varinfo``.
    :param ppf_log_config: fixture requested but not referenced in the body
        (presumably it configures logging -- confirm in conftest).
    :param tmpdir: temporary directory used as ``root_uri`` when a scheme is set.
    """
    varinfo = {"root_uri": str(tmpdir), "scheme": scheme} if scheme else {}

    graph, expected = workflow10({"value": 1, "limit": limit})
    result = execute_graph(graph, varinfo=varinfo)

    if not scheme:
        # No scheme was given, so nothing may have been written to tmpdir.
        assert not tmpdir.listdir()
        for key, wanted in expected.items():
            assert result[key] == wanted
        return

    assert_execute_graph_values(result, expected, varinfo)