
Commit 3a18082

atalhens committed
Add corrupted request logic with IterationStats and removal from SchedulerStats
Signed-off-by: atalhens <[email protected]>
1 parent a52cdaa commit 3a18082

File tree

9 files changed: +47 -72 lines changed

examples/online_serving/prometheus_grafana/grafana.json

Lines changed: 0 additions & 16 deletions
@@ -593,22 +593,6 @@
           "range": true,
           "refId": "C",
           "useBackend": false
-        },
-        {
-          "datasource": {
-            "type": "prometheus",
-            "uid": "${DS_PROMETHEUS}"
-          },
-          "disableTextWrap": false,
-          "editorMode": "builder",
-          "expr": "vllm:num_requests_corrupted{model_name=\"$model_name\"}",
-          "fullMetaSearch": false,
-          "includeNullMetadata": true,
-          "instant": false,
-          "legendFormat": "Num Corrupted",
-          "range": true,
-          "refId": "D",
-          "useBackend": false
         }
       ],
       "title": "Scheduler State",

vllm/config/scheduler.py

Lines changed: 0 additions & 6 deletions
@@ -137,12 +137,6 @@ class SchedulerConfig:
     structured outputs, speculative decoding, and pipeline parallelism.
     """

-    include_corrupted_requests: bool = False
-    """If set to True, include corrupted requests in scheduler statistics.
-    This adds computational overhead but provides more detailed metrics for
-    monitoring and debugging purposes.
-    """
-
     def compute_hash(self) -> str:
         """
         WARNING: Whenever a new field is added to this config,

vllm/engine/arg_utils.py

Lines changed: 0 additions & 6 deletions
@@ -550,7 +550,6 @@ class EngineArgs:
     """Custom logitproc types"""

     async_scheduling: bool = SchedulerConfig.async_scheduling
-    include_corrupted_requests: bool = SchedulerConfig.include_corrupted_requests

     kv_sharing_fast_prefill: bool = CacheConfig.kv_sharing_fast_prefill

@@ -1042,10 +1041,6 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
     scheduler_group.add_argument(
         "--async-scheduling", **scheduler_kwargs["async_scheduling"]
     )
-    scheduler_group.add_argument(
-        "--include-corrupted-requests",
-        **scheduler_kwargs["include_corrupted_requests"],
-    )

     # Compilation arguments
     compilation_kwargs = get_kwargs(CompilationConfig)
@@ -1591,7 +1586,6 @@ def create_engine_config(
             long_prefill_token_threshold=self.long_prefill_token_threshold,
             disable_hybrid_kv_cache_manager=self.disable_hybrid_kv_cache_manager,
             async_scheduling=self.async_scheduling,
-            include_corrupted_requests=self.include_corrupted_requests,
         )

         if not model_config.is_multimodal_model and self.default_mm_loras:

vllm/v1/core/sched/interface.py

Lines changed: 2 additions & 2 deletions
@@ -136,8 +136,8 @@ def reset_prefix_cache(self) -> bool:
         raise NotImplementedError

     @abstractmethod
-    def get_request_counts(self) -> tuple[int, int, int]:
-        """Returns (num_running_reqs, num_waiting_reqs, num_corrupted_reqs)."""
+    def get_request_counts(self) -> tuple[int, int]:
+        """Returns (num_running_reqs, num_waiting_reqs)."""
         raise NotImplementedError

     @abstractmethod

vllm/v1/core/sched/scheduler.py

Lines changed: 4 additions & 18 deletions
@@ -124,7 +124,6 @@ def __init__(
         # Priority queues for requests.
         self.waiting = create_request_queue(self.policy)
         self.running: list[Request] = []
-        self.corrupted: list[Request] = []

         # The request IDs that are finished in between the previous and the
         # current steps. This is used to notify the workers about the finished
@@ -1043,6 +1042,7 @@ def update_from_output(
                         kv_transfer_params=kv_transfer_params,
                         trace_headers=request.trace_headers,
                         num_cached_tokens=request.num_cached_tokens,
+                        num_nans_in_logits=request.num_nans_in_logits,
                     )
                 )
             else:
@@ -1162,22 +1162,9 @@ def update_draft_token_ids(
         else:
             request.spec_token_ids = spec_token_ids

-    def _get_corrupted_requests_count(self, include_corrupted: bool) -> int:
-        """Get the count of corrupted requests if enabled, otherwise return 0.
-
-        This method centralizes the corrupted requests counting logic to avoid
-        code duplication and improve performance.
-        """
-        if not include_corrupted:
-            return 0
-        return sum(req.is_output_corrupted for req in self.running)
-
-    def get_request_counts(self) -> tuple[int, int, int]:
-        """Returns (num_running_reqs, num_waiting_reqs, num_corrupted_reqs)."""
-        num_corrupted_reqs = self._get_corrupted_requests_count(
-            self.scheduler_config.include_corrupted_requests
-        )
-        return len(self.running), len(self.waiting), num_corrupted_reqs
+    def get_request_counts(self) -> tuple[int, int]:
+        """Returns (num_running_reqs, num_waiting_reqs)."""
+        return len(self.running), len(self.waiting)

     def add_request(self, request: Request) -> None:
         self.waiting.add_request(request)
@@ -1271,7 +1258,6 @@ def make_stats(
         return SchedulerStats(
             num_running_reqs=len(self.running),
             num_waiting_reqs=len(self.waiting),
-            num_corrupted_reqs=num_corrupted_reqs,
             kv_cache_usage=self.kv_cache_manager.usage,
             prefix_cache_stats=prefix_cache_stats,
             connector_prefix_cache_stats=connector_prefix_cache_stats,
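
Note: with the corrupted count gone, get_request_counts() shrinks to a two-tuple and make_stats() no longer takes a precomputed num_corrupted_reqs. A minimal runnable sketch of the narrowed contract, using a stub class rather than vLLM's real Scheduler:

# Stub standing in for the scheduler; only the counting contract is shown.
class _StubScheduler:
    def __init__(self) -> None:
        self.running = ["req-0", "req-1"]  # requests currently executing
        self.waiting = ["req-2"]           # requests queued for scheduling

    def get_request_counts(self) -> tuple[int, int]:
        """Returns (num_running_reqs, num_waiting_reqs)."""
        return len(self.running), len(self.waiting)

num_running, num_waiting = _StubScheduler().get_request_counts()
print(num_running, num_waiting)  # 2 1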

vllm/v1/engine/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -121,6 +121,8 @@ class EngineCoreOutput(
     trace_headers: Mapping[str, str] | None = None
     # The number of tokens with prefix cache hits.
     num_cached_tokens: int = 0
+    # The number of NaNs in logits for this request.
+    num_nans_in_logits: int = 0

     @property
     def finished(self) -> bool:
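
Note: the new field defaults to 0, so outputs are unchanged whenever NaN counting is off. A simplified dataclass stand-in (the real EngineCoreOutput is a msgspec-based struct with many more fields):

from dataclasses import dataclass

@dataclass
class _Output:  # stand-in for EngineCoreOutput, illustration only
    request_id: str
    num_cached_tokens: int = 0
    num_nans_in_logits: int = 0  # stays 0 unless NaN counting populated it

print(_Output("a").num_nans_in_logits)                        # 0
print(_Output("b", num_nans_in_logits=3).num_nans_in_logits)  # 3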

vllm/v1/engine/core.py

Lines changed: 1 addition & 4 deletions
@@ -98,9 +98,6 @@ def __init__(
         )

         self.log_stats = log_stats
-        self.include_corrupted_requests = (
-            vllm_config.scheduler_config.include_corrupted_requests
-        )

         # Setup Model.
         self.model_executor = executor_class(vllm_config)
@@ -1057,7 +1054,7 @@ def __init__(
         # finished with DP peers every N steps.
         self.step_counter = 0
         self.current_wave = 0
-        self.last_counts = (0, 0, 0)
+        self.last_counts = (0, 0)

         # Initialize the engine.
         dp_rank = vllm_config.parallel_config.data_parallel_rank

vllm/v1/metrics/loggers.py

Lines changed: 22 additions & 19 deletions
@@ -9,6 +9,7 @@

 from prometheus_client import Counter, Gauge, Histogram

+import vllm.envs as envs
 from vllm.config import SupportsMetricsInfo, VllmConfig
 from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorLogging
 from vllm.logger import init_logger
@@ -104,6 +105,9 @@ def __init__(self, vllm_config: VllmConfig, engine_index: int = 0):
         self.engine_is_idle = False
         self.aggregated = False

+        # Track the number of corrupted requests, never reset.
+        self.num_corrupted_reqs: int = 0
+
     def _reset(self, now):
         self.last_log_time = now

@@ -115,6 +119,7 @@ def _track_iteration_stats(self, iteration_stats: IterationStats):
         # Save tracked stats for token counters.
         self.num_prompt_tokens += iteration_stats.num_prompt_tokens
         self.num_generation_tokens += iteration_stats.num_generation_tokens
+        self.num_corrupted_reqs += iteration_stats.num_corrupted_reqs

     def _get_throughput(self, tracked_stats: int, now: float) -> float:
         # Compute summary metrics for tracked stats
@@ -187,7 +192,6 @@ def log(self):
             "Avg generation throughput: %.1f tokens/s",
             "Running: %d reqs",
             "Waiting: %d reqs",
-            "Corrupted: %d reqs",
             "GPU KV cache usage: %.1f%%",
             "Prefix cache hit rate: %.1f%%",
         ]
@@ -196,13 +200,15 @@ def log(self):
             self.last_generation_throughput,
             self.last_scheduler_stats.num_running_reqs,
             self.last_scheduler_stats.num_waiting_reqs,
-            self.last_scheduler_stats.num_corrupted_reqs,
             self.last_scheduler_stats.kv_cache_usage * 100,
             self.prefix_caching_metrics.hit_rate * 100,
         ]
         if not self.connector_prefix_caching_metrics.empty:
             log_parts.append("External prefix cache hit rate: %.1f%%")
             log_args.append(self.connector_prefix_caching_metrics.hit_rate * 100)
+        if envs.VLLM_COMPUTE_NANS_IN_LOGITS:
+            log_parts.append("Corrupted: %d reqs")
+            log_args.append(self.num_corrupted_reqs)
         if not self.mm_caching_metrics.empty:
             log_parts.append("MM cache hit rate: %.1f%%")
             log_args.append(self.mm_caching_metrics.hit_rate * 100)
@@ -271,9 +277,6 @@ def aggregate_scheduler_stats(self):
         self.last_scheduler_stats.num_running_reqs += (
             last_scheduler_stats.num_running_reqs
         )
-        self.last_scheduler_stats.num_corrupted_reqs += (
-            last_scheduler_stats.num_corrupted_reqs
-        )
         self.last_scheduler_stats.kv_cache_usage += (
             last_scheduler_stats.kv_cache_usage
         )
@@ -387,16 +390,6 @@ def __init__(
             gauge_scheduler_waiting, engine_indexes, model_name
         )

-        gauge_scheduler_corrupted = self._gauge_cls(
-            name="vllm:num_requests_corrupted",
-            documentation="Number of requests corrupted.",
-            multiprocess_mode="mostrecent",
-            labelnames=labelnames,
-        )
-        self.gauge_scheduler_corrupted = make_per_engine(
-            gauge_scheduler_corrupted, engine_indexes, model_name
-        )
-
         #
         # GPU cache
         #
@@ -458,6 +451,16 @@ def __init__(
             gauge_kv_cache_usage, engine_indexes, model_name
         )

+        if envs.VLLM_COMPUTE_NANS_IN_LOGITS:
+            counter_corrupted_requests = self._counter_cls(
+                name="vllm:corrupted_requests",
+                documentation="Number of requests corrupted out of running requests.",
+                labelnames=labelnames,
+            )
+            self.counter_corrupted_requests = make_per_engine(
+                counter_corrupted_requests, engine_indexes, model_name
+            )
+
         counter_prefix_cache_queries = self._counter_cls(
             name="vllm:prefix_cache_queries",
             documentation=(
@@ -910,10 +913,6 @@ def record(
         self.gauge_scheduler_waiting[engine_idx].set(
             scheduler_stats.num_waiting_reqs
         )
-        self.gauge_scheduler_corrupted[engine_idx].set(
-            scheduler_stats.num_corrupted_reqs
-        )
-
         if self.show_hidden_metrics:
             self.gauge_gpu_cache_usage[engine_idx].set(
                 scheduler_stats.kv_cache_usage
@@ -958,6 +957,10 @@ def record(
         self.counter_num_preempted_reqs[engine_idx].inc(
             iteration_stats.num_preempted_reqs
         )
+        if envs.VLLM_COMPUTE_NANS_IN_LOGITS and iteration_stats.num_corrupted_reqs > 0:
+            self.counter_corrupted_requests[engine_idx].inc(
+                iteration_stats.num_corrupted_reqs
+            )
         self.counter_prompt_tokens[engine_idx].inc(iteration_stats.num_prompt_tokens)
         self.counter_generation_tokens[engine_idx].inc(
             iteration_stats.num_generation_tokens
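
Note: the logger now follows a create-only-if-enabled pattern: the Prometheus counter exists only when VLLM_COMPUTE_NANS_IN_LOGITS is set, and every record path re-checks the flag before touching it, so the attribute is never accessed when absent. A minimal standalone sketch against prometheus_client directly (vLLM's _counter_cls and make_per_engine wrappers are omitted, and the metric name here is a demo stand-in):

import os
from prometheus_client import Counter

# Stand-in for vllm.envs.VLLM_COMPUTE_NANS_IN_LOGITS.
COMPUTE_NANS = os.environ.get("VLLM_COMPUTE_NANS_IN_LOGITS", "0") == "1"

if COMPUTE_NANS:
    # Created once at startup, only when the feature is enabled.
    counter_corrupted = Counter(
        "demo_corrupted_requests",
        "Number of requests corrupted out of running requests.",
        ["model_name"],
    )

def record_corrupted(num_corrupted: int, model_name: str = "demo") -> None:
    # Same guard at record time: if the flag is off, the counter was never
    # created, so it must not be referenced.
    if COMPUTE_NANS and num_corrupted > 0:
        counter_corrupted.labels(model_name=model_name).inc(num_corrupted)

record_corrupted(2)  # increments only when VLLM_COMPUTE_NANS_IN_LOGITS=1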

vllm/v1/metrics/stats.py

Lines changed: 16 additions & 1 deletion
@@ -156,7 +156,6 @@ class SchedulerStats:

     num_running_reqs: int = 0
     num_waiting_reqs: int = 0
-    num_corrupted_reqs: int = 0

     # These are used for internal DP load-balancing.
     step_counter: int = 0
@@ -195,6 +194,9 @@ class RequestStateStats:
     # first token latency
     first_token_latency: float = 0.0

+    # Track if output has NaNs in logits
+    num_nans_in_logits: int = 0
+

 @dataclass
 class FinishedRequestStats:
@@ -210,6 +212,7 @@ class FinishedRequestStats:
     inference_time: float = 0.0
     decode_time: float = 0.0
     mean_time_per_output_token: float = 0.0
+    is_corrupted: bool = False


 class IterationStats:
@@ -220,6 +223,7 @@ def __init__(self):
         self.num_generation_tokens = 0
         self.num_prompt_tokens = 0
         self.num_preempted_reqs = 0
+        self.num_corrupted_reqs = 0
         self.finished_requests: list[FinishedRequestStats] = []
         self.max_num_generation_tokens_iter: list[int] = []
         self.n_params_iter: list[int] = []
@@ -257,6 +261,10 @@ def update_from_output(

         req_stats.num_generation_tokens += num_new_generation_tokens

+        # Track NaNs in logits if present
+        if output.num_nans_in_logits > 0:
+            req_stats.num_nans_in_logits += output.num_nans_in_logits
+
         # Process request-level engine core events
         if output.events is not None:
             self.update_from_events(
@@ -327,6 +335,10 @@ def update_from_finished_request(
             else 0
         )

+        # Check if output was corrupted based on NaN count
+        # (will be 0 if VLLM_COMPUTE_NANS_IN_LOGITS was not enabled)
+        is_corrupted = req_stats.num_nans_in_logits > 0
+
         finished_req = FinishedRequestStats(
             finish_reason=finish_reason,
             e2e_latency=e2e_latency,
@@ -338,7 +350,10 @@ def update_from_finished_request(
             inference_time=inference_time,
             decode_time=decode_time,
             mean_time_per_output_token=mean_time_per_output_token,
+            is_corrupted=is_corrupted,
         )
+        if is_corrupted:
+            self.num_corrupted_reqs += 1
         self.finished_requests.append(finished_req)
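
Note: end to end, stats.py now accumulates per-step NaN counts on RequestStateStats and classifies a request as corrupted exactly once, when it finishes. A compact runnable sketch of that accumulation using stub types (the real classes carry many more fields):

from dataclasses import dataclass, field

@dataclass
class _ReqStats:  # stands in for RequestStateStats
    num_nans_in_logits: int = 0

@dataclass
class _IterStats:  # stands in for IterationStats
    num_corrupted_reqs: int = 0
    finished: list[bool] = field(default_factory=list)

    def update_from_output(self, req: _ReqStats, nans_this_step: int) -> None:
        # Per-step accumulation, mirroring update_from_output above.
        if nans_this_step > 0:
            req.num_nans_in_logits += nans_this_step

    def update_from_finished_request(self, req: _ReqStats) -> None:
        # Count stays 0 when NaN counting is disabled, so nothing is flagged.
        is_corrupted = req.num_nans_in_logits > 0
        if is_corrupted:
            self.num_corrupted_reqs += 1
        self.finished.append(is_corrupted)

stats, req = _IterStats(), _ReqStats()
stats.update_from_output(req, nans_this_step=0)
stats.update_from_output(req, nans_this_step=2)  # NaNs appear mid-stream
stats.update_from_finished_request(req)
print(stats.num_corrupted_reqs)  # 1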
