[P/D][main] Clean connector history information #4650
base: main
Changes from 1 commit
```diff
@@ -8,11 +8,12 @@
 import struct
 import threading
 import time
-from collections import defaultdict, deque
+from collections import OrderedDict, defaultdict, deque
 from collections.abc import Iterator
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple
+from uuid import uuid4

 import httpx
 import msgspec
@@ -67,6 +68,27 @@
     metaserver: Optional[str]


+@dataclass
+class SizedDict(OrderedDict):
+
+    def __init__(self, max_size=2, *args, **kwargs):
+        self.max_size = max_size
+        super().__init__(*args, **kwargs)
+
+    def __setitem__(self, key, value):
+        super().__setitem__(key, value)
+        if len(self) > self.max_size:
+            self.popitem(last=False)
+
+    def __getitem__(self, key):
+        try:
+            return super().__getitem__(key)
+        except KeyError:
+            value = {}
+            self[key] = value
+            return value
+
+
 class KVCacheSendingLayerThread(threading.Thread):

     def __init__(self,
```

Check failure on line 87 in vllm_ascend/distributed/mooncake_layerwise_connector.py (the `value = {}` line)

```diff
@@ -365,7 +387,7 @@
                  role: KVConnectorRole,
                  kv_cache_config: Optional[KVCacheConfig] = None):
         assert vllm_config.kv_transfer_config is not None
-        self.engine_id = vllm_config.kv_transfer_config.engine_id
+        self.engine_id = f"{vllm_config.kv_transfer_config.engine_id}-{uuid4()}"
         self._connector_metadata = MooncakeLayerwiseConnectorMetadata()

         if role == KVConnectorRole.SCHEDULER:
@@ -695,9 +717,9 @@
         self.encoder = msgspec.msgpack.Encoder()

         self.remote_kv_caches_base_addr: dict[str, dict[int, list[int]]] = \
-            defaultdict(dict)
+            SizedDict()
         self.remote_te_port: dict[str, dict[int, int]] = \
-            defaultdict(dict)
+            SizedDict()
```
Comment on lines 697 to +721

Contributor

The new `SizedDict` is used here with its default `max_size` of 2. It would be more robust to make this cache size configurable. For example, you could add a configuration option and pass it to the `SizedDict` constructor:

```python
# In MooncakeLayerwiseConnectorWorker.__init__
max_cached_engines = self.vllm_config.kv_transfer_config.get_from_extra_config(
    'max_cached_engines', 128)  # A more reasonable default
# ...
self.remote_kv_caches_base_addr: dict[str, dict[int, list[int]]] = \
    SizedDict(max_size=max_cached_engines)
self.remote_te_port: dict[str, dict[int, int]] = \
    SizedDict(max_size=max_cached_engines)
```

This would allow operators to tune the cache size based on their specific deployment topology. For now, I'm suggesting a larger default.
Suggested change
```diff

         self.remote_sockets_lock = threading.Lock()
         self.remote_sockets: dict[  # type: ignore
             str, deque[zmq.Socket]] = defaultdict(  # type: ignore
```
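To make the cache-size concern in the comment above concrete, here is a minimal, self-contained sketch of how the `SizedDict` from this diff behaves with its default `max_size=2`. The class body is copied from the diff (minus the decorator); the engine IDs and port numbers are hypothetical values chosen for illustration only.

```python
from collections import OrderedDict


class SizedDict(OrderedDict):
    """Copy of the class from the diff, reproduced here for illustration."""

    def __init__(self, max_size=2, *args, **kwargs):
        self.max_size = max_size
        super().__init__(*args, **kwargs)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        # Drop the oldest inserted key once the limit is exceeded.
        if len(self) > self.max_size:
            self.popitem(last=False)

    def __getitem__(self, key):
        try:
            return super().__getitem__(key)
        except KeyError:
            # A missing key is created on access, like defaultdict(dict).
            value = {}
            self[key] = value
            return value


cache = SizedDict()            # default max_size=2, as in the diff
cache["engine-a"][0] = 30000   # hypothetical engine IDs and ports
cache["engine-b"][0] = 30001
cache["engine-c"][0] = 30002   # third key: "engine-a" is evicted

print(list(cache))             # ['engine-b', 'engine-c']
print(cache["engine-a"])       # {}  (this lookup re-creates it and evicts "engine-b")

# With a larger limit, as the reviewer suggests, nothing is evicted here.
roomy = SizedDict(max_size=128)
for name in ("engine-a", "engine-b", "engine-c"):
    roomy[name][0] = 0
print(len(roomy))              # 3
```

Note that a successful lookup does not move a key to the end, so eviction follows insertion order (FIFO) rather than least-recently-used order. A lookup of an evicted key silently returns an empty dict instead of raising, so a worker talking to more than `max_size` remote engines would keep losing cached entries without any error.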
The use of `@dataclass` on the `SizedDict` class is unnecessary and potentially misleading. This class defines a custom `__init__` method and inherits from `OrderedDict`, which is not a dataclass. The `@dataclass` decorator is designed for classes that primarily store data and can auto-generate methods like `__init__` and `__repr__`. In this case, it provides no benefit and could cause confusion or unexpected behavior during future maintenance. It's better to define it as a regular class for clarity and correctness.
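As one illustration of the "unexpected behavior" mentioned above, the sketch below compares a plain `SizedDict` with a decorated variant. `DecoratedSizedDict` is a hypothetical name introduced only for this comparison. The sketch relies on the documented `dataclasses` rule that `__eq__` is generated whenever the class itself does not define one; with no declared fields, the generated method has nothing to compare.

```python
from collections import OrderedDict
from dataclasses import dataclass


class SizedDict(OrderedDict):
    """Regular class, as the review comment recommends."""

    def __init__(self, max_size=2, *args, **kwargs):
        self.max_size = max_size
        super().__init__(*args, **kwargs)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        if len(self) > self.max_size:
            self.popitem(last=False)

    def __getitem__(self, key):
        try:
            return super().__getitem__(key)
        except KeyError:
            value = {}
            self[key] = value
            return value


@dataclass
class DecoratedSizedDict(SizedDict):
    # Hypothetical variant for comparison. It defines its own __init__, as the
    # class in the diff does, so @dataclass leaves __init__ alone but still
    # generates __eq__ and __repr__ based on the (empty) set of fields.
    def __init__(self, max_size=2, *args, **kwargs):
        super().__init__(max_size, *args, **kwargs)


plain_a, plain_b = SizedDict(), SizedDict()
plain_a["engine-a"] = {0: 30000}   # hypothetical engine ID and port
print(plain_a == plain_b)          # False: OrderedDict compares contents

deco_a, deco_b = DecoratedSizedDict(), DecoratedSizedDict()
deco_a["engine-a"] = {0: 30000}
print(deco_a == deco_b)            # True: the generated __eq__ sees no fields
```

Dropping the decorator keeps the inherited `OrderedDict` comparison and representation, which is what a reader of the class would expect.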