@@ -376,7 +376,8 @@ def __init__(
376376 self ._flush_pending = False
377377 self ._subprocess_flush_pending = False
378378 self ._io_loop = pub_thread .io_loop
379- self ._new_buffer ()
379+ self ._buffer_lock = threading .RLock ()
380+ self ._buffer = StringIO ()
380381 self .echo = None
381382 self ._isatty = bool (isatty )
382383
@@ -533,7 +534,8 @@ def write(self, string: str) -> int:
533534
534535 is_child = (not self ._is_master_process ())
535536 # only touch the buffer in the IO thread to avoid races
536- self .pub_thread .schedule (lambda : self ._buffer .write (string ))
537+ with self ._buffer_lock :
538+ self ._buffer .write (string )
537539 if is_child :
538540 # mp.Pool cannot be trusted to flush promptly (or ever),
539541 # and this helps.
@@ -558,17 +560,15 @@ def writable(self):
558560 return True
559561
560562 def _flush_buffer (self ):
561- """clear the current buffer and return the current buffer data.
562-
563- This should only be called in the IO thread.
564- """
565- data = ''
566- if self ._buffer is not None :
567- buf = self ._buffer
568- self ._new_buffer ()
569- data = buf .getvalue ()
570- buf .close ()
563+ """clear the current buffer and return the current buffer data."""
564+ buf = self ._rotate_buffer ()
565+ data = buf .getvalue ()
566+ buf .close ()
571567 return data
572568
573- def _new_buffer (self ):
574- self ._buffer = StringIO ()
569+ def _rotate_buffer (self ):
570+ """Returns the current buffer and replaces it with an empty buffer."""
571+ with self ._buffer_lock :
572+ old_buffer = self ._buffer
573+ self ._buffer = StringIO ()
574+ return old_buffer
0 commit comments