Skip to content

Commit b26e7bf

Browse files
authored
feat(meta-service): add snapshot V004 streaming protocol (#18763)
* feat(meta-service): add snapshot V004 streaming protocol Implements KV-entry streaming for snapshot transmission, replacing V003's binary chunk approach in future. The new protocol streams individual key-value entries in protobuf format, enabling transfers and cross-format compatibility for rolling upgrades. Key improvements: - Memory efficient: processes entries as they arrive vs buffering entire snapshot - Cross-format support: enables rolling upgrades between snapshot formats - Fallback compatibility: automatically falls back to V003 for older nodes - Structured data: protobuf streaming vs raw binary chunks * M src/meta/service/src/meta_service/raft_service_impl.rs * M Cargo.lock * M src/meta/types/Cargo.toml
1 parent b33e779 commit b26e7bf

File tree

19 files changed

+997
-122
lines changed

19 files changed

+997
-122
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ rustyline = "14"
472472
scroll = "0.12.0"
473473
self_cell = "1.2.0"
474474
semver = "1.0.14"
475-
seq-marked = { version = "0.3.3", features = ["seq-marked-serde", "seq-marked-bincode", "seqv-serde"] }
475+
seq-marked = { version = "0.3.5", features = ["seq-marked-serde", "seq-marked-bincode", "seqv-serde"] }
476476
serde = { version = "1.0.164", features = ["derive", "rc"] }
477477
serde_derive = "1"
478478
serde_ignored = "0.1.10"

src/meta/raft-store/src/leveled_store/db_impl_scoped_seq_bounded_read.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use rotbl::v001::SeqMarked;
2727
use crate::leveled_store::map_api::MapKey;
2828
use crate::leveled_store::map_api::MapKeyDecode;
2929
use crate::leveled_store::map_api::MapKeyEncode;
30+
use crate::leveled_store::persisted_codec::PersistedCodec;
3031
use crate::leveled_store::rotbl_codec::RotblCodec;
31-
use crate::leveled_store::value_convert::ValueConvert;
3232

3333
/// A wrapper that implements the `ScopedSnapshot*` trait for the `DB`.
3434
#[derive(Debug, Clone)]
@@ -42,7 +42,7 @@ where
4242
K: ViewKey,
4343
K: MapKeyEncode + MapKeyDecode,
4444
K::V: ViewValue,
45-
SeqMarked<K::V>: ValueConvert<SeqMarked>,
45+
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
4646
{
4747
async fn get(&self, key: K, _snapshot_seq: u64) -> Result<SeqMarked<K::V>, io::Error> {
4848
// TODO: DB does not consider snapshot_seq
@@ -55,7 +55,7 @@ where
5555
return Ok(SeqMarked::new_not_found());
5656
};
5757

58-
let marked = SeqMarked::<K::V>::conv_from(seq_marked)?;
58+
let marked = SeqMarked::<K::V>::decode_from(seq_marked)?;
5959
Ok(marked)
6060
}
6161
}
@@ -68,7 +68,7 @@ where
6868
K: ViewKey,
6969
K: MapKeyEncode + MapKeyDecode,
7070
K::V: ViewValue,
71-
SeqMarked<K::V>: ValueConvert<SeqMarked>,
71+
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
7272
{
7373
async fn range<R>(
7474
&self,
@@ -88,7 +88,7 @@ where
8888
let strm = strm.map(|res_item: Result<(String, SeqMarked), io::Error>| {
8989
let (str_k, seq_marked) = res_item?;
9090
let key = RotblCodec::decode_key(&str_k)?;
91-
let marked = SeqMarked::conv_from(seq_marked)?;
91+
let marked = SeqMarked::decode_from(seq_marked)?;
9292
Ok((key, marked))
9393
});
9494

src/meta/raft-store/src/leveled_store/immutable_data/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::leveled_store::level::LevelStat;
3939
use crate::leveled_store::level_index::LevelIndex;
4040
use crate::leveled_store::map_api::MapKeyDecode;
4141
use crate::leveled_store::map_api::MapKeyEncode;
42-
use crate::leveled_store::value_convert::ValueConvert;
42+
use crate::leveled_store::persisted_codec::PersistedCodec;
4343
use crate::leveled_store::ScopedSeqBoundedRead;
4444

4545
mod compact_into_stream;
@@ -148,7 +148,7 @@ where
148148
K: ViewKey,
149149
K::V: ViewValue,
150150
K: MapKeyEncode + MapKeyDecode,
151-
SeqMarked<K::V>: ValueConvert<SeqMarked>,
151+
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
152152
Immutable: mvcc::ScopedSeqBoundedGet<K, K::V>,
153153
{
154154
async fn get(&self, key: K, snapshot_seq: u64) -> Result<SeqMarked<K::V>, Error> {
@@ -173,7 +173,7 @@ where
173173
K: ViewKey,
174174
K::V: ViewValue,
175175
K: MapKeyEncode + MapKeyDecode,
176-
SeqMarked<K::V>: ValueConvert<SeqMarked>,
176+
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
177177
Immutable: mvcc::ScopedSeqBoundedRange<K, K::V>,
178178
{
179179
async fn range<R>(

src/meta/raft-store/src/leveled_store/leveled_map/impl_scoped_seq_bounded_get.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::leveled_store::level::Level;
2626
use crate::leveled_store::leveled_map::LeveledMap;
2727
use crate::leveled_store::map_api::MapKeyDecode;
2828
use crate::leveled_store::map_api::MapKeyEncode;
29-
use crate::leveled_store::value_convert::ValueConvert;
29+
use crate::leveled_store::persisted_codec::PersistedCodec;
3030

3131
// TODO: test it
3232
#[async_trait::async_trait]
@@ -36,7 +36,7 @@ where
3636
K: ViewKey,
3737
K: MapKeyEncode + MapKeyDecode,
3838
K::V: ViewValue,
39-
SeqMarked<K::V>: ValueConvert<SeqMarked>,
39+
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
4040
Level: GetSubTable<K, K::V>,
4141
Immutable: mvcc::ScopedSeqBoundedGet<K, K::V>,
4242
{

src/meta/raft-store/src/leveled_store/leveled_map/impl_scoped_seq_bounded_range.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::leveled_store::level::Level;
3535
use crate::leveled_store::leveled_map::LeveledMap;
3636
use crate::leveled_store::map_api::MapKeyDecode;
3737
use crate::leveled_store::map_api::MapKeyEncode;
38-
use crate::leveled_store::value_convert::ValueConvert;
38+
use crate::leveled_store::persisted_codec::PersistedCodec;
3939

4040
// TODO: test it
4141
#[async_trait::async_trait]
@@ -45,7 +45,7 @@ where
4545
K: ViewKey,
4646
K: MapKeyEncode + MapKeyDecode,
4747
K::V: ViewValue,
48-
SeqMarked<K::V>: ValueConvert<SeqMarked>,
48+
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
4949
Level: GetSubTable<K, K::V>,
5050
Immutable: mvcc::ScopedSeqBoundedRange<K, K::V>,
5151
{

src/meta/raft-store/src/leveled_store/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ pub mod level;
2121
pub mod level_index;
2222
pub mod leveled_map;
2323
pub mod map_api;
24+
pub mod persisted_codec;
2425
pub mod rotbl_codec;
2526
pub(crate) mod snapshot;
2627
pub mod sys_data;
2728
pub mod sys_data_api;
2829
pub mod types;
2930
pub mod util;
30-
pub mod value_convert;
3131
pub mod view;
3232

3333
#[cfg(test)]
@@ -39,6 +39,8 @@ mod db_open_snapshot_impl;
3939
mod db_scoped_seq_bounded_read_test;
4040
pub mod immutable_data;
4141
mod key_spaces_impl;
42+
mod rotbl_meta_value_impl;
43+
mod rotbl_seq_data_impl;
4244
mod rotbl_seq_mark_impl;
4345

4446
pub use db_impl_scoped_seq_bounded_read::ScopedSeqBoundedRead;

src/meta/raft-store/src/leveled_store/value_convert.rs renamed to src/meta/raft-store/src/leveled_store/persisted_codec.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616
1717
use std::io;
1818

19-
/// Convert one type to another type for this crate to convert between 3rd party types.
20-
pub trait ValueConvert<T>
19+
/// Convert one type `Self` to type `T` for persisting on disk for this crate to convert between 3rd party types.
20+
pub trait PersistedCodec<T>
2121
where Self: Sized
2222
{
2323
/// Convert `Self` to `T`.
24-
fn conv_to(self) -> Result<T, io::Error>;
24+
fn encode_to(self) -> Result<T, io::Error>;
2525

26-
/// Convert `T` to `Self`.
27-
fn conv_from(value: T) -> Result<Self, io::Error>;
26+
/// Parse `T` back to `Self`.
27+
fn decode_from(value: T) -> Result<Self, io::Error>;
2828
}

src/meta/raft-store/src/leveled_store/rotbl_codec.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use rotbl::v001::SeqMarked;
2525
use crate::leveled_store::map_api::MapKey;
2626
use crate::leveled_store::map_api::MapKeyDecode;
2727
use crate::leveled_store::map_api::MapKeyEncode;
28-
use crate::leveled_store::value_convert::ValueConvert;
28+
use crate::leveled_store::persisted_codec::PersistedCodec;
2929

3030
pub struct RotblCodec;
3131

@@ -79,10 +79,10 @@ impl RotblCodec {
7979
where
8080
K: MapKey,
8181
K: MapKeyEncode,
82-
SeqMarked<K::V>: ValueConvert<SeqMarked>,
82+
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
8383
{
8484
let k = Self::encode_key(key)?;
85-
let v = marked.conv_to()?;
85+
let v = marked.encode_to()?;
8686
Ok((k, v))
8787
}
8888

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Implement the conversion between `rotbl::SeqMarked` and `Marked` that is used by this crate.
16+
//!
17+
//! `UserKey`
18+
//! `SeqV <-> SeqMarked<(Option<KVMeta>, bytes)> <-> SeqMarked`
19+
//!
20+
//! `ExpireKey`
21+
//! `ExpireValue <-> SeqMarked<String> <-> SeqMarked`
22+
23+
use std::io;
24+
25+
use state_machine_api::KVMeta;
26+
use state_machine_api::MetaValue;
27+
28+
use crate::leveled_store::persisted_codec::PersistedCodec;
29+
30+
impl PersistedCodec<Vec<u8>> for MetaValue {
31+
fn encode_to(self) -> Result<Vec<u8>, io::Error> {
32+
let (meta, value) = self;
33+
34+
let kv_meta_str = serde_json::to_string(&meta).map_err(|e| {
35+
io::Error::new(
36+
io::ErrorKind::InvalidData,
37+
format!("fail to encode KVMeta to json: {}", e),
38+
)
39+
})?;
40+
41+
// version, meta in json string, value
42+
let packed = (1u8, kv_meta_str, value);
43+
44+
let d = bincode::encode_to_vec(packed, bincode_config()).map_err(|e| {
45+
io::Error::new(
46+
io::ErrorKind::InvalidData,
47+
format!("fail to encode rotbl value: {}", e),
48+
)
49+
})?;
50+
51+
Ok(d)
52+
}
53+
54+
fn decode_from(data: Vec<u8>) -> Result<Self, io::Error> {
55+
// version, meta, value
56+
let ((ver, meta_str, value), size): ((u8, String, Vec<u8>), usize) =
57+
bincode::decode_from_slice(&data, bincode_config()).map_err(|e| {
58+
io::Error::new(
59+
io::ErrorKind::InvalidData,
60+
format!("fail to decode rotbl value: {}", e),
61+
)
62+
})?;
63+
64+
if ver != 1 {
65+
return Err(io::Error::new(
66+
io::ErrorKind::InvalidData,
67+
format!("unsupported rotbl value version: {}", ver),
68+
));
69+
}
70+
71+
if size != data.len() {
72+
return Err(io::Error::new(
73+
io::ErrorKind::InvalidData,
74+
format!(
75+
"remaining bytes in rotbl value: has read: {}, total: {}",
76+
size,
77+
data.len()
78+
),
79+
));
80+
}
81+
82+
let kv_meta: Option<KVMeta> = serde_json::from_str(&meta_str).map_err(|e| {
83+
io::Error::new(
84+
io::ErrorKind::InvalidData,
85+
format!("fail to decode KVMeta from rotbl value: {}", e),
86+
)
87+
})?;
88+
89+
let meta_value = (kv_meta, value);
90+
Ok(meta_value)
91+
}
92+
}
93+
94+
fn bincode_config() -> impl bincode::config::Config {
95+
bincode::config::standard()
96+
.with_big_endian()
97+
.with_variable_int_encoding()
98+
}
99+
100+
#[cfg(test)]
101+
mod tests {
102+
use super::*;
103+
use crate::leveled_store::persisted_codec::PersistedCodec;
104+
105+
#[test]
106+
fn test_meta_value_try_from_bytes() -> io::Result<()> {
107+
t_try_from((None, b("hello")), b("\x01\x04null\x05hello"));
108+
109+
t_try_from(
110+
(Some(KVMeta::new_expires_at(20)), b("hello")),
111+
b("\x01\x10{\"expire_at\":20}\x05hello"),
112+
);
113+
114+
Ok(())
115+
}
116+
117+
fn t_try_from(marked: MetaValue, seq_marked: Vec<u8>) {
118+
let got: Vec<u8> = marked.clone().encode_to().unwrap();
119+
assert_eq!(seq_marked, got);
120+
121+
let got = MetaValue::decode_from(got).unwrap();
122+
assert_eq!(marked, got);
123+
}
124+
125+
#[test]
126+
fn test_invalid_data() {
127+
t_invalid(
128+
b("\x00\x10{\"expire_at\":20}\x05hello"),
129+
"unsupported rotbl value version: 0",
130+
);
131+
132+
t_invalid(
133+
b("\x01\x10{\"expire_at\":2x}\x05hello"),
134+
"fail to decode KVMeta from rotbl value: expected `,` or `}` at line 1 column 15",
135+
);
136+
137+
t_invalid(
138+
b("\x01\x10{\"expire_at\":20}\x05h"),
139+
"fail to decode rotbl value: UnexpectedEnd { additional: 4 }",
140+
);
141+
142+
t_invalid(
143+
b("\x01\x10{\"expire_at\":20}\x05hello-"),
144+
"remaining bytes in rotbl value: has read: 24, total: 25",
145+
);
146+
}
147+
148+
fn t_invalid(data: Vec<u8>, want_err: impl ToString) {
149+
let res = MetaValue::decode_from(data);
150+
let err = res.unwrap_err();
151+
assert_eq!(want_err.to_string(), err.to_string());
152+
}
153+
154+
fn b(v: impl ToString) -> Vec<u8> {
155+
v.to_string().into_bytes()
156+
}
157+
}

0 commit comments

Comments
 (0)