Choose master from intersection of sync slaves and policy nodes
Previously, when applying an HA policy with the 'nodes' type, if the current master was not part of the new policy, the first synchronised slave was chosen as the new master. This would happen even if that slave was not part of the new policy either. For example, if we began with:

    Master A
    Slaves [B, C]

and then requested a new policy of just [C], we'd end up with:

    Master B
    Slaves [C]

Instead, choose the master from the intersection of (nodes in the policy) and (synchronised slaves).

Issue #990
[#126767013]
parent 75795f977c
commit c1c56a29b0
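The rule can be sketched as a small standalone Erlang module. This is illustrative only and is not the code changed below; the module and function names are invented for the example.

%% Minimal sketch of the selection rule described above; ha_master_choice
%% and choose_master/2 are hypothetical and not part of RabbitMQ.
-module(ha_master_choice).
-export([choose_master/2]).

%% PolicyNodes: nodes listed in the 'nodes' HA policy.
%% SyncSlaves : currently synchronised slaves.
choose_master(PolicyNodes, SyncSlaves) ->
    %% Prefer a node that is both in the policy and synchronised ...
    case [N || N <- PolicyNodes, lists:member(N, SyncSlaves)] of
        [Master | _] -> Master;
        %% ... and fall back to an arbitrary synchronised slave only when
        %% the intersection is empty, so that no messages are lost.
        []           -> hd(SyncSlaves)
    end.

%% Example from the commit message: master A, synchronised slaves [B, C],
%% new policy [C]:
%%   1> ha_master_choice:choose_master([c], [b, c]).
%%   c
%% The old behaviour promoted B, the first synchronised slave, even though
%% B was not in the policy.
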
@@ -32,29 +32,37 @@
 description() ->
     [{description, <<"Mirror queue to specified nodes">>}].
 
-suggested_queue_nodes(Nodes0, MNode, _SNodes, SSNodes, Poss) ->
-    Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
+suggested_queue_nodes(PolicyNodes0, CurrentMaster, _SNodes, SSNodes, NodesRunningRabbitMQ) ->
+    PolicyNodes1 = [list_to_atom(binary_to_list(Node)) || Node <- PolicyNodes0],
     %% If the current master is not in the nodes specified, then what we want
     %% to do depends on whether there are any synchronised slaves. If there
     %% are then we can just kill the current master - the admin has asked for
     %% a migration and we should give it to them. If there are not however
     %% then we must keep the master around so as not to lose messages.
-    Nodes = case SSNodes of
-                [] -> lists:usort([MNode | Nodes1]);
-                _  -> Nodes1
-            end,
-    Unavailable = Nodes -- Poss,
-    Available = Nodes -- Unavailable,
-    case Available of
+    PolicyNodes = case SSNodes of
+                      [] -> lists:usort([CurrentMaster | PolicyNodes1]);
+                      _  -> PolicyNodes1
+                  end,
+    Unavailable = PolicyNodes -- NodesRunningRabbitMQ,
+    AvailablePolicyNodes = PolicyNodes -- Unavailable,
+    case AvailablePolicyNodes of
         [] -> %% We have never heard of anything? Not much we can do but
               %% keep the master alive.
-              {MNode, []};
-        _  -> case lists:member(MNode, Available) of
-                  true  -> {MNode, Available -- [MNode]};
+              {CurrentMaster, []};
+        _  -> case lists:member(CurrentMaster, AvailablePolicyNodes) of
+                  true  -> {CurrentMaster,
+                            AvailablePolicyNodes -- [CurrentMaster]};
                   false -> %% Make sure the new master is synced! In order to
                            %% get here SSNodes must not be empty.
-                           [NewMNode | _] = SSNodes,
-                           {NewMNode, Available -- [NewMNode]}
+                           SyncPolicyNodes = [Node ||
+                                              Node <- AvailablePolicyNodes,
+                                              lists:member(Node, SSNodes)],
+                           NewMaster = case SyncPolicyNodes of
+                                           [Node | _] -> Node;
+                                           []         -> erlang:hd(SSNodes)
+                                       end,
+                           {NewMaster, AvailablePolicyNodes -- [NewMaster]}
               end
     end.
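Assuming the scenario from the commit message (current master a, synchronised slaves b and c, all three nodes running RabbitMQ), a call to the new suggested_queue_nodes/5 shown above would now keep the queue on the policy node. The node atoms below are placeholders for the example.

%% Placeholder node names; the third argument (the full slave list) is
%% ignored by this policy mode. Policy: just node c.
suggested_queue_nodes([<<"c">>], a, [b, c], [b, c], [a, b, c]).
%% New behaviour: {c, []}   - the master moves to c, the only policy node.
%% Old behaviour: {b, [c]}  - the first synchronised slave, b, was promoted
%%                            even though it was not in the policy.
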
@@ -61,7 +61,8 @@ groups() ->
         ]},
       {cluster_size_3, [], [
           change_policy,
-          rapid_change
+          rapid_change,
+          nodes_policy_should_pick_master_from_its_params
           % FIXME: Re-enable those tests when the know issues are
           % fixed.
           %failing_random_policies,
@@ -258,6 +259,48 @@ promote_on_shutdown(Config) ->
                                                durable = true}),
     ok.
 
+nodes_policy_should_pick_master_from_its_params(Config) ->
+    [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config,
+      nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+    ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
+        [all])),
+    %% --> Master: A
+    %%     Slaves: [B, C] or [C, B]
+    Info = find_queue(?QNAME, A),
+    SSPids = proplists:get_value(synchronised_slave_pids, Info),
+
+    %% Choose slave that isn't the first sync slave. Cover a bug that always
+    %% chose the first, even if it was not part of the policy
+    LastSlave = node(lists:last(SSPids)),
+    ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
+        [{nodes, [LastSlave]}])),
+    %% --> Master: B or C (depends on the order of current slaves)
+    %%     Slaves: []
+
+    %% Now choose a new master that isn't synchronised. The previous
+    %% policy made sure that the queue only runs on one node (the last
+    %% from the initial synchronised list). Thus, by taking the first
+    %% node from this list, we know it is not synchronised.
+    %%
+    %% Because the policy doesn't cover any synchronised slave, RabbitMQ
+    %% should instead use an existing synchronised slave as the new master,
+    %% even though that isn't in the policy.
+    ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
+        [{nodes, [LastSlave, A]}])),
+    %% --> Master: B or C (same as previous policy)
+    %%     Slaves: [A]
+
+    NewMaster = node(erlang:hd(SSPids)),
+    ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
+        [{nodes, [NewMaster]}])),
+    %% --> Master: B or C (the other one compared to previous policy)
+    %%     Slaves: []
+
+    amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
+    _ = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY).
+
 random_policy(Config) ->
     run_proper(fun prop_random_policy/1, [Config]).
@@ -364,9 +407,8 @@ prop_random_policy(Config) ->
                Policies, non_empty(list(policy_gen(Nodes))),
                test_random_policy(Config, Nodes, Policies)).
 
-test_random_policy(Config, Nodes, Policies) ->
+apply_policy_to_declared_queue(Config, Ch, Nodes, Policies) ->
     [NodeA | _] = Nodes,
-    Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
     amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}),
     %% Add some load so mirrors can be busy synchronising
     rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000),
@@ -375,7 +417,12 @@ test_random_policy(Config, Nodes, Policies) ->
     %% Give it some time to generate all internal notifications
     timer:sleep(2000),
     %% Check the result
-    Result = wait_for_last_policy(?QNAME, NodeA, Policies, 30),
+    wait_for_last_policy(?QNAME, NodeA, Policies, 30).
+
+test_random_policy(Config, Nodes, Policies) ->
+    [NodeA | _] = Nodes,
+    Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
+    Result = apply_policy_to_declared_queue(Config, Ch, Nodes, Policies),
     %% Cleanup
     amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
     _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY),