Skip to content

Commit 4e9f2df

Browse files
authored
Scalar array operation pushdown to bloom filters (#8465)
Transform `x = any(array[...])` into `bloom1_contains_any` function call that checks the array elements against the bloom filter sparse index.
1 parent c943d7f commit 4e9f2df

20 files changed

+837
-62
lines changed

.unreleased/bloom-contains-any

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implements: #8465 Speed up the filters like `x = any(array[...])` using bloom filter sparse indexes.

sql/sparse_index.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ RETURNS bool
77
AS '@MODULE_PATHNAME@', 'ts_bloom1_contains'
88
LANGUAGE C IMMUTABLE PARALLEL SAFE;
99

10+
CREATE OR REPLACE FUNCTION _timescaledb_functions.bloom1_contains_any(_timescaledb_internal.bloom1, anyarray)
11+
RETURNS bool
12+
AS '@MODULE_PATHNAME@', 'ts_bloom1_contains_any'
13+
LANGUAGE C IMMUTABLE PARALLEL SAFE;
14+
1015
CREATE OR REPLACE FUNCTION _timescaledb_functions.jsonb_get_matching_index_entry(
1116
config jsonb,
1217
attr_name text,

sql/updates/latest-dev.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
CREATE FUNCTION _timescaledb_functions.bloom1_contains_any(_timescaledb_internal.bloom1, anyarray)
2+
RETURNS bool
3+
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
4+
LANGUAGE C IMMUTABLE PARALLEL SAFE;
5+
16
DROP FUNCTION IF EXISTS _timescaledb_functions.policy_job_stat_history_retention;
27
DROP VIEW IF EXISTS timescaledb_information.chunks;
38

sql/updates/reverse-dev.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1+
DROP FUNCTION _timescaledb_functions.bloom1_contains_any(_timescaledb_internal.bloom1, anyarray);
2+
13
DROP FUNCTION IF EXISTS _timescaledb_functions.policy_job_stat_history_retention;
24
DROP VIEW IF EXISTS timescaledb_information.chunks;

src/cross_module_fn.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ CROSSMODULE_WRAPPER(create_compressed_chunk);
8989
CROSSMODULE_WRAPPER(compress_chunk);
9090
CROSSMODULE_WRAPPER(decompress_chunk);
9191
CROSSMODULE_WRAPPER(bloom1_contains);
92+
CROSSMODULE_WRAPPER(bloom1_contains_any);
9293

9394
/* continuous aggregate */
9495
CROSSMODULE_WRAPPER(continuous_agg_refresh);
@@ -403,6 +404,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
403404
.uuid_compressor_append = error_no_default_fn_pg_community,
404405
.uuid_compressor_finish = error_no_default_fn_pg_community,
405406
.bloom1_contains = error_no_default_fn_pg_community,
407+
.bloom1_contains_any = error_no_default_fn_pg_community,
406408
.bloom1_get_hash_function = bloom1_get_hash_function_default,
407409

408410
.decompress_batches_for_insert = error_no_default_fn_chunk_insert_state_community,

src/cross_module_fn.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ typedef struct CrossModuleFunctions
164164
PGFunction uuid_compressor_append;
165165
PGFunction uuid_compressor_finish;
166166
PGFunction bloom1_contains;
167+
PGFunction bloom1_contains_any;
167168
PGFunction (*bloom1_get_hash_function)(Oid type, FmgrInfo **finfo);
168169

169170
PGFunction create_chunk;

tsl/src/compression/batch_metadata_builder_bloom1.c

Lines changed: 223 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,77 @@ bloom1_update_val(void *builder_, Datum needle)
403403
}
404404
}
405405

406+
/*
407+
* We cache some information across function calls in this context.
408+
*/
409+
typedef struct Bloom1ContainsContext
410+
{
411+
PGFunction hash_function_pointer;
412+
FmgrInfo *hash_function_finfo;
413+
414+
Oid element_type;
415+
int16 element_typlen;
416+
bool element_typbyval;
417+
char element_typalign;
418+
419+
/* This is per-row, here for convenience. */
420+
struct varlena *current_row_bloom;
421+
} Bloom1ContainsContext;
422+
423+
static Bloom1ContainsContext *
424+
bloom1_contains_context_prepare(FunctionCallInfo fcinfo, bool use_element_type)
425+
{
426+
Bloom1ContainsContext *context = (Bloom1ContainsContext *) fcinfo->flinfo->fn_extra;
427+
if (context == NULL)
428+
{
429+
Ensure(PG_NARGS() == 2, "bloom1_contains called with wrong number of arguments");
430+
431+
context = MemoryContextAllocZero(fcinfo->flinfo->fn_mcxt, sizeof(*context));
432+
433+
context->element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
434+
if (use_element_type)
435+
{
436+
context->element_type = get_element_type(context->element_type);
437+
Ensure(OidIsValid(context->element_type),
438+
"cannot determine array element type for bloom1_contains_any");
439+
}
440+
441+
context->hash_function_pointer =
442+
bloom1_get_hash_function(context->element_type, &context->hash_function_finfo);
443+
444+
/*
445+
* Technically this function is callable by user with arbitrary argument
446+
* that might not have an extended hash function, so report this error
447+
* gracefully.
448+
*/
449+
if (context->hash_function_pointer == NULL)
450+
{
451+
ereport(ERROR,
452+
(errcode(ERRCODE_DATA_EXCEPTION),
453+
errmsg("the argument type %s lacks an extended hash function",
454+
format_type_be(context->element_type))));
455+
}
456+
457+
get_typlenbyvalalign(context->element_type,
458+
&context->element_typlen,
459+
&context->element_typbyval,
460+
&context->element_typalign);
461+
462+
fcinfo->flinfo->fn_extra = context;
463+
}
464+
465+
if (PG_ARGISNULL(0))
466+
{
467+
context->current_row_bloom = NULL;
468+
}
469+
else
470+
{
471+
context->current_row_bloom = PG_GETARG_VARLENA_P(0);
472+
}
473+
474+
return context;
475+
}
476+
406477
/*
407478
* Checks whether the given element can be present in the given bloom filter.
408479
* This is what we use in predicate pushdown. The SQL signature is:
@@ -411,11 +482,15 @@ bloom1_update_val(void *builder_, Datum needle)
411482
Datum
412483
bloom1_contains(PG_FUNCTION_ARGS)
413484
{
485+
Bloom1ContainsContext *context =
486+
bloom1_contains_context_prepare(fcinfo, /* use_element_type = */ false);
487+
414488
/*
415489
* This function is not strict, because if we don't have a bloom filter, this
416490
* means the condition can potentially be true.
417491
*/
418-
if (PG_ARGISNULL(0))
492+
struct varlena *bloom = context->current_row_bloom;
493+
if (bloom == NULL)
419494
{
420495
PG_RETURN_BOOL(true);
421496
}
@@ -429,23 +504,9 @@ bloom1_contains(PG_FUNCTION_ARGS)
429504
PG_RETURN_BOOL(false);
430505
}
431506

432-
Oid type_oid = get_fn_expr_argtype(fcinfo->flinfo, 1);
433-
FmgrInfo *finfo = NULL;
434-
PGFunction fn = bloom1_get_hash_function(type_oid, &finfo);
435507
/*
436-
* Technically this function is callable by user with arbitrary argument
437-
* that might not have an extended hash function, so report this error
438-
* gracefully.
508+
* Compute the bloom filter parameters.
439509
*/
440-
if (fn == NULL)
441-
{
442-
ereport(ERROR,
443-
(errcode(ERRCODE_DATA_EXCEPTION),
444-
errmsg("the argument type %s lacks an extended hash function",
445-
format_type_be(type_oid))));
446-
}
447-
448-
struct varlena *bloom = PG_GETARG_VARLENA_P(0);
449510
const char *words_buf = bloom1_words_buf(bloom);
450511
const uint32 num_bits = bloom1_num_bits(bloom);
451512

@@ -464,7 +525,8 @@ bloom1_contains(PG_FUNCTION_ARGS)
464525
Assert((word_mask >> num_word_bits) == 0);
465526

466527
Datum needle = PG_GETARG_DATUM(1);
467-
const uint64 datum_hash_1 = calculate_hash(fn, finfo, needle);
528+
const uint64 datum_hash_1 =
529+
calculate_hash(context->hash_function_pointer, context->hash_function_finfo, needle);
468530
const uint32 absolute_mask = num_bits - 1;
469531
for (int i = 0; i < BLOOM1_HASHES; i++)
470532
{
@@ -479,6 +541,150 @@ bloom1_contains(PG_FUNCTION_ARGS)
479541
PG_RETURN_BOOL(true);
480542
}
481543

544+
#define ST_SORT sort_hashes
545+
#define ST_ELEMENT_TYPE uint64
546+
#define ST_COMPARE(a, b) ((*(a) > *(b)) - (*(a) < *(b)))
547+
#define ST_SCOPE static
548+
#define ST_DEFINE
549+
#include <lib/sort_template.h>
550+
551+
/*
552+
* Checks whether any element of the given array can be present in the given
553+
* bloom filter. This is used for predicate pushdown for x = any(array[...]).
554+
* The SQL signature is:
555+
* _timescaledb_functions.bloom1_contains_any(bloom1, anyarray)
556+
*/
557+
Datum
558+
bloom1_contains_any(PG_FUNCTION_ARGS)
559+
{
560+
Bloom1ContainsContext *context =
561+
bloom1_contains_context_prepare(fcinfo, /* use_element_type = */ true);
562+
563+
/*
564+
* This function is not strict, because if we don't have a bloom filter, this
565+
* means the condition can potentially be true.
566+
*/
567+
struct varlena *bloom = context->current_row_bloom;
568+
if (bloom == NULL)
569+
{
570+
PG_RETURN_BOOL(true);
571+
}
572+
573+
/*
574+
* A null value cannot match the equality condition, although this probably
575+
* should be optimized away by the planner.
576+
*/
577+
if (PG_ARGISNULL(1))
578+
{
579+
PG_RETURN_BOOL(false);
580+
}
581+
582+
int num_items;
583+
Datum *items;
584+
bool *nulls;
585+
deconstruct_array(PG_GETARG_ARRAYTYPE_P(1),
586+
context->element_type,
587+
context->element_typlen,
588+
context->element_typbyval,
589+
context->element_typalign,
590+
&items,
591+
&nulls,
592+
&num_items);
593+
594+
if (num_items == 0)
595+
{
596+
PG_RETURN_BOOL(false);
597+
}
598+
599+
/*
600+
* Calculate the per-item base hashes that will be used for computing the
601+
* individual bloom filter bit offsets. We can reuse the "items" space to
602+
* avoid more allocations, but have to allocate as a fallback on 32-bit
603+
* systems.
604+
*/
605+
#if FLOAT8PASSBYVAL
606+
uint64 *item_base_hashes = items;
607+
#else
608+
uint64 *item_base_hashes = palloc(sizeof(uint64) * num_items);
609+
#endif
610+
611+
FmgrInfo *finfo = context->hash_function_finfo;
612+
PGFunction hash_fn = context->hash_function_pointer;
613+
614+
int valid = 0;
615+
for (int i = 0; i < num_items; i++)
616+
{
617+
if (nulls[i])
618+
{
619+
/*
620+
* A null value cannot match the equality condition.
621+
*/
622+
continue;
623+
}
624+
625+
item_base_hashes[valid++] = calculate_hash(hash_fn, finfo, items[i]);
626+
}
627+
628+
if (valid == 0)
629+
{
630+
/*
631+
* No non-null elements.
632+
*/
633+
PG_RETURN_BOOL(false);
634+
}
635+
636+
/*
637+
* Sort the hashes for cache-friendly probing.
638+
*/
639+
sort_hashes(item_base_hashes, valid);
640+
641+
/*
642+
* Get the bloom filter parameters.
643+
*/
644+
const char *words_buf = bloom1_words_buf(bloom);
645+
const uint32 num_bits = bloom1_num_bits(bloom);
646+
647+
/* Must be a power of two. */
648+
CheckCompressedData(num_bits == (1ULL << pg_leftmost_one_pos32(num_bits)));
649+
650+
/* Must be >= 64 bits. */
651+
CheckCompressedData(num_bits >= 64);
652+
653+
const uint32 num_word_bits = sizeof(*words_buf) * 8;
654+
Assert(num_bits % num_word_bits == 0);
655+
const uint32 log2_word_bits = pg_leftmost_one_pos32(num_word_bits);
656+
Assert(num_word_bits == (1ULL << log2_word_bits));
657+
658+
const uint32 word_mask = num_word_bits - 1;
659+
Assert((word_mask >> num_word_bits) == 0);
660+
661+
const uint32 absolute_mask = num_bits - 1;
662+
663+
/* Probe the bloom filter. */
664+
for (int item_index = 0; item_index < valid; item_index++)
665+
{
666+
const uint64 base_hash = item_base_hashes[item_index];
667+
bool match = true;
668+
for (int i = 0; i < BLOOM1_HASHES; i++)
669+
{
670+
const uint32 absolute_bit_index = bloom1_get_one_offset(base_hash, i) & absolute_mask;
671+
const uint32 word_index = absolute_bit_index >> log2_word_bits;
672+
const uint32 word_bit_index = absolute_bit_index & word_mask;
673+
if ((words_buf[word_index] & (1ULL << word_bit_index)) == 0)
674+
{
675+
match = false;
676+
break;
677+
}
678+
}
679+
if (match)
680+
{
681+
PG_RETURN_BOOL(true);
682+
}
683+
}
684+
685+
PG_RETURN_BOOL(false);
686+
}
687+
482688
static int
483689
bloom1_varlena_alloc_size(int num_bits)
484690
{

tsl/src/compression/sparse_index_bloom1.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,6 @@
99

1010
Datum bloom1_contains(PG_FUNCTION_ARGS);
1111

12+
Datum bloom1_contains_any(PG_FUNCTION_ARGS);
13+
1214
PGFunction bloom1_get_hash_function(Oid type, FmgrInfo **finfo);

tsl/src/init.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ CrossModuleFunctions tsl_cm_functions = {
170170
.uuid_compressor_append = tsl_uuid_compressor_append,
171171
.uuid_compressor_finish = tsl_uuid_compressor_finish,
172172
.bloom1_contains = bloom1_contains,
173+
.bloom1_contains_any = bloom1_contains_any,
173174
.bloom1_get_hash_function = bloom1_get_hash_function,
174175
.process_compress_table = tsl_process_compress_table,
175176
.process_altertable_cmd = tsl_process_altertable_cmd,

0 commit comments

Comments
 (0)