Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/bloom-contains-any
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #8465 Speed up the filters like `x = any(array[...])` using bloom filter sparse indexes.
5 changes: 5 additions & 0 deletions sql/sparse_index.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ RETURNS bool
AS '@MODULE_PATHNAME@', 'ts_bloom1_contains'
LANGUAGE C IMMUTABLE PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_functions.bloom1_contains_any(_timescaledb_internal.bloom1, anyarray)
RETURNS bool
AS '@MODULE_PATHNAME@', 'ts_bloom1_contains_any'
LANGUAGE C IMMUTABLE PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_functions.jsonb_get_matching_index_entry(
config jsonb,
attr_name text,
Expand Down
5 changes: 5 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
CREATE FUNCTION _timescaledb_functions.bloom1_contains_any(_timescaledb_internal.bloom1, anyarray)
RETURNS bool
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
LANGUAGE C IMMUTABLE PARALLEL SAFE;

DROP FUNCTION IF EXISTS _timescaledb_functions.policy_job_stat_history_retention;
DROP VIEW IF EXISTS timescaledb_information.chunks;

2 changes: 2 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
DROP FUNCTION _timescaledb_functions.bloom1_contains_any(_timescaledb_internal.bloom1, anyarray);

DROP FUNCTION IF EXISTS _timescaledb_functions.policy_job_stat_history_retention;
DROP VIEW IF EXISTS timescaledb_information.chunks;
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ CROSSMODULE_WRAPPER(create_compressed_chunk);
CROSSMODULE_WRAPPER(compress_chunk);
CROSSMODULE_WRAPPER(decompress_chunk);
CROSSMODULE_WRAPPER(bloom1_contains);
CROSSMODULE_WRAPPER(bloom1_contains_any);

/* continuous aggregate */
CROSSMODULE_WRAPPER(continuous_agg_refresh);
Expand Down Expand Up @@ -403,6 +404,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.uuid_compressor_append = error_no_default_fn_pg_community,
.uuid_compressor_finish = error_no_default_fn_pg_community,
.bloom1_contains = error_no_default_fn_pg_community,
.bloom1_contains_any = error_no_default_fn_pg_community,
.bloom1_get_hash_function = bloom1_get_hash_function_default,

.decompress_batches_for_insert = error_no_default_fn_chunk_insert_state_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ typedef struct CrossModuleFunctions
PGFunction uuid_compressor_append;
PGFunction uuid_compressor_finish;
PGFunction bloom1_contains;
PGFunction bloom1_contains_any;
PGFunction (*bloom1_get_hash_function)(Oid type, FmgrInfo **finfo);

PGFunction create_chunk;
Expand Down
240 changes: 223 additions & 17 deletions tsl/src/compression/batch_metadata_builder_bloom1.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,77 @@ bloom1_update_val(void *builder_, Datum needle)
}
}

/*
* We cache some information across function calls in this context.
*/
typedef struct Bloom1ContainsContext
{
PGFunction hash_function_pointer;
FmgrInfo *hash_function_finfo;

Oid element_type;
int16 element_typlen;
bool element_typbyval;
char element_typalign;

/* This is per-row, here for convenience. */
struct varlena *current_row_bloom;
} Bloom1ContainsContext;

static Bloom1ContainsContext *
bloom1_contains_context_prepare(FunctionCallInfo fcinfo, bool use_element_type)
{
Bloom1ContainsContext *context = (Bloom1ContainsContext *) fcinfo->flinfo->fn_extra;
if (context == NULL)
{
Ensure(PG_NARGS() == 2, "bloom1_contains called with wrong number of arguments");

context = MemoryContextAllocZero(fcinfo->flinfo->fn_mcxt, sizeof(*context));

context->element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
if (use_element_type)
{
context->element_type = get_element_type(context->element_type);
Ensure(OidIsValid(context->element_type),
"cannot determine array element type for bloom1_contains_any");
}

context->hash_function_pointer =
bloom1_get_hash_function(context->element_type, &context->hash_function_finfo);

/*
* Technically this function is callable by user with arbitrary argument
* that might not have an extended hash function, so report this error
* gracefully.
*/
if (context->hash_function_pointer == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_DATA_EXCEPTION),
errmsg("the argument type %s lacks an extended hash function",
format_type_be(context->element_type))));
}

get_typlenbyvalalign(context->element_type,
&context->element_typlen,
&context->element_typbyval,
&context->element_typalign);

fcinfo->flinfo->fn_extra = context;
}

if (PG_ARGISNULL(0))
{
context->current_row_bloom = NULL;
}
else
{
context->current_row_bloom = PG_GETARG_VARLENA_P(0);
}

return context;
}

/*
* Checks whether the given element can be present in the given bloom filter.
* This is what we use in predicate pushdown. The SQL signature is:
Expand All @@ -411,11 +482,15 @@ bloom1_update_val(void *builder_, Datum needle)
Datum
bloom1_contains(PG_FUNCTION_ARGS)
{
Bloom1ContainsContext *context =
bloom1_contains_context_prepare(fcinfo, /* use_element_type = */ false);

/*
* This function is not strict, because if we don't have a bloom filter, this
* means the condition can potentially be true.
*/
if (PG_ARGISNULL(0))
struct varlena *bloom = context->current_row_bloom;
if (bloom == NULL)
{
PG_RETURN_BOOL(true);
}
Expand All @@ -429,23 +504,9 @@ bloom1_contains(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(false);
}

Oid type_oid = get_fn_expr_argtype(fcinfo->flinfo, 1);
FmgrInfo *finfo = NULL;
PGFunction fn = bloom1_get_hash_function(type_oid, &finfo);
/*
* Technically this function is callable by user with arbitrary argument
* that might not have an extended hash function, so report this error
* gracefully.
* Compute the bloom filter parameters.
*/
if (fn == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_DATA_EXCEPTION),
errmsg("the argument type %s lacks an extended hash function",
format_type_be(type_oid))));
}

struct varlena *bloom = PG_GETARG_VARLENA_P(0);
const char *words_buf = bloom1_words_buf(bloom);
const uint32 num_bits = bloom1_num_bits(bloom);

Expand All @@ -464,7 +525,8 @@ bloom1_contains(PG_FUNCTION_ARGS)
Assert((word_mask >> num_word_bits) == 0);

Datum needle = PG_GETARG_DATUM(1);
const uint64 datum_hash_1 = calculate_hash(fn, finfo, needle);
const uint64 datum_hash_1 =
calculate_hash(context->hash_function_pointer, context->hash_function_finfo, needle);
const uint32 absolute_mask = num_bits - 1;
for (int i = 0; i < BLOOM1_HASHES; i++)
{
Expand All @@ -479,6 +541,150 @@ bloom1_contains(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(true);
}

#define ST_SORT sort_hashes
#define ST_ELEMENT_TYPE uint64
#define ST_COMPARE(a, b) ((*(a) > *(b)) - (*(a) < *(b)))
#define ST_SCOPE static
#define ST_DEFINE
#include <lib/sort_template.h>

/*
* Checks whether any element of the given array can be present in the given
* bloom filter. This is used for predicate pushdown for x = any(array[...]).
* The SQL signature is:
* _timescaledb_functions.bloom1_contains_any(bloom1, anyarray)
*/
Datum
bloom1_contains_any(PG_FUNCTION_ARGS)
{
Bloom1ContainsContext *context =
bloom1_contains_context_prepare(fcinfo, /* use_element_type = */ true);

/*
* This function is not strict, because if we don't have a bloom filter, this
* means the condition can potentially be true.
*/
struct varlena *bloom = context->current_row_bloom;
if (bloom == NULL)
{
PG_RETURN_BOOL(true);
}

/*
* A null value cannot match the equality condition, although this probably
* should be optimized away by the planner.
*/
if (PG_ARGISNULL(1))
{
PG_RETURN_BOOL(false);
}

int num_items;
Datum *items;
bool *nulls;
deconstruct_array(PG_GETARG_ARRAYTYPE_P(1),
context->element_type,
context->element_typlen,
context->element_typbyval,
context->element_typalign,
&items,
&nulls,
&num_items);

if (num_items == 0)
{
PG_RETURN_BOOL(false);
}

/*
* Calculate the per-item base hashes that will be used for computing the
* individual bloom filter bit offsets. We can reuse the "items" space to
* avoid more allocations, but have to allocate as a fallback on 32-bit
* systems.
*/
#if FLOAT8PASSBYVAL
uint64 *item_base_hashes = items;
#else
uint64 *item_base_hashes = palloc(sizeof(uint64) * num_items);
#endif

FmgrInfo *finfo = context->hash_function_finfo;
PGFunction hash_fn = context->hash_function_pointer;

int valid = 0;
for (int i = 0; i < num_items; i++)
{
if (nulls[i])
{
/*
* A null value cannot match the equality condition.
*/
continue;
}

item_base_hashes[valid++] = calculate_hash(hash_fn, finfo, items[i]);
}

if (valid == 0)
{
/*
* No non-null elements.
*/
PG_RETURN_BOOL(false);
}

/*
* Sort the hashes for cache-friendly probing.
*/
sort_hashes(item_base_hashes, valid);

/*
* Get the bloom filter parameters.
*/
const char *words_buf = bloom1_words_buf(bloom);
const uint32 num_bits = bloom1_num_bits(bloom);

/* Must be a power of two. */
CheckCompressedData(num_bits == (1ULL << pg_leftmost_one_pos32(num_bits)));

/* Must be >= 64 bits. */
CheckCompressedData(num_bits >= 64);

const uint32 num_word_bits = sizeof(*words_buf) * 8;
Assert(num_bits % num_word_bits == 0);
const uint32 log2_word_bits = pg_leftmost_one_pos32(num_word_bits);
Assert(num_word_bits == (1ULL << log2_word_bits));

const uint32 word_mask = num_word_bits - 1;
Assert((word_mask >> num_word_bits) == 0);

const uint32 absolute_mask = num_bits - 1;

/* Probe the bloom filter. */
for (int item_index = 0; item_index < valid; item_index++)
{
const uint64 base_hash = item_base_hashes[item_index];
bool match = true;
for (int i = 0; i < BLOOM1_HASHES; i++)
{
const uint32 absolute_bit_index = bloom1_get_one_offset(base_hash, i) & absolute_mask;
const uint32 word_index = absolute_bit_index >> log2_word_bits;
const uint32 word_bit_index = absolute_bit_index & word_mask;
if ((words_buf[word_index] & (1ULL << word_bit_index)) == 0)
{
match = false;
break;
}
}
if (match)
{
PG_RETURN_BOOL(true);
}
}

PG_RETURN_BOOL(false);
}

static int
bloom1_varlena_alloc_size(int num_bits)
{
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/compression/sparse_index_bloom1.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@

Datum bloom1_contains(PG_FUNCTION_ARGS);

Datum bloom1_contains_any(PG_FUNCTION_ARGS);

PGFunction bloom1_get_hash_function(Oid type, FmgrInfo **finfo);
1 change: 1 addition & 0 deletions tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ CrossModuleFunctions tsl_cm_functions = {
.uuid_compressor_append = tsl_uuid_compressor_append,
.uuid_compressor_finish = tsl_uuid_compressor_finish,
.bloom1_contains = bloom1_contains,
.bloom1_contains_any = bloom1_contains_any,
.bloom1_get_hash_function = bloom1_get_hash_function,
.process_compress_table = tsl_process_compress_table,
.process_altertable_cmd = tsl_process_altertable_cmd,
Expand Down
Loading
Loading