1717from kubernetes import client , watch
1818from reana_commons .config import REANA_RUNTIME_KUBERNETES_NAMESPACE
1919from reana_commons .k8s .api_client import current_k8s_corev1_api_client
20- from reana_db .database import Session
21- from reana_db .models import Job , JobStatus
20+ from reana_db .models import JobStatus
2221
2322from reana_job_controller .config import (
2423 COMPUTE_BACKENDS ,
3231 C4P_SSH_TIMEOUT ,
3332 C4P_SSH_BANNER_TIMEOUT ,
3433 C4P_SSH_AUTH_TIMEOUT ,
34+ KUEUE_ENABLED ,
35+ KUEUE_DEFAULT_QUEUE ,
3536)
3637
3738from reana_job_controller .job_db import JOB_DB , store_job_logs , update_job_status
38- from reana_job_controller .kubernetes_job_manager import KubernetesJobManager
3939from reana_job_controller .utils import (
4040 SSHClient ,
4141 singleton ,
@@ -104,7 +104,8 @@ def get_reana_job_id(self, backend_job_id: str) -> str:
104104 remaining_jobs = self ._get_remaining_jobs ()
105105 return remaining_jobs [backend_job_id ]
106106
107- def get_backend_job_id (self , job_pod ):
107+ @staticmethod
108+ def get_backend_job_id (job_pod ):
108109 """Get the backend job id for the backend object.
109110
110111 :param job_pod: Compute backend job object (Kubernetes V1Pod
@@ -115,7 +116,7 @@ def get_backend_job_id(self, job_pod):
115116 """
116117 return job_pod .metadata .labels ["job-name" ]
117118
118- def should_process_job (self , job_pod ) -> bool :
119+ def should_process_job_pod (self , job_pod ) -> bool :
119120 """Decide whether the job should be processed or not.
120121
121122 Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
@@ -141,6 +142,29 @@ def should_process_job(self, job_pod) -> bool:
141142
142143 return is_job_in_remaining_jobs and is_job_completed
143144
def should_process_job(self, job) -> bool:
    """Decide whether the job should be processed or not.

    Each job is processed only once, when it reaches a final state (either `failed` or `finished`).

    :param job: Compute backend job object (Kubernetes V1Job
        https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md)
    :return: ``True`` when the job name is among the remaining jobs and the
        job reports no active pods while reporting succeeded or failed
        pods; ``False`` otherwise.
    """
    remaining_jobs = self._get_remaining_jobs(
        statuses_to_skip=[
            JobStatus.finished.name,
            JobStatus.failed.name,
            JobStatus.stopped.name,
        ]
    )

    is_job_in_remaining_jobs = job.metadata.name in remaining_jobs
    # ``succeeded``/``failed`` are optional counters on the job status, so
    # ``a or b`` may evaluate to ``None`` or an ``int``. Coerce explicitly so
    # the method honours its declared ``bool`` return type.
    is_job_completed = not job.status.active and bool(
        job.status.succeeded or job.status.failed
    )

    return is_job_in_remaining_jobs and is_job_completed
167+
144168 @staticmethod
145169 def _get_job_container_statuses (job_pod ):
146170 return (job_pod .status .container_statuses or []) + (
@@ -230,13 +254,77 @@ def get_job_status(self, job_pod) -> Optional[str]:
230254
231255 return status
232256
233- def watch_jobs (self , job_db , app = None ):
234- """Open stream connection to k8s apiserver to watch all jobs status.
def watch_job_event_stream(self):
    """Watch job events from the Kubernetes API.

    This method is used when MultiKueue is enabled, since in that case we can't
    directly monitor the worker pods as they are remote.

    The method never returns: whenever the event stream ends or raises, the
    error is logged and a new stream request is started. For each job that
    ``should_process_job`` accepts, the logs (when available) are stored,
    the job status is updated and, if ``JobStatus.should_cleanup_job`` says
    so, the job is cleaned up.
    """
    while True:
        logging.info("Starting a new stream request to watch Jobs")

        try:
            w = watch.Watch()
            for event in w.stream(
                client.BatchV1Api().list_namespaced_job,
                namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
                label_selector=f"reana-run-job-workflow-uuid={self.workflow_uuid}",
            ):
                # Single quotes for the key: reusing the enclosing quote
                # character inside an f-string replacement field is a
                # SyntaxError on Python versions older than 3.12.
                logging.info(f"New Job event received: {event['type']}")

                job = event["object"]
                job_id = job.metadata.name

                # Map the Kubernetes job counters to a REANA job status:
                # succeeded with nothing active or failed -> finished,
                # any failure -> failed, everything else -> running.
                job_finished = (
                    job.status.succeeded
                    and not job.status.active
                    and not job.status.failed
                )
                if job_finished:
                    job_status = JobStatus.finished.name
                elif job.status.failed:
                    job_status = JobStatus.failed.name
                else:
                    job_status = JobStatus.running.name

                if self.should_process_job(job):
                    reana_job_id = self.get_reana_job_id(job_id)

                    if job_status == JobStatus.failed.name:
                        self.log_disruption(job.status.conditions, job_id)

                    # TODO: Fetch logs from the remote job pod on the remote worker when MultiKueue supports this
                    logs = self.job_manager_cls.get_logs(job_id)
                    if logs is not None:
                        store_job_logs(reana_job_id, logs)

                    update_job_status(
                        reana_job_id,
                        job_status,
                    )

                    if JobStatus.should_cleanup_job(job_status):
                        self.clean_job(job_id)

        except client.rest.ApiException as e:
            logging.exception(f"Error from Kubernetes API while watching jobs: {e}")
        except Exception as e:
            # Keep the watcher alive on any unexpected failure; the outer
            # loop opens a fresh stream afterwards.
            logging.error(traceback.format_exc())
            logging.error("Unexpected error: {}".format(e))
319+
320+ def watch_pod_event_stream (self ):
321+ """
322+ Watch pod events from the Kubernetes API.
323+
324+ This method is used when MultiKueue is not enabled, since in that case we can
325+ directly monitor the worker pods as they are running on the local cluster.
326+ """
327+ while True :
240328 try :
241329 w = watch .Watch ()
242330 for event in w .stream (
@@ -249,7 +337,7 @@ def watch_jobs(self, job_db, app=None):
249337
250338 # Each job is processed once, when reaching a final state
251339 # (either successfully or not)
252- if self .should_process_job (job_pod ):
340+ if self .should_process_job_pod (job_pod ):
253341 job_status = self .get_job_status (job_pod )
254342 backend_job_id = self .get_backend_job_id (job_pod )
255343 reana_job_id = self .get_reana_job_id (backend_job_id )
@@ -276,7 +364,20 @@ def watch_jobs(self, job_db, app=None):
276364 logging .error (traceback .format_exc ())
277365 logging .error ("Unexpected error: {}" .format (e ))
278366
279- def log_disruption (self , conditions , backend_job_id ):
def watch_jobs(self, job_db, app=None):
    """Open stream connection to k8s apiserver to watch all jobs status.

    Dispatches to the job event stream when ``KUEUE_ENABLED`` is truthy and
    to the pod event stream otherwise.

    :param job_db: Dictionary which contains all current jobs (not read by
        this method itself).
    :param app: Not read by this method itself.
    """
    # With MultiKueue the worker pods could be running on a remote cluster
    # that we can't directly monitor, so jobs are watched instead of pods.
    event_stream = (
        self.watch_job_event_stream
        if KUEUE_ENABLED
        else self.watch_pod_event_stream
    )
    event_stream()
378+
379+ @staticmethod
380+ def log_disruption (conditions , backend_job_id ):
280381 """Log disruption message from Kubernetes event conditions.
281382
282383 Usually it is pod eviction but can be any of https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions.
0 commit comments