Skip to content

Commit 95f1e61

Browse files
committed
feat: deal with array of tuple which is missing fields
1 parent b7f8498 commit 95f1e61

File tree

3 files changed

+187
-96
lines changed

3 files changed

+187
-96
lines changed

src/query/sql/src/evaluator/block_operator.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ impl BlockOperator {
6767
None => Ok(input),
6868
}
6969
} else {
70+
// for (i, c) in input.columns().iter().enumerate() {
71+
// println!("{i}: {} = {:?}",c.data_type, c.value.index(0))
72+
// }
7073
for expr in exprs {
7174
let evaluator = Evaluator::new(&input, func_ctx, &BUILTIN_FUNCTIONS);
7275
let result = evaluator.run(expr)?;

src/query/storages/common/stage/src/read/columnar/projection.rs

Lines changed: 125 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databend_common_expression::Expr;
2121
use databend_common_expression::RemoteExpr;
2222
use databend_common_expression::Scalar;
2323
use databend_common_expression::TableDataType;
24+
use databend_common_expression::TableField;
2425
use databend_common_expression::TableSchemaRef;
2526
use databend_common_functions::BUILTIN_FUNCTIONS;
2627
use databend_common_meta_app::principal::NullAs;
@@ -91,6 +92,39 @@ pub fn project_columnar(
9192
from_field.data_type.remove_nullable(),
9293
to_field.data_type.remove_nullable(),
9394
) {
95+
(
96+
TableDataType::Array(box TableDataType::Nullable(
97+
box TableDataType::Tuple {
98+
fields_name: from_fields_name,
99+
fields_type: _from_fields_type,
100+
},
101+
)),
102+
TableDataType::Array(box TableDataType::Nullable(
103+
box TableDataType::Tuple {
104+
fields_name: to_fields_name,
105+
fields_type: _to_fields_type,
106+
},
107+
)),
108+
) => {
109+
let mut v = vec![];
110+
for to in to_fields_name.iter() {
111+
match from_fields_name.iter().position(|k| k == to) {
112+
Some(p) => v.push(p as i32),
113+
None => v.push(-1),
114+
};
115+
}
116+
let name = v
117+
.iter()
118+
.map(|v| v.to_string())
119+
.collect::<Vec<_>>()
120+
.join(",");
121+
Expr::ColumnRef {
122+
span: None,
123+
id: pos,
124+
data_type: from_field.data_type().into(),
125+
display_name: format!("#!{name}",),
126+
}
127+
}
94128
(
95129
TableDataType::Tuple {
96130
fields_name: from_fields_name,
@@ -100,102 +134,15 @@ pub fn project_columnar(
100134
fields_name: to_fields_name,
101135
fields_type: to_fields_type,
102136
},
103-
) => {
104-
println!("tuple: {from_fields_name:?} {from_fields_type:?} to {to_fields_name:?} {to_fields_type:?}");
105-
let mut inner_columns = Vec::with_capacity(to_fields_name.len());
106-
107-
for (to_field_name, to_field_type) in
108-
to_fields_name.iter().zip(to_fields_type.iter())
109-
{
110-
let inner_column = match from_fields_name
111-
.iter()
112-
.position(|k| k == to_field_name)
113-
{
114-
Some(idx) => {
115-
let from_field_type =
116-
from_fields_type.get(idx).unwrap();
117-
let tuple_idx = Scalar::Number(NumberScalar::Int64(
118-
(idx + 1) as i64,
119-
));
120-
let inner_column = check_function(
121-
None,
122-
"get",
123-
&[tuple_idx],
124-
&[expr.clone()],
125-
&BUILTIN_FUNCTIONS,
126-
)?;
127-
if from_field_type != to_field_type {
128-
check_cast(
129-
None,
130-
false,
131-
inner_column,
132-
&to_field_type.into(),
133-
&BUILTIN_FUNCTIONS,
134-
)?
135-
} else {
136-
inner_column
137-
}
138-
}
139-
None => {
140-
// if inner field not exists, fill default value.
141-
let data_type: DataType = to_field_type.into();
142-
let scalar = Scalar::default_value(&data_type);
143-
Expr::Constant {
144-
span: None,
145-
scalar,
146-
data_type,
147-
}
148-
}
149-
};
150-
inner_columns.push(inner_column);
151-
}
152-
let tuple_column = check_function(
153-
None,
154-
"tuple",
155-
&[],
156-
&inner_columns,
157-
&BUILTIN_FUNCTIONS,
158-
)?;
159-
let tuple_column = if from_field.data_type != to_field.data_type {
160-
let dest_ty: DataType = (&to_field.data_type).into();
161-
check_cast(
162-
None,
163-
false,
164-
tuple_column,
165-
&dest_ty,
166-
&BUILTIN_FUNCTIONS,
167-
)?
168-
} else {
169-
tuple_column
170-
};
171-
172-
if from_field.data_type.is_nullable()
173-
&& to_field.data_type.is_nullable()
174-
{
175-
// add if function to cast null value
176-
let is_not_null = check_function(
177-
None,
178-
"is_not_null",
179-
&[],
180-
&[expr.clone()],
181-
&BUILTIN_FUNCTIONS,
182-
)?;
183-
let null_scalar = Expr::Constant {
184-
span: None,
185-
scalar: Scalar::Null,
186-
data_type: DataType::Null,
187-
};
188-
check_function(
189-
None,
190-
"if",
191-
&[],
192-
&[is_not_null, tuple_column, null_scalar],
193-
&BUILTIN_FUNCTIONS,
194-
)?
195-
} else {
196-
tuple_column
197-
}
198-
}
137+
) => project_tuple(
138+
expr,
139+
from_field,
140+
to_field,
141+
&from_fields_name,
142+
&from_fields_type,
143+
&to_fields_name,
144+
&to_fields_type,
145+
)?,
199146
(_, _) => {
200147
return Err(ErrorCode::BadDataValueType(format!(
201148
"fail to load file {}: Cannot cast column {} from {:?} to {:?}",
@@ -256,3 +203,85 @@ pub fn project_columnar(
256203
}
257204
Ok((output_projection, pushdown_columns))
258205
}
206+
207+
/// Builds an expression that reshapes the tuple-valued `expr` from the source
/// tuple layout (`from_fields_name` / `from_fields_type`) into the target layout
/// (`to_fields_name` / `to_fields_type`).
///
/// Target fields are matched to source fields by name. A matched field is read
/// with the `get` function and cast when its type differs from the target type;
/// an unmatched field becomes a constant holding the default value of the target
/// type. The pieces are reassembled with the `tuple` function, cast to
/// `to_field`'s data type when the two field data types differ, and, when both
/// fields are nullable, wrapped in `if(is_not_null(expr), <tuple>, NULL)`.
///
/// Returns the resulting `Expr`, or the first error produced by
/// `check_function` / `check_cast`.
///
/// NOTE(review): `from_fields_type.get(idx).unwrap()` panics if
/// `from_fields_type` is shorter than `from_fields_name`; callers presumably pass
/// the two halves of a single `TableDataType::Tuple` — confirm.
fn project_tuple(
208+
    expr: Expr,
209+
    from_field: &TableField,
210+
    to_field: &TableField,
211+
    from_fields_name: &[String],
212+
    from_fields_type: &[TableDataType],
213+
    to_fields_name: &[String],
214+
    to_fields_type: &[TableDataType],
215+
) -> databend_common_exception::Result<Expr> {
216+
    let mut inner_columns = Vec::with_capacity(to_fields_name.len());
217+
218+
    // Walk the target layout; `zip` stops at the shorter of the name/type slices.
    for (to_field_name, to_field_type) in to_fields_name.iter().zip(to_fields_type.iter()) {
219+
        let inner_column = match from_fields_name.iter().position(|k| k == to_field_name) {
220+
            Some(idx) => {
221+
                let from_field_type = from_fields_type.get(idx).unwrap();
222+
                // `get` is given `idx + 1`; presumably tuple element indices are 1-based — confirm.
                let tuple_idx = Scalar::Number(NumberScalar::Int64((idx + 1) as i64));
223+
                let inner_column = check_function(
224+
                    None,
225+
                    "get",
226+
                    &[tuple_idx],
227+
                    &[expr.clone()],
228+
                    &BUILTIN_FUNCTIONS,
229+
                )?;
230+
                // Only add a cast when the source and target element types differ.
                if from_field_type != to_field_type {
231+
                    check_cast(
232+
                        None,
233+
                        false,
234+
                        inner_column,
235+
                        &to_field_type.into(),
236+
                        &BUILTIN_FUNCTIONS,
237+
                    )?
238+
                } else {
239+
                    inner_column
240+
                }
241+
            }
242+
            None => {
243+
                // The source tuple has no field with this name: fill in the default value of the target type.
244+
                let data_type: DataType = to_field_type.into();
245+
                let scalar = Scalar::default_value(&data_type);
246+
                Expr::Constant {
247+
                    span: None,
248+
                    scalar,
249+
                    data_type,
250+
                }
251+
            }
252+
        };
253+
        inner_columns.push(inner_column);
254+
    }
255+
    // Reassemble the per-field expressions, in target order, into one tuple expression.
    let tuple_column = check_function(None, "tuple", &[], &inner_columns, &BUILTIN_FUNCTIONS)?;
256+
    // Cast the whole tuple when the outer field data types are not equal.
    let tuple_column = if from_field.data_type != to_field.data_type {
257+
        let dest_ty: DataType = (&to_field.data_type).into();
258+
        check_cast(None, false, tuple_column, &dest_ty, &BUILTIN_FUNCTIONS)?
259+
    } else {
260+
        tuple_column
261+
    };
262+
263+
    if from_field.data_type.is_nullable() && to_field.data_type.is_nullable() {
264+
        // Wrap the result as `if(is_not_null(expr), tuple_column, NULL)`; presumably so a NULL
        // source tuple stays NULL instead of becoming a tuple of per-field values — confirm.
265+
        let is_not_null = check_function(
266+
            None,
267+
            "is_not_null",
268+
            &[],
269+
            &[expr.clone()],
270+
            &BUILTIN_FUNCTIONS,
271+
        )?;
272+
        let null_scalar = Expr::Constant {
273+
            span: None,
274+
            scalar: Scalar::Null,
275+
            data_type: DataType::Null,
276+
        };
277+
        check_function(
278+
            None,
279+
            "if",
280+
            &[],
281+
            &[is_not_null, tuple_column, null_scalar],
282+
            &BUILTIN_FUNCTIONS,
283+
        )
284+
    } else {
285+
        Ok(tuple_column)
286+
    }
287+
}

src/query/storages/orc/src/copy_into_table/processors/decoder.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,19 @@ use arrow_array::RecordBatch;
2222
use databend_common_catalog::query_kind::QueryKind;
2323
use databend_common_catalog::table_context::TableContext;
2424
use databend_common_exception::Result;
25+
use databend_common_expression::types::ArrayColumn;
26+
use databend_common_expression::types::DataType;
27+
use databend_common_expression::types::NullableColumn;
2528
use databend_common_expression::BlockEntry;
2629
use databend_common_expression::BlockMetaInfoDowncast;
30+
use databend_common_expression::Column;
31+
use databend_common_expression::ColumnBuilder;
2732
use databend_common_expression::DataBlock;
2833
use databend_common_expression::DataSchemaRef;
2934
use databend_common_expression::Evaluator;
3035
use databend_common_expression::Expr;
3136
use databend_common_expression::FunctionContext;
37+
use databend_common_expression::Value;
3238
use databend_common_functions::BUILTIN_FUNCTIONS;
3339
use databend_common_pipeline_core::processors::Event;
3440
use databend_common_pipeline_core::processors::InputPort;
@@ -96,6 +102,59 @@ impl StripeDecoderForCopy {
96102
let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);
97103
let mut columns = Vec::with_capacity(projection.len());
98104
for (field, expr) in self.output_schema.fields().iter().zip(projection.iter()) {
105+
if let Expr::ColumnRef {
106+
display_name, id, ..
107+
} = expr
108+
{
109+
if display_name.starts_with("#!") {
110+
let typs = match field.data_type() {
111+
DataType::Nullable(box DataType::Array(box DataType::Nullable(
112+
box DataType::Tuple(v),
113+
))) => v,
114+
_ => {
115+
log::error!("expect array of tuple, got {:?}", field);
116+
unreachable!("expect value: array of tuple")
117+
}
118+
};
119+
let positions = display_name[2..]
120+
.split(',')
121+
.map(|s| s.parse::<i32>().unwrap())
122+
.collect::<Vec<i32>>();
123+
let mut e = block.columns()[*id].clone();
124+
match e.value {
125+
Value::Column(Column::Nullable(box NullableColumn {
126+
column:
127+
Column::Array(box ArrayColumn {
128+
values:
129+
Column::Nullable(box NullableColumn {
130+
column: Column::Tuple(ref mut v),
131+
..
132+
}),
133+
..
134+
}),
135+
..
136+
})) => {
137+
let len = v[0].len();
138+
let mut v2 = vec![];
139+
for (i, p) in positions.iter().enumerate() {
140+
if *p < 0 {
141+
v2.push(ColumnBuilder::repeat_default(&typs[i], len).build());
142+
} else {
143+
v2.push(v[*p as usize].clone());
144+
}
145+
}
146+
*v = v2
147+
}
148+
_ => {
149+
log::error!("expect array of tuple, got {:?} {:?}", field, e.value);
150+
unreachable!("expect value: array of tuple")
151+
}
152+
}
153+
let column = BlockEntry::new(field.data_type().clone(), e.value);
154+
columns.push(column);
155+
continue;
156+
}
157+
}
99158
let value = evaluator.run(expr)?;
100159
let column = BlockEntry::new(field.data_type().clone(), value);
101160
columns.push(column);

0 commit comments

Comments
 (0)