Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@
"kubernetes_memory_limit": {
"type": "string"
},
"kubernetes_queue": {
"type": "string"
},
"kubernetes_uid": {
"format": "int32",
"type": "integer"
Expand Down
8 changes: 3 additions & 5 deletions reana_job_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import os
import secrets

from reana_commons.config import REANA_COMPONENT_PREFIX

from werkzeug.utils import import_string

REANA_DB_CLOSE_POOL_CONNECTIONS = bool(
Expand Down Expand Up @@ -177,11 +175,11 @@
SLURM_SSH_AUTH_TIMEOUT = float(os.getenv("SLURM_SSH_AUTH_TIMEOUT", "60"))
"""Seconds to wait for SLURM SSH authentication response."""

USE_KUEUE = bool(strtobool(os.getenv("USE_KUEUE", "False")))
KUEUE_ENABLED = bool(strtobool(os.getenv("KUEUE_ENABLED", "False")))
"""Whether to use Kueue to manage job execution."""

KUEUE_LOCAL_QUEUE_NAME = "local-queue-job"
"""Name of the local queue to be used by Kueue."""
KUEUE_DEFAULT_QUEUE = os.getenv("KUEUE_DEFAULT_QUEUE", "")
"""Name of the default queue to be used by Kueue."""

REANA_USER_ID = os.getenv("REANA_USER_ID")
"""User UUID of the owner of the workflow."""
Expand Down
4 changes: 2 additions & 2 deletions reana_job_controller/job_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ def retrieve_job_logs(job_id):


def store_job_logs(job_id, logs):
"""Store job logs.
"""Store job logs in the database.

:param job_id: Internal REANA job ID.
:param logs: Job logs.
:param logs: Job logs to store.
:type job_id: str
:type logs: str
"""
Expand Down
121 changes: 111 additions & 10 deletions reana_job_controller/job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
from kubernetes import client, watch
from reana_commons.config import REANA_RUNTIME_KUBERNETES_NAMESPACE
from reana_commons.k8s.api_client import current_k8s_corev1_api_client
from reana_db.database import Session
from reana_db.models import Job, JobStatus
from reana_db.models import JobStatus

from reana_job_controller.config import (
COMPUTE_BACKENDS,
Expand All @@ -32,10 +31,11 @@
C4P_SSH_TIMEOUT,
C4P_SSH_BANNER_TIMEOUT,
C4P_SSH_AUTH_TIMEOUT,
KUEUE_ENABLED,
KUEUE_DEFAULT_QUEUE,
)

from reana_job_controller.job_db import JOB_DB, store_job_logs, update_job_status
from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
from reana_job_controller.utils import (
SSHClient,
singleton,
Expand Down Expand Up @@ -104,7 +104,8 @@ def get_reana_job_id(self, backend_job_id: str) -> str:
remaining_jobs = self._get_remaining_jobs()
return remaining_jobs[backend_job_id]

def get_backend_job_id(self, job_pod):
@staticmethod
def get_backend_job_id(job_pod):
"""Get the backend job id for the backend object.

:param job_pod: Compute backend job object (Kubernetes V1Pod
Expand All @@ -115,7 +116,7 @@ def get_backend_job_id(self, job_pod):
"""
return job_pod.metadata.labels["job-name"]

def should_process_job(self, job_pod) -> bool:
def should_process_job_pod(self, job_pod) -> bool:
"""Decide whether the job should be processed or not.

Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
Expand All @@ -141,6 +142,29 @@ def should_process_job(self, job_pod) -> bool:

return is_job_in_remaining_jobs and is_job_completed

def should_process_job(self, job) -> bool:
    """Decide whether the job should be processed or not.

    Each job is processed only once, when it reaches a final state
    (either `failed` or `finished`).

    :param job: Compute backend job object (Kubernetes V1Job
        https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md)
    """
    # Jobs already recorded in a terminal state have been handled before
    # and must not be processed again.
    skip_statuses = [
        JobStatus.finished.name,
        JobStatus.failed.name,
        JobStatus.stopped.name,
    ]
    pending_jobs = self._get_remaining_jobs(statuses_to_skip=skip_statuses)

    # The Job is final when no pods are active and at least one pod has
    # succeeded or failed (V1JobStatus counters may be None, i.e. falsy).
    status = job.status
    job_reached_final_state = not status.active and (status.succeeded or status.failed)

    return job.metadata.name in pending_jobs and job_reached_final_state

@staticmethod
def _get_job_container_statuses(job_pod):
return (job_pod.status.container_statuses or []) + (
Expand Down Expand Up @@ -230,13 +254,77 @@ def get_job_status(self, job_pod) -> Optional[str]:

return status

def watch_jobs(self, job_db, app=None):
"""Open stream connection to k8s apiserver to watch all jobs status.
def watch_job_event_stream(self):
    """Watch Job events from the Kubernetes API and update REANA job statuses.

    This method is used when MultiKueue is enabled, since in that case we can't
    directly monitor the worker pods as they are remote; we watch the local
    Job objects instead.
    """
    while True:
        logging.info("Starting a new stream request to watch Jobs")

        try:
            w = watch.Watch()
            for event in w.stream(
                client.BatchV1Api().list_namespaced_job,
                namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
                label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
            ):
                # FIX: the previous f-string reused double quotes inside a
                # double-quoted literal (f"...{event["type"]}..."), which is a
                # SyntaxError before Python 3.12 (PEP 701). Lazy %-style args
                # are also the recommended logging idiom.
                logging.info("New Job event received: %s", event["type"])

                job = event["object"]
                job_id = job.metadata.name
                # V1JobStatus counters may be None; None is treated as falsy.
                job_finished = (
                    job.status.succeeded
                    and not job.status.active
                    and not job.status.failed
                )
                job_status = (
                    JobStatus.finished.name
                    if job_finished
                    else (
                        JobStatus.failed.name
                        if job.status.failed
                        else JobStatus.running.name
                    )
                )

                if self.should_process_job(job):
                    reana_job_id = self.get_reana_job_id(job_id)

                    if job_status == JobStatus.failed.name:
                        # `job` is `event["object"]`; reuse the bound name.
                        self.log_disruption(job.status.conditions, job_id)

                    # TODO: Fetch logs from the remote job pod on the remote worker when MultiKueue supports this
                    logs = self.job_manager_cls.get_logs(job_id)
                    if logs is not None:
                        store_job_logs(reana_job_id, logs)

                    update_job_status(
                        reana_job_id,
                        job_status,
                    )

                    if JobStatus.should_cleanup_job(job_status):
                        self.clean_job(job_id)

        except client.rest.ApiException as e:
            logging.exception(f"Error from Kubernetes API while watching jobs: {e}")
        except Exception as e:
            logging.error(traceback.format_exc())
            logging.error("Unexpected error: {}".format(e))

def watch_pod_event_stream(self):
"""
Watch pod events from the Kubernetes API.

This method is used when MultiKueue is not enabled, since in that case we can
directly monitor the worker pods as they are running on the local cluster.
"""
while True:
try:
w = watch.Watch()
for event in w.stream(
Expand All @@ -249,7 +337,7 @@ def watch_jobs(self, job_db, app=None):

# Each job is processed once, when reaching a final state
# (either successfully or not)
if self.should_process_job(job_pod):
if self.should_process_job_pod(job_pod):
job_status = self.get_job_status(job_pod)
backend_job_id = self.get_backend_job_id(job_pod)
reana_job_id = self.get_reana_job_id(backend_job_id)
Expand All @@ -276,7 +364,20 @@ def watch_jobs(self, job_db, app=None):
logging.error(traceback.format_exc())
logging.error("Unexpected error: {}".format(e))

def log_disruption(self, conditions, backend_job_id):
def watch_jobs(self, job_db, app=None):
    """Open stream connection to k8s apiserver to watch all jobs status.

    :param job_db: Dictionary which contains all current jobs.
    """
    # With MultiKueue the worker pods may run on a remote cluster that we
    # cannot monitor directly, so watch Job objects instead of pods.
    watcher = (
        self.watch_job_event_stream if KUEUE_ENABLED else self.watch_pod_event_stream
    )
    watcher()

@staticmethod
def log_disruption(conditions, backend_job_id):
"""Log disruption message from Kubernetes event conditions.

Usually it is pod eviction but can be any of https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions.
Expand Down
45 changes: 30 additions & 15 deletions reana_job_controller/kubernetes_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
REANA_KUBERNETES_JOBS_MEMORY_LIMIT,
REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT,
REANA_USER_ID,
USE_KUEUE,
KUEUE_LOCAL_QUEUE_NAME,
KUEUE_ENABLED,
KUEUE_DEFAULT_QUEUE,
)
from reana_job_controller.errors import ComputingBackendSubmissionError
from reana_job_controller.job_manager import JobManager
Expand All @@ -67,6 +67,13 @@ class KubernetesJobManager(JobManager):
MAX_NUM_JOB_RESTARTS = 0
"""Maximum number of job restarts in case of internal failures."""

@property
def secrets(self):
    """Return the user's secrets, fetching them from k8s on first access.

    The fetched secrets are cached in ``self._secrets`` so the Kubernetes
    API is queried at most once per manager instance.
    """
    cached = self._secrets
    if cached is None:
        cached = UserSecretsStore.fetch(REANA_USER_ID)
        self._secrets = cached
    return cached

def __init__(
self,
docker_img=None,
Expand All @@ -81,6 +88,7 @@ def __init__(
kerberos=False,
kubernetes_uid=None,
kubernetes_memory_limit=None,
kubernetes_queue=None,
voms_proxy=False,
rucio=False,
kubernetes_job_timeout: Optional[int] = None,
Expand Down Expand Up @@ -113,6 +121,8 @@ def __init__(
:type kubernetes_uid: int
:param kubernetes_memory_limit: Memory limit for job container.
:type kubernetes_memory_limit: str
:param kubernetes_queue: Name of the Kueue/MultiKueue LocalQueue to send jobs to when Kueue is enabled.
:type kubernetes_queue: str
:param kubernetes_job_timeout: Job timeout in seconds.
:type kubernetes_job_timeout: int
:param voms_proxy: Decides if a voms-proxy certificate should be
Expand Down Expand Up @@ -142,41 +152,46 @@ def __init__(
self.rucio = rucio
self.set_user_id(kubernetes_uid)
self.set_memory_limit(kubernetes_memory_limit)
self.kubernetes_queue = kubernetes_queue
self.workflow_uuid = workflow_uuid
self.kubernetes_job_timeout = kubernetes_job_timeout
self._secrets: Optional[UserSecrets] = secrets

@property
def secrets(self):
"""Get cached secrets if present, otherwise fetch them from k8s."""
if self._secrets is None:
self._secrets = UserSecretsStore.fetch(REANA_USER_ID)
return self._secrets

@JobManager.execution_hook
def execute(self):
"""Execute a job in Kubernetes."""
backend_job_id = build_unique_component_name("run-job")

if KUEUE_ENABLED and not (self.kubernetes_queue or KUEUE_DEFAULT_QUEUE):
logging.error(
"Kueue is enabled but no queue name was provided. Please set a KUEUE_DEFAULT_QUEUE or ensure that all jobs set a value for kubernetes_queue in their spec."
)
raise

labels = {
"reana-run-job-workflow-uuid": self.workflow_uuid,
}

if KUEUE_ENABLED:
labels["kueue.x-k8s.io/queue-name"] = (
f"{self.kubernetes_queue or KUEUE_DEFAULT_QUEUE}-job-queue"
)

self.job = {
"kind": "Job",
"apiVersion": "batch/v1",
"metadata": {
"name": backend_job_id,
"namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE,
"labels": (
{"kueue.x-k8s.io/queue-name": KUEUE_LOCAL_QUEUE_NAME}
if USE_KUEUE
else {}
),
"labels": labels,
},
"spec": {
"backoffLimit": KubernetesJobManager.MAX_NUM_JOB_RESTARTS,
"autoSelector": True,
"template": {
"metadata": {
"name": backend_job_id,
"labels": {"reana-run-job-workflow-uuid": self.workflow_uuid},
"labels": labels,
},
"spec": {
"automountServiceAccountToken": False,
Expand Down
1 change: 1 addition & 0 deletions reana_job_controller/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class JobRequest(Schema):
rucio = fields.Bool(required=False)
kubernetes_uid = fields.Int(required=False)
kubernetes_memory_limit = fields.Str(required=False)
kubernetes_queue = fields.Str(required=False)
kubernetes_job_timeout = fields.Int(required=False)
unpacked_img = fields.Bool(required=False)
htcondor_max_runtime = fields.Str(required=False)
Expand Down
Loading