Skip to content

Commit 40af9cd

Browse files
committed
Merge branch 'issue635-job-manager-initialize'
2 parents f965ddf + f7d3070 commit 40af9cd

File tree

3 files changed

+301
-28
lines changed

3 files changed

+301
-28
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Added
1111

1212
- Added `DataCube.load_stac()` to also support creating a `load_stac` based cube without a connection ([#638](https://github.com/Open-EO/openeo-python-client/issues/638))
13+
- `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame.
14+
Also added `create_job_db()` factory to easily create a job database from a given dataframe, with its type guessed from the filename extension.
15+
([#635](https://github.com/Open-EO/openeo-python-client/issues/635))
16+
17+
1318

1419
### Changed
1520

openeo/extra/job_management.py

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -258,14 +258,15 @@ def _make_resilient(connection):
258258
connection.session.mount("https://", HTTPAdapter(max_retries=retries))
259259
connection.session.mount("http://", HTTPAdapter(max_retries=retries))
260260

261-
def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
262-
"""Ensure we have the required columns and the expected type for the geometry column.
261+
@staticmethod
262+
def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
263+
"""
264+
Normalize given pandas dataframe (creating a new one):
265+
ensure we have the required columns.
263266
264267
:param df: The dataframe to normalize.
265268
:return: a new dataframe that is normalized.
266269
"""
267-
# TODO: this was originally an internal helper, but we need a clean public API for the user
268-
269270
# check for some required columns.
270271
required_with_default = [
271272
("status", "not_started"),
@@ -440,13 +441,7 @@ def run_jobs(
440441
assert not kwargs, f"Unexpected keyword arguments: {kwargs!r}"
441442

442443
if isinstance(job_db, (str, Path)):
443-
job_db_path = Path(job_db)
444-
if job_db_path.suffix.lower() == ".csv":
445-
job_db = CsvJobDatabase(path=job_db_path)
446-
elif job_db_path.suffix.lower() == ".parquet":
447-
job_db = ParquetJobDatabase(path=job_db_path)
448-
else:
449-
raise ValueError(f"Unsupported job database file type {job_db_path!r}")
444+
job_db = get_job_db(path=job_db)
450445

451446
if not isinstance(job_db, JobDatabaseInterface):
452447
raise ValueError(f"Unsupported job_db {job_db!r}")
@@ -456,8 +451,7 @@ def run_jobs(
456451
_log.info(f"Resuming `run_jobs` from existing {job_db}")
457452
elif df is not None:
458453
# TODO: start showing deprecation warnings for this usage pattern?
459-
df = self._normalize_df(df)
460-
job_db.persist(df)
454+
job_db.initialize_from_df(df)
461455

462456
while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
463457
self._job_update_loop(job_db=job_db, start_job=start_job)
@@ -697,6 +691,35 @@ def __init__(self):
697691
super().__init__()
698692
self._df = None
699693

694+
def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
    """
    Initialize the job database from a given dataframe,
    which will first be normalized to be compatible
    with :py:class:`MultiBackendJobManager` usage.

    :param df: dataframe with some columns your ``start_job`` callable expects
    :param on_exists: what to do when the job database already exists (persisted on disk):
        - "error": (default) raise an exception
        - "skip": work with existing database, ignore given dataframe and skip any initialization

    :return: initialized job database.

    :raises ValueError: if ``on_exists`` is not one of the supported modes
        (checked up front, whether or not the database already exists).
    :raises FileExistsError: if the job database already exists and ``on_exists`` is "error".

    .. versionadded:: 0.33.0
    """
    # TODO: option to provide custom MultiBackendJobManager subclass with custom normalize?
    # Validate the mode before looking at the database, so that a typo is reported
    # on the very first run instead of only surfacing once the database exists.
    # TODO handle other on_exists modes: e.g. overwrite, merge, ...
    if on_exists not in ("error", "skip"):
        raise ValueError(f"Invalid on_exists={on_exists!r}")
    if self.exists():
        if on_exists == "skip":
            return self
        raise FileExistsError(f"Job database {self!r} already exists.")
    df = MultiBackendJobManager._normalize_df(df)
    self.persist(df)
    # Return self to allow chaining with constructor.
    return self
722+
700723
@property
701724
def df(self) -> pd.DataFrame:
702725
if self._df is None:
@@ -822,3 +845,44 @@ def persist(self, df: pd.DataFrame):
822845
self._merge_into_df(df)
823846
self.path.parent.mkdir(parents=True, exist_ok=True)
824847
self.df.to_parquet(self.path, index=False)
848+
849+
850+
def get_job_db(path: Union[str, Path]) -> JobDatabaseInterface:
    """
    Build the job database object that matches a given file path.

    The concrete database class is picked from the (case-insensitive)
    filename extension: ``.csv`` gives a :py:class:`CsvJobDatabase`,
    ``.parquet`` or ``.geoparquet`` gives a :py:class:`ParquetJobDatabase`.

    :param path: path to job database file.
    :return: job database instance bound to ``path``.
    :raises ValueError: if the extension is not one of the recognized ones.

    .. versionadded:: 0.33.0
    """
    path = Path(path)
    extension = path.suffix.lower()
    if extension == ".csv":
        return CsvJobDatabase(path=path)
    if extension in (".parquet", ".geoparquet"):
        return ParquetJobDatabase(path=path)
    raise ValueError(f"Could not guess job database type from {path!r}")
867+
868+
869+
def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error"):
    """
    Create a job database at the given path and fill it from a dataframe.

    The database type is guessed from the filename extension (see ``get_job_db``),
    after which the database is initialized from ``df``.

    :param path: Path to the job database file.
    :param df: DataFrame to store in the job database.
    :param on_exists: What to do when the job database already exists:
        - "error": (default) raise an exception
        - "skip": work with existing database, ignore given dataframe and skip any initialization

    :return: the job database instance.
    :raises NotImplementedError: if the guessed database type is not
        a ``FullDataFrameJobDatabase``.

    .. versionadded:: 0.33.0
    """
    job_db = get_job_db(path)
    # Guard clause: only dataframe-backed databases can be initialized here.
    if not isinstance(job_db, FullDataFrameJobDatabase):
        raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
    job_db.initialize_from_df(df=df, on_exists=on_exists)
    return job_db

0 commit comments

Comments
 (0)