Skip to content

Commit 2efc816

Browse files
committed
fix
1 parent c0f49fa commit 2efc816

File tree

4 files changed

+20
-5
lines changed

4 files changed

+20
-5
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ struct JoinRuntimeFilterPacketBuilder<'a> {
4242
inlist_threshold: usize,
4343
bloom_threshold: usize,
4444
min_max_threshold: usize,
45+
is_spill_happened: bool,
4546
selectivity_threshold: u64,
4647
}
4748

@@ -54,6 +55,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> {
5455
bloom_threshold: usize,
5556
min_max_threshold: usize,
5657
selectivity_threshold: u64,
58+
is_spill_happened: bool,
5759
) -> Result<Self> {
5860
let build_key_column = Self::eval_build_key_column(data_blocks, func_ctx, build_key)?;
5961
Ok(Self {
@@ -63,14 +65,17 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> {
6365
bloom_threshold,
6466
min_max_threshold,
6567
selectivity_threshold,
68+
is_spill_happened,
6669
})
6770
}
6871
fn build(&self, desc: &RuntimeFilterDesc) -> Result<RuntimeFilterPacket> {
69-
if !should_enable_runtime_filter(
70-
desc,
71-
self.build_key_column.len(),
72-
self.selectivity_threshold,
73-
) {
72+
if self.is_spill_happened
73+
|| !should_enable_runtime_filter(
74+
desc,
75+
self.build_key_column.len(),
76+
self.selectivity_threshold,
77+
)
78+
{
7479
return Ok(RuntimeFilterPacket {
7580
id: desc.id,
7681
inlist: None,
@@ -249,6 +254,7 @@ pub fn build_runtime_filter_packet(
249254
bloom_threshold: usize,
250255
min_max_threshold: usize,
251256
selectivity_threshold: u64,
257+
is_spill_happened: bool,
252258
) -> Result<JoinRuntimeFilterPacket> {
253259
if build_num_rows == 0 {
254260
return Ok(JoinRuntimeFilterPacket {
@@ -268,6 +274,7 @@ pub fn build_runtime_filter_packet(
268274
bloom_threshold,
269275
min_max_threshold,
270276
selectivity_threshold,
277+
is_spill_happened,
271278
)?
272279
.build(rf)?,
273280
);

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/interface.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::atomic::Ordering;
1516
use std::time::Instant;
1617

1718
use databend_common_exception::Result;
@@ -48,6 +49,10 @@ pub async fn build_and_push_down_runtime_filter(
4849
.get_join_runtime_filter_selectivity_threshold()?;
4950

5051
let build_start = Instant::now();
52+
let is_spill_happened = join
53+
.hash_join_state
54+
.is_spill_happened
55+
.load(Ordering::Acquire);
5156
let mut packet = build_runtime_filter_packet(
5257
build_chunks,
5358
build_num_rows,
@@ -57,6 +62,7 @@ pub async fn build_and_push_down_runtime_filter(
5762
bloom_threshold,
5863
min_max_threshold,
5964
selectivity_threshold,
65+
is_spill_happened,
6066
)?;
6167
let build_time = build_start.elapsed();
6268

src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ impl Processor for TransformHashJoinBuild {
300300
.chunks
301301
.clone()
302302
};
303+
303304
let build_num_rows = unsafe {
304305
(*self.build_state.hash_join_state.build_state.get())
305306
.generation_state

src/query/service/src/pipelines/processors/transforms/new_hash_join/memory/inner_join.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ impl Join for InnerHashJoin {
103103
desc.bloom_threshold,
104104
desc.min_max_threshold,
105105
desc.selectivity_threshold,
106+
false,
106107
)
107108
}
108109

0 commit comments

Comments
 (0)