Skip to content

Commit d458704

Browse files
committed
refactor(meta-service): respond mget items in stream instead of in a vector
Since this commit, mget won't block the response until all items retrieved. Now it create a stream and return at once. The read IO will happen when the client receives the items. And add logging about the statistics when a read stream is closed: ``` StreamElapsed: total: 104.458µs; items: 1, items/ms: 1, item_latency: 104.458µs; ReadRequest: ListKV(ListKVReq { prefix: "__fd_marked_deleted_index/" }) ```
1 parent c35ba9d commit d458704

File tree

8 files changed

+104
-27
lines changed

8 files changed

+104
-27
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/raft-store/src/sm_v003/sm_v003.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ impl SMV003 {
196196
}
197197

198198
pub async fn get_maybe_expired_kv(&self, key: &str) -> Result<Option<SeqV>, io::Error> {
199-
let view = self.data().to_readonly_view();
200-
let got = view.get(UserKey::new(key.to_string())).await?;
199+
let readonly_view = self.data().to_readonly_view();
200+
let got = readonly_view.get(UserKey::new(key.to_string())).await?;
201201
let seqv = Into::<Option<SeqV>>::into(got);
202202
Ok(seqv)
203203
}

src/meta/raft-store/src/sm_v003/sm_v003_kv_api.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use map_api::mvcc::ScopedViewReadonly;
2929
use seq_marked::SeqValue;
3030
use state_machine_api::UserKey;
3131

32+
use crate::leveled_store::leveled_map::ReadonlyView;
3233
use crate::sm_v003::SMV003;
3334
use crate::testing::since_epoch_millis;
3435
use crate::utils::add_cooperative_yielding;
@@ -50,16 +51,13 @@ impl kvapi::KVApi for SMV003KVApi<'_> {
5051

5152
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
5253
let local_now_ms = since_epoch_millis();
54+
let strm = readonly_view_get_kv_stream(
55+
self.sm.data().to_readonly_view(),
56+
keys.to_vec(),
57+
local_now_ms,
58+
);
5359

54-
let mut items = Vec::with_capacity(keys.len());
55-
56-
for k in keys {
57-
let got = self.sm.get_maybe_expired_kv(k).await?;
58-
let v = Self::non_expired(got, local_now_ms);
59-
items.push(Ok(StreamItem::from((k.clone(), v))));
60-
}
61-
62-
Ok(futures::stream::iter(items).boxed())
60+
Ok(strm)
6361
}
6462

6563
async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
@@ -100,3 +98,21 @@ impl SMV003KVApi<'_> {
10098
}
10199
}
102100
}
101+
102+
/// A helper function that get many keys in stream.
103+
#[futures_async_stream::try_stream(boxed, ok = StreamItem, error = io::Error)]
104+
async fn readonly_view_get_kv_stream(
105+
readonly_view: ReadonlyView,
106+
keys: Vec<String>,
107+
local_now_ms: u64,
108+
) {
109+
for key in keys {
110+
let got = readonly_view.get(UserKey::new(key.clone())).await?;
111+
112+
let seqv = Into::<Option<SeqV>>::into(got);
113+
114+
let non_expired = SMV003KVApi::non_expired(seqv, local_now_ms);
115+
116+
yield StreamItem::from((key, non_expired));
117+
}
118+
}

src/meta/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ log = { workspace = true }
4949
logcall = { workspace = true }
5050
map-api = { workspace = true }
5151
maplit = { workspace = true }
52+
pin-project = { workspace = true}
5253
poem = { workspace = true }
5354
prometheus-client = { workspace = true }
5455
prost = { workspace = true }

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
use std::future;
1616
use std::io;
1717
use std::pin::Pin;
18+
use std::sync::atomic::AtomicU64;
1819
use std::sync::Arc;
1920
use std::sync::Weak;
21+
use std::time::Instant;
2022
use std::time::SystemTime;
2123

2224
use arrow_flight::BasicAuth;
@@ -83,6 +85,7 @@ use watcher::util::try_forward;
8385
use watcher::watch_stream::WatchStream;
8486
use watcher::watch_stream::WatchStreamSender;
8587

88+
use crate::api::grpc::OnCompleteStream;
8689
use crate::message::ForwardRequest;
8790
use crate::message::ForwardRequestBody;
8891
use crate::meta_service::watcher::DispatcherHandle;
@@ -327,17 +330,36 @@ impl MetaService for MetaServiceImpl {
327330

328331
let req: MetaGrpcReadReq = GrpcHelper::parse_req(request)?;
329332
let req_typ = req.type_name();
333+
let req_str = format!("ReadRequest: {:?}", req);
330334

331335
ThreadTracker::tracking_future(async move {
332336
let _guard = InFlightRead::guard();
337+
let start = Instant::now();
333338
let (endpoint, strm) = self.handle_kv_read_v1(req).in_span(root).await?;
334339

335-
let strm = strm
336-
.map(move |item| {
337-
network_metrics::incr_stream_sent_item(req_typ);
338-
item
339-
})
340-
.boxed();
340+
// Counter to track total items sent
341+
let count = Arc::new(AtomicU64::new(0));
342+
let count2 = count.clone();
343+
344+
let strm = strm.map(move |item| {
345+
network_metrics::incr_stream_sent_item(req_typ);
346+
count2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
347+
item
348+
});
349+
350+
// Log the total time and item count when the stream is finished.
351+
let strm = OnCompleteStream::new(strm, move || {
352+
let total = start.elapsed();
353+
let total_items = count.load(std::sync::atomic::Ordering::Relaxed);
354+
let items_per_ms = total_items / (total.as_millis() as u64 + 1);
355+
let latency = total / (total_items.max(1) as u32);
356+
info!(
357+
"StreamElapsed: total: {:?}; items: {}, items/ms: {}, item_latency: {:?}; {}",
358+
total, total_items, items_per_ms, latency, req_str
359+
);
360+
});
361+
362+
let strm = strm.boxed();
341363

342364
let mut resp = Response::new(strm);
343365
GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref());

src/meta/service/src/api/grpc/mod.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,47 @@
1313
// limitations under the License.
1414

1515
pub mod grpc_service;
16+
17+
use std::pin::Pin;
18+
use std::task::Context;
19+
use std::task::Poll;
20+
21+
use futures::Stream;
22+
use pin_project::pin_project;
23+
24+
#[pin_project]
25+
pub(crate) struct OnCompleteStream<S> {
26+
#[pin]
27+
stream: S,
28+
callback: Option<Box<dyn FnOnce() + Send + 'static>>,
29+
}
30+
31+
impl<S> OnCompleteStream<S> {
32+
pub fn new(stream: S, callback: impl FnOnce() + Send + 'static) -> Self {
33+
Self {
34+
stream,
35+
callback: Some(Box::new(callback)),
36+
}
37+
}
38+
}
39+
40+
impl<S: Stream> Stream for OnCompleteStream<S> {
41+
type Item = S::Item;
42+
43+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44+
let this = self.project();
45+
46+
let res = this.stream.poll_next(cx);
47+
48+
match &res {
49+
Poll::Ready(None) => {
50+
if let Some(callback) = this.callback.take() {
51+
callback();
52+
}
53+
}
54+
_ => {}
55+
}
56+
57+
res
58+
}
59+
}

src/meta/service/src/meta_service/meta_leader.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -133,16 +133,9 @@ impl Handler<MetaGrpcReadReq> for MetaLeader<'_> {
133133

134134
MetaGrpcReadReq::MGetKV(req) => {
135135
// safe unwrap(): Infallible
136-
let values = kv_api.mget_kv(&req.keys).await.unwrap();
136+
let strm = kv_api.get_kv_stream(&req.keys).await.unwrap();
137137

138-
let kv_iter = req
139-
.keys
140-
.clone()
141-
.into_iter()
142-
.zip(values)
143-
.map(|(k, v)| Ok(StreamItem::from((k, v))));
144-
145-
let strm = futures::stream::iter(kv_iter);
138+
let strm = strm.map_err(|e| Status::internal(e.to_string()));
146139

147140
Ok(strm.boxed())
148141
}

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1393,7 +1393,7 @@ impl MetaNode {
13931393
Err(e) => MetaOperationError::ForwardToLeader(e),
13941394
};
13951395

1396-
// If needs to forward, deal with it. Otherwise return the unhandlable error.
1396+
// If it needs to forward, deal with it. Otherwise, return the unhandlable error.
13971397
let to_leader = match op_err {
13981398
MetaOperationError::ForwardToLeader(err) => err,
13991399
MetaOperationError::DataError(d_err) => {

0 commit comments

Comments
 (0)