Synchronously add mirrors when transferring ownership of classic mirrored queues.

There are cases when asynchronously adding mirrors makes
a lot of sense, e.g. when a new node joins the cluster.
During an ownership transfer, however, adding mirrors
asynchronously races with the step that removes the old mirrors.
As a result, we can end up with a queue that has decided it has
no promotable (in-sync) replicas => data loss
from the transfer.

Closes #2749.

Pairs: @dcorbacho, @mkuratczyk
Michael Klishin 2021-01-26 14:47:15 +03:00
parent 4f43f393bf
commit 429f87913e
2 changed files with 55 additions and 34 deletions
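
Before the hunks, a minimal sketch (not the actual patch) of the ordering this commit enforces in rabbit_mirror_queue_misc:transfer_leadership/2. It is simplified from the second file below and reuses only helpers shown in these diffs:

transfer_leadership_sketch(Q, Destination) ->
    QName = amqqueue:get_name(Q),
    {PrimaryNode, MirrorNodes, _InSyncMirrorNodes} = actual_queue_nodes(Q),
    Existing = [PrimaryNode | MirrorNodes],
    %% 'sync' makes add_mirrors/3 wait for the transfer/eager sync to finish,
    %% so the drop step below can never leave the queue without a promotable
    %% in-sync replica. With 'async' the two steps race, which is the bug
    %% described in the message above.
    add_mirrors(QName, [Destination] -- Existing, sync),
    drop_mirrors(QName, Existing -- [Destination]).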


@@ -25,7 +25,7 @@
resume_all_client_listeners/0,
close_all_client_connections/0,
primary_replica_transfer_candidate_nodes/0,
random_primary_replica_transfer_candidate_node/1,
random_primary_replica_transfer_candidate_node/2,
transfer_leadership_of_quorum_queues/1,
transfer_leadership_of_classic_mirrored_queues/1,
status_table_name/0,
@@ -256,12 +256,14 @@ transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
ReadableCandidates = readable_candidate_list(TransferCandidates),
rabbit_log:info("Will transfer leadership of ~b classic mirrored queues hosted on this node to these peer nodes: ~s",
[length(Queues), ReadableCandidates]),
[begin
Name = amqqueue:get_name(Q),
case random_primary_replica_transfer_candidate_node(TransferCandidates) of
ExistingReplicaNodes = [node(Pid) || Pid <- amqqueue:get_sync_slave_pids(Q)],
rabbit_log:debug("Local ~s has replicas on nodes ~s",
[rabbit_misc:rs(Name), readable_candidate_list(ExistingReplicaNodes)]),
case random_primary_replica_transfer_candidate_node(TransferCandidates, ExistingReplicaNodes) of
{ok, Pick} ->
rabbit_log:debug("Will transfer leadership of local queue ~s to node ~s",
rabbit_log:debug("Will transfer leadership of local ~s to node ~s",
[rabbit_misc:rs(Name), Pick]),
case rabbit_mirror_queue_misc:transfer_leadership(Q, Pick) of
{migrated, _} ->
@@ -300,18 +302,30 @@ stop_local_quorum_queue_followers() ->
end || Q <- Queues],
rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").
-spec primary_replica_transfer_candidate_nodes() -> [node()].
primary_replica_transfer_candidate_nodes() ->
filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
-spec random_primary_replica_transfer_candidate_node([node()]) -> {ok, node()} | undefined.
random_primary_replica_transfer_candidate_node([]) ->
-spec random_primary_replica_transfer_candidate_node([node()], [node()]) -> {ok, node()} | undefined.
random_primary_replica_transfer_candidate_node([], _Preferred) ->
undefined;
random_primary_replica_transfer_candidate_node(Candidates) ->
Nth = erlang:phash2(erlang:monotonic_time(), length(Candidates)),
Candidate = lists:nth(Nth + 1, Candidates),
random_primary_replica_transfer_candidate_node(Candidates, PreferredNodes) ->
Overlap = sets:to_list(sets:intersection(sets:from_list(Candidates), sets:from_list(PreferredNodes))),
case Overlap of
[] ->
%% Since ownership transfer is meant to be run only when we are sure
%% there are in-sync replicas to transfer to, this is an edge case.
%% We skip the transfer.
undefined;
Nodes ->
{ok, random_nth(Nodes)}
end.
random_nth(Nodes) ->
Nth = erlang:phash2(erlang:monotonic_time(), length(Nodes)),
lists:nth(Nth + 1, Nodes).
revive_local_quorum_queue_replicas() ->
Queues = rabbit_amqqueue:list_local_followers(),
[begin

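To make the new candidate selection above concrete, a hypothetical illustration (node names invented) of random_primary_replica_transfer_candidate_node/2: it only picks a node that is both a transfer candidate and already holds an in-sync replica, and it returns undefined when there is no such overlap so the caller can skip the transfer.

%% Hypothetical node names, for illustration only: transfer candidates first,
%% then the nodes that hold in-sync replicas.
{ok, 'rabbit@c'} = random_primary_replica_transfer_candidate_node(['rabbit@b', 'rabbit@c'],
                                                                  ['rabbit@c', 'rabbit@d']).
%% No overlap between the two lists: the caller skips the transfer.
undefined = random_primary_replica_transfer_candidate_node(['rabbit@b'], ['rabbit@d']).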

@@ -235,11 +235,9 @@ add_mirror(QName, MirrorNode, SyncMode) ->
case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
{ok, _} ->
try
SPid = rabbit_amqqueue_sup_sup:start_queue_process(
MirrorNode, Q, slave),
log_info(QName, "Adding mirror on node ~p: ~p~n",
[MirrorNode, SPid]),
rabbit_mirror_queue_slave:go(SPid, SyncMode)
MirrorPid = rabbit_amqqueue_sup_sup:start_queue_process(MirrorNode, Q, slave),
log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, MirrorPid]),
rabbit_mirror_queue_slave:go(MirrorPid, SyncMode)
of
_ -> ok
catch
@@ -447,14 +445,15 @@ is_mirrored_ha_nodes(Q) ->
end.
actual_queue_nodes(Q) when ?is_amqqueue(Q) ->
MPid = amqqueue:get_pid(Q),
SPids = amqqueue:get_slave_pids(Q),
SSPids = amqqueue:get_sync_slave_pids(Q),
Nodes = fun (L) -> [node(Pid) || Pid <- L] end,
{case MPid of
none -> none;
_ -> node(MPid)
end, Nodes(SPids), Nodes(SSPids)}.
PrimaryPid = amqqueue:get_pid(Q),
MirrorPids = amqqueue:get_slave_pids(Q),
InSyncMirrorPids = amqqueue:get_sync_slave_pids(Q),
CollectNodes = fun (L) -> [node(Pid) || Pid <- L] end,
NodeHostingPrimary = case PrimaryPid of
none -> none;
_ -> node(PrimaryPid)
end,
{NodeHostingPrimary, CollectNodes(MirrorPids), CollectNodes(InSyncMirrorPids)}.
-spec maybe_auto_sync(amqqueue:amqqueue()) -> 'ok'.
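
For readers skimming the rename above, a hypothetical illustration (node names invented) of the tuple actual_queue_nodes/1 returns:

%% Primary on node a, mirrors on b and c, but only b has caught up (is in sync).
{'rabbit@a', ['rabbit@b', 'rabbit@c'], ['rabbit@b']} = actual_queue_nodes(Q).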
@@ -520,19 +519,19 @@ update_mirrors(OldQ, NewQ) when ?amqqueue_pids_are_equal(OldQ, NewQ) ->
update_mirrors(Q) when ?is_amqqueue(Q) ->
QName = amqqueue:get_name(Q),
{OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
{NewMNode, NewSNodes} = suggested_queue_nodes(Q),
OldNodes = [OldMNode | OldSNodes],
NewNodes = [NewMNode | NewSNodes],
{PreTransferPrimaryNode, PreTransferMirrorNodes, __PreTransferInSyncMirrorNodes} = actual_queue_nodes(Q),
{NewlySelectedPrimaryNode, NewlySelectedMirrorNodes} = suggested_queue_nodes(Q),
PreTransferNodesWithReplicas = [PreTransferPrimaryNode | PreTransferMirrorNodes],
NewlySelectedNodesWithReplicas = [NewlySelectedPrimaryNode | NewlySelectedMirrorNodes],
%% When a mirror dies, remove_from_queue/2 might have to add new
%% mirrors (in "exactly" mode). It will check mnesia to see which
%% mirrors (in "exactly" mode). It will check the queue record to see which
%% mirrors there currently are. If drop_mirror/2 is invoked first
%% then when we end up in remove_from_queue/2 it will not see the
%% mirrors that add_mirror/2 will add, and also want to add them
%% (even though we are not responding to the death of a
%% mirror). Breakage ensues.
add_mirrors (QName, NewNodes -- OldNodes, async),
drop_mirrors(QName, OldNodes -- NewNodes),
add_mirrors(QName, NewlySelectedNodesWithReplicas -- PreTransferNodesWithReplicas, async),
drop_mirrors(QName, PreTransferNodesWithReplicas -- NewlySelectedNodesWithReplicas),
%% This is for the case where no extra nodes were added but we changed to
%% a policy requiring auto-sync.
maybe_auto_sync(Q),
@@ -548,10 +547,18 @@ get_replicas(Q) ->
transfer_leadership(Q, Destination) ->
QName = amqqueue:get_name(Q),
{OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
OldNodes = [OldMNode | OldSNodes],
add_mirrors(QName, [Destination] -- OldNodes, async),
drop_mirrors(QName, OldNodes -- [Destination]),
{PreTransferPrimaryNode, PreTransferMirrorNodes, _PreTransferInSyncMirrorNodes} = actual_queue_nodes(Q),
PreTransferNodesWithReplicas = [PreTransferPrimaryNode | PreTransferMirrorNodes],
NodesToAddMirrorsOn = [Destination] -- PreTransferNodesWithReplicas,
%% This will wait for the transfer/eager sync to finish before we begin dropping
%% mirrors on the next step. In this case we cannot add mirrors asynchronously
%% as that will race with the dropping step.
add_mirrors(QName, NodesToAddMirrorsOn, sync),
NodesToDropMirrorsOn = PreTransferNodesWithReplicas -- [Destination],
drop_mirrors(QName, NodesToDropMirrorsOn),
{Result, NewQ} = wait_for_new_master(QName, Destination),
update_mirrors(NewQ),
Result.
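
As a usage sketch (assumed to run inside the maintenance module whose hunks appear first above, which exports both functions), node maintenance now drives this synchronous path:

%% Pick peers that are running and not drained, then transfer leadership of
%% all classic mirrored queues hosted on this node to them.
Candidates = primary_replica_transfer_candidate_nodes(),
transfer_leadership_of_classic_mirrored_queues(Candidates).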