Skip to content

Commit 77364e2

Browse files
authored
feat: support vacuum aggregating index (#17231)
* mark delete index
* vacuum drop index
* add ut
* fix show table function
* add compatibility test
* add logic test
* move test to ee
* simplify marked deleted index
* fix test
* modify comment
* make lint
* fix result set
* remove unnecessary derived trait
* add comment
* put tenant at first
* fix test
1 parent be6657d commit 77364e2

File tree

25 files changed

+903
-8
lines changed

25 files changed

+903
-8
lines changed

src/meta/api/src/schema_api.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use databend_common_meta_app::schema::ExtendLockRevReq;
5252
use databend_common_meta_app::schema::GcDroppedTableReq;
5353
use databend_common_meta_app::schema::GetDatabaseReq;
5454
use databend_common_meta_app::schema::GetIndexReply;
55+
use databend_common_meta_app::schema::GetMarkedDeletedIndexesReply;
5556
use databend_common_meta_app::schema::GetTableCopiedFileReply;
5657
use databend_common_meta_app::schema::GetTableCopiedFileReq;
5758
use databend_common_meta_app::schema::GetTableReq;
@@ -95,6 +96,7 @@ use databend_common_meta_app::schema::UpdateVirtualColumnReq;
9596
use databend_common_meta_app::schema::UpsertTableOptionReply;
9697
use databend_common_meta_app::schema::UpsertTableOptionReq;
9798
use databend_common_meta_app::schema::VirtualColumnMeta;
99+
use databend_common_meta_app::tenant::Tenant;
98100
use databend_common_meta_kvapi::kvapi;
99101
use databend_common_meta_types::seq_value::SeqV;
100102
use databend_common_meta_types::Change;
@@ -163,6 +165,12 @@ pub trait SchemaApi: Send + Sync {
163165
name_ident: &IndexNameIdent,
164166
) -> Result<Option<GetIndexReply>, MetaError>;
165167

168+
/// Lists the index entries that have been recorded as marked-deleted for `tenant`.
///
/// * `tenant` - tenant whose marked-deleted index records are listed.
/// * `table_id` - `Some(id)` limits the listing to that table; `None` lists without
///   a table restriction.
///
/// The reply groups the found `(index_id, meta)` pairs by table id.
async fn get_marked_deleted_indexes(
169+
&self,
170+
tenant: &Tenant,
171+
table_id: Option<u64>,
172+
) -> Result<GetMarkedDeletedIndexesReply, MetaError>;
173+
166174
async fn update_index(
167175
&self,
168176
id_ident: IndexIdIdent,
@@ -387,4 +395,11 @@ pub trait SchemaApi: Send + Sync {
387395
where
388396
K: kvapi::Key + Sync + 'static,
389397
K::ValueType: FromToProto + 'static;
398+
399+
/// Deletes the marked-deleted records of the given indexes of one table.
///
/// * `tenant` - tenant that owns the records.
/// * `table_id` - table the indexes belonged to.
/// * `index_ids` - ids of the indexes whose marked-deleted records are removed.
async fn remove_marked_deleted_index_ids(
400+
&self,
401+
tenant: &Tenant,
402+
table_id: u64,
403+
index_ids: &[u64],
404+
) -> Result<(), MetaTxnError>;
390405
}

src/meta/api/src/schema_api_impl.rs

Lines changed: 110 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ use databend_common_meta_app::schema::index_id_ident::IndexIdIdent;
6868
use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent;
6969
use databend_common_meta_app::schema::index_name_ident::IndexName;
7070
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
71+
use databend_common_meta_app::schema::marked_deleted_index_id::MarkedDeletedIndexId;
72+
use databend_common_meta_app::schema::marked_deleted_index_ident::MarkedDeletedIndexIdIdent;
7173
use databend_common_meta_app::schema::table_niv::TableNIV;
7274
use databend_common_meta_app::schema::CatalogIdToNameIdent;
7375
use databend_common_meta_app::schema::CatalogInfo;
@@ -110,6 +112,7 @@ use databend_common_meta_app::schema::ExtendLockRevReq;
110112
use databend_common_meta_app::schema::GcDroppedTableReq;
111113
use databend_common_meta_app::schema::GetDatabaseReq;
112114
use databend_common_meta_app::schema::GetIndexReply;
115+
use databend_common_meta_app::schema::GetMarkedDeletedIndexesReply;
113116
use databend_common_meta_app::schema::GetTableCopiedFileReply;
114117
use databend_common_meta_app::schema::GetTableCopiedFileReq;
115118
use databend_common_meta_app::schema::GetTableReq;
@@ -129,6 +132,8 @@ use databend_common_meta_app::schema::ListTableReq;
129132
use databend_common_meta_app::schema::ListVirtualColumnsReq;
130133
use databend_common_meta_app::schema::LockInfo;
131134
use databend_common_meta_app::schema::LockMeta;
135+
use databend_common_meta_app::schema::MarkedDeletedIndexMeta;
136+
use databend_common_meta_app::schema::MarkedDeletedIndexType;
132137
use databend_common_meta_app::schema::RenameDatabaseReply;
133138
use databend_common_meta_app::schema::RenameDatabaseReq;
134139
use databend_common_meta_app::schema::RenameDictionaryReq;
@@ -780,12 +785,46 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
780785
&self,
781786
name_ident: &IndexNameIdent,
782787
) -> Result<Option<(SeqV<IndexId>, SeqV<IndexMeta>)>, MetaTxnError> {
783-
let dropped = self
784-
.remove_id_value(name_ident, |id| {
785-
vec![IndexIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key()]
786-
})
787-
.await?;
788-
Ok(dropped)
788+
let mut trials = txn_backoff(None, func_name!());
789+
790+
loop {
791+
trials.next().unwrap()?.await;
792+
let mut txn = TxnRequest::default();
793+
794+
// remove name->id, id->meta, id->name
795+
let get_res = self.get_id_value(name_ident).await?;
796+
let Some((seq_id, seq_meta)) = get_res else {
797+
return Ok(None);
798+
};
799+
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
800+
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
801+
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
802+
txn.if_then.push(TxnOp::delete(
803+
IndexIdToNameIdent::new_generic(name_ident.tenant(), seq_id.data).to_string_key(),
804+
));
805+
806+
// add __fd_marked_deleted_index/<table_id>/<index_id> -> marked_deleted_index_meta
807+
let marked_deleted_index_id_ident = MarkedDeletedIndexIdIdent::new_generic(
808+
name_ident.tenant(),
809+
MarkedDeletedIndexId::new(seq_meta.data.table_id, *seq_id.data),
810+
);
811+
let marked_deleted_index_meta = MarkedDeletedIndexMeta {
812+
dropped_on: Utc::now(),
813+
index_type: MarkedDeletedIndexType::AGGREGATING,
814+
};
815+
816+
txn.if_then.push(TxnOp::put(
817+
marked_deleted_index_id_ident.to_string_key(),
818+
serialize_struct(&marked_deleted_index_meta)?,
819+
));
820+
821+
let (succ, _responses) = send_txn(self, txn).await?;
822+
debug!(key :? =name_ident, id :? =&id_ident,succ = succ; "{}", func_name!());
823+
824+
if succ {
825+
return Ok(Some((seq_id, seq_meta)));
826+
}
827+
}
789828
}
790829

791830
#[logcall::logcall]
@@ -808,6 +847,41 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
808847
}))
809848
}
810849

850+
/// Lists marked-deleted index records for `tenant` and groups them by table id.
///
/// With `table_id = Some(id)` the listing directory is built from a key for that
/// table; with `None` it is built one level higher. Each listed entry contributes
/// an `(index_id, meta)` pair to the vector stored under its table id.
#[logcall::logcall]
851+
#[fastrace::trace]
852+
async fn get_marked_deleted_indexes(
853+
&self,
854+
tenant: &Tenant,
855+
table_id: Option<u64>,
856+
) -> Result<GetMarkedDeletedIndexesReply, MetaError> {
857+
// Choose the directory (key prefix) to list.
// NOTE(review): the `0` ids look like placeholders for the key segments that
// `DirName` strips off (one level for `new`, two for `new_with_level(_, 2)`)
// — confirm against the `DirName` definition.
let dir = match table_id {
858+
Some(table_id) => {
859+
let ident = MarkedDeletedIndexIdIdent::new_generic(
860+
tenant,
861+
MarkedDeletedIndexId::new(table_id, 0),
862+
);
863+
DirName::new(ident)
864+
}
865+
None => {
866+
let ident =
867+
MarkedDeletedIndexIdIdent::new_generic(tenant, MarkedDeletedIndexId::new(0, 0));
868+
DirName::new_with_level(ident, 2)
869+
}
870+
};
871+
let list_res = self.list_pb_vec(&dir).await?;
872+
// table_id -> [(index_id, meta)], filled from the listed key/value pairs.
let mut table_indexes = HashMap::new();
873+
for (k, v) in list_res {
874+
// Both ids are read back from the listed key; the value carries the meta.
let table_id = k.name().table_id;
875+
let index_id = k.name().index_id;
876+
let index_meta = v.data;
877+
table_indexes
878+
.entry(table_id)
879+
.or_insert_with(Vec::new)
880+
.push((index_id, index_meta));
881+
}
882+
Ok(GetMarkedDeletedIndexesReply { table_indexes })
883+
}
884+
811885
#[logcall::logcall]
812886
#[fastrace::trace]
813887
async fn update_index(
@@ -3061,6 +3135,36 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
30613135
}
30623136
}
30633137
}
3138+
3139+
/// Deletes the marked-deleted records identified by `(table_id, index_id)` for
/// every id in `index_ids`, using a single transaction per attempt.
///
/// The transaction is rebuilt and re-sent in a loop until `send_txn` reports
/// success; errors from the backoff step or from `send_txn` are propagated.
#[logcall::logcall]
3140+
#[fastrace::trace]
3141+
async fn remove_marked_deleted_index_ids(
3142+
&self,
3143+
tenant: &Tenant,
3144+
table_id: u64,
3145+
index_ids: &[u64],
3146+
) -> Result<(), MetaTxnError> {
3147+
let mut trials = txn_backoff(None, func_name!());
3148+
3149+
loop {
3150+
// Wait for the next backoff slot.
// NOTE(review): `?` presumably fails once retries are exhausted — confirm in `txn_backoff`.
trials.next().unwrap()?.await;
3151+
let mut txn = TxnRequest::default();
3152+
3153+
// One delete op per index id; no conditions are added to the transaction here.
for index_id in index_ids {
3154+
txn.if_then
3155+
.push(txn_op_del(&MarkedDeletedIndexIdIdent::new_generic(
3156+
tenant,
3157+
MarkedDeletedIndexId::new(table_id, *index_id),
3158+
)));
3159+
}
3160+
3161+
let (succ, _responses) = send_txn(self, txn).await?;
3162+
3163+
// Done once the transaction reports success; otherwise retry.
if succ {
3164+
return Ok(());
3165+
}
3166+
}
3167+
}
30643168
}
30653169

30663170
async fn get_history_table_metas(

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ use databend_common_meta_app::schema::ListLockRevReq;
9898
use databend_common_meta_app::schema::ListTableReq;
9999
use databend_common_meta_app::schema::ListVirtualColumnsReq;
100100
use databend_common_meta_app::schema::LockKey;
101+
use databend_common_meta_app::schema::MarkedDeletedIndexType;
101102
use databend_common_meta_app::schema::RenameDatabaseReq;
102103
use databend_common_meta_app::schema::RenameDictionaryReq;
103104
use databend_common_meta_app::schema::RenameTableReq;
@@ -6157,6 +6158,7 @@ impl SchemaApiTestSuite {
61576158
let mut util = Util::new(mt, tenant_name, "db1", "tb1", "eng1");
61586159
let table_id;
61596160
let index_id;
6161+
let index_id_2;
61606162

61616163
info!("--- prepare db and table");
61626164
{
@@ -6228,7 +6230,8 @@ impl SchemaApiTestSuite {
62286230
meta: index_meta_2.clone(),
62296231
};
62306232

6231-
mt.create_index(req).await?;
6233+
let res = mt.create_index(req).await?;
6234+
index_id_2 = res.index_id;
62326235
}
62336236

62346237
{
@@ -6279,13 +6282,46 @@ impl SchemaApiTestSuite {
62796282
assert_eq!(2, res.len());
62806283
}
62816284

6285+
{
6286+
info!("--- get marked deleted indexes");
6287+
let res = mt
6288+
.get_marked_deleted_indexes(&tenant, Some(table_id))
6289+
.await?;
6290+
assert_eq!(res.table_indexes.len(), 0);
6291+
6292+
let res = mt.get_marked_deleted_indexes(&tenant, None).await?;
6293+
assert_eq!(res.table_indexes.len(), 0);
6294+
}
6295+
62826296
{
62836297
info!("--- drop index");
62846298

62856299
let res = mt.drop_index(&name_ident_2).await?;
62866300
assert!(res.is_some())
62876301
}
62886302

6303+
{
6304+
info!("--- get marked deleted indexes after drop one");
6305+
let results = vec![
6306+
mt.get_marked_deleted_indexes(&tenant, Some(table_id))
6307+
.await?,
6308+
mt.get_marked_deleted_indexes(&tenant, None).await?,
6309+
];
6310+
for res in results {
6311+
assert_eq!(res.table_indexes.len(), 1);
6312+
let index = res.table_indexes.get(&table_id);
6313+
assert!(index.is_some());
6314+
let index = index.unwrap();
6315+
assert_eq!(index.len(), 1);
6316+
let (res_index_id, res_index_meta) = index[0].clone();
6317+
assert_eq!(res_index_id, index_id_2);
6318+
assert_eq!(
6319+
res_index_meta.index_type,
6320+
MarkedDeletedIndexType::AGGREGATING
6321+
);
6322+
}
6323+
}
6324+
62896325
{
62906326
info!("--- list index after drop one");
62916327
let req = ListIndexesReq::new(&tenant, Some(table_id));
@@ -6321,6 +6357,62 @@ impl SchemaApiTestSuite {
63216357
assert!(res.is_empty())
63226358
}
63236359

6360+
{
6361+
info!("--- get marked deleted indexes after drop all");
6362+
let results = vec![
6363+
mt.get_marked_deleted_indexes(&tenant, Some(table_id))
6364+
.await?,
6365+
mt.get_marked_deleted_indexes(&tenant, None).await?,
6366+
];
6367+
for res in results {
6368+
assert_eq!(res.table_indexes.len(), 1);
6369+
let index = res.table_indexes.get(&table_id);
6370+
assert!(index.is_some());
6371+
let index = index.unwrap();
6372+
assert_eq!(index.len(), 2);
6373+
let res_index_ids = index.iter().map(|(id, _)| id).collect::<HashSet<_>>();
6374+
assert!(res_index_ids.contains(&index_id));
6375+
assert!(res_index_ids.contains(&index_id_2));
6376+
6377+
assert!(index.iter().all(|(_, meta)| {
6378+
matches!(meta.index_type, MarkedDeletedIndexType::AGGREGATING)
6379+
}));
6380+
}
6381+
}
6382+
6383+
{
6384+
info!("--- remove marked deleted indexes");
6385+
mt.remove_marked_deleted_index_ids(&tenant, table_id, &[index_id])
6386+
.await?;
6387+
let results = vec![
6388+
mt.get_marked_deleted_indexes(&tenant, Some(table_id))
6389+
.await?,
6390+
mt.get_marked_deleted_indexes(&tenant, None).await?,
6391+
];
6392+
for res in results {
6393+
assert_eq!(res.table_indexes.len(), 1);
6394+
let index = res.table_indexes.get(&table_id);
6395+
assert!(index.is_some());
6396+
let index = index.unwrap();
6397+
assert_eq!(index.len(), 1);
6398+
let (res_index_id, res_index_meta) = index[0].clone();
6399+
assert_eq!(res_index_id, index_id_2);
6400+
assert_eq!(
6401+
res_index_meta.index_type,
6402+
MarkedDeletedIndexType::AGGREGATING
6403+
);
6404+
}
6405+
6406+
mt.remove_marked_deleted_index_ids(&tenant, table_id, &[index_id_2])
6407+
.await?;
6408+
let res = mt
6409+
.get_marked_deleted_indexes(&tenant, Some(table_id))
6410+
.await?;
6411+
assert_eq!(res.table_indexes.len(), 0);
6412+
let res = mt.get_marked_deleted_indexes(&tenant, None).await?;
6413+
assert_eq!(res.table_indexes.len(), 0);
6414+
}
6415+
63246416
{
63256417
info!("--- drop unknown index");
63266418
let res = mt.drop_index(&name_ident_1).await?;

src/meta/app/src/schema/index.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::fmt;
1617
use std::fmt::Display;
1718
use std::fmt::Formatter;
@@ -67,6 +68,18 @@ pub struct IndexMeta {
6768
pub sync_creation: bool,
6869
}
6970

71+
/// Kind of index referenced by a marked-deleted index record.
///
/// Variants carry explicit integer discriminants and the type derives
/// `num_derive::FromPrimitive`.
// NOTE(review): the discriminants are presumably persisted, so they should stay stable — confirm.
#[derive(Clone, Debug, Eq, PartialEq, num_derive::FromPrimitive)]
72+
pub enum MarkedDeletedIndexType {
73+
/// Aggregating index.
AGGREGATING = 1,
74+
/// Inverted index.
INVERTED = 2,
75+
}
76+
77+
/// Metadata stored for an index that has been marked as deleted.
#[derive(Clone, Debug, Eq, PartialEq)]
78+
pub struct MarkedDeletedIndexMeta {
79+
/// Time at which the index was dropped.
pub dropped_on: DateTime<Utc>,
80+
/// Kind of the dropped index.
pub index_type: MarkedDeletedIndexType,
81+
}
82+
7083
impl Default for IndexMeta {
7184
fn default() -> Self {
7285
IndexMeta {
@@ -161,6 +174,12 @@ pub struct GetIndexReply {
161174
pub index_meta: IndexMeta,
162175
}
163176

177+
/// Reply listing marked-deleted indexes, grouped by the table they belonged to.
///
/// Maps table_id to a vector of (index_id, marked_deleted_index_meta) pairs.
178+
#[derive(Clone, Debug, PartialEq, Eq)]
179+
pub struct GetMarkedDeletedIndexesReply {
180+
/// Key: table id. Value: `(index_id, meta)` for each marked-deleted index of that table.
pub table_indexes: HashMap<u64, Vec<(u64, MarkedDeletedIndexMeta)>>,
181+
}
182+
164183
#[derive(Clone, Debug, PartialEq, Eq)]
165184
pub struct UpdateIndexReq {
166185
pub tenant: Tenant,

0 commit comments

Comments
 (0)