Skip to content

Commit 6309c2b

Browse files
committed
WIP
1 parent b8ac36e commit 6309c2b

File tree

2 files changed

+41
-30
lines changed

2 files changed

+41
-30
lines changed

openeo/extra/job_management/stac_job_db.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ def persist(self, df: pd.DataFrame):
235235
else:
236236
# Merge data on item_id (in the index)
237237
df_to_persist = existing_df
238+
# TODO: better way to do update without risk for data update loss?
239+
assert set(df.index).issubset(df_to_persist.index)
238240
df_to_persist.update(df, overwrite=True)
239241

240242
items_to_persist = [self.item_from(s) for _, s in df_to_persist.iterrows()]

tests/extra/job_management/test_stac_job_db.py

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import datetime
22
import re
3-
from typing import Any, Dict, Optional, Union
3+
from typing import Any, Dict, List, Optional, Union
44
from unittest import mock
55
from unittest.mock import MagicMock, patch
66

@@ -139,20 +139,27 @@ def _pystac_item(
139139
geometry: Optional = None,
140140
bbox: Optional = None,
141141
datetime_: Union[None, str, datetime.date, datetime.date] = "2025-06-07",
142+
links: Optional[List[Union[pystac.Link, dict]]] = None,
143+
**kwargs,
142144
) -> pystac.Item:
143145
"""Helper to easily construct a dummy but valid pystac.Item"""
144146
if isinstance(datetime_, str):
145147
datetime_ = pystac.utils.str_to_datetime(datetime_)
146148
elif isinstance(datetime_, datetime.date):
147149
datetime_ = datetime.datetime.combine(datetime_, datetime.time.min, tzinfo=datetime.timezone.utc)
148150

149-
return pystac.Item(
151+
item = pystac.Item(
150152
id=id,
151153
geometry=geometry,
152154
bbox=bbox,
153155
properties=properties or {},
154156
datetime=datetime_,
157+
**kwargs,
155158
)
159+
if links:
160+
for link in links:
161+
item.add_link(pystac.Link.from_dict(link) if isinstance(link, dict) else link)
162+
return item
156163

157164

158165
@pytest.fixture
@@ -422,38 +429,40 @@ def test_get_by_status_result(self, job_db_exists):
422429
),
423430
)
424431

425-
@patch("requests.post")
426-
def test_persist_single_chunk(self, mock_requests_post, bulk_dataframe, job_db_exists):
427-
def bulk_items(df):
428-
all_items = []
429-
if not df.empty:
430-
431-
def handle_row(series):
432-
item = job_db_exists.item_from(series)
433-
job_db_exists._prepare_item(item, job_db_exists.collection_id)
434-
all_items.append(item)
435-
436-
df.apply(handle_row, axis=1)
437-
return all_items
432+
def test_persist_single_chunk(self, requests_mock, job_db_exists, mock_pystac_client):
433+
rows = 5
434+
bulk_dataframe = pd.DataFrame(
435+
index=[f"item-{i}" for i in range(rows)],
436+
data={
437+
"datetime": [f"2020-{i + 1:02d}-01" for i in range(rows)],
438+
"some_property": [f"value-{i}" for i in range(rows)],
439+
},
440+
)
441+
mock_pystac_client.search.return_value.items.return_value = []
442+
443+
expected_items = [
444+
_pystac_item(
445+
id=f"item-{i}",
446+
properties={"some_property": f"value-{i}"},
447+
datetime_=f"2020-{i + 1:02d}-01",
448+
collection="collection-1",
449+
links=[{"rel": "collection", "href": "collection-1"}],
450+
)
451+
for i in range(rows)
452+
]
453+
expected_items = {item.id: item.to_dict() for item in expected_items}
438454

439-
items = bulk_items(bulk_dataframe)
455+
def post_bulk_items(request, context):
456+
post_data = request.json()
457+
assert post_data == {"method": "upsert", "items": expected_items}
458+
return {"status": "success"}
440459

441-
mock_requests_post.return_value.status_code = 200
442-
mock_requests_post.return_value.json.return_value = {"status": "success"}
443-
mock_requests_post.reason = "OK"
460+
post_bulk_items_mock = requests_mock.post(
461+
re.compile(r"http://fake-stac-api/collections/.*/bulk_items"), json=post_bulk_items
462+
)
444463

445464
job_db_exists.persist(bulk_dataframe)
446-
447-
mock_requests_post.assert_called_once()
448-
449-
mock_requests_post.assert_called_with(
450-
url=f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items",
451-
auth=None,
452-
json={
453-
"method": "upsert",
454-
"items": {item.id: item.to_dict() for item in items},
455-
},
456-
)
465+
assert post_bulk_items_mock.called
457466

458467
@patch("requests.post")
459468
def test_persist_multiple_chunks(self, mock_requests_post, bulk_dataframe, job_db_exists):

0 commit comments

Comments
 (0)