From e7489d2cb7d0fd8c1101a9276e4ef6347a92915f Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 30 Apr 2024 12:25:28 -0400 Subject: [PATCH] Handle database failures when deleting exchanges A common case for exchange deletion is that callers want the deletion to be idempotent: they treat the `ok` and `{error, not_found}` returns from `rabbit_exchange:delete/3` the same way. To simplify these callsites we add a `rabbit_exchange:ensure_deleted/3` that wraps `rabbit_exchange:delete/3` and returns `ok` when the exchange did not exist. Part of this commit is to update callsites to use this helper. The other part is to handle the `rabbit_khepri:timeout()` error possible when Khepri is in a minority. For most callsites this is just a matter of adding a branch to their `case` clauses and an appropriate error and message. --- deps/rabbit/src/rabbit_amqp_management.erl | 11 +++++-- deps/rabbit/src/rabbit_channel.erl | 11 ++++--- deps/rabbit/src/rabbit_db_exchange.erl | 5 ++- deps/rabbit/src/rabbit_exchange.erl | 31 +++++++++++++++++-- deps/rabbit/src/rabbit_logger_exchange_h.erl | 11 ++++++- deps/rabbit/src/rabbit_vhost.erl | 2 +- deps/rabbit/test/bindings_SUITE.erl | 3 +- deps/rabbit/test/cluster_minority_SUITE.erl | 9 ++++++ deps/rabbit/test/exchanges_SUITE.erl | 3 +- .../src/rabbit_exchange_type_event.erl | 9 ++++-- .../src/rabbit_stream_manager.erl | 9 +++--- 11 files changed, 84 insertions(+), 20 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 503b26d5d2..ea1fdf75a3 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -285,8 +285,15 @@ handle_http_req(<<"DELETE">>, ok = prohibit_default_exchange(XName), ok = prohibit_reserved_amq(XName), PermCache = check_resource_access(XName, configure, User, PermCache0), - _ = rabbit_exchange:delete(XName, false, Username), - {<<"204">>, null, {PermCache, TopicPermCache}}; + case rabbit_exchange:ensure_deleted(XName, false, Username) of + ok -> + {<<"204">>, null, {PermCache, TopicPermCache}}; + {error, timeout} -> + throw( + <<"500">>, + "failed to delete exchange '~ts' due to a timeout", + [XNameBin]) + end; handle_http_req(<<"POST">>, [<<"bindings">>], diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index cdd34b1609..123795416f 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2512,13 +2512,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, check_not_default_exchange(ExchangeName), check_exchange_deletion(ExchangeName), check_configure_permitted(ExchangeName, User, AuthzContext), - case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of - {error, not_found} -> + case rabbit_exchange:ensure_deleted(ExchangeName, IfUnused, Username) of + ok -> ok; {error, in_use} -> rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]); - ok -> - ok + {error, timeout} -> + rabbit_misc:protocol_error( + internal_error, + "failed to delete exchange '~ts' due to a timeout", + [rabbit_misc:rs(ExchangeName)]) end; handle_method(#'queue.purge'{queue = QueueNameBin}, ConnPid, AuthzContext, _CollectorPid, VHostPath, User) -> diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index 6ee4d8704e..486e715ad5 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -561,7 +561,10 @@ next_serial_in_khepri_tx(#exchange{name = XName}) -> Exchange :: rabbit_types:exchange(), Binding :: rabbit_types:binding(), Deletions :: dict:dict(), - Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}. + Ret :: {deleted, Exchange, [Binding], Deletions} | + {error, not_found} | + {error, in_use} | + rabbit_khepri:timeout_error(). %% @doc Deletes an exchange record from the database. If `IfUnused' is set %% to `true', it is only deleted when there are no bindings present on the %% exchange. diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index 22fbaafb69..10388ea8a4 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -13,7 +13,8 @@ lookup/1, lookup_many/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, update_scratch/3, update_decorators/2, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4, - route/2, route/3, delete/3, validate_binding/2, count/0]). + route/2, route/3, delete/3, validate_binding/2, count/0, + ensure_deleted/3]). -export([list_names/0]). -export([serialise_events/1]). -export([serial/1, peek_serial/1]). @@ -444,9 +445,13 @@ cons_if_present(XName, L) -> -spec delete (name(), 'true', rabbit_types:username()) -> - 'ok'| rabbit_types:error('not_found' | 'in_use'); + 'ok' | + rabbit_types:error('not_found' | 'in_use') | + rabbit_khepri:timeout_error(); (name(), 'false', rabbit_types:username()) -> - 'ok' | rabbit_types:error('not_found'). + 'ok' | + rabbit_types:error('not_found') | + rabbit_khepri:timeout_error(). delete(XName, IfUnused, Username) -> try @@ -478,6 +483,26 @@ process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) -> rabbit_binding:add_deletion( XName, {X, deleted, Bs}, Deletions)). +-spec ensure_deleted(ExchangeName, IfUnused, Username) -> Ret when + ExchangeName :: name(), + IfUnused :: boolean(), + Username :: rabbit_types:username(), + Ret :: ok | + rabbit_types:error('in_use') | + rabbit_khepri:timeout_error(). +%% @doc A wrapper around `delete/3' which returns `ok' in the case that the +%% exchange did not exist at time of deletion. + +ensure_deleted(XName, IfUnused, Username) -> + case delete(XName, IfUnused, Username) of + ok -> + ok; + {error, not_found} -> + ok; + {error, _} = Err -> + Err + end. + -spec validate_binding (rabbit_types:exchange(), rabbit_types:binding()) -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}). diff --git a/deps/rabbit/src/rabbit_logger_exchange_h.erl b/deps/rabbit/src/rabbit_logger_exchange_h.erl index 9cdde43b59..69f3522d3a 100644 --- a/deps/rabbit/src/rabbit_logger_exchange_h.erl +++ b/deps/rabbit/src/rabbit_logger_exchange_h.erl @@ -196,7 +196,16 @@ unconfigure_exchange( virtual_host = VHost} = Exchange, setup_proc := Pid}}) -> Pid ! stop, - _ = rabbit_exchange:delete(Exchange, false, ?INTERNAL_USER), + case rabbit_exchange:ensure_deleted(Exchange, false, ?INTERNAL_USER) of + ok -> + ok; + {error, timeout} -> + ?LOG_ERROR( + "Could not delete exchange '~ts' in vhost '~ts' due to a timeout", + [Name, VHost], + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + ok + end, ?LOG_INFO( "Logging to exchange '~ts' in vhost '~ts' disabled", [Name, VHost], diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl index 42838f4451..e9982765c1 100644 --- a/deps/rabbit/src/rabbit_vhost.erl +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -275,7 +275,7 @@ delete(VHost, ActingUser) -> assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser) end || Q <- rabbit_amqqueue:list(VHost)], rabbit_log:info("Deleting exchanges in vhost '~ts' because it's being deleted", [VHost]), - [assert_benign(rabbit_exchange:delete(Name, false, ActingUser), ActingUser) || + [ok = rabbit_exchange:ensure_deleted(Name, false, ActingUser) || #exchange{name = Name} <- rabbit_exchange:list(VHost)], rabbit_log:info("Clearing policies and runtime parameters in vhost '~ts' because it's being deleted", [VHost]), _ = rabbit_runtime_parameters:clear_vhost(VHost, ActingUser), diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 5ffb010b26..b80a09eb1a 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -873,7 +873,8 @@ delete_queues() -> || Q <- rabbit_amqqueue:list()]. delete_exchange(Name) -> - _ = rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>). + ok = rabbit_exchange:ensure_deleted( + rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>). declare(Ch, Q, Args) -> declare(Ch, Q, Args, true). diff --git a/deps/rabbit/test/cluster_minority_SUITE.erl b/deps/rabbit/test/cluster_minority_SUITE.erl index a3ec055a03..a6a8f4759b 100644 --- a/deps/rabbit/test/cluster_minority_SUITE.erl +++ b/deps/rabbit/test/cluster_minority_SUITE.erl @@ -24,6 +24,7 @@ groups() -> {client_operations, [], [open_connection, open_channel, declare_exchange, + delete_exchange, declare_binding, delete_binding, declare_queue, @@ -100,6 +101,8 @@ init_per_group(Group, Config0) when Group == client_operations; #'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = <<"amq.fanout">>, source = <<"amq.direct">>, routing_key = <<"binding-to-be-deleted">>}), + %% To be used in delete_exchange + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = <<"exchange-to-be-deleted">>}), %% Lower the default Khepri command timeout. By default this is set %% to 30s in `rabbit_khepri:setup/1' which makes the cases in this @@ -157,6 +160,12 @@ declare_exchange(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, amqp_channel:call(Ch, #'exchange.declare'{exchange = <<"test-exchange">>})). +delete_exchange(Config) -> + [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A), + ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, + amqp_channel:call(Ch, #'exchange.delete'{exchange = <<"exchange-to-be-deleted">>})). + declare_binding(Config) -> [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A), diff --git a/deps/rabbit/test/exchanges_SUITE.erl b/deps/rabbit/test/exchanges_SUITE.erl index b0f5694dce..e74cd95917 100644 --- a/deps/rabbit/test/exchanges_SUITE.erl +++ b/deps/rabbit/test/exchanges_SUITE.erl @@ -340,7 +340,8 @@ delete_queues() -> || Q <- rabbit_amqqueue:list()]. delete_exchange(Name) -> - _ = rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>). + ok = rabbit_exchange:ensure_deleted( + rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>). declare(Ch, Q, Args) -> declare(Ch, Q, Args, true). diff --git a/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl b/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl index 3c75cb03d9..81a191e475 100644 --- a/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl +++ b/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl @@ -43,8 +43,13 @@ register() -> gen_event:add_handler(rabbit_event, ?MODULE, []). unregister() -> - _ = rabbit_exchange:delete(exchange(), false, ?INTERNAL_USER), - gen_event:delete_handler(rabbit_event, ?MODULE, []). + case rabbit_exchange:ensure_deleted(exchange(), false, ?INTERNAL_USER) of + ok -> + gen_event:delete_handler(rabbit_event, ?MODULE, []), + ok; + {error, _} = Err -> + Err + end. exchange() -> exchange(get_vhost()). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 9137fefc86..d0032f3890 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -889,11 +889,12 @@ delete_super_stream_exchange(VirtualHost, Name, Username) -> case rabbit_stream_utils:enforce_correct_name(Name) of {ok, CorrectName} -> ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), - case rabbit_exchange:delete(ExchangeName, false, Username) of - {error, not_found} -> - ok; + case rabbit_exchange:ensure_deleted( + ExchangeName, false, Username) of ok -> - ok + ok; + {error, timeout} = Err -> + Err end; error -> {error, validation_failed}