Local shovels: Handle multiple rejects on node down

This commit is contained in:
Diana Parra Corbacho 2025-08-26 12:22:59 +02:00
parent 0174f59c4e
commit 1c72316d2e
2 changed files with 48 additions and 10 deletions

View File

@ -325,7 +325,7 @@ handle_dest({{'DOWN', #resource{kind = queue,
{ok, QState1, Actions} ->
State1 = State0#{dest => Dest#{current => Current#{queue_states => QState1}}},
handle_dest_queue_actions(Actions, State1);
{eol, QState1, QRef} ->
{eol, QState1, _QRef} ->
State0#{dest => Dest#{current => Current#{queue_states => QState1}}}
end;
handle_dest(_Msg, State) ->
@ -652,11 +652,15 @@ confirm_to_inbound(ConfirmFun, SeqNos, State)
end, State, SeqNos);
confirm_to_inbound(ConfirmFun, Seq,
State0 = #{dest := #{unacked := Unacked} = Dst}) ->
#{Seq := InTag} = Unacked,
Unacked1 = maps:remove(Seq, Unacked),
State = rabbit_shovel_behaviour:decr_remaining(
1, State0#{dest => Dst#{unacked => Unacked1}}),
ConfirmFun(InTag, State).
case Unacked of
#{Seq := InTag} ->
Unacked1 = maps:remove(Seq, Unacked),
State = rabbit_shovel_behaviour:decr_remaining(
1, State0#{dest => Dst#{unacked => Unacked1}}),
ConfirmFun(InTag, State);
_ ->
State0
end.
sent_delivery(#{source := #{delivery_count := DeliveryCount0,
credit := Credit0} = Src

View File

@ -26,7 +26,8 @@ all() ->
groups() ->
[
{tests, [], [
local_to_local_dest_down
local_to_local_dest_down,
local_to_local_multiple_dest_down
]}
].
@ -105,7 +106,7 @@ local_to_local_dest_down(Config) ->
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
?awaitMatch([[<<"local_to_local_dest_down_dest">>, <<>>, <<>>, <<>>],
[<<"local_to_local_dest_down_src">>, <<"10">>, <<"0">>, <<"10">>]],
[<<"local_to_local_dest_down_src">>, <<"10">>, _, _]],
list_queue_messages(Config),
30000),
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
@ -116,6 +117,39 @@ local_to_local_dest_down(Config) ->
expect_many(Sess, Dest, 10)
end).
local_to_local_multiple_dest_down(Config) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
Dest2 = ?config(destq2, Config),
declare_queue(Config, 0, <<"/">>, Src),
declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest, Dest),
declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest2, Dest2),
with_session(
Config,
fun (Sess) ->
shovel_test_utils:set_param(Config, ?PARAM,
[{<<"src-protocol">>, <<"local">>},
{<<"src-queue">>, Src},
{<<"dest-protocol">>, <<"local">>},
{<<"dest-exchange">>, <<"amq.fanout">>},
{<<"dest-exchange-key">>, Dest}
]),
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, <<>>, <<>>, <<>>],
[<<"local_to_local_multiple_dest_down_dest2">>, <<>>, <<>>, <<>>],
[<<"local_to_local_multiple_dest_down_src">>, <<"10">>, _, _]],
list_queue_messages(Config),
30000),
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, <<"10">>, <<"10">>, <<"0">>],
[<<"local_to_local_multiple_dest_down_dest2">>, <<"10">>, <<"10">>, <<"0">>],
[<<"local_to_local_multiple_dest_down_src">>, <<"0">>, <<"0">>, <<"0">>]],
list_queue_messages(Config),
30000),
expect_many(Sess, Dest, 10)
end).
%%----------------------------------------------------------------------------
list_queue_messages(Config) ->
lists:sort(
@ -211,8 +245,8 @@ declare_queue(Config, Node, VHost, QName, Args) ->
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn).
declare_and_bind_queue(Config, VHost, Exchange, QName, RoutingKey) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
declare_and_bind_queue(Config, Node, VHost, Exchange, QName, RoutingKey) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, VHost),
{ok, Ch} = amqp_connection:open_channel(Conn),
?assertEqual(
{'queue.declare_ok', QName, 0, 0},