Merge pull request #11225 from rabbitmq/reorganize-khepri-tree

Reorganize data in the Khepri store
This commit is contained in:
Jean-Sébastien Pédron 2024-09-05 17:14:13 +02:00 committed by GitHub
commit 30c82d1396
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 101 additions and 87 deletions

1
deps/rabbit/app.bzl vendored
View File

@ -525,6 +525,7 @@ def all_srcs(name = "all_srcs"):
"include/amqqueue.hrl",
"include/amqqueue_v2.hrl",
"include/internal_user.hrl",
"include/khepri.hrl",
"include/mc.hrl",
"include/rabbit_amqp.hrl",
"include/rabbit_global_counters.hrl",

9
deps/rabbit/include/khepri.hrl vendored Normal file
View File

@ -0,0 +1,9 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term Broadcom
%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-define(KHEPRI_ROOT_PATH, [rabbitmq]).

View File

@ -38,8 +38,7 @@
-export([
khepri_route_path/1, khepri_route_path/5,
khepri_route_path_to_args/1,
khepri_route_exchange_path/1
khepri_route_path_to_args/1
]).
%% Recovery is only needed for transient entities. Once mnesia is removed, these
@ -202,8 +201,6 @@ create_in_khepri(#binding{source = SrcName,
MaybeSerial = rabbit_exchange:serialise_events(Src),
Serial = rabbit_khepri:transaction(
fun() ->
ExchangePath = khepri_route_exchange_path(SrcName),
ok = khepri_tx:put(ExchangePath, #{type => Src#exchange.type}),
case khepri_tx:get(RoutePath) of
{ok, Set} ->
case sets:is_element(Binding, Set) of
@ -1010,18 +1007,21 @@ clear_in_khepri() ->
%% --------------------------------------------------------------
khepri_route_path(
#binding{source = #resource{virtual_host = VHost, name = SrcName},
destination = #resource{kind = Kind, name = DstName},
#binding{source = #resource{virtual_host = VHost,
kind = exchange,
name = SrcName},
destination = #resource{virtual_host = VHost,
kind = Kind,
name = DstName},
key = RoutingKey}) ->
khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey).
khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(SrcName) andalso
?IS_KHEPRI_PATH_CONDITION(Kind) andalso
when ?IS_KHEPRI_PATH_CONDITION(Kind) andalso
?IS_KHEPRI_PATH_CONDITION(DstName) andalso
?IS_KHEPRI_PATH_CONDITION(RoutingKey) ->
[?MODULE, routes, VHost, SrcName, Kind, DstName, RoutingKey].
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, SrcName),
ExchangePath ++ [bindings, Kind, DstName, RoutingKey].
khepri_route_path_to_args(Path) ->
Pattern = khepri_route_path(
@ -1047,9 +1047,6 @@ khepri_route_path_to_args(
'$RoutingKey' := RoutingKey}) ->
{VHost, SrcName, Kind, DstName, RoutingKey}.
khepri_route_exchange_path(#resource{virtual_host = VHost, name = SrcName}) ->
[?MODULE, routes, VHost, SrcName].
%% --------------------------------------------------------------
%% Internal
%% --------------------------------------------------------------

View File

@ -45,7 +45,7 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) ->
%% @private
copy_to_khepri(rabbit_route = Table,
#route{binding = #binding{source = XName} = Binding},
#route{binding = #binding{} = Binding},
State) ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: [~0p] key: ~0p",
@ -55,18 +55,12 @@ copy_to_khepri(rabbit_route = Table,
rabbit_db_m2k_converter:with_correlation_id(
fun(CorrId) ->
Extra = #{async => CorrId},
XPath = rabbit_db_binding:khepri_route_exchange_path(XName),
?LOG_DEBUG(
"Mnesia->Khepri data copy: [~0p] path: ~0p corr: ~0p",
[Table, Path, CorrId],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
rabbit_khepri:transaction(
fun() ->
%% Store the exchange's type in the exchange name
%% branch of the tree.
[#exchange{type = XType}] =
rabbit_db_exchange:get_in_khepri_tx(XName),
ok = khepri_tx:put(XPath, #{type => XType}),
%% Add the binding to the set at the binding's
%% path.
Set = case khepri_tx:get(Path) of

View File

@ -10,6 +10,8 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("include/khepri.hrl").
-export([
get_all/0,
get_all/1,
@ -894,15 +896,11 @@ maybe_auto_delete_in_khepri(XName, OnlyDurable) ->
khepri_exchange_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_exchange_path(VHost, Name).
khepri_exchange_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, exchanges, VHost, Name].
khepri_exchange_path(VHost, Name) when ?IS_KHEPRI_PATH_CONDITION(Name) ->
rabbit_db_vhost:khepri_vhost_path(VHost) ++ [exchanges, Name].
khepri_exchange_serial_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_exchange_serial_path(VHost, Name).
khepri_exchange_serial_path(#resource{} = Resource) ->
khepri_exchange_path(Resource) ++ [serial].
khepri_exchange_serial_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, exchange_serials, VHost, Name].
khepri_exchange_serial_path(VHost, Name) ->
khepri_exchange_path(VHost, Name) ++ [serial].

View File

@ -10,6 +10,8 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("include/khepri.hrl").
-export([
table_definitions/0,
set/1,
@ -168,4 +170,4 @@ get_consistent_in_khepri(Node) ->
%% -------------------------------------------------------------------
khepri_maintenance_path(Node) when ?IS_KHEPRI_PATH_CONDITION(Node) ->
[?MODULE, maintenance, Node].
?KHEPRI_ROOT_PATH ++ [node_maintenance, Node].

View File

@ -10,6 +10,8 @@
-include_lib("khepri/include/khepri.hrl").
-include("mirrored_supervisor.hrl").
-include("include/khepri.hrl").
-export([
create_tables/0,
table_definitions/0,
@ -326,8 +328,8 @@ clear_in_khepri() ->
khepri_mirrored_supervisor_path(Group, Id)
when ?IS_KHEPRI_PATH_CONDITION(Group) andalso
?IS_KHEPRI_PATH_CONDITION(Id) ->
[?MODULE, mirrored_supervisor_childspec, Group, Id];
?KHEPRI_ROOT_PATH ++ [mirrored_supervisors, Group, Id];
khepri_mirrored_supervisor_path(Group, Id)
when is_atom(Group) ->
IdPath = Group:id_to_khepri_path(Id),
[?MODULE, mirrored_supervisor_childspec, Group] ++ IdPath.
?KHEPRI_ROOT_PATH ++ [mirrored_supervisors, Group] ++ IdPath.

View File

@ -1359,7 +1359,5 @@ list_with_possible_retry_in_khepri(Fun) ->
khepri_queue_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_queue_path(VHost, Name).
khepri_queue_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, queues, VHost, Name].
khepri_queue_path(VHost, Name) when ?IS_KHEPRI_PATH_CONDITION(Name) ->
rabbit_db_vhost:khepri_vhost_path(VHost) ++ [queues, Name].

View File

@ -10,6 +10,8 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("include/khepri.hrl").
-export([set/2, set/4,
get/1,
get_all/0, get_all/2,
@ -362,10 +364,10 @@ khepri_rp_path(Key) ->
khepri_global_rp_path(Key).
khepri_global_rp_path(Key) when ?IS_KHEPRI_PATH_CONDITION(Key) ->
[?MODULE, global, Key].
?KHEPRI_ROOT_PATH ++ [runtime_params, Key].
khepri_vhost_rp_path(VHost, Component, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Component) andalso
when ?IS_KHEPRI_PATH_CONDITION(Component) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, per_vhost, VHost, Component, Name].
VHostPath = rabbit_db_vhost:khepri_vhost_path(VHost),
VHostPath ++ [runtime_params, Component, Name].

View File

@ -12,6 +12,8 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("include/khepri.hrl").
-export([create/1,
update/2,
get/1,
@ -489,13 +491,12 @@ set_user_permissions_in_khepri(Username, VHostName, UserPermission) ->
end)), rw).
set_user_permissions_in_khepri_tx(Username, VHostName, UserPermission) ->
%% TODO: Check user presence in a transaction.
Path = khepri_user_permission_path(
#if_all{conditions =
[Username,
#if_node_exists{exists = true}]},
Username,
VHostName),
Extra = #{keep_while =>
#{rabbit_db_vhost:khepri_vhost_path(VHostName) =>
#{rabbit_db_user:khepri_user_path(Username) =>
#if_node_exists{exists = true}}},
Ret = khepri_tx:put(
Path, UserPermission, Extra),
@ -877,14 +878,13 @@ set_topic_permissions_in_khepri(Username, VHostName, TopicPermission) ->
set_topic_permissions_in_khepri_tx(Username, VHostName, TopicPermission) ->
#topic_permission{topic_permission_key =
#topic_permission_key{exchange = ExchangeName}} = TopicPermission,
%% TODO: Check user presence in a transaction.
Path = khepri_topic_permission_path(
#if_all{conditions =
[Username,
#if_node_exists{exists = true}]},
Username,
VHostName,
ExchangeName),
Extra = #{keep_while =>
#{rabbit_db_vhost:khepri_vhost_path(VHostName) =>
#{rabbit_db_user:khepri_user_path(Username) =>
#if_node_exists{exists = true}}},
Ret = khepri_tx:put(Path, TopicPermission, Extra),
case Ret of
@ -1094,15 +1094,14 @@ clear_in_khepri() ->
khepri_user_path(Username)
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
[?MODULE, users, Username].
?KHEPRI_ROOT_PATH ++ [users, Username].
khepri_user_permission_path(Username, VHostName)
when ?IS_KHEPRI_PATH_CONDITION(Username) andalso
?IS_KHEPRI_PATH_CONDITION(VHostName) ->
[?MODULE, users, Username, user_permissions, VHostName].
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
(rabbit_db_vhost:khepri_vhost_path(VHostName) ++
[user_permissions, Username]).
khepri_topic_permission_path(Username, VHostName, Exchange)
when ?IS_KHEPRI_PATH_CONDITION(Username) andalso
?IS_KHEPRI_PATH_CONDITION(VHostName) andalso
?IS_KHEPRI_PATH_CONDITION(Exchange) ->
[?MODULE, users, Username, topic_permissions, VHostName, Exchange].
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
(rabbit_db_exchange:khepri_exchange_path(VHostName, Exchange) ++
[user_permissions, Username]).

View File

@ -73,14 +73,12 @@ copy_to_khepri(
[Table, Username, VHost],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
Path = rabbit_db_user:khepri_user_permission_path(
#if_all{conditions =
[Username,
#if_node_exists{exists = true}]},
Username,
VHost),
rabbit_db_m2k_converter:with_correlation_id(
fun(CorrId) ->
Extra = #{keep_while =>
#{rabbit_db_vhost:khepri_vhost_path(VHost) =>
#{rabbit_db_user:khepri_user_path(Username) =>
#if_node_exists{exists = true}},
async => CorrId},
?LOG_DEBUG(
@ -103,15 +101,13 @@ copy_to_khepri(
[Table, Username, VHost],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
Path = rabbit_db_user:khepri_topic_permission_path(
#if_all{conditions =
[Username,
#if_node_exists{exists = true}]},
Username,
VHost,
Exchange),
rabbit_db_m2k_converter:with_correlation_id(
fun(CorrId) ->
Extra = #{keep_while =>
#{rabbit_db_vhost:khepri_vhost_path(VHost) =>
#{rabbit_db_user:khepri_user_path(Username) =>
#if_node_exists{exists = true}},
async => CorrId},
?LOG_DEBUG(

View File

@ -11,6 +11,7 @@
-include_lib("rabbit_common/include/logging.hrl").
-include_lib("khepri/include/khepri.hrl").
-include("include/khepri.hrl").
-include("vhost.hrl").
-export([create_or_get/3,
@ -532,4 +533,4 @@ clear_in_khepri() ->
%% --------------------------------------------------------------
khepri_vhost_path(VHost) when ?IS_KHEPRI_PATH_CONDITION(VHost) ->
[?MODULE, VHost].
?KHEPRI_ROOT_PATH ++ [vhosts, VHost].

View File

@ -94,6 +94,8 @@
-include_lib("rabbit_common/include/logging.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("include/khepri.hrl").
-export([setup/0,
setup/1,
init/0,
@ -145,6 +147,7 @@
dir/0,
info/0,
root_path/0,
handle_async_ret/1,
@ -895,6 +898,15 @@ cluster_status_from_khepri() ->
{error, khepri_not_running}
end.
-spec root_path() -> RootPath when
RootPath :: khepri_path:path().
%% @doc Returns the path where RabbitMQ stores every metadata.
%%
%% This path must be prepended to all paths used by RabbitMQ subsystems.
root_path() ->
?KHEPRI_ROOT_PATH.
%% -------------------------------------------------------------------
%% "Proxy" functions to Khepri API.
%% -------------------------------------------------------------------
@ -1213,10 +1225,11 @@ register_rabbit_index_route_projection() ->
Options = #{type => bag, keypos => #index_route.source_key},
Projection = khepri_projection:new(
rabbit_khepri_index_route, ProjectionFun, Options),
DirectOrFanout = #if_data_matches{pattern = #{type => '$1'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic}}]},
DirectOrFanout = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic}}]},
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
_Exchange = DirectOrFanout,
@ -1319,7 +1332,8 @@ register_rabbit_topic_graph_projection() ->
Projection = khepri_projection:new(Name, ProjectionFun, Options),
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
_Exchange = #if_data_matches{pattern = #{type => topic}},
_Exchange = #if_data_matches{
pattern = #exchange{type = topic, _ = '_'}},
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),

View File

@ -224,7 +224,6 @@ khepri_consistent_hash_path(#exchange{name = Name}) ->
khepri_consistent_hash_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_consistent_hash_path(VHost, Name).
khepri_consistent_hash_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, exchange_type_consistent_hash_ring_state, VHost, Name].
khepri_consistent_hash_path(VHost, Name) ->
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, Name),
ExchangePath ++ [consistent_hash_ring_state].

View File

@ -235,7 +235,6 @@ remove_items(Dict, [Key | Keys]) -> remove_items(dict:erase(Key, Dict), Keys).
khepri_jms_topic_exchange_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_jms_topic_exchange_path(VHost, Name).
khepri_jms_topic_exchange_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, jms_topic_exchange, VHost, Name].
khepri_jms_topic_exchange_path(VHost, Name) ->
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, Name),
ExchangePath ++ [jms_topic].

View File

@ -177,7 +177,6 @@ delete_in_khepri(XName) ->
khepri_recent_history_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_recent_history_path(VHost, Name).
khepri_recent_history_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
[?MODULE, recent_history_exchange, VHost, Name].
khepri_recent_history_path(VHost, Name) ->
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, Name),
ExchangePath ++ [recent_history].

View File

@ -58,11 +58,15 @@ end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(mnesia_store, Config) ->
rabbit_ct_helpers:set_config(Config, [{metadata_store, mnesia}]);
case rabbit_ct_broker_helpers:configured_metadata_store(Config) of
{khepri, _} -> {skip, "These tests target Mnesia"};
_ -> Config
end;
init_per_group(khepri_store, Config) ->
rabbit_ct_helpers:set_config(
Config,
[{metadata_store, {khepri, [khepri_db]}}]);
case rabbit_ct_broker_helpers:configured_metadata_store(Config) of
mnesia -> {skip, "These tests target Khepri"};
_ -> Config
end;
init_per_group(_, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE},