Skip to content

Commit f7c3650

Browse files
zhyassdantengsky
authored andcommitted
refactor: Improved Hilbert Clustering with Range Partition (databendlabs#17424)
* fix * fix * fix * fix * fix * fix * update * for test * for test * for test * for test * fix * fix * fix * remove m_cte * fix * fix * fix * fix * fix * restore m cte * fix * fix * fix * remove m_cte * fix * fix * fix * fix * fix * fix * fix * fix * for test * fix * fix * fix * fix * for test * fix * fix memory size * fix * fix * fix * fix * fix * fix * fix * recover * fix * fix * fix * fix * fix fix fix * fix * fix test * fix test * fix test * fix test * add hilbert_range_index * fix * fix * fix * fix * fix * fix * fix * chore: add some extra code comments --------- Co-authored-by: dantengsky <[email protected]>
1 parent 61e96fc commit f7c3650

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1391
-867
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/exception/src/exception_code.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,6 @@ build_exceptions! {
427427
InvalidSessionState(4004),
428428

429429
// recluster error codes
430-
NoNeedToRecluster(4011),
431430
NoNeedToCompact(4012),
432431
UnsupportedClusterType(4013),
433432

src/query/ee/src/hilbert_clustering/handler.rs

Lines changed: 42 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ use databend_common_catalog::plan::ReclusterInfoSideCar;
2020
use databend_common_catalog::table::Table;
2121
use databend_common_catalog::table_context::TableContext;
2222
use databend_common_exception::Result;
23-
use databend_common_expression::BlockThresholds;
23+
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
24+
use databend_common_io::constants::DEFAULT_BLOCK_PER_SEGMENT;
2425
use databend_common_storages_fuse::pruning::create_segment_location_vector;
2526
use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
2627
use databend_common_storages_fuse::FuseTable;
2728
use databend_common_storages_fuse::SegmentLocation;
29+
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
30+
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
2831
use databend_enterprise_hilbert_clustering::HilbertClusteringHandler;
2932
use databend_enterprise_hilbert_clustering::HilbertClusteringHandlerWrapper;
3033
use databend_storages_common_table_meta::meta::ClusterStatistics;
@@ -53,26 +56,26 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
5356
return Ok(None);
5457
};
5558

56-
let block_thresholds = fuse_table.get_block_thresholds();
57-
let thresholds = BlockThresholds {
58-
max_rows_per_block: block_thresholds.block_per_segment
59-
* block_thresholds.max_rows_per_block,
60-
min_rows_per_block: block_thresholds.block_per_segment
61-
* block_thresholds.min_rows_per_block,
62-
max_bytes_per_block: block_thresholds.block_per_segment
63-
* block_thresholds.max_bytes_per_block,
64-
max_bytes_per_file: block_thresholds.block_per_segment
65-
* block_thresholds.max_bytes_per_file,
66-
block_per_segment: block_thresholds.block_per_segment,
67-
};
59+
let block_per_seg =
60+
fuse_table.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
61+
let hilbert_clustering_min_bytes =
62+
ctx.get_settings().get_hilbert_clustering_min_bytes()? as usize;
63+
let max_bytes_per_block = fuse_table.get_option(
64+
FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD,
65+
DEFAULT_BLOCK_BUFFER_SIZE,
66+
);
67+
let hilbert_min_bytes = std::cmp::max(
68+
hilbert_clustering_min_bytes,
69+
max_bytes_per_block * block_per_seg,
70+
);
6871
let segment_locations = snapshot.segments.clone();
6972
let segment_locations = create_segment_location_vector(segment_locations, None);
7073

7174
let max_threads = ctx.get_settings().get_max_threads()? as usize;
7275
let chunk_size = max_threads * 4;
7376
let mut checker = ReclusterChecker::new(
7477
cluster_key_id,
75-
thresholds,
78+
hilbert_min_bytes,
7679
push_downs.as_ref().is_none_or(|v| v.filters.is_none()),
7780
);
7881
'FOR: for chunk in segment_locations.chunks(chunk_size) {
@@ -99,12 +102,6 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
99102
return Ok(None);
100103
}
101104

102-
let rows_per_block =
103-
block_thresholds.calc_rows_per_block(checker.total_size, checker.total_rows, 0) as u64;
104-
let block_size = ctx.get_settings().get_max_block_size()?;
105-
ctx.get_settings()
106-
.set_max_block_size(rows_per_block.min(block_size))?;
107-
108105
let mut removed_statistics = Statistics::default();
109106
let mut removed_segment_indexes = Vec::with_capacity(target_segments.len());
110107
for (segment_loc, segment) in target_segments {
@@ -137,82 +134,60 @@ impl RealHilbertClusteringHandler {
137134

138135
struct ReclusterChecker {
139136
segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
137+
last_segment: Option<(SegmentLocation, Arc<CompactSegmentInfo>)>,
140138
default_cluster_id: u32,
141-
thresholds: BlockThresholds,
142139

143-
total_rows: usize,
144-
total_size: usize,
140+
hilbert_min_bytes: usize,
141+
total_bytes: usize,
145142

146143
finished: bool,
147144
// Whether the target segments is at the head of snapshot.
148145
head_of_snapshot: bool,
149146
}
150147

151148
impl ReclusterChecker {
152-
fn new(default_cluster_id: u32, thresholds: BlockThresholds, head_of_snapshot: bool) -> Self {
149+
fn new(default_cluster_id: u32, hilbert_min_bytes: usize, head_of_snapshot: bool) -> Self {
153150
Self {
154151
segments: vec![],
152+
last_segment: None,
155153
default_cluster_id,
156-
thresholds,
157-
total_rows: 0,
158-
total_size: 0,
154+
hilbert_min_bytes,
155+
total_bytes: 0,
159156
finished: false,
160157
head_of_snapshot,
161158
}
162159
}
163160

164161
fn add(&mut self, location: SegmentLocation, segment: Arc<CompactSegmentInfo>) -> bool {
165-
let row_count = segment.summary.row_count as usize;
166-
let byte_size = segment.summary.uncompressed_byte_size as usize;
167-
self.total_rows += row_count;
168-
self.total_size += byte_size;
169-
if !self
170-
.thresholds
171-
.check_large_enough(self.total_rows, self.total_size)
172-
{
173-
// totals < N
174-
self.segments.push((location, segment));
175-
return false;
176-
}
177-
178162
let segment_should_recluster = self.should_recluster(&segment, |v| {
179163
v.cluster_key_id != self.default_cluster_id || v.level != -1
180164
});
181-
let mut retained = false;
182-
if !self.head_of_snapshot || segment_should_recluster {
183-
if self
184-
.thresholds
185-
.check_for_compact(self.total_rows, self.total_size)
186-
{
187-
// N <= totals < 2N
188-
self.segments.push((location, segment));
189-
retained = true;
190-
} else if segment_should_recluster {
191-
// totals >= 2N
192-
self.segments = vec![(location, segment)];
193-
self.total_rows = row_count;
194-
self.total_size = byte_size;
195-
self.finished = true;
196-
return true;
197-
}
165+
166+
if segment_should_recluster || !self.head_of_snapshot {
167+
self.total_bytes += segment.summary.uncompressed_byte_size as usize;
168+
self.segments.push((location.clone(), segment.clone()));
198169
}
199170

200-
if self.check_for_recluster() {
201-
if !retained {
202-
self.total_rows -= row_count;
203-
self.total_size -= byte_size;
171+
if !segment_should_recluster || self.total_bytes >= self.hilbert_min_bytes {
172+
if self.check_for_recluster() {
173+
self.finished = true;
174+
return true;
204175
}
205-
self.finished = true;
206-
return true;
176+
self.last_segment = Some((location, segment));
177+
self.reset();
207178
}
208179

209-
self.reset();
210180
false
211181
}
212182

213183
fn finalize(&mut self) -> Vec<(SegmentLocation, Arc<CompactSegmentInfo>)> {
214-
if !self.finished && !self.check_for_recluster() {
215-
return vec![];
184+
if !self.finished {
185+
if let Some((location, segment)) = self.last_segment.take() {
186+
self.segments.push((location, segment));
187+
}
188+
if !self.check_for_recluster() {
189+
return vec![];
190+
}
216191
}
217192
std::mem::take(&mut self.segments)
218193
}
@@ -233,8 +208,7 @@ impl ReclusterChecker {
233208
}
234209

235210
fn reset(&mut self) {
236-
self.total_rows = 0;
237-
self.total_size = 0;
211+
self.total_bytes = 0;
238212
self.head_of_snapshot = false;
239213
self.segments.clear();
240214
}

0 commit comments

Comments
 (0)