Skip to content

Commit ed28aaa

Browse files
committed
PR #736 no need to split up df_updates in _process_threadworker_updates
Splitting is not necessary, and removing it eliminates `job_db.read()`, which is not part of JobDatabaseInterface
1 parent 5e5de21 commit ed28aaa

File tree

2 files changed

+3
-14
lines changed

2 files changed

+3
-14
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -710,17 +710,9 @@ def _process_threadworker_updates(
710710
# Build DataFrame of updates indexed by df_idx
711711
df_updates = pd.DataFrame(updates).set_index("df_idx", drop=True)
712712

713-
# Determine which rows to upsert
714-
existing_indices = set(df_updates.index).intersection(job_db.read().index)
715-
if existing_indices:
716-
df_upsert = df_updates.loc[sorted(existing_indices)]
717-
job_db.persist(df_upsert)
718-
stats["job_db persist"] = stats.get("job_db persist", 0) + 1
719-
720-
# Any df_idx not in original index are errors
721-
missing = set(df_updates.index) - existing_indices
722-
if missing:
723-
_log.error(f"Skipping non-existing dataframe indices: {sorted(missing)}")
713+
job_db.persist(df_updates)
714+
stats["job_db persist"] = stats.get("job_db persist", 0) + 1
715+
724716

725717
def on_job_done(self, job: BatchJob, row):
726718
"""

tests/extra/job_management/test_job_management.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -786,9 +786,6 @@ def test_process_threadworker_updates(self, tmp_path, caplog):
786786
assert stats.get("queued", 0) == 2
787787
assert stats["job_db persist"] == 1
788788

789-
# Assert error log for invalid index
790-
assert any("Skipping non-existing dataframe indices" in msg for msg in caplog.messages)
791-
792789
def test_no_results_leaves_db_and_stats_untouched(self, tmp_path, caplog):
793790
pool = _JobManagerWorkerThreadPool(max_workers=2)
794791
stats = collections.defaultdict(int)

0 commit comments

Comments
 (0)