Commit 4a80ca2

forsaken628 and TCeason authored
fix(query): sort spilling may hang (#16672)
* fix
* refactor sort test
* test
* fix lint

Signed-off-by: coldWater <[email protected]>
Co-authored-by: TCeason <[email protected]>
1 parent 2059d71 commit 4a80ca2

10 files changed: +415 −174 lines changed

src/query/pipeline/transforms/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -20,3 +20,4 @@
 #![feature(iter_map_windows)]
 
 pub mod processors;
+pub use processors::*;
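Note: the new re-export makes everything under `processors` reachable from the crate root. A hedged illustration of what this enables for downstream crates (the longer path, visible in the old test imports further down, keeps working):

    // Before (and still valid): the full module path.
    use databend_common_pipeline_transforms::processors::add_k_way_merge_sort;
    // With the re-export, the crate-root path works too:
    use databend_common_pipeline_transforms::add_k_way_merge_sort;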

src/query/service/src/pipelines/processors/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 pub use databend_common_pipeline_core::processors::*;
-pub(crate) mod transforms;
+pub mod transforms;
 
 pub use transforms::HashJoinBuildState;
 pub use transforms::HashJoinDesc;
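Note: widening `pub(crate) mod transforms` to `pub mod transforms` makes the transforms visible outside the crate; presumably this is what lets the relocated sort tests under `tests/it/pipelines/transforms/` (a separately compiled integration-test crate) reach them. A hypothetical import from the test side — the exact items the new test modules pull in are not shown in this diff:

    // Hypothetical: only possible once the module is `pub`.
    use databend_query::pipelines::processors::transforms::TransformSortSpill;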

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs

Lines changed: 4 additions & 0 deletions
@@ -67,6 +67,10 @@ impl<T: HashMethodBounds, V: Send + Sync + 'static> HashTableCell<T, V> {
         self.hashtable.len()
     }
 
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
     pub fn allocated_bytes(&self) -> usize {
         self.hashtable.bytes_len(false)
             + self.arena.allocated_bytes()
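Note: `is_empty` mirrors the existing `len`, following the standard Rust pairing that clippy's `len_without_is_empty` lint expects, so callers can write `cell.is_empty()` instead of `cell.len() == 0`. A minimal sketch of the pattern (hypothetical `Cell` type, not the Databend one):

    struct Cell { items: Vec<u32> }
    impl Cell {
        fn len(&self) -> usize { self.items.len() }
        // Conventional companion: empty iff length is zero.
        fn is_empty(&self) -> bool { self.len() == 0 }
    }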

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs

Lines changed: 2 additions & 2 deletions
@@ -163,7 +163,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
                 &self.location_prefix,
                 payload,
             )?,
-            false => agg_spilling_aggregate_payload::<Method>(
+            false => agg_spilling_aggregate_payload(
                 self.ctx.clone(),
                 self.operator.clone(),
                 &self.location_prefix,
@@ -239,7 +239,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
     }
 }
 
-fn agg_spilling_aggregate_payload<Method: HashMethodBounds>(
+fn agg_spilling_aggregate_payload(
     ctx: Arc<QueryContext>,
     operator: Operator,
     location_prefix: &str,
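Note: the `Method` type parameter is dropped from `agg_spilling_aggregate_payload`, evidently because the body no longer depends on it, so the call site loses its turbofish as well; the same two-line change is applied to `agg_spilling_group_by_payload` in the next file. In miniature, with hypothetical names:

    // Sketch: removing an unused generic lets call sites drop the turbofish.
    fn payload_v1<M: Clone>(x: u32) -> u32 { x } // called as payload_v1::<String>(7)
    fn payload_v2(x: u32) -> u32 { x }           // called as payload_v2(7)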

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs

Lines changed: 2 additions & 2 deletions
@@ -214,7 +214,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
                 &self.location_prefix,
                 payload,
             )?,
-            false => agg_spilling_group_by_payload::<Method>(
+            false => agg_spilling_group_by_payload(
                 self.ctx.clone(),
                 self.operator.clone(),
                 &self.location_prefix,
@@ -292,7 +292,7 @@ fn get_columns(data_block: DataBlock) -> Vec<BlockEntry> {
     data_block.columns().to_vec()
 }
 
-fn agg_spilling_group_by_payload<Method: HashMethodBounds>(
+fn agg_spilling_group_by_payload(
     ctx: Arc<QueryContext>,
     operator: Operator,
     location_prefix: &str,

src/query/service/src/pipelines/processors/transforms/transform_async_function.rs

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ pub struct TransformAsyncFunction {
 }
 
 impl TransformAsyncFunction {
-    pub fn new(
+    pub(crate) fn new(
        ctx: Arc<QueryContext>,
        async_func_descs: Vec<AsyncFunctionDesc>,
        operators: BTreeMap<usize, Arc<DictionaryOperator>>,

src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs

Lines changed: 3 additions & 2 deletions
@@ -129,13 +129,13 @@ where
         }
 
         if let Some(block) = self.output_data.take() {
-            debug_assert!(matches!(self.state, State::MergeFinal | State::Finish));
+            assert!(matches!(self.state, State::MergeFinal | State::Finish));
             self.output_block(block);
             return Ok(Event::NeedConsume);
         }
 
         if matches!(self.state, State::Finish) {
-            debug_assert!(self.input.is_finished());
+            assert!(self.input.is_finished());
             self.output.finish();
             return Ok(Event::Finished);
         }
@@ -179,6 +179,7 @@ where
         if meta.is_none() {
             // It means we get the last block.
             // We can launch external merge sort now.
+            self.input.finish();
             self.state = State::Merging;
         }
         self.input_data = Some(block);
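Note: this hunk is the actual hang fix. Once the transform sees the last input block (no spill meta attached), it switches to `State::Merging` and never pulls from its input again; without an explicit `input.finish()`, upstream can stay blocked waiting for that port to be drained, and the query hangs. The two `debug_assert!` → `assert!` promotions make the state-machine invariants fail loudly in release builds too. A minimal, self-contained sketch of the pattern with stub types (not the Databend `Processor` API):

    #[derive(Debug, PartialEq)]
    enum State {
        Collecting,
        Merging,
    }

    struct InputPort {
        finished: bool,
    }

    impl InputPort {
        // Tells upstream this consumer will never pull again.
        fn finish(&mut self) {
            self.finished = true;
        }
    }

    struct SortSpill {
        state: State,
        input: InputPort,
    }

    impl SortSpill {
        fn on_block(&mut self, is_last: bool) {
            if is_last {
                // The fix: close the input *before* entering Merging,
                // otherwise upstream waits forever on a port nobody reads.
                self.input.finish();
                self.state = State::Merging;
            }
        }
    }

    fn main() {
        let mut t = SortSpill {
            state: State::Collecting,
            input: InputPort { finished: false },
        };
        t.on_block(true);
        assert!(t.input.finished);
        assert_eq!(t.state, State::Merging);
    }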

src/query/service/tests/it/pipelines/transforms/sort.rs

Lines changed: 20 additions & 166 deletions
@@ -19,7 +19,6 @@ use databend_common_base::base::tokio;
 use databend_common_base::base::tokio::sync::mpsc::channel;
 use databend_common_base::base::tokio::sync::mpsc::Receiver;
 use databend_common_exception::Result;
-use databend_common_expression::block_debug::pretty_format_blocks;
 use databend_common_expression::types::Int32Type;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataField;
@@ -34,130 +33,16 @@ use databend_common_pipeline_core::PipeItem;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_pipeline_sinks::SyncSenderSink;
 use databend_common_pipeline_sources::BlocksSource;
-use databend_common_pipeline_transforms::processors::add_k_way_merge_sort;
 use databend_query::pipelines::executor::ExecutorSettings;
 use databend_query::pipelines::executor::QueryPipelineExecutor;
 use databend_query::sessions::QueryContext;
 use databend_query::test_kits::TestFixture;
-use itertools::Itertools;
-use parking_lot::Mutex;
 use rand::rngs::ThreadRng;
 use rand::Rng;
 
-#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
-async fn test_k_way_merge_sort() -> Result<()> {
-    let fixture = TestFixture::setup().await?;
-    let ctx = fixture.new_query_ctx().await?;
-
-    let worker = 3;
-    let block_size = 4;
-    let limit = None;
-    let (data, expected) = basic_test_data(None);
-    let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?;
-
-    executor.execute()?;
-
-    let mut got: Vec<DataBlock> = Vec::new();
-    while !rx.is_empty() {
-        got.push(rx.recv().await.unwrap()?);
-    }
-
-    check_result(got, expected);
-
-    Ok(())
-}
-
-#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
-async fn test_k_way_merge_sort_fuzz() -> Result<()> {
-    let mut rng = rand::thread_rng();
-    let fixture = TestFixture::setup().await?;
-
-    for _ in 0..10 {
-        let ctx = fixture.new_query_ctx().await?;
-        run_fuzz(ctx, &mut rng, false).await?;
-    }
-
-    for _ in 0..10 {
-        let ctx = fixture.new_query_ctx().await?;
-        run_fuzz(ctx, &mut rng, true).await?;
-    }
-    Ok(())
-}
-
-async fn run_fuzz(ctx: Arc<QueryContext>, rng: &mut ThreadRng, with_limit: bool) -> Result<()> {
-    let worker = rng.gen_range(1..=5);
-    let block_size = rng.gen_range(1..=20);
-    let (data, expected, limit) = random_test_data(rng, with_limit);
-
-    // println!("\nwith_limit {with_limit}");
-    // for (input, blocks) in data.iter().enumerate() {
-    //     println!("intput {input}");
-    //     for b in blocks {
-    //         println!("{:?}", b.columns()[0].value);
-    //     }
-    // }
-
-    let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?;
-    executor.execute()?;
-
-    let mut got: Vec<DataBlock> = Vec::new();
-    while !rx.is_empty() {
-        got.push(rx.recv().await.unwrap()?);
-    }
-
-    check_result(got, expected);
-
-    Ok(())
-}
-
-fn create_pipeline(
-    ctx: Arc<QueryContext>,
-    data: Vec<Vec<DataBlock>>,
-    worker: usize,
-    block_size: usize,
-    limit: Option<usize>,
-) -> Result<(Arc<QueryPipelineExecutor>, Receiver<Result<DataBlock>>)> {
-    let mut pipeline = Pipeline::create();
-
-    let data_type = data[0][0].get_by_offset(0).data_type.clone();
-    let source_pipe = create_source_pipe(ctx, data)?;
-    pipeline.add_pipe(source_pipe);
-
-    let schema = DataSchemaRefExt::create(vec![DataField::new("a", data_type)]);
-    let sort_desc = Arc::new(vec![SortColumnDescription {
-        offset: 0,
-        asc: true,
-        nulls_first: true,
-        is_nullable: false,
-    }]);
-    add_k_way_merge_sort(
-        &mut pipeline,
-        schema,
-        worker,
-        block_size,
-        limit,
-        sort_desc,
-        false,
-        true,
-    )?;
-
-    let (mut rx, sink_pipe) = create_sink_pipe(1)?;
-    let rx = rx.pop().unwrap();
-    pipeline.add_pipe(sink_pipe);
-    pipeline.set_max_threads(3);
-
-    let settings = ExecutorSettings {
-        query_id: Arc::new("".to_string()),
-        max_execute_time_in_seconds: Default::default(),
-        enable_queries_executor: false,
-        max_threads: 8,
-        executor_node_id: "".to_string(),
-    };
-    let executor = QueryPipelineExecutor::create(pipeline, settings)?;
-    Ok((executor, rx))
-}
-
 fn create_source_pipe(ctx: Arc<QueryContext>, data: Vec<Vec<DataBlock>>) -> Result<Pipe> {
+    use parking_lot::Mutex;
+
     let size = data.len();
     let mut items = Vec::with_capacity(size);
 
@@ -179,7 +64,7 @@ fn create_source_pipe(ctx: Arc<QueryContext>, data: Vec<Vec<DataBlock>>) -> Resu
 fn create_sink_pipe(size: usize) -> Result<(Vec<Receiver<Result<DataBlock>>>, Pipe)> {
     let mut rxs = Vec::with_capacity(size);
     let mut items = Vec::with_capacity(size);
-    for _index in 0..size {
+    for _ in 0..size {
         let input = InputPort::create();
         let (tx, rx) = channel(1000);
         rxs.push(rx);
@@ -193,21 +78,11 @@ fn create_sink_pipe(size: usize) -> Result<(Vec<Receiver<Result<DataBlock>>>, Pi
     Ok((rxs, Pipe::create(size, 0, items)))
 }
 
-/// Returns (input, expected)
-pub fn basic_test_data(limit: Option<usize>) -> (Vec<Vec<DataBlock>>, DataBlock) {
-    let data = vec![
-        vec![vec![1, 2, 3, 4], vec![4, 5, 6, 7]],
-        vec![vec![1, 1, 1, 1], vec![1, 10, 100, 2000]],
-        vec![vec![0, 2, 4, 5]],
-    ];
-
-    prepare_input_and_result(data, limit)
-}
-
-fn prepare_input_and_result(
+fn prepare_multi_input_and_result(
     data: Vec<Vec<Vec<i32>>>,
     limit: Option<usize>,
 ) -> (Vec<Vec<DataBlock>>, DataBlock) {
+    use itertools::Itertools;
     let input = data
         .clone()
         .into_iter()
@@ -229,7 +104,17 @@ fn prepare_input_and_result(
     (input, result)
 }
 
+fn prepare_single_input_and_result(
+    data: Vec<Vec<i32>>,
+    limit: Option<usize>,
+) -> (Vec<DataBlock>, DataBlock) {
+    let (mut input, expected) = prepare_multi_input_and_result(vec![data], limit);
+    (input.remove(0), expected)
+}
+
 fn check_result(result: Vec<DataBlock>, expected: DataBlock) {
+    use databend_common_expression::block_debug::pretty_format_blocks;
+
     if expected.is_empty() {
         if !result.is_empty() && !DataBlock::concat(&result).unwrap().is_empty() {
             panic!(
@@ -240,46 +125,15 @@ fn check_result(result: Vec<DataBlock>, expected: DataBlock) {
         return;
     }
 
-    let result_rows: usize = result.iter().map(|v| v.num_rows()).sum();
-    let result = pretty_format_blocks(&result).unwrap();
     let expected_rows = expected.num_rows();
     let expected = pretty_format_blocks(&[expected]).unwrap();
+    let result_rows: usize = result.iter().map(|v| v.num_rows()).sum();
+    let result = pretty_format_blocks(&result).unwrap();
     assert_eq!(
         expected, result,
-        "\nexpected (num_rows = {}):\n{}\nactual (num_rows = {}):\n{}",
-        expected_rows, expected, result_rows, result
+        "\nexpected (num_rows = {expected_rows}):\n{expected}\nactual (num_rows = {result_rows}):\n{result}",
     );
 }
 
-fn random_test_data(
-    rng: &mut ThreadRng,
-    with_limit: bool,
-) -> (Vec<Vec<DataBlock>>, DataBlock, Option<usize>) {
-    let random_batch_size = rng.gen_range(1..=10);
-    let random_num_streams = rng.gen_range(5..=10);
-
-    let random_data = (0..random_num_streams)
-        .map(|_| {
-            let random_num_blocks = rng.gen_range(1..=10);
-            let mut data = (0..random_batch_size * random_num_blocks)
-                .map(|_| rng.gen_range(0..=1000))
-                .collect::<Vec<_>>();
-            data.sort();
-            data.chunks(random_batch_size)
-                .map(|v| v.to_vec())
-                .collect::<Vec<_>>()
-        })
-        .collect::<Vec<_>>();
-
-    let num_rows = random_data
-        .iter()
-        .map(|v| v.iter().map(|v| v.len()).sum::<usize>())
-        .sum::<usize>();
-    let limit = if with_limit {
-        Some(rng.gen_range(0..=num_rows))
-    } else {
-        None
-    };
-    let (input, expected) = prepare_input_and_result(random_data, limit);
-    (input, expected, limit)
-}
+mod k_way;
+mod spill;
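Note: the test file now keeps only the shared pipeline plumbing and data helpers; the k-way-merge tests and their fuzzing move into `mod k_way;`, and new spill-focused tests live in `mod spill;` (neither new file appears in this excerpt). A hedged example of how the new single-stream helper might be used from `spill`:

    // Hypothetical call site inside a test: one input stream of two
    // blocks, no LIMIT applied.
    let (input, expected) = prepare_single_input_and_result(
        vec![vec![1, 3, 5], vec![2, 4, 6]],
        None,
    );
    // `input` is a Vec<DataBlock> for a single stream; `expected` is one
    // DataBlock holding the fully sorted (and optionally limited) result.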
