Skip to content

Commit 78629a5

Browse files
authored
feat(query): support json_object_delete and json_object_pick function (#16682)
* feat(query): support `json_object_delete`, `json_object_pick` function * add tests * fix
1 parent ab4bcd5 commit 78629a5

File tree

9 files changed

+527
-15
lines changed

9 files changed

+527
-15
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
415415
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
416416
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "57795da" }
417417
ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
418-
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "672e423" }
418+
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "ada713c" }
419419
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
420420
orc-rust = { git = "https://github.com/datafuse-extras/datafusion-orc", rev = "03372b97" }
421421
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" }

src/query/expression/src/evaluator.rs

Lines changed: 115 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::collections::HashMap;
1617
use std::ops::Not;
1718

@@ -35,6 +36,7 @@ use crate::types::array::ArrayColumn;
3536
use crate::types::boolean::BooleanDomain;
3637
use crate::types::nullable::NullableColumn;
3738
use crate::types::nullable::NullableDomain;
39+
use crate::types::string::StringColumnBuilder;
3840
use crate::types::ArgType;
3941
use crate::types::ArrayType;
4042
use crate::types::BooleanType;
@@ -507,7 +509,7 @@ impl<'a> Evaluator<'a> {
507509
},
508510
(DataType::Variant, DataType::Array(inner_dest_ty)) => {
509511
let empty_vec = vec![];
510-
let temp_array: jsonb::Value;
512+
let mut temp_array: jsonb::Value;
511513
match value {
512514
Value::Scalar(Scalar::Variant(x)) => {
513515
let array = if validity.as_ref().map(|v| v.get_bit(0)).unwrap_or(true) {
@@ -520,22 +522,16 @@ impl<'a> Evaluator<'a> {
520522
} else {
521523
&empty_vec
522524
};
523-
524-
let column = VariantType::create_column_from_variants(array.as_slice());
525-
526-
let validity = validity.map(|validity| {
527-
Bitmap::new_constant(
528-
validity.unset_bits() != validity.len(),
529-
column.len(),
530-
)
531-
});
532-
525+
let validity = None;
526+
let column = Column::Variant(VariantType::create_column_from_variants(
527+
array.as_slice(),
528+
));
533529
let new_array = self
534530
.run_cast(
535531
span,
536532
&DataType::Variant,
537533
inner_dest_ty,
538-
Value::Column(Column::Variant(column)),
534+
Value::Column(column),
539535
validity,
540536
options,
541537
)?
@@ -547,7 +543,6 @@ impl<'a> Evaluator<'a> {
547543
let mut array_builder =
548544
ArrayType::<VariantType>::create_builder(col.len(), &[]);
549545

550-
let mut temp_array: jsonb::Value;
551546
for (idx, x) in col.iter().enumerate() {
552547
let array = if validity.as_ref().map(|v| v.get_bit(idx)).unwrap_or(true)
553548
{
@@ -597,6 +592,113 @@ impl<'a> Evaluator<'a> {
597592
other => unreachable!("source: {}", other),
598593
}
599594
}
595+
// Cast a JSON object stored as `Variant` into `Map(String, T)`.
//
// Every key of the object becomes a map key and every member value is cast
// from `Variant` to the map's value type through a recursive `run_cast`.
// Inputs that are not JSON objects, and rows masked out by `validity`,
// produce an empty map.
(DataType::Variant, DataType::Map(box DataType::Tuple(fields_dest_ty)))
    if fields_dest_ty.len() == 2 && fields_dest_ty[0] == DataType::String =>
{
    // Fallback object for NULL rows and non-object JSON values.
    let empty_obj = BTreeMap::new();
    // Owns the decoded JSON value so that `obj` can borrow from it.
    let mut temp_obj: jsonb::Value;
    match value {
        Value::Scalar(Scalar::Variant(x)) => {
            let obj = if validity.as_ref().map(|v| v.get_bit(0)).unwrap_or(true) {
                temp_obj = jsonb::from_slice(&x).map_err(|e| {
                    ErrorCode::BadArguments(format!(
                        "Expect to be valid json, got err: {e:?}"
                    ))
                })?;
                temp_obj.as_object().unwrap_or(&empty_obj)
            } else {
                &empty_obj
            };
            // The exploded member values carry no validity mask of their own.
            let validity = None;

            let mut key_builder = StringColumnBuilder::with_capacity(obj.len(), 0);
            for k in obj.keys() {
                key_builder.put_str(k.as_str());
                key_builder.commit_row();
            }
            let key_column = Column::String(key_builder.build());

            let values: Vec<_> = obj.values().cloned().collect();
            let value_column = Column::Variant(
                VariantType::create_column_from_variants(values.as_slice()),
            );

            let new_value_column = self
                .run_cast(
                    span,
                    &DataType::Variant,
                    &fields_dest_ty[1],
                    Value::Column(value_column),
                    validity,
                    options,
                )?
                .into_column()
                .unwrap();
            Ok(Value::Scalar(Scalar::Map(Column::Tuple(vec![
                key_column,
                new_value_column,
            ]))))
        }
        Value::Column(Column::Variant(col)) => {
            let mut key_builder = StringColumnBuilder::with_capacity(0, 0);
            let mut value_builder =
                ArrayType::<VariantType>::create_builder(col.len(), &[]);

            for (idx, x) in col.iter().enumerate() {
                let obj = if validity.as_ref().map(|v| v.get_bit(idx)).unwrap_or(true) {
                    temp_obj = jsonb::from_slice(x).map_err(|e| {
                        ErrorCode::BadArguments(format!(
                            "Expect to be valid json, got err: {e:?}"
                        ))
                    })?;
                    temp_obj.as_object().unwrap_or(&empty_obj)
                } else {
                    &empty_obj
                };

                // One inner entry per object member; the outer `commit_row`
                // closes the list of entries belonging to input row `idx`.
                for (k, v) in obj.iter() {
                    key_builder.put_str(k.as_str());
                    key_builder.commit_row();
                    v.write_to_vec(&mut value_builder.builder.data);
                    value_builder.builder.commit_row();
                }
                value_builder.commit_row();
            }
            let key_column = Column::String(key_builder.build());

            // `entry_offsets[i]..entry_offsets[i + 1]` is the range of
            // key/value entries produced by input row `i`.
            let ArrayColumn {
                values: variant_values,
                offsets: entry_offsets,
            } = value_builder.build();

            // Expand the per-row validity to one bit per map entry.
            let validity = validity.map(|validity| {
                let mut inner_validity = MutableBitmap::with_capacity(col.len());
                for (index, offsets) in entry_offsets.windows(2).enumerate() {
                    inner_validity.extend_constant(
                        (offsets[1] - offsets[0]) as usize,
                        validity.get_bit(index),
                    );
                }
                inner_validity.into()
            });

            let new_value_column = self
                .run_cast(
                    span,
                    &DataType::Variant,
                    &fields_dest_ty[1],
                    Value::Column(Column::Variant(variant_values)),
                    validity,
                    options,
                )?
                .into_column()
                .unwrap();

            let kv_column = Column::Tuple(vec![key_column, new_value_column]);
            // The map offsets must count entries per row. The input column's
            // own offsets are byte positions in the JSONB buffer and would
            // index far past the end of `kv_column`.
            Ok(Value::Column(Column::Map(Box::new(ArrayColumn {
                values: kv_column,
                offsets: entry_offsets,
            }))))
        }
        other => unreachable!("source: {}", other),
    }
}
600702
(DataType::EmptyMap, DataType::Map(inner_dest_ty)) => match value {
601703
Value::Scalar(Scalar::EmptyMap) => {
602704
let new_column = ColumnBuilder::with_capacity(inner_dest_ty, 0).build();

src/query/functions/src/scalars/variant.rs

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::borrow::Cow;
16+
use std::collections::BTreeSet;
1617
use std::collections::HashSet;
1718
use std::iter::once;
1819
use std::sync::Arc;
@@ -1679,6 +1680,72 @@ pub fn register(registry: &mut FunctionRegistry) {
16791680
},
16801681
}))
16811682
});
1683+
1684+
// `json_object_pick(<variant>, <key>, ...)`: needs a Variant (or NULL) first
// argument followed by at least one String (or NULL) key argument.
registry.register_function_factory("json_object_pick", |_, args_type| {
    let (object_ty, key_tys) = match args_type {
        [first, rest @ ..] if !rest.is_empty() => (first, rest),
        _ => return None,
    };

    let object_ok =
        *object_ty == DataType::Null || object_ty.remove_nullable() == DataType::Variant;
    let keys_ok = key_tys
        .iter()
        .all(|ty| *ty == DataType::Null || ty.remove_nullable() == DataType::String);
    if !(object_ok && keys_ok) {
        return None;
    }

    // The result is nullable exactly when the object argument is.
    let is_nullable = object_ty.is_nullable_or_null();
    let return_type = match is_nullable {
        true => DataType::Nullable(Box::new(DataType::Variant)),
        false => DataType::Variant,
    };

    let signature = FunctionSignature {
        name: "json_object_pick".to_string(),
        args_type: args_type.to_vec(),
        return_type,
    };
    let eval = FunctionEval::Scalar {
        calc_domain: Box::new(|_, _| FunctionDomain::MayThrow),
        eval: Box::new(move |args, ctx| {
            json_object_pick_or_delete_fn(args, ctx, true, is_nullable)
        }),
    };
    Some(Arc::new(Function { signature, eval }))
});
1716+
1717+
// `json_object_delete(<variant>, <key>, ...)`: needs a Variant (or NULL) first
// argument followed by at least one String (or NULL) key argument.
registry.register_function_factory("json_object_delete", |_, args_type| {
    let (object_ty, key_tys) = match args_type {
        [first, rest @ ..] if !rest.is_empty() => (first, rest),
        _ => return None,
    };

    let object_ok =
        *object_ty == DataType::Null || object_ty.remove_nullable() == DataType::Variant;
    let keys_ok = key_tys
        .iter()
        .all(|ty| *ty == DataType::Null || ty.remove_nullable() == DataType::String);
    if !(object_ok && keys_ok) {
        return None;
    }

    // The result is nullable exactly when the object argument is.
    let is_nullable = object_ty.is_nullable_or_null();
    let return_type = match is_nullable {
        true => DataType::Nullable(Box::new(DataType::Variant)),
        false => DataType::Variant,
    };

    let signature = FunctionSignature {
        name: "json_object_delete".to_string(),
        args_type: args_type.to_vec(),
        return_type,
    };
    let eval = FunctionEval::Scalar {
        calc_domain: Box::new(|_, _| FunctionDomain::MayThrow),
        eval: Box::new(move |args, ctx| {
            json_object_pick_or_delete_fn(args, ctx, false, is_nullable)
        }),
    };
    Some(Arc::new(Function { signature, eval }))
});
16821749
}
16831750

16841751
fn json_array_fn(args: &[ValueRef<AnyType>], ctx: &mut EvalContext) -> Value<AnyType> {
@@ -2149,6 +2216,84 @@ fn json_object_insert_fn(
21492216
}
21502217
}
21512218

2219+
fn json_object_pick_or_delete_fn(
2220+
args: &[ValueRef<AnyType>],
2221+
ctx: &mut EvalContext,
2222+
is_pick: bool,
2223+
is_nullable: bool,
2224+
) -> Value<AnyType> {
2225+
let len_opt = args.iter().find_map(|arg| match arg {
2226+
ValueRef::Column(col) => Some(col.len()),
2227+
_ => None,
2228+
});
2229+
let len = len_opt.unwrap_or(1);
2230+
let mut keys = BTreeSet::new();
2231+
let mut validity = MutableBitmap::with_capacity(len);
2232+
let mut builder = BinaryColumnBuilder::with_capacity(len, len * 50);
2233+
for idx in 0..len {
2234+
let value = match &args[0] {
2235+
ValueRef::Scalar(scalar) => scalar.clone(),
2236+
ValueRef::Column(col) => unsafe { col.index_unchecked(idx) },
2237+
};
2238+
if value == ScalarRef::Null {
2239+
builder.commit_row();
2240+
validity.push(false);
2241+
continue;
2242+
}
2243+
let value = value.as_variant().unwrap();
2244+
if !is_object(value) {
2245+
ctx.set_error(builder.len(), "Invalid json object");
2246+
builder.commit_row();
2247+
validity.push(false);
2248+
continue;
2249+
}
2250+
keys.clear();
2251+
for arg in args.iter().skip(1) {
2252+
let key = match &arg {
2253+
ValueRef::Scalar(scalar) => scalar.clone(),
2254+
ValueRef::Column(col) => unsafe { col.index_unchecked(idx) },
2255+
};
2256+
if key == ScalarRef::Null {
2257+
continue;
2258+
}
2259+
let key = key.as_string().unwrap();
2260+
keys.insert(*key);
2261+
}
2262+
let res = if is_pick {
2263+
jsonb::object_pick(value, &keys, &mut builder.data)
2264+
} else {
2265+
jsonb::object_delete(value, &keys, &mut builder.data)
2266+
};
2267+
if let Err(err) = res {
2268+
validity.push(false);
2269+
ctx.set_error(builder.len(), err.to_string());
2270+
} else {
2271+
validity.push(true);
2272+
}
2273+
builder.commit_row();
2274+
}
2275+
if is_nullable {
2276+
let validity: Bitmap = validity.into();
2277+
match len_opt {
2278+
Some(_) => {
2279+
Value::Column(Column::Variant(builder.build())).wrap_nullable(Some(validity))
2280+
}
2281+
None => {
2282+
if !validity.get_bit(0) {
2283+
Value::Scalar(Scalar::Null)
2284+
} else {
2285+
Value::Scalar(Scalar::Variant(builder.build_scalar()))
2286+
}
2287+
}
2288+
}
2289+
} else {
2290+
match len_opt {
2291+
Some(_) => Value::Column(Column::Variant(builder.build())),
2292+
None => Value::Scalar(Scalar::Variant(builder.build_scalar())),
2293+
}
2294+
}
2295+
}
2296+
21522297
// Extract string for string type, other types convert to JSON string.
21532298
fn cast_to_string(v: &[u8]) -> String {
21542299
match to_str(v) {

src/query/functions/tests/it/scalars/testdata/function_list.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2313,9 +2313,11 @@ Functions overloads:
23132313
0 json_extract_path_text(String, String) :: String NULL
23142314
1 json_extract_path_text(String NULL, String NULL) :: String NULL
23152315
0 json_object FACTORY
2316+
0 json_object_delete FACTORY
23162317
0 json_object_insert FACTORY
23172318
0 json_object_keep_null FACTORY
23182319
0 json_object_keys(Variant NULL) :: Variant NULL
2320+
0 json_object_pick FACTORY
23192321
0 json_path_exists FACTORY
23202322
0 json_path_match FACTORY
23212323
0 json_path_query FACTORY

0 commit comments

Comments
 (0)