Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ rustyline = "14"
scroll = "0.12.0"
self_cell = "1.2.0"
semver = "1.0.14"
seq-marked = { version = "0.3.3", features = ["seq-marked-serde", "seq-marked-bincode", "seqv-serde"] }
seq-marked = { version = "0.3.5", features = ["seq-marked-serde", "seq-marked-bincode", "seqv-serde"] }
serde = { version = "1.0.164", features = ["derive", "rc"] }
serde_derive = "1"
serde_ignored = "0.1.10"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use rotbl::v001::SeqMarked;
use crate::leveled_store::map_api::MapKey;
use crate::leveled_store::map_api::MapKeyDecode;
use crate::leveled_store::map_api::MapKeyEncode;
use crate::leveled_store::persisted_codec::PersistedCodec;
use crate::leveled_store::rotbl_codec::RotblCodec;
use crate::leveled_store::value_convert::ValueConvert;

/// A wrapper that implements the `ScopedSnapshot*` trait for the `DB`.
#[derive(Debug, Clone)]
Expand All @@ -42,7 +42,7 @@ where
K: ViewKey,
K: MapKeyEncode + MapKeyDecode,
K::V: ViewValue,
SeqMarked<K::V>: ValueConvert<SeqMarked>,
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
{
async fn get(&self, key: K, _snapshot_seq: u64) -> Result<SeqMarked<K::V>, io::Error> {
// TODO: DB does not consider snapshot_seq
Expand All @@ -55,7 +55,7 @@ where
return Ok(SeqMarked::new_not_found());
};

let marked = SeqMarked::<K::V>::conv_from(seq_marked)?;
let marked = SeqMarked::<K::V>::decode_from(seq_marked)?;
Ok(marked)
}
}
Expand All @@ -68,7 +68,7 @@ where
K: ViewKey,
K: MapKeyEncode + MapKeyDecode,
K::V: ViewValue,
SeqMarked<K::V>: ValueConvert<SeqMarked>,
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
{
async fn range<R>(
&self,
Expand All @@ -88,7 +88,7 @@ where
let strm = strm.map(|res_item: Result<(String, SeqMarked), io::Error>| {
let (str_k, seq_marked) = res_item?;
let key = RotblCodec::decode_key(&str_k)?;
let marked = SeqMarked::conv_from(seq_marked)?;
let marked = SeqMarked::decode_from(seq_marked)?;
Ok((key, marked))
});

Expand Down
6 changes: 3 additions & 3 deletions src/meta/raft-store/src/leveled_store/immutable_data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::leveled_store::level::LevelStat;
use crate::leveled_store::level_index::LevelIndex;
use crate::leveled_store::map_api::MapKeyDecode;
use crate::leveled_store::map_api::MapKeyEncode;
use crate::leveled_store::value_convert::ValueConvert;
use crate::leveled_store::persisted_codec::PersistedCodec;
use crate::leveled_store::ScopedSeqBoundedRead;

mod compact_into_stream;
Expand Down Expand Up @@ -148,7 +148,7 @@ where
K: ViewKey,
K::V: ViewValue,
K: MapKeyEncode + MapKeyDecode,
SeqMarked<K::V>: ValueConvert<SeqMarked>,
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
Immutable: mvcc::ScopedSeqBoundedGet<K, K::V>,
{
async fn get(&self, key: K, snapshot_seq: u64) -> Result<SeqMarked<K::V>, Error> {
Expand All @@ -173,7 +173,7 @@ where
K: ViewKey,
K::V: ViewValue,
K: MapKeyEncode + MapKeyDecode,
SeqMarked<K::V>: ValueConvert<SeqMarked>,
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
Immutable: mvcc::ScopedSeqBoundedRange<K, K::V>,
{
async fn range<R>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::leveled_store::level::Level;
use crate::leveled_store::leveled_map::LeveledMap;
use crate::leveled_store::map_api::MapKeyDecode;
use crate::leveled_store::map_api::MapKeyEncode;
use crate::leveled_store::value_convert::ValueConvert;
use crate::leveled_store::persisted_codec::PersistedCodec;

// TODO: test it
#[async_trait::async_trait]
Expand All @@ -36,7 +36,7 @@ where
K: ViewKey,
K: MapKeyEncode + MapKeyDecode,
K::V: ViewValue,
SeqMarked<K::V>: ValueConvert<SeqMarked>,
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
Level: GetSubTable<K, K::V>,
Immutable: mvcc::ScopedSeqBoundedGet<K, K::V>,
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::leveled_store::level::Level;
use crate::leveled_store::leveled_map::LeveledMap;
use crate::leveled_store::map_api::MapKeyDecode;
use crate::leveled_store::map_api::MapKeyEncode;
use crate::leveled_store::value_convert::ValueConvert;
use crate::leveled_store::persisted_codec::PersistedCodec;

// TODO: test it
#[async_trait::async_trait]
Expand All @@ -45,7 +45,7 @@ where
K: ViewKey,
K: MapKeyEncode + MapKeyDecode,
K::V: ViewValue,
SeqMarked<K::V>: ValueConvert<SeqMarked>,
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
Level: GetSubTable<K, K::V>,
Immutable: mvcc::ScopedSeqBoundedRange<K, K::V>,
{
Expand Down
4 changes: 3 additions & 1 deletion src/meta/raft-store/src/leveled_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ pub mod level;
pub mod level_index;
pub mod leveled_map;
pub mod map_api;
pub mod persisted_codec;
pub mod rotbl_codec;
pub(crate) mod snapshot;
pub mod sys_data;
pub mod sys_data_api;
pub mod types;
pub mod util;
pub mod value_convert;
pub mod view;

#[cfg(test)]
Expand All @@ -39,6 +39,8 @@ mod db_open_snapshot_impl;
mod db_scoped_seq_bounded_read_test;
pub mod immutable_data;
mod key_spaces_impl;
mod rotbl_meta_value_impl;
mod rotbl_seq_data_impl;
mod rotbl_seq_mark_impl;

pub use db_impl_scoped_seq_bounded_read::ScopedSeqBoundedRead;
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

use std::io;

/// Convert one type to another type for this crate to convert between 3rd party types.
pub trait ValueConvert<T>
/// Convert one type `Self` to type `T` for persisting on disk for this crate to convert between 3rd party types.
pub trait PersistedCodec<T>
where Self: Sized
{
/// Convert `Self` to `T`.
fn conv_to(self) -> Result<T, io::Error>;
fn encode_to(self) -> Result<T, io::Error>;

/// Convert `T` to `Self`.
fn conv_from(value: T) -> Result<Self, io::Error>;
/// Parse `T` back to `Self`.
fn decode_from(value: T) -> Result<Self, io::Error>;
}
6 changes: 3 additions & 3 deletions src/meta/raft-store/src/leveled_store/rotbl_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use rotbl::v001::SeqMarked;
use crate::leveled_store::map_api::MapKey;
use crate::leveled_store::map_api::MapKeyDecode;
use crate::leveled_store::map_api::MapKeyEncode;
use crate::leveled_store::value_convert::ValueConvert;
use crate::leveled_store::persisted_codec::PersistedCodec;

pub struct RotblCodec;

Expand Down Expand Up @@ -79,10 +79,10 @@ impl RotblCodec {
where
K: MapKey,
K: MapKeyEncode,
SeqMarked<K::V>: ValueConvert<SeqMarked>,
SeqMarked<K::V>: PersistedCodec<SeqMarked>,
{
let k = Self::encode_key(key)?;
let v = marked.conv_to()?;
let v = marked.encode_to()?;
Ok((k, v))
}

Expand Down
157 changes: 157 additions & 0 deletions src/meta/raft-store/src/leveled_store/rotbl_meta_value_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Implement the conversion between `rotbl::SeqMarked` and `Marked` that is used by this crate.
//!
//! `UserKey`
//! `SeqV <-> SeqMarked<(Option<KVMeta>, bytes)> <-> SeqMarked`
//!
//! `ExpireKey`
//! `ExpireValue <-> SeqMarked<String> <-> SeqMarked`

use std::io;

use state_machine_api::KVMeta;
use state_machine_api::MetaValue;

use crate::leveled_store::persisted_codec::PersistedCodec;

impl PersistedCodec<Vec<u8>> for MetaValue {
fn encode_to(self) -> Result<Vec<u8>, io::Error> {
let (meta, value) = self;

let kv_meta_str = serde_json::to_string(&meta).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("fail to encode KVMeta to json: {}", e),
)
})?;

// version, meta in json string, value
let packed = (1u8, kv_meta_str, value);

let d = bincode::encode_to_vec(packed, bincode_config()).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("fail to encode rotbl value: {}", e),
)
})?;

Ok(d)
}

fn decode_from(data: Vec<u8>) -> Result<Self, io::Error> {
// version, meta, value
let ((ver, meta_str, value), size): ((u8, String, Vec<u8>), usize) =
bincode::decode_from_slice(&data, bincode_config()).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("fail to decode rotbl value: {}", e),
)
})?;

if ver != 1 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unsupported rotbl value version: {}", ver),
));
}

if size != data.len() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"remaining bytes in rotbl value: has read: {}, total: {}",
size,
data.len()
),
));
}

let kv_meta: Option<KVMeta> = serde_json::from_str(&meta_str).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("fail to decode KVMeta from rotbl value: {}", e),
)
})?;

let meta_value = (kv_meta, value);
Ok(meta_value)
}
}

fn bincode_config() -> impl bincode::config::Config {
bincode::config::standard()
.with_big_endian()
.with_variable_int_encoding()
}

#[cfg(test)]
mod tests {
use super::*;
use crate::leveled_store::persisted_codec::PersistedCodec;

#[test]
fn test_meta_value_try_from_bytes() -> io::Result<()> {
t_try_from((None, b("hello")), b("\x01\x04null\x05hello"));

t_try_from(
(Some(KVMeta::new_expires_at(20)), b("hello")),
b("\x01\x10{\"expire_at\":20}\x05hello"),
);

Ok(())
}

fn t_try_from(marked: MetaValue, seq_marked: Vec<u8>) {
let got: Vec<u8> = marked.clone().encode_to().unwrap();
assert_eq!(seq_marked, got);

let got = MetaValue::decode_from(got).unwrap();
assert_eq!(marked, got);
}

#[test]
fn test_invalid_data() {
t_invalid(
b("\x00\x10{\"expire_at\":20}\x05hello"),
"unsupported rotbl value version: 0",
);

t_invalid(
b("\x01\x10{\"expire_at\":2x}\x05hello"),
"fail to decode KVMeta from rotbl value: expected `,` or `}` at line 1 column 15",
);

t_invalid(
b("\x01\x10{\"expire_at\":20}\x05h"),
"fail to decode rotbl value: UnexpectedEnd { additional: 4 }",
);

t_invalid(
b("\x01\x10{\"expire_at\":20}\x05hello-"),
"remaining bytes in rotbl value: has read: 24, total: 25",
);
}

fn t_invalid(data: Vec<u8>, want_err: impl ToString) {
let res = MetaValue::decode_from(data);
let err = res.unwrap_err();
assert_eq!(want_err.to_string(), err.to_string());
}

fn b(v: impl ToString) -> Vec<u8> {
v.to_string().into_bytes()
}
}
Loading
Loading