Commit 56024af

feat: produce snapshot chunks lazily
1 parent: 58558ed

File tree

5 files changed: +261 −152 lines

influxdb3/src/commands/serve.rs

Lines changed: 2 additions & 2 deletions

@@ -573,8 +573,8 @@ pub async fn command(config: Config) -> Result<()> {

    info!("setting up background mem check for query buffer");
    background_buffer_checker(
-        // config.force_snapshot_mem_threshold.bytes(),
-        734003200,
+        config.force_snapshot_mem_threshold.bytes(),
+        // 734003200,
        &write_buffer_impl,
    )
    .await;
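
Worth noting: the literal that was hard-coded here (presumably a leftover from development) is exactly 700 MiB, so this change simply switches the background buffer checker back to the user-configured threshold. A quick check of the arithmetic, as a standalone sketch (the constant name is illustrative, not from the codebase):

fn main() {
    // The previously hard-coded literal equals 700 MiB expressed in bytes.
    const HARD_CODED_THRESHOLD_BYTES: u64 = 734_003_200;
    assert_eq!(HARD_CODED_THRESHOLD_BYTES, 700 * 1024 * 1024);
}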

influxdb3_write/src/paths.rs

Lines changed: 21 additions & 6 deletions

@@ -55,6 +55,7 @@ pub struct ParquetFilePath(ObjPath);
impl ParquetFilePath {
    /// Generate a parquet file path using the given arguments. This will convert the provided
    /// `chunk_time` into a date time string with format `'YYYY-MM-DD/HH-MM'`
+    #[allow(clippy::too_many_arguments)]
    pub fn new(
        host_prefix: &str,
        db_name: &str,
@@ -63,14 +64,26 @@ impl ParquetFilePath {
        table_id: u32,
        chunk_time: i64,
        wal_file_sequence_number: WalFileSequenceNumber,
+        sub_chunk_index: Option<u64>,
    ) -> Self {
        let date_time = DateTime::<Utc>::from_timestamp_nanos(chunk_time);
-        let path = ObjPath::from(format!(
-            "{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}",
-            date_string = date_time.format("%Y-%m-%d/%H-%M"),
-            wal_seq = wal_file_sequence_number.as_u64(),
-            ext = PARQUET_FILE_EXTENSION
-        ));
+        let path = if sub_chunk_index.is_some() {
+            ObjPath::from(format!(
+                "{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}-{chunk_idx}.{ext}",
+                date_string = date_time.format("%Y-%m-%d/%H-%M"),
+                wal_seq = wal_file_sequence_number.as_u64(),
+                chunk_idx = sub_chunk_index.unwrap(),
+                ext = PARQUET_FILE_EXTENSION
+            ))
+
+        } else {
+            ObjPath::from(format!(
+                "{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}",
+                date_string = date_time.format("%Y-%m-%d/%H-%M"),
+                wal_seq = wal_file_sequence_number.as_u64(),
+                ext = PARQUET_FILE_EXTENSION
+            ))
+        };
        Self(path)
    }
}
@@ -143,6 +156,7 @@ fn parquet_file_path_new() {
                .timestamp_nanos_opt()
                .unwrap(),
            WalFileSequenceNumber::new(1337),
+            None,
        ),
        ObjPath::from("my_host/dbs/my_db-0/my_table-0/2038-01-19/03-14/0000001337.parquet")
    );
@@ -162,6 +176,7 @@ fn parquet_file_percent_encoded() {
                .timestamp_nanos_opt()
                .unwrap(),
            WalFileSequenceNumber::new(100),
+            None,
        )
        .as_ref()
        .as_ref(),
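
To make the effect of the new `sub_chunk_index` argument concrete, here is a small usage sketch based on the unit test and format strings above. The `Some(0)` case and its expected object-store key are inferred from the new format string, not taken from a test in this commit, and `chunk_time_ns` is a hypothetical placeholder:

// Same arguments as the existing test: with no sub-chunk index the path is unchanged.
let path = ParquetFilePath::new(
    "my_host", "my_db", 0, "my_table", 0,
    chunk_time_ns, // hypothetical timestamp for 2038-01-19T03:14 UTC
    WalFileSequenceNumber::new(1337),
    None,
);
// -> my_host/dbs/my_db-0/my_table-0/2038-01-19/03-14/0000001337.parquet

// Forced-snapshot sub-chunks get the index appended after the zero-padded WAL sequence number.
let sub_chunk_path = ParquetFilePath::new(
    "my_host", "my_db", 0, "my_table", 0,
    chunk_time_ns,
    WalFileSequenceNumber::new(1337),
    Some(0),
);
// -> my_host/dbs/my_db-0/my_table-0/2038-01-19/03-14/0000001337-0.parquet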

influxdb3_write/src/persister.rs

Lines changed: 1 addition & 0 deletions

@@ -968,6 +968,7 @@ mod tests {
            0,
            Utc::now().timestamp_nanos_opt().unwrap(),
            WalFileSequenceNumber::new(1),
+            None,
        );
        let (bytes_written, meta, _) = persister
            .persist_parquet_file(path.clone(), stream_builder.build())

influxdb3_write/src/write_buffer/queryable_buffer.rs

Lines changed: 103 additions & 118 deletions

@@ -1,4 +1,4 @@
-use crate::chunk::BufferChunk;
+use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnaphotChunkIter};
use crate::paths::ParquetFilePath;
use crate::persister::Persister;
use crate::write_buffer::persisted_files::PersistedFiles;
@@ -193,149 +193,134 @@ impl QueryableBuffer {
        for (database_id, table_map) in buffer.db_to_table.iter_mut() {
            let db_schema = catalog.db_schema_by_id(database_id).expect("db exists");
            for (table_id, table_buffer) in table_map.iter_mut() {
+                info!(db_name = ?db_schema.name, ?table_id, ">>> working on db, table");
                let table_def = db_schema
                    .table_definition_by_id(table_id)
                    .expect("table exists");
-                let snapshot_chunks =
-                    table_buffer.snapshot(table_def, snapshot_details.end_time_marker);
+                let sort_key = table_buffer.sort_key.clone();
+                let all_keys_to_remove = table_buffer.get_keys_to_remove(snapshot_details.end_time_marker);
+                info!(num_keys_to_remove = ?all_keys_to_remove.len(), ">>> num keys to remove");
+
+                let chunk_time_to_chunk = &mut table_buffer.chunk_time_to_chunks;
+                let snapshot_chunks = &mut table_buffer.snapshotting_chunks;
+                let snapshot_chunks_iter = SnaphotChunkIter {
+                    keys_to_remove: all_keys_to_remove.iter(),
+                    map: chunk_time_to_chunk,
+                    table_def,
+                };

-                for chunk in snapshot_chunks {
+                for chunk in snapshot_chunks_iter {
+                    debug!(">>> starting with new chunk");
                    let table_name =
                        db_schema.table_id_to_name(table_id).expect("table exists");
-                    // mapping between time to main record batch array's index
-                    let mut smaller_chunks: HashMap<i64, (MinMax, Vec<u64>)> = HashMap::new();
-                    let smaller_duration = Duration::from_secs(10).as_nanos() as i64;
-                    let all_times = chunk
-                        .record_batch
-                        .column_by_name("time")
-                        .expect("time col to be present")
-                        .as_primitive::<TimestampNanosecondType>()
-                        .values();
-                    for (idx, time) in all_times.iter().enumerate() {
-                        let smaller_chunk_time = time - (time % smaller_duration);
-                        let (min_max, vec_indices) = smaller_chunks
-                            .entry(smaller_chunk_time)
-                            .or_insert_with(|| (MinMax::new(i64::MAX, i64::MIN), Vec::new()));
-
-                        min_max.update(*time);
-                        vec_indices.push(idx as u64);
-                    }

-                    let total_row_count = chunk.record_batch.column(0).len();
+                    if snapshot_details.forced {
+                        // when forced, we're already under memory pressure so create smaller
+                        // chunks (by time) and they need to be non-overlapping.
+                        // 1. Create smaller groups (using smaller duration), 10 secs here
+                        let mut smaller_chunks: HashMap<i64, (MinMax, Vec<u64>)> = HashMap::new();
+                        let smaller_duration = Duration::from_secs(10).as_nanos() as i64;
+                        let all_times = chunk
+                            .record_batch
+                            .column_by_name("time")
+                            .expect("time col to be present")
+                            .as_primitive::<TimestampNanosecondType>()
+                            .values();
+
+                        for (idx, time) in all_times.iter().enumerate() {
+                            let smaller_chunk_time = time - (time % smaller_duration);
+                            let (min_max, vec_indices) = smaller_chunks
+                                .entry(smaller_chunk_time)
+                                .or_insert_with(|| (MinMax::new(i64::MAX, i64::MIN), Vec::new()));
+
+                            min_max.update(*time);
+                            vec_indices.push(idx as u64);
+                        }

-                    for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() {
-                        debug!(
-                            ?smaller_chunk_time,
-                            ?min_max,
-                            num_indexes = ?all_indexes.len(),
-                            ?total_row_count,
-                            ">>> number of small chunks");
-                    }
+                        let total_row_count = chunk.record_batch.column(0).len();

-                    // at this point we have a bucket for each 10 sec block, we can create
-                    // smaller record batches here but maybe wasteful if we ever needed one
-                    // batch (let's see how this works first and then decide what can happen)
-                    let batch_schema = chunk.record_batch.schema();
-                    debug!(schema = ?chunk.schema, ">>> influx schema");
-                    debug!(arrow_schema = ?batch_schema, ">>> batch schema");
-                    let parent_cols = chunk.record_batch.columns();
-
-                    for (smaller_chunk_time, (min_max, all_indexes)) in
-                        smaller_chunks.into_iter()
-                    {
-                        let mut smaller_chunk_cols = vec![];
-                        let indices = UInt64Array::from_iter(all_indexes);
-                        for arr in parent_cols {
-                            let filtered =
-                                take(&arr, &indices, None)
-                                    .expect("index should be accessible in parent cols");
-
-                            debug!(smaller_chunk_len = ?filtered.len(), ">>> filtered size");
-                            smaller_chunk_cols.push(filtered);
+                        for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() {
+                            debug!(
+                                ?smaller_chunk_time,
+                                ?min_max,
+                                num_indexes = ?all_indexes.len(),
+                                ?total_row_count,
+                                ">>> number of small chunks");
                        }
-                        debug!(smaller_chunk_len = ?smaller_chunk_cols.len(), ">>> smaller chunks size");
-                        let smaller_rec_batch =
-                            RecordBatch::try_new(Arc::clone(&batch_schema), smaller_chunk_cols)
-                                .expect("create smaller record batch");
+
+                        // 2. At this point we have a bucket for each 10 sec block with related
+                        //    indexes from main chunk. Use those indexes to "cheaply" create
+                        //    smaller record batches.
+                        let batch_schema = chunk.record_batch.schema();
+                        let parent_cols = chunk.record_batch.columns();
+
+                        for (loop_idx, (smaller_chunk_time, (min_max, all_indexes))) in
+                            smaller_chunks.into_iter().enumerate()
+                        {
+                            let mut smaller_chunk_cols = vec![];
+                            let indices = UInt64Array::from_iter(all_indexes);
+                            for arr in parent_cols {
+                                // `take` here minimises allocations but is not completely free,
+                                // it still needs to allocate for smaller batches. The
+                                // allocations are in `ScalarBuffer::from_iter` under the hood
+                                let filtered =
+                                    take(&arr, &indices, None)
+                                        .expect("index should be accessible in parent cols");
+
+                                smaller_chunk_cols.push(filtered);
+                            }
+                            let smaller_rec_batch =
+                                RecordBatch::try_new(Arc::clone(&batch_schema), smaller_chunk_cols)
+                                    .expect("create smaller record batch");
+                            let persist_job = PersistJob {
+                                database_id: *database_id,
+                                table_id: *table_id,
+                                table_name: Arc::clone(&table_name),
+                                chunk_time: smaller_chunk_time,
+                                path: ParquetFilePath::new(
+                                    self.persister.node_identifier_prefix(),
+                                    db_schema.name.as_ref(),
+                                    database_id.as_u32(),
+                                    table_name.as_ref(),
+                                    table_id.as_u32(),
+                                    smaller_chunk_time,
+                                    snapshot_details.last_wal_sequence_number,
+                                    Some(loop_idx as u64),
+                                ),
+                                batch: smaller_rec_batch,
+                                schema: chunk.schema.clone(),
+                                timestamp_min_max: min_max.to_ts_min_max(),
+                                sort_key: sort_key.clone(),
+                            };
+                            persisting_chunks.push(persist_job);
+                        }
+
+                    } else {
                        let persist_job = PersistJob {
                            database_id: *database_id,
                            table_id: *table_id,
                            table_name: Arc::clone(&table_name),
-                            chunk_time: smaller_chunk_time,
+                            chunk_time: chunk.chunk_time,
                            path: ParquetFilePath::new(
                                self.persister.node_identifier_prefix(),
                                db_schema.name.as_ref(),
                                database_id.as_u32(),
                                table_name.as_ref(),
                                table_id.as_u32(),
-                                smaller_chunk_time,
+                                chunk.chunk_time,
                                snapshot_details.last_wal_sequence_number,
+                                None,
                            ),
-                            batch: smaller_rec_batch,
-                            // this schema.clone() can be avoided?
+                            // these clones are cheap and done one at a time
+                            batch: chunk.record_batch.clone(),
                            schema: chunk.schema.clone(),
-                            timestamp_min_max: min_max.to_ts_min_max(),
-                            sort_key: table_buffer.sort_key.clone(),
+                            timestamp_min_max: chunk.timestamp_min_max,
+                            sort_key: sort_key.clone(),
                        };
                        persisting_chunks.push(persist_job);
                    }
-                    // let fields = batch_schema
-                    //     .fields()
-                    //     .iter()
-                    //     .map(|field| SortField::new(field.data_type().clone()))
-                    //     .collect();
-                    // debug!(?fields, ">>> schema fields");
-                    //
-                    // let converter =
-                    //     RowConverter::new(fields).expect("row converter created from fields");
-                    // debug!(?converter, ">>> converter");
-                    //
-                    // let rows = converter
-                    //     .convert_columns(parent_cols)
-                    //     .expect("convert cols to rows to succeed");
-                    // debug!(?rows, ">>> all rows");
-                    //
-                    // for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() {
-                    //
-                    //     // create a record batch using just all_indexes from parent recordbatch
-                    //     let all_rows = all_indexes
-                    //         .iter()
-                    //         .map(|idx| rows.row(*idx))
-                    //         .collect::<Vec<_>>();
-                    //     debug!(?rows, ">>> all filtered child rows");
-                    //
-                    //     // hmmm this conversion turns Dictionary types to StringArray, not sure
-                    //     // why
-                    //     let child_cols = converter
-                    //         .convert_rows(all_rows)
-                    //         .expect("should convert rows back to cols");
-                    //     debug!(?child_cols, ">>> all child cols");
-                    //
-                    //     let smaller_rec_batch =
-                    //         RecordBatch::try_new(Arc::clone(&batch_schema), child_cols)
-                    //             .expect("create smaller record batch");
-                    //     let persist_job = PersistJob {
-                    //         database_id: *database_id,
-                    //         table_id: *table_id,
-                    //         table_name: Arc::clone(&table_name),
-                    //         chunk_time: *smaller_chunk_time,
-                    //         path: ParquetFilePath::new(
-                    //             self.persister.node_identifier_prefix(),
-                    //             db_schema.name.as_ref(),
-                    //             database_id.as_u32(),
-                    //             table_name.as_ref(),
-                    //             table_id.as_u32(),
-                    //             *smaller_chunk_time,
-                    //             snapshot_details.last_wal_sequence_number,
-                    //         ),
-                    //         batch: smaller_rec_batch,
-                    //         // this schema.clone() can be avoided?
-                    //         schema: chunk.schema.clone(),
-                    //         timestamp_min_max: min_max.to_ts_min_max(),
-                    //         sort_key: table_buffer.sort_key.clone(),
-                    //     };
-                    //     persisting_chunks.push(persist_job);
-                    // }
+                    snapshot_chunks.push(chunk);
+                    debug!(">>> finished with chunk");
                }
            }
        }
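
The forced-snapshot branch above boils down to a two-step technique: bucket row indices by 10-second window, then materialise one smaller RecordBatch per bucket with arrow's `take` kernel. Below is a self-contained sketch of that technique outside the InfluxDB types; the function and variable names are illustrative, not part of the codebase:

use std::collections::HashMap;
use std::time::Duration;

use arrow::array::{Array, TimestampNanosecondArray, UInt64Array};
use arrow::compute::take;
use arrow::record_batch::RecordBatch;

/// Split `batch` into one smaller batch per `window`-sized time bucket.
fn split_by_window(batch: &RecordBatch, window: Duration) -> Vec<(i64, RecordBatch)> {
    let window_ns = window.as_nanos() as i64;
    let times = batch
        .column_by_name("time")
        .expect("time column present")
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("time column is nanosecond timestamps");

    // 1. Bucket row indices by window start, as the `smaller_chunks` map does above.
    let mut buckets: HashMap<i64, Vec<u64>> = HashMap::new();
    for (idx, &t) in times.values().iter().enumerate() {
        buckets.entry(t - (t % window_ns)).or_default().push(idx as u64);
    }

    // 2. Build one smaller RecordBatch per bucket by `take`-ing every parent column.
    buckets
        .into_iter()
        .map(|(window_start, idxs)| {
            let indices = UInt64Array::from(idxs);
            let cols = batch
                .columns()
                .iter()
                .map(|col| take(col.as_ref(), &indices, None).expect("indices are in bounds"))
                .collect();
            let small = RecordBatch::try_new(batch.schema(), cols).expect("schema is unchanged");
            (window_start, small)
        })
        .collect()
}

Grouping the indices first keeps each per-bucket allocation limited to the rows that actually fall in that window, which is why the commit only takes this path under memory pressure (`snapshot_details.forced`).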

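
The `SnaphotChunkIter` imported at the top of queryable_buffer.rs is defined in influxdb3_write/src/write_buffer/table_buffer.rs, presumably the fifth changed file, whose diff is not shown in this view, so its exact definition is not reproduced here. The way it is constructed above (a `keys_to_remove` iterator plus a mutable reference to the chunk map) corresponds to a lazy drain-by-key iterator. A generic, self-contained sketch of that pattern follows; apart from the `keys_to_remove` and `map` field names taken from the struct literal above, every name here is made up:

use std::collections::BTreeMap;

// Illustrative only: `LazyDrain` is not the real SnaphotChunkIter, just the same pattern.
struct LazyDrain<'a, V> {
    keys_to_remove: std::slice::Iter<'a, i64>,
    map: &'a mut BTreeMap<i64, V>,
}

impl<V> Iterator for LazyDrain<'_, V> {
    type Item = (i64, V);

    fn next(&mut self) -> Option<Self::Item> {
        // Nothing is removed from the buffer until the consumer asks for the next item.
        let key = *self.keys_to_remove.next()?;
        self.map.remove(&key).map(|chunk| (key, chunk))
    }
}

fn main() {
    let mut chunk_time_to_chunks = BTreeMap::from([(0, "chunk-a"), (10, "chunk-b"), (20, "chunk-c")]);
    let keys_to_remove: Vec<i64> = vec![0, 10];

    let iter = LazyDrain {
        keys_to_remove: keys_to_remove.iter(),
        map: &mut chunk_time_to_chunks,
    };
    for (chunk_time, chunk) in iter {
        println!("snapshotting chunk_time={chunk_time}: {chunk}");
    }
    // Only the chunk whose key was not scheduled for removal stays buffered.
    assert_eq!(chunk_time_to_chunks.len(), 1);
}

Because each chunk is pulled out of the buffer and handed to the persist path only when the loop asks for the next item, at most one snapshot chunk needs to be materialised at a time, which is what the commit title means by producing snapshot chunks lazily.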