Skip to content

Commit 0bce36f

Browse files
authored
Fix flaky client tests by waiting for first request's reply (#1091)
* Fix flaky client tests by waiting for first request's reply The tests were sending two requests back-to-back: 1. First call without reply=True (to check msg_id is returned) 2. Second call with reply=True (to check reply content) This caused flakiness because the reply for the first request would sit in the channel, and _async_recv_reply would receive it but skip it since msg_ids didn't match. On slow systems (like MacOS CI), the second reply might not arrive before timeout. Fix by using _recv_reply() / _async_recv_reply() to wait for the first request's reply instead of sending a second request. * Fix flaky client tests and handle flush race condition Two fixes: 1. Fix flaky client tests by draining first reply before testing reply=True The tests send two requests back-to-back: - First call without reply=True (to check msg_id is returned) - Second call with reply=True (to check reply content) This caused flakiness because the reply for the first request would sit in the channel, and _async_recv_reply would skip it since msg_ids didn't match. On slow systems (like MacOS CI), the second reply might not arrive before timeout. Fix by draining the first reply before sending the second request. This still tests both paths: returning msg_id and the reply=True convenience functionality. 2. Handle stream closed between flush scheduling and execution The _flush() callback runs on the ioloop thread after being scheduled by flush(). Between scheduling and execution, the stream may be closed by stop_channels() during teardown. This is an expected race condition during shutdown, not a programming error, so we handle it gracefully rather than asserting. Uncovered by qtconsole downstream tests after #1089.
1 parent a53ade9 commit 0bce36f

File tree

2 files changed

+40
-1
lines changed

2 files changed

+40
-1
lines changed

jupyter_client/threaded.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,13 @@ def flush(f: Any) -> None:
214214

215215
def _flush(self) -> None:
216216
"""Callback for :method:`self.flush`."""
217-
assert self.stream is not None
217+
# Race condition: flush() checks stream validity then schedules this
218+
# callback on the ioloop thread. Between scheduling and execution,
219+
# stop_channels() may close the stream (e.g., during teardown).
220+
# Handle gracefully rather than asserting, since this is an expected
221+
# edge case during shutdown, not a programming error.
222+
if self.stream is None or self.stream.closed():
223+
return
218224
self.stream.flush()
219225
self._flushed = True
220226

tests/test_client.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,34 +50,49 @@ def test_history(self):
5050
kc = self.kc
5151
msg_id = kc.history(session=0)
5252
self.assertIsInstance(msg_id, str)
53+
# Drain the first reply to avoid race condition
54+
kc._recv_reply(msg_id, timeout=TIMEOUT)
55+
# Now test the reply=True convenience path
5356
reply = kc.history(session=0, reply=True, timeout=TIMEOUT)
5457
self._check_reply("history", reply)
5558

5659
def test_inspect(self):
5760
kc = self.kc
5861
msg_id = kc.inspect("who cares")
5962
self.assertIsInstance(msg_id, str)
63+
# Drain the first reply to avoid race condition
64+
kc._recv_reply(msg_id, timeout=TIMEOUT)
65+
# Now test the reply=True convenience path
6066
reply = kc.inspect("code", reply=True, timeout=TIMEOUT)
6167
self._check_reply("inspect", reply)
6268

6369
def test_complete(self):
6470
kc = self.kc
6571
msg_id = kc.complete("who cares")
6672
self.assertIsInstance(msg_id, str)
73+
# Drain the first reply to avoid race condition
74+
kc._recv_reply(msg_id, timeout=TIMEOUT)
75+
# Now test the reply=True convenience path
6776
reply = kc.complete("code", reply=True, timeout=TIMEOUT)
6877
self._check_reply("complete", reply)
6978

7079
def test_kernel_info(self):
7180
kc = self.kc
7281
msg_id = kc.kernel_info()
7382
self.assertIsInstance(msg_id, str)
83+
# Drain the first reply to avoid race condition
84+
kc._recv_reply(msg_id, timeout=TIMEOUT)
85+
# Now test the reply=True convenience path
7486
reply = kc.kernel_info(reply=True, timeout=TIMEOUT)
7587
self._check_reply("kernel_info", reply)
7688

7789
def test_comm_info(self):
7890
kc = self.kc
7991
msg_id = kc.comm_info()
8092
self.assertIsInstance(msg_id, str)
93+
# Drain the first reply to avoid race condition
94+
kc._recv_reply(msg_id, timeout=TIMEOUT)
95+
# Now test the reply=True convenience path
8196
reply = kc.comm_info(reply=True, timeout=TIMEOUT)
8297
self._check_reply("comm_info", reply)
8398

@@ -150,36 +165,54 @@ def output_hook(msg):
150165
async def test_history(self, kc):
151166
msg_id = kc.history(session=0)
152167
assert isinstance(msg_id, str)
168+
# Drain the first reply to avoid race condition
169+
await kc._async_recv_reply(msg_id, timeout=TIMEOUT)
170+
# Now test the reply=True convenience path
153171
reply = await kc.history(session=0, reply=True, timeout=TIMEOUT)
154172
self._check_reply("history", reply)
155173

156174
async def test_inspect(self, kc):
157175
msg_id = kc.inspect("who cares")
158176
assert isinstance(msg_id, str)
177+
# Drain the first reply to avoid race condition
178+
await kc._async_recv_reply(msg_id, timeout=TIMEOUT)
179+
# Now test the reply=True convenience path
159180
reply = await kc.inspect("code", reply=True, timeout=TIMEOUT)
160181
self._check_reply("inspect", reply)
161182

162183
async def test_complete(self, kc):
163184
msg_id = kc.complete("who cares")
164185
assert isinstance(msg_id, str)
186+
# Drain the first reply to avoid race condition
187+
await kc._async_recv_reply(msg_id, timeout=TIMEOUT)
188+
# Now test the reply=True convenience path
165189
reply = await kc.complete("code", reply=True, timeout=TIMEOUT)
166190
self._check_reply("complete", reply)
167191

168192
async def test_is_complete(self, kc):
169193
msg_id = kc.is_complete("who cares")
170194
assert isinstance(msg_id, str)
195+
# Drain the first reply to avoid race condition
196+
await kc._async_recv_reply(msg_id, timeout=TIMEOUT)
197+
# Now test the reply=True convenience path
171198
reply = await kc.is_complete("code", reply=True, timeout=TIMEOUT)
172199
self._check_reply("is_complete", reply)
173200

174201
async def test_kernel_info(self, kc):
175202
msg_id = kc.kernel_info()
176203
assert isinstance(msg_id, str)
204+
# Drain the first reply to avoid race condition
205+
await kc._async_recv_reply(msg_id, timeout=TIMEOUT)
206+
# Now test the reply=True convenience path
177207
reply = await kc.kernel_info(reply=True, timeout=TIMEOUT)
178208
self._check_reply("kernel_info", reply)
179209

180210
async def test_comm_info(self, kc):
181211
msg_id = kc.comm_info()
182212
assert isinstance(msg_id, str)
213+
# Drain the first reply to avoid race condition
214+
await kc._async_recv_reply(msg_id, timeout=TIMEOUT)
215+
# Now test the reply=True convenience path
183216
reply = await kc.comm_info(reply=True, timeout=TIMEOUT)
184217
self._check_reply("comm_info", reply)
185218

0 commit comments

Comments
 (0)