Skip to content

Commit c496fce

Browse files
fix(query): fix bug while mysql external dictionary table contains null values (#16978)
* fix bug while mysql table contains null values * remove option on key
1 parent 8466df7 commit c496fce

File tree

3 files changed

+133
-86
lines changed

3 files changed

+133
-86
lines changed

src/query/service/src/pipelines/processors/transforms/transform_dictionary.rs

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,14 @@ use crate::sql::IndexType;
5757

5858
macro_rules! sqlx_fetch_optional {
5959
($pool:expr, $sql:expr, $key_type:ty, $val_type:ty, $format_val_fn:expr) => {{
60-
let res: Option<($key_type, $val_type)> =
60+
let res: Option<($key_type, Option<$val_type>)> =
6161
sqlx::query_as(&$sql).fetch_optional($pool).await?;
62-
Ok(res.map(|(_, v)| $format_val_fn(v)))
62+
Ok(res.and_then(|(_, v)| v.map($format_val_fn)))
6363
}};
6464
}
6565

6666
macro_rules! fetch_single_row_by_sqlx {
67-
($pool:expr, $sql:expr, $key_scalar:expr, $val_type:ty, $format_val_fn:expr) => {{
67+
($pool:expr, $sql:expr, $key_scalar:expr, $val_type:ty, $format_val_fn:expr) => {
6868
match $key_scalar {
6969
DataType::Boolean => {
7070
sqlx_fetch_optional!($pool, $sql, bool, $val_type, $format_val_fn)
@@ -88,45 +88,62 @@ macro_rules! fetch_single_row_by_sqlx {
8888
$key_scalar,
8989
))),
9090
}
91-
}};
91+
};
9292
}
9393

9494
macro_rules! fetch_all_rows_by_sqlx {
9595
($pool:expr, $sql:expr, $key_scalar:expr, $val_type:ty, $format_key_fn:expr) => {
9696
match $key_scalar {
9797
DataType::Boolean => {
98-
let res: Vec<(bool, $val_type)> = sqlx::query_as($sql).fetch_all($pool).await?;
98+
let res: Vec<(bool, Option<$val_type>)> =
99+
sqlx::query_as($sql).fetch_all($pool).await?;
99100
res.into_iter()
100-
.map(|(k, v)| ($format_key_fn(ScalarRef::Boolean(k)), v))
101+
.filter_map(|(key, val)| match (key, val) {
102+
(k, Some(v)) => Some(($format_key_fn(ScalarRef::Boolean(k)), v)),
103+
_ => None,
104+
})
101105
.collect()
102106
}
103107
DataType::String => {
104-
let res: Vec<(String, $val_type)> = sqlx::query_as($sql).fetch_all($pool).await?;
108+
let res: Vec<(String, Option<$val_type>)> =
109+
sqlx::query_as($sql).fetch_all($pool).await?;
105110
res.into_iter()
106-
.map(|(k, v)| ($format_key_fn(ScalarRef::String(&k)), v))
111+
.filter_map(|(key, val)| match (key, val) {
112+
(k, Some(v)) => Some(($format_key_fn(ScalarRef::String(&k)), v)),
113+
_ => None,
114+
})
107115
.collect()
108116
}
109117
DataType::Number(num_ty) => {
110118
with_integer_mapped_type!(|NUM_TYPE| match num_ty {
111119
NumberDataType::NUM_TYPE => {
112-
let res: Vec<(NUM_TYPE, $val_type)> =
120+
let res: Vec<(NUM_TYPE, Option<$val_type>)> =
113121
sqlx::query_as($sql).fetch_all($pool).await?;
114122
res.into_iter()
115-
.map(|(k, v)| (format!("{}", k), v))
123+
.filter_map(|(key, val)| match (key, val) {
124+
(k, Some(v)) => Some((format!("{}", k), v)),
125+
_ => None,
126+
})
116127
.collect()
117128
}
118129
NumberDataType::Float32 => {
119-
let res: Vec<(f32, $val_type)> =
130+
let res: Vec<(f32, Option<$val_type>)> =
120131
sqlx::query_as($sql).fetch_all($pool).await?;
121132
res.into_iter()
122-
.map(|(k, v)| (format!("{}", k), v))
133+
.filter_map(|(key, val)| match (key, val) {
134+
(k, Some(v)) => Some((format!("{}", k), v)),
135+
_ => None,
136+
})
123137
.collect()
124138
}
125139
NumberDataType::Float64 => {
126-
let res: Vec<(f64, $val_type)> =
140+
let res: Vec<(f64, Option<$val_type>)> =
127141
sqlx::query_as($sql).fetch_all($pool).await?;
128142
res.into_iter()
129-
.map(|(k, v)| (format!("{}", k), v))
143+
.filter_map(|(key, val)| match (key, val) {
144+
(k, Some(v)) => Some((format!("{}", k), v)),
145+
_ => None,
146+
})
130147
.collect()
131148
}
132149
})

tests/sqllogictests/src/mock_source/mysql_source.rs

Lines changed: 100 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ pub fn run_mysql_source() {
6969
struct Backend {
7070
table: String,
7171
schema: Vec<Column>,
72-
block: Vec<Vec<Value>>,
72+
block: Vec<Vec<Option<Value>>>,
7373

7474
prepared_id: u32,
7575
prepared: HashMap<u32, (usize, Vec<usize>, Vec<Expr>)>,
@@ -112,27 +112,42 @@ impl Backend {
112112
},
113113
];
114114

115-
let block = vec![
116-
vec![Value::Int(1), Value::Int(2), Value::Int(3), Value::Int(4)],
115+
let block: Vec<Vec<Option<Value>>> = vec![
117116
vec![
118-
Value::Bytes("Alice".as_bytes().to_vec()),
119-
Value::Bytes("Bob".as_bytes().to_vec()),
120-
Value::Bytes("Lily".as_bytes().to_vec()),
121-
Value::Bytes("Tom".as_bytes().to_vec()),
117+
Some(Value::Int(1)),
118+
Some(Value::Int(2)),
119+
Some(Value::Int(3)),
120+
Some(Value::Int(4)),
121+
Some(Value::Int(5)),
122122
],
123123
vec![
124-
Value::UInt(24),
125-
Value::UInt(35),
126-
Value::UInt(41),
127-
Value::UInt(55),
124+
Some(Value::Bytes("Alice".as_bytes().to_vec())),
125+
Some(Value::Bytes("Bob".as_bytes().to_vec())),
126+
Some(Value::Bytes("Lily".as_bytes().to_vec())),
127+
Some(Value::Bytes("Tom".as_bytes().to_vec())),
128+
None,
128129
],
129130
vec![
130-
Value::Double(100.0),
131-
Value::Double(200.1),
132-
Value::Double(1000.20),
133-
Value::Double(3000.55),
131+
Some(Value::UInt(24)),
132+
Some(Value::UInt(35)),
133+
Some(Value::UInt(41)),
134+
Some(Value::UInt(55)),
135+
None,
136+
],
137+
vec![
138+
Some(Value::Double(100.0)),
139+
Some(Value::Double(200.1)),
140+
Some(Value::Double(1000.20)),
141+
Some(Value::Double(3000.55)),
142+
None,
143+
],
144+
vec![
145+
Some(Value::Int(1)),
146+
Some(Value::Int(0)),
147+
Some(Value::Int(1)),
148+
Some(Value::Int(0)),
149+
None,
134150
],
135-
vec![Value::Int(1), Value::Int(0), Value::Int(1), Value::Int(0)],
136151
];
137152

138153
Self {
@@ -242,7 +257,7 @@ impl<W: io::Read + io::Write> MysqlShim<W> for Backend {
242257
let key_field = self.schema[*key_idx].clone();
243258
let key_column = self.block[*key_idx].clone();
244259

245-
// find matched rows by compare key params.
260+
// step-1: find matched rows by compare key params.
246261
let mut rows: Vec<Option<usize>> = vec![];
247262
match key_field.coltype {
248263
ColumnType::MYSQL_TYPE_TINY => {
@@ -254,9 +269,11 @@ impl<W: io::Read + io::Write> MysqlShim<W> for Backend {
254269
let key = param.parse::<bool>().unwrap();
255270
let key_param = Value::Int(key.into());
256271
for (i, key) in key_column.iter().enumerate() {
257-
if key == &key_param {
258-
rows.push(Some(i));
259-
break;
272+
if let Some(key) = key {
273+
if key == &key_param {
274+
rows.push(Some(i));
275+
break;
276+
}
260277
}
261278
}
262279
}
@@ -270,9 +287,11 @@ impl<W: io::Read + io::Write> MysqlShim<W> for Backend {
270287
let key = param.parse::<u64>().unwrap();
271288
let key_param = Value::UInt(key);
272289
for (i, key) in key_column.iter().enumerate() {
273-
if key == &key_param {
274-
rows.push(Some(i));
275-
break;
290+
if let Some(key) = key {
291+
if key == &key_param {
292+
rows.push(Some(i));
293+
break;
294+
}
276295
}
277296
}
278297
}
@@ -286,9 +305,11 @@ impl<W: io::Read + io::Write> MysqlShim<W> for Backend {
286305
let key = param.parse::<i64>().unwrap();
287306
let key_param = Value::Int(key);
288307
for (i, key) in key_column.iter().enumerate() {
289-
if key == &key_param {
290-
rows.push(Some(i));
291-
break;
308+
if let Some(key) = key {
309+
if key == &key_param {
310+
rows.push(Some(i));
311+
break;
312+
}
292313
}
293314
}
294315
}
@@ -302,9 +323,11 @@ impl<W: io::Read + io::Write> MysqlShim<W> for Backend {
302323
let key = param.parse::<f64>().unwrap();
303324
let key_param = Value::Double(key);
304325
for (i, key) in key_column.iter().enumerate() {
305-
if key == &key_param {
306-
rows.push(Some(i));
307-
break;
326+
if let Some(key) = key {
327+
if key == &key_param {
328+
rows.push(Some(i));
329+
break;
330+
}
308331
}
309332
}
310333
}
@@ -322,16 +345,19 @@ impl<W: io::Read + io::Write> MysqlShim<W> for Backend {
322345
let key = param_str.as_bytes().to_vec();
323346
let key_param = Value::Bytes(key);
324347
for (i, key) in key_column.iter().enumerate() {
325-
if key == &key_param {
326-
rows.push(Some(i));
327-
break;
348+
if let Some(key) = key {
349+
if key == &key_param {
350+
rows.push(Some(i));
351+
break;
352+
}
328353
}
329354
}
330355
}
331356
}
332357
_ => {}
333358
}
334359

360+
// step-2: write columns based on the matched rows
335361
// return NULL if params not matched.
336362
if rows.is_empty() {
337363
return results.completed(0, 0);
@@ -346,7 +372,6 @@ impl<W: io::Read + io::Write> MysqlShim<W> for Backend {
346372
let value_column2 = self.block[value_idx2].clone();
347373

348374
let cols = vec![value_field1.clone(), value_field2.clone()];
349-
350375
let mut rw = results.start(&cols)?;
351376

352377
for row in rows.into_iter().map(|r| r.unwrap()) {
@@ -357,51 +382,56 @@ impl<W: io::Read + io::Write> MysqlShim<W> for Backend {
357382
(value2, value_field2.clone()),
358383
] {
359384
match value {
360-
Value::Bytes(v) => {
361-
rw.write_col(v)?;
362-
}
363-
Value::Int(v) => match value_field.coltype {
364-
ColumnType::MYSQL_TYPE_TINY => {
365-
rw.write_col(v as i8)?;
366-
}
367-
ColumnType::MYSQL_TYPE_SHORT => {
368-
rw.write_col(v as u16)?;
369-
}
370-
ColumnType::MYSQL_TYPE_LONG => {
371-
rw.write_col(v as i32)?;
372-
}
373-
ColumnType::MYSQL_TYPE_LONGLONG => {
385+
Some(val) => match val {
386+
Value::Bytes(v) => {
374387
rw.write_col(v)?;
375388
}
376-
_ => {
377-
unreachable!()
378-
}
379-
},
380-
Value::UInt(v) => match value_field.coltype {
381-
ColumnType::MYSQL_TYPE_TINY => {
382-
rw.write_col(v as u8)?;
383-
}
384-
ColumnType::MYSQL_TYPE_SHORT => {
385-
rw.write_col(v as u16)?;
386-
}
387-
ColumnType::MYSQL_TYPE_LONG => {
388-
rw.write_col(v as u32)?;
389+
Value::Int(v) => match value_field.coltype {
390+
ColumnType::MYSQL_TYPE_TINY => {
391+
rw.write_col(v as i8)?;
392+
}
393+
ColumnType::MYSQL_TYPE_SHORT => {
394+
rw.write_col(v as u16)?;
395+
}
396+
ColumnType::MYSQL_TYPE_LONG => {
397+
rw.write_col(v as i32)?;
398+
}
399+
ColumnType::MYSQL_TYPE_LONGLONG => {
400+
rw.write_col(v)?;
401+
}
402+
_ => {
403+
unreachable!()
404+
}
405+
},
406+
Value::UInt(v) => match value_field.coltype {
407+
ColumnType::MYSQL_TYPE_TINY => {
408+
rw.write_col(v as u8)?;
409+
}
410+
ColumnType::MYSQL_TYPE_SHORT => {
411+
rw.write_col(v as u16)?;
412+
}
413+
ColumnType::MYSQL_TYPE_LONG => {
414+
rw.write_col(v as u32)?;
415+
}
416+
ColumnType::MYSQL_TYPE_LONGLONG => {
417+
rw.write_col(v)?;
418+
}
419+
_ => {
420+
unreachable!()
421+
}
422+
},
423+
Value::Float(v) => {
424+
rw.write_col(v)?;
389425
}
390-
ColumnType::MYSQL_TYPE_LONGLONG => {
426+
Value::Double(v) => {
391427
rw.write_col(v)?;
392428
}
393429
_ => {
394-
unreachable!()
430+
rw.write_col("")?;
395431
}
396432
},
397-
Value::Float(v) => {
398-
rw.write_col(v)?;
399-
}
400-
Value::Double(v) => {
401-
rw.write_col(v)?;
402-
}
403-
_ => {
404-
rw.write_col("")?;
433+
None => {
434+
rw.write_col(None::<i64>)?;
405435
}
406436
}
407437
}

tests/sqllogictests/suites/query/functions/02_0077_function_dict_get.test

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,12 @@ select dict_get(mysql_dic_id, 'id', 1), dict_get(mysql_dic_id, 'name', 1), dict_
174174
query ITIFT
175175
select dict_get(mysql_dic_id, 'id', 5), dict_get(mysql_dic_id, 'name', 5), dict_get(mysql_dic_id, 'age', 5), dict_get(mysql_dic_id, 'salary', 5), dict_get(mysql_dic_id, 'active', 5)
176176
----
177-
NULL NULL NULL NULL NULL
177+
5 NULL NULL NULL NULL
178178

179179
query ITIFT
180180
select dict_get(mysql_dic_id_not_null, 'id', 5), dict_get(mysql_dic_id_not_null, 'name', 5), dict_get(mysql_dic_id_not_null, 'age', 5), dict_get(mysql_dic_id_not_null, 'salary', 5), dict_get(mysql_dic_id_not_null, 'active', 5)
181181
----
182-
0 default_name 0 0.0 0
182+
5 default_name 0 0.0 0
183183

184184
statement error 1006
185185
select dict_get(mysql_dic_id, 'id2', 5)

0 commit comments

Comments
 (0)