Skip to content

Commit 5b82925

Browse files
committed
feat(config): make number of threads configurable for Dask (#719)
Following the integration of Dask into REANA and discussions with several REANA users, we identified the need to make the number of threads configurable. This commit adds the number of threads configuration to the `/api/info` endpoint. It also validates that the requested number of threads for a single Dask worker does not exceed the defined upper limit. Closes reanahub/reana#874
1 parent aae2ccb commit 5b82925

File tree

6 files changed

+96
-10
lines changed

6 files changed

+96
-10
lines changed

docs/openapi.json

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,10 @@
449449
"title": "The amount of memory used by default by a single Dask worker",
450450
"value": "2Gi"
451451
},
452+
"dask_cluster_default_single_worker_threads": {
453+
"title": "The number of threads used by default by a single Dask worker",
454+
"value": "4"
455+
},
452456
"dask_cluster_max_memory_limit": {
453457
"title": "The maximum memory limit for Dask clusters created by users",
454458
"value": "16Gi"
@@ -461,6 +465,10 @@
461465
"title": "The maximum amount of memory that users can ask for the single Dask worker",
462466
"value": "8Gi"
463467
},
468+
"dask_cluster_max_single_worker_threads": {
469+
"title": "The maximum number of threads that users can ask for the single Dask worker",
470+
"value": "8"
471+
},
464472
"dask_enabled": {
465473
"title": "Dask workflows allowed in the cluster",
466474
"value": "False"
@@ -606,6 +614,17 @@
606614
},
607615
"type": "object"
608616
},
617+
"dask_cluster_default_single_worker_threads": {
618+
"properties": {
619+
"title": {
620+
"type": "string"
621+
},
622+
"value": {
623+
"type": "string"
624+
}
625+
},
626+
"type": "object"
627+
},
609628
"dask_cluster_max_memory_limit": {
610629
"properties": {
611630
"title": {
@@ -639,6 +658,17 @@
639658
},
640659
"type": "object"
641660
},
661+
"dask_cluster_max_single_worker_threads": {
662+
"properties": {
663+
"title": {
664+
"type": "string"
665+
},
666+
"value": {
667+
"type": "string"
668+
}
669+
},
670+
"type": "object"
671+
},
642672
"dask_enabled": {
643673
"properties": {
644674
"title": {

reana_server/config.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@
5959
)
6060

6161
DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
62-
"""Whether Dask is enabled in the cluster or not"""
62+
"""Whether Dask is enabled in the cluster or not."""
6363

6464
DASK_AUTOSCALER_ENABLED = os.getenv("DASK_AUTOSCALER_ENABLED", "true").lower() == "true"
65-
"""Whether Dask autoscaler is enabled in the cluster or not"""
65+
"""Whether Dask autoscaler is enabled in the cluster or not."""
6666

6767
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv(
6868
"REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi"
@@ -72,7 +72,7 @@
7272
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS = int(
7373
os.getenv("REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS", 2)
7474
)
75-
"""Number of workers in Dask cluster by default """
75+
"""Number of workers in Dask cluster by default."""
7676

7777
REANA_DASK_CLUSTER_MAX_NUMBER_OF_WORKERS = int(
7878
os.getenv("REANA_DASK_CLUSTER_MAX_NUMBER_OF_WORKERS", 20)
@@ -89,6 +89,16 @@
8989
)
9090
"""Maximum memory for one Dask worker."""
9191

92+
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS = int(
93+
os.getenv("REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS", 4)
94+
)
95+
"""Number of threads for one Dask worker by default."""
96+
97+
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS = int(
98+
os.getenv("REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS", 8)
99+
)
100+
"""Maximum number of threads for one Dask worker."""
101+
92102
REANA_KUBERNETES_JOBS_MEMORY_LIMIT = os.getenv("REANA_KUBERNETES_JOBS_MEMORY_LIMIT")
93103
"""Maximum memory limit for user job containers for workflow complexity estimation."""
94104

reana_server/rest/info.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2021, 2022, 2024 CERN.
4+
# Copyright (C) 2021, 2022, 2024, 2025 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -34,6 +34,8 @@
3434
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
3535
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
3636
REANA_DASK_CLUSTER_MAX_NUMBER_OF_WORKERS,
37+
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS,
38+
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS,
3739
)
3840
from reana_server.decorators import signin_required
3941

@@ -237,6 +239,13 @@ def info(user, **kwargs): # noqa
237239
value:
238240
type: string
239241
type: object
242+
dask_cluster_default_single_worker_threads:
243+
properties:
244+
title:
245+
type: string
246+
value:
247+
type: string
248+
type: object
240249
dask_cluster_max_single_worker_memory:
241250
properties:
242251
title:
@@ -251,6 +260,13 @@ def info(user, **kwargs): # noqa
251260
value:
252261
type: string
253262
type: object
263+
dask_cluster_max_single_worker_threads:
264+
properties:
265+
title:
266+
type: string
267+
value:
268+
type: string
269+
type: object
254270
type: object
255271
examples:
256272
application/json:
@@ -355,6 +371,10 @@ def info(user, **kwargs): # noqa
355371
"title": "The amount of memory used by default by a single Dask worker",
356372
"value": "2Gi"
357373
},
374+
"dask_cluster_default_single_worker_threads": {
375+
"title": "The number of threads used by default by a single Dask worker",
376+
"value": "4"
377+
},
358378
"dask_cluster_max_single_worker_memory": {
359379
"title": "The maximum amount of memory that users can ask for the single Dask worker",
360380
"value": "8Gi"
@@ -363,6 +383,10 @@ def info(user, **kwargs): # noqa
363383
"title": "The maximum number of workers that users can ask for the single Dask cluster",
364384
"value": "20"
365385
},
386+
"dask_cluster_max_single_worker_threads": {
387+
"title": "The maximum number of threads that users can ask for the single Dask worker",
388+
"value": "8"
389+
},
366390
}
367391
500:
368392
description: >-
@@ -480,6 +504,14 @@ def info(user, **kwargs): # noqa
480504
title="The maximum number of workers that users can ask for the single Dask cluster",
481505
value=REANA_DASK_CLUSTER_MAX_NUMBER_OF_WORKERS,
482506
)
507+
cluster_information["dask_cluster_default_single_worker_threads"] = dict(
508+
title="The number of threads used by default by a single Dask worker",
509+
value=REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS,
510+
)
511+
cluster_information["dask_cluster_max_single_worker_threads"] = dict(
512+
title="The maximum number of threads that users can ask for the single Dask worker",
513+
value=REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS,
514+
)
483515

484516
return InfoSchema().dump(cluster_information)
485517

@@ -541,3 +573,5 @@ class InfoSchema(Schema):
541573
dask_cluster_default_single_worker_memory = fields.Nested(StringInfoValue)
542574
dask_cluster_max_single_worker_memory = fields.Nested(StringInfoValue)
543575
dask_cluster_max_number_of_workers = fields.Nested(StringInfoValue)
576+
dask_cluster_default_single_worker_threads = fields.Nested(StringInfoValue)
577+
dask_cluster_max_single_worker_threads = fields.Nested(StringInfoValue)

reana_server/rest/workflows.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN.
4+
# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023, 2024, 2025 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -49,7 +49,7 @@
4949
validate_inputs,
5050
validate_workflow,
5151
validate_workspace_path,
52-
validate_dask_memory_and_cores_limits,
52+
validate_dask_limits,
5353
)
5454
from webargs import fields, validate
5555
from webargs.flaskparser import use_kwargs
@@ -567,7 +567,7 @@ def create_workflow(user): # noqa
567567

568568
validate_inputs(reana_spec_file)
569569

570-
validate_dask_memory_and_cores_limits(reana_spec_file)
570+
validate_dask_limits(reana_spec_file)
571571

572572
retention_days = reana_spec_file.get("workspace", {}).get("retention_days")
573573
retention_rules = get_workspace_retention_rules(retention_days)

reana_server/validation.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2022, 2024 CERN.
4+
# Copyright (C) 2022, 2024, 2025 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -29,6 +29,8 @@
2929
REANA_DASK_CLUSTER_MAX_NUMBER_OF_WORKERS,
3030
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
3131
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY,
32+
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS,
33+
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS,
3234
)
3335
from reana_server import utils
3436

@@ -165,7 +167,7 @@ def validate_retention_rule(rule: str, days: int) -> None:
165167
)
166168

167169

168-
def validate_dask_memory_and_cores_limits(reana_yaml: Dict) -> None:
170+
def validate_dask_limits(reana_yaml: Dict) -> None:
169171
"""Validate that Dask workflows are allowed in the cluster and that the memory, worker and thread limits are respected."""
170172
# Validate Dask workflows are allowed in the cluster
171173
dask_resources = reana_yaml["workflow"].get("resources", {}).get("dask", {})
@@ -195,6 +197,16 @@ def validate_dask_memory_and_cores_limits(reana_yaml: Dict) -> None:
195197
f"The number of requested Dask workers ({number_of_workers}) exceeds the maximum limit ({REANA_DASK_CLUSTER_MAX_NUMBER_OF_WORKERS})."
196198
)
197199

200+
single_worker_threads = dask_resources.get(
201+
"single_worker_threads",
202+
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS,
203+
)
204+
205+
if single_worker_threads > REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS:
206+
raise REANAValidationError(
207+
f'The "single_worker_threads" provided in the dask resources exceeds the limit ({REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_THREADS}).'
208+
)
209+
198210
requested_dask_cluster_memory = (
199211
kubernetes_memory_to_bytes(single_worker_memory) * number_of_workers
200212
)

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
"Flask>=2.1.1,<2.3.0", # same upper pin as invenio-base
4949
"gitpython>=3.1",
5050
"marshmallow>2.13.0,<3.0.0",
51-
"reana-commons[kubernetes,yadage,snakemake,cwl]>=0.95.0a7,<0.96.0",
51+
"reana-commons[kubernetes,yadage,snakemake,cwl]>=0.95.0a9,<0.96.0",
5252
"reana-db>=0.95.0a5,<0.96.0",
5353
"requests>=2.25.0",
5454
"tablib>=0.12.1",

0 commit comments

Comments
 (0)