Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/bloom-contains-any
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #8465 Speed up filters like `x = any(array[...])` using bloom filter sparse indexes.
5 changes: 5 additions & 0 deletions sql/sparse_index.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ RETURNS bool
AS '@MODULE_PATHNAME@', 'ts_bloom1_contains'
LANGUAGE C IMMUTABLE PARALLEL SAFE;

-- Bloom filter probe for `x = any(array[...])` predicates: takes a bloom1
-- sparse index value and an array of candidate values.
-- Deliberately not declared STRICT, so the C implementation is also called
-- when an argument is NULL.
CREATE OR REPLACE FUNCTION _timescaledb_functions.bloom1_contains_any(_timescaledb_internal.bloom1, anyarray)
RETURNS bool
AS '@MODULE_PATHNAME@', 'ts_bloom1_contains_any'
LANGUAGE C IMMUTABLE PARALLEL SAFE;

CREATE OR REPLACE FUNCTION _timescaledb_functions.jsonb_get_matching_index_entry(
config jsonb,
attr_name text,
Expand Down
5 changes: 5 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ DROP FUNCTION IF EXISTS ts_hypercore_proxy_handler;
DROP FUNCTION IF EXISTS ts_hypercore_handler;
DROP FUNCTION IF EXISTS _timescaledb_debug.is_compressed_tid;

-- Register bloom1_contains_any during the upgrade. It is bound to the
-- 'ts_update_placeholder' symbol here rather than to the real implementation.
-- NOTE(review): presumably the CREATE OR REPLACE in sql/sparse_index.sql later
-- rebinds it to 'ts_bloom1_contains_any' -- confirm against the update flow.
CREATE FUNCTION _timescaledb_functions.bloom1_contains_any(_timescaledb_internal.bloom1, anyarray)
RETURNS bool
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
LANGUAGE C IMMUTABLE PARALLEL SAFE;

DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute;
DROP FUNCTION IF EXISTS @extschema@.add_compression_policy;
DROP PROCEDURE IF EXISTS @extschema@.add_columnstore_policy;
Expand Down
2 changes: 2 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ DEFAULT FOR TYPE int4 USING hypercore_proxy AS
CREATE FUNCTION _timescaledb_debug.is_compressed_tid(tid) RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_update_placeholder' LANGUAGE C STRICT;

-- Downgrade path: remove the bloom1_contains_any function that the matching
-- upgrade script (latest-dev.sql) creates.
DROP FUNCTION _timescaledb_functions.bloom1_contains_any(_timescaledb_internal.bloom1, anyarray);

DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute;
DROP FUNCTION IF EXISTS @extschema@.add_compression_policy;
DROP PROCEDURE IF EXISTS @extschema@.add_columnstore_policy;
Expand Down
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ CROSSMODULE_WRAPPER(create_compressed_chunk);
CROSSMODULE_WRAPPER(compress_chunk);
CROSSMODULE_WRAPPER(decompress_chunk);
CROSSMODULE_WRAPPER(bloom1_contains);
CROSSMODULE_WRAPPER(bloom1_contains_any);

/* continuous aggregate */
CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger);
Expand Down Expand Up @@ -407,6 +408,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.uuid_compressor_append = error_no_default_fn_pg_community,
.uuid_compressor_finish = error_no_default_fn_pg_community,
.bloom1_contains = error_no_default_fn_pg_community,
.bloom1_contains_any = error_no_default_fn_pg_community,
.bloom1_get_hash_function = bloom1_get_hash_function_default,

.decompress_batches_for_insert = error_no_default_fn_chunk_insert_state_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ typedef struct CrossModuleFunctions
PGFunction uuid_compressor_append;
PGFunction uuid_compressor_finish;
PGFunction bloom1_contains;
PGFunction bloom1_contains_any;
PGFunction (*bloom1_get_hash_function)(Oid type, FmgrInfo **finfo);

PGFunction create_chunk;
Expand Down
144 changes: 144 additions & 0 deletions tsl/src/compression/batch_metadata_builder_bloom1.c
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,150 @@ bloom1_contains(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(true);
}

/*
 * Instantiate a file-local sort routine, sort_hashes(uint64 *, n), from the
 * PostgreSQL sort template. It is used to order the per-item base hashes
 * before probing the bloom filter.
 */
#define ST_SORT sort_hashes
#define ST_ELEMENT_TYPE uint64
/* Three-way comparison yielding -1/0/1; avoids subtracting unsigned values. */
#define ST_COMPARE(a, b) ((*(a) > *(b)) - (*(a) < *(b)))
#define ST_SCOPE static
#define ST_DEFINE
#include <lib/sort_template.h>

/*
 * Checks whether any element of the given array can be present in the given
 * bloom filter. This is used for predicate pushdown for x = any(array[...]).
 * The SQL signature is:
 * _timescaledb_functions.bloom1_contains_any(bloom1, anyarray)
 *
 * Returns true when there is no bloom filter for the current row (the
 * condition cannot be ruled out), or when at least one non-null array element
 * has all of its BLOOM1_HASHES probe bits set in the filter. Returns false for
 * a null array, an empty array, an array consisting only of nulls, or when no
 * element passes the probe. A "true" answer only means "possibly present".
 *
 * The filter parameters are validated with CheckCompressedData() before the
 * filter is probed.
 */
Datum
bloom1_contains_any(PG_FUNCTION_ARGS)
{
	Bloom1ContainsContext *context =
		bloom1_contains_context_prepare(fcinfo, /* use_element_type = */ true);

	/*
	 * This function is not strict, because if we don't have a bloom filter, this
	 * means the condition can potentially be true.
	 */
	struct varlena *bloom = context->current_row_bloom;
	if (bloom == NULL)
	{
		PG_RETURN_BOOL(true);
	}

	/*
	 * A null value cannot match the equality condition, although this probably
	 * should be optimized away by the planner.
	 */
	if (PG_ARGISNULL(1))
	{
		PG_RETURN_BOOL(false);
	}

	int num_items;
	Datum *items;
	bool *nulls;
	deconstruct_array(PG_GETARG_ARRAYTYPE_P(1),
					  context->element_type,
					  context->element_typlen,
					  context->element_typbyval,
					  context->element_typalign,
					  &items,
					  &nulls,
					  &num_items);

	if (num_items == 0)
	{
		PG_RETURN_BOOL(false);
	}

	/*
	 * Calculate the per-item base hashes that will be used for computing the
	 * individual bloom filter bit offsets. We can reuse the "items" space to
	 * avoid more allocations, but have to allocate as a fallback on 32-bit
	 * systems.
	 *
	 * The in-place reuse is safe because the write index ("valid") never runs
	 * ahead of the read index ("i") in the loop below. The explicit cast keeps
	 * this well-formed on platforms where Datum and uint64 are distinct
	 * typedefs of the same width.
	 */
#if FLOAT8PASSBYVAL
	uint64 *item_base_hashes = (uint64 *) items;
#else
	uint64 *item_base_hashes = palloc(sizeof(uint64) * num_items);
#endif

	FmgrInfo *finfo = context->hash_function_finfo;
	PGFunction hash_fn = context->hash_function_pointer;

	int valid = 0;
	for (int i = 0; i < num_items; i++)
	{
		if (nulls[i])
		{
			/*
			 * A null value cannot match the equality condition.
			 */
			continue;
		}

		item_base_hashes[valid++] = calculate_hash(hash_fn, finfo, items[i]);
	}

	if (valid == 0)
	{
		/*
		 * No non-null elements.
		 */
		PG_RETURN_BOOL(false);
	}

	/*
	 * Sort the hashes for cache-friendly probing.
	 */
	sort_hashes(item_base_hashes, valid);

	/*
	 * Get the bloom filter parameters.
	 */
	const char *words_buf = bloom1_words_buf(bloom);
	const uint32 num_bits = bloom1_num_bits(bloom);

	/*
	 * Must be >= 64 bits. This is checked first: it also guarantees that
	 * num_bits is non-zero, which pg_leftmost_one_pos32() below requires (its
	 * result is undefined for a zero argument, e.g. with a corrupt filter).
	 */
	CheckCompressedData(num_bits >= 64);

	/* Must be a power of two. */
	CheckCompressedData(num_bits == (1ULL << pg_leftmost_one_pos32(num_bits)));

	const uint32 num_word_bits = sizeof(*words_buf) * 8;
	Assert(num_bits % num_word_bits == 0);
	const uint32 log2_word_bits = pg_leftmost_one_pos32(num_word_bits);
	Assert(num_word_bits == (1ULL << log2_word_bits));

	const uint32 word_mask = num_word_bits - 1;
	Assert((word_mask >> num_word_bits) == 0);

	/* num_bits is a power of two, so masking is equivalent to modulo. */
	const uint32 absolute_mask = num_bits - 1;

	/* Probe the bloom filter. */
	for (int item_index = 0; item_index < valid; item_index++)
	{
		const uint64 base_hash = item_base_hashes[item_index];
		bool match = true;
		for (int i = 0; i < BLOOM1_HASHES; i++)
		{
			const uint32 absolute_bit_index = bloom1_get_one_offset(base_hash, i) & absolute_mask;
			const uint32 word_index = absolute_bit_index >> log2_word_bits;
			const uint32 word_bit_index = absolute_bit_index & word_mask;
			if ((words_buf[word_index] & (1ULL << word_bit_index)) == 0)
			{
				/* One unset bit proves this item is absent. */
				match = false;
				break;
			}
		}
		if (match)
		{
			/* All probe bits are set: this item may be present. */
			PG_RETURN_BOOL(true);
		}
	}

	PG_RETURN_BOOL(false);
}

static int
bloom1_varlena_alloc_size(int num_bits)
{
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/compression/sparse_index_bloom1.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@

Datum bloom1_contains(PG_FUNCTION_ARGS);

/*
 * SQL-callable check of whether any element of an array can be present in a
 * bloom1 filter; backs _timescaledb_functions.bloom1_contains_any().
 */
Datum bloom1_contains_any(PG_FUNCTION_ARGS);

PGFunction bloom1_get_hash_function(Oid type, FmgrInfo **finfo);
1 change: 1 addition & 0 deletions tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ CrossModuleFunctions tsl_cm_functions = {
.uuid_compressor_append = tsl_uuid_compressor_append,
.uuid_compressor_finish = tsl_uuid_compressor_finish,
.bloom1_contains = bloom1_contains,
.bloom1_contains_any = bloom1_contains_any,
.bloom1_get_hash_function = bloom1_get_hash_function,
.process_compress_table = tsl_process_compress_table,
.process_altertable_cmd = tsl_process_altertable_cmd,
Expand Down
138 changes: 138 additions & 0 deletions tsl/src/nodes/decompress_chunk/qual_pushdown.c
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,127 @@ pushdown_op_to_segment_meta_bloom1(QualPushdownContext *context, OpExpr *orig_op
COERCE_EXPLICIT_CALL);
}

/*
* Try to transform x = any(array[]) into bloom1_contains_any(bloom_x, array[]).
*
* On success, returns a new FuncExpr calling
* _timescaledb_functions.bloom1_contains_any(bloom_var, pushed-down array).
* On failure, sets context->can_pushdown to false and returns the original
* ScalarArrayOpExpr unchanged. In both cases context->needs_recheck is set to
* true. The caller is expected to pass a scratch context and inspect
* can_pushdown afterwards.
*/
static void *
pushdown_saop_bloom1(QualPushdownContext *context, ScalarArrayOpExpr *orig_saop)
{
/*
* This always requires rechecking the decompressed data.
*/
context->needs_recheck = true;

/* Only the ANY (OR) form is handled; op ALL (array) has useOr == false. */
if (!orig_saop->useOr)
{
context->can_pushdown = false;
return orig_saop;
}

List *expr_args = orig_saop->args;
if (list_length(expr_args) != 2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can ScalarArrayOpExpr have other than 2 args? Maybe Assert would be better?

{
context->can_pushdown = false;
return orig_saop;
}

Expr *orig_leftop = linitial(expr_args);
Expr *orig_rightop = lsecond(expr_args);

/* Look through a RelabelType wrapper on either operand. */
if (IsA(orig_leftop, RelabelType))
orig_leftop = ((RelabelType *) orig_leftop)->arg;
if (IsA(orig_rightop, RelabelType))
orig_rightop = ((RelabelType *) orig_rightop)->arg;

/*
* For scalar array operation, we expect a var on the left side.
* bloom1_attno stays InvalidAttrNumber if no bloom1 metadata column is found
* for the left operand.
*/
AttrNumber bloom1_attno = InvalidAttrNumber;
expr_fetch_bloom1_metadata(context, orig_leftop, &bloom1_attno);
if (bloom1_attno == InvalidAttrNumber)
{
/* No metadata for left operand. */
context->can_pushdown = false;
return orig_saop;
}

/*
* NOTE(review): castNode() assumes that a successful metadata lookup implies
* the left operand is a Var -- confirm against expr_fetch_bloom1_metadata().
*/
Var *var_with_segment_meta = castNode(Var, orig_leftop);

/*
* Play it safe and don't push down if the operator collation doesn't match
* the column collation.
*/
Oid op_collation = orig_saop->inputcollid;
if (var_with_segment_meta->varcollid != op_collation)
{
context->can_pushdown = false;
return orig_saop;
}

/*
* We cannot use bloom filters for non-deterministic collations.
*/
if (OidIsValid(op_collation) && !get_collation_isdeterministic(op_collation))
{
context->can_pushdown = false;
return orig_saop;
}

/*
* We only support hashable equality operators: the operator must be the
* equality member of the hash operator family of the column type.
*/
const Oid op_oid = orig_saop->opno;
TypeCacheEntry *tce =
lookup_type_cache(var_with_segment_meta->vartype, TYPECACHE_HASH_OPFAMILY);
const int strategy = get_op_opfamily_strategy(op_oid, tce->hash_opf);
if (strategy != HTEqualStrategyNumber)
{
context->can_pushdown = false;
return orig_saop;
}

/*
* The hash equality operators are supposed to be strict.
*/
Assert(op_strict(op_oid));

/*
* Check if the righthand expression is safe to push down. We cannot combine
* it with the original operator if there can be false negatives.
* The right operand is processed with a separate copy of the context so that
* its own can_pushdown / needs_recheck outcome can be inspected in isolation.
*/
QualPushdownContext tmp_context = copy_context(context);
Expr *pushed_down_rightop = (Expr *) qual_pushdown_mutator((Node *) orig_rightop, &tmp_context);
if (!tmp_context.can_pushdown || tmp_context.needs_recheck)
{
context->can_pushdown = false;
return orig_saop;
}
Assert(pushed_down_rightop != NULL);

/*
* var = any(array) implies bloom1_contains_any(var_bloom, array).
* The bloom Var references the metadata column of the compressed relation,
* with typmod -1, no collation, and varlevelsup 0.
*/
Var *bloom_var = makeVar(context->compressed_rel->relid,
bloom1_attno,
ts_custom_type_cache_get(CUSTOM_TYPE_BLOOM1)->type_oid,
-1,
InvalidOid,
0);

/*
* Look the function up by qualified name only (nargs = -1); with
* missing_ok = false a missing function raises an error.
*/
Oid func = LookupFuncName(list_make2(makeString("_timescaledb_functions"),
makeString("bloom1_contains_any")),
/* nargs = */ -1,
/* argtypes = */ (void *) -1,
/* missing_ok = */ false);

return makeFuncExpr(func,
BOOLOID,
list_make2(bloom_var, pushed_down_rightop),
/* funccollid = */ InvalidOid,
/* inputcollid = */ InvalidOid,
COERCE_EXPLICIT_CALL);
}

static bool
contain_volatile_functions_checker(Oid func_id, void *context)
{
Expand Down Expand Up @@ -655,6 +776,23 @@ qual_pushdown_mutator(Node *orig_node, QualPushdownContext *context)
return pushed_down;
}

ScalarArrayOpExpr *saop = castNode(ScalarArrayOpExpr, orig_node);

/*
* Try to transform x = any(array[]) into
* bloom1_contains_any(bloom_x, array[]).
*/
if (ts_guc_enable_sparse_index_bloom)
{
tmp_context = *context;
pushed_down = pushdown_saop_bloom1(&tmp_context, saop);
if (tmp_context.can_pushdown)
{
context->needs_recheck |= tmp_context.needs_recheck;
return pushed_down;
}
}

/*
* No other ways to push it down, so consider it failed.
*/
Expand Down
Loading
Loading