
Commit 4dfdf77

Add minimal test of MultiBackendJobManager.run_jobs with STACAPIJobDatabase
related to #719/#736
1 parent: 98fae3d

2 files changed: +169 −2 lines


openeo/extra/job_management/stac_job_db.py

Lines changed: 3 additions & 2 deletions
@@ -29,7 +29,7 @@ def __init__(
         self,
         collection_id: str,
         stac_root_url: str,
-        auth: requests.auth.AuthBase,
+        auth: Optional[requests.auth.AuthBase] = None,
         has_geometry: bool = False,
         geometry_column: str = "geometry",
     ):
@@ -53,7 +53,7 @@ def __init__(
 
     def exists(self) -> bool:
         return any(c.id == self.collection_id for c in self.client.get_collections())
-
+
     def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
         """
         Normalize the given dataframe to be compatible with :py:class:`MultiBackendJobManager`
@@ -221,6 +221,7 @@ def _ingest_bulk(self, items: List[pystac.Item]) -> dict:
         if not all(i.collection_id == collection_id for i in items):
             raise Exception("All collection IDs should be identical for bulk ingests")
 
+        # TODO: this "bulk_items" endpoint is from obscure "bulk transactions" extension?
         url_path = f"collections/{collection_id}/bulk_items"
         data = {"method": "upsert", "items": {item.id: item.to_dict() for item in items}}
         response = requests.post(url=self.join_url(url_path), auth=self._auth, json=data)
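Note: the upshot of the `auth` change above is that `STACAPIJobDatabase` can now be constructed without credentials, e.g. against an open STAC API, which is what the new test below relies on. A minimal sketch (the URL and collection id here are hypothetical):

    import pandas as pd
    from openeo.extra.job_management.stac_job_db import STACAPIJobDatabase

    # No `auth` argument: talk to an unauthenticated STAC API (hypothetical URL).
    job_db = STACAPIJobDatabase(collection_id="my-jobs", stac_root_url="http://stacapi.test")
    # Seed the job database with job parameters, as done in the new test.
    job_db.initialize_from_df(df=pd.DataFrame({"year": [2024, 2025]}))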

tests/extra/job_management/test_stac_job_db.py

Lines changed: 166 additions & 0 deletions
@@ -1,6 +1,10 @@
 import datetime
+import re
+from typing import Any, Dict
+from unittest import mock
 from unittest.mock import MagicMock, patch
 
+import dirty_equals
 import geopandas as gpd
 import pandas as pd
 import pandas.testing as pdt
@@ -12,6 +16,7 @@
 
 from openeo.extra.job_management import MultiBackendJobManager
 from openeo.extra.job_management.stac_job_db import STACAPIJobDatabase
+from openeo.rest._testing import DummyBackend
 
 
 @pytest.fixture
@@ -399,3 +404,164 @@ def handle_row(series):
             "json": {"method": "upsert", "items": {item.id: item.to_dict() for item in items[9:]}},
         },
     ]
+
+
+@pytest.fixture
+def dummy_backend_foo(requests_mock) -> DummyBackend:
+    dummy = DummyBackend.at_url("https://foo.test", requests_mock=requests_mock)
+    dummy.setup_simple_job_status_flow(queued=1, running=2)
+    return dummy
+
+
+@pytest.fixture
+def sleep_mock():
+    with mock.patch("time.sleep") as sleep:
+        yield sleep
+
+
+class DummyStacApi:
+    """Minimal dummy implementation of a STAC API for testing purposes."""
+
+    def __init__(self, root_url: str, requests_mock):
+        self.root_url = root_url.rstrip("/")
+        self._requests_mock = requests_mock
+
+        requests_mock.get(f"{self.root_url}/", json=self._get_root())
+        self.collections = []
+        requests_mock.get(f"{self.root_url}/collections", json=self._get_collections)
+        requests_mock.post(f"{self.root_url}/collections", json=self._post_collections)
+
+        self.items: Dict[str, Dict[str, Any]] = {}
+        requests_mock.post(
+            re.compile(rf"{self.root_url}/collections/[^/]+/bulk_items"), json=self._post_collections_bulk_items
+        )
+
+        requests_mock.get(f"{self.root_url}/search?", json=self._get_search)
+
+    def _get_root(self) -> dict:
+        """Handler of `GET /` requests."""
+        return {
+            "stac_version": "1.0.0",
+            "id": "dummy-stac-api",
+            "title": "Dummy",
+            "description": "Dummy STAC API",
+            "type": "Catalog",
+            "conformsTo": [
+                "https://api.stacspec.org/v1.0.0/core",
+                "https://api.stacspec.org/v1.0.0/collections",
+                "https://api.stacspec.org/v1.0.0/item-search",
+            ],
+            "links": [],
+        }
+
+    def _get_collections(self, request, context):
+        """Handler of `GET /collections` requests."""
+        return {"collections": self.collections}
+
+    def _post_collections(self, request, context):
+        """Handler of `POST /collections` requests."""
+        post_data = request.json()
+        self.collections.append(post_data)
+        return {}
+
+    def _post_collections_bulk_items(self, request, context):
+        """Handler of `POST /collections/{collection_id}/bulk_items` requests."""
+        # extract the collection_id from the URL
+        collection_id = re.search("/collections/([^/]+)/bulk_items", request.url).group(1)
+        post_data = request.json()
+        # TODO handle insert/upsert method?
+        for item_id, item in post_data["items"].items():
+            if collection_id not in self.items:
+                self.items[collection_id] = {}
+            self.items[collection_id][item_id] = item
+        return {}
+
+    def _get_search(self, request, context):
+        """Handler of `GET /search` requests."""
+        collections = request.qs["collections"][0].split(",")
+        filter = request.qs["filter"][0] if "filter" in request.qs else None
+
+        if filter:
+            # TODO: use a more robust CQL2-text parser?
+            assert re.match(r"^\s*\"properties\.status\"='\w+'(\s+or\s+\"properties\.status\"='\w+')*\s*$", filter)
+            statuses = re.findall(r"\"properties\.status\"='(\w+)'", filter)
+        else:
+            statuses = None
+
+        items = [
+            item
+            for cid in collections
+            for item in self.items.get(cid, {}).values()
+            if statuses is None or item.get("properties", {}).get("status") in statuses
+        ]
+        return {
+            "type": "FeatureCollection",
+            "features": items,
+            "links": [],
+        }
+
+
+def test_run_jobs_basic(tmp_path, dummy_backend_foo, requests_mock, sleep_mock):
+    job_manager = MultiBackendJobManager(root_dir=tmp_path, poll_sleep=2)
+    job_manager.add_backend("foo", connection=dummy_backend_foo.connection)
+
+    stac_api_url = "http://stacapi.test"
+    dummy_stac_api = DummyStacApi(root_url=stac_api_url, requests_mock=requests_mock)
+
+    job_db = STACAPIJobDatabase(collection_id="collection-123", stac_root_url=stac_api_url)
+    df = pd.DataFrame(
+        {
+            "item_id": ["item-2024", "item-2025"],
+            "year": [2024, 2025],
+        }
+    )
+    job_db.initialize_from_df(df=df)
+
+    def create_job(row, connection, **kwargs):
+        year = int(row["year"])
+        pg = {"dummy1": {"process_id": "dummy", "arguments": {"year": year}, "result": True}}
+        job = connection.create_job(pg)
+        return job
+
+    run_stats = job_manager.run_jobs(job_db=job_db, start_job=create_job)
+
+    assert run_stats == dirty_equals.IsPartialDict(
+        {
+            "job finished": 2,
+            "job launch": 2,
+            "job start": 2,
+            "start_job call": 2,
+        }
+    )
+    assert dummy_stac_api.items == {
+        "collection-123": {
+            "item-2024": dirty_equals.IsPartialDict(
+                {
+                    "type": "Feature",
+                    "id": "item-2024",
+                    "properties": dirty_equals.IsPartialDict(
+                        {
+                            "year": 2024,
+                            "id": "job-000",
+                            "status": "finished",
+                            "backend_name": "foo",
+                        }
+                    ),
+                }
+            ),
+            "item-2025": dirty_equals.IsPartialDict(
+                {
+                    "type": "Feature",
+                    "id": "item-2025",
+                    "properties": dirty_equals.IsPartialDict(
+                        {
+                            "year": 2025,
+                            "id": "job-001",
+                            "status": "finished",
+                            "backend_name": "foo",
+                        }
+                    ),
+                }
+            ),
+        }
+    }
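Note: for reference on the "bulk transactions" TODO above, the `bulk_items` request body that `_ingest_bulk` sends, and that `DummyStacApi._post_collections_bulk_items` upserts into its in-memory `items` store, has this shape (a sketch with a hypothetical item entry; real entries are full STAC Item dicts):

    # Body of `POST /collections/{collection_id}/bulk_items` as built in `_ingest_bulk`:
    data = {
        "method": "upsert",
        "items": {
            # hypothetical item id -> STAC Item dict (here heavily abbreviated)
            "item-2024": {"type": "Feature", "id": "item-2024", "properties": {"status": "queued"}},
        },
    }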
