@@ -25,15 +25,16 @@ import {
2525} from './gen/video/sfu/signal_rpc/signal' ;
2626import { ICETrickle , TrackType } from './gen/video/sfu/models/models' ;
2727import { StreamClient } from './coordinator/connection/client' ;
28- import { generateUUIDv4 , sleep } from './coordinator/connection/utils' ;
28+ import { generateUUIDv4 } from './coordinator/connection/utils' ;
2929import { Credentials } from './gen/coordinator' ;
3030import { Logger } from './coordinator/connection/types' ;
3131import { getLogger , getLogLevel } from './logger' ;
32- import { withoutConcurrency } from './helpers/concurrency' ;
3332import {
3433 promiseWithResolvers ,
3534 PromiseWithResolvers ,
36- } from './helpers/withResolvers' ;
35+ makeSafePromise ,
36+ SafePromise ,
37+ } from './helpers/promise' ;
3738import { getTimers } from './timers' ;
3839
3940export type StreamSfuClientConstructor = {
@@ -102,7 +103,7 @@ export class StreamSfuClient {
102103 /**
103104 * Promise that resolves when the WebSocket connection is ready (open).
104105 */
105- private signalReady ! : Promise < WebSocket > ;
106+ private signalReady ! : SafePromise < WebSocket > ;
106107
107108 /**
108109 * Flag to indicate if the client is in the process of leaving the call.
@@ -117,15 +118,14 @@ export class StreamSfuClient {
117118 private pingIntervalInMs = 10 * 1000 ;
118119 private unhealthyTimeoutInMs = this . pingIntervalInMs + 5 * 1000 ;
119120 private lastMessageTimestamp ?: Date ;
120- private readonly restoreWebSocketConcurrencyTag = Symbol ( 'recoverWebSocket' ) ;
121121 private readonly unsubscribeIceTrickle : ( ) => void ;
122122 private readonly unsubscribeNetworkChanged : ( ) => void ;
123123 private readonly onSignalClose : ( ( ) => void ) | undefined ;
124124 private readonly logger : Logger ;
125125 private readonly logTag : string ;
126126 private readonly credentials : Credentials ;
127127 private readonly dispatcher : Dispatcher ;
128- private readonly joinResponseTimeout ? : number ;
128+ private readonly joinResponseTimeout : number ;
129129 private networkAvailableTask : PromiseWithResolvers < void > | undefined ;
130130 /**
131131 * Promise that resolves when the JoinResponse is received.
@@ -228,32 +228,31 @@ export class StreamSfuClient {
228228 } ) ;
229229
230230 this . signalWs . addEventListener ( 'close' , this . handleWebSocketClose ) ;
231- this . signalWs . addEventListener ( 'error' , this . restoreWebSocket ) ;
232-
233- this . signalReady = new Promise ( ( resolve ) => {
234- const onOpen = ( ) => {
235- this . signalWs . removeEventListener ( 'open' , onOpen ) ;
236- resolve ( this . signalWs ) ;
237- } ;
238- this . signalWs . addEventListener ( 'open' , onOpen ) ;
239- } ) ;
231+
232+ this . signalReady = makeSafePromise (
233+ Promise . race < WebSocket > ( [
234+ new Promise ( ( resolve ) => {
235+ const onOpen = ( ) => {
236+ this . signalWs . removeEventListener ( 'open' , onOpen ) ;
237+ resolve ( this . signalWs ) ;
238+ } ;
239+ this . signalWs . addEventListener ( 'open' , onOpen ) ;
240+ } ) ,
241+
242+ new Promise ( ( resolve , reject ) => {
243+ setTimeout (
244+ ( ) => reject ( new Error ( 'SFU WS connection timed out' ) ) ,
245+ this . joinResponseTimeout ,
246+ ) ;
247+ } ) ,
248+ ] ) ,
249+ ) ;
240250 } ;
241251
242252 private cleanUpWebSocket = ( ) => {
243- this . signalWs . removeEventListener ( 'error' , this . restoreWebSocket ) ;
244253 this . signalWs . removeEventListener ( 'close' , this . handleWebSocketClose ) ;
245254 } ;
246255
247- private restoreWebSocket = ( ) => {
248- withoutConcurrency ( this . restoreWebSocketConcurrencyTag , async ( ) => {
249- await this . networkAvailableTask ?. promise ;
250- this . logger ( 'debug' , 'Restoring SFU WS connection' ) ;
251- this . cleanUpWebSocket ( ) ;
252- await sleep ( 500 ) ;
253- this . createWebSocket ( ) ;
254- } ) . catch ( ( err ) => this . logger ( 'debug' , `Can't restore WS connection` , err ) ) ;
255- } ;
256-
257256 get isHealthy ( ) {
258257 return this . signalWs . readyState === WebSocket . OPEN ;
259258 }
@@ -410,7 +409,7 @@ export class StreamSfuClient {
410409 data : Omit < JoinRequest , 'sessionId' | 'token' > ,
411410 ) : Promise < JoinResponse > => {
412411 // wait for the signal web socket to be ready before sending "joinRequest"
413- await this . signalReady ;
412+ await this . signalReady ( ) ;
414413 if ( this . joinResponseTask . isResolved || this . joinResponseTask . isRejected ) {
415414 // we need to lock the RPC requests until we receive a JoinResponse.
416415 // that's why we have this primitive lock mechanism.
@@ -479,7 +478,7 @@ export class StreamSfuClient {
479478 } ;
480479
481480 private send = async ( message : SfuRequest ) => {
482- await this . signalReady ; // wait for the signal ws to be open
481+ await this . signalReady ( ) ; // wait for the signal ws to be open
483482 const msgJson = SfuRequest . toJson ( message ) ;
484483 if ( this . signalWs . readyState !== WebSocket . OPEN ) {
485484 this . logger ( 'debug' , 'Signal WS is not open. Skipping message' , msgJson ) ;
0 commit comments