diff --git a/inference/core/env.py b/inference/core/env.py index f862231a63..e8f83e014b 100644 --- a/inference/core/env.py +++ b/inference/core/env.py @@ -722,6 +722,8 @@ # seconds WEBRTC_MODAL_RESPONSE_TIMEOUT = int(os.getenv("WEBRTC_MODAL_RESPONSE_TIMEOUT", "60")) # seconds +WEBRTC_MODAL_WATCHDOG_TIMEMOUT = int(os.getenv("WEBRTC_MODAL_WATCHDOG_TIMEMOUT", "60")) +# seconds WEBRTC_MODAL_FUNCTION_TIME_LIMIT = int( os.getenv("WEBRTC_MODAL_FUNCTION_TIME_LIMIT", "3600") ) diff --git a/inference/core/interfaces/webrtc_worker/modal.py b/inference/core/interfaces/webrtc_worker/modal.py index 31120ef2e5..351282d940 100644 --- a/inference/core/interfaces/webrtc_worker/modal.py +++ b/inference/core/interfaces/webrtc_worker/modal.py @@ -43,6 +43,7 @@ WEBRTC_MODAL_SHUTDOWN_RESERVE, WEBRTC_MODAL_TOKEN_ID, WEBRTC_MODAL_TOKEN_SECRET, + WEBRTC_MODAL_WATCHDOG_TIMEMOUT, WORKFLOWS_CUSTOM_PYTHON_EXECUTION_MODE, ) from inference.core.exceptions import RoboflowAPIUnsuccessfulRequestError @@ -184,6 +185,7 @@ def check_nvidia_smi_gpu() -> str: "WEBRTC_MODAL_RTSP_PLACEHOLDER": WEBRTC_MODAL_RTSP_PLACEHOLDER, "WEBRTC_MODAL_RTSP_PLACEHOLDER_URL": WEBRTC_MODAL_RTSP_PLACEHOLDER_URL, "WEBRTC_MODAL_SHUTDOWN_RESERVE": str(WEBRTC_MODAL_SHUTDOWN_RESERVE), + "WEBRTC_MODAL_WATCHDOG_TIMEMOUT": str(WEBRTC_MODAL_WATCHDOG_TIMEMOUT), }, "volumes": {MODEL_CACHE_DIR: rfcache_volume}, } @@ -198,7 +200,7 @@ async def run_rtc_peer_connection_with_watchdog( ) watchdog = Watchdog( - timeout_seconds=30, + timeout_seconds=WEBRTC_MODAL_WATCHDOG_TIMEMOUT, ) rtc_peer_connection_task = asyncio.create_task( @@ -211,7 +213,6 @@ async def run_rtc_peer_connection_with_watchdog( ) def on_timeout(): - logger.info("Watchdog timeout reached") rtc_peer_connection_task.cancel() watchdog.on_timeout = on_timeout @@ -266,6 +267,7 @@ def rtc_peer_connection_modal( logger.info("declared_fps: %s", webrtc_request.declared_fps) logger.info("rtsp_url: %s", webrtc_request.rtsp_url) logger.info("processing_timeout: %s", webrtc_request.processing_timeout) + logger.info("watchdog_timeout: %s", WEBRTC_MODAL_WATCHDOG_TIMEMOUT) logger.info("requested_plan: %s", webrtc_request.requested_plan) logger.info("requested_region: %s", webrtc_request.requested_region) logger.info( diff --git a/inference/core/interfaces/webrtc_worker/watchdog.py b/inference/core/interfaces/webrtc_worker/watchdog.py index a24c907694..05efe4e441 100644 --- a/inference/core/interfaces/webrtc_worker/watchdog.py +++ b/inference/core/interfaces/webrtc_worker/watchdog.py @@ -20,6 +20,7 @@ def __init__( self._heartbeats = 0 def start(self): + logger.info("Starting watchdog with timeout %s", self.timeout_seconds) if not self.on_timeout: raise ValueError( "on_timeout callback must be provided before starting the watchdog" @@ -32,12 +33,13 @@ def stop(self): self._thread.join() def _watchdog_thread(self): + logger.info("Watchdog thread started") while not self._stopping: if not self.is_alive(): logger.error("Watchdog timeout reached") self.on_timeout() break - time.sleep(0.1) + time.sleep(1) logger.info("Watchdog stopped") def heartbeat(self): diff --git a/inference/core/interfaces/webrtc_worker/webrtc.py b/inference/core/interfaces/webrtc_worker/webrtc.py index 033490edcb..5fe10cdebb 100644 --- a/inference/core/interfaces/webrtc_worker/webrtc.py +++ b/inference/core/interfaces/webrtc_worker/webrtc.py @@ -634,6 +634,7 @@ async def recv(self): async def _wait_ice_complete(peer_connection: RTCPeerConnectionWithLoop, timeout=2.0): if peer_connection.iceGatheringState == "complete": + logger.info("ICE gathering state already complete") return fut = asyncio.get_running_loop().create_future() @@ -662,6 +663,7 @@ async def init_rtc_peer_connection_with_loop( shutdown_reserve: int = WEBRTC_MODAL_SHUTDOWN_RESERVE, heartbeat_callback: Optional[Callable[[], None]] = None, ) -> RTCPeerConnectionWithLoop: + logger.info("Initializing RTC peer connection with loop") # ice._mdns is instantiated on the module level, it has a lock that is bound to the event loop # avoid RuntimeError: asyncio.locks.Lock is bound to a different event loop if hasattr(ice, "_mdns"): @@ -956,10 +958,13 @@ def on_message(message): answer = await peer_connection.createAnswer() await peer_connection.setLocalDescription(answer) - logger.debug(f"WebRTC connection status: {peer_connection.connectionState}") - await _wait_ice_complete(peer_connection, timeout=2.0) + logger.info( + "Initialized RTC peer connection with loop (status: %s), sending answer", + peer_connection.connectionState, + ) + send_answer( WebRTCWorkerResult( answer={ @@ -969,7 +974,9 @@ def on_message(message): ) ) + logger.info("Answer sent, waiting for termination event") await terminate_event.wait() + logger.info("Termination event received, closing WebRTC connection") if player: logger.info("Stopping player") player.video.stop()