diff --git a/.unreleased/pr_8738 b/.unreleased/pr_8738 new file mode 100644 index 00000000000..a2278201455 --- /dev/null +++ b/.unreleased/pr_8738 @@ -0,0 +1 @@ +Implements: #8738 Add function to migrate between WAL-based and trigger-based invalidation collection diff --git a/sql/cagg_api.sql b/sql/cagg_api.sql index 483d73a8d22..e919a077412 100644 --- a/sql/cagg_api.sql +++ b/sql/cagg_api.sql @@ -42,7 +42,7 @@ DECLARE BEGIN SELECT ht.id AS hypertable_id, di.column_type::regtype, - EXISTS(SELECT FROM _timescaledb_catalog.continuous_agg where raw_hypertable_id = ht.id) AS has_cagg + EXISTS(SELECT FROM _timescaledb_catalog.continuous_agg WHERE raw_hypertable_id = ht.id) AS has_cagg INTO info FROM _timescaledb_catalog.hypertable ht JOIN _timescaledb_catalog.dimension di ON ht.id = di.hypertable_id diff --git a/sql/cagg_migrate.sql b/sql/cagg_migrate.sql index 064a40531a7..170c7a3988e 100644 --- a/sql/cagg_migrate.sql +++ b/sql/cagg_migrate.sql @@ -674,3 +674,10 @@ $BODY$; -- into a CAgg using the regular time_bucket function CREATE OR REPLACE PROCEDURE _timescaledb_functions.cagg_migrate_to_time_bucket(cagg REGCLASS) AS '@MODULE_PATHNAME@', 'ts_continuous_agg_migrate_to_time_bucket' LANGUAGE C; + +-- Migrate all continuous aggregates attached to a hypertable to use +-- WAL-based invalidation or trigger-based invalidation. 
+CREATE OR REPLACE PROCEDURE _timescaledb_functions.set_invalidation_method( + method NAME, + hypertable REGCLASS +) AS '@MODULE_PATHNAME@', 'ts_continuous_agg_set_invalidation_method' LANGUAGE C; diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 346ddd8e2f4..a2df30e536e 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -17,10 +17,7 @@ #define CROSSMODULE_WRAPPER(func) \ TS_FUNCTION_INFO_V1(ts_##func); \ - Datum ts_##func(PG_FUNCTION_ARGS) \ - { \ - PG_RETURN_DATUM(ts_cm_functions->func(fcinfo)); \ - } + Datum ts_##func(PG_FUNCTION_ARGS) { PG_RETURN_DATUM(ts_cm_functions->func(fcinfo)); } /* bgw policy functions */ CROSSMODULE_WRAPPER(policy_compression_add); @@ -94,6 +91,7 @@ CROSSMODULE_WRAPPER(bloom1_contains); CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger); CROSSMODULE_WRAPPER(continuous_agg_refresh); CROSSMODULE_WRAPPER(continuous_agg_process_hypertable_invalidations); +CROSSMODULE_WRAPPER(continuous_agg_set_invalidation_method); CROSSMODULE_WRAPPER(continuous_agg_validate_query); CROSSMODULE_WRAPPER(continuous_agg_get_bucket_function); CROSSMODULE_WRAPPER(continuous_agg_get_bucket_function_info); @@ -364,6 +362,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .continuous_agg_process_hypertable_invalidations = error_no_default_fn_pg_community, .continuous_agg_invalidate_raw_ht = continuous_agg_invalidate_raw_ht_all_default, .continuous_agg_invalidate_mat_ht = continuous_agg_invalidate_mat_ht_all_default, + .continuous_agg_set_invalidation_method = error_no_default_fn_pg_community, .continuous_agg_update_options = continuous_agg_update_options_default, .continuous_agg_validate_query = error_no_default_fn_pg_community, .continuous_agg_get_bucket_function = error_no_default_fn_pg_community, diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 6a7da9d7ca3..5c524546091 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -106,6 +106,7 @@ typedef struct CrossModuleFunctions PGFunction 
continuous_agg_invalidation_trigger; PGFunction continuous_agg_refresh; PGFunction continuous_agg_process_hypertable_invalidations; + PGFunction continuous_agg_set_invalidation_method; void (*continuous_agg_invalidate_raw_ht)(const Hypertable *raw_ht, int64 start, int64 end); void (*continuous_agg_invalidate_mat_ht)(const Hypertable *raw_ht, const Hypertable *mat_ht, int64 start, int64 end); diff --git a/src/ts_catalog/catalog.h b/src/ts_catalog/catalog.h index 69177f6be8c..5cf895619c4 100644 --- a/src/ts_catalog/catalog.h +++ b/src/ts_catalog/catalog.h @@ -1333,7 +1333,7 @@ extern bool ts_is_catalog_table(Oid relid); /* Functions should operate on a passed-in Catalog struct */ static inline Oid -catalog_get_table_id(Catalog *catalog, CatalogTable tableid) +catalog_get_table_id(const Catalog *catalog, CatalogTable tableid) { return catalog->tables[tableid].id; } diff --git a/src/ts_catalog/continuous_agg.c b/src/ts_catalog/continuous_agg.c index f375b4d142f..43c7ce27092 100644 --- a/src/ts_catalog/continuous_agg.c +++ b/src/ts_catalog/continuous_agg.c @@ -155,8 +155,8 @@ init_materialization_invalidation_log_scan_by_materialization_id(ScanIterator *i Int32GetDatum(materialization_id)); } -static int32 -number_of_continuous_aggs_attached(int32 raw_hypertable_id) +int32 +ts_number_of_continuous_aggs_attached(int32 raw_hypertable_id) { ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext); @@ -537,8 +537,8 @@ ts_continuous_agg_get_all_caggs_info(int32 raw_hypertable_id) * A hypertable is using the WAL-based invalidation collection if it has a * attached continuous aggregate but does not have an invalidation trigger. 
*/ -static bool -hypertable_invalidation_slot_used(void) +bool +ts_hypertable_invalidation_slot_used(void) { ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext); @@ -910,7 +910,7 @@ drop_continuous_agg(FormData_continuous_agg *cadata, bool drop_user_view) raw_hypertable_has_other_caggs = OidIsValid(raw_hypertable.objectId) && - number_of_continuous_aggs_attached(cadata->raw_hypertable_id) > 1; + ts_number_of_continuous_aggs_attached(cadata->raw_hypertable_id) > 1; if (!raw_hypertable_has_other_caggs) { @@ -992,7 +992,8 @@ drop_continuous_agg(FormData_continuous_agg *cadata, bool drop_user_view) */ char slot_name[TS_INVALIDATION_SLOT_NAME_MAX]; ts_get_invalidation_replication_slot_name(slot_name, sizeof(slot_name)); - if (!hypertable_invalidation_slot_used() && SearchNamedReplicationSlot(slot_name, true) != NULL) + if (!ts_hypertable_invalidation_slot_used() && + SearchNamedReplicationSlot(slot_name, true) != NULL) ts_hypertable_drop_invalidation_replication_slot(slot_name); if (OidIsValid(mat_hypertable.objectId)) diff --git a/src/ts_catalog/continuous_agg.h b/src/ts_catalog/continuous_agg.h index 308b5f15dd4..15620ffdfc0 100644 --- a/src/ts_catalog/continuous_agg.h +++ b/src/ts_catalog/continuous_agg.h @@ -177,6 +177,7 @@ extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_view_name(const char ContinuousAggViewType type); extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_relid(Oid relid); extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_rv(const RangeVar *rv); +extern TSDLLEXPORT bool ts_hypertable_invalidation_slot_used(void); extern bool ts_continuous_agg_drop(const char *view_schema, const char *view_name); extern void ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id); @@ -214,3 +215,4 @@ ts_continuous_agg_fixed_bucket_width(const ContinuousAggBucketFunction *bucket_f extern TSDLLEXPORT int64 ts_continuous_agg_bucket_width(const ContinuousAggBucketFunction 
*bucket_function); extern TSDLLEXPORT void ts_get_invalidation_replication_slot_name(char *slotname, Size szslot); +extern TSDLLEXPORT int32 ts_number_of_continuous_aggs_attached(int32 raw_hypertable_id); diff --git a/tsl/src/continuous_aggs/common.c b/tsl/src/continuous_aggs/common.c index c11565b4620..7bc810e3417 100644 --- a/tsl/src/continuous_aggs/common.c +++ b/tsl/src/continuous_aggs/common.c @@ -1653,3 +1653,32 @@ cagg_find_groupingcols(ContinuousAgg *agg, Hypertable *mat_ht) } return retlist; } + +/* + * Get all hypertables that are using WAL-invalidations. + */ +List * +get_all_wal_using_hypertables(void) +{ + ScanIterator iterator = + ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext); + List *hypertables = NIL; + + /* Collect OID of all tables using continuous aggregates */ + ts_scanner_foreach(&iterator) + { + bool isnull; + Datum datum = slot_getattr(ts_scan_iterator_slot(&iterator), + Anum_continuous_agg_raw_hypertable_id, + &isnull); + + Assert(!isnull); + int32 hypertable_id = DatumGetInt32(datum); + Oid relid = ts_hypertable_id_to_relid(hypertable_id, false); + if (!has_invalidation_trigger(relid)) + hypertables = list_append_unique_int(hypertables, hypertable_id); + } + ts_scan_iterator_close(&iterator); + + return hypertables; +} diff --git a/tsl/src/continuous_aggs/common.h b/tsl/src/continuous_aggs/common.h index 6c26ab8dd4d..87af832457c 100644 --- a/tsl/src/continuous_aggs/common.h +++ b/tsl/src/continuous_aggs/common.h @@ -151,4 +151,5 @@ cagg_get_time_min(const ContinuousAgg *cagg) return ts_time_get_min(cagg->partition_type); } -ContinuousAggBucketFunction *ts_cagg_get_bucket_function_info(Oid view_oid); +extern ContinuousAggBucketFunction *ts_cagg_get_bucket_function_info(Oid view_oid); +extern List *get_all_wal_using_hypertables(void); diff --git a/tsl/src/continuous_aggs/create.c b/tsl/src/continuous_aggs/create.c index 4f57ccf8227..0bd8c435b35 100644 --- a/tsl/src/continuous_aggs/create.c +++ 
b/tsl/src/continuous_aggs/create.c @@ -97,7 +97,6 @@ static void create_bucket_function_catalog_entry(int32 matht_id, Oid bucket_func const bool bucket_fixed_width); static void cagg_create_hypertable(int32 hypertable_id, Oid mat_tbloid, const char *matpartcolname, int64 mat_tbltimecol_interval); -static void cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id); static void mattablecolumninfo_add_mattable_index(MaterializationHypertableColumnInfo *matcolinfo, Hypertable *ht); static ObjectAddress create_view_for_query(Query *selquery, RangeVar *viewrel); @@ -359,8 +358,8 @@ cagg_create_hypertable(int32 hypertable_id, Oid mat_tbloid, const char *matpartc * created in LogicalDecodingProcessRecord() trying to allocate too much * memory if you have too many other concurrent transactions in progress. */ -static void -cagg_add_logical_decoding_slot_prepare(const char *slot_name) +void +ts_cagg_add_logical_decoding_slot_prepare(const char *slot_name) { CatalogSecurityContext sec_ctx; LogicalDecodingContext *ctx = NULL; @@ -411,8 +410,8 @@ cagg_add_logical_decoding_slot_prepare(const char *slot_name) ts_catalog_restore_user(&sec_ctx); } -static void -cagg_add_logical_decoding_slot_finalize(void) +void +ts_cagg_add_logical_decoding_slot_finalize(void) { TS_DEBUG_LOG("persist invalidation slot"); ReplicationSlotPersist(); @@ -425,7 +424,7 @@ cagg_add_logical_decoding_slot_finalize(void) * hypertableid - argument to pass to trigger * (the hypertable id from timescaledb catalog) */ -static void +void cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id) { char hypertable_id_str[12]; @@ -720,12 +719,6 @@ fixup_userview_query_tlist(Query *userquery, List *tlist_aliases) } } -static const char *invalidate_using_info[] = { - [ContinuousAggInvalidateUsingDefault] = "default", - [ContinuousAggInvalidateUsingTrigger] = "trigger", - [ContinuousAggInvalidateUsingWal] = "wal", -}; - static ContinuousAggInvalidateUsing get_invalidate_using(WithClauseResult 
*with_clause_options) { @@ -734,14 +727,7 @@ get_invalidate_using(WithClauseResult *with_clause_options) const char *invalidate_using = text_to_cstring( DatumGetTextP(with_clause_options[CreateMaterializedViewFlagInvalidateUsing].parsed)); - - for (size_t i = 0; i < sizeof(invalidate_using_info) / sizeof(*invalidate_using_info); ++i) - if (strcmp(invalidate_using_info[i], invalidate_using) == 0) - return i; - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("unrecognized value \"%s\" for invalidate_using", invalidate_using)); - return -1; /* To keep linter happy */ + return invalidation_parse_using(invalidate_using); } static void @@ -905,7 +891,7 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer if (invalidate_using == ContinuousAggInvalidateUsingWal && SearchNamedReplicationSlot(slot_name, true) == NULL) { - cagg_add_logical_decoding_slot_prepare(slot_name); + ts_cagg_add_logical_decoding_slot_prepare(slot_name); slot_prepared = true; } @@ -1045,7 +1031,7 @@ cagg_create(const CreateTableAsStmt *create_stmt, ViewStmt *stmt, Query *panquer if (invalidate_using == ContinuousAggInvalidateUsingTrigger) cagg_add_trigger_hypertable(bucket_info->htoid, bucket_info->htid); else if (slot_prepared) - cagg_add_logical_decoding_slot_finalize(); + ts_cagg_add_logical_decoding_slot_finalize(); } DDLResult diff --git a/tsl/src/continuous_aggs/create.h b/tsl/src/continuous_aggs/create.h index 673804f19a0..fc7ac4c75be 100644 --- a/tsl/src/continuous_aggs/create.h +++ b/tsl/src/continuous_aggs/create.h @@ -14,5 +14,9 @@ DDLResult tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *pstmt, WithClauseResult *with_clause_options); +extern void ts_cagg_add_logical_decoding_slot_finalize(void); +extern void ts_cagg_add_logical_decoding_slot_prepare(const char *slot_name); + extern void cagg_flip_realtime_view_definition(ContinuousAgg *agg, Hypertable *mat_ht); extern void 
cagg_rename_view_columns(ContinuousAgg *agg); +extern void cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id); diff --git a/tsl/src/continuous_aggs/invalidation.c b/tsl/src/continuous_aggs/invalidation.c index 540cb3691c8..a7a3c29023f 100644 --- a/tsl/src/continuous_aggs/invalidation.c +++ b/tsl/src/continuous_aggs/invalidation.c @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -24,12 +25,14 @@ #include #include #include +#include #include #include #include #include #include "cache.h" +#include "continuous_aggs/create.h" #include "continuous_aggs/invalidation_multi.h" #include "continuous_aggs/invalidation_threshold.h" #include "continuous_aggs/materialize.h" @@ -167,6 +170,12 @@ static void cagg_invalidation_state_cleanup(const ContinuousAggInvalidationState static void continuous_agg_process_hypertable_invalidations_single(Oid hypertable_relid); static void continuous_agg_process_hypertable_invalidations_multi(ArrayType *hypertable_array); +static const char *invalidate_using_info[] = { + [ContinuousAggInvalidateUsingDefault] = "default", + [ContinuousAggInvalidateUsingTrigger] = "trigger", + [ContinuousAggInvalidateUsingWal] = "wal", +}; + static Relation open_cagg_table(ContinuousAggTableType type, LOCKMODE lockmode) { @@ -1143,7 +1152,7 @@ invalidation_store_free(InvalidationStore *store) * It accepts NAME, REGCLASS, or REGCLASS[]. We need the NAME overload since * we do not want the user to have to write "'my_table'::regclass" when * calling it with a simple table name and the "fallback" case for implicit - * conversions is any type int he "String" type category. + * conversions is any type in the "String" type category. 
* * See https://www.postgresql.org/docs/current/typeconv-func.html#TYPECONV-FUNC */ @@ -1318,3 +1327,135 @@ continuous_agg_process_hypertable_invalidations_multi(ArrayType *hypertable_arra multi_invalidation_process_hypertable_log(hypertables); } + +ContinuousAggInvalidateUsing +invalidation_parse_using(const char *using) +{ + StringInfoData alts; + + for (size_t i = 0; i < lengthof(invalidate_using_info); ++i) + if (strcmp(invalidate_using_info[i], using) == 0) + return i; + + initStringInfo(&alts); + + for (size_t i = 0; i < lengthof(invalidate_using_info); ++i) + appendStringInfo(&alts, + "%s%s\"%s\"", + (i == 0 ? "" : ", "), + (i == lengthof(invalidate_using_info) - 1 ? "or " : ""), + invalidate_using_info[i]); + + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("incorrect invalidation collection method given"), + errdetail("Expected one of %s but got \"%s\"", NameStr(alts), using)); + + return -1; /* To keep linter happy */ +} + +/* + * This method is used to migrate to or from using WAL-based invalidation + * collection. The procedure is roughly the same in both cases: + * + * 1. Lock the materialization invalidation log to prevent other refreshes + * from running concurrently. This should take a very heavy lock. + * + * 2. Move all invalidation from the WAL or the hypertable invalidation log to + * the materialization invalidation log. + * + * 3. Add or remove the trigger and add or remove the slot, as necessary. + * + * 4. Unlock the materialization invalidation log. + * + * When moving from WAL-based invalidation collection it is necessary to + * process invalidations for *all* hypertables with continuous aggregates, not + * just *this* hypertable. When going in the other direction it is sufficient + * to process the invalidations for the hypertable in question (since you just + * change one hypertable at a time). 
+ * + * Once the lock is released on the materialization invalidation log, all + * waiting processes will attempt to move invalidations, but since there are + * none, it will be a no-op. + */ +Datum +continuous_agg_set_invalidation_method(PG_FUNCTION_ARGS) +{ + Name new_method = PG_GETARG_NAME(0); + Oid table_relid = PG_GETARG_OID(1); + bool has_trigger; + Cache *hcache; + Hypertable *hypertable; + Relation logrel; + Oid relid; + const ContinuousAggInvalidateUsing new_using = invalidation_parse_using(NameStr(*new_method)); + const Catalog *const catalog = ts_catalog_get(); + char slot_name[TS_INVALIDATION_SLOT_NAME_MAX]; + bool slot_prepared = false; + + /* + * The slot name is needed both for moving to and from WAL-based + * invalidation collection, so we fetch the name here + */ + ts_get_invalidation_replication_slot_name(slot_name, sizeof(slot_name)); + + /* + * It is necessary to prepare the slot before doing anything that starts a + * transaction, so we speculatively prepare the slot here if it *might* be + * necessary and throw it away later if it is not needed. 
+ */ + if (new_using == ContinuousAggInvalidateUsingWal && + SearchNamedReplicationSlot(slot_name, true) == NULL) + { + ts_cagg_add_logical_decoding_slot_prepare(slot_name); + slot_prepared = true; + } + + hypertable = ts_hypertable_cache_get_cache_and_entry(table_relid, CACHE_FLAG_NONE, &hcache); + has_trigger = has_invalidation_trigger(table_relid); + + if (new_using == ContinuousAggInvalidateUsingWal && !has_trigger) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hypertable is already using WAL-based invalidation collection")); + + if (new_using == ContinuousAggInvalidateUsingTrigger && has_trigger) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hypertable is already using trigger-based invalidation collection")); + + if (ts_number_of_continuous_aggs_attached(hypertable->fd.id) == 0) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hypertable does not have any continuous aggregates attached")); + + relid = catalog_get_table_id(catalog, CONTINUOUS_AGGS_MATERIALIZATION_INVALIDATION_LOG); + logrel = table_open(relid, AccessExclusiveLock); + + if (new_using == ContinuousAggInvalidateUsingWal) + { + const Dimension *const dim = hyperspace_get_open_dimension(hypertable->space, 0); + const Oid dimtype = ts_dimension_get_partition_type(dim); + + invalidation_process_hypertable_log(hypertable->fd.id, dimtype); + ts_hypertable_drop_trigger(table_relid, CAGGINVAL_TRIGGER_NAME); + + if (slot_prepared) + ts_cagg_add_logical_decoding_slot_finalize(); + } + else if (new_using == ContinuousAggInvalidateUsingTrigger) + { + multi_invalidation_process_hypertable_log(get_all_wal_using_hypertables()); + cagg_add_trigger_hypertable(table_relid, hypertable->fd.id); + + if (!ts_hypertable_invalidation_slot_used() && + SearchNamedReplicationSlot(slot_name, true) != NULL) + ts_hypertable_drop_invalidation_replication_slot(slot_name); + } + + table_close(logrel, NoLock); + + ts_cache_release(&hcache); + + 
PG_RETURN_VOID(); +} diff --git a/tsl/src/continuous_aggs/invalidation.h b/tsl/src/continuous_aggs/invalidation.h index 7d7c85b435b..f355bd69ec4 100644 --- a/tsl/src/continuous_aggs/invalidation.h +++ b/tsl/src/continuous_aggs/invalidation.h @@ -42,7 +42,9 @@ extern void continuous_agg_invalidate_raw_ht(const Hypertable *raw_ht, int64 sta extern void continuous_agg_invalidate_mat_ht(const Hypertable *raw_ht, const Hypertable *mat_ht, int64 start, int64 end); extern Datum continuous_agg_process_hypertable_invalidations(PG_FUNCTION_ARGS); +extern Datum continuous_agg_set_invalidation_method(PG_FUNCTION_ARGS); extern void invalidation_process_hypertable_log(int32 hypertable_id, Oid dimtype); +extern ContinuousAggInvalidateUsing invalidation_parse_using(const char *using); extern InvalidationStore *invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window, diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index a2fed5156b6..92f5f3dff42 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -19,6 +19,7 @@ #include #include "bgw_policy/policies_v2.h" +#include "continuous_aggs/common.h" #include "continuous_aggs/invalidation_multi.h" #include "debug_point.h" #include "dimension.h" @@ -83,35 +84,6 @@ static void fill_bucket_offset_origin(const ContinuousAgg *cagg, const InternalTimeRange *const refresh_window, NullableDatum *offset, NullableDatum *origin); -/* - * Get all hypertables that are using WAL-invalidations. 
- */ -static List * -get_all_wal_using_hypertables(void) -{ - ScanIterator iterator = - ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext); - List *hypertables = NIL; - - /* Collect OID of all tables using continuous aggregates */ - ts_scanner_foreach(&iterator) - { - bool isnull; - Datum datum = slot_getattr(ts_scan_iterator_slot(&iterator), - Anum_continuous_agg_raw_hypertable_id, - &isnull); - - Assert(!isnull); - int32 hypertable_id = DatumGetInt32(datum); - Oid relid = ts_hypertable_id_to_relid(hypertable_id, false); - if (!has_invalidation_trigger(relid)) - hypertables = list_append_unique_int(hypertables, hypertable_id); - } - ts_scan_iterator_close(&iterator); - - return hypertables; -} - static Hypertable * cagg_get_hypertable_or_fail(int32 hypertable_id) { diff --git a/tsl/src/init.c b/tsl/src/init.c index 6260ec747a3..89f1156cbfc 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -138,6 +138,7 @@ CrossModuleFunctions tsl_cm_functions = { .continuous_agg_refresh = continuous_agg_refresh, .continuous_agg_process_hypertable_invalidations = continuous_agg_process_hypertable_invalidations, + .continuous_agg_set_invalidation_method = continuous_agg_set_invalidation_method, .continuous_agg_invalidate_raw_ht = continuous_agg_invalidate_raw_ht, .continuous_agg_invalidate_mat_ht = continuous_agg_invalidate_mat_ht, .continuous_agg_update_options = continuous_agg_update_options, diff --git a/tsl/test/expected/cagg_refresh_migrate.out b/tsl/test/expected/cagg_refresh_migrate.out new file mode 100644 index 00000000000..2078febb716 --- /dev/null +++ b/tsl/test/expected/cagg_refresh_migrate.out @@ -0,0 +1,205 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- Test that refresh works even when migrating between invalidation +-- methods. 
+-- +-- Since inserts into the hypertable ends up in either the WAL or the +-- hypertable invalidation log table, we check that moving between +-- them actually does what is expected and move all invalidations to +-- the materialization log. +CREATE VIEW hypertable_invalidation_thresholds AS +SELECT format('%I.%I', ht.schema_name, ht.table_name)::regclass AS hypertable, + _timescaledb_functions.to_timestamp_without_timezone(watermark) AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold + JOIN _timescaledb_catalog.hypertable ht + ON hypertable_id = ht.id; +CREATE VIEW materialization_invalidations AS +SELECT ca.user_view_name AS aggregate_name, + ht.table_name, + _timescaledb_functions.to_timestamp_without_timezone(lowest_modified_value) AS lowest, + _timescaledb_functions.to_timestamp_without_timezone(greatest_modified_value) AS greatest + FROM _timescaledb_catalog.continuous_agg ca + JOIN _timescaledb_catalog.continuous_aggs_materialization_invalidation_log ml + ON ca.mat_hypertable_id = ml.materialization_id + JOIN _timescaledb_catalog.hypertable ht + ON materialization_id = ht.id + WHERE lowest_modified_value BETWEEN 0 AND 1759302000000000 + AND greatest_modified_value BETWEEN 0 AND 1759302000000000; +CREATE TABLE device_readings ( + created_at timestamp NOT NULL, + device_id text NOT NULL, + metric double precision NOT NULL, + PRIMARY KEY (device_id, created_at) +); +SELECT table_name FROM create_hypertable ('device_readings', 'created_at'); +WARNING: column type "timestamp without time zone" used for "created_at" does not follow best practices + table_name +----------------- + device_readings +(1 row) + +CREATE MATERIALIZED VIEW device_summary_hourly +WITH (timescaledb.continuous, timescaledb.materialized_only = TRUE) AS +SELECT + time_bucket ('1 hour', created_at) AS bucket, + device_id, + sum(metric) AS metric_sum, + max(metric) - min(metric) AS metric_spread +FROM + device_readings +GROUP BY + bucket, + device_id +WITH NO DATA; 
+CREATE MATERIALIZED VIEW device_summary_daily +WITH (timescaledb.continuous, timescaledb.materialized_only = TRUE) AS +SELECT + time_bucket ('1 day', created_at) AS bucket, + device_id, + sum(metric) AS metric_sum, + max(metric) - min(metric) AS metric_spread +FROM + device_readings +GROUP BY + bucket, + device_id +WITH NO DATA; +-- Shall be empty +SELECT * FROM device_summary_hourly; + bucket | device_id | metric_sum | metric_spread +--------+-----------+------------+--------------- +(0 rows) + +SELECT * FROM device_summary_daily; + bucket | device_id | metric_sum | metric_spread +--------+-----------+------------+--------------- +(0 rows) + +-- Call refresh to set the threshold correctly. +CALL refresh_continuous_aggregate('device_summary_hourly', NULL, '2025-10-01'); +CALL refresh_continuous_aggregate('device_summary_daily', NULL, '2025-10-01'); +SELECT * FROM hypertable_invalidation_thresholds ORDER BY 1,2; + hypertable | threshold +-----------------+-------------------------- + device_readings | Wed Oct 01 00:00:00 2025 +(1 row) + +-- Insert something before the threshold, run refresh, and check the result +INSERT INTO device_readings VALUES + ('2025-09-10 12:34:56', 1, 1), + ('2025-09-10 12:34:57', 1, 2); +CALL refresh_continuous_aggregate('device_summary_hourly', NULL, '2025-10-01'); +CALL refresh_continuous_aggregate('device_summary_daily', NULL, '2025-10-01'); +SELECT * FROM device_summary_hourly; + bucket | device_id | metric_sum | metric_spread +--------------------------+-----------+------------+--------------- + Wed Sep 10 12:00:00 2025 | 1 | 3 | 1 +(1 row) + +SELECT * FROM device_summary_daily; + bucket | device_id | metric_sum | metric_spread +--------------------------+-----------+------------+--------------- + Wed Sep 10 00:00:00 2025 | 1 | 3 | 1 +(1 row) + +-- +-- Test that we can migrate to use WAL without losing any invalidations. 
+-- +-- Now insert new values in the hypertable +INSERT INTO device_readings(created_at, device_id, metric) +SELECT created_at, device_id, 1 + FROM generate_series('2025-09-11 11:00'::timestamptz, '2025-09-11 11:59', '1 minute'::interval) created_at, + generate_series(1, 4) device_id; +-- Migrate the table to use WAL. We should see the value in the +-- materialization log after the migration +SELECT * FROM materialization_invalidations; + aggregate_name | table_name | lowest | greatest +----------------+------------+--------+---------- +(0 rows) + +CALL _timescaledb_functions.set_invalidation_method('wal', 'device_readings'); +SELECT * FROM materialization_invalidations; + aggregate_name | table_name | lowest | greatest +-----------------------+----------------------------+--------------------------+--------------------------------- + device_summary_hourly | _materialized_hypertable_2 | Thu Sep 11 11:00:00 2025 | Thu Sep 11 11:59:59.999999 2025 + device_summary_daily | _materialized_hypertable_3 | Thu Sep 11 00:00:00 2025 | Thu Sep 11 23:59:59.999999 2025 +(2 rows) + +-- Refresh it to check that the refresh works as expected +CALL refresh_continuous_aggregate('device_summary_hourly', NULL, '2025-10-01'); +CALL refresh_continuous_aggregate('device_summary_daily', NULL, '2025-10-01'); +SELECT * FROM device_summary_hourly; + bucket | device_id | metric_sum | metric_spread +--------------------------+-----------+------------+--------------- + Wed Sep 10 12:00:00 2025 | 1 | 3 | 1 + Thu Sep 11 11:00:00 2025 | 1 | 60 | 0 + Thu Sep 11 11:00:00 2025 | 2 | 60 | 0 + Thu Sep 11 11:00:00 2025 | 3 | 60 | 0 + Thu Sep 11 11:00:00 2025 | 4 | 60 | 0 +(5 rows) + +SELECT * FROM device_summary_daily; + bucket | device_id | metric_sum | metric_spread +--------------------------+-----------+------------+--------------- + Wed Sep 10 00:00:00 2025 | 1 | 3 | 1 + Thu Sep 11 00:00:00 2025 | 1 | 60 | 0 + Thu Sep 11 00:00:00 2025 | 2 | 60 | 0 + Thu Sep 11 00:00:00 2025 | 3 | 60 | 0 + Thu Sep 11 
00:00:00 2025 | 4 | 60 | 0 +(5 rows) + +-- +-- Test that we can migrate back to use trigger without losing any invalidations. +-- +-- Insert new values in the hypertable +INSERT INTO device_readings(created_at, device_id, metric) +SELECT created_at, device_id, 1 + FROM generate_series('2025-09-12 11:00'::timestamptz, '2025-09-12 11:59', '1 minute'::interval) created_at, + generate_series(1, 4) device_id; +-- Migrate the table to use WAL. We should see the value in the +-- materialization log after the migration +SELECT * FROM materialization_invalidations; + aggregate_name | table_name | lowest | greatest +----------------+------------+--------+---------- +(0 rows) + +CALL _timescaledb_functions.set_invalidation_method('trigger', 'device_readings'); +SELECT * FROM materialization_invalidations; + aggregate_name | table_name | lowest | greatest +-----------------------+----------------------------+--------------------------+-------------------------- + device_summary_hourly | _materialized_hypertable_2 | Fri Sep 12 11:00:00 2025 | Fri Sep 12 12:00:00 2025 + device_summary_daily | _materialized_hypertable_3 | Fri Sep 12 00:00:00 2025 | Sat Sep 13 00:00:00 2025 +(2 rows) + +-- Refresh it to check that the refresh works as expected +CALL refresh_continuous_aggregate('device_summary_hourly', NULL, '2025-10-01'); +CALL refresh_continuous_aggregate('device_summary_daily', NULL, '2025-10-01'); +SELECT * FROM device_summary_hourly; + bucket | device_id | metric_sum | metric_spread +--------------------------+-----------+------------+--------------- + Wed Sep 10 12:00:00 2025 | 1 | 3 | 1 + Thu Sep 11 11:00:00 2025 | 1 | 60 | 0 + Thu Sep 11 11:00:00 2025 | 2 | 60 | 0 + Thu Sep 11 11:00:00 2025 | 3 | 60 | 0 + Thu Sep 11 11:00:00 2025 | 4 | 60 | 0 + Fri Sep 12 11:00:00 2025 | 1 | 60 | 0 + Fri Sep 12 11:00:00 2025 | 2 | 60 | 0 + Fri Sep 12 11:00:00 2025 | 3 | 60 | 0 + Fri Sep 12 11:00:00 2025 | 4 | 60 | 0 +(9 rows) + +SELECT * FROM device_summary_daily; + bucket | device_id | 
metric_sum | metric_spread +--------------------------+-----------+------------+--------------- + Wed Sep 10 00:00:00 2025 | 1 | 3 | 1 + Thu Sep 11 00:00:00 2025 | 1 | 60 | 0 + Thu Sep 11 00:00:00 2025 | 2 | 60 | 0 + Thu Sep 11 00:00:00 2025 | 3 | 60 | 0 + Thu Sep 11 00:00:00 2025 | 4 | 60 | 0 + Fri Sep 12 00:00:00 2025 | 1 | 60 | 0 + Fri Sep 12 00:00:00 2025 | 2 | 60 | 0 + Fri Sep 12 00:00:00 2025 | 3 | 60 | 0 + Fri Sep 12 00:00:00 2025 | 4 | 60 | 0 +(9 rows) + diff --git a/tsl/test/expected/cagg_usage-15.out b/tsl/test/expected/cagg_usage-15.out index 5d1b55204c3..8d0e88f9e35 100644 --- a/tsl/test/expected/cagg_usage-15.out +++ b/tsl/test/expected/cagg_usage-15.out @@ -615,6 +615,8 @@ SELECT count(*) FROM invalidation_slots WHERE database = current_database(); CREATE TABLE magic1(time timestamptz not null, device int, value float); CREATE TABLE magic2(time timestamptz not null, device int, value float); CREATE TABLE magic3(time timestamptz not null, device int, value float); +CREATE TABLE no_caggs(time timestamptz not null, device int, value float); +CREATE TABLE not_hypertable(time timestamptz not null, device int, value float); SELECT table_name FROM create_hypertable('magic1','time'); table_name ------------ @@ -633,6 +635,12 @@ SELECT table_name FROM create_hypertable('magic3','time'); magic3 (1 row) +SELECT table_name FROM create_hypertable('no_caggs','time'); + table_name +------------ + no_caggs +(1 row) + INSERT INTO magic1 SELECT generate_series('1999-12-20'::timestamptz, '2000-02-01'::timestamptz, '12 day'::interval), (100 * random())::int, @@ -668,7 +676,7 @@ select currval('_timescaledb_catalog.hypertable_id_seq') - 1 as prev_htid \gset select setval('_timescaledb_catalog.hypertable_id_seq', :prev_htid, false); setval -------- - 19 + 20 (1 row) \set ON_ERROR_STOP 0 @@ -754,7 +762,7 @@ CREATE MATERIALIZED VIEW magic_summary1_magic AS SELECT time_bucket('1 day', time), device, avg(value) FROM magic1 GROUP BY 1,2; -ERROR: unrecognized value "magic" for 
invalidate_using +ERROR: incorrect invalidation collection method given -- This should error out since we cannot add the trigger to the -- hypertable when there are multiple caggs connected. Changing -- invalidation collection method is more complicated than this. @@ -782,7 +790,7 @@ SELECT count(*) FROM pg_replication_slots (1 row) DROP MATERIALIZED VIEW magic2_summary1_wal; -NOTICE: drop cascades to table _timescaledb_internal._hyper_23_40_chunk +NOTICE: drop cascades to table _timescaledb_internal._hyper_24_40_chunk -- Slot should be there. We have another hypertable using WAL-based -- invalidation collection. SELECT count(*) FROM pg_replication_slots @@ -794,7 +802,7 @@ SELECT count(*) FROM pg_replication_slots (1 row) DROP MATERIALIZED VIEW magic1_summary1_wal; -NOTICE: drop cascades to table _timescaledb_internal._hyper_21_38_chunk +NOTICE: drop cascades to table _timescaledb_internal._hyper_22_38_chunk -- Slot should be there. We have yet another continuous aggregate for -- the hypertable using WAL-based invalidation collection. SELECT count(*) FROM pg_replication_slots @@ -806,7 +814,7 @@ SELECT count(*) FROM pg_replication_slots (1 row) DROP MATERIALIZED VIEW magic1_summary2_wal; -NOTICE: drop cascades to table _timescaledb_internal._hyper_22_39_chunk +NOTICE: drop cascades to table _timescaledb_internal._hyper_23_39_chunk -- Now slot should be gone and we should not have any continuous -- aggregates using WAL-based invalidation collection. SELECT count(*) FROM pg_replication_slots @@ -854,3 +862,54 @@ ALTER MATERIALIZED VIEW magic3_day_summary_trigger SET (timescaledb.invalidate_using = 'wal'); ERROR: cannot change invalidation method for continuous aggregate \set ON_ERROR_STOP 1 +-- Test functions to migrate between invalidation collection methods. 
+\set VERBOSITY default +\set ON_ERROR_STOP 0 +-- Wrong method used +CALL _timescaledb_functions.set_invalidation_method('wrong', 'magic1'); +ERROR: incorrect invalidation collection method given +DETAIL: Expected one of "default", "trigger", or "wal" but got "wrong" +-- Already using WAL +CALL _timescaledb_functions.set_invalidation_method('wal', 'magic1'); +ERROR: hypertable is already using WAL-based invalidation collection +-- Already using trigger +CALL _timescaledb_functions.set_invalidation_method('trigger', 'magic3'); +ERROR: hypertable is already using trigger-based invalidation collection +-- Hypertable without continuous aggregates, which will not have a +-- trigger, hence look like it is WAL-based. +CALL _timescaledb_functions.set_invalidation_method('trigger', 'no_caggs'); +ERROR: hypertable does not have any continuous aggregates attached +-- Normal table, which definitely do not have any triggers and might +-- look like it is WAL-based. +CALL _timescaledb_functions.set_invalidation_method('trigger', 'not_hypertable'); +ERROR: table "not_hypertable" is not a hypertable +\set ON_ERROR_STOP 1 +SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + hypertable_name | view_name | invalidate_using +-----------------+-----------------------------+------------------ + magic3 | magic3_day_summary_trigger | trigger + magic3 | magic3_week_summary_trigger | trigger +(2 rows) + +CALL _timescaledb_functions.set_invalidation_method('wal', 'magic3'); +SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + hypertable_name | view_name | invalidate_using +-----------------+-----------------------------+------------------ + magic3 | magic3_day_summary_trigger | wal + magic3 | magic3_week_summary_trigger | wal +(2 rows) + +CALL _timescaledb_functions.set_invalidation_method('trigger', 'magic3'); 
+SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + hypertable_name | view_name | invalidate_using +-----------------+-----------------------------+------------------ + magic3 | magic3_day_summary_trigger | trigger + magic3 | magic3_week_summary_trigger | trigger +(2 rows) + diff --git a/tsl/test/expected/cagg_usage-16.out b/tsl/test/expected/cagg_usage-16.out index 75370fbc4d0..9b9aac609e1 100644 --- a/tsl/test/expected/cagg_usage-16.out +++ b/tsl/test/expected/cagg_usage-16.out @@ -615,6 +615,8 @@ SELECT count(*) FROM invalidation_slots WHERE database = current_database(); CREATE TABLE magic1(time timestamptz not null, device int, value float); CREATE TABLE magic2(time timestamptz not null, device int, value float); CREATE TABLE magic3(time timestamptz not null, device int, value float); +CREATE TABLE no_caggs(time timestamptz not null, device int, value float); +CREATE TABLE not_hypertable(time timestamptz not null, device int, value float); SELECT table_name FROM create_hypertable('magic1','time'); table_name ------------ @@ -633,6 +635,12 @@ SELECT table_name FROM create_hypertable('magic3','time'); magic3 (1 row) +SELECT table_name FROM create_hypertable('no_caggs','time'); + table_name +------------ + no_caggs +(1 row) + INSERT INTO magic1 SELECT generate_series('1999-12-20'::timestamptz, '2000-02-01'::timestamptz, '12 day'::interval), (100 * random())::int, @@ -668,7 +676,7 @@ select currval('_timescaledb_catalog.hypertable_id_seq') - 1 as prev_htid \gset select setval('_timescaledb_catalog.hypertable_id_seq', :prev_htid, false); setval -------- - 19 + 20 (1 row) \set ON_ERROR_STOP 0 @@ -754,7 +762,7 @@ CREATE MATERIALIZED VIEW magic_summary1_magic AS SELECT time_bucket('1 day', time), device, avg(value) FROM magic1 GROUP BY 1,2; -ERROR: unrecognized value "magic" for invalidate_using +ERROR: incorrect invalidation collection method given -- This should error out 
since we cannot add the trigger to the -- hypertable when there are multiple caggs connected. Changing -- invalidation collection method is more complicated than this. @@ -782,7 +790,7 @@ SELECT count(*) FROM pg_replication_slots (1 row) DROP MATERIALIZED VIEW magic2_summary1_wal; -NOTICE: drop cascades to table _timescaledb_internal._hyper_23_40_chunk +NOTICE: drop cascades to table _timescaledb_internal._hyper_24_40_chunk -- Slot should be there. We have another hypertable using WAL-based -- invalidation collection. SELECT count(*) FROM pg_replication_slots @@ -794,7 +802,7 @@ SELECT count(*) FROM pg_replication_slots (1 row) DROP MATERIALIZED VIEW magic1_summary1_wal; -NOTICE: drop cascades to table _timescaledb_internal._hyper_21_38_chunk +NOTICE: drop cascades to table _timescaledb_internal._hyper_22_38_chunk -- Slot should be there. We have yet another continuous aggregate for -- the hypertable using WAL-based invalidation collection. SELECT count(*) FROM pg_replication_slots @@ -806,7 +814,7 @@ SELECT count(*) FROM pg_replication_slots (1 row) DROP MATERIALIZED VIEW magic1_summary2_wal; -NOTICE: drop cascades to table _timescaledb_internal._hyper_22_39_chunk +NOTICE: drop cascades to table _timescaledb_internal._hyper_23_39_chunk -- Now slot should be gone and we should not have any continuous -- aggregates using WAL-based invalidation collection. SELECT count(*) FROM pg_replication_slots @@ -854,3 +862,54 @@ ALTER MATERIALIZED VIEW magic3_day_summary_trigger SET (timescaledb.invalidate_using = 'wal'); ERROR: cannot change invalidation method for continuous aggregate \set ON_ERROR_STOP 1 +-- Test functions to migrate between invalidation collection methods. 
+\set VERBOSITY default +\set ON_ERROR_STOP 0 +-- Wrong method used +CALL _timescaledb_functions.set_invalidation_method('wrong', 'magic1'); +ERROR: incorrect invalidation collection method given +DETAIL: Expected one of "default", "trigger", or "wal" but got "wrong" +-- Already using WAL +CALL _timescaledb_functions.set_invalidation_method('wal', 'magic1'); +ERROR: hypertable is already using WAL-based invalidation collection +-- Already using trigger +CALL _timescaledb_functions.set_invalidation_method('trigger', 'magic3'); +ERROR: hypertable is already using trigger-based invalidation collection +-- Hypertable without continuous aggregates, which will not have a +-- trigger, hence look like it is WAL-based. +CALL _timescaledb_functions.set_invalidation_method('trigger', 'no_caggs'); +ERROR: hypertable does not have any continuous aggregates attached +-- Normal table, which definitely do not have any triggers and might +-- look like it is WAL-based. +CALL _timescaledb_functions.set_invalidation_method('trigger', 'not_hypertable'); +ERROR: table "not_hypertable" is not a hypertable +\set ON_ERROR_STOP 1 +SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + hypertable_name | view_name | invalidate_using +-----------------+-----------------------------+------------------ + magic3 | magic3_day_summary_trigger | trigger + magic3 | magic3_week_summary_trigger | trigger +(2 rows) + +CALL _timescaledb_functions.set_invalidation_method('wal', 'magic3'); +SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + hypertable_name | view_name | invalidate_using +-----------------+-----------------------------+------------------ + magic3 | magic3_day_summary_trigger | wal + magic3 | magic3_week_summary_trigger | wal +(2 rows) + +CALL _timescaledb_functions.set_invalidation_method('trigger', 'magic3'); 
+SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + hypertable_name | view_name | invalidate_using +-----------------+-----------------------------+------------------ + magic3 | magic3_day_summary_trigger | trigger + magic3 | magic3_week_summary_trigger | trigger +(2 rows) + diff --git a/tsl/test/expected/cagg_usage-17.out b/tsl/test/expected/cagg_usage-17.out index 75370fbc4d0..9b9aac609e1 100644 --- a/tsl/test/expected/cagg_usage-17.out +++ b/tsl/test/expected/cagg_usage-17.out @@ -615,6 +615,8 @@ SELECT count(*) FROM invalidation_slots WHERE database = current_database(); CREATE TABLE magic1(time timestamptz not null, device int, value float); CREATE TABLE magic2(time timestamptz not null, device int, value float); CREATE TABLE magic3(time timestamptz not null, device int, value float); +CREATE TABLE no_caggs(time timestamptz not null, device int, value float); +CREATE TABLE not_hypertable(time timestamptz not null, device int, value float); SELECT table_name FROM create_hypertable('magic1','time'); table_name ------------ @@ -633,6 +635,12 @@ SELECT table_name FROM create_hypertable('magic3','time'); magic3 (1 row) +SELECT table_name FROM create_hypertable('no_caggs','time'); + table_name +------------ + no_caggs +(1 row) + INSERT INTO magic1 SELECT generate_series('1999-12-20'::timestamptz, '2000-02-01'::timestamptz, '12 day'::interval), (100 * random())::int, @@ -668,7 +676,7 @@ select currval('_timescaledb_catalog.hypertable_id_seq') - 1 as prev_htid \gset select setval('_timescaledb_catalog.hypertable_id_seq', :prev_htid, false); setval -------- - 19 + 20 (1 row) \set ON_ERROR_STOP 0 @@ -754,7 +762,7 @@ CREATE MATERIALIZED VIEW magic_summary1_magic AS SELECT time_bucket('1 day', time), device, avg(value) FROM magic1 GROUP BY 1,2; -ERROR: unrecognized value "magic" for invalidate_using +ERROR: incorrect invalidation collection method given -- This should error out 
since we cannot add the trigger to the -- hypertable when there are multiple caggs connected. Changing -- invalidation collection method is more complicated than this. @@ -782,7 +790,7 @@ SELECT count(*) FROM pg_replication_slots (1 row) DROP MATERIALIZED VIEW magic2_summary1_wal; -NOTICE: drop cascades to table _timescaledb_internal._hyper_23_40_chunk +NOTICE: drop cascades to table _timescaledb_internal._hyper_24_40_chunk -- Slot should be there. We have another hypertable using WAL-based -- invalidation collection. SELECT count(*) FROM pg_replication_slots @@ -794,7 +802,7 @@ SELECT count(*) FROM pg_replication_slots (1 row) DROP MATERIALIZED VIEW magic1_summary1_wal; -NOTICE: drop cascades to table _timescaledb_internal._hyper_21_38_chunk +NOTICE: drop cascades to table _timescaledb_internal._hyper_22_38_chunk -- Slot should be there. We have yet another continuous aggregate for -- the hypertable using WAL-based invalidation collection. SELECT count(*) FROM pg_replication_slots @@ -806,7 +814,7 @@ SELECT count(*) FROM pg_replication_slots (1 row) DROP MATERIALIZED VIEW magic1_summary2_wal; -NOTICE: drop cascades to table _timescaledb_internal._hyper_22_39_chunk +NOTICE: drop cascades to table _timescaledb_internal._hyper_23_39_chunk -- Now slot should be gone and we should not have any continuous -- aggregates using WAL-based invalidation collection. SELECT count(*) FROM pg_replication_slots @@ -854,3 +862,54 @@ ALTER MATERIALIZED VIEW magic3_day_summary_trigger SET (timescaledb.invalidate_using = 'wal'); ERROR: cannot change invalidation method for continuous aggregate \set ON_ERROR_STOP 1 +-- Test functions to migrate between invalidation collection methods. 
+\set VERBOSITY default +\set ON_ERROR_STOP 0 +-- Wrong method used +CALL _timescaledb_functions.set_invalidation_method('wrong', 'magic1'); +ERROR: incorrect invalidation collection method given +DETAIL: Expected one of "default", "trigger", or "wal" but got "wrong" +-- Already using WAL +CALL _timescaledb_functions.set_invalidation_method('wal', 'magic1'); +ERROR: hypertable is already using WAL-based invalidation collection +-- Already using trigger +CALL _timescaledb_functions.set_invalidation_method('trigger', 'magic3'); +ERROR: hypertable is already using trigger-based invalidation collection +-- Hypertable without continuous aggregates, which will not have a +-- trigger, hence look like it is WAL-based. +CALL _timescaledb_functions.set_invalidation_method('trigger', 'no_caggs'); +ERROR: hypertable does not have any continuous aggregates attached +-- Normal table, which definitely do not have any triggers and might +-- look like it is WAL-based. +CALL _timescaledb_functions.set_invalidation_method('trigger', 'not_hypertable'); +ERROR: table "not_hypertable" is not a hypertable +\set ON_ERROR_STOP 1 +SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + hypertable_name | view_name | invalidate_using +-----------------+-----------------------------+------------------ + magic3 | magic3_day_summary_trigger | trigger + magic3 | magic3_week_summary_trigger | trigger +(2 rows) + +CALL _timescaledb_functions.set_invalidation_method('wal', 'magic3'); +SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + hypertable_name | view_name | invalidate_using +-----------------+-----------------------------+------------------ + magic3 | magic3_day_summary_trigger | wal + magic3 | magic3_week_summary_trigger | wal +(2 rows) + +CALL _timescaledb_functions.set_invalidation_method('trigger', 'magic3'); 
+SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + hypertable_name | view_name | invalidate_using +-----------------+-----------------------------+------------------ + magic3 | magic3_day_summary_trigger | trigger + magic3 | magic3_week_summary_trigger | trigger +(2 rows) + diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out index fd5da0305c5..af28bcc461e 100644 --- a/tsl/test/shared/expected/extension.out +++ b/tsl/test/shared/expected/extension.out @@ -32,6 +32,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text _timescaledb_functions.bookend_deserializefunc(bytea,internal) _timescaledb_functions.bookend_finalfunc(internal,anyelement,"any") _timescaledb_functions.bookend_serializefunc(internal) + _timescaledb_functions.set_invalidation_method(name,regclass) _timescaledb_functions.cagg_get_bucket_function_info(integer) _timescaledb_functions.cagg_migrate_create_plan(_timescaledb_catalog.continuous_agg,text,boolean,boolean) _timescaledb_functions.cagg_migrate_execute_copy_data(_timescaledb_catalog.continuous_agg,_timescaledb_catalog.continuous_agg_migrate_plan_step) diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 0243f3e387b..74a0957de6f 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -17,6 +17,7 @@ set(TEST_FILES cagg_policy_move.sql cagg_policy_concurrent.sql cagg_plugin.sql + cagg_refresh_migrate.sql cagg_refresh_using_trigger.sql cagg_refresh_using_wal.sql cagg_refresh_using_merge.sql diff --git a/tsl/test/sql/cagg_policy.sql.orig b/tsl/test/sql/cagg_policy.sql.orig deleted file mode 100644 index 020f84dc11b..00000000000 --- a/tsl/test/sql/cagg_policy.sql.orig +++ /dev/null @@ -1,740 +0,0 @@ --- This file and its contents are licensed under the Timescale License. 
--- Please see the included NOTICE for copyright information and --- LICENSE-TIMESCALE for a copy of the license. - --- test add and remove refresh policy apis - -SET ROLE :ROLE_DEFAULT_PERM_USER; - ---TEST1 --- ---basic test with count -CREATE TABLE int_tab (a integer, b integer, c integer); -SELECT table_name FROM create_hypertable('int_tab', 'a', chunk_time_interval=> 10); - -INSERT INTO int_tab VALUES( 3 , 16 , 20); -INSERT INTO int_tab VALUES( 1 , 10 , 20); -INSERT INTO int_tab VALUES( 1 , 11 , 20); -INSERT INTO int_tab VALUES( 1 , 12 , 20); -INSERT INTO int_tab VALUES( 1 , 13 , 20); -INSERT INTO int_tab VALUES( 1 , 14 , 20); -INSERT INTO int_tab VALUES( 2 , 14 , 20); -INSERT INTO int_tab VALUES( 2 , 15 , 20); -INSERT INTO int_tab VALUES( 2 , 16 , 20); - -CREATE OR REPLACE FUNCTION integer_now_int_tab() returns int LANGUAGE SQL STABLE as $$ SELECT coalesce(max(a), 0) FROM int_tab $$; -SELECT set_integer_now_func('int_tab', 'integer_now_int_tab'); - -CREATE MATERIALIZED VIEW mat_m1( a, countb ) -WITH (timescaledb.continuous, timescaledb.materialized_only=true) -as -SELECT a, count(b) -FROM int_tab -GROUP BY time_bucket(1, a), a WITH NO DATA; - -\c :TEST_DBNAME :ROLE_SUPERUSER - -SET timezone TO PST8PDT; - -DELETE FROM _timescaledb_config.bgw_job WHERE TRUE; - -SET ROLE :ROLE_DEFAULT_PERM_USER; -SELECT count(*) FROM _timescaledb_config.bgw_job; - -\set ON_ERROR_STOP 0 -\set VERBOSITY default - --- Test 1 step policy for integer type buckets -ALTER materialized view mat_m1 set (timescaledb.compress = true); --- No policy is added if one errors out -SELECT timescaledb_experimental.add_policies('mat_m1', refresh_start_offset => 1, refresh_end_offset => 10, compress_after => 11, drop_after => 20); -SELECT timescaledb_experimental.show_policies('mat_m1'); - --- All policies are added in one step -SELECT timescaledb_experimental.add_policies('mat_m1', refresh_start_offset => 10, refresh_end_offset => 1, compress_after => 11, drop_after => 20); -SELECT 
timescaledb_experimental.show_policies('mat_m1'); - ---Test coverage: new view for policies on CAggs -SELECT * FROM timescaledb_experimental.policies ORDER BY relation_name, proc_name; - ---Test coverage: new view for policies only shows the policies for CAggs -SELECT add_retention_policy('int_tab', 20); -SELECT * FROM timescaledb_experimental.policies ORDER BY relation_name, proc_name; -SELECT remove_retention_policy('int_tab'); - --- Test for duplicated policies (issue #5492) -CREATE MATERIALIZED VIEW mat_m2( a, sumb ) -WITH (timescaledb.continuous, timescaledb.materialized_only=true) -as -SELECT a, sum(b) -FROM int_tab -GROUP BY time_bucket(1, a), a WITH NO DATA; - --- add refresh policy -SELECT timescaledb_experimental.add_policies('mat_m2', refresh_start_offset => 10, refresh_end_offset => 1); -SELECT timescaledb_experimental.show_policies('mat_m2'); --- check for only one refresh policy for each cagg -SELECT * FROM timescaledb_experimental.policies WHERE proc_name ~ 'refresh' ORDER BY relation_name, proc_name; - -SELECT timescaledb_experimental.remove_all_policies('mat_m2'); -DROP MATERIALIZED VIEW mat_m2; - --- Alter policies -SELECT timescaledb_experimental.alter_policies('mat_m1', refresh_start_offset => 11, compress_after=>13, drop_after => 25); -SELECT timescaledb_experimental.show_policies('mat_m1'); - --- Remove one or more policy -SELECT timescaledb_experimental.remove_policies('mat_m1', false, 'policy_refresh_continuous_aggregate', 'policy_compression'); -SELECT timescaledb_experimental.show_policies('mat_m1'); - --- Add one policy -SELECT timescaledb_experimental.add_policies('mat_m1', refresh_start_offset => 10, refresh_end_offset => 1); -SELECT timescaledb_experimental.show_policies('mat_m1'); - --- Remove all policies -SELECT timescaledb_experimental.remove_policies('mat_m1', false, 'policy_refresh_continuous_aggregate', 'policy_retention'); -SELECT timescaledb_experimental.show_policies('mat_m1'); - ---Cross policy checks ---refresh and 
compression policy overlap -SELECT timescaledb_experimental.add_policies('mat_m1', refresh_start_offset => 12, refresh_end_offset => 1, compress_after=>11); - ---refresh and retention policy overlap -SELECT timescaledb_experimental.add_policies('mat_m1', refresh_start_offset => 12, refresh_end_offset => 1, drop_after=>11); - ---compression and retention policy overlap -SELECT timescaledb_experimental.add_policies('mat_m1', compress_after => 10, drop_after => 10); - --- Alter non existent policies -SELECT timescaledb_experimental.alter_policies('mat_m1', refresh_start_offset => 12, compress_after=>11, drop_after => 15); - -ALTER materialized view mat_m1 set (timescaledb.compress = false); - -SELECT add_continuous_aggregate_policy('int_tab', '1 day'::interval, 10 , '1 h'::interval); -SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 , '1 h'::interval); -SELECT add_continuous_aggregate_policy('mat_m1', '1 day'::interval, 10 ); -SELECT add_continuous_aggregate_policy('mat_m1', 10, '1 day'::interval, '1 h'::interval); ---start_interval < end_interval -SELECT add_continuous_aggregate_policy('mat_m1', 5, 10, '1h'::interval); ---refresh window less than two buckets -SELECT add_continuous_aggregate_policy('mat_m1', 11, 10, '1h'::interval); -SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_id \gset - ---adding again should warn/error -SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>false); -SELECT add_continuous_aggregate_policy('mat_m1', 20, 15, '1h'::interval, if_not_exists=>true); -SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>true); - --- modify config and try to add, should error out -SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; -SELECT hypertable_id as mat_id FROM _timescaledb_config.bgw_job where id = :job_id \gset -\set VERBOSITY terse -\set ON_ERROR_STOP 1 - -\c :TEST_DBNAME :ROLE_SUPERUSER - -SET timezone TO 
PST8PDT; - -UPDATE _timescaledb_config.bgw_job -SET config = jsonb_build_object('mat_hypertable_id', :mat_id) -WHERE id = :job_id; -SET ROLE :ROLE_DEFAULT_PERM_USER; -SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; - -\set ON_ERROR_STOP 0 -\set VERBOSITY default -SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>true); - -SELECT remove_continuous_aggregate_policy('int_tab'); -SELECT remove_continuous_aggregate_policy('mat_m1'); --- add with NULL offset, readd with NULL offset -SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true); -SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE -SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING -SELECT remove_continuous_aggregate_policy('mat_m1'); -SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true); -SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true); -SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true); -SELECT remove_continuous_aggregate_policy('mat_m1'); - ---this one will fail -SELECT remove_continuous_aggregate_policy('mat_m1'); -SELECT remove_continuous_aggregate_policy('mat_m1', if_not_exists=>true); - ---now try to add a policy as a different user than the one that created the cagg ---should fail -SET ROLE :ROLE_DEFAULT_PERM_USER_2; -SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval) as job_id ; -\set VERBOSITY terse -\set ON_ERROR_STOP 1 - -SET ROLE :ROLE_DEFAULT_PERM_USER; -DROP MATERIALIZED VIEW mat_m1; - ---- code coverage tests : add policy for timestamp and date based table --- -CREATE TABLE continuous_agg_max_mat_date(time DATE); -SELECT create_hypertable('continuous_agg_max_mat_date', 
'time'); -CREATE MATERIALIZED VIEW max_mat_view_date - WITH (timescaledb.continuous, timescaledb.materialized_only=true) - AS SELECT time_bucket('7 days', time) - FROM continuous_agg_max_mat_date - GROUP BY 1 WITH NO DATA; - -\set ON_ERROR_STOP 0 -\set VERBOSITY default - --- Test 1 step policy for timestamp type buckets -ALTER materialized view max_mat_view_date set (timescaledb.compress = true); --- Only works for cagg -SELECT timescaledb_experimental.add_policies('continuous_agg_max_mat_date', refresh_start_offset => '1 day'::interval, refresh_end_offset => '2 day'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval); -SELECT timescaledb_experimental.show_policies('continuous_agg_max_mat_date'); -SELECT timescaledb_experimental.alter_policies('continuous_agg_max_mat_date', compress_after=>'16 days'::interval); -SELECT timescaledb_experimental.remove_policies('continuous_agg_max_mat_date', false, 'policy_refresh_continuous_aggregate'); - --- No policy is added if one errors out -SELECT timescaledb_experimental.add_policies('max_mat_view_date', refresh_start_offset => '1 day'::interval, refresh_end_offset => '2 day'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); - --- Create open ended refresh_policy -SELECT timescaledb_experimental.add_policies('max_mat_view_date', refresh_end_offset => '2 day'::interval); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); -SELECT timescaledb_experimental.remove_policies('max_mat_view_date', false, 'policy_refresh_continuous_aggregate'); - -SELECT timescaledb_experimental.add_policies('max_mat_view_date', refresh_end_offset => '2 day'::interval, refresh_start_offset=>'-infinity'); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); -SELECT timescaledb_experimental.remove_policies('max_mat_view_date', false, 'policy_refresh_continuous_aggregate'); - -SELECT 
timescaledb_experimental.add_policies('max_mat_view_date', refresh_start_offset => '2 day'::interval); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); -SELECT timescaledb_experimental.remove_policies('max_mat_view_date', false, 'policy_refresh_continuous_aggregate'); - -SELECT timescaledb_experimental.add_policies('max_mat_view_date', refresh_start_offset => '2 day'::interval, refresh_end_offset=>'infinity'); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); -SELECT timescaledb_experimental.remove_policies('max_mat_view_date', false, 'policy_refresh_continuous_aggregate'); - --- Open ended at both sides, for code coverage -SELECT timescaledb_experimental.add_policies('max_mat_view_date', refresh_end_offset => 'infinity', refresh_start_offset => '-infinity'); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); -SELECT timescaledb_experimental.remove_policies('max_mat_view_date', false, 'policy_refresh_continuous_aggregate'); - --- All policies are added in one step -SELECT timescaledb_experimental.add_policies('max_mat_view_date', refresh_start_offset => '15 days'::interval, refresh_end_offset => '1 day'::interval, compress_after => '20 days'::interval, drop_after => '25 days'::interval); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); - --- Alter policies -SELECT timescaledb_experimental.alter_policies('max_mat_view_date', refresh_start_offset => '16 days'::interval, compress_after=>'26 days'::interval, drop_after => '40 days'::interval); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); - ---Alter refresh_policy to make it open ended -SELECT timescaledb_experimental.remove_policies('max_mat_view_date', false, 'policy_retention', 'policy_compression'); -SELECT timescaledb_experimental.alter_policies('max_mat_view_date', refresh_start_offset =>'-infinity'); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); - -SELECT 
timescaledb_experimental.alter_policies('max_mat_view_date', refresh_end_offset =>'infinity', refresh_start_offset =>'5 days'::interval); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); - ---Cross policy checks --- Refresh and compression policies overlap -SELECT timescaledb_experimental.add_policies('max_mat_view_date', compress_after => '20 days'::interval, drop_after => '25 days'::interval); -SELECT timescaledb_experimental.alter_policies('max_mat_view_date', compress_after=> '4 days'::interval); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); - --- Refresh and retention policies overlap -SELECT timescaledb_experimental.alter_policies('max_mat_view_date', refresh_start_offset =>'5 days'::interval, drop_after=> '4 days'::interval); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); - ---Do not allow refreshed data to be deleted -SELECT add_retention_policy('continuous_agg_max_mat_date', '25 days'::interval); -SELECT timescaledb_experimental.alter_policies('max_mat_view_date', refresh_start_offset =>'25 days'::interval); -SELECT remove_retention_policy('continuous_agg_max_mat_date'); - --- Remove one or more policy --- Code coverage: no policy names provided -SELECT timescaledb_experimental.remove_policies('max_mat_view_date', false); - --- Code coverage: incorrect name of policy -SELECT timescaledb_experimental.remove_policies('max_mat_view_date', false, 'refresh_policy'); - -SELECT timescaledb_experimental.remove_policies('max_mat_view_date', false, 'policy_refresh_continuous_aggregate', 'policy_compression'); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); - --- Add one policy -SELECT timescaledb_experimental.add_policies('max_mat_view_date', refresh_start_offset => '15 day'::interval, refresh_end_offset => '1 day'::interval); -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); - --- Remove all policies -SELECT * FROM timescaledb_experimental.policies ORDER BY 
relation_name, proc_name; -SELECT timescaledb_experimental.remove_all_policies(NULL); -- should fail -SELECT timescaledb_experimental.remove_all_policies('continuous_agg_max_mat_date'); -- should fail -SELECT timescaledb_experimental.remove_all_policies('max_mat_view_date', false); -SELECT timescaledb_experimental.remove_all_policies('max_mat_view_date', false); -- should fail -CREATE OR REPLACE FUNCTION custom_func(jobid int, args jsonb) RETURNS RECORD LANGUAGE SQL AS -$$ - VALUES($1, $2, 'custom_func'); -$$; - -- inject custom job -SELECT add_job('custom_func','1h', config:='{"type":"function"}'::jsonb, initial_start => '2000-01-01 00:00:00+00'::timestamptz) AS job_id \gset -SELECT _timescaledb_functions.alter_job_set_hypertable_id( :job_id, 'max_mat_view_date'::regclass); -SELECT * FROM timescaledb_information.jobs WHERE job_id != 1 ORDER BY 1; -SELECT timescaledb_experimental.remove_all_policies('max_mat_view_date', true); -- ignore custom job -SELECT delete_job(:job_id); -DROP FUNCTION custom_func; -SELECT timescaledb_experimental.show_policies('max_mat_view_date'); - -ALTER materialized view max_mat_view_date set (timescaledb.compress = false); - -SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval); ---start_interval < end_interval -SELECT add_continuous_aggregate_policy('max_mat_view_date', '1 day'::interval, '2 days'::interval , '1 day'::interval) ; ---interval less than two buckets -SELECT add_continuous_aggregate_policy('max_mat_view_date', '7 days', '1 day', '1 day'::interval); -SELECT add_continuous_aggregate_policy('max_mat_view_date', '14 days', '1 day', '1 day'::interval); -SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-10 hours', '1 day'::interval); -\set VERBOSITY terse -\set ON_ERROR_STOP 1 - --- Negative start offset gives two bucket window: -SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-1 day', '1 day'::interval); -SELECT 
remove_continuous_aggregate_policy('max_mat_view_date'); --- Both offsets NULL: -SELECT add_continuous_aggregate_policy('max_mat_view_date', NULL, NULL, '1 day'::interval); -SELECT remove_continuous_aggregate_policy('max_mat_view_date'); - -SELECT add_continuous_aggregate_policy('max_mat_view_date', '15 days', '1 day', '1 day'::interval) as job_id \gset -SELECT config FROM _timescaledb_config.bgw_job -WHERE id = :job_id; - -INSERT INTO continuous_agg_max_mat_date - SELECT generate_series('2019-09-01'::date, '2019-09-10'::date, '1 day'); ---- to prevent NOTICES set message level to warning -SET client_min_messages TO warning; -CALL run_job(:job_id); -RESET client_min_messages; -DROP MATERIALIZED VIEW max_mat_view_date; - -CREATE TABLE continuous_agg_timestamp(time TIMESTAMP); -SELECT create_hypertable('continuous_agg_timestamp', 'time'); - -CREATE MATERIALIZED VIEW max_mat_view_timestamp - WITH (timescaledb.continuous, timescaledb.materialized_only=true) - AS SELECT time_bucket('7 days', time) - FROM continuous_agg_timestamp - GROUP BY 1 WITH NO DATA; - ---the start offset overflows the smallest time value, but is capped at ---the min value -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval); -SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp'); - -\set ON_ERROR_STOP 0 -\set VERBOSITY default ---start and end offset capped at the lowest time value, which means ---zero size window -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '900000 years' , '1 h'::interval); -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '301 days', '10 months' , '1 h'::interval); -\set VERBOSITY terse -\set ON_ERROR_STOP 1 - -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 days', '1 h'::interval , '1 h'::interval) as job_id \gset - ---- to prevent NOTICES set message level to warning -SET client_min_messages TO warning; -CALL run_job(:job_id); 
-RESET client_min_messages ; - -SELECT config FROM _timescaledb_config.bgw_job -WHERE id = :job_id; - -\c :TEST_DBNAME :ROLE_SUPERUSER - -SET timezone TO PST8PDT; - -UPDATE _timescaledb_config.bgw_job -SET config = jsonb_build_object('mat_hypertable_id', :mat_id) -WHERE id = :job_id; - -SET ROLE :ROLE_DEFAULT_PERM_USER; -SELECT config FROM _timescaledb_config.bgw_job where id = :job_id; -\set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 day', '1 day', '1h'::interval, if_not_exists=>true); -SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true); -\set ON_ERROR_STOP 1 - -DROP MATERIALIZED VIEW max_mat_view_timestamp; - ---smallint table -CREATE TABLE smallint_tab (a smallint); -SELECT table_name FROM create_hypertable('smallint_tab', 'a', chunk_time_interval=> 10); -CREATE OR REPLACE FUNCTION integer_now_smallint_tab() returns smallint LANGUAGE SQL STABLE as $$ SELECT coalesce(max(a)::smallint, 0::smallint) FROM smallint_tab ; $$; -SELECT set_integer_now_func('smallint_tab', 'integer_now_smallint_tab'); - -CREATE MATERIALIZED VIEW mat_smallint( a, countb ) -WITH (timescaledb.continuous, timescaledb.materialized_only=true) -as -SELECT time_bucket( SMALLINT '1', a) , count(*) -FROM smallint_tab -GROUP BY 1 WITH NO DATA; -\set ON_ERROR_STOP 0 -\set VERBOSITY default - --- Test 1 step policy for smallint type buckets -ALTER materialized view mat_smallint set (timescaledb.compress = true); - --- All policies are added in one step -SELECT timescaledb_experimental.add_policies('mat_smallint', refresh_start_offset => 10::smallint, refresh_end_offset => 1::smallint, compress_after => 11::smallint, drop_after => 20::smallint); -SELECT timescaledb_experimental.show_policies('mat_smallint'); - --- Alter policies -SELECT timescaledb_experimental.alter_policies('mat_smallint', refresh_start_offset => 11::smallint, compress_after=>13::smallint, drop_after => 25::smallint); 
-SELECT timescaledb_experimental.show_policies('mat_smallint'); - -SELECT timescaledb_experimental.remove_all_policies('mat_smallint', false); -ALTER materialized view mat_smallint set (timescaledb.compress = false); - -SELECT add_continuous_aggregate_policy('mat_smallint', 15, 0 , '1 h'::interval); -SELECT add_continuous_aggregate_policy('mat_smallint', 98898::smallint , 0::smallint, '1 h'::interval); -SELECT add_continuous_aggregate_policy('mat_smallint', 5::smallint, 10::smallint , '1 h'::interval) as job_id \gset -\set VERBOSITY terse -\set ON_ERROR_STOP 1 -SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 0::smallint , '1 h'::interval) as job_id \gset -INSERT INTO smallint_tab VALUES(5); -INSERT INTO smallint_tab VALUES(10); -INSERT INTO smallint_tab VALUES(20); -CALL run_job(:job_id); -SELECT * FROM mat_smallint ORDER BY 1; - ---remove all the data-- -TRUNCATE table smallint_tab; -CALL refresh_continuous_aggregate('mat_smallint', NULL, NULL); -SELECT * FROM mat_smallint ORDER BY 1; - --- Case 1: overflow by subtracting from PG_INT16_MIN ---overflow start_interval, end_interval [-32768, -32768) -SELECT remove_continuous_aggregate_policy('mat_smallint'); -INSERT INTO smallint_tab VALUES( -32768 ); -SELECT integer_now_smallint_tab(); -SELECT add_continuous_aggregate_policy('mat_smallint', 10::smallint, 5::smallint , '1 h'::interval) as job_id \gset - -\set ON_ERROR_STOP 0 -CALL run_job(:job_id); -\set ON_ERROR_STOP 1 -SELECT * FROM mat_smallint ORDER BY 1; - --- overflow start_interval. 
now this runs as range is capped [-32768, -32765) -INSERT INTO smallint_tab VALUES( -32760 ); -SELECT maxval, maxval - 10, maxval -5 FROM integer_now_smallint_tab() as maxval; -CALL run_job(:job_id); -SELECT * FROM mat_smallint ORDER BY 1; - ---remove all the data-- -TRUNCATE table smallint_tab; -CALL refresh_continuous_aggregate('mat_smallint', NULL, NULL); -SELECT * FROM mat_smallint ORDER BY 1; - --- Case 2: overflow by subtracting from PG_INT16_MAX ---overflow start and end . will fail as range is [32767, 32767] -SELECT remove_continuous_aggregate_policy('mat_smallint'); -INSERT INTO smallint_tab VALUES( 32766 ); -INSERT INTO smallint_tab VALUES( 32767 ); -SELECT maxval, maxval - (-1), maxval - (-2) FROM integer_now_smallint_tab() as maxval; -SELECT add_continuous_aggregate_policy('mat_smallint', -1::smallint, -3::smallint , '1 h'::interval) as job_id \gset -\set ON_ERROR_STOP 0 -CALL run_job(:job_id); -\set ON_ERROR_STOP 1 -SELECT * FROM mat_smallint ORDER BY 1; - -SELECT remove_continuous_aggregate_policy('mat_smallint'); ---overflow end . 
will work range is [32765, 32767) -SELECT maxval, maxval - (1), maxval - (-2) FROM integer_now_smallint_tab() as maxval; -SELECT add_continuous_aggregate_policy('mat_smallint', 1::smallint, -3::smallint , '1 h'::interval) as job_id \gset -\set ON_ERROR_STOP 0 -CALL run_job(:job_id); -SELECT * FROM mat_smallint ORDER BY 1; - --- tests for interval argument conversions --- -\set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true); -SELECT add_continuous_aggregate_policy('mat_smallint', '15', 10, '1h'::interval, if_not_exists=>true); -SELECT add_continuous_aggregate_policy('mat_smallint', '15', '10', '1h'::interval, if_not_exists=>true); -\set ON_ERROR_STOP 1 - - ---bigint table -CREATE TABLE bigint_tab (a bigint); -SELECT table_name FROM create_hypertable('bigint_tab', 'a', chunk_time_interval=> 10); -CREATE OR REPLACE FUNCTION integer_now_bigint_tab() returns bigint LANGUAGE SQL STABLE as $$ SELECT 20::bigint $$; -SELECT set_integer_now_func('bigint_tab', 'integer_now_bigint_tab'); - -CREATE MATERIALIZED VIEW mat_bigint( a, countb ) -WITH (timescaledb.continuous, timescaledb.materialized_only=true) -as -SELECT time_bucket( BIGINT '1', a) , count(*) -FROM bigint_tab -GROUP BY 1 WITH NO DATA; - --- Test 1 step policy for bigint type buckets -ALTER materialized view mat_bigint set (timescaledb.compress = true); - --- All policies are added in one step -SELECT timescaledb_experimental.add_policies('mat_bigint', refresh_start_offset => 10::bigint, refresh_end_offset => 1::bigint, compress_after => 11::bigint, drop_after => 20::bigint); -SELECT timescaledb_experimental.show_policies('mat_bigint'); - --- Alter policies -SELECT timescaledb_experimental.alter_policies('mat_bigint', refresh_start_offset => 11::bigint, compress_after=>13::bigint, drop_after => 25::bigint); -SELECT timescaledb_experimental.show_policies('mat_bigint'); - -SELECT timescaledb_experimental.remove_all_policies('mat_bigint', false); 
-ALTER materialized view mat_bigint set (timescaledb.compress = false); - -\set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('mat_bigint', 5::bigint, 10::bigint , '1 h'::interval) ; -\set ON_ERROR_STOP 1 -SELECT add_continuous_aggregate_policy('mat_bigint', 15::bigint, 0::bigint , '1 h'::interval) as job_mid \gset -INSERT INTO bigint_tab VALUES(5); -INSERT INTO bigint_tab VALUES(10); -INSERT INTO bigint_tab VALUES(20); -CALL run_job(:job_mid); -SELECT * FROM mat_bigint ORDER BY 1; - --- test NULL for end -SELECT remove_continuous_aggregate_policy('mat_bigint'); -SELECT add_continuous_aggregate_policy('mat_bigint', 1::smallint, NULL , '1 h'::interval) as job_id \gset -INSERT INTO bigint_tab VALUES(500); -CALL run_job(:job_id); -SELECT * FROM mat_bigint WHERE a>100 ORDER BY 1; - -ALTER MATERIALIZED VIEW mat_bigint SET (timescaledb.compress); -ALTER MATERIALIZED VIEW mat_smallint SET (timescaledb.compress); --- With immutable compressed chunks, these policies would fail by overlapping the refresh window -SELECT add_compression_policy('mat_smallint', -4::smallint); -SELECT remove_compression_policy('mat_smallint'); -SELECT add_compression_policy('mat_bigint', 0::bigint); -SELECT remove_compression_policy('mat_bigint'); --- End previous limitation tests - -SELECT add_compression_policy('mat_smallint', 5::smallint); -SELECT add_compression_policy('mat_bigint', 20::bigint); - --- end of coverage tests - ---TEST continuous aggregate + compression policy on caggs -CREATE TABLE metrics ( - time timestamptz NOT NULL, - device_id int, - device_id_peer int, - v0 int, - v1 int, - v2 float, - v3 float -); - -SELECT create_hypertable('metrics', 'time'); - -INSERT INTO metrics (time, device_id, device_id_peer, v0, v1, v2, v3) -SELECT time, - device_id, - 0, - device_id + 1, - device_id + 2, - 0.5, - NULL -FROM generate_series('2000-01-01 0:00:00+0'::timestamptz, '2000-01-02 23:55:00+0', '20m') gtime (time), - generate_series(1, 2, 1) gdevice (device_id); - -ALTER TABLE 
metrics SET ( timescaledb.compress ); -SELECT compress_chunk(ch) FROM show_chunks('metrics') ch; - -CREATE MATERIALIZED VIEW metrics_cagg WITH (timescaledb.continuous, - timescaledb.materialized_only = true) -AS -SELECT time_bucket('1 day', time) as dayb, device_id, - sum(v0), avg(v3) -FROM metrics -GROUP BY 1, 2 -WITH NO DATA; - --- this was previously crashing -SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, NULL, '1 h'::interval, if_not_exists => true); -\set ON_ERROR_STOP 0 -SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval, if_not_exists => true); -SELECT remove_continuous_aggregate_policy('metrics_cagg'); -SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); -SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE -SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING -SELECT remove_continuous_aggregate_policy('metrics_cagg'); ---can set compression policy only after setting up refresh policy -- -SELECT add_compression_policy('metrics_cagg', '1 day'::interval); - ---can set compression policy only after enabling compression -- -SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval) as "REFRESH_JOB" \gset -SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ; -ALTER MATERIALIZED VIEW metrics_cagg SET (timescaledb.compress); - ---cannot use compress_created_before with cagg -SELECT add_compression_policy('metrics_cagg', compress_created_before => '8 day'::interval) AS "COMP_JOB" ; -\set ON_ERROR_STOP 1 - - -SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ; -SELECT remove_compression_policy('metrics_cagg'); 
-SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" \gset - ---verify that jobs were added for the policies --- -SELECT materialization_hypertable_name AS "MAT_TABLE_NAME", - view_name AS "VIEW_NAME" -FROM timescaledb_information.continuous_aggregates -WHERE view_name = 'metrics_cagg' \gset - -SELECT count(*) FROM timescaledb_information.jobs -WHERE hypertable_name = :'VIEW_NAME'; - ---exec the cagg compression job -- -CALL refresh_continuous_aggregate('metrics_cagg', NULL, '2001-02-01 00:00:00+0'); -CALL run_job(:COMP_JOB); -SELECT count(*), count(*) FILTER ( WHERE is_compressed is TRUE ) -FROM timescaledb_information.chunks -WHERE hypertable_name = :'MAT_TABLE_NAME' ORDER BY 1; - ---add some new data into metrics_cagg so that cagg policy job has something to do -INSERT INTO metrics (time, device_id, device_id_peer, v0, v1, v2, v3) -SELECT now() - '5 day'::interval, 102, 0, 10, 10, 10, 10; -CALL run_job(:REFRESH_JOB); ---now we have a new chunk and it is not compressed -SELECT count(*), count(*) FILTER ( WHERE is_compressed is TRUE ) -FROM timescaledb_information.chunks -WHERE hypertable_name = :'MAT_TABLE_NAME' ORDER BY 1; - ---verify that both jobs are dropped when view is dropped -DROP MATERIALIZED VIEW metrics_cagg; - -SELECT count(*) FROM timescaledb_information.jobs -WHERE hypertable_name = :'VIEW_NAME'; - --- add test case for issue 4252 -CREATE TABLE IF NOT EXISTS sensor_data( -time TIMESTAMPTZ NOT NULL, -sensor_id INTEGER, -temperature DOUBLE PRECISION, -cpu DOUBLE PRECISION); - -SELECT create_hypertable('sensor_data','time'); - -INSERT INTO sensor_data(time, sensor_id, cpu, temperature) -SELECT -time, -sensor_id, -extract(dow from time) AS cpu, -extract(doy from time) AS temperature -FROM -generate_series('2022-05-05'::timestamp at time zone 'UTC' - interval '6 weeks', '2022-05-05'::timestamp at time zone 'UTC', interval '5 hours') as g1(time), -generate_series(1,1000,1) as g2(sensor_id); - -CREATE materialized view 
deals_best_weekly -WITH (timescaledb.continuous) AS -SELECT -time_bucket('7 days', "time") AS bucket, -avg(temperature) AS avg_temp, -max(cpu) AS max_rating -FROM sensor_data -GROUP BY bucket -WITH NO DATA; - -CREATE materialized view deals_best_daily -WITH (timescaledb.continuous) AS -SELECT -time_bucket('1 day', "time") AS bucket, -avg(temperature) AS avg_temp, -max(cpu) AS max_rating -FROM sensor_data -GROUP BY bucket -WITH NO DATA; - -ALTER materialized view deals_best_weekly set (timescaledb.materialized_only=true); -ALTER materialized view deals_best_daily set (timescaledb.materialized_only=true); - --- we have data from 6 weeks before to May 5 2022 (Thu) -CALL refresh_continuous_aggregate('deals_best_weekly', '2022-04-24', '2022-05-03'); -SELECT * FROM deals_best_weekly ORDER BY bucket; -CALL refresh_continuous_aggregate('deals_best_daily', '2022-04-20', '2022-05-04'); -SELECT * FROM deals_best_daily ORDER BY bucket LIMIT 2; --- expect to get an up-to-date notice -CALL refresh_continuous_aggregate('deals_best_weekly', '2022-04-24', '2022-05-05'); -SELECT * FROM deals_best_weekly ORDER BY bucket; - --- github issue 5907: segfault when creating 1-step policies on cagg --- whose underlying hypertable has a retention policy setup -CREATE TABLE t(a integer NOT NULL, b integer); -SELECT create_hypertable('t', 'a', chunk_time_interval=> 10); - -CREATE OR REPLACE FUNCTION unix_now() returns int LANGUAGE SQL IMMUTABLE as $$ SELECT extract(epoch from now())::INT $$; -SELECT set_integer_now_func('t', 'unix_now'); - -SELECT add_retention_policy('t', 20); - -CREATE MATERIALIZED VIEW cagg(a, sumb) WITH (timescaledb.continuous) -AS SELECT time_bucket(1, a), sum(b) - FROM t GROUP BY time_bucket(1, a); - -SELECT timescaledb_experimental.add_policies('cagg'); - --- Issue #6902 -<<<<<<< HEAD --- Fix timestamp out of range in a refresh policy when setting `end_offset=>NULL` --- for a CAgg with variable sized bucket (i.e: using `time_bucket` with timezone) -======= ->>>>>>> 
cda33125c (Fix timestamp out of range in CAgg refresh policy) -CREATE TABLE issue_6902 ( - ts TIMESTAMPTZ NOT NULL, - temperature NUMERIC -) WITH ( - timescaledb.hypertable, - timescaledb.partition_column='ts', - timescaledb.chunk_interval='1 day', - timescaledb.compress='off' -); - -INSERT INTO issue_6902 -SELECT t, 1 FROM generate_series(now() - interval '3 hours', now(), interval '1 minute') AS t; - -CREATE MATERIALIZED VIEW issue_6902_by_hour -WITH (timescaledb.continuous) AS -SELECT - time_bucket(INTERVAL '1 hour', ts, 'America/Sao_Paulo') AS bucket, -- using timezone - MAX(temperature), - MIN(temperature), - COUNT(*) -FROM issue_6902 -GROUP BY 1 -WITH NO DATA; - -SELECT add_continuous_aggregate_policy ( - 'issue_6902_by_hour', - start_offset => INTERVAL '3 hours', - end_offset => NULL, - schedule_interval => INTERVAL '12 hour', - initial_start => now() + INTERVAL '12 hour' -) AS job_id \gset - --- 181 rows -CALL run_job(:job_id); -SELECT count(*) FROM issue_6902; - --- run again without any change, remain 181 rows -CALL run_job(:job_id); -SELECT count(*) FROM issue_6902; - --- change existing data -UPDATE issue_6902 -SET temperature = temperature + 1; - --- run again without any change, remain 181 rows -CALL run_job(:job_id); -SELECT count(*) FROM issue_6902; - --- insert more data -INSERT INTO issue_6902 -SELECT t, 1 FROM generate_series(now() - interval '3 hours', now(), interval '1 minute') AS t; - --- run again without and should have 362 rows -CALL run_job(:job_id); -SELECT count(*) FROM issue_6902; diff --git a/tsl/test/sql/cagg_refresh_migrate.sql b/tsl/test/sql/cagg_refresh_migrate.sql new file mode 100644 index 00000000000..7f27cb554b6 --- /dev/null +++ b/tsl/test/sql/cagg_refresh_migrate.sql @@ -0,0 +1,130 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. 
+ +-- Test that refresh works even when migrating between invalidation +-- methods. +-- +-- Since inserts into the hypertable end up in either the WAL or the +-- hypertable invalidation log table, we check that moving between +-- them actually does what is expected and moves all invalidations to +-- the materialization log. + +CREATE VIEW hypertable_invalidation_thresholds AS +SELECT format('%I.%I', ht.schema_name, ht.table_name)::regclass AS hypertable, + _timescaledb_functions.to_timestamp_without_timezone(watermark) AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold + JOIN _timescaledb_catalog.hypertable ht + ON hypertable_id = ht.id; + +CREATE VIEW materialization_invalidations AS +SELECT ca.user_view_name AS aggregate_name, + ht.table_name, + _timescaledb_functions.to_timestamp_without_timezone(lowest_modified_value) AS lowest, + _timescaledb_functions.to_timestamp_without_timezone(greatest_modified_value) AS greatest + FROM _timescaledb_catalog.continuous_agg ca + JOIN _timescaledb_catalog.continuous_aggs_materialization_invalidation_log ml + ON ca.mat_hypertable_id = ml.materialization_id + JOIN _timescaledb_catalog.hypertable ht + ON materialization_id = ht.id + WHERE lowest_modified_value BETWEEN 0 AND 1759302000000000 + AND greatest_modified_value BETWEEN 0 AND 1759302000000000; + +CREATE TABLE device_readings ( + created_at timestamp NOT NULL, + device_id text NOT NULL, + metric double precision NOT NULL, + PRIMARY KEY (device_id, created_at) +); + +SELECT table_name FROM create_hypertable ('device_readings', 'created_at'); + +CREATE MATERIALIZED VIEW device_summary_hourly +WITH (timescaledb.continuous, timescaledb.materialized_only = TRUE) AS +SELECT + time_bucket ('1 hour', created_at) AS bucket, + device_id, + sum(metric) AS metric_sum, + max(metric) - min(metric) AS metric_spread +FROM + device_readings +GROUP BY + bucket, + device_id +WITH NO DATA; + +CREATE MATERIALIZED VIEW device_summary_daily +WITH 
(timescaledb.continuous, timescaledb.materialized_only = TRUE) AS +SELECT + time_bucket ('1 day', created_at) AS bucket, + device_id, + sum(metric) AS metric_sum, + max(metric) - min(metric) AS metric_spread +FROM + device_readings +GROUP BY + bucket, + device_id +WITH NO DATA; + +-- Shall be empty +SELECT * FROM device_summary_hourly; +SELECT * FROM device_summary_daily; + +-- Call refresh to set the threshold correctly. +CALL refresh_continuous_aggregate('device_summary_hourly', NULL, '2025-10-01'); +CALL refresh_continuous_aggregate('device_summary_daily', NULL, '2025-10-01'); +SELECT * FROM hypertable_invalidation_thresholds ORDER BY 1,2; + +-- Insert something before the threshold, run refresh, and check the result +INSERT INTO device_readings VALUES + ('2025-09-10 12:34:56', 1, 1), + ('2025-09-10 12:34:57', 1, 2); +CALL refresh_continuous_aggregate('device_summary_hourly', NULL, '2025-10-01'); +CALL refresh_continuous_aggregate('device_summary_daily', NULL, '2025-10-01'); +SELECT * FROM device_summary_hourly; +SELECT * FROM device_summary_daily; + +-- +-- Test that we can migrate to use WAL without losing any invalidations. +-- + +-- Now insert new values in the hypertable +INSERT INTO device_readings(created_at, device_id, metric) +SELECT created_at, device_id, 1 + FROM generate_series('2025-09-11 11:00'::timestamptz, '2025-09-11 11:59', '1 minute'::interval) created_at, + generate_series(1, 4) device_id; + +-- Migrate the table to use WAL. 
We should see the value in the +-- materialization log after the migration +SELECT * FROM materialization_invalidations; +CALL _timescaledb_functions.set_invalidation_method('wal', 'device_readings'); +SELECT * FROM materialization_invalidations; + +-- Refresh it to check that the refresh works as expected +CALL refresh_continuous_aggregate('device_summary_hourly', NULL, '2025-10-01'); +CALL refresh_continuous_aggregate('device_summary_daily', NULL, '2025-10-01'); +SELECT * FROM device_summary_hourly; +SELECT * FROM device_summary_daily; + +-- +-- Test that we can migrate back to use trigger without losing any invalidations. +-- + +-- Insert new values in the hypertable +INSERT INTO device_readings(created_at, device_id, metric) +SELECT created_at, device_id, 1 + FROM generate_series('2025-09-12 11:00'::timestamptz, '2025-09-12 11:59', '1 minute'::interval) created_at, + generate_series(1, 4) device_id; + +-- Migrate the table back to use trigger. We should see the value in the +-- materialization log after the migration +SELECT * FROM materialization_invalidations; +CALL _timescaledb_functions.set_invalidation_method('trigger', 'device_readings'); +SELECT * FROM materialization_invalidations; + +-- Refresh it to check that the refresh works as expected +CALL refresh_continuous_aggregate('device_summary_hourly', NULL, '2025-10-01'); +CALL refresh_continuous_aggregate('device_summary_daily', NULL, '2025-10-01'); +SELECT * FROM device_summary_hourly; +SELECT * FROM device_summary_daily; diff --git a/tsl/test/sql/cagg_usage.sql.in b/tsl/test/sql/cagg_usage.sql.in index 970226765fe..de4615f8e19 100644 --- a/tsl/test/sql/cagg_usage.sql.in +++ b/tsl/test/sql/cagg_usage.sql.in @@ -390,10 +390,13 @@ SELECT count(*) FROM invalidation_slots WHERE database = current_database(); CREATE TABLE magic1(time timestamptz not null, device int, value float); CREATE TABLE magic2(time timestamptz not null, device int, value float); CREATE TABLE magic3(time timestamptz not null, device int, value 
float); +CREATE TABLE no_caggs(time timestamptz not null, device int, value float); +CREATE TABLE not_hypertable(time timestamptz not null, device int, value float); SELECT table_name FROM create_hypertable('magic1','time'); SELECT table_name FROM create_hypertable('magic2','time'); SELECT table_name FROM create_hypertable('magic3','time'); +SELECT table_name FROM create_hypertable('no_caggs','time'); INSERT INTO magic1 SELECT generate_series('1999-12-20'::timestamptz, '2000-02-01'::timestamptz, '12 day'::interval), @@ -569,3 +572,38 @@ ALTER MATERIALIZED VIEW magic3_day_summary_trigger SET (timescaledb.invalidate_using = 'wal'); \set ON_ERROR_STOP 1 + +-- Test functions to migrate between invalidation collection methods. + +\set VERBOSITY default +\set ON_ERROR_STOP 0 +-- Wrong method used +CALL _timescaledb_functions.set_invalidation_method('wrong', 'magic1'); +-- Already using WAL +CALL _timescaledb_functions.set_invalidation_method('wal', 'magic1'); +-- Already using trigger +CALL _timescaledb_functions.set_invalidation_method('trigger', 'magic3'); +-- Hypertable without continuous aggregates, which will not have a +-- trigger, hence looks like it is WAL-based. +CALL _timescaledb_functions.set_invalidation_method('trigger', 'no_caggs'); +-- Normal table, which definitely does not have any triggers and might +-- look like it is WAL-based. 
+CALL _timescaledb_functions.set_invalidation_method('trigger', 'not_hypertable'); +\set ON_ERROR_STOP 1 + +SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + +CALL _timescaledb_functions.set_invalidation_method('wal', 'magic3'); + +SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; + +CALL _timescaledb_functions.set_invalidation_method('trigger', 'magic3'); + +SELECT hypertable_name, view_name, invalidate_using + FROM timescaledb_information.continuous_aggregates + WHERE hypertable_name = 'magic3'; +