
Commit 47b1e5b

ruff format and fixup extra/job_management
1 parent 4171004 commit 47b1e5b

File tree: 6 files changed, +35 −51 lines changed

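Most of the hunks below are mechanical formatting changes, consistent with running ruff's formatter over the touched packages. The exact invocation isn't recorded in the commit; something along the lines of `ruff format openeo/extra/job_management tests/extra/job_management` would produce this kind of diff.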

openeo/extra/job_management/__init__.py

Lines changed: 3 additions & 6 deletions
@@ -383,7 +383,6 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas
         self._worker_pool = _JobManagerWorkerThreadPool()
 
         def run_loop():
-
             # TODO: support user-provided `stats`
             stats = collections.defaultdict(int)
 
@@ -726,7 +725,6 @@ def _process_threadworker_updates(
             job_db.persist(df_updates)
             stats["job_db persist"] = stats.get("job_db persist", 0) + 1
 
-
     def on_job_done(self, job: BatchJob, row):
         """
         Handles jobs that have finished. Can be overridden to provide custom behaviour.
@@ -862,10 +860,10 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
                     active.loc[i, "running_start_time"] = rfc3339.now_utc()
 
                 if self._cancel_running_job_after and new_status == "running":
-                    if (not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"])):
+                    if not active.loc[i, "running_start_time"] or pd.isna(active.loc[i, "running_start_time"]):
                         _log.warning(
                             f"Unknown 'running_start_time' for running job {job_id}. Using current time as an approximation."
-                            )
+                        )
                         stats["job started running"] += 1
                         active.loc[i, "running_start_time"] = rfc3339.now_utc()
 
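A note on the `pd.isna` guard in the hunk above: a plain truthiness check alone would miss the NaN case, because NaN is truthy in Python. A minimal standalone sketch (made-up values, not code from this repo) of why both checks are combined:

import pandas as pd

for value in [None, float("nan"), "2024-09-01T09:00:00Z"]:
    missing = not value or pd.isna(value)
    print(repr(value), "->", missing)

# None -> True                      (caught by `not value`)
# nan -> True                       (NaN is truthy, so only pd.isna catches it)
# '2024-09-01T09:00:00Z' -> False   (a real timestamp passes both checks)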
@@ -910,7 +908,6 @@ def ignore_connection_errors(context: Optional[str] = None, sleep: int = 5):
 
 
 class FullDataFrameJobDatabase(JobDatabaseInterface):
-
     def __init__(self):
         super().__init__()
         self._df = None
@@ -1272,7 +1269,7 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
                     # Skip optional parameters without any fallback default value
                     continue
                 else:
-                    raise ValueError(f"Missing required parameter {param_name !r} for process {process_id!r}")
+                    raise ValueError(f"Missing required parameter {param_name!r} for process {process_id!r}")
 
             # Prepare some values/dtypes for JSON encoding
             if isinstance(value, numpy.integer):
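The `{param_name !r}` fix in the last hunk is purely cosmetic: Python permits whitespace before the conversion marker in an f-string, and `!r` simply applies repr(). A quick illustrative check (hypothetical value, not from the codebase):

param_name = "spatial_extent"
assert f"{param_name !r}" == f"{param_name!r}" == repr(param_name) == "'spatial_extent'"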

openeo/extra/job_management/stac_job_db.py

Lines changed: 1 addition & 3 deletions
@@ -188,9 +188,7 @@ def get_by_status(self, statuses: Iterable[str], max: Optional[int] = None) -> p
 
         if df.shape[0] == 0:
             # TODO: What if default columns are overwritten by the user?
-            df = self._normalize_df(
-                df
-            )  # Even for an empty dataframe the default columns are required
+            df = self._normalize_df(df)  # Even for an empty dataframe the default columns are required
         return df
 
     def get_by_indices(self, indices: Iterable[Union[int, str]]) -> pd.DataFrame:

tests/extra/job_management/test_job_management.py

Lines changed: 28 additions & 32 deletions
@@ -6,7 +6,6 @@
 import logging
 import re
 import threading
-import time
 from pathlib import Path
 from time import sleep
 from typing import Union
@@ -109,7 +108,6 @@ def execute(self) -> _TaskResult:
 
 
 class TestMultiBackendJobManager:
-
     @pytest.fixture
     def job_manager_root_dir(self, tmp_path):
         return tmp_path / "job_mgr_root"
@@ -582,7 +580,6 @@ def start_job(row, connection_provider, connection, **kwargs):
                 12 * 60 * 60,
                 "finished",
             ),
-
         ],
     )
     def test_automatic_cancel_of_too_long_running_jobs(
@@ -672,30 +669,28 @@ def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep
         needle = re.compile(r"Job status histogram:.*'finished': 5.*Run stats:.*'job_queued_for_start': 5")
         assert needle.search(caplog.text)
 
-
-
     @pytest.mark.parametrize(
-            ["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds"],
-            [
-                # Scenario 1: Missing running_start_time (None)
-                (
-                    "2024-09-01T09:00:00Z",  # Job creation time
-                    "2024-09-01T09:00:00Z",  # Job start time (should be 1 hour after create_time)
-                    None,  # Missing running_start_time
-                    "2024-09-01T20:00:00Z",  # Job end time
-                    "finished",  # Job final status
-                    6 * 60 * 60,  # Cancel after 6 hours
-                ),
-                # Scenario 2: NaN running_start_time
-                (
-                    "2024-09-01T09:00:00Z",
-                    "2024-09-01T09:00:00Z",
-                    float("nan"),  # NaN running_start_time
-                    "2024-09-01T20:00:00Z",  # Job end time
-                    "finished",  # Job final status
-                    6 * 60 * 60,  # Cancel after 6 hours
-                ),
-            ]
+        ["create_time", "start_time", "running_start_time", "end_time", "end_status", "cancel_after_seconds"],
+        [
+            # Scenario 1: Missing running_start_time (None)
+            (
+                "2024-09-01T09:00:00Z",  # Job creation time
+                "2024-09-01T09:00:00Z",  # Job start time (should be 1 hour after create_time)
+                None,  # Missing running_start_time
+                "2024-09-01T20:00:00Z",  # Job end time
+                "finished",  # Job final status
+                6 * 60 * 60,  # Cancel after 6 hours
+            ),
+            # Scenario 2: NaN running_start_time
+            (
+                "2024-09-01T09:00:00Z",
+                "2024-09-01T09:00:00Z",
+                float("nan"),  # NaN running_start_time
+                "2024-09-01T20:00:00Z",  # Job end time
+                "finished",  # Job final status
+                6 * 60 * 60,  # Cancel after 6 hours
+            ),
+        ],
     )
     def test_ensure_running_start_time_is_datetime(
         self,
@@ -726,10 +721,12 @@ def get_status(job_id, current_status):
         job_manager.add_backend("foo", connection=dummy_backend_foo.connection)
 
         # Create a DataFrame representing the job database
-        df = pd.DataFrame({
-            "year": [2024],
-            "running_start_time": [running_start_time],  # Initial running_start_time
-        })
+        df = pd.DataFrame(
+            {
+                "year": [2024],
+                "running_start_time": [running_start_time],  # Initial running_start_time
+            }
+        )
 
         # Move the time machine to the job creation time
         time_machine.move_to(create_time)
@@ -871,6 +868,7 @@ def execute(self):
         assert any("Skipping invalid db_update" in msg for msg in caplog.messages)
         assert any("Skipping invalid stats_update" in msg for msg in caplog.messages)
 
+
 JOB_DB_DF_BASICS = pd.DataFrame(
     {
         "numbers": [3, 2, 1],
@@ -986,7 +984,6 @@ def test_count_by_status(self, tmp_path, db_class):
 
 
 class TestCsvJobDatabase:
-
     def test_repr(self, tmp_path):
         path = tmp_path / "db.csv"
         db = CsvJobDatabase(path)
@@ -1153,7 +1150,6 @@ def test_read_with_crs_column(self, tmp_path):
 
 
 class TestParquetJobDatabase:
-
     def test_repr(self, tmp_path):
         path = tmp_path / "db.pq"
         db = ParquetJobDatabase(path)

tests/extra/job_management/test_job_splitting.py

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@
 
 # TODO: using fixtures for these simple objects is a bit overkill, makes the test harder to follow, and undermines opportunity to parameterize
 
+
 @pytest.fixture
 def mock_polygon_wgs():
     return shapely.geometry.box(0.0, 0.0, 1.0, 1.0)

tests/extra/job_management/test_stac_job_db.py

Lines changed: 1 addition & 8 deletions
@@ -105,7 +105,6 @@ def _pystac_item(
 
 class TestSTACAPIJobDatabase:
     def test_exists(self, job_db_exists, job_db_not_exists):
-
         assert job_db_exists.exists() == True
         assert job_db_not_exists.exists() == False
 
@@ -431,8 +430,6 @@ def post_bulk_items(request, context):
         job_db_exists.persist(bulk_dataframe)
         assert post_bulk_items_mock.called
 
-
-
     def test_persist_multiple_chunks(self, requests_mock, job_db_exists):
         rows = 12
         bulk_dataframe = pd.DataFrame(
@@ -572,11 +569,7 @@ def _post_collections_bulk_items(self, request, context):
     def _get_search(self, request, context):
         """Handler of `GET /search` requests."""
         collections = request.qs["collections"][0].split(",")
-        items = [
-            item
-            for cid in collections
-            for item in self.items.get(cid, {}).values()
-        ]
+        items = [item for cid in collections for item in self.items.get(cid, {}).values()]
         if "ids" in request.qs:
             [ids] = request.qs["ids"]
             ids = set(ids.split(","))
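For readers unpicking the flattened comprehension above: its clauses read left to right exactly like nested for loops. A small standalone sketch (the dict below is a made-up stand-in for the handler's `self.items`):

items_by_collection = {"c1": {"a": 1, "b": 2}, "c2": {"c": 3}}
collections = ["c1", "c2"]

flat = [item for cid in collections for item in items_by_collection.get(cid, {}).values()]

nested = []
for cid in collections:  # first clause = outer loop
    for item in items_by_collection.get(cid, {}).values():  # second clause = inner loop
        nested.append(item)

assert flat == nested == [1, 2, 3]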

tests/extra/job_management/test_thread_worker.py

Lines changed: 1 addition & 2 deletions
@@ -5,7 +5,6 @@
 from typing import Iterator
 
 import pytest
-import requests
 
 from openeo.extra.job_management._thread_worker import (
     Task,
@@ -69,7 +68,7 @@ def test_start_failure(self, dummy_backend, caplog):
         )
         assert job.status() == "error"
         assert caplog.messages == [
-            "Failed to start job 'job-000': OpenEoApiError('[500] Internal: No job starting " "for you, buddy')"
+            "Failed to start job 'job-000': OpenEoApiError('[500] Internal: No job starting for you, buddy')"
         ]
 
     @pytest.mark.parametrize("serializer", [repr, str])
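The string change above removes a leftover implicit concatenation (likely a remnant of an earlier line wrap). Adjacent string literals are merged at compile time, so the expected log message is unchanged; a minimal demonstration:

old = "No job starting " "for you, buddy"
new = "No job starting for you, buddy"
assert old == new  # implicit concatenation happens at compile time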
