Khepri: Clean up the proxy functions of the integration code

[Why]
The `rabbit_khepri` module grew during the work to add Khepri support to
RabbitMQ and while Khepri was itself written. The current code is
therefore unorganized.

[How]
This commit tries to change proxy functions to be close to their Khepri
equivalent.

The module continues to set non-default options for write functions. We
also add the variants that take an option map to be consistent and not
have to deal with that in the future.

Several legacy functions were removed, either because they were no
longer called or because they were replace by a regular Khepri call.
This commit is contained in:
Jean-Sébastien Pédron 2025-04-24 13:00:35 +02:00
parent bd3aee35b4
commit 5300076e33
No known key found for this signature in database
GPG Key ID: 39E99761A5FD94CC
6 changed files with 206 additions and 187 deletions

View File

@ -328,7 +328,7 @@ list_in_khepri(Path) ->
Objects :: [term()].
list_in_khepri(Path, Options) ->
case rabbit_khepri:match(Path, Options) of
case rabbit_khepri:get_many(Path, Options) of
{ok, Map} -> maps:values(Map);
_ -> []
end.

View File

@ -357,7 +357,7 @@ delete_vhost_in_mnesia_tx(VHostName) ->
delete_vhost_in_khepri(VHostName) ->
Pattern = khepri_vhost_rp_path(
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:adv_delete_many(Pattern) of
case rabbit_khepri:adv_delete(Pattern) of
{ok, NodePropsMap} ->
RTParams =
maps:fold(

View File

@ -402,7 +402,7 @@ match_user_permissions_in_mnesia_tx(Username, VHostName) ->
match_user_permissions_in_khepri('_' = _Username, '_' = _VHostName) ->
Path = khepri_user_permission_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:match(Path) of
case rabbit_khepri:get_many(Path) of
{ok, Map} ->
maps:values(Map);
_ ->

View File

@ -2,7 +2,8 @@
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% Copyright (c) 2023-2025 Broadcom. All Rights Reserved. The term Broadcom
%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
%% @doc Khepri database uses wrapper.
@ -107,7 +108,9 @@
dir/0,
get_ra_cluster_name/0,
get_store_id/0,
root_path/0]).
root_path/0,
info/0]).
%% Clustering.
-export([can_join_cluster/1,
@ -130,45 +133,38 @@
status/0,
cli_cluster_status/0]).
-export([fence/1,
info/0,
%% "Proxy" functions to Khepri query/update API.
-export([is_empty/0,
is_empty/0,
create/2,
adv_create/2,
update/2,
cas/3,
fold/3, fold/4,
foreach/2,
filter/2,
get/1,
get/2,
get/1, get/2,
adv_get/1, adv_get/2,
get_many/1, get_many/2,
adv_get_many/1, adv_get_many/2,
exists/1, exists/2,
count/1, count/2,
get_many/1,
adv_get/1,
adv_get_many/1,
match/1,
match/2,
exists/1,
list/1,
list_child_nodes/1,
count_children/1,
fold/3, fold/4,
foreach/2, foreach/3,
map/2, map/3,
filter/2, filter/3,
put/2, put/3,
adv_put/2,
clear_payload/1,
adv_put/2, adv_put/3,
create/2, create/3,
adv_create/2, adv_create/3,
update/2, update/3,
adv_update/2, adv_update/3,
delete/1, delete/2,
delete_or_fail/1,
adv_delete_many/1,
adv_delete/1, adv_delete/2,
clear_payload/1, clear_payload/2,
transaction/1,
transaction/2,
transaction/3,
transaction/1, transaction/2, transaction/3,
clear_store/0,
fence/1,
handle_async_ret/1]).
handle_async_ret/1,
delete_or_fail/1]).
%% Used during migration to join the standalone Khepri nodes and form the
%% equivalent cluster
@ -177,9 +173,6 @@
is_enabled/0, is_enabled/1,
get_feature_state/0, get_feature_state/1,
handle_fallback/1]).
%% To add the current node to an existing cluster
-export([]).
-export([]).
-ifdef(TEST).
-export([register_projections/0,
@ -427,6 +420,10 @@ get_store_id() ->
root_path() ->
?RABBITMQ_KHEPRI_ROOT_PATH.
info() ->
ok = setup(),
khepri:info(?STORE_ID).
%% -------------------------------------------------------------------
%% Clustering.
%% -------------------------------------------------------------------
@ -1140,100 +1137,161 @@ cli_cluster_status() ->
%% They are some additional functions too, because they are useful in
%% RabbitMQ. They might be moved to Khepri in the future.
is_empty() -> khepri:is_empty(?STORE_ID).
is_empty() ->
khepri:is_empty(?STORE_ID).
create(Path, Data) ->
khepri:create(?STORE_ID, Path, Data, ?DEFAULT_COMMAND_OPTIONS).
adv_create(Path, Data) -> adv_create(Path, Data, #{}).
adv_create(Path, Data, Options0) ->
Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0),
khepri_adv:create(?STORE_ID, Path, Data, Options).
update(Path, Data) ->
khepri:update(?STORE_ID, Path, Data, ?DEFAULT_COMMAND_OPTIONS).
cas(Path, Pattern, Data) ->
khepri:compare_and_swap(
?STORE_ID, Path, Pattern, Data, ?DEFAULT_COMMAND_OPTIONS).
get(PathPattern) ->
khepri:get(?STORE_ID, PathPattern).
fold(Path, Pred, Acc) ->
khepri:fold(?STORE_ID, Path, Pred, Acc).
get(PathPattern, Options) ->
khepri:get(?STORE_ID, PathPattern, Options).
fold(Path, Pred, Acc, Options) ->
khepri:fold(?STORE_ID, Path, Pred, Acc, Options).
adv_get(PathPattern) ->
khepri_adv:get(?STORE_ID, PathPattern).
foreach(Path, Pred) ->
khepri:foreach(?STORE_ID, Path, Pred).
filter(Path, Pred) ->
khepri:filter(?STORE_ID, Path, Pred).
get(Path) ->
khepri:get(?STORE_ID, Path).
get(Path, Options) ->
khepri:get(?STORE_ID, Path, Options).
count(PathPattern) ->
khepri:count(?STORE_ID, PathPattern, #{favor => low_latency}).
count(Path, Options) ->
Options1 = Options#{favor => low_latency},
khepri:count(?STORE_ID, Path, Options1).
adv_get(PathPattern, Options) ->
khepri_adv:get(?STORE_ID, PathPattern, Options).
get_many(PathPattern) ->
khepri:get_many(?STORE_ID, PathPattern).
adv_get(Path) ->
khepri_adv:get(?STORE_ID, Path).
get_many(PathPattern, Options) ->
khepri:get_many(?STORE_ID, PathPattern, Options).
adv_get_many(PathPattern) ->
khepri_adv:get_many(?STORE_ID, PathPattern).
match(Path) ->
match(Path, #{}).
adv_get_many(PathPattern, Options) ->
khepri_adv:get_many(?STORE_ID, PathPattern, Options).
match(Path, Options) ->
khepri:get_many(?STORE_ID, Path, Options).
exists(PathPattern) ->
khepri:exists(?STORE_ID, PathPattern).
exists(Path) -> khepri:exists(?STORE_ID, Path).
exists(PathPattern, Options) ->
khepri:exists(?STORE_ID, PathPattern, Options).
list(Path) ->
khepri:get_many(
?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR]).
%% `count/{1,2}' sets the `favor => low_latency' option.
list_child_nodes(Path) ->
Options = #{props_to_return => [child_names]},
case khepri_adv:get_many(?STORE_ID, Path, Options) of
{ok, Result} ->
case maps:values(Result) of
[#{child_names := ChildNames}] ->
{ok, ChildNames};
[] ->
[]
end;
Error ->
Error
count(PathPattern) ->
count(PathPattern, #{}).
count(PathPattern, Options) ->
Options1 = Options#{favor => low_latency},
khepri:count(?STORE_ID, PathPattern, Options1).
fold(PathPattern, Pred, Acc) ->
khepri:fold(?STORE_ID, PathPattern, Pred, Acc).
fold(PathPattern, Pred, Acc, Options) ->
khepri:fold(?STORE_ID, PathPattern, Pred, Acc, Options).
foreach(PathPattern, Pred) ->
khepri:foreach(?STORE_ID, PathPattern, Pred).
foreach(PathPattern, Pred, Options) ->
khepri:foreach(?STORE_ID, PathPattern, Pred, Options).
map(PathPattern, Pred) ->
khepri:map(?STORE_ID, PathPattern, Pred).
map(PathPattern, Pred, Options) ->
khepri:map(?STORE_ID, PathPattern, Pred, Options).
filter(PathPattern, Pred) ->
khepri:filter(?STORE_ID, PathPattern, Pred).
filter(PathPattern, Pred, Options) ->
khepri:filter(?STORE_ID, PathPattern, Pred, Options).
put(PathPattern, Data) ->
put(PathPattern, Data, #{}).
put(PathPattern, Data, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
khepri:put(?STORE_ID, PathPattern, Data, Options1).
adv_put(PathPattern, Data) ->
adv_put(PathPattern, Data, #{}).
adv_put(PathPattern, Data, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
khepri_adv:put(?STORE_ID, PathPattern, Data, Options1).
create(PathPattern, Data) ->
create(PathPattern, Data, #{}).
create(PathPattern, Data, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
khepri:create(?STORE_ID, PathPattern, Data, Options1).
adv_create(PathPattern, Data) ->
adv_create(PathPattern, Data, #{}).
adv_create(PathPattern, Data, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
khepri_adv:create(?STORE_ID, PathPattern, Data, Options1).
update(PathPattern, Data) ->
update(PathPattern, Data, #{}).
update(PathPattern, Data, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
khepri:update(?STORE_ID, PathPattern, Data, Options1).
adv_update(PathPattern, Data) ->
adv_update(PathPattern, Data, #{}).
adv_update(PathPattern, Data, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
khepri_adv:update(?STORE_ID, PathPattern, Data, Options1).
%% `delete/{1,2}' calls `khepri:delete_many/2.
delete(PathPattern) ->
delete(PathPattern, #{}).
delete(PathPattern, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
khepri:delete_many(?STORE_ID, PathPattern, Options1).
adv_delete(PathPattern) ->
adv_delete(PathPattern, #{}).
adv_delete(PathPattern, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
khepri_adv:delete_many(?STORE_ID, PathPattern, Options1).
clear_payload(PathPattern) ->
clear_payload(PathPattern, #{}).
clear_payload(PathPattern, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
khepri:clear_payload(?STORE_ID, PathPattern, Options1).
transaction(Fun) ->
transaction(Fun, auto, #{}).
transaction(Fun, ReadWrite) ->
transaction(Fun, ReadWrite, #{}).
transaction(Fun, ReadWrite, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options1) of
ok -> ok; %% Async transaction.
{ok, Result} -> Result;
{error, Reason} -> throw({error, Reason})
end.
count_children(Path) ->
Options = #{props_to_return => [child_list_length]},
case khepri_adv:get_many(?STORE_ID, Path, Options) of
{ok, Map} ->
lists:sum([L || #{child_list_length := L} <- maps:values(Map)]);
_ ->
0
end.
fence(Timeout) ->
khepri:fence(?STORE_ID, Timeout).
clear_payload(Path) ->
khepri:clear_payload(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS).
handle_async_ret(RaEvent) ->
khepri:handle_async_ret(?STORE_ID, RaEvent).
delete(Path) ->
khepri:delete_many(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS).
delete(Path, Options0) ->
Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0),
khepri:delete_many(?STORE_ID, Path, Options).
%% `delete_or_fail/1' is not a proxy to a Khepri function.
delete_or_fail(Path) ->
%% `Path' must not be a pattern.
case khepri_adv:delete(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS) of
{ok, #{Path := NodeProps}} ->
case maps:size(NodeProps) of
@ -1246,78 +1304,9 @@ delete_or_fail(Path) ->
Error
end.
adv_delete_many(Path) ->
khepri_adv:delete_many(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS).
put(PathPattern, Data) ->
khepri:put(
?STORE_ID, PathPattern, Data, ?DEFAULT_COMMAND_OPTIONS).
put(PathPattern, Data, Options0) ->
Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0),
khepri:put(
?STORE_ID, PathPattern, Data, Options).
adv_put(PathPattern, Data) ->
khepri_adv:put(
?STORE_ID, PathPattern, Data, ?DEFAULT_COMMAND_OPTIONS).
transaction(Fun) ->
transaction(Fun, auto, #{}).
transaction(Fun, ReadWrite) ->
transaction(Fun, ReadWrite, #{}).
transaction(Fun, ReadWrite, Options) ->
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options1) of
ok -> ok;
{ok, Result} -> Result;
{error, Reason} -> throw({error, Reason})
end.
clear_store() ->
khepri:delete_many(?STORE_ID, "*", ?DEFAULT_COMMAND_OPTIONS).
info() ->
ok = setup(),
khepri:info(?STORE_ID).
handle_async_ret(RaEvent) ->
khepri:handle_async_ret(?STORE_ID, RaEvent).
fence(Timeout) ->
khepri:fence(?STORE_ID, Timeout).
-spec unregister_legacy_projections() -> Ret when
Ret :: ok | timeout_error().
%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x
%% versions.
%%
%% In 3.13.x until 3.13.8 we mistakenly registered these projections even if
%% Khepri was not enabled. This function is used by the `khepri_db' enable
%% callback to remove those projections before we register the ones necessary
%% for 4.0.x.
%%
%% @private
unregister_legacy_projections() ->
%% Note that we don't use `all' since `khepri_mnesia_migration' also
%% creates a projection table which we don't want to unregister. Instead
%% we list all of the legacy projection names:
LegacyNames = [
rabbit_khepri_exchange,
rabbit_khepri_queue,
rabbit_khepri_vhost,
rabbit_khepri_users,
rabbit_khepri_global_rtparams,
rabbit_khepri_per_vhost_rtparams,
rabbit_khepri_user_permissions,
rabbit_khepri_bindings,
rabbit_khepri_index_route,
rabbit_khepri_topic_trie
],
khepri:unregister_projections(?STORE_ID, LegacyNames).
%% -------------------------------------------------------------------
%% Projections setup.
%% -------------------------------------------------------------------
register_projections() ->
RegFuns = [fun register_rabbit_exchange_projection/0,
@ -1945,6 +1934,36 @@ do_migrate_mnesia_tables(FeatureName, Migrations) ->
{error, {migration_failure, Error}}
end.
-spec unregister_legacy_projections() -> Ret when
Ret :: ok | timeout_error().
%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x
%% versions.
%%
%% In 3.13.x until 3.13.8 we mistakenly registered these projections even if
%% Khepri was not enabled. This function is used by the `khepri_db' enable
%% callback to remove those projections before we register the ones necessary
%% for 4.0.x.
%%
%% @private
unregister_legacy_projections() ->
%% Note that we don't use `all' since `khepri_mnesia_migration' also
%% creates a projection table which we don't want to unregister. Instead
%% we list all of the legacy projection names:
LegacyNames = [
rabbit_khepri_exchange,
rabbit_khepri_queue,
rabbit_khepri_vhost,
rabbit_khepri_users,
rabbit_khepri_global_rtparams,
rabbit_khepri_per_vhost_rtparams,
rabbit_khepri_user_permissions,
rabbit_khepri_bindings,
rabbit_khepri_index_route,
rabbit_khepri_topic_trie
],
khepri:unregister_projections(?STORE_ID, LegacyNames).
-spec handle_fallback(Funs) -> Ret when
Funs :: #{mnesia := Fun, khepri := Fun | Ret},
Fun :: fun(() -> Ret),

View File

@ -272,7 +272,7 @@ end_per_testcase(Testcase, Config) ->
TableDefs),
%% Clear all data in Khepri.
ok = rabbit_khepri:clear_store(),
ok = rabbit_khepri:delete("*"),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
@ -2719,4 +2719,4 @@ check_storage(khepri, none, Content) ->
rabbit_khepri:info(),
Path = [#if_all{conditions = [?KHEPRI_WILDCARD_STAR_STAR,
#if_has_data{}]}],
?assertEqual({ok, Content}, rabbit_khepri:match(Path)).
?assertEqual({ok, Content}, rabbit_khepri:get_many(Path)).

View File

@ -265,6 +265,6 @@ child_id_format(Config) ->
?assertMatch(
{ok, #{Path := _}},
rabbit_ct_broker_helpers:rpc(
Config, NewNode, rabbit_khepri, list,
[Pattern]))
Config, NewNode, rabbit_khepri, get_many,
[Pattern ++ [?KHEPRI_WILDCARD_STAR]]))
end.