Skip to content

Commit 66ac1c5

Browse files
authored
Merge pull request #175 from influxdata/crepererum/df50
chore: DataFusion 50
2 parents c997806 + cadf3a5 commit 66ac1c5

File tree

36 files changed

+1047
-913
lines changed

36 files changed

+1047
-913
lines changed

Cargo.lock

Lines changed: 176 additions & 100 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,12 @@ license = "MIT OR Apache-2.0"
1717

1818
[workspace.dependencies]
1919
anyhow = { version = "1.0.100", default-features = false }
20-
arrow = { version = "55.2.0", default-features = false, features = ["ipc"] }
20+
arrow = { version = "56.2.0", default-features = false, features = ["ipc"] }
2121
chrono = { version = "0.4.42", default-features = false }
22-
datafusion = { version = "49.0.1", default-features = false }
23-
datafusion-common = { version = "49.0.1", default-features = false }
24-
datafusion-expr = { version = "49.0.1", default-features = false }
25-
datafusion-sql = { version = "49.0.1", default-features = false }
22+
datafusion = { version = "50.3.0", default-features = false }
23+
datafusion-common = { version = "50.3.0", default-features = false }
24+
datafusion-expr = { version = "50.3.0", default-features = false }
25+
datafusion-sql = { version = "50.3.0", default-features = false }
2626
datafusion-udf-wasm-arrow2bytes = { path = "arrow2bytes", version = "0.1.0" }
2727
datafusion-udf-wasm-bundle = { path = "guests/bundle", version = "0.1.0" }
2828
datafusion-udf-wasm-evil = { path = "guests/evil", version = "0.1.0" }
@@ -36,13 +36,14 @@ insta = { version = "1.43.2", "default-features" = false }
3636
log = { version = "0.4.28", default-features = false }
3737
pyo3 = { version = "0.27.1", default-features = false, features = ["macros"] }
3838
regex = { version = "1", default-features = false }
39-
sqlparser = { version = "0.55.0", default-features = false, features = [
39+
sqlparser = { version = "0.58.0", default-features = false, features = [
4040
"std",
4141
"visitor"
4242
] }
4343
tar = { version = "0.4.44", default-features = false }
4444
tempfile = { version = "3.23.0", default-features = false }
4545
tokio = { version = "1.48.0", default-features = false }
46+
uuid = { version = "1.18.1", default-features = false, features = ["v4"] }
4647
wasip2 = { version = "1" }
4748
wasmtime = { version = "38.0.4", default-features = false, features = [
4849
"async",
@@ -90,14 +91,6 @@ bare_urls = "deny"
9091
broken_intra_doc_links = "deny"
9192
private_intra_doc_links = "deny"
9293

93-
[patch.crates-io]
94-
# use same DataFusion fork as InfluxDB
95-
# See https://github.com/influxdata/arrow-datafusion/pull/72
96-
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "8347a71f62d4fef8d37548f22b93877170039357" }
97-
datafusion-common = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "8347a71f62d4fef8d37548f22b93877170039357" }
98-
datafusion-expr = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "8347a71f62d4fef8d37548f22b93877170039357" }
99-
datafusion-sql = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "8347a71f62d4fef8d37548f22b93877170039357" }
100-
10194
# faster tests
10295
[profile.dev.package]
10396
# improve wasmtime compilation speed

guests/evil/src/runtime.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//! Evil payloads that try to break the sandbox when the UDF is called.
2-
use std::sync::Arc;
2+
use std::{hash::Hash, sync::Arc};
33

44
use arrow::datatypes::DataType;
55
use datafusion_common::{Result as DataFusionResult, ScalarValue};
@@ -49,6 +49,20 @@ impl std::fmt::Debug for SideEffect {
4949
}
5050
}
5151

52+
/// Two `SideEffect` values compare equal exactly when their `name` fields are equal.
impl PartialEq<Self> for SideEffect {
53+
fn eq(&self, other: &Self) -> bool {
54+
// Only `name` takes part in the comparison.
self.name == other.name
55+
}
56+
}
57+
58+
// Marker impl; equality itself is the `name` comparison in `PartialEq` for `SideEffect`.
impl Eq for SideEffect {}
59+
60+
/// Hashes only the `name` field, the same field that `PartialEq` compares,
/// so values that compare equal also produce the same hash.
impl Hash for SideEffect {
61+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
62+
self.name.hash(state);
63+
}
64+
}
65+
5266
impl ScalarUDFImpl for SideEffect {
5367
fn as_any(&self) -> &dyn std::any::Any {
5468
self

guests/python/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ datafusion-common.workspace = true
1414
datafusion-expr.workspace = true
1515
datafusion-udf-wasm-guest.workspace = true
1616
pyo3.workspace = true
17+
uuid.workspace = true
1718
wasip2.workspace = true
1819

1920
[build-dependencies]

guests/python/src/lib.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//! [CPython]: https://www.python.org/
55
//! [`pyo3`]: https://pyo3.rs/
66
use std::any::Any;
7+
use std::hash::Hash;
78
use std::ops::{ControlFlow, Range};
89
use std::sync::{Arc, Once};
910

@@ -15,6 +16,7 @@ use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signatur
1516
use datafusion_udf_wasm_guest::export;
1617
use pyo3::prelude::*;
1718
use pyo3::types::PyTuple;
19+
use uuid::Uuid;
1820

1921
use crate::error::py_err_to_string;
2022
use crate::inspect::inspect_python_code;
@@ -35,6 +37,9 @@ struct PythonScalarUDF {
3537
/// Handle of the wrapped python function.
3638
python_function: PythonFn,
3739

40+
/// We treat every python UDF as unique, but we need a proxy value to express that.
41+
id: Uuid,
42+
3843
/// Signature of the UDF.
3944
///
4045
/// We store this here because [`ScalarUDFImpl::signature`] requires us to return a reference.
@@ -56,6 +61,7 @@ impl PythonScalarUDF {
5661

5762
Self {
5863
python_function,
64+
id: Uuid::new_v4(),
5965
signature,
6066
}
6167
}
@@ -94,6 +100,20 @@ impl PythonScalarUDF {
94100
}
95101
}
96102

103+
/// Equality is decided solely by the `id` field, the proxy value used to treat
/// every Python UDF as unique; the wrapped function and signature are not compared.
impl PartialEq<Self> for PythonScalarUDF {
104+
fn eq(&self, other: &Self) -> bool {
105+
self.id == other.id
106+
}
107+
}
108+
109+
// Marker impl; equality itself is the `id` comparison in `PartialEq` for `PythonScalarUDF`.
impl Eq for PythonScalarUDF {}
110+
111+
/// Hashes only the `id` field, the same field that `PartialEq` compares,
/// so values that compare equal also produce the same hash.
impl Hash for PythonScalarUDF {
112+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
113+
self.id.hash(state);
114+
}
115+
}
116+
97117
impl ScalarUDFImpl for PythonScalarUDF {
98118
fn as_any(&self) -> &dyn Any {
99119
self
@@ -118,6 +138,7 @@ impl ScalarUDFImpl for PythonScalarUDF {
118138
arg_fields,
119139
number_rows,
120140
return_field,
141+
config_options: _,
121142
} = args;
122143

123144
let return_dt = self.python_function.signature.return_type.t.data_type();

guests/rust/examples/add_one.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@ use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signatur
1313
use datafusion_udf_wasm_guest::export;
1414

1515
/// UDF that implements "add one".
16-
#[derive(Debug)]
16+
#[derive(Debug, PartialEq, Eq, Hash)]
1717
struct AddOne {
1818
/// Signature of the UDF.
1919
///
@@ -58,6 +58,7 @@ impl ScalarUDFImpl for AddOne {
5858
arg_fields: _,
5959
number_rows: _,
6060
return_field: _,
61+
config_options: _,
6162
} = args;
6263

6364
// extract inputs

guests/rust/src/conversion.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
//! Conversion routes from/to [WIT types](crate::bindings).
2+
use std::sync::Arc;
3+
24
use arrow::{
35
array::ArrayRef,
46
datatypes::{DataType, Field, FieldRef},
57
};
6-
use datafusion_common::{error::DataFusionError, scalar::ScalarValue};
8+
use datafusion_common::{config::ConfigOptions, error::DataFusionError, scalar::ScalarValue};
79
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
810
use datafusion_udf_wasm_arrow2bytes::{array2bytes, bytes2array, bytes2datatype, datatype2bytes};
911

@@ -238,6 +240,9 @@ impl TryFrom<wit_types::ScalarFunctionArgs> for ScalarFunctionArgs {
238240
.collect::<Result<_, _>>()?,
239241
number_rows: value.number_rows as usize,
240242
return_field: value.return_field.try_into()?,
243+
config_options: Arc::new(ConfigOptions::from_string_hash_map(
244+
&value.config_options.into_iter().collect(),
245+
)?),
241246
})
242247
}
243248
}

host/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ rand = { version = "0.9" }
1717
siphasher = { version = "1", default-features = false }
1818
tar.workspace = true
1919
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync"] }
20+
uuid.workspace = true
2021
wasmtime.workspace = true
2122
wasmtime-wasi.workspace = true
2223
wasmtime-wasi-http.workspace = true

host/src/conversion.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,16 @@ impl TryFrom<ScalarFunctionArgs> for wit_types::ScalarFunctionArgs {
221221
.collect(),
222222
number_rows: value.number_rows as u64,
223223
return_field: value.return_field.as_ref().clone().into(),
224+
config_options: value
225+
.config_options
226+
.entries()
227+
.into_iter()
228+
.filter_map(|e| {
229+
let k = e.key;
230+
let v = e.value?;
231+
Some((k, v))
232+
})
233+
.collect(),
224234
})
225235
}
226236
}

host/src/lib.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@
22
//!
33
//!
44
//! [DataFusion]: https://datafusion.apache.org/
5-
use std::{any::Any, collections::BTreeMap, ops::DerefMut, sync::Arc};
5+
use std::{any::Any, collections::BTreeMap, hash::Hash, ops::DerefMut, sync::Arc};
66

77
use ::http::HeaderName;
88
use arrow::datatypes::DataType;
9-
use datafusion_common::{DataFusionError, Result as DataFusionResult, config::ConfigOptions};
9+
use datafusion_common::{DataFusionError, Result as DataFusionResult};
1010
use datafusion_expr::{
1111
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
1212
async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl},
1313
};
1414
use tokio::{runtime::Handle, sync::Mutex};
15+
use uuid::Uuid;
1516
use wasmtime::{
1617
Engine, Store,
1718
component::{Component, ResourceAny},
@@ -322,6 +323,9 @@ pub struct WasmScalarUdf {
322323
/// [`ScalarUDFImpl::name`] is sync and requires us to return a reference.
323324
name: String,
324325

326+
/// We treat every UDF as unique, but we need a proxy value to express that.
327+
id: Uuid,
328+
325329
/// Signature of the UDF.
326330
///
327331
/// This was pre-fetched during UDF generation because
@@ -458,6 +462,7 @@ impl WasmScalarUdf {
458462
bindings: Arc::clone(&bindings),
459463
resource,
460464
name,
465+
id: Uuid::new_v4(),
461466
signature,
462467
return_type,
463468
});
@@ -508,6 +513,7 @@ impl std::fmt::Debug for WasmScalarUdf {
508513
bindings: _,
509514
resource,
510515
name,
516+
id,
511517
signature,
512518
return_type,
513519
} = self;
@@ -517,12 +523,27 @@ impl std::fmt::Debug for WasmScalarUdf {
517523
.field("bindings", &"<BINDINGS>")
518524
.field("resource", resource)
519525
.field("name", name)
526+
.field("id", id)
520527
.field("signature", signature)
521528
.field("return_type", return_type)
522529
.finish()
523530
}
524531
}
525532

533+
/// Equality is decided solely by the `id` field, the proxy value used to treat
/// every UDF as unique; name, signature and the other fields are not compared.
impl PartialEq<Self> for WasmScalarUdf {
534+
fn eq(&self, other: &Self) -> bool {
535+
self.id == other.id
536+
}
537+
}
538+
539+
// Marker impl; equality itself is the `id` comparison in `PartialEq` for `WasmScalarUdf`.
impl Eq for WasmScalarUdf {}
540+
541+
/// Hashes only the `id` field, the same field that `PartialEq` compares,
/// so values that compare equal also produce the same hash.
impl Hash for WasmScalarUdf {
542+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
543+
self.id.hash(state);
544+
}
545+
}
546+
526547
impl ScalarUDFImpl for WasmScalarUdf {
527548
fn as_any(&self) -> &dyn Any {
528549
self
@@ -579,8 +600,7 @@ impl AsyncScalarUDFImpl for WasmScalarUdf {
579600
async fn invoke_async_with_args(
580601
&self,
581602
args: ScalarFunctionArgs,
582-
_option: &ConfigOptions,
583-
) -> DataFusionResult<arrow::array::ArrayRef> {
603+
) -> DataFusionResult<ColumnarValue> {
584604
let args = args.try_into()?;
585605
let mut store_guard = self.store.lock().await;
586606
let return_type = self
@@ -596,10 +616,6 @@ impl AsyncScalarUDFImpl for WasmScalarUdf {
596616

597617
drop(store_guard);
598618

599-
let columnar_value: ColumnarValue = return_type.try_into()?;
600-
match columnar_value {
601-
ColumnarValue::Array(v) => Ok(v),
602-
ColumnarValue::Scalar(v) => v.to_array_of_size(args.number_rows as usize),
603-
}
619+
return_type.try_into()
604620
}
605621
}

0 commit comments

Comments
 (0)