Skip to content

Commit 71e0c50

Browse files
authored
chore(query): improve udf server headers (#17349)
* chore(query): improve udf server headers * chore(query): improve udf server headers * chore(query): improve udf server headers
1 parent 7969d36 commit 71e0c50

File tree

5 files changed

+37
-33
lines changed

5 files changed

+37
-33
lines changed

src/common/base/src/headers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID";
1717
pub const HEADER_USER: &str = "X-DATABEND-USER";
1818

1919
pub const HEADER_FUNCTION: &str = "X-DATABEND-FUNCTION";
20+
pub const HEADER_FUNCTION_HANDLER: &str = "X-DATABEND-FUNCTION-HANDLER";
2021

2122
pub const HEADER_DEDUPLICATE_LABEL: &str = "X-DATABEND-DEDUPLICATE-LABEL";
2223
pub const HEADER_NODE_ID: &str = "X-DATABEND-NODE-ID";

src/query/expression/src/utils/udf_client.rs

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use arrow_flight::flight_service_client::FlightServiceClient;
2323
use arrow_flight::FlightDescriptor;
2424
use arrow_select::concat::concat_batches;
2525
use databend_common_base::headers::HEADER_FUNCTION;
26+
use databend_common_base::headers::HEADER_FUNCTION_HANDLER;
2627
use databend_common_base::headers::HEADER_QUERY_ID;
2728
use databend_common_base::headers::HEADER_TENANT;
2829
use databend_common_base::version::DATABEND_SEMVER;
@@ -120,43 +121,38 @@ impl UDFFlightClient {
120121
})
121122
}
122123

123-
/// Set tenant for the UDF client.
124-
pub fn with_tenant(mut self, tenant: &str) -> Result<Self> {
125-
let key = HEADER_TENANT.to_lowercase();
126-
let key = MetadataKey::from_str(key.as_str()).map_err(|err| {
127-
ErrorCode::UDFDataError(format!("Parse tenant header key error: {err}"))
128-
})?;
129-
let value = MetadataValue::from_str(tenant).map_err(|err| {
130-
ErrorCode::UDFDataError(format!("Parse tenant header value error: {err}"))
131-
})?;
132-
self.headers.insert(key, value);
124+
pub fn with_headers<'a, H: IntoIterator<Item = (&'a str, &'a str)>>(
125+
mut self,
126+
headers: H,
127+
) -> Result<Self> {
128+
for (key, value) in headers.into_iter() {
129+
let key = MetadataKey::from_str(key)
130+
.map_err(|err| ErrorCode::UDFDataError(format!("Parse key {key} error: {err}")))?;
131+
let value = MetadataValue::from_str(value).map_err(|err| {
132+
ErrorCode::UDFDataError(format!("Parse value {value} error: {err}"))
133+
})?;
134+
self.headers.insert(key, value);
135+
}
133136
Ok(self)
134137
}
135138

139+
/// Set tenant for the UDF client.
140+
pub fn with_tenant(self, tenant: &str) -> Result<Self> {
141+
self.with_headers([(HEADER_TENANT, tenant)])
142+
}
143+
136144
/// Set function name for the UDF client.
137-
pub fn with_func_name(mut self, func_name: &str) -> Result<Self> {
138-
let key = HEADER_FUNCTION.to_lowercase();
139-
let key = MetadataKey::from_str(key.as_str()).map_err(|err| {
140-
ErrorCode::UDFDataError(format!("Parse function name header key error: {err}"))
141-
})?;
142-
let value = MetadataValue::from_str(func_name).map_err(|err| {
143-
ErrorCode::UDFDataError(format!("Parse function name header value error: {err}"))
144-
})?;
145-
self.headers.insert(key, value);
146-
Ok(self)
145+
pub fn with_func_name(self, func_name: &str) -> Result<Self> {
146+
self.with_headers([(HEADER_FUNCTION, func_name)])
147+
}
148+
149+
pub fn with_handler_name(self, handler_name: &str) -> Result<Self> {
150+
self.with_headers([(HEADER_FUNCTION_HANDLER, handler_name)])
147151
}
148152

149153
/// Set query id for the UDF client.
150-
pub fn with_query_id(mut self, query_id: &str) -> Result<Self> {
151-
let key = HEADER_QUERY_ID.to_lowercase();
152-
let key = MetadataKey::from_str(key.as_str()).map_err(|err| {
153-
ErrorCode::UDFDataError(format!("Parse query id header key error: {err}"))
154-
})?;
155-
let value = MetadataValue::from_str(query_id).map_err(|err| {
156-
ErrorCode::UDFDataError(format!("Parse query id header value error: {err}"))
157-
})?;
158-
self.headers.insert(key, value);
159-
Ok(self)
154+
pub fn with_query_id(self, query_id: &str) -> Result<Self> {
155+
self.with_headers([(HEADER_QUERY_ID, query_id)])
160156
}
161157

162158
fn make_request<T>(&self, t: T) -> Request<T> {

src/query/service/src/pipelines/processors/transforms/transform_udf_server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ impl TransformUdfServer {
157157
let mut client = UDFFlightClient::connect(endpoint, connect_timeout, 65536)
158158
.await?
159159
.with_tenant(ctx.get_tenant().tenant_name())?
160-
.with_func_name(&func.func_name)?
160+
.with_func_name(&func.name)?
161+
.with_handler_name(&func.func_name)?
161162
.with_query_id(&ctx.get_id())?;
162163

163164
let connect_duration = instant.elapsed();

src/query/service/src/table_functions/others/udf.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ impl Table for UdfEchoTable {
148148
.await?
149149
.with_tenant(ctx.get_tenant().tenant_name())?
150150
.with_func_name("builtin_echo")?
151+
.with_handler_name("builtin_echo")?
151152
.with_query_id(&ctx.get_id())?;
152153

153154
let array = arrow_array::LargeStringArray::from(vec![self.arg.clone()]);

src/query/sql/src/planner/binder/udf.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,13 @@ impl Binder {
106106

107107
let endpoint =
108108
UDFFlightClient::build_endpoint(address, connect_timeout, request_timeout)?;
109-
let mut client =
110-
UDFFlightClient::connect(endpoint, connect_timeout, batch_rows).await?;
109+
110+
let mut client = UDFFlightClient::connect(endpoint, connect_timeout, batch_rows)
111+
.await?
112+
.with_tenant(self.ctx.get_tenant().tenant_name())?
113+
.with_func_name(&name)?
114+
.with_handler_name(handler)?
115+
.with_query_id(&self.ctx.get_id())?;
111116
client
112117
.check_schema(handler, &arg_datatypes, &return_type)
113118
.await?;

0 commit comments

Comments
 (0)