Skip to content

Commit d639b03

Browse files
authored
fix: avoid excessive concurrency in row fetch (#17885)
* feat: drop unused rows as early as possible during row fetch
* clear cache
* limit max fetch size
* Revert "limit max fetch size" — This reverts commit 367acb7.
* Revert "feat: drop unused rows as early as possible during row fetch" — This reverts commit d11cd2a.
* limit the concurrency of row fetch
* Revert "Revert "feat: drop unused rows as early as possible during row fetch"" — This reverts commit 313cef7.
* simplify
* use global io runtime
1 parent c0b9ed0 commit d639b03

File tree

3 files changed

+83
-84
lines changed

3 files changed

+83
-84
lines changed

src/query/service/src/pipelines/builders/builder_row_fetch.rs

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
17+
use databend_common_base::runtime::GlobalIORuntime;
1518
use databend_common_exception::Result;
1619
use databend_common_pipeline_core::processors::InputPort;
1720
use databend_common_pipeline_core::processors::OutputPort;
@@ -21,18 +24,24 @@ use databend_common_pipeline_transforms::processors::create_dummy_item;
2124
use databend_common_sql::executor::physical_plans::RowFetch;
2225
use databend_common_sql::executor::PhysicalPlan;
2326
use databend_common_storages_fuse::operations::row_fetch_processor;
27+
use databend_common_storages_fuse::TableContext;
28+
use tokio::sync::Semaphore;
2429

2530
use crate::pipelines::PipelineBuilder;
26-
2731
impl PipelineBuilder {
2832
pub(crate) fn build_row_fetch(&mut self, row_fetch: &RowFetch) -> Result<()> {
2933
self.build_pipeline(&row_fetch.input)?;
34+
let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
35+
let row_fetch_runtime = GlobalIORuntime::instance();
36+
let row_fetch_semaphore = Arc::new(Semaphore::new(max_io_requests));
3037
let processor = row_fetch_processor(
3138
self.ctx.clone(),
3239
row_fetch.row_id_col_offset,
3340
&row_fetch.source,
3441
row_fetch.cols_to_fetch.clone(),
3542
row_fetch.need_wrap_nullable,
43+
row_fetch_semaphore,
44+
row_fetch_runtime,
3645
)?;
3746
if !matches!(&*row_fetch.input, PhysicalPlan::MutationSplit(_)) {
3847
self.main_pipeline.add_transform(processor)?;

src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,8 @@
1515
use std::collections::HashSet;
1616
use std::sync::Arc;
1717

18+
use databend_common_base::base::tokio::sync::Semaphore;
19+
use databend_common_base::runtime::Runtime;
1820
use databend_common_catalog::plan::split_row_id;
1921
use databend_common_catalog::plan::DataSourcePlan;
2022
use databend_common_catalog::plan::Projection;
@@ -49,6 +51,8 @@ pub fn row_fetch_processor(
4951
source: &DataSourcePlan,
5052
projection: Projection,
5153
need_wrap_nullable: bool,
54+
semaphore: Arc<Semaphore>,
55+
runtime: Arc<Runtime>,
5256
) -> Result<RowFetcher> {
5357
let table = ctx.build_table_from_source_plan(source)?;
5458
let fuse_table = table
@@ -107,7 +111,8 @@ pub fn row_fetch_processor(
107111
projection.clone(),
108112
block_reader.clone(),
109113
read_settings,
110-
max_threads,
114+
semaphore.clone(),
115+
runtime.clone(),
111116
),
112117
need_wrap_nullable,
113118
)
@@ -122,7 +127,8 @@ pub fn row_fetch_processor(
122127
projection.clone(),
123128
block_reader.clone(),
124129
read_settings,
125-
max_threads,
130+
semaphore.clone(),
131+
runtime.clone(),
126132
),
127133
need_wrap_nullable,
128134
)

src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs

Lines changed: 65 additions & 81 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,8 @@ use std::collections::HashMap;
@@ -16,7 +16,8 @@ use std::collections::HashMap;
1616
use std::collections::HashSet;
1717
use std::sync::Arc;
1818

19-
use databend_common_base::runtime::execute_futures_in_parallel;
19+
use databend_common_base::base::tokio::sync::Semaphore;
20+
use databend_common_base::runtime::Runtime;
2021
use databend_common_catalog::plan::block_idx_in_segment;
2122
use databend_common_catalog::plan::split_prefix;
2223
use databend_common_catalog::plan::split_row_id;
@@ -25,13 +26,15 @@ use databend_common_catalog::plan::Projection;
2526
use databend_common_catalog::table::Table;
2627
use databend_common_exception::ErrorCode;
2728
use databend_common_exception::Result;
29+
use databend_common_expression::BlockRowIndex;
2830
use databend_common_expression::DataBlock;
2931
use databend_common_expression::TableSchemaRef;
3032
use databend_common_storage::ColumnNodes;
3133
use databend_storages_common_cache::LoadParams;
3234
use databend_storages_common_io::ReadSettings;
3335
use databend_storages_common_table_meta::meta::BlockMeta;
3436
use databend_storages_common_table_meta::meta::TableSnapshot;
37+
use futures_util::future;
3538
use itertools::Itertools;
3639

3740
use super::fuse_rows_fetcher::RowsFetcher;
@@ -54,8 +57,8 @@ pub(super) struct ParquetRowsFetcher<const BLOCKING_IO: bool> {
5457
part_map: HashMap<u64, PartInfoPtr>,
5558
segment_blocks_cache: HashMap<u64, Vec<Arc<BlockMeta>>>,
5659

57-
// To control the parallelism of fetching blocks.
58-
max_threads: usize,
60+
semaphore: Arc<Semaphore>,
61+
runtime: Arc<Runtime>,
5962
}
6063

6164
#[async_trait::async_trait]
@@ -68,6 +71,7 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
6871

6972
fn clear_cache(&mut self) {
7073
self.part_map.clear();
74+
self.segment_blocks_cache.clear();
7175
}
7276

7377
#[async_backtrace::framed]
@@ -77,72 +81,57 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
7781
let num_rows = row_ids.len();
7882
let mut part_set = HashSet::new();
7983
let mut row_set = Vec::with_capacity(num_rows);
84+
let mut block_row_indices = HashMap::new();
8085
for row_id in row_ids {
8186
let (prefix, idx) = split_row_id(*row_id);
8287
part_set.insert(prefix);
8388
row_set.push((prefix, idx));
89+
block_row_indices
90+
.entry(prefix)
91+
.or_insert(Vec::new())
92+
.push((0u32, idx as u32, 1usize));
8493
}
8594

8695
// Read blocks in `prefix` order.
8796
let part_set = part_set.into_iter().sorted().collect::<Vec<_>>();
88-
let idx_map = part_set
97+
let mut idx_map = part_set
8998
.iter()
9099
.enumerate()
91-
.map(|(i, p)| (*p, i))
100+
.map(|(i, p)| (*p, (i, 0)))
92101
.collect::<HashMap<_, _>>();
93-
// parts_per_thread = num_parts / max_threads
94-
// remain = num_parts % max_threads
95-
// task distribution:
96-
// Part number of each task | Task number
97-
// ------------------------------------------------------
98-
// parts_per_thread + 1 | remain
99-
// parts_per_thread | max_threads - remain
100-
let num_parts = part_set.len();
101-
let mut tasks = Vec::with_capacity(self.max_threads);
102-
// Fetch blocks in parallel.
103-
let part_size = num_parts / self.max_threads;
104-
let remainder = num_parts % self.max_threads;
105-
let mut begin = 0;
106-
for i in 0..self.max_threads {
107-
let end = if i < remainder {
108-
begin + part_size + 1
109-
} else {
110-
begin + part_size
111-
};
112-
if begin == end {
113-
break;
114-
}
115-
let parts = part_set[begin..end]
116-
.iter()
117-
.map(|idx| self.part_map[idx].clone())
118-
.collect::<Vec<_>>();
119-
tasks.push(Self::fetch_blocks(
102+
103+
let mut tasks = Vec::with_capacity(part_set.len());
104+
for part in &part_set {
105+
tasks.push(Self::fetch_block(
120106
self.reader.clone(),
121-
parts,
107+
self.part_map[part].clone(),
122108
self.settings,
109+
block_row_indices[part].clone(),
123110
));
124-
begin = end;
125111
}
126112

127-
let num_task = tasks.len();
128-
let blocks = execute_futures_in_parallel(
129-
tasks,
130-
num_task,
131-
num_task * 2,
132-
"parqeut rows fetch".to_string(),
133-
)
134-
.await?
135-
.into_iter()
136-
.collect::<Result<Vec<_>>>()?
137-
.into_iter()
138-
.flatten()
139-
.collect::<Vec<_>>();
113+
let tasks = tasks.into_iter().map(|v| {
114+
|permit| async {
115+
let r = v.await;
116+
drop(permit);
117+
r
118+
}
119+
});
120+
let join_handlers = self
121+
.runtime
122+
.try_spawn_batch_with_owned_semaphore(self.semaphore.clone(), tasks)
123+
.await?;
124+
125+
let joint = future::try_join_all(join_handlers).await?;
126+
let blocks = joint.into_iter().collect::<Result<Vec<_>>>()?;
140127
// Take result rows from blocks.
141128
let indices = row_set
142129
.iter()
143-
.map(|(prefix, row_idx)| {
144-
let block_idx = idx_map[prefix];
145-
(block_idx as u32, *row_idx as u32, 1_usize)
130+
.map(|(prefix, _)| {
131+
let (block_idx, row_idx_in_block) = idx_map.get_mut(prefix).unwrap();
132+
let row_idx = *row_idx_in_block;
133+
*row_idx_in_block += 1;
134+
(*block_idx as u32, row_idx as u32, 1_usize)
146135
})
147136
.collect::<Vec<_>>();
148137

@@ -169,7 +158,8 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
169158
projection: Projection,
170159
reader: Arc<BlockReader>,
171160
settings: ReadSettings,
172-
max_threads: usize,
161+
semaphore: Arc<Semaphore>,
162+
runtime: Arc<Runtime>,
173163
) -> Self {
174164
let schema = table.schema();
175165
let segment_reader =
@@ -184,7 +174,8 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
184174
settings,
185175
part_map: HashMap::new(),
186176
segment_blocks_cache: HashMap::new(),
187-
max_threads,
177+
semaphore,
178+
runtime,
188179
}
189180
}
190181

@@ -238,38 +229,31 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
238229
}
239230

240231
#[async_backtrace::framed]
241-
async fn fetch_blocks(
232+
async fn fetch_block(
242233
reader: Arc<BlockReader>,
243-
parts: Vec<PartInfoPtr>,
234+
part: PartInfoPtr,
244235
settings: ReadSettings,
245-
) -> Result<Vec<DataBlock>> {
246-
let mut chunks = Vec::with_capacity(parts.len());
247-
if BLOCKING_IO {
248-
for part in parts.iter() {
249-
let chunk = reader.sync_read_columns_data_by_merge_io(&settings, part, &None)?;
250-
chunks.push(chunk);
251-
}
236+
block_row_indices: Vec<BlockRowIndex>,
237+
) -> Result<DataBlock> {
238+
let chunk = if BLOCKING_IO {
239+
reader.sync_read_columns_data_by_merge_io(&settings, &part, &None)?
252240
} else {
253-
for part in parts.iter() {
254-
let part = FuseBlockPartInfo::from_part(part)?;
255-
let chunk = reader
256-
.read_columns_data_by_merge_io(
257-
&settings,
258-
&part.location,
259-
&part.columns_meta,
260-
&None,
261-
)
262-
.await?;
263-
chunks.push(chunk);
264-
}
265-
}
266-
let fetched_blocks = chunks
267-
.into_iter()
268-
.zip(parts.iter())
269-
.map(|(chunk, part)| Self::build_block(&reader, part, chunk))
270-
.collect::<Result<Vec<_>>>()?;
271-
272-
Ok(fetched_blocks)
241+
let fuse_part = FuseBlockPartInfo::from_part(&part)?;
242+
reader
243+
.read_columns_data_by_merge_io(
244+
&settings,
245+
&fuse_part.location,
246+
&fuse_part.columns_meta,
247+
&None,
248+
)
249+
.await?
250+
};
251+
let block = Self::build_block(&reader, &part, chunk)?;
252+
Ok(DataBlock::take_blocks(
253+
&[block],
254+
&block_row_indices,
255+
block_row_indices.len(),
256+
))
273257
}
274258

275259
fn build_block(

0 commit comments

Comments (0)