From 9b4e97b377ea3ca5301aca6df88011c012501620 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Fri, 3 Oct 2025 16:37:51 +0200 Subject: [PATCH] Shovel: tests delete after with queue rejections --- .../test/amqp091_dynamic_SUITE.erl | 30 ++++++++++++++ .../test/amqp10_dynamic_SUITE.erl | 40 +++++++++++++++++++ .../test/local_dynamic_SUITE.erl | 29 ++++++++++++++ 3 files changed, 99 insertions(+) diff --git a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl index b2ada51afd..ab765d5cbd 100644 --- a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl @@ -38,6 +38,7 @@ groups() -> restart, change_definition, autodelete, + autodelete_with_rejections, validation, security_validation, get_connection_name, @@ -535,6 +536,35 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) -> expect_count(Ch, <<"src">>, <<"hello">>, ExpSrc) end. +autodelete_with_rejections(Config) -> + Src = <<"src">>, + Dest = <<"dst">>, + Args = [{<<"x-max-length">>, long, 5}, + {<<"x-overflow">>, longstr, <<"reject-publish">>}], + with_ch(Config, + fun (Ch) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Dest, + durable = true, + arguments = Args}), + shovel_test_utils:set_param(Config, <<"test">>, + [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"src-delete-after">>, 10}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-predeclared">>, true}, + {<<"dest-queue">>, Dest} + ]), + publish_count(Ch, <<>>, Src, <<"hello">>, 10), + await_autodelete(Config, <<"test">>), + Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]), + eventually( + ?_assertMatch( + Expected, + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "messages_ready", "--no-table-headers"])))) + end). + validation(Config) -> URIs = [{<<"src-uri">>, <<"amqp://">>}, {<<"dest-uri">>, <<"amqp://">>}], diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl index e537620c89..955080d3e0 100644 --- a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). -import(shovel_test_utils, [await_credit/1]). @@ -28,6 +29,7 @@ groups() -> autodelete_amqp091_src_on_publish, autodelete_amqp091_dest_on_confirm, autodelete_amqp091_dest_on_publish, + autodelete_with_rejections, simple_amqp10_dest, simple_amqp10_src, amqp091_to_amqp10_with_dead_lettering, @@ -77,6 +79,7 @@ init_per_testcase(Testcase, Config0) -> rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all_queues, []), rabbit_ct_helpers:testcase_finished(Config, Testcase). %% ------------------------------------------------------------------- @@ -344,6 +347,36 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) -> expect_count(Session, Src, ExpSrc) end. +autodelete_with_rejections(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_session( + Config, + fun (Sess) -> + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, Dest, + #{arguments =>#{<<"x-max-length">> => {uint, 5}, + <<"x-overflow">> => {utf8, <<"reject-publish">>}}}), + + shovel_test_utils:set_param(Config, <<"test">>, + [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"src-delete-after">>, 10}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-predeclared">>, true}, + {<<"dest-queue">>, Dest} + ]), + publish_count(Sess, Src, <<"hello">>, 10), + await_autodelete(Config, <<"test">>), + Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]), + ?awaitMatch( + Expected, + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "messages_ready", "--no-table-headers"])), + 30_000) + end). + test_amqp10_delete_after_queue_length(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), @@ -512,3 +545,10 @@ await_autodelete1(_Config, Name) -> shovels_from_parameters() -> L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>), [rabbit_misc:pget(name, Shovel) || Shovel <- L]. + +delete_all_queues() -> + Queues = rabbit_amqqueue:list(), + lists:foreach( + fun(Q) -> + {ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) + end, Queues). diff --git a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl index 94e8acac49..86f0d83b45 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl @@ -50,6 +50,7 @@ groups() -> local_to_local_delete_after_queue_length, local_to_local_delete_after_queue_length_zero, local_to_local_delete_after_number, + local_to_local_delete_after_with_rejections, local_to_local_no_ack, local_to_local_quorum_no_ack, local_to_local_stream_no_ack, @@ -600,6 +601,34 @@ local_to_local_delete_after_number(Config) -> expect_none(Sess, Dest) end). +local_to_local_delete_after_with_rejections(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + VHost = <<"/">>, + declare_queue(Config, VHost, Dest, [{<<"x-max-length">>, long, 5}, + {<<"x-overflow">>, longstr, <<"reject-publish">>}]), + with_session(Config, + fun (Sess) -> + shovel_test_utils:set_param(Config, ?PARAM, + [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"src-delete-after">>, 10}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-predeclared">>, true}, + {<<"dest-queue">>, Dest} + ]), + publish_many(Sess, Src, Dest, <<"tag1">>, 10), + ?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000), + Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]), + ?awaitMatch( + Expected, + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "messages_ready", "--no-table-headers"])), + 30_000) + + end). + local_to_local_no_ack(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config),