From 429f87913e1f81a635777c6ae8a8168c57764b9c Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 26 Jan 2021 14:47:15 +0300 Subject: [PATCH] Synchronously add mirrors when transferring ownership of classic mirrored queues. There are cases when asynchronously adding mirrors makes a lot of sense: e.g. when a new node joins the cluster. In this case, if we add mirrors asynchronously, this operation will race with the step that removes mirrors. As a result, we can end up with a queue that decided that it had no promotable replicas => data loss from the transfer. Closes #2749. Pairs: @dcorbacho, @mkuratczyk --- deps/rabbit/src/rabbit_maintenance.erl | 34 ++++++++---- deps/rabbit/src/rabbit_mirror_queue_misc.erl | 55 +++++++++++--------- 2 files changed, 55 insertions(+), 34 deletions(-) diff --git a/deps/rabbit/src/rabbit_maintenance.erl b/deps/rabbit/src/rabbit_maintenance.erl index 3b73512648..c8063a4ba4 100644 --- a/deps/rabbit/src/rabbit_maintenance.erl +++ b/deps/rabbit/src/rabbit_maintenance.erl @@ -25,7 +25,7 @@ resume_all_client_listeners/0, close_all_client_connections/0, primary_replica_transfer_candidate_nodes/0, - random_primary_replica_transfer_candidate_node/1, + random_primary_replica_transfer_candidate_node/2, transfer_leadership_of_quorum_queues/1, transfer_leadership_of_classic_mirrored_queues/1, status_table_name/0, @@ -256,12 +256,14 @@ transfer_leadership_of_classic_mirrored_queues(TransferCandidates) -> ReadableCandidates = readable_candidate_list(TransferCandidates), rabbit_log:info("Will transfer leadership of ~b classic mirrored queues hosted on this node to these peer nodes: ~s", [length(Queues), ReadableCandidates]), - [begin Name = amqqueue:get_name(Q), - case random_primary_replica_transfer_candidate_node(TransferCandidates) of + ExistingReplicaNodes = [node(Pid) || Pid <- amqqueue:get_sync_slave_pids(Q)], + rabbit_log:debug("Local ~s has replicas on nodes ~s", + [rabbit_misc:rs(Name), readable_candidate_list(ExistingReplicaNodes)]), + case random_primary_replica_transfer_candidate_node(TransferCandidates, ExistingReplicaNodes) of {ok, Pick} -> - rabbit_log:debug("Will transfer leadership of local queue ~s to node ~s", + rabbit_log:debug("Will transfer leadership of local ~s to node ~s", [rabbit_misc:rs(Name), Pick]), case rabbit_mirror_queue_misc:transfer_leadership(Q, Pick) of {migrated, _} -> @@ -300,18 +302,30 @@ stop_local_quorum_queue_followers() -> end || Q <- Queues], rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node"). - -spec primary_replica_transfer_candidate_nodes() -> [node()]. +-spec primary_replica_transfer_candidate_nodes() -> [node()]. primary_replica_transfer_candidate_nodes() -> filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]). --spec random_primary_replica_transfer_candidate_node([node()]) -> {ok, node()} | undefined. -random_primary_replica_transfer_candidate_node([]) -> +-spec random_primary_replica_transfer_candidate_node([node()], [node()]) -> {ok, node()} | undefined. +random_primary_replica_transfer_candidate_node([], _Preferred) -> undefined; -random_primary_replica_transfer_candidate_node(Candidates) -> - Nth = erlang:phash2(erlang:monotonic_time(), length(Candidates)), - Candidate = lists:nth(Nth + 1, Candidates), +random_primary_replica_transfer_candidate_node(Candidates, PreferredNodes) -> + Overlap = sets:to_list(sets:intersection(sets:from_list(Candidates), sets:from_list(PreferredNodes))), + Candidate = case Overlap of + [] -> + %% Since ownership transfer is meant to be run only when we are sure + %% there are in-sync replicas to transfer to, this is an edge case. + %% We skip the transfer. + undefined; + Nodes -> + random_nth(Nodes) + end, {ok, Candidate}. +random_nth(Nodes) -> + Nth = erlang:phash2(erlang:monotonic_time(), length(Nodes)), + lists:nth(Nth + 1, Nodes). + revive_local_quorum_queue_replicas() -> Queues = rabbit_amqqueue:list_local_followers(), [begin diff --git a/deps/rabbit/src/rabbit_mirror_queue_misc.erl b/deps/rabbit/src/rabbit_mirror_queue_misc.erl index 563b4f29fe..9a89caf236 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_misc.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_misc.erl @@ -235,11 +235,9 @@ add_mirror(QName, MirrorNode, SyncMode) -> case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of {ok, _} -> try - SPid = rabbit_amqqueue_sup_sup:start_queue_process( - MirrorNode, Q, slave), - log_info(QName, "Adding mirror on node ~p: ~p~n", - [MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode) + MirrorPid = rabbit_amqqueue_sup_sup:start_queue_process(MirrorNode, Q, slave), + log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, MirrorPid]), + rabbit_mirror_queue_slave:go(MirrorPid, SyncMode) of _ -> ok catch @@ -447,14 +445,15 @@ is_mirrored_ha_nodes(Q) -> end. actual_queue_nodes(Q) when ?is_amqqueue(Q) -> - MPid = amqqueue:get_pid(Q), - SPids = amqqueue:get_slave_pids(Q), - SSPids = amqqueue:get_sync_slave_pids(Q), - Nodes = fun (L) -> [node(Pid) || Pid <- L] end, - {case MPid of - none -> none; - _ -> node(MPid) - end, Nodes(SPids), Nodes(SSPids)}. + PrimaryPid = amqqueue:get_pid(Q), + MirrorPids = amqqueue:get_slave_pids(Q), + InSyncMirrorPids = amqqueue:get_sync_slave_pids(Q), + CollectNodes = fun (L) -> [node(Pid) || Pid <- L] end, + NodeHostingPrimary = case PrimaryPid of + none -> none; + _ -> node(PrimaryPid) + end, + {NodeHostingPrimary, CollectNodes(MirrorPids), CollectNodes(InSyncMirrorPids)}. -spec maybe_auto_sync(amqqueue:amqqueue()) -> 'ok'. @@ -520,19 +519,19 @@ update_mirrors(OldQ, NewQ) when ?amqqueue_pids_are_equal(OldQ, NewQ) -> update_mirrors(Q) when ?is_amqqueue(Q) -> QName = amqqueue:get_name(Q), - {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), - {NewMNode, NewSNodes} = suggested_queue_nodes(Q), - OldNodes = [OldMNode | OldSNodes], - NewNodes = [NewMNode | NewSNodes], + {PreTransferPrimaryNode, PreTransferMirrorNodes, __PreTransferInSyncMirrorNodes} = actual_queue_nodes(Q), + {NewlySelectedPrimaryNode, NewlySelectedMirrorNodes} = suggested_queue_nodes(Q), + PreTransferNodesWithReplicas = [PreTransferPrimaryNode | PreTransferMirrorNodes], + NewlySelectedNodesWithReplicas = [NewlySelectedPrimaryNode | NewlySelectedMirrorNodes], %% When a mirror dies, remove_from_queue/2 might have to add new - %% mirrors (in "exactly" mode). It will check mnesia to see which + %% mirrors (in "exactly" mode). It will check the queue record to see which %% mirrors there currently are. If drop_mirror/2 is invoked first %% then when we end up in remove_from_queue/2 it will not see the %% mirrors that add_mirror/2 will add, and also want to add them %% (even though we are not responding to the death of a %% mirror). Breakage ensues. - add_mirrors (QName, NewNodes -- OldNodes, async), - drop_mirrors(QName, OldNodes -- NewNodes), + add_mirrors(QName, NewlySelectedNodesWithReplicas -- PreTransferNodesWithReplicas, async), + drop_mirrors(QName, PreTransferNodesWithReplicas -- NewlySelectedNodesWithReplicas), %% This is for the case where no extra nodes were added but we changed to %% a policy requiring auto-sync. maybe_auto_sync(Q), @@ -548,10 +547,18 @@ get_replicas(Q) -> transfer_leadership(Q, Destination) -> QName = amqqueue:get_name(Q), - {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), - OldNodes = [OldMNode | OldSNodes], - add_mirrors(QName, [Destination] -- OldNodes, async), - drop_mirrors(QName, OldNodes -- [Destination]), + {PreTransferPrimaryNode, PreTransferMirrorNodes, _PreTransferInSyncMirrorNodes} = actual_queue_nodes(Q), + PreTransferNodesWithReplicas = [PreTransferPrimaryNode | PreTransferMirrorNodes], + + NodesToAddMirrorsOn = [Destination] -- PreTransferNodesWithReplicas, + %% This will wait for the transfer/eager sync to finish before we begin dropping + %% mirrors on the next step. In this case we cannot add mirrors asynchronously + %% as that will race with the dropping step. + add_mirrors(QName, NodesToAddMirrorsOn, sync), + + NodesToDropMirrorsOn = PreTransferNodesWithReplicas -- [Destination], + drop_mirrors(QName, NodesToDropMirrorsOn), + {Result, NewQ} = wait_for_new_master(QName, Destination), update_mirrors(NewQ), Result.