
Commit be3f09a

Move JobDatabaseInterface to dedicated submodule #714
1 parent 47b1e5b commit be3f09a

File tree

docs/cookbook/job_manager.rst
openeo/extra/job_management/__init__.py
openeo/extra/job_management/job_db.py
openeo/extra/job_management/stac_job_db.py

4 files changed: +81 −68 lines changed

docs/cookbook/job_manager.rst

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@ API
 .. autoclass:: openeo.extra.job_management.MultiBackendJobManager
     :members:
 
-.. autoclass:: openeo.extra.job_management.JobDatabaseInterface
+.. autoclass:: openeo.extra.job_management.job_db.JobDatabaseInterface
     :members:
 
 .. autoclass:: openeo.extra.job_management.CsvJobDatabase

openeo/extra/job_management/__init__.py

Lines changed: 6 additions & 66 deletions
@@ -37,6 +37,7 @@
     _JobManagerWorkerThreadPool,
     _JobStartTask,
 )
+from openeo.extra.job_management.job_db import JobDatabaseInterface
 from openeo.internal.processes.parse import (
     Parameter,
     Process,
@@ -64,67 +65,6 @@ class _Backend(NamedTuple):
 _UNSET = object()
 
 
-class JobDatabaseInterface(metaclass=abc.ABCMeta):
-    """
-    Interface for a database of job metadata to use with the :py:class:`MultiBackendJobManager`,
-    allowing to regularly persist the job metadata while polling the job statuses
-    and resume/restart the job tracking after it was interrupted.
-
-    .. versionadded:: 0.31.0
-    """
-
-    @abc.abstractmethod
-    def exists(self) -> bool:
-        """Does the job database already exist, to read job data from?"""
-        ...
-
-    @abc.abstractmethod
-    def persist(self, df: pd.DataFrame):
-        """
-        Store (new or updated) job data to the database.
-
-        The provided dataframe may only cover a subset of all the jobs ("rows") of the whole database,
-        so it should be merged with the existing data (if any) instead of overwriting it completely.
-
-        :param df: job data to store.
-        """
-        ...
-
-    @abc.abstractmethod
-    def count_by_status(self, statuses: Iterable[str] = ()) -> dict:
-        """
-        Retrieve the number of jobs per status.
-
-        :param statuses: List/set of statuses to include. If empty, all statuses are included.
-
-        :return: dictionary with status as key and the count as value.
-        """
-        ...
-
-    @abc.abstractmethod
-    def get_by_status(self, statuses: List[str], max=None) -> pd.DataFrame:
-        """
-        Returns a dataframe with jobs, filtered by status.
-
-        :param statuses: List of statuses to include.
-        :param max: Maximum number of jobs to return.
-
-        :return: DataFrame with jobs filtered by status.
-        """
-        ...
-
-    @abc.abstractmethod
-    def get_by_indices(self, indices: Iterable[Union[int, str]]) -> pd.DataFrame:
-        """
-        Returns a dataframe with jobs based on their (dataframe) index.
-
-        :param indices: List of indices to include.
-
-        :return: DataFrame with jobs filtered by indices.
-        """
-        ...
-
-
 def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
     raise NotImplementedError("No 'start_job' callable provided")
 
@@ -367,7 +307,7 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface):
         :param job_db:
             Job database to load/store existing job status data and other metadata from/to.
             Can be specified as a path to CSV or Parquet file,
-            or as a custom database object following the :py:class:`JobDatabaseInterface` interface.
+            or as a custom database object following the :py:class:`job_db.JobDatabaseInterface` interface.
 
         .. note::
             Support for Parquet files depends on the ``pyarrow`` package
@@ -472,7 +412,7 @@ def run_jobs(
         :param job_db:
             Job database to load/store existing job status data and other metadata from/to.
             Can be specified as a path to CSV or Parquet file,
-            or as a custom database object following the :py:class:`JobDatabaseInterface` interface.
+            or as a custom database object following the :py:class:`job_db.JobDatabaseInterface` interface.
 
         .. note::
             Support for Parquet files depends on the ``pyarrow`` package
@@ -488,7 +428,7 @@ def run_jobs(
         .. versionchanged:: 0.31.0
            Replace ``output_file`` argument with ``job_db`` argument,
            which can be a path to a CSV or Parquet file,
-           or a user-defined :py:class:`JobDatabaseInterface` object.
+           or a user-defined :py:class:`job_db.JobDatabaseInterface` object.
            The deprecated ``output_file`` argument is still supported for now.
 
        .. versionchanged:: 0.33.0
@@ -998,7 +938,7 @@ class CsvJobDatabase(FullDataFrameJobDatabase):
     """
     Persist/load job metadata with a CSV file.
 
-    :implements: :py:class:`JobDatabaseInterface`
+    :implements: :py:class:`job_db.JobDatabaseInterface`
     :param path: Path to local CSV file.
 
     .. note::
@@ -1053,7 +993,7 @@ class ParquetJobDatabase(FullDataFrameJobDatabase):
     """
     Persist/load job metadata with a Parquet file.
 
-    :implements: :py:class:`JobDatabaseInterface`
+    :implements: :py:class:`job_db.JobDatabaseInterface`
     :param path: Path to the Parquet file.
 
     .. note::
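
For downstream code, the practical effect of these hunks is simply a new import path for the interface. A minimal usage sketch against the new layout (the `my_start_job` callable and the CSV path are hypothetical placeholders; the class names and the `job_db`/`start_job` parameters come from the diffs in this commit):

# Sketch: imports after this commit; the job manager and database classes
# still come from the package root, the interface from the new submodule.
from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager
from openeo.extra.job_management.job_db import JobDatabaseInterface

manager = MultiBackendJobManager()
# After registering backends and defining a `my_start_job(row, connection, **kwargs)`
# callable (hypothetical), `job_db` accepts a file path or any JobDatabaseInterface:
# manager.run_jobs(start_job=my_start_job, job_db=CsvJobDatabase("jobs.csv"))
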
openeo/extra/job_management/job_db.py

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+import abc
+import logging
+from typing import (
+    Iterable,
+    List,
+    Union,
+)
+
+import pandas as pd
+
+_log = logging.getLogger(__name__)
+
+
+class JobDatabaseInterface(metaclass=abc.ABCMeta):
+    """
+    Interface for a database of job metadata to use with the :py:class:`MultiBackendJobManager`,
+    allowing to regularly persist the job metadata while polling the job statuses
+    and resume/restart the job tracking after it was interrupted.
+
+    .. versionadded:: 0.31.0
+    """
+
+    @abc.abstractmethod
+    def exists(self) -> bool:
+        """Does the job database already exist, to read job data from?"""
+        ...
+
+    @abc.abstractmethod
+    def persist(self, df: pd.DataFrame):
+        """
+        Store (new or updated) job data to the database.
+
+        The provided dataframe may only cover a subset of all the jobs ("rows") of the whole database,
+        so it should be merged with the existing data (if any) instead of overwriting it completely.
+
+        :param df: job data to store.
+        """
+        ...
+
+    @abc.abstractmethod
+    def count_by_status(self, statuses: Iterable[str] = ()) -> dict:
+        """
+        Retrieve the number of jobs per status.
+
+        :param statuses: List/set of statuses to include. If empty, all statuses are included.
+
+        :return: dictionary with status as key and the count as value.
+        """
+        ...
+
+    @abc.abstractmethod
+    def get_by_status(self, statuses: List[str], max=None) -> pd.DataFrame:
+        """
+        Returns a dataframe with jobs, filtered by status.
+
+        :param statuses: List of statuses to include.
+        :param max: Maximum number of jobs to return.
+
+        :return: DataFrame with jobs filtered by status.
+        """
+        ...
+
+    @abc.abstractmethod
+    def get_by_indices(self, indices: Iterable[Union[int, str]]) -> pd.DataFrame:
+        """
+        Returns a dataframe with jobs based on their (dataframe) index.
+
+        :param indices: List of indices to include.
+
+        :return: DataFrame with jobs filtered by indices.
+        """
+        ...
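
As an illustration of the contract defined in this new module, here is a minimal, hypothetical in-memory implementation (not part of this commit): the merge in `persist` is deliberately simplistic, and the `status` column name is an assumption borrowed from the job manager's dataframe conventions.

from typing import Iterable, List, Optional, Union

import pandas as pd

from openeo.extra.job_management.job_db import JobDatabaseInterface


class InMemoryJobDatabase(JobDatabaseInterface):
    """Hypothetical sketch: keep job metadata in a plain DataFrame; nothing hits disk."""

    def __init__(self):
        self._df: Optional[pd.DataFrame] = None

    def exists(self) -> bool:
        return self._df is not None

    def persist(self, df: pd.DataFrame):
        # Simplified merge: rows in `df` take precedence, other existing rows are kept.
        self._df = df.copy() if self._df is None else df.combine_first(self._df)

    def count_by_status(self, statuses: Iterable[str] = ()) -> dict:
        # Assumes a "status" column, as used by MultiBackendJobManager.
        counts = self._df["status"].value_counts().to_dict() if self.exists() else {}
        wanted = set(statuses)
        return {s: c for s, c in counts.items() if not wanted or s in wanted}

    def get_by_status(self, statuses: List[str], max=None) -> pd.DataFrame:
        jobs = self._df[self._df["status"].isin(statuses)]
        return jobs if max is None else jobs.head(max)

    def get_by_indices(self, indices: Iterable[Union[int, str]]) -> pd.DataFrame:
        # Select rows by their dataframe index; missing indices would raise a KeyError.
        return self._df.loc[list(indices)]

Since all five abstract methods are implemented, such an object can be instantiated and passed directly as `job_db` to `run_jobs` or `start_job_thread`.
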

openeo/extra/job_management/stac_job_db.py

Lines changed: 2 additions & 1 deletion
@@ -11,7 +11,8 @@
 import requests
 from shapely.geometry import mapping, shape
 
-from openeo.extra.job_management import JobDatabaseInterface, MultiBackendJobManager
+from openeo.extra.job_management import MultiBackendJobManager
+from openeo.extra.job_management.job_db import JobDatabaseInterface
 
 _log = logging.getLogger(__name__)
 