Merge pull request #12131 from rabbitmq/filter-running-nodes-to-cluster-khepri

Khepri: Filter running nodes when selecting a node to cluster with
This commit is contained in:
Jean-Sébastien Pédron 2024-08-29 10:33:55 +02:00 committed by GitHub
commit b6e8586657
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 35 additions and 13 deletions

View File

@ -373,20 +373,42 @@ add_member(JoiningNode, JoinedNode) when is_atom(JoinedNode) ->
JoiningNode, rabbit_khepri, do_join, [JoinedNode]),
post_add_member(JoiningNode, JoinedNode, Ret);
add_member(JoiningNode, [_ | _] = Cluster) ->
JoinedNode = pick_node_in_cluster(Cluster),
case pick_node_in_cluster(Cluster) of
{ok, JoinedNode} ->
?LOG_INFO(
"Khepri clustering: Attempt to add node ~p to cluster ~0p "
"through node ~p",
[JoiningNode, Cluster, JoinedNode],
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
%% Recurse with a single node taken in the `Cluster' list.
add_member(JoiningNode, JoinedNode).
add_member(JoiningNode, JoinedNode);
{error, _} = Error ->
Error
end.
pick_node_in_cluster([_ | _] = Cluster) when is_list(Cluster) ->
pick_node_in_cluster([_ | _] = Cluster) ->
RunningNodes = lists:filter(
fun(Node) ->
try
erpc:call(
Node,
khepri_cluster, is_store_running,
[?STORE_ID])
catch
_:_ ->
false
end
end, Cluster),
case RunningNodes of
[_ | _] ->
ThisNode = node(),
case lists:member(ThisNode, Cluster) of
SelectedNode = case lists:member(ThisNode, RunningNodes) of
true -> ThisNode;
false -> hd(Cluster)
false -> hd(RunningNodes)
end,
{ok, SelectedNode};
[] ->
{error, {no_nodes_to_cluster_with, Cluster}}
end.
do_join(RemoteNode) when RemoteNode =/= node() ->