22import re
33from typing import Any , Dict
44from unittest import mock
5- from unittest .mock import MagicMock , patch
5+ from unittest .mock import MagicMock , patch , ANY
6+ import requests_mock
7+ import json
8+ import threading
69
710import dirty_equals
811import geopandas as gpd
1316import pytest
1417from requests .auth import AuthBase
1518from shapely .geometry import Point
19+ import requests
1620
1721from openeo .extra .job_management import MultiBackendJobManager
1822from openeo .extra .job_management .stac_job_db import STACAPIJobDatabase
@@ -24,11 +28,6 @@ def mock_auth():
2428 return MagicMock (spec = AuthBase )
2529
2630
27- @pytest .fixture
28- def mock_stac_api_job_database (mock_auth ) -> STACAPIJobDatabase :
29- return STACAPIJobDatabase (collection_id = "test_id" , stac_root_url = "http://fake-stac-api.test" , auth = mock_auth )
30-
31-
3231@pytest .fixture
3332def mock_pystac_client (dummy_stac_item ):
3433 mock_client = MagicMock (spec = pystac_client .Client )
@@ -45,6 +44,14 @@ def mock_pystac_client(dummy_stac_item):
4544 with patch ("pystac_client.Client.open" , return_value = mock_client ):
4645 yield mock_client
4746
@pytest.fixture
def mock_stac_api_job_database(mock_pystac_client, mock_auth) -> STACAPIJobDatabase:
    """Provide a ``STACAPIJobDatabase`` aimed at a fake STAC API root URL.

    ``mock_pystac_client`` is requested so that its patch of
    ``pystac_client.Client.open`` is in effect when the database is built;
    the fixture value itself is not used here.
    """
    job_db = STACAPIJobDatabase(collection_id="test_id", stac_root_url="http://fake-stac-api.test", auth=mock_auth)
    return job_db
4855
4956@pytest .fixture
5057def job_db_exists (mock_pystac_client ) -> STACAPIJobDatabase :
@@ -245,6 +252,93 @@ def test_exists(self, job_db_exists, job_db_not_exists):
245252 assert job_db_exists .exists () == True
246253 assert job_db_not_exists .exists () == False
247254
def test_persist_http_error(self, mock_stac_api_job_database, bulk_dataframe):
    """An ``HTTPError`` from the POST response must escape ``persist``."""
    # Response stub whose status check always fails.
    failing_response = MagicMock()
    failing_response.raise_for_status.side_effect = requests.HTTPError("500 Server Error")

    with patch("requests.post", return_value=failing_response), pytest.raises(requests.HTTPError):
        mock_stac_api_job_database.persist(bulk_dataframe)
261+
262+ #TODO consider if redundant given test_run_jobs_basic
def test_persist_posts_item_payload_to_stac_api(self, mock_stac_api_job_database, mock_auth):
    """Persist a one-row dataframe and inspect the HTTP requests it triggers.

    Checks that a POST to a URL ending in ``/collections`` was made, that some
    request targeted ``collections/test_id``, and that the JSON body of the
    first such request refers to the item id ``"test"``.
    """
    frame = pd.DataFrame(
        {
            "item_id": ["test"],
            "geometry": [None],
            "datetime": ["2020-05-22T00:00:00Z"],
            "some_property": ["value"],
        }
    ).set_index("item_id")

    with requests_mock.Mocker() as mocker:
        # Collection creation endpoint.
        mocker.post("http://fake-stac-api.test/collections", json={"id": "test_id"}, status_code=201)
        # Item ingestion: register both the bulk endpoint and the plain items
        # endpoint, so either upload route gets a response.
        item_endpoints = (
            "http://fake-stac-api.test/collections/test_id/bulk_items",
            "http://fake-stac-api.test/collections/test_id/items",
        )
        for endpoint in item_endpoints:
            mocker.post(endpoint, json={"inserted": 1}, status_code=201)

        mock_stac_api_job_database.persist(frame)
        history = mocker.request_history

        # The create-collection call must have happened.
        creation_posts = [r for r in history if r.method == "POST" and r.url.endswith("/collections")]
        assert creation_posts

        # First request aimed at the collection itself (bulk or plain items).
        collection_requests = [r for r in history if "collections/test_id" in r.url]
        assert collection_requests, "No collection-specific POST recorded"
        first_request = collection_requests[0]

        # Fall back from decoded text to the raw body, then to an empty JSON object.
        raw_body = first_request.text
        if not raw_body:
            raw_body = first_request.body or "{}"
        payload = json.loads(raw_body)

        # Either there is an "items" entry, or some top-level key mentions the item id.
        if "items" not in payload:
            assert any("test" in k for k in payload.keys()), "payload has no items"

        # When "items" is a mapping, the item id must be one of its keys.
        if "items" in payload:
            posted_items = payload["items"]
            if isinstance(posted_items, dict):
                assert "test" in posted_items
298+
def test_complex_geometry_handling(self, mock_stac_api_job_database):
    """Build an item from a row whose geometry is a MultiPolygon mapping."""
    # A single 10x10 square, wrapped as one polygon of a MultiPolygon.
    square_ring = [[0, 0], [10, 0], [10, 10], [0, 10], [0, 0]]
    multipolygon = {"type": "MultiPolygon", "coordinates": [[square_ring]]}
    row = pd.Series(
        {"item_id": "geo_test", "geometry": multipolygon, "status": "test"},
        name="geo_test",
    )

    job_db = mock_stac_api_job_database
    job_db.has_geometry = True
    stac_item = job_db.item_from(row)

    assert stac_item.geometry == multipolygon
    # NOTE(review): the expected bbox is a tuple; this comparison is False if
    # ``bbox`` comes back as a list - confirm against ``item_from``.
    assert stac_item.bbox == (0, 0, 10, 10)
316+
317+
def test_create_collection_with_custom_metadata(self, mock_stac_api_job_database):
    """Check the JSON body posted when creating a collection with extra fields."""
    whole_world = pystac.SpatialExtent([[-180, -90, 180, 90]])
    unbounded_time = pystac.TemporalExtent([[None, None]])
    collection = pystac.Collection(
        id="custom-collection",
        description="Test collection",
        extent=pystac.Extent(spatial=whole_world, temporal=unbounded_time),
        extra_fields={"custom_field": "custom_value", "license": "proprietary"},
    )

    with patch("requests.post") as post_mock:
        mock_stac_api_job_database._create_collection(collection)
        # ``call_args`` is an (args, kwargs) pair for the most recent call.
        _, post_kwargs = post_mock.call_args

    body = post_kwargs["json"]
    assert body["id"] == "custom-collection"
    assert body["custom_field"] == "custom_value"
    # The posted body is expected to carry an "_auth" entry on top of the
    # fields supplied by the collection.
    assert "_auth" in body
340+
341+
248342 @patch ("openeo.extra.job_management.stac_job_db.STACAPIJobDatabase.persist" , return_value = None )
249343 def test_initialize_from_df_non_existing (
250344 self , mock_persist , job_db_not_exists , dummy_dataframe , normalized_dummy_dataframe
@@ -529,6 +623,49 @@ def _get_search(self, request, context):
529623 "links" : [],
530624 }
531625
def test_upload_items_bulk_thread_behavior(requests_mock, mock_stac_api_job_database):
    """Exercise ``_upload_items_bulk`` with more items than fit in one chunk.

    Five items are uploaded with ``bulk_size = 2``; the test expects three POSTs
    to the bulk endpoint, more than one executor worker alive at some point,
    and no executor worker left over afterwards.

    Only executor threads that did not exist before the upload are counted, so
    thread pools owned by unrelated code in the same process cannot make the
    concurrency check pass or the cleanup check fail.
    """
    db = mock_stac_api_job_database
    db.collection_id = "test"
    db.stac_root_url = "http://fake-stac-api.test"
    db.bulk_size = 2  # Small chunk size to force multiple chunks

    def pool_threads():
        """Return the live threads whose name contains "ThreadPoolExecutor"."""
        return {t for t in threading.enumerate() if "ThreadPoolExecutor" in t.name}

    # Baseline: executor threads that are already alive before the upload starts.
    preexisting = pool_threads()

    # Number of new executor threads observed at the moment of each request.
    thread_counts = []

    def request_handler(request, context):
        thread_counts.append(len(pool_threads() - preexisting))
        return {"inserted": 1}

    requests_mock.post(
        f"{db.stac_root_url}/collections/test/bulk_items",
        json=request_handler,
        status_code=201,
    )

    # 5 items with bulk_size=2 -> 3 chunks
    items = [
        pystac.Item(id=f"id-{i}", geometry=None, bbox=None, datetime=FAKE_NOW, properties={})
        for i in range(5)
    ]

    db._upload_items_bulk(db.collection_id, items)

    # 1. All chunks were posted.
    assert requests_mock.call_count == 3

    # 2. More than one worker thread was alive during at least one request.
    # NOTE(review): this depends on how the executor schedules its workers and
    # may be timing-sensitive - confirm it is stable on CI.
    assert max(thread_counts) > 1, "Should have used multiple workers concurrently"

    # 3. No executor thread started during the upload is still alive.
    leftover = pool_threads() - preexisting
    assert not leftover, "All worker threads should be cleaned up"
668+
532669
533670def test_run_jobs_basic (tmp_path , dummy_backend_foo , requests_mock , sleep_mock ):
534671 stac_api_url = "http://stacapi.test"
@@ -626,3 +763,5 @@ def create_job(row, connection, **kwargs):
626763 ),
627764 }
628765 }
766+
767+
0 commit comments