Skip to content

Commit 64cafcf

Browse files
committed
Issue #645: introduce returning event stats from MultiBackendJobManager.run_jobs
1 parent ff8b553 commit 64cafcf

File tree

3 files changed

+106
-25
lines changed

3 files changed

+106
-25
lines changed

CHANGELOG.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
- `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame.
1414
Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension.
1515
([#635](https://github.com/Open-EO/openeo-python-client/issues/635))
16-
17-
16+
- `MultiBackendJobManager.run_jobs()` now returns a dictionary with counters/stats about various events during the job run ([#645](https://github.com/Open-EO/openeo-python-client/issues/645))
1817

1918
### Changed
2019

openeo/extra/job_management.py

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import abc
2+
import collections
23
import contextlib
34
import datetime
45
import functools
@@ -380,7 +381,7 @@ def run_jobs(
380381
start_job: Callable[[], BatchJob] = _start_job_default,
381382
job_db: Union[str, Path, JobDatabaseInterface, None] = None,
382383
**kwargs,
383-
):
384+
) -> dict:
384385
"""Runs jobs, specified in a dataframe, and tracks parameters.
385386
386387
:param df:
@@ -422,6 +423,10 @@ def run_jobs(
422423
Support for Parquet files depends on the ``pyarrow`` package
423424
as :ref:`optional dependency <installation-optional-dependencies>`.
424425
426+
:return: dictionary with stats collected during the job running loop.
427+
Note that the set of fields in this dictionary is experimental
428+
and subject to change.
429+
425430
.. versionchanged:: 0.31.0
426431
Added support for persisting the job metadata in Parquet format.
427432
@@ -430,6 +435,9 @@ def run_jobs(
430435
which can be a path to a CSV or Parquet file,
431436
or a user-defined :py:class:`JobDatabaseInterface` object.
432437
The deprecated ``output_file`` argument is still supported for now.
438+
439+
.. versionchanged:: 0.33.0
440+
Return a stats dictionary.
433441
"""
434442
# TODO: Defining start_jobs as a Protocol might make its usage more clear, and avoid complicated doctrings,
435443
# but Protocols are only supported in Python 3.8 and higher.
@@ -457,23 +465,35 @@ def run_jobs(
457465
# TODO: start showing deprecation warnings for this usage pattern?
458466
job_db.initialize_from_df(df)
459467

468+
stats = collections.defaultdict(int)
460469
while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
461-
self._job_update_loop(job_db=job_db, start_job=start_job)
470+
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
471+
stats["run_jobs loop"] += 1
472+
462473
time.sleep(self.poll_sleep)
474+
stats["sleep"] += 1
463475

464-
def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob]):
476+
return stats
477+
478+
def _job_update_loop(
479+
self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob], stats: Optional[dict] = None
480+
):
465481
"""
466482
Inner loop logic of job management:
467483
go through the necessary jobs to check for status updates,
468484
trigger status events, start new jobs when there is room for them, etc.
469485
"""
486+
stats = stats if stats is not None else collections.defaultdict(int)
487+
470488
with ignore_connection_errors(context="get statuses"):
471-
self._track_statuses(job_db)
489+
self._track_statuses(job_db, stats=stats)
490+
stats["track_statuses"] += 1
472491

473492
not_started = job_db.get_by_status(statuses=["not_started"], max=200)
474493
if len(not_started) > 0:
475494
# Check number of jobs running at each backend
476495
running = job_db.get_by_status(statuses=["created", "queued", "running"])
496+
stats["job_db get_by_status"] += 1
477497
per_backend = running.groupby("backend_name").size().to_dict()
478498
_log.info(f"Running per backend: {per_backend}")
479499
for backend_name in self.backends:
@@ -482,10 +502,13 @@ def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[],
482502
to_add = self.backends[backend_name].parallel_jobs - backend_load
483503
to_launch = not_started.iloc[0:to_add]
484504
for i in to_launch.index:
485-
self._launch_job(start_job, not_started, i, backend_name)
505+
self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats)
506+
stats["job launch"] += 1
507+
486508
job_db.persist(to_launch)
509+
stats["job_db persist"] += 1
487510

488-
def _launch_job(self, start_job, df, i, backend_name):
511+
def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None):
489512
"""Helper method for launching jobs
490513
491514
:param start_job:
@@ -508,13 +531,15 @@ def _launch_job(self, start_job, df, i, backend_name):
508531
:param backend_name:
509532
name of the backend that will execute the job.
510533
"""
534+
stats = stats if stats is not None else collections.defaultdict(int)
511535

512536
df.loc[i, "backend_name"] = backend_name
513537
row = df.loc[i]
514538
try:
515539
_log.info(f"Starting job on backend {backend_name} for {row.to_dict()}")
516540
connection = self._get_connection(backend_name, resilient=True)
517541

542+
stats["start_job call"] += 1
518543
job = start_job(
519544
row=row,
520545
connection_provider=self._get_connection,
@@ -524,23 +549,30 @@ def _launch_job(self, start_job, df, i, backend_name):
524549
except requests.exceptions.ConnectionError as e:
525550
_log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True)
526551
df.loc[i, "status"] = "start_failed"
552+
stats["start_job error"] += 1
527553
else:
528554
df.loc[i, "start_time"] = rfc3339.utcnow()
529555
if job:
530556
df.loc[i, "id"] = job.job_id
531557
with ignore_connection_errors(context="get status"):
532558
status = job.status()
559+
stats["job get status"] += 1
533560
df.loc[i, "status"] = status
534561
if status == "created":
535562
# start job if not yet done by callback
536563
try:
537564
job.start()
565+
stats["job start"] += 1
538566
df.loc[i, "status"] = job.status()
567+
stats["job get status"] += 1
539568
except OpenEoApiError as e:
540569
_log.error(e)
541570
df.loc[i, "status"] = "start_failed"
571+
stats["job start error"] += 1
542572
else:
573+
# TODO: what is this "skipping" about actually?
543574
df.loc[i, "status"] = "skipped"
575+
stats["start_job skipped"] += 1
544576

545577
def on_job_done(self, job: BatchJob, row):
546578
"""
@@ -623,11 +655,13 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
623655
if not job_dir.exists():
624656
job_dir.mkdir(parents=True)
625657

626-
def _track_statuses(self, job_db: JobDatabaseInterface):
658+
def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None):
627659
"""
628660
Tracks status (and stats) of running jobs (in place).
629661
Optionally cancels jobs when running too long.
630662
"""
663+
stats = stats if stats is not None else collections.defaultdict(int)
664+
631665
active = job_db.get_by_status(statuses=["created", "queued", "running"])
632666
for i in active.index:
633667
job_id = active.loc[i, "id"]
@@ -638,22 +672,27 @@ def _track_statuses(self, job_db: JobDatabaseInterface):
638672
con = self._get_connection(backend_name)
639673
the_job = con.job(job_id)
640674
job_metadata = the_job.describe()
675+
stats["job describe"] += 1
641676
new_status = job_metadata["status"]
642677

643678
_log.info(
644679
f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r} (previously {previous_status!r})"
645680
)
646681

647682
if new_status == "finished":
683+
stats["job finished"] += 1
648684
self.on_job_done(the_job, active.loc[i])
649685

650686
if previous_status != "error" and new_status == "error":
687+
stats["job failed"] += 1
651688
self.on_job_error(the_job, active.loc[i])
652689

653690
if previous_status in {"created", "queued"} and new_status == "running":
691+
stats["job started running"] += 1
654692
active.loc[i, "running_start_time"] = rfc3339.utcnow()
655693

656694
if new_status == "canceled":
695+
stats["job canceled"] += 1
657696
self.on_job_cancel(the_job, active.loc[i])
658697

659698
if self._cancel_running_job_after and new_status == "running":
@@ -667,10 +706,14 @@ def _track_statuses(self, job_db: JobDatabaseInterface):
667706
active.loc[i, key] = _format_usage_stat(job_metadata, key)
668707

669708
except OpenEoApiError as e:
709+
stats["job tracking error"] += 1
670710
print(f"error for job {job_id!r} on backend {backend_name}")
671711
print(e)
712+
713+
stats["job_db persist"] += 1
672714
job_db.persist(active)
673715

716+
674717
def _format_usage_stat(job_metadata: dict, field: str) -> str:
675718
value = deep_get(job_metadata, "usage", field, "value", default=0)
676719
unit = deep_get(job_metadata, "usage", field, "unit", default="")
@@ -977,6 +1020,3 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
9771020
def __call__(self, *arg, **kwargs) -> BatchJob:
9781021
"""Syntactic sugar for calling `start_job` directly."""
9791022
return self.start_job(*arg, **kwargs)
980-
981-
982-

tests/extra/test_job_management.py

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
CsvJobDatabase,
3030
MultiBackendJobManager,
3131
ParquetJobDatabase,
32+
UDPJobFactory,
3233
create_job_db,
3334
get_job_db,
34-
UDPJobFactory,
3535
)
3636
from openeo.rest._testing import OPENEO_BACKEND, DummyBackend, build_capabilities
3737
from openeo.util import rfc3339
@@ -113,8 +113,17 @@ def start_job(row, connection, **kwargs):
113113
year = int(row["year"])
114114
return BatchJob(job_id=f"job-{year}", connection=connection)
115115

116-
manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
117-
assert sleep_mock.call_count > 10
116+
run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
117+
assert run_stats == dirty_equals.IsPartialDict(
118+
{
119+
"sleep": dirty_equals.IsInt(gt=10),
120+
"start_job call": 7, # TODO?
121+
"job started running": 5,
122+
"job finished": 5,
123+
"job_db persist": dirty_equals.IsInt(gt=5),
124+
"run_jobs loop": dirty_equals.IsInt(gt=5),
125+
}
126+
)
118127

119128
result = pd.read_csv(output_file)
120129
assert len(result) == 5
@@ -148,8 +157,17 @@ def start_job(row, connection, **kwargs):
148157

149158
job_db = CsvJobDatabase(output_file).initialize_from_df(df)
150159

151-
manager.run_jobs(job_db=job_db, start_job=start_job)
152-
assert sleep_mock.call_count > 10
160+
run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
161+
assert run_stats == dirty_equals.IsPartialDict(
162+
{
163+
"sleep": dirty_equals.IsInt(gt=10),
164+
"start_job call": 7, # TODO?
165+
"job started running": 5,
166+
"job finished": 5,
167+
"job_db persist": dirty_equals.IsInt(gt=5),
168+
"run_jobs loop": dirty_equals.IsInt(gt=5),
169+
}
170+
)
153171

154172
result = pd.read_csv(output_file)
155173
assert len(result) == 5
@@ -176,8 +194,14 @@ def start_job(row, connection, **kwargs):
176194
output_file = tmp_path / "jobs.db"
177195
job_db = db_class(output_file).initialize_from_df(df)
178196

179-
manager.run_jobs(job_db=job_db, start_job=start_job)
180-
assert sleep_mock.call_count > 10
197+
run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
198+
assert run_stats == dirty_equals.IsPartialDict(
199+
{
200+
"start_job call": 7, # TODO?
201+
"job finished": 5,
202+
"job_db persist": dirty_equals.IsInt(gt=5),
203+
}
204+
)
181205

182206
result = job_db.read()
183207
assert len(result) == 5
@@ -205,8 +229,14 @@ def start_job(row, connection, **kwargs):
205229
output_file = tmp_path / filename
206230
job_db = create_job_db(path=output_file, df=df)
207231

208-
manager.run_jobs(job_db=job_db, start_job=start_job)
209-
assert sleep_mock.call_count > 10
232+
run_stats = manager.run_jobs(job_db=job_db, start_job=start_job)
233+
assert run_stats == dirty_equals.IsPartialDict(
234+
{
235+
"start_job call": 7, # TODO?
236+
"job finished": 5,
237+
"job_db persist": dirty_equals.IsInt(gt=5),
238+
}
239+
)
210240

211241
result = job_db.read()
212242
assert len(result) == 5
@@ -235,6 +265,7 @@ def start_job(row, connection, **kwargs):
235265
# Trigger context switch to job thread
236266
sleep(1)
237267
manager.stop_job_thread()
268+
# TODO #645 how to collect stats with the threaded run_job?
238269
assert sleep_mock.call_count > 10
239270

240271
result = pd.read_csv(output_file)
@@ -543,8 +574,12 @@ def start_job(row, connection_provider, connection, **kwargs):
543574

544575
output_file = tmp_path / "jobs.csv"
545576

546-
manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
547-
assert sleep_mock.call_count > 3
577+
run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
578+
assert run_stats == dirty_equals.IsPartialDict(
579+
{
580+
"start_job call": 1,
581+
}
582+
)
548583

549584
# Sanity check: the job succeeded
550585
result = pd.read_csv(output_file)
@@ -615,6 +650,7 @@ def start_job(row, connection_provider, connection, **kwargs):
615650
with pytest.raises(requests.exceptions.RetryError) as exc:
616651
manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
617652

653+
# TODO #645 how to still check stats when run_jobs raised exception?
618654
assert sleep_mock.call_count > 3
619655

620656
# Sanity check: the job has status "error"
@@ -1103,8 +1139,14 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job
11031139
# TODO #636 avoid this cumbersome pattern using private _normalize_df API
11041140
job_db.persist(job_manager._normalize_df(df))
11051141

1106-
job_manager.run_jobs(job_db=job_db, start_job=job_starter)
1107-
assert sleep_mock.call_count > 0
1142+
stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter)
1143+
assert stats == dirty_equals.IsPartialDict(
1144+
{
1145+
"sleep": dirty_equals.IsInt(gt=1),
1146+
"start_job call": 3,
1147+
"job start": 3,
1148+
}
1149+
)
11081150

11091151
result = job_db.read()
11101152
assert set(result.status) == {"finished"}

0 commit comments

Comments
 (0)