Skip to content

Commit f16920c

Browse files
committed
WIP "post_dry_run" concept for #406
1 parent 7726839 commit f16920c

File tree

3 files changed

+65
-19
lines changed

3 files changed

+65
-19
lines changed

openeo_driver/ProcessGraphDeserializer.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -469,16 +469,33 @@ def evaluate(
469469
if do_dry_run:
470470
dry_run_tracer = do_dry_run if isinstance(do_dry_run, DryRunDataTracer) else DryRunDataTracer()
471471
_log.info("Doing dry run")
472-
convert_node(top_level_node, env=env.push({
473-
ENV_DRY_RUN_TRACER: dry_run_tracer,
474-
ENV_SAVE_RESULT: [], # otherwise dry run and real run append to the same mutable result list
475-
"node_caching": False
476-
}))
472+
dry_run_result = convert_node(
473+
top_level_node,
474+
env=env.push(
475+
{
476+
ENV_DRY_RUN_TRACER: dry_run_tracer,
477+
ENV_SAVE_RESULT: [], # otherwise dry run and real run append to the same mutable result list
478+
"node_caching": False,
479+
}
480+
),
481+
)
477482
# TODO: work with a dedicated DryRunEvalEnv?
478483
source_constraints = dry_run_tracer.get_source_constraints()
479484
_log.info("Dry run extracted these source constraints: {s}".format(s=source_constraints))
485+
486+
# TODO: Given the post-dry-run hook concept: is it still necessary to push source_constraints into env?
480487
env = env.push({ENV_SOURCE_CONSTRAINTS: source_constraints})
481488

489+
post_dry_run_data = env.backend_implementation.post_dry_run(
490+
dry_run_result=dry_run_result,
491+
dry_run_tracer=dry_run_tracer,
492+
source_constraints=source_constraints,
493+
# TODO: use env before ENV_SOURCE_CONSTRAINTS have been pushed?
494+
env=env,
495+
)
496+
if post_dry_run_data:
497+
env = env.push(post_dry_run_data)
498+
482499
result = convert_node(top_level_node, env=env)
483500
if len(env[ENV_SAVE_RESULT]) > 0:
484501
if len(env[ENV_SAVE_RESULT]) == 1:
@@ -763,17 +780,17 @@ def _extract_load_parameters(env: EvalEnv, source_id: tuple) -> LoadParameters:
763780

764781
if extent is not None:
765782
collection_crs = _collection_crs(collection_id=collection_id, env=env)
766-
crs = constraint.get("resample", {}).get("target_crs", collection_crs) or collection_crs
783+
target_crs = constraint.get("resample", {}).get("target_crs", collection_crs) or collection_crs
767784
target_resolution = constraint.get("resample", {}).get("resolution", None) or _collection_resolution(
768785
collection_id=collection_id, env=env
769786
)
770787

771788
if "pixel_buffer" in constraint:
772789
buffer = constraint["pixel_buffer"]["buffer_size"]
773790

774-
if (crs is not None) and target_resolution:
791+
if (target_crs is not None) and target_resolution:
775792
bbox = BoundingBox.from_dict(extent, default_crs=4326)
776-
extent = bbox.reproject(crs).as_dict()
793+
extent = bbox.reproject(target_crs).as_dict()
777794

778795
extent = {
779796
"west": extent["west"] - target_resolution[0] * math.ceil(buffer[0]),
@@ -785,11 +802,11 @@ def _extract_load_parameters(env: EvalEnv, source_id: tuple) -> LoadParameters:
785802
else:
786803
_log.warning("Not applying buffer to extent because the target CRS is not known.")
787804

788-
load_collection_in_native_grid = "resample" not in constraint or crs == collection_crs
805+
load_collection_in_native_grid = "resample" not in constraint or target_crs == collection_crs
789806
if (not load_collection_in_native_grid) and collection_crs is not None and ("42001" in str(collection_crs)):
790807
#resampling auto utm to utm means we are loading in native grid
791808
try:
792-
load_collection_in_native_grid = "UTM zone" in CRS.from_user_input(crs).to_wkt()
809+
load_collection_in_native_grid = "UTM zone" in CRS.from_user_input(target_crs).to_wkt()
793810
except CRSError as e:
794811
pass
795812

@@ -2608,7 +2625,7 @@ def load_stac(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
26082625

26092626
dry_run_tracer: DryRunDataTracer = env.get(ENV_DRY_RUN_TRACER)
26102627
if dry_run_tracer:
2611-
return dry_run_tracer.load_stac(url, arguments, env)
2628+
return dry_run_tracer.load_stac(url=url, arguments=arguments, env=env)
26122629
else:
26132630
source_id = dry_run.DataSource.load_stac(
26142631
url, properties=arguments.get("properties", {}), bands=arguments.get("bands", []), env=env

openeo_driver/backend.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import sys
1717
from datetime import datetime, timedelta
1818
from pathlib import Path
19-
from typing import List, Union, NamedTuple, Dict, Optional, Callable, Iterable, Container
19+
from typing import List, Union, NamedTuple, Dict, Optional, Callable, Iterable, Container, Any
2020

2121
import flask
2222

@@ -28,6 +28,7 @@
2828
from openeo_driver.config import OpenEoBackendConfig, get_backend_config
2929
from openeo_driver.datacube import DriverDataCube, DriverMlModel, DriverVectorCube
3030
from openeo_driver.datastructs import SarBackscatterArgs
31+
from openeo_driver.dry_run import DryRunDataTracer, SourceConstraint, DryRunDataCube
3132
from openeo_driver.errors import CollectionNotFoundException, ServiceUnsupportedException, FeatureUnsupportedException
3233
from openeo_driver.constants import JOB_STATUS, DEFAULT_LOG_LEVEL_RETRIEVAL
3334
from openeo_driver.processes import ProcessRegistry
@@ -1004,6 +1005,32 @@ def request_costs(
10041005
"""
10051006
return None
10061007

1008+
def post_dry_run(
1009+
self,
1010+
*,
1011+
dry_run_result: Union[DryRunDataCube, Any],
1012+
dry_run_tracer: DryRunDataTracer,
1013+
source_constraints: List[SourceConstraint],
1014+
env: EvalEnv,
1015+
) -> Union[None, dict]:
1016+
"""
1017+
Hook to analyse the outcome of a full dry-run evaluation
1018+
and set some additional "global" EvalEnv state for the wet run evaluation.
1019+
For example, to set process-graph-wide, preferred processing hints/directives
1020+
about CRS, projection, resolution, alignment, partitioning, ...
1021+
1022+
This is an experimental API, the set of available arguments is still in flux.
1023+
1024+
:param dry_run_result: result of dry-run evaluation of process graph (typically a DryRunDataCube)
1025+
:param dry_run_tracer: tracer used in dry-run evaluation
1026+
:param env: EvalEnv as used in dry-run evaluation
1027+
:param source_constraints: source constraints extracted from the dry-run tracer (via `get_source_constraints()`)
1028+
1029+
:return: dict with extra state to push to the EvalEnv before triggering the wet run,
1030+
or None (to push nothing).
1031+
"""
1032+
return None
1033+
10071034

10081035
def function_has_argument(function: Callable, argument: str) -> bool:
10091036
"""Does function support given argument?"""

openeo_driver/dry_run.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,22 @@
1212
but pushing around dummy data cubes.
1313
1414
The architecture consists of these classes:
15-
- DataTrace: starts from a `load_collection` (or other source) process and records what happens to this
16-
single data source (filter_temporal, filter_bbox, ...)
15+
- DataTrace: starts from a `load_collection`, `load_stac` or other source process
16+
and records what happens to this single data source (filter_temporal, filter_bbox, ...)
1717
- DryRunDataTracer: observer that keeps track of all data traces during a dry run
18-
- DryRunDataCube: dummy data cube that is passed around in processed
18+
- DryRunDataCube: dummy data cube that is passed around during (dry-run) processing
1919
2020
Their relationship is as follows:
2121
- There is a single DryRunDataTracer for a dry-run, keeping track of all relevant operations on all sources
2222
- A DryRunDataCube links to one or more DataTraces, describing the operations that happened
2323
on the sources that lead to the state of the DryRunDataCube. Often there is just one DataTrace
2424
in a DryRunDataCube, but when the DryRunDataCube is result of mask or merge_cubes operations,
2525
there will be multiple DataTraces.
26-
A DryRunDataCube also has a reference to the DryRunDataTracer in play, so that it can be informed
27-
when processes are applied to the DryRunDataCube.
26+
A DryRunDataCube also has a reference to the DryRunDataTracer in play,
27+
so that this tracer can be informed when processes are applied to the DryRunDataCube.
2828
2929
When the dry-run phase is done, the DryRunDataTracer knows about all relevant operations
30-
on each data source. It provides methods for example to extract source constraints (bbox/bands/date ranges)
30+
on each data source. It provides methods to extract source constraints (bbox/bands/date ranges)
3131
which are used to bootstrap the EvalEnv that is used for the real process graph processing phase.
3232
These source constraints can then be fetched from the EvalEnv at `load_collection` time.
3333
@@ -205,7 +205,9 @@ def load_stac(cls, url: str, properties={}, bands=[], env=EvalEnv()) -> "DataSou
205205

206206
class DataTrace(DataTraceBase):
207207
"""
208-
Processed data: linked list of processes, ending at a data source node.
208+
Processed data: chain of processes/operations (with arguments),
209+
linked together through parent-child relations,
210+
originating from a source node `DataSource` (final parent in the chain).
209211
210212
Note: this is not the same as a data cube, as a data cube can be combination of multiple data
211213
traces (e.g. after mask or merge process).

0 commit comments

Comments
 (0)