Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions inference/core/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,8 @@
# seconds
WEBRTC_MODAL_RESPONSE_TIMEOUT = int(os.getenv("WEBRTC_MODAL_RESPONSE_TIMEOUT", "60"))
# seconds
WEBRTC_MODAL_WATCHDOG_TIMEMOUT = int(os.getenv("WEBRTC_MODAL_WATCHDOG_TIMEMOUT", "60"))
# seconds
WEBRTC_MODAL_FUNCTION_TIME_LIMIT = int(
os.getenv("WEBRTC_MODAL_FUNCTION_TIME_LIMIT", "3600")
)
Expand Down
6 changes: 4 additions & 2 deletions inference/core/interfaces/webrtc_worker/modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
WEBRTC_MODAL_SHUTDOWN_RESERVE,
WEBRTC_MODAL_TOKEN_ID,
WEBRTC_MODAL_TOKEN_SECRET,
WEBRTC_MODAL_WATCHDOG_TIMEMOUT,
WORKFLOWS_CUSTOM_PYTHON_EXECUTION_MODE,
)
from inference.core.exceptions import RoboflowAPIUnsuccessfulRequestError
Expand Down Expand Up @@ -184,6 +185,7 @@ def check_nvidia_smi_gpu() -> str:
"WEBRTC_MODAL_RTSP_PLACEHOLDER": WEBRTC_MODAL_RTSP_PLACEHOLDER,
"WEBRTC_MODAL_RTSP_PLACEHOLDER_URL": WEBRTC_MODAL_RTSP_PLACEHOLDER_URL,
"WEBRTC_MODAL_SHUTDOWN_RESERVE": str(WEBRTC_MODAL_SHUTDOWN_RESERVE),
"WEBRTC_MODAL_WATCHDOG_TIMEMOUT": str(WEBRTC_MODAL_WATCHDOG_TIMEMOUT),
},
"volumes": {MODEL_CACHE_DIR: rfcache_volume},
}
Expand All @@ -198,7 +200,7 @@ async def run_rtc_peer_connection_with_watchdog(
)

watchdog = Watchdog(
timeout_seconds=30,
timeout_seconds=WEBRTC_MODAL_WATCHDOG_TIMEMOUT,
)

rtc_peer_connection_task = asyncio.create_task(
Expand All @@ -211,7 +213,6 @@ async def run_rtc_peer_connection_with_watchdog(
)

def on_timeout():
logger.info("Watchdog timeout reached")
rtc_peer_connection_task.cancel()

watchdog.on_timeout = on_timeout
Expand Down Expand Up @@ -266,6 +267,7 @@ def rtc_peer_connection_modal(
logger.info("declared_fps: %s", webrtc_request.declared_fps)
logger.info("rtsp_url: %s", webrtc_request.rtsp_url)
logger.info("processing_timeout: %s", webrtc_request.processing_timeout)
logger.info("watchdog_timeout: %s", WEBRTC_MODAL_WATCHDOG_TIMEMOUT)
logger.info("requested_plan: %s", webrtc_request.requested_plan)
logger.info("requested_region: %s", webrtc_request.requested_region)
logger.info(
Expand Down
4 changes: 3 additions & 1 deletion inference/core/interfaces/webrtc_worker/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def __init__(
self._heartbeats = 0

def start(self):
logger.info("Starting watchdog with timeout %s", self.timeout_seconds)
if not self.on_timeout:
raise ValueError(
"on_timeout callback must be provided before starting the watchdog"
Expand All @@ -32,12 +33,13 @@ def stop(self):
self._thread.join()

def _watchdog_thread(self):
logger.info("Watchdog thread started")
while not self._stopping:
if not self.is_alive():
logger.error("Watchdog timeout reached")
self.on_timeout()
break
time.sleep(0.1)
time.sleep(1)
logger.info("Watchdog stopped")

def heartbeat(self):
Expand Down
11 changes: 9 additions & 2 deletions inference/core/interfaces/webrtc_worker/webrtc.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ async def recv(self):

async def _wait_ice_complete(peer_connection: RTCPeerConnectionWithLoop, timeout=2.0):
if peer_connection.iceGatheringState == "complete":
logger.info("ICE gathering state already complete")
return
fut = asyncio.get_running_loop().create_future()

Expand Down Expand Up @@ -662,6 +663,7 @@ async def init_rtc_peer_connection_with_loop(
shutdown_reserve: int = WEBRTC_MODAL_SHUTDOWN_RESERVE,
heartbeat_callback: Optional[Callable[[], None]] = None,
) -> RTCPeerConnectionWithLoop:
logger.info("Initializing RTC peer connection with loop")
# ice._mdns is instantiated on the module level, it has a lock that is bound to the event loop
# avoid RuntimeError: asyncio.locks.Lock is bound to a different event loop
if hasattr(ice, "_mdns"):
Expand Down Expand Up @@ -956,10 +958,13 @@ def on_message(message):
answer = await peer_connection.createAnswer()
await peer_connection.setLocalDescription(answer)

logger.debug(f"WebRTC connection status: {peer_connection.connectionState}")

await _wait_ice_complete(peer_connection, timeout=2.0)

logger.info(
"Initialized RTC peer connection with loop (status: %s), sending answer",
peer_connection.connectionState,
)

send_answer(
WebRTCWorkerResult(
answer={
Expand All @@ -969,7 +974,9 @@ def on_message(message):
)
)

logger.info("Answer sent, waiting for termination event")
await terminate_event.wait()
logger.info("Termination event received, closing WebRTC connection")
if player:
logger.info("Stopping player")
player.video.stop()
Expand Down