diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index 229c475d7f..afae0e755b 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -333,7 +333,7 @@ forward(Tag, Mc, State#{dest => Dst#{pending => {Pend}}}; forward(Tag, Msg0, #{dest := #{current := #{link := Link}, - unacked := Unacked} = Dst, + unacked := Unacked}, ack_mode := AckMode} = State) -> OutTag = rabbit_data_coercion:to_binary(Tag), Msg1 = add_timestamp_header(State, add_forward_headers(State, Msg0)), @@ -341,15 +341,16 @@ forward(Tag, Msg0, Msg3 = amqp10_raw_msg:new(AckMode =/= on_confirm, Tag, iolist_to_binary(Msg2)), case send_msg(Link, Msg3) of ok -> + #{dest := Dst1} = State1 = rabbit_shovel_behaviour:incr_forwarded(State), rabbit_shovel_behaviour:decr_remaining_unacked( case AckMode of no_ack -> - rabbit_shovel_behaviour:decr_remaining(1, State); + rabbit_shovel_behaviour:decr_remaining(1, State1); on_confirm -> - State#{dest => Dst#{unacked => Unacked#{OutTag => Tag}}}; + State1#{dest => Dst1#{unacked => Unacked#{OutTag => Tag}}}; on_publish -> - State1 = rabbit_shovel_behaviour:ack(Tag, false, State), - rabbit_shovel_behaviour:decr_remaining(1, State1) + State2 = rabbit_shovel_behaviour:ack(Tag, false, State1), + rabbit_shovel_behaviour:decr_remaining(1, State2) end); Stop -> Stop diff --git a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl index 2f169fdf7b..0706d0873c 100644 --- a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl @@ -11,6 +11,9 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -import(rabbit_ct_helpers, [eventually/1]). +-import(shovel_test_utils, [await_autodelete/2, + invalid_param/2, invalid_param/3, + valid_param/2, valid_param/3]). -compile(export_all). @@ -27,7 +30,6 @@ all() -> groups() -> [ {core_tests, [], [ - simple, set_properties_using_proplist, set_properties_using_map, set_empty_properties_using_proplist, @@ -115,21 +117,6 @@ end_per_testcase(Testcase, Config) -> %% ------------------------------------------------------------------- %% Testcases. %% ------------------------------------------------------------------- - -simple(Config) -> - Name = <<"test">>, - with_ch(Config, - fun (Ch) -> - shovel_test_utils:set_param( - Config, - Name, [{<<"src-queue">>, <<"src">>}, - {<<"dest-queue">>, <<"dest">>}]), - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>), - Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]), - ?assertMatch([_|_], Status), - ?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status)) - end). - quorum_queues(Config) -> with_ch(Config, fun (Ch) -> @@ -814,23 +801,6 @@ expect_count(Ch, Q, M, Count) -> end || _ <- lists:seq(1, Count)], expect_empty(Ch, Q). -invalid_param(Config, Value, User) -> - {error_string, _} = rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_runtime_parameters, set, - [<<"/">>, <<"shovel">>, <<"invalid">>, Value, User]). - -valid_param(Config, Value, User) -> - rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, valid_param1, [Config, Value, User]). - -valid_param1(_Config, Value, User) -> - ok = rabbit_runtime_parameters:set( - <<"/">>, <<"shovel">>, <<"name">>, Value, User), - ok = rabbit_runtime_parameters:clear(<<"/">>, <<"shovel">>, <<"name">>, <<"acting-user">>). - -invalid_param(Config, Value) -> invalid_param(Config, Value, none). -valid_param(Config, Value) -> valid_param(Config, Value, none). - lookup_user(Config, Name) -> {ok, User} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_access_control, check_user_login, [Name, []]), @@ -849,23 +819,6 @@ cleanup1(_Config) -> [rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>) || Q <- rabbit_amqqueue:list()]. -await_autodelete(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, await_autodelete1, [Config, Name]). - -await_autodelete1(_Config, Name) -> - shovel_test_utils:await( - fun () -> not lists:member(Name, shovels_from_parameters()) end), - shovel_test_utils:await( - fun () -> - not lists:member(Name, - shovel_test_utils:shovels_from_status()) - end). - -shovels_from_parameters() -> - L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>), - [rabbit_misc:pget(name, Shovel) || Shovel <- L]. - set_default_credit(Config, Value) -> Key = credit_flow_default_credit, OrigValue = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]), diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl index e537620c89..1933b758f6 100644 --- a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl @@ -11,7 +11,13 @@ -include_lib("eunit/include/eunit.hrl"). -compile(export_all). --import(shovel_test_utils, [await_credit/1]). +-import(shovel_test_utils, [with_amqp10_session/2, + amqp10_publish/3, amqp10_publish/5, + amqp10_expect_empty/2, + await_amqp10_event/3, amqp10_expect_one/2, + amqp10_expect_count/3, amqp10_publish/4, + amqp10_publish_expect/5, + await_autodelete/2]). all() -> [ @@ -86,7 +92,7 @@ end_per_testcase(Testcase, Config) -> simple(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> test_amqp10_destination(Config, Src, Dest, Sess, <<"amqp10">>, <<"src-address">>) @@ -95,7 +101,7 @@ simple(Config) -> simple_amqp10_dest(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> test_amqp10_destination(Config, Src, Dest, Sess, <<"amqp091">>, <<"src-queue">>) @@ -105,7 +111,7 @@ amqp091_to_amqp10_with_dead_lettering(Config) -> Dest = ?config(destq, Config), Src = ?config(srcq, Config), TmpQ = <<"tmp">>, - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TmpQ, @@ -118,10 +124,10 @@ amqp091_to_amqp10_with_dead_lettering(Config) -> unsettled, unsettled_state), ok = await_amqp10_event(link, Sender, attached), - expect_empty(Sess, TmpQ), + amqp10_expect_empty(Sess, TmpQ), test_amqp10_destination(Config, Src, Dest, Sess, <<"amqp091">>, <<"src-queue">>), %% publish to tmp, it should be dead-lettered to src and then shovelled to dest - _ = publish_expect(Sess, TmpQ, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, TmpQ, Dest, <<"hello">>, 1) end). test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) -> @@ -156,7 +162,7 @@ test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) -> [{<<"x-message-ann-key">>, <<"message-ann-value">>}] end}]), - Msg = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>), + [Msg] = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), AppProps = amqp10_msg:application_properties(Msg), Anns = amqp10_msg:message_annotations(Msg), %% We no longer add/override properties, application properties or @@ -176,7 +182,7 @@ simple_amqp10_src(Config) -> MapConfig = ?config(map_config, Config), Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param( Config, @@ -192,8 +198,7 @@ simple_amqp10_src(Config) -> _ -> [{<<"cluster_id">>, <<"x">>}] end} ]), - _Msg = publish_expect(Sess, Src, Dest, <<"tag1">>, - <<"hello">>), + _Msg = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), % the fidelity loss is quite high when consuming using the amqp10 % plugin. For example custom headers aren't current translated. % This isn't due to the shovel though. @@ -204,7 +209,7 @@ amqp10_to_amqp091_application_properties(Config) -> MapConfig = ?config(map_config, Config), Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param( Config, @@ -240,25 +245,25 @@ change_definition(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), Dest2 = ?config(destq2, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, <<"test">>, [{<<"src-address">>, Src}, {<<"src-protocol">>, <<"amqp10">>}, {<<"dest-protocol">>, <<"amqp10">>}, {<<"dest-address">>, Dest}]), - publish_expect(Sess, Src, Dest, <<"tag2">>,<<"hello">>), + amqp10_publish_expect(Sess, Src, Dest, <<"hello1">>, 1), shovel_test_utils:set_param(Config, <<"test">>, [{<<"src-address">>, Src}, {<<"src-protocol">>, <<"amqp10">>}, {<<"dest-protocol">>, <<"amqp10">>}, {<<"dest-address">>, Dest2}]), - publish_expect(Sess, Src, Dest2, <<"tag3">>, <<"hello">>), - expect_empty(Sess, Dest), + amqp10_publish_expect(Sess, Src, Dest2, <<"hello2">>, 1), + amqp10_expect_empty(Sess, Dest), shovel_test_utils:clear_param(Config, <<"test">>), - publish_expect(Sess, Src, Src, <<"tag4">>, <<"hello2">>), - expect_empty(Sess, Dest), - expect_empty(Sess, Dest2) + amqp10_publish_expect(Sess, Src, Src, <<"hello3">>, 1), + amqp10_expect_empty(Sess, Dest), + amqp10_expect_empty(Sess, Dest2) end). autodelete_amqp091_src_on_confirm(Config) -> @@ -282,13 +287,13 @@ autodelete_amqp091_dest_on_publish(Config) -> ok. autodelete_case(Config, Args, CaseFun) -> - with_session(Config, CaseFun(Config, Args)). + with_amqp10_session(Config, CaseFun(Config, Args)). autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), fun (Session) -> - publish_count(Session, Src, <<"hello">>, 100), + amqp10_publish(Session, Src, <<"hello">>, 100), shovel_test_utils:set_param_nowait( Config, <<"test">>, [{<<"src-address">>, Src}, @@ -300,15 +305,15 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) -> {<<"ack-mode">>, AckMode} ]), await_autodelete(Config, <<"test">>), - expect_count(Session, Dest, ExpDest), - expect_count(Session, Src, ExpSrc) + amqp10_expect_count(Session, Dest, ExpDest), + amqp10_expect_count(Session, Src, ExpSrc) end. autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), fun (Session) -> - publish_count(Session, Src, <<"hello">>, 100), + amqp10_publish(Session, Src, <<"hello">>, 100), shovel_test_utils:set_param_nowait( Config, <<"test">>, [{<<"src-queue">>, Src}, @@ -320,15 +325,15 @@ autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) -> {<<"ack-mode">>, AckMode} ]), await_autodelete(Config, <<"test">>), - expect_count(Session, Dest, ExpDest), - expect_count(Session, Src, ExpSrc) + amqp10_expect_count(Session, Dest, ExpDest), + amqp10_expect_count(Session, Src, ExpSrc) end. autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), fun (Session) -> - publish_count(Session, Src, <<"hello">>, 100), + amqp10_publish(Session, Src, <<"hello">>, 100), shovel_test_utils:set_param_nowait( Config, <<"test">>, [{<<"src-address">>, Src}, @@ -340,8 +345,8 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) -> {<<"ack-mode">>, AckMode} ]), await_autodelete(Config, <<"test">>), - expect_count(Session, Dest, ExpDest), - expect_count(Session, Src, ExpSrc) + amqp10_expect_count(Session, Dest, ExpDest), + amqp10_expect_count(Session, Src, ExpSrc) end. test_amqp10_delete_after_queue_length(Config) -> @@ -364,27 +369,6 @@ test_amqp10_delete_after_queue_length(Config) -> ?assertMatch(match, re:run(Msg, "Validation failed.*", [{capture, none}])). %%---------------------------------------------------------------------------- - -with_session(Config, Fun) -> - Hostname = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - {ok, Conn} = amqp10_client:open_connection(Hostname, Port), - {ok, Sess} = amqp10_client:begin_session(Conn), - Fun(Sess), - amqp10_client:close_connection(Conn), - ok. - -publish(Sender, Tag, Payload) when is_binary(Payload) -> - Headers = #{durable => true}, - Msg = amqp10_msg:set_headers(Headers, - amqp10_msg:new(Tag, Payload, false)), - ok = amqp10_client:send_msg(Sender, Msg), - receive - {amqp10_disposition, {accepted, Tag}} -> ok - after 3000 -> - exit(publish_disposition_not_received) - end. - publish(Sender, Msg) -> ok = amqp10_client:send_msg(Sender, Msg), Tag = amqp10_msg:delivery_tag(Msg), @@ -394,16 +378,6 @@ publish(Sender, Msg) -> exit(publish_disposition_not_received) end. -publish_expect(Session, Source, Dest, Tag, Payload) -> - LinkName = <<"dynamic-sender-", Dest/binary>>, - {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source, - unsettled, unsettled_state), - ok = await_amqp10_event(link, Sender, attached), - await_credit(Sender), - publish(Sender, Tag, Payload), - amqp10_client:detach_link(Sender), - expect_one(Session, Dest). - publish_expect_msg(Session, Source, Dest, Msg) -> LinkName = <<"dynamic-sender-", Dest/binary>>, {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source, @@ -411,104 +385,9 @@ publish_expect_msg(Session, Source, Dest, Msg) -> ok = await_amqp10_event(link, Sender, attached), publish(Sender, Msg), amqp10_client:detach_link(Sender), - expect_one(Session, Dest). - -await_amqp10_event(On, Ref, Evt) -> - receive - {amqp10_event, {On, Ref, Evt}} -> ok - after 5000 -> - exit({amqp10_event_timeout, On, Ref, Evt}) - end. - -expect_one(Session, Dest) -> - LinkName = <<"dynamic-receiver-", Dest/binary>>, - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, - Dest, settled, - unsettled_state), - ok = amqp10_client:flow_link_credit(Receiver, 1, never), - Msg = expect(Receiver), - amqp10_client:detach_link(Receiver), - Msg. - -expect(Receiver) -> - receive - {amqp10_msg, Receiver, InMsg} -> - InMsg - after 4000 -> - throw(timeout_in_expect_waiting_for_delivery) - end. - -expect_empty(Session, Dest) -> - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, - <<"dynamic-receiver">>, - Dest, settled, - unsettled_state), - % probably good enough given we don't currently have a means of - % echoing flow state - {error, timeout} = amqp10_client:get_msg(Receiver, 250), - amqp10_client:detach_link(Receiver). - -publish_count(Session, Address, Payload, Count) -> - LinkName = <<"dynamic-sender-", Address/binary>>, - {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, - Address, unsettled, - unsettled_state), - ok = await_amqp10_event(link, Sender, attached), - [begin - Tag = rabbit_data_coercion:to_binary(I), - publish(Sender, Tag, <>) - end || I <- lists:seq(1, Count)], - amqp10_client:detach_link(Sender). - -expect_count(Session, Address, Count) -> - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, - <<"dynamic-receiver", - Address/binary>>, - Address, settled, - unsettled_state), - ok = amqp10_client:flow_link_credit(Receiver, Count, never), - [begin - expect(Receiver) - end || _ <- lists:seq(1, Count)], - expect_empty(Session, Address), - amqp10_client:detach_link(Receiver). - - -invalid_param(Config, Value, User) -> - {error_string, _} = rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_runtime_parameters, set, - [<<"/">>, <<"shovel">>, <<"invalid">>, Value, User]). - -valid_param(Config, Value, User) -> - rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, valid_param1, [Config, Value, User]). - -valid_param1(_Config, Value, User) -> - ok = rabbit_runtime_parameters:set( - <<"/">>, <<"shovel">>, <<"a">>, Value, User), - ok = rabbit_runtime_parameters:clear(<<"/">>, <<"shovel">>, <<"a">>, <<"acting-user">>). - -invalid_param(Config, Value) -> invalid_param(Config, Value, none). -valid_param(Config, Value) -> valid_param(Config, Value, none). + amqp10_expect_one(Session, Dest). lookup_user(Config, Name) -> {ok, User} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_access_control, check_user_login, [Name, []]), User. - -await_autodelete(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, await_autodelete1, [Config, Name], 10000). - -await_autodelete1(_Config, Name) -> - shovel_test_utils:await( - fun () -> not lists:member(Name, shovels_from_parameters()) end), - shovel_test_utils:await( - fun () -> - not lists:member(Name, - shovel_test_utils:shovels_from_status()) - end). - -shovels_from_parameters() -> - L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>), - [rabbit_misc:pget(name, Shovel) || Shovel <- L]. diff --git a/deps/rabbitmq_shovel/test/amqp10_static_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_static_SUITE.erl index e18fc70cc5..4885c8e814 100644 --- a/deps/rabbitmq_shovel/test/amqp10_static_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_static_SUITE.erl @@ -146,7 +146,7 @@ amqp10_destination(Config, AckMode) -> ?assertMatch(#{durable := true}, amqp10_msg:headers(InMsg)), ok after ?TIMEOUT -> - throw(timeout_waiting_for_deliver1) + throw(timeout_waiting_for_deliver1) end, [{test_shovel, static, {running, _Info}, _Metrics, _Time}] = diff --git a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl index 94e8acac49..bd6a7b982d 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl @@ -14,7 +14,12 @@ -compile(export_all). --import(shovel_test_utils, [await_amqp10_event/3, await_credit/1]). +-import(shovel_test_utils, [with_amqp10_session/2, with_amqp10_session/3, + amqp10_expect_empty/2, + amqp10_publish/4, amqp10_expect_one/2, + amqp10_expect_count/3, amqp10_expect/3, + amqp10_publish_expect/5, + await_autodelete/2]). -define(PARAM, <<"test">>). @@ -27,7 +32,6 @@ groups() -> [ {tests, [], [ local_to_local_opt_headers, - local_to_local_queue_dest, local_to_local_original_dest, local_to_local_exchange_dest, local_to_local_missing_exchange_dest, @@ -134,7 +138,7 @@ init_per_testcase(Testcase, Config0) -> end_per_testcase(Testcase, Config) -> shovel_test_utils:clear_param(Config, ?PARAM), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all_queues, []), + rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []), _ = rabbit_ct_broker_helpers:delete_vhost(Config, ?config(alt_vhost, Config)), rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -145,7 +149,7 @@ end_per_testcase(Testcase, Config) -> local_to_local_opt_headers(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -155,7 +159,7 @@ local_to_local_opt_headers(Config) -> {<<"dest-add-forward-headers">>, true}, {<<"dest-add-timestamp-header">>, true} ]), - Msg = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>), + [Msg] = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), ?assertMatch(#{<<"x-opt-shovel-name">> := ?PARAM, <<"x-opt-shovel-type">> := <<"dynamic">>, <<"x-opt-shovelled-by">> := _, @@ -163,20 +167,6 @@ local_to_local_opt_headers(Config) -> amqp10_msg:message_annotations(Msg)) end). -local_to_local_queue_dest(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) - end). - local_to_local_original_dest(Config) -> %% Publish with the original routing keys, but use a different vhost %% to avoid a loop (this is a single-node test). @@ -186,7 +176,7 @@ local_to_local_original_dest(Config) -> ok = rabbit_ct_broker_helpers:add_vhost(Config, AltVHost), ok = rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, AltVHost), declare_queue(Config, AltVHost, Dest), - with_session( + with_amqp10_session( Config, fun (Sess) -> SrcUri = shovel_test_utils:make_uri(Config, 0, <<"%2F">>), @@ -200,11 +190,11 @@ local_to_local_original_dest(Config) -> {<<"dest-protocol">>, <<"local">>}], none]), shovel_test_utils:await_shovel(Config, 0, ?PARAM), - _ = publish(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish(Sess, Src, <<"hello">>, 1) end), - with_session(Config, AltVHost, + with_amqp10_session(Config, AltVHost, fun (Sess) -> - expect_one(Sess, Dest) + amqp10_expect_one(Sess, Dest) end). local_to_local_exchange_dest(Config) -> @@ -214,7 +204,7 @@ local_to_local_exchange_dest(Config) -> RoutingKey = <<"funky-routing-key">>, declare_exchange(Config, <<"/">>, AltExchange), declare_and_bind_queue(Config, <<"/">>, AltExchange, Dest, RoutingKey), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -223,7 +213,7 @@ local_to_local_exchange_dest(Config) -> {<<"dest-exchange">>, AltExchange}, {<<"dest-exchange-key">>, RoutingKey} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_missing_exchange_dest(Config) -> @@ -244,7 +234,7 @@ local_to_local_predeclared_src(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), declare_queue(Config, <<"/">>, Src), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -253,14 +243,14 @@ local_to_local_predeclared_src(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_predeclared_quorum_src(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -269,7 +259,7 @@ local_to_local_predeclared_quorum_src(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_predeclared_stream_first_offset_src(Config) -> @@ -277,9 +267,9 @@ local_to_local_predeclared_stream_first_offset_src(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> - publish_many(Sess, Src, Dest, <<"tag1">>, 20), + amqp10_publish(Sess, Src, <<"tag1">>, 20), shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, {<<"src-queue">>, Src}, @@ -288,9 +278,9 @@ local_to_local_predeclared_stream_first_offset_src(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - expect_many(Sess, Dest, 20), - expect_none(Sess, Dest), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + amqp10_expect_count(Sess, Dest, 20), + amqp10_expect_empty(Sess, Dest), + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_predeclared_stream_last_offset_src(Config) -> @@ -298,9 +288,9 @@ local_to_local_predeclared_stream_last_offset_src(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> - publish_many(Sess, Src, Dest, <<"tag1">>, 20), + amqp10_publish(Sess, Src, <<"tag1">>, 20), shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, {<<"src-queue">>, Src}, @@ -310,9 +300,9 @@ local_to_local_predeclared_stream_last_offset_src(Config) -> {<<"dest-queue">>, Dest} ]), %% Deliver last - expect_many(Sess, Dest, 1), - expect_none(Sess, Dest), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + amqp10_expect_count(Sess, Dest, 1), + amqp10_expect_empty(Sess, Dest), + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_missing_predeclared_src(Config) -> @@ -338,7 +328,7 @@ local_to_local_missing_predeclared_src(Config) -> local_to_local_exchange_src(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -348,7 +338,7 @@ local_to_local_exchange_src(Config) -> {<<"dest-queue">>, Dest} ]), Target = <<"/exchange/amq.direct/", Src/binary>>, - _ = publish_expect(Sess, Target, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Target, Dest, <<"hello">>, 1) end). local_to_local_queue_args_src(Config) -> @@ -387,7 +377,7 @@ local_to_local_predeclared_dest(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), declare_queue(Config, <<"/">>, Dest), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -396,14 +386,14 @@ local_to_local_predeclared_dest(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_predeclared_quorum_dest(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), declare_queue(Config, <<"/">>, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -412,7 +402,7 @@ local_to_local_predeclared_quorum_dest(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_missing_predeclared_dest(Config) -> @@ -522,7 +512,7 @@ local_to_local_queue_and_exchange_dest_fails(Config) -> local_to_local_delete_after_never(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -530,8 +520,8 @@ local_to_local_delete_after_never(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - publish_many(Sess, Src, Dest, <<"tag1">>, 20), - expect_many(Sess, Dest, 20) + amqp10_publish(Sess, Src, <<"tag1">>, 20), + amqp10_expect_count(Sess, Dest, 20) end). local_to_local_delete_after_queue_length_zero(Config) -> @@ -556,9 +546,9 @@ local_to_local_delete_after_queue_length(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), declare_queue(Config, <<"/">>, Src), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> - publish_many(Sess, Src, Dest, <<"tag1">>, 18), + amqp10_publish(Sess, Src, <<"tag1">>, 18), shovel_test_utils:set_param_nowait(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, {<<"src-predeclared">>, true}, @@ -570,22 +560,18 @@ local_to_local_delete_after_queue_length(Config) -> %% The shovel parameter is only deleted when 'delete-after' %% is used. In any other failure, the shovel should %% remain and try to restart - expect_many(Sess, Dest, 18), - ?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000), - ?awaitMatch([], - rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_shovel_status, status, []), - 30_000), - publish_many(Sess, Src, Dest, <<"tag1">>, 5), - expect_none(Sess, Dest) + amqp10_expect_count(Sess, Dest, 18), + await_autodelete(Config, ?PARAM), + amqp10_publish(Sess, Src, <<"tag1">>, 5), + amqp10_expect_empty(Sess, Dest) end). local_to_local_delete_after_number(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> - publish_many(Sess, Src, Dest, <<"tag1">>, 5), + amqp10_publish(Sess, Src, <<"tag1">>, 5), shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, {<<"src-queue">>, Src}, @@ -593,17 +579,17 @@ local_to_local_delete_after_number(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - expect_many(Sess, Dest, 5), - publish_many(Sess, Src, Dest, <<"tag1">>, 10), - expect_many(Sess, Dest, 5), - ?assertMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM])), - expect_none(Sess, Dest) + amqp10_expect_count(Sess, Dest, 5), + amqp10_publish(Sess, Src, <<"tag1">>, 10), + amqp10_expect_count(Sess, Dest, 5), + await_autodelete(Config, ?PARAM), + amqp10_expect_empty(Sess, Dest) end). local_to_local_no_ack(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -612,7 +598,7 @@ local_to_local_no_ack(Config) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, <<"no-ack">>} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_quorum_no_ack(Config) -> @@ -621,7 +607,7 @@ local_to_local_quorum_no_ack(Config) -> VHost = <<"/">>, declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -632,7 +618,7 @@ local_to_local_quorum_no_ack(Config) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, <<"no-ack">>} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_stream_no_ack(Config) -> @@ -640,7 +626,7 @@ local_to_local_stream_no_ack(Config) -> Dest = ?config(destq, Config), declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]), declare_queue(Config, <<"/">>, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -652,19 +638,19 @@ local_to_local_stream_no_ack(Config) -> {<<"ack-mode">>, <<"no-ack">>} ]), Receiver = subscribe(Sess, Dest), - publish_many(Sess, Src, Dest, <<"tag1">>, 10), + amqp10_publish(Sess, Src, <<"tag1">>, 10), ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), 30000), - _ = expect(Receiver, 10, []), + _ = amqp10_expect(Receiver, 10, []), amqp10_client:detach_link(Receiver) end). local_to_local_on_confirm(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -673,7 +659,7 @@ local_to_local_on_confirm(Config) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, <<"on-confirm">>} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_quorum_on_confirm(Config) -> @@ -682,7 +668,7 @@ local_to_local_quorum_on_confirm(Config) -> VHost = <<"/">>, declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -693,7 +679,7 @@ local_to_local_quorum_on_confirm(Config) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, <<"on-confirm">>} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_stream_on_confirm(Config) -> @@ -702,7 +688,7 @@ local_to_local_stream_on_confirm(Config) -> VHost = <<"/">>, declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]), declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -714,19 +700,19 @@ local_to_local_stream_on_confirm(Config) -> {<<"ack-mode">>, <<"on-confirm">>} ]), Receiver = subscribe(Sess, Dest), - publish_many(Sess, Src, Dest, <<"tag1">>, 10), + amqp10_publish(Sess, Src, <<"tag1">>, 10), ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), 30000), - _ = expect(Receiver, 10, []), + _ = amqp10_expect(Receiver, 10, []), amqp10_client:detach_link(Receiver) end). local_to_local_on_publish(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -735,7 +721,7 @@ local_to_local_on_publish(Config) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, <<"on-publish">>} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_quorum_on_publish(Config) -> @@ -744,7 +730,7 @@ local_to_local_quorum_on_publish(Config) -> VHost = <<"/">>, declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -755,7 +741,7 @@ local_to_local_quorum_on_publish(Config) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, <<"on-publish">>} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_stream_on_publish(Config) -> @@ -764,7 +750,7 @@ local_to_local_stream_on_publish(Config) -> VHost = <<"/">>, declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]), declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -776,12 +762,12 @@ local_to_local_stream_on_publish(Config) -> {<<"ack-mode">>, <<"on-publish">>} ]), Receiver = subscribe(Sess, Dest), - publish_many(Sess, Src, Dest, <<"tag1">>, 10), + amqp10_publish(Sess, Src, <<"tag1">>, 10), ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), 30000), - _ = expect(Receiver, 10, []), + _ = amqp10_expect(Receiver, 10, []), amqp10_client:detach_link(Receiver) end). @@ -791,10 +777,10 @@ local_to_local_reject_publish(Config) -> declare_queue(Config, <<"/">>, Dest, [{<<"x-max-length">>, long, 1}, {<<"x-overflow">>, longstr, <<"reject-publish">>} ]), - with_session( + with_amqp10_session( Config, fun (Sess) -> - publish_many(Sess, Src, Dest, <<"tag1">>, 5), + amqp10_publish(Sess, Src, <<"tag1">>, 5), shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, {<<"src-queue">>, Src}, @@ -803,14 +789,13 @@ local_to_local_reject_publish(Config) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, <<"on-confirm">>} ]), - expect_many(Sess, Dest, 1), - expect_none(Sess, Dest) + amqp10_expect_count(Sess, Dest, 1) end). local_to_amqp091(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -818,13 +803,13 @@ local_to_amqp091(Config) -> {<<"dest-protocol">>, <<"amqp091">>}, {<<"dest-queue">>, Dest} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_amqp10(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -832,13 +817,13 @@ local_to_amqp10(Config) -> {<<"dest-protocol">>, <<"amqp10">>}, {<<"dest-address">>, Dest} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). amqp091_to_local(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"amqp091">>}, @@ -846,13 +831,13 @@ amqp091_to_local(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). amqp10_to_local(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"amqp10">>}, @@ -860,13 +845,13 @@ amqp10_to_local(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>) + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1) end). local_to_local_delete_src_queue(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -874,7 +859,7 @@ local_to_local_delete_src_queue(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>), + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1}, _}], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), @@ -890,7 +875,7 @@ local_to_local_delete_src_queue(Config) -> local_to_local_delete_dest_queue(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -898,7 +883,7 @@ local_to_local_delete_dest_queue(Config) -> {<<"dest-protocol">>, <<"local">>}, {<<"dest-queue">>, Dest} ]), - _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>), + _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1}, _}], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), @@ -956,7 +941,7 @@ local_to_local_credit_flow_no_ack(Config) -> local_to_local_credit_flow(Config, AckMode) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -965,8 +950,8 @@ local_to_local_credit_flow(Config, AckMode) -> {<<"dest-queue">>, Dest}, {<<"ack-mode">>, AckMode} ]), - publish_many(Sess, Src, Dest, <<"tag1">>, 1000), - expect_many(Sess, Dest, 1000) + amqp10_publish(Sess, Src, <<"tag1">>, 1000), + amqp10_expect_count(Sess, Dest, 1000) end). local_to_local_quorum_credit_flow_on_confirm(Config) -> @@ -984,7 +969,7 @@ local_to_local_quorum_credit_flow(Config, AckMode) -> VHost = <<"/">>, declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -995,8 +980,8 @@ local_to_local_quorum_credit_flow(Config, AckMode) -> {<<"dest-predeclared">>, true}, {<<"ack-mode">>, AckMode} ]), - publish_many(Sess, Src, Dest, <<"tag1">>, 1000), - expect_many(Sess, Dest, 1000) + amqp10_publish(Sess, Src, <<"tag1">>, 1000), + amqp10_expect_count(Sess, Dest, 1000) end). local_to_local_stream_credit_flow_on_confirm(Config) -> @@ -1014,7 +999,7 @@ local_to_local_stream_credit_flow(Config, AckMode) -> VHost = <<"/">>, declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]), declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]), - with_session(Config, + with_amqp10_session(Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, [{<<"src-protocol">>, <<"local">>}, @@ -1027,12 +1012,12 @@ local_to_local_stream_credit_flow(Config, AckMode) -> ]), Receiver = subscribe(Sess, Dest), - publish_many(Sess, Src, Dest, <<"tag1">>, 1000), + amqp10_publish(Sess, Src, <<"tag1">>, 1000), ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1000}, _}], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), 30000), - _ = expect(Receiver, 1000, []), + _ = amqp10_expect(Receiver, 1000, []), amqp10_client:detach_link(Receiver) end). @@ -1057,7 +1042,7 @@ local_to_local_counters(Config) -> %% Let's restart the node so the counters are reset ok = rabbit_ct_broker_helpers:stop_node(Config, 0), ok = rabbit_ct_broker_helpers:start_node(Config, 0), - with_session( + with_amqp10_session( Config, fun (Sess) -> ?awaitMatch(#{publishers := 0, consumers := 0}, @@ -1070,7 +1055,7 @@ local_to_local_counters(Config) -> ]), ?awaitMatch(#{publishers := 1, consumers := 1}, get_global_counters(Config), 30_000), - _ = publish_many(Sess, Src, Dest, <<"tag1">>, 150), + _ = amqp10_publish(Sess, Src, <<"tag1">>, 150), ?awaitMatch(#{consumers := 1, publishers := 1, messages_received_total := 150, messages_received_confirm_total := 150, @@ -1082,81 +1067,6 @@ local_to_local_counters(Config) -> end). %%---------------------------------------------------------------------------- -with_session(Config, Fun) -> - with_session(Config, <<"/">>, Fun). - -with_session(Config, VHost, Fun) -> - Hostname = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - Cfg = #{address => Hostname, - port => Port, - sasl => {plain, <<"guest">>, <<"guest">>}, - hostname => <<"vhost:", VHost/binary>>}, - {ok, Conn} = amqp10_client:open_connection(Cfg), - {ok, Sess} = amqp10_client:begin_session(Conn), - Fun(Sess), - amqp10_client:close_connection(Conn), - ok. - -publish(Sender, Tag, Payload) when is_binary(Payload) -> - Headers = #{durable => true}, - Msg = amqp10_msg:set_headers(Headers, - amqp10_msg:new(Tag, Payload, false)), - %% N.B.: this function does not attach a link and does not - %% need to use await_credit/1 - ok = amqp10_client:send_msg(Sender, Msg), - receive - {amqp10_disposition, {accepted, Tag}} -> ok - after 3000 -> - exit(publish_disposition_not_received) - end. - -publish(Session, Source, Dest, Tag, Payloads) -> - LinkName = <<"dynamic-sender-", Dest/binary>>, - {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source, - unsettled, unsettled_state), - ok = await_amqp10_event(link, Sender, attached), - ok = await_credit(Sender), - case is_list(Payloads) of - true -> - [publish(Sender, Tag, Payload) || Payload <- Payloads]; - false -> - publish(Sender, Tag, Payloads) - end, - amqp10_client:detach_link(Sender). - -publish_expect(Session, Source, Dest, Tag, Payload) -> - publish(Session, Source, Dest, Tag, Payload), - expect_one(Session, Dest). - -publish_many(Session, Source, Dest, Tag, N) -> - Payloads = [integer_to_binary(Payload) || Payload <- lists:seq(1, N)], - publish(Session, Source, Dest, Tag, Payloads). - -expect_one(Session, Dest) -> - LinkName = <<"dynamic-receiver-", Dest/binary>>, - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, - Dest, settled, - unsettled_state), - ok = amqp10_client:flow_link_credit(Receiver, 1, never), - Msg = expect(Receiver), - amqp10_client:detach_link(Receiver), - Msg. - -expect_none(Session, Dest) -> - LinkName = <<"dynamic-receiver-", Dest/binary>>, - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, - Dest, settled, - unsettled_state), - ok = amqp10_client:flow_link_credit(Receiver, 1, never), - receive - {amqp10_msg, Receiver, _} -> - throw(unexpected_msg) - after 4000 -> - ok - end, - amqp10_client:detach_link(Receiver). - subscribe(Session, Dest) -> LinkName = <<"dynamic-receiver-", Dest/binary>>, {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, @@ -1165,34 +1075,6 @@ subscribe(Session, Dest) -> ok = amqp10_client:flow_link_credit(Receiver, 10, 1), Receiver. -expect_many(Session, Dest, N) -> - LinkName = <<"dynamic-receiver-", Dest/binary>>, - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, - Dest, settled, - unsettled_state), - ok = amqp10_client:flow_link_credit(Receiver, 10, 1), - Msgs = expect(Receiver, N, []), - amqp10_client:detach_link(Receiver), - Msgs. - -expect(_, 0, Acc) -> - Acc; -expect(Receiver, N, Acc) -> - receive - {amqp10_msg, Receiver, InMsg} -> - expect(Receiver, N - 1, [amqp10_msg:body(InMsg) | Acc]) - after 4000 -> - throw({timeout_in_expect_waiting_for_delivery, N, Acc}) - end. - -expect(Receiver) -> - receive - {amqp10_msg, Receiver, InMsg} -> - InMsg - after 4000 -> - throw(timeout_in_expect_waiting_for_delivery) - end. - declare_queue(Config, VHost, QName) -> declare_queue(Config, VHost, QName, []). @@ -1233,13 +1115,6 @@ declare_exchange(Config, VHost, Exchange) -> rabbit_ct_client_helpers:close_channel(Ch), rabbit_ct_client_helpers:close_connection(Conn). -delete_all_queues() -> - Queues = rabbit_amqqueue:list(), - lists:foreach( - fun(Q) -> - {ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) - end, Queues). - delete_queue(Name, VHost) -> QName = rabbit_misc:r(VHost, queue, Name), case rabbit_amqqueue:lookup(QName) of diff --git a/deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl index 4aadb92798..4347cb2b44 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl @@ -14,7 +14,8 @@ -compile(export_all). --import(shovel_test_utils, [await_amqp10_event/3, await_credit/1]). +-import(shovel_test_utils, [await_amqp10_event/3, await_credit/1, + with_amqp10_session/2]). -define(PARAM, <<"test">>). @@ -84,7 +85,7 @@ init_per_testcase(Testcase, Config0) -> end_per_testcase(Testcase, Config) -> shovel_test_utils:clear_param(Config, ?PARAM), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all_queues, []), + rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []), _ = rabbit_ct_broker_helpers:delete_vhost(Config, ?config(alt_vhost, Config)), rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -97,7 +98,7 @@ local_to_local_dest_down(Config) -> Dest = ?config(destq, Config), declare_queue(Config, 0, <<"/">>, Src), declare_queue(Config, 1, <<"/">>, Dest), - with_session( + with_amqp10_session( Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, @@ -128,7 +129,7 @@ local_to_local_multiple_all_dest_down(Config) -> declare_queue(Config, 0, <<"/">>, Src), declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest, Dest), declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest2, Dest2), - with_session( + with_amqp10_session( Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, @@ -164,7 +165,7 @@ local_to_local_multiple_some_dest_down(Config) -> %% and should be requeued. declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest, Dest), declare_and_bind_queue(Config, 2, <<"/">>, <<"amq.fanout">>, Dest2, Dest2), - with_session( + with_amqp10_session( Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, @@ -200,7 +201,7 @@ local_to_local_no_destination(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), declare_queue(Config, 0, <<"/">>, Src), - with_session( + with_amqp10_session( Config, fun (Sess) -> shovel_test_utils:set_param(Config, ?PARAM, @@ -229,22 +230,6 @@ to_int(<<>>) -> to_int(Int) -> binary_to_integer(Int). -with_session(Config, Fun) -> - with_session(Config, <<"/">>, Fun). - -with_session(Config, VHost, Fun) -> - Hostname = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - Cfg = #{address => Hostname, - port => Port, - sasl => {plain, <<"guest">>, <<"guest">>}, - hostname => <<"vhost:", VHost/binary>>}, - {ok, Conn} = amqp10_client:open_connection(Cfg), - {ok, Sess} = amqp10_client:begin_session(Conn), - Fun(Sess), - amqp10_client:close_connection(Conn), - ok. - publish(Sender, Tag, Payload) when is_binary(Payload) -> Headers = #{durable => true}, Msg = amqp10_msg:set_headers(Headers, @@ -344,13 +329,6 @@ declare_exchange(Config, VHost, Exchange) -> rabbit_ct_client_helpers:close_channel(Ch), rabbit_ct_client_helpers:close_connection(Conn). -delete_all_queues() -> - Queues = rabbit_amqqueue:list(), - lists:foreach( - fun(Q) -> - {ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) - end, Queues). - delete_queue(Name, VHost) -> QName = rabbit_misc:r(VHost, queue, Name), case rabbit_amqqueue:lookup(QName) of diff --git a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl new file mode 100644 index 0000000000..d938128f46 --- /dev/null +++ b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl @@ -0,0 +1,215 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(shovel_dynamic_SUITE). +%% Common test cases to all protocols + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + +-compile(export_all). + +-import(shovel_test_utils, [set_param/3, + with_amqp10_session/2, + amqp10_publish_expect/5]). + +-define(PARAM, <<"test">>). + +all() -> + [ + {group, amqp091}, + {group, amqp10}, + {group, local}, + {group, amqp091_to_amqp10}, + {group, amqp091_to_local}, + {group, amqp10_to_amqp091}, + {group, amqp10_to_local}, + {group, local_to_amqp091}, + {group, local_to_amqp10} + ]. + +groups() -> + [ + {amqp091, [], tests()}, + {amqp10, [], tests()}, + {local, [], tests()}, + {amqp091_to_amqp10, [], tests()}, + {amqp091_to_local, [], tests()}, + {amqp10_to_amqp091, [], tests()}, + {amqp10_to_local, [], tests()}, + {local_to_amqp091, [], tests()}, + {local_to_amqp10, [], tests()} + ]. + +tests() -> + [ + simple + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config0) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config0, [ + {rmq_nodename_suffix, ?MODULE}, + {ignored_crashes, [ + "server_initiated_close,404", + "writer,send_failed,closed", + "source_queue_down", + "dest_queue_down" + ]} + ]), + rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + application:stop(amqp10_client), + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(amqp091, Config) -> + rabbit_ct_helpers:set_config( + Config, + [ + {src_protocol, <<"amqp091">>}, + {dest_protocol, <<"amqp091">>}, + {src_address, <<"src-queue">>}, + {dest_address, <<"dest-queue">>} + ]); +init_per_group(amqp10, Config) -> + rabbit_ct_helpers:set_config( + Config, + [ + {src_protocol, <<"amqp10">>}, + {dest_protocol, <<"amqp10">>}, + {src_address, <<"src-address">>}, + {dest_address, <<"dest-address">>} + ]); +init_per_group(local, Config0) -> + Config = rabbit_ct_helpers:set_config( + Config0, + [ + {src_protocol, <<"local">>}, + {dest_protocol, <<"local">>}, + {src_address, <<"src-queue">>}, + {dest_address, <<"dest-queue">>} + ]), + maybe_skip_local_protocol(Config); +init_per_group(amqp091_to_amqp10, Config) -> + rabbit_ct_helpers:set_config( + Config, + [ + {src_protocol, <<"amqp091">>}, + {dest_protocol, <<"amqp10">>}, + {src_address, <<"src-queue">>}, + {dest_address, <<"dest-address">>} + ]); +init_per_group(amqp091_to_local, Config0) -> + Config = rabbit_ct_helpers:set_config( + Config0, + [ + {src_protocol, <<"amqp091">>}, + {dest_protocol, <<"local">>}, + {src_address, <<"src-queue">>}, + {dest_address, <<"dest-queue">>} + ]), + maybe_skip_local_protocol(Config); +init_per_group(amqp10_to_amqp091, Config) -> + rabbit_ct_helpers:set_config( + Config, + [ + {src_protocol, <<"amqp10">>}, + {dest_protocol, <<"amqp091">>}, + {src_address, <<"src-address">>}, + {dest_address, <<"dest-queue">>} + ]); +init_per_group(amqp10_to_local, Config0) -> + Config = rabbit_ct_helpers:set_config( + Config0, + [ + {src_protocol, <<"amqp10">>}, + {dest_protocol, <<"local">>}, + {src_address, <<"src-address">>}, + {dest_address, <<"dest-queue">>} + ]), + maybe_skip_local_protocol(Config); +init_per_group(local_to_amqp091, Config0) -> + Config = rabbit_ct_helpers:set_config( + Config0, + [ + {src_protocol, <<"local">>}, + {dest_protocol, <<"amqp091">>}, + {src_address, <<"src-queue">>}, + {dest_address, <<"dest-queue">>} + ]), + maybe_skip_local_protocol(Config); +init_per_group(local_to_amqp10, Config0) -> + Config = rabbit_ct_helpers:set_config( + Config0, + [ + {src_protocol, <<"local">>}, + {dest_protocol, <<"amqp10">>}, + {src_address, <<"src-queue">>}, + {dest_address, <<"dest-address">>} + ]), + maybe_skip_local_protocol(Config). + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config0) -> + SrcQ = list_to_binary(atom_to_list(Testcase) ++ "_src"), + DestQ = list_to_binary(atom_to_list(Testcase) ++ "_dest"), + ShovelArgs = [{<<"src-protocol">>, ?config(src_protocol, Config0)}, + {<<"dest-protocol">>, ?config(dest_protocol, Config0)}, + {?config(src_address, Config0), SrcQ}, + {?config(dest_address, Config0), DestQ}], + Config = rabbit_ct_helpers:set_config( + Config0, + [{srcq, SrcQ}, {destq, DestQ}, {shovel_args, ShovelArgs}]), + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + shovel_test_utils:clear_param(Config, ?PARAM), + rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- +simple(Config) -> + Name = <<"test">>, + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + set_param(Config, Name, ?config(shovel_args, Config)), + amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1), + Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]), + ?assertMatch([_|_], Status), + ?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status)) + end). + + +%%---------------------------------------------------------------------------- +maybe_skip_local_protocol(Config) -> + [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + case rabbit_ct_broker_helpers:enable_feature_flag( + Config, [Node], 'rabbitmq_4.0.0') of + ok -> + Config; + _ -> + {skip, "This group requires rabbitmq_4.0.0 feature flag"} + end. diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index e0448db5b4..3b7b40758e 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -16,7 +16,16 @@ await/1, await/2, await_amqp10_event/3, await_credit/1, clear_param/2, clear_param/3, make_uri/2, make_uri/3, make_uri/5, - await_shovel1/4, await_no_shovel/2]). + await_shovel1/4, await_no_shovel/2, + delete_all_queues/0, + with_amqp10_session/2, with_amqp10_session/3, + amqp10_publish/3, amqp10_publish/4, + amqp10_expect_empty/2, amqp10_expect_one/2, + amqp10_expect_count/3, amqp10_expect/3, + amqp10_publish_expect/5, + await_autodelete/2, await_autodelete1/2, + invalid_param/2, invalid_param/3, + valid_param/2, valid_param/3, valid_param1/3]). make_uri(Config, Node) -> Hostname = ?config(rmq_hostname, Config), @@ -170,3 +179,145 @@ restart_shovel(Config, Name) -> restart_shovel(Config, Node, Name) -> rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]). + +delete_all_queues() -> + Queues = rabbit_amqqueue:list(), + lists:foreach( + fun(Q) -> + {ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) + end, Queues). + +with_amqp10_session(Config, Fun) -> + with_amqp10_session(Config, <<"/">>, Fun). + +with_amqp10_session(Config, VHost, Fun) -> + Hostname = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Cfg = #{address => Hostname, + port => Port, + sasl => {plain, <<"guest">>, <<"guest">>}, + hostname => <<"vhost:", VHost/binary>>}, + {ok, Conn} = amqp10_client:open_connection(Cfg), + {ok, Sess} = amqp10_client:begin_session(Conn), + Fun(Sess), + amqp10_client:close_connection(Conn), + ok. + +amqp10_publish(Sender, Tag, Payload) when is_binary(Payload) -> + Headers = #{durable => true}, + Msg = amqp10_msg:set_headers(Headers, + amqp10_msg:new(Tag, Payload, false)), + ok = amqp10_client:send_msg(Sender, Msg), + receive + {amqp10_disposition, {accepted, Tag}} -> ok + after 3000 -> + exit(publish_disposition_not_received) + end. + +amqp10_expect_empty(Session, Dest) -> + LinkName = <<"dynamic-receiver-", Dest/binary>>, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, + Dest, settled, + unsettled_state), + ok = amqp10_client:flow_link_credit(Receiver, 1, never), + receive + {amqp10_msg, Receiver, _} -> + throw(unexpected_msg) + after 500 -> + ok + end, + amqp10_client:detach_link(Receiver). + +amqp10_publish(Session, Address, Payload, Count) -> + LinkName = <<"dynamic-sender-", Address/binary>>, + {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Address, + unsettled, unsettled_state), + ok = await_amqp10_event(link, Sender, attached), + ok = await_credit(Sender), + [begin + Tag = rabbit_data_coercion:to_binary(I), + amqp10_publish(Sender, Tag, <>) + end || I <- lists:seq(1, Count)], + amqp10_client:detach_link(Sender). + +amqp10_publish_expect(Session, Source, Destination, Payload, Count) -> + amqp10_publish(Session, Source, Payload, Count), + amqp10_expect_count(Session, Destination, Count). + +amqp10_expect_one(Session, Dest) -> + LinkName = <<"dynamic-receiver-", Dest/binary>>, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, + Dest, settled, + unsettled_state), + ok = amqp10_client:flow_link_credit(Receiver, 1, never), + Msg = amqp10_expect(Receiver), + amqp10_client:detach_link(Receiver), + Msg. + +amqp10_expect_count(Session, Dest, Count) -> + LinkName = <<"dynamic-receiver-", Dest/binary>>, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName, + Dest, settled, + unsettled_state), + ok = amqp10_client:flow_link_credit(Receiver, Count, never), + Msgs = amqp10_expect(Receiver, Count, []), + receive + {amqp10_msg, Receiver, Msg} -> + throw({unexpected_msg, Msg}) + after 500 -> + ok + end, + amqp10_client:detach_link(Receiver), + Msgs. + +amqp10_expect(_, 0, Acc) -> + Acc; +amqp10_expect(Receiver, N, Acc) -> + receive + {amqp10_msg, Receiver, InMsg} -> + amqp10_expect(Receiver, N - 1, [InMsg | Acc]) + after 4000 -> + throw({timeout_in_expect_waiting_for_delivery, N, Acc}) + end. + +amqp10_expect(Receiver) -> + receive + {amqp10_msg, Receiver, InMsg} -> + InMsg + after 4000 -> + throw(timeout_in_expect_waiting_for_delivery) + end. + +await_autodelete(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, await_autodelete1, [Config, Name], 10000). + +await_autodelete1(_Config, Name) -> + await( + fun () -> not lists:member(Name, shovels_from_parameters()) end), + await( + fun () -> + not lists:member(Name, + shovels_from_status()) + end). + +shovels_from_parameters() -> + L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>), + [rabbit_misc:pget(name, Shovel) || Shovel <- L]. + +invalid_param(Config, Value, User) -> + {error_string, _} = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_runtime_parameters, set, + [<<"/">>, <<"shovel">>, <<"invalid">>, Value, User]). + +valid_param(Config, Value, User) -> + rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, valid_param1, [Config, Value, User]). + +valid_param1(_Config, Value, User) -> + ok = rabbit_runtime_parameters:set( + <<"/">>, <<"shovel">>, <<"name">>, Value, User), + ok = rabbit_runtime_parameters:clear(<<"/">>, <<"shovel">>, <<"name">>, <<"acting-user">>). + +invalid_param(Config, Value) -> invalid_param(Config, Value, none). +valid_param(Config, Value) -> valid_param(Config, Value, none).