
Commit e084d6f

fix
1 parent 498dab7 commit e084d6f

9 files changed: +58 −58 lines changed

src/query/expression/src/block.rs

Lines changed: 4 additions & 0 deletions

@@ -77,6 +77,10 @@ impl BlockEntry {
     pub fn to_column(&self, num_rows: usize) -> Column {
         self.value.convert_to_full_column(&self.data_type, num_rows)
     }
+
+    pub fn into_column(self, num_rows: usize) -> Column {
+        self.value.into_full_column(&self.data_type, num_rows)
+    }
 }
 
 #[typetag::serde(tag = "type")]
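Note: `into_column` is the consuming counterpart of `to_column`. A minimal sketch of the difference, assuming an owned `BlockEntry` that is no longer needed afterwards (`entry` and `num_rows` are illustrative names, not from this diff):

// Borrowing path: leaves `entry` usable afterwards, at the cost of cloning
// the backing column when the value is already a full Value::Column.
let c1: Column = entry.to_column(num_rows);
// Consuming path: takes `entry` by value, so an existing Value::Column is
// moved out with no clone; only Value::Scalar still materializes rows.
let c2: Column = entry.into_column(num_rows);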

src/query/expression/src/utils/block_thresholds.rs

Lines changed: 2 additions & 36 deletions

@@ -67,12 +67,7 @@ impl BlockThresholds {
     }
 
     #[inline]
-    pub fn calc_rows_per_block(
-        &self,
-        total_bytes: usize,
-        total_rows: usize,
-        block_per_seg: Option<usize>,
-    ) -> usize {
+    pub fn calc_rows_per_block(&self, total_bytes: usize, total_rows: usize) -> usize {
         if self.check_for_compact(total_rows, total_bytes) {
             return total_rows;
         }
@@ -83,35 +78,6 @@ impl BlockThresholds {
             return self.max_rows_per_block;
         }
 
-        let mut rows_per_block = total_rows.div_ceil(block_num_by_size);
-        if let Some(block_per_seg) = block_per_seg {
-            if block_num_by_size >= block_per_seg {
-                return block_num_by_size;
-            }
-        }
-
-        let max_bytes_per_block = match rows_per_block {
-            v if v < self.max_rows_per_block / 10 => {
-                // If block rows < 100_000, max_bytes_per_block set to 200M
-                2 * self.max_bytes_per_block
-            }
-            v if v < self.max_rows_per_block / 2 => {
-                // If block rows < 500_000, max_bytes_per_block set to 150M
-                3 * self.max_bytes_per_block / 2
-            }
-            v if v < self.min_rows_per_block => {
-                // If block rows < 800_000, max_bytes_per_block set to 125M
-                5 * self.max_bytes_per_block / 4
-            }
-            _ => self.max_bytes_per_block,
-        };
-
-        if max_bytes_per_block > self.max_bytes_per_block {
-            rows_per_block = std::cmp::max(
-                total_rows / (std::cmp::max(total_bytes / max_bytes_per_block, 1)),
-                1,
-            );
-        }
-        rows_per_block
+        total_rows.div_ceil(block_num_by_size)
     }
 }
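With the segment-aware sizing gone, the split collapses to one ceiling division. A standalone sketch of the arithmetic; how `block_num_by_size` is computed is an assumption here, since the hunk elides that context:

// Names mirror the function above; the derivation of block_num_by_size from
// total_bytes and max_bytes_per_block is assumed, not shown in the diff.
fn rows_per_block(total_bytes: usize, total_rows: usize, max_bytes_per_block: usize) -> usize {
    let block_num_by_size = total_bytes.div_ceil(max_bytes_per_block).max(1);
    total_rows.div_ceil(block_num_by_size)
}

fn main() {
    // 1 GiB at 100 MiB per block => 11 blocks; 10M rows / 11 => 909_091 rows per block.
    assert_eq!(rows_per_block(1 << 30, 10_000_000, 100 << 20), 909_091);
}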

src/query/expression/src/values.rs

Lines changed: 10 additions & 0 deletions

@@ -329,6 +329,16 @@ impl Value<AnyType> {
         }
     }
 
+    pub fn into_full_column(self, ty: &DataType, num_rows: usize) -> Column {
+        match self {
+            Value::Scalar(s) => {
+                let builder = ColumnBuilder::repeat(&s.as_ref(), num_rows, ty);
+                builder.build()
+            }
+            Value::Column(c) => c,
+        }
+    }
+
     pub fn try_downcast<T: ValueType>(&self) -> Option<Value<T>> {
         Some(match self {
             Value::Scalar(scalar) => Value::Scalar(T::to_owned_scalar(T::try_downcast_scalar(
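Note: only the `Value::Column` arm is zero-cost; a `Value::Scalar` still pays for materialization through `ColumnBuilder::repeat`. A minimal sketch (`val` and `ty` are illustrative locals, not from this diff):

// If `val` already wraps a Column, it is moved out untouched; if it wraps a
// Scalar, a full column of 4096 repeated values is built either way.
let col: Column = val.into_full_column(&ty, 4096);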

src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs

Lines changed: 34 additions & 4 deletions

@@ -12,30 +12,58 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use databend_common_exception::Result;
 use databend_common_expression::BlockThresholds;
 use databend_common_expression::Column;
 use databend_common_expression::DataBlock;
 use databend_common_expression::Value;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_core::Pipeline;
 
 use crate::processors::AccumulatingTransform;
 use crate::processors::BlockCompactMeta;
 use crate::processors::TransformCompactBlock;
 use crate::processors::TransformPipelineHelper;
+use crate::Transform;
+use crate::Transformer;
 
 pub fn build_compact_block_pipeline(
     pipeline: &mut Pipeline,
     thresholds: BlockThresholds,
 ) -> Result<()> {
     let output_len = pipeline.output_len();
+    pipeline.add_transform(ConvertToFullTransform::create)?;
     pipeline.try_resize(1)?;
     pipeline.add_accumulating_transformer(|| BlockCompactBuilder::new(thresholds));
     pipeline.try_resize(output_len)?;
     pipeline.add_block_meta_transformer(TransformCompactBlock::default);
     Ok(())
 }
 
+pub(crate) struct ConvertToFullTransform;
+
+impl ConvertToFullTransform {
+    pub(crate) fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(Transformer::create(
+            input,
+            output,
+            ConvertToFullTransform {},
+        )))
+    }
+}
+
+impl Transform for ConvertToFullTransform {
+    const NAME: &'static str = "ConvertToFullTransform";
+
+    fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
+        Ok(data.consume_convert_to_full())
+    }
+}
+
 pub struct BlockCompactBuilder {
     thresholds: BlockThresholds,
     // Holds blocks that are partially accumulated but haven't reached the threshold.
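Note: the new `ConvertToFullTransform` runs while the pipeline is still at full width, before `try_resize(1)` funnels all streams into the single accumulating `BlockCompactBuilder`, presumably so that scalar-backed values are expanded to full columns up front and the byte accounting in `memory_size` below measures real column memory. The resulting pipeline shape, informally:

N streams -> ConvertToFullTransform ×N -> resize(1) -> BlockCompactBuilder -> resize(N) -> TransformCompactBlock ×N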
@@ -71,9 +99,7 @@ impl AccumulatingTransform for BlockCompactBuilder {
             // holding slices of blocks to merge later may lead to oom, so
             // 1. we expect blocks from file formats are not slice.
             // 2. if block is split here, cut evenly and emit them at once.
-            let rows_per_block = self
-                .thresholds
-                .calc_rows_per_block(num_bytes, num_rows, None);
+            let rows_per_block = self.thresholds.calc_rows_per_block(num_bytes, num_rows);
             Ok(vec![DataBlock::empty_with_meta(Box::new(
                 BlockCompactMeta::Split {
                     block: data,
@@ -122,7 +148,11 @@ pub fn memory_size(data_block: &DataBlock) -> usize {
         .columns()
         .iter()
         .map(|entry| match &entry.value {
-            Value::Column(Column::Nullable(col)) if col.validity.true_count() == 0 => 0,
+            Value::Column(Column::Nullable(col)) if col.validity.true_count() == 0 => {
+                // For `Nullable` columns with no valid values,
+                // only the size of the validity bitmap is counted.
+                col.validity.as_slice().0.len()
+            }
            _ => entry.memory_size(),
        })
        .sum()
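Note: an all-NULL nullable column previously counted as 0 bytes; it now counts the validity bitmap it still owns. Back-of-envelope arithmetic for that size, assuming the bitmap packs one validity bit per row:

// One bit per row, rounded up to whole bytes.
let num_rows: usize = 1_000_000;
let bitmap_bytes = num_rows.div_ceil(8); // 125_000 bytes ≈ 122 KiB, not 0
assert_eq!(bitmap_bytes, 125_000);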

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 1 addition & 10 deletions

@@ -60,8 +60,6 @@ use databend_common_sql::MetadataRef;
 use databend_common_sql::NameResolutionContext;
 use databend_common_sql::ScalarExpr;
 use databend_common_sql::TypeChecker;
-use databend_common_storages_fuse::DEFAULT_BLOCK_PER_SEGMENT;
-use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use databend_storages_common_table_meta::table::ClusterType;
@@ -307,17 +305,10 @@ impl ReclusterTableInterpreter {
         let table_info = tbl.get_table_info().clone();
 
         let block_thresholds = tbl.get_block_thresholds();
-        let block_per_seg = table_info
-            .options()
-            .get(FUSE_OPT_KEY_BLOCK_PER_SEGMENT)
-            .and_then(|s| s.parse().ok())
-            .unwrap_or(DEFAULT_BLOCK_PER_SEGMENT);
-
         let total_bytes = recluster_info.removed_statistics.uncompressed_byte_size as usize;
         let total_rows = recluster_info.removed_statistics.row_count as usize;
 
-        let rows_per_block =
-            block_thresholds.calc_rows_per_block(total_bytes, total_rows, Some(block_per_seg));
+        let rows_per_block = block_thresholds.calc_rows_per_block(total_bytes, total_rows);
         let total_partitions = std::cmp::max(total_rows / rows_per_block, 1);
         warn!("Do hilbert recluster, total_bytes: {}, total_rows: {}, total_partitions: {}, rows_per_block: {}",
             total_bytes, total_rows, total_partitions, rows_per_block);
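A quick worked example of the partition arithmetic above (the numbers are illustrative, not from this commit):

// 50M removed rows at 1M rows per block => 50 hilbert partitions; the
// max(.., 1) guard keeps at least one partition when rows_per_block > total_rows.
let (total_rows, rows_per_block) = (50_000_000usize, 1_000_000usize);
let total_partitions = std::cmp::max(total_rows / rows_per_block, 1);
assert_eq!(total_partitions, 50);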

src/query/service/src/pipelines/builders/builder_recluster.rs

Lines changed: 1 addition & 1 deletion

@@ -152,7 +152,7 @@ impl PipelineBuilder {
 
         // merge sort
         let sort_block_size =
-            block_thresholds.calc_rows_per_block(task.total_bytes, task.total_rows, None);
+            block_thresholds.calc_rows_per_block(task.total_bytes, task.total_rows);
 
         let sort_pipeline_builder =
             SortPipelineBuilder::create(self.ctx.clone(), schema, Arc::new(sort_descs))?

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs

Lines changed: 1 addition & 1 deletion

@@ -206,7 +206,7 @@ pub fn agg_spilling_aggregate_payload(
     let mut columns_data = Vec::with_capacity(columns.len());
     let mut columns_layout = Vec::with_capacity(columns.len());
     for column in columns.into_iter() {
-        let column = column.to_column(data_block.num_rows());
+        let column = column.into_column(data_block.num_rows());
         let column_data = serialize_column(&column);
         write_size += column_data.len() as u64;
         columns_layout.push(column_data.len() as u64);

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs

Lines changed: 1 addition & 1 deletion

@@ -210,7 +210,7 @@ fn exchange_agg_spilling_aggregate_payload(
     let mut columns_layout = Vec::with_capacity(columns.len());
 
     for column in columns.into_iter() {
-        let column = column.to_column(data_block.num_rows());
+        let column = column.into_column(data_block.num_rows());
         let column_data = serialize_column(&column);
         write_size += column_data.len() as u64;
         columns_layout.push(column_data.len() as u64);

src/query/service/src/spillers/serialize.rs

Lines changed: 4 additions & 5 deletions

@@ -80,13 +80,12 @@ impl BlocksEncoder {
         let block = if blocks.len() == 1 {
             blocks.remove(0)
         } else {
-            DataBlock::concat(&blocks).unwrap()
+            DataBlock::concat(&std::mem::take(&mut blocks)).unwrap()
         };
+        let num_rows = block.num_rows();
         let columns_layout = std::iter::once(self.size())
-            .chain(block.columns().iter().map(|entry| {
-                let column = entry
-                    .value
-                    .convert_to_full_column(&entry.data_type, block.num_rows());
+            .chain(block.take_columns().into_iter().map(|entry| {
+                let column = entry.value.into_full_column(&entry.data_type, num_rows);
                write_column(&column, &mut self.buf).unwrap();
                self.size()
            }))
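Two details worth noting here: `num_rows` is hoisted out because `take_columns()` moves the columns out of `block`, so the closure can no longer ask the block for its row count; and `std::mem::take(&mut blocks)` hands `concat` a temporary that is dropped at the end of the statement, freeing the source blocks as soon as the concatenated block exists instead of keeping them alive alongside it. A minimal sketch of the `mem::take` lifetime point, with `Vec<String>` standing in for `Vec<DataBlock>`:

// The temporary returned by take() dies at the end of this statement, so the
// source strings are freed immediately after the copy is made, and `blocks`
// is left empty rather than holding data nobody will use again.
let mut blocks = vec!["spill-".to_string(), "chunk".to_string()];
let combined = std::mem::take(&mut blocks).concat();
assert_eq!(combined, "spill-chunk");
assert!(blocks.is_empty());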
