rabbit_khepri: Add macros for path patterns

This commit is contained in:
Michael Davis 2024-10-10 11:17:13 -04:00
parent e8fb9b6889
commit c3c7675bda
No known key found for this signature in database
10 changed files with 90 additions and 26 deletions

View File

@ -6,4 +6,59 @@
%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-define(KHEPRI_ROOT_PATH, [rabbitmq]).
%% This header has macros that define the `khepri_path:native_pattern()'
%% path patterns used for each piece of metadata in the store. We use macros
%% for these so that we can pattern match on these path patterns as well as
%% create them as new terms.
%%
%% If you are creating a path pattern to use in a call to the Khepri API (for
%% example `rabbit_khepri:get/1') you should prefer using the
%% `khepri_<entity>_path' function from the `rabbit_db_<entity>' modules
%% instead, for example `rabbit_db_queue:khepri_queue_path/2', since those
%% functions have guards to assert that the variables passed are valid pattern
%% components.
-define(RABBITMQ_KHEPRI_ROOT_PATH, ?RABBITMQ_KHEPRI_ROOT_PATH([])).
-define(RABBITMQ_KHEPRI_ROOT_PATH(Rest), [rabbitmq | Rest]).
-define(RABBITMQ_KHEPRI_MAINTENANCE_PATH(Node),
?RABBITMQ_KHEPRI_ROOT_PATH([node_maintenance, Node])).
-define(RABBITMQ_KHEPRI_GLOBAL_RUNTIME_PARAM_PATH(Key),
?RABBITMQ_KHEPRI_ROOT_PATH([runtime_params, Key])).
-define(RABBITMQ_KHEPRI_USER_PATH(Username),
?RABBITMQ_KHEPRI_ROOT_PATH([users, Username])).
-define(RABBITMQ_KHEPRI_MIRRORED_SUPERVISOR_PATH(Group, Id),
?RABBITMQ_KHEPRI_ROOT_PATH([mirrored_supervisors, Group, Id])).
-define(RABBITMQ_KHEPRI_VHOST_PATH(Name),
?RABBITMQ_KHEPRI_VHOST_PATH(Name, [])).
-define(RABBITMQ_KHEPRI_VHOST_PATH(Name, Rest),
?RABBITMQ_KHEPRI_ROOT_PATH([vhosts, Name | Rest])).
-define(RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(VHost, Component, Name),
?RABBITMQ_KHEPRI_VHOST_PATH(VHost, [runtime_params, Component, Name])).
-define(RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHost, Username),
?RABBITMQ_KHEPRI_VHOST_PATH(VHost, [user_permissions, Username])).
-define(RABBITMQ_KHEPRI_EXCHANGE_PATH(VHost, Name),
?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHost, Name, [])).
-define(RABBITMQ_KHEPRI_EXCHANGE_PATH(VHost, Name, Rest),
?RABBITMQ_KHEPRI_VHOST_PATH(VHost, [exchanges, Name | Rest])).
-define(RABBITMQ_KHEPRI_EXCHANGE_SERIAL_PATH(VHost, Name),
?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHost, Name, [serial])).
-define(RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHost, Exchange, Username),
?RABBITMQ_KHEPRI_EXCHANGE_PATH(
VHost, Exchange, [user_permissions, Username])).
-define(RABBITMQ_KHEPRI_ROUTE_PATH(VHost, SrcName, Kind, DstName, RoutingKey),
?RABBITMQ_KHEPRI_EXCHANGE_PATH(
VHost, SrcName, [bindings, Kind, DstName, RoutingKey])).
-define(RABBITMQ_KHEPRI_QUEUE_PATH(VHost, Name),
?RABBITMQ_KHEPRI_VHOST_PATH(VHost, [queues, Name])).

View File

@ -10,6 +10,8 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("include/rabbit_khepri.hrl").
-export([exists/1,
create/2,
delete/2,
@ -1021,8 +1023,7 @@ khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey)
when ?IS_KHEPRI_PATH_CONDITION(Kind) andalso
?IS_KHEPRI_PATH_CONDITION(DstName) andalso
?IS_KHEPRI_PATH_CONDITION(RoutingKey) ->
ExchangePath = rabbit_db_exchange:khepri_exchange_path(VHost, SrcName),
ExchangePath ++ [bindings, Kind, DstName, RoutingKey].
?RABBITMQ_KHEPRI_ROUTE_PATH(VHost, SrcName, Kind, DstName, RoutingKey).
khepri_route_path_to_args(Path) ->
Pattern = khepri_route_path(

View File

@ -960,11 +960,15 @@ maybe_auto_delete_in_khepri(XName, OnlyDurable) ->
khepri_exchange_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_exchange_path(VHost, Name).
khepri_exchange_path(VHost, Name) when ?IS_KHEPRI_PATH_CONDITION(Name) ->
rabbit_db_vhost:khepri_vhost_path(VHost) ++ [exchanges, Name].
khepri_exchange_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHost, Name).
khepri_exchange_serial_path(#resource{} = Resource) ->
khepri_exchange_path(Resource) ++ [serial].
khepri_exchange_serial_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_exchange_serial_path(VHost, Name).
khepri_exchange_serial_path(VHost, Name) ->
khepri_exchange_path(VHost, Name) ++ [serial].
khepri_exchange_serial_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
?RABBITMQ_KHEPRI_EXCHANGE_SERIAL_PATH(VHost, Name).

View File

@ -170,4 +170,4 @@ get_consistent_in_khepri(Node) ->
%% -------------------------------------------------------------------
khepri_maintenance_path(Node) when ?IS_KHEPRI_PATH_CONDITION(Node) ->
?KHEPRI_ROOT_PATH ++ [node_maintenance, Node].
?RABBITMQ_KHEPRI_MAINTENANCE_PATH(Node).

View File

@ -328,8 +328,8 @@ clear_in_khepri() ->
khepri_mirrored_supervisor_path(Group, Id)
when ?IS_KHEPRI_PATH_CONDITION(Group) andalso
?IS_KHEPRI_PATH_CONDITION(Id) ->
?KHEPRI_ROOT_PATH ++ [mirrored_supervisors, Group, Id];
?RABBITMQ_KHEPRI_MIRRORED_SUPERVISOR_PATH(Group, Id);
khepri_mirrored_supervisor_path(Group, Id)
when is_atom(Group) ->
IdPath = Group:id_to_khepri_path(Id),
?KHEPRI_ROOT_PATH ++ [mirrored_supervisors, Group] ++ IdPath.
?RABBITMQ_KHEPRI_ROOT_PATH ++ [mirrored_supervisors, Group] ++ IdPath.

View File

@ -13,6 +13,8 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
-include("include/rabbit_khepri.hrl").
-export([
get/1,
get_many/1,
@ -1394,5 +1396,7 @@ list_with_possible_retry_in_khepri(Fun) ->
khepri_queue_path(#resource{virtual_host = VHost, name = Name}) ->
khepri_queue_path(VHost, Name).
khepri_queue_path(VHost, Name) when ?IS_KHEPRI_PATH_CONDITION(Name) ->
rabbit_db_vhost:khepri_vhost_path(VHost) ++ [queues, Name].
khepri_queue_path(VHost, Name)
when ?IS_KHEPRI_PATH_CONDITION(VHost) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
?RABBITMQ_KHEPRI_QUEUE_PATH(VHost, Name).

View File

@ -364,10 +364,9 @@ khepri_rp_path(Key) ->
khepri_global_rp_path(Key).
khepri_global_rp_path(Key) when ?IS_KHEPRI_PATH_CONDITION(Key) ->
?KHEPRI_ROOT_PATH ++ [runtime_params, Key].
?RABBITMQ_KHEPRI_GLOBAL_RUNTIME_PARAM_PATH(Key).
khepri_vhost_rp_path(VHost, Component, Name)
when ?IS_KHEPRI_PATH_CONDITION(Component) andalso
?IS_KHEPRI_PATH_CONDITION(Name) ->
VHostPath = rabbit_db_vhost:khepri_vhost_path(VHost),
VHostPath ++ [runtime_params, Component, Name].
?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(VHost, Component, Name).

View File

@ -1094,14 +1094,15 @@ clear_in_khepri() ->
khepri_user_path(Username)
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
?KHEPRI_ROOT_PATH ++ [users, Username].
?RABBITMQ_KHEPRI_USER_PATH(Username).
khepri_user_permission_path(Username, VHostName)
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
(rabbit_db_vhost:khepri_vhost_path(VHostName) ++
[user_permissions, Username]).
when ?IS_KHEPRI_PATH_CONDITION(Username) andalso
?IS_KHEPRI_PATH_CONDITION(VHostName) ->
?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, Username).
khepri_topic_permission_path(Username, VHostName, Exchange)
when ?IS_KHEPRI_PATH_CONDITION(Username) ->
(rabbit_db_exchange:khepri_exchange_path(VHostName, Exchange) ++
[user_permissions, Username]).
when ?IS_KHEPRI_PATH_CONDITION(Username) andalso
?IS_KHEPRI_PATH_CONDITION(VHostName) andalso
?IS_KHEPRI_PATH_CONDITION(Exchange) ->
?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, Exchange, Username).

View File

@ -533,4 +533,4 @@ clear_in_khepri() ->
%% --------------------------------------------------------------
khepri_vhost_path(VHost) when ?IS_KHEPRI_PATH_CONDITION(VHost) ->
?KHEPRI_ROOT_PATH ++ [vhosts, VHost].
?RABBITMQ_KHEPRI_VHOST_PATH(VHost).

View File

@ -943,7 +943,7 @@ cluster_status_from_khepri() ->
%% This path must be prepended to all paths used by RabbitMQ subsystems.
root_path() ->
?KHEPRI_ROOT_PATH.
?RABBITMQ_KHEPRI_ROOT_PATH.
%% -------------------------------------------------------------------
%% "Proxy" functions to Khepri API.