Convert random and least-leaders to balanced

Deprecate queue-leader-locator values 'random' and 'least-leaders'.
Both become value 'balanced'.

From now on only queue-leader-locator values 'client-local', and
'balanced' should be set.

'balanced' will place the leader on the node with the least leaders if
there are few queues and will select a random leader if there are many
queues.
This avoid expensive least leaders calculation if there are many queues.

This change also allows us to change the implementation of 'balanced' in
the future. For example 'balanced' could place a leader on a node
depending on resource usage or available node resources.

There is no need to expose implementation details like 'random' or
'least-leaders' as configuration to users.
This commit is contained in:
David Ansari 2022-04-11 09:43:50 +02:00
parent 9180f5f085
commit f32e80c01c
7 changed files with 146 additions and 250 deletions

View File

@ -168,6 +168,9 @@ validate_policy0(<<"max-age">>, Value) ->
validate_policy0(<<"queue-leader-locator">>, <<"client-local">>) ->
ok;
validate_policy0(<<"queue-leader-locator">>, <<"balanced">>) ->
ok;
%% 'random' and 'least-leaders' are deprecated and get mapped to 'balanced'
validate_policy0(<<"queue-leader-locator">>, <<"random">>) ->
ok;
validate_policy0(<<"queue-leader-locator">>, <<"least-leaders">>) ->

View File

@ -12,9 +12,9 @@
-export([queue_leader_locators/0,
select_leader_and_followers/2]).
-define(QUEUES_LIMIT_FOR_LEAST_REPLICAS_SELECTION, 1_000).
-define(QUEUE_LEADER_LOCATORS, [<<"client-local">>, <<"random">>, <<"least-leaders">>]).
-define(DEFAULT_QUEUE_LEADER_LOCATOR, <<"client-local">>).
-define(QUEUE_LEADER_LOCATORS_DEPRECATED, [<<"random">>, <<"least-leaders">>]).
-define(QUEUE_LEADER_LOCATORS, [<<"client-local">>, <<"balanced">>] ++ ?QUEUE_LEADER_LOCATORS_DEPRECATED).
-define(QUEUE_COUNT_START_RANDOM_SELECTION, 1_000).
-type queue_leader_locator() :: nonempty_binary().
@ -27,98 +27,100 @@ queue_leader_locators() ->
{Leader :: node(), Followers :: [node()]}.
select_leader_and_followers(Q, Size)
when (?amqqueue_is_quorum(Q) orelse ?amqqueue_is_stream(Q)) andalso is_integer(Size) ->
{AllNodes, _DiscNodes, RunningNodes} = rabbit_mnesia:cluster_nodes(status),
true = lists:member(node(), AllNodes),
QueueType = amqqueue:get_type(Q),
GetQueues0 = get_queues_for_type(QueueType),
{AllNodes, _DiscNodes, RunningNodes} = rabbit_mnesia:cluster_nodes(status),
{Replicas, GetQueues} = select_replicas(Size, AllNodes, RunningNodes, GetQueues0),
QueueCount = rabbit_amqqueue:count(),
{Replicas, GetQueues} = select_replicas(Size, AllNodes, RunningNodes, QueueCount, GetQueues0),
LeaderLocator = leader_locator(Q),
Leader = leader_node(LeaderLocator, Replicas, RunningNodes, GetQueues),
Leader = leader_node(LeaderLocator, Replicas, RunningNodes, QueueCount, GetQueues),
Followers = lists:delete(Leader, Replicas),
{Leader, Followers}.
-spec select_replicas(pos_integer(), [node(),...], [node(),...], function()) ->
{[node(),...], function()}.
select_replicas(Size, AllNodes, _, Fun)
when length(AllNodes) =< Size ->
{AllNodes, Fun};
select_replicas(Size, AllNodes, RunningNodes, GetQueues) ->
%% Select nodes in the following order:
%% 1. Local node to have data locality for declaring client.
%% 2. Running nodes.
%% 3.1. If there are few queues: Nodes with least replicas to have a "balanced" RabbitMQ cluster.
%% 3.2. If there are many queues: Randomly to avoid expensive calculation of counting replicas
%% per node. Random replica selection is good enough for most use cases.
Local = node(),
true = lists:member(Local, AllNodes),
RemoteNodes = lists:delete(Local, AllNodes),
case rabbit_amqqueue:count() of
Count when Count =< ?QUEUES_LIMIT_FOR_LEAST_REPLICAS_SELECTION ->
Counters0 = maps:from_list([{N, 0} || N <- RemoteNodes]),
Queues = GetQueues(),
Counters = lists:foldl(fun(Q, Acc) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
lists:foldl(fun(N, A)
when is_map_key(N, A) ->
maps:update_with(N, fun(C) -> C+1 end, A);
(_, A) ->
A
end, Acc, Nodes)
end, Counters0, Queues),
L0 = maps:to_list(Counters),
L1 = lists:sort(fun({N0, C0}, {N1, C1}) ->
case {lists:member(N0, RunningNodes),
lists:member(N1, RunningNodes)} of
{true, false} ->
true;
{false, true} ->
false;
_ ->
C0 =< C1
end
end, L0),
{L2, _} = lists:split(Size - 1, L1),
L = lists:map(fun({N, _}) -> N end, L2),
{[Local | L], fun() -> Queues end};
_ ->
L0 = shuffle(RemoteNodes),
L1 = lists:sort(fun(X, _Y) ->
lists:member(X, RunningNodes)
end, L0),
{L, _} = lists:split(Size - 1, L1),
{[Local | L], GetQueues}
end.
-spec leader_locator(amqqueue:amqqueue()) ->
queue_leader_locator().
leader_locator(Q) ->
case rabbit_queue_type_util:args_policy_lookup(
<<"queue-leader-locator">>,
fun (PolVal, _ArgVal) -> PolVal end,
Q) of
undefined ->
case application:get_env(rabbit, queue_leader_locator) of
{ok, Locator} ->
case lists:member(Locator, ?QUEUE_LEADER_LOCATORS) of
true ->
Locator;
false ->
?DEFAULT_QUEUE_LEADER_LOCATOR
end;
undefined ->
?DEFAULT_QUEUE_LEADER_LOCATOR
end;
Val ->
Val
end.
L = case rabbit_queue_type_util:args_policy_lookup(
<<"queue-leader-locator">>,
fun (PolVal, _ArgVal) -> PolVal end,
Q) of
undefined ->
application:get_env(rabbit, queue_leader_locator, undefined);
Val ->
Val
end,
leader_locator0(L).
-spec leader_node(queue_leader_locator(), [node(),...], [node(),...], function()) ->
leader_locator0(<<"client-local">>) ->
<<"client-local">>;
leader_locator0(<<"balanced">>) ->
<<"balanced">>;
%% 'random' and 'least-leaders' are deprecated
leader_locator0(<<"random">>) ->
<<"balanced">>;
leader_locator0(<<"least-leaders">>) ->
<<"balanced">>;
leader_locator0(_) ->
%% default
<<"client-local">>.
-spec select_replicas(pos_integer(), [node(),...], [node(),...], non_neg_integer(), function()) ->
{[node(),...], function()}.
select_replicas(Size, AllNodes, _, _, Fun)
when length(AllNodes) =< Size ->
{AllNodes, Fun};
%% Select nodes in the following order:
%% 1. Local node to have data locality for declaring client.
%% 2. Running nodes.
%% 3.1. If there are many queues: Randomly to avoid expensive calculation of counting replicas
%% per node. Random replica selection is good enough for most use cases.
%% 3.2. If there are few queues: Nodes with least replicas to have a "balanced" RabbitMQ cluster.
select_replicas(Size, AllNodes, RunningNodes, QueueCount, GetQueues)
when QueueCount >= ?QUEUE_COUNT_START_RANDOM_SELECTION ->
L0 = shuffle(lists:delete(node(), AllNodes)),
L1 = lists:sort(fun(X, _Y) ->
lists:member(X, RunningNodes)
end, L0),
{L, _} = lists:split(Size - 1, L1),
{[node() | L], GetQueues};
select_replicas(Size, AllNodes, RunningNodes, _, GetQueues) ->
Counters0 = maps:from_list([{N, 0} || N <- lists:delete(node(), AllNodes)]),
Queues = GetQueues(),
Counters = lists:foldl(fun(Q, Acc) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
lists:foldl(fun(N, A)
when is_map_key(N, A) ->
maps:update_with(N, fun(C) -> C+1 end, A);
(_, A) ->
A
end, Acc, Nodes)
end, Counters0, Queues),
L0 = maps:to_list(Counters),
L1 = lists:sort(fun({N0, C0}, {N1, C1}) ->
case {lists:member(N0, RunningNodes),
lists:member(N1, RunningNodes)} of
{true, false} ->
true;
{false, true} ->
false;
_ ->
C0 =< C1
end
end, L0),
{L2, _} = lists:split(Size - 1, L1),
L = lists:map(fun({N, _}) -> N end, L2),
{[node() | L], fun() -> Queues end}.
-spec leader_node(queue_leader_locator(), [node(),...], [node(),...], non_neg_integer(), function()) ->
node().
leader_node(<<"client-local">>, _, _, _) ->
leader_node(<<"client-local">>, _, _, _, _) ->
node();
leader_node(<<"random">>, Nodes0, RunningNodes, _) ->
leader_node(<<"balanced">>, Nodes0, RunningNodes, QueueCount, _)
when QueueCount >= ?QUEUE_COUNT_START_RANDOM_SELECTION ->
Nodes = potential_leaders(Nodes0, RunningNodes),
lists:nth(rand:uniform(length(Nodes)), Nodes);
leader_node(<<"least-leaders">>, Nodes0, RunningNodes, GetQueues)
leader_node(<<"balanced">>, Nodes0, RunningNodes, _, GetQueues)
when is_function(GetQueues, 0) ->
Nodes = potential_leaders(Nodes0, RunningNodes),
Counters0 = maps:from_list([{N, 0} || N <- Nodes]),
@ -146,8 +148,6 @@ potential_leaders(Replicas, RunningNodes) ->
RunningReplicas ->
case rabbit_maintenance:filter_out_drained_nodes_local_read(RunningReplicas) of
[] ->
%% All selected replica nodes are drained. Let's place the leader on a
%% drained node respecting the requested queue-leader-locator streategy.
RunningReplicas;
Filtered ->
Filtered

View File

@ -72,10 +72,8 @@ groups() ->
file_handle_reservations_above_limit,
node_removal_is_not_quorum_critical,
leader_locator_client_local,
leader_locator_least_leaders,
leader_locator_least_leaders_maintenance,
leader_locator_random,
leader_locator_random_maintenance,
leader_locator_balanced,
leader_locator_balanced_maintenance,
leader_locator_policy
]
++ all_tests()},
@ -2624,7 +2622,7 @@ leader_locator_client_local(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
end || Server <- Servers].
leader_locator_least_leaders(Config) ->
leader_locator_balanced(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Qs = [?config(queue_name, Config),
@ -2635,7 +2633,7 @@ leader_locator_least_leaders(Config) ->
?assertMatch({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q,
[{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])),
{<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])),
{ok, _, {_, Leader}} = ra:members({ra_name(Q), Server}),
Leader
end || Q <- Qs],
@ -2645,7 +2643,7 @@ leader_locator_least_leaders(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- Qs].
leader_locator_least_leaders_maintenance(Config) ->
leader_locator_balanced_maintenance(Config) ->
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
Qs = [?config(queue_name, Config),
@ -2657,7 +2655,7 @@ leader_locator_least_leaders_maintenance(Config) ->
?assertMatch({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q,
[{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])),
{<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])),
{ok, _, {_, Leader}} = ra:members({ra_name(Q), S1}),
Leader
end || Q <- Qs],
@ -2670,44 +2668,6 @@ leader_locator_least_leaders_maintenance(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- Qs].
leader_locator_random(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
Leaders = [begin
?assertMatch({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q,
[{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
{ok, _, {_, Leader}} = ra:members({ra_name(Q), Server}),
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
Leader
end || _ <- lists:seq(1, 15)],
?assertEqual(3, sets:size(sets:from_list(Leaders))).
leader_locator_random_maintenance(Config) ->
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
Q = ?config(queue_name, Config),
rabbit_ct_broker_helpers:mark_as_being_drained(Config, S2),
Leaders = [begin
?assertMatch({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q,
[{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
{ok, _, {_, Leader}} = ra:members({ra_name(Q), S1}),
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
Leader
end || _ <- lists:seq(1, 12)],
?assert(lists:member(S1, Leaders)),
?assertNot(lists:member(S2, Leaders)),
?assert(lists:member(S3, Leaders)),
rabbit_ct_broker_helpers:unmark_as_being_drained(Config, S2).
leader_locator_policy(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
@ -2716,7 +2676,7 @@ leader_locator_policy(Config) ->
<<"leader_locator_policy_q3">>],
ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"my-leader-locator">>, <<"leader_locator_policy_.*">>, <<"queues">>,
[{<<"queue-leader-locator">>, <<"least-leaders">>}]),
[{<<"queue-leader-locator">>, <<"balanced">>}]),
Leaders = [begin
?assertMatch({'queue.declare_ok', Q, 0, 0},

View File

@ -52,10 +52,8 @@ groups() ->
publish_coordinator_unavailable,
leader_locator_policy,
queue_size_on_declare,
leader_locator_random_maintenance,
leader_locator_least_leaders_maintenance,
leader_locator_random,
leader_locator_least_leaders,
leader_locator_balanced,
leader_locator_balanced_maintenance,
select_nodes_with_least_replicas
]},
{cluster_size_3_1, [], [shrink_coordinator_cluster]},
@ -734,7 +732,7 @@ declare_with_node_down_2(Config) ->
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-initial-cluster-size">>, long, 2},
{<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
{<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])),
check_leader_and_replicas(Config, [Server1, Server3]),
%% Since there are sufficient running nodes, we expect that
%% stopped nodes are not selected as replicas.
@ -1852,82 +1850,51 @@ leader_locator_client_local(Config) ->
amqp_channel:call(Ch3, #'queue.delete'{queue = Q})),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
leader_locator_random(Config) ->
[Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
Info = find_queue_info(Config, [leader]),
Leader = proplists:get_value(leader, Info),
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
repeat_until(
fun() ->
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
Info2 = find_queue_info(Config, [leader]),
Leader2 = proplists:get_value(leader, Info2),
Leader =/= Leader2
end, 10),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
leader_locator_random_maintenance(Config) ->
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
Q = ?config(queue_name, Config),
rabbit_ct_broker_helpers:mark_as_being_drained(Config, S2),
[begin
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
Info = find_queue_info(Config, [leader]),
Leader = proplists:get_value(leader, Info),
?assert(lists:member(Leader, [S1, S3])),
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
end || _ <- lists:seq(1, 7)],
rabbit_ct_broker_helpers:unmark_as_being_drained(Config, S2).
leader_locator_least_leaders(Config) ->
leader_locator_balanced(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
Bin = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
Q1 = <<Bin/binary, "_q1">>,
Q2 = <<Bin/binary, "_q2">>,
?assertEqual({'queue.declare_ok', Q1, 0, 0},
declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])),
{<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])),
Info = find_queue_info(Config, [leader]),
Leader = proplists:get_value(leader, Info),
?assert(lists:member(Leader, [Server2, Server3])),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[Q2, Q1, Q]]).
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[Q1, Q]]).
leader_locator_balanced_maintenance(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
Q = ?config(queue_name, Config),
Q1 = <<"q1">>,
Q2 = <<"q2">>,
?assertEqual({'queue.declare_ok', Q1, 0, 0},
declare(Ch1, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Ch2, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
rabbit_ct_broker_helpers:mark_as_being_drained(Config, Server3),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])),
Info = find_queue_info(Config, [leader]),
Leader = proplists:get_value(leader, Info),
?assert(lists:member(Leader, [Server1, Server2])),
rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server3),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
select_nodes_with_least_replicas(Config) ->
[Server1 | _ ] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -1954,68 +1921,34 @@ select_nodes_with_least_replicas(Config) ->
lists:usort(Q1Members ++ QMembers)),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [Qs]).
leader_locator_least_leaders_maintenance(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
Q = ?config(queue_name, Config),
Q1 = <<"q1">>,
Q2 = <<"q2">>,
?assertEqual({'queue.declare_ok', Q1, 0, 0},
declare(Ch1, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Ch2, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
rabbit_ct_broker_helpers:mark_as_being_drained(Config, Server3),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])),
Info = find_queue_info(Config, [leader]),
Leader = proplists:get_value(leader, Info),
?assert(lists:member(Leader, [Server1, Server2])),
rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server3),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
leader_locator_policy(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
Bin = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
Q1 = <<Bin/binary, "_q1">>,
ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"leader-locator">>, <<"leader_locator_.*">>, <<"queues">>,
[{<<"queue-leader-locator">>, <<"random">>}]),
Config, 0, <<"my-leader-locator">>, Q, <<"queues">>,
[{<<"queue-leader-locator">>, <<"balanced">>}]),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q1, 0, 0},
declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition, leader]),
?assertEqual(<<"leader-locator">>, proplists:get_value(policy, Info)),
?assertEqual(<<"my-leader-locator">>, proplists:get_value(policy, Info)),
?assertEqual('', proplists:get_value(operator_policy, Info)),
?assertEqual([{<<"queue-leader-locator">>, <<"random">>}],
?assertEqual([{<<"queue-leader-locator">>, <<"balanced">>}],
proplists:get_value(effective_policy_definition, Info)),
Leader = proplists:get_value(leader, Info),
?assert(lists:member(Leader, [Server2, Server3])),
repeat_until(
fun() ->
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
Info2 = find_queue_info(Config, [leader]),
Leader2 = proplists:get_value(leader, Info2),
Leader =/= Leader2
end, 10),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"my-leader-locator">>),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[Q1, Q]]).
queue_size_on_declare(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

View File

@ -237,7 +237,7 @@ var HELP = {
'Set the queue into master location mode, determining the rule by which the queue master is located when declared on a cluster of nodes.<br/>(Sets the "<a target="_blank" href="https://www.rabbitmq.com/ha.html">x-queue-master-locator</a>" argument.)',
'queue-leader-locator':
'Set the rule by which the queue leader is located when declared on a cluster of nodes. Valid values are <code>client-local</code> (default), <code>random</code> and <code>least-leaders</code>.',
'Set the rule by which the queue leader is located when declared on a cluster of nodes. Valid values are <code>client-local</code> (default) and <code>balanced</code>.',
'queue-initial-cluster-size':
'Set the queue initial cluster size.',

View File

@ -99,6 +99,9 @@ validate_stream_arguments(#{stream_max_segment_size_bytes := Value} =
validate_stream_arguments(#{leader_locator := <<"client-local">>} =
Opts) ->
validate_stream_arguments(maps:remove(leader_locator, Opts));
validate_stream_arguments(#{leader_locator := <<"balanced">>} = Opts) ->
validate_stream_arguments(maps:remove(leader_locator, Opts));
%% 'random' and 'least-leaders' are deprecated and get mapped to 'balanced'
validate_stream_arguments(#{leader_locator := <<"random">>} = Opts) ->
validate_stream_arguments(maps:remove(leader_locator, Opts));
validate_stream_arguments(#{leader_locator := <<"least-leaders">>} =
@ -107,7 +110,7 @@ validate_stream_arguments(#{leader_locator := <<"least-leaders">>} =
validate_stream_arguments(#{leader_locator := _}) ->
{validation_failure,
"Invalid value for --leader-locator, valid values "
"are client-local, random, least-leaders."};
"are client-local, balanced."};
validate_stream_arguments(#{initial_cluster_size := Value} = Opts) ->
try
case rabbit_data_coercion:to_integer(Value) of
@ -160,8 +163,7 @@ usage_additional() ->
"example values: 500mb, 1gb."],
["--leader-locator <leader-locator>",
"Leader locator strategy for partition streams, "
"possible values are client-local, least-leaders, "
"random."],
"possible values are client-local, balanced."],
["--initial-cluster-size <initial-cluster-size>",
"The initial cluster size of partition streams."]].

View File

@ -169,9 +169,7 @@ validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long,
validate_stream_queue_arguments([{<<"x-queue-leader-locator">>,
longstr, Locator}
| T]) ->
case lists:member(Locator,
[<<"client-local">>, <<"random">>, <<"least-leaders">>])
of
case lists:member(Locator, rabbit_queue_location:queue_leader_locators()) of
true ->
validate_stream_queue_arguments(T);
false ->