Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/pr_8738
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #8738 Add function to migrate between WAL-based and trigger-based invalidation collection
2 changes: 1 addition & 1 deletion sql/cagg_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ DECLARE
BEGIN
SELECT ht.id AS hypertable_id,
di.column_type::regtype,
EXISTS(SELECT FROM _timescaledb_catalog.continuous_agg where raw_hypertable_id = ht.id) AS has_cagg
EXISTS(SELECT FROM _timescaledb_catalog.continuous_agg WHERE raw_hypertable_id = ht.id) AS has_cagg
INTO info
FROM _timescaledb_catalog.hypertable ht
JOIN _timescaledb_catalog.dimension di ON ht.id = di.hypertable_id
Expand Down
7 changes: 7 additions & 0 deletions sql/cagg_migrate.sql
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,10 @@ $BODY$;
-- into a CAgg using the regular time_bucket function
CREATE OR REPLACE PROCEDURE _timescaledb_functions.cagg_migrate_to_time_bucket(cagg REGCLASS)
AS '@MODULE_PATHNAME@', 'ts_continuous_agg_migrate_to_time_bucket' LANGUAGE C;

-- Migrate all continuous aggregates attached to a hypertable to use
-- WAL-based invalidation or trigger-based invalidation.
CREATE OR REPLACE PROCEDURE _timescaledb_functions.set_invalidation_method(
method NAME,
hypertable REGCLASS
) AS '@MODULE_PATHNAME@', 'ts_continuous_agg_set_invalidation_method' LANGUAGE C;
7 changes: 3 additions & 4 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

#define CROSSMODULE_WRAPPER(func) \
TS_FUNCTION_INFO_V1(ts_##func); \
Datum ts_##func(PG_FUNCTION_ARGS) \
{ \
PG_RETURN_DATUM(ts_cm_functions->func(fcinfo)); \
}
Datum ts_##func(PG_FUNCTION_ARGS) { PG_RETURN_DATUM(ts_cm_functions->func(fcinfo)); }

/* bgw policy functions */
CROSSMODULE_WRAPPER(policy_compression_add);
Expand Down Expand Up @@ -94,6 +91,7 @@ CROSSMODULE_WRAPPER(bloom1_contains);
CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger);
CROSSMODULE_WRAPPER(continuous_agg_refresh);
CROSSMODULE_WRAPPER(continuous_agg_process_hypertable_invalidations);
CROSSMODULE_WRAPPER(continuous_agg_set_invalidation_method);
CROSSMODULE_WRAPPER(continuous_agg_validate_query);
CROSSMODULE_WRAPPER(continuous_agg_get_bucket_function);
CROSSMODULE_WRAPPER(continuous_agg_get_bucket_function_info);
Expand Down Expand Up @@ -364,6 +362,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.continuous_agg_process_hypertable_invalidations = error_no_default_fn_pg_community,
.continuous_agg_invalidate_raw_ht = continuous_agg_invalidate_raw_ht_all_default,
.continuous_agg_invalidate_mat_ht = continuous_agg_invalidate_mat_ht_all_default,
.continuous_agg_set_invalidation_method = error_no_default_fn_pg_community,
.continuous_agg_update_options = continuous_agg_update_options_default,
.continuous_agg_validate_query = error_no_default_fn_pg_community,
.continuous_agg_get_bucket_function = error_no_default_fn_pg_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ typedef struct CrossModuleFunctions
PGFunction continuous_agg_invalidation_trigger;
PGFunction continuous_agg_refresh;
PGFunction continuous_agg_process_hypertable_invalidations;
PGFunction continuous_agg_set_invalidation_method;
void (*continuous_agg_invalidate_raw_ht)(const Hypertable *raw_ht, int64 start, int64 end);
void (*continuous_agg_invalidate_mat_ht)(const Hypertable *raw_ht, const Hypertable *mat_ht,
int64 start, int64 end);
Expand Down
2 changes: 1 addition & 1 deletion src/ts_catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,7 @@ extern bool ts_is_catalog_table(Oid relid);

/* Functions should operate on a passed-in Catalog struct */
static inline Oid
catalog_get_table_id(Catalog *catalog, CatalogTable tableid)
catalog_get_table_id(const Catalog *catalog, CatalogTable tableid)
{
return catalog->tables[tableid].id;
}
Expand Down
13 changes: 7 additions & 6 deletions src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ init_materialization_invalidation_log_scan_by_materialization_id(ScanIterator *i
Int32GetDatum(materialization_id));
}

static int32
number_of_continuous_aggs_attached(int32 raw_hypertable_id)
int32
ts_number_of_continuous_aggs_attached(int32 raw_hypertable_id)
{
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
Expand Down Expand Up @@ -537,8 +537,8 @@ ts_continuous_agg_get_all_caggs_info(int32 raw_hypertable_id)
* A hypertable is using the WAL-based invalidation collection if it has a
* attached continuous aggregate but does not have an invalidation trigger.
*/
static bool
hypertable_invalidation_slot_used(void)
bool
ts_hypertable_invalidation_slot_used(void)
{
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
Expand Down Expand Up @@ -910,7 +910,7 @@ drop_continuous_agg(FormData_continuous_agg *cadata, bool drop_user_view)

raw_hypertable_has_other_caggs =
OidIsValid(raw_hypertable.objectId) &&
number_of_continuous_aggs_attached(cadata->raw_hypertable_id) > 1;
ts_number_of_continuous_aggs_attached(cadata->raw_hypertable_id) > 1;

if (!raw_hypertable_has_other_caggs)
{
Expand Down Expand Up @@ -992,7 +992,8 @@ drop_continuous_agg(FormData_continuous_agg *cadata, bool drop_user_view)
*/
char slot_name[TS_INVALIDATION_SLOT_NAME_MAX];
ts_get_invalidation_replication_slot_name(slot_name, sizeof(slot_name));
if (!hypertable_invalidation_slot_used() && SearchNamedReplicationSlot(slot_name, true) != NULL)
if (!ts_hypertable_invalidation_slot_used() &&
SearchNamedReplicationSlot(slot_name, true) != NULL)
ts_hypertable_drop_invalidation_replication_slot(slot_name);

if (OidIsValid(mat_hypertable.objectId))
Expand Down
2 changes: 2 additions & 0 deletions src/ts_catalog/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_view_name(const char
ContinuousAggViewType type);
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_relid(Oid relid);
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_rv(const RangeVar *rv);
extern TSDLLEXPORT bool ts_hypertable_invalidation_slot_used(void);

extern bool ts_continuous_agg_drop(const char *view_schema, const char *view_name);
extern void ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id);
Expand Down Expand Up @@ -214,3 +215,4 @@ ts_continuous_agg_fixed_bucket_width(const ContinuousAggBucketFunction *bucket_f
extern TSDLLEXPORT int64
ts_continuous_agg_bucket_width(const ContinuousAggBucketFunction *bucket_function);
extern TSDLLEXPORT void ts_get_invalidation_replication_slot_name(char *slotname, Size szslot);
extern TSDLLEXPORT int32 ts_number_of_continuous_aggs_attached(int32 raw_hypertable_id);
29 changes: 29 additions & 0 deletions tsl/src/continuous_aggs/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1653,3 +1653,32 @@ cagg_find_groupingcols(ContinuousAgg *agg, Hypertable *mat_ht)
}
return retlist;
}

/*
* Get all hypertables that are using WAL-invalidations.
*/
List *
get_all_wal_using_hypertables(void)
{
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
List *hypertables = NIL;

/* Collect OID of all tables using continuous aggregates */
ts_scanner_foreach(&iterator)
{
bool isnull;
Datum datum = slot_getattr(ts_scan_iterator_slot(&iterator),
Anum_continuous_agg_raw_hypertable_id,
&isnull);

Assert(!isnull);
int32 hypertable_id = DatumGetInt32(datum);
Oid relid = ts_hypertable_id_to_relid(hypertable_id, false);
if (!has_invalidation_trigger(relid))
hypertables = list_append_unique_int(hypertables, hypertable_id);
}
ts_scan_iterator_close(&iterator);

return hypertables;
}
3 changes: 2 additions & 1 deletion tsl/src/continuous_aggs/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,5 @@ cagg_get_time_min(const ContinuousAgg *cagg)
return ts_time_get_min(cagg->partition_type);
}

ContinuousAggBucketFunction *ts_cagg_get_bucket_function_info(Oid view_oid);
extern ContinuousAggBucketFunction *ts_cagg_get_bucket_function_info(Oid view_oid);
extern List *get_all_wal_using_hypertables(void);
30 changes: 8 additions & 22 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ static void create_bucket_function_catalog_entry(int32 matht_id, Oid bucket_func
const bool bucket_fixed_width);
static void cagg_create_hypertable(int32 hypertable_id, Oid mat_tbloid, const char *matpartcolname,
int64 mat_tbltimecol_interval);
static void cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id);
static void mattablecolumninfo_add_mattable_index(MaterializationHypertableColumnInfo *matcolinfo,
Hypertable *ht);
static ObjectAddress create_view_for_query(Query *selquery, RangeVar *viewrel);
Expand Down Expand Up @@ -359,8 +358,8 @@ cagg_create_hypertable(int32 hypertable_id, Oid mat_tbloid, const char *matpartc
* created in LogicalDecodingProcessRecord() trying to allocate too much
* memory if you have too many other concurrent transactions in progress.
*/
static void
cagg_add_logical_decoding_slot_prepare(const char *slot_name)
void
ts_cagg_add_logical_decoding_slot_prepare(const char *slot_name)
{
CatalogSecurityContext sec_ctx;
LogicalDecodingContext *ctx = NULL;
Expand Down Expand Up @@ -411,8 +410,8 @@ cagg_add_logical_decoding_slot_prepare(const char *slot_name)
ts_catalog_restore_user(&sec_ctx);
}

static void
cagg_add_logical_decoding_slot_finalize(void)
void
ts_cagg_add_logical_decoding_slot_finalize(void)
{
TS_DEBUG_LOG("persist invalidation slot");
ReplicationSlotPersist();
Expand All @@ -425,7 +424,7 @@ cagg_add_logical_decoding_slot_finalize(void)
* hypertableid - argument to pass to trigger
* (the hypertable id from timescaledb catalog)
*/
static void
void
cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id)
{
char hypertable_id_str[12];
Expand Down Expand Up @@ -720,12 +719,6 @@ fixup_userview_query_tlist(Query *userquery, List *tlist_aliases)
}
}

static const char *invalidate_using_info[] = {
[ContinuousAggInvalidateUsingDefault] = "default",
[ContinuousAggInvalidateUsingTrigger] = "trigger",
[ContinuousAggInvalidateUsingWal] = "wal",
};

static ContinuousAggInvalidateUsing
get_invalidate_using(WithClauseResult *with_clause_options)
{
Expand All @@ -734,14 +727,7 @@ get_invalidate_using(WithClauseResult *with_clause_options)

const char *invalidate_using = text_to_cstring(
DatumGetTextP(with_clause_options[CreateMaterializedViewFlagInvalidateUsing].parsed));

for (size_t i = 0; i < sizeof(invalidate_using_info) / sizeof(*invalidate_using_info); ++i)
if (strcmp(invalidate_using_info[i], invalidate_using) == 0)
return i;
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized value \"%s\" for invalidate_using", invalidate_using));
return -1; /* To keep linter happy */
return invalidation_parse_using(invalidate_using);
}

static void
Expand Down Expand Up @@ -905,7 +891,7 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer
if (invalidate_using == ContinuousAggInvalidateUsingWal &&
SearchNamedReplicationSlot(slot_name, true) == NULL)
{
cagg_add_logical_decoding_slot_prepare(slot_name);
ts_cagg_add_logical_decoding_slot_prepare(slot_name);
slot_prepared = true;
}

Expand Down Expand Up @@ -1045,7 +1031,7 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer
if (invalidate_using == ContinuousAggInvalidateUsingTrigger)
cagg_add_trigger_hypertable(bucket_info->htoid, bucket_info->htid);
else if (slot_prepared)
cagg_add_logical_decoding_slot_finalize();
ts_cagg_add_logical_decoding_slot_finalize();
}

DDLResult
Expand Down
4 changes: 4 additions & 0 deletions tsl/src/continuous_aggs/create.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,9 @@
DDLResult tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *pstmt,
WithClauseResult *with_clause_options);

extern void ts_cagg_add_logical_decoding_slot_finalize(void);
extern void ts_cagg_add_logical_decoding_slot_prepare(const char *slot_name);

extern void cagg_flip_realtime_view_definition(ContinuousAgg *agg, Hypertable *mat_ht);
extern void cagg_rename_view_columns(ContinuousAgg *agg);
extern void cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id);
Loading
Loading