Khepri: Clean up the setup/clustering parts of the integration code

[Why]
The `rabbit_khepri` module grew during the work to add Khepri support to
RabbitMQ, while Khepri itself was still being written. The current code
is therefore disorganized.

[How]
This commit tries to sort the code that manages the setup of Khepri and
the functions that deal with the Khepri cluster. It also groups the
functions which provide support for CLI commands.

It also adds documentation to several functions.

Finally, when a node joins a cluster, we stop displaying the content of
the Khepri tree.
Jean-Sébastien Pédron 2025-04-23 18:49:49 +02:00
parent d2b5f51bfd
commit bd3aee35b4
2 changed files with 442 additions and 251 deletions


@@ -279,7 +279,7 @@ forget_member_using_khepri(_Node, true) ->
#{domain => ?RMQLOG_DOMAIN_DB}),
{error, not_supported};
forget_member_using_khepri(Node, false = _RemoveWhenOffline) ->
rabbit_khepri:leave_cluster(Node).
rabbit_khepri:remove_member(Node).
%% -------------------------------------------------------------------
%% Cluster update.


@@ -98,21 +98,40 @@
-include("include/rabbit_khepri.hrl").
%% Initialisation.
-export([setup/0,
setup/1,
register_projections/0,
init/1,
can_join_cluster/1,
reset/0,
dir/0,
get_ra_cluster_name/0,
get_store_id/0,
root_path/0]).
%% Clustering.
-export([can_join_cluster/1,
add_member/2,
do_join/1, %% Internal RPC from this module.
remove_member/1,
members/0,
locally_known_members/0,
nodes/0,
locally_known_nodes/0,
get_ra_cluster_name/0,
get_store_id/0,
transfer_leadership/1,
fence/1,
check_cluster_consistency/0,
node_info/0, %% Internal RPC from this module.
cluster_status_from_khepri/0,
transfer_leadership/1]).
%% CLI command support.
-export([force_shrink_member_to_current_member/0,
status/0,
cli_cluster_status/0]).
-export([fence/1,
info/0,
is_empty/0,
create/2,
@@ -149,13 +168,8 @@
clear_store/0,
dir/0,
info/0,
root_path/0,
handle_async_ret/1]).
handle_async_ret/1,
status/0]).
%% Used during migration to join the standalone Khepri nodes and form the
%% equivalent cluster
-export([khepri_db_migration_enable/1,
@@ -163,20 +177,13 @@
is_enabled/0, is_enabled/1,
get_feature_state/0, get_feature_state/1,
handle_fallback/1]).
-export([do_join/1]).
%% To add the current node to an existing cluster
-export([leave_cluster/1]).
-export([check_cluster_consistency/0,
check_cluster_consistency/2,
node_info/0]).
-export([reset/0]).
-export([cluster_status_from_khepri/0,
cli_cluster_status/0]).
-export([force_shrink_member_to_current_member/0]).
-export([]).
-export([]).
-ifdef(TEST).
-export([force_metadata_store/1,
-export([register_projections/0,
force_metadata_store/1,
clear_forced_metadata_store/0]).
-endif.
@@ -238,19 +245,35 @@
]).
%% -------------------------------------------------------------------
%% API wrapping Khepri.
%% Khepri integration initialisation.
%% -------------------------------------------------------------------
-spec setup() -> ok | no_return().
%% @private
%% @doc Starts the local Khepri store.
%%
%% @see setup/1.
setup() ->
setup(rabbit_prelaunch:get_context()).
-spec setup(map()) -> ok | no_return().
%% @private
-spec setup(Context) -> ok | no_return() when
Context :: map().
%% @doc Starts the local Khepri store.
%%
%% Before starting the Khepri store, it ensures that the underlying Ra system
%% we want to use is also running.
%%
%% This function is idempotent whether the Khepri store is started for the
%% first time or it is restarted.
%%
%% This function blocks until a leader is elected.
%%
%% The Khepri application must be running.
%%
%% If it fails to start the Khepri store or if it reaches a timeout waiting for
%% a leader, this function exits.
setup(_) ->
setup(_Context) ->
?LOG_DEBUG("Starting Khepri-based " ?RA_FRIENDLY_NAME),
ok = ensure_ra_system_started(),
Timeout = application:get_env(rabbit, khepri_default_timeout, 30000),
@@ -279,17 +302,25 @@ setup(_) ->
exit(Error)
end.
ensure_ra_system_started() ->
{ok, _} = application:ensure_all_started(khepri),
ok = rabbit_ra_systems:ensure_ra_system_started(?RA_SYSTEM).
retry_timeout() ->
case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of
{ok, T} -> T;
{ok, T} when is_integer(T) andalso T >= 0 -> T;
undefined -> 300_000
end.
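For reference, both values read above come from the `rabbit` application environment. A sketch of an advanced.config entry that tunes them; the values shown are illustrative, not recommendations:

%% advanced.config (illustrative; the in-code defaults are 30000 and
%% 300000 milliseconds respectively)
[{rabbit, [
    {khepri_default_timeout, 30000},
    {khepri_leader_wait_retry_timeout, 300000}
]}].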
%% @private
-spec init(IsVirgin) -> Ret when
IsVirgin :: boolean(),
Ret :: ok | timeout_error().
%% @doc Ensures the store has caught up with the cluster.
%%
%% In addition to making sure the local Khepri store is on the same page as the
%% leader, it initialises the Khepri projections if this node is virgin.
%%
%% Finally, it requests the deletion of transient queues on this node.
init(IsVirgin) ->
case members() of
@@ -307,10 +338,8 @@ init(IsVirgin) ->
"up to the Raft cluster leader", [],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= case IsVirgin of
true ->
register_projections();
false ->
ok
true -> register_projections();
false -> ok
end,
%% Delete transient queues on init.
%% Note that we also do this in the
@@ -332,6 +361,89 @@ await_replication() ->
#{domain => ?RMQLOG_DOMAIN_DB}),
fence(Timeout).
-spec reset() -> ok | no_return().
%% @doc Resets and stops the local Khepri store.
%%
%% This function first ensures that the local Khepri store is running.
%%
%% Then it resets the store. This includes removing it from its cluster if
%% any, and deleting all tree nodes.
%%
%% Finally, it stops the store and deletes files on disk.
%%
%% The Khepri application is left running.
%%
%% RabbitMQ must be stopped on this Erlang node. This function throws an
%% exception if it is called while RabbitMQ is still running.
%%
%% @private
reset() ->
case rabbit:is_running() of
false ->
%% Rabbit should be stopped, but Khepri needs to be running.
%% Restart it.
ok = setup(),
ok = khepri_cluster:reset(?RA_CLUSTER_NAME),
ok = khepri:stop(?RA_CLUSTER_NAME),
_ = file:delete(rabbit_guid:filename()),
ok;
true ->
throw({error, rabbitmq_unexpectedly_running})
end.
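A minimal sketch of the expected call sequence from an Erlang shell on the node, assuming RabbitMQ is stopped first (otherwise `reset/0` throws):

%% reset/0 refuses to run while RabbitMQ is running on this node.
ok = rabbit:stop(),
ok = rabbit_khepri:reset().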
-spec dir() -> Dir when
Dir :: file:filename_all().
%% @doc Returns the Khepri store directory.
%%
%% This corresponds to the underlying Ra system's directory.
dir() ->
DataDir = rabbit_mnesia:dir(),
StoreDir = filename:join(DataDir, atom_to_list(?STORE_ID)),
StoreDir.
-spec get_ra_cluster_name() -> RaClusterName when
RaClusterName :: ra:cluster_name().
%% @doc Returns the Ra cluster name.
get_ra_cluster_name() ->
?RA_CLUSTER_NAME.
-spec get_store_id() -> StoreId when
StoreId :: khepri:store_id().
%% @doc Returns the Khepri store identifier.
get_store_id() ->
?STORE_ID.
-spec root_path() -> RootPath when
RootPath :: khepri_path:path().
%% @doc Returns the path where RabbitMQ stores all of its metadata.
%%
%% This path must be prepended to all paths used by RabbitMQ subsystems.
root_path() ->
?RABBITMQ_KHEPRI_ROOT_PATH.
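As an illustration of the rule above, a subsystem builds its paths by prepending this root; the `vhosts' component below is hypothetical:

%% Hypothetical subsystem path under the RabbitMQ root path.
VHostPath = rabbit_khepri:root_path() ++ [vhosts, <<"my-vhost">>].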
%% -------------------------------------------------------------------
%% Clustering.
%% -------------------------------------------------------------------
-spec can_join_cluster(DiscoveryNode) -> Ret when
DiscoveryNode :: node(),
Ret :: {ok, ClusterNodes} | {error, any()},
ClusterNodes :: [node()].
%% @doc Indicates if this node can join `DiscoveryNode''s cluster.
%%
%% At the Khepri level, it is always possible to join a remote cluster for
%% now. Therefore this function only queries the list of members in
%% `DiscoveryNode''s cluster and returns it.
%%
%% @returns an `ok' tuple with the list of members in `DiscoveryNode''s
%% cluster, or an error tuple.
%%
%% @private
can_join_cluster(DiscoveryNode) when is_atom(DiscoveryNode) ->
@@ -339,7 +451,7 @@ can_join_cluster(DiscoveryNode) when is_atom(DiscoveryNode) ->
try
ClusterNodes0 = erpc:call(
DiscoveryNode,
rabbit_khepri, locally_known_nodes, []),
?MODULE, locally_known_nodes, []),
ClusterNodes1 = ClusterNodes0 -- [ThisNode],
{ok, ClusterNodes1}
catch
@@ -347,6 +459,21 @@ can_join_cluster(DiscoveryNode) when is_atom(DiscoveryNode) ->
{error, Reason}
end.
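A usage sketch: the returned list excludes the calling node, so an empty list means the remote node is alone in its cluster (node name illustrative):

case rabbit_khepri:can_join_cluster('rabbit@seed') of
    {ok, ClusterNodes} -> io:format("Members to join: ~p~n", [ClusterNodes]);
    {error, Reason}    -> io:format("Discovery failed: ~p~n", [Reason])
end.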
-spec add_member(JoiningNode, JoinedNode | JoinedCluster) -> Ret when
JoiningNode :: node(),
JoinedNode :: node(),
JoinedCluster :: [node()],
Ret :: ok | {error, any()}.
%% @doc Adds `JoiningNode' to `JoinedNode''s cluster.
%%
%% If a list of nodes is passed as `JoinedCluster', this function picks the
%% local node if it is part of the list and its Khepri store is running, or
%% otherwise the first node in the list that runs the Khepri store.
%%
%% The actual join code runs on the node that wants to join a cluster.
%% Therefore, if `JoiningNode' is this node, the code runs locally. Otherwise,
%% this function does an RPC call to execute the remote function.
%%
%% @private
add_member(JoiningNode, JoinedNode)
@@ -355,7 +482,7 @@ add_member(JoiningNode, JoinedNode)
post_add_member(JoiningNode, JoinedNode, Ret);
add_member(JoiningNode, JoinedNode) when is_atom(JoinedNode) ->
Ret = rabbit_misc:rpc_call(
JoiningNode, rabbit_khepri, do_join, [JoinedNode]),
JoiningNode, ?MODULE, do_join, [JoinedNode]),
post_add_member(JoiningNode, JoinedNode, Ret);
add_member(JoiningNode, [_ | _] = Cluster) ->
case pick_node_in_cluster(Cluster) of
@@ -396,6 +523,22 @@ pick_node_in_cluster([_ | _] = Cluster) ->
{error, {no_nodes_to_cluster_with, Cluster}}
end.
-spec do_join(RemoteNode) -> Ret when
RemoteNode :: node(),
Ret :: ok | {error, any()}.
%% @doc Adds this node to `RemoteNode''s cluster.
%%
%% Before adding this node to the remote node's cluster, this function calls
%% {@link setup/0} to ensure the Khepri store is running.
%%
%% It also pings the remote node to make sure it is reachable.
%%
%% If RabbitMQ is still running on this Erlang node, this function puts it in
%% maintenance mode before proceeding, and resumes RabbitMQ after the join
%% (whether or not the join succeeds).
%%
%% @private
do_join(RemoteNode) when RemoteNode =/= node() ->
ThisNode = node(),
@@ -408,7 +551,6 @@ do_join(RemoteNode) when RemoteNode =/= node() ->
%% Ensure the local Khepri store is running before we can reset it. It
%% could be stopped if RabbitMQ is not running for instance.
ok = setup(),
khepri:info(?RA_CLUSTER_NAME),
%% Ensure the remote node is reachable before we add it.
case net_adm:ping(RemoteNode) of
@@ -470,14 +612,27 @@ post_add_member(JoiningNode, JoinedNode, Error) ->
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
Error.
-spec remove_member(NodeToRemove) -> ok when
NodeToRemove :: node().
%% @doc Removes `NodeToRemove' from its cluster.
%%
%% This function runs on the node calling it.
%%
%% If `NodeToRemove' is reachable, this function calls the regular {@link
%% khepri_cluster:reset/1} on `NodeToRemove'. If it is unreachable, this
%% function calls Ra on this node to remove the remote member.
%%
%% @private
leave_cluster(Node) ->
retry_khepri_op(fun() -> remove_member(Node) end, 60).
remove_member(Node) ->
retry_khepri_op(fun() -> do_remove_member(Node) end, 60).
-spec do_remove_member(NodeToRemove) -> Ret when
NodeToRemove :: node(),
Ret :: ok | {error, any()}.
%% @private
remove_member(NodeToRemove) when NodeToRemove =/= node() ->
do_remove_member(NodeToRemove) when NodeToRemove =/= node() ->
?LOG_DEBUG(
"Trying to remove node ~s from Khepri cluster \"~s\" on node ~s",
[NodeToRemove, ?RA_CLUSTER_NAME, node()],
@@ -525,7 +680,7 @@ remove_reachable_member(NodeToRemove) ->
[NodeToRemove, ?RA_CLUSTER_NAME],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
ok;
Error ->
{error, _} = Error ->
?LOG_ERROR(
"Failed to remove remote node ~s from Khepri "
"cluster \"~s\": ~p",
@@ -563,33 +718,25 @@ remove_down_member(NodeToRemove) ->
{error, timeout}
end.
%% @private
reset() ->
case rabbit:is_running() of
false ->
%% Rabbit should be stopped, but Khepri needs to be running.
%% Restart it.
ok = setup(),
ok = khepri_cluster:reset(?RA_CLUSTER_NAME),
ok = khepri:stop(?RA_CLUSTER_NAME),
_ = file:delete(rabbit_guid:filename()),
ok;
true ->
throw({error, rabbitmq_unexpectedly_running})
end.
retry_khepri_op(Fun, 0) ->
Fun();
retry_khepri_op(Fun, N) ->
case Fun() of
{error, {no_more_servers_to_try, Reasons}} = Err ->
case lists:member({error,cluster_change_not_permitted}, Reasons) of
true ->
timer:sleep(1000),
retry_khepri_op(Fun, N - 1);
false ->
Err
end;
{error, cluster_change_not_permitted} ->
timer:sleep(1000),
retry_khepri_op(Fun, N - 1);
Any ->
Any
end.
%% @private
force_shrink_member_to_current_member() ->
ok = ra_server_proc:force_shrink_members_to_current_member(
{?RA_CLUSTER_NAME, node()}).
ensure_ra_system_started() ->
{ok, _} = application:ensure_all_started(khepri),
ok = rabbit_ra_systems:ensure_ra_system_started(?RA_SYSTEM).
-spec members() -> Members when
Members :: [ra:server_id()].
%% @doc Returns the list of Ra server identifiers that are part of the
@@ -652,51 +799,177 @@ locally_known_nodes() ->
{error, _Reason} -> []
end.
-spec get_ra_cluster_name() -> RaClusterName when
RaClusterName :: ra:cluster_name().
%% @doc Returns the Ra cluster name.
get_ra_cluster_name() ->
?RA_CLUSTER_NAME.
-spec get_store_id() -> StoreId when
StoreId :: khepri:store_id().
%% @doc Returns the Khepri store identifier.
get_store_id() ->
?STORE_ID.
-spec dir() -> Dir when
Dir :: file:filename_all().
%% @doc Returns the Khepri store directory.
%%
%% This corresponds to the underlying Ra system's directory.
dir() ->
filename:join(rabbit_mnesia:dir(), atom_to_list(?STORE_ID)).
-spec check_cluster_consistency() -> Ret when
Ret :: ok | {error, any()}.
%% @doc Performs various checks to validate that this node is healthy at the
%% metadata store level.
%%
%% @private
check_cluster_consistency() ->
%% We want to find 0 or 1 consistent nodes.
ReachableNodes = rabbit_nodes:list_reachable(),
case lists:foldl(
fun (Node, {error, _}) -> check_cluster_consistency(Node, true);
(_Node, {ok, Status}) -> {ok, Status}
end, {error, not_found}, nodes_excl_me(ReachableNodes))
of
{ok, {RemoteAllNodes, _Running}} ->
case ordsets:is_subset(ordsets:from_list(ReachableNodes),
ordsets:from_list(RemoteAllNodes)) of
true ->
ok;
false ->
%% We delete the schema here since we think we are
%% clustered with nodes that are no longer in the
%% cluster and there is no other way to remove
%% them from our schema. On the other hand, we are
%% sure that there is another online node that we
%% can use to sync the tables with. There is a
%% race here: if between this check and the
%% `init_db' invocation the cluster gets
%% disbanded, we're left with a node with no
%% mnesia data that will try to connect to offline
%% nodes.
%% TODO delete schema in khepri ???
ok
end;
{error, not_found} ->
ok;
{error, _} = E ->
E
end.
-spec transfer_leadership([node()]) ->
{ok, in_progress | undefined | node()} | {error, any()}.
-spec check_cluster_consistency(Node, CheckNodesConsistency) -> Ret when
Node :: node(),
CheckNodesConsistency :: boolean(),
Ret :: {ok, Status} | {error, any()},
Status :: {All, Running},
All :: [node()],
Running :: [node()].
%% @private
check_cluster_consistency(Node, CheckNodesConsistency) ->
case (catch remote_node_info(Node)) of
{badrpc, _Reason} ->
{error, not_found};
{'EXIT', {badarg, _Reason}} ->
{error, not_found};
{_OTP, _Rabbit, {error, _Reason}} ->
{error, not_found};
{_OTP, _Rabbit, {ok, Status}} when CheckNodesConsistency ->
case rabbit_db_cluster:check_compatibility(Node) of
ok ->
case check_nodes_consistency(Node, Status) of
ok -> {ok, Status};
Error -> Error
end;
Error ->
Error
end;
{_OTP, _Rabbit, {ok, Status}} ->
{ok, Status}
end.
-spec remote_node_info(Node) -> Info when
Node :: node(),
Info :: {OtpVersion, RabbitMQVersion, ClusterStatus},
OtpVersion :: string(),
RabbitMQVersion :: string(),
ClusterStatus :: {ok, {All, Running}} | {error, any()},
All :: [node()],
Running :: [node()].
%% @private
remote_node_info(Node) ->
rpc:call(Node, ?MODULE, node_info, []).
-spec node_info() -> Info when
Info :: {OtpVersion, RabbitMQVersion, ClusterStatus},
OtpVersion :: string(),
RabbitMQVersion :: string(),
ClusterStatus :: {ok, {All, Running}} | {error, khepri_not_running},
All :: [node()],
Running :: [node()].
%% @private
node_info() ->
{rabbit_misc:otp_release(),
rabbit_misc:version(),
cluster_status_from_khepri()}.
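For illustration, a successful node_info/0 result has this shape (version strings and node names are made up):

{"26.2.1",                        %% OTP release
 "4.1.0",                         %% RabbitMQ version
 {ok, {['rabbit@a', 'rabbit@b'],  %% all cluster members
       ['rabbit@a']}}}            %% members currently running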
check_nodes_consistency(Node, {RemoteAllNodes, _RemoteRunningNodes}) ->
case me_in_nodes(RemoteAllNodes) of
true ->
ok;
false ->
{error, {inconsistent_cluster,
format_inconsistent_cluster_message(node(), Node)}}
end.
format_inconsistent_cluster_message(Thinker, Dissident) ->
rabbit_misc:format("Khepri: node ~tp thinks it's clustered "
"with node ~tp, but ~tp disagrees",
[Thinker, Dissident, Dissident]).
nodes_excl_me(Nodes) -> Nodes -- [node()].
me_in_nodes(Nodes) -> lists:member(node(), Nodes).
-spec cluster_status_from_khepri() -> ClusterStatus when
ClusterStatus :: {ok, {All, Running}} | {error, khepri_not_running},
All :: [node()],
Running :: [node()].
%% @private
cluster_status_from_khepri() ->
try
_ = get_ra_key_metrics(node()),
All = locally_known_nodes(),
Running = lists:filter(
fun(N) ->
rabbit_nodes:is_running(N)
end, All),
{ok, {All, Running}}
catch
_:_ ->
{error, khepri_not_running}
end.
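A usage sketch for the status query above:

case rabbit_khepri:cluster_status_from_khepri() of
    {ok, {All, Running}} ->
        io:format("All: ~p~nRunning: ~p~n", [All, Running]);
    {error, khepri_not_running} ->
        io:format("Khepri store is not running on this node~n")
end.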
-spec transfer_leadership(Candidates) -> Ret when
Candidates :: [node()],
Ret :: {ok, Result} | {error, any()},
Result :: node() | undefined.
%% @private
transfer_leadership([]) ->
rabbit_log:warning("Skipping leadership transfer of metadata store: no candidate "
"(online, not under maintenance) nodes to transfer to!");
?LOG_WARNING(
"Skipping leadership transfer of metadata store: no candidate "
"(online, not under maintenance) nodes to transfer to!",
#{domain => ?RMQLOG_DOMAIN_DB}),
{error, no_candidates};
transfer_leadership(TransferCandidates) ->
case get_feature_state() of
enabled ->
transfer_leadership0(TransferCandidates);
do_transfer_leadership(TransferCandidates);
_ ->
rabbit_log:info("Skipping leadership transfer of metadata store: Khepri is not enabled")
?LOG_INFO(
"Skipping leadership transfer of metadata store: Khepri "
"is not enabled",
#{domain => ?RMQLOG_DOMAIN_DB}),
{error, khepri_not_enabled}
end.
-spec transfer_leadership0([node()]) ->
{ok, in_progress | undefined | node()} | {error, any()}.
transfer_leadership0([]) ->
rabbit_log:warning("Khepri clustering: failed to transfer leadership, no more candidates available", []),
do_transfer_leadership([]) ->
?LOG_WARNING(
"Khepri clustering: failed to transfer leadership, no more "
"candidates available",
#{domain => ?RMQLOG_DOMAIN_DB}),
{error, not_migrated};
transfer_leadership0([Destination | TransferCandidates]) ->
rabbit_log:info("Khepri clustering: transferring leadership to node ~p", [Destination]),
do_transfer_leadership([Destination | TransferCandidates]) ->
?LOG_INFO(
"Khepri clustering: transferring leadership to node ~p",
[Destination],
#{domain => ?RMQLOG_DOMAIN_DB}),
case ra_leaderboard:lookup_leader(?STORE_ID) of
{Name, Node} = Id when Node == node() ->
Timeout = khepri_app:get_default_timeout(),
@@ -704,30 +977,80 @@ transfer_leadership0([Destination | TransferCandidates]) ->
ok ->
case ra:members(Id, Timeout) of
{_, _, {_, NewNode}} ->
rabbit_log:info("Khepri clustering: successfully transferred leadership to node ~p", [Destination]),
?LOG_INFO(
"Khepri clustering: successfully "
"transferred leadership to node ~p",
[Destination],
#{domain => ?RMQLOG_DOMAIN_DB}),
{ok, NewNode};
{timeout, _} ->
rabbit_log:warning("Khepri clustering: maybe failed to transfer leadership to node ~p, members query has timed out", [Destination]),
?LOG_WARNING(
"Khepri clustering: maybe failed to transfer "
"leadership to node ~p, members query has "
"timed out",
[Destination],
#{domain => ?RMQLOG_DOMAIN_DB}),
{error, not_migrated}
end;
already_leader ->
rabbit_log:info("Khepri clustering: successfully transferred leadership to node ~p, already the leader", [Destination]),
?LOG_INFO(
"Khepri clustering: successfully transferred "
"leadership to node ~p, already the leader",
[Destination],
#{domain => ?RMQLOG_DOMAIN_DB}),
{ok, Destination};
{error, Reason} ->
rabbit_log:warning("Khepri clustering: failed to transfer leadership to node ~p with the following error ~p", [Destination, Reason]),
transfer_leadership0(TransferCandidates);
?LOG_WARNING(
"Khepri clustering: failed to transfer leadership "
"to node ~p with the following error ~p",
[Destination, Reason],
#{domain => ?RMQLOG_DOMAIN_DB}),
do_transfer_leadership(TransferCandidates);
{timeout, _} ->
rabbit_log:warning("Khepri clustering: failed to transfer leadership to node ~p with a timeout", [Destination]),
transfer_leadership0(TransferCandidates)
?LOG_WARNING(
"Khepri clustering: failed to transfer leadership "
"to node ~p with a timeout",
[Destination],
#{domain => ?RMQLOG_DOMAIN_DB}),
do_transfer_leadership(TransferCandidates)
end;
{_, Node} ->
rabbit_log:info("Khepri clustering: skipping leadership transfer, leader is already in node ~p", [Node]),
?LOG_INFO(
"Khepri clustering: skipping leadership transfer, leader is "
"already on node ~p",
[Node],
#{domain => ?RMQLOG_DOMAIN_DB}),
{ok, Node};
undefined ->
rabbit_log:info("Khepri clustering: skipping leadership transfer, leader not elected", []),
?LOG_INFO(
"Khepri clustering: skipping leadership transfer, leader "
"not elected",
#{domain => ?RMQLOG_DOMAIN_DB}),
{ok, undefined}
end.
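A usage sketch, assuming the caller has already filtered the candidates down to online nodes that are not under maintenance (node names illustrative):

case rabbit_khepri:transfer_leadership(['rabbit@b', 'rabbit@c']) of
    {ok, undefined} -> ok; %% no leader elected yet, nothing to transfer
    {ok, NewLeader} -> io:format("Leader now on ~p~n", [NewLeader]);
    {error, Reason} -> io:format("Transfer failed: ~p~n", [Reason])
end.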
%% -------------------------------------------------------------------
%% CLI command support functions.
%% -------------------------------------------------------------------
-spec force_shrink_member_to_current_member() -> ok.
%% @doc Shrinks the local Khepri store so that it is alone in its cluster.
%%
%% Unlike a reset, the store keeps its data.
%%
%% This is only used by the CLI's `force_standalone_khepri_boot' command.
%%
%% @private
force_shrink_member_to_current_member() ->
ok = ra_server_proc:force_shrink_members_to_current_member(
{?RA_CLUSTER_NAME, node()}).
-spec status() -> Status when
Status :: [Metrics],
Metrics :: [{Key, Value}],
Key :: binary(),
Value :: any().
%% @private
status() ->
@@ -769,6 +1092,9 @@ status() ->
]
end || N <- Nodes].
-spec get_ra_key_metrics(Node) -> Metrics when
Node :: node(),
Metrics :: map().
%% @private
get_ra_key_metrics(Node) ->
@@ -783,6 +1109,11 @@ get_ra_key_metrics(Node) ->
Metrics1 = Metrics0#{machine_version => MacVer},
Metrics1.
-spec cli_cluster_status() -> Status when
Status :: [{nodes, [{disc, [node()]}]} |
{running_nodes, [node()]} |
{cluster_name, binary()} |
{partitions, []}].
%% @private
cli_cluster_status() ->
@@ -797,119 +1128,6 @@ cli_cluster_status() ->
[]
end.
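Per the -spec above, a non-empty result looks like this (values illustrative):

[{nodes, [{disc, ['rabbit@a', 'rabbit@b']}]},
 {running_nodes, ['rabbit@a', 'rabbit@b']},
 {cluster_name, <<"rabbit@a">>},
 {partitions, []}]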
%% @private
check_cluster_consistency() ->
%% We want to find 0 or 1 consistent nodes.
ReachableNodes = rabbit_nodes:list_reachable(),
case lists:foldl(
fun (Node, {error, _}) -> check_cluster_consistency(Node, true);
(_Node, {ok, Status}) -> {ok, Status}
end, {error, not_found}, nodes_excl_me(ReachableNodes))
of
{ok, {RemoteAllNodes, _Running}} ->
case ordsets:is_subset(ordsets:from_list(ReachableNodes),
ordsets:from_list(RemoteAllNodes)) of
true ->
ok;
false ->
%% We delete the schema here since we think we are
%% clustered with nodes that are no longer in the
%% cluster and there is no other way to remove
%% them from our schema. On the other hand, we are
%% sure that there is another online node that we
%% can use to sync the tables with. There is a
%% race here: if between this check and the
%% `init_db' invocation the cluster gets
%% disbanded, we're left with a node with no
%% mnesia data that will try to connect to offline
%% nodes.
%% TODO delete schema in khepri ???
ok
end;
{error, not_found} ->
ok;
{error, _} = E ->
E
end.
nodes_excl_me(Nodes) -> Nodes -- [node()].
%% @private
check_cluster_consistency(Node, CheckNodesConsistency) ->
case (catch remote_node_info(Node)) of
{badrpc, _Reason} ->
{error, not_found};
{'EXIT', {badarg, _Reason}} ->
{error, not_found};
{_OTP, _Rabbit, {error, _Reason}} ->
{error, not_found};
{_OTP, _Rabbit, {ok, Status}} when CheckNodesConsistency ->
case rabbit_db_cluster:check_compatibility(Node) of
ok ->
case check_nodes_consistency(Node, Status) of
ok -> {ok, Status};
Error -> Error
end;
Error ->
Error
end;
{_OTP, _Rabbit, {ok, Status}} ->
{ok, Status}
end.
remote_node_info(Node) ->
rpc:call(Node, ?MODULE, node_info, []).
check_nodes_consistency(Node, {RemoteAllNodes, _RemoteRunningNodes}) ->
case me_in_nodes(RemoteAllNodes) of
true ->
ok;
false ->
{error, {inconsistent_cluster,
format_inconsistent_cluster_message(node(), Node)}}
end.
format_inconsistent_cluster_message(Thinker, Dissident) ->
rabbit_misc:format("Khepri: node ~tp thinks it's clustered "
"with node ~tp, but ~tp disagrees",
[Thinker, Dissident, Dissident]).
me_in_nodes(Nodes) -> lists:member(node(), Nodes).
%% @private
node_info() ->
{rabbit_misc:otp_release(),
rabbit_misc:version(),
cluster_status_from_khepri()}.
%% @private
cluster_status_from_khepri() ->
try
_ = get_ra_key_metrics(node()),
All = locally_known_nodes(),
Running = lists:filter(
fun(N) ->
rabbit_nodes:is_running(N)
end, All),
{ok, {All, Running}}
catch
_:_ ->
{error, khepri_not_running}
end.
-spec root_path() -> RootPath when
RootPath :: khepri_path:path().
%% @doc Returns the path where RabbitMQ stores every metadata.
%%
%% This path must be prepended to all paths used by RabbitMQ subsystems.
root_path() ->
?RABBITMQ_KHEPRI_ROOT_PATH.
%% -------------------------------------------------------------------
%% "Proxy" functions to Khepri API.
%% -------------------------------------------------------------------
@@ -1421,33 +1639,6 @@ follow_down_update(Table, Exchange, LeafNodeId, [], UpdateFn) ->
keep
end.
retry_khepri_op(Fun, 0) ->
Fun();
retry_khepri_op(Fun, N) ->
case Fun() of
{error, {no_more_servers_to_try, Reasons}} = Err ->
case lists:member({error,cluster_change_not_permitted}, Reasons) of
true ->
timer:sleep(1000),
retry_khepri_op(Fun, N - 1);
false ->
Err
end;
{no_more_servers_to_try, Reasons} = Err ->
case lists:member({error,cluster_change_not_permitted}, Reasons) of
true ->
timer:sleep(1000),
retry_khepri_op(Fun, N - 1);
false ->
Err
end;
{error, cluster_change_not_permitted} ->
timer:sleep(1000),
retry_khepri_op(Fun, N - 1);
Any ->
Any
end.
%% -------------------------------------------------------------------
%% Mnesia->Khepri migration code.
%% -------------------------------------------------------------------
@@ -1580,7 +1771,7 @@ khepri_db_migration_post_enable(
FeatureName :: rabbit_feature_flags:feature_name(),
Ret :: ok | {error, Reason},
Reason :: any().
%% @doc Initializes the Khepri cluster based on the Mnesia cluster.
%% @doc Initialises the Khepri cluster based on the Mnesia cluster.
%%
%% It uses the `khepri_mnesia_migration' application to synchronize membership
%% between both clusters.