@@ -3,7 +3,7 @@ use crate::persister::Persister;
 use crate::write_buffer::persisted_files::PersistedFiles;
 use crate::write_buffer::table_buffer::TableBuffer;
 use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot};
-use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnaphotChunkIter};
+use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnapshotChunkIter};
 use anyhow::Context;
 use arrow::record_batch::RecordBatch;
 use arrow_util::util::ensure_schema;
@@ -215,7 +215,7 @@ impl QueryableBuffer {
 
         let chunk_time_to_chunk = &mut table_buffer.chunk_time_to_chunks;
         let snapshot_chunks = &mut table_buffer.snapshotting_chunks;
-        let snapshot_chunks_iter = SnaphotChunkIter {
+        let snapshot_chunks_iter = SnapshotChunkIter {
             keys_to_remove: all_keys_to_remove.iter(),
             map: chunk_time_to_chunk,
             table_def,
@@ -453,7 +453,6 @@ impl QueryableBuffer {
     }
 }
 
-#[allow(clippy::too_many_arguments)]
 async fn sort_dedupe_parallel<I: Iterator<Item = PersistJob>>(
     iterator: I,
     persister: &Arc<Persister>,
@@ -505,7 +504,6 @@ async fn sort_dedupe_parallel<I: Iterator<Item = PersistJob>>(
     }
 }
 
-#[allow(clippy::too_many_arguments)]
 async fn sort_dedupe_serial<I: Iterator<Item = PersistJob>>(
     iterator: I,
     persister: &Arc<Persister>,
@@ -839,6 +837,19 @@ impl<'a> PersistJobGroupedIterator<'a> {
             ),
         }
     }
+
+    /// Returns `(usable free memory, effective max batch size)`, capping the
+    /// configured max size by what the host (and the cgroup, when present)
+    /// has free.
+    fn free_mem_hint(&mut self) -> (u64, u64) {
+        self.system.refresh_memory();
+        // leave ~100 MB of headroom; saturate so this doesn't underflow
+        // when free memory drops below the headroom
+        let system_mem_bytes = self.system.free_memory().saturating_sub(100_000_000);
+        let cgroup_free_mem_bytes = self
+            .system
+            .cgroup_limits()
+            .map(|limit| limit.free_memory)
+            .unwrap_or(u64::MAX);
+        let system_mem_bytes = system_mem_bytes.min(cgroup_free_mem_bytes);
+        let max_size_bytes = self.max_size_bytes.min(system_mem_bytes);
+        (system_mem_bytes, max_size_bytes)
+    }
 }
 
 impl Iterator for PersistJobGroupedIterator<'_> {
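
The `free_mem_hint` helper added above leans on the `sysinfo` crate's host and cgroup memory probes. Below is a minimal standalone sketch of the same probe, assuming a recent `sysinfo` (0.30+, where memory values are reported in bytes); the function name and headroom parameter are illustrative, not the crate's API:

```rust
use sysinfo::System;

/// Estimate usable free memory: host free memory minus a safety headroom,
/// further capped by the cgroup limit when running in a container.
fn usable_free_memory(headroom_bytes: u64) -> u64 {
    let mut system = System::new();
    system.refresh_memory();
    // Host-level free memory, saturating to avoid underflow.
    let host_free = system.free_memory().saturating_sub(headroom_bytes);
    // `cgroup_limits()` returns None outside a cgroup (e.g. on macOS).
    let cgroup_free = system
        .cgroup_limits()
        .map(|limits| limits.free_memory)
        .unwrap_or(u64::MAX);
    host_free.min(cgroup_free)
}

fn main() {
    // Same 100 MB headroom the iterator uses.
    println!("usable bytes: {}", usable_free_memory(100_000_000));
}
```

Outside a container the host figure wins; inside one, the cgroup's free memory is usually the tighter bound, which is why the helper takes the minimum of the two.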
@@ -856,18 +867,18 @@ impl Iterator for PersistJobGroupedIterator<'_> {
         let mut min_chunk_time = current_data.chunk_time;
         let mut current_size_bytes = current_data.total_batch_size();
         debug!(?current_size_bytes, table_name = ?current_data.table_name, ">>> current_size_bytes for table");
-        self.system.refresh_memory();
-        let system_mem_bytes = self.system.free_memory() - 100_000_000;
-        let max_size_bytes = self.max_size_bytes.min(system_mem_bytes);
+        let (system_mem_bytes, max_size_bytes) = self.free_mem_hint();
         debug!(
             max_size_bytes,
             system_mem_bytes, ">>> max size bytes/system mem bytes"
         );
 
         while all_batches.len() < self.chunk_size && current_size_bytes < max_size_bytes {
-            debug!(?current_size_bytes, ">>> current_size_bytes");
+            trace!(?current_size_bytes, ">>> current_size_bytes");
             if let Some(next_data) = self.iter.peek() {
-                if next_data.table_id == *current_table_id {
+                if next_data.table_id == *current_table_id
+                    && (current_size_bytes + next_data.total_batch_size()) < max_size_bytes
+                {
                     let next = self.iter.next().unwrap();
                     ts_min_max = ts_min_max.union(&next.timestamp_min_max);
                     min_chunk_time = min_chunk_time.min(next.chunk_time);
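
The reworked condition refuses to pull in a chunk whose size would push the running total past the cap, rather than noticing the overflow only after adding it. A self-contained sketch of that peek-before-take pattern (illustrative `(table_id, size_bytes)` tuples, not the crate's `PersistJob` type):

```rust
use std::iter::Peekable;

/// Group consecutive (table_id, size_bytes) items that share `key`,
/// stopping before any item that would push the total past `cap`.
fn take_group<I>(iter: &mut Peekable<I>, key: u32, cap: u64) -> (Vec<(u32, u64)>, u64)
where
    I: Iterator<Item = (u32, u64)>,
{
    let mut group = Vec::new();
    let mut total = 0u64;
    while let Some(&(next_key, size)) = iter.peek() {
        // Stop on a key change, or when taking this item would overflow the cap.
        if next_key != key || total + size >= cap {
            break;
        }
        total += size;
        group.push(iter.next().unwrap());
    }
    (group, total)
}

fn main() {
    let mut it = [(1, 40), (1, 40), (1, 40), (1, 40), (2, 30)]
        .into_iter()
        .peekable();
    let (group, total) = take_group(&mut it, 1, 100);
    assert_eq!(group.len(), 2); // a third item would reach 120 >= 100
    assert_eq!(total, 80);
}
```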
@@ -880,18 +891,17 @@ impl Iterator for PersistJobGroupedIterator<'_> {
                 break;
             }
         }
+        debug!(?current_size_bytes, ">>> final batch size in bytes");
 
         let table_defn = self
             .catalog
             .db_schema_by_id(&current_data.database_id)?
             .table_definition_by_id(&current_data.table_id)?;
 
+        let arrow = table_defn.schema.as_arrow();
         let all_schema_aligned_batches: Vec<RecordBatch> = all_batches
             .iter()
-            .map(|batch| {
-                ensure_schema(&table_defn.schema.as_arrow(), batch)
-                    .expect("batches should have same schema")
-            })
+            .map(|batch| ensure_schema(&arrow, batch).expect("batches should have same schema"))
             .collect();
 
         Some(PersistJob {
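
Hoisting `table_defn.schema.as_arrow()` out of the closure builds the Arrow schema once rather than once per batch. For context, `ensure_schema` (from `arrow_util`) reconciles each batch with that target schema; a simplified sketch of that kind of alignment, not the actual implementation:

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Re-emit a batch's columns in target-schema order, null-filling any
/// column the batch is missing, so all batches line up for concatenation.
fn align_to_schema(target: &SchemaRef, batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let columns: Vec<ArrayRef> = target
        .fields()
        .iter()
        .map(|field| match batch.schema().index_of(field.name()) {
            Ok(idx) => Arc::clone(batch.column(idx)),
            Err(_) => new_null_array(field.data_type(), batch.num_rows()),
        })
        .collect();
    RecordBatch::try_new(Arc::clone(target), columns)
}
```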
@@ -918,8 +928,8 @@ impl Iterator for PersistJobGroupedIterator<'_> {
 }
 
 pub(crate) struct SortDedupePersistSummary {
-    pub file_size_bytes: u64,
-    pub file_meta_data: FileMetaData,
+    pub(crate) file_size_bytes: u64,
+    pub(crate) file_meta_data: FileMetaData,
 }
 
 impl SortDedupePersistSummary {
@@ -1523,7 +1533,7 @@ mod tests {
             persisted_files: Arc::new(PersistedFiles::new()),
             parquet_cache: None,
             gen1_duration: Gen1Duration::new_1m(),
-            max_size_per_parquet_file_bytes: 100_000,
+            max_size_per_parquet_file_bytes: 150_000,
         };
         let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);
15291539
@@ -1532,7 +1542,9 @@ mod tests {
         for i in 0..2 {
             // create another write, this time with two tags, in a different gen1 block
             let ts = Gen1Duration::new_1m().as_duration().as_nanos() as i64 + (i * 240_000_000_000);
-            let lp = format!("foo,t1=a,t2=b f1=3i,f2=3 {}", ts);
+            // keep these tags distinct from bar's so it's easier to spot the byte
+            // differences in the logs; otherwise foo and bar report exactly the same usage in bytes
+            let lp = format!("foo,t1=foo_a f1={}i,f2={} {}", i, i, ts);
             debug!(?lp, ">>> writing line");
             let val = WriteValidator::initialize(db.clone(), Arc::clone(&catalog), 0).unwrap();
 
@@ -1561,9 +1573,8 @@ mod tests {
         for i in 0..10 {
             // create another write, this time with two tags, in a different gen1 block
             let ts = Gen1Duration::new_1m().as_duration().as_nanos() as i64 + (i * 240_000_000_000);
-            // let line = format!("bar,t1=a,t2=b f1=3i,f2=3 {}", ts);
             let lp = format!(
-                "bar,t1=a,t2=b f1=3i,f2=3 {}\nbar,t1=a,t2=c f1=4i,f2=3 {}\nbar,t1=ab,t2=b f1=5i,f2=3 {}",
+                "bar,t1=br_a,t2=br_b f1=3i,f2=3 {}\nbar,t1=br_a,t2=br_c f1=4i,f2=3 {}\nbar,t1=ab,t2=bb f1=5i,f2=3 {}",
                 ts, ts, ts
             );
             debug!(?lp, ">>> writing line");
@@ -1632,12 +1643,27 @@ mod tests {
             assert_eq!(2, foo_file.row_count);
         }
 
-        // bar had 10 writes with 3 lines, should write 4 files each with 9 rows or 3 row in them
+        // bar had 10 writes (each in a separate chunk) with 3 lines per write.
+        // These get grouped, but because of the larger writes and the 150_000
+        // byte cap set at the top, we end up with 4 files.
         let table = db.table_definition("bar").unwrap();
         let files = queryable_buffer
             .persisted_files
             .get_files(db.id, table.table_id);
         debug!(?files, ">>> test: queryable buffer persisted files");
+
+        // Below is the growth in memory (bytes) as reported by arrow record
+        // batches: three bar chunks fit under the cap (3 x 43_952 = 131_856
+        // < 150_000), so the tenth chunk spills into a fourth file.
+        //
+        // >>> current_size_bytes for table current_size_bytes=43952 table_name="bar"
+        // >>> final batch size in bytes current_size_bytes=131856
+        // >>> current_size_bytes for table current_size_bytes=43952 table_name="bar"
+        // >>> final batch size in bytes current_size_bytes=131856
+        // >>> current_size_bytes for table current_size_bytes=43952 table_name="bar"
+        // >>> final batch size in bytes current_size_bytes=131856
+        // >>> current_size_bytes for table current_size_bytes=43952 table_name="bar"
+        // >>> final batch size in bytes current_size_bytes=43952
+        // >>> current_size_bytes for table current_size_bytes=34408 table_name="foo"
+        // >>> final batch size in bytes current_size_bytes=68816
         assert_eq!(4, files.len());
         for bar_file in files {
             debug!(?bar_file, ">>> test: bar_file");