Skip to content

Commit 6bb1232

Browse files
committed
Port more STACAPIJobDatabase test tweaks from #798
1 parent aef9e26 commit 6bb1232

File tree

1 file changed

+194

-124

lines changed

tests/extra/job_management/test_stac_job_db.py

Lines changed: 194 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -78,32 +78,29 @@ def _pystac_item(
7878
geometry: Optional = None,
7979
bbox: Optional = None,
8080
datetime_: Union[None, str, datetime.date, datetime.date] = "2025-06-07",
81+
links: Optional[List[Union[pystac.Link, dict]]] = None,
82+
**kwargs,
8183
) -> pystac.Item:
8284
"""Helper to easily construct a dummy but valid pystac.Item"""
8385
if isinstance(datetime_, str):
8486
datetime_ = pystac.utils.str_to_datetime(datetime_)
8587
elif isinstance(datetime_, datetime.date):
8688
datetime_ = datetime.datetime.combine(datetime_, datetime.time.min, tzinfo=datetime.timezone.utc)
8789

88-
return pystac.Item(
90+
item = pystac.Item(
8991
id=id,
9092
geometry=geometry,
9193
bbox=bbox,
9294
properties=properties or {},
9395
datetime=datetime_,
96+
**kwargs,
9497
)
9598

99+
if links:
100+
for link in links:
101+
item.add_link(pystac.Link.from_dict(link) if isinstance(link, dict) else link)
96102

97-
@pytest.fixture
98-
def bulk_dataframe():
99-
return pd.DataFrame(
100-
{
101-
"item_id": [f"test-{i}" for i in range(10)],
102-
"some_property": [f"value-{i}" for i in range(10)],
103-
"datetime": [f"2020-{i+1:02d}-01" for i in range(10)],
104-
},
105-
index=[i for i in range(10)],
106-
)
103+
return item
107104

108105

109106
class TestSTACAPIJobDatabase:
@@ -112,27 +109,38 @@ def test_exists(self, job_db_exists, job_db_not_exists):
112109
assert job_db_exists.exists() == True
113110
assert job_db_not_exists.exists() == False
114111

112+
@pytest.mark.parametrize(
113+
["df", "expected"],
114+
[
115+
(
116+
pd.DataFrame({"no": [1], "geometry": [2], "here": [3]}),
117+
pd.DataFrame(
118+
data={
119+
"item_id": [0],
120+
"no": [1],
121+
"geometry": [2],
122+
"here": [3],
123+
**_common_normalized_df_data(),
124+
},
125+
),
126+
),
127+
(
128+
pd.DataFrame({"item_id": ["item-123", "item-456"], "hello": ["world", "earth"]}),
129+
pd.DataFrame(
130+
data={
131+
"item_id": ["item-123", "item-456"],
132+
"hello": ["world", "earth"],
133+
**_common_normalized_df_data(rows=2),
134+
},
135+
),
136+
),
137+
],
138+
)
115139
@patch("openeo.extra.job_management.stac_job_db.STACAPIJobDatabase.persist", return_value=None)
116-
def test_initialize_from_df_non_existing(self, mock_persist, job_db_not_exists):
117-
df = pd.DataFrame(
118-
{
119-
"no": [1],
120-
"geometry": [2],
121-
"here": [3],
122-
}
123-
)
140+
def test_initialize_from_df_non_existing(self, mock_persist, job_db_not_exists, df, expected):
124141
job_db_not_exists.initialize_from_df(df)
125142

126143
mock_persist.assert_called_once()
127-
expected = pd.DataFrame(
128-
{
129-
"item_id": [0],
130-
"no": [1],
131-
"geometry": [2],
132-
"here": [3],
133-
**_common_normalized_df_data(),
134-
},
135-
)
136144
pdt.assert_frame_equal(mock_persist.call_args[0][0], expected)
137145
assert job_db_not_exists.has_geometry == False
138146

@@ -182,44 +190,73 @@ def test_initialize_from_df_existing_append(
182190
pdt.assert_frame_equal(mock_persist.call_args[0][0], expected_df)
183191
assert job_db_exists.has_geometry == False
184192

193+
@pytest.mark.parametrize(
194+
["df", "expected"],
195+
[
196+
(
197+
gpd.GeoDataFrame(
198+
{
199+
"there": [1],
200+
"is": [2],
201+
"geometry": [Point(1, 1)],
202+
},
203+
geometry="geometry",
204+
),
205+
pd.DataFrame(
206+
{
207+
"item_id": [0],
208+
"there": [1],
209+
"is": [2],
210+
"geometry": [{"type": "Point", "coordinates": (1.0, 1.0)}],
211+
**_common_normalized_df_data(),
212+
}
213+
),
214+
),
215+
(
216+
gpd.GeoDataFrame(
217+
data={"item_id": ["item-123", "item-456"], "hello": ["world", "earth"]},
218+
geometry=[Point(1, 1), Point(2, 2)],
219+
),
220+
pd.DataFrame(
221+
data={
222+
"item_id": ["item-123", "item-456"],
223+
"hello": ["world", "earth"],
224+
"geometry": [
225+
{"type": "Point", "coordinates": (1.0, 1.0)},
226+
{"type": "Point", "coordinates": (2.0, 2.0)},
227+
],
228+
**_common_normalized_df_data(rows=2),
229+
},
230+
),
231+
),
232+
],
233+
)
185234
@patch("openeo.extra.job_management.stac_job_db.STACAPIJobDatabase.persist", return_value=None)
186-
def test_initialize_from_df_with_geometry(self, mock_persists, job_db_not_exists):
187-
df = gpd.GeoDataFrame(
188-
{
189-
"there": [1],
190-
"is": [2],
191-
"geometry": [Point(1, 1)],
192-
},
193-
geometry="geometry",
194-
)
235+
def test_initialize_from_df_with_geometry(self, mock_persists, job_db_not_exists, df, expected):
195236
job_db_not_exists.initialize_from_df(df)
196237

197238
mock_persists.assert_called_once()
198-
expected = pd.DataFrame(
199-
{
200-
"item_id": [0],
201-
"there": [1],
202-
"is": [2],
203-
"geometry": [{"type": "Point", "coordinates": (1.0, 1.0)}],
204-
**_common_normalized_df_data(),
205-
}
206-
)
207239
pdt.assert_frame_equal(mock_persists.call_args[0][0], expected)
208240
assert job_db_not_exists.has_geometry == True
209241
assert job_db_not_exists.geometry_column == "geometry"
210242

211-
def test_series_from(self, job_db_exists):
212-
item = pystac.Item(
213-
id="test",
214-
geometry=None,
215-
bbox=None,
216-
properties={
217-
"datetime": "2020-05-22T00:00:00Z",
218-
"some_property": "value",
219-
},
220-
datetime=datetime.datetime(2020, 5, 22),
221-
)
222-
expected = pd.Series({"datetime": "2020-05-22T00:00:00Z", "some_property": "value"}, name="test")
243+
@pytest.mark.parametrize(
244+
["item", "expected"],
245+
[
246+
(
247+
_pystac_item(
248+
id="item-123",
249+
properties={"some_property": "value"},
250+
datetime_=datetime.datetime(2020, 5, 22),
251+
),
252+
pd.Series(
253+
name="item-123",
254+
data={"some_property": "value", "datetime": "2020-05-22T00:00:00Z"},
255+
),
256+
),
257+
],
258+
)
259+
def test_series_from(self, job_db_exists, item, expected):
223260
pdt.assert_series_equal(job_db_exists.series_from(item), expected)
224261

225262
@pytest.mark.parametrize(
@@ -360,89 +397,90 @@ def test_get_by_status_result(self, requests_mock):
360397
),
361398
)
362399

363-
@patch("requests.post")
364-
def test_persist_single_chunk(self, mock_requests_post, bulk_dataframe, job_db_exists):
365-
def bulk_items(df):
366-
all_items = []
367-
if not df.empty:
368-
369-
def handle_row(series):
370-
item = job_db_exists.item_from(series)
371-
job_db_exists._prepare_item(item, job_db_exists.collection_id)
372-
all_items.append(item)
400+
def test_persist_single_chunk(self, requests_mock, job_db_exists):
401+
rows = 5
402+
bulk_dataframe = pd.DataFrame(
403+
data={
404+
"item_id": [f"item-{i}" for i in range(rows)],
405+
"datetime": [f"2020-{i + 1:02d}-01" for i in range(rows)],
406+
"some_property": [f"value-{i}" for i in range(rows)],
407+
},
408+
)
373409

374-
df.apply(handle_row, axis=1)
375-
return all_items
410+
expected_items = [
411+
_pystac_item(
412+
id=f"item-{i}",
413+
properties={"some_property": f"value-{i}"},
414+
datetime_=f"2020-{i + 1:02d}-01",
415+
collection="collection-1",
416+
links=[{"rel": "collection", "href": "collection-1"}],
417+
)
418+
for i in range(rows)
419+
]
420+
expected_items = {item.id: item.to_dict() for item in expected_items}
376421

377-
items = bulk_items(bulk_dataframe)
422+
def post_bulk_items(request, context):
423+
post_data = request.json()
424+
assert post_data == {"method": "upsert", "items": expected_items}
425+
return {"status": "success"}
378426

379-
mock_requests_post.return_value.status_code = 200
380-
mock_requests_post.return_value.json.return_value = {"status": "success"}
381-
mock_requests_post.reason = "OK"
427+
post_bulk_items_mock = requests_mock.post(
428+
re.compile(r"http://fake-stac-api/collections/.*/bulk_items"), json=post_bulk_items
429+
)
382430

383431
job_db_exists.persist(bulk_dataframe)
432+
assert post_bulk_items_mock.called
384433

385-
mock_requests_post.assert_called_once()
386434

387-
mock_requests_post.assert_called_with(
388-
url=f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items",
389-
auth=None,
390-
json={
391-
"method": "upsert",
392-
"items": {item.id: item.to_dict() for item in items},
435+
436+
def test_persist_multiple_chunks(self, requests_mock, job_db_exists):
437+
rows = 12
438+
bulk_dataframe = pd.DataFrame(
439+
data={
440+
"item_id": [f"item-{i}" for i in range(rows)],
441+
"datetime": [f"2020-{i + 1:02d}-01" for i in range(rows)],
442+
"some_property": [f"value-{i}" for i in range(rows)],
393443
},
394444
)
395445

396-
@patch("requests.post")
397-
def test_persist_multiple_chunks(self, mock_requests_post, bulk_dataframe, job_db_exists):
398-
def bulk_items(df):
399-
all_items = []
400-
if not df.empty:
401-
402-
def handle_row(series):
403-
item = job_db_exists.item_from(series)
404-
job_db_exists._prepare_item(item, job_db_exists.collection_id)
405-
all_items.append(item)
406-
407-
df.apply(handle_row, axis=1)
408-
return all_items
446+
chunks = []
409447

410-
items = bulk_items(bulk_dataframe)
448+
def post_bulk_items(request, context):
449+
nonlocal chunks
450+
post_data = request.json()
451+
chunks.append(post_data)
452+
return {"status": "success"}
411453

412-
mock_requests_post.return_value.status_code = 200
413-
mock_requests_post.return_value.json.return_value = {"status": "success"}
414-
mock_requests_post.reason = "OK"
454+
post_bulk_items_mock = requests_mock.post(
455+
re.compile(r"http://fake-stac-api/collections/.*/bulk_items"), json=post_bulk_items
456+
)
415457

416-
job_db_exists.bulk_size = 3
417-
job_db_exists._upload_items_bulk(collection_id=job_db_exists.collection_id, items=items)
458+
job_db_exists.bulk_size = 5
459+
job_db_exists.persist(bulk_dataframe)
418460

419-
# 10 items in total, 3 items per chunk, should result in 4 calls
420-
assert sorted(
421-
(c.kwargs for c in mock_requests_post.call_args_list),
422-
key=lambda d: sorted(d["json"]["items"].keys()),
423-
) == [
424-
{
425-
"url": f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items",
426-
"auth": None,
427-
"json": {"method": "upsert", "items": {item.id: item.to_dict() for item in items[:3]}},
428-
},
461+
assert post_bulk_items_mock.call_count == 3
462+
expected = [
429463
{
430-
"url": f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items",
431-
"auth": None,
432-
"json": {"method": "upsert", "items": {item.id: item.to_dict() for item in items[3:6]}},
433-
},
434-
{
435-
"url": f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items",
436-
"auth": None,
437-
"json": {"method": "upsert", "items": {item.id: item.to_dict() for item in items[6:9]}},
438-
},
439-
{
440-
"url": f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items",
441-
"auth": None,
442-
"json": {"method": "upsert", "items": {item.id: item.to_dict() for item in items[9:]}},
443-
},
464+
"method": "upsert",
465+
"items": {
466+
f"item-{i}": _pystac_item(
467+
id=f"item-{i}",
468+
properties={"some_property": f"value-{i}"},
469+
datetime_=f"2020-{i + 1:02d}-01",
470+
collection="collection-1",
471+
links=[{"rel": "collection", "href": "collection-1"}],
472+
).to_dict()
473+
for i in range(start, end)
474+
},
475+
}
476+
for (start, end) in [(0, 5), (5, 10), (10, 12)]
444477
]
445478

479+
def normalize(data):
480+
return sorted(data, key=lambda d: min(d["items"].keys()))
481+
482+
assert normalize(chunks) == normalize(expected)
483+
446484

447485
@pytest.fixture
448486
def dummy_backend_foo(requests_mock) -> DummyBackend:
@@ -576,6 +614,38 @@ def test_run_jobs_basic(tmp_path, dummy_backend_foo, requests_mock, sleep_mock):
576614
}
577615
)
578616
job_db.initialize_from_df(df=df)
617+
assert dummy_stac_api.items == {
618+
"collection-123": {
619+
"item-2024": dirty_equals.IsPartialDict(
620+
{
621+
"type": "Feature",
622+
"id": "item-2024",
623+
"properties": dirty_equals.IsPartialDict(
624+
{
625+
"year": 2024,
626+
"id": None,
627+
"status": "not_started",
628+
"backend_name": None,
629+
}
630+
),
631+
}
632+
),
633+
"item-2025": dirty_equals.IsPartialDict(
634+
{
635+
"type": "Feature",
636+
"id": "item-2025",
637+
"properties": dirty_equals.IsPartialDict(
638+
{
639+
"year": 2025,
640+
"id": None,
641+
"status": "not_started",
642+
"backend_name": None,
643+
}
644+
),
645+
}
646+
),
647+
}
648+
}
579649

580650
def create_job(row, connection, **kwargs):
581651
year = int(row["year"])

0 commit comments

Comments (0)