Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions tokio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,6 @@ pub use self::metadata::metadata;

mod open_options;
pub use self::open_options::OpenOptions;
cfg_io_uring! {
pub(crate) use self::open_options::UringOpenOptions;
}

mod read;
pub use self::read::read;
Expand Down Expand Up @@ -298,6 +295,13 @@ cfg_windows! {
pub use self::symlink_file::symlink_file;
}

cfg_io_uring! {
pub(crate) mod read_uring;
pub(crate) use self::read_uring::read_uring;

pub(crate) use self::open_options::UringOpenOptions;
}

use std::io;

#[cfg(not(test))]
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/fs/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,23 @@ use std::{io, path::Path};
/// ```
pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
    let path = path.as_ref().to_owned();

    // Compiled in only for unstable io-uring builds on Linux. When the I/O
    // driver's `check_and_init` yields `Ok(true)` the read is served by
    // `read_uring`; on `Ok(false)` we fall through to the blocking-pool path
    // below, and an `Err` is returned to the caller as-is.
    #[cfg(all(
        tokio_unstable,
        feature = "io-uring",
        feature = "rt",
        feature = "fs",
        target_os = "linux"
    ))]
    {
        let handle = crate::runtime::Handle::current();
        let uring_ready = handle.inner.driver().io().check_and_init()?;

        if uring_ready {
            return crate::fs::read_uring(&path).await;
        }
    }

    // Fallback: run the synchronous std implementation via `asyncify`.
    asyncify(move || std::fs::read(path)).await
}
166 changes: 166 additions & 0 deletions tokio/src/fs/read_uring.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
use crate::fs::OpenOptions;
use crate::runtime::driver::op::Op;

use std::io;
use std::io::ErrorKind;
use std::os::fd::OwnedFd;
use std::path::Path;

// The read-to-end algorithm in this file is modelled on the Rust standard
// library's `default_read_to_end` (version 1.90.0):
// https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409
//
// Number of bytes requested by a "probe" read, i.e. a small read issued to
// find out whether the file has more data before growing the main buffer.
const PROBE_SIZE: usize = 32;
// Same value as `PROBE_SIZE`, typed as `u32` because `Op::read` takes its
// length as a `u32`.
const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;

// Maximum number of bytes requested from a single io_uring read submission.
// SAFETY: must not exceed `u32::MAX`, because read lengths clamped to this
// value are subsequently cast from `usize` to `u32`.
const MAX_READ_SIZE: usize = u32::MAX as usize;

/// Reads the entire contents of the file at `path` using io_uring reads.
///
/// The file is opened read-only and its metadata length, when available, is
/// used as a capacity hint for the output buffer. The actual reading is done
/// by `read_to_end_uring`.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, if the initial buffer
/// cannot be allocated, or if reading fails. A failure to fetch metadata is
/// not an error; it only means no size hint is available.
///
/// # Panics
///
/// Panics if the freshly opened file cannot be converted back into a std
/// file (`try_into_std` fails).
pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
    let file = OpenOptions::new().read(true).open(path).await?;

    // TODO: use io uring in the future to obtain metadata
    // NOTE(review): `as usize` truncates lengths above `usize::MAX` on targets
    // where `usize` is narrower than 64 bits; the value is only a hint.
    let size_hint: Option<usize> = file.metadata().await.map(|m| m.len() as usize).ok();

    let fd: OwnedFd = file
        .try_into_std()
        .expect("unexpected in-flight operation detected")
        .into();

    // Pre-size the buffer to the hinted length so a file of exactly that size
    // fits without reallocation. The reservation is fallible so that an
    // oversized hint is reported as an `io::Error` instead of aborting the
    // process on allocation failure.
    let mut buf: Vec<u8> = Vec::new();
    buf.try_reserve_exact(size_hint.unwrap_or(0))?;

    read_to_end_uring(size_hint, fd, buf).await
}

/// Reads `fd` until a read reports 0 bytes, appending everything to `buf`.
///
/// Reading starts at file offset 0, and the offset advances by the byte count
/// of each successful read. `size_hint` is only consulted to decide whether to
/// begin with a small probe read. Reads failing with `ErrorKind::Interrupted`
/// are retried; any other error is returned to the caller.
///
/// `fd` and `buf` are passed by value into every read and handed back with
/// its result, which is why both are rebound after each read.
async fn read_to_end_uring(
size_hint: Option<usize>,
mut fd: OwnedFd,
mut buf: Vec<u8>,
) -> io::Result<Vec<u8>> {
// File offset at which the next read is issued.
let mut offset = 0;

// Capacity the caller handed us; used below to detect that the buffer has
// not been grown yet.
let start_cap = buf.capacity();

// if buffer has no room and no size_hint, start with a small probe_read from 0 offset
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;

// Nothing was read: return the buffer as it is.
if size_read == 0 {
return Ok(r_buf);
}

buf = r_buf;
fd = r_fd;
offset += size_read as u64;
}

loop {
if buf.len() == buf.capacity() && buf.capacity() == start_cap {
// The buffer might be an exact fit. Let's read into a probe buffer
// and see if it returns `Ok(0)`. If so, we've avoided an
// unnecessary increasing of the capacity. But if not, append the
// probe buffer to the primary buffer and let its capacity grow.
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;

if size_read == 0 {
return Ok(r_buf);
}

buf = r_buf;
fd = r_fd;
offset += size_read as u64;
}

// buf is full, need more capacity
if buf.len() == buf.capacity() {
// Fallible reservation: an allocation failure is returned as an error.
buf.try_reserve(PROBE_SIZE)?;
}

// NOTE(review): the original comment here described growing the buffer
// after more than 2 "consecutive_short_reads"; no such counter exists in
// this function, so that text appears to be stale.

// prepare the spare capacity to be read into
let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);

// SAFETY: buf_len cannot be greater than u32::MAX because max_read_size
// is u32::MAX
// `read_len` is the number of bytes still to be requested for this
// round; it shrinks as reads complete.
let mut read_len = buf_len as u32;

// Keep issuing reads until `read_len` bytes have arrived, a read reports
// 0 bytes, or a non-interrupted error occurs.
loop {
// read into spare capacity
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, offset).await;
Comment on lines +90 to +92
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file has two different loops around Op::read to handle the interrupted error. Perhaps we should instead extract that logic to a separate function call, which behaves just like Op::read except that it retries in case of interrupted, then use that function from both places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Darksonn The newly changed code has custom handling with the match around the result of Op::read so it will be a little hard to put it in a function, I think what we have is fine for now


match res {
// A read of 0 bytes ends the whole operation.
Ok(0) => return Ok(r_buf),
Ok(size_read) => {
fd = r_fd;
buf = r_buf;
offset += size_read as u64;
read_len -= size_read;

// keep reading if there's something left to be read
if read_len > 0 {
continue;
} else {
break;
}
}
// Interrupted: take the descriptor and buffer back and retry the
// same read with unchanged length and offset.
Err(e) if e.kind() == ErrorKind::Interrupted => {
buf = r_buf;
fd = r_fd;

continue;
}
Err(e) => return Err(e),
}
}
}
}

/// Issues one read of up to `PROBE_SIZE` bytes at `offset`, trying not to
/// grow `buf` before it is known whether more data exists.
///
/// When `buf` already holds more than `PROBE_SIZE` bytes, its last
/// `PROBE_SIZE` bytes are saved aside and the buffer is truncated, so the
/// read can land in the space they occupied; the saved bytes are then
/// spliced back in front of the newly read data. Otherwise `PROBE_SIZE` bytes
/// of extra capacity are reserved and the read appends directly.
///
/// Returns the number of bytes read together with the descriptor and the
/// buffer, whose existing contents are preserved and followed by the new
/// bytes.
///
/// # Errors
///
/// Returns an error if the extra capacity cannot be allocated, or if the read
/// fails with anything other than `ErrorKind::Interrupted` (interrupted reads
/// are retried).
async fn small_probe_read(
    mut fd: OwnedFd,
    mut buf: Vec<u8>,
    offset: u64,
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
    let mut temp_arr = [0; PROBE_SIZE];
    let has_enough = buf.len() > PROBE_SIZE;

    if has_enough {
        // if we have more than PROBE_SIZE bytes in the buffer already then
        // don't call reserve as we might potentially read 0 bytes
        let back_bytes_len = buf.len() - PROBE_SIZE;
        temp_arr.copy_from_slice(&buf[back_bytes_len..]);
        // We're decreasing the length of the buffer and len is greater
        // than PROBE_SIZE. So we can read into the discarded length
        buf.truncate(back_bytes_len);
    } else {
        // we don't even have PROBE_SIZE length in the buffer, we need this
        // reservation. It is fallible so that an allocation failure is
        // returned as an error rather than panicking or aborting.
        buf.try_reserve_exact(PROBE_SIZE)?;
    }

    loop {
        let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await;

        match res {
            // return early if we inserted into reserved PROBE_SIZE
            // bytes
            Ok(size_read) if !has_enough => return Ok((size_read, r_fd, r_buf)),
            Ok(size_read) => {
                // Length before the read, i.e. the truncation point.
                let old_len = r_buf.len() - (size_read as usize);

                // Put the saved tail back between the old contents and the
                // freshly read bytes.
                r_buf.splice(old_len..old_len, temp_arr);

                return Ok((size_read, r_fd, r_buf));
            }
            // Interrupted: take the descriptor and buffer back and retry.
            Err(e) if e.kind() == ErrorKind::Interrupted => {
                buf = r_buf;
                fd = r_fd;

                continue;
            }
            Err(e) => return Err(e),
        }
    }
}
1 change: 1 addition & 0 deletions tokio/src/io/uring/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) mod open;
pub(crate) mod read;
pub(crate) mod utils;
pub(crate) mod write;
61 changes: 61 additions & 0 deletions tokio/src/io/uring/read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};

use io_uring::{opcode, types};
use std::io::{self, Error};
use std::os::fd::{AsRawFd, OwnedFd};

/// State kept for a read operation: the file descriptor being read and the
/// buffer being read into. Both are handed back to the caller in the
/// operation's output when it completes.
#[derive(Debug)]
pub(crate) struct Read {
// Descriptor the read is issued against.
fd: OwnedFd,
// Destination buffer; the read targets its spare capacity.
buf: Vec<u8>,
}

impl Completable for Read {
type Output = (io::Result<u32>, OwnedFd, Vec<u8>);

fn complete(self, cqe: CqeResult) -> Self::Output {
let mut buf = self.buf;

if let Ok(len) = cqe.result {
let new_len = buf.len() + len as usize;
// SAFETY: Kernel read len bytes
unsafe { buf.set_len(new_len) };
}

(cqe.result, self.fd, buf)
}

fn complete_with_error(self, err: Error) -> Self::Output {
(Err(err), self.fd, self.buf)
}
}

impl Cancellable for Read {
// Wraps the read state (descriptor and buffer) in the `CancelData::Read`
// variant, transferring ownership of both to the returned value.
fn cancel(self) -> CancelData {
CancelData::Read(self)
}
}

impl Op<Read> {
    /// Submits a request to read up to `len` bytes from `fd`, starting at file
    /// `offset`, into the spare (uninitialized) capacity of `buf`.
    ///
    /// The destination pointer is the start of the spare capacity, so bytes
    /// already stored in `buf` are not overwritten.
    ///
    /// # Panics
    ///
    /// Panics if `buf` has fewer than `len` bytes of spare capacity. This
    /// check is what keeps the read inside the allocation: with less room
    /// than `len`, the write could run past the end of the buffer, and
    /// extending the vector's length by the number of bytes read would be
    /// unsound.
    pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>, len: u32, offset: u64) -> Self {
        let spare = buf.spare_capacity_mut();
        assert!(
            spare.len() >= len as usize,
            "read length exceeds the buffer's spare capacity"
        );
        // don't overwrite on already written part
        let buf_mut_ptr = spare.as_mut_ptr().cast();

        let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len)
            .offset(offset)
            .build();

        // SAFETY: `fd` and `buf` are moved into the `Read` state handed to
        // `Op::new` alongside the submission entry, and the assertion above
        // guarantees the destination has room for `len` bytes.
        unsafe { Op::new(read_op, Read { fd, buf }) }
    }
}
2 changes: 2 additions & 0 deletions tokio/src/runtime/driver/op.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::io::uring::open::Open;
use crate::io::uring::read::Read;
use crate::io::uring::write::Write;
use crate::runtime::Handle;

Expand All @@ -17,6 +18,7 @@ use std::task::{Context, Poll, Waker};
// One variant per operation type, each wrapping that operation's state.
// NOTE(review): how these values are consumed is not visible in this hunk.
pub(crate) enum CancelData {
Open(Open),
Write(Write),
// Produced by `Cancellable::cancel` on a `Read`.
Read(Read),
}

#[derive(Debug)]
Expand Down
Loading