diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index 6e8cdfa3c3..a6285901af 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -594,7 +594,12 @@ decl_queue(QName, QArgs, VHost, User) -> Method = #'queue.declare'{queue = QName, durable = true, arguments = Args}, - decl_fun([Method], VHost, User). + try + decl_fun([#'queue.declare'{queue = QName, + passive = true}], VHost, User) + catch exit:{amqp_error, not_found, _, _} -> + decl_fun([Method], VHost, User) + end. dest_check_queue(none, _, _, _) -> ok; diff --git a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl index 0706d0873c..ae8cdf47fa 100644 --- a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl @@ -40,6 +40,7 @@ groups() -> restart, change_definition, autodelete, + autodelete_with_rejections, validation, security_validation, get_connection_name, @@ -519,6 +520,35 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) -> expect_count(Ch, <<"src">>, <<"hello">>, ExpSrc) end. +autodelete_with_rejections(Config) -> + Src = <<"src">>, + Dest = <<"dst">>, + Args = [{<<"x-max-length">>, long, 5}, + {<<"x-overflow">>, longstr, <<"reject-publish">>}], + with_ch(Config, + fun (Ch) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Dest, + durable = true, + arguments = Args}), + shovel_test_utils:set_param(Config, <<"test">>, + [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"src-delete-after">>, 10}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-predeclared">>, true}, + {<<"dest-queue">>, Dest} + ]), + publish_count(Ch, <<>>, Src, <<"hello">>, 10), + await_autodelete(Config, <<"test">>), + Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]), + eventually( + ?_assertMatch( + Expected, + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "messages_ready", "--no-table-headers"])))) + end). + validation(Config) -> URIs = [{<<"src-uri">>, <<"amqp://">>}, {<<"dest-uri">>, <<"amqp://">>}], diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl index 1933b758f6..7773cef146 100644 --- a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). -import(shovel_test_utils, [with_amqp10_session/2, @@ -16,9 +17,11 @@ amqp10_expect_empty/2, await_amqp10_event/3, amqp10_expect_one/2, amqp10_expect_count/3, amqp10_publish/4, - amqp10_publish_expect/5, + amqp10_publish_expect/5, amqp10_declare_queue/3, await_autodelete/2]). +-define(PARAM, <<"test">>). + all() -> [ {group, non_parallel_tests}, @@ -34,6 +37,7 @@ groups() -> autodelete_amqp091_src_on_publish, autodelete_amqp091_dest_on_confirm, autodelete_amqp091_dest_on_publish, + autodelete_with_rejections, simple_amqp10_dest, simple_amqp10_src, amqp091_to_amqp10_with_dead_lettering, @@ -83,6 +87,8 @@ init_per_testcase(Testcase, Config0) -> rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> + shovel_test_utils:clear_param(Config, ?PARAM), + rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []), rabbit_ct_helpers:testcase_finished(Config, Testcase). %% ------------------------------------------------------------------- @@ -113,11 +119,9 @@ amqp091_to_amqp10_with_dead_lettering(Config) -> TmpQ = <<"tmp">>, with_amqp10_session(Config, fun (Sess) -> - {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>), - {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TmpQ, - #{arguments =>#{<<"x-max-length">> => {uint, 0}, - <<"x-dead-letter-exchange">> => {utf8, <<"">>}, - <<"x-dead-letter-routing-key">> => {utf8, Src}}}), + amqp10_declare_queue(Sess, TmpQ, #{<<"x-max-length">> => {uint, 0}, + <<"x-dead-letter-exchange">> => {utf8, <<"">>}, + <<"x-dead-letter-routing-key">> => {utf8, Src}}), {ok, Sender} = amqp10_client:attach_sender_link(Sess, <<"sender-tmp">>, <<"/queues/", TmpQ/binary>>, @@ -132,7 +136,7 @@ amqp091_to_amqp10_with_dead_lettering(Config) -> test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) -> MapConfig = ?config(map_config, Config), - shovel_test_utils:set_param(Config, <<"test">>, + shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, Protocol}, {ProtocolSrc, Src}, {<<"dest-protocol">>, <<"amqp10">>}, @@ -186,18 +190,18 @@ simple_amqp10_src(Config) -> fun (Sess) -> shovel_test_utils:set_param( Config, - <<"test">>, [{<<"src-protocol">>, <<"amqp10">>}, - {<<"src-address">>, Src}, - {<<"dest-protocol">>, <<"amqp091">>}, - {<<"dest-queue">>, Dest}, - {<<"add-forward-headers">>, true}, - {<<"dest-add-timestamp-header">>, true}, - {<<"publish-properties">>, - case MapConfig of - true -> #{<<"cluster_id">> => <<"x">>}; - _ -> [{<<"cluster_id">>, <<"x">>}] - end} - ]), + ?PARAM, [{<<"src-protocol">>, <<"amqp10">>}, + {<<"src-address">>, Src}, + {<<"dest-protocol">>, <<"amqp091">>}, + {<<"dest-queue">>, Dest}, + {<<"add-forward-headers">>, true}, + {<<"dest-add-timestamp-header">>, true}, + {<<"publish-properties">>, + case MapConfig of + true -> #{<<"cluster_id">> => <<"x">>}; + _ -> [{<<"cluster_id">>, <<"x">>}] + end} + ]), _Msg = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), % the fidelity loss is quite high when consuming using the amqp10 % plugin. For example custom headers aren't current translated. @@ -213,18 +217,18 @@ amqp10_to_amqp091_application_properties(Config) -> fun (Sess) -> shovel_test_utils:set_param( Config, - <<"test">>, [{<<"src-protocol">>, <<"amqp10">>}, - {<<"src-address">>, Src}, - {<<"dest-protocol">>, <<"amqp091">>}, - {<<"dest-queue">>, Dest}, - {<<"add-forward-headers">>, true}, - {<<"dest-add-timestamp-header">>, true}, - {<<"publish-properties">>, - case MapConfig of - true -> #{<<"cluster_id">> => <<"x">>}; - _ -> [{<<"cluster_id">>, <<"x">>}] - end} - ]), + ?PARAM, [{<<"src-protocol">>, <<"amqp10">>}, + {<<"src-address">>, Src}, + {<<"dest-protocol">>, <<"amqp091">>}, + {<<"dest-queue">>, Dest}, + {<<"add-forward-headers">>, true}, + {<<"dest-add-timestamp-header">>, true}, + {<<"publish-properties">>, + case MapConfig of + true -> #{<<"cluster_id">> => <<"x">>}; + _ -> [{<<"cluster_id">>, <<"x">>}] + end} + ]), MsgSent = amqp10_msg:set_application_properties( #{<<"key">> => <<"value">>}, @@ -247,13 +251,13 @@ change_definition(Config) -> Dest2 = ?config(destq2, Config), with_amqp10_session(Config, fun (Sess) -> - shovel_test_utils:set_param(Config, <<"test">>, + shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-address">>, Src}, {<<"src-protocol">>, <<"amqp10">>}, {<<"dest-protocol">>, <<"amqp10">>}, {<<"dest-address">>, Dest}]), amqp10_publish_expect(Sess, Src, Dest, <<"hello1">>, 1), - shovel_test_utils:set_param(Config, <<"test">>, + shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-address">>, Src}, {<<"src-protocol">>, <<"amqp10">>}, {<<"dest-protocol">>, <<"amqp10">>}, @@ -296,14 +300,14 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) -> amqp10_publish(Session, Src, <<"hello">>, 100), shovel_test_utils:set_param_nowait( Config, - <<"test">>, [{<<"src-address">>, Src}, - {<<"src-protocol">>, <<"amqp10">>}, - {<<"src-delete-after">>, After}, - {<<"src-prefetch-count">>, 5}, - {<<"dest-address">>, Dest}, - {<<"dest-protocol">>, <<"amqp10">>}, - {<<"ack-mode">>, AckMode} - ]), + ?PARAM, [{<<"src-address">>, Src}, + {<<"src-protocol">>, <<"amqp10">>}, + {<<"src-delete-after">>, After}, + {<<"src-prefetch-count">>, 5}, + {<<"dest-address">>, Dest}, + {<<"dest-protocol">>, <<"amqp10">>}, + {<<"ack-mode">>, AckMode} + ]), await_autodelete(Config, <<"test">>), amqp10_expect_count(Session, Dest, ExpDest), amqp10_expect_count(Session, Src, ExpSrc) @@ -316,14 +320,14 @@ autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) -> amqp10_publish(Session, Src, <<"hello">>, 100), shovel_test_utils:set_param_nowait( Config, - <<"test">>, [{<<"src-queue">>, Src}, - {<<"src-protocol">>, <<"amqp091">>}, - {<<"src-delete-after">>, After}, - {<<"src-prefetch-count">>, 5}, - {<<"dest-address">>, Dest}, - {<<"dest-protocol">>, <<"amqp10">>}, - {<<"ack-mode">>, AckMode} - ]), + ?PARAM, [{<<"src-queue">>, Src}, + {<<"src-protocol">>, <<"amqp091">>}, + {<<"src-delete-after">>, After}, + {<<"src-prefetch-count">>, 5}, + {<<"dest-address">>, Dest}, + {<<"dest-protocol">>, <<"amqp10">>}, + {<<"ack-mode">>, AckMode} + ]), await_autodelete(Config, <<"test">>), amqp10_expect_count(Session, Dest, ExpDest), amqp10_expect_count(Session, Src, ExpSrc) @@ -336,19 +340,47 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) -> amqp10_publish(Session, Src, <<"hello">>, 100), shovel_test_utils:set_param_nowait( Config, - <<"test">>, [{<<"src-address">>, Src}, - {<<"src-protocol">>, <<"amqp10">>}, - {<<"src-delete-after">>, After}, - {<<"src-prefetch-count">>, 5}, - {<<"dest-queue">>, Dest}, - {<<"dest-protocol">>, <<"amqp091">>}, - {<<"ack-mode">>, AckMode} - ]), + ?PARAM, [{<<"src-address">>, Src}, + {<<"src-protocol">>, <<"amqp10">>}, + {<<"src-delete-after">>, After}, + {<<"src-prefetch-count">>, 5}, + {<<"dest-queue">>, Dest}, + {<<"dest-protocol">>, <<"amqp091">>}, + {<<"ack-mode">>, AckMode} + ]), await_autodelete(Config, <<"test">>), amqp10_expect_count(Session, Dest, ExpDest), amqp10_expect_count(Session, Src, ExpSrc) end. +autodelete_with_rejections(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Dest, #{<<"x-max-length">> => {uint, 5}, + <<"x-overflow">> => {utf8, <<"reject-publish">>}}), + + shovel_test_utils:set_param(Config, ?PARAM, + [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"src-delete-after">>, 10}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-predeclared">>, true}, + {<<"dest-queue">>, Dest} + ]), + amqp10_publish(Sess, Src, <<"hello">>, 10), + await_autodelete(Config, <<"test">>), + Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]), + ?awaitMatch( + Expected, + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "messages_ready", "--no-table-headers"])), + 30_000) + end). + test_amqp10_delete_after_queue_length(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), diff --git a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl index bd6a7b982d..0b58e45e2e 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl @@ -18,7 +18,7 @@ amqp10_expect_empty/2, amqp10_publish/4, amqp10_expect_one/2, amqp10_expect_count/3, amqp10_expect/3, - amqp10_publish_expect/5, + amqp10_publish_expect/5, amqp10_subscribe/2, await_autodelete/2]). -define(PARAM, <<"test">>). @@ -54,6 +54,7 @@ groups() -> local_to_local_delete_after_queue_length, local_to_local_delete_after_queue_length_zero, local_to_local_delete_after_number, + local_to_local_delete_after_with_rejections, local_to_local_no_ack, local_to_local_quorum_no_ack, local_to_local_stream_no_ack, @@ -586,6 +587,34 @@ local_to_local_delete_after_number(Config) -> amqp10_expect_empty(Sess, Dest) end). +local_to_local_delete_after_with_rejections(Config) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + VHost = <<"/">>, + declare_queue(Config, VHost, Dest, [{<<"x-max-length">>, long, 5}, + {<<"x-overflow">>, longstr, <<"reject-publish">>}]), + with_amqp10_session(Config, + fun (Sess) -> + shovel_test_utils:set_param(Config, ?PARAM, + [{<<"src-protocol">>, <<"local">>}, + {<<"src-queue">>, Src}, + {<<"src-delete-after">>, 10}, + {<<"dest-protocol">>, <<"local">>}, + {<<"dest-predeclared">>, true}, + {<<"dest-queue">>, Dest} + ]), + amqp10_publish(Sess, Src, <<"tag1">>, 10), + ?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000), + Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]), + ?awaitMatch( + Expected, + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "messages_ready", "--no-table-headers"])), + 30_000) + + end). + local_to_local_no_ack(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), @@ -637,7 +666,7 @@ local_to_local_stream_no_ack(Config) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, <<"no-ack">>} ]), - Receiver = subscribe(Sess, Dest), + Receiver = amqp10_subscribe(Sess, Dest), amqp10_publish(Sess, Src, <<"tag1">>, 10), ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}], rabbit_ct_broker_helpers:rpc(Config, 0, @@ -699,7 +728,7 @@ local_to_local_stream_on_confirm(Config) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, <<"on-confirm">>} ]), - Receiver = subscribe(Sess, Dest), + Receiver = amqp10_subscribe(Sess, Dest), amqp10_publish(Sess, Src, <<"tag1">>, 10), ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}], rabbit_ct_broker_helpers:rpc(Config, 0, @@ -761,7 +790,7 @@ local_to_local_stream_on_publish(Config) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, <<"on-publish">>} ]), - Receiver = subscribe(Sess, Dest), + Receiver = amqp10_subscribe(Sess, Dest), amqp10_publish(Sess, Src, <<"tag1">>, 10), ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}], rabbit_ct_broker_helpers:rpc(Config, 0, @@ -1011,7 +1040,7 @@ local_to_local_stream_credit_flow(Config, AckMode) -> {<<"ack-mode">>, AckMode} ]), - Receiver = subscribe(Sess, Dest), + Receiver = amqp10_subscribe(Sess, Dest), amqp10_publish(Sess, Src, <<"tag1">>, 1000), ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1000}, _}], rabbit_ct_broker_helpers:rpc(Config, 0, @@ -1067,14 +1096,6 @@ local_to_local_counters(Config) -> end). %%---------------------------------------------------------------------------- -subscribe(Session, Dest) -> - LinkName = <<"dynamic-receiver-", Dest/binary>>, - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, - Dest, settled, - unsettled_state), - ok = amqp10_client:flow_link_credit(Receiver, 10, 1), - Receiver. - declare_queue(Config, VHost, QName) -> declare_queue(Config, VHost, QName, []). diff --git a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl index d938128f46..6d946f8d67 100644 --- a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl @@ -16,7 +16,8 @@ -import(shovel_test_utils, [set_param/3, with_amqp10_session/2, - amqp10_publish_expect/5]). + amqp10_publish_expect/5, + amqp10_declare_queue/3]). -define(PARAM, <<"test">>). @@ -48,7 +49,13 @@ groups() -> tests() -> [ - simple + simple, + simple_classic_no_ack, + simple_classic_on_confirm, + simple_classic_on_publish, + simple_quorum_no_ack, + simple_quorum_on_confirm, + simple_quorum_on_publish ]. %% ------------------------------------------------------------------- @@ -189,19 +196,49 @@ end_per_testcase(Testcase, Config) -> %% Testcases. %% ------------------------------------------------------------------- simple(Config) -> - Name = <<"test">>, Src = ?config(srcq, Config), Dest = ?config(destq, Config), with_amqp10_session( Config, fun (Sess) -> - set_param(Config, Name, ?config(shovel_args, Config)), + set_param(Config, ?PARAM, ?config(shovel_args, Config)), amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), - Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]), + Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, ?PARAM}]), ?assertMatch([_|_], Status), ?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status)) end). +simple_classic_no_ack(Config) -> + simple_queue_type_ack_mode(Config, <<"classic">>, <<"no-ack">>). + +simple_classic_on_confirm(Config) -> + simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-confirm">>). + +simple_classic_on_publish(Config) -> + simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-publish">>). + +simple_quorum_no_ack(Config) -> + simple_queue_type_ack_mode(Config, <<"quorum">>, <<"no-ack">>). + +simple_quorum_on_confirm(Config) -> + simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-confirm">>). + +simple_quorum_on_publish(Config) -> + simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-publish">>). + +simple_queue_type_ack_mode(Config, Type, AckMode) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Src, #{<<"x-queue-type">> => {utf8, Type}}), + amqp10_declare_queue(Sess, Dest, #{<<"x-queue-type">> => {utf8, Type}}), + ExtraArgs = [{<<"ack-mode">>, AckMode}], + ShovelArgs = ?config(shovel_args, Config) ++ ExtraArgs, + set_param(Config, ?PARAM, ShovelArgs), + amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 10) + end). %%---------------------------------------------------------------------------- maybe_skip_local_protocol(Config) -> diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index 3b7b40758e..4d5bace315 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -22,7 +22,8 @@ amqp10_publish/3, amqp10_publish/4, amqp10_expect_empty/2, amqp10_expect_one/2, amqp10_expect_count/3, amqp10_expect/3, - amqp10_publish_expect/5, + amqp10_publish_expect/5, amqp10_declare_queue/3, + amqp10_subscribe/2, amqp10_expect/2, await_autodelete/2, await_autodelete1/2, invalid_param/2, invalid_param/3, valid_param/2, valid_param/3, valid_param1/3]). @@ -200,7 +201,8 @@ with_amqp10_session(Config, VHost, Fun) -> {ok, Conn} = amqp10_client:open_connection(Cfg), {ok, Sess} = amqp10_client:begin_session(Conn), Fun(Sess), - amqp10_client:close_connection(Conn), + ok = amqp10_client:end_session(Sess), + ok = amqp10_client:close_connection(Conn), ok. amqp10_publish(Sender, Tag, Payload) when is_binary(Payload) -> @@ -254,21 +256,8 @@ amqp10_expect_one(Session, Dest) -> amqp10_client:detach_link(Receiver), Msg. -amqp10_expect_count(Session, Dest, Count) -> - LinkName = <<"dynamic-receiver-", Dest/binary>>, - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, - Dest, settled, - unsettled_state), - ok = amqp10_client:flow_link_credit(Receiver, Count, never), - Msgs = amqp10_expect(Receiver, Count, []), - receive - {amqp10_msg, Receiver, Msg} -> - throw({unexpected_msg, Msg}) - after 500 -> - ok - end, - amqp10_client:detach_link(Receiver), - Msgs. +amqp10_expect(Receiver, N) -> + amqp10_expect(Receiver, N, []). amqp10_expect(_, 0, Acc) -> Acc; @@ -288,6 +277,35 @@ amqp10_expect(Receiver) -> throw(timeout_in_expect_waiting_for_delivery) end. +amqp10_declare_queue(Sess, QName, Args) -> + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"mgmt link pair">>), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{arguments => Args}), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair). + +amqp10_subscribe(Session, Dest) -> + LinkName = <<"dynamic-receiver-", Dest/binary>>, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, + Dest, settled, + unsettled_state), + ok = amqp10_client:flow_link_credit(Receiver, 10, 1), + Receiver. + +amqp10_expect_count(Session, Dest, Count) -> + LinkName = <<"dynamic-receiver-", Dest/binary>>, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, + Dest, settled, + unsettled_state), + ok = amqp10_client:flow_link_credit(Receiver, Count, never), + Msgs = amqp10_expect(Receiver, Count, []), + receive + {amqp10_msg, Receiver, Msg} -> + throw({unexpected_msg, Msg}) + after 500 -> + ok + end, + amqp10_client:detach_link(Receiver), + Msgs. + await_autodelete(Config, Name) -> rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, await_autodelete1, [Config, Name], 10000).