Skip to content

Commit c9b1a82

Browse files
authored
fix(ci): flaky test (#16933)
* flaky test * fix * fix test
1 parent 10a37b9 commit c9b1a82

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

src/query/service/src/pipelines/builders/builder_mutation.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use databend_common_exception::Result;
2020
use databend_common_expression::BlockThresholds;
2121
use databend_common_expression::DataSchema;
2222
use databend_common_expression::DataSchemaRef;
23+
use databend_common_pipeline_core::processors::create_resize_item;
2324
use databend_common_pipeline_core::processors::InputPort;
2425
use databend_common_pipeline_core::processors::OutputPort;
2526
use databend_common_pipeline_core::processors::ProcessorPtr;
@@ -239,6 +240,18 @@ impl PipelineBuilder {
239240
) -> Result<()> {
240241
// we should avoid too much little block write, because for s3 write, there are too many
241242
// little blocks, it will cause high latency.
243+
let mut origin_len = transform_len;
244+
let mut resize_len = 1;
245+
let mut pipe_items = Vec::with_capacity(2);
246+
if need_match {
247+
origin_len += 1;
248+
resize_len += 1;
249+
pipe_items.push(create_dummy_item());
250+
}
251+
pipe_items.push(create_resize_item(transform_len, 1));
252+
self.main_pipeline
253+
.add_pipe(Pipe::create(origin_len, resize_len, pipe_items));
254+
242255
let mut builder = self.main_pipeline.add_transform_with_specified_len(
243256
|transform_input_port, transform_output_port| {
244257
Ok(ProcessorPtr::create(AccumulatingTransformer::create(
@@ -247,13 +260,21 @@ impl PipelineBuilder {
247260
BlockCompactBuilder::new(block_thresholds),
248261
)))
249262
},
250-
transform_len,
263+
1,
251264
)?;
252265
if need_match {
253266
builder.add_items_prepend(vec![create_dummy_item()]);
254267
}
255268
self.main_pipeline.add_pipe(builder.finalize());
256269

270+
let mut pipe_items = Vec::with_capacity(2);
271+
if need_match {
272+
pipe_items.push(create_dummy_item());
273+
}
274+
pipe_items.push(create_resize_item(1, transform_len));
275+
self.main_pipeline
276+
.add_pipe(Pipe::create(resize_len, origin_len, pipe_items));
277+
257278
let mut builder = self.main_pipeline.add_transform_with_specified_len(
258279
|transform_input_port, transform_output_port| {
259280
Ok(ProcessorPtr::create(BlockMetaTransformer::create(

tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,25 +91,25 @@ statement ok
9191
create table t2(a int)
9292

9393
statement ok
94-
insert into t2 values(1),(2),(8)
94+
insert into t2 values(0),(2),(1)
9595

9696
statement ok
9797
set enable_experimental_merge_into = 1
9898

9999
query TTT
100-
settings (max_threads = 8) merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 0 when matched and t2.a = 2 then delete when not matched then insert *
100+
merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 8 when matched and t2.a = 2 then delete when not matched then insert *
101101
----
102102
1 1 1
103103

104104
query IBBII
105105
select a, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t order by a
106106
----
107-
0 0 0 0 1
107+
0 0 0 0 0
108108
3 0 0 1 1
109109
5 0 0 0 0
110110
6 0 0 0 0
111111
7 0 0 1 0
112-
8 0 0 0 0
112+
8 0 0 0 1
113113

114114
statement ok
115115
create table t1(a int) change_tracking = true
@@ -131,12 +131,12 @@ merge into t using t1 on t.a = t1.a when matched and t1.a = 0 then update set t.
131131
query IBBII
132132
select a, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t order by a
133133
----
134-
1 0 0 0 2
134+
1 0 0 0 1
135135
2 0 0 1 2
136136
5 0 0 0 0
137137
6 0 0 0 0
138138
7 0 0 1 0
139-
8 0 0 0 0
139+
8 0 0 0 1
140140

141141
###############
142142
# issue 14955 #

0 commit comments

Comments
 (0)