Commit 7931b61
feat(job-manager): add kubernetes_queue option for MultiKueue (reanahub#494)
Also adds monitoring of Job status (rather than just pod status) to the job monitor to support MultiKueue, since the status of remote pods cannot be seen. Closes reanahub#493
Parent: f014442
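In practice, the new kubernetes_queue field lets a job request target a specific MultiKueue LocalQueue. A minimal sketch of submitting such a job to the job controller; the endpoint URL and all field values are illustrative assumptions, not part of this commit:

import requests

# Hypothetical job spec; every value below is an example placeholder.
job_spec = {
    "job_name": "fit-data",
    "workflow_uuid": "b8e6cfbc-3f8b-4e12-b381-5b6797a7c9d4",
    "workflow_workspace": "/var/reana/users/00000000/workflows/fit",
    "docker_img": "docker.io/library/python:3.12",
    "cmd": "python fit.py",
    "compute_backend": "kubernetes",
    "kubernetes_queue": "remote-cluster-a",  # new: MultiKueue LocalQueue to target
}

# The payload is validated against the JobRequest schema extended below;
# the URL depends on the deployment.
response = requests.post("http://localhost:5000/jobs", json=job_spec)
print(response.status_code, response.json())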

File tree: 7 files changed, +151 -33 lines changed

docs/openapi.json

Lines changed: 3 additions & 0 deletions

@@ -85,6 +85,9 @@
       "kubernetes_memory_limit": {
         "type": "string"
       },
+      "kubernetes_queue": {
+        "type": "string"
+      },
       "kubernetes_uid": {
         "format": "int32",
         "type": "integer"

reana_job_controller/config.py

Lines changed: 3 additions & 5 deletions

@@ -12,8 +12,6 @@
 import os
 import secrets
 
-from reana_commons.config import REANA_COMPONENT_PREFIX
-
 from werkzeug.utils import import_string
 
 REANA_DB_CLOSE_POOL_CONNECTIONS = bool(
@@ -177,11 +175,11 @@
 SLURM_SSH_AUTH_TIMEOUT = float(os.getenv("SLURM_SSH_AUTH_TIMEOUT", "60"))
 """Seconds to wait for SLURM SSH authentication response."""
 
-USE_KUEUE = bool(strtobool(os.getenv("USE_KUEUE", "False")))
+KUEUE_ENABLED = bool(strtobool(os.getenv("KUEUE_ENABLED", "False")))
 """Whether to use Kueue to manage job execution."""
 
-KUEUE_LOCAL_QUEUE_NAME = "local-queue-job"
-"""Name of the local queue to be used by Kueue."""
+KUEUE_DEFAULT_QUEUE = os.getenv("KUEUE_DEFAULT_QUEUE", "")
+"""Name of the default queue to be used by Kueue."""
 
 REANA_USER_ID = os.getenv("REANA_USER_ID")
 """User UUID of the owner of the workflow."""

reana_job_controller/job_db.py

Lines changed: 2 additions & 2 deletions

@@ -115,10 +115,10 @@ def retrieve_job_logs(job_id):
 
 
 def store_job_logs(job_id, logs):
-    """Store job logs.
+    """Store job logs in the database.
 
     :param job_id: Internal REANA job ID.
-    :param logs: Job logs.
+    :param logs: Job logs to store.
     :type job_id: str
     :type logs: str
     """

reana_job_controller/job_monitor.py

Lines changed: 108 additions & 10 deletions

@@ -17,8 +17,7 @@
 from kubernetes import client, watch
 from reana_commons.config import REANA_RUNTIME_KUBERNETES_NAMESPACE
 from reana_commons.k8s.api_client import current_k8s_corev1_api_client
-from reana_db.database import Session
-from reana_db.models import Job, JobStatus
+from reana_db.models import JobStatus
 
 from reana_job_controller.config import (
     COMPUTE_BACKENDS,
@@ -32,10 +31,10 @@
     C4P_SSH_TIMEOUT,
     C4P_SSH_BANNER_TIMEOUT,
     C4P_SSH_AUTH_TIMEOUT,
+    KUEUE_ENABLED,
 )
 
 from reana_job_controller.job_db import JOB_DB, store_job_logs, update_job_status
-from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
 from reana_job_controller.utils import (
     SSHClient,
     singleton,
@@ -104,7 +103,8 @@ def get_reana_job_id(self, backend_job_id: str) -> str:
         remaining_jobs = self._get_remaining_jobs()
         return remaining_jobs[backend_job_id]
 
-    def get_backend_job_id(self, job_pod):
+    @staticmethod
+    def get_backend_job_id(job_pod):
         """Get the backend job id for the backend object.
 
         :param job_pod: Compute backend job object (Kubernetes V1Pod
@@ -115,7 +115,7 @@ def get_backend_job_id(self, job_pod):
         """
         return job_pod.metadata.labels["job-name"]
 
-    def should_process_job(self, job_pod) -> bool:
+    def should_process_job_pod(self, job_pod) -> bool:
         """Decide whether the job should be processed or not.
 
         Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
@@ -141,6 +141,27 @@ def should_process_job(self, job_pod) -> bool:
 
         return is_job_in_remaining_jobs and is_job_completed
 
+    def should_process_job(self, job) -> bool:
+        """Decide whether the job should be processed or not.
+
+        Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
+
+        :param job: Compute backend job object (Kubernetes V1Job
+            https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md)
+        """
+        remaining_jobs = self._get_remaining_jobs(
+            statuses_to_skip=[
+                JobStatus.finished.name,
+                JobStatus.failed.name,
+                JobStatus.stopped.name,
+            ]
+        )
+
+        is_job_in_remaining_jobs = job.metadata.name in remaining_jobs
+        is_job_completed = job.status.succeeded and not job.status.active
+
+        return is_job_in_remaining_jobs and is_job_completed
+
     @staticmethod
     def _get_job_container_statuses(job_pod):
         return (job_pod.status.container_statuses or []) + (
@@ -230,13 +251,77 @@ def get_job_status(self, job_pod) -> Optional[str]:
 
         return status
 
-    def watch_jobs(self, job_db, app=None):
-        """Open stream connection to k8s apiserver to watch all jobs status.
+    def watch_job_event_stream(self):
+        """
+        Watch job events from the Kubernetes API.
 
-        :param job_db: Dictionary which contains all current jobs.
+        This method is used when MultiKueue is enabled, since in that case we can't
+        directly monitor the worker pods as they are remote.
         """
         while True:
             logging.info("Starting a new stream request to watch Jobs")
+
+            try:
+                w = watch.Watch()
+                for event in w.stream(
+                    client.BatchV1Api().list_namespaced_job,
+                    namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                    label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
+                ):
+                    logging.info(f"New Job event received: {event['type']}")
+
+                    job = event["object"]
+                    job_id = job.metadata.name
+                    job_finished = (
+                        job.status.succeeded
+                        and not job.status.active
+                        and not job.status.failed
+                    )
+                    job_status = (
+                        JobStatus.finished.name
+                        if job_finished
+                        else (
+                            JobStatus.failed.name
+                            if job.status.failed
+                            else JobStatus.running.name
+                        )
+                    )
+
+                    if self.should_process_job(job):
+                        reana_job_id = self.get_reana_job_id(job_id)
+
+                        if job_status == JobStatus.failed.name:
+                            self.log_disruption(
+                                event["object"].status.conditions, job_id
+                            )
+
+                        # TODO: fetch logs from the remote job pod on the remote
+                        # worker once MultiKueue supports this
+                        logs = self.job_manager_cls.get_logs(job_id)
+                        if logs is not None:
+                            store_job_logs(reana_job_id, logs)
+
+                        update_job_status(reana_job_id, job_status)
+
+                        if JobStatus.should_cleanup_job(job_status):
+                            self.clean_job(job_id)
+
+            except client.rest.ApiException as e:
+                logging.exception(f"Error from Kubernetes API while watching jobs: {e}")
+            except Exception as e:
+                logging.error(traceback.format_exc())
+                logging.error("Unexpected error: {}".format(e))
+
+    def watch_pod_event_stream(self):
+        """
+        Watch pod events from the Kubernetes API.
+
+        This method is used when MultiKueue is not enabled, since in that case we can
+        directly monitor the worker pods as they are running on the local cluster.
+        """
+        while True:
             try:
                 w = watch.Watch()
                 for event in w.stream(
@@ -249,7 +334,7 @@ def watch_jobs(self, job_db, app=None):
 
                     # Each job is processed once, when reaching a final state
                     # (either successfully or not)
-                    if self.should_process_job(job_pod):
+                    if self.should_process_job_pod(job_pod):
                         job_status = self.get_job_status(job_pod)
                         backend_job_id = self.get_backend_job_id(job_pod)
                         reana_job_id = self.get_reana_job_id(backend_job_id)
@@ -276,7 +361,20 @@ def watch_jobs(self, job_db, app=None):
                 logging.error(traceback.format_exc())
                 logging.error("Unexpected error: {}".format(e))
 
-    def log_disruption(self, conditions, backend_job_id):
+    def watch_jobs(self, job_db, app=None):
+        """Open stream connection to k8s apiserver to watch all jobs status.
+
+        :param job_db: Dictionary which contains all current jobs.
+        """
+        # If using MultiKueue, watch jobs instead of pods since worker pods could be
+        # running on a remote cluster that we can't directly monitor
+        if KUEUE_ENABLED:
+            self.watch_job_event_stream()
+        else:
+            self.watch_pod_event_stream()
+
+    @staticmethod
+    def log_disruption(conditions, backend_job_id):
         """Log disruption message from Kubernetes event conditions.
 
         Usually it is pod eviction but can be any of https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions.
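The Job-based monitoring above derives a REANA job state from the Job's pod counters (succeeded, active, and failed are integer counts or None in the Kubernetes client). A standalone sketch of that rule, with FakeJobStatus standing in for the client's V1JobStatus:

from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeJobStatus:
    # Mirrors the V1JobStatus fields used by the monitor
    succeeded: Optional[int] = None
    active: Optional[int] = None
    failed: Optional[int] = None


def derive_job_status(status):
    """Map a Job's pod counters to a REANA job state name."""
    if status.succeeded and not status.active and not status.failed:
        return "finished"
    if status.failed:
        return "failed"
    return "running"


assert derive_job_status(FakeJobStatus(succeeded=1)) == "finished"
assert derive_job_status(FakeJobStatus(failed=1)) == "failed"
assert derive_job_status(FakeJobStatus(active=1)) == "running"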

reana_job_controller/kubernetes_job_manager.py

Lines changed: 30 additions & 15 deletions

@@ -52,8 +52,8 @@
     REANA_KUBERNETES_JOBS_MEMORY_LIMIT,
     REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT,
     REANA_USER_ID,
-    USE_KUEUE,
-    KUEUE_LOCAL_QUEUE_NAME,
+    KUEUE_ENABLED,
+    KUEUE_DEFAULT_QUEUE,
 )
 from reana_job_controller.errors import ComputingBackendSubmissionError
 from reana_job_controller.job_manager import JobManager
@@ -67,6 +67,13 @@ class KubernetesJobManager(JobManager):
     MAX_NUM_JOB_RESTARTS = 0
     """Maximum number of job restarts in case of internal failures."""
 
+    @property
+    def secrets(self):
+        """Get cached secrets if present, otherwise fetch them from k8s."""
+        if self._secrets is None:
+            self._secrets = UserSecretsStore.fetch(REANA_USER_ID)
+        return self._secrets
+
     def __init__(
         self,
         docker_img=None,
@@ -81,6 +88,7 @@ def __init__(
         kerberos=False,
         kubernetes_uid=None,
         kubernetes_memory_limit=None,
+        kubernetes_queue=None,
         voms_proxy=False,
         rucio=False,
         kubernetes_job_timeout: Optional[int] = None,
@@ -113,6 +121,8 @@ def __init__(
         :type kubernetes_uid: int
         :param kubernetes_memory_limit: Memory limit for job container.
         :type kubernetes_memory_limit: str
+        :param kubernetes_queue: Name of the MultiKueue LocalQueue to send jobs to, if Kueue is enabled.
+        :type kubernetes_queue: str
         :param kubernetes_job_timeout: Job timeout in seconds.
         :type kubernetes_job_timeout: int
         :param voms_proxy: Decides if a voms-proxy certificate should be
@@ -142,41 +152,46 @@ def __init__(
         self.rucio = rucio
         self.set_user_id(kubernetes_uid)
         self.set_memory_limit(kubernetes_memory_limit)
+        self.kubernetes_queue = kubernetes_queue
         self.workflow_uuid = workflow_uuid
         self.kubernetes_job_timeout = kubernetes_job_timeout
         self._secrets: Optional[UserSecrets] = secrets
 
-    @property
-    def secrets(self):
-        """Get cached secrets if present, otherwise fetch them from k8s."""
-        if self._secrets is None:
-            self._secrets = UserSecretsStore.fetch(REANA_USER_ID)
-        return self._secrets
-
     @JobManager.execution_hook
     def execute(self):
         """Execute a job in Kubernetes."""
         backend_job_id = build_unique_component_name("run-job")
 
+        if KUEUE_ENABLED and not (self.kubernetes_queue or KUEUE_DEFAULT_QUEUE):
+            logging.error(
+                "Kueue is enabled but no queue name was provided. Please set "
+                "KUEUE_DEFAULT_QUEUE or ensure that all jobs set a value for "
+                "kubernetes_queue in their spec."
+            )
+            raise ComputingBackendSubmissionError(
+                "Kueue is enabled but no queue name was provided."
+            )
+
+        labels = {
+            "reana-run-job-workflow-uuid": self.workflow_uuid,
+        }
+
+        if KUEUE_ENABLED:
+            labels["kueue.x-k8s.io/queue-name"] = (
+                f"{self.kubernetes_queue or KUEUE_DEFAULT_QUEUE}-job-queue"
+            )
+
         self.job = {
             "kind": "Job",
             "apiVersion": "batch/v1",
             "metadata": {
                 "name": backend_job_id,
                 "namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE,
-                "labels": (
-                    {"kueue.x-k8s.io/queue-name": KUEUE_LOCAL_QUEUE_NAME}
-                    if USE_KUEUE
-                    else {}
-                ),
+                "labels": labels,
             },
             "spec": {
                 "backoffLimit": KubernetesJobManager.MAX_NUM_JOB_RESTARTS,
                 "autoSelector": True,
                 "template": {
                     "metadata": {
                         "name": backend_job_id,
-                        "labels": {"reana-run-job-workflow-uuid": self.workflow_uuid},
+                        "labels": labels,
                     },
                     "spec": {
                         "automountServiceAccountToken": False,

reana_job_controller/schemas.py

Lines changed: 1 addition & 0 deletions

@@ -50,6 +50,7 @@ class JobRequest(Schema):
     rucio = fields.Bool(required=False)
     kubernetes_uid = fields.Int(required=False)
     kubernetes_memory_limit = fields.Str(required=False)
+    kubernetes_queue = fields.Str(required=False)
     kubernetes_job_timeout = fields.Int(required=False)
     unpacked_img = fields.Bool(required=False)
     htcondor_max_runtime = fields.Str(required=False)
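Since JobRequest is a marshmallow schema, the new field is validated like any other optional string. A usage sketch declaring only the fields relevant here (the real schema declares many more):

from marshmallow import Schema, fields


class JobRequest(Schema):
    kubernetes_memory_limit = fields.Str(required=False)
    kubernetes_queue = fields.Str(required=False)
    kubernetes_job_timeout = fields.Int(required=False)


payload = {"kubernetes_queue": "remote-cluster-a", "kubernetes_job_timeout": 600}
loaded = JobRequest().load(payload)
assert loaded["kubernetes_queue"] == "remote-cluster-a"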

tests/test_job_monitor.py

Lines changed: 4 additions & 1 deletion

@@ -107,7 +107,10 @@ def test_kubernetes_should_process_job(
         "Succeeded", "Completed", job_id=backend_job_id
     )
 
-    assert bool(job_monitor_k8s.should_process_job(job_pod_event)) == should_process
+    assert (
+        bool(job_monitor_k8s.should_process_job_pod(job_pod_event))
+        == should_process
+    )
 
 
 @pytest.mark.parametrize(
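A complementary test for the new Job-based should_process_job could follow the same pattern. A hedged sketch, where job_monitor_k8s is assumed to be the suite's existing fixture and MagicMock stands in for a Kubernetes V1Job:

from unittest.mock import MagicMock


def test_kubernetes_should_process_job_object(job_monitor_k8s):
    job = MagicMock()
    job.metadata.name = "reana-run-job-123"
    job.status.succeeded = 1   # all pods finished successfully
    job.status.active = None   # no pods still running

    # Pretend the job is still tracked as unfinished by the monitor.
    job_monitor_k8s._get_remaining_jobs = lambda statuses_to_skip=None: {
        "reana-run-job-123": "reana-job-id-123"
    }

    assert bool(job_monitor_k8s.should_process_job(job)) is True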
