Skip to content

Commit 464b1f8

Browse files
feat: implement SEP-1699 SSE polling via server-side disconnect
Add support for SSE retry field to enable server-controlled client reconnection timing. Client changes: - Capture server-provided retry field from SSE events - Use retry value for reconnection delay instead of exponential backoff - Reconnect on graceful stream close with Last-Event-ID header Server changes: - Add retryInterval option to StreamableHTTPServerTransportOptions - Send priming events with id/retry/empty-data when eventStore is configured - Add closeSSEStream(requestId) API to close POST SSE streams for polling - Priming events establish resumption capability before actual messages Tests: - Client: retry field capture, exponential backoff fallback, graceful reconnection - Server: priming events, retry field, stream closure, POST SSE polling flow
1 parent 4debc74 commit 464b1f8

File tree

4 files changed

+536
-3
lines changed

4 files changed

+536
-3
lines changed

src/client/streamableHttp.test.ts

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,6 +1010,148 @@ describe('StreamableHTTPClientTransport', () => {
10101010
});
10111011
});
10121012

1013+
describe('SSE retry field handling', () => {
1014+
beforeEach(() => {
1015+
vi.useFakeTimers();
1016+
(global.fetch as Mock).mockReset();
1017+
});
1018+
afterEach(() => vi.useRealTimers());
1019+
1020+
it('should use server-provided retry value for reconnection delay', async () => {
1021+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1022+
reconnectionOptions: {
1023+
initialReconnectionDelay: 100,
1024+
maxReconnectionDelay: 5000,
1025+
reconnectionDelayGrowFactor: 2,
1026+
maxRetries: 3
1027+
}
1028+
});
1029+
1030+
// Create a stream that sends a retry field
1031+
const encoder = new TextEncoder();
1032+
const stream = new ReadableStream({
1033+
start(controller) {
1034+
// Send SSE event with retry field
1035+
const event =
1036+
'retry: 3000\nevent: message\nid: evt-1\ndata: {"jsonrpc": "2.0", "method": "notification", "params": {}}\n\n';
1037+
controller.enqueue(encoder.encode(event));
1038+
// Close stream to trigger reconnection
1039+
controller.close();
1040+
}
1041+
});
1042+
1043+
const fetchMock = global.fetch as Mock;
1044+
fetchMock.mockResolvedValueOnce({
1045+
ok: true,
1046+
status: 200,
1047+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1048+
body: stream
1049+
});
1050+
1051+
// Second request for reconnection
1052+
fetchMock.mockResolvedValueOnce({
1053+
ok: true,
1054+
status: 200,
1055+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1056+
body: new ReadableStream()
1057+
});
1058+
1059+
await transport.start();
1060+
await transport['_startOrAuthSse']({});
1061+
1062+
// Wait for stream to close and reconnection to be scheduled
1063+
await vi.advanceTimersByTimeAsync(100);
1064+
1065+
// Verify the server retry value was captured
1066+
const transportInternal = transport as unknown as { _serverRetryMs?: number };
1067+
expect(transportInternal._serverRetryMs).toBe(3000);
1068+
1069+
// Verify the delay calculation uses server retry value
1070+
const getDelay = transport['_getNextReconnectionDelay'].bind(transport);
1071+
expect(getDelay(0)).toBe(3000); // Should use server value, not 100ms initial
1072+
expect(getDelay(5)).toBe(3000); // Should still use server value for any attempt
1073+
});
1074+
1075+
it('should fall back to exponential backoff when no server retry value', () => {
1076+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1077+
reconnectionOptions: {
1078+
initialReconnectionDelay: 100,
1079+
maxReconnectionDelay: 5000,
1080+
reconnectionDelayGrowFactor: 2,
1081+
maxRetries: 3
1082+
}
1083+
});
1084+
1085+
// Without any SSE stream, _serverRetryMs should be undefined
1086+
const transportInternal = transport as unknown as { _serverRetryMs?: number };
1087+
expect(transportInternal._serverRetryMs).toBeUndefined();
1088+
1089+
// Should use exponential backoff
1090+
const getDelay = transport['_getNextReconnectionDelay'].bind(transport);
1091+
expect(getDelay(0)).toBe(100); // 100 * 2^0
1092+
expect(getDelay(1)).toBe(200); // 100 * 2^1
1093+
expect(getDelay(2)).toBe(400); // 100 * 2^2
1094+
expect(getDelay(10)).toBe(5000); // capped at max
1095+
});
1096+
1097+
it('should reconnect on graceful stream close', async () => {
1098+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1099+
reconnectionOptions: {
1100+
initialReconnectionDelay: 10,
1101+
maxReconnectionDelay: 1000,
1102+
reconnectionDelayGrowFactor: 1,
1103+
maxRetries: 1
1104+
}
1105+
});
1106+
1107+
// Create a stream that closes gracefully after sending an event with ID
1108+
const encoder = new TextEncoder();
1109+
const stream = new ReadableStream({
1110+
start(controller) {
1111+
// Send priming event with ID and retry field
1112+
const event = 'id: evt-1\nretry: 100\ndata: \n\n';
1113+
controller.enqueue(encoder.encode(event));
1114+
// Graceful close
1115+
controller.close();
1116+
}
1117+
});
1118+
1119+
const fetchMock = global.fetch as Mock;
1120+
fetchMock.mockResolvedValueOnce({
1121+
ok: true,
1122+
status: 200,
1123+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1124+
body: stream
1125+
});
1126+
1127+
// Second request for reconnection
1128+
fetchMock.mockResolvedValueOnce({
1129+
ok: true,
1130+
status: 200,
1131+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1132+
body: new ReadableStream()
1133+
});
1134+
1135+
await transport.start();
1136+
await transport['_startOrAuthSse']({});
1137+
1138+
// Wait for stream to process and close
1139+
await vi.advanceTimersByTimeAsync(50);
1140+
1141+
// Wait for reconnection delay (100ms from retry field)
1142+
await vi.advanceTimersByTimeAsync(150);
1143+
1144+
// Should have attempted reconnection
1145+
expect(fetchMock).toHaveBeenCalledTimes(2);
1146+
expect(fetchMock.mock.calls[0][1]?.method).toBe('GET');
1147+
expect(fetchMock.mock.calls[1][1]?.method).toBe('GET');
1148+
1149+
// Second call should include Last-Event-ID
1150+
const secondCallHeaders = fetchMock.mock.calls[1][1]?.headers;
1151+
expect(secondCallHeaders?.get('last-event-id')).toBe('evt-1');
1152+
});
1153+
});
1154+
10131155
describe('prevent infinite recursion when server returns 401 after successful auth', () => {
10141156
it('should throw error when server returns 401 after successful auth', async () => {
10151157
const message: JSONRPCMessage = {

src/client/streamableHttp.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ export class StreamableHTTPClientTransport implements Transport {
134134
private _reconnectionOptions: StreamableHTTPReconnectionOptions;
135135
private _protocolVersion?: string;
136136
private _hasCompletedAuthFlow = false; // Circuit breaker: detect auth success followed by immediate 401
137+
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
137138

138139
onclose?: () => void;
139140
onerror?: (error: Error) => void;
@@ -202,6 +203,7 @@ export class StreamableHTTPClientTransport implements Transport {
202203

203204
private async _startOrAuthSse(options: StartSSEOptions): Promise<void> {
204205
const { resumptionToken } = options;
206+
205207
try {
206208
// Try to open an initial SSE stream with GET to listen for server messages
207209
// This is optional according to the spec - server may not support it
@@ -248,7 +250,12 @@ export class StreamableHTTPClientTransport implements Transport {
248250
* @returns Time to wait in milliseconds before next reconnection attempt
249251
*/
250252
private _getNextReconnectionDelay(attempt: number): number {
251-
// Access default values directly, ensuring they're never undefined
253+
// Use server-provided retry value if available
254+
if (this._serverRetryMs !== undefined) {
255+
return this._serverRetryMs;
256+
}
257+
258+
// Fall back to exponential backoff
252259
const initialDelay = this._reconnectionOptions.initialReconnectionDelay;
253260
const growFactor = this._reconnectionOptions.reconnectionDelayGrowFactor;
254261
const maxDelay = this._reconnectionOptions.maxReconnectionDelay;
@@ -301,7 +308,14 @@ export class StreamableHTTPClientTransport implements Transport {
301308
// Create a pipeline: binary stream -> text decoder -> SSE parser
302309
const reader = stream
303310
.pipeThrough(new TextDecoderStream() as ReadableWritablePair<string, Uint8Array>)
304-
.pipeThrough(new EventSourceParserStream())
311+
.pipeThrough(
312+
new EventSourceParserStream({
313+
onRetry: (retryMs: number) => {
314+
// Capture server-provided retry value for reconnection timing
315+
this._serverRetryMs = retryMs;
316+
}
317+
})
318+
)
305319
.getReader();
306320

307321
while (true) {
@@ -328,6 +342,19 @@ export class StreamableHTTPClientTransport implements Transport {
328342
}
329343
}
330344
}
345+
346+
// Handle graceful server-side disconnect
347+
// Server may close connection after sending event ID and retry field
348+
if (isReconnectable && this._abortController && !this._abortController.signal.aborted) {
349+
this._scheduleReconnection(
350+
{
351+
resumptionToken: lastEventId,
352+
onresumptiontoken,
353+
replayMessageId
354+
},
355+
0
356+
);
357+
}
331358
} catch (error) {
332359
// Handle stream errors - likely a network disconnect
333360
this.onerror?.(new Error(`SSE stream disconnected: ${error}`));

0 commit comments

Comments
 (0)