Skip to content

Commit afd9f8d

Browse files
feat(rest): add kueue_enabled to info endpoint (#746)
Closes reanahub/reana-job-controller#493
1 parent e04d1fd commit afd9f8d

File tree

4 files changed

+109
-2
lines changed

4 files changed

+109
-2
lines changed

docs/openapi.json

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,17 @@
500500
"title": "Default memory limit for Kubernetes jobs",
501501
"value": "3Gi"
502502
},
503+
"kueue_available_queues": {
504+
"title": "List of local queues available for batch job processing",
505+
"value": [
506+
"local-queue-1",
507+
"local-queue-2"
508+
]
509+
},
510+
"kueue_enabled": {
511+
"title": "Whether Kueue is enabled for batch job processing",
512+
"value": "False"
513+
},
503514
"maximum_kubernetes_jobs_timeout": {
504515
"title": "Maximum timeout for Kubernetes jobs",
505516
"value": "1209600"
@@ -750,6 +761,31 @@
750761
},
751762
"type": "object"
752763
},
764+
"kueue_available_queues": {
765+
"properties": {
766+
"title": {
767+
"type": "string"
768+
},
769+
"value": {
770+
"items": {
771+
"type": "string"
772+
},
773+
"type": "array"
774+
}
775+
},
776+
"type": "object"
777+
},
778+
"kueue_enabled": {
779+
"properties": {
780+
"title": {
781+
"type": "string"
782+
},
783+
"value": {
784+
"type": "string"
785+
}
786+
},
787+
"type": "object"
788+
},
753789
"maximum_interactive_session_inactivity_period": {
754790
"properties": {
755791
"title": {

reana_server/config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,17 @@
9999
)
100100
"""Maximum number of threads for one Dask worker."""
101101

102+
KUEUE_ENABLED = bool(strtobool(os.getenv("KUEUE_ENABLED", "False")))
103+
"""Whether to use Kueue to manage job execution."""
104+
105+
KUEUE_AVAILABLE_QUEUES: list[dict] = (
106+
json.loads(os.getenv("KUEUE_AVAILABLE_QUEUES", "[]")) or []
107+
)
108+
"""List of local queues available for workflow job processing."""
109+
110+
KUEUE_DEFAULT_QUEUE: str = os.getenv("KUEUE_DEFAULT_QUEUE", "")
111+
"""Default queue to send workflow jobs to."""
112+
102113
REANA_KUBERNETES_JOBS_MEMORY_LIMIT = os.getenv("REANA_KUBERNETES_JOBS_MEMORY_LIMIT")
103114
"""Maximum memory limit for user job containers for workflow complexity estimation."""
104115

reana_server/rest/info.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
REANA_DASK_CLUSTER_MAX_NUMBER_OF_WORKERS,
3737
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS,
3838
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS,
39+
KUEUE_ENABLED,
40+
KUEUE_AVAILABLE_QUEUES,
41+
KUEUE_DEFAULT_QUEUE,
3942
)
4043
from reana_server.decorators import signin_required
4144

@@ -97,6 +100,22 @@ def info(user, **kwargs): # noqa
97100
value:
98101
type: string
99102
type: object
103+
kueue_enabled:
104+
properties:
105+
title:
106+
type: string
107+
value:
108+
type: string
109+
type: object
110+
kueue_available_queues:
111+
properties:
112+
title:
113+
type: string
114+
value:
115+
items:
116+
type: string
117+
type: array
118+
type: object
100119
kubernetes_max_memory_limit:
101120
properties:
102121
title:
@@ -291,6 +310,17 @@ def info(user, **kwargs): # noqa
291310
"title": "Default memory limit for Kubernetes jobs",
292311
"value": "3Gi"
293312
},
313+
"kueue_enabled": {
314+
"title": "Whether Kueue is enabled for batch job processing",
315+
"value": "False"
316+
},
317+
"kueue_available_queues": {
318+
"title": "List of local queues available for batch job processing",
319+
"value": [
320+
"local-queue-1",
321+
"local-queue-2"
322+
]
323+
},
294324
"kubernetes_max_memory_limit": {
295325
"title": "Maximum allowed memory limit for Kubernetes jobs",
296326
"value": "10Gi"
@@ -419,6 +449,10 @@ def info(user, **kwargs): # noqa
419449
title="Default memory limit for Kubernetes jobs",
420450
value=REANA_KUBERNETES_JOBS_MEMORY_LIMIT,
421451
),
452+
kueue_enabled=dict(
453+
title="Kueue enabled for batch job processing",
454+
value=KUEUE_ENABLED,
455+
),
422456
kubernetes_max_memory_limit=dict(
423457
title="Maximum allowed memory limit for Kubernetes jobs",
424458
value=REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT,
@@ -479,6 +513,20 @@ def info(user, **kwargs): # noqa
479513
),
480514
)
481515

516+
if KUEUE_ENABLED:
517+
cluster_information["kueue_available_queues"] = dict(
518+
title="Local queues available for batch job processing",
519+
value=[
520+
f"{queue["node"]}-{queue["name"]} ({queue['description']})"
521+
for queue in KUEUE_AVAILABLE_QUEUES
522+
],
523+
)
524+
525+
cluster_information["kueue_default_queue"] = dict(
526+
title="Default queue to send workflow jobs to",
527+
value=KUEUE_DEFAULT_QUEUE,
528+
)
529+
482530
if DASK_ENABLED:
483531
cluster_information["dask_autoscaler_enabled"] = dict(
484532
title="Dask autoscaler enabled in the cluster",
@@ -555,7 +603,6 @@ class InfoSchema(Schema):
555603
maximum_interactive_session_inactivity_period = fields.Nested(
556604
StringNullableInfoValue
557605
)
558-
kubernetes_max_memory_limit = fields.Nested(StringInfoValue)
559606
interactive_session_recommended_jupyter_images = fields.Nested(ListStringInfoValue)
560607
interactive_sessions_custom_image_allowed = fields.Nested(StringInfoValue)
561608
supported_workflow_engines = fields.Nested(ListStringInfoValue)
@@ -566,6 +613,7 @@ class InfoSchema(Schema):
566613
yadage_engine_packtivity_version = fields.Nested(StringInfoValue)
567614
snakemake_engine_version = fields.Nested(StringInfoValue)
568615
dask_enabled = fields.Nested(StringInfoValue)
616+
569617
if DASK_ENABLED:
570618
dask_autoscaler_enabled = fields.Nested(StringInfoValue)
571619
dask_cluster_default_number_of_workers = fields.Nested(StringInfoValue)
@@ -575,3 +623,7 @@ class InfoSchema(Schema):
575623
dask_cluster_max_number_of_workers = fields.Nested(StringInfoValue)
576624
dask_cluster_default_single_worker_threads = fields.Nested(StringInfoValue)
577625
dask_cluster_max_single_worker_threads = fields.Nested(StringInfoValue)
626+
627+
kueue_enabled = fields.Nested(StringInfoValue)
628+
if KUEUE_ENABLED:
629+
kueue_available_queues = fields.Nested(ListStringInfoValue)

reana_server/validation.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010

1111
import itertools
1212
import pathlib
13-
from typing import Dict, List
13+
from typing import Dict
1414

1515
from reana_commons.config import WORKSPACE_PATHS
1616
from reana_commons.errors import REANAValidationError
1717
from reana_commons.validation.compute_backends import build_compute_backends_validator
18+
from reana_commons.validation.kubernetes_queues import validate_kubernetes_queues
1819
from reana_commons.validation.operational_options import validate_operational_options
1920
from reana_commons.validation.parameters import build_parameters_validator
2021
from reana_commons.validation.utils import validate_reana_yaml, validate_workspace
@@ -31,6 +32,8 @@
3132
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
3233
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS,
3334
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS,
35+
KUEUE_AVAILABLE_QUEUES,
36+
KUEUE_ENABLED,
3437
)
3538
from reana_server import utils
3639

@@ -137,6 +140,11 @@ def validate_workflow(reana_yaml: Dict, input_parameters: Dict) -> Dict:
137140
validate_compute_backends(reana_yaml)
138141
validate_workspace_path(reana_yaml)
139142
validate_inputs(reana_yaml)
143+
validate_kubernetes_queues(
144+
reana_yaml,
145+
KUEUE_ENABLED,
146+
supported_queues=[queue.get("name") for queue in KUEUE_AVAILABLE_QUEUES],
147+
)
140148
return reana_yaml_warnings
141149

142150

0 commit comments

Comments
 (0)