@@ -6,10 +6,10 @@ use crate::write_buffer::table_buffer::TableBuffer;
 use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot};
 use anyhow::Context;
 use arrow::{
-    array::AsArray,
+    array::{AsArray, UInt64Array},
+    compute::take,
     datatypes::TimestampNanosecondType,
     record_batch::RecordBatch,
-    row::{RowConverter, SortField},
 };
 use async_trait::async_trait;
 use data_types::{
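
The import change above swaps the row-oriented `RowConverter`/`SortField` machinery for `arrow::compute::take`, driven by a `UInt64Array` of row positions. As a rough illustration of that kernel (a minimal, self-contained sketch with made-up values, not code from this commit), `take` copies the selected rows of an existing column into a new array:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, UInt64Array};
use arrow::compute::take;

fn main() {
    // Stand-in for one column of a parent RecordBatch.
    let col: ArrayRef = Arc::new(Int64Array::from(vec![10, 20, 30, 40, 50, 60]));

    // Row positions to keep, as u64 indices.
    let indices = UInt64Array::from(vec![0u64, 2, 5]);

    // `take` gathers the rows named by `indices` into a new array of the
    // same type; the parent column is left untouched.
    let subset = take(col.as_ref(), &indices, None).expect("indices are in bounds");
    assert_eq!(subset.len(), 3);
    println!("{subset:?}");
}
```

Applying this per column and reassembling the results with `RecordBatch::try_new` is what the hunks below do for each 10-second bucket.
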
@@ -203,8 +203,7 @@ impl QueryableBuffer {
                         let table_name =
                             db_schema.table_id_to_name(table_id).expect("table exists");
                         // mapping from time to the main record batch array's indexes
-                        let mut smaller_chunks: HashMap<i64, (MinMax, Vec<usize>)> =
-                            HashMap::new();
+                        let mut smaller_chunks: HashMap<i64, (MinMax, Vec<u64>)> = HashMap::new();
                         let smaller_duration = Duration::from_secs(10).as_nanos() as i64;
                         let all_times = chunk
                             .record_batch
@@ -214,59 +213,62 @@ impl QueryableBuffer {
                             .values();
                         for (idx, time) in all_times.iter().enumerate() {
                             let smaller_chunk_time = time - (time % smaller_duration);
-                            let (min_max, vec_indices) =
-                                smaller_chunks.entry(smaller_chunk_time).or_insert_with(|| {
-                                    (MinMax::new(i64::MAX, i64::MIN), Vec::new())
-                                });
+                            let (min_max, vec_indices) = smaller_chunks
+                                .entry(smaller_chunk_time)
+                                .or_insert_with(|| (MinMax::new(i64::MAX, i64::MIN), Vec::new()));
 
                             min_max.update(*time);
-                            vec_indices.push(idx);
+                            vec_indices.push(idx as u64);
+                        }
+
+                        let total_row_count = chunk.record_batch.column(0).len();
+
+                        for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() {
+                            debug!(
+                                ?smaller_chunk_time,
+                                ?min_max,
+                                num_indexes = ?all_indexes.len(),
+                                ?total_row_count,
+                                ">>> number of small chunks");
                         }
 
                         // at this point we have a bucket for each 10 sec block, we can create
                         // smaller record batches here but maybe wasteful if we ever needed one
                         // batch (let's see how this works first and then decide what can happen)
                         let batch_schema = chunk.record_batch.schema();
+                        debug!(schema = ?chunk.schema, ">>> influx schema");
+                        debug!(arrow_schema = ?batch_schema, ">>> batch schema");
                         let parent_cols = chunk.record_batch.columns();
-                        let fields = batch_schema
-                            .fields()
-                            .iter()
-                            .map(|field| SortField::new(field.data_type().clone()))
-                            .collect();
-                        debug!(?fields, ">>> schema fields");
-
-                        let converter =
-                            RowConverter::new(fields).expect("row converter created from fields");
-                        let rows = converter
-                            .convert_columns(parent_cols)
-                            .expect("convert cols to rows to succeed");
-
-                        for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() {
-                            // create a record batch using just all_indexes from parent recordbatch
-                            let all_rows = all_indexes
-                                .iter()
-                                .map(|idx| rows.row(*idx))
-                                .collect::<Vec<_>>();
-
-                            let child_cols = converter
-                                .convert_rows(all_rows)
-                                .expect("should convert rows back to cols");
 
+                        for (smaller_chunk_time, (min_max, all_indexes)) in
+                            smaller_chunks.into_iter()
+                        {
+                            let mut smaller_chunk_cols = vec![];
+                            let indices = UInt64Array::from_iter(all_indexes);
+                            for arr in parent_cols {
+                                let filtered =
+                                    take(&arr, &indices, None)
+                                        .expect("index should be accessible in parent cols");
+
+                                debug!(smaller_chunk_len = ?filtered.len(), ">>> filtered size");
+                                smaller_chunk_cols.push(filtered);
+                            }
+                            debug!(smaller_chunk_len = ?smaller_chunk_cols.len(), ">>> smaller chunks size");
                             let smaller_rec_batch =
-                                RecordBatch::try_new(Arc::clone(&batch_schema), child_cols)
+                                RecordBatch::try_new(Arc::clone(&batch_schema), smaller_chunk_cols)
                                     .expect("create smaller record batch");
                             let persist_job = PersistJob {
                                 database_id: *database_id,
                                 table_id: *table_id,
                                 table_name: Arc::clone(&table_name),
-                                chunk_time: *smaller_chunk_time,
+                                chunk_time: smaller_chunk_time,
                                 path: ParquetFilePath::new(
                                     self.persister.node_identifier_prefix(),
                                     db_schema.name.as_ref(),
                                     database_id.as_u32(),
                                     table_name.as_ref(),
                                     table_id.as_u32(),
-                                    *smaller_chunk_time,
+                                    smaller_chunk_time,
                                     snapshot_details.last_wal_sequence_number,
                                 ),
                                 batch: smaller_rec_batch,
@@ -277,6 +279,63 @@ impl QueryableBuffer {
                             };
                             persisting_chunks.push(persist_job);
                         }
+                        // let fields = batch_schema
+                        //     .fields()
+                        //     .iter()
+                        //     .map(|field| SortField::new(field.data_type().clone()))
+                        //     .collect();
+                        // debug!(?fields, ">>> schema fields");
+                        //
+                        // let converter =
+                        //     RowConverter::new(fields).expect("row converter created from fields");
+                        // debug!(?converter, ">>> converter");
+                        //
+                        // let rows = converter
+                        //     .convert_columns(parent_cols)
+                        //     .expect("convert cols to rows to succeed");
+                        // debug!(?rows, ">>> all rows");
+                        //
+                        // for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() {
+                        //
+                        //     // create a record batch using just all_indexes from parent recordbatch
+                        //     let all_rows = all_indexes
+                        //         .iter()
+                        //         .map(|idx| rows.row(*idx))
+                        //         .collect::<Vec<_>>();
+                        //     debug!(?rows, ">>> all filtered child rows");
+                        //
+                        //     // hmmm this conversion turns Dictionary types to StringArray, not sure
+                        //     // why
+                        //     let child_cols = converter
+                        //         .convert_rows(all_rows)
+                        //         .expect("should convert rows back to cols");
+                        //     debug!(?child_cols, ">>> all child cols");
+                        //
+                        //     let smaller_rec_batch =
+                        //         RecordBatch::try_new(Arc::clone(&batch_schema), child_cols)
+                        //             .expect("create smaller record batch");
+                        //     let persist_job = PersistJob {
+                        //         database_id: *database_id,
+                        //         table_id: *table_id,
+                        //         table_name: Arc::clone(&table_name),
+                        //         chunk_time: *smaller_chunk_time,
+                        //         path: ParquetFilePath::new(
+                        //             self.persister.node_identifier_prefix(),
+                        //             db_schema.name.as_ref(),
+                        //             database_id.as_u32(),
+                        //             table_name.as_ref(),
+                        //             table_id.as_u32(),
+                        //             *smaller_chunk_time,
+                        //             snapshot_details.last_wal_sequence_number,
+                        //         ),
+                        //         batch: smaller_rec_batch,
+                        //         // this schema.clone() can be avoided?
+                        //         schema: chunk.schema.clone(),
+                        //         timestamp_min_max: min_max.to_ts_min_max(),
+                        //         sort_key: table_buffer.sort_key.clone(),
+                        //     };
+                        //     persisting_chunks.push(persist_job);
+                        // }
                     }
                 }
             }
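
The grouping key used in the loop above is plain modular arithmetic on the nanosecond timestamps: every time value maps to the start of its 10-second window. A tiny self-contained sketch of the same calculation (timestamps invented for illustration):

```rust
use std::time::Duration;

fn main() {
    // Same bucketing arithmetic as the snapshot loop: a nanosecond
    // timestamp maps to the start of its 10-second window.
    let smaller_duration = Duration::from_secs(10).as_nanos() as i64;

    // Made-up timestamps: 12s, 19.999999999s and 20s after the epoch.
    for time in [12_000_000_000_i64, 19_999_999_999, 20_000_000_000] {
        let smaller_chunk_time = time - (time % smaller_duration);
        println!("{time} -> chunk starting at {smaller_chunk_time}");
    }
    // The first two land in the bucket starting at 10_000_000_000;
    // the third opens the bucket at 20_000_000_000.
}
```
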
@@ -494,6 +553,7 @@ impl QueryableBuffer {
     }
 }
 
+#[derive(Debug)]
 struct MinMax {
     min: i64,
     max: i64,
@@ -505,10 +565,7 @@ impl MinMax {
         // it's convenient to start with i64::MAX for min and i64::MIN
         // for max in loops, so unlike TimestampMinMax this type
         // doesn't check that pre-condition
-        Self {
-            min,
-            max
-        }
+        Self { min, max }
     }
 
     fn update(&mut self, other: i64) {
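
The `MinMax::new(i64::MAX, i64::MIN)` seeding used by the snapshot loop relies on the usual min/max folding idiom the comment above alludes to: the first observed value wins both comparisons, so the loop needs no special case for the first element. A minimal standalone sketch (values invented):

```rust
fn main() {
    // Seed min with the largest and max with the smallest possible value.
    let (mut min, mut max) = (i64::MAX, i64::MIN);
    for t in [25_i64, 7, 19] {
        min = min.min(t);
        max = max.max(t);
    }
    assert_eq!((min, max), (7, 25));
}
```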