diff --git a/deps/rabbit/src/rabbit_queue_location.erl b/deps/rabbit/src/rabbit_queue_location.erl index 21e59bbb64..f7c803eac5 100644 --- a/deps/rabbit/src/rabbit_queue_location.erl +++ b/deps/rabbit/src/rabbit_queue_location.erl @@ -32,9 +32,13 @@ select_leader_and_followers(Q, Size) QueueType = amqqueue:get_type(Q), GetQueues0 = get_queues_for_type(QueueType), QueueCount = rabbit_amqqueue:count(), - {Replicas, GetQueues} = select_replicas(Size, AllNodes, RunningNodes, QueueCount, GetQueues0), + QueueCountStartRandom = application:get_env(rabbit, queue_count_start_random_selection, + ?QUEUE_COUNT_START_RANDOM_SELECTION), + {Replicas, GetQueues} = select_replicas(Size, AllNodes, RunningNodes, + QueueCount, QueueCountStartRandom, GetQueues0), LeaderLocator = leader_locator(Q), - Leader = leader_node(LeaderLocator, Replicas, RunningNodes, QueueCount, GetQueues), + Leader = leader_node(LeaderLocator, Replicas, RunningNodes, + QueueCount, QueueCountStartRandom, GetQueues), Followers = lists:delete(Leader, Replicas), {Leader, Followers}. @@ -65,9 +69,10 @@ leader_locator0(_) -> %% default <<"client-local">>. --spec select_replicas(pos_integer(), [node(),...], [node(),...], non_neg_integer(), function()) -> +-spec select_replicas(pos_integer(), [node(),...], [node(),...], + non_neg_integer(), non_neg_integer(), function()) -> {[node(),...], function()}. -select_replicas(Size, AllNodes, _, _, Fun) +select_replicas(Size, AllNodes, _, _, _, Fun) when length(AllNodes) =< Size -> {AllNodes, Fun}; %% Select nodes in the following order: @@ -76,15 +81,15 @@ select_replicas(Size, AllNodes, _, _, Fun) %% 3.1. If there are many queues: Randomly to avoid expensive calculation of counting replicas %% per node. Random replica selection is good enough for most use cases. %% 3.2. If there are few queues: Nodes with least replicas to have a "balanced" RabbitMQ cluster. -select_replicas(Size, AllNodes, RunningNodes, QueueCount, GetQueues) - when QueueCount >= ?QUEUE_COUNT_START_RANDOM_SELECTION -> +select_replicas(Size, AllNodes, RunningNodes, QueueCount, QueueCountStartRandom, GetQueues) + when QueueCount >= QueueCountStartRandom -> L0 = shuffle(lists:delete(node(), AllNodes)), L1 = lists:sort(fun(X, _Y) -> lists:member(X, RunningNodes) end, L0), {L, _} = lists:split(Size - 1, L1), {[node() | L], GetQueues}; -select_replicas(Size, AllNodes, RunningNodes, _, GetQueues) -> +select_replicas(Size, AllNodes, RunningNodes, _, _, GetQueues) -> Counters0 = maps:from_list([{N, 0} || N <- lists:delete(node(), AllNodes)]), Queues = GetQueues(), Counters = lists:foldl(fun(Q, Acc) -> @@ -112,15 +117,16 @@ select_replicas(Size, AllNodes, RunningNodes, _, GetQueues) -> L = lists:map(fun({N, _}) -> N end, L2), {[node() | L], fun() -> Queues end}. --spec leader_node(queue_leader_locator(), [node(),...], [node(),...], non_neg_integer(), function()) -> +-spec leader_node(queue_leader_locator(), [node(),...], [node(),...], + non_neg_integer(), non_neg_integer(), function()) -> node(). -leader_node(<<"client-local">>, _, _, _, _) -> +leader_node(<<"client-local">>, _, _, _, _, _) -> node(); -leader_node(<<"balanced">>, Nodes0, RunningNodes, QueueCount, _) - when QueueCount >= ?QUEUE_COUNT_START_RANDOM_SELECTION -> +leader_node(<<"balanced">>, Nodes0, RunningNodes, QueueCount, QueueCountStartRandom, _) + when QueueCount >= QueueCountStartRandom -> Nodes = potential_leaders(Nodes0, RunningNodes), lists:nth(rand:uniform(length(Nodes)), Nodes); -leader_node(<<"balanced">>, Nodes0, RunningNodes, _, GetQueues) +leader_node(<<"balanced">>, Nodes0, RunningNodes, _, _, GetQueues) when is_function(GetQueues, 0) -> Nodes = potential_leaders(Nodes0, RunningNodes), Counters0 = maps:from_list([{N, 0} || N <- Nodes]), diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 2324d503d4..f5cb682950 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -74,6 +74,7 @@ groups() -> leader_locator_client_local, leader_locator_balanced, leader_locator_balanced_maintenance, + leader_locator_balanced_random_maintenance, leader_locator_policy ] ++ all_tests()}, @@ -289,6 +290,9 @@ init_per_testcase(Testcase, Config) -> leader_locator_client_local when IsMixed -> {skip, "leader_locator_client_local isn't mixed versions compatible because " "delete_declare isn't mixed versions reliable"}; + leader_locator_balanced_random_maintenance when IsMixed -> + {skip, "leader_locator_balanced_random_maintenance isn't mixed versions compatible because " + "delete_declare isn't mixed versions reliable"}; _ -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -2674,6 +2678,39 @@ leader_locator_balanced_maintenance(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q})) || Q <- Qs]. +leader_locator_balanced_random_maintenance(Config) -> + [S1, S2, S3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, S1), + Q = ?config(queue_name, Config), + + true = rabbit_ct_broker_helpers:mark_as_being_drained(Config, S2), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, + [rabbit, queue_leader_locator, <<"balanced">>]), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, + [rabbit, queue_count_start_random_selection, 0]), + + Leaders = [begin + ?assertMatch({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, + [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, 2}])), + {ok, [{_, R1}, {_, R2}], {_, Leader}} = ra:members({ra_name(Q), S1}), + ?assert(lists:member(R1, Servers)), + ?assert(lists:member(R2, Servers)), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + Leader + end || _ <- lists:seq(1, 10)], + ?assert(lists:member(S1, Leaders)), + ?assertNot(lists:member(S2, Leaders)), + ?assert(lists:member(S3, Leaders)), + + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env, + [rabbit, queue_leader_locator]), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env, + [rabbit, queue_count_start_random_selection]), + true = rabbit_ct_broker_helpers:unmark_as_being_drained(Config, S2). + leader_locator_policy(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server),