From 1aa1feb2e82e3918c484ef217f0f3bebb1c14ddc Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Wed, 27 Aug 2025 23:47:39 +0000 Subject: [PATCH 01/17] Custom modifications to production stack - Modified router service templates and configs - Updated vLLM router application logic - Enhanced request parsing and service discovery - Added custom Dockerfile for production deployment - Updated disaggregated prefill configuration Signed-off-by: Kobe Chen --- docker/Dockerfile.pd | 29 +++ helm/templates/deployment-router.yaml | 30 +++ helm/templates/deployment-vllm-multi.yaml | 69 ++++-- helm/templates/service-router.yaml | 20 ++ helm/templates/service-vllm.yaml | 20 ++ src/vllm_router/app.py | 16 ++ src/vllm_router/parsers/parser.py | 29 +++ src/vllm_router/service_discovery.py | 35 ++- .../services/request_service/request.py | 200 +++++++++++++++++- .../assets/values-16-disagg-prefill.yaml | 57 +++-- 10 files changed, 459 insertions(+), 46 deletions(-) create mode 100644 docker/Dockerfile.pd diff --git a/docker/Dockerfile.pd b/docker/Dockerfile.pd new file mode 100644 index 000000000..4c89e27de --- /dev/null +++ b/docker/Dockerfile.pd @@ -0,0 +1,29 @@ +FROM lmcache/vllm-openai:latest-nightly + +WORKDIR /app + +# hadolint ignore=DL3008 +RUN --mount=type=cache,target=/var/lib/apt --mount=type=cache,target=/var/cache/apt \ + apt-get update && \ + apt-get install -y --no-install-recommends git && \ + rm -rf /var/lib/apt/lists/* + +# Copy the pyproject.toml and the git metadata first (leverage Docker layer caching) +COPY pyproject.toml . +COPY .git/ .git/ + +# Copy the rest of the application code +COPY src/ src/ + +ARG INSTALL_OPTIONAL_DEP=semantic_cache,lmcache +ENV INSTALL_OPTIONAL_DEP=${INSTALL_OPTIONAL_DEP} + +# hadolint ignore=SC1091 +RUN . /opt/venv/bin/activate && \ + uv pip install --upgrade --no-cache-dir pip setuptools_scm && \ + uv pip install --no-cache-dir . 
&& \ + uv pip install zmq msgspec + +# Set the entrypoint +ENTRYPOINT ["/opt/venv/bin/vllm-router"] +CMD [] diff --git a/helm/templates/deployment-router.yaml b/helm/templates/deployment-router.yaml index cef012716..b1ad9f011 100644 --- a/helm/templates/deployment-router.yaml +++ b/helm/templates/deployment-router.yaml @@ -113,6 +113,26 @@ spec: - "--lmcache-controller-port" - "{{ .Values.routerSpec.lmcacheControllerPort }}" {{- end }} + {{- if .Values.routerSpec.nixlPeerHost }} + - "--nixl-peer-host" + - "{{ .Values.routerSpec.nixlPeerHost }}" + {{- end }} + {{- if .Values.routerSpec.nixlPeerInitPort }} + - "--nixl-peer-init-port" + - "{{ .Values.routerSpec.nixlPeerInitPort }}" + {{- end }} + {{- if .Values.routerSpec.nixlPeerAllocPort }} + - "--nixl-peer-alloc-port" + - "{{ .Values.routerSpec.nixlPeerAllocPort }}" + {{- end }} + {{- if .Values.routerSpec.nixlProxyHost }} + - "--nixl-proxy-host" + - "{{ .Values.routerSpec.nixlProxyHost }}" + {{- end }} + {{- if .Values.routerSpec.nixlProxyPort }} + - "--nixl-proxy-port" + - "{{ .Values.routerSpec.nixlProxyPort }}" + {{- end }} {{- if .Values.routerSpec.resources }} resources: {{- if .Values.routerSpec.resources.requests }} @@ -135,6 +155,16 @@ spec: containerPort: {{ .Values.routerSpec.containerPort }} - name: "lmcache-port" containerPort: 9000 + - name: pd-port-1 + containerPort: 7100 + - name: pd-port-2 + containerPort: 7200 + - name: pd-port-3 + containerPort: 7300 + - name: pd-port-4 + containerPort: 7400 + - name: pd-port-5 + containerPort: 7500 livenessProbe: initialDelaySeconds: 30 periodSeconds: 5 diff --git a/helm/templates/deployment-vllm-multi.yaml b/helm/templates/deployment-vllm-multi.yaml index 2a63e2f5c..e77aa5856 100644 --- a/helm/templates/deployment-vllm-multi.yaml +++ b/helm/templates/deployment-vllm-multi.yaml @@ -183,7 +183,11 @@ spec: {{- if $modelSpec.lmcacheConfig.enabled }} {{- if hasKey $modelSpec.lmcacheConfig "enablePD" }} - "--kv-transfer-config" - - '{"kv_connector":"LMCacheConnectorV1","kv_role":"{{ $kv_role }}","kv_connector_extra_config":{"discard_partial_chunks": false, "lmcache_rpc_port": {{ $modelSpec.lmcacheConfig.nixlRole | quote }}}}' + {{- if eq $kv_role "kv_producer" }} + - '{"kv_connector":"LMCacheConnectorV1","kv_role":"{{ $kv_role }}","kv_connector_extra_config":{"discard_partial_chunks": false, "lmcache_rpc_port": "{{ $modelSpec.lmcacheConfig.rpcPort | default "producer1" }}"}}' + {{- else }} + - '{"kv_connector":"LMCacheConnectorV1","kv_role":"{{ $kv_role }}","kv_connector_extra_config":{"discard_partial_chunks": false, "lmcache_rpc_port": "{{ $modelSpec.lmcacheConfig.rpcPort | default "consumer1" }}", "skip_last_n_tokens": {{ $modelSpec.lmcacheConfig.skipLastNTokens | default 1 }}}}' + {{- end }} {{- else if and (hasKey $modelSpec.vllmConfig "v0") (eq (toString $modelSpec.vllmConfig.v0) "1") }} - "--kv-transfer-config" - '{"kv_connector":"LMCacheConnector","kv_role":"{{ $kv_role }}"}' @@ -259,16 +263,18 @@ spec: value: "True" - name: VLLM_RPC_TIMEOUT value: "1000000" + - name: PYTHONHASHSEED + value: "0" + - name: VLLM_ENABLE_V1_MULTIPROCESSING + value: "1" + - name: VLLM_WORKER_MULTIPROC_METHOD + value: "spawn" {{- end }} {{- if hasKey $modelSpec.lmcacheConfig "cudaVisibleDevices" }} - name: CUDA_VISIBLE_DEVICES value: {{ $modelSpec.lmcacheConfig.cudaVisibleDevices | quote }} {{- end }} {{- if and (hasKey $modelSpec.lmcacheConfig "enablePD") ($modelSpec.lmcacheConfig.enablePD) }} - - name: LMCACHE_LOCAL_CPU - value: "False" - - name: LMCACHE_MAX_LOCAL_CPU_SIZE - value: "0" - name: 
LMCACHE_REMOTE_SERDE value: "NULL" - name: UCX_TLS @@ -281,14 +287,29 @@ spec: - name: LMCACHE_NIXL_ROLE value: {{ $modelSpec.lmcacheConfig.nixlRole | quote }} {{- end }} + {{- if hasKey $modelSpec.lmcacheConfig "enableXpyd" }} + - name: LMCACHE_ENABLE_XPYD + value: {{ ternary "True" "False" $modelSpec.lmcacheConfig.enableXpyd | quote }} + {{- end }} + {{- if hasKey $modelSpec.lmcacheConfig "nixlProxyHost" }} + - name: LMCACHE_NIXL_PROXY_HOST + value: {{ $modelSpec.lmcacheConfig.nixlProxyHost | quote }} + {{- end }} + {{- if hasKey $modelSpec.lmcacheConfig "nixlProxyPort" }} + - name: LMCACHE_NIXL_PROXY_PORT + value: {{ $modelSpec.lmcacheConfig.nixlProxyPort | quote }} + {{- end }} {{- if hasKey $modelSpec.lmcacheConfig "nixlPeerHost" }} - - name: LMCACHE_NIXL_RECEIVER_HOST - # value: "0.0.0.0" + - name: LMCACHE_NIXL_PEER_HOST value: {{ $modelSpec.lmcacheConfig.nixlPeerHost | quote }} {{- end }} - {{- if hasKey $modelSpec.lmcacheConfig "nixlPeerPort" }} - - name: LMCACHE_NIXL_RECEIVER_PORT - value: {{ $modelSpec.lmcacheConfig.nixlPeerPort | quote }} + {{- if hasKey $modelSpec.lmcacheConfig "nixlPeerInitPort" }} + - name: LMCACHE_NIXL_PEER_INIT_PORT + value: {{ $modelSpec.lmcacheConfig.nixlPeerInitPort | quote }} + {{- end }} + {{- if hasKey $modelSpec.lmcacheConfig "nixlPeerAllocPort" }} + - name: LMCACHE_NIXL_PEER_ALLOC_PORT + value: {{ $modelSpec.lmcacheConfig.nixlPeerAllocPort | quote }} {{- end }} {{- if hasKey $modelSpec.lmcacheConfig "nixlBufferSize" }} - name: LMCACHE_NIXL_BUFFER_SIZE @@ -298,22 +319,26 @@ spec: - name: LMCACHE_NIXL_BUFFER_DEVICE value: {{ $modelSpec.lmcacheConfig.nixlBufferDevice | quote }} {{- end }} + {{- if hasKey $modelSpec.lmcacheConfig "nixlBackends" }} + - name: LMCACHE_NIXL_BACKENDS + value: {{ $modelSpec.lmcacheConfig.nixlBackends | toJson | quote }} + {{- end }} {{- if hasKey $modelSpec.lmcacheConfig "nixlEnableGc" }} - name: LMCACHE_NIXL_ENABLE_GC value: {{ ternary "True" "False" $modelSpec.lmcacheConfig.nixlEnableGc | quote }} {{- end }} {{- end }} - {{- if hasKey $modelSpec.lmcacheConfig "cpuOffloadingBufferSize" }} - {{- if gt (int $modelSpec.lmcacheConfig.cpuOffloadingBufferSize) 0 }} + {{- if hasKey $modelSpec.lmcacheConfig "localCpu" }} - name: LMCACHE_LOCAL_CPU - value: "True" + value: {{ ternary "True" "False" $modelSpec.lmcacheConfig.localCpu | quote }} + {{- end }} + {{- if hasKey $modelSpec.lmcacheConfig "maxLocalCpuSize" }} - name: LMCACHE_MAX_LOCAL_CPU_SIZE - value: "{{ $modelSpec.lmcacheConfig.cpuOffloadingBufferSize }}" - {{- end}} + value: {{ $modelSpec.lmcacheConfig.maxLocalCpuSize | quote }} {{- end }} - {{- if hasKey $modelSpec.lmcacheConfig "diskOffloadingBufferSize" }} + {{- if hasKey $modelSpec.lmcacheConfig "maxLocalDiskSize" }} - name: LMCACHE_MAX_LOCAL_DISK_SIZE - value: "{{ $modelSpec.lmcacheConfig.diskOffloadingBufferSize }}" + value: {{ $modelSpec.lmcacheConfig.maxLocalDiskSize | quote }} {{- end }} {{- if .Values.cacheserverSpec }} - name: LMCACHE_REMOTE_URL @@ -356,6 +381,16 @@ spec: containerPort: 55555 - name: ucx-port containerPort: 9999 + - name: pd-port-1 + containerPort: 7100 + - name: pd-port-2 + containerPort: 7200 + - name: pd-port-3 + containerPort: 7300 + - name: pd-port-4 + containerPort: 7400 + - name: pd-port-5 + containerPort: 7500 {{- include "chart.probes" . 
| indent 10 }} resources: {{- include "chart.resources" $modelSpec | nindent 12 }} {{- if or (hasKey $modelSpec "pvcStorage") (and $modelSpec.vllmConfig (hasKey $modelSpec.vllmConfig "tensorParallelSize")) (hasKey $modelSpec "chatTemplate") (hasKey $modelSpec "extraVolumeMounts") }} diff --git a/helm/templates/service-router.yaml b/helm/templates/service-router.yaml index 1340eaf30..1aa83151f 100644 --- a/helm/templates/service-router.yaml +++ b/helm/templates/service-router.yaml @@ -20,6 +20,26 @@ spec: port: 9000 targetPort: lmcache-port protocol: TCP + - name: pd-port-1 + port: 7100 + targetPort: pd-port-1 + protocol: TCP + - name: pd-port-2 + port: 7200 + targetPort: pd-port-2 + protocol: TCP + - name: pd-port-3 + port: 7300 + targetPort: pd-port-3 + protocol: TCP + - name: pd-port-4 + port: 7400 + targetPort: pd-port-4 + protocol: TCP + - name: pd-port-5 + port: 7500 + targetPort: pd-port-5 + protocol: TCP selector: {{- include "chart.routerLabels" . | nindent 4 }} {{- end }} diff --git a/helm/templates/service-vllm.yaml b/helm/templates/service-vllm.yaml index e9220d384..bd6a7b85c 100644 --- a/helm/templates/service-vllm.yaml +++ b/helm/templates/service-vllm.yaml @@ -23,6 +23,26 @@ spec: port: 9999 targetPort: ucx-port protocol: TCP + - name: pd-port-1 + port: 7100 + targetPort: pd-port-1 + protocol: TCP + - name: pd-port-2 + port: 7200 + targetPort: pd-port-2 + protocol: TCP + - name: pd-port-3 + port: 7300 + targetPort: pd-port-3 + protocol: TCP + - name: pd-port-4 + port: 7400 + targetPort: pd-port-4 + protocol: TCP + - name: pd-port-5 + port: 7500 + targetPort: pd-port-5 + protocol: TCP selector: model: "{{ $modelSpec.name }}" helm-release-name: "{{ $.Release.Name }}" diff --git a/src/vllm_router/app.py b/src/vllm_router/app.py index 0713e9c0f..b5569e877 100644 --- a/src/vllm_router/app.py +++ b/src/vllm_router/app.py @@ -46,6 +46,10 @@ from vllm_router.services.request_service.rewriter import ( get_request_rewriter, ) +from vllm_router.services.request_service.request import ( + start_zmq_task, + stop_zmq_task, +) from vllm_router.stats.engine_stats import ( get_engine_stats_scraper, initialize_engine_stats_scraper, @@ -62,6 +66,8 @@ set_ulimit, ) +import asyncio + try: # Semantic cache integration from vllm_router.experimental.semantic_cache import ( @@ -90,7 +96,16 @@ async def lifespan(app: FastAPI): if hasattr(service_discovery, "initialize_client_sessions"): await service_discovery.initialize_client_sessions() + app.state.event_loop = asyncio.get_event_loop() + + # Start the ZMQ task + await start_zmq_task() + yield + + # Stop the ZMQ task + await stop_zmq_task() + await app.state.aiohttp_client_wrapper.stop() # Close the threaded-components @@ -270,6 +285,7 @@ def initialize_all(app: FastAPI, args): app.state.request_stats_monitor = get_request_stats_monitor() app.state.router = get_routing_logic() app.state.request_rewriter = get_request_rewriter() + app.state.args = args app = FastAPI(lifespan=lifespan) diff --git a/src/vllm_router/parsers/parser.py b/src/vllm_router/parsers/parser.py index 8b12cf983..14778056f 100644 --- a/src/vllm_router/parsers/parser.py +++ b/src/vllm_router/parsers/parser.py @@ -379,6 +379,35 @@ def parse_args(): help="The threshold for kv-aware routing.", ) + parser.add_argument( + "--nixl-peer-host", + type=str, + help="The hostname or IP address of the NIXL peer service. 
Only use for DisaggregatedPrefillRouter.", + ) + parser.add_argument( + "--nixl-peer-init-port", + type=int, + default=7300, + help="The initialization port for the NIXL peer service. Only use for DisaggregatedPrefillRouter.", + ) + parser.add_argument( + "--nixl-peer-alloc-port", + type=int, + default=7400, + help="The allocation port for the NIXL peer service. Only use for DisaggregatedPrefillRouter.", + ) + parser.add_argument( + "--nixl-proxy-host", + type=str, + help="The hostname or IP address for the NIXL proxy server. Only use for DisaggregatedPrefillRouter.", + ) + parser.add_argument( + "--nixl-proxy-port", + type=int, + default=7500, + help="The port for the NIXL proxy server. Only use for DisaggregatedPrefillRouter.", + ) + args = parser.parse_args() args = load_initial_config_from_config_file_if_required(parser, args) diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index 8ee8f089a..be5dd092e 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -330,11 +330,13 @@ async def initialize_client_sessions(self) -> None: endpoint_infos = self.get_endpoint_info() for endpoint_info in endpoint_infos: if endpoint_info.model_label in self.prefill_model_labels: + # TODO: fix Unclosed client session self.app.state.prefill_client = aiohttp.ClientSession( base_url=endpoint_info.url, timeout=aiohttp.ClientTimeout(total=None), ) elif endpoint_info.model_label in self.decode_model_labels: + # TODO: fix Unclosed client session self.app.state.decode_client = aiohttp.ClientSession( base_url=endpoint_info.url, timeout=aiohttp.ClientTimeout(total=None), @@ -662,6 +664,14 @@ def _add_engine( # Store model information in the endpoint info self.available_engines[engine_name].model_info = model_info + try: + fut = asyncio.run_coroutine_threadsafe( + self.initialize_client_sessions(), self.app.state.event_loop + ) + fut.result() + except Exception as e: + logger.error(f"Error initializing client sessions: {e}") + def _delete_engine(self, engine_name: str): logger.info(f"Serving engine {engine_name} is deleted") with self.available_engines_lock: @@ -748,15 +758,18 @@ async def initialize_client_sessions(self) -> None: endpoint_infos = self.get_endpoint_info() for endpoint_info in endpoint_infos: if endpoint_info.model_label in self.prefill_model_labels: + # TODO: fix Unclosed client session self.app.state.prefill_client = aiohttp.ClientSession( - base_url=endpoint_info.url, - timeout=aiohttp.ClientTimeout(total=None), - ) + base_url=endpoint_info.url, + timeout=aiohttp.ClientTimeout(total=None), + ) + elif endpoint_info.model_label in self.decode_model_labels: + # TODO: fix Unclosed client session self.app.state.decode_client = aiohttp.ClientSession( - base_url=endpoint_info.url, - timeout=aiohttp.ClientTimeout(total=None), - ) + base_url=endpoint_info.url, + timeout=aiohttp.ClientTimeout(total=None), + ) class K8sServiceNameServiceDiscovery(ServiceDiscovery): @@ -1080,6 +1093,14 @@ def _add_engine(self, engine_name: str, model_names: List[str], model_label: str # Store model information in the endpoint info self.available_engines[engine_name].model_info = model_info + try: + fut = asyncio.run_coroutine_threadsafe( + self.initialize_client_sessions(), self.app.state.event_loop + ) + fut.result() + except Exception as e: + logger.error(f"Error initializing client sessions: {e}") + def _delete_engine(self, engine_name: str): logger.info(f"Serving engine {engine_name} is deleted") with self.available_engines_lock: @@ -1165,11 
+1186,13 @@ async def initialize_client_sessions(self) -> None: endpoint_infos = self.get_endpoint_info() for endpoint_info in endpoint_infos: if endpoint_info.model_label in self.prefill_model_labels: + # TODO: fix Unclosed client session self.app.state.prefill_client = aiohttp.ClientSession( base_url=endpoint_info.url, timeout=aiohttp.ClientTimeout(total=None), ) elif endpoint_info.model_label in self.decode_model_labels: + # TODO: fix Unclosed client session self.app.state.decode_client = aiohttp.ClientSession( base_url=endpoint_info.url, timeout=aiohttp.ClientTimeout(total=None), diff --git a/src/vllm_router/services/request_service/request.py b/src/vllm_router/services/request_service/request.py index 0c5005715..f0742cdd3 100644 --- a/src/vllm_router/services/request_service/request.py +++ b/src/vllm_router/services/request_service/request.py @@ -37,6 +37,10 @@ ) from vllm_router.utils import replace_model_in_request_body, update_content_length +from lmcache.v1.storage_backend.connector.nixl_connector_v3 import ( + NixlMsg, +) + try: # Semantic cache integration from vllm_router.experimental.semantic_cache_integration import ( @@ -50,6 +54,74 @@ logger = init_logger(__name__) +import zmq +import zmq.asyncio +import msgspec +import asyncio + +finished_reqs = set() +run_proxy = True +zmq_ctx = zmq.asyncio.Context() + +async def zmq_pull_server(): + try: + socket = zmq_ctx.socket(zmq.PULL) + try: + from vllm_router.app import app + proxy_host = app.state.args.nixl_proxy_host + proxy_port = app.state.args.nixl_proxy_port + except Exception as e: + logger.error(f"Failed to get proxy host and port from app state: {e}") + proxy_url = f"{proxy_host}:{proxy_port}" + socket.bind(f"tcp://{proxy_url}") + logger.info(f"ZMQ proxy server started on {proxy_url}") + except Exception as e: + logger.error(f"Failed to bind ZMQ socket to {proxy_url}: {e}") + socket.close() + return + + while run_proxy: + try: + msg_bytes = await socket.recv() + msg = msgspec.msgpack.decode(msg_bytes, type=NixlMsg) + req_id = msg.req_id + finished_reqs.add(req_id) + logger.info(f"Prefill of req {req_id} done.") + except zmq.Again: + await asyncio.sleep(0.01) # Avoid busy loop + except Exception as e: + logger.error(f"ZMQ Error in message processing: {e}") + break + + socket.close() + logger.info("ZMQ PULL server stopped.") + +# ZMQ task will be created in the FastAPI lifespan manager +zmq_task = None + +async def start_zmq_task(): + """Start the ZMQ pull server task.""" + global zmq_task + if zmq_task is None: + zmq_task = asyncio.create_task(zmq_pull_server()) + logger.info("ZMQ task started") + + # Add a small delay to allow the task to start and potentially log any errors + await asyncio.sleep(0.1) + +async def stop_zmq_task(): + """Stop the ZMQ pull server task.""" + global zmq_task, run_proxy + if zmq_task is not None: + run_proxy = False + zmq_task.cancel() + try: + await zmq_task + except asyncio.CancelledError: + pass + zmq_task = None + logger.info("ZMQ task stopped") + # TODO: (Brian) check if request is json beforehand async def process_request( @@ -164,7 +236,7 @@ async def route_general_request( # Same as vllm, Get request_id from X-Request-Id header if available request_id = request.headers.get("X-Request-Id") or str(uuid.uuid4()) request_body = await request.body() - request_json = json.loads(request_body) + request_json = await request.json() # TODO (ApostaC): merge two awaits into one if request.query_params: request_endpoint = request.query_params.get("id") @@ -204,6 +276,7 @@ async def 
route_general_request( status_code=400, detail="Request body is not JSON parsable." ) + # TODO (ApostaC): merge two awaits into one service_discovery = get_service_discovery() endpoints = service_discovery.get_endpoint_info() @@ -301,7 +374,7 @@ async def route_general_request( media_type="text/event-stream", ) - +# TODO: Combine with send_request_to_tokenizer and send_request_to_decode async def send_request_to_prefiller( client: aiohttp.ClientSession, endpoint: str, req_data: dict, request_id: str ): @@ -321,6 +394,22 @@ async def send_request_to_prefiller( return await response.json() +async def send_request_to_tokenizer( + client: aiohttp.ClientSession, endpoint: str, req_data: dict, request_id: str +): + """ + Send a request to a tokenizer service. + """ + headers = { + "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}", + "X-Request-Id": request_id, + } + + async with client.post(endpoint, json=req_data, headers=headers) as response: + response.raise_for_status() + return await response.json() + + async def send_request_to_decode( client: aiohttp.ClientSession, endpoint: str, req_data: dict, request_id: str ): @@ -336,6 +425,12 @@ async def send_request_to_decode( yield chunk +async def wait_decode_kv_ready(req_id: str): + while req_id not in finished_reqs: + await asyncio.sleep(0.0001) # sleep for 0.1 ms + logger.debug(f"Prefill node signaled kv ready for req {req_id}") + finished_reqs.remove(req_id) + async def route_disaggregated_prefill_request( request: Request, endpoint: str, @@ -347,10 +442,73 @@ async def route_disaggregated_prefill_request( request_json = await request.json() orig_max_tokens = request_json.get("max_tokens", 0) - request_json["max_tokens"] = 1 + stream_options = request_json.pop("stream_options", None) + + # # Check if client sessions are initialized, if not, try to initialize them + # if not hasattr(request.app.state, 'prefill_client') or request.app.state.prefill_client is None: + # logger.warning("prefill_client not initialized, attempting to initialize client sessions") + # try: + # from vllm_router.service_discovery import get_service_discovery + # service_discovery = get_service_discovery() + # if hasattr(service_discovery, '_reinitialize_client_sessions'): + # logger.info("In route_disaggregated_prefill_request: Calling _reinitialize_client_sessions") + # await service_discovery._reinitialize_client_sessions() + # logger.info("Successfully initialized client sessions") + # else: + # logger.error("Service discovery does not have _reinitialize_client_sessions method") + # except Exception as e: + # logger.error(f"Failed to initialize client sessions: {e}") + # return JSONResponse( + # status_code=500, + # content={ + # "error": { + # "message": "Failed to initialize client sessions", + # "type": "initialization_error", + # "code": 500, + # } + # }, + # headers={"X-Request-Id": request_id}, + # ) + st = time.time() try: - await send_request_to_prefiller( + # Step 1: Tokenize the prompt + # request_json {'model': 'facebook/opt-125m', 'prompt': 'What date is today?', 'max_tokens': 20, 'temperature': 0.0} + # # print every key-value pair in prefill_client + # for key, value in request.app.state.prefill_client.__dict__.items(): + # print(f"{key}: {value}") + + tokenize_output = await send_request_to_tokenizer( + request.app.state.prefill_client, "/tokenize", {"prompt": request_json["prompt"]}, request_id + ) + # tokenize_output {'count': 6, 'max_model_len': 2048, 'tokens': [2, 2264, 1248, 16, 452, 116], 'token_strs': None} + + # Update request with 
tokenized prompt + request_json["prompt"] = tokenize_output["tokens"] + request_json["max_tokens"] = 1 + + # Step 2: Create disagg_spec for KV transfer + disagg_spec = { + "req_id": request_id, + "receiver_host": request.app.state.args.nixl_peer_host, + "receiver_init_port": [request.app.state.args.nixl_peer_init_port], + "receiver_alloc_port": [request.app.state.args.nixl_peer_alloc_port], + } + # disagg_spec = { + # "req_id": request_id, + # "receiver_host": "0.0.0.0", + # "receiver_init_port": [7300], + # "receiver_alloc_port": [7400], + # } + + request_json["kv_transfer_params"] = { + "ret_first_tok": True, + "disagg_spec": disagg_spec, + } + request_json["stream"] = False + + # Step 3: Send to prefiller + prefill_output = await send_request_to_prefiller( request.app.state.prefill_client, endpoint, request_json, request_id ) et = time.time() @@ -358,7 +516,15 @@ async def route_disaggregated_prefill_request( logger.info( f"Routing request {request_id} with session id None to {request.app.state.prefill_client._base_url} at {et}, process time = {et - in_router_time:.4f}" ) - request_json["max_tokens"] = orig_max_tokens + + # Step 4: Prepare decode request + request_json["max_tokens"] = orig_max_tokens - 1 + request_json["prompt"].append(prefill_output["kv_transfer_params"]["first_tok"]) + request_json.pop("kv_transfer_params") + request_json["stream"] = True + if stream_options is not None: + request_json["stream_options"] = stream_options + except aiohttp.ClientResponseError as e: logger.error(f"HTTP error in prefiller: {e}", exc_info=True) return JSONResponse( @@ -388,6 +554,30 @@ async def route_disaggregated_prefill_request( async def generate_stream(): try: + # Yield initial chunk with prefill data + head_chunk = { + "id": prefill_output["id"], + "object": "text_completion", + "created": prefill_output["created"], + "model": prefill_output["model"], + "choices": [ + { + "index": 0, + "text": prefill_output["choices"][0]["text"], + "logprobs": None, + "finish_reason": None, + "stop_reason": None, + } + ], + "usage": None, + } + yield ( + "data: " + json.dumps(head_chunk, separators=(",", ":")) + "\n\n" + ).encode() + + await wait_decode_kv_ready(request_id) + + # Stream the rest from decode service async for chunk in send_request_to_decode( request.app.state.decode_client, endpoint, request_json, request_id ): diff --git a/tutorials/assets/values-16-disagg-prefill.yaml b/tutorials/assets/values-16-disagg-prefill.yaml index 35bcf410c..b849e0be5 100644 --- a/tutorials/assets/values-16-disagg-prefill.yaml +++ b/tutorials/assets/values-16-disagg-prefill.yaml @@ -1,13 +1,13 @@ # Unified configuration for disaggregated prefill setup servingEngineSpec: enableEngine: true - runtimeClassName: "" + runtimeClassName: "nvidia" containerPort: 8000 modelSpec: # Prefill node configuration - name: "llama-prefill" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" + tag: "nightly-2025-08-20" modelURL: "meta-llama/Llama-3.1-8B-Instruct" replicaCount: 1 requestCPU: 8 @@ -15,28 +15,34 @@ servingEngineSpec: # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 32000 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "0" enabled: true kvRole: "kv_producer" + localCpu: true + maxLocalCpuSize: 5 + maxLocalDiskSize: 0 enableNixl: true + enableXpyd: true nixlRole: "sender" - nixlPeerHost: "vllm-llama-decode-engine-service" - nixlPeerPort: "55555" - nixlBufferSize: 
"1073741824" # 1GB + nixlProxyHost: "vllm-router-service" + nixlProxyPort: 7500 + nixlBufferSize: "1073741824" nixlBufferDevice: "cuda" - nixlEnableGc: true enablePD: true - cpuOffloadingBufferSize: 0 - hf_token: + rpcPort: "producer1" labels: model: "llama-prefill" + hf_token: # Decode node configuration - name: "llama-decode" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" + tag: "nightly-2025-08-20" modelURL: "meta-llama/Llama-3.1-8B-Instruct" replicaCount: 1 requestCPU: 8 @@ -44,20 +50,29 @@ servingEngineSpec: # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 32000 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "1" enabled: true kvRole: "kv_consumer" # Set decode node as consumer + localCpu: false + maxLocalCpuSize: 0 enableNixl: true + enableXpyd: true nixlRole: "receiver" nixlPeerHost: "0.0.0.0" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlPeerInitPort: 7300 + nixlPeerAllocPort: 7400 + nixlBufferSize: "2147483648" nixlBufferDevice: "cuda" - nixlEnableGc: true + nixlBackends: ["UCX"] enablePD: true + rpcPort: "consumer1" + skipLastNTokens: 1 hf_token: labels: model: "llama-decode" @@ -67,8 +82,9 @@ servingEngineSpec: - SYS_PTRACE routerSpec: enableRouter: true - repository: "lmcache/lmstack-router" - tag: "pd" + repository: "xiaokunchen/vllm-router" + tag: "08-27-v8" + imagePullPolicy: "Always" replicaCount: 1 containerPort: 8000 servicePort: 80 @@ -91,3 +107,8 @@ routerSpec: - "llama-prefill" - "--decode-model-labels" - "llama-decode" + nixlPeerHost: "vllm-llama-decode-engine-service" + nixlPeerInitPort: 7300 + nixlPeerAllocPort: 7400 + nixlProxyHost: "0.0.0.0" + nixlProxyPort: 7500 From b5073577b5cb53b897d096e5edd14071a8c2c0f6 Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Thu, 28 Aug 2025 00:18:12 +0000 Subject: [PATCH 02/17] try to pass CI Signed-off-by: Kobe Chen --- docker/Dockerfile.pd | 2 +- docs/source/developer_guide/docker.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile.pd b/docker/Dockerfile.pd index 4c89e27de..61d5aeb6a 100644 --- a/docker/Dockerfile.pd +++ b/docker/Dockerfile.pd @@ -1,4 +1,4 @@ -FROM lmcache/vllm-openai:latest-nightly +FROM lmcache/vllm-openai:nightly-2025-08-20 WORKDIR /app diff --git a/docs/source/developer_guide/docker.rst b/docs/source/developer_guide/docker.rst index b035397fc..19ffa56d6 100644 --- a/docs/source/developer_guide/docker.rst +++ b/docs/source/developer_guide/docker.rst @@ -10,4 +10,4 @@ Run this command from the root folder path of the project: .. code-block:: bash - docker build -t : -f docker/Dockerfile . + docker build -t : -f docker/Dockerfile.pd . From 72141a01c211be83dd0dd74302278d6468b8497f Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Thu, 28 Aug 2025 00:22:27 +0000 Subject: [PATCH 03/17] pass pre-commit Signed-off-by: Kobe Chen --- src/vllm_router/app.py | 13 +++--- src/vllm_router/service_discovery.py | 14 +++--- .../services/request_service/request.py | 44 ++++++++++++------- .../assets/values-16-disagg-prefill.yaml | 2 +- 4 files changed, 41 insertions(+), 32 deletions(-) diff --git a/src/vllm_router/app.py b/src/vllm_router/app.py index b5569e877..23a0a234c 100644 --- a/src/vllm_router/app.py +++ b/src/vllm_router/app.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. +import asyncio import logging import threading from contextlib import asynccontextmanager @@ -43,13 +44,13 @@ from vllm_router.services.batch_service import initialize_batch_processor from vllm_router.services.callbacks_service.callbacks import configure_custom_callbacks from vllm_router.services.files_service import initialize_storage -from vllm_router.services.request_service.rewriter import ( - get_request_rewriter, -) from vllm_router.services.request_service.request import ( start_zmq_task, stop_zmq_task, ) +from vllm_router.services.request_service.rewriter import ( + get_request_rewriter, +) from vllm_router.stats.engine_stats import ( get_engine_stats_scraper, initialize_engine_stats_scraper, @@ -66,8 +67,6 @@ set_ulimit, ) -import asyncio - try: # Semantic cache integration from vllm_router.experimental.semantic_cache import ( @@ -102,10 +101,10 @@ async def lifespan(app: FastAPI): await start_zmq_task() yield - + # Stop the ZMQ task await stop_zmq_task() - + await app.state.aiohttp_client_wrapper.stop() # Close the threaded-components diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index be5dd092e..d4bf3b228 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -760,16 +760,16 @@ async def initialize_client_sessions(self) -> None: if endpoint_info.model_label in self.prefill_model_labels: # TODO: fix Unclosed client session self.app.state.prefill_client = aiohttp.ClientSession( - base_url=endpoint_info.url, - timeout=aiohttp.ClientTimeout(total=None), - ) - + base_url=endpoint_info.url, + timeout=aiohttp.ClientTimeout(total=None), + ) + elif endpoint_info.model_label in self.decode_model_labels: # TODO: fix Unclosed client session self.app.state.decode_client = aiohttp.ClientSession( - base_url=endpoint_info.url, - timeout=aiohttp.ClientTimeout(total=None), - ) + base_url=endpoint_info.url, + timeout=aiohttp.ClientTimeout(total=None), + ) class K8sServiceNameServiceDiscovery(ServiceDiscovery): diff --git a/src/vllm_router/services/request_service/request.py b/src/vllm_router/services/request_service/request.py index f0742cdd3..a37ab63c0 100644 --- a/src/vllm_router/services/request_service/request.py +++ b/src/vllm_router/services/request_service/request.py @@ -22,6 +22,9 @@ import aiohttp from fastapi import BackgroundTasks, HTTPException, Request, UploadFile from fastapi.responses import JSONResponse, StreamingResponse +from lmcache.v1.storage_backend.connector.nixl_connector_v3 import ( + NixlMsg, +) from requests import JSONDecodeError from vllm_router.log import init_logger @@ -37,10 +40,6 @@ ) from vllm_router.utils import replace_model_in_request_body, update_content_length -from lmcache.v1.storage_backend.connector.nixl_connector_v3 import ( - NixlMsg, -) - try: # Semantic cache integration from vllm_router.experimental.semantic_cache_integration import ( @@ -54,20 +53,23 @@ logger = init_logger(__name__) +import asyncio + +import msgspec import zmq import zmq.asyncio -import msgspec -import asyncio finished_reqs = set() run_proxy = True zmq_ctx = zmq.asyncio.Context() + async def zmq_pull_server(): try: socket = zmq_ctx.socket(zmq.PULL) try: from vllm_router.app import app + proxy_host = app.state.args.nixl_proxy_host proxy_port = app.state.args.nixl_proxy_port except Exception as e: @@ -96,19 +98,22 @@ async def zmq_pull_server(): socket.close() logger.info("ZMQ PULL server stopped.") + 
# ZMQ task will be created in the FastAPI lifespan manager zmq_task = None + async def start_zmq_task(): """Start the ZMQ pull server task.""" global zmq_task if zmq_task is None: zmq_task = asyncio.create_task(zmq_pull_server()) logger.info("ZMQ task started") - + # Add a small delay to allow the task to start and potentially log any errors await asyncio.sleep(0.1) + async def stop_zmq_task(): """Stop the ZMQ pull server task.""" global zmq_task, run_proxy @@ -374,6 +379,7 @@ async def route_general_request( media_type="text/event-stream", ) + # TODO: Combine with send_request_to_tokenizer and send_request_to_decode async def send_request_to_prefiller( client: aiohttp.ClientSession, endpoint: str, req_data: dict, request_id: str @@ -431,6 +437,7 @@ async def wait_decode_kv_ready(req_id: str): logger.debug(f"Prefill node signaled kv ready for req {req_id}") finished_reqs.remove(req_id) + async def route_disaggregated_prefill_request( request: Request, endpoint: str, @@ -443,7 +450,7 @@ async def route_disaggregated_prefill_request( orig_max_tokens = request_json.get("max_tokens", 0) stream_options = request_json.pop("stream_options", None) - + # # Check if client sessions are initialized, if not, try to initialize them # if not hasattr(request.app.state, 'prefill_client') or request.app.state.prefill_client is None: # logger.warning("prefill_client not initialized, attempting to initialize client sessions") @@ -469,7 +476,7 @@ async def route_disaggregated_prefill_request( # }, # headers={"X-Request-Id": request_id}, # ) - + st = time.time() try: # Step 1: Tokenize the prompt @@ -479,14 +486,17 @@ async def route_disaggregated_prefill_request( # print(f"{key}: {value}") tokenize_output = await send_request_to_tokenizer( - request.app.state.prefill_client, "/tokenize", {"prompt": request_json["prompt"]}, request_id + request.app.state.prefill_client, + "/tokenize", + {"prompt": request_json["prompt"]}, + request_id, ) # tokenize_output {'count': 6, 'max_model_len': 2048, 'tokens': [2, 2264, 1248, 16, 452, 116], 'token_strs': None} - + # Update request with tokenized prompt request_json["prompt"] = tokenize_output["tokens"] request_json["max_tokens"] = 1 - + # Step 2: Create disagg_spec for KV transfer disagg_spec = { "req_id": request_id, @@ -500,13 +510,13 @@ async def route_disaggregated_prefill_request( # "receiver_init_port": [7300], # "receiver_alloc_port": [7400], # } - + request_json["kv_transfer_params"] = { "ret_first_tok": True, "disagg_spec": disagg_spec, } request_json["stream"] = False - + # Step 3: Send to prefiller prefill_output = await send_request_to_prefiller( request.app.state.prefill_client, endpoint, request_json, request_id @@ -516,7 +526,7 @@ async def route_disaggregated_prefill_request( logger.info( f"Routing request {request_id} with session id None to {request.app.state.prefill_client._base_url} at {et}, process time = {et - in_router_time:.4f}" ) - + # Step 4: Prepare decode request request_json["max_tokens"] = orig_max_tokens - 1 request_json["prompt"].append(prefill_output["kv_transfer_params"]["first_tok"]) @@ -524,7 +534,7 @@ async def route_disaggregated_prefill_request( request_json["stream"] = True if stream_options is not None: request_json["stream_options"] = stream_options - + except aiohttp.ClientResponseError as e: logger.error(f"HTTP error in prefiller: {e}", exc_info=True) return JSONResponse( @@ -574,7 +584,7 @@ async def generate_stream(): yield ( "data: " + json.dumps(head_chunk, separators=(",", ":")) + "\n\n" ).encode() - + await 
wait_decode_kv_ready(request_id) # Stream the rest from decode service diff --git a/tutorials/assets/values-16-disagg-prefill.yaml b/tutorials/assets/values-16-disagg-prefill.yaml index b849e0be5..21c3052d3 100644 --- a/tutorials/assets/values-16-disagg-prefill.yaml +++ b/tutorials/assets/values-16-disagg-prefill.yaml @@ -31,7 +31,7 @@ servingEngineSpec: enableXpyd: true nixlRole: "sender" nixlProxyHost: "vllm-router-service" - nixlProxyPort: 7500 + nixlProxyPort: 7500 nixlBufferSize: "1073741824" nixlBufferDevice: "cuda" enablePD: true From 7623b4873eacbf0107fcde4740af7c2110fd8564 Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Thu, 28 Aug 2025 00:27:02 +0000 Subject: [PATCH 04/17] pass pre-commit Signed-off-by: Kobe Chen --- src/vllm_router/services/request_service/request.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/vllm_router/services/request_service/request.py b/src/vllm_router/services/request_service/request.py index a37ab63c0..3f18eaee4 100644 --- a/src/vllm_router/services/request_service/request.py +++ b/src/vllm_router/services/request_service/request.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio + # --- Request Processing & Routing --- import json import os @@ -20,6 +22,9 @@ from typing import Optional import aiohttp +import msgspec +import zmq +import zmq.asyncio from fastapi import BackgroundTasks, HTTPException, Request, UploadFile from fastapi.responses import JSONResponse, StreamingResponse from lmcache.v1.storage_backend.connector.nixl_connector_v3 import ( @@ -53,12 +58,6 @@ logger = init_logger(__name__) -import asyncio - -import msgspec -import zmq -import zmq.asyncio - finished_reqs = set() run_proxy = True zmq_ctx = zmq.asyncio.Context() From 6b5a5335f7d8704d6a10cc5d4e39309704a22253 Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Thu, 28 Aug 2025 00:28:21 +0000 Subject: [PATCH 05/17] modify CI dockerfiles Signed-off-by: Kobe Chen --- .github/workflows/functionality-helm-chart.yml | 2 +- .github/workflows/router-e2e-test.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/functionality-helm-chart.yml b/.github/workflows/functionality-helm-chart.yml index 42bb9541a..d3f4c995b 100644 --- a/.github/workflows/functionality-helm-chart.yml +++ b/.github/workflows/functionality-helm-chart.yml @@ -58,7 +58,7 @@ jobs: run: | cd ${{ github.workspace }} kubectl config use-context minikube - sudo docker build --build-arg INSTALL_OPTIONAL_DEP=default -t localhost:5000/git-act-router -f docker/Dockerfile . + sudo docker build --build-arg INSTALL_OPTIONAL_DEP=default -t localhost:5000/git-act-router -f docker/Dockerfile.pd . sudo docker push localhost:5000/git-act-router sudo sysctl fs.protected_regular=0 minikube image load localhost:5000/git-act-router diff --git a/.github/workflows/router-e2e-test.yml b/.github/workflows/router-e2e-test.yml index 85f7c4c47..1aaec1b88 100644 --- a/.github/workflows/router-e2e-test.yml +++ b/.github/workflows/router-e2e-test.yml @@ -135,7 +135,7 @@ jobs: echo "🔨 Building router docker image" cd ${{ github.workspace }} eval "$(minikube docker-env)" - docker build --build-arg INSTALL_OPTIONAL_DEP=default -t git-act-router -f docker/Dockerfile.kvaware . + docker build --build-arg INSTALL_OPTIONAL_DEP=default -t git-act-router -f docker/Dockerfile.pd . 
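          # (Illustrative note, not part of the workflow diff: the docs change in
          # PATCH 02 above documents the equivalent local build, run from the
          # repository root with placeholder image name and tag:
          #   docker build -t <image_name>:<tag> -f docker/Dockerfile.pd .
          # )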
- name: Run all k8s discovery routing tests run: | From 099161fe30b28599f51ef0ab7d32cd920999c4c6 Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Thu, 28 Aug 2025 04:58:42 +0000 Subject: [PATCH 06/17] wrap import NixlMsg into a try-catch block Signed-off-by: Kobe Chen --- src/vllm_router/services/request_service/request.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/vllm_router/services/request_service/request.py b/src/vllm_router/services/request_service/request.py index 3f18eaee4..9986d097f 100644 --- a/src/vllm_router/services/request_service/request.py +++ b/src/vllm_router/services/request_service/request.py @@ -27,9 +27,13 @@ import zmq.asyncio from fastapi import BackgroundTasks, HTTPException, Request, UploadFile from fastapi.responses import JSONResponse, StreamingResponse -from lmcache.v1.storage_backend.connector.nixl_connector_v3 import ( - NixlMsg, -) + +try: + from lmcache.v1.storage_backend.connector.nixl_connector_v3 import ( + NixlMsg, + ) +except ImportError: + pass from requests import JSONDecodeError from vllm_router.log import init_logger From b92826eb1d743065a713611ed05e5fe5e08c54c5 Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Fri, 5 Sep 2025 21:18:12 +0000 Subject: [PATCH 07/17] fuctional new docker image and remove nixlBackend UCX Signed-off-by: Kobe Chen --- tutorials/assets/values-16-disagg-prefill.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tutorials/assets/values-16-disagg-prefill.yaml b/tutorials/assets/values-16-disagg-prefill.yaml index 21c3052d3..43cb154d4 100644 --- a/tutorials/assets/values-16-disagg-prefill.yaml +++ b/tutorials/assets/values-16-disagg-prefill.yaml @@ -7,7 +7,7 @@ servingEngineSpec: # Prefill node configuration - name: "llama-prefill" repository: "lmcache/vllm-openai" - tag: "nightly-2025-08-20" + tag: "nightly-2025-09-04" modelURL: "meta-llama/Llama-3.1-8B-Instruct" replicaCount: 1 requestCPU: 8 @@ -42,7 +42,7 @@ servingEngineSpec: # Decode node configuration - name: "llama-decode" repository: "lmcache/vllm-openai" - tag: "nightly-2025-08-20" + tag: "nightly-2025-09-04" modelURL: "meta-llama/Llama-3.1-8B-Instruct" replicaCount: 1 requestCPU: 8 @@ -69,7 +69,7 @@ servingEngineSpec: nixlPeerAllocPort: 7400 nixlBufferSize: "2147483648" nixlBufferDevice: "cuda" - nixlBackends: ["UCX"] + # nixlBackends: ["UCX"] enablePD: true rpcPort: "consumer1" skipLastNTokens: 1 From 1db19aadee444033abfb93ff82d072378b680c5e Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Fri, 5 Sep 2025 21:21:32 +0000 Subject: [PATCH 08/17] only start the ZMQ task if the routing logic is RoutingLogic.DISAGGREGATED_PREFILL Signed-off-by: Kobe Chen --- src/vllm_router/app.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/vllm_router/app.py b/src/vllm_router/app.py index 23a0a234c..7102aeea0 100644 --- a/src/vllm_router/app.py +++ b/src/vllm_router/app.py @@ -35,6 +35,7 @@ from vllm_router.routers.routing_logic import ( get_routing_logic, initialize_routing_logic, + DisaggregatedPrefillRouter, ) from vllm_router.service_discovery import ( ServiceDiscoveryType, @@ -97,13 +98,16 @@ async def lifespan(app: FastAPI): app.state.event_loop = asyncio.get_event_loop() - # Start the ZMQ task - await start_zmq_task() + # only start the ZMQ task if the routing logic is RoutingLogic.DISAGGREGATED_PREFILL + if isinstance(app.state.router, DisaggregatedPrefillRouter): + logger.info("Starting ZMQ task because the routing logic is RoutingLogic.DISAGGREGATED_PREFILL") + # Start the ZMQ 
task + await start_zmq_task() - yield + yield - # Stop the ZMQ task - await stop_zmq_task() + # Stop the ZMQ task + await stop_zmq_task() await app.state.aiohttp_client_wrapper.stop() From 7998394d23f310a029fd08117684dbaedbe5cb71 Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Fri, 5 Sep 2025 21:27:38 +0000 Subject: [PATCH 09/17] replace .github/values-10-disagg-prefill.yaml with values-16-disagg-prefill.yaml Signed-off-by: Kobe Chen --- .github/values-10-disagg-prefill.yaml | 89 +++++++++++++++------------ 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/.github/values-10-disagg-prefill.yaml b/.github/values-10-disagg-prefill.yaml index 548d284f5..43cb154d4 100644 --- a/.github/values-10-disagg-prefill.yaml +++ b/.github/values-10-disagg-prefill.yaml @@ -1,88 +1,90 @@ # Unified configuration for disaggregated prefill setup servingEngineSpec: - strategy: - type: Recreate enableEngine: true - runtimeClassName: "" + runtimeClassName: "nvidia" containerPort: 8000 modelSpec: # Prefill node configuration - - name: "opt125m-prefill" + - name: "llama-prefill" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" - modelURL: "facebook/opt-125m" + tag: "nightly-2025-09-04" + modelURL: "meta-llama/Llama-3.1-8B-Instruct" replicaCount: 1 requestCPU: 8 requestMemory: "30Gi" # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 1024 - v1: 1 - gpuMemoryUtilization: 0.6 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "0" enabled: true kvRole: "kv_producer" + localCpu: true + maxLocalCpuSize: 5 + maxLocalDiskSize: 0 enableNixl: true + enableXpyd: true nixlRole: "sender" - nixlPeerHost: "vllm-opt125m-decode-engine-service" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlProxyHost: "vllm-router-service" + nixlProxyPort: 7500 + nixlBufferSize: "1073741824" nixlBufferDevice: "cuda" - nixlEnableGc: true enablePD: true - cpuOffloadingBufferSize: 0 + rpcPort: "producer1" labels: - model: "opt125m-prefill" - chatTemplate: "chat.jinja2" - chatTemplateConfigMap: |- - {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} - {% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}{{ '<|im_start|>assistant\n' }}{% endif %} + model: "llama-prefill" + hf_token: # Decode node configuration - - name: "opt125m-decode" + - name: "llama-decode" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" - modelURL: "facebook/opt-125m" + tag: "nightly-2025-09-04" + modelURL: "meta-llama/Llama-3.1-8B-Instruct" replicaCount: 1 requestCPU: 8 requestMemory: "30Gi" # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 1024 - v1: 1 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "1" enabled: true kvRole: "kv_consumer" # Set decode node as consumer + localCpu: false + maxLocalCpuSize: 0 enableNixl: true + enableXpyd: true nixlRole: "receiver" nixlPeerHost: "0.0.0.0" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlPeerInitPort: 7300 + nixlPeerAllocPort: 7400 + nixlBufferSize: "2147483648" nixlBufferDevice: "cuda" - nixlEnableGc: true + # nixlBackends: ["UCX"] enablePD: true + rpcPort: "consumer1" + skipLastNTokens: 1 + hf_token: labels: - model: 
"opt125m-decode" - chatTemplate: "chat.jinja2" - chatTemplateConfigMap: |- - {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} - {% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}{{ '<|im_start|>assistant\n' }}{% endif %} + model: "llama-decode" containerSecurityContext: capabilities: add: - SYS_PTRACE - routerSpec: enableRouter: true - repository: "git-act-router" - imagePullPolicy: "IfNotPresent" - strategy: - type: Recreate + repository: "xiaokunchen/vllm-router" + tag: "08-27-v8" + imagePullPolicy: "Always" replicaCount: 1 containerPort: 8000 servicePort: 80 @@ -102,6 +104,11 @@ routerSpec: release: "router" extraArgs: - "--prefill-model-labels" - - "opt125m-prefill" + - "llama-prefill" - "--decode-model-labels" - - "opt125m-decode" + - "llama-decode" + nixlPeerHost: "vllm-llama-decode-engine-service" + nixlPeerInitPort: 7300 + nixlPeerAllocPort: 7400 + nixlProxyHost: "0.0.0.0" + nixlProxyPort: 7500 From 0b654026ab234ed15addb49e44021d52b17f5252 Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Fri, 5 Sep 2025 22:34:15 +0000 Subject: [PATCH 10/17] pass pre-commit Signed-off-by: Kobe Chen --- src/vllm_router/app.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/vllm_router/app.py b/src/vllm_router/app.py index 7102aeea0..278cefbf0 100644 --- a/src/vllm_router/app.py +++ b/src/vllm_router/app.py @@ -33,9 +33,9 @@ from vllm_router.routers.main_router import main_router from vllm_router.routers.metrics_router import metrics_router from vllm_router.routers.routing_logic import ( + DisaggregatedPrefillRouter, get_routing_logic, initialize_routing_logic, - DisaggregatedPrefillRouter, ) from vllm_router.service_discovery import ( ServiceDiscoveryType, @@ -100,7 +100,9 @@ async def lifespan(app: FastAPI): # only start the ZMQ task if the routing logic is RoutingLogic.DISAGGREGATED_PREFILL if isinstance(app.state.router, DisaggregatedPrefillRouter): - logger.info("Starting ZMQ task because the routing logic is RoutingLogic.DISAGGREGATED_PREFILL") + logger.info( + "Starting ZMQ task because the routing logic is RoutingLogic.DISAGGREGATED_PREFILL" + ) # Start the ZMQ task await start_zmq_task() From 4ebc430da4925f3f0d21edb177c432745007660b Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Fri, 5 Sep 2025 23:45:09 +0000 Subject: [PATCH 11/17] fix yield Signed-off-by: Kobe Chen --- src/vllm_router/app.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/vllm_router/app.py b/src/vllm_router/app.py index 278cefbf0..bc1995b7e 100644 --- a/src/vllm_router/app.py +++ b/src/vllm_router/app.py @@ -110,6 +110,8 @@ async def lifespan(app: FastAPI): # Stop the ZMQ task await stop_zmq_task() + else: + yield await app.state.aiohttp_client_wrapper.stop() From 7dcc0439800255f16659dce179a7c723333289ad Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Sat, 6 Sep 2025 01:37:58 +0000 Subject: [PATCH 12/17] Change router tag Signed-off-by: Kobe Chen --- .github/values-10-disagg-prefill.yaml | 2 +- docker/Dockerfile.pd | 14 ++++++++------ tutorials/assets/values-16-disagg-prefill.yaml | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/.github/values-10-disagg-prefill.yaml b/.github/values-10-disagg-prefill.yaml index 43cb154d4..c33de90d6 100644 --- a/.github/values-10-disagg-prefill.yaml +++ b/.github/values-10-disagg-prefill.yaml @@ -83,7 +83,7 @@ servingEngineSpec: routerSpec: enableRouter: true 
repository: "xiaokunchen/vllm-router" - tag: "08-27-v8" + tag: "09-05-v1" imagePullPolicy: "Always" replicaCount: 1 containerPort: 8000 diff --git a/docker/Dockerfile.pd b/docker/Dockerfile.pd index 61d5aeb6a..fbf41f036 100644 --- a/docker/Dockerfile.pd +++ b/docker/Dockerfile.pd @@ -1,12 +1,14 @@ -FROM lmcache/vllm-openai:nightly-2025-08-20 +FROM python:3.12-slim WORKDIR /app # hadolint ignore=DL3008 RUN --mount=type=cache,target=/var/lib/apt --mount=type=cache,target=/var/cache/apt \ apt-get update && \ - apt-get install -y --no-install-recommends git && \ - rm -rf /var/lib/apt/lists/* + apt-get install -y --no-install-recommends git curl && \ + rm -rf /var/lib/apt/lists/* && \ + curl -LsSf https://astral.sh/uv/install.sh | sh && \ + /root/.local/bin/uv venv /opt/venv # Copy the pyproject.toml and the git metadata first (leverage Docker layer caching) COPY pyproject.toml . @@ -20,9 +22,9 @@ ENV INSTALL_OPTIONAL_DEP=${INSTALL_OPTIONAL_DEP} # hadolint ignore=SC1091 RUN . /opt/venv/bin/activate && \ - uv pip install --upgrade --no-cache-dir pip setuptools_scm && \ - uv pip install --no-cache-dir . && \ - uv pip install zmq msgspec + /root/.local/bin/uv pip install --upgrade --no-cache-dir pip setuptools_scm && \ + /root/.local/bin/uv pip install --no-cache-dir .[$INSTALL_OPTIONAL_DEP] && \ + /root/.local/bin/uv pip install zmq msgspec # Set the entrypoint ENTRYPOINT ["/opt/venv/bin/vllm-router"] diff --git a/tutorials/assets/values-16-disagg-prefill.yaml b/tutorials/assets/values-16-disagg-prefill.yaml index 43cb154d4..c33de90d6 100644 --- a/tutorials/assets/values-16-disagg-prefill.yaml +++ b/tutorials/assets/values-16-disagg-prefill.yaml @@ -83,7 +83,7 @@ servingEngineSpec: routerSpec: enableRouter: true repository: "xiaokunchen/vllm-router" - tag: "08-27-v8" + tag: "09-05-v1" imagePullPolicy: "Always" replicaCount: 1 containerPort: 8000 From d64bafa28fc66c3ab4905d1234b2a3fa31a2989f Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Sat, 6 Sep 2025 05:23:03 +0000 Subject: [PATCH 13/17] working version; temporarily push with older Dockerfile.pd Signed-off-by: Kobe Chen --- .github/values-10-disagg-prefill.yaml | 14 +++++++------- tutorials/assets/values-16-disagg-prefill.yaml | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/.github/values-10-disagg-prefill.yaml b/.github/values-10-disagg-prefill.yaml index c33de90d6..8bfb1e255 100644 --- a/.github/values-10-disagg-prefill.yaml +++ b/.github/values-10-disagg-prefill.yaml @@ -8,7 +8,7 @@ servingEngineSpec: - name: "llama-prefill" repository: "lmcache/vllm-openai" tag: "nightly-2025-09-04" - modelURL: "meta-llama/Llama-3.1-8B-Instruct" + modelURL: "Qwen/Qwen3-8B" replicaCount: 1 requestCPU: 8 requestMemory: "30Gi" @@ -32,18 +32,18 @@ servingEngineSpec: nixlRole: "sender" nixlProxyHost: "vllm-router-service" nixlProxyPort: 7500 - nixlBufferSize: "1073741824" + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" enablePD: true rpcPort: "producer1" labels: model: "llama-prefill" - hf_token: + # hf_token: # Decode node configuration - name: "llama-decode" repository: "lmcache/vllm-openai" tag: "nightly-2025-09-04" - modelURL: "meta-llama/Llama-3.1-8B-Instruct" + modelURL: "Qwen/Qwen3-8B" replicaCount: 1 requestCPU: 8 requestMemory: "30Gi" @@ -67,13 +67,13 @@ servingEngineSpec: nixlPeerHost: "0.0.0.0" nixlPeerInitPort: 7300 nixlPeerAllocPort: 7400 - nixlBufferSize: "2147483648" + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" # nixlBackends: ["UCX"] enablePD: true rpcPort: "consumer1" skipLastNTokens: 1 - 
hf_token: + # hf_token: labels: model: "llama-decode" containerSecurityContext: @@ -83,7 +83,7 @@ servingEngineSpec: routerSpec: enableRouter: true repository: "xiaokunchen/vllm-router" - tag: "09-05-v1" + tag: "08-27-v8" imagePullPolicy: "Always" replicaCount: 1 containerPort: 8000 diff --git a/tutorials/assets/values-16-disagg-prefill.yaml b/tutorials/assets/values-16-disagg-prefill.yaml index c33de90d6..8bfb1e255 100644 --- a/tutorials/assets/values-16-disagg-prefill.yaml +++ b/tutorials/assets/values-16-disagg-prefill.yaml @@ -8,7 +8,7 @@ servingEngineSpec: - name: "llama-prefill" repository: "lmcache/vllm-openai" tag: "nightly-2025-09-04" - modelURL: "meta-llama/Llama-3.1-8B-Instruct" + modelURL: "Qwen/Qwen3-8B" replicaCount: 1 requestCPU: 8 requestMemory: "30Gi" @@ -32,18 +32,18 @@ servingEngineSpec: nixlRole: "sender" nixlProxyHost: "vllm-router-service" nixlProxyPort: 7500 - nixlBufferSize: "1073741824" + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" enablePD: true rpcPort: "producer1" labels: model: "llama-prefill" - hf_token: + # hf_token: # Decode node configuration - name: "llama-decode" repository: "lmcache/vllm-openai" tag: "nightly-2025-09-04" - modelURL: "meta-llama/Llama-3.1-8B-Instruct" + modelURL: "Qwen/Qwen3-8B" replicaCount: 1 requestCPU: 8 requestMemory: "30Gi" @@ -67,13 +67,13 @@ servingEngineSpec: nixlPeerHost: "0.0.0.0" nixlPeerInitPort: 7300 nixlPeerAllocPort: 7400 - nixlBufferSize: "2147483648" + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" # nixlBackends: ["UCX"] enablePD: true rpcPort: "consumer1" skipLastNTokens: 1 - hf_token: + # hf_token: labels: model: "llama-decode" containerSecurityContext: @@ -83,7 +83,7 @@ servingEngineSpec: routerSpec: enableRouter: true repository: "xiaokunchen/vllm-router" - tag: "09-05-v1" + tag: "08-27-v8" imagePullPolicy: "Always" replicaCount: 1 containerPort: 8000 From 4fb4c01c3f0d770f02a152c814e1656ca4492575 Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Thu, 11 Sep 2025 05:14:50 +0000 Subject: [PATCH 14/17] temp opt working version Signed-off-by: Kobe Chen --- .github/values-10-disagg-prefill.yaml | 14 +- docker/Dockerfile.pd | 14 +- src/vllm_router/routers/main_router.py | 12 +- src/vllm_router/service_discovery.py | 55 +++-- .../services/request_service/request.py | 218 +++++++++++++++--- 5 files changed, 245 insertions(+), 68 deletions(-) diff --git a/.github/values-10-disagg-prefill.yaml b/.github/values-10-disagg-prefill.yaml index 8bfb1e255..5f8444891 100644 --- a/.github/values-10-disagg-prefill.yaml +++ b/.github/values-10-disagg-prefill.yaml @@ -8,7 +8,7 @@ servingEngineSpec: - name: "llama-prefill" repository: "lmcache/vllm-openai" tag: "nightly-2025-09-04" - modelURL: "Qwen/Qwen3-8B" + modelURL: "facebook/opt-125m" replicaCount: 1 requestCPU: 8 requestMemory: "30Gi" @@ -38,12 +38,16 @@ servingEngineSpec: rpcPort: "producer1" labels: model: "llama-prefill" + chatTemplate: "chat.jinja2" + chatTemplateConfigMap: |- + {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} + {% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}{{ '<|im_start|>assistant\n' }}{% endif %} # hf_token: # Decode node configuration - name: "llama-decode" repository: "lmcache/vllm-openai" tag: "nightly-2025-09-04" - modelURL: "Qwen/Qwen3-8B" + modelURL: "facebook/opt-125m" replicaCount: 1 requestCPU: 8 requestMemory: "30Gi" @@ -76,6 +80,10 @@ servingEngineSpec: # 
hf_token: labels: model: "llama-decode" + chatTemplate: "chat.jinja2" + chatTemplateConfigMap: |- + {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} + {% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}{{ '<|im_start|>assistant\n' }}{% endif %} containerSecurityContext: capabilities: add: @@ -83,7 +91,7 @@ servingEngineSpec: routerSpec: enableRouter: true repository: "xiaokunchen/vllm-router" - tag: "08-27-v8" + tag: "09-10-v9" imagePullPolicy: "Always" replicaCount: 1 containerPort: 8000 diff --git a/docker/Dockerfile.pd b/docker/Dockerfile.pd index fbf41f036..730c2849d 100644 --- a/docker/Dockerfile.pd +++ b/docker/Dockerfile.pd @@ -1,14 +1,12 @@ -FROM python:3.12-slim +FROM lmcache/vllm-openai:nightly-2025-09-04 WORKDIR /app # hadolint ignore=DL3008 RUN --mount=type=cache,target=/var/lib/apt --mount=type=cache,target=/var/cache/apt \ apt-get update && \ - apt-get install -y --no-install-recommends git curl && \ - rm -rf /var/lib/apt/lists/* && \ - curl -LsSf https://astral.sh/uv/install.sh | sh && \ - /root/.local/bin/uv venv /opt/venv + apt-get install -y --no-install-recommends git && \ + rm -rf /var/lib/apt/lists/* # Copy the pyproject.toml and the git metadata first (leverage Docker layer caching) COPY pyproject.toml . @@ -22,9 +20,9 @@ ENV INSTALL_OPTIONAL_DEP=${INSTALL_OPTIONAL_DEP} # hadolint ignore=SC1091 RUN . /opt/venv/bin/activate && \ - /root/.local/bin/uv pip install --upgrade --no-cache-dir pip setuptools_scm && \ - /root/.local/bin/uv pip install --no-cache-dir .[$INSTALL_OPTIONAL_DEP] && \ - /root/.local/bin/uv pip install zmq msgspec + uv pip install --upgrade --no-cache-dir pip setuptools_scm && \ + uv pip install --no-cache-dir . 
&& \ + uv pip install zmq msgspec # Set the entrypoint ENTRYPOINT ["/opt/venv/bin/vllm-router"] diff --git a/src/vllm_router/routers/main_router.py b/src/vllm_router/routers/main_router.py index 5d77124dd..386473b75 100644 --- a/src/vllm_router/routers/main_router.py +++ b/src/vllm_router/routers/main_router.py @@ -18,13 +18,14 @@ BackgroundTasks, Request, ) -from fastapi.responses import JSONResponse, Response +from fastapi.responses import JSONResponse, Response, StreamingResponse from vllm_router.dynamic_config import get_dynamic_config_watcher from vllm_router.log import init_logger from vllm_router.protocols import ModelCard, ModelList from vllm_router.service_discovery import get_service_discovery from vllm_router.services.request_service.request import ( + route_disaggregated_prefill_request, route_general_request, route_general_transcriptions, route_sleep_wakeup_request, @@ -58,6 +59,15 @@ async def route_chat_completion(request: Request, background_tasks: BackgroundTa logger.info("Serving response from semantic cache") return cache_response + # Check if using disaggregated prefill router + from vllm_router.routers.routing_logic import DisaggregatedPrefillRouter + + if isinstance(request.app.state.router, DisaggregatedPrefillRouter): + # route_disaggregated_prefill_request now handles chat completions format conversion + return await route_disaggregated_prefill_request( + request, "/v1/chat/completions", background_tasks + ) + logger.debug("No cache hit, forwarding request to backend") return await route_general_request( request, "/v1/chat/completions", background_tasks diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index d4bf3b228..8f8c4aaa5 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -23,6 +23,7 @@ from typing import Dict, List, Optional import aiohttp +import httpx import requests from kubernetes import client, config, watch @@ -320,7 +321,7 @@ def get_endpoint_info(self) -> List[EndpointInfo]: async def initialize_client_sessions(self) -> None: """ - Initialize aiohttp ClientSession objects for prefill and decode endpoints. + Initialize httpx AsyncClient objects for prefill and decode endpoints. This must be called from an async context during app startup. """ if ( @@ -330,16 +331,20 @@ async def initialize_client_sessions(self) -> None: endpoint_infos = self.get_endpoint_info() for endpoint_info in endpoint_infos: if endpoint_info.model_label in self.prefill_model_labels: - # TODO: fix Unclosed client session - self.app.state.prefill_client = aiohttp.ClientSession( + # Use httpx AsyncClient instead of aiohttp + import httpx + + self.app.state.prefill_client = httpx.AsyncClient( base_url=endpoint_info.url, - timeout=aiohttp.ClientTimeout(total=None), + timeout=None, ) elif endpoint_info.model_label in self.decode_model_labels: - # TODO: fix Unclosed client session - self.app.state.decode_client = aiohttp.ClientSession( + # Use httpx AsyncClient instead of aiohttp + import httpx + + self.app.state.decode_client = httpx.AsyncClient( base_url=endpoint_info.url, - timeout=aiohttp.ClientTimeout(total=None), + timeout=None, ) @@ -748,7 +753,7 @@ def close(self): async def initialize_client_sessions(self) -> None: """ - Initialize aiohttp ClientSession objects for prefill and decode endpoints. + Initialize httpx AsyncClient objects for prefill and decode endpoints. This must be called from an async context during app startup. 
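Two notes on the httpx switch above: the request forwarded to the prefiller is capped at a single generated token, since the prefill engine only has to run the prompt so its KV cache can be handed off, and the per-role AsyncClient objects created at startup still need an explicit aclose() when the app shuts down. A minimal sketch of that client lifecycle, assuming the same app.state layout; the shutdown hook itself is an assumption, not something this patch adds:

# Sketch of the per-role client lifecycle (the shutdown hook is assumed, not part of this patch).
import httpx
from fastapi import FastAPI

app = FastAPI()

async def init_pd_clients(prefill_url: str, decode_url: str) -> None:
    # timeout=None turns off httpx's default 5-second timeout, mirroring the
    # unbounded aiohttp.ClientTimeout(total=None) it replaces.
    app.state.prefill_client = httpx.AsyncClient(base_url=prefill_url, timeout=None)
    app.state.decode_client = httpx.AsyncClient(base_url=decode_url, timeout=None)

async def close_pd_clients() -> None:
    for name in ("prefill_client", "decode_client"):
        client = getattr(app.state, name, None)
        if client is not None:
            await client.aclose()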
""" if ( @@ -758,17 +763,21 @@ async def initialize_client_sessions(self) -> None: endpoint_infos = self.get_endpoint_info() for endpoint_info in endpoint_infos: if endpoint_info.model_label in self.prefill_model_labels: - # TODO: fix Unclosed client session - self.app.state.prefill_client = aiohttp.ClientSession( + # Use httpx AsyncClient instead of aiohttp + import httpx + + self.app.state.prefill_client = httpx.AsyncClient( base_url=endpoint_info.url, - timeout=aiohttp.ClientTimeout(total=None), + timeout=None, ) elif endpoint_info.model_label in self.decode_model_labels: - # TODO: fix Unclosed client session - self.app.state.decode_client = aiohttp.ClientSession( + # Use httpx AsyncClient instead of aiohttp + import httpx + + self.app.state.decode_client = httpx.AsyncClient( base_url=endpoint_info.url, - timeout=aiohttp.ClientTimeout(total=None), + timeout=None, ) @@ -1176,7 +1185,7 @@ def close(self): async def initialize_client_sessions(self) -> None: """ - Initialize aiohttp ClientSession objects for prefill and decode endpoints. + Initialize httpx AsyncClient objects for prefill and decode endpoints. This must be called from an async context during app startup. """ if ( @@ -1186,16 +1195,20 @@ async def initialize_client_sessions(self) -> None: endpoint_infos = self.get_endpoint_info() for endpoint_info in endpoint_infos: if endpoint_info.model_label in self.prefill_model_labels: - # TODO: fix Unclosed client session - self.app.state.prefill_client = aiohttp.ClientSession( + # Use httpx AsyncClient instead of aiohttp + import httpx + + self.app.state.prefill_client = httpx.AsyncClient( base_url=endpoint_info.url, - timeout=aiohttp.ClientTimeout(total=None), + timeout=None, ) elif endpoint_info.model_label in self.decode_model_labels: - # TODO: fix Unclosed client session - self.app.state.decode_client = aiohttp.ClientSession( + # Use httpx AsyncClient instead of aiohttp + import httpx + + self.app.state.decode_client = httpx.AsyncClient( base_url=endpoint_info.url, - timeout=aiohttp.ClientTimeout(total=None), + timeout=None, ) diff --git a/src/vllm_router/services/request_service/request.py b/src/vllm_router/services/request_service/request.py index 9986d097f..1589d2276 100644 --- a/src/vllm_router/services/request_service/request.py +++ b/src/vllm_router/services/request_service/request.py @@ -385,9 +385,9 @@ async def route_general_request( # TODO: Combine with send_request_to_tokenizer and send_request_to_decode async def send_request_to_prefiller( - client: aiohttp.ClientSession, endpoint: str, req_data: dict, request_id: str + client, endpoint: str, req_data: dict, request_id: str ): - """Send a request to a prefiller service.""" + """Send a request to a prefiller service using httpx.""" req_data = req_data.copy() req_data["max_tokens"] = 1 if "max_completion_tokens" in req_data: @@ -398,39 +398,45 @@ async def send_request_to_prefiller( "X-Request-Id": request_id, } - async with client.post(endpoint, json=req_data, headers=headers) as response: - response.raise_for_status() - return await response.json() + response = await client.post(endpoint, json=req_data, headers=headers) + response.raise_for_status() + return response.json() async def send_request_to_tokenizer( - client: aiohttp.ClientSession, endpoint: str, req_data: dict, request_id: str + client, endpoint: str, req_data: dict, request_id: str ): """ - Send a request to a tokenizer service. + Send a request to a tokenizer service using httpx. 
""" headers = { "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}", "X-Request-Id": request_id, } - async with client.post(endpoint, json=req_data, headers=headers) as response: - response.raise_for_status() - return await response.json() + response = await client.post(endpoint, json=req_data, headers=headers) + response.raise_for_status() + return response.json() async def send_request_to_decode( - client: aiohttp.ClientSession, endpoint: str, req_data: dict, request_id: str + client, endpoint: str, req_data: dict, request_id: str ): - """Asynchronously stream the response from a service using a persistent client.""" + """ + Asynchronously stream the response from a service using a persistent client. + Uses httpx streaming like the reference implementation. + """ headers = { "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}", "X-Request-Id": request_id, } - async with client.post(endpoint, json=req_data, headers=headers) as response: + # Use httpx streaming pattern from reference + async with client.stream( + "POST", endpoint, json=req_data, headers=headers + ) as response: response.raise_for_status() - async for chunk in response.content.iter_any(): + async for chunk in response.aiter_bytes(): yield chunk @@ -488,10 +494,16 @@ async def route_disaggregated_prefill_request( # for key, value in request.app.state.prefill_client.__dict__.items(): # print(f"{key}: {value}") + # Handle different tokenization formats for chat vs completions + if "messages" in request_json: + tokenize_payload = {"messages": request_json["messages"]} + else: + tokenize_payload = {"prompt": request_json["prompt"]} + tokenize_output = await send_request_to_tokenizer( request.app.state.prefill_client, "/tokenize", - {"prompt": request_json["prompt"]}, + tokenize_payload, request_id, ) # tokenize_output {'count': 6, 'max_model_len': 2048, 'tokens': [2, 2264, 1248, 16, 452, 116], 'token_strs': None} @@ -522,7 +534,10 @@ async def route_disaggregated_prefill_request( # Step 3: Send to prefiller prefill_output = await send_request_to_prefiller( - request.app.state.prefill_client, endpoint, request_json, request_id + request.app.state.prefill_client, + "/v1/completions", + request_json, + request_id, ) et = time.time() logger.info(f"{request_id} prefill time (TTFT): {et - st:.4f}") @@ -567,23 +582,63 @@ async def route_disaggregated_prefill_request( async def generate_stream(): try: - # Yield initial chunk with prefill data - head_chunk = { - "id": prefill_output["id"], - "object": "text_completion", - "created": prefill_output["created"], - "model": prefill_output["model"], - "choices": [ - { - "index": 0, - "text": prefill_output["choices"][0]["text"], - "logprobs": None, - "finish_reason": None, - "stop_reason": None, - } - ], - "usage": None, - } + # Check if this is for chat completions based on original request having messages + is_chat_completion = "messages" in request_json + + if is_chat_completion: + # For chat completions, yield initial chunk with role + initial_chunk = { + "id": prefill_output["id"], + "object": "chat.completion.chunk", + "created": prefill_output["created"], + "model": prefill_output["model"], + "choices": [ + { + "index": 0, + "delta": {"role": "assistant", "content": ""}, + "logprobs": None, + "finish_reason": None, + } + ], + } + yield ( + "data: " + json.dumps(initial_chunk, separators=(",", ":")) + "\n\n" + ).encode() + + # Then yield head chunk with content + head_chunk = { + "id": prefill_output["id"], + "object": "chat.completion.chunk", + "created": 
prefill_output["created"], + "model": prefill_output["model"], + "choices": [ + { + "index": 0, + "delta": {"content": prefill_output["choices"][0]["text"]}, + "logprobs": None, + "finish_reason": None, + } + ], + } + else: + # For completions, use original format (clean, without extra fields) + head_chunk = { + "id": prefill_output["id"], + "object": "text_completion", + "created": prefill_output["created"], + "model": prefill_output["model"], + "choices": [ + { + "index": 0, + "text": prefill_output["choices"][0]["text"], + "logprobs": None, + "finish_reason": None, + "stop_reason": None, + } + ], + "usage": None, + } + yield ( "data: " + json.dumps(head_chunk, separators=(",", ":")) + "\n\n" ).encode() @@ -592,9 +647,102 @@ async def generate_stream(): # Stream the rest from decode service async for chunk in send_request_to_decode( - request.app.state.decode_client, endpoint, request_json, request_id + request.app.state.decode_client, + "/v1/completions", + request_json, + request_id, ): - yield chunk + if is_chat_completion: + # Convert completion chunks to chat completion format (same logic as reference) + chunk_str = chunk.decode("utf-8") + if chunk_str.startswith("data: ") and not chunk_str.startswith( + "data: [DONE]" + ): + try: + json_str = chunk_str[6:].strip() # Remove 'data: ' prefix + if json_str: + completion_data = json.loads(json_str) + chat_completion_data = { + "id": completion_data["id"], + "object": "chat.completion.chunk", + "created": completion_data["created"], + "model": completion_data["model"], + "choices": [ + { + "index": 0, + "delta": { + "content": completion_data["choices"][ + 0 + ]["text"] + }, + "logprobs": completion_data["choices"][ + 0 + ].get("logprobs"), + "finish_reason": completion_data["choices"][ + 0 + ].get("finish_reason"), + } + ], + } + converted_chunk = ( + "data: " + + json.dumps( + chat_completion_data, separators=(",", ":") + ) + + "\n\n" + ).encode() + yield converted_chunk + except (json.JSONDecodeError, KeyError): + yield chunk + else: + yield chunk + else: + # For completions, filter out extra fields(prompt_token_ids, token_ids) from decode service + chunk_str = chunk.decode("utf-8") + if chunk_str.startswith("data: ") and not chunk_str.startswith( + "data: [DONE]" + ): + try: + json_str = chunk_str[6:].strip() # Remove 'data: ' prefix + if json_str: + completion_data = json.loads(json_str) + # Clean completion chunk without extra fields + clean_completion_data = { + "id": completion_data["id"], + "object": "text_completion", + "created": completion_data["created"], + "model": completion_data["model"], + "choices": [ + { + "index": 0, + "text": completion_data["choices"][0][ + "text" + ], + "logprobs": completion_data["choices"][ + 0 + ].get("logprobs"), + "finish_reason": completion_data["choices"][ + 0 + ].get("finish_reason"), + "stop_reason": completion_data["choices"][ + 0 + ].get("stop_reason"), + } + ], + "usage": completion_data.get("usage"), + } + cleaned_chunk = ( + "data: " + + json.dumps( + clean_completion_data, separators=(",", ":") + ) + + "\n\n" + ).encode() + yield cleaned_chunk + except (json.JSONDecodeError, KeyError): + yield chunk + else: + yield chunk except aiohttp.ClientResponseError as e: logger.error(f"HTTP error in decoder: {e}", exc_info=True) try: From faee188ca2e51b50625ae2e6bb91fa7520469b67 Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Thu, 11 Sep 2025 05:35:43 +0000 Subject: [PATCH 15/17] fix Signed-off-by: Kobe Chen --- src/vllm_router/requirements.txt | 1 + src/vllm_router/routers/main_router.py | 2 
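Summarizing the conversion above: each upstream text_completion chunk from the decode engine is reshaped into an OpenAI-style chat.completion.chunk before being re-framed as an SSE event. The values below are placeholders; the field mapping mirrors the handler above:

import json

# Upstream chunk shape from the decode engine's /v1/completions stream (placeholder values):
completion_chunk = {
    "id": "cmpl-123",
    "object": "text_completion",
    "created": 1726000000,
    "model": "facebook/opt-125m",
    "choices": [{"index": 0, "text": " Paris", "logprobs": None, "finish_reason": None}],
}

# Shape the router emits on /v1/chat/completions after the conversion above:
chat_chunk = {
    "id": completion_chunk["id"],
    "object": "chat.completion.chunk",
    "created": completion_chunk["created"],
    "model": completion_chunk["model"],
    "choices": [
        {
            "index": 0,
            "delta": {"content": completion_chunk["choices"][0]["text"]},
            "logprobs": completion_chunk["choices"][0]["logprobs"],
            "finish_reason": completion_chunk["choices"][0]["finish_reason"],
        }
    ],
}

# Each object goes out with the same SSE framing used throughout generate_stream:
frame = ("data: " + json.dumps(chat_chunk, separators=(",", ":")) + "\n\n").encode()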
+- src/vllm_router/service_discovery.py | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/vllm_router/requirements.txt b/src/vllm_router/requirements.txt index 4861ba4be..02a88aca9 100644 --- a/src/vllm_router/requirements.txt +++ b/src/vllm_router/requirements.txt @@ -1,6 +1,7 @@ aiofiles==24.1.0 aiohttp==3.9.5 fastapi==0.115.8 +httpx==0.28.1 kubernetes==32.0.0 numpy==1.26.4 prometheus_client==0.21.1 diff --git a/src/vllm_router/routers/main_router.py b/src/vllm_router/routers/main_router.py index 386473b75..93b87b8ad 100644 --- a/src/vllm_router/routers/main_router.py +++ b/src/vllm_router/routers/main_router.py @@ -18,7 +18,7 @@ BackgroundTasks, Request, ) -from fastapi.responses import JSONResponse, Response, StreamingResponse +from fastapi.responses import JSONResponse, Response from vllm_router.dynamic_config import get_dynamic_config_watcher from vllm_router.log import init_logger diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index d2e45ce73..d604f8f7a 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -23,7 +23,6 @@ from typing import Dict, List, Optional, Set import aiohttp -import httpx import requests from kubernetes import client, config, watch From bdea203bd332f02ff7ab79a2becedbb2ad70e1d2 Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Thu, 11 Sep 2025 07:36:13 +0000 Subject: [PATCH 16/17] CI fix for static router and k8s Signed-off-by: Kobe Chen --- .github/values-06-session-routing.yaml | 63 ++++++++++++------- .github/values-07-prefix-routing.yaml | 63 ++++++++++++------- .github/values-08-roundrobin-routing.yaml | 63 ++++++++++++------- .github/values-09-kvaware-routing.yaml | 63 ++++++++++++------- src/vllm_router/app.py | 4 +- .../services/request_service/request.py | 13 +--- .../e2e/run-static-discovery-routing-test.sh | 7 ++- 7 files changed, 168 insertions(+), 108 deletions(-) diff --git a/.github/values-06-session-routing.yaml b/.github/values-06-session-routing.yaml index 43974f94e..da331289d 100644 --- a/.github/values-06-session-routing.yaml +++ b/.github/values-06-session-routing.yaml @@ -1,12 +1,13 @@ +# Unified configuration for disaggregated prefill setup servingEngineSpec: - strategy: - type: Recreate - runtimeClassName: "" + enableEngine: true + runtimeClassName: "nvidia" + containerPort: 8000 modelSpec: # Prefill node configuration - - name: "opt125m-prefill" + - name: "llama-prefill" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" + tag: "nightly-2025-09-04" modelURL: "facebook/opt-125m" replicaCount: 1 requestCPU: 8 @@ -14,33 +15,38 @@ servingEngineSpec: # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 1024 - v1: 1 - gpuMemoryUtilization: 0.6 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "0" enabled: true kvRole: "kv_producer" + localCpu: true + maxLocalCpuSize: 5 + maxLocalDiskSize: 0 enableNixl: true + enableXpyd: true nixlRole: "sender" - nixlPeerHost: "vllm-opt125m-decode-engine-service" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlProxyHost: "vllm-router-service" + nixlProxyPort: 7500 + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" - nixlEnableGc: true enablePD: true - cpuOffloadingBufferSize: 0 + rpcPort: "producer1" labels: - model: "opt125m-prefill" + model: "llama-prefill" chatTemplate: "chat.jinja2" chatTemplateConfigMap: |- {% for message in messages 
%}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} {% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}{{ '<|im_start|>assistant\n' }}{% endif %} + # hf_token: # Decode node configuration - - name: "opt125m-decode" + - name: "llama-decode" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" + tag: "nightly-2025-09-04" modelURL: "facebook/opt-125m" replicaCount: 1 requestCPU: 8 @@ -48,23 +54,32 @@ servingEngineSpec: # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 1024 - v1: 1 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "1" enabled: true kvRole: "kv_consumer" # Set decode node as consumer + localCpu: false + maxLocalCpuSize: 0 enableNixl: true + enableXpyd: true nixlRole: "receiver" nixlPeerHost: "0.0.0.0" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlPeerInitPort: 7300 + nixlPeerAllocPort: 7400 + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" - nixlEnableGc: true + # nixlBackends: ["UCX"] enablePD: true + rpcPort: "consumer1" + skipLastNTokens: 1 + # hf_token: labels: - model: "opt125m-decode" + model: "llama-decode" chatTemplate: "chat.jinja2" chatTemplateConfigMap: |- {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} diff --git a/.github/values-07-prefix-routing.yaml b/.github/values-07-prefix-routing.yaml index 4b8bf76af..140fe1431 100644 --- a/.github/values-07-prefix-routing.yaml +++ b/.github/values-07-prefix-routing.yaml @@ -1,12 +1,13 @@ +# Unified configuration for disaggregated prefill setup servingEngineSpec: - strategy: - type: Recreate - runtimeClassName: "" + enableEngine: true + runtimeClassName: "nvidia" + containerPort: 8000 modelSpec: # Prefill node configuration - - name: "opt125m-prefill" + - name: "llama-prefill" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" + tag: "nightly-2025-09-04" modelURL: "facebook/opt-125m" replicaCount: 1 requestCPU: 8 @@ -14,33 +15,38 @@ servingEngineSpec: # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 1024 - v1: 1 - gpuMemoryUtilization: 0.6 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "0" enabled: true kvRole: "kv_producer" + localCpu: true + maxLocalCpuSize: 5 + maxLocalDiskSize: 0 enableNixl: true + enableXpyd: true nixlRole: "sender" - nixlPeerHost: "vllm-opt125m-decode-engine-service" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlProxyHost: "vllm-router-service" + nixlProxyPort: 7500 + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" - nixlEnableGc: true enablePD: true - cpuOffloadingBufferSize: 0 + rpcPort: "producer1" labels: - model: "opt125m-prefill" + model: "llama-prefill" chatTemplate: "chat.jinja2" chatTemplateConfigMap: |- {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} {% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}{{ '<|im_start|>assistant\n' }}{% endif %} + # hf_token: # Decode node configuration - 
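The same NIXL port layout repeats in every one of these values files and in the e2e script later in the series; for reference, with the in-cluster names used above:

# Reference port map for the disaggregated-prefill values files in this series.
PD_PORTS = {
    "router_zmq_proxy": ("vllm-router-service", 7500),  # nixlProxyHost / nixlProxyPort (prefill side)
    "decode_nixl_init": ("0.0.0.0", 7300),              # nixlPeerInitPort (decode side)
    "decode_nixl_alloc": ("0.0.0.0", 7400),             # nixlPeerAllocPort (decode side)
}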
- name: "opt125m-decode" + - name: "llama-decode" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" + tag: "nightly-2025-09-04" modelURL: "facebook/opt-125m" replicaCount: 1 requestCPU: 8 @@ -48,23 +54,32 @@ servingEngineSpec: # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 1024 - v1: 1 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "1" enabled: true kvRole: "kv_consumer" # Set decode node as consumer + localCpu: false + maxLocalCpuSize: 0 enableNixl: true + enableXpyd: true nixlRole: "receiver" nixlPeerHost: "0.0.0.0" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlPeerInitPort: 7300 + nixlPeerAllocPort: 7400 + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" - nixlEnableGc: true + # nixlBackends: ["UCX"] enablePD: true + rpcPort: "consumer1" + skipLastNTokens: 1 + # hf_token: labels: - model: "opt125m-decode" + model: "llama-decode" chatTemplate: "chat.jinja2" chatTemplateConfigMap: |- {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} diff --git a/.github/values-08-roundrobin-routing.yaml b/.github/values-08-roundrobin-routing.yaml index e9362eee6..e9051cf80 100644 --- a/.github/values-08-roundrobin-routing.yaml +++ b/.github/values-08-roundrobin-routing.yaml @@ -1,12 +1,13 @@ +# Unified configuration for disaggregated prefill setup servingEngineSpec: - strategy: - type: Recreate - runtimeClassName: "" + enableEngine: true + runtimeClassName: "nvidia" + containerPort: 8000 modelSpec: # Prefill node configuration - - name: "opt125m-prefill" + - name: "llama-prefill" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" + tag: "nightly-2025-09-04" modelURL: "facebook/opt-125m" replicaCount: 1 requestCPU: 8 @@ -14,33 +15,38 @@ servingEngineSpec: # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 1024 - v1: 1 - gpuMemoryUtilization: 0.6 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "0" enabled: true kvRole: "kv_producer" + localCpu: true + maxLocalCpuSize: 5 + maxLocalDiskSize: 0 enableNixl: true + enableXpyd: true nixlRole: "sender" - nixlPeerHost: "vllm-opt125m-decode-engine-service" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlProxyHost: "vllm-router-service" + nixlProxyPort: 7500 + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" - nixlEnableGc: true enablePD: true - cpuOffloadingBufferSize: 0 + rpcPort: "producer1" labels: - model: "opt125m-prefill" + model: "llama-prefill" chatTemplate: "chat.jinja2" chatTemplateConfigMap: |- {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} {% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}{{ '<|im_start|>assistant\n' }}{% endif %} + # hf_token: # Decode node configuration - - name: "opt125m-decode" + - name: "llama-decode" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" + tag: "nightly-2025-09-04" modelURL: "facebook/opt-125m" replicaCount: 1 requestCPU: 8 @@ -48,23 +54,32 @@ servingEngineSpec: # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - 
maxModelLen: 1024 - v1: 1 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "1" enabled: true kvRole: "kv_consumer" # Set decode node as consumer + localCpu: false + maxLocalCpuSize: 0 enableNixl: true + enableXpyd: true nixlRole: "receiver" nixlPeerHost: "0.0.0.0" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlPeerInitPort: 7300 + nixlPeerAllocPort: 7400 + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" - nixlEnableGc: true + # nixlBackends: ["UCX"] enablePD: true + rpcPort: "consumer1" + skipLastNTokens: 1 + # hf_token: labels: - model: "opt125m-decode" + model: "llama-decode" chatTemplate: "chat.jinja2" chatTemplateConfigMap: |- {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} diff --git a/.github/values-09-kvaware-routing.yaml b/.github/values-09-kvaware-routing.yaml index ac58c26f6..9a90404e2 100644 --- a/.github/values-09-kvaware-routing.yaml +++ b/.github/values-09-kvaware-routing.yaml @@ -1,12 +1,13 @@ +# Unified configuration for disaggregated prefill setup servingEngineSpec: - strategy: - type: Recreate - runtimeClassName: "" + enableEngine: true + runtimeClassName: "nvidia" + containerPort: 8000 modelSpec: # Prefill node configuration - - name: "opt125m-prefill" + - name: "llama-prefill" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" + tag: "nightly-2025-09-04" modelURL: "facebook/opt-125m" replicaCount: 1 requestCPU: 8 @@ -14,33 +15,38 @@ servingEngineSpec: # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 1024 - v1: 1 - gpuMemoryUtilization: 0.6 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "0" enabled: true kvRole: "kv_producer" + localCpu: true + maxLocalCpuSize: 5 + maxLocalDiskSize: 0 enableNixl: true + enableXpyd: true nixlRole: "sender" - nixlPeerHost: "vllm-opt125m-decode-engine-service" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlProxyHost: "vllm-router-service" + nixlProxyPort: 7500 + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" - nixlEnableGc: true enablePD: true - cpuOffloadingBufferSize: 0 + rpcPort: "producer1" labels: - model: "opt125m-prefill" + model: "llama-prefill" chatTemplate: "chat.jinja2" chatTemplateConfigMap: |- {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} {% if add_generation_prompt and messages[-1]['role'] != 'assistant' %}{{ '<|im_start|>assistant\n' }}{% endif %} + # hf_token: # Decode node configuration - - name: "opt125m-decode" + - name: "llama-decode" repository: "lmcache/vllm-openai" - tag: "2025-05-27-v1" + tag: "nightly-2025-09-04" modelURL: "facebook/opt-125m" replicaCount: 1 requestCPU: 8 @@ -48,23 +54,32 @@ servingEngineSpec: # requestGPU: 1 pvcStorage: "50Gi" vllmConfig: - enablePrefixCaching: true - maxModelLen: 1024 - v1: 1 + enablePrefixCaching: false + # maxModelLen: 2048 + extraArgs: + - "--enforce-eager" + - "--disable-log-requests" lmcacheConfig: cudaVisibleDevices: "1" enabled: true kvRole: "kv_consumer" # Set decode node as consumer + localCpu: false + maxLocalCpuSize: 0 enableNixl: true + enableXpyd: true nixlRole: 
"receiver" nixlPeerHost: "0.0.0.0" - nixlPeerPort: "55555" - nixlBufferSize: "1073741824" # 1GB + nixlPeerInitPort: 7300 + nixlPeerAllocPort: 7400 + nixlBufferSize: "3774873600" nixlBufferDevice: "cuda" - nixlEnableGc: true + # nixlBackends: ["UCX"] enablePD: true + rpcPort: "consumer1" + skipLastNTokens: 1 + # hf_token: labels: - model: "opt125m-decode" + model: "llama-decode" chatTemplate: "chat.jinja2" chatTemplateConfigMap: |- {% for message in messages %}{{'<|im_start|>' + message['role'] + '\n' + message['content']}}{% if (loop.last and add_generation_prompt) or not loop.last %}{{ '<|im_end|>' + '\n'}}{% endif %}{% endfor %} diff --git a/src/vllm_router/app.py b/src/vllm_router/app.py index bc1995b7e..324d9299b 100644 --- a/src/vllm_router/app.py +++ b/src/vllm_router/app.py @@ -104,7 +104,9 @@ async def lifespan(app: FastAPI): "Starting ZMQ task because the routing logic is RoutingLogic.DISAGGREGATED_PREFILL" ) # Start the ZMQ task - await start_zmq_task() + await start_zmq_task( + app.state.args.nixl_proxy_host, app.state.args.nixl_proxy_port + ) yield diff --git a/src/vllm_router/services/request_service/request.py b/src/vllm_router/services/request_service/request.py index 84b7fa136..ed5e9886e 100644 --- a/src/vllm_router/services/request_service/request.py +++ b/src/vllm_router/services/request_service/request.py @@ -68,16 +68,9 @@ zmq_ctx = zmq.asyncio.Context() -async def zmq_pull_server(): +async def zmq_pull_server(proxy_host: str = "0.0.0.0", proxy_port: int = 7500): try: socket = zmq_ctx.socket(zmq.PULL) - try: - from vllm_router.app import app - - proxy_host = app.state.args.nixl_proxy_host - proxy_port = app.state.args.nixl_proxy_port - except Exception as e: - logger.error(f"Failed to get proxy host and port from app state: {e}") proxy_url = f"{proxy_host}:{proxy_port}" socket.bind(f"tcp://{proxy_url}") logger.info(f"ZMQ proxy server started on {proxy_url}") @@ -107,11 +100,11 @@ async def zmq_pull_server(): zmq_task = None -async def start_zmq_task(): +async def start_zmq_task(proxy_host: str = "0.0.0.0", proxy_port: int = 7500): """Start the ZMQ pull server task.""" global zmq_task if zmq_task is None: - zmq_task = asyncio.create_task(zmq_pull_server()) + zmq_task = asyncio.create_task(zmq_pull_server(proxy_host, proxy_port)) logger.info("ZMQ task started") # Add a small delay to allow the task to start and potentially log any errors diff --git a/tests/e2e/run-static-discovery-routing-test.sh b/tests/e2e/run-static-discovery-routing-test.sh index 0b011ccd7..61168d52f 100755 --- a/tests/e2e/run-static-discovery-routing-test.sh +++ b/tests/e2e/run-static-discovery-routing-test.sh @@ -99,7 +99,12 @@ start_router() { --decode-model-labels "decode" \ --static-model-labels "prefill,decode" \ --session-key "$SESSION_KEY" \ - --routing-logic "$routing_logic" > "$log_file" 2>&1 & + --routing-logic "$routing_logic" \ + --nixl-peer-host "localhost" \ + --nixl-peer-init-port 7300 \ + --nixl-peer-alloc-port 7400 \ + --nixl-proxy-host "localhost" \ + --nixl-proxy-port 7500 > "$log_file" 2>&1 & ROUTER_PID=$! 
print_status "Router started with PID: $ROUTER_PID" From 9b2f57ddc153790149fe36a4923b7127d77fe4ec Mon Sep 17 00:00:00 2001 From: Kobe Chen Date: Fri, 12 Sep 2025 22:21:23 +0000 Subject: [PATCH 17/17] CI change Signed-off-by: Kobe Chen --- .github/values-06-session-routing.yaml | 3 ++- .github/values-07-prefix-routing.yaml | 3 ++- .github/values-08-roundrobin-routing.yaml | 3 ++- .github/values-09-kvaware-routing.yaml | 3 ++- tests/e2e/run-static-discovery-routing-test.sh | 4 ++-- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/.github/values-06-session-routing.yaml b/.github/values-06-session-routing.yaml index da331289d..123a89f87 100644 --- a/.github/values-06-session-routing.yaml +++ b/.github/values-06-session-routing.yaml @@ -90,7 +90,8 @@ servingEngineSpec: - SYS_PTRACE routerSpec: - repository: "git-act-router" + repository: "xiaokunchen/vllm-router" + tag: "09-10-v9" imagePullPolicy: "IfNotPresent" strategy: type: Recreate diff --git a/.github/values-07-prefix-routing.yaml b/.github/values-07-prefix-routing.yaml index 140fe1431..4d6ef990f 100644 --- a/.github/values-07-prefix-routing.yaml +++ b/.github/values-07-prefix-routing.yaml @@ -90,7 +90,8 @@ servingEngineSpec: - SYS_PTRACE routerSpec: - repository: "git-act-router" + repository: "xiaokunchen/vllm-router" + tag: "09-10-v9" imagePullPolicy: "IfNotPresent" strategy: type: Recreate diff --git a/.github/values-08-roundrobin-routing.yaml b/.github/values-08-roundrobin-routing.yaml index e9051cf80..3894df334 100644 --- a/.github/values-08-roundrobin-routing.yaml +++ b/.github/values-08-roundrobin-routing.yaml @@ -90,7 +90,8 @@ servingEngineSpec: - SYS_PTRACE routerSpec: - repository: "git-act-router" + repository: "xiaokunchen/vllm-router" + tag: "09-10-v9" imagePullPolicy: "IfNotPresent" strategy: type: Recreate diff --git a/.github/values-09-kvaware-routing.yaml b/.github/values-09-kvaware-routing.yaml index 9a90404e2..31bbfac8a 100644 --- a/.github/values-09-kvaware-routing.yaml +++ b/.github/values-09-kvaware-routing.yaml @@ -90,7 +90,8 @@ servingEngineSpec: - SYS_PTRACE routerSpec: - repository: "git-act-router" + repository: "xiaokunchen/vllm-router" + tag: "09-10-v9" imagePullPolicy: "IfNotPresent" strategy: type: Recreate diff --git a/tests/e2e/run-static-discovery-routing-test.sh b/tests/e2e/run-static-discovery-routing-test.sh index 61168d52f..9df479961 100755 --- a/tests/e2e/run-static-discovery-routing-test.sh +++ b/tests/e2e/run-static-discovery-routing-test.sh @@ -100,10 +100,10 @@ start_router() { --static-model-labels "prefill,decode" \ --session-key "$SESSION_KEY" \ --routing-logic "$routing_logic" \ - --nixl-peer-host "localhost" \ + --nixl-peer-host "0.0.0.0" \ --nixl-peer-init-port 7300 \ --nixl-peer-alloc-port 7400 \ - --nixl-proxy-host "localhost" \ + --nixl-proxy-host "0.0.0.0" \ --nixl-proxy-port 7500 > "$log_file" 2>&1 & ROUTER_PID=$!
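The zmq_pull_server change above binds a plain PULL socket on the proxy host/port that now comes straight from the CLI flags. A minimal pyzmq sketch of that socket pair, where the PUSH side stands in for a prefill engine and the payload is a placeholder (the real message format is defined by the engines, not shown in this patch):

# Minimal pyzmq sketch of the router-side PULL socket and an engine-side PUSH socket.
import asyncio
import contextlib
import zmq
import zmq.asyncio

async def pull_server(host: str = "0.0.0.0", port: int = 7500) -> None:
    # Router side: same bind pattern as zmq_pull_server above.
    sock = zmq.asyncio.Context.instance().socket(zmq.PULL)
    sock.bind(f"tcp://{host}:{port}")
    while True:
        print("router received:", await sock.recv())

async def push_once(host: str = "127.0.0.1", port: int = 7500) -> None:
    # Stand-in for a prefill engine reporting in; placeholder payload only.
    sock = zmq.asyncio.Context.instance().socket(zmq.PUSH)
    sock.connect(f"tcp://{host}:{port}")
    await sock.send(b"hello-from-prefill")

async def main() -> None:
    server = asyncio.create_task(pull_server())
    await asyncio.sleep(0.2)
    await push_once()
    await asyncio.sleep(0.2)
    server.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await server

if __name__ == "__main__":
    asyncio.run(main())

Once the router and both engines are up, the disaggregated path can be exercised end to end with a streaming chat request. A small httpx client sketch, assuming the router is port-forwarded to http://localhost:8000 (substitute your router Service address):

# Streaming smoke test against the router's chat endpoint.
# http://localhost:8000 is an assumption; adjust for your Service or port-forward.
import json
import httpx

def stream_chat(base_url: str = "http://localhost:8000") -> str:
    body = {
        "model": "facebook/opt-125m",
        "messages": [{"role": "user", "content": "Say hello"}],
        "stream": True,
        "max_tokens": 32,
    }
    pieces = []
    with httpx.Client(timeout=None) as client:
        with client.stream("POST", f"{base_url}/v1/chat/completions", json=body) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line.startswith("data: ") or line == "data: [DONE]":
                    continue
                chunk = json.loads(line[6:])
                pieces.append(chunk["choices"][0]["delta"].get("content") or "")
    return "".join(pieces)

if __name__ == "__main__":
    print(stream_chat())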