Skip to content

Commit 1d93f72

Browse files
authored
chore: enforce the client to publish options on SDP level (#1976)
Enforcing client publish options is now done on the SDP level, as we can't always rely on the SFU's built-in logic. 🎫 Ticket: https://linear.app/stream/issue/VID-1059/add-flags-in-pronto-for-sdp-munging
1 parent 45d1169 commit 1d93f72

File tree

12 files changed

+801
-48
lines changed

12 files changed

+801
-48
lines changed

packages/client/src/Call.ts

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { StreamSfuClient } from './StreamSfuClient';
22
import {
3+
BasePeerConnectionOpts,
34
Dispatcher,
45
getGenericSdp,
56
isAudioTrackType,
@@ -118,6 +119,7 @@ import {
118119
ClientDetails,
119120
Codec,
120121
ParticipantSource,
122+
PeerType,
121123
PublishOption,
122124
SubscribeOption,
123125
TrackType,
@@ -987,9 +989,11 @@ export class Call {
987989
// prepare a generic SDP and send it to the SFU.
988990
// these are throw-away SDPs that the SFU will use to determine
989991
// the capabilities of the client (codec support, etc.)
992+
const { dangerouslyForceCodec, fmtpLine, subscriberFmtpLine } =
993+
this.clientPublishOptions || {};
990994
const [subscriberSdp, publisherSdp] = await Promise.all([
991-
getGenericSdp('recvonly'),
992-
getGenericSdp('sendonly'),
995+
getGenericSdp('recvonly', dangerouslyForceCodec, subscriberFmtpLine),
996+
getGenericSdp('sendonly', dangerouslyForceCodec, fmtpLine),
993997
]);
994998
const isReconnecting =
995999
this.reconnectStrategy !== WebsocketReconnectStrategy.UNSPECIFIED;
@@ -1237,20 +1241,23 @@ export class Call {
12371241
if (closePreviousInstances && this.subscriber) {
12381242
this.subscriber.dispose();
12391243
}
1240-
this.subscriber = new Subscriber({
1244+
const basePeerConnectionOptions: BasePeerConnectionOpts = {
12411245
sfuClient,
12421246
dispatcher: this.dispatcher,
12431247
state: this.state,
12441248
connectionConfig,
12451249
tag: sfuClient.tag,
12461250
enableTracing,
1247-
onReconnectionNeeded: (kind, reason) => {
1251+
clientPublishOptions: this.clientPublishOptions,
1252+
onReconnectionNeeded: (kind, reason, peerType) => {
12481253
this.reconnect(kind, reason).catch((err) => {
1249-
const message = `[Reconnect] Error reconnecting after a subscriber error: ${reason}`;
1254+
const message = `[Reconnect] Error reconnecting, after a ${PeerType[peerType]} error: ${reason}`;
12501255
this.logger.warn(message, err);
12511256
});
12521257
},
1253-
});
1258+
};
1259+
1260+
this.subscriber = new Subscriber(basePeerConnectionOptions);
12541261

12551262
// anonymous users can't publish anything hence, there is no need
12561263
// to create Publisher Peer Connection for them
@@ -1259,21 +1266,7 @@ export class Call {
12591266
if (closePreviousInstances && this.publisher) {
12601267
this.publisher.dispose();
12611268
}
1262-
this.publisher = new Publisher({
1263-
sfuClient,
1264-
dispatcher: this.dispatcher,
1265-
state: this.state,
1266-
connectionConfig,
1267-
publishOptions,
1268-
tag: sfuClient.tag,
1269-
enableTracing,
1270-
onReconnectionNeeded: (kind, reason) => {
1271-
this.reconnect(kind, reason).catch((err) => {
1272-
const message = `[Reconnect] Error reconnecting after a publisher error: ${reason}`;
1273-
this.logger.warn(message, err);
1274-
});
1275-
},
1276-
});
1269+
this.publisher = new Publisher(basePeerConnectionOptions, publishOptions);
12771270
}
12781271

12791272
this.statsReporter?.stop();

packages/client/src/rtc/BasePeerConnection.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import { StreamSfuClient } from '../StreamSfuClient';
1313
import { AllSfuEvents, Dispatcher } from './Dispatcher';
1414
import { withoutConcurrency } from '../helpers/concurrency';
1515
import { StatsTracer, Tracer, traceRTCPeerConnection } from '../stats';
16-
import { BasePeerConnectionOpts, OnReconnectionNeeded } from './types';
16+
import type { BasePeerConnectionOpts, OnReconnectionNeeded } from './types';
17+
import type { ClientPublishOptions } from '../types';
1718

1819
/**
1920
* A base class for the `Publisher` and `Subscriber` classes.
@@ -25,6 +26,7 @@ export abstract class BasePeerConnection {
2526
protected readonly pc: RTCPeerConnection;
2627
protected readonly state: CallState;
2728
protected readonly dispatcher: Dispatcher;
29+
protected readonly clientPublishOptions?: ClientPublishOptions;
2830
protected sfuClient: StreamSfuClient;
2931

3032
private onReconnectionNeeded?: OnReconnectionNeeded;
@@ -55,6 +57,7 @@ export abstract class BasePeerConnection {
5557
onReconnectionNeeded,
5658
tag,
5759
enableTracing,
60+
clientPublishOptions,
5861
iceRestartDelay = 2500,
5962
}: BasePeerConnectionOpts,
6063
) {
@@ -63,6 +66,7 @@ export abstract class BasePeerConnection {
6366
this.state = state;
6467
this.dispatcher = dispatcher;
6568
this.iceRestartDelay = iceRestartDelay;
69+
this.clientPublishOptions = clientPublishOptions;
6670
this.onReconnectionNeeded = onReconnectionNeeded;
6771
this.logger = videoLoggerSystem.getLogger(
6872
peerType === PeerType.SUBSCRIBER ? 'Subscriber' : 'Publisher',
@@ -145,7 +149,7 @@ export abstract class BasePeerConnection {
145149
e.error.code === ErrorCode.PARTICIPANT_SIGNAL_LOST
146150
? WebsocketReconnectStrategy.FAST
147151
: WebsocketReconnectStrategy.REJOIN;
148-
this.onReconnectionNeeded?.(strategy, reason);
152+
this.onReconnectionNeeded?.(strategy, reason, this.peerType);
149153
});
150154
};
151155

@@ -284,6 +288,7 @@ export abstract class BasePeerConnection {
284288
this.onReconnectionNeeded?.(
285289
WebsocketReconnectStrategy.REJOIN,
286290
'Connection failed',
291+
this.peerType,
287292
);
288293
return;
289294
}

packages/client/src/rtc/Publisher.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { BasePeerConnection } from './BasePeerConnection';
2-
import {
2+
import type {
3+
BasePeerConnectionOpts,
34
PublishBundle,
4-
PublisherConstructorOpts,
55
TrackPublishOptions,
66
} from './types';
77
import { NegotiationError } from './NegotiationError';
@@ -21,7 +21,7 @@ import {
2121
} from './layers';
2222
import { isSvcCodec } from './codecs';
2323
import { isAudioTrackType } from './helpers/tracks';
24-
import { extractMid } from './helpers/sdp';
24+
import { extractMid, removeCodecsExcept } from './helpers/sdp';
2525
import { withoutConcurrency } from '../helpers/concurrency';
2626
import { isReactNative } from '../helpers/platforms';
2727

@@ -38,7 +38,10 @@ export class Publisher extends BasePeerConnection {
3838
/**
3939
* Constructs a new `Publisher` instance.
4040
*/
41-
constructor({ publishOptions, ...baseOptions }: PublisherConstructorOpts) {
41+
constructor(
42+
baseOptions: BasePeerConnectionOpts,
43+
publishOptions: PublishOption[],
44+
) {
4245
super(PeerType.PUBLISHER_UNSPECIFIED, baseOptions);
4346
this.publishOptions = publishOptions;
4447

@@ -378,7 +381,12 @@ export class Publisher extends BasePeerConnection {
378381
this.isIceRestarting = options?.iceRestart ?? false;
379382
await this.pc.setLocalDescription(offer);
380383

381-
const { sdp = '' } = offer;
384+
const { sdp: baseSdp = '' } = offer;
385+
const { dangerouslyForceCodec, fmtpLine } =
386+
this.clientPublishOptions || {};
387+
const sdp = dangerouslyForceCodec
388+
? removeCodecsExcept(baseSdp, dangerouslyForceCodec, fmtpLine)
389+
: baseSdp;
382390
const { response } = await this.sfuClient.setPublisher({ sdp, tracks });
383391
if (response.error) throw new NegotiationError(response.error);
384392

packages/client/src/rtc/Subscriber.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { NegotiationError } from './NegotiationError';
44
import { PeerType } from '../gen/video/sfu/models/models';
55
import { SubscriberOffer } from '../gen/video/sfu/event/events';
66
import { toTrackType, trackTypeToParticipantStreamKey } from './helpers/tracks';
7-
import { enableStereo } from './helpers/sdp';
7+
import { enableStereo, removeCodecsExcept } from './helpers/sdp';
88

99
/**
1010
* A wrapper around the `RTCPeerConnection` that handles the incoming
@@ -153,6 +153,15 @@ export class Subscriber extends BasePeerConnection {
153153
const answer = await this.pc.createAnswer();
154154
if (answer.sdp) {
155155
answer.sdp = enableStereo(subscriberOffer.sdp, answer.sdp);
156+
const { dangerouslyForceCodec, subscriberFmtpLine } =
157+
this.clientPublishOptions || {};
158+
if (dangerouslyForceCodec) {
159+
answer.sdp = removeCodecsExcept(
160+
answer.sdp,
161+
dangerouslyForceCodec,
162+
subscriberFmtpLine,
163+
);
164+
}
156165
}
157166
await this.pc.setLocalDescription(answer);
158167

packages/client/src/rtc/__tests__/Publisher.test.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,15 @@ describe('Publisher', () => {
6363
sfuClient['sessionId'] = sessionId;
6464

6565
state = new CallState();
66-
publisher = new Publisher({
67-
sfuClient,
68-
dispatcher,
69-
state,
70-
tag: 'test',
71-
enableTracing: false,
72-
publishOptions: [
66+
publisher = new Publisher(
67+
{
68+
sfuClient,
69+
dispatcher,
70+
state,
71+
tag: 'test',
72+
enableTracing: false,
73+
},
74+
[
7375
{
7476
id: 1,
7577
trackType: TrackType.VIDEO,
@@ -81,7 +83,7 @@ describe('Publisher', () => {
8183
maxSpatialLayers: 3,
8284
},
8385
],
84-
});
86+
);
8587
});
8688

8789
afterEach(() => {
@@ -309,6 +311,7 @@ describe('Publisher', () => {
309311
expect(publisher['onReconnectionNeeded']).toHaveBeenCalledWith(
310312
WebsocketReconnectStrategy.FAST,
311313
anyString(),
314+
PeerType.PUBLISHER_UNSPECIFIED,
312315
);
313316

314317
expect(pc.setLocalDescription).toHaveBeenCalledTimes(2);
@@ -354,6 +357,7 @@ describe('Publisher', () => {
354357
expect(publisher['onReconnectionNeeded']).toHaveBeenCalledWith(
355358
WebsocketReconnectStrategy.REJOIN,
356359
anyString(),
360+
PeerType.PUBLISHER_UNSPECIFIED,
357361
);
358362
});
359363

packages/client/src/rtc/codecs.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
1+
import { removeCodecsExcept } from './helpers/sdp';
2+
13
/**
24
* Returns a generic SDP for the given direction.
35
* We use this SDP to send it as part of our JoinRequest so that the SFU
46
* can use it to determine the client's codec capabilities.
57
*
68
* @param direction the direction of the transceiver.
9+
* @param codecToKeep the codec mime type to keep (video/h264 or audio/opus).
10+
* @param fmtpProfileToKeep optional fmtp profile to keep.
711
*/
8-
export const getGenericSdp = async (direction: RTCRtpTransceiverDirection) => {
12+
export const getGenericSdp = async (
13+
direction: RTCRtpTransceiverDirection,
14+
codecToKeep: string | undefined,
15+
fmtpProfileToKeep: string | undefined,
16+
) => {
917
const tempPc = new RTCPeerConnection();
1018
tempPc.addTransceiver('video', { direction });
1119
tempPc.addTransceiver('audio', { direction });
1220

1321
const offer = await tempPc.createOffer();
14-
const sdp = offer.sdp ?? '';
22+
const { sdp: baseSdp = '' } = offer;
23+
const sdp = codecToKeep
24+
? removeCodecsExcept(baseSdp, codecToKeep, fmtpProfileToKeep)
25+
: baseSdp;
1526

1627
tempPc.getTransceivers().forEach((t) => {
1728
t.stop?.();

0 commit comments

Comments
 (0)