From f32e80c01c9a11320167d91357627854d42bcae4 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 11 Apr 2022 09:43:50 +0200 Subject: [PATCH] Convert random and least-leaders to balanced Deprecate queue-leader-locator values 'random' and 'least-leaders'. Both become value 'balanced'. From now on only queue-leader-locator values 'client-local', and 'balanced' should be set. 'balanced' will place the leader on the node with the least leaders if there are few queues and will select a random leader if there are many queues. This avoid expensive least leaders calculation if there are many queues. This change also allows us to change the implementation of 'balanced' in the future. For example 'balanced' could place a leader on a node depending on resource usage or available node resources. There is no need to expose implementation details like 'random' or 'least-leaders' as configuration to users. --- deps/rabbit/src/rabbit_policies.erl | 3 + deps/rabbit/src/rabbit_queue_location.erl | 166 +++++++++--------- deps/rabbit/test/quorum_queue_SUITE.erl | 54 +----- .../rabbit/test/rabbit_stream_queue_SUITE.erl | 159 +++++------------ .../rabbitmq_management/priv/www/js/global.js | 2 +- ...CLI.Ctl.Commands.AddSuperStreamCommand.erl | 8 +- .../src/rabbit_stream_manager.erl | 4 +- 7 files changed, 146 insertions(+), 250 deletions(-) diff --git a/deps/rabbit/src/rabbit_policies.erl b/deps/rabbit/src/rabbit_policies.erl index 5e93869882..a52b4f2c52 100644 --- a/deps/rabbit/src/rabbit_policies.erl +++ b/deps/rabbit/src/rabbit_policies.erl @@ -168,6 +168,9 @@ validate_policy0(<<"max-age">>, Value) -> validate_policy0(<<"queue-leader-locator">>, <<"client-local">>) -> ok; +validate_policy0(<<"queue-leader-locator">>, <<"balanced">>) -> + ok; +%% 'random' and 'least-leaders' are deprecated and get mapped to 'balanced' validate_policy0(<<"queue-leader-locator">>, <<"random">>) -> ok; validate_policy0(<<"queue-leader-locator">>, <<"least-leaders">>) -> diff --git a/deps/rabbit/src/rabbit_queue_location.erl b/deps/rabbit/src/rabbit_queue_location.erl index b631cf5167..9730914e48 100644 --- a/deps/rabbit/src/rabbit_queue_location.erl +++ b/deps/rabbit/src/rabbit_queue_location.erl @@ -12,9 +12,9 @@ -export([queue_leader_locators/0, select_leader_and_followers/2]). --define(QUEUES_LIMIT_FOR_LEAST_REPLICAS_SELECTION, 1_000). --define(QUEUE_LEADER_LOCATORS, [<<"client-local">>, <<"random">>, <<"least-leaders">>]). --define(DEFAULT_QUEUE_LEADER_LOCATOR, <<"client-local">>). +-define(QUEUE_LEADER_LOCATORS_DEPRECATED, [<<"random">>, <<"least-leaders">>]). +-define(QUEUE_LEADER_LOCATORS, [<<"client-local">>, <<"balanced">>] ++ ?QUEUE_LEADER_LOCATORS_DEPRECATED). +-define(QUEUE_COUNT_START_RANDOM_SELECTION, 1_000). -type queue_leader_locator() :: nonempty_binary(). @@ -27,98 +27,100 @@ queue_leader_locators() -> {Leader :: node(), Followers :: [node()]}. select_leader_and_followers(Q, Size) when (?amqqueue_is_quorum(Q) orelse ?amqqueue_is_stream(Q)) andalso is_integer(Size) -> + {AllNodes, _DiscNodes, RunningNodes} = rabbit_mnesia:cluster_nodes(status), + true = lists:member(node(), AllNodes), QueueType = amqqueue:get_type(Q), GetQueues0 = get_queues_for_type(QueueType), - {AllNodes, _DiscNodes, RunningNodes} = rabbit_mnesia:cluster_nodes(status), - {Replicas, GetQueues} = select_replicas(Size, AllNodes, RunningNodes, GetQueues0), + QueueCount = rabbit_amqqueue:count(), + {Replicas, GetQueues} = select_replicas(Size, AllNodes, RunningNodes, QueueCount, GetQueues0), LeaderLocator = leader_locator(Q), - Leader = leader_node(LeaderLocator, Replicas, RunningNodes, GetQueues), + Leader = leader_node(LeaderLocator, Replicas, RunningNodes, QueueCount, GetQueues), Followers = lists:delete(Leader, Replicas), {Leader, Followers}. --spec select_replicas(pos_integer(), [node(),...], [node(),...], function()) -> - {[node(),...], function()}. -select_replicas(Size, AllNodes, _, Fun) - when length(AllNodes) =< Size -> - {AllNodes, Fun}; -select_replicas(Size, AllNodes, RunningNodes, GetQueues) -> - %% Select nodes in the following order: - %% 1. Local node to have data locality for declaring client. - %% 2. Running nodes. - %% 3.1. If there are few queues: Nodes with least replicas to have a "balanced" RabbitMQ cluster. - %% 3.2. If there are many queues: Randomly to avoid expensive calculation of counting replicas - %% per node. Random replica selection is good enough for most use cases. - Local = node(), - true = lists:member(Local, AllNodes), - RemoteNodes = lists:delete(Local, AllNodes), - case rabbit_amqqueue:count() of - Count when Count =< ?QUEUES_LIMIT_FOR_LEAST_REPLICAS_SELECTION -> - Counters0 = maps:from_list([{N, 0} || N <- RemoteNodes]), - Queues = GetQueues(), - Counters = lists:foldl(fun(Q, Acc) -> - #{nodes := Nodes} = amqqueue:get_type_state(Q), - lists:foldl(fun(N, A) - when is_map_key(N, A) -> - maps:update_with(N, fun(C) -> C+1 end, A); - (_, A) -> - A - end, Acc, Nodes) - end, Counters0, Queues), - L0 = maps:to_list(Counters), - L1 = lists:sort(fun({N0, C0}, {N1, C1}) -> - case {lists:member(N0, RunningNodes), - lists:member(N1, RunningNodes)} of - {true, false} -> - true; - {false, true} -> - false; - _ -> - C0 =< C1 - end - end, L0), - {L2, _} = lists:split(Size - 1, L1), - L = lists:map(fun({N, _}) -> N end, L2), - {[Local | L], fun() -> Queues end}; - _ -> - L0 = shuffle(RemoteNodes), - L1 = lists:sort(fun(X, _Y) -> - lists:member(X, RunningNodes) - end, L0), - {L, _} = lists:split(Size - 1, L1), - {[Local | L], GetQueues} - end. - -spec leader_locator(amqqueue:amqqueue()) -> queue_leader_locator(). leader_locator(Q) -> - case rabbit_queue_type_util:args_policy_lookup( - <<"queue-leader-locator">>, - fun (PolVal, _ArgVal) -> PolVal end, - Q) of - undefined -> - case application:get_env(rabbit, queue_leader_locator) of - {ok, Locator} -> - case lists:member(Locator, ?QUEUE_LEADER_LOCATORS) of - true -> - Locator; - false -> - ?DEFAULT_QUEUE_LEADER_LOCATOR - end; - undefined -> - ?DEFAULT_QUEUE_LEADER_LOCATOR - end; - Val -> - Val - end. + L = case rabbit_queue_type_util:args_policy_lookup( + <<"queue-leader-locator">>, + fun (PolVal, _ArgVal) -> PolVal end, + Q) of + undefined -> + application:get_env(rabbit, queue_leader_locator, undefined); + Val -> + Val + end, + leader_locator0(L). --spec leader_node(queue_leader_locator(), [node(),...], [node(),...], function()) -> +leader_locator0(<<"client-local">>) -> + <<"client-local">>; +leader_locator0(<<"balanced">>) -> + <<"balanced">>; +%% 'random' and 'least-leaders' are deprecated +leader_locator0(<<"random">>) -> + <<"balanced">>; +leader_locator0(<<"least-leaders">>) -> + <<"balanced">>; +leader_locator0(_) -> + %% default + <<"client-local">>. + +-spec select_replicas(pos_integer(), [node(),...], [node(),...], non_neg_integer(), function()) -> + {[node(),...], function()}. +select_replicas(Size, AllNodes, _, _, Fun) + when length(AllNodes) =< Size -> + {AllNodes, Fun}; +%% Select nodes in the following order: +%% 1. Local node to have data locality for declaring client. +%% 2. Running nodes. +%% 3.1. If there are many queues: Randomly to avoid expensive calculation of counting replicas +%% per node. Random replica selection is good enough for most use cases. +%% 3.2. If there are few queues: Nodes with least replicas to have a "balanced" RabbitMQ cluster. +select_replicas(Size, AllNodes, RunningNodes, QueueCount, GetQueues) + when QueueCount >= ?QUEUE_COUNT_START_RANDOM_SELECTION -> + L0 = shuffle(lists:delete(node(), AllNodes)), + L1 = lists:sort(fun(X, _Y) -> + lists:member(X, RunningNodes) + end, L0), + {L, _} = lists:split(Size - 1, L1), + {[node() | L], GetQueues}; +select_replicas(Size, AllNodes, RunningNodes, _, GetQueues) -> + Counters0 = maps:from_list([{N, 0} || N <- lists:delete(node(), AllNodes)]), + Queues = GetQueues(), + Counters = lists:foldl(fun(Q, Acc) -> + #{nodes := Nodes} = amqqueue:get_type_state(Q), + lists:foldl(fun(N, A) + when is_map_key(N, A) -> + maps:update_with(N, fun(C) -> C+1 end, A); + (_, A) -> + A + end, Acc, Nodes) + end, Counters0, Queues), + L0 = maps:to_list(Counters), + L1 = lists:sort(fun({N0, C0}, {N1, C1}) -> + case {lists:member(N0, RunningNodes), + lists:member(N1, RunningNodes)} of + {true, false} -> + true; + {false, true} -> + false; + _ -> + C0 =< C1 + end + end, L0), + {L2, _} = lists:split(Size - 1, L1), + L = lists:map(fun({N, _}) -> N end, L2), + {[node() | L], fun() -> Queues end}. + +-spec leader_node(queue_leader_locator(), [node(),...], [node(),...], non_neg_integer(), function()) -> node(). -leader_node(<<"client-local">>, _, _, _) -> +leader_node(<<"client-local">>, _, _, _, _) -> node(); -leader_node(<<"random">>, Nodes0, RunningNodes, _) -> +leader_node(<<"balanced">>, Nodes0, RunningNodes, QueueCount, _) + when QueueCount >= ?QUEUE_COUNT_START_RANDOM_SELECTION -> Nodes = potential_leaders(Nodes0, RunningNodes), lists:nth(rand:uniform(length(Nodes)), Nodes); -leader_node(<<"least-leaders">>, Nodes0, RunningNodes, GetQueues) +leader_node(<<"balanced">>, Nodes0, RunningNodes, _, GetQueues) when is_function(GetQueues, 0) -> Nodes = potential_leaders(Nodes0, RunningNodes), Counters0 = maps:from_list([{N, 0} || N <- Nodes]), @@ -146,8 +148,6 @@ potential_leaders(Replicas, RunningNodes) -> RunningReplicas -> case rabbit_maintenance:filter_out_drained_nodes_local_read(RunningReplicas) of [] -> - %% All selected replica nodes are drained. Let's place the leader on a - %% drained node respecting the requested queue-leader-locator streategy. RunningReplicas; Filtered -> Filtered diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index a045cba9ed..6f3fa5fe98 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -72,10 +72,8 @@ groups() -> file_handle_reservations_above_limit, node_removal_is_not_quorum_critical, leader_locator_client_local, - leader_locator_least_leaders, - leader_locator_least_leaders_maintenance, - leader_locator_random, - leader_locator_random_maintenance, + leader_locator_balanced, + leader_locator_balanced_maintenance, leader_locator_policy ] ++ all_tests()}, @@ -2624,7 +2622,7 @@ leader_locator_client_local(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q})) end || Server <- Servers]. -leader_locator_least_leaders(Config) -> +leader_locator_balanced(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), Qs = [?config(queue_name, Config), @@ -2635,7 +2633,7 @@ leader_locator_least_leaders(Config) -> ?assertMatch({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])), + {<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])), {ok, _, {_, Leader}} = ra:members({ra_name(Q), Server}), Leader end || Q <- Qs], @@ -2645,7 +2643,7 @@ leader_locator_least_leaders(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q})) || Q <- Qs]. -leader_locator_least_leaders_maintenance(Config) -> +leader_locator_balanced_maintenance(Config) -> [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, S1), Qs = [?config(queue_name, Config), @@ -2657,7 +2655,7 @@ leader_locator_least_leaders_maintenance(Config) -> ?assertMatch({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])), + {<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])), {ok, _, {_, Leader}} = ra:members({ra_name(Q), S1}), Leader end || Q <- Qs], @@ -2670,44 +2668,6 @@ leader_locator_least_leaders_maintenance(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q})) || Q <- Qs]. -leader_locator_random(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - Q = ?config(queue_name, Config), - - Leaders = [begin - ?assertMatch({'queue.declare_ok', Q, 0, 0}, - declare(Ch, Q, - [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), - {ok, _, {_, Leader}} = ra:members({ra_name(Q), Server}), - ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch, #'queue.delete'{queue = Q})), - Leader - end || _ <- lists:seq(1, 15)], - ?assertEqual(3, sets:size(sets:from_list(Leaders))). - -leader_locator_random_maintenance(Config) -> - [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, S1), - Q = ?config(queue_name, Config), - - rabbit_ct_broker_helpers:mark_as_being_drained(Config, S2), - Leaders = [begin - ?assertMatch({'queue.declare_ok', Q, 0, 0}, - declare(Ch, Q, - [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), - {ok, _, {_, Leader}} = ra:members({ra_name(Q), S1}), - ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch, #'queue.delete'{queue = Q})), - Leader - end || _ <- lists:seq(1, 12)], - ?assert(lists:member(S1, Leaders)), - ?assertNot(lists:member(S2, Leaders)), - ?assert(lists:member(S3, Leaders)), - rabbit_ct_broker_helpers:unmark_as_being_drained(Config, S2). - leader_locator_policy(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), @@ -2716,7 +2676,7 @@ leader_locator_policy(Config) -> <<"leader_locator_policy_q3">>], ok = rabbit_ct_broker_helpers:set_policy( Config, 0, <<"my-leader-locator">>, <<"leader_locator_policy_.*">>, <<"queues">>, - [{<<"queue-leader-locator">>, <<"least-leaders">>}]), + [{<<"queue-leader-locator">>, <<"balanced">>}]), Leaders = [begin ?assertMatch({'queue.declare_ok', Q, 0, 0}, diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index e2808b11a6..edd8082f4b 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -52,10 +52,8 @@ groups() -> publish_coordinator_unavailable, leader_locator_policy, queue_size_on_declare, - leader_locator_random_maintenance, - leader_locator_least_leaders_maintenance, - leader_locator_random, - leader_locator_least_leaders, + leader_locator_balanced, + leader_locator_balanced_maintenance, select_nodes_with_least_replicas ]}, {cluster_size_3_1, [], [shrink_coordinator_cluster]}, @@ -734,7 +732,7 @@ declare_with_node_down_2(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-initial-cluster-size">>, long, 2}, - {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), + {<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])), check_leader_and_replicas(Config, [Server1, Server3]), %% Since there are sufficient running nodes, we expect that %% stopped nodes are not selected as replicas. @@ -1852,82 +1850,51 @@ leader_locator_client_local(Config) -> amqp_channel:call(Ch3, #'queue.delete'{queue = Q})), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). -leader_locator_random(Config) -> - [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), - Q = ?config(queue_name, Config), - - ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), - - Info = find_queue_info(Config, [leader]), - Leader = proplists:get_value(leader, Info), - - ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch, #'queue.delete'{queue = Q})), - - repeat_until( - fun() -> - ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch, #'queue.delete'{queue = Q})), - - ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), - - Info2 = find_queue_info(Config, [leader]), - Leader2 = proplists:get_value(leader, Info2), - - Leader =/= Leader2 - end, 10), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). - -leader_locator_random_maintenance(Config) -> - [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, S1), - Q = ?config(queue_name, Config), - rabbit_ct_broker_helpers:mark_as_being_drained(Config, S2), - - [begin - ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), - - Info = find_queue_info(Config, [leader]), - Leader = proplists:get_value(leader, Info), - ?assert(lists:member(Leader, [S1, S3])), - ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch, #'queue.delete'{queue = Q})) - end || _ <- lists:seq(1, 7)], - rabbit_ct_broker_helpers:unmark_as_being_drained(Config, S2). - -leader_locator_least_leaders(Config) -> +leader_locator_balanced(Config) -> [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), Q = ?config(queue_name, Config), Bin = rabbit_data_coercion:to_binary(?FUNCTION_NAME), Q1 = <>, - Q2 = <>, ?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), - ?assertEqual({'queue.declare_ok', Q2, 0, 0}, - declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])), + {<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])), Info = find_queue_info(Config, [leader]), Leader = proplists:get_value(leader, Info), ?assert(lists:member(Leader, [Server2, Server3])), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[Q2, Q1, Q]]). + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[Q1, Q]]). + +leader_locator_balanced_maintenance(Config) -> + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), + Q = ?config(queue_name, Config), + Q1 = <<"q1">>, + Q2 = <<"q2">>, + ?assertEqual({'queue.declare_ok', Q1, 0, 0}, + declare(Ch1, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + ?assertEqual({'queue.declare_ok', Q2, 0, 0}, + declare(Ch2, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, Server3), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])), + + Info = find_queue_info(Config, [leader]), + Leader = proplists:get_value(leader, Info), + ?assert(lists:member(Leader, [Server1, Server2])), + + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server3), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). select_nodes_with_least_replicas(Config) -> [Server1 | _ ] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1954,68 +1921,34 @@ select_nodes_with_least_replicas(Config) -> lists:usort(Q1Members ++ QMembers)), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [Qs]). -leader_locator_least_leaders_maintenance(Config) -> - [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), - Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), - Q = ?config(queue_name, Config), - Q1 = <<"q1">>, - Q2 = <<"q2">>, - ?assertEqual({'queue.declare_ok', Q1, 0, 0}, - declare(Ch1, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), - ?assertEqual({'queue.declare_ok', Q2, 0, 0}, - declare(Ch2, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), - rabbit_ct_broker_helpers:mark_as_being_drained(Config, Server3), - ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])), - - Info = find_queue_info(Config, [leader]), - Leader = proplists:get_value(leader, Info), - ?assert(lists:member(Leader, [Server1, Server2])), - - rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server3), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). - leader_locator_policy(Config) -> - [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + Bin = rabbit_data_coercion:to_binary(?FUNCTION_NAME), + Q1 = <>, ok = rabbit_ct_broker_helpers:set_policy( - Config, 0, <<"leader-locator">>, <<"leader_locator_.*">>, <<"queues">>, - [{<<"queue-leader-locator">>, <<"random">>}]), + Config, 0, <<"my-leader-locator">>, Q, <<"queues">>, + [{<<"queue-leader-locator">>, <<"balanced">>}]), - Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q1, 0, 0}, + declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition, leader]), - - ?assertEqual(<<"leader-locator">>, proplists:get_value(policy, Info)), + ?assertEqual(<<"my-leader-locator">>, proplists:get_value(policy, Info)), ?assertEqual('', proplists:get_value(operator_policy, Info)), - ?assertEqual([{<<"queue-leader-locator">>, <<"random">>}], + ?assertEqual([{<<"queue-leader-locator">>, <<"balanced">>}], proplists:get_value(effective_policy_definition, Info)), - Leader = proplists:get_value(leader, Info), + ?assert(lists:member(Leader, [Server2, Server3])), - repeat_until( - fun() -> - ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch, #'queue.delete'{queue = Q})), - - ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - - Info2 = find_queue_info(Config, [leader]), - Leader2 = proplists:get_value(leader, Info2), - Leader =/= Leader2 - end, 10), - - ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"my-leader-locator">>), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[Q1, Q]]). queue_size_on_declare(Config) -> [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbitmq_management/priv/www/js/global.js b/deps/rabbitmq_management/priv/www/js/global.js index 9addcca21d..2946f50a58 100644 --- a/deps/rabbitmq_management/priv/www/js/global.js +++ b/deps/rabbitmq_management/priv/www/js/global.js @@ -237,7 +237,7 @@ var HELP = { 'Set the queue into master location mode, determining the rule by which the queue master is located when declared on a cluster of nodes.
(Sets the "x-queue-master-locator" argument.)', 'queue-leader-locator': - 'Set the rule by which the queue leader is located when declared on a cluster of nodes. Valid values are client-local (default), random and least-leaders.', + 'Set the rule by which the queue leader is located when declared on a cluster of nodes. Valid values are client-local (default) and balanced.', 'queue-initial-cluster-size': 'Set the queue initial cluster size.', diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl index 147c446dfb..885ddb690c 100644 --- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl @@ -99,6 +99,9 @@ validate_stream_arguments(#{stream_max_segment_size_bytes := Value} = validate_stream_arguments(#{leader_locator := <<"client-local">>} = Opts) -> validate_stream_arguments(maps:remove(leader_locator, Opts)); +validate_stream_arguments(#{leader_locator := <<"balanced">>} = Opts) -> + validate_stream_arguments(maps:remove(leader_locator, Opts)); +%% 'random' and 'least-leaders' are deprecated and get mapped to 'balanced' validate_stream_arguments(#{leader_locator := <<"random">>} = Opts) -> validate_stream_arguments(maps:remove(leader_locator, Opts)); validate_stream_arguments(#{leader_locator := <<"least-leaders">>} = @@ -107,7 +110,7 @@ validate_stream_arguments(#{leader_locator := <<"least-leaders">>} = validate_stream_arguments(#{leader_locator := _}) -> {validation_failure, "Invalid value for --leader-locator, valid values " - "are client-local, random, least-leaders."}; + "are client-local, balanced."}; validate_stream_arguments(#{initial_cluster_size := Value} = Opts) -> try case rabbit_data_coercion:to_integer(Value) of @@ -160,8 +163,7 @@ usage_additional() -> "example values: 500mb, 1gb."], ["--leader-locator ", "Leader locator strategy for partition streams, " - "possible values are client-local, least-leaders, " - "random."], + "possible values are client-local, balanced."], ["--initial-cluster-size ", "The initial cluster size of partition streams."]]. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 2bb65d581d..6f8758ce8d 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -169,9 +169,7 @@ validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) -> - case lists:member(Locator, - [<<"client-local">>, <<"random">>, <<"least-leaders">>]) - of + case lists:member(Locator, rabbit_queue_location:queue_leader_locators()) of true -> validate_stream_queue_arguments(T); false ->