Skip to content

Commit c6cff43

Browse files
feat(job-manager): add kubernetes_queue option for MultiKueue (#494)
Also adds monitoring of job status (rather than just pods) to the job_monitor to support MK since the status of remote pods cannot be seen. Closes #493
1 parent f014442 commit c6cff43

File tree

7 files changed

+197
-71
lines changed

7 files changed

+197
-71
lines changed

docs/openapi.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@
8585
"kubernetes_memory_limit": {
8686
"type": "string"
8787
},
88+
"kubernetes_queue": {
89+
"type": "string"
90+
},
8891
"kubernetes_uid": {
8992
"format": "int32",
9093
"type": "integer"

reana_job_controller/config.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
import os
1313
import secrets
1414

15-
from reana_commons.config import REANA_COMPONENT_PREFIX
16-
1715
from werkzeug.utils import import_string
1816

1917
REANA_DB_CLOSE_POOL_CONNECTIONS = bool(
@@ -177,11 +175,11 @@
177175
SLURM_SSH_AUTH_TIMEOUT = float(os.getenv("SLURM_SSH_AUTH_TIMEOUT", "60"))
178176
"""Seconds to wait for SLURM SSH authentication response."""
179177

180-
USE_KUEUE = bool(strtobool(os.getenv("USE_KUEUE", "False")))
178+
KUEUE_ENABLED = bool(strtobool(os.getenv("KUEUE_ENABLED", "False")))
181179
"""Whether to use Kueue to manage job execution."""
182180

183-
KUEUE_LOCAL_QUEUE_NAME = "local-queue-job"
184-
"""Name of the local queue to be used by Kueue."""
181+
KUEUE_DEFAULT_QUEUE = os.getenv("KUEUE_DEFAULT_QUEUE", "")
182+
"""Name of the default queue to be used by Kueue."""
185183

186184
REANA_USER_ID = os.getenv("REANA_USER_ID")
187185
"""User UUID of the owner of the workflow."""

reana_job_controller/job_db.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,10 @@ def retrieve_job_logs(job_id):
115115

116116

117117
def store_job_logs(job_id, logs):
118-
"""Store job logs.
118+
"""Store job logs in the database.
119119
120120
:param job_id: Internal REANA job ID.
121-
:param logs: Job logs.
121+
:param logs: Job logs to store.
122122
:type job_id: str
123123
:type logs: str
124124
"""

reana_job_controller/job_monitor.py

Lines changed: 124 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
from kubernetes import client, watch
1818
from reana_commons.config import REANA_RUNTIME_KUBERNETES_NAMESPACE
1919
from reana_commons.k8s.api_client import current_k8s_corev1_api_client
20-
from reana_db.database import Session
21-
from reana_db.models import Job, JobStatus
20+
from reana_db.models import JobStatus
2221

2322
from reana_job_controller.config import (
2423
COMPUTE_BACKENDS,
@@ -32,10 +31,10 @@
3231
C4P_SSH_TIMEOUT,
3332
C4P_SSH_BANNER_TIMEOUT,
3433
C4P_SSH_AUTH_TIMEOUT,
34+
KUEUE_ENABLED,
3535
)
3636

3737
from reana_job_controller.job_db import JOB_DB, store_job_logs, update_job_status
38-
from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
3938
from reana_job_controller.utils import (
4039
SSHClient,
4140
singleton,
@@ -104,7 +103,8 @@ def get_reana_job_id(self, backend_job_id: str) -> str:
104103
remaining_jobs = self._get_remaining_jobs()
105104
return remaining_jobs[backend_job_id]
106105

107-
def get_backend_job_id(self, job_pod):
106+
@staticmethod
107+
def get_backend_job_id(job_pod):
108108
"""Get the backend job id for the backend object.
109109
110110
:param job_pod: Compute backend job object (Kubernetes V1Pod
@@ -115,7 +115,7 @@ def get_backend_job_id(self, job_pod):
115115
"""
116116
return job_pod.metadata.labels["job-name"]
117117

118-
def should_process_job(self, job_pod) -> bool:
118+
def should_process_job_pod(self, job_pod) -> bool:
119119
"""Decide whether the job should be processed or not.
120120
121121
Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
@@ -141,6 +141,27 @@ def should_process_job(self, job_pod) -> bool:
141141

142142
return is_job_in_remaining_jobs and is_job_completed
143143

144+
def should_process_job(self, job) -> bool:
145+
"""Decide whether the job should be processed or not.
146+
147+
Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
148+
149+
:param job: Compute backend job object (Kubernetes V1Job
150+
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md)
151+
"""
152+
remaining_jobs = self._get_remaining_jobs(
153+
statuses_to_skip=[
154+
JobStatus.finished.name,
155+
JobStatus.failed.name,
156+
JobStatus.stopped.name,
157+
]
158+
)
159+
160+
is_job_in_remaining_jobs = job.metadata.name in remaining_jobs
161+
is_job_completed = job.status.succeeded and not job.status.active
162+
163+
return is_job_in_remaining_jobs and is_job_completed
164+
144165
@staticmethod
145166
def _get_job_container_statuses(job_pod):
146167
return (job_pod.status.container_statuses or []) + (
@@ -235,48 +256,111 @@ def watch_jobs(self, job_db, app=None):
235256
236257
:param job_db: Dictionary which contains all current jobs.
237258
"""
238-
while True:
239-
logging.info("Starting a new stream request to watch Jobs")
240-
try:
241-
w = watch.Watch()
242-
for event in w.stream(
243-
current_k8s_corev1_api_client.list_namespaced_pod,
244-
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
245-
label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
246-
):
247-
logging.info("New Pod event received: {0}".format(event["type"]))
248-
job_pod = event["object"]
249-
250-
# Each job is processed once, when reaching a final state
251-
# (either successfully or not)
252-
if self.should_process_job(job_pod):
253-
job_status = self.get_job_status(job_pod)
254-
backend_job_id = self.get_backend_job_id(job_pod)
255-
reana_job_id = self.get_reana_job_id(backend_job_id)
259+
# If using MultiKueue, watch jobs instead of pods since worker pods could be
260+
# running on a remote cluster that we can't directly monitor
261+
if KUEUE_ENABLED:
262+
while True:
263+
logging.info("Starting a new stream request to watch Jobs")
256264

257-
logs = self.job_manager_cls.get_logs(
258-
backend_job_id, job_pod=job_pod
265+
try:
266+
w = watch.Watch()
267+
for event in w.stream(
268+
client.BatchV1Api().list_namespaced_job,
269+
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
270+
label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
271+
):
272+
logging.info(f"New Job event received: {event['type']}")
273+
274+
job = event["object"]
275+
job_id = job.metadata.name
276+
job_finished = (
277+
job.status.succeeded
278+
and not job.status.active
279+
and not job.status.failed
259280
)
281+
job_status = (
282+
JobStatus.finished.name
283+
if job_finished
284+
else (
285+
JobStatus.failed.name
286+
if job.status.failed
287+
else JobStatus.running.name
288+
)
289+
)
290+
291+
if self.should_process_job(job):
292+
reana_job_id = self.get_reana_job_id(job_id)
260293

261-
if job_status == JobStatus.failed.name:
262-
self.log_disruption(
263-
event["object"].status.conditions, backend_job_id
294+
if job_status == JobStatus.failed.name:
295+
self.log_disruption(
296+
event["object"].status.conditions, job_id
297+
)
298+
299+
# TODO: Fetch logs from the remote job pod on the remote worker when MultiKueue supports this
300+
logs = self.job_manager_cls.get_logs(job_id)
301+
if logs is not None:
302+
store_job_logs(reana_job_id, logs)
303+
304+
update_job_status(
305+
reana_job_id,
306+
job_status,
264307
)
265308

266-
store_job_logs(reana_job_id, logs)
267-
update_job_status(reana_job_id, job_status)
309+
if JobStatus.should_cleanup_job(job_status):
310+
self.clean_job(job_id)
268311

269-
if JobStatus.should_cleanup_job(job_status):
270-
self.clean_job(backend_job_id)
271-
except client.rest.ApiException as e:
272-
logging.exception(
273-
f"Error from Kubernetes API while watching job pods: {e}"
274-
)
275-
except Exception as e:
276-
logging.error(traceback.format_exc())
277-
logging.error("Unexpected error: {}".format(e))
312+
except client.rest.ApiException as e:
313+
logging.exception(
314+
f"Error from Kubernetes API while watching jobs: {e}"
315+
)
316+
except Exception as e:
317+
logging.error(traceback.format_exc())
318+
logging.error("Unexpected error: {}".format(e))
319+
else:
320+
while True:
321+
try:
322+
w = watch.Watch()
323+
for event in w.stream(
324+
current_k8s_corev1_api_client.list_namespaced_pod,
325+
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
326+
label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
327+
):
328+
logging.info(
329+
"New Pod event received: {0}".format(event["type"])
330+
)
331+
job_pod = event["object"]
332+
333+
# Each job is processed once, when reaching a final state
334+
# (either successfully or not)
335+
if self.should_process_job_pod(job_pod):
336+
job_status = self.get_job_status(job_pod)
337+
backend_job_id = self.get_backend_job_id(job_pod)
338+
reana_job_id = self.get_reana_job_id(backend_job_id)
339+
340+
logs = self.job_manager_cls.get_logs(
341+
backend_job_id, job_pod=job_pod
342+
)
343+
344+
if job_status == JobStatus.failed.name:
345+
self.log_disruption(
346+
event["object"].status.conditions, backend_job_id
347+
)
278348

279-
def log_disruption(self, conditions, backend_job_id):
349+
store_job_logs(reana_job_id, logs)
350+
update_job_status(reana_job_id, job_status)
351+
352+
if JobStatus.should_cleanup_job(job_status):
353+
self.clean_job(backend_job_id)
354+
except client.rest.ApiException as e:
355+
logging.exception(
356+
f"Error from Kubernetes API while watching job pods: {e}"
357+
)
358+
except Exception as e:
359+
logging.error(traceback.format_exc())
360+
logging.error("Unexpected error: {}".format(e))
361+
362+
@staticmethod
363+
def log_disruption(conditions, backend_job_id):
280364
"""Log disruption message from Kubernetes event conditions.
281365
282366
Usually it is pod eviction but can be any of https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions.

0 commit comments

Comments
 (0)