Skip to content

Commit 151614e

Browse files
Allow multiple GET streams for resuming different POST streams
- Fix replayEvents to use streamId from last-event-id header - Add conflict check per streamId (not global) - Add missing close handler to clean up stream mapping - Add test demonstrating concurrent GET streams resuming different POST streams This aligns with the spec: "The client MAY remain connected to multiple SSE streams simultaneously."
1 parent c1d581a commit 151614e

File tree

2 files changed

+228
-8
lines changed

2 files changed

+228
-8
lines changed

src/server/streamableHttp.test.ts

Lines changed: 170 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,16 +1270,21 @@ describe('StreamableHTTPServerTransport with resumability', () => {
12701270
let baseUrl: URL;
12711271
let sessionId: string;
12721272
let mcpServer: McpServer;
1273-
const storedEvents: Map<string, { eventId: string; message: JSONRPCMessage }> = new Map();
1273+
const storedEvents: Map<string, { eventId: string; message: JSONRPCMessage; streamId: string }> = new Map();
12741274

12751275
// Simple implementation of EventStore
12761276
const eventStore: EventStore = {
12771277
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
12781278
const eventId = `${streamId}_${randomUUID()}`;
1279-
storedEvents.set(eventId, { eventId, message });
1279+
storedEvents.set(eventId, { eventId, message, streamId });
12801280
return eventId;
12811281
},
12821282

1283+
async getStreamIdForEventId(eventId: string): Promise<string | undefined> {
1284+
const event = storedEvents.get(eventId);
1285+
return event?.streamId;
1286+
},
1287+
12831288
async replayEventsAfter(
12841289
lastEventId: EventId,
12851290
{
@@ -1288,11 +1293,11 @@ describe('StreamableHTTPServerTransport with resumability', () => {
12881293
send: (eventId: EventId, message: JSONRPCMessage) => Promise<void>;
12891294
}
12901295
): Promise<StreamId> {
1291-
const streamId = lastEventId.split('_')[0];
1292-
// Extract stream ID from the event ID
1296+
const event = storedEvents.get(lastEventId);
1297+
const streamId = event?.streamId || lastEventId.split('_')[0];
12931298
// For test simplicity, just return all events with matching streamId that aren't the lastEventId
1294-
for (const [eventId, { message }] of storedEvents.entries()) {
1295-
if (eventId.startsWith(streamId) && eventId !== lastEventId) {
1299+
for (const [eventId, { message, streamId: evtStreamId }] of storedEvents.entries()) {
1300+
if (evtStreamId === streamId && eventId !== lastEventId) {
12961301
await send(eventId, message);
12971302
}
12981303
}
@@ -1535,11 +1540,16 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => {
15351540
storedEvents.set(eventId, { eventId, message, streamId });
15361541
return eventId;
15371542
},
1543+
async getStreamIdForEventId(eventId: string): Promise<string | undefined> {
1544+
const event = storedEvents.get(eventId);
1545+
return event?.streamId;
1546+
},
15381547
async replayEventsAfter(
15391548
lastEventId: EventId,
15401549
{ send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
15411550
): Promise<StreamId> {
1542-
const streamId = lastEventId.split('::')[0];
1551+
const event = storedEvents.get(lastEventId);
1552+
const streamId = event?.streamId || lastEventId.split('::')[0];
15431553
const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = [];
15441554
for (const [eventId, data] of storedEvents.entries()) {
15451555
if (data.streamId === streamId && eventId > lastEventId) {
@@ -1965,6 +1975,159 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => {
19651975
expect(eventIds).toBeTruthy();
19661976
expect(eventIds!.length).toBeGreaterThanOrEqual(3);
19671977
});
1978+
1979+
it('should allow resuming multiple POST streams via separate GET streams', async () => {
1980+
const result = await createTestServer({
1981+
sessionIdGenerator: () => randomUUID(),
1982+
eventStore: createEventStore(),
1983+
retryInterval: 1000
1984+
});
1985+
server = result.server;
1986+
transport = result.transport;
1987+
baseUrl = result.baseUrl;
1988+
mcpServer = result.mcpServer;
1989+
1990+
// Track tool execution state for two separate tools
1991+
let tool1Resolve: () => void;
1992+
let tool2Resolve: () => void;
1993+
const tool1Promise = new Promise<void>(resolve => {
1994+
tool1Resolve = resolve;
1995+
});
1996+
const tool2Promise = new Promise<void>(resolve => {
1997+
tool2Resolve = resolve;
1998+
});
1999+
2000+
// Register two tools
2001+
mcpServer.tool('stream-tool-1', 'First stream tool', {}, async () => {
2002+
await tool1Promise;
2003+
return { content: [{ type: 'text', text: 'Result from stream 1' }] };
2004+
});
2005+
mcpServer.tool('stream-tool-2', 'Second stream tool', {}, async () => {
2006+
await tool2Promise;
2007+
return { content: [{ type: 'text', text: 'Result from stream 2' }] };
2008+
});
2009+
2010+
// Initialize to get session ID
2011+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
2012+
sessionId = initResponse.headers.get('mcp-session-id') as string;
2013+
expect(sessionId).toBeDefined();
2014+
2015+
// POST tool call #1
2016+
const toolCall1: JSONRPCMessage = {
2017+
jsonrpc: '2.0',
2018+
id: 301,
2019+
method: 'tools/call',
2020+
params: { name: 'stream-tool-1', arguments: {} }
2021+
};
2022+
const post1Response = await fetch(baseUrl, {
2023+
method: 'POST',
2024+
headers: {
2025+
'Content-Type': 'application/json',
2026+
Accept: 'text/event-stream, application/json',
2027+
'mcp-session-id': sessionId,
2028+
'mcp-protocol-version': '2025-03-26'
2029+
},
2030+
body: JSON.stringify(toolCall1)
2031+
});
2032+
expect(post1Response.status).toBe(200);
2033+
2034+
// Read priming event and extract event ID for stream 1
2035+
const reader1 = post1Response.body?.getReader();
2036+
const { value: priming1 } = await reader1!.read();
2037+
const priming1Text = new TextDecoder().decode(priming1);
2038+
const priming1Match = priming1Text.match(/id: ([^\n]+)/);
2039+
expect(priming1Match).toBeTruthy();
2040+
const eventId1 = priming1Match![1];
2041+
2042+
// POST tool call #2
2043+
const toolCall2: JSONRPCMessage = {
2044+
jsonrpc: '2.0',
2045+
id: 302,
2046+
method: 'tools/call',
2047+
params: { name: 'stream-tool-2', arguments: {} }
2048+
};
2049+
const post2Response = await fetch(baseUrl, {
2050+
method: 'POST',
2051+
headers: {
2052+
'Content-Type': 'application/json',
2053+
Accept: 'text/event-stream, application/json',
2054+
'mcp-session-id': sessionId,
2055+
'mcp-protocol-version': '2025-03-26'
2056+
},
2057+
body: JSON.stringify(toolCall2)
2058+
});
2059+
expect(post2Response.status).toBe(200);
2060+
2061+
// Read priming event and extract event ID for stream 2
2062+
const reader2 = post2Response.body?.getReader();
2063+
const { value: priming2 } = await reader2!.read();
2064+
const priming2Text = new TextDecoder().decode(priming2);
2065+
const priming2Match = priming2Text.match(/id: ([^\n]+)/);
2066+
expect(priming2Match).toBeTruthy();
2067+
const eventId2 = priming2Match![1];
2068+
2069+
// Verify we have two different stream IDs
2070+
const streamId1 = eventId1.split('::')[0];
2071+
const streamId2 = eventId2.split('::')[0];
2072+
expect(streamId1).not.toBe(streamId2);
2073+
2074+
// Close both streams
2075+
transport.closeSSEStream(301);
2076+
transport.closeSSEStream(302);
2077+
2078+
// Verify both streams are closed
2079+
const { done: done1 } = await reader1!.read();
2080+
const { done: done2 } = await reader2!.read();
2081+
expect(done1).toBe(true);
2082+
expect(done2).toBe(true);
2083+
2084+
// Complete both tools while disconnected
2085+
tool1Resolve!();
2086+
tool2Resolve!();
2087+
await new Promise(resolve => setTimeout(resolve, 50));
2088+
2089+
// Resume BOTH streams via GET - they should work concurrently (no 409)
2090+
const [reconnect1Response, reconnect2Response] = await Promise.all([
2091+
fetch(baseUrl, {
2092+
method: 'GET',
2093+
headers: {
2094+
Accept: 'text/event-stream',
2095+
'mcp-session-id': sessionId,
2096+
'mcp-protocol-version': '2025-03-26',
2097+
'last-event-id': eventId1
2098+
}
2099+
}),
2100+
fetch(baseUrl, {
2101+
method: 'GET',
2102+
headers: {
2103+
Accept: 'text/event-stream',
2104+
'mcp-session-id': sessionId,
2105+
'mcp-protocol-version': '2025-03-26',
2106+
'last-event-id': eventId2
2107+
}
2108+
})
2109+
]);
2110+
2111+
// Both should succeed (not 409)
2112+
expect(reconnect1Response.status).toBe(200);
2113+
expect(reconnect2Response.status).toBe(200);
2114+
2115+
// Read results from both streams
2116+
const reconnect1Reader = reconnect1Response.body?.getReader();
2117+
const { value: replay1 } = await reconnect1Reader!.read();
2118+
const replay1Text = new TextDecoder().decode(replay1);
2119+
2120+
const reconnect2Reader = reconnect2Response.body?.getReader();
2121+
const { value: replay2 } = await reconnect2Reader!.read();
2122+
const replay2Text = new TextDecoder().decode(replay2);
2123+
2124+
// Each stream should have its own result
2125+
expect(replay1Text).toContain('Result from stream 1');
2126+
expect(replay1Text).toContain('"id":301');
2127+
2128+
expect(replay2Text).toContain('Result from stream 2');
2129+
expect(replay2Text).toContain('"id":302');
2130+
});
19682131
});
19692132

19702133
// Test onsessionclosed callback

src/server/streamableHttp.ts

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,17 @@ export interface EventStore {
3535
*/
3636
storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise<EventId>;
3737

38+
/**
39+
* Get the stream ID associated with a given event ID.
40+
* @param eventId The event ID to look up
41+
* @returns The stream ID, or undefined if not found
42+
*
43+
* Optional: If not provided, the SDK will attempt to parse the streamId
44+
* from the eventId assuming format "streamId::...". Implementations should
45+
* provide this method for more reliable stream ID resolution.
46+
*/
47+
getStreamIdForEventId?(eventId: EventId): Promise<StreamId | undefined>;
48+
3849
replayEventsAfter(
3950
lastEventId: EventId,
4051
{
@@ -369,6 +380,44 @@ export class StreamableHTTPServerTransport implements Transport {
369380
return;
370381
}
371382
try {
383+
// Get streamId - prefer explicit method, fall back to parsing
384+
let streamId: string | undefined;
385+
if (this._eventStore.getStreamIdForEventId) {
386+
streamId = await this._eventStore.getStreamIdForEventId(lastEventId);
387+
} else {
388+
// Fallback: assume format "streamId::..."
389+
streamId = lastEventId.split('::')[0] || undefined;
390+
}
391+
392+
if (!streamId) {
393+
res.writeHead(400).end(
394+
JSON.stringify({
395+
jsonrpc: '2.0',
396+
error: {
397+
code: -32000,
398+
message: 'Invalid event ID format'
399+
},
400+
id: null
401+
})
402+
);
403+
return;
404+
}
405+
406+
// Check conflict with the SAME streamId we'll use for mapping
407+
if (this._streamMapping.get(streamId) !== undefined) {
408+
res.writeHead(409).end(
409+
JSON.stringify({
410+
jsonrpc: '2.0',
411+
error: {
412+
code: -32000,
413+
message: 'Conflict: Stream already has an active connection'
414+
},
415+
id: null
416+
})
417+
);
418+
return;
419+
}
420+
372421
const headers: Record<string, string> = {
373422
'Content-Type': 'text/event-stream',
374423
'Cache-Control': 'no-cache, no-transform',
@@ -380,16 +429,24 @@ export class StreamableHTTPServerTransport implements Transport {
380429
}
381430
res.writeHead(200, headers).flushHeaders();
382431

383-
const streamId = await this._eventStore?.replayEventsAfter(lastEventId, {
432+
// Replay events
433+
await this._eventStore.replayEventsAfter(lastEventId, {
384434
send: async (eventId: string, message: JSONRPCMessage) => {
385435
if (!this.writeSSEEvent(res, message, eventId)) {
386436
this.onerror?.(new Error('Failed replay events'));
387437
res.end();
388438
}
389439
}
390440
});
441+
442+
// Map using the same streamId we checked for conflicts
391443
this._streamMapping.set(streamId, res);
392444

445+
// Set up close handler for client disconnects
446+
res.on('close', () => {
447+
this._streamMapping.delete(streamId);
448+
});
449+
393450
// Add error handler for replay stream
394451
res.on('error', error => {
395452
this.onerror?.(error as Error);

0 commit comments

Comments
 (0)