Skip to content

Commit c5e901a

Browse files
authored
refactor: meta-service: simplify raft store and state machine (#18749)
- Remove `is_opened` flag from `RaftStore` - Remove obsolete config no_sync, which is only used by sled tree store - Add config to MetaRaftLog - Remove `RaftStoreInner`
1 parent 6f596db commit c5e901a

File tree

19 files changed

+373
-494
lines changed

19 files changed

+373
-494
lines changed

src/meta/control/src/export_from_disk.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@
1414

1515
use std::fs::File;
1616
use std::io::Write;
17-
use std::sync::Arc;
1817

1918
use databend_common_meta_raft_store::config::RaftConfig;
20-
use databend_meta::store::RaftStoreInner;
19+
use databend_meta::store::RaftStore;
2120
use futures::TryStreamExt;
2221

2322
use crate::args::ExportArgs;
@@ -36,8 +35,8 @@ pub async fn export_from_dir(args: &ExportArgs) -> anyhow::Result<()> {
3635
eprintln!();
3736
eprintln!("Export:");
3837

39-
let sto_inn = RaftStoreInner::open(&raft_config).await?;
40-
let mut lines = Arc::new(sto_inn).export();
38+
let sto = RaftStore::open(&raft_config).await?;
39+
let mut lines = sto.clone().export();
4140

4241
eprintln!(" From: {}", raft_config.raft_dir);
4342

src/meta/control/src/import.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ async fn init_new_cluster(
237237
payload: EntryPayload::Membership(membership),
238238
};
239239

240-
let mut log = sto.log.clone();
240+
let mut log = sto.log().clone();
241241
log.blocking_append([entry]).await?;
242242

243243
// insert AddNodes logs
@@ -261,7 +261,7 @@ async fn init_new_cluster(
261261

262262
// Reset node id
263263
{
264-
let mut log = sto.log.write().await;
264+
let mut log = sto.log().write().await;
265265
log.save_user_data(Some(raft_log_v004::LogStoreMeta {
266266
node_id: Some(args.id),
267267
}))?;

src/meta/raft-store/src/config.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,6 @@ pub struct RaftConfig {
5353
/// The dir to store persisted meta state, including raft logs, state machine etc.
5454
pub raft_dir: String,
5555

56-
/// Whether to fsync meta to disk for every meta write(raft log, state machine etc).
57-
/// No-sync brings risks of data loss during a crash.
58-
/// You should only use this in a testing environment, unless YOU KNOW WHAT YOU ARE DOING.
59-
pub no_sync: bool,
60-
6156
/// Maximum log entries to cache in memory.
6257
///
6358
/// Higher values improve read performance but use more memory.
@@ -188,7 +183,6 @@ impl Default for RaftConfig {
188183
raft_advertise_host: get_default_raft_advertise_host(),
189184
raft_api_port: 28004,
190185
raft_dir: "./.databend/meta".to_string(),
191-
no_sync: false,
192186

193187
log_cache_max_items: 1_000_000,
194188
log_cache_capacity: 1024 * 1024 * 1024,
@@ -283,11 +277,6 @@ impl RaftConfig {
283277
}
284278
}
285279

286-
/// Returns true to fsync after a write operation to meta.
287-
pub fn is_sync(&self) -> bool {
288-
!self.no_sync
289-
}
290-
291280
/// Returns the min and max election timeout, in milli seconds.
292281
///
293282
/// Raft will choose a random timeout in this range for next election.

src/meta/raft-store/src/ondisk/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl OnDisk {
160160

161161
let db = init_get_sled_db(config.raft_dir.clone(), 1024 * 1024 * 1024);
162162

163-
let tree = SledTree::open(&db, TREE_HEADER, config.is_sync())?;
163+
let tree = SledTree::open(&db, TREE_HEADER)?;
164164
let ks = tree.key_space::<DataHeader>();
165165

166166
let header = ks.get(&Self::KEY_HEADER.to_string()).map_err(|e| {

src/meta/raft-store/src/ondisk/upgrade_to_v004.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl OnDisk {
6363

6464
// Read the purged index
6565
let first_log_index = {
66-
let tree = SledTree::open(&db, "raft_log", self.config.is_sync())?;
66+
let tree = SledTree::open(&db, "raft_log")?;
6767
let ks = tree.key_space::<LogMeta>();
6868
let purged = ks.get(&LogMetaKey::LastPurged).map_err(|e| {
6969
io::Error::other(format!(
@@ -77,7 +77,7 @@ impl OnDisk {
7777

7878
// import logs
7979
{
80-
let tree = SledTree::open(&db, "raft_log", self.config.is_sync())?;
80+
let tree = SledTree::open(&db, "raft_log")?;
8181
let it = tree.tree.iter();
8282

8383
for (i, rkv) in it.enumerate() {
@@ -111,7 +111,7 @@ impl OnDisk {
111111

112112
// import raft_state
113113
{
114-
let tree = SledTree::open(&db, "raft_state", self.config.is_sync())?;
114+
let tree = SledTree::open(&db, "raft_state")?;
115115
let kvs = tree.export()?;
116116
for kv in kvs {
117117
let ent = RaftStoreEntry::deserialize(&kv[0], &kv[1])?;

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ impl MetaService for MetaServiceImpl {
413413

414414
let meta_node = self.try_get_meta_node()?;
415415

416-
let strm = meta_node.raft_store.inner().export();
416+
let strm = meta_node.raft_store.clone().export();
417417

418418
let chunk_size = 32;
419419
// - Chunk up upto 32 Ok items inside a Vec<String>;
@@ -446,7 +446,7 @@ impl MetaService for MetaServiceImpl {
446446

447447
let meta_node = self.try_get_meta_node()?;
448448

449-
let strm = meta_node.raft_store.inner().export();
449+
let strm = meta_node.raft_store.clone().export();
450450

451451
let chunk_size = request.get_ref().chunk_size.unwrap_or(32) as usize;
452452
// - Chunk up upto `chunk_size` Ok items inside a Vec<String>;

src/meta/service/src/configs/outer_v0.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,6 @@ pub struct ConfigViaEnv {
290290
pub kvsrv_advertise_host: String,
291291
pub kvsrv_api_port: u16,
292292
pub kvsrv_raft_dir: String,
293-
pub kvsrv_no_sync: bool,
294293

295294
pub kvsrv_log_cache_max_items: u64,
296295
pub kvsrv_log_cache_capacity: u64,
@@ -348,7 +347,6 @@ impl From<Config> for ConfigViaEnv {
348347
kvsrv_advertise_host: cfg.raft_config.raft_advertise_host,
349348
kvsrv_api_port: cfg.raft_config.raft_api_port,
350349
kvsrv_raft_dir: cfg.raft_config.raft_dir,
351-
kvsrv_no_sync: cfg.raft_config.no_sync,
352350

353351
kvsrv_log_cache_max_items: 1_000_000,
354352
kvsrv_log_cache_capacity: 1024 * 1024 * 1024,
@@ -385,7 +383,6 @@ impl Into<Config> for ConfigViaEnv {
385383
raft_advertise_host: self.kvsrv_advertise_host,
386384
raft_api_port: self.kvsrv_api_port,
387385
raft_dir: self.kvsrv_raft_dir,
388-
no_sync: self.kvsrv_no_sync,
389386

390387
log_cache_max_items: self.kvsrv_log_cache_max_items,
391388
log_cache_capacity: self.kvsrv_log_cache_capacity,
@@ -482,12 +479,6 @@ pub struct RaftConfig {
482479
#[clap(long, default_value = "./.databend/meta")]
483480
pub raft_dir: String,
484481

485-
/// Whether to fsync meta to disk for every meta write(raft log, state machine etc).
486-
/// No-sync brings risks of data loss during a crash.
487-
/// You should only use this in a testing environment, unless YOU KNOW WHAT YOU ARE DOING.
488-
#[clap(long)]
489-
pub no_sync: bool,
490-
491482
/// The maximum number of log entries for log entries cache. Default value is 1_000_000.
492483
#[clap(long, default_value = "1000000")]
493484
pub log_cache_max_items: u64,
@@ -614,7 +605,6 @@ impl From<RaftConfig> for InnerRaftConfig {
614605
raft_advertise_host: x.raft_advertise_host,
615606
raft_api_port: x.raft_api_port,
616607
raft_dir: x.raft_dir,
617-
no_sync: x.no_sync,
618608

619609
log_cache_max_items: x.log_cache_max_items,
620610
log_cache_capacity: x.log_cache_capacity,
@@ -651,7 +641,6 @@ impl From<InnerRaftConfig> for RaftConfig {
651641
raft_advertise_host: inner.raft_advertise_host,
652642
raft_api_port: inner.raft_api_port,
653643
raft_dir: inner.raft_dir,
654-
no_sync: inner.no_sync,
655644

656645
log_cache_max_items: inner.log_cache_max_items,
657646
log_cache_capacity: inner.log_cache_capacity,

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ impl MetaNodeBuilder {
172172

173173
let net = NetworkFactory::new(sto.clone());
174174

175-
let log_store = sto.log.clone();
176-
let sm_store = sto.state_machine.clone();
175+
let log_store = sto.log().clone();
176+
let sm_store = sto.state_machine().clone();
177177

178178
let raft = MetaRaft::new(node_id, Arc::new(config), net, log_store, sm_store)
179179
.await
@@ -374,25 +374,12 @@ impl MetaNode {
374374
) -> Result<Arc<MetaNode>, MetaStartupError> {
375375
info!("MetaNode::open, config: {:?}", config);
376376

377-
let mut config = config.clone();
378-
379-
// Always disable fsync on mac.
380-
// Because there are some integration tests running on mac VM.
381-
//
382-
// On mac File::sync_all() takes 10 ms ~ 30 ms, 500 ms at worst, which very likely to fail a test.
383-
if cfg!(target_os = "macos") {
384-
warn!("Disabled fsync for meta data tests. fsync on mac is quite slow");
385-
config.no_sync = true;
386-
}
377+
let config = config.clone();
387378

388379
let log_store = RaftStore::open(&config).await?;
389380

390381
// config.id only used for the first time
391-
let self_node_id = if log_store.is_opened {
392-
log_store.id
393-
} else {
394-
config.id
395-
};
382+
let self_node_id = log_store.id;
396383

397384
let builder = MetaNode::builder(&config)
398385
.sto(log_store.clone())
@@ -1264,11 +1251,11 @@ impl MetaNode {
12641251

12651252
/// Get the size in bytes of the on disk files of the raft log storage.
12661253
async fn get_raft_log_size(&self) -> u64 {
1267-
self.raft_store.log.read().await.on_disk_size()
1254+
self.raft_store.log().read().await.on_disk_size()
12681255
}
12691256

12701257
async fn get_raft_log_stat(&self) -> RaftLogStat {
1271-
self.raft_store.log.read().await.stat()
1258+
self.raft_store.log().read().await.stat()
12721259
}
12731260

12741261
async fn get_snapshot_key_count(&self) -> u64 {

src/meta/service/src/store/meta_raft_log/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::ops::Deref;
1616
use std::sync::Arc;
1717

18+
use databend_common_meta_raft_store::config::RaftConfig;
1819
use databend_common_meta_raft_store::raft_log_v004::RaftLogV004;
1920
use databend_common_meta_types::raft_types::NodeId;
2021
use tokio::sync::RwLock;
@@ -25,6 +26,8 @@ mod impl_raft_log_storage;
2526
#[derive(Debug, Clone)]
2627
pub struct MetaRaftLog {
2728
pub(crate) id: NodeId,
29+
#[allow(dead_code)]
30+
pub(crate) config: RaftConfig,
2831
inner: Arc<RwLock<RaftLogV004>>,
2932
}
3033

@@ -37,9 +40,10 @@ impl Deref for MetaRaftLog {
3740
}
3841

3942
impl MetaRaftLog {
40-
pub fn new(id: NodeId, inner: RaftLogV004) -> Self {
43+
pub fn new(id: NodeId, config: RaftConfig, inner: RaftLogV004) -> Self {
4144
Self {
4245
id,
46+
config,
4347
inner: Arc::new(RwLock::new(inner)),
4448
}
4549
}

src/meta/service/src/store/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,5 @@ pub mod meta_raft_log;
1616
pub mod meta_raft_state_machine;
1717
#[allow(clippy::module_inception)]
1818
mod store;
19-
mod store_inner;
2019

2120
pub use store::RaftStore;
22-
pub use store_inner::RaftStoreInner;

0 commit comments

Comments
 (0)