Skip to content

Commit 9706528

Browse files
committed
Merge branch 'issue744_download_optional'
2 parents e8f6a21 + d0bb37e commit 9706528

File tree

3 files changed

+51
-9
lines changed

3 files changed

+51
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- `MultiBackendJobManager`: add `download_results` option to enable/disable the job manager's automatic download of job results once a job has completed ([#744](https://github.com/Open-EO/openeo-python-client/issues/744))
13+
1214
### Changed
1315

1416
- Internal reorganization of `openeo.extra.job_management` submodule to ease future development ([#741](https://github.com/Open-EO/openeo-python-client/issues/741))

openeo/extra/job_management/_manager.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,21 @@ def start_job(
157157
- get_error_log_path
158158
- get_job_metadata_path
159159
160+
:param download_results:
161+
Whether to automatically download the job results (and write the job metadata JSON file) once a job is completed. Enabled by default.
162+
160163
:param cancel_running_job_after:
161164
Optional temporal limit (in seconds) after which running jobs should be canceled
162165
by the job manager.
163166
167+
164168
.. versionadded:: 0.14.0
165169
166170
.. versionchanged:: 0.32.0
167171
Added ``cancel_running_job_after`` parameter.
172+
173+
.. versionchanged:: 0.47.0
174+
Added ``download_results`` parameter.
168175
"""
169176

170177
# Expected columns in the job DB dataframes.
@@ -193,6 +200,7 @@ def __init__(
193200
poll_sleep: int = 60,
194201
root_dir: Optional[Union[str, Path]] = ".",
195202
*,
203+
download_results: bool = True,
196204
cancel_running_job_after: Optional[int] = None,
197205
):
198206
"""Create a MultiBackendJobManager."""
@@ -204,6 +212,8 @@ def __init__(
204212
# An explicit None or "" should also default to "."
205213
self._root_dir = Path(root_dir or ".")
206214

215+
self._download_results = download_results
216+
207217
self._cancel_running_job_after = (
208218
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
209219
)
@@ -712,16 +722,16 @@ def on_job_done(self, job: BatchJob, row):
712722
:param row: DataFrame row containing the job's metadata.
713723
"""
714724
# TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?
725+
if self._download_results:
726+
job_metadata = job.describe()
727+
job_dir = self.get_job_dir(job.job_id)
728+
metadata_path = self.get_job_metadata_path(job.job_id)
715729

716-
job_metadata = job.describe()
717-
job_dir = self.get_job_dir(job.job_id)
718-
metadata_path = self.get_job_metadata_path(job.job_id)
719-
720-
self.ensure_job_dir_exists(job.job_id)
721-
job.get_results().download_files(target=job_dir)
730+
self.ensure_job_dir_exists(job.job_id)
731+
job.get_results().download_files(target=job_dir)
722732

723-
with metadata_path.open("w", encoding="utf-8") as f:
724-
json.dump(job_metadata, f, ensure_ascii=False)
733+
with metadata_path.open("w", encoding="utf-8") as f:
734+
json.dump(job_metadata, f, ensure_ascii=False)
725735

726736
def on_job_error(self, job: BatchJob, row):
727737
"""

tests/extra/job_management/test_manager.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
)
4040
from openeo.rest._testing import DummyBackend
4141
from openeo.rest.auth.testing import OidcMock
42-
from openeo.util import rfc3339
42+
from openeo.util import load_json, rfc3339
4343

4444

4545
def _job_id_from_year(process_graph) -> Union[str, None]:
@@ -897,3 +897,33 @@ def test_refresh_bearer_token_before_start(
897897
# Because of proactive+throttled token refreshing,
898898
# we should have 2 additional token requests now
899899
assert len(oidc_mock.grant_request_history) == 4
900+
901+
@pytest.mark.parametrize(
902+
["download_results"],
903+
[
904+
(True,),
905+
(False,),
906+
],
907+
)
908+
def test_download_results_toggle(
909+
self, tmp_path, job_manager_root_dir, dummy_backend_foo, download_results, sleep_mock
910+
):
911+
job_manager = MultiBackendJobManager(root_dir=job_manager_root_dir, download_results=download_results)
912+
job_manager.add_backend("foo", connection=dummy_backend_foo.connection)
913+
914+
df = pd.DataFrame({"year": [2018, 2019]})
915+
job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df)
916+
run_stats = job_manager.run_jobs(job_db=job_db, start_job=self._create_year_job)
917+
assert run_stats == dirty_equals.IsPartialDict({"job finished": 2})
918+
919+
if download_results:
920+
assert (job_manager_root_dir / "job_job-2018/result.data").read_bytes() == DummyBackend.DEFAULT_RESULT
921+
assert load_json(job_manager_root_dir / "job_job-2018/job_job-2018.json") == dirty_equals.IsPartialDict(
922+
id="job-2018", status="finished"
923+
)
924+
assert (job_manager_root_dir / "job_job-2019/result.data").read_bytes() == DummyBackend.DEFAULT_RESULT
925+
assert load_json(job_manager_root_dir / "job_job-2019/job_job-2019.json") == dirty_equals.IsPartialDict(
926+
id="job-2019", status="finished"
927+
)
928+
else:
929+
assert not job_manager_root_dir.exists() or list(job_manager_root_dir.iterdir()) == []

0 commit comments

Comments
 (0)