Skip to content

Commit d577e76

Browse files
authored
feat: add key prefix count assertions to transactions (#17053)
Enable transaction conditions based on the number of keys matching a prefix. Example: ```rust let txn = TxnRequest { condition: vec![ TxnCondition::match_keys_with_prefix("key/", Eq, 3) ], //... } ``` This allows transactions to proceed only when a prefix matches an expected number of keys, providing atomic prefix-based cardinality checks. This commit involves a databend-meta server side change: - Add `txn_condition::Target::KeysWithPrefix`. To provide compatibility, any change to the client that uses this feature must update the compatibility doc and upgrade the databend-meta cluster first.
1 parent 2e5fec4 commit d577e76

File tree

7 files changed

+270
-95
lines changed

7 files changed

+270
-95
lines changed

src/meta/client/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,14 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
110110
/// require the client to call kv_read_v1 for get/mget/list,
111111
/// which is added `2024-01-07: since 1.2.287`
112112
///
113-
/// - 2024-11-2*: since 1.2.6**
113+
/// - 2024-11-23: since 1.2.663
114114
/// 👥 client: remove use of `Operation::AsIs`
115115
///
116+
/// - 2024-12-1*: since 1.2.*
117+
/// 🖥 server: add `txn_condition::Target::KeysWithPrefix`,
118+
/// to support matching the key count by a prefix.
119+
///
120+
///
116121
/// Server feature set:
117122
/// ```yaml
118123
/// server_features:

src/meta/kvapi/src/kvapi/test_suite.rs

Lines changed: 107 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ impl kvapi::TestSuite {
9292
self.kv_transaction_with_ttl(&builder.build().await).await?;
9393
self.kv_transaction_delete_match_seq_none(&builder.build().await)
9494
.await?;
95+
self.kv_transaction_condition_keys_with_prefix(&builder.build().await)
96+
.await?;
9597
self.kv_transaction_delete_match_seq_some_not_match(&builder.build().await)
9698
.await?;
9799
self.kv_transaction_delete_match_seq_some_match(&builder.build().await)
@@ -111,7 +113,7 @@ impl kvapi::TestSuite {
111113
// write
112114
let res = kv.upsert_kv(UpsertKV::update("foo", b"bar")).await?;
113115
assert_eq!(None, res.prev);
114-
assert_eq!(Some(SeqV::with_meta(1, None, b"bar".to_vec())), res.result);
116+
assert_eq!(Some(SeqV::new(1, b("bar"))), res.result);
115117
}
116118

117119
{
@@ -120,10 +122,7 @@ impl kvapi::TestSuite {
120122
.upsert_kv(UpsertKV::update("foo", b"bar").with(MatchSeq::Exact(2)))
121123
.await?;
122124
assert_eq!(
123-
(
124-
Some(SeqV::with_meta(1, None, b"bar".to_vec())),
125-
Some(SeqV::with_meta(1, None, b"bar".to_vec())),
126-
),
125+
(Some(SeqV::new(1, b("bar"))), Some(SeqV::new(1, b("bar"))),),
127126
(res.prev, res.result),
128127
"nothing changed"
129128
);
@@ -134,16 +133,8 @@ impl kvapi::TestSuite {
134133
let res = kv
135134
.upsert_kv(UpsertKV::update("foo", b"wow").with(MatchSeq::Exact(1)))
136135
.await?;
137-
assert_eq!(
138-
Some(SeqV::with_meta(1, None, b"bar".to_vec())),
139-
res.prev,
140-
"old value"
141-
);
142-
assert_eq!(
143-
Some(SeqV::with_meta(2, None, b"wow".to_vec())),
144-
res.result,
145-
"new value"
146-
);
136+
assert_eq!(Some(SeqV::new(1, b("bar"))), res.prev, "old value");
137+
assert_eq!(Some(SeqV::new(2, b("wow"))), res.result, "new value");
147138
}
148139

149140
Ok(())
@@ -194,10 +185,7 @@ impl kvapi::TestSuite {
194185
let res = kv.upsert_kv(UpsertKV::delete(test_key)).await?;
195186
// dbg!("delete", &res);
196187

197-
assert_eq!(
198-
(Some(SeqV::with_meta(2, None, b"v2".to_vec())), None),
199-
(res.prev, res.result)
200-
);
188+
assert_eq!((Some(SeqV::new(2, b("v2"))), None), (res.prev, res.result));
201189

202190
Ok(())
203191
}
@@ -213,38 +201,35 @@ impl kvapi::TestSuite {
213201
assert_eq!((None, None), (r.prev, r.result), "not changed");
214202

215203
let r = kv.upsert_kv(UpsertKV::update(test_key, b"v1")).await?;
216-
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.result);
204+
assert_eq!(Some(SeqV::new(1, b("v1"))), r.result);
217205
let seq = r.result.unwrap().seq;
218206

219207
// unmatched seq
220208
let r = kv
221209
.upsert_kv(UpsertKV::update(test_key, b"v2").with(MatchSeq::Exact(seq + 1)))
222210
.await?;
223-
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev);
224-
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.result);
211+
assert_eq!(Some(SeqV::new(1, b("v1"))), r.prev);
212+
assert_eq!(Some(SeqV::new(1, b("v1"))), r.result);
225213

226214
// matched seq
227215
let r = kv
228216
.upsert_kv(UpsertKV::update(test_key, b"v2").with(MatchSeq::Exact(seq)))
229217
.await?;
230-
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev);
231-
assert_eq!(Some(SeqV::with_meta(2, None, b"v2".to_vec())), r.result);
218+
assert_eq!(Some(SeqV::new(1, b("v1"))), r.prev);
219+
assert_eq!(Some(SeqV::new(2, b("v2"))), r.result);
232220

233221
// blind update
234222
let r = kv
235223
.upsert_kv(UpsertKV::update(test_key, b"v3").with(MatchSeq::GE(1)))
236224
.await?;
237-
assert_eq!(Some(SeqV::with_meta(2, None, b"v2".to_vec())), r.prev);
238-
assert_eq!(Some(SeqV::with_meta(3, None, b"v3".to_vec())), r.result);
225+
assert_eq!(Some(SeqV::new(2, b("v2"))), r.prev);
226+
assert_eq!(Some(SeqV::new(3, b("v3"))), r.result);
239227

240228
// value updated
241229
let key_value = kv.get_kv(test_key).await?;
242230
assert!(key_value.is_some());
243231
let key_value = key_value.unwrap();
244-
assert_eq!(
245-
key_value,
246-
SeqV::with_meta(key_value.seq, None, b"v3".to_vec())
247-
);
232+
assert_eq!(key_value, SeqV::new(key_value.seq, b("v3")));
248233
Ok(())
249234
}
250235

@@ -380,7 +365,7 @@ impl kvapi::TestSuite {
380365
let now_sec = SeqV::<()>::now_sec();
381366

382367
let r = kv.upsert_kv(UpsertKV::update(test_key, b"v1")).await?;
383-
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.result);
368+
assert_eq!(Some(SeqV::new(1, b("v1"))), r.result);
384369
let seq = r.result.unwrap().seq;
385370

386371
info!("--- mismatching seq does nothing");
@@ -389,24 +374,24 @@ impl kvapi::TestSuite {
389374
.upsert_kv(UpsertKV::new(
390375
test_key,
391376
MatchSeq::Exact(seq + 1),
392-
Operation::Update(b"v1".to_vec()),
377+
Operation::Update(b("v1")),
393378
Some(MetaSpec::new_ttl(Duration::from_secs(20))),
394379
))
395380
.await?;
396-
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev);
397-
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.result);
381+
assert_eq!(Some(SeqV::new(1, b("v1"))), r.prev);
382+
assert_eq!(Some(SeqV::new(1, b("v1"))), r.result);
398383

399384
info!("--- matching seq only update meta");
400385

401386
let r = kv
402387
.upsert_kv(UpsertKV::new(
403388
test_key,
404389
MatchSeq::Exact(seq),
405-
Operation::Update(b"v1".to_vec()),
390+
Operation::Update(b("v1")),
406391
Some(MetaSpec::new_ttl(Duration::from_secs(20))),
407392
))
408393
.await?;
409-
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev);
394+
assert_eq!(Some(SeqV::new(1, b("v1"))), r.prev);
410395

411396
{
412397
let res = r.result.unwrap();
@@ -478,15 +463,15 @@ impl kvapi::TestSuite {
478463

479464
let res = kv.mget_kv(&["k1".to_string(), "k2".to_string()]).await?;
480465
assert_eq!(res, vec![
481-
Some(SeqV::with_meta(1, None, b"v1".to_vec(),)),
466+
Some(SeqV::new(1, b("v1"),)),
482467
// NOTE, the sequence number is increased globally (inside the namespace of generic kv)
483-
Some(SeqV::with_meta(2, None, b"v2".to_vec(),)),
468+
Some(SeqV::new(2, b("v2"),)),
484469
]);
485470

486471
let res = kv
487472
.mget_kv(&["k1".to_string(), "key_no exist".to_string()])
488473
.await?;
489-
assert_eq!(res, vec![Some(SeqV::new(1, b"v1".to_vec())), None]);
474+
assert_eq!(res, vec![Some(SeqV::new(1, b("v1"))), None]);
490475

491476
Ok(())
492477
}
@@ -663,7 +648,7 @@ impl kvapi::TestSuite {
663648
// first case: get and set one key transaction
664649
{
665650
let k1 = "txn_1_K1";
666-
let val1 = b"v1".to_vec();
651+
let val1 = b("v1");
667652

668653
// first insert k1 value
669654
kv.upsert_kv(UpsertKV::update(k1, &val1)).await?;
@@ -740,11 +725,11 @@ impl kvapi::TestSuite {
740725
// 3rd case: get two key and set both key transaction
741726
{
742727
let k1 = "txn_3_K1";
743-
let val1 = b"v1".to_vec();
744-
let val1_new = b"v1_new".to_vec();
728+
let val1 = b("v1");
729+
let val1_new = b("v1_new");
745730

746731
let k2 = "txn_3_K2";
747-
let val2 = b"v1".to_vec();
732+
let val2 = b("v1");
748733

749734
// first insert k1 and k2 value
750735
kv.upsert_kv(UpsertKV::update(k1, &val1)).await?;
@@ -1095,6 +1080,78 @@ impl kvapi::TestSuite {
10951080
Ok(())
10961081
}
10971082

1083+
/// A transaction that checks the number of keys with given prefix.
1084+
pub async fn kv_transaction_condition_keys_with_prefix<KV: kvapi::KVApi>(
1085+
&self,
1086+
kv: &KV,
1087+
) -> anyhow::Result<()> {
1088+
let prefix = func_name!();
1089+
1090+
let sample_keys_prefix = format!("{}/xxx", prefix);
1091+
1092+
let sample = |suffix| format!("{}/{}", sample_keys_prefix, suffix);
1093+
let positive = format!("{prefix}/positive");
1094+
let negative = format!("{prefix}/negative");
1095+
1096+
kv.upsert_kv(UpsertKV::update(sample("a"), &b("a"))).await?;
1097+
kv.upsert_kv(UpsertKV::update(sample("b"), &b("b"))).await?;
1098+
kv.upsert_kv(UpsertKV::update(sample("c"), &b("c"))).await?;
1099+
1100+
use ConditionResult::*;
1101+
1102+
// A transaction that set positive key if succeeded,
1103+
// otherwise set the negative key.
1104+
let txn = |op: ConditionResult, n: u64| TxnRequest {
1105+
condition: vec![TxnCondition::match_keys_with_prefix(
1106+
&sample_keys_prefix,
1107+
op,
1108+
n,
1109+
)],
1110+
if_then: vec![TxnOp::put(&positive, b(format!("{op:?}")))],
1111+
else_then: vec![TxnOp::put(&negative, b(format!("{op:?}")))],
1112+
};
1113+
1114+
for (op, n, expected) in [
1115+
(Eq, 2, false),
1116+
(Eq, 3, true),
1117+
(Eq, 4, false),
1118+
(Ne, 2, true),
1119+
(Ne, 3, false),
1120+
(Ne, 4, true),
1121+
(Lt, 3, false),
1122+
(Lt, 4, true),
1123+
(Lt, 5, true),
1124+
(Le, 2, false),
1125+
(Le, 3, true),
1126+
(Le, 4, true),
1127+
(Gt, 2, true),
1128+
(Gt, 3, false),
1129+
(Gt, 4, false),
1130+
(Ge, 2, true),
1131+
(Ge, 3, true),
1132+
(Ge, 4, false),
1133+
] {
1134+
kv.upsert_kv(UpsertKV::update(&positive, &b(""))).await?;
1135+
kv.upsert_kv(UpsertKV::update(&negative, &b(""))).await?;
1136+
1137+
let resp = kv.transaction(txn(op, n)).await?;
1138+
assert_eq!(
1139+
resp.success, expected,
1140+
"case: {op:?} {n}, expected: {expected}"
1141+
);
1142+
1143+
let expected_key = if expected { &positive } else { &negative };
1144+
let got = kv.get_kv(expected_key).await?.unwrap().data;
1145+
assert_eq!(
1146+
got,
1147+
b(format!("{op:?}")),
1148+
"case: {op:?} {n}, expected: {expected}"
1149+
);
1150+
}
1151+
1152+
Ok(())
1153+
}
1154+
10981155
/// If `TxnDeleteRequest.match_seq` is not set,
10991156
/// the delete operation will always be executed.
11001157
pub async fn kv_transaction_delete_match_seq_none<KV: kvapi::KVApi>(
@@ -1103,7 +1160,7 @@ impl kvapi::TestSuite {
11031160
) -> anyhow::Result<()> {
11041161
info!("--- {}", func_name!());
11051162
let key = || "txn_1_K1".to_string();
1106-
let val = || b"v1".to_vec();
1163+
let val = || b("v1");
11071164

11081165
kv.upsert_kv(UpsertKV::update(key(), &val())).await?;
11091166

@@ -1137,7 +1194,7 @@ impl kvapi::TestSuite {
11371194
) -> anyhow::Result<()> {
11381195
info!("--- {}", func_name!());
11391196
let key = || "txn_1_K1".to_string();
1140-
let val = || b"v1".to_vec();
1197+
let val = || b("v1");
11411198

11421199
kv.upsert_kv(UpsertKV::update(key(), &val())).await?;
11431200

@@ -1175,7 +1232,7 @@ impl kvapi::TestSuite {
11751232
) -> anyhow::Result<()> {
11761233
info!("--- {}", func_name!());
11771234
let key = || "txn_1_K1".to_string();
1178-
let val = || b"v1".to_vec();
1235+
let val = || b("v1");
11791236

11801237
kv.upsert_kv(UpsertKV::update(key(), &val())).await?;
11811238

@@ -1231,7 +1288,7 @@ impl kvapi::TestSuite {
12311288
{
12321289
let res = kv2.get_kv("t").await?;
12331290
let res = res.unwrap();
1234-
assert_eq!(b"t".to_vec(), res.data);
1291+
assert_eq!(b("t"), res.data);
12351292
}
12361293

12371294
info!("--- test mget on other node");
@@ -1243,7 +1300,7 @@ impl kvapi::TestSuite {
12431300
Some(SeqV {
12441301
seq: 11,
12451302
meta: None,
1246-
data: b"v".to_vec()
1303+
data: b("v")
12471304
})
12481305
],
12491306
res
@@ -1267,6 +1324,6 @@ impl kvapi::TestSuite {
12671324
}
12681325
}
12691326

1270-
fn b(s: &str) -> Vec<u8> {
1271-
s.as_bytes().to_vec()
1327+
fn b(x: impl ToString) -> Vec<u8> {
1328+
x.to_string().as_bytes().to_vec()
12721329
}

0 commit comments

Comments
 (0)