-
Notifications
You must be signed in to change notification settings - Fork 1.4k
feat: implement SEP-1699 SSE polling via server-side disconnect #1129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
commit: |
ebd4179 to
0b61e98
Compare
src/server/streamableHttp.test.ts
Outdated
| toolResolve!(); | ||
| }); | ||
|
|
||
| it('should support POST SSE polling with client reconnection', async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably the most interesting test
src/server/streamableHttp.test.ts
Outdated
| const primingEventId = primingIdMatch![1]; | ||
|
|
||
| // Server closes the stream to trigger polling | ||
| transport.closeSSEStream(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this feels slightly weird. The whole point is for the server to be able to close SSE streams, so maybe this method should be exposed on server?
On the other hand, this disconnection only makes sense in the SHTTP transport - whereas the server is actually transport agnostic.
Maybe the correct expectation is to have the server call this method directly via its reference to the transport if necessary and actually available, so this might be fine actually.`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be called by the user on the server. A bunch of frameworks dont give the user easy access to the transport but they do the server. Proxy by reference is good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me it would feel very weird to have a closeSSEStream on the server that should have no knowledge of sessions let alone streams.
A bunch of frameworks dont give the user easy access to the transport but they do the server. Proxy by reference is good
This is a good point but maybe the right API is to expose the transport from the server?
a556d78 to
6368bfa
Compare
Add support for SSE retry field to enable server-controlled client reconnection timing. Client changes: - Capture server-provided retry field from SSE events - Use retry value for reconnection delay instead of exponential backoff - Reconnect on graceful stream close with Last-Event-ID header Server changes: - Add retryInterval option to StreamableHTTPServerTransportOptions - Send priming events with id/retry/empty-data when eventStore is configured - Add closeSSEStream(requestId) API to close POST SSE streams for polling - Priming events establish resumption capability before actual messages Tests: - Client: retry field capture, exponential backoff fallback, graceful reconnection - Server: priming events, retry field, stream closure, POST SSE polling flow
6368bfa to
464b1f8
Compare
|
Hi @jonathanhefner would love feedback on whether this accurately captures the intent of your original SEP @paoloricciuti in case you want to TAL as you had valuable input on modelcontextprotocol/conformance#35 |
paoloricciuti
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we actually had a brief discussion about this on discord and I think the spec should be clearer here: from the discussion we had the reconnection should be different from the standalone stream. So if the client was doing two POST requests each POST would have it's own lastEventId.
On reconnect this should basically result in 3 new GET SSE streams, two to get the remaining notificiations/responses from the POST requests and one as a standalone stream for the new notifications (and to resume the notifications eventually sent during the disconnection period).
However right now the server still errors out if there's more that one SSE stream. Should this be fixed? Is this even the right interpretation of the spec?
src/server/streamableHttp.test.ts
Outdated
| const primingEventId = primingIdMatch![1]; | ||
|
|
||
| // Server closes the stream to trigger polling | ||
| transport.closeSSEStream(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me it would feel very weird to have a closeSSEStream on the server that should have no knowledge of sessions let alone streams.
A bunch of frameworks dont give the user easy access to the transport but they do the server. Proxy by reference is good
This is a good point but maybe the right API is to expose the transport from the server?
jonathanhefner
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would love feedback on whether this accurately captures the intent of your original SEP
Yes, looks great! Thank you! 😃
src/server/streamableHttp.test.ts
Outdated
| toolResolve!(); | ||
|
|
||
| // Give the tool time to complete and store the result | ||
| await new Promise(resolve => setTimeout(resolve, 50)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this is a concern, but this could lead to flaky tests. I'm not sure what we could await as an alternative though.
Addresses PR feedback from paoloricciuti requesting test coverage for the scenario where multiple messages are sent while the SSE client is disconnected. Uses a batch of tool calls to generate multiple responses that get stored and replayed on reconnection.
Ack, after the discusson on Discord seems clear we need to not error out on multiple streams. Still figuring out the right way to do that on this PR, will come back with an update soon, moving to draft for now. |
- Fix replayEvents to use streamId from last-event-id header - Add conflict check per streamId (not global) - Add missing close handler to clean up stream mapping - Add test demonstrating concurrent GET streams resuming different POST streams This aligns with the spec: "The client MAY remain connected to multiple SSE streams simultaneously."
151614e to
6d32c15
Compare
|
Horrible timeout based tests - but looking for feedback on the implementation before figuring out prettier tests. |
- Add closeStandaloneSSEStream() method to allow server to close the standalone GET notification stream, triggering client reconnection - Send priming event with retry field on GET streams (when eventStore configured) for resumability - Add tests for GET stream priming events and closeStandaloneSSEStream - Fix flaky test timeout for POST SSE polling test
Keep PR focused on POST stream resumability only: - Remove closeStandaloneSSEStream() method - Remove priming event from GET notification stream - Remove GET stream polling tests - Update resumability tests to not expect GET priming events
The server's replayEvents now requires getStreamIdForEventId to resolve the stream ID from an event ID. Without this, the fallback parsing with '::' separator doesn't match InMemoryEventStore's '_' separator, causing the resumability test to fail.
When getStreamIdForEventId is not implemented, fall back to using the streamId returned by replayEventsAfter() instead of requiring a specific event ID format. This preserves compatibility with existing EventStore implementations that don't implement the new optional method.
The test was incorrectly calling client.request() with resumptionToken expecting a POST response. Per the spec's "Resumability and Redelivery" section, resumption uses GET with Last-Event-ID header: > If the client wishes to resume after a broken connection, it SHOULD > issue an HTTP GET to the MCP endpoint, and include the Last-Event-ID > header to indicate the last event ID it received. See: https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#resumability-and-redelivery When resumptionToken is provided, the client's send() method only reconnects the GET SSE stream and returns early - it never sends the POST request. Fix by using transport.send() with a notification (no response expected) to properly trigger GET-based SSE reconnection.
| // Per spec, resumption uses GET with Last-Event-ID header, not POST | ||
| // When resumptionToken is provided, send() only triggers GET reconnection and returns early | ||
| // We use a notification (no id) so we don't expect a response | ||
| await transport2.send( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed this test to use GET for resumption as required by the spec: https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#resumability-and-redelivery
cc: @mattzcarey as we chatted about this weirdness between POST & GET
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels weird to have to send a notification to resume the stream...wouldn't it make sense to have a specific method to resume?
Summary
Implements SEP-1699 which enables servers to disconnect SSE connections at will by sending priming events and retry fields.
Motivation and Context
SEP-1699 introduces SSE polling behavior that allows servers to control client reconnection timing and close connections gracefully. This enables more efficient resource management on the server side while maintaining resumability.
We implement this on the
POSTSSE stream as implied by the SEP language linked above. I.e. when a server establishes an SSE stream:cancelSSEStreamto close the stream while still gathering the events.retryIntervalsupplied by the server before disconnection.How Has This Been Tested?
Breaking Changes
None. Client falls back to exponential backoff if no retry field is provided.
Types of changes
Checklist