Merge branch 'stable'

Conflicts:
	src/rabbit_queue_location_random.erl
	test/queue_master_location_SUITE.erl
This commit is contained in:
Michael Klishin 2017-09-23 22:37:07 +03:00
commit 065c849786
No known key found for this signature in database
GPG Key ID: D1A1B77724CE35D5
6 changed files with 149 additions and 34 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@
.*.sw?
*.beam
*.coverdata
MnesiaCore.*
/.erlang.mk/
/cover/
/debug/

View File

@ -19,8 +19,9 @@
-export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
initial_queue_node/2, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
initial_queue_node/2, suggested_queue_nodes/1, actual_queue_nodes/1,
is_mirrored/1, is_mirrored_ha_nodes/1,
update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
@ -31,6 +32,8 @@
-include("rabbit.hrl").
-define(HA_NODES_MODULE, rabbit_mirror_queue_mode_nodes).
-rabbit_boot_step(
{?MODULE,
[{description, "HA policy validation"},
@ -370,6 +373,12 @@ is_mirrored(Q) ->
_ -> false
end.
is_mirrored_ha_nodes(Q) ->
case module(Q) of
{ok, ?HA_NODES_MODULE} -> true;
_ -> false
end.
actual_queue_nodes(#amqqueue{pid = MPid,
slave_pids = SPids,
sync_slave_pids = SSPids}) ->

View File

@ -37,8 +37,8 @@ description() ->
[{description,
<<"Locate queue master node from cluster node with least bound queues">>}].
queue_master_location(#amqqueue{}) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(),
queue_master_location(#amqqueue{} = Q) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
VHosts = rabbit_vhost:list(),
BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
{_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters),

View File

@ -37,8 +37,8 @@ description() ->
[{description,
<<"Locate queue master node from cluster in a random manner">>}].
queue_master_location(#amqqueue{}) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(),
queue_master_location(#amqqueue{} = Q) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)),
MasterNode = lists:nth(RandomPos + 1, Cluster),
{ok, MasterNode}.

View File

@ -24,7 +24,7 @@
get_location_mod_by_config/1,
get_location_mod_by_args/1,
get_location_mod_by_policy/1,
all_nodes/0]).
all_nodes/1]).
lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
is_binary(VHostPath) ->
@ -92,4 +92,20 @@ get_location_mod_by_config(#amqqueue{}) ->
_ -> {error, "queue_master_locator undefined"}
end.
all_nodes() -> rabbit_mnesia:cluster_nodes(running).
all_nodes(Queue = #amqqueue{}) ->
handle_is_mirrored_ha_nodes(rabbit_mirror_queue_misc:is_mirrored_ha_nodes(Queue), Queue).
handle_is_mirrored_ha_nodes(false, _Queue) ->
% Note: ha-mode is NOT 'nodes' - it is either exactly or all, which means
% that any node in the cluster is eligible to be the new queue master node
rabbit_nodes:all_running();
handle_is_mirrored_ha_nodes(true, Queue) ->
% Note: ha-mode is 'nodes', which explicitly specifies allowed nodes.
% We must use suggested_queue_nodes to get that list of nodes as the
% starting point for finding the queue master location
handle_suggested_queue_nodes(rabbit_mirror_queue_misc:suggested_queue_nodes(Queue)).
handle_suggested_queue_nodes({_MNode, []}) ->
rabbit_nodes:all_running();
handle_suggested_queue_nodes({MNode, SNodes}) ->
[MNode | SNodes].

View File

@ -34,6 +34,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
@ -50,6 +51,9 @@ groups() ->
{cluster_size_3, [], [
declare_args,
declare_policy,
declare_policy_nodes,
declare_policy_all,
declare_policy_exactly,
declare_config,
calculate_min_master,
calculate_random,
@ -111,7 +115,7 @@ end_per_testcase(Testcase, Config) ->
declare_args(Config) ->
setup_test_environment(Config),
unset_location_config(Config),
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
Args = [{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}],
declare(Config, QueueName, false, false, Args, none),
verify_min_master(Config, Q).
@ -120,14 +124,75 @@ declare_policy(Config) ->
setup_test_environment(Config),
unset_location_config(Config),
set_location_policy(Config, ?POLICY, <<"min-masters">>),
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
declare(Config, QueueName, false, false, _Args=[], none),
verify_min_master(Config, Q).
declare_policy_nodes(Config) ->
setup_test_environment(Config),
unset_location_config(Config),
% Note:
% Node0 has 15 queues, Node1 has 8 and Node2 has 1
Node0Name = rabbit_data_coercion:to_binary(
rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
Node1Name = rabbit_data_coercion:to_binary(Node1),
Nodes = [Node1Name, Node0Name],
Policy = [{<<"queue-master-locator">>, <<"min-masters">>},
{<<"ha-mode">>, <<"nodes">>},
{<<"ha-params">>, Nodes}],
ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY,
<<".*">>, <<"queues">>, Policy),
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
declare(Config, QueueName, false, false, _Args=[], none),
verify_min_master(Config, Q, Node1).
declare_policy_all(Config) ->
setup_test_environment(Config),
unset_location_config(Config),
% Note:
% Node0 has 15 queues, Node1 has 8 and Node2 has 1
Policy = [{<<"queue-master-locator">>, <<"min-masters">>},
{<<"ha-mode">>, <<"all">>}],
ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY,
<<".*">>, <<"queues">>, Policy),
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
declare(Config, QueueName, false, false, _Args=[], none),
verify_min_master(Config, Q).
declare_policy_exactly(Config) ->
setup_test_environment(Config),
unset_location_config(Config),
Policy = [{<<"queue-master-locator">>, <<"min-masters">>},
{<<"ha-mode">>, <<"exactly">>},
{<<"ha-params">>, 2}],
ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?POLICY,
<<".*">>, <<"queues">>, Policy),
QueueRes = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
declare(Config, QueueRes, false, false, _Args=[], none),
Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
rabbit_ct_broker_helpers:control_action(sync_queue, Node0,
[binary_to_list(Q)], [{"-p", "/"}]),
wait_for_sync(Config, Node0, QueueRes, 1),
{ok, Queue} = rabbit_ct_broker_helpers:rpc(Config, Node0,
rabbit_amqqueue, lookup, [QueueRes]),
{MNode0, [SNode], [SSNode]} = rabbit_ct_broker_helpers:rpc(Config, Node0,
rabbit_mirror_queue_misc,
actual_queue_nodes, [Queue]),
?assertEqual(SNode, SSNode),
{ok, MNode1} = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_queue_master_location_misc,
lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
?assertEqual(MNode0, MNode1),
Node2 = rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename),
?assertEqual(MNode1, Node2).
declare_config(Config) ->
setup_test_environment(Config),
set_location_config(Config, <<"min-masters">>),
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
declare(Config, QueueName, false, false, _Args=[], none),
verify_min_master(Config, Q),
unset_location_config(Config),
@ -139,7 +204,7 @@ declare_config(Config) ->
calculate_min_master(Config) ->
setup_test_environment(Config),
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
Args = [{<<"x-queue-master-locator">>, longstr, <<"min-masters">>}],
declare(Config, QueueName, false, false, Args, none),
verify_min_master(Config, Q),
@ -147,7 +212,7 @@ calculate_min_master(Config) ->
calculate_random(Config) ->
setup_test_environment(Config),
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
Args = [{<<"x-queue-master-locator">>, longstr, <<"random">>}],
declare(Config, QueueName, false, false, Args, none),
verify_random(Config, Q),
@ -155,7 +220,7 @@ calculate_random(Config) ->
calculate_client_local(Config) ->
setup_test_environment(Config),
QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>),
Args = [{<<"x-queue-master-locator">>, longstr, <<"client-local">>}],
declare(Config, QueueName, false, false, Args, none),
verify_client_local(Config, Q),
@ -232,42 +297,66 @@ min_master_node(Config) ->
set_location_config(Config, Strategy) ->
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[ok = rpc:call(Node, application, set_env,
[rabbit, queue_master_locator, Strategy]) || Node <- Nodes],
[ok = rabbit_ct_broker_helpers:rpc(Config, Node,
application, set_env,
[rabbit, queue_master_locator, Strategy]) || Node <- Nodes],
ok.
unset_location_config(Config) ->
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[ok = rpc:call(Node, application, unset_env,
[rabbit, queue_master_locator]) || Node <- Nodes],
[ok = rabbit_ct_broker_helpers:rpc(Config, Node,
application, unset_env,
[rabbit, queue_master_locator]) || Node <- Nodes],
ok.
declare(Config, QueueName, Durable, AutoDelete, Args, Owner) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
{new, Queue} = rpc:call(Node, rabbit_amqqueue, declare,
[QueueName, Durable, AutoDelete, Args, Owner,
<<"acting-user">>]),
declare(Config, QueueName, Durable, AutoDelete, Args0, Owner) ->
Args1 = [QueueName, Durable, AutoDelete, Args0, Owner, <<"acting-user">>],
{new, Queue} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, declare, Args1),
Queue.
verify_min_master(Config, Q, MinMasterNode) ->
Rpc = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_queue_master_location_misc,
lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
?assertEqual({ok, MinMasterNode}, Rpc).
verify_min_master(Config, Q) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
MinMaster = min_master_node(Config),
ct:pal("Expecting min master ~p~n", [MinMaster]),
{ok, MinMaster} = rpc:call(Node, rabbit_queue_master_location_misc,
lookup_master, [Q, ?DEFAULT_VHOST_PATH]).
verify_min_master(Config, Q, MinMaster).
verify_random(Config, Q) ->
[Node | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config,
nodename),
{ok, Master} = rpc:call(Node, rabbit_queue_master_location_misc,
lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
true = lists:member(Master, Nodes).
[Node | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{ok, Master} = rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_queue_master_location_misc,
lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
?assert(lists:member(Master, Nodes)).
verify_client_local(Config, Q) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
{ok, Node} = rpc:call(Node, rabbit_queue_master_location_misc,
lookup_master, [Q, ?DEFAULT_VHOST_PATH]).
Rpc = rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_queue_master_location_misc,
lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
?assertEqual({ok, Node}, Rpc).
set_location_policy(Config, Name, Strategy) ->
ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
Name, <<".*">>, <<"queues">>, [{<<"queue-master-locator">>, Strategy}]).
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen) ->
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, 600).
wait_for_sync(_, _, _, _, 0) ->
throw(sync_timeout);
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N) ->
case synced(Config, Nodename, Q, ExpectedSSPidLen) of
true -> ok;
false -> timer:sleep(100),
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N-1)
end.
synced(Config, Nodename, Q, ExpectedSSPidLen) ->
Args = [<<"/">>, [name, synchronised_slave_pids]],
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
rabbit_amqqueue, info_all, Args),
[SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info, Q =:= Q1],
length(SSPids) =:= ExpectedSSPidLen.