QQ: when invoking drain, only shut down small batches at a time

Then wait for elections to complete before shutting further
members down.

This should help avoid election storms when enabling maintenance
mode.

Transfer khepri before queues to ensure the metadata store is
ready to accept pid updates.

Some other state-related tweaks.
Karl Nilsson 2025-08-19 08:14:57 +01:00
parent 268a16cb68
commit 87154f9b03
3 changed files with 77 additions and 33 deletions
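In outline, the new hand-off implemented below works like this (a condensed sketch; batched_transfer/1, stop_local_leader/1 and await_election/1 are hypothetical names standing in for the inline code in the diff, while ra_lib:lists_chunk/2 is the chunking helper the commit actually uses):

    %% Sketch: stop local leader replicas one chunk at a time, then wait
    %% for each chunk's elections to complete before starting the next.
    batched_transfer(LocalLeaderQueues) ->
        Chunks = ra_lib:lists_chunk(256, LocalLeaderQueues),
        lists:foreach(
          fun(Chunk) ->
                  %% trigger elections by stopping each local leader replica
                  [stop_local_leader(Q) || Q <- Chunk],
                  %% block until each queue's members report a new leader
                  [await_election(Q) || Q <- Chunk]
          end, Chunks).

Keeping the chunks small bounds how many queues are leaderless at any one moment, which is what avoids the election storm.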


@@ -76,12 +76,15 @@ drain() ->
     }),
     TransferCandidates = primary_replica_transfer_candidate_nodes(),
+    %% Transfer metadata store before queues as each queue needs to perform
+    %% a metadata update after an election
+    transfer_leadership_of_metadata_store(TransferCandidates),
     %% Note: only QQ leadership is transferred because it is a reasonably
     %% quick thing to do for a lot of queues in the cluster, unlike with CMQs.
     rabbit_queue_type:drain(TransferCandidates),
-    transfer_leadership_of_metadata_store(TransferCandidates),
     %% allow plugins to react
     rabbit_event:notify(maintenance_draining, #{
         reason => <<"node is being put into maintenance">>


@@ -169,6 +169,7 @@
 -define(MIN_CHECKPOINT_INTERVAL, 64).
 -define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
 -define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
+-define(RA_MEMBERS_TIMEOUT, 30_000).
 
 %%----------- QQ policies ---------------------------------------------------
@@ -1229,7 +1230,6 @@ policy_changed(Q) ->
     end.
 
 -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
 cluster_state(Name) ->
     case whereis(Name) of
         undefined -> down;
@@ -1577,12 +1577,13 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
          is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
          is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
 
--spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
+-spec transfer_leadership(amqqueue:amqqueue(), node()) ->
+    {migrated, node()} | {not_migrated, atom()}.
 transfer_leadership(Q, Destination) ->
-    {RaName, _} = Pid = amqqueue:get_pid(Q),
-    case ra:transfer_leadership(Pid, {RaName, Destination}) of
+    {RaName, _} = Leader = amqqueue:get_pid(Q),
+    case ra:transfer_leadership(Leader, {RaName, Destination}) of
         ok ->
-            case ra:members(Pid) of
+            case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of
                 {_, _, {_, NewNode}} ->
                     {migrated, NewNode};
                 {timeout, _} ->
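For reference, ra:members/1,2 replies {ok, Members, LeaderId} on success, where LeaderId is a {RaName, Node} Ra server id; that is why the success pattern above only needs to pull the node out of the third element. The new timeout argument bounds how long the call may block. A minimal sketch of the shapes involved (the not_migrated reason shown here is illustrative; the actual clause is truncated in this hunk):

    case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of
        %% a leader is known: third element is its {RaName, Node} server id
        {ok, _Members, {_RaName, NewNode}} -> {migrated, NewNode};
        %% no reply within ?RA_MEMBERS_TIMEOUT
        {timeout, _ServerId} -> {not_migrated, timeout}
    end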
@@ -1750,9 +1751,17 @@ i(memory, Q) when ?is_amqqueue(Q) ->
             0
     end;
 i(state, Q) when ?is_amqqueue(Q) ->
-    {Name, Node} = amqqueue:get_pid(Q),
+    {Name, Node} = case find_leader(Q) of
+                       undefined ->
+                           %% fall back to queue record
+                           amqqueue:get_pid(Q);
+                       Leader ->
+                           Leader
+                   end,
+    %% Check against the leader or last known leader
     case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
+        {error, {erpc, timeout}} ->
+            timeout;
         {error, _} ->
             down;
         State ->
@@ -1912,7 +1921,12 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
                      rabbit_nodes:list_running()
              end,
     Online = [N || N <- Nodes, lists:member(N, Running)],
-    {_, LeaderNode} = amqqueue:get_pid(Q),
+    {_, LeaderNode} = case find_leader(Q) of
+                          undefined ->
+                              amqqueue:get_pid(Q);
+                          Leader ->
+                              Leader
+                      end,
     State = case is_minority(Nodes, Online) of
                 true when length(Online) == 0 ->
                     down;
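The find_leader/1 fallback above appears twice, in i/2 and in format/2. A small helper along these lines (hypothetical, not part of this commit) would factor it out:

    %% Prefer the live leader when one can be found; otherwise fall back
    %% to the last known pid stored in the amqqueue record.
    leader_or_recorded_pid(Q) ->
        case find_leader(Q) of
            undefined -> amqqueue:get_pid(Q);
            Leader -> Leader
        end.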
@@ -2299,27 +2313,50 @@ drain(TransferCandidates) ->
 transfer_leadership([]) ->
     ?LOG_WARNING("Skipping leadership transfer of quorum queues: no candidate "
                  "(online, not under maintenance) nodes to transfer to!");
-transfer_leadership(_TransferCandidates) ->
+transfer_leadership(_CandidateNodes) ->
     %% we only transfer leadership for QQs that have local leaders
-    Queues = rabbit_amqqueue:list_local_leaders(),
+    LocalLeaderQueues = rabbit_amqqueue:list_local_leaders(),
+    QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues),
     ?LOG_INFO("Will transfer leadership of ~b quorum queues with current leader on this node",
-              [length(Queues)]),
-    _ = [begin
-             Name = amqqueue:get_name(Q),
-             ?LOG_DEBUG("Will trigger a leader election for local quorum queue ~ts",
-                        [rabbit_misc:rs(Name)]),
+              [length(LocalLeaderQueues)]),
+    [begin
+         [begin
              %% we trigger an election and exclude this node from the list of candidates
              %% by simply shutting down its local QQ replica (Ra server)
              RaLeader = amqqueue:get_pid(Q),
-             ?LOG_DEBUG("Will stop Ra server ~tp", [RaLeader]),
+             ?LOG_DEBUG("Will stop Ra leader ~tp", [RaLeader]),
             case rabbit_quorum_queue:stop_server(RaLeader) of
                 ok ->
                     ?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]);
                 {error, nodedown} ->
                     ?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down", [RaLeader])
-             end
-         end || Q <- Queues],
-    ?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated").
+             end,
+             ok
+         end || Q <- Queues],
+         %% wait for leader elections before processing next chunk of queues
+         [begin
+              {RaName, LeaderNode} = amqqueue:get_pid(Q),
+              MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)),
+              %% we don't do any explicit error handling here as it is more
+              %% important to make progress
+              _ = lists:any(fun (N) ->
+                                    case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of
+                                        {ok, _, _} ->
+                                            true;
+                                        Err ->
+                                            Name = amqqueue:get_name(Q),
+                                            ?LOG_DEBUG("Failed to wait for leader election for queue ~ts on ~tp, Err: ~tp",
+                                                       [Name, N, Err]),
+                                            false
+                                    end
+                            end, MemberNodes),
+              ok
+          end || Q <- Queues],
+         ok
+     end || Queues <- QueuesChunked],
+    ?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated"),
+    ok.
 
 %% TODO: I just copied it over, it looks like it was always called inside maintenance so...
 -spec stop_local_quorum_queue_followers() -> ok.
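The wait step relies on ra:members/2 only answering {ok, Members, LeaderId} once the queried member knows an elected leader, so a successful call doubles as confirmation that the election has completed. Condensed (await_election/2 is a hypothetical name for the inline fun above):

    %% Ask the remaining members in turn; lists:any/2 short-circuits at
    %% the first member that reports an elected leader. Errors and
    %% timeouts just move on to the next member: making progress with
    %% the drain matters more than confirming every election.
    await_election(RaName, MemberNodes) ->
        lists:any(fun(N) ->
                          case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of
                              {ok, _Members, _Leader} -> true;
                              _Other -> false
                          end
                  end, MemberNodes).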


@@ -601,6 +601,10 @@ function fmt_object_state(obj) {
         explanation = 'The queue does not have sufficient online members to ' +
                       'make progress';
     }
+    else if (obj.state == 'timeout') {
+        colour = 'yellow';
+        explanation = 'The queue did not respond to its status request';
+    }
     return fmt_state(colour, text, explanation);
 }