32 changes: 17 additions & 15 deletions vllm_ascend/eplb/eplb_updator.py
@@ -23,7 +23,8 @@

from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils
from vllm_ascend.eplb.core.eplb_worker import EplbProcess

from vllm_ascend.utils import (npu_stream_switch,
                               moe_load_async_stream)

class EplbUpdator:

@@ -152,20 +153,21 @@ def compute_and_set_moe_load(self, is_clear=False):

        self._gather_buffer = None
        if dist.is_initialized():
            self.world_size = dist.get_world_size()
            self.device = local_load.device
            if self._gather_buffer is None:
                shape = (self.world_size, *local_load.shape)
                self._gather_buffer = torch.empty(shape,
                                                  dtype=local_load.dtype,
                                                  device=self.device)

            dist.all_gather_into_tensor(self._gather_buffer, local_load)

            moe_load = self._gather_buffer.permute(1, 0, 2)
            self.shared_dict["moe_load"] = moe_load.cpu()
            logger.debug(
                f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}"
            with npu_stream_switch(moe_load_async_stream()):
Contributor:

It might be better to set moe_load_async_stream as a class attribute of EplbUpdator.

Contributor (author):

This function has already been moved to the eplb module; since other files will also call this stream, moving it to the eplb utils is better.
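For reference, the class-attribute alternative the reviewer suggests would look roughly like the sketch below (illustrative only; the PR instead keeps a module-level accessor, moe_load_async_stream, so that fused_moe.py can obtain the same stream without holding an EplbUpdator instance):

import torch_npu


class EplbUpdator:
    # Shared across instances; created lazily on first use.
    _moe_load_async_stream = None

    @classmethod
    def moe_load_async_stream(cls) -> "torch_npu.npu.Stream":
        if cls._moe_load_async_stream is None:
            cls._moe_load_async_stream = torch_npu.npu.Stream()
        return cls._moe_load_async_stream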

                self.world_size = dist.get_world_size()
                self.device = local_load.device
                if self._gather_buffer is None:
                    shape = (self.world_size, *local_load.shape)
                    self._gather_buffer = torch.empty(shape,
                                                      dtype=local_load.dtype,
                                                      device=self.device)
Contributor:

high

self._gather_buffer is reset to None on every call to compute_and_set_moe_load (on line 154). This makes this condition always true, causing the buffer to be re-allocated on every invocation, which is inefficient. To avoid this performance issue, self._gather_buffer should be initialized to None in the __init__ method of the EplbUpdator class, and the line self._gather_buffer = None should be removed from this method.
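A minimal sketch of the suggested change (the signature is simplified for illustration; in the real class, local_load comes from self.adaptor.get_rank_expert_workload() rather than a parameter, and the buffer shape is assumed constant across calls):

import torch
import torch.distributed as dist


class EplbUpdator:

    def __init__(self):
        # Allocate lazily and reuse across calls instead of re-creating
        # the gather buffer on every invocation.
        self._gather_buffer = None

    def compute_and_set_moe_load(self, local_load: torch.Tensor):
        # Note: no `self._gather_buffer = None` here anymore.
        if dist.is_initialized():
            if self._gather_buffer is None:
                shape = (dist.get_world_size(), *local_load.shape)
                self._gather_buffer = torch.empty(shape,
                                                  dtype=local_load.dtype,
                                                  device=local_load.device)
            dist.all_gather_into_tensor(self._gather_buffer, local_load)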


                dist.all_gather_into_tensor(self._gather_buffer, local_load)

                moe_load = self._gather_buffer.permute(1, 0, 2)
                self.shared_dict["moe_load"] = moe_load.cpu()
                logger.debug(
                    f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}"
                )
Contributor:

critical

There is a race condition here. The moe_load tensors are updated asynchronously on moe_load_async_stream in fused_moe.py. However, self.adaptor.get_rank_expert_workload() is called on the default stream (on line 152, before this block) to read these tensors without any synchronization. This can lead to reading stale or incomplete data, causing incorrect load balancing. To fix this, you must synchronize the streams before reading moe_load. For example, you could add moe_load_async_stream().synchronize() before the call to self.adaptor.get_rank_expert_workload() on line 152.
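A minimal sketch of the fix the comment proposes, assuming get_rank_expert_workload() returns the tensors that fused_moe.py accumulates on the side stream (shown as a fragment of the method):

from vllm_ascend.utils import moe_load_async_stream

def compute_and_set_moe_load(self, is_clear=False):
    # Drain any moe_load accumulation still pending on the side stream
    # before the default stream reads the tensors.
    moe_load_async_stream().synchronize()
    local_load = self.adaptor.get_rank_expert_workload()
    ...

A device-side alternative with less host blocking would typically be torch.npu.current_stream().wait_stream(moe_load_async_stream()), which only orders subsequent default-stream work after the side stream instead of blocking the CPU.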

        else:
            moe_load = local_load.unsqueeze(1)
7 changes: 5 additions & 2 deletions vllm_ascend/ops/fused_moe/fused_moe.py
@@ -46,6 +46,7 @@
                               is_enable_nz, npu_stream_switch,
                               shared_expert_dp_enabled,
                               shared_experts_calculation_stream,
                               moe_load_async_stream,
                               vllm_version_is)

if vllm_version_is("0.11.0"):
@@ -392,8 +393,10 @@ def forward_impl(self, hidden_states: torch.Tensor,
        if isinstance(final_hidden_states, tuple):
            final_hidden_states, group_list_type, expert_tokens = final_hidden_states
            if self.dynamic_eplb:
                self.moe_load += expert_tokens if group_list_type == 1 else \
                    torch.cat([expert_tokens[:1], expert_tokens[1:] - expert_tokens[:-1]])
                with npu_stream_switch(moe_load_async_stream()):
                    moe_load_async_stream().wait_stream(torch.npu.current_stream(device=expert_tokens.device))
                    self.moe_load += expert_tokens if group_list_type == 1 else \
                        torch.cat([expert_tokens[:1], expert_tokens[1:] - expert_tokens[:-1]])

        final_hidden_states = forward_context.moe_comm_method.finalize(
            hidden_states=final_hidden_states,
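For context on the added lines: the moe_load accumulation is moved onto a dedicated side stream so it can overlap with the rest of forward_impl, and the wait_stream() call orders the side stream after the current stream so that expert_tokens is fully produced before it is read. A generic sketch of this producer-side pattern, using the npu_stream_switch helper the diff already imports (the accumulate_async name is illustrative only):

import torch
import torch_npu  # noqa: F401  # provides the torch.npu namespace

from vllm_ascend.utils import npu_stream_switch


def accumulate_async(acc: torch.Tensor, update: torch.Tensor, side_stream) -> None:
    # Order the side stream after the producer: work enqueued on the side
    # stream below will not start before the kernels already queued on the
    # current stream (which produce `update`) have finished.
    side_stream.wait_stream(torch.npu.current_stream(device=update.device))
    with npu_stream_switch(side_stream):
        # Runs on the side stream and can overlap with whatever
        # forward_impl launches next on the default stream.
        acc += update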
10 changes: 10 additions & 0 deletions vllm_ascend/utils.py
@@ -53,6 +53,7 @@
_CURRENT_STREAM = None
_PREFETCH_STREAM = None
_SHARED_EXPERTS_CALCULATION_STREAM = None
_MOE_LOAD_ASYNC_STREAM = None
_ASCEND_CUSTOMOP_IS_REIGISTERED = False
_DEFAULT_BUFFER_SIZE = 200
_MIN_DP_BUFFER_SIZE = 50
@@ -336,6 +337,15 @@ def shared_experts_calculation_stream() -> torch.npu.Stream:
    return _SHARED_EXPERTS_CALCULATION_STREAM


def moe_load_async_stream() -> torch_npu.npu.Stream:
Collaborator:

Move this function to the eplb module.

    global _MOE_LOAD_ASYNC_STREAM
    if _MOE_LOAD_ASYNC_STREAM is None:
        # Lazily create a dedicated stream for asynchronous MoE load
        # accumulation; subsequent calls return the same cached stream.
        _MOE_LOAD_ASYNC_STREAM = torch_npu.npu.Stream()
    return _MOE_LOAD_ASYNC_STREAM
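One note on the caching above: because the stream is created once and stored in a module-level global, every caller receives the same Stream object, so the producer in fused_moe.py and the consumer in eplb_updator.py order their work on one shared stream. A small illustration:

from vllm_ascend.utils import moe_load_async_stream

# Repeated calls return the identical cached stream, so wait_stream() /
# synchronize() issued at one call site applies to work submitted by another.
assert moe_load_async_stream() is moe_load_async_stream()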


def adapt_patch(is_global_patch: bool = False):
    if is_global_patch:
        from vllm_ascend.patch import platform  # noqa: F401