rabbit_db_binding: Use a sproc for binding deletion

Note that the ChecksFun arg was always passed `fun(_, _,) -> ok end` so
we eliminate this part of the deletion transaction.
This commit is contained in:
Michael Davis 2024-10-02 11:12:55 -04:00
parent 9f757ab515
commit c429d267ca
No known key found for this signature in database
1 changed files with 39 additions and 30 deletions

View File

@ -7,6 +7,8 @@
-module(rabbit_db_binding). -module(rabbit_db_binding).
-feature(maybe_expr, enable).
-include_lib("khepri/include/khepri.hrl"). -include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl").
@ -59,6 +61,7 @@
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding). -define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route). -define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
-define(KHEPRI_CREATION_SPROC_PATH, [rabbitmq, sprocs, ?MODULE, create]). -define(KHEPRI_CREATION_SPROC_PATH, [rabbitmq, sprocs, ?MODULE, create]).
-define(KHEPRI_DELETION_SPROC_PATH, [rabbitmq, sprocs, ?MODULE, delete]).
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% setup(). %% setup().
@ -67,7 +70,10 @@
-spec setup() -> ok | rabbit_khepri:timeout_error(). -spec setup() -> ok | rabbit_khepri:timeout_error().
setup() -> setup() ->
rabbit_khepri:put(?KHEPRI_CREATION_SPROC_PATH, create_in_khepri_tx_fn()). maybe
ok ?= rabbit_khepri:put(?KHEPRI_CREATION_SPROC_PATH, create_in_khepri_tx_fn()),
ok ?= rabbit_khepri:put(?KHEPRI_DELETION_SPROC_PATH, delete_in_khepri_tx_fn())
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% exists(). %% exists().
@ -293,7 +299,7 @@ serial_in_khepri(true, X) ->
delete(Binding, ChecksFun) -> delete(Binding, ChecksFun) ->
rabbit_khepri:handle_fallback( rabbit_khepri:handle_fallback(
#{mnesia => fun() -> delete_in_mnesia(Binding, ChecksFun) end, #{mnesia => fun() -> delete_in_mnesia(Binding, ChecksFun) end,
khepri => fun() -> delete_in_khepri(Binding, ChecksFun) end khepri => fun() -> delete_in_khepri(Binding) end
}). }).
delete_in_mnesia(Binding, ChecksFun) -> delete_in_mnesia(Binding, ChecksFun) ->
@ -350,41 +356,44 @@ not_found_or_absent_in_mnesia(#resource{kind = queue} = Name) ->
{ok, Q} -> {absent, Q, nodedown} {ok, Q} -> {absent, Q, nodedown}
end. end.
delete_in_khepri(#binding{source = SrcName, delete_in_khepri(Binding) ->
destination = DstName} = Binding, ChecksFun) -> case khepri:transaction(
Path = khepri_route_path(Binding), rabbitmq_metadata,
case rabbit_khepri:transaction( ?KHEPRI_DELETION_SPROC_PATH,
fun () -> [Binding], rw, #{}) of
case {lookup_resource_in_khepri_tx(SrcName),
lookup_resource_in_khepri_tx(DstName)} of
{[Src], [Dst]} ->
case exists_in_khepri(Path, Binding) of
false ->
ok;
true ->
case ChecksFun(Src, Dst) of
ok ->
ok = delete_in_khepri(Binding),
maybe_auto_delete_exchange_in_khepri(Binding#binding.source, [Binding], rabbit_binding:new_deletions());
{error, _} = Err ->
Err
end
end;
_Errs ->
%% No absent queues, always present on disk
ok
end
end) of
ok -> ok ->
ok; ok;
{error, _} = Err -> {error, _} = Err ->
Err; Err;
Deletions -> {ok, Deletions} ->
ok = rabbit_binding:process_deletions(Deletions), ok = rabbit_binding:process_deletions(Deletions),
{ok, Deletions} {ok, Deletions}
end. end.
exists_in_khepri(Path, Binding) -> delete_in_khepri_tx_fn() ->
fun(#binding{source = SrcName, destination = DstName} = Binding) ->
Path = khepri_route_path(Binding),
%% TODO: Can we remove this as well?
case {lookup_resource_in_khepri_tx(SrcName),
lookup_resource_in_khepri_tx(DstName)} of
{[_Src], [_Dst]} ->
case exists_in_khepri_tx(Path, Binding) of
false ->
ok;
true ->
ok = delete_in_khepri_tx(Binding),
maybe_auto_delete_exchange_in_khepri(
Binding#binding.source,
[Binding],
rabbit_binding:new_deletions())
end;
_Errs ->
%% No absent queues, always present on disk
ok
end
end.
exists_in_khepri_tx(Path, Binding) ->
case khepri_tx:get(Path) of case khepri_tx:get(Path) of
{ok, Set} -> {ok, Set} ->
sets:is_element(Binding, Set); sets:is_element(Binding, Set);
@ -392,7 +401,7 @@ exists_in_khepri(Path, Binding) ->
false false
end. end.
delete_in_khepri(Binding) -> delete_in_khepri_tx(Binding) ->
Path = khepri_route_path(Binding), Path = khepri_route_path(Binding),
case khepri_tx:get(Path) of case khepri_tx:get(Path) of
{ok, Set0} -> {ok, Set0} ->