Add test for random replica and leader selection
This commit is contained in:
parent
eeb7bc98bc
commit
367c8b7d1a
|
|
@ -32,9 +32,13 @@ select_leader_and_followers(Q, Size)
|
|||
QueueType = amqqueue:get_type(Q),
|
||||
GetQueues0 = get_queues_for_type(QueueType),
|
||||
QueueCount = rabbit_amqqueue:count(),
|
||||
{Replicas, GetQueues} = select_replicas(Size, AllNodes, RunningNodes, QueueCount, GetQueues0),
|
||||
QueueCountStartRandom = application:get_env(rabbit, queue_count_start_random_selection,
|
||||
?QUEUE_COUNT_START_RANDOM_SELECTION),
|
||||
{Replicas, GetQueues} = select_replicas(Size, AllNodes, RunningNodes,
|
||||
QueueCount, QueueCountStartRandom, GetQueues0),
|
||||
LeaderLocator = leader_locator(Q),
|
||||
Leader = leader_node(LeaderLocator, Replicas, RunningNodes, QueueCount, GetQueues),
|
||||
Leader = leader_node(LeaderLocator, Replicas, RunningNodes,
|
||||
QueueCount, QueueCountStartRandom, GetQueues),
|
||||
Followers = lists:delete(Leader, Replicas),
|
||||
{Leader, Followers}.
|
||||
|
||||
|
|
@ -65,9 +69,10 @@ leader_locator0(_) ->
|
|||
%% default
|
||||
<<"client-local">>.
|
||||
|
||||
-spec select_replicas(pos_integer(), [node(),...], [node(),...], non_neg_integer(), function()) ->
|
||||
-spec select_replicas(pos_integer(), [node(),...], [node(),...],
|
||||
non_neg_integer(), non_neg_integer(), function()) ->
|
||||
{[node(),...], function()}.
|
||||
select_replicas(Size, AllNodes, _, _, Fun)
|
||||
select_replicas(Size, AllNodes, _, _, _, Fun)
|
||||
when length(AllNodes) =< Size ->
|
||||
{AllNodes, Fun};
|
||||
%% Select nodes in the following order:
|
||||
|
|
@ -76,15 +81,15 @@ select_replicas(Size, AllNodes, _, _, Fun)
|
|||
%% 3.1. If there are many queues: Randomly to avoid expensive calculation of counting replicas
|
||||
%% per node. Random replica selection is good enough for most use cases.
|
||||
%% 3.2. If there are few queues: Nodes with least replicas to have a "balanced" RabbitMQ cluster.
|
||||
select_replicas(Size, AllNodes, RunningNodes, QueueCount, GetQueues)
|
||||
when QueueCount >= ?QUEUE_COUNT_START_RANDOM_SELECTION ->
|
||||
select_replicas(Size, AllNodes, RunningNodes, QueueCount, QueueCountStartRandom, GetQueues)
|
||||
when QueueCount >= QueueCountStartRandom ->
|
||||
L0 = shuffle(lists:delete(node(), AllNodes)),
|
||||
L1 = lists:sort(fun(X, _Y) ->
|
||||
lists:member(X, RunningNodes)
|
||||
end, L0),
|
||||
{L, _} = lists:split(Size - 1, L1),
|
||||
{[node() | L], GetQueues};
|
||||
select_replicas(Size, AllNodes, RunningNodes, _, GetQueues) ->
|
||||
select_replicas(Size, AllNodes, RunningNodes, _, _, GetQueues) ->
|
||||
Counters0 = maps:from_list([{N, 0} || N <- lists:delete(node(), AllNodes)]),
|
||||
Queues = GetQueues(),
|
||||
Counters = lists:foldl(fun(Q, Acc) ->
|
||||
|
|
@ -112,15 +117,16 @@ select_replicas(Size, AllNodes, RunningNodes, _, GetQueues) ->
|
|||
L = lists:map(fun({N, _}) -> N end, L2),
|
||||
{[node() | L], fun() -> Queues end}.
|
||||
|
||||
-spec leader_node(queue_leader_locator(), [node(),...], [node(),...], non_neg_integer(), function()) ->
|
||||
-spec leader_node(queue_leader_locator(), [node(),...], [node(),...],
|
||||
non_neg_integer(), non_neg_integer(), function()) ->
|
||||
node().
|
||||
leader_node(<<"client-local">>, _, _, _, _) ->
|
||||
leader_node(<<"client-local">>, _, _, _, _, _) ->
|
||||
node();
|
||||
leader_node(<<"balanced">>, Nodes0, RunningNodes, QueueCount, _)
|
||||
when QueueCount >= ?QUEUE_COUNT_START_RANDOM_SELECTION ->
|
||||
leader_node(<<"balanced">>, Nodes0, RunningNodes, QueueCount, QueueCountStartRandom, _)
|
||||
when QueueCount >= QueueCountStartRandom ->
|
||||
Nodes = potential_leaders(Nodes0, RunningNodes),
|
||||
lists:nth(rand:uniform(length(Nodes)), Nodes);
|
||||
leader_node(<<"balanced">>, Nodes0, RunningNodes, _, GetQueues)
|
||||
leader_node(<<"balanced">>, Nodes0, RunningNodes, _, _, GetQueues)
|
||||
when is_function(GetQueues, 0) ->
|
||||
Nodes = potential_leaders(Nodes0, RunningNodes),
|
||||
Counters0 = maps:from_list([{N, 0} || N <- Nodes]),
|
||||
|
|
|
|||
|
|
@ -74,6 +74,7 @@ groups() ->
|
|||
leader_locator_client_local,
|
||||
leader_locator_balanced,
|
||||
leader_locator_balanced_maintenance,
|
||||
leader_locator_balanced_random_maintenance,
|
||||
leader_locator_policy
|
||||
]
|
||||
++ all_tests()},
|
||||
|
|
@ -289,6 +290,9 @@ init_per_testcase(Testcase, Config) ->
|
|||
leader_locator_client_local when IsMixed ->
|
||||
{skip, "leader_locator_client_local isn't mixed versions compatible because "
|
||||
"delete_declare isn't mixed versions reliable"};
|
||||
leader_locator_balanced_random_maintenance when IsMixed ->
|
||||
{skip, "leader_locator_balanced_random_maintenance isn't mixed versions compatible because "
|
||||
"delete_declare isn't mixed versions reliable"};
|
||||
_ ->
|
||||
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
|
||||
|
|
@ -2674,6 +2678,39 @@ leader_locator_balanced_maintenance(Config) ->
|
|||
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|
||||
|| Q <- Qs].
|
||||
|
||||
leader_locator_balanced_random_maintenance(Config) ->
|
||||
[S1, S2, S3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
|
||||
Q = ?config(queue_name, Config),
|
||||
|
||||
true = rabbit_ct_broker_helpers:mark_as_being_drained(Config, S2),
|
||||
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
|
||||
[rabbit, queue_leader_locator, <<"balanced">>]),
|
||||
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
|
||||
[rabbit, queue_count_start_random_selection, 0]),
|
||||
|
||||
Leaders = [begin
|
||||
?assertMatch({'queue.declare_ok', Q, 0, 0},
|
||||
declare(Ch, Q,
|
||||
[{<<"x-queue-type">>, longstr, <<"quorum">>},
|
||||
{<<"x-quorum-initial-group-size">>, long, 2}])),
|
||||
{ok, [{_, R1}, {_, R2}], {_, Leader}} = ra:members({ra_name(Q), S1}),
|
||||
?assert(lists:member(R1, Servers)),
|
||||
?assert(lists:member(R2, Servers)),
|
||||
?assertMatch(#'queue.delete_ok'{},
|
||||
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
|
||||
Leader
|
||||
end || _ <- lists:seq(1, 10)],
|
||||
?assert(lists:member(S1, Leaders)),
|
||||
?assertNot(lists:member(S2, Leaders)),
|
||||
?assert(lists:member(S3, Leaders)),
|
||||
|
||||
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env,
|
||||
[rabbit, queue_leader_locator]),
|
||||
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env,
|
||||
[rabbit, queue_count_start_random_selection]),
|
||||
true = rabbit_ct_broker_helpers:unmark_as_being_drained(Config, S2).
|
||||
|
||||
leader_locator_policy(Config) ->
|
||||
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
|
||||
|
|
|
|||
Loading…
Reference in New Issue