1717from kubernetes import client , watch
1818from reana_commons .config import REANA_RUNTIME_KUBERNETES_NAMESPACE
1919from reana_commons .k8s .api_client import current_k8s_corev1_api_client
20- from reana_db .database import Session
21- from reana_db .models import Job , JobStatus
20+ from reana_db .models import JobStatus
2221
2322from reana_job_controller .config import (
2423 COMPUTE_BACKENDS ,
3231 C4P_SSH_TIMEOUT ,
3332 C4P_SSH_BANNER_TIMEOUT ,
3433 C4P_SSH_AUTH_TIMEOUT ,
34+ USE_KUEUE ,
3535)
3636
3737from reana_job_controller .job_db import JOB_DB , store_job_logs , update_job_status
38- from reana_job_controller .kubernetes_job_manager import KubernetesJobManager
3938from reana_job_controller .utils import (
4039 SSHClient ,
4140 singleton ,
@@ -115,7 +114,7 @@ def get_backend_job_id(self, job_pod):
115114 """
116115 return job_pod .metadata .labels ["job-name" ]
117116
118- def should_process_job (self , job_pod ) -> bool :
117+ def should_process_job_pod (self , job_pod ) -> bool :
119118 """Decide whether the job should be processed or not.
120119
121120 Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
@@ -141,6 +140,27 @@ def should_process_job(self, job_pod) -> bool:
141140
142141 return is_job_in_remaining_jobs and is_job_completed
143142
143+ def should_process_job (self , job ) -> bool :
144+ """Decide whether the job should be processed or not.
145+
146+ Each job is processed only once, when it reaches a final state (either `failed` or `finished`).
147+
148+ :param job: Compute backend job object (Kubernetes V1Job
149+ https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md)
150+ """
151+ remaining_jobs = self ._get_remaining_jobs (
152+ statuses_to_skip = [
153+ JobStatus .finished .name ,
154+ JobStatus .failed .name ,
155+ JobStatus .stopped .name ,
156+ ]
157+ )
158+
159+ is_job_in_remaining_jobs = job .metadata .name in remaining_jobs
160+ is_job_completed = job .status .succeeded and not job .status .active
161+
162+ return is_job_in_remaining_jobs and is_job_completed
163+
144164 @staticmethod
145165 def _get_job_container_statuses (job_pod ):
146166 return (job_pod .status .container_statuses or []) + (
@@ -235,46 +255,107 @@ def watch_jobs(self, job_db, app=None):
235255
236256 :param job_db: Dictionary which contains all current jobs.
237257 """
238- while True :
239- logging .info ("Starting a new stream request to watch Jobs" )
240- try :
241- w = watch .Watch ()
242- for event in w .stream (
243- current_k8s_corev1_api_client .list_namespaced_pod ,
244- namespace = REANA_RUNTIME_KUBERNETES_NAMESPACE ,
245- label_selector = f"reana-run-job-workflow-uuid={ self .workflow_uuid } " ,
246- ):
247- logging .info ("New Pod event received: {0}" .format (event ["type" ]))
248- job_pod = event ["object" ]
249-
250- # Each job is processed once, when reaching a final state
251- # (either successfully or not)
252- if self .should_process_job (job_pod ):
253- job_status = self .get_job_status (job_pod )
254- backend_job_id = self .get_backend_job_id (job_pod )
255- reana_job_id = self .get_reana_job_id (backend_job_id )
258+ # If using MultiKueue, watch jobs instead of pods since worker pods could be
259+ # running on a remote cluster that we can't directly monitor
260+ if USE_KUEUE :
261+ while True :
262+ logging .info ("Starting a new stream request to watch Jobs" )
256263
257- logs = self .job_manager_cls .get_logs (
258- backend_job_id , job_pod = job_pod
264+ try :
265+ w = watch .Watch ()
266+ for event in w .stream (
267+ client .BatchV1Api ().list_namespaced_job ,
268+ namespace = REANA_RUNTIME_KUBERNETES_NAMESPACE ,
269+ label_selector = f"reana-run-job-workflow-uuid={ self .workflow_uuid } " ,
270+ ):
271+ logging .info (f"New Job event received: { event ["type" ]} " )
272+
273+ job = event ["object" ]
274+ job_id = job .metadata .name
275+ job_finished = (
276+ job .status .succeeded
277+ and not job .status .active
278+ and not job .status .failed
259279 )
280+ job_status = (
281+ JobStatus .finished .name
282+ if job_finished
283+ else (
284+ JobStatus .failed .name
285+ if job .status .failed
286+ else JobStatus .running .name
287+ )
288+ )
289+
290+ if self .should_process_job (job ):
291+ reana_job_id = self .get_reana_job_id (job_id )
292+
293+ if job_status == JobStatus .failed .name :
294+ self .log_disruption (
295+ event ["object" ].status .conditions , job_id
296+ )
297+
298+ # TODO: fetch logs from pod on remote worker when MultiKueue supports this
299+ # logs = self.job_manager_cls.get_logs(job_id)
300+ # store_job_logs(reana_job_id, logs)
260301
261- if job_status == JobStatus . failed . name :
262- self . log_disruption (
263- event [ "object" ]. status . conditions , backend_job_id
302+ update_job_status (
303+ reana_job_id ,
304+ job_status ,
264305 )
265306
266- store_job_logs ( reana_job_id , logs )
267- update_job_status ( reana_job_id , job_status )
307+ if JobStatus . should_cleanup_job ( job_status ):
308+ self . clean_job ( job_id )
268309
269- if JobStatus .should_cleanup_job (job_status ):
270- self .clean_job (backend_job_id )
271- except client .rest .ApiException as e :
272- logging .exception (
273- f"Error from Kubernetes API while watching jobs pods: { e } "
274- )
275- except Exception as e :
276- logging .error (traceback .format_exc ())
277- logging .error ("Unexpected error: {}" .format (e ))
310+ except client .rest .ApiException as e :
311+ logging .exception (
312+ f"Error from Kubernetes API while watching jobs: { e } "
313+ )
314+ except Exception as e :
315+ logging .error (traceback .format_exc ())
316+ logging .error ("Unexpected error: {}" .format (e ))
317+ else :
318+ while True :
319+ try :
320+ w = watch .Watch ()
321+ for event in w .stream (
322+ current_k8s_corev1_api_client .list_namespaced_pod ,
323+ namespace = REANA_RUNTIME_KUBERNETES_NAMESPACE ,
324+ label_selector = f"reana-run-job-workflow-uuid={ self .workflow_uuid } " ,
325+ ):
326+ logging .info (
327+ "New Pod event received: {0}" .format (event ["type" ])
328+ )
329+ job_pod = event ["object" ]
330+
331+ # Each job is processed once, when reaching a final state
332+ # (either successfully or not)
333+ if self .should_process_job_pod (job_pod ):
334+ job_status = self .get_job_status (job_pod )
335+ backend_job_id = self .get_backend_job_id (job_pod )
336+ reana_job_id = self .get_reana_job_id (backend_job_id )
337+
338+ logs = self .job_manager_cls .get_logs (
339+ backend_job_id , job_pod = job_pod
340+ )
341+
342+ if job_status == JobStatus .failed .name :
343+ self .log_disruption (
344+ event ["object" ].status .conditions , backend_job_id
345+ )
346+
347+ store_job_logs (reana_job_id , logs )
348+ update_job_status (reana_job_id , job_status )
349+
350+ if JobStatus .should_cleanup_job (job_status ):
351+ self .clean_job (backend_job_id )
352+ except client .rest .ApiException as e :
353+ logging .exception (
354+ f"Error from Kubernetes API while watching jobs pods: { e } "
355+ )
356+ except Exception as e :
357+ logging .error (traceback .format_exc ())
358+ logging .error ("Unexpected error: {}" .format (e ))
278359
279360 def log_disruption (self , conditions , backend_job_id ):
280361 """Log disruption message from Kubernetes event conditions.
0 commit comments