[Feat] Multi-stream for eplb heat collection and aggregation #4214
First changed file (the `EplbUpdator` module): the import block picks up the new stream helpers, and `compute_and_set_moe_load` runs the load all-gather inside a dedicated stream.

```diff
@@ -23,7 +23,8 @@
 from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils
 from vllm_ascend.eplb.core.eplb_worker import EplbProcess
+from vllm_ascend.utils import (npu_stream_switch,
+                               moe_load_async_stream)


 class EplbUpdator:
@@ -152,20 +153,21 @@ def compute_and_set_moe_load(self, is_clear=False):
         self._gather_buffer = None
         if dist.is_initialized():
-            self.world_size = dist.get_world_size()
-            self.device = local_load.device
-            if self._gather_buffer is None:
-                shape = (self.world_size, *local_load.shape)
-                self._gather_buffer = torch.empty(shape,
-                                                  dtype=local_load.dtype,
-                                                  device=self.device)
-
-            dist.all_gather_into_tensor(self._gather_buffer, local_load)
-
-            moe_load = self._gather_buffer.permute(1, 0, 2)
-            self.shared_dict["moe_load"] = moe_load.cpu()
-            logger.debug(
-                f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}"
-            )
+            with npu_stream_switch(moe_load_async_stream()):
+                self.world_size = dist.get_world_size()
+                self.device = local_load.device
+                if self._gather_buffer is None:
+                    shape = (self.world_size, *local_load.shape)
+                    self._gather_buffer = torch.empty(shape,
+                                                      dtype=local_load.dtype,
+                                                      device=self.device)
+
+                dist.all_gather_into_tensor(self._gather_buffer, local_load)
+
+                moe_load = self._gather_buffer.permute(1, 0, 2)
+                self.shared_dict["moe_load"] = moe_load.cpu()
+                logger.debug(
+                    f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}"
+                )
         else:
             moe_load = local_load.unsqueeze(1)
```
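For context, the change above follows the usual side-stream overlap pattern: queue the expert-load all-gather on a dedicated stream so it can run concurrently with compute on the default stream, and join the streams before the gathered result is read. Below is a minimal sketch of that pattern, written against the plain `torch.cuda` stream API as a stand-in for the NPU streams used here; names such as `gather_moe_load_async` and `join_and_read` are illustrative and not part of vllm-ascend, and the explicit cross-stream synchronization is shown for safety even though the PR itself goes through the project's `npu_stream_switch` helper.

```python
import torch
import torch.distributed as dist

_SIDE_STREAM = None


def _side_stream() -> torch.cuda.Stream:
    """Lazily create one extra stream per process (mirrors moe_load_async_stream)."""
    global _SIDE_STREAM
    if _SIDE_STREAM is None:
        _SIDE_STREAM = torch.cuda.Stream()
    return _SIDE_STREAM


def gather_moe_load_async(local_load: torch.Tensor) -> torch.Tensor:
    """Launch the all-gather of per-rank expert load on the side stream.

    `local_load` is assumed to be a 2-D device tensor, as implied by the
    permute(1, 0, 2) in the PR. The caller must call `join_and_read`
    before using the result.
    """
    stream = _side_stream()
    # Make the side stream wait for work already queued on the current
    # stream (e.g. the kernels that produced `local_load`).
    stream.wait_stream(torch.cuda.current_stream())
    with torch.cuda.stream(stream):
        world_size = dist.get_world_size()
        buf = torch.empty((world_size, *local_load.shape),
                          dtype=local_load.dtype,
                          device=local_load.device)
        dist.all_gather_into_tensor(buf, local_load)
        # Move the rank dimension to the middle, matching the permute in the PR.
        moe_load = buf.permute(1, 0, 2)
    return moe_load


def join_and_read(moe_load: torch.Tensor) -> torch.Tensor:
    # Join the side stream back into the default stream before the result
    # is consumed (the .cpu() copy into shared_dict in the PR).
    torch.cuda.current_stream().wait_stream(_side_stream())
    return moe_load.cpu()
```

The key point is the final `wait_stream`: without it, the copy back to the host on the default stream could race with the collective still running on the side stream.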
Second changed file (`vllm_ascend/utils.py`, per the import above): register a new module-level stream and add a lazily initialized getter for it.

```diff
@@ -53,6 +53,7 @@
 _CURRENT_STREAM = None
 _PREFETCH_STREAM = None
 _SHARED_EXPERTS_CALCULATION_STREAM = None
+_MOE_LOAD_ASYNC_STREAM = None
 _ASCEND_CUSTOMOP_IS_REIGISTERED = False
 _DEFAULT_BUFFER_SIZE = 200
 _MIN_DP_BUFFER_SIZE = 50
@@ -336,6 +337,15 @@ def shared_experts_calculation_stream() -> torch.npu.Stream:
     return _SHARED_EXPERTS_CALCULATION_STREAM


+def moe_load_async_stream() -> torch_npu.npu.Stream:
+    global _MOE_LOAD_ASYNC_STREAM
+    if _MOE_LOAD_ASYNC_STREAM is None:
+        # Lazily create the dedicated moe-load collection stream
+        # the first time it is requested.
+        _MOE_LOAD_ASYNC_STREAM = torch_npu.npu.Stream()
+    return _MOE_LOAD_ASYNC_STREAM
+
+
 def adapt_patch(is_global_patch: bool = False):
     if is_global_patch:
         from vllm_ascend.patch import platform  # noqa: F401
```
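A quick usage sketch of the new helper, assuming an Ascend NPU device and this PR's helpers (as of this commit the getter lives in `vllm_ascend.utils`, per the import in the first hunk); `collect_load` is a hypothetical caller added for illustration.

```python
from vllm_ascend.utils import moe_load_async_stream, npu_stream_switch

# The module-level global makes the getter a per-process singleton: every
# call site shares the same extra stream instead of allocating its own.
assert moe_load_async_stream() is moe_load_async_stream()


def collect_load(load_tensor):
    # Work queued inside the context runs on the dedicated load-collection
    # stream and can overlap with compute on the default stream.
    with npu_stream_switch(moe_load_async_stream()):
        return load_tensor.sum(dim=-1)
```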
Review comment: maybe it would be better to set `moe_load_async_stream` as a class attribute of `EplbUpdator`.
Reply: this function has already been moved to the eplb module; since other files will also call this stream, keeping it in the eplb utils is the better place.
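For reference, a sketch of the reviewer's alternative: keep the stream as a lazily created class attribute of `EplbUpdator` instead of a module-level global. This is illustrative only (a trimmed class, not what the PR ended up doing).

```python
import torch_npu


class EplbUpdator:
    # Trimmed illustration; the real EplbUpdator has many more members.
    _moe_load_async_stream = None

    @classmethod
    def moe_load_async_stream(cls) -> torch_npu.npu.Stream:
        # One stream shared by every EplbUpdator instance in the process.
        if cls._moe_load_async_stream is None:
            cls._moe_load_async_stream = torch_npu.npu.Stream()
        return cls._moe_load_async_stream
```

The trade-off raised in the reply: other modules also need this stream, and a class attribute would force them to import `EplbUpdator`, whereas a getter in a shared eplb utils module keeps the dependency direction clean.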