1515use std:: io:: Write ;
1616use std:: sync:: Arc ;
1717
18+ use arrow_array:: ArrayRef ;
19+ use arrow_ipc:: reader:: FileReaderBuilder ;
1820use arrow_schema:: Schema ;
1921use buf_list:: BufList ;
2022use buf_list:: Cursor ;
2123use bytes:: Buf ;
2224use databend_common_base:: base:: Alignment ;
2325use databend_common_base:: base:: DmaWriteBuf ;
26+ use databend_common_exception:: ErrorCode ;
2427use databend_common_exception:: Result ;
25- use databend_common_expression:: arrow:: read_column;
2628use databend_common_expression:: arrow:: write_column;
2729use databend_common_expression:: infer_table_schema;
30+ use databend_common_expression:: types:: DataType ;
31+ use databend_common_expression:: BlockEntry ;
2832use databend_common_expression:: DataBlock ;
2933use databend_common_expression:: DataField ;
3034use databend_common_expression:: DataSchema ;
35+ use databend_common_expression:: Value ;
3136use opendal:: Buffer ;
3237use parquet:: arrow:: arrow_reader:: ParquetRecordBatchReader ;
3338use parquet:: arrow:: ArrowWriter ;
@@ -101,21 +106,9 @@ impl BlocksEncoder {
101106 }
102107}
103108
104- pub ( super ) fn deserialize_block ( columns_layout : & Layout , mut data : Buffer ) -> Result < DataBlock > {
109+ pub ( super ) fn deserialize_block ( columns_layout : & Layout , data : Buffer ) -> Result < DataBlock > {
105110 match columns_layout {
106- Layout :: ArrowIpc ( layout) => {
107- let columns = layout
108- . iter ( )
109- . map ( |& layout| {
110- let ls = BufList :: from_iter ( data. slice ( 0 ..layout) ) ;
111- data. advance ( layout) ;
112- let mut cursor = Cursor :: new ( ls) ;
113- read_column ( & mut cursor)
114- } )
115- . collect :: < Result < Vec < _ > , _ > > ( ) ?;
116-
117- Ok ( DataBlock :: new_from_columns ( columns) )
118- }
111+ Layout :: ArrowIpc ( layout) => bare_blocks_from_arrow_ipc ( layout, data) ,
119112 Layout :: Parquet => bare_blocks_from_parquet ( Reader ( data) ) ,
120113 Layout :: Aggregate => unreachable ! ( ) ,
121114 }
@@ -131,14 +124,51 @@ fn fake_data_schema(block: &DataBlock) -> DataSchema {
131124 DataSchema :: new ( fields)
132125}
133126
127+ fn bare_blocks_from_arrow_ipc ( layout : & [ usize ] , mut data : Buffer ) -> Result < DataBlock > {
128+ assert ! ( !layout. is_empty( ) ) ;
129+ let mut columns = Vec :: with_capacity ( layout. len ( ) ) ;
130+ let mut read_array = |layout : usize | -> Result < ( ArrayRef , DataType ) > {
131+ let ls = BufList :: from_iter ( data. slice ( 0 ..layout) ) ;
132+ data. advance ( layout) ;
133+ let mut reader = FileReaderBuilder :: new ( ) . build ( Cursor :: new ( ls) ) ?;
134+ let schema = reader. schema ( ) ;
135+ let f = DataField :: try_from ( schema. field ( 0 ) ) ?;
136+ let data_type = f. data_type ( ) . clone ( ) ;
137+ let col = reader
138+ . next ( )
139+ . ok_or_else ( || ErrorCode :: Internal ( "expected one arrow array" ) ) ??
140+ . remove_column ( 0 ) ;
141+ Ok ( ( col, data_type) )
142+ } ;
143+ let ( array, data_type) = read_array ( layout[ 0 ] ) ?;
144+ let num_rows = array. len ( ) ;
145+ let val = Value :: from_arrow_rs ( array, & data_type) ?;
146+ columns. push ( BlockEntry :: new ( data_type, val) ) ;
147+ for & layout in layout. iter ( ) . skip ( 1 ) {
148+ let ( array, data_type) = read_array ( layout) ?;
149+ let val = Value :: from_arrow_rs ( array, & data_type) ?;
150+ columns. push ( BlockEntry :: new ( data_type, val) ) ;
151+ }
152+ Ok ( DataBlock :: new ( columns, num_rows) )
153+ }
154+
134155/// Deserialize bare data block from parquet format.
135156fn bare_blocks_from_parquet < R : ChunkReader + ' static > ( data : R ) -> Result < DataBlock > {
136157 let reader = ParquetRecordBatchReader :: try_new ( data, usize:: MAX ) ?;
137158 let mut blocks = Vec :: new ( ) ;
138159 for record_batch in reader {
139160 let record_batch = record_batch?;
140161 let schema = DataSchema :: try_from ( record_batch. schema ( ) . as_ref ( ) ) ?;
141- let ( block, _) = DataBlock :: from_record_batch ( & schema, & record_batch) ?;
162+ let num_rows = record_batch. num_rows ( ) ;
163+ let mut columns = Vec :: with_capacity ( record_batch. num_columns ( ) ) ;
164+ for ( array, field) in record_batch. columns ( ) . iter ( ) . zip ( schema. fields ( ) ) {
165+ let data_type = field. data_type ( ) ;
166+ columns. push ( BlockEntry :: new (
167+ data_type. clone ( ) ,
168+ Value :: from_arrow_rs ( array. clone ( ) , data_type) ?,
169+ ) )
170+ }
171+ let block = DataBlock :: new ( columns, num_rows) ;
142172 blocks. push ( block) ;
143173 }
144174
0 commit comments