 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::collections::HashSet;
 use std::sync::Arc;

+use databend_common_catalog::plan::split_row_id;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::Projection;
 use databend_common_catalog::table_context::TableContext;
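Note: the newly imported `split_row_id` decomposes Fuse's synthetic 64-bit row id into a block prefix (identifying the source block) and a row offset inside that block. A minimal sketch of the idea, assuming a 31-bit in-block offset (the exact bit split is an assumption for illustration, not taken from this diff):

    // Sketch only: high bits identify the block, low bits the row within it.
    const ROW_OFFSET_BITS: u64 = 31; // assumed width, for illustration

    fn split_row_id_sketch(row_id: u64) -> (u64, u64) {
        let prefix = row_id >> ROW_OFFSET_BITS;
        let offset = row_id & ((1u64 << ROW_OFFSET_BITS) - 1);
        (prefix, offset)
    }

The transform below only needs the prefix: two row ids with the same prefix live in the same block, so grouping by prefix bounds how many blocks one fetch touches.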
@@ -66,6 +68,7 @@ pub fn row_fetch_processor(
             input,
             output,
             row_id_col_offset,
+            max_threads,
             NativeRowsFetcher::<true>::create(
                 fuse_table.clone(),
                 projection.clone(),
@@ -79,6 +82,7 @@ pub fn row_fetch_processor(
             input,
             output,
             row_id_col_offset,
+            max_threads,
             NativeRowsFetcher::<false>::create(
                 fuse_table.clone(),
                 projection.clone(),
@@ -97,6 +101,7 @@ pub fn row_fetch_processor(
             input,
             output,
             row_id_col_offset,
+            max_threads,
             ParquetRowsFetcher::<true>::create(
                 fuse_table.clone(),
                 projection.clone(),
@@ -111,6 +116,7 @@ pub fn row_fetch_processor(
             input,
             output,
             row_id_col_offset,
+            max_threads,
             ParquetRowsFetcher::<false>::create(
                 fuse_table.clone(),
                 projection.clone(),
@@ -130,13 +136,17 @@ pub fn row_fetch_processor(
 pub trait RowsFetcher {
     async fn on_start(&mut self) -> Result<()>;
     async fn fetch(&mut self, row_ids: &[u64]) -> Result<DataBlock>;
+    fn clear_cache(&mut self);
 }

 pub struct TransformRowsFetcher<F: RowsFetcher> {
     row_id_col_offset: usize,
+    max_threads: usize,
     fetcher: F,
     need_wrap_nullable: bool,
     blocks: Vec<DataBlock>,
+    row_ids: Vec<u64>,
+    distinct_block_ids: HashSet<u64>,
 }

 #[async_trait::async_trait]
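The new fields drive batched fetching: `row_ids` buffers the pending row ids for the current batch, and `distinct_block_ids` tracks which block prefixes that batch already touches, capped at `2 * max_threads` so a single flush never fans out over more blocks than the fetcher can usefully read in parallel. A toy sketch of the same admission policy over plain integers (names and the prefix split are illustrative, not from this diff):

    use std::collections::HashSet;

    // Admit ids into the batch until the distinct-prefix cap is hit;
    // return how many ids were consumed from the input.
    fn admit(ids: &[u64], cap: usize, seen: &mut HashSet<u64>) -> usize {
        for (idx, id) in ids.iter().enumerate() {
            let prefix = *id >> 31; // assumed prefix split, as sketched above
            if seen.len() >= cap && !seen.contains(&prefix) {
                return idx; // batch is full: the rest waits for the next batch
            }
            seen.insert(prefix);
        }
        ids.len()
    }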
@@ -151,26 +161,7 @@ where F: RowsFetcher + Send + Sync + 'static
     }

     async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
-        self.blocks.push(data);
-        Ok(None)
-    }
-
-    #[async_backtrace::framed]
-    async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
-        if self.blocks.is_empty() {
-            return Ok(None);
-        }
-
-        let start_time = std::time::Instant::now();
-        let num_blocks = self.blocks.len();
-        let mut data = DataBlock::concat(&self.blocks)?;
-        self.blocks.clear();
-
         let num_rows = data.num_rows();
-        if num_rows == 0 {
-            return Ok(None);
-        }
-
         let entry = &data.columns()[self.row_id_col_offset];
         let value = entry
             .value
@@ -184,24 +175,43 @@ where F: RowsFetcher + Send + Sync + 'static
             value.column.into_number().unwrap().into_u_int64().unwrap()
         };

-        let fetched_block = self.fetcher.fetch(&row_id_column).await?;
+        // Process the row id column in block-sized batches.
+        // Ensure that row ids of the same block land in the same batch and thread.
+        let mut consumed_len = num_rows;
+        for (idx, row_id) in row_id_column.iter().enumerate() {
+            let (prefix, _) = split_row_id(*row_id);

-        for col in fetched_block.columns().iter() {
-            if self.need_wrap_nullable {
-                data.add_column(wrap_true_validity(col, num_rows));
-            } else {
-                data.add_column(col.clone());
+            // The batch is full; row ids with a new prefix are deferred to the next batch.
+            if self.distinct_block_ids.len() >= self.max_threads * 2
+                && !self.distinct_block_ids.contains(&prefix)
+            {
+                consumed_len = idx;
+                break;
             }
+            self.distinct_block_ids.insert(prefix);
         }

-        log::info!(
-            "TransformRowsFetcher on_finish: num_rows: {}, input blocks: {} in {} milliseconds",
-            num_rows,
-            num_blocks,
-            start_time.elapsed().as_millis()
-        );
+        self.row_ids
+            .extend_from_slice(&row_id_column.as_slice()[0..consumed_len]);
+        self.blocks.push(data.slice(0..consumed_len));

-        Ok(Some(data))
+        if consumed_len < num_rows {
+            // The cap was hit mid-block: flush the full batch now and seed the
+            // next batch with the remaining rows of this input block.
+            let block = self.flush().await;
+            for row_id in row_id_column.as_slice()[consumed_len..num_rows].iter() {
+                let (prefix, _) = split_row_id(*row_id);
+                self.distinct_block_ids.insert(prefix);
+                self.row_ids.push(*row_id);
+            }
+            self.blocks.push(data.slice(consumed_len..num_rows));
+            block
+        } else {
+            Ok(None)
+        }
+    }
+
+    #[async_backtrace::framed]
+    async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
+        self.flush().await
     }
 }

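To make the control flow concrete, here is a tiny worked example of the batching behaviour using the `admit` sketch above (hypothetical numbers; `max_threads = 2` gives a cap of 4 distinct prefixes):

    // Six row ids spanning six distinct blocks, cap of 4 prefixes:
    let mut seen = std::collections::HashSet::new();
    let ids: Vec<u64> = (0u64..6).map(|p| p << 31).collect();
    let consumed = admit(&ids, 4, &mut seen);
    assert_eq!(consumed, 4); // first four prefixes flush as one batch
    // The remaining two ids seed the next batch, mirroring the
    // `consumed_len < num_rows` branch in `transform`.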
@@ -212,16 +222,59 @@ where F: RowsFetcher + Send + Sync + 'static
         input: Arc<InputPort>,
         output: Arc<OutputPort>,
         row_id_col_offset: usize,
+        max_threads: usize,
         fetcher: F,
         need_wrap_nullable: bool,
     ) -> ProcessorPtr {
         ProcessorPtr::create(AsyncAccumulatingTransformer::create(input, output, Self {
             row_id_col_offset,
+            max_threads,
             fetcher,
             need_wrap_nullable,
             blocks: vec![],
+            row_ids: vec![],
+            distinct_block_ids: HashSet::new(),
         }))
     }
+
+    async fn flush(&mut self) -> Result<Option<DataBlock>> {
+        let blocks = std::mem::take(&mut self.blocks);
+        if blocks.is_empty() {
+            return Ok(None);
+        }
+
+        let start_time = std::time::Instant::now();
+        let num_blocks = blocks.len();
+        let mut data = DataBlock::concat(&blocks)?;
+        let num_rows = data.num_rows();
+        if num_rows == 0 {
+            return Ok(None);
+        }
+
+        let row_ids = std::mem::take(&mut self.row_ids);
+        self.distinct_block_ids.clear();
+        let fetched_block = self.fetcher.fetch(&row_ids).await?;
+        // Clear the cache after fetching: `transform` guarantees these blocks
+        // will never be fetched again in later batches.
+        self.fetcher.clear_cache();
+
+        for col in fetched_block.columns().iter() {
+            if self.need_wrap_nullable {
+                data.add_column(wrap_true_validity(col, num_rows));
+            } else {
+                data.add_column(col.clone());
+            }
+        }
+
+        log::info!(
+            "TransformRowsFetcher flush: num_rows: {}, input blocks: {} in {} milliseconds",
+            num_rows,
+            num_blocks,
+            start_time.elapsed().as_millis()
+        );
+
+        Ok(Some(data))
+    }
 }

 fn wrap_true_validity(column: &BlockEntry, num_rows: usize) -> BlockEntry {
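Fetcher implementations must now provide `clear_cache`. A minimal sketch of the contract with a hypothetical fetcher (not part of this diff): cached block data can be dropped wholesale after every flush, because `transform` guarantees a block's row ids never straddle two batches.

    use std::collections::HashMap;

    // Hypothetical fetcher state, for illustration only.
    struct CachedFetcher {
        part_cache: HashMap<u64, Vec<u8>>, // block prefix -> raw block bytes
    }

    impl CachedFetcher {
        fn clear_cache(&mut self) {
            // Safe to drop everything: no later batch revisits these blocks.
            self.part_cache.clear();
        }
    }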