Skip to content

Commit b9c3885

Browse files
committed
io-uring: re-think fs read with std lib's implementation
1 parent 90b21eb commit b9c3885

File tree

5 files changed

+272
-61
lines changed

5 files changed

+272
-61
lines changed

tokio/src/fs/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,6 @@ pub use self::metadata::metadata;
237237

238238
mod open_options;
239239
pub use self::open_options::OpenOptions;
240-
cfg_io_uring! {
241-
pub(crate) use self::open_options::UringOpenOptions;
242-
}
243240

244241
mod read;
245242
pub use self::read::read;
@@ -298,6 +295,13 @@ cfg_windows! {
298295
pub use self::symlink_file::symlink_file;
299296
}
300297

298+
cfg_io_uring! {
299+
pub(crate) mod read_uring;
300+
pub(crate) use self::read_uring::read_uring;
301+
302+
pub(crate) use self::open_options::UringOpenOptions;
303+
}
304+
301305
use std::io;
302306

303307
#[cfg(not(test))]

tokio/src/fs/read.rs

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
5454
target_os = "linux"
5555
))]
5656
{
57+
use crate::fs::read_uring;
58+
5759
let handle = crate::runtime::Handle::current();
5860
let driver_handle = handle.inner.driver().io();
5961
if driver_handle.check_and_init()? {
@@ -63,40 +65,3 @@ pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
6365

6466
asyncify(move || std::fs::read(path)).await
6567
}
66-
67-
#[cfg(all(
68-
tokio_unstable,
69-
feature = "io-uring",
70-
feature = "rt",
71-
feature = "fs",
72-
target_os = "linux"
73-
))]
74-
async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
75-
use crate::{fs::OpenOptions, runtime::driver::op::Op};
76-
use std::os::fd::OwnedFd;
77-
78-
let file = OpenOptions::new().read(true).open(path).await?;
79-
80-
let size = file
81-
.metadata()
82-
.await
83-
.map(|m| m.len() as usize)
84-
.ok()
85-
.unwrap_or(0);
86-
87-
let buf = Vec::with_capacity(size);
88-
89-
let fd: OwnedFd = file
90-
.try_into_std()
91-
.expect("unexpected in-flight operation detected")
92-
.into();
93-
94-
let (read_size, mut buf) = Op::read(fd, buf)?.await?;
95-
96-
// SAFETY:
97-
// 1. The buffer is initialized with `size` capacity
98-
// 2. the read_size is the number of bytes read from the file
99-
unsafe { buf.set_len(read_size as _) };
100-
101-
Ok(buf)
102-
}

tokio/src/fs/read_uring.rs

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
use crate::fs::OpenOptions;
2+
use crate::runtime::driver::op::Op;
3+
4+
use std::io::ErrorKind;
5+
use std::os::fd::OwnedFd;
6+
use std::path::Path;
7+
use std::{cmp, io};
8+
9+
// this algorithm is inspired from rust std lib version 1.90.0
10+
// https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409
11+
const PROBE_SIZE: usize = 32;
12+
const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;
13+
14+
// Max bytes we can read using io uring submission at a time
15+
// SAFETY: cannot be higher than u32::MAX for safe cast
16+
const MAX_READ_SIZE: usize = u32::MAX as usize;
17+
18+
pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
19+
let file = OpenOptions::new().read(true).open(path).await?;
20+
21+
// TODO: use io uring in the future to obtain metadata
22+
let size_hint: Option<usize> = file.metadata().await.map(|m| m.len() as usize).ok();
23+
24+
let fd: OwnedFd = file
25+
.try_into_std()
26+
.expect("unexpected in-flight operation detected")
27+
.into();
28+
29+
// extra single capacity for the whole size to fit without any reallocation
30+
let buf = Vec::with_capacity(size_hint.unwrap_or(0).saturating_add(1));
31+
32+
read_to_end_uring(size_hint, fd, buf).await
33+
}
34+
35+
async fn read_to_end_uring(
36+
size_hint: Option<usize>,
37+
mut fd: OwnedFd,
38+
mut buf: Vec<u8>,
39+
) -> io::Result<Vec<u8>> {
40+
let mut offset = 0;
41+
let mut consecutive_short_reads = 0;
42+
43+
let start_cap = buf.capacity();
44+
45+
// if buffer has no room and no size_hint, start with a small probe_read from 0 offset
46+
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < (PROBE_SIZE) {
47+
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
48+
49+
if size_read == 0 {
50+
return Ok(r_buf);
51+
}
52+
53+
buf = r_buf;
54+
fd = r_fd;
55+
offset += size_read as u64;
56+
}
57+
58+
loop {
59+
if buf.len() == buf.capacity() && buf.capacity() == start_cap {
60+
// The buffer might be an exact fit. Let's read into a probe buffer
61+
// and see if it returns `Ok(0)`. If so, we've avoided an
62+
// unnecessary increasing of the capacity. But if not, append the
63+
// probe buffer to the primary buffer and let its capacity grow.
64+
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
65+
66+
if size_read == 0 {
67+
return Ok(r_buf);
68+
}
69+
70+
buf = r_buf;
71+
fd = r_fd;
72+
offset += size_read as u64;
73+
}
74+
75+
// buf is full, need more capacity
76+
if buf.len() == buf.capacity() {
77+
if consecutive_short_reads > 1 {
78+
buf.try_reserve(PROBE_SIZE.saturating_mul(consecutive_short_reads))?;
79+
} else {
80+
buf.try_reserve(PROBE_SIZE)?;
81+
}
82+
}
83+
84+
// doesn't matter if we have a valid size_hint or not, if we do more
85+
// than 2 consecutive_short_reads, gradually increase the buffer
86+
// capacity to read more data at a time
87+
88+
// prepare the spare capacity to be read into
89+
let spare = buf.capacity() - buf.len();
90+
let buf_len = cmp::min(spare, MAX_READ_SIZE);
91+
92+
// SAFETY: buf_len cannot be greater than u32::MAX because max_read_size
93+
// is u32::MAX
94+
let mut read_len = buf_len as u32;
95+
96+
loop {
97+
// read into spare capacity
98+
let (res, r_fd, mut r_buf) = Op::read(fd, buf, read_len, offset).await;
99+
100+
match res {
101+
Ok(0) => return Ok(r_buf),
102+
Ok(size_read) => {
103+
let new_len = size_read as usize + r_buf.len();
104+
// SAFETY: We didn't read more than what as reserved
105+
// as capacity, the _size_read was initialized by the kernel
106+
// via a mutable pointer
107+
unsafe { r_buf.set_len(new_len) }
108+
109+
let requested = read_len;
110+
111+
fd = r_fd;
112+
buf = r_buf;
113+
offset += size_read as u64;
114+
read_len -= size_read;
115+
116+
// 1. In case of no size_hint and a large file, if we keep reading
117+
// PROBE_SIZE, we want to increment number of short reads in order to gradually
118+
// increase read size per Op submission
119+
// 2. In case of small reads by the kernel, also gradually increase
120+
// read size per Op submission to read files in lesser cycles
121+
if size_read <= requested {
122+
consecutive_short_reads += 1;
123+
} else {
124+
consecutive_short_reads = 0;
125+
}
126+
127+
// keep reading if there's something left to be read
128+
if read_len > 0 {
129+
continue;
130+
} else {
131+
break;
132+
}
133+
}
134+
Err(e) if e.kind() == ErrorKind::Interrupted => {
135+
buf = r_buf;
136+
fd = r_fd;
137+
138+
continue;
139+
}
140+
Err(e) => return Err(e),
141+
}
142+
}
143+
}
144+
}
145+
146+
async fn small_probe_read(
147+
mut fd: OwnedFd,
148+
mut buf: Vec<u8>,
149+
offset: u64,
150+
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
151+
// don't reserve more than PROBE_SIZE or double the capacity using
152+
// try_reserve beacuse we'll be reading only PROBE_SIZE length
153+
buf.reserve_exact(PROBE_SIZE);
154+
155+
loop {
156+
let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await;
157+
158+
match res {
159+
Ok(size_read) => {
160+
let size_read_usize = size_read as usize;
161+
162+
let new_len = size_read_usize + r_buf.len();
163+
// SAFETY: We didn't read more than what as reserved
164+
// as capacity, the _size_read was initialized by the kernel
165+
// via a mutable pointer
166+
unsafe { r_buf.set_len(new_len) }
167+
168+
return Ok((size_read, r_fd, r_buf));
169+
}
170+
Err(e) if e.kind() == ErrorKind::Interrupted => {
171+
buf = r_buf;
172+
fd = r_fd;
173+
174+
continue;
175+
}
176+
Err(e) => return Err(e),
177+
}
178+
}
179+
}

tokio/src/io/uring/read.rs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
2+
23
use io_uring::{opcode, types};
3-
use std::{
4-
io,
5-
os::fd::{AsRawFd, OwnedFd},
6-
};
4+
use std::io::{self, Error};
5+
use std::os::fd::{AsRawFd, OwnedFd};
76

87
#[derive(Debug)]
98
pub(crate) struct Read {
10-
buf: Vec<u8>,
119
fd: OwnedFd,
10+
buf: Vec<u8>,
1211
}
1312

1413
impl Completable for Read {
15-
type Output = (u32, Vec<u8>);
16-
fn complete(self, cqe: CqeResult) -> io::Result<Self::Output> {
17-
let res = cqe.result?;
14+
type Output = (io::Result<u32>, OwnedFd, Vec<u8>);
1815

19-
Ok((res, self.buf))
16+
fn complete(self, cqe: CqeResult) -> Self::Output {
17+
(cqe.result, self.fd, self.buf)
18+
}
19+
20+
fn complete_with_error(self, err: Error) -> Self::Output {
21+
(Err(err), self.fd, self.buf)
2022
}
2123
}
2224

@@ -27,16 +29,26 @@ impl Cancellable for Read {
2729
}
2830

2931
impl Op<Read> {
30-
/// Submit a request to open a file.
31-
pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Self> {
32-
let buf_mut_ptr = buf.as_mut_ptr();
33-
let cap = buf.capacity() as _;
34-
35-
let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, cap).build();
32+
// Submit a request to read a FD at given length and offset into a
33+
// dynamic buffer with uinitialized memory. The read happens on unitialized
34+
// buffer and no overwiting happens.
35+
36+
// SAFETY: The `len` of the amount to be read and the buffer that is passed
37+
// should have capacity > len.
38+
//
39+
// If `len` read is higher than vector capacity then setting its length by
40+
// the caller in terms of size_read can be unsound.
41+
pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>, len: u32, offset: u64) -> Self {
42+
// don't overwrite on already written part
43+
let written = buf.len();
44+
let slice: &mut [u8] = &mut buf[written..];
45+
let buf_mut_ptr = slice.as_mut_ptr();
46+
47+
let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len)
48+
.offset(offset)
49+
.build();
3650

3751
// SAFETY: Parameters are valid for the entire duration of the operation
38-
let op = unsafe { Op::new(read_op, Read { fd, buf }) };
39-
40-
Ok(op)
52+
unsafe { Op::new(read_op, Read { fd, buf }) }
4153
}
4254
}

0 commit comments

Comments
 (0)