Skip to content

Commit 3d71ce0

Browse files
committed
Internal: clarify non-standard "node" key in nested process graph representation
related to openeo-geopyspark-driver#1331
1 parent 532e957 commit 3d71ce0

File tree

1 file changed

+20
-14
lines changed

1 file changed

+20
-14
lines changed

openeo/internal/process_graph_visitor.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ class ProcessGraphVisitException(OpenEoClientException):
1212
pass
1313

1414

15+
# TODO: this key is non-standard and mainly specific to a particular openeo-python-driver use case,
16+
# which probably should be refactored out from this more generic implementation.
17+
# Making it a named constant at least makes this more visible and trackable.
18+
DEREFERENCED_NODE_KEY = "node"
19+
20+
1521
class ProcessGraphVisitor(ABC):
1622
"""
1723
Hierarchical Visitor for (nested) process graphs structures.
@@ -26,40 +32,40 @@ def dereference_from_node_arguments(cls, process_graph: dict) -> str:
2632
Walk through the given (flat) process graph and replace (in-place) "from_node" references in
2733
process arguments (dictionaries or lists) with the corresponding resolved subgraphs
2834
29-
:param process_graph: process graph dictionary to be manipulated in-place
35+
:param process_graph: process graph dictionary (flat representation) to be manipulated in-place
3036
:return: name of the "result" node of the graph
3137
"""
3238

3339
# TODO avoid manipulating process graph in place? make it more explicit? work on a copy?
3440
# TODO call it more something like "unflatten"?. Split this off of ProcessGraphVisitor?
3541
# TODO implement this through `ProcessGraphUnflattener` ?
3642

37-
def resolve_from_node(process_graph, node, from_node):
43+
def resolve_from_node(process_graph: dict, node_id: str, from_node: str):
3844
if from_node not in process_graph:
3945
raise ProcessGraphVisitException(
40-
"from_node {f!r} (referenced by {n!r}) not in process graph.".format(f=from_node, n=node)
46+
f"from_node {from_node!r} (referenced by {node_id!r}) not in process graph."
4147
)
4248
return process_graph[from_node]
4349

4450
result_node = None
45-
for node, node_dict in process_graph.items():
46-
if node_dict.get("result", False):
51+
for node_id, node_data in process_graph.items():
52+
if node_data.get("result", False):
4753
if result_node:
48-
raise ProcessGraphVisitException("Multiple result nodes: {a}, {b}".format(a=result_node, b=node))
49-
result_node = node
50-
arguments = node_dict.get("arguments", {})
54+
raise ProcessGraphVisitException(f"Multiple result nodes: {result_node!r}, {node_id!r}")
55+
result_node = node_id
56+
arguments = node_data.get("arguments", {})
5157
for arg in arguments.values():
5258
if isinstance(arg, dict):
5359
if "from_node" in arg:
54-
arg["node"] = resolve_from_node(process_graph, node, arg["from_node"])
60+
arg[DEREFERENCED_NODE_KEY] = resolve_from_node(process_graph, node_id, arg["from_node"])
5561
else:
5662
for k, v in arg.items():
5763
if isinstance(v, dict) and "from_node" in v:
58-
v["node"] = resolve_from_node(process_graph, node, v["from_node"])
64+
v[DEREFERENCED_NODE_KEY] = resolve_from_node(process_graph, node_id, v["from_node"])
5965
elif isinstance(arg, list):
6066
for i, element in enumerate(arg):
6167
if isinstance(element, dict) and "from_node" in element:
62-
arg[i] = resolve_from_node(process_graph, node, element["from_node"])
68+
arg[i] = resolve_from_node(process_graph, node_id, element["from_node"])
6369

6470
if result_node is None:
6571
dump = json.dumps(process_graph, indent=2)
@@ -114,9 +120,9 @@ def _accept_argument_list(self, elements: list):
114120
self.constantArrayElement(element)
115121

116122
def _accept_argument_dict(self, value: dict):
117-
if "node" in value and "from_node" in value:
123+
if DEREFERENCED_NODE_KEY in value and "from_node" in value:
118124
# TODO: this looks bit weird (or at least very specific).
119-
self.accept_node(value["node"])
125+
self.accept_node(value[DEREFERENCED_NODE_KEY])
120126
elif value.get("from_node"):
121127
self.accept_node(value["from_node"])
122128
elif "process_id" in value:
@@ -230,7 +236,7 @@ def _process_from_node(self, key: str, node: dict) -> Any:
230236
"""
231237
# Default/original implementation: keep "from_node" key and add resolved node under "node" key.
232238
# TODO: just return `self.get_node(key=key)`
233-
return {"from_node": key, "node": self.get_node(key=key)}
239+
return {"from_node": key, DEREFERENCED_NODE_KEY: self.get_node(key=key)}
234240

235241
def _process_from_parameter(self, name: str) -> Any:
236242
"""

0 commit comments

Comments
 (0)