diff --git a/deps/rabbit/src/rabbit_maintenance.erl b/deps/rabbit/src/rabbit_maintenance.erl
index 172b115530..f2393d76c9 100644
--- a/deps/rabbit/src/rabbit_maintenance.erl
+++ b/deps/rabbit/src/rabbit_maintenance.erl
@@ -76,12 +76,15 @@ drain() ->
     }),
 
     TransferCandidates = primary_replica_transfer_candidate_nodes(),
+
+    %% Transfer metadata store before queues as each queue needs to perform
+    %% a metadata update after an election
+    transfer_leadership_of_metadata_store(TransferCandidates),
+
     %% Note: only QQ leadership is transferred because it is a reasonably quick thing to do a lot of queues
     %% in the cluster, unlike with CMQs.
     rabbit_queue_type:drain(TransferCandidates),
 
-    transfer_leadership_of_metadata_store(TransferCandidates),
-
     %% allow plugins to react
     rabbit_event:notify(maintenance_draining, #{
         reason => <<"node is being put into maintenance">>
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index d068d51bb5..c7876f3c55 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -169,6 +169,7 @@
 -define(MIN_CHECKPOINT_INTERVAL, 64).
 -define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
 -define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
+-define(RA_MEMBERS_TIMEOUT, 30_000).
 
 %%----------- QQ policies ---------------------------------------------------
 
@@ -1229,7 +1230,6 @@ policy_changed(Q) ->
     end.
 
 -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
-
 cluster_state(Name) ->
     case whereis(Name) of
         undefined -> down;
@@ -1577,17 +1577,18 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
                 is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
                 is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
 
--spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
+-spec transfer_leadership(amqqueue:amqqueue(), node()) ->
+    {migrated, node()} | {not_migrated, atom()}.
 transfer_leadership(Q, Destination) ->
-    {RaName, _} = Pid = amqqueue:get_pid(Q),
-    case ra:transfer_leadership(Pid, {RaName, Destination}) of
+    {RaName, _} = Leader = amqqueue:get_pid(Q),
+    case ra:transfer_leadership(Leader, {RaName, Destination}) of
         ok ->
-          case ra:members(Pid) of
-            {_, _, {_, NewNode}} ->
-              {migrated, NewNode};
-            {timeout, _} ->
-              {not_migrated, ra_members_timeout}
-          end;
+            case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of
+                {_, _, {_, NewNode}} ->
+                    {migrated, NewNode};
+                {timeout, _} ->
+                    {not_migrated, ra_members_timeout}
+            end;
         already_leader ->
             {not_migrated, already_leader};
         {error, Reason} ->
@@ -1750,9 +1751,17 @@ i(memory, Q) when ?is_amqqueue(Q) ->
             0
     end;
 i(state, Q) when ?is_amqqueue(Q) ->
-    {Name, Node} = amqqueue:get_pid(Q),
+    {Name, Node} = case find_leader(Q) of
+                       undefined ->
+                           %% fall back to queue record
+                           amqqueue:get_pid(Q);
+                       Leader ->
+                           Leader
+                   end,
     %% Check against the leader or last known leader
     case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
+        {error, {erpc, timeout}} ->
+            timeout;
         {error, _} ->
             down;
         State ->
@@ -1912,7 +1921,12 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
                           rabbit_nodes:list_running()
                   end,
     Online = [N || N <- Nodes, lists:member(N, Running)],
-    {_, LeaderNode} = amqqueue:get_pid(Q),
+    {_, LeaderNode} = case find_leader(Q) of
+                          undefined ->
+                              amqqueue:get_pid(Q);
+                          Leader ->
+                              Leader
+                      end,
     State = case is_minority(Nodes, Online) of
                 true when length(Online) == 0 ->
                     down;
@@ -2299,27 +2313,50 @@ drain(TransferCandidates) ->
 transfer_leadership([]) ->
     ?LOG_WARNING("Skipping leadership transfer of quorum queues: no candidate "
                  "(online, not under maintenance) nodes to transfer to!");
-transfer_leadership(_TransferCandidates) ->
+transfer_leadership(_CandidateNodes) ->
     %% we only transfer leadership for QQs that have local leaders
-    Queues = rabbit_amqqueue:list_local_leaders(),
+    LocalLeaderQueues = rabbit_amqqueue:list_local_leaders(),
+    QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues),
     ?LOG_INFO("Will transfer leadership of ~b quorum queues with current leader on this node",
-              [length(Queues)]),
-    _ = [begin
-             Name = amqqueue:get_name(Q),
-             ?LOG_DEBUG("Will trigger a leader election for local quorum queue ~ts",
-                        [rabbit_misc:rs(Name)]),
-             %% we trigger an election and exclude this node from the list of candidates
-             %% by simply shutting its local QQ replica (Ra server)
-             RaLeader = amqqueue:get_pid(Q),
-             ?LOG_DEBUG("Will stop Ra server ~tp", [RaLeader]),
-             case rabbit_quorum_queue:stop_server(RaLeader) of
-                 ok ->
-                     ?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]);
-                 {error, nodedown} ->
-                     ?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down")
-             end
-         end || Q <- Queues],
-    ?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated").
+              [length(LocalLeaderQueues)]),
+    [begin
+         [begin
+              %% we trigger an election and exclude this node from the list of candidates
+              %% by simply shutting its local QQ replica (Ra server)
+              RaLeader = amqqueue:get_pid(Q),
+              ?LOG_DEBUG("Will stop Ra leader ~tp", [RaLeader]),
+              case rabbit_quorum_queue:stop_server(RaLeader) of
+                  ok ->
+                      ?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]);
+                  {error, nodedown} ->
+                      ?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down", [RaLeader])
+              end,
+              ok
+          end || Q <- Queues],
+         %% wait for leader elections before processing next chunk of queues
+         [begin
+              {RaName, LeaderNode} = amqqueue:get_pid(Q),
+              MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)),
+              %% we don't do any explicit error handling here as it is more
+              %% important to make progress
+              _ = lists:any(fun (N) ->
+                                    case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of
+                                        {ok, _, _} ->
+                                            true;
+                                        Err ->
+                                            Name = amqqueue:get_name(Q),
+                                            ?LOG_DEBUG("Failed to wait for leader election for queue ~ts on ~tp Err ~tp",
+                                                       [rabbit_misc:rs(Name), N, Err]),
+                                            false
+                                    end
+                            end, MemberNodes),
+              ok
+
+          end || Q <- Queues],
+         ok
+     end || Queues <- QueuesChunked],
+    ?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated"),
+    ok.
 
 %% TODO: I just copied it over, it looks like was always called inside maintenance so...
 -spec stop_local_quorum_queue_followers() -> ok.
diff --git a/deps/rabbitmq_management/priv/www/js/formatters.js b/deps/rabbitmq_management/priv/www/js/formatters.js
index bb68af880d..2131534a06 100644
--- a/deps/rabbitmq_management/priv/www/js/formatters.js
+++ b/deps/rabbitmq_management/priv/www/js/formatters.js
@@ -601,6 +601,10 @@ function fmt_object_state(obj) {
         explanation = 'The queue does not have sufficient online members to ' +
             'make progress'
     }
+    else if (obj.state == 'timeout') {
+        colour = 'yellow';
+        explanation = 'The queue did not respond to its status request';
+    }
 
     return fmt_state(colour, text, explanation);
 }