Skip to content

Commit 636cfb8

Browse files
committed
io-uring: re-think fs read with std lib's implementation
1 parent 856e687 commit 636cfb8

File tree

5 files changed

+294
-61
lines changed

5 files changed

+294
-61
lines changed

tokio/src/fs/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,6 @@ pub use self::metadata::metadata;
237237

238238
mod open_options;
239239
pub use self::open_options::OpenOptions;
240-
cfg_io_uring! {
241-
pub(crate) use self::open_options::UringOpenOptions;
242-
}
243240

244241
mod read;
245242
pub use self::read::read;
@@ -298,6 +295,13 @@ cfg_windows! {
298295
pub use self::symlink_file::symlink_file;
299296
}
300297

298+
cfg_io_uring! {
299+
pub(crate) mod read_uring;
300+
pub(crate) use self::read_uring::read_uring;
301+
302+
pub(crate) use self::open_options::UringOpenOptions;
303+
}
304+
301305
use std::io;
302306

303307
#[cfg(not(test))]

tokio/src/fs/read.rs

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
5454
target_os = "linux"
5555
))]
5656
{
57+
use crate::fs::read_uring;
58+
5759
let handle = crate::runtime::Handle::current();
5860
let driver_handle = handle.inner.driver().io();
5961
if driver_handle.check_and_init()? {
@@ -63,40 +65,3 @@ pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
6365

6466
asyncify(move || std::fs::read(path)).await
6567
}
66-
67-
#[cfg(all(
68-
tokio_unstable,
69-
feature = "io-uring",
70-
feature = "rt",
71-
feature = "fs",
72-
target_os = "linux"
73-
))]
74-
async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
75-
use crate::{fs::OpenOptions, runtime::driver::op::Op};
76-
use std::os::fd::OwnedFd;
77-
78-
let file = OpenOptions::new().read(true).open(path).await?;
79-
80-
let size = file
81-
.metadata()
82-
.await
83-
.map(|m| m.len() as usize)
84-
.ok()
85-
.unwrap_or(0);
86-
87-
let buf = Vec::with_capacity(size);
88-
89-
let fd: OwnedFd = file
90-
.try_into_std()
91-
.expect("unexpected in-flight operation detected")
92-
.into();
93-
94-
let (read_size, mut buf) = Op::read(fd, buf)?.await?;
95-
96-
// SAFETY:
97-
// 1. The buffer is initialized with `size` capacity
98-
// 2. the read_size is the number of bytes read from the file
99-
unsafe { buf.set_len(read_size as _) };
100-
101-
Ok(buf)
102-
}

tokio/src/fs/read_uring.rs

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
use crate::fs::OpenOptions;
2+
use crate::runtime::driver::op::Op;
3+
4+
use std::io::ErrorKind;
5+
use std::os::fd::OwnedFd;
6+
use std::path::Path;
7+
use std::{cmp, io};
8+
9+
// This algorithm is inspired by the Rust standard library, version 1.90.0:
10+
// https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409
11+
const PROBE_SIZE: usize = 32;
12+
const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;
13+
14+
// Max bytes we can read using io uring submission at a time
15+
// SAFETY: cannot be higher than u32::MAX for safe cast
16+
const MAX_READ_SIZE: usize = u32::MAX as usize;
17+
18+
pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
19+
let file = OpenOptions::new().read(true).open(path).await?;
20+
21+
// TODO: use io uring in the future to obtain metadata
22+
let size_hint: Option<usize> = file
23+
.metadata()
24+
.await
25+
.map(|m| usize::try_from(m.len()).unwrap_or(usize::MAX))
26+
.ok();
27+
28+
let fd: OwnedFd = file
29+
.try_into_std()
30+
.expect("unexpected in-flight operation detected")
31+
.into();
32+
33+
let buf = Vec::with_capacity(size_hint.unwrap_or(0) + 1);
34+
35+
read_to_end_uring(size_hint, fd, buf).await
36+
}
37+
38+
async fn read_to_end_uring(
39+
size_hint: Option<usize>,
40+
mut fd: OwnedFd,
41+
mut buf: Vec<u8>,
42+
) -> io::Result<Vec<u8>> {
43+
let mut offset = 0;
44+
let mut consecutive_short_reads = 0;
45+
46+
let start_cap = buf.capacity();
47+
48+
// if buffer has no room and no size_hint, start with a small probe_read from 0 offset
49+
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < (PROBE_SIZE) {
50+
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
51+
52+
if size_read == 0 {
53+
return Ok(r_buf);
54+
}
55+
56+
buf = r_buf;
57+
fd = r_fd;
58+
offset += size_read as u64;
59+
}
60+
61+
loop {
62+
if buf.len() == buf.capacity() && buf.capacity() == start_cap {
63+
// The buffer might be an exact fit. Let's read into a probe buffer
64+
// and see if it returns `Ok(0)`. If so, we've avoided an
65+
// unnecessary increasing of the capacity. But if not, append the
66+
// probe buffer to the primary buffer and let its capacity grow.
67+
let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?;
68+
69+
if size_read == 0 {
70+
return Ok(r_buf);
71+
}
72+
73+
buf = r_buf;
74+
fd = r_fd;
75+
offset += size_read as u64;
76+
}
77+
78+
// buf is full, need more capacity
79+
if buf.len() == buf.capacity() {
80+
if consecutive_short_reads > 1 {
81+
buf.try_reserve(PROBE_SIZE.saturating_mul(consecutive_short_reads))?;
82+
} else {
83+
buf.try_reserve(PROBE_SIZE)?;
84+
}
85+
}
86+
87+
// doesn't matter if we have a valid size_hint or not, if we do more
88+
// than 2 consecutive_short_reads, gradually increase the buffer
89+
// capacity to read more data at a time
90+
91+
// prepare the spare capacity to be read into
92+
let spare = buf.capacity() - buf.len();
93+
let buf_len = cmp::min(spare, MAX_READ_SIZE);
94+
95+
let mut read_len = {
96+
// SAFETY:
97+
// 1. buf_len cannot be greater than u32::MAX because max_read_size
98+
// is u32::MAX
99+
// 2. This is faster than a cast
100+
unsafe { TryInto::<u32>::try_into(buf_len).unwrap_unchecked() }
101+
};
102+
103+
loop {
104+
// read into spare capacity
105+
let (res, r_fd, mut r_buf) = Op::read(fd, buf, read_len, offset).await;
106+
107+
match res {
108+
Ok(size_read) => {
109+
if size_read == 0 {
110+
return Ok(r_buf);
111+
}
112+
113+
let size_read_usize = size_read as usize;
114+
115+
let new_len = size_read_usize + r_buf.len();
116+
// SAFETY: We didn't read more than what as reserved
117+
// as capacity, the _size_read was initialized by the kernel
118+
// via a mutable pointer
119+
unsafe { r_buf.set_len(new_len) }
120+
121+
fd = r_fd;
122+
buf = r_buf;
123+
offset += size_read as u64;
124+
read_len -= size_read;
125+
126+
// 1. In case of no size_hint and a large file, if we keep reading
127+
// PROBE_SIZE, we want to increment number of short reads in order to gradually
128+
// increase read size per Op submission
129+
// 2. In case of small reads by the kernel, also gradually increase
130+
// read size per Op submission to read files in lesser cycles
131+
if size_read_usize <= buf.len() {
132+
consecutive_short_reads += 1;
133+
} else {
134+
consecutive_short_reads = 0;
135+
}
136+
137+
// keep reading if there's something left to be read
138+
if read_len > 0 {
139+
continue;
140+
} else {
141+
break;
142+
}
143+
}
144+
Err(e) => {
145+
if e.kind() == ErrorKind::Interrupted {
146+
buf = r_buf;
147+
fd = r_fd;
148+
149+
continue;
150+
} else {
151+
return Err(e);
152+
}
153+
}
154+
}
155+
}
156+
}
157+
}
158+
159+
async fn small_probe_read(
160+
mut fd: OwnedFd,
161+
mut buf: Vec<u8>,
162+
offset: u64,
163+
) -> io::Result<(u32, OwnedFd, Vec<u8>)> {
164+
// don't reserve more than PROBE_SIZE or double the capacity using
165+
// try_reserve beacuse we'll be reading only PROBE_SIZE length
166+
buf.reserve_exact(PROBE_SIZE);
167+
168+
loop {
169+
let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await;
170+
171+
match res {
172+
Ok(size_read) => {
173+
let size_read_usize = size_read as usize;
174+
175+
let new_len = size_read_usize + r_buf.len();
176+
// SAFETY: We didn't read more than what as reserved
177+
// as capacity, the _size_read was initialized by the kernel
178+
// via a mutable pointer
179+
unsafe { r_buf.set_len(new_len) }
180+
181+
return Ok((size_read, r_fd, r_buf));
182+
}
183+
Err(e) => {
184+
if e.kind() == ErrorKind::Interrupted {
185+
buf = r_buf;
186+
fd = r_fd;
187+
188+
continue;
189+
} else {
190+
return Err(e);
191+
}
192+
}
193+
}
194+
}
195+
}

tokio/src/io/uring/read.rs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
2+
23
use io_uring::{opcode, types};
3-
use std::{
4-
io,
5-
os::fd::{AsRawFd, OwnedFd},
6-
};
4+
use std::io::{self, Error};
5+
use std::os::fd::{AsRawFd, OwnedFd};
76

87
#[derive(Debug)]
98
pub(crate) struct Read {
10-
buf: Vec<u8>,
119
fd: OwnedFd,
10+
buf: Vec<u8>,
1211
}
1312

1413
impl Completable for Read {
15-
type Output = (u32, Vec<u8>);
16-
fn complete(self, cqe: CqeResult) -> io::Result<Self::Output> {
17-
let res = cqe.result?;
14+
type Output = (io::Result<u32>, OwnedFd, Vec<u8>);
1815

19-
Ok((res, self.buf))
16+
fn complete(self, cqe: CqeResult) -> Self::Output {
17+
(cqe.result, self.fd, self.buf)
18+
}
19+
20+
fn complete_with_error(self, err: Error) -> Self::Output {
21+
(Err(err), self.fd, self.buf)
2022
}
2123
}
2224

@@ -27,16 +29,26 @@ impl Cancellable for Read {
2729
}
2830

2931
impl Op<Read> {
30-
/// Submit a request to open a file.
31-
pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Self> {
32-
let buf_mut_ptr = buf.as_mut_ptr();
33-
let cap = buf.capacity() as _;
34-
35-
let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, cap).build();
32+
// Submit a request to read a FD at given length and offset into a
33+
// dynamic buffer with uninitialized memory. The read happens on uninitialized
34+
// buffer and no overwriting happens.
35+
36+
// SAFETY: The buffer that is passed must have spare capacity (capacity minus
37+
// current length) of at least `len` bytes.
38+
//
39+
// If `len` read is higher than vector capacity then setting its length by
40+
// the caller in terms of size_read can be unsound.
41+
pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>, len: u32, offset: u64) -> Self {
42+
// don't overwrite on already written part
43+
let written = buf.len();
44+
let slice: &mut [u8] = &mut buf[written..];
45+
let buf_mut_ptr = slice.as_mut_ptr();
46+
47+
let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len)
48+
.offset(offset)
49+
.build();
3650

3751
// SAFETY: Parameters are valid for the entire duration of the operation
38-
let op = unsafe { Op::new(read_op, Read { fd, buf }) };
39-
40-
Ok(op)
52+
unsafe { Op::new(read_op, Read { fd, buf }) }
4153
}
4254
}

0 commit comments

Comments
 (0)