rabbit_peer_discovery: Acquire a lock for the joining and the joined nodes only

[Why]
A lock is acquired to protect against concurrent cluster joins.

Some backends used to use the entire list of discovered nodes and used
`global` as the lock implementation. This was a problem because a side
effect was that all discovered Erlang nodes were connected to each
other. This led to conflicts in the global process name registry and
thus processes were killed randomly.

This was the case with the feature flags controller for instance. Nodes
are running some feature flags operation early in boot before they are
ready to cluster or run the peer discovery code. But if another node was
executing peer discovery, it could cause all nodes to connect to each
other. Unrelated feature flags controller instances were thus killed
because another node was running peer discovery.

[How]
Acquiring a lock on the joining and the joined nodes only is enough to
achieve the goal of protecting against concurrent joins. This is
possible because of the new core logic which ensures the same node is
used as the "seed node". I.e. all nodes will join the same node.

Therefore the API of `rabbit_peer_discovery_backend:lock/1` is changed
to take a list of nodes (the two nodes mentioned above) instead of one
node (which was the current node, so not that helpful in the first
place).

These backends also used to check if the current node was part of the
discovered nodes. But that's already handled in the generic peer
discovery code.

CAUTION: This brings a breaking change in the peer discovery backend
API. The `Backend:lock/1` callback now takes a list of node names
instead of a single node name. This list will contain the current node
name.
This commit is contained in:
Jean-Sébastien Pédron 2023-11-27 12:18:47 +01:00
parent 44926f05f8
commit 1e46d334bf
No known key found for this signature in database
GPG Key ID: 39E99761A5FD94CC
14 changed files with 107 additions and 108 deletions

View File

@ -613,7 +613,7 @@ join_selected_node(Backend, SelectedNode, NodeType) ->
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery: trying to acquire lock", "Peer discovery: trying to acquire lock",
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
LockResult = lock(Backend), LockResult = lock(Backend, SelectedNode),
?LOG_DEBUG( ?LOG_DEBUG(
"Peer discovery: rabbit_peer_discovery:lock/0 returned ~0tp", "Peer discovery: rabbit_peer_discovery:lock/0 returned ~0tp",
[LockResult], [LockResult],
@ -788,18 +788,34 @@ unregister(Backend) ->
ok ok
end. end.
-spec lock(Backend) -> Ret when -spec lock(Backend, SelectedNode) -> Ret when
Backend :: backend(), Backend :: backend(),
SelectedNode :: node(),
Ret :: {ok, Data} | not_supported | {error, Reason}, Ret :: {ok, Data} | not_supported | {error, Reason},
Data :: any(), Data :: any(),
Reason :: string(). Reason :: string().
lock(Backend) -> lock(Backend, SelectedNode) ->
?LOG_INFO( ?LOG_INFO(
"Peer discovery: will try to lock with peer discovery backend ~ts", "Peer discovery: will try to lock with peer discovery backend ~ts",
[Backend], [Backend],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}), #{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
case Backend:lock(node()) of %% We want to acquire a lock for two nodes: this one and the selected
%% node. This protects against concurrent cluster joins.
%%
%% Some backends used to use the entire list of discovered nodes and used
%% `global' as the lock implementation. This was a problem because a side
%% effect was that all discovered Erlang nodes were connected to each
%% other. This led to conflicts in the global process name registry and
%% thus processes killed randomly. This was the case with the feature
%% flags controller for instance.
%%
%% Peer discovery shouldn't connect to all discovered nodes before it is
%% ready to actually join another node. And it should only connect to that
%% specific node, not all of them.
ThisNode = node(),
NodesToLock = [ThisNode, SelectedNode],
case Backend:lock(NodesToLock) of
{error, Reason} = Error -> {error, Reason} = Error ->
?LOG_ERROR( ?LOG_ERROR(
"Peer discovery: failed to lock with peer discovery " "Peer discovery: failed to lock with peer discovery "

View File

@ -35,11 +35,12 @@ add_this_node(Nodes) ->
false -> [ThisNode | Nodes] false -> [ThisNode | Nodes]
end. end.
-spec lock(Node :: node()) -> {ok, {{ResourceId :: string(), LockRequesterId :: node()}, Nodes :: [node()]}} | -spec lock(Nodes :: [node()]) ->
{error, Reason :: string()}. {ok, {{ResourceId :: string(), LockRequesterId :: node()}, Nodes :: [node()]}} |
{error, Reason :: string()}.
lock(Node) -> lock(Nodes) ->
{ok, {Nodes, _NodeType}} = list_nodes(), Node = node(),
case lists:member(Node, Nodes) of case lists:member(Node, Nodes) of
false when Nodes =/= [] -> false when Nodes =/= [] ->
rabbit_log:warning("Local node ~ts is not part of configured nodes ~tp. " rabbit_log:warning("Local node ~ts is not part of configured nodes ~tp. "

View File

@ -63,9 +63,9 @@ unregister() ->
post_registration() -> post_registration() ->
ok. ok.
-spec lock(Node :: atom()) -> not_supported. -spec lock(Nodes :: [node()]) -> not_supported.
lock(_Node) -> lock(_Nodes) ->
not_supported. not_supported.
-spec unlock(Data :: term()) -> ok. -spec unlock(Data :: term()) -> ok.

View File

@ -52,7 +52,7 @@
-callback post_registration() -> ok | {error, Reason :: string()}. -callback post_registration() -> ok | {error, Reason :: string()}.
-callback lock(Node :: atom()) -> {ok, Data :: term()} | not_supported | {error, Reason :: string()}. -callback lock(Nodes :: [node()]) -> {ok, Data :: term()} | not_supported | {error, Reason :: string()}.
-callback unlock(Data :: term()) -> ok. -callback unlock(Data :: term()) -> ok.

View File

@ -113,34 +113,28 @@ unregister() ->
post_registration() -> post_registration() ->
ok. ok.
-spec lock(Node :: node()) -> {ok, {{ResourceId :: string(), LockRequesterId :: node()}, Nodes :: [node()]}} | -spec lock(Nodes :: [node()]) ->
{error, Reason :: string()}. {ok, {{ResourceId :: string(), LockRequesterId :: node()}, Nodes :: [node()]}} |
{error, Reason :: string()}.
lock(Node) -> lock(Nodes) ->
%% call list_nodes/0 externally such that meck can mock the function Node = node(),
case ?MODULE:list_nodes() of case lists:member(Node, Nodes) of
{ok, {[], disc}} ->
{error, "Cannot lock since no nodes got discovered."};
{ok, {Nodes, disc}} ->
case lists:member(Node, Nodes) of
true -> true ->
rabbit_log:info("Will try to lock connecting to nodes ~tp", [Nodes]), rabbit_log:info("Will try to lock connecting to nodes ~tp", [Nodes]),
LockId = rabbit_nodes:lock_id(Node), LockId = rabbit_nodes:lock_id(Node),
Retries = rabbit_nodes:lock_retries(), Retries = rabbit_nodes:lock_retries(),
case global:set_lock(LockId, Nodes, Retries) of case global:set_lock(LockId, Nodes, Retries) of
true -> true ->
{ok, {LockId, Nodes}}; {ok, {LockId, Nodes}};
false -> false ->
{error, io_lib:format("Acquiring lock taking too long, bailing out after ~b retries", [Retries])} {error, io_lib:format("Acquiring lock taking too long, bailing out after ~b retries", [Retries])}
end; end;
false -> false ->
%% Don't try to acquire the global lock when our own node is not discoverable by peers. %% Don't try to acquire the global lock when our own node is not discoverable by peers.
%% We shouldn't run into this branch because our node is running and should have been discovered. %% We shouldn't run into this branch because our node is running and should have been discovered.
{error, lists:flatten(io_lib:format("Local node ~ts is not part of discovered nodes ~tp", [Node, Nodes]))} {error, lists:flatten(io_lib:format("Local node ~ts is not part of discovered nodes ~tp", [Node, Nodes]))}
end; end.
{error, _} = Error ->
Error
end.
-spec unlock({{ResourceId :: string(), LockRequestedId :: atom()}, Nodes :: [atom()]}) -> 'ok'. -spec unlock({{ResourceId :: string(), LockRequestedId :: atom()}, Nodes :: [atom()]}) -> 'ok'.
unlock({LockId, Nodes}) -> unlock({LockId, Nodes}) ->

View File

@ -45,7 +45,7 @@ unregister() ->
post_registration() -> post_registration() ->
?DELEGATE:post_registration(). ?DELEGATE:post_registration().
-spec lock(Node :: node()) -> {ok, {ResourceId :: string(), LockRequesterId :: node()}} | {error, Reason :: string()}. -spec lock(Nodes :: [node()]) -> {ok, {ResourceId :: string(), LockRequesterId :: node()}} | {error, Reason :: string()}.
lock(Node) -> lock(Node) ->
?DELEGATE:lock(Node). ?DELEGATE:lock(Node).

View File

@ -28,8 +28,7 @@ groups() ->
{lock, [], [ {lock, [], [
lock_single_node, lock_single_node,
lock_multiple_nodes, lock_multiple_nodes,
lock_local_node_not_discovered, lock_local_node_not_discovered
lock_list_nodes_fails
]} ]}
]. ].
@ -76,34 +75,28 @@ registration_support(_Config) ->
lock_single_node(_Config) -> lock_single_node(_Config) ->
LocalNode = node(), LocalNode = node(),
Nodes = [LocalNode], Nodes = [LocalNode],
meck:expect(rabbit_peer_discovery_aws, list_nodes, 0, {ok, {Nodes, disc}}),
{ok, {LockId, Nodes}} = rabbit_peer_discovery_aws:lock(LocalNode), {ok, {LockId, Nodes}} = rabbit_peer_discovery_aws:lock([LocalNode]),
?assertEqual(ok, rabbit_peer_discovery_aws:unlock({LockId, Nodes})). ?assertEqual(ok, rabbit_peer_discovery_aws:unlock({LockId, Nodes})).
lock_multiple_nodes(_Config) -> lock_multiple_nodes(_Config) ->
application:set_env(rabbit, cluster_formation, [{internal_lock_retries, 2}]), application:set_env(rabbit, cluster_formation, [{internal_lock_retries, 2}]),
LocalNode = node(), LocalNode = node(),
OtherNode = other@host, OtherNodeA = a@host,
Nodes = [OtherNode, LocalNode], OtherNodeB = b@host,
meck:expect(rabbit_peer_discovery_aws, list_nodes, 0, {ok, {Nodes, disc}}),
{ok, {{LockResourceId, OtherNode}, Nodes}} = rabbit_peer_discovery_aws:lock(OtherNode), meck:expect(rabbit_nodes, lock_id, 1, {rabbit_nodes:cookie_hash(), OtherNodeA}),
?assertEqual({error, "Acquiring lock taking too long, bailing out after 2 retries"}, {ok, {{LockResourceId, OtherNodeA}, [LocalNode, OtherNodeA]}} = rabbit_peer_discovery_aws:lock([LocalNode, OtherNodeA]),
rabbit_peer_discovery_aws:lock(LocalNode)), meck:expect(rabbit_nodes, lock_id, 1, {rabbit_nodes:cookie_hash(), OtherNodeB}),
?assertEqual(ok, rabbitmq_peer_discovery_aws:unlock({{LockResourceId, OtherNode}, Nodes})), ?assertEqual({error, "Acquiring lock taking too long, bailing out after 2 retries"}, rabbit_peer_discovery_aws:lock([LocalNode, OtherNodeB])),
?assertEqual(ok, rabbit_peer_discovery_aws:unlock({{LockResourceId, OtherNodeA}, [LocalNode, OtherNodeA]})),
?assertEqual({ok, {{LockResourceId, LocalNode}, Nodes}}, rabbit_peer_discovery_aws:lock(LocalNode)), ?assertEqual({ok, {{LockResourceId, OtherNodeB}, [LocalNode, OtherNodeB]}}, rabbit_peer_discovery_aws:lock([LocalNode, OtherNodeB])),
?assertEqual(ok, rabbitmq_peer_discovery_aws:unlock({{LockResourceId, LocalNode}, Nodes})). ?assertEqual(ok, rabbit_peer_discovery_aws:unlock({{LockResourceId, OtherNodeB}, [LocalNode, OtherNodeB]})),
meck:unload(rabbit_nodes).
lock_local_node_not_discovered(_Config) -> lock_local_node_not_discovered(_Config) ->
meck:expect(rabbit_peer_discovery_aws, list_nodes, 0, {ok, {[n1@host, n2@host], disc}} ), Expectation = {error, "Local node " ++ atom_to_list(node()) ++ " is not part of discovered nodes [me@host]"},
Expectation = {error, "Local node me@host is not part of discovered nodes [n1@host,n2@host]"}, ?assertEqual(Expectation, rabbit_peer_discovery_aws:lock([me@host])).
?assertEqual(Expectation, rabbit_peer_discovery_aws:lock(me@host)).
lock_list_nodes_fails(_Config) ->
meck:expect(rabbit_peer_discovery_aws, list_nodes, 0, {error, "failed for some reason"}),
?assertEqual({error, "failed for some reason"}, rabbit_peer_discovery_aws:lock(me@host)).
%%% %%%
%%% Implementation %%% Implementation

View File

@ -160,13 +160,15 @@ post_registration() ->
send_health_check_pass(), send_health_check_pass(),
ok. ok.
-spec lock(Node :: atom()) -> {ok, Data :: term()} | {error, Reason :: string()}. -spec lock(Nodes :: [node()]) ->
{ok, Data :: term()} | {error, Reason :: string()}.
lock(Node) -> lock(_Nodes) ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
?LOG_DEBUG( ?LOG_DEBUG(
"Effective Consul peer discovery configuration: ~tp", [M], "Effective Consul peer discovery configuration: ~tp", [M],
#{domain => ?RMQLOG_DOMAIN_PEER_DIS}), #{domain => ?RMQLOG_DOMAIN_PEER_DIS}),
Node = node(),
case create_session(Node, get_config_key(consul_svc_ttl, M)) of case create_session(Node, get_config_key(consul_svc_ttl, M)) of
{ok, SessionId} -> {ok, SessionId} ->
TRef = start_session_ttl_updater(SessionId), TRef = start_session_ttl_updater(SessionId),

View File

@ -42,7 +42,7 @@ unregister() ->
post_registration() -> post_registration() ->
?DELEGATE:post_registration(). ?DELEGATE:post_registration().
-spec lock(Node :: atom()) -> {ok, Data :: term()} | {error, Reason :: string()}. -spec lock(Nodes :: [node()]) -> {ok, Data :: term()} | {error, Reason :: string()}.
lock(Node) -> lock(Node) ->
?DELEGATE:lock(Node). ?DELEGATE:lock(Node).

View File

@ -93,9 +93,11 @@ unregister() ->
post_registration() -> post_registration() ->
ok. ok.
-spec lock(Node :: atom()) -> {ok, Data :: term()} | {error, Reason :: string()}. -spec lock(Nodes :: [node()]) ->
{ok, Data :: term()} | {error, Reason :: string()}.
lock(Node) when is_atom(Node) -> lock(Nodes) when is_list(Nodes) ->
Node = node(),
case rabbitmq_peer_discovery_etcd_v3_client:lock(Node) of case rabbitmq_peer_discovery_etcd_v3_client:lock(Node) of
{ok, GeneratedKey} -> {ok, GeneratedKey}; {ok, GeneratedKey} -> {ok, GeneratedKey};
{error, _} = Error -> Error {error, _} = Error -> Error

View File

@ -44,7 +44,7 @@ unregister() ->
post_registration() -> post_registration() ->
?DELEGATE:post_registration(). ?DELEGATE:post_registration().
-spec lock(Node :: atom()) -> {'ok', term()} | {'error', string()}. -spec lock(Nodes :: [node()]) -> {'ok', term()} | {'error', string()}.
lock(Node) -> lock(Node) ->
?DELEGATE:lock(Node). ?DELEGATE:lock(Node).

View File

@ -68,33 +68,29 @@ register() ->
unregister() -> unregister() ->
ok. ok.
-spec lock(Node :: node()) -> {ok, {{ResourceId :: string(), LockRequesterId :: node()}, Nodes :: [node()]}} | -spec lock(Nodes :: [node()]) ->
{error, Reason :: string()}. {ok, {{ResourceId :: string(), LockRequesterId :: node()}, Nodes :: [node()]}} |
{error, Reason :: string()}.
lock(Node) -> lock(Nodes) ->
%% call list_nodes/0 externally such that meck can mock the function Node = node(),
case ?MODULE:list_nodes() of case lists:member(Node, Nodes) of
{ok, {Nodes, disc}} ->
case lists:member(Node, Nodes) of
true -> true ->
rabbit_log:info("Will try to lock connecting to nodes ~tp", [Nodes]), rabbit_log:info("Will try to lock connecting to nodes ~tp", [Nodes]),
LockId = rabbit_nodes:lock_id(Node), LockId = rabbit_nodes:lock_id(Node),
Retries = rabbit_nodes:lock_retries(), Retries = rabbit_nodes:lock_retries(),
case global:set_lock(LockId, Nodes, Retries) of case global:set_lock(LockId, Nodes, Retries) of
true -> true ->
{ok, {LockId, Nodes}}; {ok, {LockId, Nodes}};
false -> false ->
{error, io_lib:format("Acquiring lock taking too long, bailing out after ~b retries", [Retries])} {error, io_lib:format("Acquiring lock taking too long, bailing out after ~b retries", [Retries])}
end; end;
false -> false ->
%% Don't try to acquire the global lock when local node is not discoverable by peers. %% Don't try to acquire the global lock when local node is not discoverable by peers.
%% This branch is just an additional safety check. We should never run into this branch %% This branch is just an additional safety check. We should never run into this branch
%% because the local Pod is in state 'Running' and we listed both ready and not-ready addresses. %% because the local Pod is in state 'Running' and we listed both ready and not-ready addresses.
{error, lists:flatten(io_lib:format("Local node ~ts is not part of discovered nodes ~tp", [Node, Nodes]))} {error, lists:flatten(io_lib:format("Local node ~ts is not part of discovered nodes ~tp", [Node, Nodes]))}
end; end.
{error, _} = Error ->
Error
end.
-spec unlock({{ResourceId :: string(), LockRequestedId :: atom()}, Nodes :: [atom()]}) -> 'ok'. -spec unlock({{ResourceId :: string(), LockRequestedId :: atom()}, Nodes :: [atom()]}) -> 'ok'.
unlock({LockId, Nodes}) -> unlock({LockId, Nodes}) ->

View File

@ -44,7 +44,7 @@ unregister() ->
post_registration() -> post_registration() ->
?DELEGATE:post_registration(). ?DELEGATE:post_registration().
-spec lock(Node :: node()) -> {ok, {ResourceId :: string(), LockRequesterId :: node()}} | {error, Reason :: string()}. -spec lock(Nodes :: [node()]) -> {ok, {ResourceId :: string(), LockRequesterId :: node()}} | {error, Reason :: string()}.
lock(Node) -> lock(Node) ->
?DELEGATE:lock(Node). ?DELEGATE:lock(Node).

View File

@ -40,8 +40,7 @@ groups() ->
{lock, [], [ {lock, [], [
lock_single_node, lock_single_node,
lock_multiple_nodes, lock_multiple_nodes,
lock_local_node_not_discovered, lock_local_node_not_discovered
lock_list_nodes_fails
]} ]}
]. ].
@ -146,29 +145,25 @@ event_v1_test(_Config) ->
lock_single_node(_Config) -> lock_single_node(_Config) ->
LocalNode = node(), LocalNode = node(),
Nodes = [LocalNode], Nodes = [LocalNode],
meck:expect(rabbit_peer_discovery_k8s, list_nodes, 0, {ok, {[LocalNode], disc}}),
{ok, {LockId, Nodes}} = rabbit_peer_discovery_k8s:lock(LocalNode), {ok, {LockId, Nodes}} = rabbit_peer_discovery_k8s:lock([LocalNode]),
?assertEqual(ok, rabbit_peer_discovery_k8s:unlock({LockId, Nodes})). ?assertEqual(ok, rabbit_peer_discovery_k8s:unlock({LockId, Nodes})).
lock_multiple_nodes(_Config) -> lock_multiple_nodes(_Config) ->
application:set_env(rabbit, cluster_formation, [{internal_lock_retries, 2}]), application:set_env(rabbit, cluster_formation, [{internal_lock_retries, 2}]),
LocalNode = node(), LocalNode = node(),
OtherNode = other@host, OtherNodeA = a@host,
Nodes = [OtherNode, LocalNode], OtherNodeB = b@host,
meck:expect(rabbit_peer_discovery_k8s, list_nodes, 0, {ok, {Nodes, disc}}),
{ok, {{LockResourceId, OtherNode}, Nodes}} = rabbit_peer_discovery_k8s:lock(OtherNode), meck:expect(rabbit_nodes, lock_id, 1, {rabbit_nodes:cookie_hash(), OtherNodeA}),
?assertEqual({error, "Acquiring lock taking too long, bailing out after 2 retries"}, rabbit_peer_discovery_k8s:lock(LocalNode)), {ok, {{LockResourceId, OtherNodeA}, [LocalNode, OtherNodeA]}} = rabbit_peer_discovery_k8s:lock([LocalNode, OtherNodeA]),
?assertEqual(ok, rabbitmq_peer_discovery_k8s:unlock({{LockResourceId, OtherNode}, Nodes})), meck:expect(rabbit_nodes, lock_id, 1, {rabbit_nodes:cookie_hash(), OtherNodeB}),
?assertEqual({ok, {{LockResourceId, LocalNode}, Nodes}}, rabbit_peer_discovery_k8s:lock(LocalNode)), ?assertEqual({error, "Acquiring lock taking too long, bailing out after 2 retries"}, rabbit_peer_discovery_k8s:lock([LocalNode, OtherNodeB])),
?assertEqual(ok, rabbitmq_peer_discovery_k8s:unlock({{LockResourceId, LocalNode}, Nodes})). ?assertEqual(ok, rabbit_peer_discovery_k8s:unlock({{LockResourceId, OtherNodeA}, [LocalNode, OtherNodeA]})),
?assertEqual({ok, {{LockResourceId, OtherNodeB}, [LocalNode, OtherNodeB]}}, rabbit_peer_discovery_k8s:lock([LocalNode, OtherNodeB])),
?assertEqual(ok, rabbit_peer_discovery_k8s:unlock({{LockResourceId, OtherNodeB}, [LocalNode, OtherNodeB]})),
meck:unload(rabbit_nodes).
lock_local_node_not_discovered(_Config) -> lock_local_node_not_discovered(_Config) ->
meck:expect(rabbit_peer_discovery_k8s, list_nodes, 0, {ok, {[n1@host, n2@host], disc}} ), Expectation = {error, "Local node " ++ atom_to_list(node()) ++ " is not part of discovered nodes [me@host]"},
Expectation = {error, "Local node me@host is not part of discovered nodes [n1@host,n2@host]"}, ?assertEqual(Expectation, rabbit_peer_discovery_k8s:lock([me@host])).
?assertEqual(Expectation, rabbit_peer_discovery_k8s:lock(me@host)).
lock_list_nodes_fails(_Config) ->
meck:expect(rabbit_peer_discovery_k8s, list_nodes, 0, {error, "K8s API unavailable"}),
?assertEqual({error, "K8s API unavailable"}, rabbit_peer_discovery_k8s:lock(me@host)).