Skip to content

Commit 6892f0c

Browse files
authored
fix(query): fix virtual column bind column (#17907)
* fix(query): fix virtual column bind column * check virtual column data row count * add enable_experimental_virtual_column settings * fix tests * fix tests
1 parent d639b03 commit 6892f0c

File tree

14 files changed

+519
-232
lines changed

14 files changed

+519
-232
lines changed

src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
3434
.default_session()
3535
.get_settings()
3636
.set_data_retention_time_in_days(0)?;
37+
fixture
38+
.default_session()
39+
.get_settings()
40+
.set_enable_experimental_virtual_column(1)?;
3741
fixture.create_default_database().await?;
3842
fixture.create_variant_table().await?;
3943

src/query/ee/tests/it/storages/fuse/operations/virtual_columns_builder.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ use jsonb::OwnedJsonb;
3737
async fn test_virtual_column_builder() -> Result<()> {
3838
let fixture = TestFixture::setup_with_custom(EESetup::new()).await?;
3939

40+
fixture
41+
.default_session()
42+
.get_settings()
43+
.set_enable_experimental_virtual_column(1)?;
4044
fixture.create_default_database().await?;
4145
fixture.create_variant_table().await?;
4246

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,6 +1299,13 @@ impl DefaultSettings {
12991299
scope: SettingScope::Both,
13001300
range: Some(SettingRange::Numeric(0..=100)),
13011301
}),
1302+
("enable_experimental_virtual_column", DefaultSettingValue {
1303+
value: UserSettingValue::UInt64(0),
1304+
desc: "Enables experimental virtual column",
1305+
mode: SettingMode::Both,
1306+
scope: SettingScope::Both,
1307+
range: Some(SettingRange::Numeric(0..=1)),
1308+
}),
13021309
]);
13031310

13041311
Ok(Arc::new(DefaultSettings {

src/query/settings/src/settings_getter_setter.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -952,4 +952,12 @@ impl Settings {
952952
pub fn get_trace_sample_rate(&self) -> Result<u64> {
953953
self.try_get_u64("trace_sample_rate")
954954
}
955+
956+
pub fn set_enable_experimental_virtual_column(&self, val: u64) -> Result<()> {
957+
self.try_set_u64("enable_experimental_virtual_column", val)
958+
}
959+
960+
pub fn get_enable_experimental_virtual_column(&self) -> Result<bool> {
961+
Ok(self.try_get_u64("enable_experimental_virtual_column")? == 1)
962+
}
955963
}

src/query/sql/src/planner/binder/bind_mutation/mutation_expression.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,11 @@ impl MutationExpression {
114114
.ok_or_else(|| ErrorCode::Internal("Can't get target table index"))?;
115115

116116
// Remove stream columns in source context.
117-
source_context
118-
.columns
119-
.retain(|v| v.visibility == Visibility::Visible);
117+
source_context.columns.retain(|col| {
118+
let read_guard = binder.metadata.read();
119+
let column_entry = read_guard.column(col.index);
120+
!column_entry.is_stream_column()
121+
});
120122

121123
// Add source table columns to required columns.
122124
for column_index in source_context.column_set().iter() {
@@ -199,9 +201,12 @@ impl MutationExpression {
199201
let from_s_expr = if let Some(from) = from {
200202
let (from_s_expr, mut from_context) =
201203
binder.bind_table_reference(&mut bind_context, from)?;
202-
from_context
203-
.columns
204-
.retain(|v| v.visibility == Visibility::Visible);
204+
// Remove stream columns in source context.
205+
let read_guard = binder.metadata.read();
206+
from_context.columns.retain(|col| {
207+
let column_entry = read_guard.column(col.index);
208+
!column_entry.is_stream_column()
209+
});
205210
for column in from_context.columns.iter() {
206211
required_columns.insert(column.index);
207212
bind_context.add_column_binding(column.clone());

src/query/sql/src/planner/binder/bind_query/bind_select.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,14 @@ impl Binder {
7575
}
7676

7777
// whether allow rewrite virtual column and pushdown
78-
let allow_virtual_column = LicenseManagerSwitch::instance()
79-
.check_enterprise_enabled(self.ctx.get_license_key(), Feature::VirtualColumn)
80-
.is_ok();
81-
bind_context.allow_virtual_column = allow_virtual_column;
78+
bind_context.allow_virtual_column = self
79+
.ctx
80+
.get_settings()
81+
.get_enable_experimental_virtual_column()
82+
.unwrap_or_default()
83+
&& LicenseManagerSwitch::instance()
84+
.check_enterprise_enabled(self.ctx.get_license_key(), Feature::VirtualColumn)
85+
.is_ok();
8286

8387
let (mut s_expr, mut from_context) = if stmt.from.is_empty() {
8488
let select_list = &stmt.select_list;

src/query/sql/src/planner/metadata/metadata.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use databend_common_catalog::plan::DataSourcePlan;
2626
use databend_common_catalog::plan::InternalColumn;
2727
use databend_common_catalog::table::Table;
2828
use databend_common_expression::display::display_tuple_field_name;
29+
use databend_common_expression::is_stream_column_id;
2930
use databend_common_expression::types::DataType;
3031
use databend_common_expression::ComputedExpr;
3132
use databend_common_expression::TableDataType;
@@ -766,6 +767,19 @@ impl ColumnEntry {
766767
ColumnEntry::VirtualColumn(VirtualColumn { table_index, .. }) => Some(*table_index),
767768
}
768769
}
770+
771+
pub fn is_stream_column(&self) -> bool {
772+
if let ColumnEntry::BaseTableColumn(BaseTableColumn {
773+
column_id: Some(column_id),
774+
..
775+
}) = self
776+
{
777+
if is_stream_column_id(*column_id) {
778+
return true;
779+
}
780+
}
781+
false
782+
}
769783
}
770784

771785
pub fn optimize_remove_count_args(name: &str, distinct: bool, args: &[&Expr]) -> bool {

src/query/sql/src/planner/semantic/type_check.rs

Lines changed: 12 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,10 @@ use crate::plans::WindowOrderBy;
171171
use crate::BaseTableColumn;
172172
use crate::BindContext;
173173
use crate::ColumnBinding;
174-
use crate::ColumnBindingBuilder;
175174
use crate::ColumnEntry;
176175
use crate::DefaultExprBinder;
177176
use crate::IndexType;
178177
use crate::MetadataRef;
179-
use crate::Visibility;
180178

181179
/// A helper for type checking.
182180
///
@@ -601,15 +599,6 @@ impl<'a> TypeChecker<'a> {
601599
// as we cast JSON null to SQL NULL.
602600
let target_type = if data_type.remove_nullable() == DataType::Variant {
603601
let target_type = checked_expr.data_type().nest_wrap_nullable();
604-
605-
if let Some(new_scalar) = self.try_rewrite_virtual_column_cast(
606-
expr.span(),
607-
&scalar,
608-
&target_type,
609-
false,
610-
) {
611-
return Ok(Box::new((new_scalar, target_type)));
612-
}
613602
target_type
614603
// if the source type is nullable, cast target type should also be nullable.
615604
} else if data_type.is_nullable_or_null() {
@@ -659,15 +648,6 @@ impl<'a> TypeChecker<'a> {
659648
// as we cast JSON null to SQL NULL.
660649
let target_type = if data_type.remove_nullable() == DataType::Variant {
661650
let target_type = checked_expr.data_type().nest_wrap_nullable();
662-
663-
if let Some(new_scalar) = self.try_rewrite_virtual_column_cast(
664-
expr.span(),
665-
&scalar,
666-
&target_type,
667-
true,
668-
) {
669-
return Ok(Box::new((new_scalar, target_type)));
670-
}
671651
target_type
672652
} else {
673653
checked_expr.data_type().clone()
@@ -1194,105 +1174,6 @@ impl<'a> TypeChecker<'a> {
11941174
Ok(Box::new((scalar, data_type)))
11951175
}
11961176

1197-
fn try_rewrite_virtual_column_cast(
1198-
&mut self,
1199-
span: Span,
1200-
scalar: &ScalarExpr,
1201-
target_type: &DataType,
1202-
is_try: bool,
1203-
) -> Option<ScalarExpr> {
1204-
let Ok(cast_ty) = infer_schema_type(target_type) else {
1205-
return None;
1206-
};
1207-
let ScalarExpr::BoundColumnRef(BoundColumnRef { ref column, .. }) = scalar else {
1208-
return None;
1209-
};
1210-
let table_index = column.table_index?;
1211-
1212-
if column.index >= self.metadata.read().columns().len() {
1213-
return None;
1214-
}
1215-
1216-
// Change the type of virtual column to user specified cast type avoids additional casting overhead,
1217-
// since the user usually knows the real type.
1218-
let column_entry = self.metadata.read().column(column.index).clone();
1219-
let ColumnEntry::VirtualColumn(virtual_column) = column_entry else {
1220-
return None;
1221-
};
1222-
1223-
let virtual_column_name = if is_try {
1224-
format!(
1225-
"try_cast({} as {})",
1226-
column.column_name,
1227-
target_type.remove_nullable().to_string().to_lowercase()
1228-
)
1229-
} else {
1230-
format!(
1231-
"{}::{}",
1232-
column.column_name,
1233-
target_type.remove_nullable().to_string().to_lowercase()
1234-
)
1235-
};
1236-
1237-
// Try resolve the virtual column with the cast type.
1238-
if let Ok(box (new_scalar, _)) = self.resolve(&Expr::ColumnRef {
1239-
span,
1240-
column: ColumnRef {
1241-
database: column
1242-
.database_name
1243-
.as_ref()
1244-
.map(|name| Identifier::from_name(span, name)),
1245-
table: column
1246-
.table_name
1247-
.as_ref()
1248-
.map(|name| Identifier::from_name(span, name)),
1249-
column: ColumnID::Name(Identifier::from_name(span, &virtual_column_name)),
1250-
},
1251-
}) {
1252-
return Some(new_scalar);
1253-
}
1254-
1255-
// Generate a new virtual column with the cast type.
1256-
let database_name = column.database_name.clone();
1257-
let table_name = column.table_name.clone();
1258-
1259-
let mut guard = self.metadata.write();
1260-
let new_column_index = guard.add_virtual_column(
1261-
virtual_column.table_index,
1262-
virtual_column.source_column_name.clone(),
1263-
virtual_column.source_column_id,
1264-
virtual_column.column_id,
1265-
virtual_column_name.clone(),
1266-
cast_ty,
1267-
is_try,
1268-
);
1269-
1270-
let new_column_binding = ColumnBindingBuilder::new(
1271-
virtual_column_name,
1272-
new_column_index,
1273-
Box::new(target_type.clone()),
1274-
Visibility::InVisible,
1275-
)
1276-
.table_name(table_name)
1277-
.database_name(database_name)
1278-
.table_index(Some(table_index))
1279-
.build();
1280-
// Add virtual column with the cast type to the context.
1281-
self.bind_context
1282-
.add_column_binding(new_column_binding.clone());
1283-
1284-
if let Some(scan_id) = guard.base_column_scan_id(virtual_column.column_index) {
1285-
let mut base_column_scan_id = HashMap::new();
1286-
base_column_scan_id.insert(new_column_index, scan_id);
1287-
guard.add_base_column_scan_id(base_column_scan_id);
1288-
}
1289-
1290-
Some(ScalarExpr::BoundColumnRef(BoundColumnRef {
1291-
span,
1292-
column: new_column_binding,
1293-
}))
1294-
}
1295-
12961177
// TODO: remove this function
12971178
fn rewrite_substring(args: &mut [ScalarExpr]) {
12981179
if let ScalarExpr::ConstantExpr(expr) = &args[1] {
@@ -3996,22 +3877,12 @@ impl<'a> TypeChecker<'a> {
39963877
{
39973878
if data_type.remove_nullable() == DataType::Variant {
39983879
let target_type = DataType::Nullable(Box::new(DataType::String));
3999-
let new_scalar = if let Some(new_scalar) = self
4000-
.try_rewrite_virtual_column_cast(
4001-
scalar.span(),
4002-
&scalar,
4003-
&target_type,
4004-
false,
4005-
) {
4006-
new_scalar
4007-
} else {
4008-
ScalarExpr::CastExpr(CastExpr {
4009-
span: scalar.span(),
4010-
is_try: false,
4011-
argument: Box::new(scalar),
4012-
target_type: Box::new(target_type.clone()),
4013-
})
4014-
};
3880+
let new_scalar = ScalarExpr::CastExpr(CastExpr {
3881+
span: scalar.span(),
3882+
is_try: false,
3883+
argument: Box::new(scalar),
3884+
target_type: Box::new(target_type.clone()),
3885+
});
40153886
return Some(Ok(Box::new((new_scalar, target_type))));
40163887
}
40173888
}
@@ -4161,21 +4032,12 @@ impl<'a> TypeChecker<'a> {
41614032
) {
41624033
if func_name == "get_by_keypath_string" {
41634034
let target_type = DataType::Nullable(Box::new(DataType::String));
4164-
let new_scalar = if let Some(new_scalar) = self.try_rewrite_virtual_column_cast(
4165-
scalar.span(),
4166-
&scalar,
4167-
&target_type,
4168-
false,
4169-
) {
4170-
new_scalar
4171-
} else {
4172-
ScalarExpr::CastExpr(CastExpr {
4173-
span: scalar.span(),
4174-
is_try: false,
4175-
argument: Box::new(scalar),
4176-
target_type: Box::new(target_type.clone()),
4177-
})
4178-
};
4035+
let new_scalar = ScalarExpr::CastExpr(CastExpr {
4036+
span: scalar.span(),
4037+
is_try: false,
4038+
argument: Box::new(scalar),
4039+
target_type: Box::new(target_type.clone()),
4040+
});
41794041
return Some(Ok(Box::new((new_scalar, target_type))));
41804042
} else {
41814043
return Some(Ok(Box::new((scalar, data_type))));

src/query/storages/fuse/src/io/write/virtual_column_builder.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ impl VirtualColumnBuilder {
7676
ctx: Arc<dyn TableContext>,
7777
table_info: &TableInfo,
7878
) -> Option<VirtualColumnBuilder> {
79+
if !ctx
80+
.get_settings()
81+
.get_enable_experimental_virtual_column()
82+
.unwrap_or_default()
83+
{
84+
return None;
85+
}
7986
if LicenseManagerSwitch::instance()
8087
.check_enterprise_enabled(ctx.get_license_key(), Feature::VirtualColumn)
8188
.is_err()

src/query/storages/fuse/src/pruning/block_pruner.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,10 @@ impl BlockPruner {
235235
// Check whether can read virtual columns,
236236
// and ignore the source columns.
237237
let virtual_block_meta = virtual_column_pruner
238-
.prune_virtual_columns(&block_meta.virtual_block_meta)
238+
.prune_virtual_columns(
239+
block_meta.row_count,
240+
&block_meta.virtual_block_meta,
241+
)
239242
.await?;
240243
prune_result.virtual_block_meta = virtual_block_meta;
241244
}

0 commit comments

Comments
 (0)