-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
fs: support io_uring with tokio::fs::read
#7696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,166 @@ | ||
| use crate::fs::OpenOptions; | ||
| use crate::runtime::driver::op::Op; | ||
|
|
||
| use std::io; | ||
| use std::io::ErrorKind; | ||
| use std::os::fd::OwnedFd; | ||
| use std::path::Path; | ||
|
|
||
| // this algorithm is inspired from rust std lib version 1.90.0 | ||
| // https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409 | ||
| const PROBE_SIZE: usize = 32; | ||
| const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32; | ||
|
|
||
| // Max bytes we can read using io uring submission at a time | ||
| // SAFETY: cannot be higher than u32::MAX for safe cast | ||
| const MAX_READ_SIZE: usize = u32::MAX as usize; | ||
|
|
||
| pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> { | ||
| let file = OpenOptions::new().read(true).open(path).await?; | ||
|
|
||
| // TODO: use io uring in the future to obtain metadata | ||
| let size_hint: Option<usize> = file.metadata().await.map(|m| m.len() as usize).ok(); | ||
|
|
||
| let fd: OwnedFd = file | ||
| .try_into_std() | ||
| .expect("unexpected in-flight operation detected") | ||
| .into(); | ||
|
|
||
| // extra single capacity for the whole size to fit without any reallocation | ||
| let buf = Vec::with_capacity(size_hint.unwrap_or(0)); | ||
|
|
||
| read_to_end_uring(size_hint, fd, buf).await | ||
| } | ||
|
|
||
| async fn read_to_end_uring( | ||
| size_hint: Option<usize>, | ||
| mut fd: OwnedFd, | ||
| mut buf: Vec<u8>, | ||
| ) -> io::Result<Vec<u8>> { | ||
| let mut offset = 0; | ||
|
|
||
| let start_cap = buf.capacity(); | ||
|
|
||
| // if buffer has no room and no size_hint, start with a small probe_read from 0 offset | ||
| if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE { | ||
| let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?; | ||
|
|
||
| if size_read == 0 { | ||
| return Ok(r_buf); | ||
| } | ||
|
|
||
| buf = r_buf; | ||
| fd = r_fd; | ||
| offset += size_read as u64; | ||
| } | ||
|
|
||
| loop { | ||
| if buf.len() == buf.capacity() && buf.capacity() == start_cap { | ||
| // The buffer might be an exact fit. Let's read into a probe buffer | ||
| // and see if it returns `Ok(0)`. If so, we've avoided an | ||
| // unnecessary increasing of the capacity. But if not, append the | ||
| // probe buffer to the primary buffer and let its capacity grow. | ||
| let (size_read, r_fd, r_buf) = small_probe_read(fd, buf, offset).await?; | ||
|
|
||
| if size_read == 0 { | ||
| return Ok(r_buf); | ||
| } | ||
|
|
||
| buf = r_buf; | ||
| fd = r_fd; | ||
| offset += size_read as u64; | ||
| } | ||
|
|
||
| // buf is full, need more capacity | ||
| if buf.len() == buf.capacity() { | ||
| buf.try_reserve(PROBE_SIZE)?; | ||
| } | ||
|
|
||
| // doesn't matter if we have a valid size_hint or not, if we do more | ||
| // than 2 consecutive_short_reads, gradually increase the buffer | ||
| // capacity to read more data at a time | ||
|
|
||
| // prepare the spare capacity to be read into | ||
| let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE); | ||
|
|
||
| // SAFETY: buf_len cannot be greater than u32::MAX because max_read_size | ||
| // is u32::MAX | ||
| let mut read_len = buf_len as u32; | ||
|
|
||
| loop { | ||
| // read into spare capacity | ||
| let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, offset).await; | ||
|
Comment on lines
+90
to
+92
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file has two different loops around
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Darksonn The newly changed code has custom handling with the match around the result of |
||
|
|
||
| match res { | ||
| Ok(0) => return Ok(r_buf), | ||
| Ok(size_read) => { | ||
| fd = r_fd; | ||
| buf = r_buf; | ||
| offset += size_read as u64; | ||
| read_len -= size_read; | ||
|
|
||
| // keep reading if there's something left to be read | ||
| if read_len > 0 { | ||
| continue; | ||
| } else { | ||
| break; | ||
| } | ||
| } | ||
| Err(e) if e.kind() == ErrorKind::Interrupted => { | ||
| buf = r_buf; | ||
| fd = r_fd; | ||
|
|
||
| continue; | ||
| } | ||
| Err(e) => return Err(e), | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| async fn small_probe_read( | ||
| mut fd: OwnedFd, | ||
| mut buf: Vec<u8>, | ||
| offset: u64, | ||
| ) -> io::Result<(u32, OwnedFd, Vec<u8>)> { | ||
| let mut temp_arr = [0; PROBE_SIZE]; | ||
| let has_enough = buf.len() > PROBE_SIZE; | ||
|
|
||
| if has_enough { | ||
| // if we have more than PROBE_SIZE bytes in the buffer already then | ||
| // don't call reserve as we might potentially read 0 bytes | ||
| let back_bytes_len = buf.len() - PROBE_SIZE; | ||
| temp_arr.copy_from_slice(&buf[back_bytes_len..]); | ||
| // We're decreasing the length of the buffer and len is greater | ||
| // than PROBE_SIZE. So we can read into the discarded length | ||
| buf.truncate(back_bytes_len); | ||
| } else { | ||
| // we don't even have PROBE_SIZE length in the buffer, we need this | ||
| // reservation | ||
| buf.reserve_exact(PROBE_SIZE); | ||
| } | ||
|
|
||
| loop { | ||
| let (res, r_fd, mut r_buf) = Op::read(fd, buf, PROBE_SIZE_U32, offset).await; | ||
|
|
||
| match res { | ||
| // return early if we inserted into reserved PROBE_SIZE | ||
| // bytes | ||
| Ok(size_read) if !has_enough => return Ok((size_read, r_fd, r_buf)), | ||
| Ok(size_read) => { | ||
| let old_len = r_buf.len() - (size_read as usize); | ||
|
|
||
| r_buf.splice(old_len..old_len, temp_arr); | ||
|
|
||
| return Ok((size_read, r_fd, r_buf)); | ||
| } | ||
| Err(e) if e.kind() == ErrorKind::Interrupted => { | ||
| buf = r_buf; | ||
| fd = r_fd; | ||
|
|
||
| continue; | ||
| } | ||
| Err(e) => return Err(e), | ||
| } | ||
| } | ||
| } | ||
ADD-SP marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| pub(crate) mod open; | ||
| pub(crate) mod read; | ||
| pub(crate) mod utils; | ||
| pub(crate) mod write; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; | ||
|
|
||
| use io_uring::{opcode, types}; | ||
| use std::io::{self, Error}; | ||
| use std::os::fd::{AsRawFd, OwnedFd}; | ||
|
|
||
| #[derive(Debug)] | ||
| pub(crate) struct Read { | ||
| fd: OwnedFd, | ||
| buf: Vec<u8>, | ||
| } | ||
|
|
||
| impl Completable for Read { | ||
| type Output = (io::Result<u32>, OwnedFd, Vec<u8>); | ||
|
|
||
| fn complete(self, cqe: CqeResult) -> Self::Output { | ||
| let mut buf = self.buf; | ||
|
|
||
| if let Ok(len) = cqe.result { | ||
| let new_len = buf.len() + len as usize; | ||
| // SAFETY: Kernel read len bytes | ||
| unsafe { buf.set_len(new_len) }; | ||
| } | ||
|
|
||
| (cqe.result, self.fd, buf) | ||
| } | ||
|
|
||
| fn complete_with_error(self, err: Error) -> Self::Output { | ||
| (Err(err), self.fd, self.buf) | ||
| } | ||
| } | ||
|
|
||
| impl Cancellable for Read { | ||
| fn cancel(self) -> CancelData { | ||
| CancelData::Read(self) | ||
| } | ||
| } | ||
|
|
||
| impl Op<Read> { | ||
| // Submit a request to read a FD at given length and offset into a | ||
| // dynamic buffer with uinitialized memory. The read happens on unitialized | ||
| // buffer and no overwiting happens. | ||
|
|
||
| // SAFETY: The `len` of the amount to be read and the buffer that is passed | ||
| // should have capacity > len. | ||
| // | ||
| // If `len` read is higher than vector capacity then setting its length by | ||
| // the caller in terms of size_read can be unsound. | ||
| pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>, len: u32, offset: u64) -> Self { | ||
Daksh14 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // don't overwrite on already written part | ||
| assert!(buf.spare_capacity_mut().len() <= len as usize); | ||
| let buf_mut_ptr = buf.spare_capacity_mut().as_mut_ptr().cast(); | ||
|
|
||
| let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len) | ||
| .offset(offset) | ||
| .build(); | ||
|
|
||
| // SAFETY: Parameters are valid for the entire duration of the operation | ||
| unsafe { Op::new(read_op, Read { fd, buf }) } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.