diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 9099415835..320ab69961 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1025,13 +1025,10 @@ apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf; apply_leader_locator_strategy(#{leader_node := Leader, replica_nodes := Replicas0, - leader_locator_strategy := <<"random">>, - name := StreamId} = Conf) -> + leader_locator_strategy := <<"random">>} = Conf) -> Replicas = [Leader | Replicas0], - ClusterSize = length(Replicas), - Hash = erlang:phash2(StreamId), - Pos = (Hash rem ClusterSize) + 1, - NewLeader = lists:nth(Pos, Replicas), + PotentialLeaders = potential_leaders(Replicas), + NewLeader = lists:nth(rand:uniform(length(PotentialLeaders)), PotentialLeaders), NewReplicas = lists:delete(NewLeader, Replicas), Conf#{leader_node => NewLeader, replica_nodes => NewReplicas}; @@ -1039,7 +1036,8 @@ apply_leader_locator_strategy(#{leader_node := Leader, replica_nodes := Replicas0, leader_locator_strategy := <<"least-leaders">>} = Conf) -> Replicas = [Leader | Replicas0], - Counters0 = maps:from_list([{R, 0} || R <- Replicas]), + PotentialLeaders = potential_leaders(Replicas), + Counters0 = maps:from_list([{R, 0} || R <- PotentialLeaders]), Counters = maps:to_list( lists:foldl(fun(Q, Acc) -> P = amqqueue:get_pid(Q), @@ -1059,6 +1057,16 @@ apply_leader_locator_strategy(#{leader_node := Leader, Conf#{leader_node => NewLeader, replica_nodes => NewReplicas}. +potential_leaders(Nodes) -> + case rabbit_maintenance:filter_out_drained_nodes_local_read(Nodes) of + [] -> + %% All nodes are drained. Let's place the leader on a drained node + %% respecting the requested queue-leader-locator streategy. + Nodes; + Filtered -> + Filtered + end. + select_first_matching_node([{N, _} | Rest], Replicas) -> case lists:member(N, Replicas) of true -> N; diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index c898c58144..08f75432fa 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -51,7 +51,9 @@ groups() -> add_replicas, publish_coordinator_unavailable, leader_locator_policy, - queue_size_on_declare + queue_size_on_declare, + leader_locator_random_maintenance, + leader_locator_least_leaders_maintenance ]}, {cluster_size_3_1, [], [shrink_coordinator_cluster]}, {cluster_size_3_2, [], [recover, @@ -1860,6 +1862,26 @@ leader_locator_random(Config) -> end, 10), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +leader_locator_random_maintenance(Config) -> + [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, S1), + Q = ?config(queue_name, Config), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, S2), + + [begin + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), + + Info = find_queue_info(Config, [leader]), + Leader = proplists:get_value(leader, Info), + ?assert(lists:member(Leader, [S1, S3])), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})) + end || _ <- lists:seq(1, 7)], + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, S2). + leader_locator_least_leaders(Config) -> [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1885,6 +1907,31 @@ leader_locator_least_leaders(Config) -> ?assert(lists:member(Leader, [Server2, Server3])), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +leader_locator_least_leaders_maintenance(Config) -> + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), + Q = ?config(queue_name, Config), + Q1 = <<"q1">>, + Q2 = <<"q2">>, + ?assertEqual({'queue.declare_ok', Q1, 0, 0}, + declare(Ch1, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + ?assertEqual({'queue.declare_ok', Q2, 0, 0}, + declare(Ch2, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + rabbit_ct_broker_helpers:mark_as_being_drained(Config, Server3), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])), + + Info = find_queue_info(Config, [leader]), + Leader = proplists:get_value(leader, Info), + ?assert(lists:member(Leader, [Server1, Server2])), + + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server3), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + leader_locator_policy(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),