
Commit f8db877

Issue #604/#644 UDPJobFactory: improve geometry support

1 parent: 04296c1

6 files changed (+282 -18 lines)

openeo/extra/job_management.py

Lines changed: 52 additions & 13 deletions
@@ -16,11 +16,16 @@
 import pandas as pd
 import requests
 import shapely.errors
+import shapely.geometry.base
 import shapely.wkt
 from requests.adapters import HTTPAdapter, Retry
 
 from openeo import BatchJob, Connection
-from openeo.internal.processes.parse import Process, parse_remote_process_definition
+from openeo.internal.processes.parse import (
+    Parameter,
+    Process,
+    parse_remote_process_definition,
+)
 from openeo.rest import OpenEoApiError
 from openeo.util import deep_get, repr_truncate, rfc3339
 
@@ -943,11 +948,17 @@ class UDPJobFactory:
     """
 
     def __init__(
-        self, process_id: str, *, namespace: Union[str, None] = None, parameter_defaults: Optional[dict] = None
+        self,
+        process_id: str,
+        *,
+        namespace: Union[str, None] = None,
+        parameter_defaults: Optional[dict] = None,
+        parameter_column_map: Optional[dict] = None,
     ):
         self._process_id = process_id
         self._namespace = namespace
         self._parameter_defaults = parameter_defaults or {}
+        self._parameter_column_map = parameter_column_map
 
     def _get_process_definition(self, connection: Connection) -> Process:
         if isinstance(self._namespace, str) and re.match("https?://", self._namespace):
@@ -979,33 +990,38 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
 
         process_definition = self._get_process_definition(connection=connection)
         parameters = process_definition.parameters or []
+
+        if self._parameter_column_map is None:
+            self._parameter_column_map = self._guess_parameter_column_map(parameters=parameters, row=row)
+
         arguments = {}
         for parameter in parameters:
-            name = parameter.name
-            schema = parameter.schema
-            if name in row.index:
-                # Higherst priority: value from dataframe row
-                value = row[name]
-            elif name in self._parameter_defaults:
+            param_name = parameter.name
+            column_name = self._parameter_column_map.get(param_name, param_name)
+            if column_name in row.index:
+                # Get value from dataframe row
+                value = row.loc[column_name]
+            elif param_name in self._parameter_defaults:
                 # Fallback on default values from constructor
-                value = self._parameter_defaults[name]
+                value = self._parameter_defaults[param_name]
             elif parameter.has_default():
                 # Explicitly use default value from parameter schema
                 value = parameter.default
             elif parameter.optional:
                 # Skip optional parameters without any fallback default value
                 continue
             else:
-                raise ValueError(f"Missing required parameter {name!r} for process {self._process_id!r}")
+                raise ValueError(f"Missing required parameter {param_name!r} for process {self._process_id!r}")
 
-            # TODO: validation or normalization based on schema?
-            # Some pandas/numpy data types need a bit of conversion for JSON encoding
+            # Prepare some values/dtypes for JSON encoding
             if isinstance(value, numpy.integer):
                 value = int(value)
             elif isinstance(value, numpy.number):
                 value = float(value)
+            elif isinstance(value, shapely.geometry.base.BaseGeometry):
+                value = shapely.geometry.mapping(value)
 
-            arguments[name] = value
+            arguments[param_name] = value
 
         cube = connection.datacube_from_process(process_id=self._process_id, namespace=self._namespace, **arguments)
 
@@ -1020,3 +1036,26 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
     def __call__(self, *arg, **kwargs) -> BatchJob:
         """Syntactic sugar for calling `start_job` directly."""
         return self.start_job(*arg, **kwargs)
+
+    @staticmethod
+    def _guess_parameter_column_map(parameters: List[Parameter], row: pd.Series) -> dict:
+        """
+        Guess parameter-column mapping from given parameter list and dataframe row
+        """
+        parameter_column_map = {}
+        # Geometry based mapping: try to automatically map geometry columns to geojson parameters
+        geojson_parameters = [p.name for p in parameters if p.schema.accepts_geojson()]
+        geometry_columns = [i for (i, v) in row.items() if isinstance(v, shapely.geometry.base.BaseGeometry)]
+        if geojson_parameters and geometry_columns:
+            if len(geojson_parameters) == 1 and len(geometry_columns) == 1:
+                # Most common case: one geometry parameter and one geometry column: can be mapped naively
+                parameter_column_map[geojson_parameters[0]] = geometry_columns[0]
+            elif all(p in geometry_columns for p in geojson_parameters):
+                # Each geometry param has geometry column with same name: easy to map
+                parameter_column_map.update((p, p) for p in geojson_parameters)
+            else:
+                raise RuntimeError(
+                    f"Problem with mapping geometry columns ({geometry_columns}) to process parameters ({geojson_parameters})"
+                )
+        _log.debug(f"Guessed parameter-column map: {parameter_column_map}")
+        return parameter_column_map

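For orientation, here is how the new parameter_column_map option is meant to be used; a minimal sketch, assuming a hypothetical UDP "my_udp" with a geojson parameter "geometry" and a number parameter "scale" (these names and the URL are illustrative, not from this commit):

    import pandas as pd
    import shapely.geometry

    from openeo.extra.job_management import UDPJobFactory

    # Explicitly map the dataframe column "aoi" to the UDP parameter "geometry".
    # When parameter_column_map is omitted, _guess_parameter_column_map() tries
    # to pair geometry-typed columns with geojson-accepting parameters instead.
    job_starter = UDPJobFactory(
        process_id="my_udp",
        namespace="https://example.test/my_udp.json",
        parameter_defaults={"scale": 1.0},
        parameter_column_map={"geometry": "aoi"},
    )

    # A dataframe row holding a shapely geometry: start_job() converts it to a
    # GeoJSON-style dict via shapely.geometry.mapping() before job submission.
    row = pd.Series({"aoi": shapely.geometry.Point(3.0, 4.0), "scale": 2.0})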
openeo/internal/processes/parse.py

Lines changed: 12 additions & 0 deletions
@@ -31,6 +31,18 @@ def is_process_graph(self) -> bool:
             and self.schema.get("subtype") == "process-graph"
         )
 
+    def accepts_geojson(self) -> bool:
+        """Does this schema accept inline GeoJSON objects?"""
+
+        def is_geojson_schema(schema) -> bool:
+            return isinstance(schema, dict) and schema.get("type") == "object" and schema.get("subtype") == "geojson"
+
+        if isinstance(self.schema, dict):
+            return is_geojson_schema(self.schema)
+        elif isinstance(self.schema, list):
+            return any(is_geojson_schema(s) for s in self.schema)
+        return False
+
 
 _NO_DEFAULT = object()
 

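The behavior of the new Schema.accepts_geojson in a nutshell; a small sketch mirroring the unit tests added in tests/internal/processes/test_parse.py below:

    from openeo.internal.processes.parse import Schema

    # A single "object"/"geojson" schema matches.
    assert Schema({"type": "object", "subtype": "geojson"}).accepts_geojson()
    # A union (list) of schemas matches as soon as one member is geojson.
    assert Schema([{"type": "number"}, {"type": "object", "subtype": "geojson"}]).accepts_geojson()
    # Plain objects or other subtypes do not.
    assert not Schema({"type": "object"}).accepts_geojson()
    assert not Schema({"type": "object", "subtype": "vectorzz"}).accepts_geojson()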
openeo/rest/_testing.py

Lines changed: 33 additions & 3 deletions
@@ -1,6 +1,7 @@
+import collections
 import json
 import re
-from typing import Optional, Union
+from typing import Callable, Iterator, Optional, Sequence, Union
 
 from openeo import Connection, DataCube
 from openeo.rest.vectorcube import VectorCube
@@ -25,6 +26,7 @@ class DummyBackend:
         "validation_requests",
         "next_result",
         "next_validation_errors",
+        "job_status_updater",
     )
 
     # Default result (can serve both as JSON or binary data)
@@ -37,6 +39,13 @@ def __init__(self, requests_mock, connection: Connection):
         self.validation_requests = []
         self.next_result = self.DEFAULT_RESULT
         self.next_validation_errors = []
+
+        # Job status update hook:
+        # callable that is called on starting a job, and getting job metadata
+        # allows to dynamically change how the status of a job evolves
+        # By default: immediately set to "finished" once job is started
+        self.job_status_updater = lambda job_id, current_status: "finished"
+
         requests_mock.post(
             connection.build_url("/result"),
             content=self._handle_post_result,
@@ -90,13 +99,19 @@ def _handle_post_job_results(self, request, context):
         """Handler of `POST /job/{job_id}/results` (start batch job)."""
         job_id = self._get_job_id(request)
         assert self.batch_jobs[job_id]["status"] == "created"
-        # TODO: support custom status sequence (instead of directly going to status "finished")?
-        self.batch_jobs[job_id]["status"] = "finished"
+        self.batch_jobs[job_id]["status"] = self.job_status_updater(
+            job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
+        )
         context.status_code = 202
 
     def _handle_get_job(self, request, context):
         """Handler of `GET /job/{job_id}` (get batch job status and metadata)."""
         job_id = self._get_job_id(request)
+        # Allow updating status with `job_status_updater` once job got past status "created"
+        if self.batch_jobs[job_id]["status"] != "created":
+            self.batch_jobs[job_id]["status"] = self.job_status_updater(
+                job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
+            )
         return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}
 
     def _handle_get_job_results(self, request, context):
@@ -162,6 +177,21 @@ def execute(self, cube: Union[DataCube, VectorCube], process_id: Optional[str] =
         cube.execute()
         return self.get_pg(process_id=process_id)
 
+    def setup_simple_job_status_flow(self, *, queued: int = 1, running: int = 4, final: str = "finished"):
+        """
+        Set up simple job status flow:
+        queued (a couple of times) -> running (a couple of times) -> finished/error.
+        """
+        template = ["queued"] * queued + ["running"] * running + [final]
+        job_stacks = collections.defaultdict(template.copy)
+
+        def get_status(job_id: str, current_status: str) -> str:
+            stack = job_stacks[job_id]
+            # Pop first item each time, but repeat the last one at the end
+            return stack.pop(0) if len(stack) > 1 else stack[0]
+
+        self.job_status_updater = get_status
+
 
 def build_capabilities(
     *,

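The new job status hook on DummyBackend is easiest to see in use; a short sketch, assuming dummy_backend is a DummyBackend instance as in the tests below:

    # Default: a started job immediately reports "finished". The helper
    # replaces that with a more realistic lifecycle:
    dummy_backend.setup_simple_job_status_flow(queued=2, running=3, final="finished")
    # Successive status updates per job now yield:
    #   "queued", "queued", "running", "running", "running", "finished", "finished", ...

    # The hook can also be set directly, e.g. to simulate jobs that fail:
    dummy_backend.job_status_updater = lambda job_id, current_status: "error"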
tests/extra/test_job_management.py

Lines changed: 107 additions & 2 deletions
@@ -1011,7 +1011,9 @@ def test_create_job_db(tmp_path, filename, expected):
 class TestUDPJobFactory:
     @pytest.fixture
     def dummy_backend(self, requests_mock, con120) -> DummyBackend:
-        return DummyBackend(requests_mock=requests_mock, connection=con120)
+        dummy = DummyBackend(requests_mock=requests_mock, connection=con120)
+        dummy.setup_simple_job_status_flow(queued=2, running=3, final="finished")
+        return dummy
 
     @pytest.fixture(autouse=True)
     def remote_process_definitions(self, requests_mock):
@@ -1043,6 +1045,31 @@ def remote_process_definitions(self, requests_mock):
                 },
             },
         )
+        requests_mock.get(
+            "https://remote.test/offset_poplygon.json",
+            json={
+                "id": "offset_poplygon",
+                "parameters": [
+                    {"name": "data", "description": "data", "schema": {"type": "number"}},
+                    {
+                        "name": "polygons",
+                        "description": "polygons",
+                        "schema": {
+                            "title": "GeoJSON",
+                            "type": "object",
+                            "subtype": "geojson",
+                        },
+                    },
+                    {
+                        "name": "offset",
+                        "description": "Offset",
+                        "schema": {"type": "number"},
+                        "optional": True,
+                        "default": 0,
+                    },
+                ],
+            },
+        )
 
     def test_minimal(self, con120, dummy_backend):
         """Bare minimum: just start a job, no parameters/arguments"""
@@ -1124,7 +1151,7 @@ def test_basic_parameterization(self, con120, dummy_backend, parameter_defaults,
     @pytest.fixture
     def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager:
         job_manager = MultiBackendJobManager(root_dir=tmp_path / "job_mgr_root")
-        job_manager.add_backend("dummy", connection=dummy_backend.connection)
+        job_manager.add_backend("dummy", connection=dummy_backend.connection, parallel_jobs=1)
         return job_manager
 
     def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock):
@@ -1143,6 +1170,8 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job
                 "sleep": dirty_equals.IsInt(gt=1),
                 "start_job call": 3,
                 "job start": 3,
+                "job started running": 3,
+                "job finished": 3,
             }
         )
         assert set(job_db.read().status) == {"finished"}
@@ -1244,6 +1273,7 @@ def test_udp_job_manager_parameter_handling(
                 "sleep": dirty_equals.IsInt(gt=1),
                 "start_job call": 3,
                 "job start": 3,
+                "job finished": 3,
             }
         )
         assert set(job_db.read().status) == {"finished"}
@@ -1286,3 +1316,78 @@ def test_udp_job_manager_parameter_handling(
                 "status": "finished",
             },
         }
+
+    def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock):
+        job_starter = UDPJobFactory(
+            process_id="offset_poplygon",
+            namespace="https://remote.test/offset_poplygon.json",
+            parameter_defaults={"data": 123},
+        )
+
+        df = geopandas.GeoDataFrame.from_features(
+            {
+                "type": "FeatureCollection",
+                "features": [
+                    {
+                        "type": "Feature",
+                        "id": "one",
+                        "properties": {"offset": 11},
+                        "geometry": {"type": "Point", "coordinates": (1.0, 2.0)},
+                    },
+                    {
+                        "type": "Feature",
+                        "id": "two",
+                        "properties": {"offset": 22},
+                        "geometry": {"type": "Point", "coordinates": (3.0, 4.0)},
+                    },
+                ],
+            }
+        )
+
+        job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df)
+
+        stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter)
+        assert stats == dirty_equals.IsPartialDict(
+            {
+                "sleep": dirty_equals.IsInt(gt=1),
+                "start_job call": 2,
+                "job start": 2,
+                "job finished": 2,
+            }
+        )
+        assert set(job_db.read().status) == {"finished"}
+
+        assert dummy_backend.batch_jobs == {
+            "job-000": {
+                "job_id": "job-000",
+                "pg": {
+                    "offsetpoplygon1": {
+                        "process_id": "offset_poplygon",
+                        "namespace": "https://remote.test/offset_poplygon.json",
+                        "arguments": {
+                            "data": 123,
+                            "polygons": {"type": "Point", "coordinates": [1.0, 2.0]},
+                            "offset": 11,
+                        },
+                        "result": True,
+                    }
+                },
+                "status": "finished",
+            },
+            "job-001": {
+                "job_id": "job-001",
+                "pg": {
+                    "offsetpoplygon1": {
+                        "process_id": "offset_poplygon",
+                        "namespace": "https://remote.test/offset_poplygon.json",
+                        "arguments": {
+                            "data": 123,
+                            "polygons": {"type": "Point", "coordinates": [3.0, 4.0]},
+                            "offset": 22,
+                        },
+                        "result": True,
+                    }
+                },
+                "status": "finished",
+            },
+        }

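Note that test_udp_job_manager_geometry does not pass a parameter_column_map: the process has a single geojson parameter ("polygons") and each row has a single geometry column ("geometry"), so the naive one-to-one guess applies. A standalone sketch of that guessing step (calling the private helper directly, purely for illustration):

    import pandas as pd
    import shapely.geometry

    from openeo.extra.job_management import UDPJobFactory
    from openeo.internal.processes.parse import Parameter

    parameters = [
        Parameter.from_dict(
            {"name": "polygons", "description": "polygons", "schema": {"type": "object", "subtype": "geojson"}}
        )
    ]
    row = pd.Series({"geometry": shapely.geometry.Point(1.0, 2.0), "offset": 11})
    # One geojson parameter and one geometry column: mapped to each other.
    assert UDPJobFactory._guess_parameter_column_map(parameters=parameters, row=row) == {"polygons": "geometry"}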
tests/internal/processes/test_parse.py

Lines changed: 14 additions & 0 deletions
@@ -22,6 +22,20 @@ def test_schema_equality():
     assert Schema({"type": "number"}) != Schema({"type": "string"})
 
 
+@pytest.mark.parametrize(
+    ["schema", "expected"],
+    [
+        ({"type": "object", "subtype": "geojson"}, True),
+        ({"type": "object"}, False),
+        ({"subtype": "geojson"}, False),
+        ({"type": "object", "subtype": "vectorzz"}, False),
+    ],
+)
+def test_schema_accepts_geojson(schema, expected):
+    assert Schema(schema).accepts_geojson() == expected
+    assert Schema([{"type": "number"}, schema]).accepts_geojson() == expected
+
+
 def test_parameter():
     p = Parameter.from_dict({
         "name": "foo",
