Skip to content

Commit 67d4a16

Browse files
authored
feat: support specifying columns to include in the "Attach table" statement (#17442)
* feat: support specifying columns to include in the "Attach table" statement
* update the unit-test golden file of case parser::test_statement
* logic test
* clean up
* tweak test case 02_0004_attach_table
* adjust code comments
* show create table
* tweak case 02_0004_attach_table
1 parent 19c8921 commit 67d4a16

File tree

18 files changed

+314
-7
lines changed

18 files changed

+314
-7
lines changed

src/query/ast/src/ast/statements/table.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ pub struct AttachTableStmt {
249249
pub catalog: Option<Identifier>,
250250
pub database: Option<Identifier>,
251251
pub table: Identifier,
252+
pub columns_opt: Option<Vec<Identifier>>,
252253
pub uri_location: UriLocation,
253254
}
254255

@@ -263,6 +264,12 @@ impl Display for AttachTableStmt {
263264
.chain(Some(&self.table)),
264265
)?;
265266

267+
if let Some(cols) = &self.columns_opt {
268+
write!(f, " (")?;
269+
write_comma_separated_list(f, cols.iter())?;
270+
write!(f, ")")?;
271+
}
272+
266273
write!(f, " {}", self.uri_location)?;
267274

268275
Ok(())

src/query/ast/src/parser/statement.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -869,13 +869,15 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
869869

870870
let attach_table = map(
871871
rule! {
872-
ATTACH ~ TABLE ~ #dot_separated_idents_1_to_3 ~ #uri_location
872+
ATTACH ~ TABLE ~ #dot_separated_idents_1_to_3 ~ ("(" ~ #comma_separated_list1(ident) ~ ")")? ~ #uri_location
873873
},
874-
|(_, _, (catalog, database, table), uri_location)| {
874+
|(_, _, (catalog, database, table), columns_opt, uri_location)| {
875+
let columns_opt = columns_opt.map(|(_, v, _)| v);
875876
Statement::AttachTable(AttachTableStmt {
876877
catalog,
877878
database,
878879
table,
880+
columns_opt,
879881
uri_location,
880882
})
881883
},

src/query/ast/tests/it/testdata/stmt.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23705,6 +23705,7 @@ AttachTable(
2370523705
quote: None,
2370623706
ident_type: None,
2370723707
},
23708+
columns_opt: None,
2370823709
uri_location: UriLocation {
2370923710
protocol: "s3",
2371023711
name: "a",

src/query/ee/src/attach_table/handler.rs

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
use std::sync::Arc;
1616

1717
use databend_common_base::base::GlobalInstance;
18+
use databend_common_exception::ErrorCode;
19+
use databend_common_exception::Result;
20+
use databend_common_expression::TableSchema;
1821
use databend_common_meta_app::schema::CreateTableReq;
1922
use databend_common_meta_app::schema::TableMeta;
2023
use databend_common_meta_app::schema::TableNameIdent;
@@ -23,6 +26,7 @@ use databend_common_sql::plans::CreateTablePlan;
2326
use databend_common_storage::check_operator;
2427
use databend_common_storage::init_operator;
2528
use databend_common_storages_fuse::io::MetaReaders;
29+
use databend_common_storages_fuse::FUSE_OPT_KEY_ATTACH_COLUMN_IDS;
2630
use databend_common_storages_fuse::FUSE_TBL_LAST_SNAPSHOT_HINT;
2731
use databend_enterprise_attach_table::AttachTableHandler;
2832
use databend_enterprise_attach_table::AttachTableHandlerWrapper;
@@ -71,9 +75,11 @@ impl AttachTableHandler for RealAttachTableHandler {
7175
number_of_blocks: Some(snapshot.summary.block_count),
7276
};
7377

78+
let attach_table_schema = Self::gen_schema(&plan, &snapshot)?;
79+
7480
let field_comments = vec!["".to_string(); snapshot.schema.num_fields()];
7581
let table_meta = TableMeta {
76-
schema: Arc::new(snapshot.schema.clone()),
82+
schema: Arc::new(attach_table_schema),
7783
engine: plan.engine.to_string(),
7884
storage_params: plan.storage_params.clone(),
7985
options,
@@ -105,4 +111,63 @@ impl RealAttachTableHandler {
105111
GlobalInstance::set(Arc::new(wrapper));
106112
Ok(())
107113
}
114+
115+
fn gen_schema(
116+
plan: &&CreateTablePlan,
117+
base_table_snapshot: &Arc<TableSnapshot>,
118+
) -> Result<TableSchema> {
119+
let schema = if let Some(attached_columns) = &plan.attached_columns {
120+
// Columns to include are specified, let's check them
121+
let base_table_schema = &base_table_snapshot.schema;
122+
let mut fields_to_attach = Vec::with_capacity(attached_columns.len());
123+
124+
// The ids of columns being included
125+
let mut field_ids_to_include = Vec::with_capacity(attached_columns.len());
126+
127+
// Columns that do not exist in the table being attached to, if any
128+
let mut invalid_cols = vec![];
129+
for field in attached_columns {
130+
match base_table_schema.field_with_name(&field.name) {
131+
Ok(f) => {
132+
field_ids_to_include.push(f.column_id);
133+
fields_to_attach.push(f.clone())
134+
}
135+
Err(_) => invalid_cols.push(field.name.as_str()),
136+
}
137+
}
138+
if !invalid_cols.is_empty() {
139+
return Err(ErrorCode::InvalidArgument(format!(
140+
"Columns [{}] do not exist in the table being attached to",
141+
invalid_cols.join(",")
142+
)));
143+
}
144+
145+
let new_table_schema_metadata = if !field_ids_to_include.is_empty() {
146+
// If columns to include are specified explicitly, their ids should
147+
// be kept in the metadata of TableSchema.
148+
let ids = field_ids_to_include
149+
.iter()
150+
.map(|id| format!("{id}"))
151+
.collect::<Vec<_>>()
152+
.join(",");
153+
let mut v = base_table_schema.metadata.clone();
154+
v.insert(FUSE_OPT_KEY_ATTACH_COLUMN_IDS.to_owned(), ids);
155+
v
156+
} else {
157+
base_table_schema.metadata.clone()
158+
};
159+
160+
TableSchema {
161+
fields: fields_to_attach,
162+
metadata: new_table_schema_metadata,
163+
next_column_id: base_table_schema.next_column_id,
164+
}
165+
} else {
166+
// If columns are not specified, use all the fields of table being attached to,
167+
// in this case, no schema meta of key FUSE_OPT_KEY_ATTACH_COLUMN_IDS will be kept.
168+
base_table_snapshot.schema.clone()
169+
};
170+
171+
Ok(schema)
172+
}
108173
}

src/query/ee/tests/it/inverted_index/pruning.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ async fn test_block_pruner() -> Result<()> {
122122
as_select: None,
123123
cluster_key: None,
124124
inverted_indexes: None,
125+
attached_columns: None,
125126
};
126127

127128
let interpreter = CreateTableInterpreter::try_create(ctx.clone(), create_table_plan)?;

src/query/service/src/interpreters/interpreter_table_show_create.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_expression::DataBlock;
2727
use databend_common_expression::Scalar;
2828
use databend_common_expression::Value;
2929
use databend_common_sql::plans::ShowCreateTablePlan;
30+
use databend_common_storages_fuse::FUSE_OPT_KEY_ATTACH_COLUMN_IDS;
3031
use databend_common_storages_stream::stream_table::StreamTable;
3132
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
3233
use databend_common_storages_view::view_table::QUERY;
@@ -37,6 +38,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
3738
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
3839
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ATTACHED_DATA_URI;
3940
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
41+
use itertools::Itertools;
4042

4143
use crate::interpreters::Interpreter;
4244
use crate::pipelines::PipelineBuildResult;
@@ -321,15 +323,35 @@ impl ShowCreateTableInterpreter {
321323
}
322324

323325
fn show_attach_table_query(table: &dyn Table, database: &str) -> String {
324-
// TODO table that attached before this PR, could not show location properly
326+
// Note: tables that were attached before PR #13403 cannot show their location properly
325327
let location_not_available = "N/A".to_string();
326328
let table_data_location = table
327329
.options()
328330
.get(OPT_KEY_TABLE_ATTACHED_DATA_URI)
329331
.unwrap_or(&location_not_available);
330332

333+
let table_info = table.get_table_info();
334+
335+
let mut include_cols = "".to_string();
336+
if table_info
337+
.meta
338+
.schema
339+
.metadata
340+
.contains_key(FUSE_OPT_KEY_ATTACH_COLUMN_IDS)
341+
{
342+
let cols = table_info
343+
.meta
344+
.schema
345+
.fields
346+
.iter()
347+
.map(|f| &f.name)
348+
.join(",");
349+
include_cols = format!("({cols}) ");
350+
}
351+
331352
format!(
332-
"ATTACH TABLE `{}`.`{}` {}",
353+
"ATTACH TABLE {}`{}`.`{}` {}",
354+
include_cols,
333355
database,
334356
table.name(),
335357
table_data_location,

src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ async fn create_memory_table_for_cte_scan(
306306
cluster_key: None,
307307
as_select: None,
308308
inverted_indexes: None,
309+
attached_columns: None,
309310
};
310311
let create_table_interpreter =
311312
CreateTableInterpreter::try_create(ctx.clone(), create_table_plan)?;

src/query/service/src/test_kits/fixture.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ impl TestFixture {
348348
as_select: None,
349349
cluster_key: Some("(id)".to_string()),
350350
inverted_indexes: None,
351+
attached_columns: None,
351352
}
352353
}
353354

@@ -372,6 +373,7 @@ impl TestFixture {
372373
as_select: None,
373374
cluster_key: None,
374375
inverted_indexes: None,
376+
attached_columns: None,
375377
}
376378
}
377379

@@ -407,6 +409,7 @@ impl TestFixture {
407409
as_select: None,
408410
cluster_key: None,
409411
inverted_indexes: None,
412+
attached_columns: None,
410413
}
411414
}
412415

@@ -442,6 +445,7 @@ impl TestFixture {
442445
as_select: None,
443446
cluster_key: None,
444447
inverted_indexes: None,
448+
attached_columns: None,
445449
}
446450
}
447451

@@ -486,6 +490,7 @@ impl TestFixture {
486490
as_select: None,
487491
cluster_key: None,
488492
inverted_indexes: None,
493+
attached_columns: None,
489494
}
490495
}
491496

src/query/service/tests/it/sql/planner/optimizer/agg_index_query_rewrite.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ fn create_table_plan(fixture: &TestFixture, format: &str) -> CreateTablePlan {
8484
as_select: None,
8585
cluster_key: None,
8686
inverted_indexes: None,
87+
attached_columns: None,
8788
}
8889
}
8990

src/query/service/tests/it/storages/fuse/operations/clustering.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ async fn test_fuse_alter_table_cluster_key() -> databend_common_exception::Resul
5353
as_select: None,
5454
cluster_key: None,
5555
inverted_indexes: None,
56+
attached_columns: None,
5657
};
5758

5859
// create test table

0 commit comments

Comments
 (0)