Skip to content

Commit 90b21eb

Browse files
committed
io-uring: Implement tokio::fs::read
1 parent 454fd8c commit 90b21eb

File tree

5 files changed

+253
-0
lines changed

5 files changed

+253
-0
lines changed

tokio/src/fs/read.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,58 @@ use std::{io, path::Path};
4545
/// ```
4646
pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
4747
let path = path.as_ref().to_owned();
48+
49+
#[cfg(all(
50+
tokio_unstable,
51+
feature = "io-uring",
52+
feature = "rt",
53+
feature = "fs",
54+
target_os = "linux"
55+
))]
56+
{
57+
let handle = crate::runtime::Handle::current();
58+
let driver_handle = handle.inner.driver().io();
59+
if driver_handle.check_and_init()? {
60+
return read_uring(&path).await;
61+
}
62+
}
63+
4864
asyncify(move || std::fs::read(path)).await
4965
}
66+
67+
#[cfg(all(
68+
tokio_unstable,
69+
feature = "io-uring",
70+
feature = "rt",
71+
feature = "fs",
72+
target_os = "linux"
73+
))]
74+
async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
75+
use crate::{fs::OpenOptions, runtime::driver::op::Op};
76+
use std::os::fd::OwnedFd;
77+
78+
let file = OpenOptions::new().read(true).open(path).await?;
79+
80+
let size = file
81+
.metadata()
82+
.await
83+
.map(|m| m.len() as usize)
84+
.ok()
85+
.unwrap_or(0);
86+
87+
let buf = Vec::with_capacity(size);
88+
89+
let fd: OwnedFd = file
90+
.try_into_std()
91+
.expect("unexpected in-flight operation detected")
92+
.into();
93+
94+
let (read_size, mut buf) = Op::read(fd, buf)?.await?;
95+
96+
// SAFETY:
97+
// 1. The buffer is initialized with `size` capacity
98+
// 2. the read_size is the number of bytes read from the file
99+
unsafe { buf.set_len(read_size as _) };
100+
101+
Ok(buf)
102+
}

tokio/src/io/uring/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub(crate) mod open;
2+
pub(crate) mod read;
23
pub(crate) mod utils;
34
pub(crate) mod write;

tokio/src/io/uring/read.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
2+
use io_uring::{opcode, types};
3+
use std::{
4+
io,
5+
os::fd::{AsRawFd, OwnedFd},
6+
};
7+
8+
#[derive(Debug)]
9+
pub(crate) struct Read {
10+
buf: Vec<u8>,
11+
fd: OwnedFd,
12+
}
13+
14+
impl Completable for Read {
15+
type Output = (u32, Vec<u8>);
16+
fn complete(self, cqe: CqeResult) -> io::Result<Self::Output> {
17+
let res = cqe.result?;
18+
19+
Ok((res, self.buf))
20+
}
21+
}
22+
23+
impl Cancellable for Read {
24+
fn cancel(self) -> CancelData {
25+
CancelData::Read(self)
26+
}
27+
}
28+
29+
impl Op<Read> {
30+
/// Submit a request to open a file.
31+
pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Self> {
32+
let buf_mut_ptr = buf.as_mut_ptr();
33+
let cap = buf.capacity() as _;
34+
35+
let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, cap).build();
36+
37+
// SAFETY: Parameters are valid for the entire duration of the operation
38+
let op = unsafe { Op::new(read_op, Read { fd, buf }) };
39+
40+
Ok(op)
41+
}
42+
}

tokio/src/runtime/driver/op.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::io::uring::open::Open;
2+
use crate::io::uring::read::Read;
23
use crate::io::uring::write::Write;
34
use crate::runtime::Handle;
45

@@ -17,6 +18,7 @@ use std::task::{Context, Poll, Waker};
1718
pub(crate) enum CancelData {
1819
Open(Open),
1920
Write(Write),
21+
Read(Read),
2022
}
2123

2224
#[derive(Debug)]

tokio/tests/fs_uring_read.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
//! Uring file operations tests.
2+
3+
#![cfg(all(
4+
tokio_unstable,
5+
feature = "io-uring",
6+
feature = "rt",
7+
feature = "fs",
8+
target_os = "linux"
9+
))]
10+
11+
use futures::future::FutureExt;
12+
use std::io::Write;
13+
use std::sync::mpsc;
14+
use std::task::Poll;
15+
use std::time::Duration;
16+
use std::{future::poll_fn, path::PathBuf};
17+
use tempfile::NamedTempFile;
18+
use tokio::{
19+
fs::read,
20+
runtime::{Builder, Runtime},
21+
};
22+
use tokio_util::task::TaskTracker;
23+
24+
fn multi_rt(n: usize) -> Box<dyn Fn() -> Runtime> {
25+
Box::new(move || {
26+
Builder::new_multi_thread()
27+
.worker_threads(n)
28+
.enable_all()
29+
.build()
30+
.unwrap()
31+
})
32+
}
33+
34+
fn current_rt() -> Box<dyn Fn() -> Runtime> {
35+
Box::new(|| Builder::new_current_thread().enable_all().build().unwrap())
36+
}
37+
38+
fn rt_combinations() -> Vec<Box<dyn Fn() -> Runtime>> {
39+
vec![
40+
current_rt(),
41+
multi_rt(1),
42+
multi_rt(2),
43+
multi_rt(8),
44+
multi_rt(64),
45+
multi_rt(256),
46+
]
47+
}
48+
49+
#[test]
50+
fn shutdown_runtime_while_performing_io_uring_ops() {
51+
fn run(rt: Runtime) {
52+
let (tx, rx) = mpsc::channel();
53+
let (done_tx, done_rx) = mpsc::channel();
54+
55+
let (_tmp, path) = create_tmp_files(1);
56+
rt.spawn(async move {
57+
let path = path[0].clone();
58+
59+
// spawning a bunch of uring operations.
60+
loop {
61+
let path = path.clone();
62+
tokio::spawn(async move {
63+
let bytes = read(path).await.unwrap();
64+
65+
assert_eq!(bytes, vec![20; 2]);
66+
});
67+
68+
// Avoid busy looping.
69+
tokio::task::yield_now().await;
70+
}
71+
});
72+
73+
std::thread::spawn(move || {
74+
let rt: Runtime = rx.recv().unwrap();
75+
rt.shutdown_timeout(Duration::from_millis(300));
76+
done_tx.send(()).unwrap();
77+
});
78+
79+
tx.send(rt).unwrap();
80+
done_rx.recv().unwrap();
81+
}
82+
83+
for rt in rt_combinations() {
84+
run(rt());
85+
}
86+
}
87+
88+
#[test]
89+
fn read_many_files() {
90+
fn run(rt: Runtime) {
91+
const NUM_FILES: usize = 512;
92+
93+
let (_tmp_files, paths): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(NUM_FILES);
94+
95+
rt.block_on(async move {
96+
let tracker = TaskTracker::new();
97+
98+
for i in 0..10_000 {
99+
let path = paths.get(i % NUM_FILES).unwrap().clone();
100+
tracker.spawn(async move {
101+
let bytes = read(path).await.unwrap();
102+
assert_eq!(bytes, vec![20; 2]);
103+
});
104+
}
105+
tracker.close();
106+
tracker.wait().await;
107+
});
108+
}
109+
110+
for rt in rt_combinations() {
111+
run(rt());
112+
}
113+
}
114+
115+
#[tokio::test]
116+
async fn cancel_op_future() {
117+
let (_tmp_file, path): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(1);
118+
119+
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
120+
let handle = tokio::spawn(async move {
121+
poll_fn(|cx| {
122+
let fut = read(&path[0]);
123+
124+
// If io_uring is enabled (and not falling back to the thread pool),
125+
// the first poll should return Pending.
126+
let _pending = Box::pin(fut).poll_unpin(cx);
127+
128+
tx.send(()).unwrap();
129+
130+
Poll::<()>::Pending
131+
})
132+
.await;
133+
});
134+
135+
// Wait for the first poll
136+
rx.recv().await.unwrap();
137+
138+
handle.abort();
139+
140+
let res = handle.await.unwrap_err();
141+
assert!(res.is_cancelled());
142+
}
143+
144+
fn create_tmp_files(num_files: usize) -> (Vec<NamedTempFile>, Vec<PathBuf>) {
145+
let mut files = Vec::with_capacity(num_files);
146+
for _ in 0..num_files {
147+
let mut tmp = NamedTempFile::new().unwrap();
148+
let buf = vec![20; 2];
149+
tmp.write_all(&buf).unwrap();
150+
let path = tmp.path().to_path_buf();
151+
files.push((tmp, path));
152+
}
153+
154+
files.into_iter().unzip()
155+
}

0 commit comments

Comments
 (0)