-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Description
Version
1.47.1
Platform
macOS 15.6.1 (24G90)
Repro
let (tx, mut rx) = pipe::pipe().unwrap();
let mut tx = File::from(tx.into_blocking_fd().unwrap());
tx.write_all(b"xx").unwrap();
// This works:
// let mut buf = Vec::new();
// rx.read_buf(&mut buf).await.unwrap();
// eprintln!("buf={buf:?}");
// `now_or_never` returns `None`, that is, nothing can be read.
let mut buf = Vec::new();
rx.read_buf(&mut buf)
.now_or_never()
.expect("not yet")
.unwrap();
eprintln!("buf={buf:?}");
// Similarly `try_read` returns `WouldBlock`.
// rx.try_read(&mut vec![0; 10]).unwrap();
What is happening here
- we write to a pipe outside of tokio machinery to make sure we don't trigger mio
- if we read with
async, it works fine as expected - if we do
now_or_never, it does not return anything - similarly if
try_readis used,WouldBlockis returned
Why it is a problem
Process is terminated, need to read what's left in the pipe. But if the other process is holding the file descriptor (for example, non-terminated child of terminated process), stop reading after the pipe is drained.
We cannot use await because it may hang forever, and without await it does not read.
Workaround
Convert Receiver::into_nonblocking_fd and read using standard block IO until WouldBlock.
Is it expected behavior
I am not sure. Documentation says:
tokio/tokio/src/net/unix/pipe.rs
Lines 1103 to 1106 in 5f3f5b0
| /// Reads any pending data from the pipe but does not wait for new data | |
| /// to arrive. On success, returns the number of bytes read. Because | |
| /// `try_read()` is non-blocking, the buffer does not have to be stored by | |
| /// the async task and can exist entirely on the stack. |
What is "pending data"? I would guess if data is in the pipe, it is pending. So if this is expected, maybe different phrasing needed.
Relevant sources
Implementation of PollEvented::poll_read checking the registration before attemp to read:
tokio/tokio/src/io/poll_evented.rs
Lines 159 to 172 in 5f3f5b0
| impl<E: Source> PollEvented<E> { | |
| // Safety: The caller must ensure that `E` can read into uninitialized memory | |
| pub(crate) unsafe fn poll_read<'a>( | |
| &'a self, | |
| cx: &mut Context<'_>, | |
| buf: &mut ReadBuf<'_>, | |
| ) -> Poll<io::Result<()>> | |
| where | |
| &'a E: io::Read + 'a, | |
| { | |
| use std::io::Read; | |
| loop { | |
| let evt = ready!(self.registration.poll_read_ready(cx))?; |
Similarly, Receiver::try_read checks the registration first without attempt to read from the buffer:
tokio/tokio/src/net/unix/pipe.rs
Lines 1160 to 1164 in 5f3f5b0
| pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { | |
| self.io | |
| .registration() | |
| .try_io(Interest::READABLE, || (&*self.io).read(buf)) | |
| } |
Possible fix
I think we cannot skip registration check in poll_read without significant performance overhead.
But try_read does not accept cx parameter, so it called after we expect to read something, so try_read may want to skip registration check.