Handle database failures when deleting exchanges
A common case for exchange deletion is that callers want the deletion
to be idempotent: they treat the `ok` and `{error, not_found}` returns
from `rabbit_exchange:delete/3` the same way. To simplify these
callsites we add a `rabbit_exchange:ensure_deleted/3` that wraps
`rabbit_exchange:delete/3` and returns `ok` when the exchange did not
exist. Part of this commit is to update callsites to use this helper.
The other part is to handle the `rabbit_khepri:timeout()` error possible
when Khepri is in a minority. For most callsites this is just a matter
of adding a branch to their `case` clauses and an appropriate error and
message.
This commit is contained in:
parent
80f599b001
commit
e7489d2cb7
|
|
@ -285,8 +285,15 @@ handle_http_req(<<"DELETE">>,
|
|||
ok = prohibit_default_exchange(XName),
|
||||
ok = prohibit_reserved_amq(XName),
|
||||
PermCache = check_resource_access(XName, configure, User, PermCache0),
|
||||
_ = rabbit_exchange:delete(XName, false, Username),
|
||||
{<<"204">>, null, {PermCache, TopicPermCache}};
|
||||
case rabbit_exchange:ensure_deleted(XName, false, Username) of
|
||||
ok ->
|
||||
{<<"204">>, null, {PermCache, TopicPermCache}};
|
||||
{error, timeout} ->
|
||||
throw(
|
||||
<<"500">>,
|
||||
"failed to delete exchange '~ts' due to a timeout",
|
||||
[XNameBin])
|
||||
end;
|
||||
|
||||
handle_http_req(<<"POST">>,
|
||||
[<<"bindings">>],
|
||||
|
|
|
|||
|
|
@ -2512,13 +2512,16 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
|
|||
check_not_default_exchange(ExchangeName),
|
||||
check_exchange_deletion(ExchangeName),
|
||||
check_configure_permitted(ExchangeName, User, AuthzContext),
|
||||
case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
|
||||
{error, not_found} ->
|
||||
case rabbit_exchange:ensure_deleted(ExchangeName, IfUnused, Username) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, in_use} ->
|
||||
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]);
|
||||
ok ->
|
||||
ok
|
||||
{error, timeout} ->
|
||||
rabbit_misc:protocol_error(
|
||||
internal_error,
|
||||
"failed to delete exchange '~ts' due to a timeout",
|
||||
[rabbit_misc:rs(ExchangeName)])
|
||||
end;
|
||||
handle_method(#'queue.purge'{queue = QueueNameBin},
|
||||
ConnPid, AuthzContext, _CollectorPid, VHostPath, User) ->
|
||||
|
|
|
|||
|
|
@ -561,7 +561,10 @@ next_serial_in_khepri_tx(#exchange{name = XName}) ->
|
|||
Exchange :: rabbit_types:exchange(),
|
||||
Binding :: rabbit_types:binding(),
|
||||
Deletions :: dict:dict(),
|
||||
Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
|
||||
Ret :: {deleted, Exchange, [Binding], Deletions} |
|
||||
{error, not_found} |
|
||||
{error, in_use} |
|
||||
rabbit_khepri:timeout_error().
|
||||
%% @doc Deletes an exchange record from the database. If `IfUnused' is set
|
||||
%% to `true', it is only deleted when there are no bindings present on the
|
||||
%% exchange.
|
||||
|
|
|
|||
|
|
@ -13,7 +13,8 @@
|
|||
lookup/1, lookup_many/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
|
||||
update_scratch/3, update_decorators/2, immutable/1,
|
||||
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
|
||||
route/2, route/3, delete/3, validate_binding/2, count/0]).
|
||||
route/2, route/3, delete/3, validate_binding/2, count/0,
|
||||
ensure_deleted/3]).
|
||||
-export([list_names/0]).
|
||||
-export([serialise_events/1]).
|
||||
-export([serial/1, peek_serial/1]).
|
||||
|
|
@ -444,9 +445,13 @@ cons_if_present(XName, L) ->
|
|||
|
||||
-spec delete
|
||||
(name(), 'true', rabbit_types:username()) ->
|
||||
'ok'| rabbit_types:error('not_found' | 'in_use');
|
||||
'ok' |
|
||||
rabbit_types:error('not_found' | 'in_use') |
|
||||
rabbit_khepri:timeout_error();
|
||||
(name(), 'false', rabbit_types:username()) ->
|
||||
'ok' | rabbit_types:error('not_found').
|
||||
'ok' |
|
||||
rabbit_types:error('not_found') |
|
||||
rabbit_khepri:timeout_error().
|
||||
|
||||
delete(XName, IfUnused, Username) ->
|
||||
try
|
||||
|
|
@ -478,6 +483,26 @@ process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
|
|||
rabbit_binding:add_deletion(
|
||||
XName, {X, deleted, Bs}, Deletions)).
|
||||
|
||||
-spec ensure_deleted(ExchangeName, IfUnused, Username) -> Ret when
|
||||
ExchangeName :: name(),
|
||||
IfUnused :: boolean(),
|
||||
Username :: rabbit_types:username(),
|
||||
Ret :: ok |
|
||||
rabbit_types:error('in_use') |
|
||||
rabbit_khepri:timeout_error().
|
||||
%% @doc A wrapper around `delete/3' which returns `ok' in the case that the
|
||||
%% exchange did not exist at time of deletion.
|
||||
|
||||
ensure_deleted(XName, IfUnused, Username) ->
|
||||
case delete(XName, IfUnused, Username) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, not_found} ->
|
||||
ok;
|
||||
{error, _} = Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
-spec validate_binding
|
||||
(rabbit_types:exchange(), rabbit_types:binding())
|
||||
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
|
||||
|
|
|
|||
|
|
@ -196,7 +196,16 @@ unconfigure_exchange(
|
|||
virtual_host = VHost} = Exchange,
|
||||
setup_proc := Pid}}) ->
|
||||
Pid ! stop,
|
||||
_ = rabbit_exchange:delete(Exchange, false, ?INTERNAL_USER),
|
||||
case rabbit_exchange:ensure_deleted(Exchange, false, ?INTERNAL_USER) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, timeout} ->
|
||||
?LOG_ERROR(
|
||||
"Could not delete exchange '~ts' in vhost '~ts' due to a timeout",
|
||||
[Name, VHost],
|
||||
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
|
||||
ok
|
||||
end,
|
||||
?LOG_INFO(
|
||||
"Logging to exchange '~ts' in vhost '~ts' disabled",
|
||||
[Name, VHost],
|
||||
|
|
|
|||
|
|
@ -275,7 +275,7 @@ delete(VHost, ActingUser) ->
|
|||
assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser)
|
||||
end || Q <- rabbit_amqqueue:list(VHost)],
|
||||
rabbit_log:info("Deleting exchanges in vhost '~ts' because it's being deleted", [VHost]),
|
||||
[assert_benign(rabbit_exchange:delete(Name, false, ActingUser), ActingUser) ||
|
||||
[ok = rabbit_exchange:ensure_deleted(Name, false, ActingUser) ||
|
||||
#exchange{name = Name} <- rabbit_exchange:list(VHost)],
|
||||
rabbit_log:info("Clearing policies and runtime parameters in vhost '~ts' because it's being deleted", [VHost]),
|
||||
_ = rabbit_runtime_parameters:clear_vhost(VHost, ActingUser),
|
||||
|
|
|
|||
|
|
@ -873,7 +873,8 @@ delete_queues() ->
|
|||
|| Q <- rabbit_amqqueue:list()].
|
||||
|
||||
delete_exchange(Name) ->
|
||||
_ = rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>).
|
||||
ok = rabbit_exchange:ensure_deleted(
|
||||
rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>).
|
||||
|
||||
declare(Ch, Q, Args) ->
|
||||
declare(Ch, Q, Args, true).
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ groups() ->
|
|||
{client_operations, [], [open_connection,
|
||||
open_channel,
|
||||
declare_exchange,
|
||||
delete_exchange,
|
||||
declare_binding,
|
||||
delete_binding,
|
||||
declare_queue,
|
||||
|
|
@ -100,6 +101,8 @@ init_per_group(Group, Config0) when Group == client_operations;
|
|||
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = <<"amq.fanout">>,
|
||||
source = <<"amq.direct">>,
|
||||
routing_key = <<"binding-to-be-deleted">>}),
|
||||
%% To be used in delete_exchange
|
||||
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = <<"exchange-to-be-deleted">>}),
|
||||
|
||||
%% Lower the default Khepri command timeout. By default this is set
|
||||
%% to 30s in `rabbit_khepri:setup/1' which makes the cases in this
|
||||
|
|
@ -157,6 +160,12 @@ declare_exchange(Config) ->
|
|||
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
|
||||
amqp_channel:call(Ch, #'exchange.declare'{exchange = <<"test-exchange">>})).
|
||||
|
||||
delete_exchange(Config) ->
|
||||
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
|
||||
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
|
||||
amqp_channel:call(Ch, #'exchange.delete'{exchange = <<"exchange-to-be-deleted">>})).
|
||||
|
||||
declare_binding(Config) ->
|
||||
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
|
||||
|
|
|
|||
|
|
@ -340,7 +340,8 @@ delete_queues() ->
|
|||
|| Q <- rabbit_amqqueue:list()].
|
||||
|
||||
delete_exchange(Name) ->
|
||||
_ = rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>).
|
||||
ok = rabbit_exchange:ensure_deleted(
|
||||
rabbit_misc:r(<<"/">>, exchange, Name), false, <<"dummy">>).
|
||||
|
||||
declare(Ch, Q, Args) ->
|
||||
declare(Ch, Q, Args, true).
|
||||
|
|
|
|||
|
|
@ -43,8 +43,13 @@ register() ->
|
|||
gen_event:add_handler(rabbit_event, ?MODULE, []).
|
||||
|
||||
unregister() ->
|
||||
_ = rabbit_exchange:delete(exchange(), false, ?INTERNAL_USER),
|
||||
gen_event:delete_handler(rabbit_event, ?MODULE, []).
|
||||
case rabbit_exchange:ensure_deleted(exchange(), false, ?INTERNAL_USER) of
|
||||
ok ->
|
||||
gen_event:delete_handler(rabbit_event, ?MODULE, []),
|
||||
ok;
|
||||
{error, _} = Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
exchange() ->
|
||||
exchange(get_vhost()).
|
||||
|
|
|
|||
|
|
@ -889,11 +889,12 @@ delete_super_stream_exchange(VirtualHost, Name, Username) ->
|
|||
case rabbit_stream_utils:enforce_correct_name(Name) of
|
||||
{ok, CorrectName} ->
|
||||
ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName),
|
||||
case rabbit_exchange:delete(ExchangeName, false, Username) of
|
||||
{error, not_found} ->
|
||||
ok;
|
||||
case rabbit_exchange:ensure_deleted(
|
||||
ExchangeName, false, Username) of
|
||||
ok ->
|
||||
ok
|
||||
ok;
|
||||
{error, timeout} = Err ->
|
||||
Err
|
||||
end;
|
||||
error ->
|
||||
{error, validation_failed}
|
||||
|
|
|
|||
Loading…
Reference in New Issue