Skip to content

Commit 3c41e93

Browse files
committed
chore: simplify alter and drop cluster key logic
1 parent cce8c74 commit 3c41e93

File tree

27 files changed

+134
-288
lines changed

27 files changed

+134
-288
lines changed

src/common/base/src/runtime/metrics/family_metrics/counter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl<Labels: FamilyLabels> FamilyCounter<Labels> {
4242
FamilyCounter {
4343
index,
4444
labels,
45-
value: Arc::new(Default::default()),
45+
value: Default::default(),
4646
}
4747
}
4848

src/query/catalog/src/table.rs

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::any::Any;
1616
use std::collections::BTreeMap;
17+
use std::collections::HashMap;
1718
use std::sync::Arc;
1819

1920
use chrono::DateTime;
@@ -22,12 +23,13 @@ use databend_common_exception::ErrorCode;
2223
use databend_common_exception::Result;
2324
use databend_common_expression::BlockThresholds;
2425
use databend_common_expression::ColumnId;
25-
use databend_common_expression::RemoteExpr;
2626
use databend_common_expression::Scalar;
2727
use databend_common_expression::TableSchema;
2828
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
2929
use databend_common_io::constants::DEFAULT_BLOCK_MAX_ROWS;
3030
use databend_common_io::constants::DEFAULT_BLOCK_MIN_ROWS;
31+
use databend_common_meta_app::app_error::AppError;
32+
use databend_common_meta_app::app_error::UnknownTableId;
3133
use databend_common_meta_app::schema::TableIdent;
3234
use databend_common_meta_app::schema::TableInfo;
3335
use databend_common_meta_app::schema::TableMeta;
@@ -37,9 +39,12 @@ use databend_common_meta_types::MetaId;
3739
use databend_common_pipeline_core::Pipeline;
3840
use databend_common_storage::Histogram;
3941
use databend_common_storage::StorageMetrics;
42+
use databend_storages_common_table_meta::meta::ClusterKey;
4043
use databend_storages_common_table_meta::meta::SnapshotId;
4144
use databend_storages_common_table_meta::meta::TableSnapshot;
4245
use databend_storages_common_table_meta::table::ChangeType;
46+
use databend_storages_common_table_meta::table::ClusterType;
47+
use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
4348
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
4449
use databend_storages_common_table_meta::table_id_ranges::is_temp_table_id;
4550

@@ -117,8 +122,18 @@ pub trait Table: Sync + Send {
117122
false
118123
}
119124

120-
fn cluster_keys(&self, _ctx: Arc<dyn TableContext>) -> Vec<RemoteExpr<String>> {
121-
vec![]
125+
fn cluster_key_meta(&self) -> Option<ClusterKey> {
126+
None
127+
}
128+
129+
fn cluster_type(&self) -> Option<ClusterType> {
130+
self.cluster_key_meta()?;
131+
let cluster_type = self
132+
.options()
133+
.get(OPT_KEY_CLUSTER_TYPE)
134+
.and_then(|s| s.parse::<ClusterType>().ok())
135+
.unwrap_or(ClusterType::Linear);
136+
Some(cluster_type)
122137
}
123138

124139
fn change_tracking_enabled(&self) -> bool {
@@ -159,31 +174,6 @@ pub trait Table: Sync + Send {
159174
false
160175
}
161176

162-
#[async_backtrace::framed]
163-
async fn alter_table_cluster_keys(
164-
&self,
165-
ctx: Arc<dyn TableContext>,
166-
cluster_key: String,
167-
cluster_type: String,
168-
) -> Result<()> {
169-
let (_, _, _) = (ctx, cluster_key, cluster_type);
170-
171-
Err(ErrorCode::UnsupportedEngineParams(format!(
172-
"Altering table cluster keys is not supported for the '{}' engine.",
173-
self.engine()
174-
)))
175-
}
176-
177-
#[async_backtrace::framed]
178-
async fn drop_table_cluster_keys(&self, ctx: Arc<dyn TableContext>) -> Result<()> {
179-
let _ = ctx;
180-
181-
Err(ErrorCode::UnsupportedEngineParams(format!(
182-
"Dropping table cluster keys is not supported for the '{}' engine.",
183-
self.engine()
184-
)))
185-
}
186-
187177
/// Gather partitions to be scanned according to the push_downs
188178
#[async_backtrace::framed]
189179
async fn read_partitions(
@@ -584,11 +574,6 @@ pub struct NavigationDescriptor {
584574
pub point: NavigationPoint,
585575
}
586576

587-
use std::collections::HashMap;
588-
589-
use databend_common_meta_app::app_error::AppError;
590-
use databend_common_meta_app::app_error::UnknownTableId;
591-
592577
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
593578
pub struct ParquetTableColumnStatisticsProvider {
594579
column_stats: HashMap<ColumnId, Option<BasicColumnStatistics>>,

src/query/service/src/interpreters/hook/compact_hook.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ async fn compact_table(
194194

195195
{
196196
// do recluster.
197-
if !table.cluster_keys(ctx.clone()).is_empty() {
197+
if table.cluster_key_meta().is_some() {
198198
let recluster = RelOperator::Recluster(Recluster {
199199
catalog: compact_target.catalog,
200200
database: compact_target.database,

src/query/service/src/interpreters/interpreter_cluster_key_alter.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,14 @@
1414

1515
use std::sync::Arc;
1616

17+
use databend_common_catalog::table::Table;
18+
use databend_common_catalog::table::TableExt;
1719
use databend_common_exception::Result;
20+
use databend_common_meta_app::schema::UpdateTableMetaReq;
21+
use databend_common_meta_types::MatchSeq;
1822
use databend_common_sql::plans::AlterTableClusterKeyPlan;
23+
use databend_common_storages_fuse::FuseTable;
24+
use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
1925

2026
use super::Interpreter;
2127
use crate::pipelines::PipelineBuildResult;
@@ -52,12 +58,35 @@ impl Interpreter for AlterTableClusterKeyInterpreter {
5258
let table = catalog
5359
.get_table(&tenant, &plan.database, &plan.table)
5460
.await?;
61+
// check mutability
62+
table.check_mutable()?;
5563

64+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
5665
let cluster_key_str = format!("({})", plan.cluster_keys.join(", "));
66+
// if new cluster_key_str is the same with old one,
67+
// no need to change
68+
if let Some(old_cluster_key_str) = fuse_table.cluster_key_str()
69+
&& *old_cluster_key_str == cluster_key_str
70+
{
71+
let old_cluster_type = fuse_table.cluster_type();
72+
if old_cluster_type.is_some_and(|v| v.to_string().to_lowercase() == plan.cluster_type) {
73+
return Ok(PipelineBuildResult::create());
74+
}
75+
}
5776

58-
table
59-
.alter_table_cluster_keys(self.ctx.clone(), cluster_key_str, plan.cluster_type.clone())
60-
.await?;
77+
let table_info = fuse_table.get_table_info();
78+
let mut new_table_meta = table_info.meta.clone();
79+
new_table_meta
80+
.options
81+
.insert(OPT_KEY_CLUSTER_TYPE.to_owned(), plan.cluster_type.clone());
82+
new_table_meta = new_table_meta.push_cluster_key(cluster_key_str);
83+
84+
let req = UpdateTableMetaReq {
85+
table_id: table_info.ident.table_id,
86+
seq: MatchSeq::Exact(table_info.ident.seq),
87+
new_table_meta,
88+
};
89+
catalog.update_single_table_meta(req, table_info).await?;
6190

6291
Ok(PipelineBuildResult::create())
6392
}

src/query/service/src/interpreters/interpreter_cluster_key_drop.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,14 @@
1414

1515
use std::sync::Arc;
1616

17+
use databend_common_catalog::table::Table;
18+
use databend_common_catalog::table::TableExt;
1719
use databend_common_exception::Result;
20+
use databend_common_meta_app::schema::UpdateTableMetaReq;
21+
use databend_common_meta_types::MatchSeq;
1822
use databend_common_sql::plans::DropTableClusterKeyPlan;
23+
use databend_common_storages_fuse::FuseTable;
24+
use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
1925

2026
use super::Interpreter;
2127
use crate::pipelines::PipelineBuildResult;
@@ -52,8 +58,25 @@ impl Interpreter for DropTableClusterKeyInterpreter {
5258
let table = catalog
5359
.get_table(&tenant, &plan.database, &plan.table)
5460
.await?;
61+
if table.cluster_key_meta().is_none() {
62+
return Ok(PipelineBuildResult::create());
63+
}
64+
// check mutability
65+
table.check_mutable()?;
5566

56-
table.drop_table_cluster_keys(self.ctx.clone()).await?;
67+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
68+
let table_info = fuse_table.get_table_info();
69+
let mut new_table_meta = table_info.meta.clone();
70+
new_table_meta.default_cluster_key = None;
71+
new_table_meta.default_cluster_key_id = None;
72+
new_table_meta.options.remove(OPT_KEY_CLUSTER_TYPE);
73+
74+
let req = UpdateTableMetaReq {
75+
table_id: table_info.ident.table_id,
76+
seq: MatchSeq::Exact(table_info.ident.seq),
77+
new_table_meta,
78+
};
79+
catalog.update_single_table_meta(req, table_info).await?;
5780

5881
Ok(PipelineBuildResult::create())
5982
}

src/query/service/src/interpreters/interpreter_replace.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl ReplaceInterpreter {
290290
.ctx
291291
.get_settings()
292292
.get_replace_into_bloom_pruning_max_column_number()?;
293-
let bloom_filter_column_indexes = if !table.cluster_keys(self.ctx.clone()).is_empty() {
293+
let bloom_filter_column_indexes = if table.cluster_key_meta().is_some() {
294294
fuse_table
295295
.choose_bloom_filter_columns(
296296
self.ctx.clone(),

src/query/service/src/servers/flight_sql/flight_sql_service/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl FlightSqlServiceImpl {
5959
pub fn create() -> Self {
6060
FlightSqlServiceImpl {
6161
sessions: Mutex::new(Default::default()),
62-
statements: Arc::new(Default::default()),
62+
statements: Default::default(),
6363
}
6464
}
6565
}

src/query/service/src/sessions/query_ctx.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl QueryContext {
180180
fragment_id: Arc::new(AtomicUsize::new(0)),
181181
inserted_segment_locs: Arc::new(RwLock::new(HashSet::new())),
182182
block_threshold: Arc::new(RwLock::new(BlockThresholds::default())),
183-
m_cte_temp_table: Arc::new(Default::default()),
183+
m_cte_temp_table: Default::default(),
184184
})
185185
}
186186

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,8 @@ impl QueryContextShared {
181181
finish_time: Default::default(),
182182
on_error_map: Arc::new(RwLock::new(None)),
183183
on_error_mode: Arc::new(RwLock::new(None)),
184-
copy_status: Arc::new(Default::default()),
185-
mutation_status: Arc::new(Default::default()),
184+
copy_status: Default::default(),
185+
mutation_status: Default::default(),
186186
partitions_shas: Arc::new(RwLock::new(vec![])),
187187
cacheable: Arc::new(AtomicBool::new(true)),
188188
can_scan_from_agg_index: Arc::new(AtomicBool::new(true)),

src/query/service/src/sessions/session.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl Session {
6363
session_ctx: Box<SessionContext>,
6464
mysql_connection_id: Option<u32>,
6565
) -> Result<Session> {
66-
let status = Arc::new(Default::default());
66+
let status = Default::default();
6767
Ok(Session {
6868
id,
6969
typ: RwLock::new(typ),

0 commit comments

Comments
 (0)