Prefer running nodes for replica selection

When declaring a quorum queue or a stream, select its replicas in the
following order:
1. the local RabbitMQ node (for data locality with the declaring client)
2. running RabbitMQ nodes
3. RabbitMQ nodes with the fewest quorum queue or stream replicas (to keep the RabbitMQ cluster balanced).

From now on, quorum queues and streams use the same replica selection and
leader locator strategies (a sketch of the selection logic follows below).
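
The following is an illustrative sketch only: the module, function, and variable names are invented for this example and are not part of the commit. It shows how the three rules above reduce to a single sort of the non-local nodes: running nodes first, then nodes with the fewest existing replicas.

-module(replica_selection_sketch).
-export([pick_replicas/4]).

%% Size: requested replica count; AllNodes/RunningNodes: cluster membership;
%% ReplicaCounts: map of node() to the number of replicas it already hosts.
pick_replicas(Size, AllNodes, RunningNodes, ReplicaCounts)
  when is_integer(Size), Size > 0 ->
    Local = node(),
    %% 1. the local node always gets a replica (data locality for the declaring client)
    Others = lists:delete(Local, AllNodes),
    %% 2. running nodes before stopped ones, 3. fewer existing replicas first
    Sorted = lists:sort(
               fun(N0, N1) ->
                       case {lists:member(N0, RunningNodes),
                             lists:member(N1, RunningNodes)} of
                           {true, false} -> true;
                           {false, true} -> false;
                           _ -> maps:get(N0, ReplicaCounts, 0) =< maps:get(N1, ReplicaCounts, 0)
                       end
               end, Others),
    [Local | lists:sublist(Sorted, Size - 1)].

With a requested size of 3, a stopped node is only selected when fewer than three nodes are running.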
David Ansari 2022-04-06 18:48:26 +02:00
parent f903ef95cc
commit 1315b1d4b1
7 changed files with 257 additions and 180 deletions


@@ -0,0 +1,119 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_queue_location).
-include("amqqueue.hrl").
-export([select_leader_and_followers/2]).
select_leader_and_followers(Q, Size)
when (?amqqueue_is_quorum(Q) orelse ?amqqueue_is_stream(Q)) andalso is_integer(Size) ->
QueueType = amqqueue:get_type(Q),
GetQueues0 = get_queues_for_type(QueueType),
{AllNodes, _DiscNodes, RunningNodes} = rabbit_mnesia:cluster_nodes(status),
{Replicas, GetQueues} = select_replicas(Size, AllNodes, RunningNodes, GetQueues0),
LeaderLocator = leader_locator(
rabbit_queue_type_util:args_policy_lookup(
<<"queue-leader-locator">>,
fun (PolVal, _ArgVal) ->
PolVal
end, Q)),
Leader = leader_node(LeaderLocator, Replicas, RunningNodes, GetQueues),
Followers = lists:delete(Leader, Replicas),
{Leader, Followers}.
select_replicas(Size, AllNodes, _, Fun)
when length(AllNodes) =< Size ->
{AllNodes, Fun};
select_replicas(Size, _, RunningNodes, Fun)
when length(RunningNodes) =:= Size ->
{RunningNodes, Fun};
select_replicas(Size, AllNodes, RunningNodes, GetQueues) ->
%% Select nodes in the following order:
%% 1. local node (to have data locality for declaring client)
%% 2. running nodes
%% 3. nodes with least replicas (to have a "balanced" RabbitMQ cluster).
Local = node(),
true = lists:member(Local, AllNodes),
true = lists:member(Local, RunningNodes),
Counters0 = maps:from_list([{Node, 0} || Node <- lists:delete(Local, AllNodes)]),
Queues = GetQueues(),
Counters = lists:foldl(fun(Q, Acc) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
lists:foldl(fun(N, A)
when is_map_key(N, A) ->
maps:update_with(N, fun(C) -> C+1 end, A);
(_, A) ->
A
end, Acc, Nodes)
end, Counters0, Queues),
L0 = maps:to_list(Counters),
L1 = lists:sort(fun({N0, C0}, {N1, C1}) ->
case {lists:member(N0, RunningNodes),
lists:member(N1, RunningNodes)} of
{true, false} ->
true;
{false, true} ->
false;
_ ->
C0 =< C1
end
end, L0),
{L2, _} = lists:split(Size - 1, L1),
L = lists:map(fun({N, _}) -> N end, L2),
{[Local | L], fun() -> Queues end}.
leader_locator(undefined) -> <<"client-local">>;
leader_locator(Val) -> Val.
leader_node(<<"client-local">>, _, _, _) ->
node();
leader_node(<<"random">>, Nodes0, RunningNodes, _) ->
Nodes = potential_leaders(Nodes0, RunningNodes),
lists:nth(rand:uniform(length(Nodes)), Nodes);
leader_node(<<"least-leaders">>, Nodes0, RunningNodes, GetQueues)
when is_function(GetQueues, 0) ->
Nodes = potential_leaders(Nodes0, RunningNodes),
Counters0 = maps:from_list([{N, 0} || N <- Nodes]),
Counters = lists:foldl(fun(Q, Acc) ->
case amqqueue:get_pid(Q) of
{RaName, LeaderNode}
when is_atom(RaName), is_atom(LeaderNode), is_map_key(LeaderNode, Acc) ->
maps:update_with(LeaderNode, fun(C) -> C+1 end, Acc);
StreamLeaderPid
when is_pid(StreamLeaderPid), is_map_key(node(StreamLeaderPid), Acc) ->
maps:update_with(node(StreamLeaderPid), fun(C) -> C+1 end, Acc);
_ ->
Acc
end
end, Counters0, GetQueues()),
{Node, _} = hd(lists:keysort(2, maps:to_list(Counters))),
Node.
potential_leaders(Nodes, AllRunningNodes) ->
RunningNodes = lists:filter(fun(N) ->
lists:member(N, AllRunningNodes)
end, Nodes),
case rabbit_maintenance:filter_out_drained_nodes_local_read(RunningNodes) of
[] ->
%% All running nodes are drained. Let's place the leader on a drained node
%% respecting the requested queue-leader-locator strategy.
RunningNodes;
Filtered ->
Filtered
end.
%% Return a function so that queues are fetched lazily (i.e. only when needed,
%% and at most once when no amqqueue migration is going on).
get_queues_for_type(QueueType) ->
fun() -> rabbit_amqqueue:list_with_possible_retry(
fun() ->
mnesia:dirty_match_object(rabbit_queue,
amqqueue:pattern_match_on_type(QueueType))
end)
end.
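
As a companion to the module above, here is a minimal, self-contained sketch (names invented for illustration, not part of the commit) of the counting step behind the "least-leaders" locator: tally how many queue leaders each candidate node already hosts and pick the node with the smallest count.

-module(least_leaders_sketch).
-export([pick_leader/2]).

%% Candidates: nodes eligible to host the new leader.
%% ExistingLeaderNodes: one entry per queue leader already running in the cluster.
pick_leader(Candidates, ExistingLeaderNodes) ->
    Counts0 = maps:from_list([{N, 0} || N <- Candidates]),
    Counts = lists:foldl(
               fun(N, Acc) when is_map_key(N, Acc) ->
                       maps:update_with(N, fun(C) -> C + 1 end, Acc);
                  (_, Acc) ->
                       Acc
               end, Counts0, ExistingLeaderNodes),
    {Node, _} = hd(lists:keysort(2, maps:to_list(Counts))),
    Node.

For example, pick_leader(['b@host', 'c@host'], ['b@host', 'b@host']) returns 'c@host'.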


@@ -1,3 +1,10 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_queue_type).
-include("amqqueue.hrl").
-include_lib("rabbit_common/include/resource.hrl").


@@ -180,18 +180,10 @@ start_cluster(Q) ->
rabbit_data_coercion:to_atom(ra:new_uid(N))
end,
Id = {RaName, node()},
AllQuorumQs = rabbit_amqqueue:list_with_possible_retry(
fun() ->
mnesia:dirty_match_object(rabbit_queue,
amqqueue:pattern_match_on_type(?MODULE))
end),
Nodes = select_quorum_nodes(QuorumSize, rabbit_nodes:all(), AllQuorumQs),
LeaderLocator = leader_locator(args_policy_lookup(<<"queue-leader-locator">>,
fun policy_has_precedence/2, Q)),
LeaderNode = leader_node(LeaderLocator, Nodes, AllQuorumQs),
LeaderId = {RaName, LeaderNode},
{Leader, Followers} = rabbit_queue_location:select_leader_and_followers(Q, QuorumSize),
LeaderId = {RaName, Leader},
NewQ0 = amqqueue:set_pid(Q, LeaderId),
NewQ1 = amqqueue:set_type_state(NewQ0, #{nodes => Nodes}),
NewQ1 = amqqueue:set_type_state(NewQ0, #{nodes => [Leader | Followers]}),
rabbit_log:debug("Will start up to ~w replicas for quorum queue ~s",
[QuorumSize, rabbit_misc:rs(QName)]),
@@ -1608,62 +1600,6 @@ get_default_quorum_initial_group_size(Arguments) ->
Val
end.
select_quorum_nodes(Size, AllNodes, _)
when length(AllNodes) =< Size ->
AllNodes;
select_quorum_nodes(Size, AllNodes, AllQuorumQs) ->
%% Select local node (to have data locality for declaring client)
%% and nodes with least quorum queue replicas (to have a "balanced" RabbitMQ cluster).
Local = node(),
true = lists:member(Local, AllNodes),
Counters0 = maps:from_list([{Node, 0} || Node <- lists:delete(Local, AllNodes)]),
Counters = lists:foldl(fun(Q, Acc) ->
lists:foldl(fun(N, A)
when is_map_key(N, A) ->
maps:update_with(N, fun(C) -> C+1 end, A);
(_, A) ->
A
end, Acc, get_nodes(Q))
end, Counters0, AllQuorumQs),
L0 = maps:to_list(Counters),
L1 = lists:keysort(2, L0),
{L, _} = lists:split(Size - 1, L1),
LeastReplicas = lists:map(fun({N, _}) -> N end, L),
[Local | LeastReplicas].
leader_locator(undefined) -> <<"client-local">>;
leader_locator(Val) -> Val.
leader_node(<<"client-local">>, _, _) ->
node();
leader_node(<<"random">>, Nodes0, _) ->
Nodes = potential_leaders(Nodes0),
lists:nth(rand:uniform(length(Nodes)), Nodes);
leader_node(<<"least-leaders">>, Nodes0, AllQuorumQs) ->
Nodes = potential_leaders(Nodes0),
Counters0 = maps:from_list([{N, 0} || N <- Nodes]),
Counters = lists:foldl(fun(Q, Acc) ->
case amqqueue:get_pid(Q) of
{_, LeaderNode}
when is_map_key(LeaderNode, Acc) ->
maps:update_with(LeaderNode, fun(C) -> C+1 end, Acc);
_ ->
Acc
end
end, Counters0, AllQuorumQs),
{Node, _} = hd(lists:keysort(2, maps:to_list(Counters))),
Node.
potential_leaders(Nodes) ->
case rabbit_maintenance:filter_out_drained_nodes_local_read(Nodes) of
[] ->
%% All nodes are drained. Let's place the leader on a drained node
%% respecting the requested queue-leader-locator strategy.
Nodes;
Filtered ->
Filtered
end.
%% members with the current leader first
members(Q) when ?amqqueue_is_quorum(Q) ->
{RaName, LeaderNode} = amqqueue:get_pid(Q),


@@ -86,30 +86,35 @@ is_enabled() ->
-spec declare(amqqueue:amqqueue(), node()) ->
{'new' | 'existing', amqqueue:amqqueue()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
declare(Q0, Node) when ?amqqueue_is_stream(Q0) ->
declare(Q0, _Node) when ?amqqueue_is_stream(Q0) ->
case rabbit_queue_type_util:run_checks(
[fun rabbit_queue_type_util:check_auto_delete/1,
fun rabbit_queue_type_util:check_exclusive/1,
fun rabbit_queue_type_util:check_non_durable/1],
Q0) of
ok ->
create_stream(Q0, Node);
create_stream(Q0);
Err ->
Err
end.
create_stream(Q0, Node) ->
create_stream(Q0) ->
Arguments = amqqueue:get_arguments(Q0),
QName = amqqueue:get_name(Q0),
Opts = amqqueue:get_options(Q0),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
Conf0 = make_stream_conf(Node, Q0),
Conf = apply_leader_locator_strategy(Conf0),
#{leader_node := LeaderNode} = Conf,
Conf0 = make_stream_conf(Q0),
InitialClusterSize = initial_cluster_size(
args_policy_lookup(<<"initial-cluster-size">>,
fun policy_precedence/2, Q0)),
{Leader, Followers} = rabbit_queue_location:select_leader_and_followers(Q0, InitialClusterSize),
Conf = maps:merge(Conf0, #{nodes => [Leader | Followers],
leader_node => Leader,
replica_nodes => Followers}),
Q1 = amqqueue:set_type_state(Q0, Conf),
case rabbit_amqqueue:internal_declare(Q1, false) of
{created, Q} ->
case rabbit_stream_coordinator:new_stream(Q, LeaderNode) of
case rabbit_stream_coordinator:new_stream(Q, Leader) of
{ok, {ok, LeaderPid}, _} ->
%% update record with leader pid
set_leader_pid(LeaderPid, amqqueue:get_name(Q)),
@@ -770,21 +775,13 @@ delete_replica(VHost, Name, Node) ->
E
end.
make_stream_conf(Node, Q) ->
make_stream_conf(Q) ->
QName = amqqueue:get_name(Q),
Name = stream_name(QName),
%% MaxLength = args_policy_lookup(<<"max-length">>, policy_precedence/2, Q),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun policy_precedence/2, Q),
MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun policy_precedence/2, Q)),
MaxSegmentSizeBytes = args_policy_lookup(<<"stream-max-segment-size-bytes">>, fun policy_precedence/2, Q),
LeaderLocator = queue_leader_locator(args_policy_lookup(<<"queue-leader-locator">>,
fun policy_precedence/2, Q)),
InitialClusterSize = initial_cluster_size(
args_policy_lookup(<<"initial-cluster-size">>,
fun policy_precedence/2, Q)),
Replicas0 = rabbit_nodes:all() -- [Node],
%% TODO: try to avoid nodes that are not connected
Replicas = select_stream_nodes(InitialClusterSize - 1, Replicas0),
Formatter = {?MODULE, format_osiris_event, [QName]},
Retention = lists:filter(fun({_, R}) ->
R =/= undefined
@@ -794,30 +791,9 @@ make_stream_conf(Node, Q) ->
#{reference => QName,
name => Name,
retention => Retention,
nodes => [Node | Replicas],
leader_locator_strategy => LeaderLocator,
leader_node => Node,
replica_nodes => Replicas,
event_formatter => Formatter,
epoch => 1}).
select_stream_nodes(Size, All) when length(All) =< Size ->
All;
select_stream_nodes(Size, All) ->
Node = node(),
case lists:member(Node, All) of
true ->
select_stream_nodes(Size - 1, lists:delete(Node, All), [Node]);
false ->
select_stream_nodes(Size, All, [])
end.
select_stream_nodes(0, _, Selected) ->
Selected;
select_stream_nodes(Size, Rest, Selected) ->
S = lists:nth(rand:uniform(length(Rest)), Rest),
select_stream_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]).
update_stream_conf(undefined, #{} = Conf) ->
Conf;
update_stream_conf(Q, #{} = Conf) when ?is_amqqueue(Q) ->
@@ -846,9 +822,6 @@ max_age(Bin) when is_binary(Bin) ->
max_age(Age) ->
Age.
queue_leader_locator(undefined) -> <<"client-local">>;
queue_leader_locator(Val) -> Val.
initial_cluster_size(undefined) ->
length(rabbit_nodes:all());
initial_cluster_size(Val) ->
@@ -1020,55 +993,3 @@ set_leader_pid(Pid, QName) ->
_ ->
ok
end.
apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf) ->
Conf;
apply_leader_locator_strategy(#{leader_node := Leader,
replica_nodes := Replicas0,
leader_locator_strategy := <<"random">>} = Conf) ->
Replicas = [Leader | Replicas0],
PotentialLeaders = potential_leaders(Replicas),
NewLeader = lists:nth(rand:uniform(length(PotentialLeaders)), PotentialLeaders),
NewReplicas = lists:delete(NewLeader, Replicas),
Conf#{leader_node => NewLeader,
replica_nodes => NewReplicas};
apply_leader_locator_strategy(#{leader_node := Leader,
replica_nodes := Replicas0,
leader_locator_strategy := <<"least-leaders">>} = Conf) ->
Replicas = [Leader | Replicas0],
PotentialLeaders = potential_leaders(Replicas),
Counters0 = maps:from_list([{R, 0} || R <- PotentialLeaders]),
Counters = maps:to_list(
lists:foldl(fun(Q, Acc) ->
P = amqqueue:get_pid(Q),
case amqqueue:get_type(Q) of
?MODULE when is_pid(P) ->
maps:update_with(node(P), fun(V) -> V + 1 end, 1, Acc);
_ ->
Acc
end
end, Counters0, rabbit_amqqueue:list())),
Ordered = lists:keysort(2, Counters),
%% We could have introduced nodes that are not in the list of replicas if the
%% initial cluster size is smaller than the cluster size. Select the first one
%% that is in the list of replicas.
NewLeader = select_first_matching_node(Ordered, Replicas),
NewReplicas = lists:delete(NewLeader, Replicas),
Conf#{leader_node => NewLeader,
replica_nodes => NewReplicas}.
potential_leaders(Nodes) ->
case rabbit_maintenance:filter_out_drained_nodes_local_read(Nodes) of
[] ->
%% All nodes are drained. Let's place the leader on a drained node
%% respecting the requested queue-leader-locator strategy.
Nodes;
Filtered ->
Filtered
end.
select_first_matching_node([{N, _} | Rest], Replicas) ->
case lists:member(N, Replicas) of
true -> N;
false -> select_first_matching_node(Rest, Replicas)
end.


@@ -84,7 +84,8 @@ groups() ->
quorum_cluster_size_3,
quorum_cluster_size_7,
node_removal_is_not_quorum_critical,
select_nodes_with_least_replicas
select_nodes_with_least_replicas,
select_nodes_with_least_replicas_node_down
]},
{clustered_with_partitions, [],
[
@@ -1402,6 +1403,11 @@ declare_during_node_down(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
RaName = ra_name(QQ),
{ok, Members0, _} = ra:members({RaName, Server}),
%% Since there are not enough running nodes, we expect that
%% stopped nodes are also selected as replicas.
Members = lists:map(fun({_, N}) -> N end, Members0),
?assert(same_elements(Members, Servers)),
timer:sleep(2000),
rabbit_ct_broker_helpers:start_node(Config, DownServer),
publish(Ch, QQ),
@@ -2736,18 +2742,51 @@ select_nodes_with_least_replicas(Config) ->
declare(Ch, Q,
[{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-quorum-initial-group-size">>, long, 3}])),
{ok, Members, _} = ra:members({ra_name(Q), Server}),
?assertEqual(3, length(Members)),
lists:map(fun({_, N}) -> N end, Members)
{ok, Members0, _} = ra:members({ra_name(Q), Server}),
?assertEqual(3, length(Members0)),
lists:map(fun({_, N}) -> N end, Members0)
end || Q <- Qs],
%% Assert that the second queue selected nodes where the first queue has no replicas.
?assertEqual(5, sets:size(sets:from_list(lists:flatten(Members)))),
[?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- Qs].
select_nodes_with_least_replicas_node_down(Config) ->
[S1, S2 | _ ] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
?assertEqual(ok, rabbit_control_helper:command(stop_app, S2)),
RunningNodes = lists:delete(S2, Servers),
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
Qs = [?config(queue_name, Config),
?config(alt_queue_name, Config)],
timer:sleep(1000),
Members = [begin
?assertMatch({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q,
[{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-quorum-initial-group-size">>, long, 3}])),
{ok, Members0, _} = ra:members({ra_name(Q), S1}),
?assertEqual(3, length(Members0)),
lists:map(fun({_, N}) -> N end, Members0)
end || Q <- Qs],
%% Assert that
%% 1. no replicas were placed on the node that is down, because there are enough running nodes, and
%% 2. the second queue selected nodes where the first queue has no replicas.
?assert(same_elements(lists:flatten(Members), RunningNodes)),
?assertEqual(ok, rabbit_control_helper:command(start_app, S2)),
[?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- Qs].
%%----------------------------------------------------------------------------
same_elements(L1, L2)
when is_list(L1), is_list(L2) ->
lists:usort(L1) =:= lists:usort(L2).
declare(Ch, Q) ->
declare(Ch, Q, []).


@@ -53,11 +53,15 @@ groups() ->
leader_locator_policy,
queue_size_on_declare,
leader_locator_random_maintenance,
leader_locator_least_leaders_maintenance
leader_locator_least_leaders_maintenance,
leader_locator_random,
leader_locator_least_leaders,
select_nodes_with_least_replicas
]},
{cluster_size_3_1, [], [shrink_coordinator_cluster]},
{cluster_size_3_2, [], [recover,
declare_with_node_down]},
declare_with_node_down_1,
declare_with_node_down_2]},
{cluster_size_3_parallel_1, [parallel], [
delete_replica,
delete_last_replica,
@@ -68,9 +72,7 @@ groups() ->
initial_cluster_size_two,
initial_cluster_size_one_policy,
leader_locator_client_local,
declare_delete_same_stream,
leader_locator_random,
leader_locator_least_leaders
declare_delete_same_stream
]},
{cluster_size_3_parallel_2, [parallel], all_tests()},
{unclustered_size_3_1, [], [add_replica]},
@@ -708,19 +710,39 @@ restart_single_node(Config) ->
%% the failing case for this test relies on a particular random condition
%% please never consider this a flake
declare_with_node_down(Config) ->
declare_with_node_down_1(Config) ->
[Server1, Server2, Server3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
rabbit_ct_broker_helpers:stop_node(Config, Server2),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-initial-cluster-size">>, long, 3}])),
check_leader_and_replicas(Config, [Server1, Server3]),
%% Since there are not enough running nodes, we expect that
%% stopped nodes are also selected as replicas.
check_members(Config, Servers),
rabbit_ct_broker_helpers:start_node(Config, Server2),
check_leader_and_replicas(Config, Servers),
ok.
declare_with_node_down_2(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
rabbit_ct_broker_helpers:stop_node(Config, Server2),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-initial-cluster-size">>, long, 2},
{<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
check_leader_and_replicas(Config, [Server1, Server3]),
%% Since there are sufficient running nodes, we expect that
%% stopped nodes are not selected as replicas.
check_members(Config, [Server1, Server3]),
rabbit_ct_broker_helpers:start_node(Config, Server2),
check_leader_and_replicas(Config, [Server1, Server3]),
ok.
recover(Config) ->
[Server | _] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1887,16 +1909,16 @@ leader_locator_least_leaders(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
Bin = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
Q1 = <<Bin/binary, "_q1">>,
Q2 = <<Bin/binary, "_q2">>,
Q1 = <<"q1">>,
Q2 = <<"q2">>,
?assertEqual({'queue.declare_ok', Q1, 0, 0},
declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])),
@@ -1905,7 +1927,32 @@ leader_locator_least_leaders(Config) ->
Leader = proplists:get_value(leader, Info),
?assert(lists:member(Leader, [Server2, Server3])),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[Q2, Q1, Q]]).
select_nodes_with_least_replicas(Config) ->
[Server1 | _ ] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
Bin = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
Q1 = <<Bin/binary, "_q1">>,
Qs = [Q1, Q],
[Q1Members, QMembers] =
lists:map(fun(Q0) ->
?assertEqual({'queue.declare_ok', Q0, 0, 0},
declare(Ch, Q0, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-initial-cluster-size">>, long, 2}])),
Infos = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info_all,
[<<"/">>, [name, members]]),
Name = rabbit_misc:r(<<"/">>, queue, Q0),
[Info] = [Props || Props <- Infos, lists:member({name, Name}, Props)],
proplists:get_value(members, Info)
end, Qs),
%% We expect that the second stream chose nodes where the first stream does not have replicas.
?assertEqual(lists:usort(Servers),
lists:usort(Q1Members ++ QMembers)),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [Qs]).
leader_locator_least_leaders_maintenance(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -2137,9 +2184,8 @@ purge(Config) ->
%%----------------------------------------------------------------------------
delete_queues() ->
[{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|| Q <- rabbit_amqqueue:list()].
delete_queues(Qs) when is_list(Qs) ->
lists:foreach(fun delete_testcase_queue/1, Qs).
delete_testcase_queue(Name) ->
QName = rabbit_misc:r(<<"/">>, queue, Name),
@@ -2177,7 +2223,16 @@ check_leader_and_replicas(Config, Members, Tag) ->
ct:pal("~s members ~w ~p", [?FUNCTION_NAME, Members, Info]),
lists:member(proplists:get_value(leader, Info), Members)
andalso (lists:sort(Members) == lists:sort(proplists:get_value(Tag, Info)))
end, 60000).
end, 60_000).
check_members(Config, ExpectedMembers) ->
rabbit_ct_helpers:await_condition(
fun () ->
Info = find_queue_info(Config, 0, [members]),
Members = proplists:get_value(members, Info),
ct:pal("~s members ~w ~p", [?FUNCTION_NAME, Members, Info]),
lists:sort(ExpectedMembers) == lists:sort(Members)
end, 20_000).
publish(Ch, Queue) ->
publish(Ch, Queue, <<"msg">>).


@@ -1033,7 +1033,7 @@ is_mixed_versions(Config) ->
%% -------------------------------------------------------------------
await_condition(ConditionFun) ->
await_condition(ConditionFun, 10000).
await_condition(ConditionFun, 10_000).
await_condition(ConditionFun, Timeout) ->
Retries = ceil(Timeout / 50),