|
3 | 3 | import signal |
4 | 4 | import sys |
5 | 5 | import time |
| 6 | +import weakref |
6 | 7 | from dataclasses import dataclass |
7 | 8 | from enum import Enum, auto |
8 | 9 | from multiprocessing.process import BaseProcess |
|
19 | 20 | from vllm.executor.multiproc_worker_utils import ( |
20 | 21 | _add_prefix, set_multiprocessing_worker_envs) |
21 | 22 | from vllm.logger import init_logger |
22 | | -from vllm.utils import (get_distributed_init_method, get_exception_traceback, |
23 | | - get_mp_context, get_open_port, get_open_zmq_ipc_path, |
24 | | - kill_process_tree, zmq_socket_ctx) |
| 23 | +from vllm.utils import (get_distributed_init_method, get_mp_context, |
| 24 | + get_open_port, get_open_zmq_ipc_path, zmq_socket_ctx) |
25 | 25 | from vllm.v1.executor.abstract import Executor |
26 | 26 | from vllm.v1.outputs import ModelRunnerOutput |
27 | 27 | from vllm.worker.worker_base import WorkerWrapperBase |
|
35 | 35 | class MultiprocExecutor(Executor): |
36 | 36 |
|
37 | 37 | def __init__(self, vllm_config: VllmConfig) -> None: |
| 38 | + # Call self.shutdown at exit to clean up |
| 39 | + # and ensure workers will be terminated. |
| 40 | + self._finalizer = weakref.finalize(self, self.shutdown) |
38 | 41 |
|
39 | 42 | # The child processes will send SIGQUIT when unrecoverable |
40 | 43 | # errors happen. |
@@ -344,15 +347,12 @@ def signal_handler(signum, frame): |
344 | 347 | worker.worker_busy_loop() |
345 | 348 |
|
346 | 349 | except SystemExit: |
347 | | - # worker_busy_loop sends exceptions to Executor and raises |
348 | | - # SystemExit. |
349 | | - shutdown_requested = True |
350 | 350 | logger.debug("Worker interrupted.") |
351 | 351 |
|
352 | 352 | except Exception: |
353 | 353 | # worker_busy_loop sends exceptions exceptons to Executor |
354 | 354 | # for shutdown, but if there is an error in startup or an |
355 | | - # error with IPC |
| 355 | + # error with IPC itself, we need to alert the parent. |
356 | 356 | # itself, we need to alert the parent so we can shut down. |
357 | 357 | psutil.Process().parent().send_signal(signal.SIGQUIT) |
358 | 358 | raise |
@@ -390,18 +390,16 @@ class ResponseStatus(Enum): |
390 | 390 |
|
391 | 391 | def worker_busy_loop(self): |
392 | 392 | """Main busy loop for Multiprocessing Workers""" |
| 393 | + |
393 | 394 | while True: |
394 | 395 | method, args, kwargs = self.rpc_broadcast_mq.dequeue() |
395 | 396 |
|
396 | 397 | try: |
397 | | - if self.rank == 0: |
398 | | - raise ValueError("SIMULATE CUDA ERROR") |
399 | 398 | output = getattr(self.worker, method)(*args, **kwargs) |
400 | 399 | except Exception as e: |
401 | 400 | self.worker_response_mq.enqueue( |
402 | 401 | (WorkerProc.ResponseStatus.FAILURE, e)) |
403 | | - traceback = get_exception_traceback() |
404 | | - logger.error("WorkerProc hit an exception: %s", traceback) |
| 402 | + logger.exception("WorkerProc hit an exception: %s", exc_info=e) |
405 | 403 | continue |
406 | 404 |
|
407 | 405 | self.worker_response_mq.enqueue( |
|
0 commit comments