Skip to content

Commit 37d96b2

Browse files
authored
chore(query): improve fuse row fetch, use accumulating (#17309)
* fix(query): improve fuse row fetch, use accumulating
* fix(query): improve fuse row fetch, use accumulating
* fix(query): improve fuse row fetch, use accumulating
* fix(query): add extra log
1 parent e6e3aec commit 37d96b2

File tree

4 files changed: +46 additions, -27 deletions

src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs

Lines changed: 16 additions & 0 deletions
@@ -26,6 +26,10 @@ use databend_common_pipeline_core::processors::Processor;
 pub trait AsyncAccumulatingTransform: Send {
     const NAME: &'static str;
 
+    async fn on_start(&mut self) -> Result<()> {
+        Ok(())
+    }
+
     async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>>;
 
     async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
@@ -38,6 +42,7 @@ pub struct AsyncAccumulatingTransformer<T: AsyncAccumulatingTransform + 'static>
     input: Arc<InputPort>,
     output: Arc<OutputPort>,
 
+    called_on_start: bool,
     called_on_finish: bool,
     input_data: Option<DataBlock>,
     output_data: Option<DataBlock>,
@@ -51,6 +56,7 @@ impl<T: AsyncAccumulatingTransform + 'static> AsyncAccumulatingTransformer<T> {
             output,
             input_data: None,
             output_data: None,
+            called_on_start: false,
             called_on_finish: false,
         })
     }
@@ -67,6 +73,10 @@ impl<T: AsyncAccumulatingTransform + 'static> Processor for AsyncAccumulatingTra
     }
 
     fn event(&mut self) -> Result<Event> {
+        if !self.called_on_start {
+            return Ok(Event::Async);
+        }
+
         if self.output.is_finished() {
             if !self.called_on_finish {
                 return Ok(Event::Async);
@@ -111,6 +121,12 @@ impl<T: AsyncAccumulatingTransform + 'static> Processor for AsyncAccumulatingTra
 
     #[async_backtrace::framed]
     async fn async_process(&mut self) -> Result<()> {
+        if !self.called_on_start {
+            self.called_on_start = true;
+            self.inner.on_start().await?;
+            return Ok(());
+        }
+
         if let Some(data_block) = self.input_data.take() {
             self.output_data = self.inner.transform(data_block).await?;
             return Ok(());

src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs

Lines changed: 30 additions & 17 deletions
@@ -25,15 +25,13 @@ use databend_common_expression::types::DataType;
 use databend_common_expression::types::NumberDataType;
 use databend_common_expression::BlockEntry;
 use databend_common_expression::Column;
-use databend_common_expression::ColumnBuilder;
 use databend_common_expression::DataBlock;
-use databend_common_expression::DataSchema;
 use databend_common_expression::Value;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::ProcessorPtr;
-use databend_common_pipeline_transforms::processors::AsyncTransform;
-use databend_common_pipeline_transforms::processors::AsyncTransformer;
+use databend_common_pipeline_transforms::AsyncAccumulatingTransform;
+use databend_common_pipeline_transforms::AsyncAccumulatingTransformer;
 use databend_storages_common_io::ReadSettings;
 
 use super::native_rows_fetcher::NativeRowsFetcher;
@@ -132,17 +130,17 @@ pub fn row_fetch_processor(
 pub trait RowsFetcher {
     async fn on_start(&mut self) -> Result<()>;
     async fn fetch(&mut self, row_ids: &[u64]) -> Result<DataBlock>;
-    fn schema(&self) -> DataSchema;
 }
 
 pub struct TransformRowsFetcher<F: RowsFetcher> {
     row_id_col_offset: usize,
     fetcher: F,
     need_wrap_nullable: bool,
+    blocks: Vec<DataBlock>,
 }
 
 #[async_trait::async_trait]
-impl<F> AsyncTransform for TransformRowsFetcher<F>
+impl<F> AsyncAccumulatingTransform for TransformRowsFetcher<F>
 where F: RowsFetcher + Send + Sync + 'static
 {
     const NAME: &'static str = "TransformRowsFetcher";
@@ -152,18 +150,25 @@ where F: RowsFetcher + Send + Sync + 'static
         self.fetcher.on_start().await
     }
 
+    async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
+        self.blocks.push(data);
+        Ok(None)
+    }
+
     #[async_backtrace::framed]
-    async fn transform(&mut self, mut data: DataBlock) -> Result<DataBlock> {
+    async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
+        if self.blocks.is_empty() {
+            return Ok(None);
+        }
+
+        let start_time = std::time::Instant::now();
+        let num_blocks = self.blocks.len();
+        let mut data = DataBlock::concat(&self.blocks)?;
+        self.blocks.clear();
+
         let num_rows = data.num_rows();
         if num_rows == 0 {
-            // Although the data block is empty, we need to add empty columns to align the schema.
-            let fetched_schema = self.fetcher.schema();
-            for f in fetched_schema.fields().iter() {
-                let builder = ColumnBuilder::with_capacity(f.data_type(), 0);
-                let col = builder.build();
-                data.add_column(BlockEntry::new(f.data_type().clone(), Value::Column(col)));
-            }
-            return Ok(data);
+            return Ok(None);
         }
 
         let entry = &data.columns()[self.row_id_col_offset];
@@ -189,7 +194,14 @@ where F: RowsFetcher + Send + Sync + 'static
             }
         }
 
-        Ok(data)
+        log::info!(
+            "TransformRowsFetcher on_finish: num_rows: {}, input blocks: {} in {} milliseconds",
+            num_rows,
+            num_blocks,
+            start_time.elapsed().as_millis()
+        );
+
+        Ok(Some(data))
     }
 }
 
@@ -203,10 +215,11 @@ where F: RowsFetcher + Send + Sync + 'static
         fetcher: F,
         need_wrap_nullable: bool,
     ) -> ProcessorPtr {
-        ProcessorPtr::create(AsyncTransformer::create(input, output, Self {
+        ProcessorPtr::create(AsyncAccumulatingTransformer::create(input, output, Self {
            row_id_col_offset,
            fetcher,
            need_wrap_nullable,
+           blocks: vec![],
        }))
    }
 }

src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs

Lines changed: 0 additions & 5 deletions
@@ -26,7 +26,6 @@ use databend_common_catalog::plan::Projection;
 use databend_common_catalog::table::Table;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
-use databend_common_expression::DataSchema;
 use databend_common_expression::TableSchemaRef;
 use databend_common_storage::ColumnNodes;
 use databend_storages_common_cache::LoadParams;
@@ -148,10 +147,6 @@ impl<const BLOCKING_IO: bool> RowsFetcher for NativeRowsFetcher<BLOCKING_IO> {
 
         Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
     }
-
-    fn schema(&self) -> DataSchema {
-        self.reader.data_schema()
-    }
 }
 
 impl<const BLOCKING_IO: bool> NativeRowsFetcher<BLOCKING_IO> {

src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs

Lines changed: 0 additions & 5 deletions
@@ -26,7 +26,6 @@ use databend_common_catalog::table::Table;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
-use databend_common_expression::DataSchema;
 use databend_common_expression::TableSchemaRef;
 use databend_common_storage::ColumnNodes;
 use databend_storages_common_cache::LoadParams;
@@ -158,10 +157,6 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
 
         Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
     }
-
-    fn schema(&self) -> DataSchema {
-        self.reader.data_schema()
-    }
 }
 
 impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {

0 commit comments

Comments
 (0)