Skip to content

Commit b2f122b

Browse files
committed
Add migration to/from WAL-based invalidation
To support migration from trigger-based invalidation collection to WAL-based invalidation collection, or the opposite, the function `_timescaledb_functions.cagg_change_invalidation_method` is introduced. When called with a hypertable (that has continuous aggregates connected) and an invalidation method, it will flush the hypertable invalidations and changes the collection method for the hypertable to use the requested one. It will changes the collection method for *all* continuous aggregates connected to the given hypertable, not just for a single continuous aggregate. Also adds tests for the migration to check that it works properly.
1 parent 752bdf9 commit b2f122b

25 files changed

+778
-816
lines changed

.unreleased/pr_8738

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implements: #8738 Add function to migrate between WAL-based and trigger-based invalidation collection

sql/cagg_api.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ DECLARE
4242
BEGIN
4343
SELECT ht.id AS hypertable_id,
4444
di.column_type::regtype,
45-
EXISTS(SELECT FROM _timescaledb_catalog.continuous_agg where raw_hypertable_id = ht.id) AS has_cagg
45+
EXISTS(SELECT FROM _timescaledb_catalog.continuous_agg WHERE raw_hypertable_id = ht.id) AS has_cagg
4646
INTO info
4747
FROM _timescaledb_catalog.hypertable ht
4848
JOIN _timescaledb_catalog.dimension di ON ht.id = di.hypertable_id

sql/cagg_migrate.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,3 +674,10 @@ $BODY$;
674674
-- into a CAgg using the regular time_bucket function
675675
CREATE OR REPLACE PROCEDURE _timescaledb_functions.cagg_migrate_to_time_bucket(cagg REGCLASS)
676676
AS '@MODULE_PATHNAME@', 'ts_continuous_agg_migrate_to_time_bucket' LANGUAGE C;
677+
678+
-- Migrate all continuous aggregates attached to a hypertable to use
679+
-- WAL-based invalidation or trigger-based invalidation.
680+
CREATE OR REPLACE PROCEDURE _timescaledb_functions.set_invalidation_method(
681+
hypertable REGCLASS,
682+
method NAME
683+
) AS '@MODULE_PATHNAME@', 'ts_continuous_agg_change_collection_method' LANGUAGE C;

src/cross_module_fn.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ CROSSMODULE_WRAPPER(bloom1_contains);
9494
CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger);
9595
CROSSMODULE_WRAPPER(continuous_agg_refresh);
9696
CROSSMODULE_WRAPPER(continuous_agg_process_hypertable_invalidations);
97+
CROSSMODULE_WRAPPER(continuous_agg_change_collection_method);
9798
CROSSMODULE_WRAPPER(continuous_agg_validate_query);
9899
CROSSMODULE_WRAPPER(continuous_agg_get_bucket_function);
99100
CROSSMODULE_WRAPPER(continuous_agg_get_bucket_function_info);
@@ -364,6 +365,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
364365
.continuous_agg_process_hypertable_invalidations = error_no_default_fn_pg_community,
365366
.continuous_agg_invalidate_raw_ht = continuous_agg_invalidate_raw_ht_all_default,
366367
.continuous_agg_invalidate_mat_ht = continuous_agg_invalidate_mat_ht_all_default,
368+
.continuous_agg_change_collection_method = error_no_default_fn_pg_community,
367369
.continuous_agg_update_options = continuous_agg_update_options_default,
368370
.continuous_agg_validate_query = error_no_default_fn_pg_community,
369371
.continuous_agg_get_bucket_function = error_no_default_fn_pg_community,

src/cross_module_fn.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ typedef struct CrossModuleFunctions
106106
PGFunction continuous_agg_invalidation_trigger;
107107
PGFunction continuous_agg_refresh;
108108
PGFunction continuous_agg_process_hypertable_invalidations;
109+
PGFunction continuous_agg_change_collection_method;
109110
void (*continuous_agg_invalidate_raw_ht)(const Hypertable *raw_ht, int64 start, int64 end);
110111
void (*continuous_agg_invalidate_mat_ht)(const Hypertable *raw_ht, const Hypertable *mat_ht,
111112
int64 start, int64 end);

src/ts_catalog/catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1333,7 +1333,7 @@ extern bool ts_is_catalog_table(Oid relid);
13331333

13341334
/* Functions should operate on a passed-in Catalog struct */
13351335
static inline Oid
1336-
catalog_get_table_id(Catalog *catalog, CatalogTable tableid)
1336+
catalog_get_table_id(const Catalog *catalog, CatalogTable tableid)
13371337
{
13381338
return catalog->tables[tableid].id;
13391339
}

src/ts_catalog/continuous_agg.c

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ init_materialization_invalidation_log_scan_by_materialization_id(ScanIterator *i
155155
Int32GetDatum(materialization_id));
156156
}
157157

158-
static int32
159-
number_of_continuous_aggs_attached(int32 raw_hypertable_id)
158+
int32
159+
ts_number_of_continuous_aggs_attached(int32 raw_hypertable_id)
160160
{
161161
ScanIterator iterator =
162162
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
@@ -537,8 +537,8 @@ ts_continuous_agg_get_all_caggs_info(int32 raw_hypertable_id)
537537
* A hypertable is using the WAL-based invalidation collection if it has a
538538
* attached continuous aggregate but does not have an invalidation trigger.
539539
*/
540-
static bool
541-
hypertable_invalidation_slot_used(void)
540+
bool
541+
ts_hypertable_invalidation_slot_used(void)
542542
{
543543
ScanIterator iterator =
544544
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
@@ -910,7 +910,7 @@ drop_continuous_agg(FormData_continuous_agg *cadata, bool drop_user_view)
910910

911911
raw_hypertable_has_other_caggs =
912912
OidIsValid(raw_hypertable.objectId) &&
913-
number_of_continuous_aggs_attached(cadata->raw_hypertable_id) > 1;
913+
ts_number_of_continuous_aggs_attached(cadata->raw_hypertable_id) > 1;
914914

915915
if (!raw_hypertable_has_other_caggs)
916916
{
@@ -992,7 +992,8 @@ drop_continuous_agg(FormData_continuous_agg *cadata, bool drop_user_view)
992992
*/
993993
char slot_name[TS_INVALIDATION_SLOT_NAME_MAX];
994994
ts_get_invalidation_replication_slot_name(slot_name, sizeof(slot_name));
995-
if (!hypertable_invalidation_slot_used() && SearchNamedReplicationSlot(slot_name, true) != NULL)
995+
if (!ts_hypertable_invalidation_slot_used() &&
996+
SearchNamedReplicationSlot(slot_name, true) != NULL)
996997
ts_hypertable_drop_invalidation_replication_slot(slot_name);
997998

998999
if (OidIsValid(mat_hypertable.objectId))

src/ts_catalog/continuous_agg.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_view_name(const char
177177
ContinuousAggViewType type);
178178
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_relid(Oid relid);
179179
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_rv(const RangeVar *rv);
180+
extern TSDLLEXPORT bool ts_hypertable_invalidation_slot_used(void);
180181

181182
extern bool ts_continuous_agg_drop(const char *view_schema, const char *view_name);
182183
extern void ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id);
@@ -214,3 +215,4 @@ ts_continuous_agg_fixed_bucket_width(const ContinuousAggBucketFunction *bucket_f
214215
extern TSDLLEXPORT int64
215216
ts_continuous_agg_bucket_width(const ContinuousAggBucketFunction *bucket_function);
216217
extern TSDLLEXPORT void ts_get_invalidation_replication_slot_name(char *slotname, Size szslot);
218+
extern TSDLLEXPORT int32 ts_number_of_continuous_aggs_attached(int32 raw_hypertable_id);

tsl/src/continuous_aggs/common.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1653,3 +1653,32 @@ cagg_find_groupingcols(ContinuousAgg *agg, Hypertable *mat_ht)
16531653
}
16541654
return retlist;
16551655
}
1656+
1657+
/*
1658+
* Get all hypertables that are using WAL-invalidations.
1659+
*/
1660+
List *
1661+
get_all_wal_using_hypertables(void)
1662+
{
1663+
ScanIterator iterator =
1664+
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
1665+
List *hypertables = NIL;
1666+
1667+
/* Collect OID of all tables using continuous aggregates */
1668+
ts_scanner_foreach(&iterator)
1669+
{
1670+
bool isnull;
1671+
Datum datum = slot_getattr(ts_scan_iterator_slot(&iterator),
1672+
Anum_continuous_agg_raw_hypertable_id,
1673+
&isnull);
1674+
1675+
Assert(!isnull);
1676+
int32 hypertable_id = DatumGetInt32(datum);
1677+
Oid relid = ts_hypertable_id_to_relid(hypertable_id, false);
1678+
if (!has_invalidation_trigger(relid))
1679+
hypertables = list_append_unique_int(hypertables, hypertable_id);
1680+
}
1681+
ts_scan_iterator_close(&iterator);
1682+
1683+
return hypertables;
1684+
}

tsl/src/continuous_aggs/common.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,5 @@ cagg_get_time_min(const ContinuousAgg *cagg)
151151
return ts_time_get_min(cagg->partition_type);
152152
}
153153

154-
ContinuousAggBucketFunction *ts_cagg_get_bucket_function_info(Oid view_oid);
154+
extern ContinuousAggBucketFunction *ts_cagg_get_bucket_function_info(Oid view_oid);
155+
extern List *get_all_wal_using_hypertables(void);

0 commit comments

Comments
 (0)