Skip to content

Commit b8b06d6

Browse files
authored
feat(query): the HTTP protocol supports returning a body in arrow IPC format (#18890)
* refine * BodyFormat * from_internal * test * fix * fix * test_arrow_ipc_no_data * json if no data * fix * fix * fix
1 parent 54c2a76 commit b8b06d6

File tree

9 files changed

+283
-74
lines changed

9 files changed

+283
-74
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ lint:
2020
# Cargo.toml file formatter(make setup to install)
2121
taplo fmt
2222
# Python file formatter(make setup to install)
23-
ruff format tests/
23+
# ruff format tests/
2424
# Bash file formatter(make setup to install)
2525
shfmt -l -w scripts/*
2626

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 149 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@ use databend_common_base::runtime::ParentMemStat;
2727
use databend_common_base::runtime::ThreadTracker;
2828
use databend_common_base::runtime::GLOBAL_MEM_STAT;
2929
use databend_common_config::GlobalConfig;
30-
use databend_common_expression::DataSchemaRef;
30+
use databend_common_exception::ErrorCode;
31+
use databend_common_expression::DataSchema;
3132
use databend_common_management::WorkloadGroupResourceManager;
3233
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
3334
use fastrace::prelude::*;
35+
use headers::Header;
36+
use headers::HeaderMapExt;
3437
use http::HeaderMap;
3538
use http::HeaderValue;
3639
use http::StatusCode;
@@ -47,8 +50,10 @@ use poem::put;
4750
use poem::web::Json;
4851
use poem::web::Path;
4952
use poem::EndpointExt;
53+
use poem::FromRequest;
5054
use poem::IntoResponse;
5155
use poem::Request;
56+
use poem::RequestBody;
5257
use poem::Response;
5358
use poem::Route;
5459
use serde::Deserialize;
@@ -60,6 +65,7 @@ use super::query::ExecuteStateKind;
6065
use super::query::HttpQuery;
6166
use super::query::HttpQueryRequest;
6267
use super::query::HttpQueryResponseInternal;
68+
use super::query::ResponseState;
6369
use crate::clusters::ClusterDiscovery;
6470
use crate::servers::http::error::HttpErrorCode;
6571
use crate::servers::http::error::QueryError;
@@ -123,7 +129,7 @@ pub struct QueryResponseField {
123129
}
124130

125131
impl QueryResponseField {
126-
pub fn from_schema(schema: DataSchemaRef) -> Vec<Self> {
132+
pub fn from_schema(schema: &DataSchema) -> Vec<Self> {
127133
schema
128134
.fields()
129135
.iter()
@@ -165,17 +171,34 @@ pub struct QueryResponse {
165171
}
166172

167173
impl QueryResponse {
168-
pub(crate) fn from_internal(
174+
fn from_internal(
169175
id: String,
170-
r: HttpQueryResponseInternal,
176+
HttpQueryResponseInternal {
177+
data,
178+
session_id,
179+
session,
180+
node_id,
181+
result_timeout_secs,
182+
state:
183+
ResponseState {
184+
has_result_set,
185+
schema,
186+
running_time_ms,
187+
progresses,
188+
state,
189+
affect,
190+
error,
191+
warnings,
192+
},
193+
}: HttpQueryResponseInternal,
171194
is_final: bool,
172-
) -> impl IntoResponse {
173-
let state = r.state.clone();
195+
body_format: BodyFormat,
196+
) -> Response {
174197
let (data, next_uri) = if is_final {
175198
(Arc::new(BlocksSerializer::empty()), None)
176199
} else {
177-
match state.state {
178-
ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data {
200+
match state {
201+
ExecuteStateKind::Running | ExecuteStateKind::Starting => match data {
179202
None => (
180203
Arc::new(BlocksSerializer::empty()),
181204
Some(make_state_uri(&id)),
@@ -192,7 +215,7 @@ impl QueryResponse {
192215
Arc::new(BlocksSerializer::empty()),
193216
Some(make_final_uri(&id)),
194217
),
195-
ExecuteStateKind::Succeeded => match r.data {
218+
ExecuteStateKind::Succeeded => match data {
196219
None => (
197220
Arc::new(BlocksSerializer::empty()),
198221
Some(make_final_uri(&id)),
@@ -208,39 +231,64 @@ impl QueryResponse {
208231
}
209232
};
210233

211-
if let Some(err) = &r.state.error {
234+
if let Some(err) = &error {
212235
metrics_incr_http_response_errors_count(err.name(), err.code());
213236
}
214237

215-
let session_id = r.session_id.clone();
216-
let stats = QueryStats {
217-
progresses: state.progresses.clone(),
218-
running_time_ms: state.running_time_ms,
219-
};
220238
let rows = data.num_rows();
221-
222-
Json(QueryResponse {
223-
data,
224-
state: state.state,
225-
schema: state.schema.clone(),
226-
session_id: Some(session_id),
227-
node_id: r.node_id,
228-
session: r.session,
229-
stats,
230-
affect: state.affect,
231-
warnings: r.state.warnings,
239+
let mut res = QueryResponse {
232240
id: id.clone(),
241+
session_id: Some(session_id),
242+
node_id,
243+
state,
244+
session,
245+
stats: QueryStats {
246+
progresses,
247+
running_time_ms,
248+
},
249+
schema: vec![],
250+
data: Arc::new(BlocksSerializer::empty()),
251+
affect,
252+
warnings,
233253
next_uri,
234254
stats_uri: Some(make_state_uri(&id)),
235255
final_uri: Some(make_final_uri(&id)),
236256
kill_uri: Some(make_kill_uri(&id)),
237-
error: r.state.error.map(QueryError::from_error_code),
238-
has_result_set: r.state.has_result_set,
239-
result_timeout_secs: Some(r.result_timeout_secs),
240-
})
241-
.with_header(HEADER_QUERY_ID, id.clone())
242-
.with_header(HEADER_QUERY_STATE, state.state.to_string())
243-
.with_header(HEADER_QUERY_PAGE_ROWS, rows)
257+
error: error.map(QueryError::from_error_code),
258+
has_result_set,
259+
result_timeout_secs: Some(result_timeout_secs),
260+
};
261+
262+
match body_format {
263+
BodyFormat::Arrow if !schema.fields.is_empty() && !data.is_empty() => {
264+
let buf: Result<_, ErrorCode> = try {
265+
const META_KEY: &str = "response_header";
266+
let json_res = serde_json::to_string(&res)?;
267+
data.to_arrow_ipc(&schema, vec![(META_KEY.to_string(), json_res)])?
268+
};
269+
270+
match buf {
271+
Ok(buf) => Response::builder()
272+
.header(HEADER_QUERY_ID, id)
273+
.header(HEADER_QUERY_STATE, state.to_string())
274+
.header(HEADER_QUERY_PAGE_ROWS, rows)
275+
.content_type(body_format.content_type())
276+
.body(buf),
277+
Err(err) => Response::builder()
278+
.status(StatusCode::INTERNAL_SERVER_ERROR)
279+
.body(err.to_string()),
280+
}
281+
}
282+
_ => {
283+
res.data = data;
284+
res.schema = QueryResponseField::from_schema(&schema);
285+
Json(res)
286+
.with_header(HEADER_QUERY_ID, id)
287+
.with_header(HEADER_QUERY_STATE, state.to_string())
288+
.with_header(HEADER_QUERY_PAGE_ROWS, rows)
289+
.into_response()
290+
}
291+
}
244292
}
245293
}
246294

@@ -307,6 +355,7 @@ impl StateResponse {
307355
#[poem::handler]
308356
async fn query_final_handler(
309357
ctx: &HttpQueryContext,
358+
body_format: BodyFormat,
310359
Path(query_id): Path<String>,
311360
) -> PoemResult<impl IntoResponse> {
312361
ctx.check_node_id(&query_id)?;
@@ -335,7 +384,12 @@ async fn query_final_handler(
335384
// it is safe to set these 2 fields to None, because client now check for null/None first.
336385
response.session = None;
337386
response.state.affect = None;
338-
Ok(QueryResponse::from_internal(query_id, response, true))
387+
Ok(QueryResponse::from_internal(
388+
query_id,
389+
response,
390+
true,
391+
body_format,
392+
))
339393
}
340394
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
341395
}
@@ -408,6 +462,7 @@ async fn query_state_handler(
408462
#[poem::handler]
409463
async fn query_page_handler(
410464
ctx: &HttpQueryContext,
465+
body_format: BodyFormat,
411466
Path((query_id, page_no)): Path<(String, usize)>,
412467
) -> PoemResult<impl IntoResponse> {
413468
ctx.check_node_id(&query_id)?;
@@ -459,7 +514,12 @@ async fn query_page_handler(
459514
query
460515
.update_expire_time(false, resp.is_data_drained())
461516
.await;
462-
Ok(QueryResponse::from_internal(query_id, resp, false))
517+
Ok(QueryResponse::from_internal(
518+
query_id,
519+
resp,
520+
false,
521+
body_format,
522+
))
463523
}
464524
}
465525
};
@@ -483,13 +543,12 @@ async fn query_page_handler(
483543

484544
#[poem::handler]
485545
#[async_backtrace::framed]
486-
#[fastrace::trace]
487546
pub(crate) async fn query_handler(
488547
ctx: &HttpQueryContext,
548+
body_format: BodyFormat,
489549
Json(mut req): Json<HttpQueryRequest>,
490550
) -> PoemResult<impl IntoResponse> {
491551
let session = ctx.session.clone();
492-
493552
let query_handle = async {
494553
let agent_info = ctx
495554
.user_agent
@@ -553,7 +612,10 @@ pub(crate) async fn query_handler(
553612
query
554613
.update_expire_time(false, resp.is_data_drained())
555614
.await;
556-
Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response())
615+
Ok(
616+
QueryResponse::from_internal(query.id.to_string(), resp, false, body_format)
617+
.into_response(),
618+
)
557619
}
558620
}
559621
};
@@ -924,3 +986,52 @@ pub(crate) fn get_http_tracing_span(
924986
Span::root(name, SpanContext::new(trace_id, SpanId(rand::random())))
925987
.with_properties(|| ctx.to_fastrace_properties())
926988
}
989+
990+
#[derive(Debug, Clone, Copy)]
991+
enum BodyFormat {
992+
Json,
993+
Arrow,
994+
}
995+
996+
impl Header for BodyFormat {
997+
fn name() -> &'static http::HeaderName {
998+
&http::header::ACCEPT
999+
}
1000+
1001+
fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
1002+
where
1003+
Self: Sized,
1004+
I: Iterator<Item = &'i HeaderValue>,
1005+
{
1006+
if let Some(v) = values.next() {
1007+
match v.to_str() {
1008+
Ok(s) if s == BodyFormat::Arrow.content_type() => return Ok(BodyFormat::Arrow),
1009+
Err(_) => return Err(headers::Error::invalid()),
1010+
_ => {}
1011+
};
1012+
}
1013+
Ok(BodyFormat::Json)
1014+
}
1015+
1016+
fn encode<E: Extend<HeaderValue>>(&self, values: &mut E) {
1017+
values.extend([HeaderValue::from_static(self.content_type())]);
1018+
}
1019+
}
1020+
1021+
impl BodyFormat {
1022+
pub const fn content_type(&self) -> &'static str {
1023+
match self {
1024+
BodyFormat::Json => "application/json",
1025+
BodyFormat::Arrow => "application/vnd.apache.arrow.stream",
1026+
}
1027+
}
1028+
}
1029+
1030+
impl<'a> FromRequest<'a> for BodyFormat {
1031+
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> Result<Self, PoemError> {
1032+
Ok(req
1033+
.headers()
1034+
.typed_get::<Self>()
1035+
.unwrap_or(BodyFormat::Json))
1036+
}
1037+
}

src/query/service/src/servers/http/v1/query/blocks_serializer.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@
1515
use std::cell::RefCell;
1616
use std::ops::DerefMut;
1717

18+
use arrow_ipc::writer::IpcWriteOptions;
19+
use arrow_ipc::writer::StreamWriter;
20+
use arrow_ipc::CompressionType;
21+
use arrow_ipc::MetadataVersion;
22+
use databend_common_exception::Result;
1823
use databend_common_expression::types::date::date_to_string;
1924
use databend_common_expression::types::interval::interval_to_string;
2025
use databend_common_expression::types::timestamp::timestamp_to_string;
2126
use databend_common_expression::BlockEntry;
2227
use databend_common_expression::Column;
2328
use databend_common_expression::DataBlock;
29+
use databend_common_expression::DataSchema;
2430
use databend_common_formats::field_encoder::FieldEncoderValues;
2531
use databend_common_io::ewkb_to_geo;
2632
use databend_common_io::geo_to_ewkb;
@@ -100,6 +106,28 @@ impl BlocksSerializer {
100106
pub fn num_rows(&self) -> usize {
101107
self.columns.iter().map(|(_, num_rows)| *num_rows).sum()
102108
}
109+
110+
pub fn to_arrow_ipc(
111+
&self,
112+
data_schema: &DataSchema,
113+
ext_meta: Vec<(String, String)>,
114+
) -> Result<Vec<u8>> {
115+
let mut schema = arrow_schema::Schema::from(data_schema);
116+
schema.metadata.extend(ext_meta);
117+
118+
let mut buf = Vec::new();
119+
let opts = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?
120+
.try_with_compression(Some(CompressionType::LZ4_FRAME))?;
121+
let mut writer = StreamWriter::try_new_with_options(&mut buf, &schema, opts)?;
122+
123+
for (block, _) in &self.columns {
124+
let block = DataBlock::new_from_columns(block.clone());
125+
let batch = block.to_record_batch_with_dataschema(data_schema)?;
126+
writer.write(&batch)?;
127+
}
128+
writer.finish()?;
129+
Ok(buf)
130+
}
103131
}
104132

105133
impl serde::Serialize for BlocksSerializer {

0 commit comments

Comments
(0)