Local shovels: fix handling of acks/nacks from multiple queues

This commit is contained in:
Diana Parra Corbacho 2025-09-02 12:01:10 +02:00
parent 8a116500e0
commit 41d52835bf
3 changed files with 223 additions and 31 deletions

View File

@ -128,7 +128,7 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
State#{dest => Dest#{current => #{queue_states => QState,
delivery_id => 1,
vhost => VHost},
unconfirmed => rabbit_confirms:init(),
unconfirmed => rabbit_shovel_confirms:init(),
rejected => [],
rejected_count => 0,
confirmed => [],
@ -136,7 +136,7 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
_ ->
State#{dest => Dest#{current => #{queue_states => QState,
vhost => VHost},
unconfirmed => rabbit_confirms:init(),
unconfirmed => rabbit_shovel_confirms:init(),
confirmed => [],
confirmed_count => 0,
rejected => [],
@ -361,8 +361,8 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
{#{}, State0}
end,
Msg = set_annotations(Msg0, Dest),
QNames = route(Msg, Dest),
Queues = rabbit_amqqueue:lookup_many(QNames),
RoutedQNames = route(Msg, Dest),
Queues = rabbit_amqqueue:lookup_many(RoutedQNames),
case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of
{ok, QState1, Actions} ->
State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}},
@ -374,12 +374,15 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
on_publish ->
rabbit_shovel_behaviour:decr_remaining(
1,
record_confirms([{Tag, none}], State2));
record_confirms([{Tag, Tag}], State2));
_ ->
rabbit_shovel_behaviour:decr_remaining(1, State2)
end),
MsgSeqNo = maps:get(correlation, Options, undefined),
State4 = process_routing_confirm(MsgSeqNo, QNames, State3),
QNames = lists:map(fun({QName, _}) -> QName;
(QName) -> QName
end, RoutedQNames),
State4 = process_routing_confirm(MsgSeqNo, Tag, QNames, State3),
send_confirms_and_nacks(handle_dest_queue_actions(Actions, State4));
{error, Reason} ->
exit({shutdown, Reason})
@ -470,7 +473,7 @@ handle_dest_queue_actions(Actions, State) ->
{U, Rej} =
lists:foldr(
fun(SeqNo, {U1, Acc}) ->
case rabbit_confirms:reject(SeqNo, U1) of
case rabbit_shovel_confirms:reject(SeqNo, U1) of
{ok, MX, U2} ->
{U2, [MX | Acc]};
{error, not_found} ->
@ -736,15 +739,14 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
at_least_one_credit_req_in_flight => false}}
end.
process_routing_confirm(undefined, _, State) ->
process_routing_confirm(undefined, _, _, State) ->
State;
process_routing_confirm(MsgSeqNo, [], State)
process_routing_confirm(MsgSeqNo, Tag, [], State)
when is_integer(MsgSeqNo) ->
record_confirms([{MsgSeqNo, none}], State);
process_routing_confirm(MsgSeqNo, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
XName = rabbit_misc:r(<<"/">>, exchange, <<>>),
record_confirms([{MsgSeqNo, Tag}], State);
process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
State#{dest => Dst#{unconfirmed =>
rabbit_confirms:insert(MsgSeqNo, QRefs, XName, Unconfirmed)}}.
rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}.
record_confirms([], State) ->
State;
@ -765,7 +767,7 @@ record_rejects(MXs, State = #{dest := Dst = #{rejected := R,
rejected_count => RC + Num}}).
confirm(MsgSeqNos, QRef, State = #{dest := Dst = #{unconfirmed := UC}}) ->
{ConfirmMXs, UC1} = rabbit_confirms:confirm(MsgSeqNos, QRef, UC),
{ConfirmMXs, UC1} = rabbit_shovel_confirms:confirm(MsgSeqNos, QRef, UC),
record_confirms(ConfirmMXs, State#{dest => Dst#{unconfirmed => UC1}}).
send_nacks([], _, State) ->
@ -789,9 +791,9 @@ send_confirms(Cs, Rs, State) ->
coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun,
State = #{dest := #{unconfirmed := UC}}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
UnconfirmedCutoff = case rabbit_confirms:is_empty(UC) of
UnconfirmedCutoff = case rabbit_shovel_confirms:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
false -> rabbit_confirms:smallest(UC)
false -> rabbit_shovel_confirms:smallest(UC)
end,
Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]),
{Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos),
@ -810,15 +812,15 @@ send_confirms_and_nacks(State = #{dest := #{confirmed := [],
send_confirms_and_nacks(State = #{dest := Dst = #{confirmed := C,
rejected := R}}) ->
Confirms = lists:append(C),
ConfirmMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Confirms],
ConfirmTags = [Tag || {_, Tag} <- Confirms],
Rejects = lists:append(R),
RejectMsgSeqNos = [MsgSeqNo || {MsgSeqNo, _} <- Rejects],
RejectTags = [Tag || {_, Tag} <- Rejects],
State1 = #{dest := Dst2}
= send_confirms(ConfirmMsgSeqNos,
RejectMsgSeqNos,
= send_confirms(ConfirmTags,
RejectTags,
State#{dest => Dst#{confirmed => [],
confirmed_count => 0}}),
send_nacks(RejectMsgSeqNos,
ConfirmMsgSeqNos,
send_nacks(RejectTags,
ConfirmTags,
State1#{dest => Dst2#{rejected => [],
rejected_count => 0}}).

View File

@ -0,0 +1,148 @@
-module(rabbit_shovel_confirms).
-compile({no_auto_import, [size/1]}).
-include_lib("rabbit_common/include/rabbit.hrl").
-export([init/0,
insert/4,
confirm/3,
reject/2,
remove_queue/2,
smallest/1,
size/1,
is_empty/1]).
-type seq_no() :: non_neg_integer().
-type delivery_tag() :: non_neg_integer().
-type queue_name() :: rabbit_amqqueue:name().
-record(?MODULE, {smallest :: undefined | seq_no(),
unconfirmed = #{} :: #{seq_no() =>
{delivery_tag(),
#{queue_name() => ok}}}
}).
-type mx() :: {seq_no(), delivery_tag()}.
-opaque state() :: #?MODULE{}.
-export_type([
state/0
]).
-spec init() -> state().
init() ->
#?MODULE{}.
-spec insert(seq_no(), [queue_name()], delivery_tag(), state()) ->
state().
insert(SeqNo, QNames, Tag,
#?MODULE{smallest = S0,
unconfirmed = U0} = State)
when is_integer(SeqNo)
andalso is_list(QNames)
andalso not is_map_key(SeqNo, U0) ->
U = U0#{SeqNo => {Tag, maps:from_keys(QNames, ok)}},
S = case S0 of
undefined -> SeqNo;
_ -> S0
end,
State#?MODULE{smallest = S,
unconfirmed = U}.
-spec confirm([seq_no()], queue_name(), state()) ->
{[mx()], state()}.
confirm(SeqNos, QName, #?MODULE{smallest = Smallest0,
unconfirmed = U0} = State)
when is_list(SeqNos) ->
{Confirmed, ConfirmedSmallest, U} =
lists:foldl(
fun (SeqNo, Acc) ->
confirm_one(SeqNo, QName, Smallest0, Acc)
end, {[], false, U0}, SeqNos),
Smallest = case ConfirmedSmallest of
true ->
%% work out new smallest
next_smallest(Smallest0, U);
false ->
Smallest0
end,
{Confirmed, State#?MODULE{smallest = Smallest,
unconfirmed = U}}.
-spec reject(seq_no(), state()) ->
{ok, mx(), state()} | {error, not_found}.
reject(SeqNo, #?MODULE{smallest = Smallest0,
unconfirmed = U0} = State)
when is_integer(SeqNo) ->
case maps:take(SeqNo, U0) of
{{Tag, _QS}, U} ->
Smallest = case SeqNo of
Smallest0 ->
%% need to scan as the smallest was removed
next_smallest(Smallest0, U);
_ ->
Smallest0
end,
{ok, {SeqNo, Tag}, State#?MODULE{unconfirmed = U,
smallest = Smallest}};
error ->
{error, not_found}
end.
%% idempotent
-spec remove_queue(queue_name(), state()) ->
{[mx()], state()}.
remove_queue(QName, #?MODULE{unconfirmed = U} = State) ->
SeqNos = maps:fold(
fun (SeqNo, {_Tag, QS0}, Acc) ->
case maps:is_key(QName, QS0) of
true ->
[SeqNo | Acc];
false ->
Acc
end
end, [], U),
confirm(lists:sort(SeqNos), QName,State).
-spec smallest(state()) -> seq_no() | undefined.
smallest(#?MODULE{smallest = Smallest}) ->
Smallest.
-spec size(state()) -> non_neg_integer().
size(#?MODULE{unconfirmed = U}) ->
maps:size(U).
-spec is_empty(state()) -> boolean().
is_empty(State) ->
size(State) == 0.
%% INTERNAL
confirm_one(SeqNo, QName, Smallest, {Acc, ConfirmedSmallest0, U0}) ->
case maps:take(SeqNo, U0) of
{{Tag, QS}, U1}
when is_map_key(QName, QS)
andalso map_size(QS) == 1 ->
%% last queue confirm
ConfirmedSmallest = case SeqNo of
Smallest -> true;
_ -> ConfirmedSmallest0
end,
{[{SeqNo, Tag} | Acc], ConfirmedSmallest, U1};
{{Tag, QS}, U1} ->
{Acc, ConfirmedSmallest0, U1#{SeqNo => {Tag, maps:remove(QName, QS)}}};
error ->
{Acc, ConfirmedSmallest0, U0}
end.
next_smallest(_S, U) when map_size(U) == 0 ->
undefined;
next_smallest(S, U) when is_map_key(S, U) ->
S;
next_smallest(S, U) ->
%% TODO: this is potentially infinitely recursive if called incorrectly
next_smallest(S+1, U).

View File

@ -27,7 +27,8 @@ groups() ->
[
{tests, [], [
local_to_local_dest_down,
local_to_local_multiple_dest_down,
local_to_local_multiple_all_dest_down,
local_to_local_multiple_some_dest_down,
local_to_local_no_destination
]}
].
@ -120,7 +121,7 @@ local_to_local_dest_down(Config) ->
expect_many(Sess, Dest, 10)
end).
local_to_local_multiple_dest_down(Config) ->
local_to_local_multiple_all_dest_down(Config) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
Dest2 = ?config(destq2, Config),
@ -139,16 +140,57 @@ local_to_local_multiple_dest_down(Config) ->
]),
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, 0, 0, 0],
[<<"local_to_local_multiple_dest_down_dest2">>, 0, 0, 0],
[<<"local_to_local_multiple_dest_down_src">>, 10, _, _]],
?awaitMatch([[<<"local_to_local_multiple_all_dest_down_dest">>, 0, 0, 0],
[<<"local_to_local_multiple_all_dest_down_dest2">>, 0, 0, 0],
[<<"local_to_local_multiple_all_dest_down_src">>, 10, _, _]],
list_queue_messages(Config),
30000),
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, N, N, 0],
[<<"local_to_local_multiple_dest_down_dest2">>, M, M, 0],
[<<"local_to_local_multiple_dest_down_src">>, 0, 0, 0]]
when ((N >= 10) and (M >= 10)),
?awaitMatch([[<<"local_to_local_multiple_all_dest_down_dest">>, 10, 10, 0],
[<<"local_to_local_multiple_all_dest_down_dest2">>, 10, 10, 0],
[<<"local_to_local_multiple_all_dest_down_src">>, 0, 0, 0]],
list_queue_messages(Config),
30000),
expect_many(Sess, Dest, 10)
end).
local_to_local_multiple_some_dest_down(Config) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
Dest2 = ?config(destq2, Config),
declare_queue(Config, 0, <<"/">>, Src),
%% Declare each destination queue in a different node. Just one of
%% them will be down, but this still means the message can't be confirmed
%% and should be requeued.
declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest, Dest),
declare_and_bind_queue(Config, 2, <<"/">>, <<"amq.fanout">>, Dest2, Dest2),
with_session(
Config,
fun (Sess) ->
shovel_test_utils:set_param(Config, ?PARAM,
[{<<"src-protocol">>, <<"local">>},
{<<"src-queue">>, Src},
{<<"dest-protocol">>, <<"local">>},
{<<"dest-exchange">>, <<"amq.fanout">>},
{<<"dest-exchange-key">>, <<"">>}
]),
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
%% Messages won't be confirmed to source until all destination
%% queues are able to confirm them, until them we keep retrying
%% This generates multiple duplicates, but that's how publishing
%% works.
?awaitMatch([[<<"local_to_local_multiple_some_dest_down_dest">>, 0, 0, 0],
[<<"local_to_local_multiple_some_dest_down_dest2">>, M, M, 0],
[<<"local_to_local_multiple_some_dest_down_src">>, 10, _, _]]
when (M > 10),
list_queue_messages(Config),
30000),
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
?awaitMatch([[<<"local_to_local_multiple_some_dest_down_dest">>, N, N, 0],
[<<"local_to_local_multiple_some_dest_down_dest2">>, M, M, 0],
[<<"local_to_local_multiple_some_dest_down_src">>, 0, 0, 0]]
when ((N == 10) and (M >= 10)),
list_queue_messages(Config),
30000),
expect_many(Sess, Dest, 10)