From e63fe8254c70c2d842f7cc1f46f931a68af6d8e4 Mon Sep 17 00:00:00 2001 From: jesse Date: Fri, 19 Sep 2025 10:39:58 +0800 Subject: [PATCH 01/29] Prompt Embeddings Support for v1 Engine Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 79 ++++++++++++++++++++++++--- vllm_ascend/worker/npu_input_batch.py | 55 ++++++++++++++++--- 2 files changed, 119 insertions(+), 15 deletions(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 6e42da1367c..42e1bfce48a 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -67,7 +67,8 @@ from vllm.tasks import GenerationTask, PoolingTask, SupportedTask from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, DeviceMemoryProfiler, LazyLoader, cdiv, get_dtype_size, - is_pin_memory_available) + is_pin_memory_available, + length_from_prompt_token_ids_or_embeds) from vllm.v1.attention.backends.gdn_attn import GDNAttentionMetadataBuilder from vllm.v1.attention.backends.utils import \ reorder_batch_to_split_decodes_and_prefills @@ -285,11 +286,14 @@ def __init__(self, vllm_config: VllmConfig, device: torch.device): self.is_multimodal_model = self.model_config.is_multimodal_model self.is_pooling_model = self.model_config.pooler_config is not None + self.enable_prompt_embeds = self.model_config.enable_prompt_embeds if self.is_multimodal_model: - self.inputs_embeds = torch.zeros( - (self.max_num_tokens, self.model_config.get_hidden_size()), - dtype=self.dtype, - device=self.device) + self.inputs_embeds = self._make_buffer(self.max_num_tokens, + self.model_config.get_hidden_size(), + dtype=self.dtype, + numpy=False) + self.is_token_ids = self._make_buffer(self.max_num_tokens, + dtype=torch.bool) # Set up Attention self.attn_backend = get_attn_backend( @@ -572,6 +576,7 @@ def _update_states(self, scheduler_output: "SchedulerOutput") -> None: self.requests[req_id] = CachedRequestState( req_id=req_id, prompt_token_ids=new_req_data.prompt_token_ids, + prompt_embeds=new_req_data.prompt_embeds, mm_kwargs=new_req_data.mm_kwargs, mm_positions=new_req_data.mm_positions, sampling_params=sampling_params, @@ -843,7 +848,8 @@ def _calc_mrope_positions(self, scheduler_output: "SchedulerOutput"): self.input_batch.num_computed_tokens_cpu[index] num_scheduled_tokens = \ scheduler_output.num_scheduled_tokens[req_id] - num_prompt_tokens = len(req.prompt_token_ids) + num_prompt_tokens = length_from_prompt_token_ids_or_embeds( + req.prompt_token_ids, req.prompt_embeds) if num_computed_tokens + num_scheduled_tokens > num_prompt_tokens: prompt_part_len = max(0, @@ -1016,6 +1022,8 @@ def _prepare_input_ids(self, total_num_scheduled_tokens: int, self.input_ids[:total_num_scheduled_tokens].copy_( self.input_ids_cpu[:total_num_scheduled_tokens], non_blocking=True) + self.inputs_embeds.copy_to_gpu(total_num_scheduled_tokens) + self.is_token_ids.copy_to_gpu(total_num_scheduled_tokens) return # Async scheduling case, where some decode requests from the previous @@ -1043,6 +1051,8 @@ def _prepare_input_ids(self, total_num_scheduled_tokens: int, self.input_ids[:total_num_scheduled_tokens].copy_( self.input_ids_cpu[:total_num_scheduled_tokens], non_blocking=True) + self.inputs_embeds.copy_to_gpu(total_num_scheduled_tokens) + self.is_token_ids.copy_to_gpu(total_num_scheduled_tokens) if num_commmon_tokens == 0: # No requests in common with the previous iteration # So input_ids_cpu will have all the input ids. 
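
The hunks above and below replace plain tensors with paired host/device buffers created via `self._make_buffer(...)`, which is why the surrounding code indexes `.cpu`/`.gpu` views and calls `copy_to_gpu(n)`. A minimal sketch of the interface those calls assume follows; the class name and details are illustrative, not the actual vLLM helper.

```python
# Illustrative sketch only: approximates the paired host/device buffer that
# `self._make_buffer(...)` is assumed to return (an object exposing `.cpu`,
# `.gpu` and `copy_to_gpu(n)`). The real helper typically uses pinned host
# memory and can also expose a numpy view (hence the `numpy=False` argument
# seen above).
from typing import Optional

import torch


class CpuGpuBufferSketch:

    def __init__(self, *size: int, dtype: torch.dtype,
                 device: torch.device) -> None:
        self.cpu = torch.zeros(*size, dtype=dtype, device="cpu")
        self.gpu = self.cpu.to(device)

    def copy_to_gpu(self, n: Optional[int] = None) -> torch.Tensor:
        # Copy the first `n` rows (or the whole buffer) host -> device,
        # non-blocking so it can overlap with other work on the same stream.
        if n is None:
            return self.gpu.copy_(self.cpu, non_blocking=True)
        self.gpu[:n].copy_(self.cpu[:n], non_blocking=True)
        return self.gpu[:n]
```
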
@@ -1056,6 +1066,7 @@ def _prepare_input_ids(self, total_num_scheduled_tokens: int, self.input_batch.prev_sampled_token_ids[:num_commmon_tokens, 0], non_blocking=True) + self.is_token_ids.gpu[:num_commmon_tokens] = True return # Upload the index tensors asynchronously # so the scatter can be non-blocking. @@ -1195,15 +1206,60 @@ def _prepare_inputs( # where M is the max_model_len. token_indices = (positions_np + req_indices * self.input_batch.token_ids_cpu.shape[1]) - + token_indices_tensor = torch.from_numpy(token_indices) # Prepare input_ids. # NOTE(woosuk): We use torch.index_select instead of np.take here # because torch.index_select is much faster than np.take for large # tensors. torch.index_select(self.input_batch.token_ids_cpu_tensor.flatten(), 0, - torch.from_numpy(token_indices), + token_indices_tensor, out=self.input_ids_cpu[:total_num_scheduled_tokens]) + is_token_ids = self.input_batch.is_token_ids.flatten() + torch.index_select( + is_token_ids, + 0, + token_indices_tensor, + out=self.is_token_ids.cpu[:total_num_scheduled_tokens]) + + # Because we did not pre-allocate a massive prompt_embeds CPU tensor on + # the InputBatch, we need to fill in the prompt embeds into the expected + # spots in the GpuModelRunner's pre-allocated prompt_embeds tensor. + if self.input_batch.req_prompt_embeds: + output_idx = 0 + for req_idx in range(num_reqs): + num_sched = num_scheduled_tokens[req_idx] + + # Skip if this request doesn't have embeddings + if req_idx not in self.input_batch.req_prompt_embeds: + output_idx += num_sched + continue + + # Skip if no tokens scheduled + if num_sched <= 0: + output_idx += num_sched + continue + + req_embeds = self.input_batch.req_prompt_embeds[req_idx] + start_pos = self.input_batch.num_computed_tokens_cpu[req_idx] + + # Skip if trying to read beyond available embeddings + if start_pos >= req_embeds.shape[0]: + output_idx += num_sched + continue + + # Copy available embeddings + end_pos = start_pos + num_sched + actual_end = min(end_pos, req_embeds.shape[0]) + actual_num_sched = actual_end - start_pos + + if actual_num_sched > 0: + self.inputs_embeds.cpu[output_idx:output_idx + + actual_num_sched].copy_( + req_embeds[start_pos:actual_end] + ) + + output_idx += num_sched # Prepare some information for building Attention-Metadata # Compute and commit slot mapping @@ -1985,6 +2041,7 @@ def execute_model( self.input_batch.token_ids_cpu[req_idx, start_idx:end_idx] = sampled_ids + self.input_batch.is_token_ids[req_idx, start_idx:end_idx] = True self.input_batch.num_tokens_no_spec[req_idx] = end_idx self.input_batch.num_tokens[req_idx] = end_idx req_id = self.input_batch.req_ids[req_idx] @@ -2200,6 +2257,9 @@ def _dummy_run( if self.is_multimodal_model: input_ids = None inputs_embeds = self.inputs_embeds[:num_tokens] + elif self.enable_prompt_embeds: + input_ids = None + inputs_embeds = self.inputs_embeds.gpu[:num_tokens] else: input_ids = self.input_ids[:num_tokens] inputs_embeds = None @@ -3070,6 +3130,9 @@ def _get_prompt_logprobs_dict( # Get metadata for this request. 
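
The per-request scatter added to `_prepare_inputs` above fills the runner's pre-allocated `inputs_embeds.cpu` buffer request by request: `output_idx` advances by each request's scheduled token count whether or not it has embeddings, and for requests that do, the slice starting at `num_computed_tokens` is copied in, clamped to the embeddings actually available. A toy, self-contained illustration of that indexing (all sizes and values below are made up):

```python
# Toy illustration of the scatter in _prepare_inputs: three requests, of which
# only request 0 and request 2 were submitted with prompt embeddings.
import torch

hidden = 4
num_scheduled = [3, 2, 3]         # tokens scheduled this step, per request
num_computed = [1, 0, 0]          # tokens already computed, per request
req_prompt_embeds = {             # req_idx -> (num_prompt_tokens, hidden)
    0: torch.randn(5, hidden),
    2: torch.randn(3, hidden),
}

flat = torch.zeros(sum(num_scheduled), hidden)  # stands in for inputs_embeds.cpu
output_idx = 0
for req_idx, num_sched in enumerate(num_scheduled):
    if req_idx in req_prompt_embeds:
        start = num_computed[req_idx]
        end = min(start + num_sched, req_prompt_embeds[req_idx].shape[0])
        if end > start:
            flat[output_idx:output_idx + (end - start)] = \
                req_prompt_embeds[req_idx][start:end]
    output_idx += num_sched       # advance even for token-id-only requests

# Request 0 fills rows 0..2 with its embeddings[1:4], request 1 leaves rows
# 3..4 untouched (token ids), request 2 fills rows 5..7 with embeddings[0:3].
```
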
request = self.requests[req_id] + if request.prompt_token_ids is None: + # Prompt logprobs is incompatible with prompt embeddings + continue num_prompt_tokens = len(request.prompt_token_ids) prompt_token_ids = torch.tensor(request.prompt_token_ids).to( self.device, non_blocking=True) diff --git a/vllm_ascend/worker/npu_input_batch.py b/vllm_ascend/worker/npu_input_batch.py index ce37ff921c2..c778742066b 100644 --- a/vllm_ascend/worker/npu_input_batch.py +++ b/vllm_ascend/worker/npu_input_batch.py @@ -28,7 +28,7 @@ PlaceholderRange) from vllm.pooling_params import PoolingParams from vllm.sampling_params import SamplingParams, SamplingType -from vllm.utils import swap_dict_values +from vllm.utils import length_from_prompt_token_ids_or_embeds,swap_dict_values from vllm.v1.outputs import LogprobsTensors from vllm.v1.pool.metadata import PoolingMetadata from vllm.v1.sample.logits_processor import (BatchUpdateBuilder, @@ -45,7 +45,7 @@ class CachedRequestState: req_id: str - prompt_token_ids: list[int] + prompt_token_ids: Optional[list[int]] mm_kwargs: list[MultiModalKwargsItem] mm_positions: list[PlaceholderRange] mm_hashes: list[str] @@ -61,9 +61,11 @@ class CachedRequestState: mrope_position_delta: Optional[int] = None lora_request: Optional[LoRARequest] = None + prompt_embeds: Optional[torch.Tensor] = None def __post_init__(self): - self.num_prompt_tokens = len(self.prompt_token_ids) + self.num_prompt_tokens = length_from_prompt_token_ids_or_embeds( + self.prompt_token_ids, self.prompt_embeds) @property def num_tokens(self) -> int: @@ -78,6 +80,10 @@ def mm_inputs(self) -> list[MultiModalKwargs]: def get_token_id(self, idx: int) -> int: if idx < self.num_prompt_tokens: + if self.prompt_token_ids is None: + raise ValueError( + f"Tried to access token index {idx}, but that token was " + "provided via prompt_embeds, and its ID is unknown.") return self.prompt_token_ids[idx] else: return self.output_token_ids[idx - self.num_prompt_tokens] @@ -122,6 +128,14 @@ def __init__( pin_memory=False, ) self.token_ids_cpu = self.token_ids_cpu_tensor.numpy() + self.is_token_ids = torch.zeros((max_num_reqs, max_model_len), + device="cpu", + dtype=bool, + pin_memory=False) + # Store prompt embeddings per request to avoid OOM from large upfront + # allocation if max_model_len is big. + # Maps req_index -> tensor of shape (num_prompt_tokens, hidden_size) + self.req_prompt_embeds: dict[int, torch.Tensor] = {} self.num_tokens = np.zeros(max_num_reqs, dtype=np.int32) self.num_tokens_no_spec = np.zeros(max_num_reqs, dtype=np.int32) self.num_prompt_tokens = np.zeros(max_num_reqs, dtype=np.int32) @@ -326,15 +340,23 @@ def add_request( self.req_id_to_index[req_id] = req_index # Copy the prompt token ids and output token ids. 
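
Both files now accept requests whose prompt arrives only as embeddings, so the prompt length is taken from `length_from_prompt_token_ids_or_embeds` instead of `len(prompt_token_ids)`. The contract assumed of that helper, here and in `add_request` below, is simply "length of whichever representation is present"; a hedged sketch of that behavior (not the actual vLLM utility):

```python
# Assumed behavior of vllm.utils.length_from_prompt_token_ids_or_embeds as it
# is used in this patch; sketch only, the real utility may do extra validation.
from typing import Optional

import torch


def length_from_prompt_token_ids_or_embeds_sketch(
        prompt_token_ids: Optional[list[int]],
        prompt_embeds: Optional[torch.Tensor]) -> int:
    if prompt_token_ids is not None:
        return len(prompt_token_ids)
    if prompt_embeds is not None:
        return prompt_embeds.shape[0]  # (num_prompt_tokens, hidden_size)
    raise ValueError("Request has neither prompt_token_ids nor prompt_embeds.")
```
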
- num_prompt_tokens = len(request.prompt_token_ids) + num_prompt_tokens = length_from_prompt_token_ids_or_embeds( + request.prompt_token_ids, request.prompt_embeds) self.num_prompt_tokens[req_index] = num_prompt_tokens - self.token_ids_cpu[ - req_index, :num_prompt_tokens] = request.prompt_token_ids start_idx = num_prompt_tokens end_idx = start_idx + len(request.output_token_ids) + if request.prompt_token_ids is not None: + self.token_ids_cpu[ + req_index, :num_prompt_tokens] = request.prompt_token_ids + self.is_token_ids[req_index, :num_prompt_tokens] = True + else: + self.is_token_ids[req_index, :num_prompt_tokens] = False + if request.prompt_embeds is not None: + self.req_prompt_embeds[req_index] = request.prompt_embeds self.token_ids_cpu[req_index, start_idx:end_idx] = request.output_token_ids - # Number of token ids in token_ids_cpu. + self.is_token_ids[req_index, start_idx:end_idx] = True + # Number of token ids in prompt (token_ids_cpu or prompt_embeds). # NOTE(woosuk): This may include spec decode tokens. self.num_tokens[req_index] = request.num_tokens # Number of tokens without spec decode tokens. @@ -534,6 +556,20 @@ def swap_states(self, i1: int, i2: int) -> None: self.token_ids_cpu[i1, ...] = self.token_ids_cpu[i2, ...] self.token_ids_cpu[i2, ...] = tmp + self.is_token_ids[[i1, i2], ...] = self.is_token_ids[[i2, i1], ...] + + # Swap prompt embeddings if they exist + embeds_i1 = self.req_prompt_embeds.get(i1) + embeds_i2 = self.req_prompt_embeds.get(i2) + if embeds_i1 is not None: + self.req_prompt_embeds[i2] = embeds_i1 + else: + self.req_prompt_embeds.pop(i2, None) + if embeds_i2 is not None: + self.req_prompt_embeds[i1] = embeds_i2 + else: + self.req_prompt_embeds.pop(i1, None) + swap_dict_values(self.generators, i1, i2) swap_dict_values(self.bad_words_token_ids, i1, i2) @@ -612,6 +648,11 @@ def condense(self) -> None: num_tokens = self.num_tokens[last_req_index] self.token_ids_cpu[empty_index, :num_tokens] = self.token_ids_cpu[ last_req_index, :num_tokens] + self.is_token_ids[empty_index, :num_tokens] = self.is_token_ids[ + last_req_index, :num_tokens] + if last_req_index in self.req_prompt_embeds: + self.req_prompt_embeds[ + empty_index] = self.req_prompt_embeds.pop(last_req_index) self.num_tokens[empty_index] = num_tokens self.num_tokens_no_spec[empty_index] = self.num_tokens_no_spec[ last_req_index] From 777046fa025cadf78892070edec8f6a9fc018ab5 Mon Sep 17 00:00:00 2001 From: jesse Date: Mon, 22 Sep 2025 16:21:45 +0800 Subject: [PATCH 02/29] [Fix] Update input embeddings condition to include prompt embeddings for multimodal models Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index d820c8a5604..755c8df9a28 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -297,7 +297,7 @@ def __init__(self, vllm_config: VllmConfig, device: torch.device): self.is_multimodal_model = self.model_config.is_multimodal_model self.is_pooling_model = self.model_config.pooler_config is not None self.enable_prompt_embeds = self.model_config.enable_prompt_embeds - if self.is_multimodal_model: + if self.is_multimodal_model or self.enable_prompt_embeds: self.inputs_embeds = self._make_buffer(self.max_num_tokens, self.model_config.get_hidden_size(), dtype=self.dtype, From c522293243da1705322b1555041c92af35e81391 Mon Sep 17 00:00:00 2001 From: jesse Date: Mon, 22 Sep 2025 16:30:16 +0800 Subject: 
[PATCH 03/29] fix param Signed-off-by: jesse --- vllm_ascend/worker/npu_input_batch.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/vllm_ascend/worker/npu_input_batch.py b/vllm_ascend/worker/npu_input_batch.py index 826af3123ee..01ef3b32768 100644 --- a/vllm_ascend/worker/npu_input_batch.py +++ b/vllm_ascend/worker/npu_input_batch.py @@ -48,9 +48,6 @@ class CachedRequestState: req_id: str prompt_token_ids: Optional[list[int]] - mm_kwargs: list[MultiModalKwargsItem] - mm_positions: list[PlaceholderRange] - mm_hashes: list[str] sampling_params: Optional[SamplingParams] pooling_params: Optional[PoolingParams] generator: Optional[torch.Generator] From a8187a51758ea3ae01ccac84ef7499258f3047cc Mon Sep 17 00:00:00 2001 From: jesse Date: Fri, 26 Sep 2025 12:32:12 +0800 Subject: [PATCH 04/29] format Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 640bada2d0b..530536a309f 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -302,10 +302,11 @@ def __init__(self, vllm_config: VllmConfig, device: torch.device): self.is_pooling_model = self.model_config.pooler_config is not None self.enable_prompt_embeds = self.model_config.enable_prompt_embeds if self.is_multimodal_model or self.enable_prompt_embeds: - self.inputs_embeds = self._make_buffer(self.max_num_tokens, - self.model_config.get_hidden_size(), - dtype=self.dtype, - numpy=False) + self.inputs_embeds = self._make_buffer( + self.max_num_tokens, + self.model_config.get_hidden_size(), + dtype=self.dtype, + numpy=False) self.is_token_ids = self._make_buffer(self.max_num_tokens, dtype=torch.bool) @@ -2203,7 +2204,8 @@ def execute_model( self.input_batch.token_ids_cpu[req_idx, start_idx:end_idx] = sampled_ids - self.input_batch.is_token_ids[req_idx, start_idx:end_idx] = True + self.input_batch.is_token_ids[req_idx, + start_idx:end_idx] = True self.input_batch.num_tokens_no_spec[req_idx] = end_idx self.input_batch.num_tokens[req_idx] = end_idx req_id = self.input_batch.req_ids[req_idx] @@ -3625,4 +3627,4 @@ def _to_list(self, sampled_token_ids: torch.Tensor) -> list[list[int]]: pinned.copy_(sampled_token_ids, non_blocking=True) self.transfer_event.record() self.transfer_event.synchronize() - return pinned.tolist() \ No newline at end of file + return pinned.tolist() From 75cfdd7be175158afa92ae69726b7d5bed0bd7b1 Mon Sep 17 00:00:00 2001 From: jesse Date: Fri, 26 Sep 2025 12:52:06 +0800 Subject: [PATCH 05/29] add test Signed-off-by: jesse --- .../test_completion_with_prompt_embeds.py | 413 ++++++++++++++++++ tests/e2e/utils.py | 162 ++++++- 2 files changed, 574 insertions(+), 1 deletion(-) create mode 100644 tests/e2e/singlecard/test_completion_with_prompt_embeds.py diff --git a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py new file mode 100644 index 00000000000..02ba41d72d7 --- /dev/null +++ b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py @@ -0,0 +1,413 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +import base64 +import io + +import openai # use the official client for correctness check +import pytest +import pytest_asyncio +import torch +# downloading lora to test lora requests +from openai import BadRequestError +from transformers import AutoConfig + +from ..utils import 
RemoteOpenAIServer + +# any model with a chat template should work here +MODEL_NAME = "facebook/opt-125m" + +CONFIG = AutoConfig.from_pretrained(MODEL_NAME) + +class RemoteOpenAIServer: + DUMMY_API_KEY = "token-abc123" # vLLM's OpenAI server does not need API key + + def _start_server(self, model: str, vllm_serve_args: list[str], + env_dict: Optional[dict[str, str]]) -> None: + """Subclasses override this method to customize server process launch + """ + env = os.environ.copy() + # the current process might initialize cuda, + # to be safe, we should use spawn method + env['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn' + if env_dict is not None: + env.update(env_dict) + self.proc: subprocess.Popen = subprocess.Popen( + ["vllm", "serve", model, *vllm_serve_args], + env=env, + stdout=sys.stdout, + stderr=sys.stderr, + ) + + def __init__(self, + model: str, + vllm_serve_args: list[str], + *, + env_dict: Optional[dict[str, str]] = None, + seed: Optional[int] = 0, + auto_port: bool = True, + max_wait_seconds: Optional[float] = None, + override_hf_configs: Optional[dict[str, Any]] = None) -> None: + if auto_port: + if "-p" in vllm_serve_args or "--port" in vllm_serve_args: + raise ValueError("You have manually specified the port " + "when `auto_port=True`.") + + # No need for a port if using unix sockets + if "--uds" not in vllm_serve_args: + # Don't mutate the input args + vllm_serve_args = vllm_serve_args + [ + "--port", str(get_open_port()) + ] + if seed is not None: + if "--seed" in vllm_serve_args: + raise ValueError("You have manually specified the seed " + f"when `seed={seed}`.") + + vllm_serve_args = vllm_serve_args + ["--seed", str(seed)] + + if override_hf_configs is not None: + vllm_serve_args = vllm_serve_args + [ + "--hf-overrides", + json.dumps(override_hf_configs) + ] + + parser = FlexibleArgumentParser( + description="vLLM's remote OpenAI server.") + subparsers = parser.add_subparsers(required=False, dest="subparser") + parser = ServeSubcommand().subparser_init(subparsers) + args = parser.parse_args(["--model", model, *vllm_serve_args]) + self.uds = args.uds + if args.uds: + self.host = None + self.port = None + else: + self.host = str(args.host or 'localhost') + self.port = int(args.port) + + self.show_hidden_metrics = \ + args.show_hidden_metrics_for_version is not None + + # download the model before starting the server to avoid timeout + is_local = os.path.isdir(model) + if not is_local: + engine_args = AsyncEngineArgs.from_cli_args(args) + model_config = engine_args.create_model_config() + load_config = engine_args.create_load_config() + + model_loader = get_model_loader(load_config) + model_loader.download_model(model_config) + + self._start_server(model, vllm_serve_args, env_dict) + max_wait_seconds = max_wait_seconds or 240 + self._wait_for_server(url=self.url_for("health"), + timeout=max_wait_seconds) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.proc.terminate() + try: + self.proc.wait(8) + except subprocess.TimeoutExpired: + # force kill if needed + self.proc.kill() + + def _poll(self) -> Optional[int]: + """Subclasses override this method to customize process polling""" + return self.proc.poll() + + def _wait_for_server(self, *, url: str, timeout: float): + # run health check + start = time.time() + client = (httpx.Client(transport=httpx.HTTPTransport( + uds=self.uds)) if self.uds else requests) + while True: + try: + if client.get(url).status_code == 200: + break + except Exception: + # this exception can only be 
raised by requests.get, + # which means the server is not ready yet. + # the stack trace is not useful, so we suppress it + # by using `raise from None`. + result = self._poll() + if result is not None and result != 0: + raise RuntimeError("Server exited unexpectedly.") from None + + time.sleep(0.5) + if time.time() - start > timeout: + raise RuntimeError( + "Server failed to start in time.") from None + + @property + def url_root(self) -> str: + return (f"http://{self.uds.split('/')[-1]}" + if self.uds else f"http://{self.host}:{self.port}") + + def url_for(self, *parts: str) -> str: + return self.url_root + "/" + "/".join(parts) + + def get_client(self, **kwargs): + if "timeout" not in kwargs: + kwargs["timeout"] = 600 + return openai.OpenAI( + base_url=self.url_for("v1"), + api_key=self.DUMMY_API_KEY, + max_retries=0, + **kwargs, + ) + + def get_async_client(self, **kwargs): + if "timeout" not in kwargs: + kwargs["timeout"] = 600 + return openai.AsyncOpenAI(base_url=self.url_for("v1"), + api_key=self.DUMMY_API_KEY, + max_retries=0, + **kwargs) + + +@pytest.fixture(scope="module") +def default_server_args() -> list[str]: + return [ + # use half precision for speed and memory savings in CI environment + "--dtype", + "bfloat16", + "--max-model-len", + "2048", + "--max-num-seqs", + "128", + "--enforce-eager", + # Prompt Embeds server args + "--enable-prompt-embeds", + ] + + +EXAMPLE_PROMPTS = [ + "Hello, my name is", + "What is an LLM?", +] + + +def _encode_embeds(embeds: torch.Tensor): + buffer = io.BytesIO() + torch.save(embeds, buffer) + return base64.b64encode(buffer.getvalue()).decode('utf-8') + + +@pytest.fixture(scope="module") +def example_prompt_embeds(hf_runner): + """Create example embeddings and return them as base64 encoded string.""" + with hf_runner(MODEL_NAME) as hf_model: + example_embeddings = hf_model.get_prompt_embeddings(EXAMPLE_PROMPTS) + + return [_encode_embeds(item) for item in example_embeddings] + + +@pytest.fixture(scope="module", + params=["", "--disable-frontend-multiprocessing"]) +def server_with_prompt_embeds(default_server_args, request): + if request.param: + default_server_args.append(request.param) + + with RemoteOpenAIServer(MODEL_NAME, default_server_args) as remote_server: + yield remote_server + + +@pytest_asyncio.fixture +async def client_with_prompt_embeds(server_with_prompt_embeds): + async with server_with_prompt_embeds.get_async_client() as async_client: + yield async_client + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_completions_with_prompt_embeds( + example_prompt_embeds, + client_with_prompt_embeds: openai.AsyncOpenAI, + model_name: str, +): + encoded_embeds, encoded_embeds2 = example_prompt_embeds + + # Test case: Single prompt embeds input + completion = await client_with_prompt_embeds.completions.create( + model=model_name, + prompt="", # Add empty prompt as required parameter + max_tokens=5, + temperature=0.0, + extra_body={"prompt_embeds": encoded_embeds}) + assert len(completion.choices[0].text) >= 1 + assert completion.choices[0].prompt_logprobs is None + + # Test case: batch completion with prompt_embeds + completion = await client_with_prompt_embeds.completions.create( + model=model_name, + prompt="", # Add empty prompt as required parameter + max_tokens=5, + temperature=0.0, + extra_body={"prompt_embeds": [encoded_embeds, encoded_embeds2]}) + assert len(completion.choices) == 2 + assert len(completion.choices[0].text) >= 1 + assert len(completion.choices[1].text) >= 1 + + # Test 
case: streaming with prompt_embeds + single_completion = await client_with_prompt_embeds.completions.create( + model=model_name, + prompt="", # Add empty prompt as required parameter + max_tokens=5, + temperature=0.0, + extra_body={"prompt_embeds": encoded_embeds}) + single_output = single_completion.choices[0].text + + stream = await client_with_prompt_embeds.completions.create( + model=model_name, + prompt="", # Add empty prompt as required parameter + max_tokens=5, + temperature=0.0, + stream=True, + extra_body={"prompt_embeds": encoded_embeds}) + chunks = [] + finish_reason_count = 0 + async for chunk in stream: + chunks.append(chunk.choices[0].text) + if chunk.choices[0].finish_reason is not None: + finish_reason_count += 1 + assert finish_reason_count == 1 + assert chunk.choices[0].finish_reason == "length" + assert chunk.choices[0].text + assert "".join(chunks) == single_output + + # Test case: batch streaming with prompt_embeds + stream = await client_with_prompt_embeds.completions.create( + model=model_name, + prompt="", # Add empty prompt as required parameter + max_tokens=5, + temperature=0.0, + stream=True, + extra_body={"prompt_embeds": [encoded_embeds, encoded_embeds2]}) + chunks_stream_embeds: list[list[str]] = [[], []] + finish_reason_count = 0 + async for chunk in stream: + chunks_stream_embeds[chunk.choices[0].index].append( + chunk.choices[0].text) + if chunk.choices[0].finish_reason is not None: + finish_reason_count += 1 + assert finish_reason_count == 2 + assert chunk.choices[0].finish_reason == "length" + assert chunk.choices[0].text + assert len(chunks_stream_embeds[0]) > 0 + assert len(chunks_stream_embeds[1]) > 0 + + # Test case: mixed text and prompt_embeds + completion_mixed = await client_with_prompt_embeds.completions.create( + model=model_name, + prompt="This is a prompt", + max_tokens=5, + temperature=0.0, + extra_body={"prompt_embeds": encoded_embeds}) + assert len(completion.choices) == 2 + completion_text_only = await client_with_prompt_embeds.completions.create( + model=model_name, + prompt="This is a prompt", + max_tokens=5, + temperature=0.0, + ) + completion_embeds_only = await client_with_prompt_embeds.completions.create( + model=model_name, + prompt="", + max_tokens=5, + temperature=0.0, + extra_body={"prompt_embeds": encoded_embeds}) + # Embeddings responses should be handled first + assert completion_mixed.choices[0].text == completion_embeds_only.choices[ + 0].text + assert completion_mixed.choices[1].text == completion_text_only.choices[ + 0].text + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_completions_errors_with_prompt_embeds( + client_with_prompt_embeds: openai.AsyncOpenAI, model_name: str): + # Test error case: invalid prompt_embeds + with pytest.raises(BadRequestError): + await client_with_prompt_embeds.completions.create( + prompt="", + model=model_name, + max_tokens=5, + temperature=0.0, + extra_body={"prompt_embeds": "invalid_base64"}) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("logprobs_arg", [1, 0]) +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_completions_with_logprobs_and_prompt_embeds( + example_prompt_embeds, + client_with_prompt_embeds: openai.AsyncOpenAI, + logprobs_arg: int, + model_name: str, +): + encoded_embeds, encoded_embeds2 = example_prompt_embeds + + # Test case: Logprobs using prompt_embeds + completion = await client_with_prompt_embeds.completions.create( + model=model_name, + prompt="", # Add empty prompt as required parameter + 
max_tokens=5, + temperature=0.0, + echo=False, + logprobs=logprobs_arg, + extra_body={"prompt_embeds": encoded_embeds}) + + logprobs = completion.choices[0].logprobs + assert logprobs is not None + assert len(logprobs.text_offset) == 5 + assert len(logprobs.token_logprobs) == 5 + assert len(logprobs.top_logprobs) == 5 + for top_logprobs in logprobs.top_logprobs[1:]: + assert max(logprobs_arg, 1) <= len(top_logprobs) <= logprobs_arg + 1 + assert len(logprobs.tokens) == 5 + + # Test case: Log probs with batch completion and prompt_embeds + completion = await client_with_prompt_embeds.completions.create( + model=model_name, + prompt="", # Add empty prompt as required parameter + max_tokens=5, + temperature=0.0, + echo=False, + logprobs=logprobs_arg, + extra_body={"prompt_embeds": [encoded_embeds, encoded_embeds2]}) + + assert len(completion.choices) == 2 + for choice in completion.choices: + logprobs = choice.logprobs + assert logprobs is not None + assert len(logprobs.text_offset) == 5 + assert len(logprobs.token_logprobs) == 5 + assert len(logprobs.top_logprobs) == 5 + for top_logprobs in logprobs.top_logprobs[1:]: + assert max(logprobs_arg, + 1) <= len(top_logprobs) <= logprobs_arg + 1 + assert len(logprobs.tokens) == 5 + + +@pytest.mark.asyncio +async def test_prompt_logprobs_raises_error( + example_prompt_embeds, + client_with_prompt_embeds: openai.AsyncOpenAI, +): + encoded_embeds, _ = example_prompt_embeds + + with pytest.raises(BadRequestError, match="not compatible"): + await client_with_prompt_embeds.completions.create( + model=MODEL_NAME, + prompt="", + max_tokens=5, + temperature=0.0, + extra_body={ + "prompt_embeds": encoded_embeds, + "prompt_logprobs": True + }, + ) diff --git a/tests/e2e/utils.py b/tests/e2e/utils.py index 279b7672411..532f424a7c3 100644 --- a/tests/e2e/utils.py +++ b/tests/e2e/utils.py @@ -18,14 +18,25 @@ # import functools +import json import os import signal +import subprocess +import sys +import time from collections.abc import Sequence -from typing import Callable +from typing import Any, Callable, Optional +import httpx +import openai +import requests import torch import torch.nn.functional as F from typing_extensions import ParamSpec +from vllm.engine.arg_utils import AsyncEngineArgs +from vllm.entrypoints.cli.serve import ServeSubcommand +from vllm.model_executor.model_loader import get_model_loader +from vllm.utils import FlexibleArgumentParser, get_open_port _P = ParamSpec("_P") @@ -104,3 +115,152 @@ def check_embeddings_close( f"\n{name_1}:\t{embeddings_1[:16]!r}") assert sim >= 1 - tol, fail_msg + + +class RemoteOpenAIServer: + DUMMY_API_KEY = "token-abc123" # vLLM's OpenAI server does not need API key + + def _start_server(self, model: str, vllm_serve_args: list[str], + env_dict: Optional[dict[str, str]]) -> None: + """Subclasses override this method to customize server process launch + """ + env = os.environ.copy() + # the current process might initialize cuda, + # to be safe, we should use spawn method + env['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn' + if env_dict is not None: + env.update(env_dict) + self.proc: subprocess.Popen = subprocess.Popen( + ["vllm", "serve", model, *vllm_serve_args], + env=env, + stdout=sys.stdout, + stderr=sys.stderr, + ) + + def __init__(self, + model: str, + vllm_serve_args: list[str], + *, + env_dict: Optional[dict[str, str]] = None, + seed: Optional[int] = 0, + auto_port: bool = True, + max_wait_seconds: Optional[float] = None, + override_hf_configs: Optional[dict[str, Any]] = None) -> None: + if auto_port: + if 
"-p" in vllm_serve_args or "--port" in vllm_serve_args: + raise ValueError("You have manually specified the port " + "when `auto_port=True`.") + + # No need for a port if using unix sockets + if "--uds" not in vllm_serve_args: + # Don't mutate the input args + vllm_serve_args = vllm_serve_args + [ + "--port", str(get_open_port()) + ] + if seed is not None: + if "--seed" in vllm_serve_args: + raise ValueError("You have manually specified the seed " + f"when `seed={seed}`.") + + vllm_serve_args = vllm_serve_args + ["--seed", str(seed)] + + if override_hf_configs is not None: + vllm_serve_args = vllm_serve_args + [ + "--hf-overrides", + json.dumps(override_hf_configs) + ] + + parser = FlexibleArgumentParser( + description="vLLM's remote OpenAI server.") + subparsers = parser.add_subparsers(required=False, dest="subparser") + parser = ServeSubcommand().subparser_init(subparsers) + args = parser.parse_args(["--model", model, *vllm_serve_args]) + self.uds = args.uds + if args.uds: + self.host = None + self.port = None + else: + self.host = str(args.host or 'localhost') + self.port = int(args.port) + + self.show_hidden_metrics = \ + args.show_hidden_metrics_for_version is not None + + # download the model before starting the server to avoid timeout + is_local = os.path.isdir(model) + if not is_local: + engine_args = AsyncEngineArgs.from_cli_args(args) + model_config = engine_args.create_model_config() + load_config = engine_args.create_load_config() + + model_loader = get_model_loader(load_config) + model_loader.download_model(model_config) + + self._start_server(model, vllm_serve_args, env_dict) + max_wait_seconds = max_wait_seconds or 240 + self._wait_for_server(url=self.url_for("health"), + timeout=max_wait_seconds) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.proc.terminate() + try: + self.proc.wait(8) + except subprocess.TimeoutExpired: + # force kill if needed + self.proc.kill() + + def _poll(self) -> Optional[int]: + """Subclasses override this method to customize process polling""" + return self.proc.poll() + + def _wait_for_server(self, *, url: str, timeout: float): + # run health check + start = time.time() + client = (httpx.Client(transport=httpx.HTTPTransport( + uds=self.uds)) if self.uds else requests) + while True: + try: + if client.get(url).status_code == 200: + break + except Exception: + # this exception can only be raised by requests.get, + # which means the server is not ready yet. + # the stack trace is not useful, so we suppress it + # by using `raise from None`. 
+ result = self._poll() + if result is not None and result != 0: + raise RuntimeError("Server exited unexpectedly.") from None + + time.sleep(0.5) + if time.time() - start > timeout: + raise RuntimeError( + "Server failed to start in time.") from None + + @property + def url_root(self) -> str: + return (f"http://{self.uds.split('/')[-1]}" + if self.uds else f"http://{self.host}:{self.port}") + + def url_for(self, *parts: str) -> str: + return self.url_root + "/" + "/".join(parts) + + def get_client(self, **kwargs): + if "timeout" not in kwargs: + kwargs["timeout"] = 600 + return openai.OpenAI( + base_url=self.url_for("v1"), + api_key=self.DUMMY_API_KEY, + max_retries=0, + **kwargs, + ) + + def get_async_client(self, **kwargs): + if "timeout" not in kwargs: + kwargs["timeout"] = 600 + return openai.AsyncOpenAI(base_url=self.url_for("v1"), + api_key=self.DUMMY_API_KEY, + max_retries=0, + **kwargs) From 6d47582344cd58ef9a9e32a979b6eeeed6e433f7 Mon Sep 17 00:00:00 2001 From: jesse Date: Thu, 9 Oct 2025 17:04:12 +0800 Subject: [PATCH 06/29] fix test Signed-off-by: jesse --- .../test_completion_with_prompt_embeds.py | 15 ++++++++++++++- vllm_ascend/worker/npu_input_batch.py | 2 +- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py index 02ba41d72d7..4bdce38c573 100644 --- a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py +++ b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py @@ -3,14 +3,26 @@ import base64 import io - +import json +import os +import subprocess +import sys +import time +from typing import Any, Optional + +import httpx import openai # use the official client for correctness check import pytest import pytest_asyncio +import requests import torch # downloading lora to test lora requests from openai import BadRequestError from transformers import AutoConfig +from vllm.engine.arg_utils import AsyncEngineArgs +from vllm.entrypoints.cli.serve import ServeSubcommand +from vllm.model_executor.model_loader import get_model_loader +from vllm.utils import (FlexibleArgumentParser, get_open_port) from ..utils import RemoteOpenAIServer @@ -19,6 +31,7 @@ CONFIG = AutoConfig.from_pretrained(MODEL_NAME) + class RemoteOpenAIServer: DUMMY_API_KEY = "token-abc123" # vLLM's OpenAI server does not need API key diff --git a/vllm_ascend/worker/npu_input_batch.py b/vllm_ascend/worker/npu_input_batch.py index 01ef3b32768..6b2165aefe8 100644 --- a/vllm_ascend/worker/npu_input_batch.py +++ b/vllm_ascend/worker/npu_input_batch.py @@ -29,7 +29,7 @@ MultiModalKwargsItems, PlaceholderRange) from vllm.pooling_params import PoolingParams from vllm.sampling_params import SamplingParams, SamplingType -from vllm.utils import length_from_prompt_token_ids_or_embeds,swap_dict_values +from vllm.utils import length_from_prompt_token_ids_or_embeds, swap_dict_values from vllm.v1.outputs import LogprobsTensors from vllm.v1.pool.metadata import PoolingMetadata from vllm.v1.sample.logits_processor import (BatchUpdateBuilder, From f706331dddb207ecbf4a8b971850ec0703047c12 Mon Sep 17 00:00:00 2001 From: jesse Date: Thu, 9 Oct 2025 17:14:53 +0800 Subject: [PATCH 07/29] fix test Signed-off-by: jesse --- tests/e2e/singlecard/test_completion_with_prompt_embeds.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py index 4bdce38c573..e0ae0390fa5 
100644 --- a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py +++ b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py @@ -22,9 +22,7 @@ from vllm.engine.arg_utils import AsyncEngineArgs from vllm.entrypoints.cli.serve import ServeSubcommand from vllm.model_executor.model_loader import get_model_loader -from vllm.utils import (FlexibleArgumentParser, get_open_port) - -from ..utils import RemoteOpenAIServer +from vllm.utils import FlexibleArgumentParser, get_open_port # any model with a chat template should work here MODEL_NAME = "facebook/opt-125m" From 7d6f8192cec65e41841c92b6e4f3704cc9b33a16 Mon Sep 17 00:00:00 2001 From: jesse Date: Fri, 10 Oct 2025 09:46:17 +0800 Subject: [PATCH 08/29] fix test Signed-off-by: jesse --- tests/e2e/singlecard/test_completion_with_prompt_embeds.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py index e0ae0390fa5..58a1926e0ac 100644 --- a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py +++ b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py @@ -217,7 +217,7 @@ def example_prompt_embeds(hf_runner): @pytest.fixture(scope="module", - params=["", "--disable-frontend-multiprocessing"]) + params=["", "--disable-frontend-multiprocessing --enable-prompt-embeds"]) def server_with_prompt_embeds(default_server_args, request): if request.param: default_server_args.append(request.param) From 24706a5eea84c2ef6946490231781fe3ff6221b8 Mon Sep 17 00:00:00 2001 From: jesse Date: Fri, 10 Oct 2025 10:20:50 +0800 Subject: [PATCH 09/29] fix test Signed-off-by: jesse --- tests/e2e/singlecard/test_completion_with_prompt_embeds.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py index 58a1926e0ac..b0b82dd8bb5 100644 --- a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py +++ b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py @@ -216,8 +216,9 @@ def example_prompt_embeds(hf_runner): return [_encode_embeds(item) for item in example_embeddings] -@pytest.fixture(scope="module", - params=["", "--disable-frontend-multiprocessing --enable-prompt-embeds"]) +@pytest.fixture( + scope="module", + params=["", "--disable-frontend-multiprocessing --enable-prompt-embeds"]) def server_with_prompt_embeds(default_server_args, request): if request.param: default_server_args.append(request.param) From cff8886cda1fad6e73bb3429a633d23172a3e987 Mon Sep 17 00:00:00 2001 From: jesse Date: Fri, 10 Oct 2025 16:40:34 +0800 Subject: [PATCH 10/29] fix test Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index a5551d310e5..bf0c4639d36 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -1090,7 +1090,8 @@ def _prepare_input_ids(self, total_num_scheduled_tokens: int, self.input_ids[:total_num_scheduled_tokens].copy_( self.input_ids_cpu[:total_num_scheduled_tokens], non_blocking=True) - self.inputs_embeds.copy_to_gpu(total_num_scheduled_tokens) + if self.is_multimodal_model or self.enable_prompt_embeds: + self.inputs_embeds.copy_to_gpu(total_num_scheduled_tokens) self.is_token_ids.copy_to_gpu(total_num_scheduled_tokens) return @@ -1119,7 +1120,8 @@ def 
_prepare_input_ids(self, total_num_scheduled_tokens: int, self.input_ids[:total_num_scheduled_tokens].copy_( self.input_ids_cpu[:total_num_scheduled_tokens], non_blocking=True) - self.inputs_embeds.copy_to_gpu(total_num_scheduled_tokens) + if self.is_multimodal_model or self.enable_prompt_embeds: + self.inputs_embeds.copy_to_gpu(total_num_scheduled_tokens) self.is_token_ids.copy_to_gpu(total_num_scheduled_tokens) if num_commmon_tokens == 0: # No requests in common with the previous iteration @@ -1293,7 +1295,8 @@ def _prepare_inputs( # Because we did not pre-allocate a massive prompt_embeds CPU tensor on # the InputBatch, we need to fill in the prompt embeds into the expected # spots in the GpuModelRunner's pre-allocated prompt_embeds tensor. - if self.input_batch.req_prompt_embeds: + if self.input_batch.req_prompt_embeds and (self.is_multimodal_model or + self.enable_prompt_embeds): output_idx = 0 for req_idx in range(num_reqs): num_sched = num_scheduled_tokens[req_idx] @@ -3627,4 +3630,4 @@ def _to_list(self, sampled_token_ids: torch.Tensor) -> list[list[int]]: pinned.copy_(sampled_token_ids, non_blocking=True) self.transfer_event.record() self.transfer_event.synchronize() - return pinned.tolist() \ No newline at end of file + return pinned.tolist() From 4daf97067e004060c7e8281b3585783c6cf52a0d Mon Sep 17 00:00:00 2001 From: jesse Date: Sat, 11 Oct 2025 08:41:21 +0800 Subject: [PATCH 11/29] fix test Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index bf0c4639d36..55cdd0d3d94 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -1390,7 +1390,7 @@ def _prepare_inputs( # _prepare_inputs may reorder the batch, so we must gather # multi-modal outputs after that to ensure the correct order - if self.is_multimodal_model: + if self.is_multimodal_model or self.enable_prompt_embeds: # Run the multimodal encoder if any. 
self._execute_mm_encoder(scheduler_output) mm_embeds = self._gather_mm_embeddings(scheduler_output) @@ -2411,7 +2411,7 @@ def _dummy_run( with self.maybe_dummy_run_with_lora(self.lora_config, num_scheduled_tokens): - if self.is_multimodal_model: + if self.is_multimodal_model or self.enable_prompt_embeds: input_ids = None inputs_embeds = self.inputs_embeds[:num_tokens] elif self.enable_prompt_embeds: From 41b6cdc4e9e430b26dc4a00591cd666ecf08fdc8 Mon Sep 17 00:00:00 2001 From: jesse Date: Sat, 11 Oct 2025 09:22:38 +0800 Subject: [PATCH 12/29] fix test Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 55cdd0d3d94..c7b6c84daf1 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -2411,9 +2411,9 @@ def _dummy_run( with self.maybe_dummy_run_with_lora(self.lora_config, num_scheduled_tokens): - if self.is_multimodal_model or self.enable_prompt_embeds: + if self.is_multimodal_model: input_ids = None - inputs_embeds = self.inputs_embeds[:num_tokens] + inputs_embeds = None elif self.enable_prompt_embeds: input_ids = None inputs_embeds = self.inputs_embeds.gpu[:num_tokens] From 565e3bf04ab5816eaa9feee40556fb1dd28ec65e Mon Sep 17 00:00:00 2001 From: jesse Date: Sat, 11 Oct 2025 11:23:06 +0800 Subject: [PATCH 13/29] fix test Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index c7b6c84daf1..66c78a7264e 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -2411,10 +2411,7 @@ def _dummy_run( with self.maybe_dummy_run_with_lora(self.lora_config, num_scheduled_tokens): - if self.is_multimodal_model: - input_ids = None - inputs_embeds = None - elif self.enable_prompt_embeds: + if self.is_multimodal_model or self.enable_prompt_embeds: input_ids = None inputs_embeds = self.inputs_embeds.gpu[:num_tokens] else: From b84600a4e00881128d7ced8f5b228db2404a8bc5 Mon Sep 17 00:00:00 2001 From: jesse Date: Mon, 13 Oct 2025 14:50:20 +0800 Subject: [PATCH 14/29] fix test Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 66c78a7264e..4b6ca37f46e 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -1405,7 +1405,7 @@ def _prepare_inputs( else: inputs_embeds = self.model.get_input_embeddings(input_ids) # TODO(woosuk): Avoid the copy. Optimize. - self.inputs_embeds[:total_num_scheduled_tokens].copy_( + self.inputs_embeds.gpu[:total_num_scheduled_tokens].copy_( inputs_embeds) inputs_embeds = self.inputs_embeds[:num_input_tokens] input_ids = None From 8973cd3eee60aece04d720e7725208bcbed23a9f Mon Sep 17 00:00:00 2001 From: jesse Date: Mon, 13 Oct 2025 15:20:07 +0800 Subject: [PATCH 15/29] fix test Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 4b6ca37f46e..2249ae5b729 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -1407,7 +1407,7 @@ def _prepare_inputs( # TODO(woosuk): Avoid the copy. Optimize. 
self.inputs_embeds.gpu[:total_num_scheduled_tokens].copy_( inputs_embeds) - inputs_embeds = self.inputs_embeds[:num_input_tokens] + inputs_embeds = self.inputs_embeds.gpu[:num_input_tokens] input_ids = None else: # For text-only models, we use token ids as input. From aef7626f0405dd0dbf5bc06c39aa8fadedb08dbb Mon Sep 17 00:00:00 2001 From: jesse Date: Thu, 16 Oct 2025 10:40:46 +0800 Subject: [PATCH 16/29] fix test Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 547f264a9cc..504a66fadb7 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -1413,7 +1413,7 @@ def _prepare_inputs( # _prepare_inputs may reorder the batch, so we must gather # multi-modal outputs after that to ensure the correct order - if self.is_multimodal_model or self.enable_prompt_embeds: + if self.is_multimodal_model: # Run the multimodal encoder if any. self._execute_mm_encoder(scheduler_output) mm_embeds = self._gather_mm_embeddings(scheduler_output) From 60dbffe408209822b94e6a245d86364d349729b4 Mon Sep 17 00:00:00 2001 From: jesse Date: Thu, 16 Oct 2025 10:58:18 +0800 Subject: [PATCH 17/29] fix code Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 504a66fadb7..af40012dc3c 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -1450,7 +1450,7 @@ def _prepare_inputs( .squeeze(1) # Some tokens ids may need to become embeds if token_ids_idx.numel() > 0: - token_ids = self.input_ids.gpu[token_ids_idx] + token_ids = self.input_ids[token_ids_idx] tokens_to_embeds = self.model.get_input_embeddings( input_ids=token_ids) self.inputs_embeds.gpu[token_ids_idx] = tokens_to_embeds From 6a5ea171a26626545a2792b0aa3685d3bb53e04c Mon Sep 17 00:00:00 2001 From: jesse Date: Thu, 16 Oct 2025 11:00:40 +0800 Subject: [PATCH 18/29] add example Signed-off-by: jesse --- examples/prompt_embed_inference.py | 97 ++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 examples/prompt_embed_inference.py diff --git a/examples/prompt_embed_inference.py b/examples/prompt_embed_inference.py new file mode 100644 index 00000000000..67c9741d96c --- /dev/null +++ b/examples/prompt_embed_inference.py @@ -0,0 +1,97 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +Demonstrates how to generate prompt embeddings using +Hugging Face Transformers and use them as input to vLLM +for both single and batch inference. + +Model: meta-llama/Llama-3.2-1B-Instruct +Note: This model is gated on Hugging Face Hub. 
+ You must request access to use it: + https://huggingface.co/meta-llama/Llama-3.2-1B-Instruct + +Requirements: +- vLLM +- transformers + +Run: + python examples/prompt_embed_inference.py +""" + +import torch +from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedTokenizer + +from vllm import LLM + + +def init_tokenizer_and_llm(model_name: str): + llm = LLM(model=model_name, enable_prompt_embeds=True) + tokenizer = AutoTokenizer.from_pretrained(model_name) + transformers_model = AutoModelForCausalLM.from_pretrained(model_name) + embedding_layer = transformers_model.get_input_embeddings() + return tokenizer, embedding_layer, llm + + +def get_prompt_embeds( + chat: list[dict[str, str]], + tokenizer: PreTrainedTokenizer, + embedding_layer: torch.nn.Module, +): + token_ids = tokenizer.apply_chat_template( + chat, add_generation_prompt=True, return_tensors="pt" + ) + prompt_embeds = embedding_layer(token_ids).squeeze(0) + return prompt_embeds + + +def single_prompt_inference( + llm: LLM, tokenizer: PreTrainedTokenizer, embedding_layer: torch.nn.Module +): + chat = [{"role": "user", "content": "Please tell me about the capital of France."}] + prompt_embeds = get_prompt_embeds(chat, tokenizer, embedding_layer) + + outputs = llm.generate( + { + "prompt_embeds": prompt_embeds, + } + ) + + print("\n[Single Inference Output]") + print("-" * 30) + for o in outputs: + print(o.outputs[0].text) + print("-" * 30) + + +def batch_prompt_inference( + llm: LLM, tokenizer: PreTrainedTokenizer, embedding_layer: torch.nn.Module +): + chats = [ + [{"role": "user", "content": "Please tell me about the capital of France."}], + [{"role": "user", "content": "When is the day longest during the year?"}], + [{"role": "user", "content": "Where is bigger, the moon or the sun?"}], + ] + + prompt_embeds_list = [ + get_prompt_embeds(chat, tokenizer, embedding_layer) for chat in chats + ] + + outputs = llm.generate([{"prompt_embeds": embeds} for embeds in prompt_embeds_list]) + + print("\n[Batch Inference Outputs]") + print("-" * 30) + for i, o in enumerate(outputs): + print(f"Q{i + 1}: {chats[i][0]['content']}") + print(f"A{i + 1}: {o.outputs[0].text}\n") + print("-" * 30) + + +def main(): + model_name = "meta-llama/Llama-3.2-1B-Instruct" + tokenizer, embedding_layer, llm = init_tokenizer_and_llm(model_name) + single_prompt_inference(llm, tokenizer, embedding_layer) + batch_prompt_inference(llm, tokenizer, embedding_layer) + + +if __name__ == "__main__": + main() From 75ee4a2a7d25f41e82483259c75f8c8a645974eb Mon Sep 17 00:00:00 2001 From: jesse Date: Thu, 16 Oct 2025 11:45:10 +0800 Subject: [PATCH 19/29] fix Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index af40012dc3c..b8b35a33137 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -1445,7 +1445,7 @@ def _prepare_inputs( # If a batch only has token ids, then including the embedding layer # in the acl graph will be more performant (like in the else case # below). 
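
Within `_prepare_inputs`, a batch can mix embeddings-backed and token-id-backed positions; the hunk below corrects the mask slice to `total_num_scheduled_tokens` before gathering the positions that still hold token ids and running them through the model's input embedding layer. A condensed illustration of that mixed-batch substitution, with toy tensors and `torch.nn.Embedding` standing in for the model's embedding lookup:

```python
# Positions whose inputs arrived as token ids are embedded on the fly and
# written into the same flat inputs_embeds buffer that already holds the
# user-provided prompt embeddings. Toy sizes; stand-in embedding layer.
import torch

hidden, vocab = 8, 32
embedding_layer = torch.nn.Embedding(vocab, hidden)

total_num_scheduled_tokens = 6
input_ids = torch.randint(0, vocab, (total_num_scheduled_tokens,))
inputs_embeds = torch.zeros(total_num_scheduled_tokens, hidden)
is_token_ids = torch.tensor([True, True, False, False, True, False])

token_ids_idx = (is_token_ids[:total_num_scheduled_tokens]
                 .nonzero(as_tuple=False)
                 .squeeze(1))
if token_ids_idx.numel() > 0:
    inputs_embeds[token_ids_idx] = embedding_layer(input_ids[token_ids_idx])
```
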
- token_ids_idx = self.is_token_ids.gpu[:num_scheduled_tokens] \ + token_ids_idx = self.is_token_ids.gpu[:total_num_scheduled_tokens] \ .nonzero(as_tuple=False) \ .squeeze(1) # Some tokens ids may need to become embeds From b072b3ce66a6ab441f9d18ed4fc73bc12334a980 Mon Sep 17 00:00:00 2001 From: jesse Date: Thu, 23 Oct 2025 16:33:20 +0800 Subject: [PATCH 20/29] fix comment Signed-off-by: jesse --- tests/e2e/singlecard/test_completion_with_prompt_embeds.py | 2 +- tests/e2e/utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py index b0b82dd8bb5..ba5365f88ae 100644 --- a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py +++ b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py @@ -38,7 +38,7 @@ def _start_server(self, model: str, vllm_serve_args: list[str], """Subclasses override this method to customize server process launch """ env = os.environ.copy() - # the current process might initialize cuda, + # the current process might initialize npu, # to be safe, we should use spawn method env['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn' if env_dict is not None: diff --git a/tests/e2e/utils.py b/tests/e2e/utils.py index 532f424a7c3..2adbd78760a 100644 --- a/tests/e2e/utils.py +++ b/tests/e2e/utils.py @@ -125,7 +125,7 @@ def _start_server(self, model: str, vllm_serve_args: list[str], """Subclasses override this method to customize server process launch """ env = os.environ.copy() - # the current process might initialize cuda, + # the current process might initialize npu, # to be safe, we should use spawn method env['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn' if env_dict is not None: From dfada670c78aa062bbfbeea71c9880883378b430 Mon Sep 17 00:00:00 2001 From: jesse Date: Fri, 24 Oct 2025 15:49:40 +0800 Subject: [PATCH 21/29] remove unused Signed-off-by: jesse --- vllm_ascend/worker/model_runner_v1.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index bef8313c78e..657afaa213f 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -4029,18 +4029,3 @@ def _list_to_tensor(lst, device, dtype=torch.int32): long_seq_metadata.pcp_prefill_mask = self.extra_long_seq_kwargs[ 'pcp_prefill_mask'] return long_seq_metadata - - def _to_list(self, sampled_token_ids: torch.Tensor) -> list[list[int]]: - # This is a short term mitigation for issue mentioned in - # https://github.com/vllm-project/vllm/issues/22754. - # `tolist` would trigger a npu wise stream sync, which - # would block other copy ops from other npu streams. - # A npu event sync would avoid such a situation. Since - # this is in the critical path of every single model - # forward loop, this has caused perf issue for a disagg - # setup. 
- pinned = self.sampled_token_ids_pinned_cpu[:sampled_token_ids.shape[0]] - pinned.copy_(sampled_token_ids, non_blocking=True) - self.transfer_event.record() - self.transfer_event.synchronize() - return pinned.tolist() From 407bf7553066684c149b50d2126d1c3d3ab4909c Mon Sep 17 00:00:00 2001 From: jesse Date: Sun, 26 Oct 2025 09:40:48 +0800 Subject: [PATCH 22/29] add test to workflows Signed-off-by: jesse --- .github/workflows/_e2e_test.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/_e2e_test.yaml b/.github/workflows/_e2e_test.yaml index 1638e7f198e..9e89d34f285 100644 --- a/.github/workflows/_e2e_test.yaml +++ b/.github/workflows/_e2e_test.yaml @@ -88,6 +88,7 @@ jobs: # We found that if running aclgraph tests in batch, it will cause AclmdlRICaptureBegin error. So we run # the test separately. + pytest -sv tests/e2e/singlecard/test_completion_with_prompt_embeds.py pytest -sv tests/e2e/singlecard/test_aclgraph.py pytest -sv tests/e2e/singlecard/test_ascend_scheduler.py pytest -sv tests/e2e/singlecard/test_bge_model.py From d3b9fbb8de9fe147dfeb1cecf758a4a2c7023559 Mon Sep 17 00:00:00 2001 From: jesse Date: Sun, 26 Oct 2025 10:25:47 +0800 Subject: [PATCH 23/29] fix test Signed-off-by: jesse --- .../test_completion_with_prompt_embeds.py | 611 ++++++------------ vllm_ascend/worker/model_runner_v1.py | 3 +- 2 files changed, 192 insertions(+), 422 deletions(-) diff --git a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py index ba5365f88ae..cf52c8b0721 100644 --- a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py +++ b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py @@ -1,425 +1,196 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -import base64 -import io -import json +# +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# Copyright 2023 The vLLM team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# This file is a part of the vllm-ascend project. 
+# import os -import subprocess -import sys -import time -from typing import Any, Optional -import httpx -import openai # use the official client for correctness check import pytest -import pytest_asyncio -import requests -import torch -# downloading lora to test lora requests -from openai import BadRequestError -from transformers import AutoConfig -from vllm.engine.arg_utils import AsyncEngineArgs -from vllm.entrypoints.cli.serve import ServeSubcommand -from vllm.model_executor.model_loader import get_model_loader -from vllm.utils import FlexibleArgumentParser, get_open_port - -# any model with a chat template should work here -MODEL_NAME = "facebook/opt-125m" - -CONFIG = AutoConfig.from_pretrained(MODEL_NAME) - - -class RemoteOpenAIServer: - DUMMY_API_KEY = "token-abc123" # vLLM's OpenAI server does not need API key - - def _start_server(self, model: str, vllm_serve_args: list[str], - env_dict: Optional[dict[str, str]]) -> None: - """Subclasses override this method to customize server process launch - """ - env = os.environ.copy() - # the current process might initialize npu, - # to be safe, we should use spawn method - env['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn' - if env_dict is not None: - env.update(env_dict) - self.proc: subprocess.Popen = subprocess.Popen( - ["vllm", "serve", model, *vllm_serve_args], - env=env, - stdout=sys.stdout, - stderr=sys.stderr, - ) - - def __init__(self, - model: str, - vllm_serve_args: list[str], - *, - env_dict: Optional[dict[str, str]] = None, - seed: Optional[int] = 0, - auto_port: bool = True, - max_wait_seconds: Optional[float] = None, - override_hf_configs: Optional[dict[str, Any]] = None) -> None: - if auto_port: - if "-p" in vllm_serve_args or "--port" in vllm_serve_args: - raise ValueError("You have manually specified the port " - "when `auto_port=True`.") - - # No need for a port if using unix sockets - if "--uds" not in vllm_serve_args: - # Don't mutate the input args - vllm_serve_args = vllm_serve_args + [ - "--port", str(get_open_port()) - ] - if seed is not None: - if "--seed" in vllm_serve_args: - raise ValueError("You have manually specified the seed " - f"when `seed={seed}`.") - - vllm_serve_args = vllm_serve_args + ["--seed", str(seed)] - - if override_hf_configs is not None: - vllm_serve_args = vllm_serve_args + [ - "--hf-overrides", - json.dumps(override_hf_configs) - ] - - parser = FlexibleArgumentParser( - description="vLLM's remote OpenAI server.") - subparsers = parser.add_subparsers(required=False, dest="subparser") - parser = ServeSubcommand().subparser_init(subparsers) - args = parser.parse_args(["--model", model, *vllm_serve_args]) - self.uds = args.uds - if args.uds: - self.host = None - self.port = None - else: - self.host = str(args.host or 'localhost') - self.port = int(args.port) - - self.show_hidden_metrics = \ - args.show_hidden_metrics_for_version is not None - - # download the model before starting the server to avoid timeout - is_local = os.path.isdir(model) - if not is_local: - engine_args = AsyncEngineArgs.from_cli_args(args) - model_config = engine_args.create_model_config() - load_config = engine_args.create_load_config() - - model_loader = get_model_loader(load_config) - model_loader.download_model(model_config) - - self._start_server(model, vllm_serve_args, env_dict) - max_wait_seconds = max_wait_seconds or 240 - self._wait_for_server(url=self.url_for("health"), - timeout=max_wait_seconds) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.proc.terminate() - 
try: - self.proc.wait(8) - except subprocess.TimeoutExpired: - # force kill if needed - self.proc.kill() - - def _poll(self) -> Optional[int]: - """Subclasses override this method to customize process polling""" - return self.proc.poll() - - def _wait_for_server(self, *, url: str, timeout: float): - # run health check - start = time.time() - client = (httpx.Client(transport=httpx.HTTPTransport( - uds=self.uds)) if self.uds else requests) - while True: - try: - if client.get(url).status_code == 200: - break - except Exception: - # this exception can only be raised by requests.get, - # which means the server is not ready yet. - # the stack trace is not useful, so we suppress it - # by using `raise from None`. - result = self._poll() - if result is not None and result != 0: - raise RuntimeError("Server exited unexpectedly.") from None - - time.sleep(0.5) - if time.time() - start > timeout: - raise RuntimeError( - "Server failed to start in time.") from None - - @property - def url_root(self) -> str: - return (f"http://{self.uds.split('/')[-1]}" - if self.uds else f"http://{self.host}:{self.port}") - - def url_for(self, *parts: str) -> str: - return self.url_root + "/" + "/".join(parts) - - def get_client(self, **kwargs): - if "timeout" not in kwargs: - kwargs["timeout"] = 600 - return openai.OpenAI( - base_url=self.url_for("v1"), - api_key=self.DUMMY_API_KEY, - max_retries=0, - **kwargs, - ) - - def get_async_client(self, **kwargs): - if "timeout" not in kwargs: - kwargs["timeout"] = 600 - return openai.AsyncOpenAI(base_url=self.url_for("v1"), - api_key=self.DUMMY_API_KEY, - max_retries=0, - **kwargs) - - -@pytest.fixture(scope="module") -def default_server_args() -> list[str]: - return [ - # use half precision for speed and memory savings in CI environment - "--dtype", - "bfloat16", - "--max-model-len", - "2048", - "--max-num-seqs", - "128", - "--enforce-eager", - # Prompt Embeds server args - "--enable-prompt-embeds", +from transformers import AutoModelForCausalLM, AutoTokenizer + +from tests.e2e.conftest import VllmRunner + +os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" + +MODELS = ["facebook/opt-125m"] + + +def get_prompt_embeds(chat, tokenizer, embedding_layer): + """Convert chat messages to prompt embeddings.""" + token_ids = tokenizer.apply_chat_template(chat, + add_generation_prompt=True, + return_tensors='pt') + prompt_embeds = embedding_layer(token_ids).squeeze(0) + return prompt_embeds + + +@pytest.mark.parametrize("model_name", MODELS) +def test_single_prompt_embeds_inference(model_name): + """Test single prompt inference with prompt embeddings.""" + # Prepare prompt embeddings + tokenizer = AutoTokenizer.from_pretrained(model_name) + transformers_model = AutoModelForCausalLM.from_pretrained(model_name) + embedding_layer = transformers_model.get_input_embeddings() + + chat = [{ + "role": "user", + "content": "Please tell me about the capital of France." 
+ }] + prompt_embeds = get_prompt_embeds(chat, tokenizer, embedding_layer) + + # Run inference with prompt embeddings + with VllmRunner( + model_name, + enable_prompt_embeds=True, + enforce_eager=True, + ) as vllm_runner: + outputs = vllm_runner.model.generate({ + "prompt_embeds": prompt_embeds, + }) + + # Verify output + assert len(outputs) == 1 + assert len(outputs[0].outputs) > 0 + assert len(outputs[0].outputs[0].text) > 0 + print(f"\n[Single Inference Output]: {outputs[0].outputs[0].text}") + + +@pytest.mark.parametrize("model_name", MODELS) +def test_batch_prompt_embeds_inference(model_name): + """Test batch prompt inference with prompt embeddings.""" + # Prepare prompt embeddings + tokenizer = AutoTokenizer.from_pretrained(model_name) + transformers_model = AutoModelForCausalLM.from_pretrained(model_name) + embedding_layer = transformers_model.get_input_embeddings() + + chats = [[{ + "role": "user", + "content": "Please tell me about the capital of France." + }], + [{ + "role": "user", + "content": "When is the day longest during the year?" + }], + [{ + "role": "user", + "content": "Where is bigger, the moon or the sun?" + }]] + + prompt_embeds_list = [ + get_prompt_embeds(chat, tokenizer, embedding_layer) for chat in chats ] - -EXAMPLE_PROMPTS = [ - "Hello, my name is", - "What is an LLM?", -] - - -def _encode_embeds(embeds: torch.Tensor): - buffer = io.BytesIO() - torch.save(embeds, buffer) - return base64.b64encode(buffer.getvalue()).decode('utf-8') - - -@pytest.fixture(scope="module") -def example_prompt_embeds(hf_runner): - """Create example embeddings and return them as base64 encoded string.""" - with hf_runner(MODEL_NAME) as hf_model: - example_embeddings = hf_model.get_prompt_embeddings(EXAMPLE_PROMPTS) - - return [_encode_embeds(item) for item in example_embeddings] - - -@pytest.fixture( - scope="module", - params=["", "--disable-frontend-multiprocessing --enable-prompt-embeds"]) -def server_with_prompt_embeds(default_server_args, request): - if request.param: - default_server_args.append(request.param) - - with RemoteOpenAIServer(MODEL_NAME, default_server_args) as remote_server: - yield remote_server - - -@pytest_asyncio.fixture -async def client_with_prompt_embeds(server_with_prompt_embeds): - async with server_with_prompt_embeds.get_async_client() as async_client: - yield async_client - - -@pytest.mark.asyncio -@pytest.mark.parametrize("model_name", [MODEL_NAME]) -async def test_completions_with_prompt_embeds( - example_prompt_embeds, - client_with_prompt_embeds: openai.AsyncOpenAI, - model_name: str, -): - encoded_embeds, encoded_embeds2 = example_prompt_embeds - - # Test case: Single prompt embeds input - completion = await client_with_prompt_embeds.completions.create( - model=model_name, - prompt="", # Add empty prompt as required parameter - max_tokens=5, - temperature=0.0, - extra_body={"prompt_embeds": encoded_embeds}) - assert len(completion.choices[0].text) >= 1 - assert completion.choices[0].prompt_logprobs is None - - # Test case: batch completion with prompt_embeds - completion = await client_with_prompt_embeds.completions.create( - model=model_name, - prompt="", # Add empty prompt as required parameter - max_tokens=5, - temperature=0.0, - extra_body={"prompt_embeds": [encoded_embeds, encoded_embeds2]}) - assert len(completion.choices) == 2 - assert len(completion.choices[0].text) >= 1 - assert len(completion.choices[1].text) >= 1 - - # Test case: streaming with prompt_embeds - single_completion = await client_with_prompt_embeds.completions.create( - 
model=model_name, - prompt="", # Add empty prompt as required parameter - max_tokens=5, - temperature=0.0, - extra_body={"prompt_embeds": encoded_embeds}) - single_output = single_completion.choices[0].text - - stream = await client_with_prompt_embeds.completions.create( - model=model_name, - prompt="", # Add empty prompt as required parameter - max_tokens=5, - temperature=0.0, - stream=True, - extra_body={"prompt_embeds": encoded_embeds}) - chunks = [] - finish_reason_count = 0 - async for chunk in stream: - chunks.append(chunk.choices[0].text) - if chunk.choices[0].finish_reason is not None: - finish_reason_count += 1 - assert finish_reason_count == 1 - assert chunk.choices[0].finish_reason == "length" - assert chunk.choices[0].text - assert "".join(chunks) == single_output - - # Test case: batch streaming with prompt_embeds - stream = await client_with_prompt_embeds.completions.create( - model=model_name, - prompt="", # Add empty prompt as required parameter - max_tokens=5, - temperature=0.0, - stream=True, - extra_body={"prompt_embeds": [encoded_embeds, encoded_embeds2]}) - chunks_stream_embeds: list[list[str]] = [[], []] - finish_reason_count = 0 - async for chunk in stream: - chunks_stream_embeds[chunk.choices[0].index].append( - chunk.choices[0].text) - if chunk.choices[0].finish_reason is not None: - finish_reason_count += 1 - assert finish_reason_count == 2 - assert chunk.choices[0].finish_reason == "length" - assert chunk.choices[0].text - assert len(chunks_stream_embeds[0]) > 0 - assert len(chunks_stream_embeds[1]) > 0 - - # Test case: mixed text and prompt_embeds - completion_mixed = await client_with_prompt_embeds.completions.create( - model=model_name, - prompt="This is a prompt", - max_tokens=5, - temperature=0.0, - extra_body={"prompt_embeds": encoded_embeds}) - assert len(completion.choices) == 2 - completion_text_only = await client_with_prompt_embeds.completions.create( - model=model_name, - prompt="This is a prompt", - max_tokens=5, - temperature=0.0, - ) - completion_embeds_only = await client_with_prompt_embeds.completions.create( - model=model_name, - prompt="", - max_tokens=5, - temperature=0.0, - extra_body={"prompt_embeds": encoded_embeds}) - # Embeddings responses should be handled first - assert completion_mixed.choices[0].text == completion_embeds_only.choices[ - 0].text - assert completion_mixed.choices[1].text == completion_text_only.choices[ - 0].text - - -@pytest.mark.asyncio -@pytest.mark.parametrize("model_name", [MODEL_NAME]) -async def test_completions_errors_with_prompt_embeds( - client_with_prompt_embeds: openai.AsyncOpenAI, model_name: str): - # Test error case: invalid prompt_embeds - with pytest.raises(BadRequestError): - await client_with_prompt_embeds.completions.create( - prompt="", - model=model_name, - max_tokens=5, - temperature=0.0, - extra_body={"prompt_embeds": "invalid_base64"}) - - -@pytest.mark.asyncio -@pytest.mark.parametrize("logprobs_arg", [1, 0]) -@pytest.mark.parametrize("model_name", [MODEL_NAME]) -async def test_completions_with_logprobs_and_prompt_embeds( - example_prompt_embeds, - client_with_prompt_embeds: openai.AsyncOpenAI, - logprobs_arg: int, - model_name: str, -): - encoded_embeds, encoded_embeds2 = example_prompt_embeds - - # Test case: Logprobs using prompt_embeds - completion = await client_with_prompt_embeds.completions.create( - model=model_name, - prompt="", # Add empty prompt as required parameter - max_tokens=5, - temperature=0.0, - echo=False, - logprobs=logprobs_arg, - extra_body={"prompt_embeds": 
encoded_embeds}) - - logprobs = completion.choices[0].logprobs - assert logprobs is not None - assert len(logprobs.text_offset) == 5 - assert len(logprobs.token_logprobs) == 5 - assert len(logprobs.top_logprobs) == 5 - for top_logprobs in logprobs.top_logprobs[1:]: - assert max(logprobs_arg, 1) <= len(top_logprobs) <= logprobs_arg + 1 - assert len(logprobs.tokens) == 5 - - # Test case: Log probs with batch completion and prompt_embeds - completion = await client_with_prompt_embeds.completions.create( - model=model_name, - prompt="", # Add empty prompt as required parameter - max_tokens=5, - temperature=0.0, - echo=False, - logprobs=logprobs_arg, - extra_body={"prompt_embeds": [encoded_embeds, encoded_embeds2]}) - - assert len(completion.choices) == 2 - for choice in completion.choices: - logprobs = choice.logprobs - assert logprobs is not None - assert len(logprobs.text_offset) == 5 - assert len(logprobs.token_logprobs) == 5 - assert len(logprobs.top_logprobs) == 5 - for top_logprobs in logprobs.top_logprobs[1:]: - assert max(logprobs_arg, - 1) <= len(top_logprobs) <= logprobs_arg + 1 - assert len(logprobs.tokens) == 5 - - -@pytest.mark.asyncio -async def test_prompt_logprobs_raises_error( - example_prompt_embeds, - client_with_prompt_embeds: openai.AsyncOpenAI, -): - encoded_embeds, _ = example_prompt_embeds - - with pytest.raises(BadRequestError, match="not compatible"): - await client_with_prompt_embeds.completions.create( - model=MODEL_NAME, - prompt="", - max_tokens=5, - temperature=0.0, - extra_body={ - "prompt_embeds": encoded_embeds, - "prompt_logprobs": True - }, - ) + # Run batch inference with prompt embeddings + with VllmRunner( + model_name, + enable_prompt_embeds=True, + enforce_eager=True, + ) as vllm_runner: + outputs = vllm_runner.model.generate([{ + "prompt_embeds": embeds + } for embeds in prompt_embeds_list]) + + # Verify outputs + assert len(outputs) == len(chats) + for i, output in enumerate(outputs): + assert len(output.outputs) > 0 + assert len(output.outputs[0].text) > 0 + print(f"\nQ{i+1}: {chats[i][0]['content']}") + print(f"A{i+1}: {output.outputs[0].text}") + + +@pytest.mark.parametrize("model_name", MODELS) +def test_prompt_embeds_with_aclgraph(model_name): + """Test prompt embeddings with ACL graph enabled vs disabled.""" + # Prepare prompt embeddings + tokenizer = AutoTokenizer.from_pretrained(model_name) + transformers_model = AutoModelForCausalLM.from_pretrained(model_name) + embedding_layer = transformers_model.get_input_embeddings() + + chat = [{"role": "user", "content": "What is the capital of China?"}] + prompt_embeds = get_prompt_embeds(chat, tokenizer, embedding_layer) + + # Run with ACL graph enabled (enforce_eager=False) + with VllmRunner( + model_name, + enable_prompt_embeds=True, + enforce_eager=False, + ) as vllm_aclgraph_runner: + aclgraph_outputs = vllm_aclgraph_runner.model.generate({ + "prompt_embeds": + prompt_embeds, + }) + + # Run with ACL graph disabled (enforce_eager=True) + with VllmRunner( + model_name, + enable_prompt_embeds=True, + enforce_eager=True, + ) as vllm_eager_runner: + eager_outputs = vllm_eager_runner.model.generate({ + "prompt_embeds": + prompt_embeds, + }) + + # Verify both produce valid outputs + assert len(aclgraph_outputs) == 1 + assert len(eager_outputs) == 1 + assert len(aclgraph_outputs[0].outputs[0].text) > 0 + assert len(eager_outputs[0].outputs[0].text) > 0 + + print("\n[ACL Graph Output]:", aclgraph_outputs[0].outputs[0].text) + print("[Eager Output]:", eager_outputs[0].outputs[0].text) + + # Note: Outputs 
may differ slightly due to different execution paths, + # but both should be valid responses + + +@pytest.mark.parametrize("model_name", MODELS) +def test_mixed_prompt_embeds_and_text(model_name): + """Test mixed inputs with both prompt embeddings and text prompts.""" + # Prepare prompt embeddings for first request + tokenizer = AutoTokenizer.from_pretrained(model_name) + transformers_model = AutoModelForCausalLM.from_pretrained(model_name) + embedding_layer = transformers_model.get_input_embeddings() + + chat = [{"role": "user", "content": "What is AI?"}] + prompt_embeds = get_prompt_embeds(chat, tokenizer, embedding_layer) + + # Prepare text prompt for second request + text_prompt = "What is machine learning?" + + # Run inference with mixed inputs + with VllmRunner( + model_name, + enable_prompt_embeds=True, + enforce_eager=True, + ) as vllm_runner: + # Test prompt embeddings + embeds_output = vllm_runner.model.generate({ + "prompt_embeds": + prompt_embeds, + }) + + # Test text prompt + text_output = vllm_runner.model.generate(text_prompt) + + # Verify both types of inputs work + assert len(embeds_output) == 1 + assert len(text_output) == 1 + assert len(embeds_output[0].outputs[0].text) > 0 + assert len(text_output[0].outputs[0].text) > 0 + + print("\n[Prompt Embeds Output]:", embeds_output[0].outputs[0].text) + print("[Text Prompt Output]:", text_output[0].outputs[0].text) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 21e44f19646..6b717ac7f96 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -72,8 +72,7 @@ from vllm.sampling_params import SamplingType from vllm.sequence import IntermediateTensors from vllm.tasks import GenerationTask, PoolingTask, SupportedTask -from vllm.utils import (cdiv, - is_pin_memory_available, +from vllm.utils import (cdiv, is_pin_memory_available, length_from_prompt_token_ids_or_embeds) from vllm.utils.jsontree import json_map_leaves from vllm.v1.attention.backends.gdn_attn import GDNAttentionMetadataBuilder From 60256c452e9f2fcb1a864953337beb509ea35134 Mon Sep 17 00:00:00 2001 From: jesse Date: Sun, 26 Oct 2025 10:59:49 +0800 Subject: [PATCH 24/29] fix test Signed-off-by: jesse --- tests/e2e/singlecard/test_completion_with_prompt_embeds.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py index cf52c8b0721..7189f323b57 100644 --- a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py +++ b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py @@ -24,7 +24,7 @@ os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" -MODELS = ["facebook/opt-125m"] +MODELS = ["meta-llama/Llama-3.2-1B-Instruct"] def get_prompt_embeds(chat, tokenizer, embedding_layer): From b4e409821e7e98c65550d27757e0515a761dc411 Mon Sep 17 00:00:00 2001 From: jesse Date: Sun, 26 Oct 2025 11:12:36 +0800 Subject: [PATCH 25/29] fix test Signed-off-by: jesse --- tests/e2e/singlecard/test_completion_with_prompt_embeds.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py index 7189f323b57..9a165281d71 100644 --- a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py +++ b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py @@ -22,6 +22,7 @@ from tests.e2e.conftest import VllmRunner +os.environ["VLLM_USE_MODELSCOPE"] = "True" 
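+# The ModelScope switch above makes vLLM pull model weights from ModelScope
+# instead of the Hugging Face Hub (presumably so the CI hosts can reach the
+# checkpoint without Hugging Face access).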
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" MODELS = ["meta-llama/Llama-3.2-1B-Instruct"] From 46358ed804af3347e3e51385431f1cfcd37e14cd Mon Sep 17 00:00:00 2001 From: jesse Date: Sun, 26 Oct 2025 11:31:29 +0800 Subject: [PATCH 26/29] fix test Signed-off-by: jesse --- tests/e2e/singlecard/test_completion_with_prompt_embeds.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py index 9a165281d71..b72dc0d0585 100644 --- a/tests/e2e/singlecard/test_completion_with_prompt_embeds.py +++ b/tests/e2e/singlecard/test_completion_with_prompt_embeds.py @@ -25,7 +25,7 @@ os.environ["VLLM_USE_MODELSCOPE"] = "True" os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" -MODELS = ["meta-llama/Llama-3.2-1B-Instruct"] +MODELS = ["Qwen/Qwen2.5-0.5B-Instruct"] def get_prompt_embeds(chat, tokenizer, embedding_layer): From c74dc8f9cc3f301bd054bae1bf2e9aeffc954e25 Mon Sep 17 00:00:00 2001 From: jesse Date: Mon, 27 Oct 2025 09:35:35 +0800 Subject: [PATCH 27/29] fix test Signed-off-by: jesse --- tests/e2e/utils.py | 162 +-------------------------------------------- 1 file changed, 1 insertion(+), 161 deletions(-) diff --git a/tests/e2e/utils.py b/tests/e2e/utils.py index 2adbd78760a..279b7672411 100644 --- a/tests/e2e/utils.py +++ b/tests/e2e/utils.py @@ -18,25 +18,14 @@ # import functools -import json import os import signal -import subprocess -import sys -import time from collections.abc import Sequence -from typing import Any, Callable, Optional +from typing import Callable -import httpx -import openai -import requests import torch import torch.nn.functional as F from typing_extensions import ParamSpec -from vllm.engine.arg_utils import AsyncEngineArgs -from vllm.entrypoints.cli.serve import ServeSubcommand -from vllm.model_executor.model_loader import get_model_loader -from vllm.utils import FlexibleArgumentParser, get_open_port _P = ParamSpec("_P") @@ -115,152 +104,3 @@ def check_embeddings_close( f"\n{name_1}:\t{embeddings_1[:16]!r}") assert sim >= 1 - tol, fail_msg - - -class RemoteOpenAIServer: - DUMMY_API_KEY = "token-abc123" # vLLM's OpenAI server does not need API key - - def _start_server(self, model: str, vllm_serve_args: list[str], - env_dict: Optional[dict[str, str]]) -> None: - """Subclasses override this method to customize server process launch - """ - env = os.environ.copy() - # the current process might initialize npu, - # to be safe, we should use spawn method - env['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn' - if env_dict is not None: - env.update(env_dict) - self.proc: subprocess.Popen = subprocess.Popen( - ["vllm", "serve", model, *vllm_serve_args], - env=env, - stdout=sys.stdout, - stderr=sys.stderr, - ) - - def __init__(self, - model: str, - vllm_serve_args: list[str], - *, - env_dict: Optional[dict[str, str]] = None, - seed: Optional[int] = 0, - auto_port: bool = True, - max_wait_seconds: Optional[float] = None, - override_hf_configs: Optional[dict[str, Any]] = None) -> None: - if auto_port: - if "-p" in vllm_serve_args or "--port" in vllm_serve_args: - raise ValueError("You have manually specified the port " - "when `auto_port=True`.") - - # No need for a port if using unix sockets - if "--uds" not in vllm_serve_args: - # Don't mutate the input args - vllm_serve_args = vllm_serve_args + [ - "--port", str(get_open_port()) - ] - if seed is not None: - if "--seed" in vllm_serve_args: - raise ValueError("You have manually specified the seed " - f"when 
`seed={seed}`.") - - vllm_serve_args = vllm_serve_args + ["--seed", str(seed)] - - if override_hf_configs is not None: - vllm_serve_args = vllm_serve_args + [ - "--hf-overrides", - json.dumps(override_hf_configs) - ] - - parser = FlexibleArgumentParser( - description="vLLM's remote OpenAI server.") - subparsers = parser.add_subparsers(required=False, dest="subparser") - parser = ServeSubcommand().subparser_init(subparsers) - args = parser.parse_args(["--model", model, *vllm_serve_args]) - self.uds = args.uds - if args.uds: - self.host = None - self.port = None - else: - self.host = str(args.host or 'localhost') - self.port = int(args.port) - - self.show_hidden_metrics = \ - args.show_hidden_metrics_for_version is not None - - # download the model before starting the server to avoid timeout - is_local = os.path.isdir(model) - if not is_local: - engine_args = AsyncEngineArgs.from_cli_args(args) - model_config = engine_args.create_model_config() - load_config = engine_args.create_load_config() - - model_loader = get_model_loader(load_config) - model_loader.download_model(model_config) - - self._start_server(model, vllm_serve_args, env_dict) - max_wait_seconds = max_wait_seconds or 240 - self._wait_for_server(url=self.url_for("health"), - timeout=max_wait_seconds) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.proc.terminate() - try: - self.proc.wait(8) - except subprocess.TimeoutExpired: - # force kill if needed - self.proc.kill() - - def _poll(self) -> Optional[int]: - """Subclasses override this method to customize process polling""" - return self.proc.poll() - - def _wait_for_server(self, *, url: str, timeout: float): - # run health check - start = time.time() - client = (httpx.Client(transport=httpx.HTTPTransport( - uds=self.uds)) if self.uds else requests) - while True: - try: - if client.get(url).status_code == 200: - break - except Exception: - # this exception can only be raised by requests.get, - # which means the server is not ready yet. - # the stack trace is not useful, so we suppress it - # by using `raise from None`. 
- result = self._poll() - if result is not None and result != 0: - raise RuntimeError("Server exited unexpectedly.") from None - - time.sleep(0.5) - if time.time() - start > timeout: - raise RuntimeError( - "Server failed to start in time.") from None - - @property - def url_root(self) -> str: - return (f"http://{self.uds.split('/')[-1]}" - if self.uds else f"http://{self.host}:{self.port}") - - def url_for(self, *parts: str) -> str: - return self.url_root + "/" + "/".join(parts) - - def get_client(self, **kwargs): - if "timeout" not in kwargs: - kwargs["timeout"] = 600 - return openai.OpenAI( - base_url=self.url_for("v1"), - api_key=self.DUMMY_API_KEY, - max_retries=0, - **kwargs, - ) - - def get_async_client(self, **kwargs): - if "timeout" not in kwargs: - kwargs["timeout"] = 600 - return openai.AsyncOpenAI(base_url=self.url_for("v1"), - api_key=self.DUMMY_API_KEY, - max_retries=0, - **kwargs) From 256414b4c8b7faf7a55cf27eea7b9dd7c735f9a7 Mon Sep 17 00:00:00 2001 From: jesse Date: Tue, 28 Oct 2025 16:43:37 +0800 Subject: [PATCH 28/29] fix test Signed-off-by: jesse --- tests/e2e/common.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/common.sh b/tests/e2e/common.sh index bb99b386231..8222a31070f 100644 --- a/tests/e2e/common.sh +++ b/tests/e2e/common.sh @@ -1,4 +1,4 @@ -# bash fonts colors +# bash fonts colors cyan='\e[96m' yellow='\e[33m' red='\e[31m' From 182c91168bc31a6a0e1cbe146e129463816c96ce Mon Sep 17 00:00:00 2001 From: jesse Date: Tue, 28 Oct 2025 16:43:46 +0800 Subject: [PATCH 29/29] fix test Signed-off-by: jesse --- tests/e2e/common.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/common.sh b/tests/e2e/common.sh index 8222a31070f..bb99b386231 100644 --- a/tests/e2e/common.sh +++ b/tests/e2e/common.sh @@ -1,4 +1,4 @@ -# bash fonts colors +# bash fonts colors cyan='\e[96m' yellow='\e[33m' red='\e[31m'
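
For reference, a minimal offline usage sketch of the prompt-embeds path exercised by the new e2e test: it builds input embeddings with the Hugging Face embedding layer and hands them to vLLM's LLM entrypoint with enable_prompt_embeds turned on, the same call pattern the test drives through VllmRunner. The model name and sampling settings here are illustrative assumptions only.

    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from vllm import LLM, SamplingParams

    MODEL = "Qwen/Qwen2.5-0.5B-Instruct"  # illustrative; any small chat model works

    tokenizer = AutoTokenizer.from_pretrained(MODEL)
    hf_model = AutoModelForCausalLM.from_pretrained(MODEL)
    embedding_layer = hf_model.get_input_embeddings()

    chat = [{"role": "user", "content": "Please tell me about the capital of France."}]
    token_ids = tokenizer.apply_chat_template(chat,
                                              add_generation_prompt=True,
                                              return_tensors="pt")
    with torch.no_grad():
        # vLLM expects a 2-D [seq_len, hidden_size] tensor per request,
        # so drop the batch dimension produced by apply_chat_template.
        prompt_embeds = embedding_layer(token_ids).squeeze(0)

    llm = LLM(model=MODEL, enable_prompt_embeds=True, enforce_eager=True)
    outputs = llm.generate({"prompt_embeds": prompt_embeds},
                           SamplingParams(temperature=0.0, max_tokens=32))
    print(outputs[0].outputs[0].text)

Each request's prompt_embeds must be a single 2-D [seq_len, hidden_size] tensor, which is why the test helper squeezes away the batch dimension before calling generate().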