diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl index 2bf52b3a01..f8bf3d0ea1 100644 --- a/deps/rabbit/src/rabbit_db.erl +++ b/deps/rabbit/src/rabbit_db.erl @@ -328,7 +328,7 @@ list_in_khepri(Path) -> Objects :: [term()]. list_in_khepri(Path, Options) -> - case rabbit_khepri:match(Path, Options) of + case rabbit_khepri:get_many(Path, Options) of {ok, Map} -> maps:values(Map); _ -> [] end. diff --git a/deps/rabbit/src/rabbit_db_rtparams.erl b/deps/rabbit/src/rabbit_db_rtparams.erl index 68decc6ca9..39141903aa 100644 --- a/deps/rabbit/src/rabbit_db_rtparams.erl +++ b/deps/rabbit/src/rabbit_db_rtparams.erl @@ -357,7 +357,7 @@ delete_vhost_in_mnesia_tx(VHostName) -> delete_vhost_in_khepri(VHostName) -> Pattern = khepri_vhost_rp_path( VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), - case rabbit_khepri:adv_delete_many(Pattern) of + case rabbit_khepri:adv_delete(Pattern) of {ok, NodePropsMap} -> RTParams = maps:fold( diff --git a/deps/rabbit/src/rabbit_db_user.erl b/deps/rabbit/src/rabbit_db_user.erl index 81deccfa6c..3a700a3b35 100644 --- a/deps/rabbit/src/rabbit_db_user.erl +++ b/deps/rabbit/src/rabbit_db_user.erl @@ -402,7 +402,7 @@ match_user_permissions_in_mnesia_tx(Username, VHostName) -> match_user_permissions_in_khepri('_' = _Username, '_' = _VHostName) -> Path = khepri_user_permission_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), - case rabbit_khepri:match(Path) of + case rabbit_khepri:get_many(Path) of {ok, Map} -> maps:values(Map); _ -> diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index a94d50b8ba..be9d5b42b0 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -2,7 +2,8 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2023-2025 Broadcom. All Rights Reserved. The term “Broadcom” +%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% %% @doc Khepri database uses wrapper. @@ -107,7 +108,9 @@ dir/0, get_ra_cluster_name/0, get_store_id/0, - root_path/0]). + root_path/0, + + info/0]). %% Clustering. -export([can_join_cluster/1, @@ -130,45 +133,38 @@ status/0, cli_cluster_status/0]). --export([fence/1, - info/0, +%% "Proxy" functions to Khepri query/update API. +-export([is_empty/0, - is_empty/0, - create/2, - adv_create/2, - update/2, - cas/3, - fold/3, fold/4, - foreach/2, - filter/2, - - get/1, - get/2, + get/1, get/2, + adv_get/1, adv_get/2, + get_many/1, get_many/2, + adv_get_many/1, adv_get_many/2, + exists/1, exists/2, count/1, count/2, - get_many/1, - adv_get/1, - adv_get_many/1, - match/1, - match/2, - exists/1, - list/1, - list_child_nodes/1, - count_children/1, + fold/3, fold/4, + foreach/2, foreach/3, + map/2, map/3, + filter/2, filter/3, put/2, put/3, - adv_put/2, - clear_payload/1, + adv_put/2, adv_put/3, + create/2, create/3, + adv_create/2, adv_create/3, + update/2, update/3, + adv_update/2, adv_update/3, + delete/1, delete/2, - delete_or_fail/1, - adv_delete_many/1, + adv_delete/1, adv_delete/2, + clear_payload/1, clear_payload/2, - transaction/1, - transaction/2, - transaction/3, + transaction/1, transaction/2, transaction/3, - clear_store/0, + fence/1, - handle_async_ret/1]). + handle_async_ret/1, + + delete_or_fail/1]). %% Used during migration to join the standalone Khepri nodes and form the %% equivalent cluster @@ -177,9 +173,6 @@ is_enabled/0, is_enabled/1, get_feature_state/0, get_feature_state/1, handle_fallback/1]). -%% To add the current node to an existing cluster --export([]). --export([]). -ifdef(TEST). -export([register_projections/0, @@ -427,6 +420,10 @@ get_store_id() -> root_path() -> ?RABBITMQ_KHEPRI_ROOT_PATH. +info() -> + ok = setup(), + khepri:info(?STORE_ID). + %% ------------------------------------------------------------------- %% Clustering. %% ------------------------------------------------------------------- @@ -1140,100 +1137,161 @@ cli_cluster_status() -> %% They are some additional functions too, because they are useful in %% RabbitMQ. They might be moved to Khepri in the future. -is_empty() -> khepri:is_empty(?STORE_ID). +is_empty() -> + khepri:is_empty(?STORE_ID). -create(Path, Data) -> - khepri:create(?STORE_ID, Path, Data, ?DEFAULT_COMMAND_OPTIONS). -adv_create(Path, Data) -> adv_create(Path, Data, #{}). -adv_create(Path, Data, Options0) -> - Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0), - khepri_adv:create(?STORE_ID, Path, Data, Options). -update(Path, Data) -> - khepri:update(?STORE_ID, Path, Data, ?DEFAULT_COMMAND_OPTIONS). -cas(Path, Pattern, Data) -> - khepri:compare_and_swap( - ?STORE_ID, Path, Pattern, Data, ?DEFAULT_COMMAND_OPTIONS). +get(PathPattern) -> + khepri:get(?STORE_ID, PathPattern). -fold(Path, Pred, Acc) -> - khepri:fold(?STORE_ID, Path, Pred, Acc). +get(PathPattern, Options) -> + khepri:get(?STORE_ID, PathPattern, Options). -fold(Path, Pred, Acc, Options) -> - khepri:fold(?STORE_ID, Path, Pred, Acc, Options). +adv_get(PathPattern) -> + khepri_adv:get(?STORE_ID, PathPattern). -foreach(Path, Pred) -> - khepri:foreach(?STORE_ID, Path, Pred). - -filter(Path, Pred) -> - khepri:filter(?STORE_ID, Path, Pred). - -get(Path) -> - khepri:get(?STORE_ID, Path). - -get(Path, Options) -> - khepri:get(?STORE_ID, Path, Options). - -count(PathPattern) -> - khepri:count(?STORE_ID, PathPattern, #{favor => low_latency}). - -count(Path, Options) -> - Options1 = Options#{favor => low_latency}, - khepri:count(?STORE_ID, Path, Options1). +adv_get(PathPattern, Options) -> + khepri_adv:get(?STORE_ID, PathPattern, Options). get_many(PathPattern) -> khepri:get_many(?STORE_ID, PathPattern). -adv_get(Path) -> - khepri_adv:get(?STORE_ID, Path). +get_many(PathPattern, Options) -> + khepri:get_many(?STORE_ID, PathPattern, Options). adv_get_many(PathPattern) -> khepri_adv:get_many(?STORE_ID, PathPattern). -match(Path) -> - match(Path, #{}). +adv_get_many(PathPattern, Options) -> + khepri_adv:get_many(?STORE_ID, PathPattern, Options). -match(Path, Options) -> - khepri:get_many(?STORE_ID, Path, Options). +exists(PathPattern) -> + khepri:exists(?STORE_ID, PathPattern). -exists(Path) -> khepri:exists(?STORE_ID, Path). +exists(PathPattern, Options) -> + khepri:exists(?STORE_ID, PathPattern, Options). -list(Path) -> - khepri:get_many( - ?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR]). +%% `count/{1,2}' sets the `favor => low_latency' option. -list_child_nodes(Path) -> - Options = #{props_to_return => [child_names]}, - case khepri_adv:get_many(?STORE_ID, Path, Options) of - {ok, Result} -> - case maps:values(Result) of - [#{child_names := ChildNames}] -> - {ok, ChildNames}; - [] -> - [] - end; - Error -> - Error +count(PathPattern) -> + count(PathPattern, #{}). + +count(PathPattern, Options) -> + Options1 = Options#{favor => low_latency}, + khepri:count(?STORE_ID, PathPattern, Options1). + +fold(PathPattern, Pred, Acc) -> + khepri:fold(?STORE_ID, PathPattern, Pred, Acc). + +fold(PathPattern, Pred, Acc, Options) -> + khepri:fold(?STORE_ID, PathPattern, Pred, Acc, Options). + +foreach(PathPattern, Pred) -> + khepri:foreach(?STORE_ID, PathPattern, Pred). + +foreach(PathPattern, Pred, Options) -> + khepri:foreach(?STORE_ID, PathPattern, Pred, Options). + +map(PathPattern, Pred) -> + khepri:map(?STORE_ID, PathPattern, Pred). + +map(PathPattern, Pred, Options) -> + khepri:map(?STORE_ID, PathPattern, Pred, Options). + +filter(PathPattern, Pred) -> + khepri:filter(?STORE_ID, PathPattern, Pred). + +filter(PathPattern, Pred, Options) -> + khepri:filter(?STORE_ID, PathPattern, Pred, Options). + +put(PathPattern, Data) -> + put(PathPattern, Data, #{}). + +put(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri:put(?STORE_ID, PathPattern, Data, Options1). + +adv_put(PathPattern, Data) -> + adv_put(PathPattern, Data, #{}). + +adv_put(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri_adv:put(?STORE_ID, PathPattern, Data, Options1). + +create(PathPattern, Data) -> + create(PathPattern, Data, #{}). + +create(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri:create(?STORE_ID, PathPattern, Data, Options1). + +adv_create(PathPattern, Data) -> + adv_create(PathPattern, Data, #{}). + +adv_create(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri_adv:create(?STORE_ID, PathPattern, Data, Options1). + +update(PathPattern, Data) -> + update(PathPattern, Data, #{}). + +update(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri:update(?STORE_ID, PathPattern, Data, Options1). + +adv_update(PathPattern, Data) -> + adv_update(PathPattern, Data, #{}). + +adv_update(PathPattern, Data, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri_adv:update(?STORE_ID, PathPattern, Data, Options1). + +%% `delete/{1,2}' calls `khepri:delete_many/2. + +delete(PathPattern) -> + delete(PathPattern, #{}). + +delete(PathPattern, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri:delete_many(?STORE_ID, PathPattern, Options1). + +adv_delete(PathPattern) -> + adv_delete(PathPattern, #{}). + +adv_delete(PathPattern, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri_adv:delete_many(?STORE_ID, PathPattern, Options1). + +clear_payload(PathPattern) -> + clear_payload(PathPattern, #{}). + +clear_payload(PathPattern, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + khepri:clear_payload(?STORE_ID, PathPattern, Options1). + +transaction(Fun) -> + transaction(Fun, auto, #{}). + +transaction(Fun, ReadWrite) -> + transaction(Fun, ReadWrite, #{}). + +transaction(Fun, ReadWrite, Options) -> + Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), + case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options1) of + ok -> ok; %% Async transaction. + {ok, Result} -> Result; + {error, Reason} -> throw({error, Reason}) end. -count_children(Path) -> - Options = #{props_to_return => [child_list_length]}, - case khepri_adv:get_many(?STORE_ID, Path, Options) of - {ok, Map} -> - lists:sum([L || #{child_list_length := L} <- maps:values(Map)]); - _ -> - 0 - end. +fence(Timeout) -> + khepri:fence(?STORE_ID, Timeout). -clear_payload(Path) -> - khepri:clear_payload(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS). +handle_async_ret(RaEvent) -> + khepri:handle_async_ret(?STORE_ID, RaEvent). -delete(Path) -> - khepri:delete_many(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS). - -delete(Path, Options0) -> - Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0), - khepri:delete_many(?STORE_ID, Path, Options). +%% `delete_or_fail/1' is not a proxy to a Khepri function. delete_or_fail(Path) -> + %% `Path' must not be a pattern. case khepri_adv:delete(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS) of {ok, #{Path := NodeProps}} -> case maps:size(NodeProps) of @@ -1246,78 +1304,9 @@ delete_or_fail(Path) -> Error end. -adv_delete_many(Path) -> - khepri_adv:delete_many(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS). - -put(PathPattern, Data) -> - khepri:put( - ?STORE_ID, PathPattern, Data, ?DEFAULT_COMMAND_OPTIONS). - -put(PathPattern, Data, Options0) -> - Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options0), - khepri:put( - ?STORE_ID, PathPattern, Data, Options). - -adv_put(PathPattern, Data) -> - khepri_adv:put( - ?STORE_ID, PathPattern, Data, ?DEFAULT_COMMAND_OPTIONS). - -transaction(Fun) -> - transaction(Fun, auto, #{}). - -transaction(Fun, ReadWrite) -> - transaction(Fun, ReadWrite, #{}). - -transaction(Fun, ReadWrite, Options) -> - Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options), - case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options1) of - ok -> ok; - {ok, Result} -> Result; - {error, Reason} -> throw({error, Reason}) - end. - -clear_store() -> - khepri:delete_many(?STORE_ID, "*", ?DEFAULT_COMMAND_OPTIONS). - -info() -> - ok = setup(), - khepri:info(?STORE_ID). - -handle_async_ret(RaEvent) -> - khepri:handle_async_ret(?STORE_ID, RaEvent). - -fence(Timeout) -> - khepri:fence(?STORE_ID, Timeout). - --spec unregister_legacy_projections() -> Ret when - Ret :: ok | timeout_error(). -%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x -%% versions. -%% -%% In 3.13.x until 3.13.8 we mistakenly registered these projections even if -%% Khepri was not enabled. This function is used by the `khepri_db' enable -%% callback to remove those projections before we register the ones necessary -%% for 4.0.x. -%% -%% @private - -unregister_legacy_projections() -> - %% Note that we don't use `all' since `khepri_mnesia_migration' also - %% creates a projection table which we don't want to unregister. Instead - %% we list all of the legacy projection names: - LegacyNames = [ - rabbit_khepri_exchange, - rabbit_khepri_queue, - rabbit_khepri_vhost, - rabbit_khepri_users, - rabbit_khepri_global_rtparams, - rabbit_khepri_per_vhost_rtparams, - rabbit_khepri_user_permissions, - rabbit_khepri_bindings, - rabbit_khepri_index_route, - rabbit_khepri_topic_trie - ], - khepri:unregister_projections(?STORE_ID, LegacyNames). +%% ------------------------------------------------------------------- +%% Projections setup. +%% ------------------------------------------------------------------- register_projections() -> RegFuns = [fun register_rabbit_exchange_projection/0, @@ -1945,6 +1934,36 @@ do_migrate_mnesia_tables(FeatureName, Migrations) -> {error, {migration_failure, Error}} end. +-spec unregister_legacy_projections() -> Ret when + Ret :: ok | timeout_error(). +%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x +%% versions. +%% +%% In 3.13.x until 3.13.8 we mistakenly registered these projections even if +%% Khepri was not enabled. This function is used by the `khepri_db' enable +%% callback to remove those projections before we register the ones necessary +%% for 4.0.x. +%% +%% @private + +unregister_legacy_projections() -> + %% Note that we don't use `all' since `khepri_mnesia_migration' also + %% creates a projection table which we don't want to unregister. Instead + %% we list all of the legacy projection names: + LegacyNames = [ + rabbit_khepri_exchange, + rabbit_khepri_queue, + rabbit_khepri_vhost, + rabbit_khepri_users, + rabbit_khepri_global_rtparams, + rabbit_khepri_per_vhost_rtparams, + rabbit_khepri_user_permissions, + rabbit_khepri_bindings, + rabbit_khepri_index_route, + rabbit_khepri_topic_trie + ], + khepri:unregister_projections(?STORE_ID, LegacyNames). + -spec handle_fallback(Funs) -> Ret when Funs :: #{mnesia := Fun, khepri := Fun | Ret}, Fun :: fun(() -> Ret), diff --git a/deps/rabbit/test/metadata_store_phase1_SUITE.erl b/deps/rabbit/test/metadata_store_phase1_SUITE.erl index 051e2c9c5d..becc8990fa 100644 --- a/deps/rabbit/test/metadata_store_phase1_SUITE.erl +++ b/deps/rabbit/test/metadata_store_phase1_SUITE.erl @@ -272,7 +272,7 @@ end_per_testcase(Testcase, Config) -> TableDefs), %% Clear all data in Khepri. - ok = rabbit_khepri:clear_store(), + ok = rabbit_khepri:delete("*"), rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -2719,4 +2719,4 @@ check_storage(khepri, none, Content) -> rabbit_khepri:info(), Path = [#if_all{conditions = [?KHEPRI_WILDCARD_STAR_STAR, #if_has_data{}]}], - ?assertEqual({ok, Content}, rabbit_khepri:match(Path)). + ?assertEqual({ok, Content}, rabbit_khepri:get_many(Path)). diff --git a/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl index 5c3221febc..b7e0911f98 100644 --- a/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl +++ b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl @@ -265,6 +265,6 @@ child_id_format(Config) -> ?assertMatch( {ok, #{Path := _}}, rabbit_ct_broker_helpers:rpc( - Config, NewNode, rabbit_khepri, list, - [Pattern])) + Config, NewNode, rabbit_khepri, get_many, + [Pattern ++ [?KHEPRI_WILDCARD_STAR]])) end.