Skip to content

Commit 15d0b97

Browse files
committed
feat(helm): add initial Dask support (#701)
1 parent 70e8ad9 commit 15d0b97

File tree

6 files changed

+243
-5
lines changed

6 files changed

+243
-5
lines changed

AUTHORS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
The list of contributors in alphabetical order:
44

55
- [Adelina Lintuluoto](https://orcid.org/0000-0002-0726-1452)
6+
- [Alp Tuna](https://orcid.org/0009-0001-1915-3993)
67
- [Anton Khodak](https://orcid.org/0000-0003-3263-4553)
78
- [Audrius Mecionis](https://orcid.org/0000-0002-3759-1663)
89
- [Bruno Rosendo](https://orcid.org/0000-0002-0923-3148)

docs/openapi.json

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,26 @@
429429
"slurmcern"
430430
]
431431
},
432+
"dask_cluster_default_number_of_workers": {
433+
"title": "The number of Dask workers created by default",
434+
"value": "2Gi"
435+
},
436+
"dask_cluster_default_single_worker_memory": {
437+
"title": "The amount of memory used by default by a single Dask worker",
438+
"value": "2Gi"
439+
},
440+
"dask_cluster_max_memory_limit": {
441+
"title": "The maximum memory limit for Dask clusters created by users",
442+
"value": "16Gi"
443+
},
444+
"dask_cluster_max_single_worker_memory": {
445+
"title": "The maximum amount of memory that users can ask for the single Dask worker",
446+
"value": "8Gi"
447+
},
448+
"dask_enabled": {
449+
"title": "Dask workflows allowed in the cluster",
450+
"value": "False"
451+
},
432452
"default_kubernetes_jobs_timeout": {
433453
"title": "Default timeout for Kubernetes jobs",
434454
"value": "604800"
@@ -479,6 +499,61 @@
479499
},
480500
"type": "object"
481501
},
502+
"dask_cluster_default_number_of_workers": {
503+
"properties": {
504+
"title": {
505+
"type": "string"
506+
},
507+
"value": {
508+
"type": "string"
509+
}
510+
},
511+
"type": "object"
512+
},
513+
"dask_cluster_default_single_worker_memory": {
514+
"properties": {
515+
"title": {
516+
"type": "string"
517+
},
518+
"value": {
519+
"type": "string"
520+
}
521+
},
522+
"type": "object"
523+
},
524+
"dask_cluster_max_memory_limit": {
525+
"properties": {
526+
"title": {
527+
"type": "string"
528+
},
529+
"value": {
530+
"type": "string"
531+
}
532+
},
533+
"type": "object"
534+
},
535+
"dask_cluster_max_single_worker_memory": {
536+
"properties": {
537+
"title": {
538+
"type": "string"
539+
},
540+
"value": {
541+
"type": "string"
542+
}
543+
},
544+
"type": "object"
545+
},
546+
"dask_enabled": {
547+
"properties": {
548+
"title": {
549+
"type": "string"
550+
},
551+
"value": {
552+
"type": "string"
553+
}
554+
},
555+
"type": "object"
556+
},
482557
"default_kubernetes_jobs_timeout": {
483558
"properties": {
484559
"title": {

reana_server/config.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023 CERN.
4+
# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -58,6 +58,29 @@
5858
os.getenv("LOGIN_PROVIDERS_SECRETS", "{}")
5959
)
6060

61+
DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
62+
"""Whether Dask is enabled in the cluster or not"""
63+
64+
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv(
65+
"REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi"
66+
)
67+
"""Maximum memory limit for Dask clusters."""
68+
69+
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS = int(
70+
os.getenv("REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS", 2)
71+
)
72+
"""Number of workers in Dask cluster by default """
73+
74+
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv(
75+
"REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY", "2Gi"
76+
)
77+
"""Memory for one Dask worker by default."""
78+
79+
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY = os.getenv(
80+
"REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY", "8Gi"
81+
)
82+
"""Maximum memory for one Dask worker."""
83+
6184
REANA_KUBERNETES_JOBS_MEMORY_LIMIT = os.getenv("REANA_KUBERNETES_JOBS_MEMORY_LIMIT")
6285
"""Maximum memory limit for user job containers for workflow complexity estimation."""
6386

reana_server/rest/info.py

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2021, 2022 CERN.
4+
# Copyright (C) 2021, 2022, 2024 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -24,6 +24,11 @@
2424
REANA_KUBERNETES_JOBS_TIMEOUT_LIMIT,
2525
REANA_KUBERNETES_JOBS_MAX_USER_TIMEOUT_LIMIT,
2626
REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD,
27+
DASK_ENABLED,
28+
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
29+
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
30+
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
31+
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
2732
)
2833
from reana_server.decorators import signin_required
2934

@@ -125,6 +130,41 @@ def info(user, **kwargs): # noqa
125130
type: string
126131
type: array
127132
type: object
133+
dask_enabled:
134+
properties:
135+
title:
136+
type: string
137+
value:
138+
type: string
139+
type: object
140+
dask_cluster_max_memory_limit:
141+
properties:
142+
title:
143+
type: string
144+
value:
145+
type: string
146+
type: object
147+
dask_cluster_default_number_of_workers:
148+
properties:
149+
title:
150+
type: string
151+
value:
152+
type: string
153+
type: object
154+
dask_cluster_default_single_worker_memory:
155+
properties:
156+
title:
157+
type: string
158+
value:
159+
type: string
160+
type: object
161+
dask_cluster_max_single_worker_memory:
162+
properties:
163+
title:
164+
type: string
165+
value:
166+
type: string
167+
type: object
128168
type: object
129169
examples:
130170
application/json:
@@ -165,6 +205,26 @@ def info(user, **kwargs): # noqa
165205
"title": "Maximum timeout for Kubernetes jobs",
166206
"value": "1209600"
167207
},
208+
"dask_enabled": {
209+
"title": "Dask workflows allowed in the cluster",
210+
"value": "False"
211+
},
212+
"dask_cluster_max_memory_limit": {
213+
"title": "The maximum memory limit for Dask clusters created by users",
214+
"value": "16Gi"
215+
},
216+
"dask_cluster_default_number_of_workers": {
217+
"title": "The number of Dask workers created by default",
218+
"value": "2Gi"
219+
},
220+
"dask_cluster_default_single_worker_memory": {
221+
"title": "The amount of memory used by default by a single Dask worker",
222+
"value": "2Gi"
223+
},
224+
"dask_cluster_max_single_worker_memory": {
225+
"title": "The maximum amount of memory that users can ask for the single Dask worker",
226+
"value": "8Gi"
227+
},
168228
}
169229
500:
170230
description: >-
@@ -217,7 +277,29 @@ def info(user, **kwargs): # noqa
217277
title="Maximum inactivity period in days before automatic closure of interactive sessions",
218278
value=REANA_INTERACTIVE_SESSION_MAX_INACTIVITY_PERIOD,
219279
),
280+
dask_enabled=dict(
281+
title="Dask workflows allowed in the cluster",
282+
value=bool(DASK_ENABLED),
283+
),
220284
)
285+
if DASK_ENABLED:
286+
cluster_information["dask_cluster_default_number_of_workers"] = dict(
287+
title="The number of Dask workers created by default",
288+
value=REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
289+
)
290+
cluster_information["dask_cluster_max_memory_limit"] = dict(
291+
title="The maximum memory limit for Dask clusters created by users",
292+
value=REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
293+
)
294+
cluster_information["dask_cluster_default_single_worker_memory"] = dict(
295+
title="The amount of memory used by default by a single Dask worker",
296+
value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
297+
)
298+
cluster_information["dask_cluster_max_single_worker_memory"] = dict(
299+
title="The maximum amount of memory that users can ask for the single Dask worker",
300+
value=REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
301+
)
302+
221303
return InfoSchema().dump(cluster_information)
222304

223305
except Exception as e:
@@ -260,3 +342,10 @@ class InfoSchema(Schema):
260342
maximum_interactive_session_inactivity_period = fields.Nested(
261343
StringNullableInfoValue
262344
)
345+
kubernetes_max_memory_limit = fields.Nested(StringInfoValue)
346+
dask_enabled = fields.Nested(StringInfoValue)
347+
if DASK_ENABLED:
348+
dask_cluster_default_number_of_workers = fields.Nested(StringInfoValue)
349+
dask_cluster_max_memory_limit = fields.Nested(StringInfoValue)
350+
dask_cluster_default_single_worker_memory = fields.Nested(StringInfoValue)
351+
dask_cluster_max_single_worker_memory = fields.Nested(StringInfoValue)

reana_server/rest/workflows.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
validate_inputs,
5050
validate_workflow,
5151
validate_workspace_path,
52+
validate_dask_memory_and_cores_limits,
5253
)
5354
from webargs import fields, validate
5455
from webargs.flaskparser import use_kwargs
@@ -566,6 +567,8 @@ def create_workflow(user): # noqa
566567

567568
validate_inputs(reana_spec_file)
568569

570+
validate_dask_memory_and_cores_limits(reana_spec_file)
571+
569572
retention_days = reana_spec_file.get("workspace", {}).get("retention_days")
570573
retention_rules = get_workspace_retention_rules(retention_days)
571574

reana_server/validation.py

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2022 CERN.
4+
# Copyright (C) 2022, 2024 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -18,8 +18,17 @@
1818
from reana_commons.validation.operational_options import validate_operational_options
1919
from reana_commons.validation.parameters import build_parameters_validator
2020
from reana_commons.validation.utils import validate_reana_yaml, validate_workspace
21-
22-
from reana_server.config import SUPPORTED_COMPUTE_BACKENDS, WORKSPACE_RETENTION_PERIOD
21+
from reana_commons.job_utils import kubernetes_memory_to_bytes
22+
23+
from reana_server.config import (
24+
SUPPORTED_COMPUTE_BACKENDS,
25+
WORKSPACE_RETENTION_PERIOD,
26+
DASK_ENABLED,
27+
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT,
28+
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
29+
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
30+
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
31+
)
2332
from reana_server import utils
2433

2534

@@ -153,3 +162,41 @@ def validate_retention_rule(rule: str, days: int) -> None:
153162
"Maximum workflow retention period was reached. "
154163
f"Please use less than {WORKSPACE_RETENTION_PERIOD} days."
155164
)
165+
166+
167+
def validate_dask_memory_and_cores_limits(reana_yaml: Dict) -> None:
168+
"""Validate Dask workflows are allowed in the cluster and memory limits are respected."""
169+
# Validate Dask workflows are allowed in the cluster
170+
dask_resources = reana_yaml["workflow"].get("resources", {}).get("dask", {})
171+
if not DASK_ENABLED and dask_resources != {}:
172+
raise REANAValidationError("Dask workflows are not allowed in this cluster.")
173+
174+
# Validate Dask memory limit requested by the workflow
175+
if dask_resources:
176+
single_worker_memory = dask_resources.get(
177+
"single_worker_memory", REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY
178+
)
179+
if kubernetes_memory_to_bytes(
180+
single_worker_memory
181+
) > kubernetes_memory_to_bytes(REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY):
182+
raise REANAValidationError(
183+
f'The "single_worker_memory" provided in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY}).'
184+
)
185+
186+
number_of_workers = int(
187+
dask_resources.get(
188+
"number_of_workers", REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS
189+
)
190+
)
191+
requested_dask_cluster_memory = (
192+
kubernetes_memory_to_bytes(single_worker_memory) * number_of_workers
193+
)
194+
195+
if requested_dask_cluster_memory > kubernetes_memory_to_bytes(
196+
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT
197+
):
198+
raise REANAValidationError(
199+
f'The "memory" requested in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT}).\nDecrease the number of workers requested or amount of memory consumed by a single worker.'
200+
)
201+
202+
return None

0 commit comments

Comments
 (0)