Binary file added proposals/imgs/enqueue_flowchart.png
Binary file added proposals/imgs/sched_loop_flowchart.png
147 changes: 147 additions & 0 deletions proposals/queue_manager_README.md
@@ -0,0 +1,147 @@
# Queue Manager for LLM Endpoint Routing

This module implements an asynchronous queue manager that dispatches LLM inference requests to backend endpoints. Based on endpoint load and engine metrics, each request is either dispatched immediately or queued, improving overall Quality of Experience (QoE).

---

## Features

- Per-endpoint asynchronous request queues
- Condition-based dispatching using `asyncio.Condition`
- Request rerouting if an endpoint remains overloaded too long
- Session affinity preservation (stubbed for future KV cache usage)
- Graceful shutdown of all schedulers
- Queueing can be enabled or disabled (enabled by default). When disabled, the queue manager is still instantiated, just not used.

---

## Flow Chart

![Logic flow for incoming request.](imgs/enqueue_flowchart.png)
![Logic flow for scheduler loop that runs per endpoint.](imgs/sched_loop_flowchart.png)

---

## File: `src/vllm_router/services/queue_service/queue_manager.py`

### Class: `EndpointQueueManager`

This class manages the following state (a sketch follows the list):

- `endpoint_queues`: A `PriorityQueue` per endpoint holding pending requests.
- `conditions`: An `asyncio.Condition` per endpoint used to notify the scheduler loop.
- `endpoint_tasks`: Background async tasks for each endpoint’s queue loop.
- `EngineStatsScraper`: Periodically scrapes GPU & model stats per endpoint.
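
A rough sketch of that state, with constructor parameters taken from this README and from `src/tests/test_queue.py`; everything else here is an assumption, not the actual implementation:

```python
import asyncio

class EndpointQueueManager:
    def __init__(self, max_queue_wait_time, max_running_requests, max_gpu_perc, scraper):
        self.max_queue_wait_time = max_queue_wait_time    # seconds before a queued request is stale
        self.max_running_requests = max_running_requests  # per-endpoint in-flight request threshold
        self.max_gpu_perc = max_gpu_perc                  # GPU cache usage threshold (percent)
        self.scraper = scraper                            # EngineStatsScraper for per-endpoint metrics
        self.endpoint_queues = {}   # endpoint URL -> asyncio.PriorityQueue of pending requests
        self.conditions = {}        # endpoint URL -> asyncio.Condition waking the scheduler loop
        self.endpoint_tasks = {}    # endpoint URL -> background scheduler asyncio.Task
        self._shutdown_event = asyncio.Event()  # set by close() to stop all scheduler loops
```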

---

## Request Lifecycle

### 1. Check Endpoint Availability

```python
if not queue_manager._endpoint_is_free(server_url):
```

- If the endpoint is overloaded (e.g. high GPU usage or too many active requests), the request is queued.
- If it's free, the request is dispatched immediately (see the sketch below).
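
A plausible implementation of this check, inferred from the thresholds exercised in `src/tests/test_queue.py` (the exact `<` boundary handling is an assumption):

```python
def _endpoint_is_free(self, endpoint_url: str) -> bool:
    # Latest per-endpoint metrics from the EngineStatsScraper.
    stats = self.scraper.get_engine_stats().get(endpoint_url)
    if stats is None:
        return True  # no metrics yet; optimistically treat the endpoint as free
    # Busy if too many in-flight requests or the GPU KV cache is nearly full.
    return (
        stats.num_running_requests < self.max_running_requests
        and stats.gpu_cache_usage_perc < self.max_gpu_perc
    )
```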

---

### 2. Enqueue Logic

```python
await queue_manager.register_endpoint(server_url)

await queue_manager.enqueue(
    server_url,
    {
        "request": request,
        "request_id": request_id,
        "body": request_body,
        "endpoint": endpoint,
        "background_tasks": background_tasks,
        "result_future": response_future,
    },
    priority=queue_manager.calculate_request_priority(request),
)
```

- Registers the endpoint queue and scheduler if not already present.
- Adds the request to a `PriorityQueue`.
- Notifies the condition variable to wake the scheduler.

If the request is queued, the caller awaits `response_future` until a scheduler dispatches the request and sets the response.
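
A minimal sketch of `enqueue` consistent with the steps above (the `enqueue_timestamp` field comes from the tests; the tuple layout and tie-breaking counter are assumptions):

```python
import itertools
import time

_sequence = itertools.count()  # tie-breaker so equal-priority entries stay comparable

async def enqueue(self, endpoint_url: str, request_payload: dict, priority: int = 0):
    # Record when the request entered the queue so the scheduler can detect staleness.
    request_payload.setdefault("enqueue_timestamp", time.time())
    # Lower priority values are dequeued first by asyncio.PriorityQueue.
    await self.endpoint_queues[endpoint_url].put(
        (priority, next(_sequence), request_payload)
    )
    # Wake the endpoint's scheduler loop.
    async with self.conditions[endpoint_url]:
        self.conditions[endpoint_url].notify()
```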

---

### 3. Scheduler Loop

```python
async def _scheduler_loop(self, endpoint_url: str):
```

Runs a background task for each endpoint:

- Waits for new requests in the queue.
- If the endpoint is free, dispatches the request.
- If a request has waited longer than `max_queue_wait_time`, the scheduler calls `_reroute_or_dispatch_stale_request` to determine next actions (a sketch of the loop follows).
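
Under those rules, the loop body might look roughly like this. It is a sketch under the tuple layout assumed in the `enqueue` sketch above; the requeue-and-poll fallback is also an assumption:

```python
import asyncio
import time

async def _scheduler_loop(self, endpoint_url: str):
    queue = self.endpoint_queues[endpoint_url]
    condition = self.conditions[endpoint_url]
    while not self._shutdown_event.is_set():
        async with condition:
            # Sleep until enqueue() signals that a request arrived.
            while queue.empty():
                await condition.wait()
        priority, seq, request_payload = await queue.get()
        waited = time.time() - request_payload["enqueue_timestamp"]
        if waited > self.max_queue_wait_time:
            # Stale: decide whether to move it to another endpoint.
            await self._reroute_or_dispatch_stale_request(request_payload, endpoint_url)
        elif self._endpoint_is_free(endpoint_url):
            await self._dispatch_and_signal(endpoint_url, request_payload)
        else:
            # Endpoint still busy and request not yet stale: requeue and poll again.
            await queue.put((priority, seq, request_payload))
            await asyncio.sleep(0.5)
```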

---

### 4. Dispatch Logic

```python
async def _dispatch_and_signal(...)
```

- Sends the request to the backend via `process_request(...)`.
- Returns a streaming response with appropriate headers (see the sketch below).
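
A sketch of that handoff, assuming `process_request` is an async generator whose first item is the backend's headers and status, followed by body chunks; this mirrors how the tests mock it, and the argument order is an assumption:

```python
from fastapi.responses import StreamingResponse

async def _dispatch_and_signal(self, endpoint_url: str, request_payload: dict):
    from vllm_router.services.request_service.request import process_request

    stream = process_request(
        request_payload["request"],
        request_payload["body"],
        endpoint_url,
        request_payload["request_id"],
        request_payload["background_tasks"],
    )
    # Assumed contract: the first yielded item is (headers, status).
    headers, status = await anext(stream)
    response = StreamingResponse(stream, status_code=status, headers=headers)
    # Resolve the future the enqueuing coroutine is awaiting.
    request_payload["result_future"].set_result(response)
```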

---

### 5. Rerouting Stale Requests

If a request exceeds the `max_queue_wait_time` threshold:

```python
await self._reroute_or_dispatch_stale_request(request, original_endpoint)
```

- Attempts to reroute the request to a different free endpoint (currently it always reroutes).
- If the new endpoint is also busy, queues the request there.

In the future, the policy could keep a request at its original endpoint if that endpoint holds its session history or a matching KV cache.
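
One way the current always-reroute policy could be written (`_pick_other_endpoint` is a hypothetical helper for choosing an alternate endpoint, not part of this proposal):

```python
async def _reroute_or_dispatch_stale_request(self, request_payload: dict, original_endpoint: str):
    # Current policy: always try to move the stale request off its original endpoint.
    target = self._pick_other_endpoint(exclude=original_endpoint)  # hypothetical helper
    if target is None:
        target = original_endpoint  # no alternative registered; stay put
    if self._endpoint_is_free(target):
        await self._dispatch_and_signal(target, request_payload)
    else:
        # The new endpoint is also busy: queue the request there instead.
        await self.enqueue(target, request_payload)
    # TODO: keep the request here when session history or a KV cache entry matches.
```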

---

## Configuration

```python
queue_manager = EndpointQueueManager(max_queue_wait_time=10)
```

- `max_queue_wait_time`: Max seconds a request can wait in queue before being rerouted or retried.
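
The accompanying tests construct the manager through a module-level singleton with a few additional thresholds; a usage sketch based on `src/tests/test_queue.py` (how the `EngineStatsScraper` instance is obtained is an assumption):

```python
from vllm_router.services.queue_service.queue import (
    get_queue_manager,
    initialize_queue_manager,
)

initialize_queue_manager(
    max_queue_wait_time=10,   # seconds before a queued request is considered stale
    max_running_requests=10,  # per-endpoint in-flight request threshold
    max_gpu_perc=95,          # per-endpoint GPU cache usage threshold (percent)
    scraper=engine_stats_scraper,  # an EngineStatsScraper built elsewhere in the router
)
queue_manager = get_queue_manager()  # raises ValueError if called before initialization
```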

---

## Dependencies

- `asyncio`
- `EngineStatsScraper` from `vllm_router.stats.engine_stats`
- `process_request()` from `vllm_router.services.request_service.request`

---

## TODOs

- [ ] Implement KV cache-aware session affinity logic
- [ ] Implement request priority classification
- [ ] Replace round-robin stale routing policy
- [ ] Retry policies and smarter rerouting heuristics
- [ ] Implement knapsack-like selection allowing for a group of requests to be dispatched at once

---
214 changes: 214 additions & 0 deletions src/tests/test_queue.py
@@ -0,0 +1,214 @@
import asyncio
import json
import time
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import pytest_asyncio
from fastapi.responses import StreamingResponse

from vllm_router.services.queue_service.queue import (
    get_queue_manager,
    initialize_queue_manager,
)


@pytest.fixture
def mock_scraper():
    scraper = MagicMock()
    scraper.get_engine_stats.return_value = {
        "endpoint1": MagicMock(num_running_requests=0, gpu_cache_usage_perc=0),
        "endpoint2": MagicMock(num_running_requests=5, gpu_cache_usage_perc=50),
    }
    return scraper


@pytest_asyncio.fixture
async def queue_manager(mock_scraper):
    initialize_queue_manager(
        max_queue_wait_time=10,
        max_running_requests=10,
        max_gpu_perc=95,
        scraper=mock_scraper,
    )
    manager = get_queue_manager()
    await manager.register_endpoint("endpoint1")
    await manager.register_endpoint("endpoint2")
    yield manager
    await manager.close()


@pytest.mark.asyncio
async def test_queue_manager_initialization(mock_scraper):
    initialize_queue_manager(
        max_queue_wait_time=10,
        max_running_requests=10,
        max_gpu_perc=95,
        scraper=mock_scraper,
    )
    manager = get_queue_manager()
    assert manager.max_queue_wait_time == 10
    assert manager.max_running_requests == 10
    assert manager.max_gpu_perc == 95
    assert manager.scraper == mock_scraper


@pytest.mark.asyncio
async def test_register_endpoint(queue_manager):
    for endpoint in ["endpoint1", "endpoint2"]:
        assert endpoint in queue_manager.endpoint_queues
        assert endpoint in queue_manager.conditions
        assert endpoint in queue_manager.endpoint_tasks


@pytest.mark.asyncio
async def test_enqueue_request(queue_manager):
    test_request = {"request_id": "test123", "body": "test"}
    future = asyncio.Future()
    test_request["_result_future"] = future
    await queue_manager.enqueue("endpoint1", test_request, priority=1)
    assert not queue_manager.endpoint_queues["endpoint1"].empty()
    assert not future.done()


@pytest.mark.asyncio
async def test_endpoint_is_free(queue_manager, mock_scraper):
    assert queue_manager._endpoint_is_free("endpoint1") is True
    assert queue_manager._endpoint_is_free("endpoint2") is True

    mock_scraper.get_engine_stats.return_value["endpoint2"].num_running_requests = 15
    assert queue_manager._endpoint_is_free("endpoint2") is False

    mock_scraper.get_engine_stats.return_value["endpoint2"].num_running_requests = 5
    mock_scraper.get_engine_stats.return_value["endpoint2"].gpu_cache_usage_perc = 96
    assert queue_manager._endpoint_is_free("endpoint2") is False


@pytest.mark.asyncio
async def test_dispatch_and_signal(queue_manager):
    test_request = {
        "request_id": "test123",
        "body": json.dumps({"prompt": "hello"}),
        "request": MagicMock(),
        "endpoint": "endpoint1",
        "background_tasks": MagicMock(),
        "result_future": asyncio.Future(),
    }

    with patch(
        "vllm_router.services.request_service.request.process_request",
        new_callable=AsyncMock,
    ) as mock_process:

        async def mock_stream():
            yield ("content-type", 200)
            yield StreamingResponse(content=MagicMock())

        mock_process.return_value.__aiter__.return_value = mock_stream()

        await queue_manager._dispatch_and_signal("endpoint1", test_request)


@pytest.mark.asyncio
async def test_scheduler_loop(queue_manager):
    test_request = {
        "request_id": "test123",
        "body": json.dumps({"prompt": "hello"}),
        "request": MagicMock(),
        "endpoint": "endpoint1",
        "background_tasks": MagicMock(),
        "result_future": asyncio.Future(),
    }

    with patch(
        "vllm_router.services.request_service.request.process_request"
    ) as mock_process:
        mock_headers = {"content-type": "application/json"}
        mock_status = 200
        mock_stream = MagicMock()
        mock_process.return_value = (mock_headers, mock_status, mock_stream)

        await queue_manager.enqueue("endpoint1", test_request)
        await asyncio.sleep(1.5)  # Wait enough time for scheduler loop

        assert test_request["result_future"].done()


@pytest.mark.asyncio
@patch(
    "vllm_router.services.request_service.request.process_request",
    new_callable=AsyncMock,
)
@patch(
    "vllm_router.services.queue_service.queue.EndpointQueueManager._reroute_or_dispatch_stale_request",
    new_callable=AsyncMock,
)
async def test_stale_request_rerouting(
    mock_reroute, mock_process_request, queue_manager
):
    dummy_request = MagicMock()
    dummy_request.state = MagicMock()

    # Simulate a quick response stream
    async def dummy_stream():
        yield ({"content-type": "application/json"}, 200)

    mock_process_request.return_value = dummy_stream()

    # Simulate a stale request
    stale_request = {
        "request_id": "stale123",
        "body": '{"input": "hello"}',
        "model_name": "test-model",
        "session_id": "abc",
        "request": dummy_request,
        "endpoint": "endpoint1",
        "background_tasks": MagicMock(),
        "result_future": asyncio.Future(),
        "enqueue_timestamp": time.time() - 15,  # 15s ago
    }
    queue_manager._endpoint_is_free = MagicMock(return_value=False)

    await queue_manager.enqueue("endpoint1", stale_request)

    # Let scheduler tick
    await asyncio.sleep(15)

    mock_reroute.assert_called_once()


@pytest.mark.asyncio
async def test_shutdown(queue_manager):
    assert not queue_manager._shutdown_event.is_set()
    await queue_manager.close()
    assert queue_manager._shutdown_event.is_set()
    for task in queue_manager.endpoint_tasks.values():
        assert task.done()


@pytest.mark.asyncio
async def test_singleton_pattern():
    from vllm_router.services.queue_service import queue as queue_module

    queue_module._global_queue_manager = None

    scraper = MagicMock()
    scraper.get_engine_stats.return_value = {
        "endpoint1": MagicMock(num_running_requests=0, gpu_cache_usage_perc=0),
    }

    queue_module.initialize_queue_manager(
        max_queue_wait_time=10,
        max_running_requests=10,
        max_gpu_perc=95,
        scraper=scraper,
    )
    manager1 = queue_module.get_queue_manager()
    manager2 = queue_module.get_queue_manager()
    assert manager1 is manager2

    await manager1.close()
    queue_module._global_queue_manager = None

    with pytest.raises(ValueError, match="Queue manager not initialized"):
        queue_module.get_queue_manager()