Merge pull request #14149 from rabbitmq/mergify/bp/v4.1.x/pr-14145
Trigger a 4.1.x alpha release build / trigger_alpha_build (push) Has been cancelled Details
Test (make) / Build and Xref (1.17, 26) (push) Has been cancelled Details
Test (make) / Build and Xref (1.17, 27) (push) Has been cancelled Details
Test (make) / Test (1.17, 27, khepri) (push) Has been cancelled Details
Test (make) / Test (1.17, 27, mnesia) (push) Has been cancelled Details
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Has been cancelled Details
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Has been cancelled Details
Test (make) / Type check (1.17, 27) (push) Has been cancelled Details

QQ: fix SAC activation bug (backport #14145)
This commit is contained in:
Michael Klishin 2025-06-27 21:28:40 +04:00 committed by GitHub
commit 8283fa7b55
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 173 additions and 12 deletions

View File

@ -317,7 +317,8 @@ apply(Meta, #modify{consumer_key = ConsumerKey,
_ ->
{State, ok}
end;
apply(#{index := Idx} = Meta,
apply(#{index := Idx,
machine_version := MacVer} = Meta,
#requeue{consumer_key = ConsumerKey,
msg_id = MsgId,
index = OldIdx,
@ -344,7 +345,13 @@ apply(#{index := Idx} = Meta,
Messages),
enqueue_count = EnqCount + 1},
State2 = update_or_remove_con(Meta, ConsumerKey, Con, State1),
checkout(Meta, State0, State2, []);
{State3, Effects} = case MacVer >= 7 of
true ->
activate_next_consumer({State2, []});
false ->
{State2, []}
end,
checkout(Meta, State0, State3, Effects);
_ ->
{State00, ok, []}
end;
@ -923,7 +930,7 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) ->
end.
-spec version() -> pos_integer().
version() -> 6.
version() -> 7.
which_module(0) -> rabbit_fifo_v0;
which_module(1) -> rabbit_fifo_v1;
@ -931,7 +938,8 @@ which_module(2) -> rabbit_fifo_v3;
which_module(3) -> rabbit_fifo_v3;
which_module(4) -> ?MODULE;
which_module(5) -> ?MODULE;
which_module(6) -> ?MODULE.
which_module(6) -> ?MODULE;
which_module(7) -> ?MODULE.
-define(AUX, aux_v3).
@ -1747,8 +1755,8 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
{duplicate, State0, Effects0}
end.
return(#{} = Meta, ConsumerKey, MsgIds, IncrDelCount, Anns,
Checked, Effects0, State0)
return(#{machine_version := MacVer} = Meta, ConsumerKey,
MsgIds, IncrDelCount, Anns, Checked, Effects0, State0)
when is_map(Anns) ->
%% We requeue in the same order as messages got returned by the client.
{State1, Effects1} =
@ -1768,7 +1776,13 @@ return(#{} = Meta, ConsumerKey, MsgIds, IncrDelCount, Anns,
_ ->
State1
end,
checkout(Meta, State0, State2, Effects1).
{State3, Effects2} = case MacVer >= 7 of
true ->
activate_next_consumer({State2, Effects1});
false ->
{State2, Effects1}
end,
checkout(Meta, State0, State3, Effects2).
% used to process messages that are finished
complete(Meta, ConsumerKey, [MsgId],
@ -2798,7 +2812,10 @@ convert(Meta, 4, To, State) ->
convert(Meta, 5, To, State);
convert(Meta, 5, To, State) ->
%% no conversion needed, this version only includes a logic change
convert(Meta, 6, To, State).
convert(Meta, 6, To, State);
convert(Meta, 6, To, State) ->
%% no conversion needed, this version only includes a logic change
convert(Meta, 7, To, State).
smallest_raft_index(#?STATE{messages = Messages,
ra_indexes = Indexes,

View File

@ -95,6 +95,8 @@ groups() ->
format,
add_member_2,
single_active_consumer_priority_take_over,
single_active_consumer_priority_take_over_return,
single_active_consumer_priority_take_over_requeue,
single_active_consumer_priority,
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
@ -1139,6 +1141,72 @@ single_active_consumer_priority_take_over(Config) ->
?DEFAULT_AWAIT),
ok.
single_active_consumer_priority_take_over_return(Config) ->
single_active_consumer_priority_take_over_base(20, Config).
single_active_consumer_priority_take_over_requeue(Config) ->
single_active_consumer_priority_take_over_base(-1, Config).
single_active_consumer_priority_take_over_base(DelLimit, Config) ->
check_quorum_queues_v4_compat(Config),
[Server0, Server1, _Server2] = Nodes =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
MinMacVers = lists:min([V || {ok, V} <-
erpc:multicall(Nodes, rabbit_fifo, version, [])]),
if MinMacVers < 7 ->
throw({skip, "single_active_consumer_priority_take_over_base needs a higher machine verison"});
true ->
ok
end,
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
QName = ?config(queue_name, Config),
Q1 = <<QName/binary, "_1">>,
RaNameQ1 = binary_to_atom(<<"%2F", "_", Q1/binary>>, utf8),
QueryFun = fun rabbit_fifo:query_single_active_consumer/1,
Args = [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-delivery-limit">>, long, DelLimit},
{<<"x-single-active-consumer">>, bool, true}],
?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch1, Q1, Args)),
ok = subscribe(Ch1, Q1, false, <<"ch1-ctag1">>, [{"x-priority", byte, 1}]),
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])),
#'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}),
publish_confirm(Ch2, Q1),
%% higher priority consumer attaches
ok = subscribe(Ch2, Q1, false, <<"ch2-ctag1">>, [{"x-priority", byte, 3}]),
%% Q1 should still have Ch1 as consumer as it has pending messages
?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query,
[RaNameQ1, QueryFun])),
%% ack the message
receive
{#'basic.deliver'{consumer_tag = <<"ch1-ctag1">>,
delivery_tag = DeliveryTag}, _} ->
amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag})
after ?TIMEOUT ->
flush(1),
exit(basic_deliver_timeout)
end,
?awaitMatch({ok, {_, {value, {<<"ch2-ctag1">>, _}}}, _},
rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun]),
?DEFAULT_AWAIT),
receive
{#'basic.deliver'{consumer_tag = <<"ch2-ctag1">>,
delivery_tag = DeliveryTag2}, _} ->
amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag2})
after ?TIMEOUT ->
flush(1),
exit(basic_deliver_timeout_2)
end,
ok.
single_active_consumer_priority(Config) ->
check_quorum_queues_v4_compat(Config),
[Server0, Server1, Server2] =

View File

@ -42,12 +42,12 @@ groups() ->
].
init_per_group(tests, Config) ->
[{machine_version, 5} | Config];
[{machine_version, rabbit_fifo:version()} | Config];
init_per_group(machine_version_conversion, Config) ->
Config.
init_per_testcase(_Testcase, Config) ->
FF = ?config(machine_version, Config) == 5,
FF = ?config(machine_version, Config) == rabbit_fifo:version(),
ok = meck:new(rabbit_feature_flags, [passthrough]),
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> FF end),
Config.
@ -1932,6 +1932,83 @@ single_active_consumer_higher_waiting_disconnected_test(Config) ->
ok.
single_active_consumer_higher_waiting_return_test(Config) ->
S0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
single_active_consumer_on => true}),
Pid1 = test_util:fake_pid(node()),
C1Pid = test_util:fake_pid(n1@banana),
C2Pid = test_util:fake_pid(n2@banana),
% % adding some consumers
{CK1, C1} = {?LINE, {?LINE_B, C1Pid}},
{CK2, C2} = {?LINE, {?LINE_B, C2Pid}},
Entries =
[
%% add a consumer
{CK1, make_checkout(C1, {auto, {simple_prefetch, 1}}, #{priority => 1})},
?ASSERT(#rabbit_fifo{consumers = #{CK1 := #consumer{status = up}},
waiting_consumers = []}),
%% enqueue a message
{?LINE , rabbit_fifo:make_enqueue(Pid1, 1, msg1)},
%% add a consumer with a higher priority, current is quiescing
{CK2, make_checkout(C2, {auto, {simple_prefetch, 1}}, #{priority => 2})},
?ASSERT(#rabbit_fifo{consumers = #{CK1 := #consumer{status = quiescing}},
waiting_consumers = [{CK2, _}]}),
%% C1 returns message
{?LINE, rabbit_fifo:make_return(CK1, [0])},
%% C2 should activated
?ASSERT(#rabbit_fifo{consumers = #{CK2 := #consumer{status = up,
checked_out = Ch,
credit = 0}},
waiting_consumers = [_]} when map_size(Ch) == 1)
],
{_S1, _} = run_log(Config, S0, Entries, fun single_active_invariant/1),
ok.
single_active_consumer_higher_waiting_requeue_test(Config) ->
S0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
single_active_consumer_on => true}),
Pid1 = test_util:fake_pid(node()),
C1Pid = test_util:fake_pid(n1@banana),
C2Pid = test_util:fake_pid(n2@banana),
% % adding some consumers
{CK1, C1} = {?LINE, {?LINE_B, C1Pid}},
EnqIdx = ?LINE,
RequeueIdx = ?LINE,
{CK2, C2} = {?LINE, {?LINE_B, C2Pid}},
Entries =
[
%% add a consumer
{CK1, make_checkout(C1, {auto, {simple_prefetch, 1}}, #{priority => 1})},
?ASSERT(#rabbit_fifo{consumers = #{CK1 := #consumer{status = up}},
waiting_consumers = []}),
%% enqueue a message
{EnqIdx , rabbit_fifo:make_enqueue(Pid1, 1, msg1)},
%% add a consumer with a higher priority, current is quiescing
{CK2, make_checkout(C2, {auto, {simple_prefetch, 1}}, #{priority => 2})},
?ASSERT(#rabbit_fifo{consumers = #{CK1 := #consumer{status = quiescing}},
waiting_consumers = [{CK2, _}]}),
%% C1 returns message
% {?LINE, rabbit_fifo:make_requeue(CK1, [0])},
{RequeueIdx , element(2, hd(rabbit_fifo:make_requeue(CK1, {notify, 1, self()},
[{0, EnqIdx, 0, msg1}], [])))},
%% C2 should activated
?ASSERT(#rabbit_fifo{consumers = #{CK2 := #consumer{status = up,
checked_out = Ch,
credit = 0}},
waiting_consumers = [_]} when map_size(Ch) == 1)
],
{_S1, _} = run_log(Config, S0, Entries, fun single_active_invariant/1),
ok.
single_active_consumer_quiescing_disconnected_test(Config) ->
S0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
@ -2455,8 +2532,7 @@ machine_version_test(C) ->
consumers = #{Cid := #consumer{cfg = #consumer_cfg{priority = 0}}},
service_queue = S,
messages = Msgs}, ok,
[_|_]} = apply(meta(C, Idx),
{machine_version, 0, 2}, S1),
[_|_]} = apply(meta(C, Idx), {machine_version, 0, 2}, S1),
%% validate message conversion to lqueue
?assertEqual(1, lqueue:len(Msgs)),
?assert(priority_queue:is_queue(S)),