Commit ce56a7a

Make P2P more configurable (#8469)
1 parent 276fc35 commit ce56a7a

14 files changed: +109 -32 lines

distributed/config.py

Lines changed: 1 addition & 0 deletions
@@ -57,6 +57,7 @@
     "distributed.scheduler.events-log-length": "distributed.admin.low-level-log-length",
     "recent-messages-log-length": "distributed.admin.low-level-log-length",
     "distributed.comm.recent-messages-log-length": "distributed.admin.low-level-log-length",
+    "distributed.p2p.disk": "distributed.p2p.storage.disk",
 }
 
 # Affects yaml and env variables configs, as well as calls to dask.config.set()
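
The new alias entry is what keeps existing user configs working after the rename. A minimal sketch of the intended effect, assuming dask's rename/deprecation machinery applies this table as the comment above describes:

import dask

# Code should read the *new* key; values that users still set under the old
# "distributed.p2p.disk" name are remapped by the alias table (the exact
# warning behavior depends on dask's deprecation machinery).
dask.config.set({"distributed.p2p.storage.disk": True})
assert dask.config.get("distributed.p2p.storage.disk") is True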

distributed/distributed-schema.yaml

Lines changed: 44 additions & 4 deletions
@@ -1046,6 +1046,28 @@ properties:
           description: Configuration settings for Dask communications specific to P2P
           properties:
 
+            buffer:
+              type:
+              - string
+              - integer
+              description: |
+                The maximum amount of data for P2P's comm buffers to buffer in-memory per worker.
+                This limit is not absolute but used to apply back pressure.
+            concurrency:
+              type: integer
+              description: Number of concurrent background tasks used for IO-intensive operations per P2P comm buffer.
+            message-bytes-limit:
+              type:
+              - string
+              - integer
+              description: |
+                The maximum amount of data for P2P to send to another worker in a single operation.
+
+                Data is sent in batches, and if the first shard is larger than this value,
+                the shard will still be sent to ensure progress. Hence, this limit is not absolute.
+                Note that this limit applies to a single send operation and a worker may send data to
+                multiple workers in parallel.
+
             retry:
               type: object
               description: |
@@ -1067,10 +1089,28 @@ properties:
                     max:
                       type: string
                       description: The maximum delay between retries
-        disk:
-          type: boolean
-          description: |
-            Whether or not P2P stores intermediate data on disk instead of memory
+
+        storage:
+          type: object
+          description: Configuration settings for P2P storage
+          properties:
+
+            buffer:
+              type:
+              - string
+              - integer
+              description: |
+                The maximum amount of data for P2P's disk buffers to buffer in-memory per worker.
+                This limit is not absolute but used to apply back pressure.
+            disk:
+              type: boolean
+              description: |
+                Whether or not P2P stores intermediate data on disk instead of memory
+        threads:
+          type:
+          - integer
+          - "null"
+          description: Number of threads used for CPU-intensive operations per worker. Defaults to number of worker threads.
 
     dashboard:
       type: object
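
The schema above is plain JSON Schema written as YAML, so the new entries can be sanity-checked against the shipped defaults. A minimal sketch, assuming jsonschema and PyYAML are installed and both files sit in the current directory:

import jsonschema
import yaml

# Validate the default config against the schema; this is the kind of check
# that would catch an option declared with the wrong type (for example,
# message-bytes-limit must be a string or an integer).
with open("distributed.yaml") as f:
    config = yaml.safe_load(f)
with open("distributed-schema.yaml") as f:
    schema = yaml.safe_load(f)

jsonschema.validate(instance=config, schema=schema)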

distributed/distributed.yaml

Lines changed: 7 additions & 1 deletion
@@ -306,12 +306,18 @@ distributed:
 
   p2p:
     comm:
+      buffer: 1 GiB
+      concurrency: 10
+      message-bytes-limit: 2 MiB
       retry:
         count: 10
         delay:
           min: 1s # the first non-zero delay between re-tries
           max: 30s # the maximum delay between re-tries
-    disk: True
+    storage:
+      buffer: 100 MiB
+      disk: True
+    threads: null
 
   ###################
   # Bokeh dashboard #
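
These are only defaults; each new knob can be overridden like any other dask config value. A small usage sketch (key names taken from the block above; the workload inside the block is a placeholder):

import dask

# Programmatically, e.g. to trade memory for fewer, larger transfers:
with dask.config.set(
    {
        "distributed.p2p.comm.message-bytes-limit": "8 MiB",
        "distributed.p2p.comm.concurrency": 4,
    }
):
    ...  # run the P2P workload here

# Or via environment variables before starting the worker, e.g.
#   DASK_DISTRIBUTED__P2P__STORAGE__BUFFER="200 MiB"
# (dask treats "-" and "_" in config key names as equivalent).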

distributed/shuffle/_buffer.py

Lines changed: 1 addition & 0 deletions
@@ -149,6 +149,7 @@ def _continue() -> bool:
         if self.max_message_size > 0:
             size = 0
             shards = []
+            # FIXME: We always exceed the limit, not just on the first shard.
             while size < self.max_message_size:
                 try:
                     shard = self.shards[part_id].pop()
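
The FIXME is easy to see in isolation: the loop checks the running size before popping the next shard, so the batch that crosses the limit always includes the shard that pushed it over. A standalone toy version with hypothetical shard sizes:

# Simplified stand-ins for self.shards[part_id] and self.max_message_size:
shards_available = [900_000, 900_000, 900_000]  # shard sizes in bytes
max_message_size = 2_000_000

size, batch = 0, []
while size < max_message_size and shards_available:
    shard = shards_available.pop()
    batch.append(shard)
    size += shard

print(size)  # 2700000 -- the limit is exceeded by up to one shard, every time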

distributed/shuffle/_comms.py

Lines changed: 3 additions & 6 deletions
@@ -3,8 +3,6 @@
 from collections.abc import Awaitable, Callable
 from typing import Any
 
-from dask.utils import parse_bytes
-
 from distributed.core import ErrorMessage, OKMessage, clean_exception
 from distributed.metrics import context_meter
 from distributed.shuffle._disk import ShardsBuffer
@@ -49,20 +47,19 @@ class CommShardsBuffer(ShardsBuffer):
         Number of background tasks to run.
     """
 
-    max_message_size = parse_bytes("2 MiB")
-
     def __init__(
         self,
         send: Callable[
             [str, list[tuple[Any, Any]]], Awaitable[OKMessage | ErrorMessage]
         ],
         memory_limiter: ResourceLimiter,
-        concurrency_limit: int = 10,
+        max_message_size: int,
+        concurrency_limit: int,
     ):
         super().__init__(
             memory_limiter=memory_limiter,
             concurrency_limit=concurrency_limit,
-            max_message_size=CommShardsBuffer.max_message_size,
+            max_message_size=max_message_size,
         )
         self.send = send
 
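
With the class attribute gone and the default removed, every caller now passes both limits explicitly. A hedged construction sketch (the send coroutine is a stand-in, the values mirror the new defaults in distributed.yaml, and the ResourceLimiter import path is assumed from this module's siblings):

from dask.utils import parse_bytes

from distributed.shuffle._comms import CommShardsBuffer
from distributed.shuffle._limiter import ResourceLimiter


async def send(address, shards):
    # Stand-in for the real callback that ships shards to a peer worker.
    return {"status": "OK"}


comm_buffer = CommShardsBuffer(
    send=send,
    memory_limiter=ResourceLimiter(
        parse_bytes("1 GiB"), metrics_label="p2p-comms-limiter"
    ),
    max_message_size=parse_bytes("2 MiB"),
    concurrency_limit=10,
)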

distributed/shuffle/_core.py

Lines changed: 9 additions & 2 deletions
@@ -27,7 +27,7 @@
 import dask.config
 from dask.core import flatten
 from dask.typing import Key
-from dask.utils import parse_timedelta
+from dask.utils import parse_bytes, parse_timedelta
 
 from distributed.core import ErrorMessage, OKMessage, PooledRPCCall, error_message
 from distributed.exceptions import Reschedule
@@ -132,8 +132,15 @@ def __init__(
             self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize)
 
         with self._capture_metrics("background-comms"):
+            max_message_size = parse_bytes(
+                dask.config.get("distributed.p2p.comm.message-bytes-limit")
+            )
+            concurrency_limit = dask.config.get("distributed.p2p.comm.concurrency")
             self._comm_buffer = CommShardsBuffer(
-                send=self.send, memory_limiter=memory_limiter_comms
+                send=self.send,
+                max_message_size=max_message_size,
+                memory_limiter=memory_limiter_comms,
+                concurrency_limit=concurrency_limit,
             )
 
         # TODO: reduce number of connections to number of workers
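
parse_bytes is what lets message-bytes-limit accept either spelling the schema declares, a human-readable string or a raw byte count:

from dask.utils import parse_bytes

assert parse_bytes("2 MiB") == 2 * 1024**2  # == 2097152
assert parse_bytes(2 * 1024**2) == 2097152  # integers pass through unchanged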

distributed/shuffle/_merge.py

Lines changed: 1 addition & 1 deletion
@@ -108,7 +108,7 @@ def hash_join_p2p(
     lhs = _calculate_partitions(lhs, left_on, npartitions)
     rhs = _calculate_partitions(rhs, right_on, npartitions)
     merge_name = "hash-join-" + tokenize(lhs, rhs, **merge_kwargs)
-    disk: bool = dask.config.get("distributed.p2p.disk")
+    disk: bool = dask.config.get("distributed.p2p.storage.disk")
     join_layer = HashJoinP2PLayer(
         name=merge_name,
         name_input_left=lhs._name,

distributed/shuffle/_rechunk.py

Lines changed: 1 addition & 1 deletion
@@ -237,7 +237,7 @@ def rechunk_p2p(
 
     token = tokenize(x, chunks)
     name = rechunk_name(token)
-    disk: bool = dask.config.get("distributed.p2p.disk")
+    disk: bool = dask.config.get("distributed.p2p.storage.disk")
 
     layer = P2PRechunkLayer(
         name=name,

distributed/shuffle/_shuffle.py

Lines changed: 1 addition & 1 deletion
@@ -142,7 +142,7 @@ def rearrange_by_column_p2p(
     )
 
     name = f"shuffle_p2p-{token}"
-    disk: bool = dask.config.get("distributed.p2p.disk")
+    disk: bool = dask.config.get("distributed.p2p.storage.disk")
 
     layer = P2PShuffleLayer(
         name,
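
This is the same one-line key rename as in _merge.py and _rechunk.py above. With all three in place, disabling disk spilling for any P2P operation reads as follows (a usage sketch; the body of the block stands in for whatever triggers a P2P shuffle, rechunk, or join):

import dask

with dask.config.set({"distributed.p2p.storage.disk": False}):
    ...  # P2P operations here keep their shards in memory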

distributed/shuffle/_worker_plugin.py

Lines changed: 9 additions & 3 deletions
@@ -6,6 +6,7 @@
 from concurrent.futures import ThreadPoolExecutor
 from typing import TYPE_CHECKING, Any, overload
 
+import dask
 from dask.context import thread_state
 from dask.typing import Key
 from dask.utils import parse_bytes
@@ -283,14 +284,19 @@ def setup(self, worker: Worker) -> None:
         # Initialize
         self.worker = worker
         self.shuffle_runs = _ShuffleRunManager(self)
+        comm_limit = parse_bytes(dask.config.get("distributed.p2p.comm.buffer"))
         self.memory_limiter_comms = ResourceLimiter(
-            parse_bytes("100 MiB"), metrics_label="p2p-comms-limiter"
+            comm_limit, metrics_label="p2p-comms-limiter"
         )
+        storage_limit = parse_bytes(dask.config.get("distributed.p2p.storage.buffer"))
         self.memory_limiter_disk = ResourceLimiter(
-            parse_bytes("1 GiB"), metrics_label="p2p-disk-limiter"
+            storage_limit, metrics_label="p2p-disk-limiter"
        )
         self.closed = False
-        self._executor = ThreadPoolExecutor(self.worker.state.nthreads)
+        nthreads = (
+            dask.config.get("distributed.p2p.threads") or self.worker.state.nthreads
+        )
+        self._executor = ThreadPoolExecutor(nthreads)
 
     def __str__(self) -> str:
         return f"ShuffleWorkerPlugin on {self.worker.address}"
