@@ -20,6 +20,7 @@ use databend_common_exception::Result;
2020use databend_common_expression:: types:: Bitmap ;
2121use databend_common_expression:: types:: DataType ;
2222use databend_common_expression:: AggrStateRegister ;
23+ use databend_common_expression:: AggrStateType ;
2324use databend_common_expression:: ColumnBuilder ;
2425use databend_common_expression:: InputColumns ;
2526use databend_common_expression:: Scalar ;
@@ -36,6 +37,7 @@ use crate::aggregates::AggregateFunctionRef;
3637#[ derive( Clone ) ]
3738pub struct AggregateStateCombinator {
3839 name : String ,
40+ data_type : DataType ,
3941 nested : AggregateFunctionRef ,
4042}
4143
@@ -56,7 +58,25 @@ impl AggregateStateCombinator {
5658
5759 let nested = AggregateFunctionFactory :: instance ( ) . get ( nested_name, params, arguments) ?;
5860
59- Ok ( Arc :: new ( AggregateStateCombinator { name, nested } ) )
61+ let mut register = AggrStateRegister :: default ( ) ;
62+ nested. register_state ( & mut register) ;
63+
64+ let sub_types = register
65+ . states ( )
66+ . iter ( )
67+ . map ( |typ| match typ {
68+ AggrStateType :: Bool => DataType :: Boolean ,
69+ AggrStateType :: Custom ( _) => DataType :: Binary ,
70+ } )
71+ . collect ( ) ;
72+
73+ let data_type = DataType :: Tuple ( sub_types) ;
74+
75+ Ok ( Arc :: new ( AggregateStateCombinator {
76+ name,
77+ data_type,
78+ nested,
79+ } ) )
6080 }
6181
6282 pub fn combinator_desc ( ) -> CombinatorDescription {
@@ -70,17 +90,13 @@ impl AggregateFunction for AggregateStateCombinator {
7090 }
7191
7292 fn return_type ( & self ) -> Result < DataType > {
73- Ok ( DataType :: Binary )
93+ Ok ( self . data_type . clone ( ) )
7494 }
7595
7696 fn init_state ( & self , place : & AggrState ) {
7797 self . nested . init_state ( place) ;
7898 }
7999
80- fn is_state ( & self ) -> bool {
81- true
82- }
83-
84100 fn state_layout ( & self ) -> Layout {
85101 unreachable ! ( )
86102 }
@@ -146,12 +162,21 @@ impl AggregateFunction for AggregateStateCombinator {
146162 self . nested . merge_states ( place, rhs)
147163 }
148164
149- fn merge_result ( & self , _place : & AggrState , _builder : & mut ColumnBuilder ) -> Result < ( ) > {
150- todo ! ( )
151- // let str_builder = builder.as_binary_mut().unwrap();
152- // self.serialize(place, &mut str_builder.data)?;
153- // str_builder.commit_row();
154- // Ok(())
165+ fn merge_result ( & self , place : & AggrState , builder : & mut ColumnBuilder ) -> Result < ( ) > {
166+ let builders = builder. as_tuple_mut ( ) . unwrap ( ) ;
167+
168+ let loc = place
169+ . loc ( )
170+ . iter ( )
171+ . enumerate ( )
172+ . map ( |( i, loc) | match loc {
173+ AggrStateLoc :: Bool ( _, offset) => AggrStateLoc :: Bool ( i, * offset) ,
174+ AggrStateLoc :: Custom ( _, offset) => AggrStateLoc :: Custom ( i, * offset) ,
175+ } )
176+ . collect :: < Vec < _ > > ( )
177+ . into_boxed_slice ( ) ;
178+ let place = AggrState :: with_loc ( place. addr , loc) ;
179+ self . nested . serialize_builder ( & place, builders)
155180 }
156181
157182 fn need_manual_drop_state ( & self ) -> bool {
0 commit comments