BatchCoalescer but without automatic batching #8850

@corasaurus-hex

Description

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

I am creating new record batches from a stream of record batches, and I need to ensure that identical values in a sorted column always land in the same output batch. BatchCoalescer isn't a good fit for this, but I would like to take advantage of the optimized machinery within BatchCoalescer to accomplish it -- except nothing outside of that crate has access to that machinery.
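For concreteness, a minimal sketch of the constraint (the helper name is invented; the `partition` kernel is arrow's existing API for finding runs of equal values in already-sorted columns):

```rust
use arrow::array::{ArrayRef, RecordBatch};
use arrow::compute::partition;
use arrow::error::ArrowError;

/// Given a batch sorted on `key_col`, everything before the final run of
/// equal key values is safe to emit; the final run has to stay buffered in
/// case the next incoming batch continues it.
fn emittable_prefix(batch: &RecordBatch, key_col: usize) -> Result<usize, ArrowError> {
    let key: ArrayRef = batch.column(key_col).clone();
    // `partition` returns the ranges of consecutive equal values.
    let ranges = partition(&[key])?.ranges();
    Ok(ranges.last().map(|r| r.start).unwrap_or(0))
}
```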

Describe the solution you'd like

I would like something akin to BatchCoalescer, but that gives me control over when the buffered rows are emitted as a batch (and perhaps over how many of the buffered rows go into that batch). It must not require knowing the largest record batch size up front.

Implementation-wise, @timsaucer suggested it could be expressed as a BatchCoalescer<T> that hands control of batching to the caller: you implement a trait and pass a value of that trait into the coalescer.
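A sketch of what such a trait might look like (all names here are invented for illustration, not a proposed API):

```rust
/// The coalescer would consult this after buffering each input batch.
trait BatchEmitPolicy {
    /// Return `Some(n)` to emit the first `n` buffered rows as a batch,
    /// or `None` to keep buffering.
    fn should_emit(&mut self, buffered_rows: usize, buffered_bytes: usize) -> Option<usize>;
}

/// Today's fixed-size behavior expressed as one such policy.
struct FixedRows(usize);

impl BatchEmitPolicy for FixedRows {
    fn should_emit(&mut self, buffered_rows: usize, _buffered_bytes: usize) -> Option<usize> {
        (buffered_rows >= self.0).then_some(self.0)
    }
}
```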

I was thinking it could be something like a lower-level vec + batch abstraction, where you can add new batches to it and it reallocates the internal arrays when capacity is exhausted, or even chains arrays together as more capacity is needed. It could have a function to emit a record batch based on a slice of that data... although maybe that implies the need for something like a ring buffer, with an offset at the start of the arrays and a capacity that can grow.
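The interface for that lower-level abstraction might look something like this (again, every name here is invented):

```rust
use arrow::array::RecordBatch;
use arrow::error::ArrowError;

/// A buffer that batches go into, where the caller decides when rows come out.
trait RowBuffer {
    /// Append all rows of `batch`, growing (or chaining) internal arrays as needed.
    fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError>;

    /// Number of rows currently buffered.
    fn len(&self) -> usize;

    /// Emit the first `n` buffered rows as a batch and retain the rest --
    /// the retained suffix is where a ring-buffer-style start offset would
    /// come in.
    fn emit(&mut self, n: usize) -> Result<RecordBatch, ArrowError>;
}
```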

Describe alternatives you've considered

I created a wrapper around BatchCoalescer. I wanted to set an impossibly large target batch size, but that size is used to allocate the column arrays up front, so something like usize::MAX isn't possible. Instead I have to guess at what the maximum batch size will be, with the trade-off that I don't want it to be so large that it wastes memory. If I run out of space, I create a new BatchCoalescer with a larger capacity and re-push the data buffered in the previous BatchCoalescer into the new one before pushing my new records into it.
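Roughly, the grow step looks like this (a sketch rather than my exact code, assuming the push_batch / finish_buffered_batch / next_completed_batch API of BatchCoalescer in recent arrow releases):

```rust
use arrow::compute::BatchCoalescer;
use arrow::error::ArrowError;

/// Replace `inner` with a larger coalescer, carrying its buffered rows over.
fn grow(inner: &mut BatchCoalescer, new_capacity: usize) -> Result<(), ArrowError> {
    let mut bigger = BatchCoalescer::new(inner.schema(), new_capacity);
    // Flush the old coalescer's partial buffer into completed batches...
    inner.finish_buffered_batch()?;
    // ...and re-push everything into the larger coalescer.
    while let Some(batch) = inner.next_completed_batch() {
        bigger.push_batch(batch)?;
    }
    *inner = bigger;
    Ok(())
}
```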

Alternatively, I could skip the optimized machinery entirely and instead slice up the record batches myself and then concat them myself as well.
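That would amount to RecordBatch::slice for the splitting plus something like this for the re-assembly (which copies every buffered row on each emit):

```rust
use arrow::array::RecordBatch;
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;

/// Materialize all buffered slices into a single output batch.
fn emit(schema: &SchemaRef, buffered: &[RecordBatch]) -> Result<RecordBatch, ArrowError> {
    concat_batches(schema, buffered)
}
```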

Additional context

@timsaucer also recently had a need for a custom BatchCoalescer, except he needs it to emit batches when it reaches either a certain number of rows or a specific total size in bytes. It seems like there are more cases out there where re-batching based on different criteria is needed.
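Under the hypothetical BatchEmitPolicy trait sketched above, that use case would be just another policy:

```rust
/// Emit everything buffered once either threshold is crossed.
struct RowsOrBytes {
    max_rows: usize,
    max_bytes: usize,
}

impl BatchEmitPolicy for RowsOrBytes {
    fn should_emit(&mut self, buffered_rows: usize, buffered_bytes: usize) -> Option<usize> {
        (buffered_rows >= self.max_rows || buffered_bytes >= self.max_bytes)
            .then_some(buffered_rows)
    }
}
```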


Labels: enhancement (Any new improvement worthy of an entry in the changelog)
