Filter out drained nodes when selecting stream leader

This commit is contained in:
David Ansari 2022-04-06 13:05:14 +02:00
parent f4503fb8d8
commit f903ef95cc
2 changed files with 63 additions and 8 deletions

View File

@ -1025,13 +1025,10 @@ apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} =
Conf;
apply_leader_locator_strategy(#{leader_node := Leader,
replica_nodes := Replicas0,
leader_locator_strategy := <<"random">>,
name := StreamId} = Conf) ->
leader_locator_strategy := <<"random">>} = Conf) ->
Replicas = [Leader | Replicas0],
ClusterSize = length(Replicas),
Hash = erlang:phash2(StreamId),
Pos = (Hash rem ClusterSize) + 1,
NewLeader = lists:nth(Pos, Replicas),
PotentialLeaders = potential_leaders(Replicas),
NewLeader = lists:nth(rand:uniform(length(PotentialLeaders)), PotentialLeaders),
NewReplicas = lists:delete(NewLeader, Replicas),
Conf#{leader_node => NewLeader,
replica_nodes => NewReplicas};
@ -1039,7 +1036,8 @@ apply_leader_locator_strategy(#{leader_node := Leader,
replica_nodes := Replicas0,
leader_locator_strategy := <<"least-leaders">>} = Conf) ->
Replicas = [Leader | Replicas0],
Counters0 = maps:from_list([{R, 0} || R <- Replicas]),
PotentialLeaders = potential_leaders(Replicas),
Counters0 = maps:from_list([{R, 0} || R <- PotentialLeaders]),
Counters = maps:to_list(
lists:foldl(fun(Q, Acc) ->
P = amqqueue:get_pid(Q),
@ -1059,6 +1057,16 @@ apply_leader_locator_strategy(#{leader_node := Leader,
Conf#{leader_node => NewLeader,
replica_nodes => NewReplicas}.
potential_leaders(Nodes) ->
case rabbit_maintenance:filter_out_drained_nodes_local_read(Nodes) of
[] ->
%% All nodes are drained. Let's place the leader on a drained node
%% respecting the requested queue-leader-locator streategy.
Nodes;
Filtered ->
Filtered
end.
select_first_matching_node([{N, _} | Rest], Replicas) ->
case lists:member(N, Replicas) of
true -> N;

View File

@ -51,7 +51,9 @@ groups() ->
add_replicas,
publish_coordinator_unavailable,
leader_locator_policy,
queue_size_on_declare
queue_size_on_declare,
leader_locator_random_maintenance,
leader_locator_least_leaders_maintenance
]},
{cluster_size_3_1, [], [shrink_coordinator_cluster]},
{cluster_size_3_2, [], [recover,
@ -1860,6 +1862,26 @@ leader_locator_random(Config) ->
end, 10),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
leader_locator_random_maintenance(Config) ->
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
Q = ?config(queue_name, Config),
rabbit_ct_broker_helpers:mark_as_being_drained(Config, S2),
[begin
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
Info = find_queue_info(Config, [leader]),
Leader = proplists:get_value(leader, Info),
?assert(lists:member(Leader, [S1, S3])),
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
end || _ <- lists:seq(1, 7)],
rabbit_ct_broker_helpers:unmark_as_being_drained(Config, S2).
leader_locator_least_leaders(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -1885,6 +1907,31 @@ leader_locator_least_leaders(Config) ->
?assert(lists:member(Leader, [Server2, Server3])),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
leader_locator_least_leaders_maintenance(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
Q = ?config(queue_name, Config),
Q1 = <<"q1">>,
Q2 = <<"q2">>,
?assertEqual({'queue.declare_ok', Q1, 0, 0},
declare(Ch1, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Ch2, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
rabbit_ct_broker_helpers:mark_as_being_drained(Config, Server3),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])),
Info = find_queue_info(Config, [leader]),
Leader = proplists:get_value(leader, Info),
?assert(lists:member(Leader, [Server1, Server2])),
rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server3),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
leader_locator_policy(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),