diff --git a/.unreleased/pr_7566 b/.unreleased/pr_7566 new file mode 100644 index 00000000000..ca3c51d8392 --- /dev/null +++ b/.unreleased/pr_7566 @@ -0,0 +1,2 @@ +Fixes: #7566 Improve transaction check in CAgg refresh +Thanks: @staticlibs for sending PR to improve transaction check in CAgg refresh diff --git a/src/process_utility.c b/src/process_utility.c index a390b7caa58..e23178b2057 100644 --- a/src/process_utility.c +++ b/src/process_utility.c @@ -88,6 +88,7 @@ void _process_utility_fini(void); static ProcessUtility_hook_type prev_ProcessUtility_hook; static bool expect_chunk_modification = false; +static ProcessUtilityContext last_process_utility_context = PROCESS_UTILITY_TOPLEVEL; static DDLResult process_altertable_set_options(AlterTableCmd *cmd, Hypertable *ht); static DDLResult process_altertable_reset_options(AlterTableCmd *cmd, Hypertable *ht); @@ -106,6 +107,13 @@ prev_ProcessUtility(ProcessUtilityArgs *args) args->queryEnv, args->dest, args->completion_tag); + + /* + * Reset the last_process_utility_context value that is saved at the + * entrance of the TS ProcessUtility hook and can be used for transaction + * checks inside refresh_cagg and other procedures. + */ + ts_process_utility_context_reset(); } static void @@ -4584,6 +4592,8 @@ timescaledb_ddl_command_start(PlannedStmt *pstmt, const char *query_string, bool QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *completion_tag) { + last_process_utility_context = context; + ProcessUtilityArgs args = { .query_string = query_string, .context = context, .params = params, @@ -4698,6 +4708,19 @@ ts_process_utility_set_expect_chunk_modification(bool expect) expect_chunk_modification = expect; } +bool +ts_process_utility_is_context_nonatomic(void) +{ + ProcessUtilityContext context = last_process_utility_context; + return context == PROCESS_UTILITY_TOPLEVEL || context == PROCESS_UTILITY_QUERY_NONATOMIC; +} + +void +ts_process_utility_context_reset(void) +{ + last_process_utility_context = PROCESS_UTILITY_TOPLEVEL; +} + static void process_utility_xact_abort(XactEvent event, void *arg) { diff --git a/src/process_utility.h b/src/process_utility.h index 49f019abd85..cfa16f8ab15 100644 --- a/src/process_utility.h +++ b/src/process_utility.h @@ -36,3 +36,56 @@ typedef enum typedef DDLResult (*ts_process_utility_handler_t)(ProcessUtilityArgs *args); extern void ts_process_utility_set_expect_chunk_modification(bool expect); + +/* + * Procedures that use multiple transactions cannot be run in a transaction + * block (from a function, from dynamic SQL) or in a subtransaction (from a + * procedure block with an EXCEPTION clause). Such procedures use + * PreventInTransactionBlock function to check whether they can be run. + * + * Though currently such checks are incomplete, because + * PreventInTransactionBlock requires isTopLevel argument to throw a + * consistent error when the call originates from a function. This + * isTopLevel flag (that is a bit poorly named - see below) is not readily + * available inside C procedures. The source of truth for it - + * ProcessUtilityContext parameter is passed to ProcessUtility hooks, but + * is not included with the function calls. There is an undocumented + * SPI_inside_nonatomic_context function, that would have been sufficient + * for isTopLevel flag, but it currently returns false when SPI connection + * is absent (that is a valid scenario when C procedures are called from + * top-lelev SQL instead of PLPG procedures or DO blocks) so it cannot be + * used. + * + * To work around this the value of ProcessUtilityContext parameter is + * saved when TS ProcessUtility hook is entered and can be accessed from + * C procedures using new ts_process_utility_is_context_nonatomic function. + * The result is called "non-atomic" instead of "top-level" because the way + * how isTopLevel flag is determined from the ProcessUtilityContext value + * in standard_ProcessUtility is insufficient for C procedures - it + * excludes PROCESS_UTILITY_QUERY_NONATOMIC value (used when called from + * PLPG procedure without an EXCEPTION clause) that is a valid use case for + * C procedures with transactions. See details in the description of + * ExecuteCallStmt function. + * + * It is expected that calls to C procedures are done with CALL and always + * pass though the ProcessUtility hook. The ProcessUtilityContext + * parameter is set to PROCESS_UTILITY_TOPLEVEL value by default. In + * unlikely case when a C procedure is called without passing through + * ProcessUtility hook and the call is done in atomic context, then + * PreventInTransactionBlock checks will pass, but SPI_commit will fail + * when checking that all current active snapshots are portal-owned + * snapshots (the same behaviour that was observed before this change). + * In atomic context there will be an additional snapshot set in + * _SPI_execute_plan, see the snapshot handling invariants description + * in that function. + */ +extern TSDLLEXPORT bool ts_process_utility_is_context_nonatomic(void); + +/* + * Currently in TS ProcessUtility hook the saved ProcessUtilityContext + * value is reset back to PROCESS_UTILITY_TOPLEVEL on normal exit but + * is NOT reset in case of ereport exit. C procedures can call this + * function to reset the saved value before doing the checks that can + * result in ereport exit. + */ +extern TSDLLEXPORT void ts_process_utility_context_reset(void); diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index b6097dbc8c5..ba5004b372d 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -31,6 +31,7 @@ #include "invalidation.h" #include "invalidation_threshold.h" #include "materialize.h" +#include "process_utility.h" #include "refresh.h" #define CAGG_REFRESH_LOG_LEVEL (callctx == CAGG_REFRESH_POLICY ? LOG : DEBUG1) @@ -764,6 +765,24 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, int32 mat_id = cagg->data.mat_hypertable_id; InternalTimeRange refresh_window = *refresh_window_arg; int64 invalidation_threshold; + bool nonatomic = ts_process_utility_is_context_nonatomic(); + + /* Reset the saved ProcessUtilityContext value promptly before + * calling Prevent* checks so the potential unsupported (atomic) + * value won't linger there in case of ereport exit. + */ + ts_process_utility_context_reset(); + + PreventCommandIfReadOnly(REFRESH_FUNCTION_NAME); + + /* Prevent running refresh if we're in a transaction block since a refresh + * can run two transactions and might take a long time to release locks if + * there's a lot to materialize. Strictly, it is optional to prohibit + * transaction blocks since there will be only one transaction if the + * invalidation threshold needs no update. However, materialization might + * still take a long time and it is probably best for consistency to always + * prevent transaction blocks. */ + PreventInTransactionBlock(nonatomic, REFRESH_FUNCTION_NAME); /* Connect to SPI manager due to the underlying SPI calls */ int rc = SPI_connect_ext(SPI_OPT_NONATOMIC); @@ -780,17 +799,6 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, get_relkind_objtype(get_rel_relkind(cagg->relid)), get_rel_name(cagg->relid)); - PreventCommandIfReadOnly(REFRESH_FUNCTION_NAME); - - /* Prevent running refresh if we're in a transaction block since a refresh - * can run two transactions and might take a long time to release locks if - * there's a lot to materialize. Strictly, it is optional to prohibit - * transaction blocks since there will be only one transaction if the - * invalidation threshold needs no update. However, materialization might - * still take a long time and it is probably best for consistency to always - * prevent transaction blocks. */ - PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME); - /* No bucketing when open ended */ if (!(start_isnull && end_isnull)) { diff --git a/tsl/test/expected/cagg_refresh.out b/tsl/test/expected/cagg_refresh.out index 291ce335272..0c37449b708 100644 --- a/tsl/test/expected/cagg_refresh.out +++ b/tsl/test/expected/cagg_refresh.out @@ -489,3 +489,68 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp FROM conditions GROUP BY 1,2 WITH NO DATA; COMMIT; +-- refresh_continuous_aggregate can run two transactions, thus it cannot be +-- called in a transaction block (from a function, from dynamic SQL) or in a +-- subtransaction (from a procedure block with an EXCEPTION clause). Though it +-- does NOT require a top level context and can be called from a procedure +-- block without an EXCEPTION clause. +-- DO block +DO $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +psql:include/cagg_refresh_common.sql:347: NOTICE: continuous aggregate "daily_temp" is already up-to-date +-- Procedure without subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal() +LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +CALL refresh_cagg_proc_normal(); +psql:include/cagg_refresh_common.sql:357: NOTICE: continuous aggregate "daily_temp" is already up-to-date +\set ON_ERROR_STOP 0 +-- Procedure with subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction() +LANGUAGE PLPGSQL AS +$$ +DECLARE + errmsg TEXT; +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT; + RAISE EXCEPTION '%', errmsg; +END; $$; +CALL refresh_cagg_proc_subtransaction(); +psql:include/cagg_refresh_common.sql:374: ERROR: refresh_continuous_aggregate() cannot run inside a transaction block +-- Function +CREATE OR REPLACE FUNCTION refresh_cagg_fun() +RETURNS INT LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + RETURN 1; +END; $$; +SELECT * from refresh_cagg_fun(); +psql:include/cagg_refresh_common.sql:385: ERROR: refresh_continuous_aggregate() cannot be executed from a function +-- Dynamic SQL +DO $$ +BEGIN + EXECUTE $inner$ + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + $inner$; +END; $$; +psql:include/cagg_refresh_common.sql:393: ERROR: refresh_continuous_aggregate() cannot be executed from a function +-- Trigger +CREATE TABLE refresh_cagg_trigger_table(a int); +CREATE FUNCTION refresh_cagg_trigger_fun() +RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table +EXECUTE FUNCTION refresh_cagg_trigger_fun(); +INSERT INTO refresh_cagg_trigger_table VALUES(1); +psql:include/cagg_refresh_common.sql:407: ERROR: refresh_continuous_aggregate() cannot be executed from a function +\set ON_ERROR_STOP 1 diff --git a/tsl/test/expected/cagg_refresh_using_merge.out b/tsl/test/expected/cagg_refresh_using_merge.out index 6ad8fa3a46b..43f157f9140 100644 --- a/tsl/test/expected/cagg_refresh_using_merge.out +++ b/tsl/test/expected/cagg_refresh_using_merge.out @@ -490,6 +490,71 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp FROM conditions GROUP BY 1,2 WITH NO DATA; COMMIT; +-- refresh_continuous_aggregate can run two transactions, thus it cannot be +-- called in a transaction block (from a function, from dynamic SQL) or in a +-- subtransaction (from a procedure block with an EXCEPTION clause). Though it +-- does NOT require a top level context and can be called from a procedure +-- block without an EXCEPTION clause. +-- DO block +DO $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +psql:include/cagg_refresh_common.sql:347: NOTICE: continuous aggregate "daily_temp" is already up-to-date +-- Procedure without subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal() +LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +CALL refresh_cagg_proc_normal(); +psql:include/cagg_refresh_common.sql:357: NOTICE: continuous aggregate "daily_temp" is already up-to-date +\set ON_ERROR_STOP 0 +-- Procedure with subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction() +LANGUAGE PLPGSQL AS +$$ +DECLARE + errmsg TEXT; +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT; + RAISE EXCEPTION '%', errmsg; +END; $$; +CALL refresh_cagg_proc_subtransaction(); +psql:include/cagg_refresh_common.sql:374: ERROR: refresh_continuous_aggregate() cannot run inside a transaction block +-- Function +CREATE OR REPLACE FUNCTION refresh_cagg_fun() +RETURNS INT LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + RETURN 1; +END; $$; +SELECT * from refresh_cagg_fun(); +psql:include/cagg_refresh_common.sql:385: ERROR: refresh_continuous_aggregate() cannot be executed from a function +-- Dynamic SQL +DO $$ +BEGIN + EXECUTE $inner$ + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + $inner$; +END; $$; +psql:include/cagg_refresh_common.sql:393: ERROR: refresh_continuous_aggregate() cannot be executed from a function +-- Trigger +CREATE TABLE refresh_cagg_trigger_table(a int); +CREATE FUNCTION refresh_cagg_trigger_fun() +RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; +CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table +EXECUTE FUNCTION refresh_cagg_trigger_fun(); +INSERT INTO refresh_cagg_trigger_table VALUES(1); +psql:include/cagg_refresh_common.sql:407: ERROR: refresh_continuous_aggregate() cannot be executed from a function +\set ON_ERROR_STOP 1 -- Additional tests for MERGE refresh DROP TABLE conditions CASCADE; NOTICE: drop cascades to 10 other objects diff --git a/tsl/test/sql/include/cagg_refresh_common.sql b/tsl/test/sql/include/cagg_refresh_common.sql index 83400d9a245..e5443a62844 100644 --- a/tsl/test/sql/include/cagg_refresh_common.sql +++ b/tsl/test/sql/include/cagg_refresh_common.sql @@ -306,3 +306,77 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp FROM conditions GROUP BY 1,2 WITH NO DATA; COMMIT; + +-- refresh_continuous_aggregate can run two transactions, thus it cannot be +-- called in a transaction block (from a function, from dynamic SQL) or in a +-- subtransaction (from a procedure block with an EXCEPTION clause). Though it +-- does NOT require a top level context and can be called from a procedure +-- block without an EXCEPTION clause. + +-- DO block +DO $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; + +-- Procedure without subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal() +LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; + +CALL refresh_cagg_proc_normal(); + +\set ON_ERROR_STOP 0 + +-- Procedure with subtransaction +CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction() +LANGUAGE PLPGSQL AS +$$ +DECLARE + errmsg TEXT; +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT; + RAISE EXCEPTION '%', errmsg; +END; $$; + +CALL refresh_cagg_proc_subtransaction(); + +-- Function +CREATE OR REPLACE FUNCTION refresh_cagg_fun() +RETURNS INT LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + RETURN 1; +END; $$; + +SELECT * from refresh_cagg_fun(); + +-- Dynamic SQL +DO $$ +BEGIN + EXECUTE $inner$ + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); + $inner$; +END; $$; + +-- Trigger +CREATE TABLE refresh_cagg_trigger_table(a int); + +CREATE FUNCTION refresh_cagg_trigger_fun() +RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ +BEGIN + CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC'); +END; $$; + +CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table +EXECUTE FUNCTION refresh_cagg_trigger_fun(); + +INSERT INTO refresh_cagg_trigger_table VALUES(1); + +\set ON_ERROR_STOP 1