Skip to content

Commit c0b9ed0

Browse files
authored
feat(meta-service): add initialization complete flag for watch (#17900)
When a watch requires initialization flush, the server now sends a flag message indicating when the initialization phase is completed. With this flag, the `cache` module can determine whether data has been fully initialized and is ready to serve, improving reliability of the caching system.
1 parent ebeab93 commit c0b9ed0

File tree

11 files changed

+157
-38
lines changed

11 files changed

+157
-38
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ url = "2.5.4"
524524
uuid = { version = "1.10.0", features = ["std", "serde", "v4", "v7"] }
525525
volo-thrift = "0.10"
526526
walkdir = "2.3.2"
527-
watcher = { version = "0.2.0" }
527+
watcher = { version = "0.4.0" }
528528
wiremock = "0.6"
529529
wkt = "0.11.1"
530530
xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] }
@@ -648,5 +648,5 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus
648648
tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
649649
tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" }
650650
tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "0e300e9" }
651-
watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.2.0" }
651+
watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.4.0" }
652652
xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" }

src/meta/README.md

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,26 +79,21 @@ The following is an illustration of the latest query-meta compatibility:
7979
`[, a.b.c)` denotes the range of versions from previous version(on the left column)(inclusive)
8080
upto `a.b.c` (exclusive).
8181

82-
TODO: xx is the version in which semaphore is added. update it when merged.
83-
84-
| `Meta\Query` | 1.1.34) | [, 1.2.287) | [, 1.2.361) | [, xx) | [, +∞) |
85-
|:-------------------|:--------|:------------|:------------|:-------|:-----------------|
86-
| [0.8.30, 0.8.35) | |||||
87-
| [0.8.35, 0.9.23) | |||||
88-
| [0.9.23, 0.9.42) | |||||
89-
| [0.9.42, 1.1.32) | |||||
90-
| [1.1.32, 1.2.63) | |||||
91-
| [1.2.63, 1.2.226) | |||||
92-
| [1.2.226, 1.2.258) | |||||
93-
| [1.2.258, 1.2.663) | |||| ✅(no semaphore) |
94-
| [1.2.663, 1.2.677) | |||| ✅(no semaphore) |
95-
| [1.2.677, +∞) | |||||
82+
83+
| `Meta\Query` | 1.2.287) | [, 1.2.361) | [, 1.2.715) | [, 1.2.726) | [, +∞) |
84+
|:-------------------|:---------|:------------|:------------|:-----------------|:-------|
85+
| [1.2.63, 1.2.226) | |||||
86+
| [1.2.226, 1.2.258) | |||||
87+
| [1.2.258, 1.2.663) | ||| ✅(no semaphore) ||
88+
| [1.2.663, 1.2.677) | ||| ✅(no semaphore) ||
89+
| [1.2.677, +∞) | |||||
9690

9791
History versions that are not included in the above chart:
9892

9993
- Query `[0.7.59, 0.8.80)` is compatible with Meta `[0.8.30, 0.9.23)`.
10094
- Query `[0.8.80, 0.9.41)` is compatible with Meta `[0.8.35, 0.9.42)`.
10195
- Query `[0.9.41, 1.1.34)` is compatible with Meta `[0.8.35, 1.2.663)`.
96+
- Query `[1.1.34, 1.2.287)` is compatible with Meta `[1.1.32, 1.2.63)`.
10297

10398

10499
## Compatibility between databend-meta

src/meta/client/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,12 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
134134
/// 👥 client: semaphore(watch) requires `WatchRequest::initial_flush`(`1,2.677`),
135135
/// other RPC does not require `1.2.677`, requires only `1.2.259`.
136136
///
137-
/// - 2025-04-15: since TODO: add version when merged.
137+
/// - 2025-04-15: since 1.2.726
138138
/// 👥 client: requires `1,2.677`.
139139
///
140+
/// - 2025-05-08: since TODO: add version when merged.
141+
/// 🖥 server: add `WatchResponse::is_initialization`,
142+
///
140143
///
141144
/// Server feature set:
142145
/// ```yaml

src/meta/semaphore/src/meta_event_subscriber/processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ mod tests {
225225
let prev = PermitEntry::new("a", 1);
226226
let current = PermitEntry::new("b", 2);
227227

228-
let watch_response = WatchResponse::new3(
228+
let watch_response = WatchResponse::new_change_event(
229229
sem_key.format_key(),
230230
Some(SeqV::new(1, prev.encode_to_vec()?)),
231231
Some(SeqV::new(2, current.encode_to_vec()?)),

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ use tonic::Response;
7474
use tonic::Status;
7575
use tonic::Streaming;
7676
use watcher::key_range::build_key_range;
77-
use watcher::util::new_watch_sink;
77+
use watcher::util::new_initialization_sink;
7878
use watcher::util::try_forward;
7979
use watcher::watch_stream::WatchStream;
8080
use watcher::watch_stream::WatchStreamSender;
@@ -459,10 +459,21 @@ impl MetaService for MetaServiceImpl {
459459
let ctx = "watch-Dispatcher";
460460

461461
if let Some(sender) = sm.event_sender() {
462-
let snk = new_watch_sink::<WatchTypes>(tx, ctx);
462+
let snk = new_initialization_sink::<WatchTypes>(tx.clone(), ctx);
463463
let strm = sm.range_kv(key_range).await?;
464464

465-
let fu = try_forward(strm, snk, ctx);
465+
let fu = async move {
466+
try_forward(strm, snk, ctx).await;
467+
468+
// Send an empty message with `is_initialization=false` to indicate
469+
// the end of the initialization flush.
470+
tx.send(Ok(WatchResponse::new_initialization_complete()))
471+
.await
472+
.map_err(|e| {
473+
error!("failed to send flush complete message: {}", e);
474+
})
475+
.ok();
476+
};
466477
let fu = Box::pin(fu);
467478

468479
info!(

src/meta/service/src/meta_service/watcher.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ use tonic::Status;
2828
use watcher::dispatch::Command;
2929
use watcher::dispatch::DispatcherHandle as GenericDispatcherHandle;
3030
use watcher::type_config::KVChange;
31+
use watcher::type_config::KeyOf;
3132
use watcher::type_config::TypeConfig;
33+
use watcher::type_config::ValueOf;
3234

3335
use crate::metrics::server_metrics;
3436

@@ -42,8 +44,12 @@ impl TypeConfig for WatchTypes {
4244
type Response = WatchResponse;
4345
type Error = Status;
4446

45-
fn new_response(change: KVChange<Self>) -> Self::Response {
46-
WatchResponse::new3(change.0, change.1, change.2)
47+
fn new_initialize_response(key: KeyOf<Self>, value: ValueOf<Self>) -> Self::Response {
48+
WatchResponse::new_initialization_event(key, value)
49+
}
50+
51+
fn new_change_response(change: KVChange<Self>) -> Self::Response {
52+
WatchResponse::new_change_event(change.0, change.1, change.2)
4753
}
4854

4955
fn data_error(error: Error) -> Self::Error {

src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ async fn test_watch() -> anyhow::Result<()> {
343343

344344
#[test(harness = meta_service_test_harness)]
345345
#[fastrace::trace]
346-
async fn test_watch_initial_flush() -> anyhow::Result<()> {
346+
async fn test_watch_initialization_flush() -> anyhow::Result<()> {
347347
let (tc, _addr) = crate::tests::start_metasrv().await?;
348348
let updates = vec![
349349
UpsertKV::update("a", b"a"),
@@ -369,11 +369,25 @@ async fn test_watch_initial_flush() -> anyhow::Result<()> {
369369
};
370370

371371
let cache = Arc::new(Mutex::new(BTreeMap::new()));
372-
373372
let c = cache.clone();
373+
374+
let is_initialization_completed = Arc::new(Mutex::new(false));
375+
let init_compl = is_initialization_completed.clone();
376+
377+
let flags = Arc::new(Mutex::new(vec![]));
378+
let f = flags.clone();
379+
374380
let cache_updater = async move {
375381
while let Ok(Some(resp)) = strm.message().await {
376-
let event = resp.event.unwrap();
382+
f.lock().unwrap().push(resp.is_initialization);
383+
384+
if resp.is_initialization_complete_flag() {
385+
*init_compl.lock().unwrap() = true;
386+
}
387+
388+
let Some(event) = resp.event else {
389+
continue;
390+
};
377391

378392
let mut cache = c.lock().unwrap();
379393
if let Some(value) = event.current {
@@ -387,6 +401,14 @@ async fn test_watch_initial_flush() -> anyhow::Result<()> {
387401
let _h = databend_common_base::runtime::spawn(cache_updater);
388402

389403
tokio::time::sleep(Duration::from_secs(1)).await;
404+
405+
assert_eq!(flags.lock().unwrap().clone(), vec![
406+
true, true, true, true, // existent key-values
407+
false, // initialization complete
408+
]);
409+
410+
assert!(*is_initialization_completed.lock().unwrap());
411+
390412
let keys = {
391413
let cache = cache.lock().unwrap();
392414
cache.keys().cloned().collect::<Vec<_>>()
@@ -398,6 +420,13 @@ async fn test_watch_initial_flush() -> anyhow::Result<()> {
398420
client.upsert_kv(UpsertKV::delete("c")).await?;
399421

400422
tokio::time::sleep(Duration::from_secs(1)).await;
423+
424+
assert_eq!(flags.lock().unwrap().clone(), vec![
425+
true, true, true, true, // existent key-values
426+
false, // initialization complete
427+
false, false, // changes
428+
]);
429+
401430
let values = {
402431
let cache = cache.lock().unwrap();
403432
cache

src/meta/types/proto/meta.proto

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,24 @@ message Event {
100100
optional SeqV prev = 3;
101101
}
102102

103-
message WatchResponse {Event event = 1;}
103+
// Response in a watch stream from server to the client.
104+
message WatchResponse {
105+
// The event containing key-value change information or a value emitted during initial-flush.
106+
// This includes the key, current value (if any), and previous value (if any).
107+
Event event = 1;
108+
109+
// Indicates whether this event is part of the initialization.
110+
//
111+
// When true:
112+
// - The event represents an existing key-value record at the time the watch was established
113+
// - These events are sent when initial_flush=true was specified in WatchRequest
114+
//
115+
// When false:
116+
// - The event represents a real-time change that occurred after the watch was established
117+
// - A special event with no key-value data may be sent to indicate the completion
118+
// of the initial flush phase
119+
bool is_initialization = 2;
120+
}
104121

105122
// messages for txn
106123
message TxnCondition {

src/meta/types/src/proto_display/watch_display.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@ impl fmt::Display for Event {
3434

3535
impl fmt::Display for WatchResponse {
3636
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37-
write!(f, "{}", self.event.display())
37+
let typ = if self.is_initialization {
38+
"INIT"
39+
} else {
40+
"CHANGE"
41+
};
42+
write!(f, "{typ}:{}", self.event.display())
3843
}
3944
}
4045

@@ -79,7 +84,7 @@ mod tests {
7984

8085
#[test]
8186
fn test_watch_response_display() {
82-
let watch_response = WatchResponse {
87+
let mut watch_response = WatchResponse {
8388
event: Some(Event {
8489
key: "test_key".to_string(),
8590
prev: Some(SeqV {
@@ -95,11 +100,24 @@ mod tests {
95100
meta: None,
96101
}),
97102
}),
103+
is_initialization: false,
104+
};
105+
assert_eq!(watch_response.to_string(), "CHANGE:(test_key: (seq=1 [expire=1970-01-01T00:16:40.000] 'test_prev') -> (seq=2 [] 'test_current'))");
106+
107+
watch_response.is_initialization = true;
108+
assert_eq!(watch_response.to_string(), "INIT:(test_key: (seq=1 [expire=1970-01-01T00:16:40.000] 'test_prev') -> (seq=2 [] 'test_current'))");
109+
110+
let watch_response = WatchResponse {
111+
event: None,
112+
is_initialization: true,
98113
};
99-
assert_eq!(watch_response.to_string(), "(test_key: (seq=1 [expire=1970-01-01T00:16:40.000] 'test_prev') -> (seq=2 [] 'test_current'))");
114+
assert_eq!(watch_response.to_string(), "INIT:None");
100115

101-
let watch_response = WatchResponse { event: None };
102-
assert_eq!(watch_response.to_string(), "None");
116+
let watch_response = WatchResponse {
117+
event: None,
118+
is_initialization: false,
119+
};
120+
assert_eq!(watch_response.to_string(), "CHANGE:None");
103121
}
104122

105123
#[test]

0 commit comments

Comments
 (0)