Merge pull request #11678 from rabbitmq/mergify/bp/v3.13.x/pr-11677

rabbit_db_*: Wrap `ets` calls to projections in `whereis/1` checks (backport #11667) (backport #11677)
This commit is contained in:
Michael Davis 2024-07-11 14:38:51 -05:00 committed by GitHub
commit 0e8ea17d20
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 282 additions and 113 deletions

View File

@ -53,6 +53,8 @@
-define(MNESIA_SEMI_DURABLE_TABLE, rabbit_semi_durable_route). -define(MNESIA_SEMI_DURABLE_TABLE, rabbit_semi_durable_route).
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route). -define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
-define(MNESIA_INDEX_TABLE, rabbit_index_route). -define(MNESIA_INDEX_TABLE, rabbit_index_route).
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_bindings).
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% exists(). %% exists().
@ -411,7 +413,12 @@ get_all_in_mnesia() ->
end). end).
get_all_in_khepri() -> get_all_in_khepri() ->
[B || #route{binding = B} <- ets:tab2list(rabbit_khepri_bindings)]. case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
undefined ->
[];
Table ->
[B || #route{binding = B} <- ets:tab2list(Table)]
end.
-spec get_all(VHostName) -> [Binding] when -spec get_all(VHostName) -> [Binding] when
VHostName :: vhost:name(), VHostName :: vhost:name(),
@ -437,11 +444,16 @@ get_all_in_mnesia(VHost) ->
[B || #route{binding = B} <- rabbit_db:list_in_mnesia(?MNESIA_TABLE, Match)]. [B || #route{binding = B} <- rabbit_db:list_in_mnesia(?MNESIA_TABLE, Match)].
get_all_in_khepri(VHost) -> get_all_in_khepri(VHost) ->
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
undefined ->
[];
Table ->
VHostResource = rabbit_misc:r(VHost, '_'), VHostResource = rabbit_misc:r(VHost, '_'),
Match = #route{binding = #binding{source = VHostResource, Match = #route{binding = #binding{source = VHostResource,
destination = VHostResource, destination = VHostResource,
_ = '_'}}, _ = '_'}},
[B || #route{binding = B} <- ets:match_object(rabbit_khepri_bindings, Match)]. [B || #route{binding = B} <- ets:match_object(Table, Match)]
end.
-spec get_all(Src, Dst, Reverse) -> [Binding] when -spec get_all(Src, Dst, Reverse) -> [Binding] when
Src :: rabbit_types:binding_source(), Src :: rabbit_types:binding_source(),
@ -469,10 +481,15 @@ get_all_in_mnesia(SrcName, DstName, Reverse) ->
mnesia:async_dirty(Fun). mnesia:async_dirty(Fun).
get_all_in_khepri(SrcName, DstName) -> get_all_in_khepri(SrcName, DstName) ->
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
undefined ->
[];
Table ->
MatchHead = #route{binding = #binding{source = SrcName, MatchHead = #route{binding = #binding{source = SrcName,
destination = DstName, destination = DstName,
_ = '_'}}, _ = '_'}},
[B || #route{binding = B} <- ets:match_object(rabbit_khepri_bindings, MatchHead)]. [B || #route{binding = B} <- ets:match_object(Table, MatchHead)]
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% get_all_for_source(). %% get_all_for_source().
@ -511,8 +528,13 @@ list_for_route(Route, true) ->
end. end.
get_all_for_source_in_khepri(Resource) -> get_all_for_source_in_khepri(Resource) ->
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
undefined ->
[];
Table ->
Route = #route{binding = #binding{source = Resource, _ = '_'}}, Route = #route{binding = #binding{source = Resource, _ = '_'}},
[B || #route{binding = B} <- ets:match_object(rabbit_khepri_bindings, Route)]. [B || #route{binding = B} <- ets:match_object(Table, Route)]
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% get_all_for_destination(). %% get_all_for_destination().
@ -541,9 +563,14 @@ get_all_for_destination_in_mnesia(Dst) ->
mnesia:async_dirty(Fun). mnesia:async_dirty(Fun).
get_all_for_destination_in_khepri(Destination) -> get_all_for_destination_in_khepri(Destination) ->
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
undefined ->
[];
Table ->
Match = #route{binding = #binding{destination = Destination, Match = #route{binding = #binding{destination = Destination,
_ = '_'}}, _ = '_'}},
[B || #route{binding = B} <- ets:match_object(rabbit_khepri_bindings, Match)]. [B || #route{binding = B} <- ets:match_object(Table, Match)]
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% fold(). %% fold().
@ -617,11 +644,16 @@ match_in_mnesia(SrcName, Match) ->
Routes, Match(Binding)]. Routes, Match(Binding)].
match_in_khepri(SrcName, Match) -> match_in_khepri(SrcName, Match) ->
case ets:whereis(?KHEPRI_BINDINGS_PROJECTION) of
undefined ->
[];
Table ->
MatchHead = #route{binding = #binding{source = SrcName, MatchHead = #route{binding = #binding{source = SrcName,
_ = '_'}}, _ = '_'}},
Routes = ets:select(rabbit_khepri_bindings, [{MatchHead, [], [['$_']]}]), Routes = ets:select(Table, [{MatchHead, [], [['$_']]}]),
[Dest || [#route{binding = Binding = #binding{destination = Dest}}] <- [Dest || [#route{binding = Binding = #binding{destination = Dest}}] <-
Routes, Match(Binding)]. Routes, Match(Binding)]
end.
%% Routing - HOT CODE PATH %% Routing - HOT CODE PATH
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
@ -654,18 +686,26 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->
route_in_mnesia_v1(SrcName, RoutingKeys) route_in_mnesia_v1(SrcName, RoutingKeys)
end. end.
match_routing_key_in_khepri(Src, ['_']) -> match_routing_key_in_khepri(Src, RoutingKeys) ->
case ets:whereis(?KHEPRI_INDEX_ROUTE_PROJECTION) of
undefined ->
[];
Table ->
do_match_routing_key_in_khepri(Table, Src, RoutingKeys)
end.
do_match_routing_key_in_khepri(Table, Src, ['_']) ->
MatchHead = #index_route{source_key = {Src, '_'}, MatchHead = #index_route{source_key = {Src, '_'},
destination = '$1', destination = '$1',
_ = '_'}, _ = '_'},
ets:select(rabbit_khepri_index_route, [{MatchHead, [], ['$1']}]); ets:select(Table, [{MatchHead, [], ['$1']}]);
match_routing_key_in_khepri(Src, RoutingKeys) -> do_match_routing_key_in_khepri(Table, Src, RoutingKeys) ->
lists:foldl( lists:foldl(
fun(RK, Acc) -> fun(RK, Acc) ->
try try
Dst = ets:lookup_element( Dst = ets:lookup_element(
rabbit_khepri_index_route, Table,
{Src, RK}, {Src, RK},
#index_route.destination), #index_route.destination),
Dst ++ Acc Dst ++ Acc

View File

@ -57,6 +57,7 @@
-define(MNESIA_TABLE, rabbit_exchange). -define(MNESIA_TABLE, rabbit_exchange).
-define(MNESIA_DURABLE_TABLE, rabbit_durable_exchange). -define(MNESIA_DURABLE_TABLE, rabbit_durable_exchange).
-define(MNESIA_SERIAL_TABLE, rabbit_exchange_serial). -define(MNESIA_SERIAL_TABLE, rabbit_exchange_serial).
-define(KHEPRI_PROJECTION, rabbit_khepri_exchange).
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% get_all(). %% get_all().
@ -182,9 +183,14 @@ get_in_mnesia(Name) ->
rabbit_mnesia:dirty_read({?MNESIA_TABLE, Name}). rabbit_mnesia:dirty_read({?MNESIA_TABLE, Name}).
get_in_khepri(Name) -> get_in_khepri(Name) ->
case ets:lookup(rabbit_khepri_exchange, Name) of case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
{error, not_found};
Table ->
case ets:lookup(Table, Name) of
[X] -> {ok, X}; [X] -> {ok, X};
[] -> {error, not_found} [] -> {error, not_found}
end
end. end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
@ -227,7 +233,12 @@ get_many_in_mnesia(Table, Names) when is_list(Names) ->
lists:append([ets:lookup(Table, Name) || Name <- Names]). lists:append([ets:lookup(Table, Name) || Name <- Names]).
get_many_in_khepri(Names) when is_list(Names) -> get_many_in_khepri(Names) when is_list(Names) ->
lists:append([ets:lookup(rabbit_khepri_exchange, Name) || Name <- Names]). case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
lists:append([ets:lookup(Table, Name) || Name <- Names])
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% count(). %% count().

View File

@ -111,10 +111,15 @@ get_all_in_mnesia() ->
end). end).
get_all_in_khepri() -> get_all_in_khepri() ->
case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
list_with_possible_retry_in_khepri( list_with_possible_retry_in_khepri(
fun() -> fun() ->
ets:tab2list(?KHEPRI_PROJECTION) ets:tab2list(Table)
end). end)
end.
-spec get_all(VHostName) -> [Queue] when -spec get_all(VHostName) -> [Queue] when
VHostName :: vhost:name(), VHostName :: vhost:name(),
@ -140,11 +145,16 @@ get_all_in_mnesia(VHostName) ->
end). end).
get_all_in_khepri(VHostName) -> get_all_in_khepri(VHostName) ->
case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
list_with_possible_retry_in_khepri( list_with_possible_retry_in_khepri(
fun() -> fun() ->
Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHostName, queue)), Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHostName, queue)),
ets:match_object(?KHEPRI_PROJECTION, Pattern) ets:match_object(Table, Pattern)
end). end)
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% get_all_durable(). %% get_all_durable().
@ -172,11 +182,16 @@ get_all_durable_in_mnesia() ->
end). end).
get_all_durable_in_khepri() -> get_all_durable_in_khepri() ->
case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
list_with_possible_retry_in_khepri( list_with_possible_retry_in_khepri(
fun() -> fun() ->
Pattern = amqqueue:pattern_match_on_durable(true), Pattern = amqqueue:pattern_match_on_durable(true),
ets:match_object(?KHEPRI_PROJECTION, Pattern) ets:match_object(Table, Pattern)
end). end)
end.
-spec get_all_durable_by_type(Type) -> [Queue] when -spec get_all_durable_by_type(Type) -> [Queue] when
Type :: atom(), Type :: atom(),
@ -199,8 +214,13 @@ get_all_durable_by_type_in_mnesia(Type) ->
rabbit_db:list_in_mnesia(?MNESIA_DURABLE_TABLE, Pattern). rabbit_db:list_in_mnesia(?MNESIA_DURABLE_TABLE, Pattern).
get_all_durable_by_type_in_khepri(Type) -> get_all_durable_by_type_in_khepri(Type) ->
case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
Pattern = amqqueue:pattern_match_on_type_and_durable(Type, true), Pattern = amqqueue:pattern_match_on_type_and_durable(Type, true),
ets:match_object(?KHEPRI_PROJECTION, Pattern). ets:match_object(Table, Pattern)
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% filter_all_durable(). %% filter_all_durable().
@ -231,6 +251,10 @@ filter_all_durable_in_mnesia(FilterFun) ->
end). end).
filter_all_durable_in_khepri(FilterFun) -> filter_all_durable_in_khepri(FilterFun) ->
case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
ets:foldl( ets:foldl(
fun(Q, Acc0) -> fun(Q, Acc0) ->
case amqqueue:is_durable(Q) andalso FilterFun(Q) of case amqqueue:is_durable(Q) andalso FilterFun(Q) of
@ -238,7 +262,8 @@ filter_all_durable_in_khepri(FilterFun) ->
false -> Acc0 false -> Acc0
end end
end, end,
[], ?KHEPRI_PROJECTION). [], Table)
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% list(). %% list().
@ -263,8 +288,13 @@ list_in_mnesia() ->
mnesia:dirty_all_keys(?MNESIA_TABLE). mnesia:dirty_all_keys(?MNESIA_TABLE).
list_in_khepri() -> list_in_khepri() ->
case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
Pattern = amqqueue:pattern_match_on_name('$1'), Pattern = amqqueue:pattern_match_on_name('$1'),
ets:select(?KHEPRI_PROJECTION, [{Pattern, [], ['$1']}]). ets:select(Table, [{Pattern, [], ['$1']}])
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% count(). %% count().
@ -289,7 +319,12 @@ count_in_mnesia() ->
mnesia:table_info(?MNESIA_TABLE, size). mnesia:table_info(?MNESIA_TABLE, size).
count_in_khepri() -> count_in_khepri() ->
ets:info(?KHEPRI_PROJECTION, size). case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
0;
Table ->
ets:info(Table, size)
end.
-spec count(VHostName) -> Count when -spec count(VHostName) -> Count when
VHostName :: vhost:name(), VHostName :: vhost:name(),
@ -330,8 +365,13 @@ list_for_count_in_mnesia(VHostName) ->
end). end).
list_for_count_in_khepri(VHostName) -> list_for_count_in_khepri(VHostName) ->
case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
0;
Table ->
Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHostName, queue)), Pattern = amqqueue:pattern_match_on_name(rabbit_misc:r(VHostName, queue)),
ets:select_count(?KHEPRI_PROJECTION, [{Pattern, [], [true]}]). ets:select_count(Table, [{Pattern, [], [true]}])
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% delete(). %% delete().
@ -426,9 +466,17 @@ internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
get_many(Names) when is_list(Names) -> get_many(Names) when is_list(Names) ->
rabbit_khepri:handle_fallback( rabbit_khepri:handle_fallback(
#{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end, #{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end,
khepri => fun() -> get_many_in_ets(?KHEPRI_PROJECTION, Names) end khepri => fun() -> get_many_in_khepri(Names) end
}). }).
get_many_in_khepri(Names) ->
case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
get_many_in_ets(Table, Names)
end.
get_many_in_ets(Table, [{Name, RouteInfos}]) get_many_in_ets(Table, [{Name, RouteInfos}])
when is_map(RouteInfos) -> when is_map(RouteInfos) ->
case ets:lookup(Table, Name) of case ets:lookup(Table, Name) of
@ -468,9 +516,14 @@ get_in_mnesia(Name) ->
rabbit_mnesia:dirty_read({?MNESIA_TABLE, Name}). rabbit_mnesia:dirty_read({?MNESIA_TABLE, Name}).
get_in_khepri(Name) -> get_in_khepri(Name) ->
case ets:lookup(?KHEPRI_PROJECTION, Name) of case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
{error, not_found};
Table ->
case ets:lookup(Table, Name) of
[Q] -> {ok, Q}; [Q] -> {ok, Q};
[] -> {error, not_found} [] -> {error, not_found}
end
end. end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
@ -519,8 +572,13 @@ get_many_durable_in_mnesia(Names) ->
get_many_in_ets(?MNESIA_DURABLE_TABLE, Names). get_many_in_ets(?MNESIA_DURABLE_TABLE, Names).
get_many_durable_in_khepri(Names) -> get_many_durable_in_khepri(Names) ->
Queues = get_many_in_ets(?KHEPRI_PROJECTION, Names), case ets:whereis(?KHEPRI_PROJECTION) of
[Q || Q <- Queues, amqqueue:is_durable(Q)]. undefined ->
[];
Table ->
Queues = get_many_in_ets(Table, Names),
[Q || Q <- Queues, amqqueue:is_durable(Q)]
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% update(). %% update().
@ -730,7 +788,12 @@ exists_in_mnesia(QName) ->
ets:member(?MNESIA_TABLE, QName). ets:member(?MNESIA_TABLE, QName).
exists_in_khepri(QName) -> exists_in_khepri(QName) ->
ets:member(?KHEPRI_PROJECTION, QName). case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
false;
Table ->
ets:member(Table, QName)
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% exists(). %% exists().

View File

@ -151,9 +151,14 @@ get_in_mnesia(Key) ->
end. end.
get_in_khepri(Key) -> get_in_khepri(Key) ->
case ets:lookup(?KHEPRI_PROJECTION, Key) of case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
undefined;
Table ->
case ets:lookup(Table, Key) of
[] -> undefined; [] -> undefined;
[Record] -> Record [Record] -> Record
end
end. end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
@ -177,7 +182,12 @@ get_all_in_mnesia() ->
rabbit_mnesia:dirty_read_all(?MNESIA_TABLE). rabbit_mnesia:dirty_read_all(?MNESIA_TABLE).
get_all_in_khepri() -> get_all_in_khepri() ->
ets:tab2list(?KHEPRI_PROJECTION). case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
ets:tab2list(Table)
end.
-spec get_all(VHostName, Comp) -> Ret when -spec get_all(VHostName, Comp) -> Ret when
VHostName :: vhost:name() | '_', VHostName :: vhost:name() | '_',
@ -214,9 +224,14 @@ get_all_in_khepri(VHostName, Comp) ->
'_' -> ok; '_' -> ok;
_ -> rabbit_vhost:assert(VHostName) _ -> rabbit_vhost:assert(VHostName)
end, end,
case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
Match = #runtime_parameters{key = {VHostName, Comp, '_'}, Match = #runtime_parameters{key = {VHostName, Comp, '_'},
_ = '_'}, _ = '_'},
ets:match_object(?KHEPRI_PROJECTION, Match). ets:match_object(Table, Match)
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% delete(). %% delete().

View File

@ -26,6 +26,7 @@
-define(MNESIA_NODE_TABLE, rabbit_topic_trie_node). -define(MNESIA_NODE_TABLE, rabbit_topic_trie_node).
-define(MNESIA_EDGE_TABLE, rabbit_topic_trie_edge). -define(MNESIA_EDGE_TABLE, rabbit_topic_trie_edge).
-define(MNESIA_BINDING_TABLE, rabbit_topic_trie_binding). -define(MNESIA_BINDING_TABLE, rabbit_topic_trie_binding).
-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie).
-type match_result() :: [rabbit_types:binding_destination() | -type match_result() :: [rabbit_types:binding_destination() |
{rabbit_amqqueue:name(), rabbit_types:binding_key()}]. {rabbit_amqqueue:name(), rabbit_types:binding_key()}].
@ -491,48 +492,52 @@ ensure_topic_deletion_ets() ->
%% Khepri topic graph %% Khepri topic graph
trie_match_in_khepri(X, Words, BKeys) -> trie_match_in_khepri(X, Words, BKeys) ->
trie_match_in_khepri(X, root, Words, BKeys, []). case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
trie_match_in_khepri(Table, X, root, Words, BKeys, [])
end.
trie_match_in_khepri(X, Node, [], BKeys, ResAcc0) -> trie_match_in_khepri(Table, X, Node, [], BKeys, ResAcc0) ->
Destinations = trie_bindings_in_khepri(X, Node, BKeys), Destinations = trie_bindings_in_khepri(Table, X, Node, BKeys),
ResAcc = add_matched(Destinations, BKeys, ResAcc0), ResAcc = add_matched(Destinations, BKeys, ResAcc0),
trie_match_part_in_khepri( trie_match_part_in_khepri(
X, Node, <<"#">>, Table, X, Node, <<"#">>,
fun trie_match_skip_any_in_khepri/5, [], BKeys, ResAcc); fun trie_match_skip_any_in_khepri/6, [], BKeys, ResAcc);
trie_match_in_khepri(X, Node, [W | RestW] = Words, BKeys, ResAcc) -> trie_match_in_khepri(Table, X, Node, [W | RestW] = Words, BKeys, ResAcc) ->
lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) -> lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
trie_match_part_in_khepri( trie_match_part_in_khepri(
X, Node, WArg, MatchFun, RestWArg, BKeys, Acc) Table, X, Node, WArg, MatchFun, RestWArg, BKeys, Acc)
end, ResAcc, [{W, fun trie_match_in_khepri/5, RestW}, end, ResAcc, [{W, fun trie_match_in_khepri/6, RestW},
{<<"*">>, fun trie_match_in_khepri/5, RestW}, {<<"*">>, fun trie_match_in_khepri/6, RestW},
{<<"#">>, {<<"#">>,
fun trie_match_skip_any_in_khepri/5, Words}]). fun trie_match_skip_any_in_khepri/6, Words}]).
trie_match_part_in_khepri(X, Node, Search, MatchFun, RestW, BKeys, ResAcc) -> trie_match_part_in_khepri(
case trie_child_in_khepri(X, Node, Search) of Table, X, Node, Search, MatchFun, RestW, BKeys, ResAcc) ->
{ok, NextNode} -> MatchFun(X, NextNode, RestW, BKeys, ResAcc); case trie_child_in_khepri(Table, X, Node, Search) of
{ok, NextNode} -> MatchFun(Table, X, NextNode, RestW, BKeys, ResAcc);
error -> ResAcc error -> ResAcc
end. end.
trie_match_skip_any_in_khepri(X, Node, [], BKeys, ResAcc) -> trie_match_skip_any_in_khepri(Table, X, Node, [], BKeys, ResAcc) ->
trie_match_in_khepri(X, Node, [], BKeys, ResAcc); trie_match_in_khepri(Table, X, Node, [], BKeys, ResAcc);
trie_match_skip_any_in_khepri(X, Node, [_ | RestW] = Words, BKeys, ResAcc) -> trie_match_skip_any_in_khepri(Table, X, Node, [_ | RestW] = Words, BKeys, ResAcc) ->
trie_match_skip_any_in_khepri( trie_match_skip_any_in_khepri(
X, Node, RestW, BKeys, Table, X, Node, RestW, BKeys,
trie_match_in_khepri(X, Node, Words, BKeys, ResAcc)). trie_match_in_khepri(Table, X, Node, Words, BKeys, ResAcc)).
trie_child_in_khepri(X, Node, Word) -> trie_child_in_khepri(Table, X, Node, Word) ->
case ets:lookup(rabbit_khepri_topic_trie, case ets:lookup(Table, #trie_edge{exchange_name = X,
#trie_edge{exchange_name = X,
node_id = Node, node_id = Node,
word = Word}) of word = Word}) of
[#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
[] -> error [] -> error
end. end.
trie_bindings_in_khepri(X, Node, BKeys) -> trie_bindings_in_khepri(Table,X, Node, BKeys) ->
case ets:lookup(rabbit_khepri_topic_trie, case ets:lookup(Table, #trie_edge{exchange_name = X,
#trie_edge{exchange_name = X,
node_id = Node, node_id = Node,
word = bindings}) of word = bindings}) of
[#topic_trie_edge{node_id = {bindings, Bindings}}] -> [#topic_trie_edge{node_id = {bindings, Bindings}}] ->

View File

@ -72,6 +72,8 @@
-define(MNESIA_TABLE, rabbit_user). -define(MNESIA_TABLE, rabbit_user).
-define(PERM_MNESIA_TABLE, rabbit_user_permission). -define(PERM_MNESIA_TABLE, rabbit_user_permission).
-define(TOPIC_PERM_MNESIA_TABLE, rabbit_topic_permission). -define(TOPIC_PERM_MNESIA_TABLE, rabbit_topic_permission).
-define(KHEPRI_USERS_PROJECTION, rabbit_khepri_users).
-define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permissions).
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% create(). %% create().
@ -185,9 +187,14 @@ get_in_mnesia(Username) ->
end. end.
get_in_khepri(Username) -> get_in_khepri(Username) ->
case ets:lookup(rabbit_khepri_users, Username) of case ets:whereis(?KHEPRI_USERS_PROJECTION) of
undefined ->
undefined;
Table ->
case ets:lookup(Table, Username) of
[User] -> User; [User] -> User;
_ -> undefined _ -> undefined
end
end. end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
@ -290,11 +297,18 @@ get_user_permissions_in_mnesia(Username, VHostName) ->
end. end.
get_user_permissions_in_khepri(Username, VHostName) -> get_user_permissions_in_khepri(Username, VHostName) ->
case ets:whereis(?KHEPRI_PERMISSIONS_PROJECTION) of
undefined ->
undefined;
Table ->
UserVHost = #user_vhost{username = Username, UserVHost = #user_vhost{username = Username,
virtual_host = VHostName}, virtual_host = VHostName},
case ets:lookup(rabbit_khepri_user_permissions, UserVHost) of case ets:lookup(Table, UserVHost) of
[UserPermission] -> UserPermission; [UserPermission] ->
_ -> undefined UserPermission;
_ ->
undefined
end
end. end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------

View File

@ -53,6 +53,7 @@
-endif. -endif.
-define(MNESIA_TABLE, rabbit_vhost). -define(MNESIA_TABLE, rabbit_vhost).
-define(KHEPRI_PROJECTION, rabbit_khepri_vhost).
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% create_or_get(). %% create_or_get().
@ -242,7 +243,12 @@ exists_in_mnesia(VHostName) ->
mnesia:dirty_read({?MNESIA_TABLE, VHostName}) /= []. mnesia:dirty_read({?MNESIA_TABLE, VHostName}) /= [].
exists_in_khepri(VHostName) -> exists_in_khepri(VHostName) ->
ets:member(rabbit_khepri_vhost, VHostName). case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
false;
Table ->
ets:member(Table, VHostName)
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% get(). %% get().
@ -270,9 +276,14 @@ get_in_mnesia(VHostName) ->
end. end.
get_in_khepri(VHostName) -> get_in_khepri(VHostName) ->
case ets:lookup(rabbit_khepri_vhost, VHostName) of case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
undefined;
Table ->
case ets:lookup(Table, VHostName) of
[Record] -> Record; [Record] -> Record;
_ -> undefined _ -> undefined
end
end. end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
@ -296,7 +307,12 @@ get_all_in_mnesia() ->
mnesia:dirty_match_object(?MNESIA_TABLE, vhost:pattern_match_all()). mnesia:dirty_match_object(?MNESIA_TABLE, vhost:pattern_match_all()).
get_all_in_khepri() -> get_all_in_khepri() ->
ets:tab2list(rabbit_khepri_vhost). case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
ets:tab2list(Table)
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% list(). %% list().
@ -319,7 +335,12 @@ list_in_mnesia() ->
mnesia:dirty_all_keys(?MNESIA_TABLE). mnesia:dirty_all_keys(?MNESIA_TABLE).
list_in_khepri() -> list_in_khepri() ->
ets:select(rabbit_khepri_vhost, [{vhost:pattern_match_names(), [], ['$1']}]). case ets:whereis(?KHEPRI_PROJECTION) of
undefined ->
[];
Table ->
ets:select(Table, [{vhost:pattern_match_names(), [], ['$1']}])
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% update_in_*tx(). %% update_in_*tx().