Skip to content

Commit 1daaa7b

Browse files
committed
fix: use worker to prevent timer throttling
1 parent 4ec9488 commit 1daaa7b

File tree

8 files changed

+208
-10
lines changed

8 files changed

+208
-10
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#!/usr/bin/env bash
2+
3+
npx tsc src/timers/worker.ts \
4+
--skipLibCheck \
5+
--removeComments \
6+
--module preserve \
7+
--lib ES2020,WebWorker \
8+
--outDir worker-dist
9+
10+
cat <<EOF >src/timers/worker.build.ts
11+
export const timerWorker = {
12+
src: \`$(<worker-dist/worker.js)\`,
13+
};
14+
EOF
15+
16+
rm -r worker-dist

packages/client/package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
"scripts": {
1212
"clean": "rimraf dist",
1313
"start": "rollup -w -c",
14-
"build": "yarn clean && rollup -c",
14+
"build": "yarn clean && ./generate-timer-worker.sh && rollup -c",
1515
"test": "vitest",
1616
"clean:docs": "rimraf generated-docs",
1717
"test-ci": "vitest --coverage",
1818
"generate:open-api": "./generate-openapi.sh protocol",
19-
"generate:open-api:dev": "./generate-openapi.sh chat"
19+
"generate:open-api:dev": "./generate-openapi.sh chat",
20+
"generate:timer-worker": "./generate-timer-worker.sh"
2021
},
2122
"files": [
2223
"dist",

packages/client/src/StreamSfuClient.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import {
3434
promiseWithResolvers,
3535
PromiseWithResolvers,
3636
} from './helpers/withResolvers';
37+
import { getTimers } from './timers';
3738

3839
export type StreamSfuClientConstructor = {
3940
/**
@@ -110,7 +111,7 @@ export class StreamSfuClient {
110111
isLeaving = false;
111112

112113
private readonly rpc: SignalServerClient;
113-
private keepAliveInterval?: NodeJS.Timeout;
114+
private keepAliveInterval?: number;
114115
private connectionCheckTimeout?: NodeJS.Timeout;
115116
private migrateAwayTimeout?: NodeJS.Timeout;
116117
private pingIntervalInMs = 10 * 1000;
@@ -263,7 +264,7 @@ export class StreamSfuClient {
263264

264265
private handleWebSocketClose = () => {
265266
this.signalWs.removeEventListener('close', this.handleWebSocketClose);
266-
clearInterval(this.keepAliveInterval);
267+
getTimers().clearInterval(this.keepAliveInterval);
267268
clearTimeout(this.connectionCheckTimeout);
268269
this.onSignalClose?.();
269270
};
@@ -489,8 +490,9 @@ export class StreamSfuClient {
489490
};
490491

491492
private keepAlive = () => {
492-
clearInterval(this.keepAliveInterval);
493-
this.keepAliveInterval = setInterval(() => {
493+
const timers = getTimers();
494+
timers.clearInterval(this.keepAliveInterval);
495+
this.keepAliveInterval = timers.setInterval(() => {
494496
this.ping().catch((e) => {
495497
this.logger('error', 'Error sending healthCheckRequest to SFU', e);
496498
});

packages/client/src/coordinator/connection/connection.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import type {
2222
UR,
2323
} from './types';
2424
import type { ConnectedEvent, WSAuthMessage } from '../../gen/coordinator';
25+
import { getTimers } from '../../timers';
2526

2627
// Type guards to check WebSocket error type
2728
const isCloseEvent = (
@@ -58,7 +59,7 @@ export class StableWSConnection {
5859
authenticationSent: boolean;
5960
consecutiveFailures: number;
6061
pingInterval: number;
61-
healthCheckTimeoutRef?: NodeJS.Timeout;
62+
healthCheckTimeoutRef?: number;
6263
isConnecting: boolean;
6364
isDisconnected: boolean;
6465
isHealthy: boolean;
@@ -249,7 +250,7 @@ export class StableWSConnection {
249250

250251
// start by removing all the listeners
251252
if (this.healthCheckTimeoutRef) {
252-
clearInterval(this.healthCheckTimeoutRef);
253+
getTimers().clearInterval(this.healthCheckTimeoutRef);
253254
}
254255
if (this.connectionCheckTimeoutRef) {
255256
clearInterval(this.connectionCheckTimeoutRef);
@@ -757,12 +758,13 @@ export class StableWSConnection {
757758
* Schedules a next health check ping for websocket.
758759
*/
759760
scheduleNextPing = () => {
761+
const timers = getTimers();
760762
if (this.healthCheckTimeoutRef) {
761-
clearTimeout(this.healthCheckTimeoutRef);
763+
timers.clearTimeout(this.healthCheckTimeoutRef);
762764
}
763765

764766
// 30 seconds is the recommended interval (messenger uses this)
765-
this.healthCheckTimeoutRef = setTimeout(() => {
767+
this.healthCheckTimeoutRef = timers.setTimeout(() => {
766768
// send the healthcheck..., server replies with a health check event
767769
const data = [{ type: 'health.check', client_id: this.client.clientID }];
768770
// try to send on the connection
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { lazy } from '../helpers/lazy';
2+
import { getLogger } from '../logger';
3+
import { TimerWorkerEvent, TimerWorkerRequest } from './types';
4+
import { timerWorker } from './worker.build';
5+
6+
class TimerWorker {
7+
private currentTimerId = 1;
8+
private callbacks = new Map<number, () => void>();
9+
private worker: Worker | undefined;
10+
private fallback = false;
11+
12+
setup(): void {
13+
try {
14+
const source = timerWorker.src;
15+
const blob = new Blob([source], {
16+
type: 'application/javascript; charset=utf-8',
17+
});
18+
const script = URL.createObjectURL(blob);
19+
this.worker = new Worker(script, { name: 'str-timer-worker' });
20+
this.worker.addEventListener('message', (event) => {
21+
const { type, id } = event.data as TimerWorkerEvent;
22+
if (type === 'tick') {
23+
this.callbacks.get(id)?.();
24+
}
25+
});
26+
} catch (err: any) {
27+
getLogger(['timer-worker'])('error', err);
28+
this.fallback = true;
29+
}
30+
}
31+
32+
destroy(): void {
33+
this.callbacks.clear();
34+
this.worker?.terminate();
35+
this.worker = undefined;
36+
this.fallback = false;
37+
}
38+
39+
get ready() {
40+
return this.fallback || Boolean(this.worker);
41+
}
42+
43+
setInterval(callback: () => void, timeout: number): number {
44+
return this.setTimer('setInterval', callback, timeout);
45+
}
46+
47+
clearInterval(id?: number): void {
48+
this.clearTimer('clearInterval', id);
49+
}
50+
51+
setTimeout(callback: () => void, timeout: number): number {
52+
return this.setTimer('setTimeout', callback, timeout);
53+
}
54+
55+
clearTimeout(id?: number): void {
56+
this.clearTimer('clearTimeout', id);
57+
}
58+
59+
private setTimer(
60+
type: 'setTimeout' | 'setInterval',
61+
callback: () => void,
62+
timeout: number,
63+
) {
64+
if (!this.ready) {
65+
this.setup();
66+
}
67+
68+
if (this.fallback) {
69+
return (type === 'setTimeout' ? setTimeout : setInterval)(
70+
callback,
71+
timeout,
72+
) as unknown as number;
73+
}
74+
75+
const id = this.getTimerId();
76+
this.callbacks.set(id, callback);
77+
this.sendMessage({ type, id, timeout });
78+
return id;
79+
}
80+
81+
private clearTimer(type: 'clearTimeout' | 'clearInterval', id?: number) {
82+
if (!id) {
83+
return;
84+
}
85+
86+
if (!this.ready) {
87+
this.setup();
88+
}
89+
90+
if (this.fallback) {
91+
this.clearInterval(id);
92+
return;
93+
}
94+
95+
this.callbacks.delete(id);
96+
this.sendMessage({ type, id });
97+
}
98+
99+
private getTimerId() {
100+
return this.currentTimerId++;
101+
}
102+
103+
private sendMessage(message: TimerWorkerRequest) {
104+
if (!this.worker) {
105+
throw new Error("Cannot use timer worker before it's set up");
106+
}
107+
108+
this.worker.postMessage(message);
109+
}
110+
}
111+
112+
export const getTimers = lazy(() => {
113+
const instance = new TimerWorker();
114+
instance.setup();
115+
return instance;
116+
});
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
export type TimerWorkerRequest =
2+
| {
3+
type: 'setInterval' | 'setTimeout';
4+
id: number;
5+
timeout: number;
6+
}
7+
| {
8+
type: 'clearInterval' | 'clearTimeout';
9+
id: number;
10+
};
11+
12+
export type TimerWorkerEvent = {
13+
type: 'tick';
14+
id: number;
15+
};
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// Do not modify this file manually. You can edit worker.ts if necessary
2+
// and the run ./generate-timer-worker.sh
3+
export const timerWorker = {
4+
get src(): string {
5+
throw new Error(
6+
'Timer worker source missing. Did you forget to run generate-timer-worker.sh?',
7+
);
8+
},
9+
};
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/* eslint-disable */
2+
3+
import type { TimerWorkerEvent, TimerWorkerRequest } from './types';
4+
5+
const timerIdMapping = new Map<number, NodeJS.Timeout>();
6+
7+
self.addEventListener('message', (event: MessageEvent) => {
8+
const request = event.data as TimerWorkerRequest;
9+
10+
switch (request.type) {
11+
case 'setTimeout':
12+
case 'setInterval':
13+
timerIdMapping.set(
14+
request.id,
15+
(request.type === 'setTimeout' ? setTimeout : setInterval)(
16+
() => tick(request.id),
17+
request.timeout,
18+
),
19+
);
20+
break;
21+
22+
case 'clearTimeout':
23+
case 'clearInterval':
24+
(request.type === 'clearTimeout' ? clearTimeout : clearInterval)(
25+
timerIdMapping.get(request.id),
26+
);
27+
timerIdMapping.delete(request.id);
28+
break;
29+
}
30+
});
31+
32+
function tick(id: number) {
33+
const message: TimerWorkerEvent = { type: 'tick', id };
34+
self.postMessage(message);
35+
}
36+
37+
/* eslint-enable */

0 commit comments

Comments
 (0)