Skip to content

Commit bd79024

Browse files
authored
Merge pull request #341 from rumpl/stream-start-stop
Add session and agent info in the stream start/stop events
2 parents 4e7b576 + 577232d commit bd79024

File tree

4 files changed

+28
-80
lines changed

4 files changed

+28
-80
lines changed

pkg/runtime/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,9 +361,9 @@ func (c *Client) runAgentWithAgentName(ctx context.Context, sessionID, agent, ag
361361
case "error":
362362
eventChan <- Error(event["error"].(string))
363363
case "stream_started":
364-
eventChan <- StreamStarted()
364+
eventChan <- StreamStarted(sessionID, event["agent_name"].(string))
365365
case "stream_stopped":
366-
eventChan <- StreamStopped()
366+
eventChan <- StreamStopped(sessionID, event["agent_name"].(string))
367367
case "authorization_required":
368368
eventChan <- AuthorizationRequired(event["server_url"].(string), event["server_type"].(string), event["confirmation"].(string))
369369
case "session_compaction":

pkg/runtime/event.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,21 @@ func ToolCallResponse(toolCall tools.ToolCall, response, agentName string) Event
103103
func (e *ToolCallResponseEvent) isEvent() {}
104104

105105
type StreamStartedEvent struct {
106-
Type string `json:"type"`
106+
Type string `json:"type"`
107+
SessionID string `json:"session_id,omitempty"`
108+
AgentContext
107109
}
108110

109-
func StreamStarted() Event {
111+
func StreamStarted(sessionID, agentName string) Event {
110112
return &StreamStartedEvent{
111-
Type: "stream_started",
113+
Type: "stream_started",
114+
SessionID: sessionID,
115+
AgentContext: AgentContext{AgentName: agentName},
112116
}
113117
}
114118

115119
func (e *StreamStartedEvent) GetAgentName() string {
116-
return ""
120+
return e.AgentName
117121
}
118122

119123
func (e *StreamStartedEvent) isEvent() {}
@@ -253,17 +257,21 @@ func SessionCompaction(sessionID, status string) Event {
253257
func (e *SessionCompactionEvent) isEvent() {}
254258

255259
type StreamStoppedEvent struct {
256-
Type string `json:"type"`
260+
Type string `json:"type"`
261+
SessionID string `json:"session_id,omitempty"`
262+
AgentContext
257263
}
258264

259-
func StreamStopped() Event {
265+
func StreamStopped(sessionID, agentName string) Event {
260266
return &StreamStoppedEvent{
261-
Type: "stream_stopped",
267+
Type: "stream_stopped",
268+
SessionID: sessionID,
269+
AgentContext: AgentContext{AgentName: agentName},
262270
}
263271
}
264272

265273
func (e *StreamStoppedEvent) GetAgentName() string {
266-
return ""
274+
return e.AgentName
267275
}
268276

269277
func (e *StreamStoppedEvent) isEvent() {}

pkg/runtime/runtime.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func (r *runtime) handleOAuthAuthorizationFlow(ctx context.Context, sess *sessio
148148
func (r *runtime) finalizeEventChannel(ctx context.Context, sess *session.Session, events chan Event) {
149149
defer close(events)
150150

151-
events <- StreamStopped()
151+
events <- StreamStopped(sess.ID, r.currentAgent)
152152

153153
telemetry.RecordSessionEnd(ctx)
154154

@@ -172,7 +172,7 @@ func (r *runtime) RunStream(ctx context.Context, sess *session.Session) <-chan E
172172
events <- UserMessage(messages[len(messages)-1].Content)
173173
}
174174

175-
events <- StreamStarted()
175+
events <- StreamStarted(sess.ID, a.Name())
176176

177177
defer r.finalizeEventChannel(ctx, sess, events)
178178

pkg/runtime/runtime_test.go

Lines changed: 8 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,10 @@ func TestSimple(t *testing.T) {
181181

182182
expectedEvents := []Event{
183183
UserMessage("Hi"),
184-
StreamStarted(),
184+
StreamStarted(sess.ID, "root"),
185185
AgentChoice("root", "Hello"),
186186
TokenUsage(3, 2, 5, 0, 0),
187-
StreamStopped(),
187+
StreamStopped(sess.ID, "root"),
188188
}
189189

190190
require.Equal(t, expectedEvents, events)
@@ -206,14 +206,14 @@ func TestMultipleContentChunks(t *testing.T) {
206206

207207
expectedEvents := []Event{
208208
UserMessage("Please greet me"),
209-
StreamStarted(),
209+
StreamStarted(sess.ID, "root"),
210210
AgentChoice("root", "Hello "),
211211
AgentChoice("root", "there, "),
212212
AgentChoice("root", "how "),
213213
AgentChoice("root", "are "),
214214
AgentChoice("root", "you?"),
215215
TokenUsage(8, 12, 20, 0, 0),
216-
StreamStopped(),
216+
StreamStopped(sess.ID, "root"),
217217
}
218218

219219
require.Equal(t, expectedEvents, events)
@@ -233,12 +233,12 @@ func TestWithReasoning(t *testing.T) {
233233

234234
expectedEvents := []Event{
235235
UserMessage("Hi"),
236-
StreamStarted(),
236+
StreamStarted(sess.ID, "root"),
237237
AgentChoiceReasoning("root", "Let me think about this..."),
238238
AgentChoiceReasoning("root", " I should respond politely."),
239239
AgentChoice("root", "Hello, how can I help you?"),
240240
TokenUsage(10, 15, 25, 0, 0),
241-
StreamStopped(),
241+
StreamStopped(sess.ID, "root"),
242242
}
243243

244244
require.Equal(t, expectedEvents, events)
@@ -259,13 +259,13 @@ func TestMixedContentAndReasoning(t *testing.T) {
259259

260260
expectedEvents := []Event{
261261
UserMessage("Hi there"),
262-
StreamStarted(),
262+
StreamStarted(sess.ID, "root"),
263263
AgentChoiceReasoning("root", "The user wants a greeting"),
264264
AgentChoice("root", "Hello!"),
265265
AgentChoiceReasoning("root", " I should be friendly"),
266266
AgentChoice("root", " How can I help you today?"),
267267
TokenUsage(15, 20, 35, 0, 0),
268-
StreamStopped(),
268+
StreamStopped(sess.ID, "root"),
269269
}
270270

271271
require.Equal(t, expectedEvents, events)
@@ -318,66 +318,6 @@ func TestErrorEvent(t *testing.T) {
318318
require.Contains(t, errorEvent.Error, "simulated error")
319319
}
320320

321-
func TestRuntimeRunStream_TableDriven(t *testing.T) {
322-
tests := []struct {
323-
name string
324-
streamBuilder func() *streamBuilder
325-
userMessage string
326-
expectedEvents []Event
327-
}{
328-
{
329-
name: "single_word_response",
330-
streamBuilder: func() *streamBuilder {
331-
return newStreamBuilder().AddContent("Yes").AddStopWithUsage(2, 1)
332-
},
333-
userMessage: "Confirm",
334-
expectedEvents: []Event{
335-
UserMessage("Confirm"),
336-
StreamStarted(),
337-
AgentChoice("root", "Yes"),
338-
TokenUsage(2, 1, 3, 0, 0),
339-
StreamStopped(),
340-
},
341-
},
342-
{
343-
name: "reasoning_only_response",
344-
streamBuilder: func() *streamBuilder {
345-
return newStreamBuilder().AddReasoning("Thinking...").AddStopWithUsage(5, 3)
346-
},
347-
userMessage: "Think about this",
348-
expectedEvents: []Event{
349-
UserMessage("Think about this"),
350-
StreamStarted(),
351-
AgentChoiceReasoning("root", "Thinking..."),
352-
TokenUsage(5, 3, 8, 0, 0),
353-
StreamStopped(),
354-
},
355-
},
356-
{
357-
name: "zero_token_response",
358-
streamBuilder: func() *streamBuilder {
359-
return newStreamBuilder().AddStopWithUsage(0, 0)
360-
},
361-
userMessage: "Empty",
362-
expectedEvents: []Event{
363-
UserMessage("Empty"),
364-
StreamStarted(),
365-
TokenUsage(0, 0, 0, 0, 0),
366-
StreamStopped(),
367-
},
368-
},
369-
}
370-
371-
for _, tt := range tests {
372-
t.Run(tt.name, func(t *testing.T) {
373-
stream := tt.streamBuilder().Build()
374-
sess := session.New(session.WithUserMessage("", tt.userMessage))
375-
events := runSession(t, sess, stream)
376-
require.Equal(t, tt.expectedEvents, events)
377-
})
378-
}
379-
}
380-
381321
func TestContextCancellation(t *testing.T) {
382322
stream := newStreamBuilder().
383323
AddContent("This should not complete").

0 commit comments

Comments
 (0)