rabbit_peer_discovery: Move peer discovery driving code from `rabbit_mnesia`

[Why]
Peer discovery is not Mnesia-specific: the same driving logic will also be
needed by the Khepri-based database layer once we introduce Khepri.

[How]
The whole peer discovery driving code is moved from `rabbit_mnesia` to
`rabbit_peer_discovery`. When `rabbit_mnesia` calls that code, it simply
passes a callback for the Mnesia-specific cluster expansion code.
This commit is contained in:
Jean-Sébastien Pédron 2023-06-13 10:22:19 +02:00
parent a595128d88
commit 9c358dd9f3
No known key found for this signature in database
GPG Key ID: 39E99761A5FD94CC
4 changed files with 225 additions and 145 deletions

View File

@ -7,6 +7,10 @@
-module(rabbit_mnesia).
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/logging.hrl").
-export([%% Main interface
init/0,
join_cluster/2,
@ -67,7 +71,6 @@
-ifdef(TEST).
-compile(export_all).
-export([init_with_lock/3]).
-endif.
%%----------------------------------------------------------------------------
@ -98,7 +101,8 @@ init() ->
[dir()]),
rabbit_peer_discovery:log_configured_backend(),
rabbit_peer_discovery:maybe_init(),
init_with_lock();
rabbit_peer_discovery:maybe_create_cluster(
fun create_cluster_callback/2);
false ->
NodeType = node_type(),
case is_node_type_permitted(NodeType) of
@ -121,117 +125,24 @@ init() ->
ok = rabbit_node_monitor:global_sync(),
ok.
init_with_lock() ->
{Retries, Timeout} = rabbit_peer_discovery:locking_retry_timeout(),
init_with_lock(Retries, Timeout, fun run_peer_discovery/0).
init_with_lock(0, _, RunPeerDiscovery) ->
case rabbit_peer_discovery:lock_acquisition_failure_mode() of
ignore ->
rabbit_log:warning("Could not acquire a peer discovery lock, out of retries", []),
RunPeerDiscovery(),
rabbit_peer_discovery:maybe_register();
fail ->
exit(cannot_acquire_startup_lock)
end;
init_with_lock(Retries, Timeout, RunPeerDiscovery) ->
LockResult = rabbit_peer_discovery:lock(),
rabbit_log:debug("rabbit_peer_discovery:lock returned ~tp", [LockResult]),
case LockResult of
not_supported ->
RunPeerDiscovery(),
rabbit_peer_discovery:maybe_register();
{ok, Data} ->
try
RunPeerDiscovery(),
rabbit_peer_discovery:maybe_register()
after
rabbit_peer_discovery:unlock(Data)
end;
{error, _Reason} ->
timer:sleep(Timeout),
init_with_lock(Retries - 1, Timeout, RunPeerDiscovery)
end.
-spec run_peer_discovery() -> ok | {[node()], rabbit_db_cluster:node_type()}.
run_peer_discovery() ->
{RetriesLeft, DelayInterval} = rabbit_peer_discovery:discovery_retries(),
run_peer_discovery_with_retries(RetriesLeft, DelayInterval).
-spec run_peer_discovery_with_retries(non_neg_integer(), non_neg_integer()) -> ok | {[node()], rabbit_db_cluster:node_type()}.
run_peer_discovery_with_retries(0, _DelayInterval) ->
create_cluster_callback(none, NodeType) ->
DiscNodes = [node()],
NodeType1 = case is_node_type_permitted(NodeType) of
false -> disc;
true -> NodeType
end,
init_db_and_upgrade(DiscNodes, NodeType1, true, _Retry = true),
rabbit_node_monitor:notify_joined_cluster(),
ok;
run_peer_discovery_with_retries(RetriesLeft, DelayInterval) ->
FindBadNodeNames = fun
(Name, BadNames) when is_atom(Name) -> BadNames;
(Name, BadNames) -> [Name | BadNames]
end,
{DiscoveredNodes0, NodeType} =
case rabbit_peer_discovery:discover_cluster_nodes() of
{error, Reason} ->
RetriesLeft1 = RetriesLeft - 1,
rabbit_log:error("Peer discovery returned an error: ~tp. Will retry after a delay of ~b ms, ~b retries left...",
[Reason, DelayInterval, RetriesLeft1]),
timer:sleep(DelayInterval),
run_peer_discovery_with_retries(RetriesLeft1, DelayInterval);
{ok, {Nodes, Type} = Config}
when is_list(Nodes) andalso (Type == disc orelse Type == disk orelse Type == ram) ->
case lists:foldr(FindBadNodeNames, [], Nodes) of
[] -> Config;
BadNames -> e({invalid_cluster_node_names, BadNames})
end;
{ok, {_, BadType}} when BadType /= disc andalso BadType /= ram ->
e({invalid_cluster_node_type, BadType});
{ok, _} ->
e(invalid_cluster_nodes_conf)
end,
DiscoveredNodes = lists:usort(DiscoveredNodes0),
rabbit_log:info("All discovered existing cluster peers: ~ts",
[rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]),
Peers = rabbit_nodes:nodes_excl_me(DiscoveredNodes),
case Peers of
[] ->
rabbit_log:info("Discovered no peer nodes to cluster with. "
"Some discovery backends can filter nodes out based on a readiness criteria. "
"Enabling debug logging might help troubleshoot."),
init_db_and_upgrade([node()], disc, false, _Retry = true);
_ ->
NodeType1 = case is_node_type_permitted(NodeType) of
false -> disc;
true -> NodeType
end,
rabbit_log:info("Peer nodes we can cluster with: ~ts",
[rabbit_peer_discovery:format_discovered_nodes(Peers)]),
join_discovered_peers(Peers, NodeType1)
end.
%% Attempts to join discovered,
%% reachable and compatible (in terms of Mnesia internal protocol version and such)
%% cluster peers in order.
join_discovered_peers(TryNodes, NodeType) ->
{RetriesLeft, DelayInterval} = rabbit_peer_discovery:discovery_retries(),
join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterval).
join_discovered_peers_with_retries(TryNodes, _NodeType, 0, _DelayInterval) ->
rabbit_log:info(
"Could not successfully contact any node of: ~ts (as in Erlang distribution). "
"Starting as a blank standalone node...",
[string:join(lists:map(fun atom_to_list/1, TryNodes), ",")]),
init_db_and_upgrade([node()], disc, false, _Retry = true);
join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterval) ->
case find_reachable_peer_to_cluster_with(rabbit_nodes:nodes_excl_me(TryNodes)) of
{ok, Node} ->
rabbit_log:info("Node '~ts' selected for auto-clustering", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster0(Node),
init_db_and_upgrade(DiscNodes, NodeType, true, _Retry = true),
rabbit_node_monitor:notify_joined_cluster();
none ->
RetriesLeft1 = RetriesLeft - 1,
rabbit_log:info("Trying to join discovered peers failed. Will retry after a delay of ~b ms, ~b retries left...",
[DelayInterval, RetriesLeft1]),
timer:sleep(DelayInterval),
join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft1, DelayInterval)
end.
create_cluster_callback(RemoteNode, NodeType) ->
{ok, {_, DiscNodes, _}} = discover_cluster0(RemoteNode),
NodeType1 = case is_node_type_permitted(NodeType) of
false -> disc;
true -> NodeType
end,
init_db_and_upgrade(DiscNodes, NodeType1, true, _Retry = true),
rabbit_node_monitor:notify_joined_cluster(),
ok.
%% Make the node join a cluster. The node will be reset automatically
%% before we actually cluster it. The nodes provided will be used to
@ -1149,23 +1060,6 @@ is_virgin_node() ->
List =:= []
end.
find_reachable_peer_to_cluster_with([]) ->
none;
find_reachable_peer_to_cluster_with([Node | Nodes]) ->
Fail = fun (Fmt, Args) ->
rabbit_log:warning(
"Could not auto-cluster with node ~ts: " ++ Fmt, [Node | Args]),
find_reachable_peer_to_cluster_with(Nodes)
end,
case rabbit_db_cluster:check_compatibility(Node) of
ok ->
{ok, Node};
{error, {badrpc, _} = Reason} ->
Fail("~tp", [Reason]);
Error ->
Fail("~tp", [Error])
end.
is_only_clustered_disc_node() ->
node_type() =:= disc andalso is_clustered() andalso
cluster_nodes(disc) =:= [node()].
@ -1177,17 +1071,6 @@ are_we_clustered_with(Node) ->
e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
error_description({invalid_cluster_node_names, BadNames}) ->
"In the 'cluster_nodes' configuration key, the following node names "
"are invalid: " ++ lists:flatten(io_lib:format("~tp", [BadNames]));
error_description({invalid_cluster_node_type, BadType}) ->
"In the 'cluster_nodes' configuration key, the node type is invalid "
"(expected 'disc' or 'ram'): " ++
lists:flatten(io_lib:format("~tp", [BadType]));
error_description(invalid_cluster_nodes_conf) ->
"The 'cluster_nodes' configuration key is invalid, it must be of the "
"form {[Nodes], Type}, where Nodes is a list of node names and "
"Type is either 'disc' or 'ram'";
error_description(clustering_only_disc_node) ->
"You cannot cluster a node if it is the only disc node in its existing "
" cluster. If new nodes joined while this node was offline, use "

View File

@ -7,17 +7,30 @@
-module(rabbit_peer_discovery).
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/logging.hrl").
%%
%% API
%%
-export([maybe_init/0, discover_cluster_nodes/0, backend/0, node_type/0,
-export([maybe_init/0, maybe_create_cluster/1, discover_cluster_nodes/0,
backend/0, node_type/0,
normalize/1, format_discovered_nodes/1, log_configured_backend/0,
register/0, unregister/0, maybe_register/0, maybe_unregister/0,
lock/0, unlock/1, discovery_retries/0]).
-export([append_node_prefix/1, node_prefix/0, locking_retry_timeout/0,
lock_acquisition_failure_mode/0]).
-ifdef(TEST).
-export([maybe_create_cluster/3]).
-endif.
%% Callback handed to `maybe_create_cluster/1` by the caller. It must be a
%% fun of arity 2. Within this module it is invoked either with the peer
%% selected for auto-clustering and the discovered node type, or with the
%% atom `none` and `disc` when no peer could be found or contacted.
-type create_cluster_callback() :: fun((node(),
rabbit_db_cluster:node_type())
-> ok).
-define(DEFAULT_BACKEND, rabbit_peer_discovery_classic_config).
%% what node type is used by default for this node when joining
@ -101,6 +114,189 @@ maybe_init() ->
ok
end.
%% Entry point of the peer discovery driving code.
%%
%% Fetches the number of lock attempts and the pause between two attempts
%% from `locking_retry_timeout/0`, then delegates to `maybe_create_cluster/3`
%% with the given cluster creation callback.
maybe_create_cluster(CreateClusterCallback) ->
    {LockAttempts, LockPause} = locking_retry_timeout(),
    maybe_create_cluster(LockAttempts, LockPause, CreateClusterCallback).
%% Tries to take the peer discovery lock, then runs peer discovery followed by
%% `maybe_register/0`.
%%
%% The first argument is the number of lock attempts left and the second one
%% is the time passed to `timer:sleep/1` after a failed attempt. Once no
%% attempt is left, `lock_acquisition_failure_mode/0` decides whether to go
%% ahead without the lock (`ignore`) or to exit with
%% `cannot_acquire_startup_lock` (`fail`).
maybe_create_cluster(0, _Pause, Callback)
  when is_function(Callback, 2) ->
    FailureMode = lock_acquisition_failure_mode(),
    case FailureMode of
        fail ->
            exit(cannot_acquire_startup_lock);
        ignore ->
            ?LOG_WARNING(
               "Peer discovery: Could not acquire a peer discovery lock, "
               "out of retries", [],
               #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
            run_peer_discovery(Callback),
            maybe_register()
    end;
maybe_create_cluster(AttemptsLeft, Pause, Callback)
  when is_function(Callback, 2) ->
    LockResult = lock(),
    ?LOG_DEBUG(
       "Peer discovery: rabbit_peer_discovery:lock/0 returned ~tp",
       [LockResult],
       #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
    %% The work to perform once we are allowed to proceed, with or without
    %% holding the lock.
    DiscoverAndRegister = fun() ->
                                  run_peer_discovery(Callback),
                                  maybe_register()
                          end,
    case LockResult of
        {error, _Reason} ->
            %% Lock not acquired: wait, then spend one more attempt.
            timer:sleep(Pause),
            maybe_create_cluster(AttemptsLeft - 1, Pause, Callback);
        not_supported ->
            DiscoverAndRegister();
        {ok, LockData} ->
            %% Always release the lock, even if discovery raises.
            try
                DiscoverAndRegister()
            after
                unlock(LockData)
            end
    end.
%% Runs peer discovery using the retry count and delay returned by
%% `discovery_retries/0`.
%%
%% The result is whatever `run_peer_discovery_with_retries/3` returns, which
%% is specified as `ok`; the spec below reflects that instead of the
%% historical `ok | {Nodes, NodeType}`.
-spec run_peer_discovery(CreateClusterCallback) -> ok when
      CreateClusterCallback :: create_cluster_callback().

run_peer_discovery(CreateClusterCallback) ->
    {RetriesLeft, DelayInterval} = discovery_retries(),
    run_peer_discovery_with_retries(
      RetriesLeft, DelayInterval, CreateClusterCallback).
%% Queries the configured backend for cluster nodes and hands the outcome to
%% the cluster creation callback.
%%
%% * If discovery returns `{error, Reason}`, the error is logged, the process
%%   sleeps for `DelayInterval` and the function calls itself with one retry
%%   less. This retry is a tail call: its result is the final result.
%%   (Binding the retry result to `{Nodes, NodeType}`, as was done before,
%%   raised `badmatch` because the retry returns `ok`.)
%% * If discovery returns a well-formed `{Nodes, Type}`, the callback is
%%   invoked either directly with `(none, disc)` when there is no peer other
%%   than this node, or through `join_discovered_peers/3`.
%% * Any other `{ok, _}` value is rejected by throwing through `e/1`.
-spec run_peer_discovery_with_retries(
        Retries, DelayInterval, CreateClusterCallback) -> ok when
      CreateClusterCallback :: create_cluster_callback(),
      Retries :: non_neg_integer(),
      DelayInterval :: non_neg_integer().

run_peer_discovery_with_retries(
  0, _DelayInterval, _CreateClusterCallback) ->
    %% NOTE(review): when retries run out the callback is never invoked;
    %% confirm that callers cope with a node whose cluster was not created.
    ok;
run_peer_discovery_with_retries(
  RetriesLeft, DelayInterval, CreateClusterCallback) ->
    case discover_cluster_nodes() of
        {error, Reason} ->
            RetriesLeft1 = RetriesLeft - 1,
            ?LOG_ERROR(
               "Peer discovery: Failed to discover nodes: ~tp. "
               "Will retry after a delay of ~b ms, ~b retries left...",
               [Reason, DelayInterval, RetriesLeft1],
               #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
            timer:sleep(DelayInterval),
            run_peer_discovery_with_retries(
              RetriesLeft1, DelayInterval, CreateClusterCallback);
        {ok, {Nodes, NodeType}}
          when is_list(Nodes) andalso
               (NodeType == disc orelse
                NodeType == disk orelse
                NodeType == ram) ->
            %% Node names must be atoms; report every offender, in the order
            %% the backend returned them.
            case [Name || Name <- Nodes, not is_atom(Name)] of
                [] -> ok;
                BadNames -> e({invalid_cluster_node_names, BadNames})
            end,
            DiscoveredNodes = lists:usort(Nodes),
            ?LOG_INFO(
               "Peer discovery: All discovered existing cluster peers: ~ts",
               [format_discovered_nodes(DiscoveredNodes)],
               #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
            Peers = rabbit_nodes:nodes_excl_me(DiscoveredNodes),
            case Peers of
                [] ->
                    ?LOG_INFO(
                       "Peer discovery: Discovered no peer nodes to cluster with. "
                       "Some discovery backends can filter nodes out based on a "
                       "readiness criteria. "
                       "Enabling debug logging might help troubleshoot.",
                       #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
                    CreateClusterCallback(none, disc);
                _ ->
                    ?LOG_INFO(
                       "Peer discovery: Peer nodes we can cluster with: ~ts",
                       [format_discovered_nodes(Peers)],
                       #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
                    join_discovered_peers(
                      Peers, NodeType, CreateClusterCallback)
            end;
        {ok, {_, BadType}} when BadType /= disc andalso BadType /= ram ->
            e({invalid_cluster_node_type, BadType});
        {ok, _} ->
            e(invalid_cluster_nodes_conf)
    end.
%% Never returns: throws `{error, {Tag, Description}}` where `Description` is
%% the text produced by `error_description/1` for `Tag`.
-spec e(any()) -> no_return().
e(Tag) ->
    Description = error_description(Tag),
    throw({error, {Tag, Description}}).
%% Returns the human-readable explanation attached to a configuration error
%% tag. Offending values are appended to the message, rendered with `~tp`.
error_description(invalid_cluster_nodes_conf) ->
    "The 'cluster_nodes' configuration key is invalid, it must be of the "
    "form {[Nodes], Type}, where Nodes is a list of node names and "
    "Type is either 'disc' or 'ram'";
error_description({invalid_cluster_node_type, BadType}) ->
    Rendered = lists:flatten(io_lib:format("~tp", [BadType])),
    "In the 'cluster_nodes' configuration key, the node type is invalid "
    "(expected 'disc' or 'ram'): " ++ Rendered;
error_description({invalid_cluster_node_names, BadNames}) ->
    Rendered = lists:flatten(io_lib:format("~tp", [BadNames])),
    "In the 'cluster_nodes' configuration key, the following node names "
    "are invalid: " ++ Rendered.
%% Tries to cluster with one of the discovered peers, in list order. Only a
%% peer that is reachable and passes the compatibility check performed by
%% `find_reachable_peer_to_cluster_with/1` is selected.
%%
%% The number of attempts and the delay between them come from
%% `discovery_retries/0`.
join_discovered_peers(TryNodes, NodeType, CreateClusterCallback) ->
    {Attempts, Delay} = discovery_retries(),
    join_discovered_peers_with_retries(
      TryNodes, NodeType, Attempts, Delay, CreateClusterCallback).
%% Looks for a usable peer among `TryNodes` and invokes the callback with it.
%%
%% When no peer is usable, the function logs, sleeps for `Delay` and tries
%% again with one attempt less. Once the attempt counter reaches 0, the
%% callback is invoked with `(none, disc)` so the node starts on its own.
join_discovered_peers_with_retries(
  TryNodes, _NodeType, 0, _Delay, Callback) ->
    NodeNames = [atom_to_list(TryNode) || TryNode <- TryNodes],
    ?LOG_INFO(
       "Peer discovery: Could not successfully contact any node of: ~ts "
       "(as in Erlang distribution). "
       "Starting as a blank standalone node...",
       [string:join(NodeNames, ",")],
       #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
    Callback(none, disc);
join_discovered_peers_with_retries(
  TryNodes, NodeType, AttemptsLeft, Delay, Callback) ->
    case find_reachable_peer_to_cluster_with(TryNodes) of
        none ->
            AttemptsLeft1 = AttemptsLeft - 1,
            ?LOG_INFO(
               "Peer discovery: Trying to join discovered peers failed. "
               "Will retry after a delay of ~b ms, ~b retries left...",
               [Delay, AttemptsLeft1],
               #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
            timer:sleep(Delay),
            join_discovered_peers_with_retries(
              TryNodes, NodeType, AttemptsLeft1, Delay, Callback);
        {ok, SelectedNode} ->
            ?LOG_INFO(
               "Peer discovery: Node '~ts' selected for auto-clustering",
               [SelectedNode],
               #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
            Callback(SelectedNode, NodeType)
    end.
%% Walks the candidate list in order and returns `{ok, Node}` for the first
%% node for which `rabbit_db_cluster:check_compatibility/1` returns `ok`, or
%% `none` when the list is exhausted.
%%
%% The local node is skipped without being checked. Every other rejected
%% candidate is reported with a warning before moving on to the next one.
find_reachable_peer_to_cluster_with([]) ->
    none;
find_reachable_peer_to_cluster_with([Candidate | Rest])
  when Candidate =:= node() ->
    find_reachable_peer_to_cluster_with(Rest);
find_reachable_peer_to_cluster_with([Candidate | Rest]) ->
    case rabbit_db_cluster:check_compatibility(Candidate) of
        ok ->
            {ok, Candidate};
        Error ->
            ?LOG_WARNING(
               "Peer discovery: Could not auto-cluster with node ~ts: ~0p",
               [Candidate, Error],
               #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
            find_reachable_peer_to_cluster_with(Rest)
    end.
%% This module doesn't currently sanity-check the return value of
%% `Backend:list_nodes()`. Therefore, it could return something invalid:

View File

@ -47,25 +47,25 @@ end_per_testcase(_, _) ->
init_with_lock_exits_after_errors(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> {error, "test error"} end),
?assertExit(cannot_acquire_startup_lock, rabbit_mnesia:init_with_lock(2, 10, fun() -> ok end)),
?assertExit(cannot_acquire_startup_lock, rabbit_peer_discovery:maybe_create_cluster(2, 10, fun(_, _) -> ok end)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.
init_with_lock_ignore_after_errors(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> {error, "test error"} end),
?assertEqual(ok, rabbit_mnesia:init_with_lock(2, 10, fun() -> ok end)),
?assertEqual(ok, rabbit_peer_discovery:maybe_create_cluster(2, 10, fun(_, _) -> ok end)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.
init_with_lock_not_supported(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> not_supported end),
?assertEqual(ok, rabbit_mnesia:init_with_lock(2, 10, fun() -> ok end)),
?assertEqual(ok, rabbit_peer_discovery:maybe_create_cluster(2, 10, fun(_, _) -> ok end)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.
init_with_lock_supported(_Config) ->
meck:expect(rabbit_peer_discovery_classic_config, lock, fun(_) -> {ok, data} end),
meck:expect(rabbit_peer_discovery_classic_config, unlock, fun(data) -> ok end),
?assertEqual(ok, rabbit_mnesia:init_with_lock(2, 10, fun() -> ok end)),
?assertEqual(ok, rabbit_peer_discovery:maybe_create_cluster(2, 10, fun(_, _) -> ok end)),
?assert(meck:validate(rabbit_peer_discovery_classic_config)),
passed.

View File

@ -7,6 +7,7 @@
-define(RMQLOG_DOMAIN_DB, ?DEFINE_RMQLOG_DOMAIN(db)).
-define(RMQLOG_DOMAIN_FEAT_FLAGS, ?DEFINE_RMQLOG_DOMAIN(feature_flags)).
-define(RMQLOG_DOMAIN_MIRRORING, ?DEFINE_RMQLOG_DOMAIN(mirroring)).
-define(RMQLOG_DOMAIN_PEER_DISC, ?DEFINE_RMQLOG_DOMAIN(peer_discovery)).
-define(RMQLOG_DOMAIN_PRELAUNCH, ?DEFINE_RMQLOG_DOMAIN(prelaunch)).
-define(RMQLOG_DOMAIN_QUEUE, ?DEFINE_RMQLOG_DOMAIN(queue)).
-define(RMQLOG_DOMAIN_UPGRADE, ?DEFINE_RMQLOG_DOMAIN(upgrade)).