Local shovels: ack messages not routed to any queue using exchanges

This commit is contained in:
Diana Parra Corbacho 2025-08-26 15:54:44 +02:00
parent 1c72316d2e
commit 68b98bfd3b
2 changed files with 50 additions and 18 deletions

View File

@ -360,9 +360,13 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
case AckMode of case AckMode of
no_ack -> no_ack ->
rabbit_shovel_behaviour:decr_remaining(1, State2); rabbit_shovel_behaviour:decr_remaining(1, State2);
on_confirm -> on_confirm when length(Queues) > 0 ->
Correlation = maps:get(correlation, Options), Correlation = maps:get(correlation, Options),
State2#{dest => Dst1#{unacked => Unacked#{Correlation => Tag}}}; State2#{dest => Dst1#{unacked => Unacked#{Correlation => Tag}}};
on_confirm ->
%% Drop the messages as 0.9.1, no destination available
State3 = rabbit_shovel_behaviour:ack(Tag, false, State2),
rabbit_shovel_behaviour:decr_remaining(1, State3);
on_publish -> on_publish ->
State3 = rabbit_shovel_behaviour:ack(Tag, false, State2), State3 = rabbit_shovel_behaviour:ack(Tag, false, State2),
rabbit_shovel_behaviour:decr_remaining(1, State3) rabbit_shovel_behaviour:decr_remaining(1, State3)

View File

@ -27,7 +27,8 @@ groups() ->
[ [
{tests, [], [ {tests, [], [
local_to_local_dest_down, local_to_local_dest_down,
local_to_local_multiple_dest_down local_to_local_multiple_dest_down,
local_to_local_no_destination
]} ]}
]. ].
@ -40,7 +41,7 @@ init_per_suite(Config0) ->
rabbit_ct_helpers:log_environment(), rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config0, [ Config1 = rabbit_ct_helpers:set_config(Config0, [
{rmq_nodename_suffix, ?MODULE}, {rmq_nodename_suffix, ?MODULE},
{rmq_nodes_count, 2}, {rmq_nodes_count, 3},
{rmq_nodes_clustered, true}, {rmq_nodes_clustered, true},
{ignored_crashes, [ {ignored_crashes, [
"server_initiated_close,404", "server_initiated_close,404",
@ -105,13 +106,13 @@ local_to_local_dest_down(Config) ->
]), ]),
ok = rabbit_ct_broker_helpers:stop_node(Config, 1), ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
publish_many(Sess, Src, Dest, <<"tag1">>, 10), publish_many(Sess, Src, Dest, <<"tag1">>, 10),
?awaitMatch([[<<"local_to_local_dest_down_dest">>, <<>>, <<>>, <<>>], ?awaitMatch([[<<"local_to_local_dest_down_dest">>, 0, 0, 0],
[<<"local_to_local_dest_down_src">>, <<"10">>, _, _]], [<<"local_to_local_dest_down_src">>, 10, _, _]],
list_queue_messages(Config), list_queue_messages(Config),
30000), 30000),
ok = rabbit_ct_broker_helpers:start_node(Config, 1), ok = rabbit_ct_broker_helpers:start_node(Config, 1),
?awaitMatch([[<<"local_to_local_dest_down_dest">>, <<"10">>, <<"10">>, <<"0">>], ?awaitMatch([[<<"local_to_local_dest_down_dest">>, 10, 10, 0],
[<<"local_to_local_dest_down_src">>, <<"0">>, <<"0">>, <<"0">>]], [<<"local_to_local_dest_down_src">>, 0, 0, 0]],
list_queue_messages(Config), list_queue_messages(Config),
30000), 30000),
expect_many(Sess, Dest, 10) expect_many(Sess, Dest, 10)
@ -132,30 +133,57 @@ local_to_local_multiple_dest_down(Config) ->
{<<"src-queue">>, Src}, {<<"src-queue">>, Src},
{<<"dest-protocol">>, <<"local">>}, {<<"dest-protocol">>, <<"local">>},
{<<"dest-exchange">>, <<"amq.fanout">>}, {<<"dest-exchange">>, <<"amq.fanout">>},
{<<"dest-exchange-key">>, Dest} {<<"dest-exchange-key">>, <<"">>}
]), ]),
ok = rabbit_ct_broker_helpers:stop_node(Config, 1), ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
publish_many(Sess, Src, Dest, <<"tag1">>, 10), publish_many(Sess, Src, Dest, <<"tag1">>, 10),
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, <<>>, <<>>, <<>>], ?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, 0, 0, 0],
[<<"local_to_local_multiple_dest_down_dest2">>, <<>>, <<>>, <<>>], [<<"local_to_local_multiple_dest_down_dest2">>, 0, 0, 0],
[<<"local_to_local_multiple_dest_down_src">>, <<"10">>, _, _]], [<<"local_to_local_multiple_dest_down_src">>, 10, _, _]],
list_queue_messages(Config), list_queue_messages(Config),
30000), 30000),
ok = rabbit_ct_broker_helpers:start_node(Config, 1), ok = rabbit_ct_broker_helpers:start_node(Config, 1),
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, <<"10">>, <<"10">>, <<"0">>], ?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, N, N, 0],
[<<"local_to_local_multiple_dest_down_dest2">>, <<"10">>, <<"10">>, <<"0">>], [<<"local_to_local_multiple_dest_down_dest2">>, M, M, 0],
[<<"local_to_local_multiple_dest_down_src">>, <<"0">>, <<"0">>, <<"0">>]], [<<"local_to_local_multiple_dest_down_src">>, 0, 0, 0]]
when ((N >= 10) and (M >= 10)),
list_queue_messages(Config), list_queue_messages(Config),
30000), 30000),
expect_many(Sess, Dest, 10) expect_many(Sess, Dest, 10)
end). end).
local_to_local_no_destination(Config) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
declare_queue(Config, 0, <<"/">>, Src),
with_session(
Config,
fun (Sess) ->
shovel_test_utils:set_param(Config, ?PARAM,
[{<<"src-protocol">>, <<"local">>},
{<<"src-queue">>, Src},
{<<"dest-protocol">>, <<"local">>},
{<<"dest-exchange">>, <<"amq.fanout">>},
{<<"dest-exchange-key">>, Dest}
]),
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
?awaitMatch([[<<"local_to_local_no_destination_src">>, 0, 0, 0]],
list_queue_messages(Config),
30000)
end).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
list_queue_messages(Config) -> list_queue_messages(Config) ->
lists:sort( [[N, to_int(M), to_int(MR), to_int(MU)]
rabbit_ct_broker_helpers:rabbitmqctl_list( || [N, M, MR, MU] <- lists:sort(
Config, 0, rabbit_ct_broker_helpers:rabbitmqctl_list(
["list_queues", "name", "messages", "messages_ready", "messages_unacknowledged", "--no-table-headers"])). Config, 0,
["list_queues", "name", "messages", "messages_ready", "messages_unacknowledged", "--no-table-headers"]))].
to_int(<<>>) ->
0;
to_int(Int) ->
binary_to_integer(Int).
with_session(Config, Fun) -> with_session(Config, Fun) ->
with_session(Config, <<"/">>, Fun). with_session(Config, <<"/">>, Fun).