Skip peer discovery cleanup when backend returns error
Peer Discovery AWS Integration Test / Integration Test (push) Has been cancelled Details

Previously if the peer discovery backend returned an error from failing
to discover nodes, the `service_discovery_nodes/0` helper returned an
empty list. During cleanup this would mean that any nodes unreachable
during a partition would have destructive action taken against them:
`rabbit_db_cluster:forget_member/2` and `rabbit_quorum_queue:shrink_all/1`.
The `list_nodes/0` callback can fail transiently, though, and a failure
shouldn't mean that the cluster is empty. It's safer to avoid cleaning
up any nodes when the peer discovery backend fails to return the
intended set of nodes.
This commit is contained in:
Michael Davis 2025-09-24 20:31:04 -04:00
parent f11198a799
commit 2d4f19cf42
No known key found for this signature in database
2 changed files with 64 additions and 37 deletions

View File

@ -240,19 +240,29 @@ maybe_cleanup(State, UnreachableNodes) ->
?LOG_DEBUG(
"Peer discovery: cleanup discovered unreachable nodes: ~tp",
[UnreachableNodes]),
case lists:subtract(as_list(UnreachableNodes), as_list(service_discovery_nodes())) of
[] ->
?LOG_DEBUG(
"Peer discovery: all unreachable nodes are still "
"registered with the discovery backend ~tp",
[rabbit_peer_discovery:backend()],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok;
Nodes ->
?LOG_DEBUG(
"Peer discovery: unreachable nodes are not registered "
"with the discovery backend ~tp", [Nodes]),
maybe_remove_nodes(Nodes, State#state.warn_only)
Module = rabbit_peer_discovery:backend(),
case rabbit_peer_discovery:normalize(Module:list_nodes()) of
{ok, {OneOrMultipleNodes, _Type}} ->
DiscoveredNodes = as_list(OneOrMultipleNodes),
case lists:subtract(UnreachableNodes, DiscoveredNodes) of
[] ->
?LOG_DEBUG(
"Peer discovery: all unreachable nodes are still "
"registered with the discovery backend ~tp",
[rabbit_peer_discovery:backend()],
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
ok;
Nodes ->
?LOG_DEBUG(
"Peer discovery: unreachable nodes are not registered "
"with the discovery backend ~tp", [Nodes]),
maybe_remove_nodes(Nodes, State#state.warn_only)
end;
{error, Reason} ->
?LOG_INFO(
"Peer discovery cleanup: ~tp returned error ~tp",
[Module, Reason]),
ok
end.
%%--------------------------------------------------------------------
@ -288,26 +298,3 @@ maybe_remove_nodes([Node | Nodes], false) ->
-spec unreachable_nodes() -> [node()].
unreachable_nodes() ->
rabbit_nodes:list_unreachable().
%%--------------------------------------------------------------------
%% @private
%% @doc Return the nodes that the service discovery backend knows about
%% @spec service_discovery_nodes() -> [node()]
%% @end
%%--------------------------------------------------------------------
-spec service_discovery_nodes() -> [node()].
service_discovery_nodes() ->
Module = rabbit_peer_discovery:backend(),
case rabbit_peer_discovery:normalize(Module:list_nodes()) of
{ok, {OneOrMultipleNodes, _Type}} ->
Nodes = as_list(OneOrMultipleNodes),
?LOG_DEBUG(
"Peer discovery cleanup: ~tp returned ~tp",
[Module, Nodes]),
Nodes;
{error, Reason} ->
?LOG_DEBUG(
"Peer discovery cleanup: ~tp returned error ~tp",
[Module, Reason]),
[]
end.

View File

@ -26,7 +26,8 @@ groups() ->
all_tests() ->
[
cleanup_queues
cleanup_queues,
backend_errors_do_not_trigger_cleanup
].
%% -------------------------------------------------------------------
@ -126,6 +127,45 @@ cleanup_queues(Config) ->
%% Cleanup.
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
backend_errors_do_not_trigger_cleanup(Config) ->
%% The backend could have some transient failures. While the backend is
%% not giving reliable peer information, skip cleanup.
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
QQ = <<"quorum-queue">>,
declare_queue(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
%% Have the backend return an error.
mock_list_nodes(Config, {error, "...internal server error..."}),
%% Make node C unreachable.
rabbit_ct_broker_helpers:block_traffic_between(A, C),
rabbit_ct_broker_helpers:block_traffic_between(B, C),
Ts1 = erlang:system_time(millisecond),
?awaitMatch([C],
rabbit_ct_broker_helpers:rpc(Config, A,
rabbit_nodes,
list_unreachable, []),
30_000, 1_000),
ct:log(?LOW_IMPORTANCE, "Node C became unreachable in ~bms",
[erlang:system_time(millisecond) - Ts1]),
ok = rabbit_ct_broker_helpers:rpc(Config, A,
rabbit_peer_discovery_cleanup,
check_cluster, []),
%% Node C should remain in the quorum queue members.
?assertEqual(
lists:sort([A, B, C]),
begin
Info = rpc:call(A, rabbit_quorum_queue, infos,
[rabbit_misc:r(<<"/">>, queue, QQ)]),
lists:sort(proplists:get_value(members, Info))
end),
%% Cleanup.
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
%%%
%%% Implementation
%%%