pipe::Receiver::try_read may not read from non-empty pipe #7625

@stepancheg

Version

1.47.1

Platform

macOS 15.6.1 (24G90)

Repro

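        use std::fs::File;
        use std::io::Write;

        use futures::FutureExt; // for `now_or_never`
        use tokio::io::AsyncReadExt; // for `read_buf`
        use tokio::net::unix::pipe;

        let (tx, mut rx) = pipe::pipe().unwrap();

        // Write through a plain blocking `File`, bypassing tokio entirely.
        let mut tx = File::from(tx.into_blocking_fd().unwrap());
        tx.write_all(b"xx").unwrap();

        // This works:
        // let mut buf = Vec::new();
        // rx.read_buf(&mut buf).await.unwrap();
        // eprintln!("buf={buf:?}");

        // `now_or_never` returns `None`, that is, nothing can be read,
        // so the `expect` below panics.
        let mut buf = Vec::new();
        rx.read_buf(&mut buf)
            .now_or_never()
            .expect("not yet")
            .unwrap();
        eprintln!("buf={buf:?}");

        // Similarly `try_read` returns `WouldBlock`.
        // rx.try_read(&mut vec![0; 10]).unwrap();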

What is happening here

  • We write to the pipe outside of the tokio machinery, to make sure we don't trigger mio.
  • Reading with async (await) works as expected.
  • With now_or_never, the read returns None, that is, nothing is read.
  • Similarly, try_read returns WouldBlock.

Why it is a problem

A process has terminated, and we need to read what is left in the pipe. But if another process is still holding the file descriptor (for example, a non-terminated child of the terminated process), we want to stop reading as soon as the pipe is drained.

We cannot use await because it may hang forever, and without await nothing is read.

Workaround

Convert the Receiver with Receiver::into_nonblocking_fd and read from the resulting file descriptor using standard library I/O until WouldBlock is returned.
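
A minimal sketch of this workaround, using only the standard library. The names `drain_until_would_block` and `Drained` are hypothetical helpers introduced here for illustration; they are not part of tokio. The sketch assumes the reader is a non-blocking descriptor, for example a `File` built from the `OwnedFd` that `into_nonblocking_fd` returns.

```rust
use std::io::{self, ErrorKind, Read};

/// Why draining stopped (hypothetical helper type, not a tokio API).
#[derive(Debug, PartialEq, Eq)]
pub enum Drained {
    /// `read` returned `WouldBlock`: the pipe is empty, but some writer
    /// (for example a still-running grandchild) keeps it open.
    Empty,
    /// `read` returned `Ok(0)`: every writer has closed its end.
    Eof,
}

/// Reads everything currently available from a non-blocking reader into
/// `out`, stopping at the first `WouldBlock` or at end of file.
pub fn drain_until_would_block<R: Read>(
    reader: &mut R,
    out: &mut Vec<u8>,
) -> io::Result<Drained> {
    let mut chunk = [0u8; 4096];
    loop {
        match reader.read(&mut chunk) {
            Ok(0) => return Ok(Drained::Eof),
            Ok(n) => out.extend_from_slice(&chunk[..n]),
            Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(Drained::Empty),
            // A signal interrupted the call; simply retry.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}

// Assumed usage with the `rx` from the repro (not compiled here):
//
//     let mut file = std::fs::File::from(rx.into_nonblocking_fd()?);
//     let mut buf = Vec::new();
//     let state = drain_until_would_block(&mut file, &mut buf)?;
```

Returning `Empty` and `Eof` separately lets the caller tell "drained, but someone still holds the write end" apart from "all writers are gone", which is the distinction the use case above depends on.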

Is it expected behavior

I am not sure. Documentation says:

/// Reads any pending data from the pipe but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.

What is "pending data"? I would guess that if data is in the pipe, it is pending. So if this behavior is expected, the documentation may need different phrasing.

Relevant sources

The implementation of PollEvented::poll_read checks the registration before attempting to read:

impl<E: Source> PollEvented<E> {
    // Safety: The caller must ensure that `E` can read into uninitialized memory
    pub(crate) unsafe fn poll_read<'a>(
        &'a self,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>>
    where
        &'a E: io::Read + 'a,
    {
        use std::io::Read;
        loop {
            let evt = ready!(self.registration.poll_read_ready(cx))?;

Similarly, Receiver::try_read checks the registration first, without attempting to read from the pipe:

pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
    self.io
        .registration()
        .try_io(Interest::READABLE, || (&*self.io).read(buf))
}
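
The effect of checking the registration first can be illustrated with a toy model. This is a deliberately simplified sketch, not tokio's actual implementation: `ToyPipe`, `cached_readable`, `write_outside_reactor`, `reactor_poll` and `try_read_gated` are all hypothetical names, and the "reactor" is reduced to a single boolean flag.

```rust
use std::collections::VecDeque;
use std::io::{self, ErrorKind};

/// Toy stand-in for an I/O resource registered with a reactor.
struct ToyPipe {
    /// Bytes actually sitting in the "pipe".
    data: VecDeque<u8>,
    /// Cached readiness; in this model only `reactor_poll` sets it.
    cached_readable: bool,
}

impl ToyPipe {
    fn new() -> Self {
        ToyPipe { data: VecDeque::new(), cached_readable: false }
    }

    /// A write that the reactor has not observed yet: the bytes arrive,
    /// but the cached readiness flag is left untouched.
    fn write_outside_reactor(&mut self, bytes: &[u8]) {
        self.data.extend(bytes.iter().copied());
    }

    /// Models the reactor running and delivering a "readable" event.
    fn reactor_poll(&mut self) {
        if !self.data.is_empty() {
            self.cached_readable = true;
        }
    }

    /// Gated read: consult the cached readiness before touching the data.
    fn try_read_gated(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if !self.cached_readable || self.data.is_empty() {
            self.cached_readable = false;
            return Err(ErrorKind::WouldBlock.into());
        }
        let n = buf.len().min(self.data.len());
        for slot in buf[..n].iter_mut() {
            *slot = self.data.pop_front().unwrap();
        }
        Ok(n)
    }
}
```

In this model, a `try_read_gated` call made right after `write_outside_reactor(b"xx")` fails with `WouldBlock` even though two bytes are available; it succeeds only after `reactor_poll` has run. That mirrors the symptom in the repro, where the read is attempted before the runtime has had a chance to observe the readiness event.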

Possible fix

I think we cannot skip the registration check in poll_read without significant performance overhead.

But try_read does not take a cx parameter, so it is presumably called when the caller already expects something to read. In that case try_read may want to skip the registration check.
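
A rough sketch of what "skip the registration check" could mean, written against a generic reader rather than tokio internals. `Readiness` and `try_read_ungated` are hypothetical names used only for illustration; how this would interact with tokio's real registration state is not addressed here.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical cached readiness, standing in for the registration state.
struct Readiness {
    readable: bool,
}

/// Attempts the read directly, ignoring the cached flag on the way in.
/// The cache is only updated afterwards: a `WouldBlock` result proves the
/// resource is not readable right now, so the flag is cleared.
fn try_read_ungated<R: Read>(
    io: &mut R,
    readiness: &mut Readiness,
    buf: &mut [u8],
) -> io::Result<usize> {
    match io.read(buf) {
        Ok(n) => Ok(n),
        Err(e) if e.kind() == ErrorKind::WouldBlock => {
            readiness.readable = false;
            Err(e)
        }
        Err(e) => Err(e),
    }
}
```

The trade-off is that every call now performs a real read on the underlying resource, even when the cached state already says nothing is available. This is the kind of overhead that makes the same change unattractive for poll_read.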

Labels: A-tokio (Area: The main tokio crate), C-bug (Category: This is a bug.), M-net (Module: tokio/net)
