Skip to content

Commit 26c29ab

Browse files
committed
Fix streamed runner hanging when session.add_items raises exception
1 parent db68d1c commit 26c29ab

File tree

2 files changed

+38
-0
lines changed

2 files changed

+38
-0
lines changed

src/agents/run.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,6 +1285,14 @@ async def _start_streaming(
12851285
if streamed_result.trace:
12861286
streamed_result.trace.finish(reset_current=True)
12871287

1288+
# Ensure QueueCompleteSentinel is always put in the queue when the stream ends,
1289+
# even if an exception occurs before the inner try/except block (e.g., in
1290+
# _save_result_to_session at the beginning). Without this, stream_events()
1291+
# would hang forever waiting for more items.
1292+
if not streamed_result.is_complete:
1293+
streamed_result.is_complete = True
1294+
streamed_result._event_queue.put_nowait(QueueCompleteSentinel())
1295+
12881296
@classmethod
12891297
async def _run_single_turn_streamed(
12901298
cls,

tests/test_session.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,3 +534,33 @@ def add_item(item):
534534
expected = {f"Message {i}" for i in range(10)}
535535
assert contents == expected
536536
session.close()
537+
538+
539+
@pytest.mark.asyncio
540+
async def test_session_add_items_exception_propagates_in_streamed():
541+
"""Test that exceptions from session.add_items are properly propagated
542+
in run_streamed instead of causing the stream to hang forever.
543+
Regression test for https://github.com/openai/openai-agents-python/issues/2130
544+
"""
545+
session = SQLiteSession("test_exception_session")
546+
547+
async def _failing_add_items(_items):
548+
raise RuntimeError("Simulated session.add_items failure")
549+
550+
session.add_items = _failing_add_items # type: ignore[method-assign]
551+
552+
model = FakeModel()
553+
agent = Agent(name="test", model=model)
554+
model.set_next_output([get_text_message("This should not be reached")])
555+
556+
result = Runner.run_streamed(agent, "Hello", session=session)
557+
558+
async def consume_stream():
559+
async for _event in result.stream_events():
560+
pass
561+
562+
with pytest.raises(RuntimeError, match="Simulated session.add_items failure"):
563+
# Timeout ensures test fails fast instead of hanging forever if bug regresses
564+
await asyncio.wait_for(consume_stream(), timeout=5.0)
565+
566+
session.close()

0 commit comments

Comments
 (0)