Draft
40 commits
1e356d3
Improve uuid support in the vectorized pipeline
akuzm Sep 3, 2025
8b018ad
fix
akuzm Sep 3, 2025
2571f3d
fix the groupagg test
akuzm Sep 3, 2025
2b99ab1
ref
akuzm Sep 3, 2025
5d64912
ignore the test
akuzm Sep 4, 2025
224616e
ignore not skip
akuzm Sep 4, 2025
fab4dce
some experiments
akuzm Sep 4, 2025
929fbaf
opexpr
akuzm Sep 5, 2025
48112a7
memory context
akuzm Sep 5, 2025
5f4d32b
things
akuzm Sep 8, 2025
fd4f81d
stuff
akuzm Sep 10, 2025
ebc4ef3
benchmark vectorized expressions (2025-09-10 no. 2)
akuzm Sep 10, 2025
4cb0b89
refactor more
akuzm Sep 15, 2025
4eb49db
some progress
akuzm Sep 16, 2025
b5f28b2
byref
akuzm Sep 16, 2025
9572e43
benchmark vectorized expressions (2025-09-16 no. 3)
akuzm Sep 16, 2025
0907c12
typo
akuzm Sep 16, 2025
02a7cad
benchmark vectorized expressions (2025-09-16 no. 4)
akuzm Sep 16, 2025
37d5ea7
experiments
akuzm Sep 17, 2025
38c51fe
try to support text
akuzm Sep 17, 2025
7366184
benchmark vectorized expressions (2025-09-17 no. 5)
akuzm Sep 17, 2025
dd8180e
cleanups
akuzm Sep 18, 2025
3e5c0cc
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Sep 18, 2025
bae656d
null results
akuzm Sep 18, 2025
12b74bc
more tests
akuzm Sep 19, 2025
48e475d
Make the vector_agg_groupagg test collation-independent
akuzm Sep 19, 2025
f472b39
refs
akuzm Sep 19, 2025
f38c818
Merge remote-tracking branch 'akuzm/collation-dash' into HEAD
akuzm Sep 19, 2025
b3ea9e3
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Sep 24, 2025
f622ecf
some tests
akuzm Sep 24, 2025
a37d970
fix
akuzm Sep 24, 2025
7e72054
ref
akuzm Sep 25, 2025
88582ae
bool
akuzm Sep 26, 2025
7c8845e
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Sep 26, 2025
d4e2583
benchmark vectorized expressions (2025-09-26 no. 1)
akuzm Sep 26, 2025
696568a
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Oct 6, 2025
462bb3d
cleanup
akuzm Oct 6, 2025
35970e3
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Oct 8, 2025
78363d7
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Oct 8, 2025
af793f2
fix after merge
akuzm Oct 8, 2025
29 changes: 27 additions & 2 deletions tsl/src/compression/arrow_c_data_interface.h
@@ -184,12 +184,18 @@ arrow_set_row_validity(uint64 *bitmap, size_t row_number, bool value)
 }
 
 /*
- * Combine the validity bitmaps into the given storage.
+ * Combine the validity bitmaps into the given storage. Can return one of the
+ * input filters if the others are NULL.
  */
-static inline const uint64 *
+static inline pg_nodiscard const uint64 *
 arrow_combine_validity(size_t num_words, uint64 *restrict storage, const uint64 *filter1,
 					   const uint64 *filter2, const uint64 *filter3)
 {
+	Assert(num_words != 0);
+	Assert(storage != filter1);
+	Assert(storage != filter2);
+	Assert(storage != filter3);
+
 	/*
 	 * Any and all of the filters can be null. For simplicity, move the non-null
 	 * filters to the leading positions.
@@ -256,6 +262,23 @@ arrow_combine_validity(size_t num_words, uint64 *restrict storage, const uint64
 	return storage;
 }
 
+/*
+ * Do the &= operation on bitmaps. The right argument can be NULL.
+ */
+static inline void
+arrow_validity_and(int num_words, uint64 *restrict left, const uint64 *right)
+{
+	if (right == NULL)
+	{
+		return;
+	}
+
+	for (int i = 0; i < num_words; i++)
+	{
+		left[i] &= right[i];
+	}
+}
+
 /*
  * Increase the `source_value` to be an even multiple of `pad_to`.
  */
@@ -268,6 +291,8 @@ pad_to_multiple(uint64 pad_to, uint64 source_value)
 static inline int
 arrow_num_valid(const uint64 *bitmap, size_t total_rows)
 {
+	Assert(total_rows != 0);
+
 	if (bitmap == NULL)
 	{
 		return total_rows;
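A standalone sketch of the bitmap convention these helpers use (bit i set in a validity word means row i passes, and a NULL filter means "all rows pass"); it mirrors the semantics of arrow_validity_and() with standard types, and the sample values are made up:

```c
#include <stdint.h>
#include <stdio.h>

static void
validity_and(int num_words, uint64_t *restrict left, const uint64_t *right)
{
	if (right == NULL)
		return; /* a NULL filter means "all rows pass" */

	for (int i = 0; i < num_words; i++)
		left[i] &= right[i];
}

int
main(void)
{
	uint64_t column_validity[1] = { 0xFF }; /* rows 0..7 are non-null */
	uint64_t qual_result[1] = { 0xAA };     /* quals pass on odd rows only */

	validity_and(1, column_validity, qual_result);

	/* Prints 0xaa: a row survives only if it is non-null AND passes the quals. */
	printf("0x%llx\n", (unsigned long long) column_validity[0]);
	return 0;
}
```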
97 changes: 4 additions & 93 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -143,7 +143,7 @@ make_single_value_arrow(Oid pgtype, Datum datum, bool isnull)
 	return make_single_value_arrow_arithmetic(pgtype, datum, isnull);
 }
 
-static int
+int
 get_max_text_datum_size(ArrowArray *text_array)
 {
 	int maxbytes = 0;
@@ -1079,21 +1079,6 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
 	}
 }
 
-static void
-store_text_datum(CompressedColumnValues *column_values, int arrow_row)
-{
-	const uint32 start = ((uint32 *) column_values->buffers[1])[arrow_row];
-	const int32 value_bytes = ((uint32 *) column_values->buffers[1])[arrow_row + 1] - start;
-	Assert(value_bytes >= 0);
-
-	const int total_bytes = value_bytes + VARHDRSZ;
-	Assert(DatumGetPointer(*column_values->output_value) != NULL);
-	SET_VARSIZE(*column_values->output_value, total_bytes);
-	memcpy(VARDATA(*column_values->output_value),
-		   &((uint8 *) column_values->buffers[2])[start],
-		   value_bytes);
-}
-
 /*
  * Construct the next tuple in the decompressed scan slot.
  * Doesn't check the quals.
@@ -1106,83 +1091,9 @@ make_next_tuple(DecompressBatchState *batch_state, uint16 arrow_row, int num_dat
 	Assert(batch_state->total_batch_rows > 0);
 	Assert(batch_state->next_batch_row < batch_state->total_batch_rows);
 
-	for (int i = 0; i < num_data_columns; i++)
-	{
-		CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
-		if (column_values->decompression_type == DT_Iterator)
-		{
-			DecompressionIterator *iterator = (DecompressionIterator *) column_values->buffers[0];
-			DecompressResult result = iterator->try_next(iterator);
-
-			if (result.is_done)
-			{
-				elog(ERROR, "compressed column out of sync with batch counter");
-			}
-
-			*column_values->output_isnull = result.is_null;
-			*column_values->output_value = result.val;
-		}
-		else if (column_values->decompression_type > SIZEOF_DATUM)
-		{
-			/*
-			 * Fixed-width by-reference type that doesn't fit into a Datum.
-			 * For now this only happens for 8-byte types on 32-bit systems,
-			 * but eventually we could also use it for bigger by-value types
-			 * such as UUID.
-			 */
-			const uint8 value_bytes = column_values->decompression_type;
-			const char *src = column_values->buffers[1];
-			*column_values->output_value = PointerGetDatum(&src[value_bytes * arrow_row]);
-			*column_values->output_isnull =
-				!arrow_row_is_valid(column_values->buffers[0], arrow_row);
-		}
-		else if (column_values->decompression_type == DT_ArrowBits)
-		{
-			/*
-			 * The DT_ArrowBits type is a special case, because the value is
-			 * stored as an Array of bits.
-			 */
-			*column_values->output_value =
-				BoolGetDatum(arrow_row_is_valid(column_values->buffers[1], arrow_row));
-			*column_values->output_isnull =
-				!arrow_row_is_valid(column_values->buffers[0], arrow_row);
-		}
-		else if (column_values->decompression_type > 0)
-		{
-			/*
-			 * Fixed-width by-value type that fits into a Datum.
-			 *
-			 * The conversion of Datum to more narrow types will truncate
-			 * the higher bytes, so we don't care if we read some garbage
-			 * into them, and can always read 8 bytes. These are unaligned
-			 * reads, so technically we have to do memcpy.
-			 */
-			const uint8 value_bytes = column_values->decompression_type;
-			Assert(value_bytes <= SIZEOF_DATUM);
-			const char *src = column_values->buffers[1];
-			memcpy(column_values->output_value, &src[value_bytes * arrow_row], SIZEOF_DATUM);
-			*column_values->output_isnull =
-				!arrow_row_is_valid(column_values->buffers[0], arrow_row);
-		}
-		else if (column_values->decompression_type == DT_ArrowText)
-		{
-			store_text_datum(column_values, arrow_row);
-			*column_values->output_isnull =
-				!arrow_row_is_valid(column_values->buffers[0], arrow_row);
-		}
-		else if (column_values->decompression_type == DT_ArrowTextDict)
-		{
-			const int16 index = ((int16 *) column_values->buffers[3])[arrow_row];
-			store_text_datum(column_values, index);
-			*column_values->output_isnull =
-				!arrow_row_is_valid(column_values->buffers[0], arrow_row);
-		}
-		else
-		{
-			/* A compressed column with default value, do nothing. */
-			Assert(column_values->decompression_type == DT_Scalar);
-		}
-	}
+	compressed_columns_to_postgres_data(batch_state->compressed_columns,
+										num_data_columns,
+										arrow_row);
 
 	/*
 	 * It's a virtual tuple slot, so no point in clearing/storing it
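A standalone demonstration of the "read a full 8 bytes and let narrowing truncate the garbage" trick described in the comment above; it assumes a little-endian host, and the packed sample values are made up:

```c
#include <stdint.h>
#include <stdio.h>
#include <string.h>

int
main(void)
{
	/* Tightly packed 2-byte values, like an Arrow buffer for a 2-byte type:
	 * row 0 = 12345 (0x39 0x30), row 1 = -1234 (0x2E 0xFB), plus padding so
	 * that a full 8-byte read of row 1 stays in bounds. */
	const uint8_t buf[] = { 0x39, 0x30, 0x2E, 0xFB, 0, 0, 0, 0, 0, 0 };

	/* The read may be unaligned, so memcpy instead of a pointer cast. */
	uint64_t datum;
	memcpy(&datum, &buf[2 * 1], sizeof(datum)); /* row 1 plus 6 garbage bytes */

	/* Narrowing truncates the high bytes, leaving only the real value. */
	printf("%d\n", (int) (int16_t) datum); /* prints -1234 */
	return 0;
}
```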
116 changes: 116 additions & 0 deletions tsl/src/nodes/decompress_chunk/compressed_batch.h
@@ -197,3 +197,119 @@ typedef struct CompressedBatchVectorQualState
 
 const ArrowArray *compressed_batch_get_arrow_array(VectorQualState *vqstate, Expr *expr,
 												   bool *is_default_value);
+int get_max_text_datum_size(ArrowArray *text_array);
+
+inline static void
+store_text_datum(CompressedColumnValues *column_values, int arrow_row)
+{
+	const uint32 start = ((uint32 *) column_values->buffers[1])[arrow_row];
+	const int32 value_bytes = ((uint32 *) column_values->buffers[1])[arrow_row + 1] - start;
+	Assert(value_bytes >= 0);
+
+	const int total_bytes = value_bytes + VARHDRSZ;
+	Assert(DatumGetPointer(*column_values->output_value) != NULL);
+	SET_VARSIZE(*column_values->output_value, total_bytes);
+	memcpy(VARDATA(*column_values->output_value),
+		   &((uint8 *) column_values->buffers[2])[start],
+		   value_bytes);
+}
+
+inline static void
+compressed_columns_to_postgres_data(CompressedColumnValues *columns, int num_data_columns,
+									uint16 arrow_row)
+{
+	for (int i = 0; i < num_data_columns; i++)
+	{
+		CompressedColumnValues *column_values = &columns[i];
+		switch ((int) column_values->decompression_type)
+		{
+			case DT_Iterator:
+			{
+				DecompressionIterator *iterator =
+					(DecompressionIterator *) column_values->buffers[0];
+				DecompressResult result = iterator->try_next(iterator);
+
+				if (result.is_done)
+				{
+					elog(ERROR, "compressed column out of sync with batch counter");
+				}
+
+				*column_values->output_isnull = result.is_null;
+				*column_values->output_value = result.val;
+				break;
+			}
+#ifndef USE_FLOAT8_BYVAL
+			case 8:
+#endif
+			case 16:
+			{
+				/*
+				 * Fixed-width by-reference type that doesn't fit into a Datum.
+				 * For now this only happens for 8-byte types on 32-bit systems,
+				 * but eventually we could also use it for bigger by-value types
+				 * such as UUID.
+				 */
+				const uint8 value_bytes = column_values->decompression_type;
+				const char *src = column_values->buffers[1];
+				*column_values->output_value = PointerGetDatum(&src[value_bytes * arrow_row]);
+				*column_values->output_isnull =
+					!arrow_row_is_valid(column_values->buffers[0], arrow_row);
+				break;
+			}
+			case DT_ArrowBits:
+			{
+				/*
+				 * The DT_ArrowBits type is a special case, because the value is
+				 * stored as an Array of bits.
+				 */
+				*column_values->output_value =
+					BoolGetDatum(arrow_row_is_valid(column_values->buffers[1], arrow_row));
+				*column_values->output_isnull =
+					!arrow_row_is_valid(column_values->buffers[0], arrow_row);
+				break;
+			}
+			case 2:
+			case 4:
+#ifdef USE_FLOAT8_BYVAL
+			case 8:
+#endif
+			{
+				/*
+				 * Fixed-width by-value type that fits into a Datum.
+				 *
+				 * The conversion of Datum to more narrow types will truncate
+				 * the higher bytes, so we don't care if we read some garbage
+				 * into them, and can always read 8 bytes. These are unaligned
+				 * reads, so technically we have to do memcpy.
+				 */
+				const uint8 value_bytes = column_values->decompression_type;
+				Assert(value_bytes <= SIZEOF_DATUM);
+				const char *src = column_values->buffers[1];
+				memcpy(column_values->output_value, &src[value_bytes * arrow_row], SIZEOF_DATUM);
+				*column_values->output_isnull =
+					!arrow_row_is_valid(column_values->buffers[0], arrow_row);
+				break;
+			}
+			case DT_ArrowText:
+			{
+				store_text_datum(column_values, arrow_row);
+				*column_values->output_isnull =
+					!arrow_row_is_valid(column_values->buffers[0], arrow_row);
+				break;
+			}
+			case DT_ArrowTextDict:
+			{
+				const int16 index = ((int16 *) column_values->buffers[3])[arrow_row];
+				store_text_datum(column_values, index);
+				*column_values->output_isnull =
+					!arrow_row_is_valid(column_values->buffers[0], arrow_row);
+				break;
+			}
+			default:
+			{
+				/* A compressed column with default value, do nothing. */
+				Assert(column_values->decompression_type == DT_Scalar);
+			}
+		}
+	}
+}
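A standalone illustration of the Arrow variable-length text layout that store_text_datum() reads: buffers[1] holds n + 1 byte offsets, buffers[2] holds the concatenated string bytes, and row i spans [offsets[i], offsets[i + 1]). The sample column below is made up:

```c
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	/* Made-up column holding the three strings "foo", "bar", "baz". */
	const char data[] = "foobarbaz";           /* buffers[2]: concatenated bytes */
	const uint32_t offsets[] = { 0, 3, 6, 9 }; /* buffers[1]: n + 1 offsets */

	for (int row = 0; row < 3; row++)
	{
		const uint32_t start = offsets[row];
		const uint32_t value_bytes = offsets[row + 1] - start;

		/* store_text_datum() would copy these bytes into a varlena after
		 * SET_VARSIZE(); here we just print them. */
		printf("row %d: %.*s\n", row, (int) value_bytes, data + start);
	}
	return 0;
}
```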
28 changes: 28 additions & 0 deletions tsl/src/nodes/decompress_chunk/vector_predicates.c
@@ -303,3 +303,31 @@ vector_booleantest(const ArrowArray *arrow, int test_type, uint64 *restrict resu
 			break;
 	}
 }
+
+static void
+vector_int8pl(const ArrowArray **args, int nargs, ArrowArray *result)
+{
+	Ensure(nargs == 2, "wrong number of arguments %d given to function %s", nargs, __FUNCTION__);
+
+	const int n = args[0]->length;
+	Ensure(args[1]->length == n, "argument length mismatch");
+	Ensure(result->length == n, "result length mismatch");
+
+	int64 *restrict values = (int64 *) result->buffers[1];
+	for (int i = 0; i < n; i++)
+	{
+		values[i] =
+			((const int64 *) args[0]->buffers[1])[i] + ((const int64 *) args[1]->buffers[1])[i];
+	}
+}
+
+VectorFunction *
+get_vector_function(Oid pg_function)
+{
+	switch (pg_function)
+	{
+		case F_INT8PL:
+			return vector_int8pl;
+	}
+	return NULL;
+}
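A self-contained sketch of the OID-to-function dispatch introduced here, where NULL means "no vectorized implementation, fall back to row-by-row execution". The MiniArrow struct and the hard-coded OID value are illustrative stand-ins for ArrowArray and fmgroids.h, not the real definitions:

```c
#include <stdint.h>
#include <stdio.h>

/* Stand-in for ArrowArray: length plus value buffer (validity omitted). */
typedef struct MiniArrow
{
	int64_t length;
	void *buffers[2];
} MiniArrow;

typedef void(VectorFunction)(const MiniArrow **args, int nargs, MiniArrow *result);

/* Stand-in value; the real code takes F_INT8PL from fmgroids.h. */
#define F_INT8PL 463

static void
vector_int8pl(const MiniArrow **args, int nargs, MiniArrow *result)
{
	int64_t *out = (int64_t *) result->buffers[1];
	for (int64_t i = 0; i < result->length; i++)
	{
		out[i] = ((const int64_t *) args[0]->buffers[1])[i] +
				 ((const int64_t *) args[1]->buffers[1])[i];
	}
}

static VectorFunction *
get_vector_function(unsigned pg_function)
{
	switch (pg_function)
	{
		case F_INT8PL:
			return vector_int8pl;
	}
	return NULL; /* caller falls back to the non-vectorized path */
}

int
main(void)
{
	int64_t a[] = { 1, 2, 3 }, b[] = { 10, 20, 30 }, out[3];
	MiniArrow arg0 = { 3, { NULL, a } };
	MiniArrow arg1 = { 3, { NULL, b } };
	MiniArrow result = { 3, { NULL, out } };
	const MiniArrow *args[] = { &arg0, &arg1 };

	VectorFunction *fn = get_vector_function(F_INT8PL);
	if (fn != NULL)
	{
		fn(args, 2, &result);
	}

	/* Prints "11 22 33": elementwise int64 addition over the value buffers. */
	printf("%lld %lld %lld\n", (long long) out[0], (long long) out[1], (long long) out[2]);
	return 0;
}
```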
4 changes: 4 additions & 0 deletions tsl/src/nodes/decompress_chunk/vector_predicates.h
@@ -13,6 +13,10 @@ typedef void(VectorPredicate)(const ArrowArray *, Datum, uint64 *restrict);
 
 VectorPredicate *get_vector_const_predicate(Oid pg_predicate);
 
+typedef void(VectorFunction)(const ArrowArray **args, int nargs, ArrowArray *result);
+
+VectorFunction *get_vector_function(Oid pg_function);
+
 void vector_array_predicate(VectorPredicate *vector_const_predicate, bool is_or,
 							const ArrowArray *vector, Datum array, uint64 *restrict final_result);
 