Commit 9d99e52

feat(job-manager): add kubernetes_queue option for MultiKueue (#494)
Also adds monitoring of Job status (rather than just Pod status) to the job_monitor to support MultiKueue, since the status of remote pods cannot be seen. Closes #493
1 parent f014442 · commit 9d99e52

7 files changed (+167, −63 lines)

docs/openapi.json

Lines changed: 3 additions & 0 deletions
@@ -85,6 +85,9 @@
       "kubernetes_memory_limit": {
         "type": "string"
       },
+      "kubernetes_queue": {
+        "type": "string"
+      },
       "kubernetes_uid": {
         "format": "int32",
         "type": "integer"

reana_job_controller/config.py

Lines changed: 3 additions & 5 deletions
@@ -12,8 +12,6 @@
 import os
 import secrets
 
-from reana_commons.config import REANA_COMPONENT_PREFIX
-
 from werkzeug.utils import import_string
 
 REANA_DB_CLOSE_POOL_CONNECTIONS = bool(
@@ -177,11 +175,11 @@
 SLURM_SSH_AUTH_TIMEOUT = float(os.getenv("SLURM_SSH_AUTH_TIMEOUT", "60"))
 """Seconds to wait for SLURM SSH authentication response."""
 
-USE_KUEUE = bool(strtobool(os.getenv("USE_KUEUE", "False")))
+KUEUE_ENABLED = bool(strtobool(os.getenv("KUEUE_ENABLED", "False")))
 """Whether to use Kueue to manage job execution."""
 
-KUEUE_LOCAL_QUEUE_NAME = "local-queue-job"
-"""Name of the local queue to be used by Kueue."""
+KUEUE_DEFAULT_QUEUE = os.getenv("KUEUE_DEFAULT_QUEUE", "")
+"""Name of the default queue to be used by Kueue."""
 
 REANA_USER_ID = os.getenv("REANA_USER_ID")
 """User UUID of the owner of the workflow."""

reana_job_controller/job_db.py

Lines changed: 2 additions & 2 deletions
@@ -115,10 +115,10 @@ def retrieve_job_logs(job_id):
 
 
 def store_job_logs(job_id, logs):
-    """Store job logs.
+    """Store job logs in the database.
 
     :param job_id: Internal REANA job ID.
-    :param logs: Job logs.
+    :param logs: Job logs to store.
     :type job_id: str
     :type logs: str
     """

reana_job_controller/job_monitor.py

Lines changed: 124 additions & 40 deletions
@@ -17,8 +17,7 @@
 from kubernetes import client, watch
 from reana_commons.config import REANA_RUNTIME_KUBERNETES_NAMESPACE
 from reana_commons.k8s.api_client import current_k8s_corev1_api_client
-from reana_db.database import Session
-from reana_db.models import Job, JobStatus
+from reana_db.models import JobStatus
 
 from reana_job_controller.config import (
     COMPUTE_BACKENDS,
@@ -32,10 +31,10 @@
     C4P_SSH_TIMEOUT,
     C4P_SSH_BANNER_TIMEOUT,
     C4P_SSH_AUTH_TIMEOUT,
+    KUEUE_ENABLED,
 )
 
 from reana_job_controller.job_db import JOB_DB, store_job_logs, update_job_status
-from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
 from reana_job_controller.utils import (
     SSHClient,
     singleton,
@@ -104,7 +103,8 @@ def get_reana_job_id(self, backend_job_id: str) -> str:
         remaining_jobs = self._get_remaining_jobs()
         return remaining_jobs[backend_job_id]
 
-    def get_backend_job_id(self, job_pod):
+    @staticmethod
+    def get_backend_job_id(job_pod):
         """Get the backend job id for the backend object.
 
         :param job_pod: Compute backend job object (Kubernetes V1Pod
@@ -115,7 +115,7 @@ def get_backend_job_id(self, job_pod):
         """
         return job_pod.metadata.labels["job-name"]
 
-    def should_process_job(self, job_pod) -> bool:
+    def should_process_job_pod(self, job_pod) -> bool:
         """Decide whether the job should be processed or not.
 
         Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
@@ -141,6 +141,27 @@ def should_process_job(self, job_pod) -> bool:
 
         return is_job_in_remaining_jobs and is_job_completed
 
+    def should_process_job(self, job) -> bool:
+        """Decide whether the job should be processed or not.
+
+        Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
+
+        :param job: Compute backend job object (Kubernetes V1Job
+            https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md)
+        """
+        remaining_jobs = self._get_remaining_jobs(
+            statuses_to_skip=[
+                JobStatus.finished.name,
+                JobStatus.failed.name,
+                JobStatus.stopped.name,
+            ]
+        )
+
+        is_job_in_remaining_jobs = job.metadata.name in remaining_jobs
+        is_job_completed = job.status.succeeded and not job.status.active
+
+        return is_job_in_remaining_jobs and is_job_completed
+
     @staticmethod
     def _get_job_container_statuses(job_pod):
         return (job_pod.status.container_statuses or []) + (
@@ -235,48 +256,111 @@ def watch_jobs(self, job_db, app=None):
 
         :param job_db: Dictionary which contains all current jobs.
         """
-        while True:
-            logging.info("Starting a new stream request to watch Jobs")
-            try:
-                w = watch.Watch()
-                for event in w.stream(
-                    current_k8s_corev1_api_client.list_namespaced_pod,
-                    namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
-                    label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
-                ):
-                    logging.info("New Pod event received: {0}".format(event["type"]))
-                    job_pod = event["object"]
-
-                    # Each job is processed once, when reaching a final state
-                    # (either successfully or not)
-                    if self.should_process_job(job_pod):
-                        job_status = self.get_job_status(job_pod)
-                        backend_job_id = self.get_backend_job_id(job_pod)
-                        reana_job_id = self.get_reana_job_id(backend_job_id)
-
-                        logs = self.job_manager_cls.get_logs(
-                            backend_job_id, job_pod=job_pod
-                        )
-
-                        if job_status == JobStatus.failed.name:
-                            self.log_disruption(
-                                event["object"].status.conditions, backend_job_id
-                            )
-
-                        store_job_logs(reana_job_id, logs)
-                        update_job_status(reana_job_id, job_status)
-
-                        if JobStatus.should_cleanup_job(job_status):
-                            self.clean_job(backend_job_id)
-            except client.rest.ApiException as e:
-                logging.exception(
-                    f"Error from Kubernetes API while watching jobs pods: {e}"
-                )
-            except Exception as e:
-                logging.error(traceback.format_exc())
-                logging.error("Unexpected error: {}".format(e))
+        # If using MultiKueue, watch jobs instead of pods since worker pods could be
+        # running on a remote cluster that we can't directly monitor
+        if KUEUE_ENABLED:
+            while True:
+                logging.info("Starting a new stream request to watch Jobs")
+
+                try:
+                    w = watch.Watch()
+                    for event in w.stream(
+                        client.BatchV1Api().list_namespaced_job,
+                        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                        label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
+                    ):
+                        logging.info(f"New Job event received: {event['type']}")
+
+                        job = event["object"]
+                        job_id = job.metadata.name
+                        job_finished = (
+                            job.status.succeeded
+                            and not job.status.active
+                            and not job.status.failed
+                        )
+                        job_status = (
+                            JobStatus.finished.name
+                            if job_finished
+                            else (
+                                JobStatus.failed.name
+                                if job.status.failed
+                                else JobStatus.running.name
+                            )
+                        )
+
+                        if self.should_process_job(job):
+                            reana_job_id = self.get_reana_job_id(job_id)
+
+                            if job_status == JobStatus.failed.name:
+                                self.log_disruption(
+                                    event["object"].status.conditions, job_id
+                                )
+
+                            # TODO: Fetch logs from the remote job pod on the remote worker when MultiKueue supports this
+                            logs = self.job_manager_cls.get_logs(job_id)
+                            if logs is not None:
+                                store_job_logs(reana_job_id, logs)
+
+                            update_job_status(
+                                reana_job_id,
+                                job_status,
+                            )
+
+                            if JobStatus.should_cleanup_job(job_status):
+                                self.clean_job(job_id)
+
+                except client.rest.ApiException as e:
+                    logging.exception(
+                        f"Error from Kubernetes API while watching jobs: {e}"
+                    )
+                except Exception as e:
+                    logging.error(traceback.format_exc())
+                    logging.error("Unexpected error: {}".format(e))
+        else:
+            while True:
+                try:
+                    w = watch.Watch()
+                    for event in w.stream(
+                        current_k8s_corev1_api_client.list_namespaced_pod,
+                        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                        label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
+                    ):
+                        logging.info(
+                            "New Pod event received: {0}".format(event["type"])
+                        )
+                        job_pod = event["object"]
+
+                        # Each job is processed once, when reaching a final state
+                        # (either successfully or not)
+                        if self.should_process_job_pod(job_pod):
+                            job_status = self.get_job_status(job_pod)
+                            backend_job_id = self.get_backend_job_id(job_pod)
+                            reana_job_id = self.get_reana_job_id(backend_job_id)
+
+                            logs = self.job_manager_cls.get_logs(
+                                backend_job_id, job_pod=job_pod
+                            )
+
+                            if job_status == JobStatus.failed.name:
+                                self.log_disruption(
+                                    event["object"].status.conditions, backend_job_id
+                                )
+
+                            store_job_logs(reana_job_id, logs)
+                            update_job_status(reana_job_id, job_status)
+
+                            if JobStatus.should_cleanup_job(job_status):
+                                self.clean_job(backend_job_id)
+                except client.rest.ApiException as e:
+                    logging.exception(
+                        f"Error from Kubernetes API while watching jobs pods: {e}"
+                    )
+                except Exception as e:
+                    logging.error(traceback.format_exc())
+                    logging.error("Unexpected error: {}".format(e))
 
-    def log_disruption(self, conditions, backend_job_id):
+    @staticmethod
+    def log_disruption(conditions, backend_job_id):
         """Log disruption message from Kubernetes event conditions.
 
         Usually it is pod eviction but can be any of https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions.
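With MultiKueue the monitor can no longer inspect remote pods, so the new branch derives the REANA status purely from the V1Job counters (succeeded, active, failed). A self-contained sketch of that mapping; the helper name and the SimpleNamespace stand-ins are ours, and plain strings stand in for the JobStatus enum members:

from types import SimpleNamespace


def job_status_from_v1job(job):
    """Map Kubernetes V1Job status counters to a REANA job status name."""
    finished = job.status.succeeded and not job.status.active and not job.status.failed
    if finished:
        return "finished"
    if job.status.failed:
        return "failed"
    return "running"


# Stand-in objects shaped like V1Job for a quick check.
done = SimpleNamespace(status=SimpleNamespace(succeeded=1, active=None, failed=None))
broken = SimpleNamespace(status=SimpleNamespace(succeeded=None, active=None, failed=1))
assert job_status_from_v1job(done) == "finished"
assert job_status_from_v1job(broken) == "failed"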

reana_job_controller/kubernetes_job_manager.py

Lines changed: 30 additions & 15 deletions
@@ -52,8 +52,8 @@
     REANA_KUBERNETES_JOBS_MEMORY_LIMIT,
     REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT,
     REANA_USER_ID,
-    USE_KUEUE,
-    KUEUE_LOCAL_QUEUE_NAME,
+    KUEUE_ENABLED,
+    KUEUE_DEFAULT_QUEUE,
 )
 from reana_job_controller.errors import ComputingBackendSubmissionError
 from reana_job_controller.job_manager import JobManager
@@ -67,6 +67,13 @@ class KubernetesJobManager(JobManager):
     MAX_NUM_JOB_RESTARTS = 0
     """Maximum number of job restarts in case of internal failures."""
 
+    @property
+    def secrets(self):
+        """Get cached secrets if present, otherwise fetch them from k8s."""
+        if self._secrets is None:
+            self._secrets = UserSecretsStore.fetch(REANA_USER_ID)
+        return self._secrets
+
     def __init__(
         self,
         docker_img=None,
@@ -81,6 +88,7 @@ def __init__(
         kerberos=False,
         kubernetes_uid=None,
         kubernetes_memory_limit=None,
+        kubernetes_queue=None,
         voms_proxy=False,
         rucio=False,
         kubernetes_job_timeout: Optional[int] = None,
@@ -113,6 +121,8 @@ def __init__(
         :type kubernetes_uid: int
         :param kubernetes_memory_limit: Memory limit for job container.
         :type kubernetes_memory_limit: str
+        :param kubernetes_queue: Name of the MultiKueue LocalQueue to send jobs to, if Kueue is enabled.
+        :type kubernetes_queue: str
         :param kubernetes_job_timeout: Job timeout in seconds.
         :type kubernetes_job_timeout: int
         :param voms_proxy: Decides if a voms-proxy certificate should be
@@ -142,41 +152,46 @@ def __init__(
         self.rucio = rucio
         self.set_user_id(kubernetes_uid)
         self.set_memory_limit(kubernetes_memory_limit)
+        self.kubernetes_queue = kubernetes_queue
         self.workflow_uuid = workflow_uuid
         self.kubernetes_job_timeout = kubernetes_job_timeout
         self._secrets: Optional[UserSecrets] = secrets
 
-    @property
-    def secrets(self):
-        """Get cached secrets if present, otherwise fetch them from k8s."""
-        if self._secrets is None:
-            self._secrets = UserSecretsStore.fetch(REANA_USER_ID)
-        return self._secrets
-
     @JobManager.execution_hook
     def execute(self):
         """Execute a job in Kubernetes."""
         backend_job_id = build_unique_component_name("run-job")
 
+        if KUEUE_ENABLED and not (self.kubernetes_queue or KUEUE_DEFAULT_QUEUE):
+            logging.error(
+                "Kueue is enabled but no queue name was provided. Please set a KUEUE_DEFAULT_QUEUE or ensure that all jobs set a value for kubernetes_queue in their spec."
+            )
+            raise
+
+        labels = {
+            "reana-run-job-workflow-uuid": self.workflow_uuid,
+        }
+
+        if KUEUE_ENABLED:
+            labels["kueue.x-k8s.io/queue-name"] = (
+                f"{self.kubernetes_queue or KUEUE_DEFAULT_QUEUE}-job-queue"
+            )
+
         self.job = {
             "kind": "Job",
             "apiVersion": "batch/v1",
             "metadata": {
                 "name": backend_job_id,
                 "namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE,
-                "labels": (
-                    {"kueue.x-k8s.io/queue-name": KUEUE_LOCAL_QUEUE_NAME}
-                    if USE_KUEUE
-                    else {}
-                ),
+                "labels": labels,
            },
            "spec": {
                 "backoffLimit": KubernetesJobManager.MAX_NUM_JOB_RESTARTS,
                 "autoSelector": True,
                 "template": {
                     "metadata": {
                         "name": backend_job_id,
-                        "labels": {"reana-run-job-workflow-uuid": self.workflow_uuid},
+                        "labels": labels,
                     },
                     "spec": {
                         "automountServiceAccountToken": False,

reana_job_controller/schemas.py

Lines changed: 1 addition & 0 deletions
@@ -50,6 +50,7 @@ class JobRequest(Schema):
     rucio = fields.Bool(required=False)
     kubernetes_uid = fields.Int(required=False)
     kubernetes_memory_limit = fields.Str(required=False)
+    kubernetes_queue = fields.Str(required=False)
     kubernetes_job_timeout = fields.Int(required=False)
     unpacked_img = fields.Bool(required=False)
     htcondor_max_runtime = fields.Str(required=False)
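Because the field is declared with required=False, clients that never send kubernetes_queue keep validating as before. A minimal marshmallow sketch using a cut-down stand-in schema (not the full JobRequest) to show both cases:

from marshmallow import Schema, fields


class MiniJobRequest(Schema):
    # Stand-in for the relevant slice of JobRequest; the real schema has many more fields.
    kubernetes_memory_limit = fields.Str(required=False)
    kubernetes_queue = fields.Str(required=False)


schema = MiniJobRequest()
print(schema.load({"kubernetes_memory_limit": "4Gi"}))
# {'kubernetes_memory_limit': '4Gi'}  -- field simply absent
print(schema.load({"kubernetes_memory_limit": "4Gi", "kubernetes_queue": "cern-remote"}))
# {'kubernetes_memory_limit': '4Gi', 'kubernetes_queue': 'cern-remote'}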

tests/test_job_monitor.py

Lines changed: 4 additions & 1 deletion
@@ -107,7 +107,10 @@ def test_kubernetes_should_process_job(
         "Succeeded", "Completed", job_id=backend_job_id
     )
 
-    assert bool(job_monitor_k8s.should_process_job(job_pod_event)) == should_process
+    assert (
+        bool(job_monitor_k8s.should_process_job_pod(job_pod_event))
+        == should_process
+    )
 
 
 @pytest.mark.parametrize(
