Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/query/expression/src/aggregate/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,15 +390,14 @@ impl Payload {
true
}

pub fn empty_block(&self) -> DataBlock {
let columns = self
.aggrs
.iter()
.map(|f| ColumnBuilder::with_capacity(&f.return_type().unwrap(), 0).build())
pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
let fake_rows = fake_rows.unwrap_or(0);
let columns = (0..self.aggrs.len())
.map(|_| ColumnBuilder::repeat_default(&DataType::Binary, fake_rows).build())
.chain(
self.group_types
.iter()
.map(|t| ColumnBuilder::with_capacity(t, 0).build()),
.map(|t| ColumnBuilder::repeat_default(t, fake_rows).build()),
)
.collect_vec();
DataBlock::new_from_columns(columns)
Expand Down
4 changes: 2 additions & 2 deletions src/query/expression/src/aggregate/payload_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl Payload {
}

if blocks.is_empty() {
return Ok(self.empty_block());
return Ok(self.empty_block(None));
}
DataBlock::concat(&blocks)
}
Expand Down Expand Up @@ -173,7 +173,7 @@ impl Payload {
}

if blocks.is_empty() {
return Ok(self.empty_block());
return Ok(self.empty_block(None));
}

DataBlock::concat(&blocks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,26 +173,20 @@ impl FlightScatter for HashTableHashScatter {
AggregateMeta::Partitioned { .. } => unreachable!(),
AggregateMeta::AggregateSpilling(payload) => {
for p in scatter_partitioned_payload(payload, self.buckets)? {
blocks.push(match p.len() == 0 {
true => DataBlock::empty(),
false => DataBlock::empty_with_meta(
AggregateMeta::create_agg_spilling(p),
),
});
blocks.push(DataBlock::empty_with_meta(
AggregateMeta::create_agg_spilling(p),
));
}
}
AggregateMeta::AggregatePayload(p) => {
for payload in scatter_payload(p.payload, self.buckets)? {
blocks.push(match payload.len() == 0 {
true => DataBlock::empty(),
false => {
DataBlock::empty_with_meta(AggregateMeta::create_agg_payload(
p.bucket,
payload,
p.max_partition_count,
))
}
});
blocks.push(DataBlock::empty_with_meta(
AggregateMeta::create_agg_payload(
p.bucket,
payload,
p.max_partition_count,
),
));
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ impl NewTransformPartitionBucket {
#[allow(unused_assignments)]
fn add_bucket(&mut self, mut data_block: DataBlock) -> Result<(isize, usize)> {
let (mut bucket, mut partition_count) = (0, 0);
let mut is_empty_block = false;
if let Some(block_meta) = data_block.get_meta() {
if let Some(block_meta) = AggregateMeta::downcast_ref_from(block_meta) {
(bucket, partition_count) = match block_meta {
Expand Down Expand Up @@ -250,7 +251,11 @@ impl NewTransformPartitionBucket {
if let Some(AggregateMeta::Spilled(buckets_payload)) =
AggregateMeta::downcast_from(meta)
{
let partition_count = buckets_payload[0].max_partition_count;
let partition_count = if !buckets_payload.is_empty() {
buckets_payload[0].max_partition_count
} else {
MAX_PARTITION_COUNT
};
self.max_partition_count =
self.max_partition_count.max(partition_count);

Expand All @@ -274,12 +279,14 @@ impl NewTransformPartitionBucket {
unreachable!()
}
AggregateMeta::Serialized(payload) => {
is_empty_block = payload.data_block.is_empty();
self.max_partition_count =
self.max_partition_count.max(payload.max_partition_count);

(payload.bucket, payload.max_partition_count)
}
AggregateMeta::AggregatePayload(payload) => {
is_empty_block = payload.payload.len() == 0;
self.max_partition_count =
self.max_partition_count.max(payload.max_partition_count);

Expand All @@ -298,23 +305,25 @@ impl NewTransformPartitionBucket {
));
}

if self.all_inputs_init {
if partition_count != self.max_partition_count {
return Err(ErrorCode::Internal(
if !is_empty_block {
if self.all_inputs_init {
if partition_count != self.max_partition_count {
return Err(ErrorCode::Internal(
"Internal, the partition count does not equal the max partition count on TransformPartitionBucket.
",
));
}
match self.buckets_blocks.entry(bucket) {
Entry::Vacant(v) => {
v.insert(vec![data_block]);
}
Entry::Occupied(mut v) => {
v.get_mut().push(data_block);
}
};
} else {
self.unpartitioned_blocks.push(data_block);
match self.buckets_blocks.entry(bucket) {
Entry::Vacant(v) => {
v.insert(vec![data_block]);
}
Entry::Occupied(mut v) => {
v.get_mut().push(data_block);
}
};
} else {
self.unpartitioned_blocks.push(data_block);
}
}

Ok((bucket, partition_count))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,23 @@ pub struct AggregateSerdeMeta {
pub columns_layout: Vec<usize>,
// use for new agg hashtable
pub max_partition_count: usize,
pub is_empty: bool,
}

impl AggregateSerdeMeta {
pub fn create(bucket: isize) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: BUCKET_TYPE,
bucket,
location: None,
data_range: None,
columns_layout: vec![],
max_partition_count: 0,
})
}

pub fn create_agg_payload(bucket: isize, max_partition_count: usize) -> BlockMetaInfoPtr {
pub fn create_agg_payload(
bucket: isize,
max_partition_count: usize,
is_empty: bool,
) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: BUCKET_TYPE,
bucket,
location: None,
data_range: None,
columns_layout: vec![],
max_partition_count,
is_empty,
})
}

Expand All @@ -61,6 +56,7 @@ impl AggregateSerdeMeta {
location: String,
data_range: Range<u64>,
columns_layout: Vec<usize>,
is_empty: bool,
) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: SPILLED_TYPE,
Expand All @@ -69,6 +65,7 @@ impl AggregateSerdeMeta {
location: Some(location),
data_range: Some(data_range),
max_partition_count: 0,
is_empty,
})
}

Expand All @@ -86,6 +83,7 @@ impl AggregateSerdeMeta {
location: Some(location),
data_range: Some(data_range),
max_partition_count,
is_empty: false,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ pub struct SerializeAggregateStream {
pub payload: Pin<Box<SerializePayload>>,
flush_state: PayloadFlushState,
end_iter: bool,
nums: usize,
}

unsafe impl Send for SerializeAggregateStream {}
Expand All @@ -198,6 +199,7 @@ impl SerializeAggregateStream {
flush_state: PayloadFlushState::default(),
_params: params.clone(),
end_iter: false,
nums: 0,
}
}
}
Expand Down Expand Up @@ -225,10 +227,32 @@ impl SerializeAggregateStream {
}

match block {
Some(block) => Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count),
))?)),
None => Ok(None),
Some(block) => {
self.nums += 1;
Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(
p.bucket,
p.max_partition_count,
false,
),
))?))
}
None => {
// always return at least one block
if self.nums == 0 {
self.nums += 1;
let block = p.payload.empty_block(Some(1));
Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(
p.bucket,
p.max_partition_count,
true,
),
))?))
} else {
Ok(None)
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,26 @@ impl TransformDeserializer {
}
Some(meta) => {
return match meta.typ == BUCKET_TYPE {
true => Ok(DataBlock::empty_with_meta(
AggregateMeta::create_serialized(
meta.bucket,
deserialize_block(
dict,
fragment_data,
&self.schema,
self.arrow_schema.clone(),
)?,
meta.max_partition_count,
),
)),
true => {
let mut block = deserialize_block(
dict,
fragment_data,
&self.schema,
self.arrow_schema.clone(),
)?;

if meta.is_empty {
block = block.slice(0..0);
}

Ok(DataBlock::empty_with_meta(
AggregateMeta::create_serialized(
meta.bucket,
block,
meta.max_partition_count,
),
))
}
false => {
let data_schema = Arc::new(exchange_defines::spilled_schema());
let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,31 +149,28 @@ impl BlockMetaTransform<ExchangeShuffleMeta> for TransformExchangeAggregateSeria
}

Some(AggregateMeta::AggregatePayload(p)) => {
let (bucket, max_partition_count) = (p.bucket, p.max_partition_count);

if index == self.local_pos {
serialized_blocks.push(FlightSerialized::DataBlock(
block.add_meta(Some(Box::new(AggregateMeta::AggregatePayload(p))))?,
));
continue;
}

let bucket = compute_block_number(p.bucket, p.max_partition_count)?;
let block_number = compute_block_number(bucket, max_partition_count)?;
let stream = SerializeAggregateStream::create(
&self.params,
SerializePayload::AggregatePayload(p),
);
let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}

let c = serialize_block(bucket, c, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
debug_assert!(!stream_blocks.is_empty());
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
let c = serialize_block(block_number, c, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl TransformFinalAggregate {
AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() {
Some(ht) => {
debug_assert!(bucket == payload.bucket);

let payload = payload.convert_to_partitioned_payload(
self.params.group_data_types.clone(),
self.params.aggregate_functions.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ impl IEJoinState {
fn intersection(&self, left_block: &DataBlock, right_block: &DataBlock) -> bool {
let left_len = left_block.num_rows();
let right_len = right_block.num_rows();
if left_len == 0 || right_len == 0 {
return false;
}

let left_l1_column = left_block.columns()[0]
.value
.convert_to_full_column(&self.l1_data_type, left_len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ impl BlockingTransform for TransformSRF {
}

let input = self.input.take().unwrap();
if input.is_empty() {
return Ok(None);
}

let mut result_size = 0;
let mut used = 0;
Expand Down
Loading