Skip to content

Commit 78489b8

Browse files
feat(job-manager): add kubernetes_queue option for MultiKueue (reanahub#494)
Also adds monitoring of job status (rather than just pods) to the job_monitor to support MK since the status of remote pods cannot be seen. Closes reanahub#493
1 parent f014442 commit 78489b8

File tree

7 files changed

+204
-33
lines changed

7 files changed

+204
-33
lines changed

docs/openapi.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@
8585
"kubernetes_memory_limit": {
8686
"type": "string"
8787
},
88+
"kubernetes_queue": {
89+
"type": "string"
90+
},
8891
"kubernetes_uid": {
8992
"format": "int32",
9093
"type": "integer"

reana_job_controller/config.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
import os
1313
import secrets
1414

15-
from reana_commons.config import REANA_COMPONENT_PREFIX
16-
1715
from werkzeug.utils import import_string
1816

1917
REANA_DB_CLOSE_POOL_CONNECTIONS = bool(
@@ -177,11 +175,11 @@
177175
SLURM_SSH_AUTH_TIMEOUT = float(os.getenv("SLURM_SSH_AUTH_TIMEOUT", "60"))
178176
"""Seconds to wait for SLURM SSH authentication response."""
179177

180-
USE_KUEUE = bool(strtobool(os.getenv("USE_KUEUE", "False")))
178+
KUEUE_ENABLED = bool(strtobool(os.getenv("KUEUE_ENABLED", "False")))
181179
"""Whether to use Kueue to manage job execution."""
182180

183-
KUEUE_LOCAL_QUEUE_NAME = "local-queue-job"
184-
"""Name of the local queue to be used by Kueue."""
181+
KUEUE_DEFAULT_QUEUE = os.getenv("KUEUE_DEFAULT_QUEUE", "")
182+
"""Name of the default queue to be used by Kueue."""
185183

186184
REANA_USER_ID = os.getenv("REANA_USER_ID")
187185
"""User UUID of the owner of the workflow."""

reana_job_controller/job_db.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,10 @@ def retrieve_job_logs(job_id):
115115

116116

117117
def store_job_logs(job_id, logs):
118-
"""Store job logs.
118+
"""Store job logs in the database.
119119
120120
:param job_id: Internal REANA job ID.
121-
:param logs: Job logs.
121+
:param logs: Job logs to store.
122122
:type job_id: str
123123
:type logs: str
124124
"""

reana_job_controller/job_monitor.py

Lines changed: 109 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
from kubernetes import client, watch
1818
from reana_commons.config import REANA_RUNTIME_KUBERNETES_NAMESPACE
1919
from reana_commons.k8s.api_client import current_k8s_corev1_api_client
20-
from reana_db.database import Session
21-
from reana_db.models import Job, JobStatus
20+
from reana_db.models import JobStatus
2221

2322
from reana_job_controller.config import (
2423
COMPUTE_BACKENDS,
@@ -32,10 +31,11 @@
3231
C4P_SSH_TIMEOUT,
3332
C4P_SSH_BANNER_TIMEOUT,
3433
C4P_SSH_AUTH_TIMEOUT,
34+
KUEUE_ENABLED,
35+
KUEUE_DEFAULT_QUEUE,
3536
)
3637

3738
from reana_job_controller.job_db import JOB_DB, store_job_logs, update_job_status
38-
from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
3939
from reana_job_controller.utils import (
4040
SSHClient,
4141
singleton,
@@ -104,7 +104,8 @@ def get_reana_job_id(self, backend_job_id: str) -> str:
104104
remaining_jobs = self._get_remaining_jobs()
105105
return remaining_jobs[backend_job_id]
106106

107-
def get_backend_job_id(self, job_pod):
107+
@staticmethod
108+
def get_backend_job_id(job_pod):
108109
"""Get the backend job id for the backend object.
109110
110111
:param job_pod: Compute backend job object (Kubernetes V1Pod
@@ -115,7 +116,7 @@ def get_backend_job_id(self, job_pod):
115116
"""
116117
return job_pod.metadata.labels["job-name"]
117118

118-
def should_process_job(self, job_pod) -> bool:
119+
def should_process_job_pod(self, job_pod) -> bool:
119120
"""Decide whether the job should be processed or not.
120121
121122
Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
@@ -141,6 +142,27 @@ def should_process_job(self, job_pod) -> bool:
141142

142143
return is_job_in_remaining_jobs and is_job_completed
143144

145+
def should_process_job(self, job) -> bool:
146+
"""Decide whether the job should be processed or not.
147+
148+
Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
149+
150+
:param job: Compute backend job object (Kubernetes V1Job
151+
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md)
152+
"""
153+
remaining_jobs = self._get_remaining_jobs(
154+
statuses_to_skip=[
155+
JobStatus.finished.name,
156+
JobStatus.failed.name,
157+
JobStatus.stopped.name,
158+
]
159+
)
160+
161+
is_job_in_remaining_jobs = job.metadata.name in remaining_jobs
162+
is_job_completed = not job.status.active and (job.status.succeeded or job.status.failed)
163+
164+
return is_job_in_remaining_jobs and is_job_completed
165+
144166
@staticmethod
145167
def _get_job_container_statuses(job_pod):
146168
return (job_pod.status.container_statuses or []) + (
@@ -230,13 +252,77 @@ def get_job_status(self, job_pod) -> Optional[str]:
230252

231253
return status
232254

233-
def watch_jobs(self, job_db, app=None):
234-
"""Open stream connection to k8s apiserver to watch all jobs status.
255+
def watch_job_event_stream(self):
256+
"""
257+
Watch job events from the Kubernetes API.
235258
236-
:param job_db: Dictionary which contains all current jobs.
259+
This method is used when MultiKueue is enabled, since in that case we can't
260+
directly monitor the worker pods as they are remote.
237261
"""
238262
while True:
239263
logging.info("Starting a new stream request to watch Jobs")
264+
265+
try:
266+
w = watch.Watch()
267+
for event in w.stream(
268+
client.BatchV1Api().list_namespaced_job,
269+
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
270+
label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
271+
):
272+
logging.info(f"New Job event received: {event["type"]}")
273+
274+
job = event["object"]
275+
job_id = job.metadata.name
276+
job_finished = (
277+
job.status.succeeded
278+
and not job.status.active
279+
and not job.status.failed
280+
)
281+
job_status = (
282+
JobStatus.finished.name
283+
if job_finished
284+
else (
285+
JobStatus.failed.name
286+
if job.status.failed
287+
else JobStatus.running.name
288+
)
289+
)
290+
291+
if self.should_process_job(job):
292+
reana_job_id = self.get_reana_job_id(job_id)
293+
294+
if job_status == JobStatus.failed.name:
295+
self.log_disruption(
296+
event["object"].status.conditions, job_id
297+
)
298+
299+
# TODO: Fetch logs from the remote job pod on the remote worker when MultiKueue supports this
300+
logs = self.job_manager_cls.get_logs(job_id)
301+
if logs is not None:
302+
store_job_logs(reana_job_id, logs)
303+
304+
update_job_status(
305+
reana_job_id,
306+
job_status,
307+
)
308+
309+
if JobStatus.should_cleanup_job(job_status):
310+
self.clean_job(job_id)
311+
312+
except client.rest.ApiException as e:
313+
logging.exception(f"Error from Kubernetes API while watching jobs: {e}")
314+
except Exception as e:
315+
logging.error(traceback.format_exc())
316+
logging.error("Unexpected error: {}".format(e))
317+
318+
def watch_pod_event_stream(self):
319+
"""
320+
Watch pod events from the Kubernetes API.
321+
322+
This method is used when MultiKueue is not enabled, since in that case we can
323+
directly monitor the worker pods as they are running on the local cluster.
324+
"""
325+
while True:
240326
try:
241327
w = watch.Watch()
242328
for event in w.stream(
@@ -249,7 +335,7 @@ def watch_jobs(self, job_db, app=None):
249335

250336
# Each job is processed once, when reaching a final state
251337
# (either successfully or not)
252-
if self.should_process_job(job_pod):
338+
if self.should_process_job_pod(job_pod):
253339
job_status = self.get_job_status(job_pod)
254340
backend_job_id = self.get_backend_job_id(job_pod)
255341
reana_job_id = self.get_reana_job_id(backend_job_id)
@@ -276,7 +362,20 @@ def watch_jobs(self, job_db, app=None):
276362
logging.error(traceback.format_exc())
277363
logging.error("Unexpected error: {}".format(e))
278364

279-
def log_disruption(self, conditions, backend_job_id):
365+
def watch_jobs(self, job_db, app=None):
366+
"""Open stream connection to k8s apiserver to watch all jobs status.
367+
368+
:param job_db: Dictionary which contains all current jobs.
369+
"""
370+
# If using MultiKueue, watch jobs instead of pods since worker pods could be
371+
# running on a remote cluster that we can't directly monitor
372+
if KUEUE_ENABLED:
373+
self.watch_job_event_stream()
374+
else:
375+
self.watch_pod_event_stream()
376+
377+
@staticmethod
378+
def log_disruption(conditions, backend_job_id):
280379
"""Log disruption message from Kubernetes event conditions.
281380
282381
Usually it is pod eviction but can be any of https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions.

reana_job_controller/kubernetes_job_manager.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@
5252
REANA_KUBERNETES_JOBS_MEMORY_LIMIT,
5353
REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT,
5454
REANA_USER_ID,
55-
USE_KUEUE,
56-
KUEUE_LOCAL_QUEUE_NAME,
55+
KUEUE_ENABLED,
56+
KUEUE_DEFAULT_QUEUE,
5757
)
5858
from reana_job_controller.errors import ComputingBackendSubmissionError
5959
from reana_job_controller.job_manager import JobManager
@@ -67,6 +67,13 @@ class KubernetesJobManager(JobManager):
6767
MAX_NUM_JOB_RESTARTS = 0
6868
"""Maximum number of job restarts in case of internal failures."""
6969

70+
@property
71+
def secrets(self):
72+
"""Get cached secrets if present, otherwise fetch them from k8s."""
73+
if self._secrets is None:
74+
self._secrets = UserSecretsStore.fetch(REANA_USER_ID)
75+
return self._secrets
76+
7077
def __init__(
7178
self,
7279
docker_img=None,
@@ -81,6 +88,7 @@ def __init__(
8188
kerberos=False,
8289
kubernetes_uid=None,
8390
kubernetes_memory_limit=None,
91+
kubernetes_queue=None,
8492
voms_proxy=False,
8593
rucio=False,
8694
kubernetes_job_timeout: Optional[int] = None,
@@ -113,6 +121,8 @@ def __init__(
113121
:type kubernetes_uid: int
114122
:param kubernetes_memory_limit: Memory limit for job container.
115123
:type kubernetes_memory_limit: str
124+
:param kubernetes_queue: Name of the MultiKueue LocalQueue to send jobs to when Kueue is enabled.
125+
:type kubernetes_queue: str
116126
:param kubernetes_job_timeout: Job timeout in seconds.
117127
:type kubernetes_job_timeout: int
118128
:param voms_proxy: Decides if a voms-proxy certificate should be
@@ -142,41 +152,46 @@ def __init__(
142152
self.rucio = rucio
143153
self.set_user_id(kubernetes_uid)
144154
self.set_memory_limit(kubernetes_memory_limit)
155+
self.kubernetes_queue = kubernetes_queue
145156
self.workflow_uuid = workflow_uuid
146157
self.kubernetes_job_timeout = kubernetes_job_timeout
147158
self._secrets: Optional[UserSecrets] = secrets
148159

149-
@property
150-
def secrets(self):
151-
"""Get cached secrets if present, otherwise fetch them from k8s."""
152-
if self._secrets is None:
153-
self._secrets = UserSecretsStore.fetch(REANA_USER_ID)
154-
return self._secrets
155-
156160
@JobManager.execution_hook
157161
def execute(self):
158162
"""Execute a job in Kubernetes."""
159163
backend_job_id = build_unique_component_name("run-job")
160164

165+
if KUEUE_ENABLED and not (self.kubernetes_queue or KUEUE_DEFAULT_QUEUE):
166+
logging.error(
167+
"Kueue is enabled but no queue name was provided. Please set a KUEUE_DEFAULT_QUEUE or ensure that all jobs set a value for kubernetes_queue in their spec."
168+
)
169+
raise
170+
171+
labels = {
172+
"reana-run-job-workflow-uuid": self.workflow_uuid,
173+
}
174+
175+
if KUEUE_ENABLED:
176+
labels["kueue.x-k8s.io/queue-name"] = (
177+
f"{self.kubernetes_queue or KUEUE_DEFAULT_QUEUE}-job-queue"
178+
)
179+
161180
self.job = {
162181
"kind": "Job",
163182
"apiVersion": "batch/v1",
164183
"metadata": {
165184
"name": backend_job_id,
166185
"namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE,
167-
"labels": (
168-
{"kueue.x-k8s.io/queue-name": KUEUE_LOCAL_QUEUE_NAME}
169-
if USE_KUEUE
170-
else {}
171-
),
186+
"labels": labels,
172187
},
173188
"spec": {
174189
"backoffLimit": KubernetesJobManager.MAX_NUM_JOB_RESTARTS,
175190
"autoSelector": True,
176191
"template": {
177192
"metadata": {
178193
"name": backend_job_id,
179-
"labels": {"reana-run-job-workflow-uuid": self.workflow_uuid},
194+
"labels": labels,
180195
},
181196
"spec": {
182197
"automountServiceAccountToken": False,

reana_job_controller/schemas.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class JobRequest(Schema):
5050
rucio = fields.Bool(required=False)
5151
kubernetes_uid = fields.Int(required=False)
5252
kubernetes_memory_limit = fields.Str(required=False)
53+
kubernetes_queue = fields.Str(required=False)
5354
kubernetes_job_timeout = fields.Int(required=False)
5455
unpacked_img = fields.Bool(required=False)
5556
htcondor_max_runtime = fields.Str(required=False)

0 commit comments

Comments
 (0)