File tree Expand file tree Collapse file tree 2 files changed +9
-4
lines changed
src/query/storages/iceberg/src Expand file tree Collapse file tree 2 files changed +9
-4
lines changed Original file line number Diff line number Diff line change @@ -171,15 +171,15 @@ impl IcebergTable {
171171 ) )
172172 } ) ?;
173173
174- Ok ( iceberg:: table:: Table :: builder ( )
174+ iceberg:: table:: Table :: builder ( )
175175 . identifier ( identifier)
176176 . metadata ( metadata)
177177 . metadata_location ( metadata_location)
178178 . file_io ( file_io)
179179 . build ( )
180180 . map_err ( |err| {
181181 ErrorCode :: ReadTableDataError ( format ! ( "Rebuild iceberg table failed: {err:?}" ) )
182- } ) ? )
182+ } )
183183 }
184184
185185 /// create a new table on the table directory
Original file line number Diff line number Diff line change @@ -135,8 +135,13 @@ impl Processor for IcebergTableSource {
135135 // And we should try to build another stream (in next event loop).
136136 } else if let Some ( part) = self . ctx . get_partition ( ) {
137137 let part = IcebergPartInfo :: from_part ( & part) ?;
138- // TODO: enable row filter?
139- let reader = self . table . table . reader_builder ( ) . build ( ) ;
138+ let reader = self
139+ . table
140+ . table
141+ . reader_builder ( )
142+ . with_batch_size ( self . ctx . get_settings ( ) . get_parquet_max_block_size ( ) ? as usize )
143+ . with_row_group_filtering_enabled ( true )
144+ . build ( ) ;
140145 // TODO: don't use stream here.
141146 let stream = reader
142147 . read ( Box :: pin ( stream:: iter ( [ Ok ( part. to_task ( ) ) ] ) ) )
You can’t perform that action at this time.
0 commit comments