Commit ff9c3f2

Issue #604/#644 test coverage for personal UDP mode
and some related refactoring
1 parent f8db877 commit ff9c3f2
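
For context on what the new test exercises: UDPJobFactory normally points at a remote process definition via a namespace URL, but when no namespace is given it falls back to "personal UDP mode" and resolves the process id as a user-defined process stored on the backend for the authenticated user. A minimal usage sketch under assumptions follows; the import path of UDPJobFactory, the backend URL and the CSV filename are assumptions based on the test module, not taken from this commit.

    # Sketch (assumption): UDPJobFactory, MultiBackendJobManager and CsvJobDatabase
    # are imported from openeo.extra.job_management, as the test module path suggests.
    import openeo
    import pandas as pd
    from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager, UDPJobFactory

    connection = openeo.connect("openeo.example").authenticate_oidc()

    # namespace=None triggers "personal UDP mode": the process id refers to a
    # user-defined process stored on the backend for the current user.
    job_starter = UDPJobFactory(
        process_id="increment",
        namespace=None,
        parameter_defaults={"increment": 5},
    )

    # One batch job per DataFrame row; row fields feed the UDP parameters.
    df = pd.DataFrame({"data": [3, 5]})
    job_db = CsvJobDatabase("jobs.csv").initialize_from_df(df)

    job_manager = MultiBackendJobManager()
    job_manager.add_backend("example", connection=connection)
    job_manager.run_jobs(job_db=job_db, start_job=job_starter)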

File tree

1 file changed (+130, -72)

tests/extra/test_job_management.py

Lines changed: 130 additions & 72 deletions
@@ -1,3 +1,4 @@
+import copy
 import json
 import re
 import threading
@@ -38,8 +39,8 @@
 
 
 @pytest.fixture
-def con120(requests_mock) -> openeo.Connection:
-    requests_mock.get(OPENEO_BACKEND, json=build_capabilities(api_version="1.2.0"))
+def con(requests_mock) -> openeo.Connection:
+    requests_mock.get(OPENEO_BACKEND, json=build_capabilities(api_version="1.2.0", udp=True))
     con = openeo.Connection(OPENEO_BACKEND)
     return con
 
@@ -1010,72 +1011,69 @@ def test_create_job_db(tmp_path, filename, expected):
 
 class TestUDPJobFactory:
     @pytest.fixture
-    def dummy_backend(self, requests_mock, con120) -> DummyBackend:
-        dummy = DummyBackend(requests_mock=requests_mock, connection=con120)
+    def dummy_backend(self, requests_mock, con) -> DummyBackend:
+        dummy = DummyBackend(requests_mock=requests_mock, connection=con)
         dummy.setup_simple_job_status_flow(queued=2, running=3, final="finished")
         return dummy
 
-    @pytest.fixture(autouse=True)
-    def remote_process_definitions(self, requests_mock):
-        requests_mock.get(
-            "https://remote.test/3plus5.json",
-            json={
-                "id": "3plus5",
-                "process_graph": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True},
+    PG_3PLUS5 = {
+        "id": "3plus5",
+        "process_graph": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True},
+    }
+    PG_INCREMENT = {
+        "id": "increment",
+        "parameters": [
+            {"name": "data", "description": "data", "schema": {"type": "number"}},
+            {
+                "name": "increment",
+                "description": "increment",
+                "schema": {"type": "number"},
+                "optional": True,
+                "default": 1,
             },
-        )
-        requests_mock.get(
-            "https://remote.test/increment.json",
-            json={
-                "id": "increment",
-                "parameters": [
-                    {"name": "data", "description": "data", "schema": {"type": "number"}},
-                    {
-                        "name": "increment",
-                        "description": "increment",
-                        "schema": {"type": "number"},
-                        "optional": True,
-                        "default": 1,
-                    },
-                ],
-                "process_graph": {
-                    "process_id": "add",
-                    "arguments": {"x": {"from_parameter": "data"}, "y": {"from_parameter": "increment"}},
-                    "result": True,
+        ],
+        "process_graph": {
+            "process_id": "add",
+            "arguments": {"x": {"from_parameter": "data"}, "y": {"from_parameter": "increment"}},
+            "result": True,
+        },
+    }
+    PG_OFFSET_POLYGON = {
+        "id": "offset_polygon",
+        "parameters": [
+            {"name": "data", "description": "data", "schema": {"type": "number"}},
+            {
+                "name": "polygons",
+                "description": "polygons",
+                "schema": {
+                    "title": "GeoJSON",
+                    "type": "object",
+                    "subtype": "geojson",
                 },
             },
-        )
-        requests_mock.get(
-            "https://remote.test/offset_poplygon.json",
-            json={
-                "id": "offset_poplygon",
-                "parameters": [
-                    {"name": "data", "description": "data", "schema": {"type": "number"}},
-                    {
-                        "name": "polygons",
-                        "description": "polygons",
-                        "schema": {
-                            "title": "GeoJSON",
-                            "type": "object",
-                            "subtype": "geojson",
-                        },
-                    },
-                    {
-                        "name": "offset",
-                        "description": "Offset",
-                        "schema": {"type": "number"},
-                        "optional": True,
-                        "default": 0,
-                    },
-                ],
+            {
+                "name": "offset",
+                "description": "Offset",
+                "schema": {"type": "number"},
+                "optional": True,
+                "default": 0,
             },
-        )
+        ],
+    }
 
-    def test_minimal(self, con120, dummy_backend):
+    @pytest.fixture(autouse=True)
+    def remote_process_definitions(self, requests_mock) -> dict:
+        mocks = {}
+        for pg in [self.PG_3PLUS5, self.PG_INCREMENT, self.PG_OFFSET_POLYGON]:
+            process_id = pg["id"]
+            mocks[process_id] = requests_mock.get(f"https://remote.test/{process_id}.json", json=pg)
+        return mocks
+
+    def test_minimal(self, con, dummy_backend, remote_process_definitions):
         """Bare minimum: just start a job, no parameters/arguments"""
         job_factory = UDPJobFactory(process_id="3plus5", namespace="https://remote.test/3plus5.json")
 
-        job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con120)
+        job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con)
         assert isinstance(job, BatchJob)
         assert dummy_backend.batch_jobs == {
             "job-000": {
@@ -1092,11 +1090,13 @@ def test_minimal(self, con120, dummy_backend):
             }
         }
 
-    def test_basic(self, con120, dummy_backend):
+        assert remote_process_definitions["3plus5"].call_count == 1
+
+    def test_basic(self, con, dummy_backend, remote_process_definitions):
         """Basic parameterized UDP job generation"""
         job_factory = UDPJobFactory(process_id="increment", namespace="https://remote.test/increment.json")
 
-        job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con120)
+        job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con)
         assert isinstance(job, BatchJob)
         assert dummy_backend.batch_jobs == {
             "job-000": {
@@ -1112,6 +1112,7 @@ def test_basic(self, con120, dummy_backend):
                 "status": "created",
             }
         }
+        assert remote_process_definitions["increment"].call_count == 1
 
     @pytest.mark.parametrize(
         ["parameter_defaults", "row", "expected_arguments"],
@@ -1122,15 +1123,15 @@ def test_basic(self, con120, dummy_backend):
             ({"increment": 5}, {"data": 123, "increment": 1000}, {"data": 123, "increment": 1000}),
         ],
     )
-    def test_basic_parameterization(self, con120, dummy_backend, parameter_defaults, row, expected_arguments):
+    def test_basic_parameterization(self, con, dummy_backend, parameter_defaults, row, expected_arguments):
         """Basic parameterized UDP job generation"""
         job_factory = UDPJobFactory(
             process_id="increment",
             namespace="https://remote.test/increment.json",
             parameter_defaults=parameter_defaults,
         )
 
-        job = job_factory.start_job(row=pd.Series(row), connection=con120)
+        job = job_factory.start_job(row=pd.Series(row), connection=con)
         assert isinstance(job, BatchJob)
         assert dummy_backend.batch_jobs == {
             "job-000": {
@@ -1154,7 +1155,9 @@ def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager:
         job_manager.add_backend("dummy", connection=dummy_backend.connection, parallel_jobs=1)
         return job_manager
 
-    def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock):
+    def test_with_job_manager_remote_basic(
+        self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions
+    ):
         job_starter = UDPJobFactory(
             process_id="increment",
             namespace="https://remote.test/increment.json",
@@ -1175,6 +1178,7 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job
             }
         )
         assert set(job_db.read().status) == {"finished"}
+        assert remote_process_definitions["increment"].call_count == 1
 
         assert dummy_backend.batch_jobs == {
             "job-000": {
@@ -1247,7 +1251,7 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job
             ),
         ],
     )
-    def test_udp_job_manager_parameter_handling(
+    def test_with_job_manager_remote_parameter_handling(
         self,
         tmp_path,
         requests_mock,
@@ -1317,10 +1321,10 @@ def test_udp_job_manager_parameter_handling(
             },
         }
 
-    def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock):
+    def test_with_job_manager_remote_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock):
         job_starter = UDPJobFactory(
-            process_id="offset_poplygon",
-            namespace="https://remote.test/offset_poplygon.json",
+            process_id="offset_polygon",
+            namespace="https://remote.test/offset_polygon.json",
             parameter_defaults={"data": 123},
         )
 
@@ -1361,9 +1365,9 @@ def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend,
             "job-000": {
                 "job_id": "job-000",
                 "pg": {
-                    "offsetpoplygon1": {
-                        "process_id": "offset_poplygon",
-                        "namespace": "https://remote.test/offset_poplygon.json",
+                    "offsetpolygon1": {
+                        "process_id": "offset_polygon",
+                        "namespace": "https://remote.test/offset_polygon.json",
                         "arguments": {
                             "data": 123,
                             "polygons": {"type": "Point", "coordinates": [1.0, 2.0]},
@@ -1377,9 +1381,9 @@ def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend,
             "job-001": {
                 "job_id": "job-001",
                 "pg": {
-                    "offsetpoplygon1": {
-                        "process_id": "offset_poplygon",
-                        "namespace": "https://remote.test/offset_poplygon.json",
+                    "offsetpolygon1": {
+                        "process_id": "offset_polygon",
+                        "namespace": "https://remote.test/offset_polygon.json",
                         "arguments": {
                             "data": 123,
                             "polygons": {"type": "Point", "coordinates": [3.0, 4.0]},
@@ -1391,3 +1395,57 @@ def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend,
                 "status": "finished",
             },
         }
+
+    def test_with_job_manager_udp_basic(
+        self, tmp_path, requests_mock, con, dummy_backend, job_manager, sleep_mock, remote_process_definitions
+    ):
+        # make deep copy
+        udp = copy.deepcopy(self.PG_INCREMENT)
+        # Register personal UDP
+        increment_udp_mock = requests_mock.get(con.build_url("/process_graphs/increment"), json=udp)
+
+        job_starter = UDPJobFactory(
+            process_id="increment",
+            # No namespace to trigger personal UDP mode
+            namespace=None,
+            parameter_defaults={"increment": 5},
+        )
+        assert increment_udp_mock.call_count == 0
+
+        df = pd.DataFrame({"data": [3, 5]})
+        job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df)
+
+        stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter)
+        assert stats == dirty_equals.IsPartialDict(
+            {
+                "start_job call": 2,
+                "job finished": 2,
+            }
+        )
+        assert increment_udp_mock.call_count == 2
+        assert set(job_db.read().status) == {"finished"}
+
+        assert dummy_backend.batch_jobs == {
+            "job-000": {
+                "job_id": "job-000",
+                "pg": {
+                    "increment1": {
+                        "process_id": "increment",
+                        "arguments": {"data": 3, "increment": 5},
+                        "result": True,
+                    }
+                },
+                "status": "finished",
+            },
+            "job-001": {
+                "job_id": "job-001",
+                "pg": {
+                    "increment1": {
+                        "process_id": "increment",
+                        "arguments": {"data": 5, "increment": 5},
+                        "result": True,
+                    }
+                },
+                "status": "finished",
+            },
+        }

0 commit comments