parent
							
								
									fa23f67e76
								
							
						
					
					
						commit
						a8430308fe
					
				| 
						 | 
				
			
			@ -11,6 +11,9 @@
 | 
			
		|||
-include_lib("amqp_client/include/amqp_client.hrl").
 | 
			
		||||
 | 
			
		||||
-import(rabbit_ct_helpers, [eventually/1]).
 | 
			
		||||
-import(shovel_test_utils, [await_autodelete/2,
 | 
			
		||||
                            invalid_param/2, invalid_param/3,
 | 
			
		||||
                            valid_param/2, valid_param/3]).
 | 
			
		||||
 | 
			
		||||
-compile(export_all).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -814,23 +817,6 @@ expect_count(Ch, Q, M, Count) ->
 | 
			
		|||
     end || _ <- lists:seq(1, Count)],
 | 
			
		||||
    expect_empty(Ch, Q).
 | 
			
		||||
 | 
			
		||||
invalid_param(Config, Value, User) ->
 | 
			
		||||
    {error_string, _} = rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      rabbit_runtime_parameters, set,
 | 
			
		||||
      [<<"/">>, <<"shovel">>, <<"invalid">>, Value, User]).
 | 
			
		||||
 | 
			
		||||
valid_param(Config, Value, User) ->
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      ?MODULE, valid_param1, [Config, Value, User]).
 | 
			
		||||
 | 
			
		||||
valid_param1(_Config, Value, User) ->
 | 
			
		||||
    ok = rabbit_runtime_parameters:set(
 | 
			
		||||
           <<"/">>, <<"shovel">>, <<"name">>, Value, User),
 | 
			
		||||
    ok = rabbit_runtime_parameters:clear(<<"/">>, <<"shovel">>, <<"name">>, <<"acting-user">>).
 | 
			
		||||
 | 
			
		||||
invalid_param(Config, Value) -> invalid_param(Config, Value, none).
 | 
			
		||||
valid_param(Config, Value) -> valid_param(Config, Value, none).
 | 
			
		||||
 | 
			
		||||
lookup_user(Config, Name) ->
 | 
			
		||||
    {ok, User} = rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      rabbit_access_control, check_user_login, [Name, []]),
 | 
			
		||||
| 
						 | 
				
			
			@ -849,23 +835,6 @@ cleanup1(_Config) ->
 | 
			
		|||
    [rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>)
 | 
			
		||||
     || Q <- rabbit_amqqueue:list()].
 | 
			
		||||
 | 
			
		||||
await_autodelete(Config, Name) ->
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      ?MODULE, await_autodelete1, [Config, Name]).
 | 
			
		||||
 | 
			
		||||
await_autodelete1(_Config, Name) ->
 | 
			
		||||
    shovel_test_utils:await(
 | 
			
		||||
      fun () -> not lists:member(Name, shovels_from_parameters()) end),
 | 
			
		||||
    shovel_test_utils:await(
 | 
			
		||||
      fun () ->
 | 
			
		||||
              not lists:member(Name,
 | 
			
		||||
                               shovel_test_utils:shovels_from_status())
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
shovels_from_parameters() ->
 | 
			
		||||
    L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>),
 | 
			
		||||
    [rabbit_misc:pget(name, Shovel) || Shovel <- L].
 | 
			
		||||
 | 
			
		||||
set_default_credit(Config, Value) ->
 | 
			
		||||
    Key = credit_flow_default_credit,
 | 
			
		||||
    OrigValue = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,7 +11,13 @@
 | 
			
		|||
-include_lib("eunit/include/eunit.hrl").
 | 
			
		||||
-compile(export_all).
 | 
			
		||||
 | 
			
		||||
-import(shovel_test_utils, [await_credit/1]).
 | 
			
		||||
-import(shovel_test_utils, [with_amqp10_session/2,
 | 
			
		||||
                            amqp10_publish/3, amqp10_publish/5,
 | 
			
		||||
                            amqp10_expect_empty/2,
 | 
			
		||||
                            await_amqp10_event/3, amqp10_expect_one/2,
 | 
			
		||||
                            amqp10_expect_count/3, amqp10_publish/4,
 | 
			
		||||
                            amqp10_publish_expect/5,
 | 
			
		||||
                            await_autodelete/2]).
 | 
			
		||||
 | 
			
		||||
all() ->
 | 
			
		||||
    [
 | 
			
		||||
| 
						 | 
				
			
			@ -86,7 +92,7 @@ end_per_testcase(Testcase, Config) ->
 | 
			
		|||
simple(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              test_amqp10_destination(Config, Src, Dest, Sess, <<"amqp10">>,
 | 
			
		||||
                                      <<"src-address">>)
 | 
			
		||||
| 
						 | 
				
			
			@ -95,7 +101,7 @@ simple(Config) ->
 | 
			
		|||
simple_amqp10_dest(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              test_amqp10_destination(Config, Src, Dest, Sess, <<"amqp091">>,
 | 
			
		||||
                                      <<"src-queue">>)
 | 
			
		||||
| 
						 | 
				
			
			@ -105,7 +111,7 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
 | 
			
		|||
    Dest = ?config(destq, Config),
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    TmpQ = <<"tmp">>,
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>),
 | 
			
		||||
              {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TmpQ,
 | 
			
		||||
| 
						 | 
				
			
			@ -118,10 +124,10 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
 | 
			
		|||
                                                              unsettled,
 | 
			
		||||
                                                              unsettled_state),
 | 
			
		||||
              ok = await_amqp10_event(link, Sender, attached),
 | 
			
		||||
              expect_empty(Sess, TmpQ),
 | 
			
		||||
              amqp10_expect_empty(Sess, TmpQ),
 | 
			
		||||
              test_amqp10_destination(Config, Src, Dest, Sess, <<"amqp091">>, <<"src-queue">>),
 | 
			
		||||
              %% publish to tmp, it should be dead-lettered to src and then shovelled to dest
 | 
			
		||||
              _ = publish_expect(Sess, TmpQ, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, TmpQ, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -156,7 +162,7 @@ test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
 | 
			
		|||
                                          [{<<"x-message-ann-key">>,
 | 
			
		||||
                                            <<"message-ann-value">>}]
 | 
			
		||||
                                  end}]),
 | 
			
		||||
    Msg = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>),
 | 
			
		||||
    [Msg] = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
 | 
			
		||||
    AppProps = amqp10_msg:application_properties(Msg),
 | 
			
		||||
    Anns = amqp10_msg:message_annotations(Msg),
 | 
			
		||||
    %% We no longer add/override properties, application properties or
 | 
			
		||||
| 
						 | 
				
			
			@ -176,7 +182,7 @@ simple_amqp10_src(Config) ->
 | 
			
		|||
    MapConfig = ?config(map_config, Config),
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              shovel_test_utils:set_param(
 | 
			
		||||
                Config,
 | 
			
		||||
| 
						 | 
				
			
			@ -192,8 +198,7 @@ simple_amqp10_src(Config) ->
 | 
			
		|||
                                    _    -> [{<<"cluster_id">>, <<"x">>}]
 | 
			
		||||
                                end}
 | 
			
		||||
                            ]),
 | 
			
		||||
              _Msg = publish_expect(Sess, Src, Dest, <<"tag1">>,
 | 
			
		||||
                                    <<"hello">>),
 | 
			
		||||
              _Msg = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
 | 
			
		||||
              % the fidelity loss is quite high when consuming using the amqp10
 | 
			
		||||
              % plugin. For example custom headers aren't current translated.
 | 
			
		||||
              % This isn't due to the shovel though.
 | 
			
		||||
| 
						 | 
				
			
			@ -204,7 +209,7 @@ amqp10_to_amqp091_application_properties(Config) ->
 | 
			
		|||
    MapConfig = ?config(map_config, Config),
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              shovel_test_utils:set_param(
 | 
			
		||||
                Config,
 | 
			
		||||
| 
						 | 
				
			
			@ -240,25 +245,25 @@ change_definition(Config) ->
 | 
			
		|||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    Dest2 = ?config(destq2, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              shovel_test_utils:set_param(Config, <<"test">>,
 | 
			
		||||
                                          [{<<"src-address">>,  Src},
 | 
			
		||||
                                           {<<"src-protocol">>, <<"amqp10">>},
 | 
			
		||||
                                           {<<"dest-protocol">>, <<"amqp10">>},
 | 
			
		||||
                                           {<<"dest-address">>, Dest}]),
 | 
			
		||||
              publish_expect(Sess, Src, Dest, <<"tag2">>,<<"hello">>),
 | 
			
		||||
              amqp10_publish_expect(Sess, Src, Dest, <<"hello1">>, 1),
 | 
			
		||||
              shovel_test_utils:set_param(Config, <<"test">>,
 | 
			
		||||
                                          [{<<"src-address">>,  Src},
 | 
			
		||||
                                           {<<"src-protocol">>, <<"amqp10">>},
 | 
			
		||||
                                           {<<"dest-protocol">>, <<"amqp10">>},
 | 
			
		||||
                                           {<<"dest-address">>, Dest2}]),
 | 
			
		||||
              publish_expect(Sess, Src, Dest2, <<"tag3">>, <<"hello">>),
 | 
			
		||||
              expect_empty(Sess, Dest),
 | 
			
		||||
              amqp10_publish_expect(Sess, Src, Dest2, <<"hello2">>, 1),
 | 
			
		||||
              amqp10_expect_empty(Sess, Dest),
 | 
			
		||||
              shovel_test_utils:clear_param(Config, <<"test">>),
 | 
			
		||||
              publish_expect(Sess, Src, Src, <<"tag4">>, <<"hello2">>),
 | 
			
		||||
              expect_empty(Sess, Dest),
 | 
			
		||||
              expect_empty(Sess, Dest2)
 | 
			
		||||
              amqp10_publish_expect(Sess, Src, Src, <<"hello3">>, 1),
 | 
			
		||||
              amqp10_expect_empty(Sess, Dest),
 | 
			
		||||
              amqp10_expect_empty(Sess, Dest2)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
autodelete_amqp091_src_on_confirm(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -282,13 +287,13 @@ autodelete_amqp091_dest_on_publish(Config) ->
 | 
			
		|||
    ok.
 | 
			
		||||
 | 
			
		||||
autodelete_case(Config, Args, CaseFun) ->
 | 
			
		||||
    with_session(Config, CaseFun(Config, Args)).
 | 
			
		||||
    with_amqp10_session(Config, CaseFun(Config, Args)).
 | 
			
		||||
 | 
			
		||||
autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    fun (Session) ->
 | 
			
		||||
            publish_count(Session, Src, <<"hello">>, 100),
 | 
			
		||||
            amqp10_publish(Session, Src, <<"hello">>, 100),
 | 
			
		||||
            shovel_test_utils:set_param_nowait(
 | 
			
		||||
              Config,
 | 
			
		||||
              <<"test">>, [{<<"src-address">>,    Src},
 | 
			
		||||
| 
						 | 
				
			
			@ -300,15 +305,15 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
 | 
			
		|||
                           {<<"ack-mode">>,     AckMode}
 | 
			
		||||
                          ]),
 | 
			
		||||
            await_autodelete(Config, <<"test">>),
 | 
			
		||||
            expect_count(Session, Dest, ExpDest),
 | 
			
		||||
            expect_count(Session, Src, ExpSrc)
 | 
			
		||||
            amqp10_expect_count(Session, Dest, ExpDest),
 | 
			
		||||
            amqp10_expect_count(Session, Src, ExpSrc)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    fun (Session) ->
 | 
			
		||||
            publish_count(Session, Src, <<"hello">>, 100),
 | 
			
		||||
            amqp10_publish(Session, Src, <<"hello">>, 100),
 | 
			
		||||
            shovel_test_utils:set_param_nowait(
 | 
			
		||||
              Config,
 | 
			
		||||
              <<"test">>, [{<<"src-queue">>, Src},
 | 
			
		||||
| 
						 | 
				
			
			@ -320,15 +325,15 @@ autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) ->
 | 
			
		|||
                           {<<"ack-mode">>, AckMode}
 | 
			
		||||
                          ]),
 | 
			
		||||
            await_autodelete(Config, <<"test">>),
 | 
			
		||||
            expect_count(Session, Dest, ExpDest),
 | 
			
		||||
            expect_count(Session, Src, ExpSrc)
 | 
			
		||||
            amqp10_expect_count(Session, Dest, ExpDest),
 | 
			
		||||
            amqp10_expect_count(Session, Src, ExpSrc)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    fun (Session) ->
 | 
			
		||||
            publish_count(Session, Src, <<"hello">>, 100),
 | 
			
		||||
            amqp10_publish(Session, Src, <<"hello">>, 100),
 | 
			
		||||
            shovel_test_utils:set_param_nowait(
 | 
			
		||||
              Config,
 | 
			
		||||
              <<"test">>, [{<<"src-address">>, Src},
 | 
			
		||||
| 
						 | 
				
			
			@ -340,8 +345,8 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
 | 
			
		|||
                           {<<"ack-mode">>, AckMode}
 | 
			
		||||
                          ]),
 | 
			
		||||
            await_autodelete(Config, <<"test">>),
 | 
			
		||||
            expect_count(Session, Dest, ExpDest),
 | 
			
		||||
            expect_count(Session, Src, ExpSrc)
 | 
			
		||||
            amqp10_expect_count(Session, Dest, ExpDest),
 | 
			
		||||
            amqp10_expect_count(Session, Src, ExpSrc)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
test_amqp10_delete_after_queue_length(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -364,27 +369,6 @@ test_amqp10_delete_after_queue_length(Config) ->
 | 
			
		|||
    ?assertMatch(match, re:run(Msg, "Validation failed.*", [{capture, none}])).
 | 
			
		||||
 | 
			
		||||
%%----------------------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
with_session(Config, Fun) ->
 | 
			
		||||
    Hostname = ?config(rmq_hostname, Config),
 | 
			
		||||
    Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
 | 
			
		||||
    {ok, Conn} = amqp10_client:open_connection(Hostname, Port),
 | 
			
		||||
    {ok, Sess} = amqp10_client:begin_session(Conn),
 | 
			
		||||
    Fun(Sess),
 | 
			
		||||
    amqp10_client:close_connection(Conn),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
publish(Sender, Tag, Payload) when is_binary(Payload) ->
 | 
			
		||||
    Headers = #{durable => true},
 | 
			
		||||
    Msg = amqp10_msg:set_headers(Headers,
 | 
			
		||||
                                 amqp10_msg:new(Tag, Payload, false)),
 | 
			
		||||
    ok = amqp10_client:send_msg(Sender, Msg),
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_disposition, {accepted, Tag}} -> ok
 | 
			
		||||
    after 3000 ->
 | 
			
		||||
              exit(publish_disposition_not_received)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
publish(Sender, Msg) ->
 | 
			
		||||
    ok = amqp10_client:send_msg(Sender, Msg),
 | 
			
		||||
    Tag = amqp10_msg:delivery_tag(Msg),
 | 
			
		||||
| 
						 | 
				
			
			@ -394,16 +378,6 @@ publish(Sender, Msg) ->
 | 
			
		|||
            exit(publish_disposition_not_received)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
publish_expect(Session, Source, Dest, Tag, Payload) ->
 | 
			
		||||
    LinkName = <<"dynamic-sender-", Dest/binary>>,
 | 
			
		||||
    {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source,
 | 
			
		||||
                                                    unsettled, unsettled_state),
 | 
			
		||||
    ok = await_amqp10_event(link, Sender, attached),
 | 
			
		||||
    await_credit(Sender),
 | 
			
		||||
    publish(Sender, Tag, Payload),
 | 
			
		||||
    amqp10_client:detach_link(Sender),
 | 
			
		||||
    expect_one(Session, Dest).
 | 
			
		||||
 | 
			
		||||
publish_expect_msg(Session, Source, Dest, Msg) ->
 | 
			
		||||
    LinkName = <<"dynamic-sender-", Dest/binary>>,
 | 
			
		||||
    {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source,
 | 
			
		||||
| 
						 | 
				
			
			@ -411,104 +385,9 @@ publish_expect_msg(Session, Source, Dest, Msg) ->
 | 
			
		|||
    ok = await_amqp10_event(link, Sender, attached),
 | 
			
		||||
    publish(Sender, Msg),
 | 
			
		||||
    amqp10_client:detach_link(Sender),
 | 
			
		||||
    expect_one(Session, Dest).
 | 
			
		||||
 | 
			
		||||
await_amqp10_event(On, Ref, Evt) ->
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_event, {On, Ref, Evt}} -> ok
 | 
			
		||||
    after 5000 ->
 | 
			
		||||
          exit({amqp10_event_timeout, On, Ref, Evt})
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
expect_one(Session, Dest) ->
 | 
			
		||||
    LinkName = <<"dynamic-receiver-", Dest/binary>>,
 | 
			
		||||
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
 | 
			
		||||
                                                        Dest, settled,
 | 
			
		||||
                                                        unsettled_state),
 | 
			
		||||
    ok = amqp10_client:flow_link_credit(Receiver, 1, never),
 | 
			
		||||
    Msg = expect(Receiver),
 | 
			
		||||
    amqp10_client:detach_link(Receiver),
 | 
			
		||||
    Msg.
 | 
			
		||||
 | 
			
		||||
expect(Receiver) ->
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_msg, Receiver, InMsg} ->
 | 
			
		||||
            InMsg
 | 
			
		||||
    after 4000 ->
 | 
			
		||||
              throw(timeout_in_expect_waiting_for_delivery)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
expect_empty(Session, Dest) ->
 | 
			
		||||
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session,
 | 
			
		||||
                                                        <<"dynamic-receiver">>,
 | 
			
		||||
                                                        Dest, settled,
 | 
			
		||||
                                                        unsettled_state),
 | 
			
		||||
    % probably good enough given we don't currently have a means of
 | 
			
		||||
    % echoing flow state
 | 
			
		||||
    {error, timeout} = amqp10_client:get_msg(Receiver, 250),
 | 
			
		||||
    amqp10_client:detach_link(Receiver).
 | 
			
		||||
 | 
			
		||||
publish_count(Session, Address, Payload, Count) ->
 | 
			
		||||
    LinkName = <<"dynamic-sender-", Address/binary>>,
 | 
			
		||||
    {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName,
 | 
			
		||||
                                                    Address, unsettled,
 | 
			
		||||
                                                    unsettled_state),
 | 
			
		||||
    ok = await_amqp10_event(link, Sender, attached),
 | 
			
		||||
    [begin
 | 
			
		||||
         Tag = rabbit_data_coercion:to_binary(I),
 | 
			
		||||
         publish(Sender, Tag, <<Payload/binary, Tag/binary>>)
 | 
			
		||||
     end || I <- lists:seq(1, Count)],
 | 
			
		||||
     amqp10_client:detach_link(Sender).
 | 
			
		||||
 | 
			
		||||
expect_count(Session, Address, Count) ->
 | 
			
		||||
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session,
 | 
			
		||||
                                                        <<"dynamic-receiver",
 | 
			
		||||
                                                          Address/binary>>,
 | 
			
		||||
                                                        Address, settled,
 | 
			
		||||
                                                        unsettled_state),
 | 
			
		||||
    ok = amqp10_client:flow_link_credit(Receiver, Count, never),
 | 
			
		||||
    [begin
 | 
			
		||||
         expect(Receiver)
 | 
			
		||||
     end || _ <- lists:seq(1, Count)],
 | 
			
		||||
    expect_empty(Session, Address),
 | 
			
		||||
    amqp10_client:detach_link(Receiver).
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
invalid_param(Config, Value, User) ->
 | 
			
		||||
    {error_string, _} = rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      rabbit_runtime_parameters, set,
 | 
			
		||||
      [<<"/">>, <<"shovel">>, <<"invalid">>, Value, User]).
 | 
			
		||||
 | 
			
		||||
valid_param(Config, Value, User) ->
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      ?MODULE, valid_param1, [Config, Value, User]).
 | 
			
		||||
 | 
			
		||||
valid_param1(_Config, Value, User) ->
 | 
			
		||||
    ok = rabbit_runtime_parameters:set(
 | 
			
		||||
           <<"/">>, <<"shovel">>, <<"a">>, Value, User),
 | 
			
		||||
    ok = rabbit_runtime_parameters:clear(<<"/">>, <<"shovel">>, <<"a">>, <<"acting-user">>).
 | 
			
		||||
 | 
			
		||||
invalid_param(Config, Value) -> invalid_param(Config, Value, none).
 | 
			
		||||
valid_param(Config, Value) -> valid_param(Config, Value, none).
 | 
			
		||||
    amqp10_expect_one(Session, Dest).
 | 
			
		||||
 | 
			
		||||
lookup_user(Config, Name) ->
 | 
			
		||||
    {ok, User} = rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      rabbit_access_control, check_user_login, [Name, []]),
 | 
			
		||||
    User.
 | 
			
		||||
 | 
			
		||||
await_autodelete(Config, Name) ->
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      ?MODULE, await_autodelete1, [Config, Name], 10000).
 | 
			
		||||
 | 
			
		||||
await_autodelete1(_Config, Name) ->
 | 
			
		||||
    shovel_test_utils:await(
 | 
			
		||||
      fun () -> not lists:member(Name, shovels_from_parameters()) end),
 | 
			
		||||
    shovel_test_utils:await(
 | 
			
		||||
      fun () ->
 | 
			
		||||
              not lists:member(Name,
 | 
			
		||||
                               shovel_test_utils:shovels_from_status())
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
shovels_from_parameters() ->
 | 
			
		||||
    L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>),
 | 
			
		||||
    [rabbit_misc:pget(name, Shovel) || Shovel <- L].
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -146,7 +146,7 @@ amqp10_destination(Config, AckMode) ->
 | 
			
		|||
            ?assertMatch(#{durable := true}, amqp10_msg:headers(InMsg)),
 | 
			
		||||
            ok
 | 
			
		||||
    after ?TIMEOUT ->
 | 
			
		||||
              throw(timeout_waiting_for_deliver1)
 | 
			
		||||
            throw(timeout_waiting_for_deliver1)
 | 
			
		||||
    end,
 | 
			
		||||
 | 
			
		||||
    [{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -14,7 +14,12 @@
 | 
			
		|||
 | 
			
		||||
-compile(export_all).
 | 
			
		||||
 | 
			
		||||
-import(shovel_test_utils, [await_amqp10_event/3, await_credit/1]).
 | 
			
		||||
-import(shovel_test_utils, [with_amqp10_session/2, with_amqp10_session/3,
 | 
			
		||||
                            amqp10_expect_empty/2,
 | 
			
		||||
                            amqp10_publish/4, amqp10_expect_one/2,
 | 
			
		||||
                            amqp10_expect_count/3, amqp10_expect/3,
 | 
			
		||||
                            amqp10_publish_expect/5,
 | 
			
		||||
                            await_autodelete/2]).
 | 
			
		||||
 | 
			
		||||
-define(PARAM, <<"test">>).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -134,7 +139,7 @@ init_per_testcase(Testcase, Config0) ->
 | 
			
		|||
 | 
			
		||||
end_per_testcase(Testcase, Config) ->
 | 
			
		||||
    shovel_test_utils:clear_param(Config, ?PARAM),
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all_queues, []),
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []),
 | 
			
		||||
    _ = rabbit_ct_broker_helpers:delete_vhost(Config, ?config(alt_vhost, Config)),
 | 
			
		||||
    rabbit_ct_helpers:testcase_finished(Config, Testcase).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -145,7 +150,7 @@ end_per_testcase(Testcase, Config) ->
 | 
			
		|||
local_to_local_opt_headers(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -155,7 +160,7 @@ local_to_local_opt_headers(Config) ->
 | 
			
		|||
                                           {<<"dest-add-forward-headers">>, true},
 | 
			
		||||
                                           {<<"dest-add-timestamp-header">>, true}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              Msg = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>),
 | 
			
		||||
              [Msg] = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
 | 
			
		||||
              ?assertMatch(#{<<"x-opt-shovel-name">> := ?PARAM,
 | 
			
		||||
                             <<"x-opt-shovel-type">> := <<"dynamic">>,
 | 
			
		||||
                             <<"x-opt-shovelled-by">> := _,
 | 
			
		||||
| 
						 | 
				
			
			@ -166,7 +171,7 @@ local_to_local_opt_headers(Config) ->
 | 
			
		|||
local_to_local_queue_dest(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -174,7 +179,7 @@ local_to_local_queue_dest(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_original_dest(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -186,7 +191,7 @@ local_to_local_original_dest(Config) ->
 | 
			
		|||
    ok = rabbit_ct_broker_helpers:add_vhost(Config, AltVHost),
 | 
			
		||||
    ok = rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, AltVHost),
 | 
			
		||||
    declare_queue(Config, AltVHost, Dest),
 | 
			
		||||
    with_session(
 | 
			
		||||
    with_amqp10_session(
 | 
			
		||||
      Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              SrcUri = shovel_test_utils:make_uri(Config, 0, <<"%2F">>),
 | 
			
		||||
| 
						 | 
				
			
			@ -200,11 +205,11 @@ local_to_local_original_dest(Config) ->
 | 
			
		|||
                                                      {<<"dest-protocol">>, <<"local">>}],
 | 
			
		||||
                      none]),
 | 
			
		||||
              shovel_test_utils:await_shovel(Config, 0, ?PARAM),
 | 
			
		||||
              _ = publish(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish(Sess, Src, <<"hello">>, 1)
 | 
			
		||||
      end),
 | 
			
		||||
    with_session(Config, AltVHost,
 | 
			
		||||
    with_amqp10_session(Config, AltVHost,
 | 
			
		||||
                 fun (Sess) ->
 | 
			
		||||
                         expect_one(Sess, Dest)
 | 
			
		||||
                         amqp10_expect_one(Sess, Dest)
 | 
			
		||||
                 end).
 | 
			
		||||
 | 
			
		||||
local_to_local_exchange_dest(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -214,7 +219,7 @@ local_to_local_exchange_dest(Config) ->
 | 
			
		|||
    RoutingKey = <<"funky-routing-key">>,
 | 
			
		||||
    declare_exchange(Config, <<"/">>, AltExchange),
 | 
			
		||||
    declare_and_bind_queue(Config, <<"/">>, AltExchange, Dest, RoutingKey),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -223,7 +228,7 @@ local_to_local_exchange_dest(Config) ->
 | 
			
		|||
                                           {<<"dest-exchange">>, AltExchange},
 | 
			
		||||
                                           {<<"dest-exchange-key">>, RoutingKey}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_missing_exchange_dest(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -244,7 +249,7 @@ local_to_local_predeclared_src(Config) ->
 | 
			
		|||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    declare_queue(Config, <<"/">>, Src),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -253,14 +258,14 @@ local_to_local_predeclared_src(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_predeclared_quorum_src(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -269,7 +274,7 @@ local_to_local_predeclared_quorum_src(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_predeclared_stream_first_offset_src(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -277,9 +282,9 @@ local_to_local_predeclared_stream_first_offset_src(Config) ->
 | 
			
		|||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 20),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 20),
 | 
			
		||||
              shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"src-queue">>, Src},
 | 
			
		||||
| 
						 | 
				
			
			@ -288,9 +293,9 @@ local_to_local_predeclared_stream_first_offset_src(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              expect_many(Sess, Dest, 20),
 | 
			
		||||
              expect_none(Sess, Dest),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              amqp10_expect_count(Sess, Dest, 20),
 | 
			
		||||
              amqp10_expect_empty(Sess, Dest),
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_predeclared_stream_last_offset_src(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -298,9 +303,9 @@ local_to_local_predeclared_stream_last_offset_src(Config) ->
 | 
			
		|||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 20),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 20),
 | 
			
		||||
              shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"src-queue">>, Src},
 | 
			
		||||
| 
						 | 
				
			
			@ -310,9 +315,9 @@ local_to_local_predeclared_stream_last_offset_src(Config) ->
 | 
			
		|||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              %% Deliver last
 | 
			
		||||
              expect_many(Sess, Dest, 1),
 | 
			
		||||
              expect_none(Sess, Dest),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              amqp10_expect_count(Sess, Dest, 1),
 | 
			
		||||
              amqp10_expect_empty(Sess, Dest),
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_missing_predeclared_src(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -338,7 +343,7 @@ local_to_local_missing_predeclared_src(Config) ->
 | 
			
		|||
local_to_local_exchange_src(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -348,7 +353,7 @@ local_to_local_exchange_src(Config) ->
 | 
			
		|||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              Target = <<"/exchange/amq.direct/", Src/binary>>,
 | 
			
		||||
              _ = publish_expect(Sess, Target, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Target, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_queue_args_src(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -387,7 +392,7 @@ local_to_local_predeclared_dest(Config) ->
 | 
			
		|||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    declare_queue(Config, <<"/">>, Dest),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -396,14 +401,14 @@ local_to_local_predeclared_dest(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_predeclared_quorum_dest(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    declare_queue(Config, <<"/">>, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -412,7 +417,7 @@ local_to_local_predeclared_quorum_dest(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_missing_predeclared_dest(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -522,7 +527,7 @@ local_to_local_queue_and_exchange_dest_fails(Config) ->
 | 
			
		|||
local_to_local_delete_after_never(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -530,8 +535,8 @@ local_to_local_delete_after_never(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 20),
 | 
			
		||||
              expect_many(Sess, Dest, 20)
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 20),
 | 
			
		||||
              amqp10_expect_count(Sess, Dest, 20)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_delete_after_queue_length_zero(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -556,9 +561,9 @@ local_to_local_delete_after_queue_length(Config) ->
 | 
			
		|||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    declare_queue(Config, <<"/">>, Src),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 18),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 18),
 | 
			
		||||
              shovel_test_utils:set_param_nowait(Config, ?PARAM,
 | 
			
		||||
                                                 [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
                                                  {<<"src-predeclared">>, true},
 | 
			
		||||
| 
						 | 
				
			
			@ -570,22 +575,18 @@ local_to_local_delete_after_queue_length(Config) ->
 | 
			
		|||
              %% The shovel parameter is only deleted when 'delete-after'
 | 
			
		||||
              %% is used. In any other failure, the shovel should
 | 
			
		||||
              %% remain and try to restart
 | 
			
		||||
              expect_many(Sess, Dest, 18),
 | 
			
		||||
              ?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000),
 | 
			
		||||
              ?awaitMatch([],
 | 
			
		||||
                          rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
                                                       rabbit_shovel_status, status, []),
 | 
			
		||||
                          30_000),
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 5),
 | 
			
		||||
              expect_none(Sess, Dest)
 | 
			
		||||
              amqp10_expect_count(Sess, Dest, 18),
 | 
			
		||||
              await_autodelete(Config, ?PARAM),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 5),
 | 
			
		||||
              amqp10_expect_empty(Sess, Dest)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_delete_after_number(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 5),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 5),
 | 
			
		||||
              shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"src-queue">>, Src},
 | 
			
		||||
| 
						 | 
				
			
			@ -593,17 +594,17 @@ local_to_local_delete_after_number(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              expect_many(Sess, Dest, 5),
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 10),
 | 
			
		||||
              expect_many(Sess, Dest, 5),
 | 
			
		||||
              ?assertMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM])),
 | 
			
		||||
              expect_none(Sess, Dest)
 | 
			
		||||
              amqp10_expect_count(Sess, Dest, 5),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 10),
 | 
			
		||||
              amqp10_expect_count(Sess, Dest, 5),
 | 
			
		||||
              await_autodelete(Config, ?PARAM),
 | 
			
		||||
              amqp10_expect_empty(Sess, Dest)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_no_ack(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -612,7 +613,7 @@ local_to_local_no_ack(Config) ->
 | 
			
		|||
                                           {<<"dest-queue">>, Dest},
 | 
			
		||||
                                           {<<"ack-mode">>, <<"no-ack">>}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_quorum_no_ack(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -621,7 +622,7 @@ local_to_local_quorum_no_ack(Config) ->
 | 
			
		|||
    VHost = <<"/">>,
 | 
			
		||||
    declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
 | 
			
		||||
    declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -632,7 +633,7 @@ local_to_local_quorum_no_ack(Config) ->
 | 
			
		|||
                                           {<<"dest-queue">>, Dest},
 | 
			
		||||
                                           {<<"ack-mode">>, <<"no-ack">>}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_stream_no_ack(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -640,7 +641,7 @@ local_to_local_stream_no_ack(Config) ->
 | 
			
		|||
    Dest = ?config(destq, Config),
 | 
			
		||||
    declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
 | 
			
		||||
    declare_queue(Config, <<"/">>, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -652,19 +653,19 @@ local_to_local_stream_no_ack(Config) ->
 | 
			
		|||
                                           {<<"ack-mode">>, <<"no-ack">>}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              Receiver = subscribe(Sess, Dest),
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 10),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 10),
 | 
			
		||||
              ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}],
 | 
			
		||||
                          rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
                                                       rabbit_shovel_status, status, []),
 | 
			
		||||
                          30000),
 | 
			
		||||
              _ = expect(Receiver, 10, []),
 | 
			
		||||
              _ = amqp10_expect(Receiver, 10, []),
 | 
			
		||||
              amqp10_client:detach_link(Receiver)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_on_confirm(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -673,7 +674,7 @@ local_to_local_on_confirm(Config) ->
 | 
			
		|||
                                           {<<"dest-queue">>, Dest},
 | 
			
		||||
                                           {<<"ack-mode">>, <<"on-confirm">>}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_quorum_on_confirm(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -682,7 +683,7 @@ local_to_local_quorum_on_confirm(Config) ->
 | 
			
		|||
    VHost = <<"/">>,
 | 
			
		||||
    declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
 | 
			
		||||
    declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -693,7 +694,7 @@ local_to_local_quorum_on_confirm(Config) ->
 | 
			
		|||
                                           {<<"dest-queue">>, Dest},
 | 
			
		||||
                                           {<<"ack-mode">>, <<"on-confirm">>}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_stream_on_confirm(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -702,7 +703,7 @@ local_to_local_stream_on_confirm(Config) ->
 | 
			
		|||
    VHost = <<"/">>,
 | 
			
		||||
    declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
 | 
			
		||||
    declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -714,19 +715,19 @@ local_to_local_stream_on_confirm(Config) ->
 | 
			
		|||
                                           {<<"ack-mode">>, <<"on-confirm">>}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              Receiver = subscribe(Sess, Dest),
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 10),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 10),
 | 
			
		||||
              ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}],
 | 
			
		||||
                          rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
                                                       rabbit_shovel_status, status, []),
 | 
			
		||||
                          30000),
 | 
			
		||||
              _ = expect(Receiver, 10, []),
 | 
			
		||||
              _ = amqp10_expect(Receiver, 10, []),
 | 
			
		||||
              amqp10_client:detach_link(Receiver)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_on_publish(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -735,7 +736,7 @@ local_to_local_on_publish(Config) ->
 | 
			
		|||
                                           {<<"dest-queue">>, Dest},
 | 
			
		||||
                                           {<<"ack-mode">>, <<"on-publish">>}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_quorum_on_publish(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -744,7 +745,7 @@ local_to_local_quorum_on_publish(Config) ->
 | 
			
		|||
    VHost = <<"/">>,
 | 
			
		||||
    declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
 | 
			
		||||
    declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -755,7 +756,7 @@ local_to_local_quorum_on_publish(Config) ->
 | 
			
		|||
                                           {<<"dest-queue">>, Dest},
 | 
			
		||||
                                           {<<"ack-mode">>, <<"on-publish">>}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_stream_on_publish(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -764,7 +765,7 @@ local_to_local_stream_on_publish(Config) ->
 | 
			
		|||
    VHost = <<"/">>,
 | 
			
		||||
    declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
 | 
			
		||||
    declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -776,12 +777,12 @@ local_to_local_stream_on_publish(Config) ->
 | 
			
		|||
                                           {<<"ack-mode">>, <<"on-publish">>}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              Receiver = subscribe(Sess, Dest),
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 10),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 10),
 | 
			
		||||
              ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}],
 | 
			
		||||
                          rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
                                                       rabbit_shovel_status, status, []),
 | 
			
		||||
                          30000),
 | 
			
		||||
              _ = expect(Receiver, 10, []),
 | 
			
		||||
              _ = amqp10_expect(Receiver, 10, []),
 | 
			
		||||
              amqp10_client:detach_link(Receiver)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -791,10 +792,10 @@ local_to_local_reject_publish(Config) ->
 | 
			
		|||
    declare_queue(Config, <<"/">>, Dest, [{<<"x-max-length">>, long, 1},
 | 
			
		||||
                                          {<<"x-overflow">>, longstr, <<"reject-publish">>}
 | 
			
		||||
                                         ]),
 | 
			
		||||
    with_session(
 | 
			
		||||
    with_amqp10_session(
 | 
			
		||||
      Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 5),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 5),
 | 
			
		||||
              shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"src-queue">>, Src},
 | 
			
		||||
| 
						 | 
				
			
			@ -803,14 +804,13 @@ local_to_local_reject_publish(Config) ->
 | 
			
		|||
                                           {<<"dest-queue">>, Dest},
 | 
			
		||||
                                           {<<"ack-mode">>, <<"on-confirm">>}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              expect_many(Sess, Dest, 1),
 | 
			
		||||
              expect_none(Sess, Dest)
 | 
			
		||||
              amqp10_expect_count(Sess, Dest, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_amqp091(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -818,13 +818,13 @@ local_to_amqp091(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"amqp091">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_amqp10(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -832,13 +832,13 @@ local_to_amqp10(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"amqp10">>},
 | 
			
		||||
                                           {<<"dest-address">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
amqp091_to_local(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"amqp091">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -846,13 +846,13 @@ amqp091_to_local(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
amqp10_to_local(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"amqp10">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -860,13 +860,13 @@ amqp10_to_local(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>)
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_delete_src_queue(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -874,7 +874,7 @@ local_to_local_delete_src_queue(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>),
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
 | 
			
		||||
              ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1}, _}],
 | 
			
		||||
                          rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
                                                       rabbit_shovel_status, status, []),
 | 
			
		||||
| 
						 | 
				
			
			@ -890,7 +890,7 @@ local_to_local_delete_src_queue(Config) ->
 | 
			
		|||
local_to_local_delete_dest_queue(Config) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -898,7 +898,7 @@ local_to_local_delete_dest_queue(Config) ->
 | 
			
		|||
                                           {<<"dest-protocol">>, <<"local">>},
 | 
			
		||||
                                           {<<"dest-queue">>, Dest}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              _ = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>),
 | 
			
		||||
              _ = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
 | 
			
		||||
              ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1}, _}],
 | 
			
		||||
                          rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
                                                       rabbit_shovel_status, status, []),
 | 
			
		||||
| 
						 | 
				
			
			@ -956,7 +956,7 @@ local_to_local_credit_flow_no_ack(Config) ->
 | 
			
		|||
local_to_local_credit_flow(Config, AckMode) ->
 | 
			
		||||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -965,8 +965,8 @@ local_to_local_credit_flow(Config, AckMode) ->
 | 
			
		|||
                                           {<<"dest-queue">>, Dest},
 | 
			
		||||
                                           {<<"ack-mode">>, AckMode}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 1000),
 | 
			
		||||
              expect_many(Sess, Dest, 1000)
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 1000),
 | 
			
		||||
              amqp10_expect_count(Sess, Dest, 1000)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_quorum_credit_flow_on_confirm(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -984,7 +984,7 @@ local_to_local_quorum_credit_flow(Config, AckMode) ->
 | 
			
		|||
    VHost = <<"/">>,
 | 
			
		||||
    declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
 | 
			
		||||
    declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -995,8 +995,8 @@ local_to_local_quorum_credit_flow(Config, AckMode) ->
 | 
			
		|||
                                           {<<"dest-predeclared">>, true},
 | 
			
		||||
                                           {<<"ack-mode">>, AckMode}
 | 
			
		||||
                                          ]),
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 1000),
 | 
			
		||||
              expect_many(Sess, Dest, 1000)
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 1000),
 | 
			
		||||
              amqp10_expect_count(Sess, Dest, 1000)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
local_to_local_stream_credit_flow_on_confirm(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -1014,7 +1014,7 @@ local_to_local_stream_credit_flow(Config, AckMode) ->
 | 
			
		|||
    VHost = <<"/">>,
 | 
			
		||||
    declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
 | 
			
		||||
    declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
 | 
			
		||||
    with_session(Config,
 | 
			
		||||
    with_amqp10_session(Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
             shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
                                          [{<<"src-protocol">>, <<"local">>},
 | 
			
		||||
| 
						 | 
				
			
			@ -1027,12 +1027,12 @@ local_to_local_stream_credit_flow(Config, AckMode) ->
 | 
			
		|||
                                          ]),
 | 
			
		||||
 | 
			
		||||
              Receiver = subscribe(Sess, Dest),
 | 
			
		||||
              publish_many(Sess, Src, Dest, <<"tag1">>, 1000),
 | 
			
		||||
              amqp10_publish(Sess, Src, <<"tag1">>, 1000),
 | 
			
		||||
              ?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1000}, _}],
 | 
			
		||||
                          rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
                                                       rabbit_shovel_status, status, []),
 | 
			
		||||
                          30000),
 | 
			
		||||
              _ = expect(Receiver, 1000, []),
 | 
			
		||||
              _ = amqp10_expect(Receiver, 1000, []),
 | 
			
		||||
              amqp10_client:detach_link(Receiver)
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1057,7 +1057,7 @@ local_to_local_counters(Config) ->
 | 
			
		|||
    %% Let's restart the node so the counters are reset
 | 
			
		||||
    ok = rabbit_ct_broker_helpers:stop_node(Config, 0),
 | 
			
		||||
    ok = rabbit_ct_broker_helpers:start_node(Config, 0),
 | 
			
		||||
    with_session(
 | 
			
		||||
    with_amqp10_session(
 | 
			
		||||
      Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              ?awaitMatch(#{publishers := 0, consumers := 0},
 | 
			
		||||
| 
						 | 
				
			
			@ -1070,7 +1070,7 @@ local_to_local_counters(Config) ->
 | 
			
		|||
                                          ]),
 | 
			
		||||
              ?awaitMatch(#{publishers := 1, consumers := 1},
 | 
			
		||||
                          get_global_counters(Config), 30_000),
 | 
			
		||||
              _ = publish_many(Sess, Src, Dest, <<"tag1">>, 150),
 | 
			
		||||
              _ = amqp10_publish(Sess, Src, <<"tag1">>, 150),
 | 
			
		||||
              ?awaitMatch(#{consumers := 1, publishers := 1,
 | 
			
		||||
                            messages_received_total := 150,
 | 
			
		||||
                            messages_received_confirm_total := 150,
 | 
			
		||||
| 
						 | 
				
			
			@ -1082,81 +1082,6 @@ local_to_local_counters(Config) ->
 | 
			
		|||
      end).
 | 
			
		||||
 | 
			
		||||
%%----------------------------------------------------------------------------
 | 
			
		||||
with_session(Config, Fun) ->
 | 
			
		||||
    with_session(Config, <<"/">>, Fun).
 | 
			
		||||
 | 
			
		||||
with_session(Config, VHost, Fun) ->
 | 
			
		||||
    Hostname = ?config(rmq_hostname, Config),
 | 
			
		||||
    Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
 | 
			
		||||
    Cfg = #{address => Hostname,
 | 
			
		||||
            port => Port,
 | 
			
		||||
            sasl => {plain, <<"guest">>, <<"guest">>},
 | 
			
		||||
            hostname => <<"vhost:", VHost/binary>>},
 | 
			
		||||
    {ok, Conn} = amqp10_client:open_connection(Cfg),
 | 
			
		||||
    {ok, Sess} = amqp10_client:begin_session(Conn),
 | 
			
		||||
    Fun(Sess),
 | 
			
		||||
    amqp10_client:close_connection(Conn),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
publish(Sender, Tag, Payload) when is_binary(Payload) ->
 | 
			
		||||
    Headers = #{durable => true},
 | 
			
		||||
    Msg = amqp10_msg:set_headers(Headers,
 | 
			
		||||
                                 amqp10_msg:new(Tag, Payload, false)),
 | 
			
		||||
    %% N.B.: this function does not attach a link and does not
 | 
			
		||||
    %%       need to use await_credit/1
 | 
			
		||||
    ok = amqp10_client:send_msg(Sender, Msg),
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_disposition, {accepted, Tag}} -> ok
 | 
			
		||||
    after 3000 ->
 | 
			
		||||
              exit(publish_disposition_not_received)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
publish(Session, Source, Dest, Tag, Payloads) ->
 | 
			
		||||
    LinkName = <<"dynamic-sender-", Dest/binary>>,
 | 
			
		||||
    {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source,
 | 
			
		||||
                                                    unsettled, unsettled_state),
 | 
			
		||||
    ok = await_amqp10_event(link, Sender, attached),
 | 
			
		||||
    ok = await_credit(Sender),
 | 
			
		||||
    case is_list(Payloads) of
 | 
			
		||||
        true ->
 | 
			
		||||
            [publish(Sender, Tag, Payload) || Payload <- Payloads];
 | 
			
		||||
        false ->
 | 
			
		||||
            publish(Sender, Tag, Payloads)
 | 
			
		||||
    end,
 | 
			
		||||
    amqp10_client:detach_link(Sender).
 | 
			
		||||
 | 
			
		||||
publish_expect(Session, Source, Dest, Tag, Payload) ->
 | 
			
		||||
    publish(Session, Source, Dest, Tag, Payload),
 | 
			
		||||
    expect_one(Session, Dest).
 | 
			
		||||
 | 
			
		||||
publish_many(Session, Source, Dest, Tag, N) ->
 | 
			
		||||
    Payloads = [integer_to_binary(Payload) || Payload <- lists:seq(1, N)],
 | 
			
		||||
    publish(Session, Source, Dest, Tag, Payloads).
 | 
			
		||||
 | 
			
		||||
expect_one(Session, Dest) ->
 | 
			
		||||
    LinkName = <<"dynamic-receiver-", Dest/binary>>,
 | 
			
		||||
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
 | 
			
		||||
                                                        Dest, settled,
 | 
			
		||||
                                                        unsettled_state),
 | 
			
		||||
    ok = amqp10_client:flow_link_credit(Receiver, 1, never),
 | 
			
		||||
    Msg = expect(Receiver),
 | 
			
		||||
    amqp10_client:detach_link(Receiver),
 | 
			
		||||
    Msg.
 | 
			
		||||
 | 
			
		||||
expect_none(Session, Dest) ->
 | 
			
		||||
    LinkName = <<"dynamic-receiver-", Dest/binary>>,
 | 
			
		||||
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
 | 
			
		||||
                                                        Dest, settled,
 | 
			
		||||
                                                        unsettled_state),
 | 
			
		||||
    ok = amqp10_client:flow_link_credit(Receiver, 1, never),
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_msg, Receiver, _} ->
 | 
			
		||||
            throw(unexpected_msg)
 | 
			
		||||
    after 4000 ->
 | 
			
		||||
            ok
 | 
			
		||||
    end,
 | 
			
		||||
    amqp10_client:detach_link(Receiver).
 | 
			
		||||
 | 
			
		||||
subscribe(Session, Dest) ->
 | 
			
		||||
    LinkName = <<"dynamic-receiver-", Dest/binary>>,
 | 
			
		||||
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
 | 
			
		||||
| 
						 | 
				
			
			@ -1165,34 +1090,6 @@ subscribe(Session, Dest) ->
 | 
			
		|||
    ok = amqp10_client:flow_link_credit(Receiver, 10, 1),
 | 
			
		||||
    Receiver.
 | 
			
		||||
 | 
			
		||||
expect_many(Session, Dest, N) ->
 | 
			
		||||
    LinkName = <<"dynamic-receiver-", Dest/binary>>,
 | 
			
		||||
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
 | 
			
		||||
                                                        Dest, settled,
 | 
			
		||||
                                                        unsettled_state),
 | 
			
		||||
    ok = amqp10_client:flow_link_credit(Receiver, 10, 1),
 | 
			
		||||
    Msgs = expect(Receiver, N, []),
 | 
			
		||||
    amqp10_client:detach_link(Receiver),
 | 
			
		||||
    Msgs.
 | 
			
		||||
 | 
			
		||||
expect(_, 0, Acc) ->
 | 
			
		||||
    Acc;
 | 
			
		||||
expect(Receiver, N, Acc) ->
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_msg, Receiver, InMsg} ->
 | 
			
		||||
            expect(Receiver, N - 1, [amqp10_msg:body(InMsg) | Acc])
 | 
			
		||||
    after 4000 ->
 | 
			
		||||
            throw({timeout_in_expect_waiting_for_delivery, N, Acc})
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
expect(Receiver) ->
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_msg, Receiver, InMsg} ->
 | 
			
		||||
            InMsg
 | 
			
		||||
    after 4000 ->
 | 
			
		||||
            throw(timeout_in_expect_waiting_for_delivery)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
declare_queue(Config, VHost, QName) ->
 | 
			
		||||
    declare_queue(Config, VHost, QName, []).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1233,13 +1130,6 @@ declare_exchange(Config, VHost, Exchange) ->
 | 
			
		|||
    rabbit_ct_client_helpers:close_channel(Ch),
 | 
			
		||||
    rabbit_ct_client_helpers:close_connection(Conn).
 | 
			
		||||
 | 
			
		||||
delete_all_queues() ->
 | 
			
		||||
    Queues = rabbit_amqqueue:list(),
 | 
			
		||||
    lists:foreach(
 | 
			
		||||
      fun(Q) ->
 | 
			
		||||
              {ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
 | 
			
		||||
      end, Queues).
 | 
			
		||||
 | 
			
		||||
delete_queue(Name, VHost) ->
 | 
			
		||||
    QName = rabbit_misc:r(VHost, queue, Name),
 | 
			
		||||
    case rabbit_amqqueue:lookup(QName) of
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -14,7 +14,8 @@
 | 
			
		|||
 | 
			
		||||
-compile(export_all).
 | 
			
		||||
 | 
			
		||||
-import(shovel_test_utils, [await_amqp10_event/3, await_credit/1]).
 | 
			
		||||
-import(shovel_test_utils, [await_amqp10_event/3, await_credit/1,
 | 
			
		||||
                            with_amqp10_session/2]).
 | 
			
		||||
 | 
			
		||||
-define(PARAM, <<"test">>).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -84,7 +85,7 @@ init_per_testcase(Testcase, Config0) ->
 | 
			
		|||
 | 
			
		||||
end_per_testcase(Testcase, Config) ->
 | 
			
		||||
    shovel_test_utils:clear_param(Config, ?PARAM),
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_all_queues, []),
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []),
 | 
			
		||||
    _ = rabbit_ct_broker_helpers:delete_vhost(Config, ?config(alt_vhost, Config)),
 | 
			
		||||
    rabbit_ct_helpers:testcase_finished(Config, Testcase).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -97,7 +98,7 @@ local_to_local_dest_down(Config) ->
 | 
			
		|||
    Dest = ?config(destq, Config),
 | 
			
		||||
    declare_queue(Config, 0, <<"/">>, Src),
 | 
			
		||||
    declare_queue(Config, 1, <<"/">>, Dest),
 | 
			
		||||
    with_session(
 | 
			
		||||
    with_amqp10_session(
 | 
			
		||||
      Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
| 
						 | 
				
			
			@ -128,7 +129,7 @@ local_to_local_multiple_all_dest_down(Config) ->
 | 
			
		|||
    declare_queue(Config, 0, <<"/">>, Src),
 | 
			
		||||
    declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest, Dest),
 | 
			
		||||
    declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest2, Dest2),
 | 
			
		||||
    with_session(
 | 
			
		||||
    with_amqp10_session(
 | 
			
		||||
      Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
| 
						 | 
				
			
			@ -164,7 +165,7 @@ local_to_local_multiple_some_dest_down(Config) ->
 | 
			
		|||
    %% and should be requeued.
 | 
			
		||||
    declare_and_bind_queue(Config, 1, <<"/">>, <<"amq.fanout">>, Dest, Dest),
 | 
			
		||||
    declare_and_bind_queue(Config, 2, <<"/">>, <<"amq.fanout">>, Dest2, Dest2),
 | 
			
		||||
    with_session(
 | 
			
		||||
    with_amqp10_session(
 | 
			
		||||
      Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
| 
						 | 
				
			
			@ -200,7 +201,7 @@ local_to_local_no_destination(Config) ->
 | 
			
		|||
    Src = ?config(srcq, Config),
 | 
			
		||||
    Dest = ?config(destq, Config),
 | 
			
		||||
    declare_queue(Config, 0, <<"/">>, Src),
 | 
			
		||||
    with_session(
 | 
			
		||||
    with_amqp10_session(
 | 
			
		||||
      Config,
 | 
			
		||||
      fun (Sess) ->
 | 
			
		||||
              shovel_test_utils:set_param(Config, ?PARAM,
 | 
			
		||||
| 
						 | 
				
			
			@ -229,22 +230,6 @@ to_int(<<>>) ->
 | 
			
		|||
to_int(Int) ->
 | 
			
		||||
    binary_to_integer(Int).
 | 
			
		||||
 | 
			
		||||
with_session(Config, Fun) ->
 | 
			
		||||
    with_session(Config, <<"/">>, Fun).
 | 
			
		||||
 | 
			
		||||
with_session(Config, VHost, Fun) ->
 | 
			
		||||
    Hostname = ?config(rmq_hostname, Config),
 | 
			
		||||
    Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
 | 
			
		||||
    Cfg = #{address => Hostname,
 | 
			
		||||
            port => Port,
 | 
			
		||||
            sasl => {plain, <<"guest">>, <<"guest">>},
 | 
			
		||||
            hostname => <<"vhost:", VHost/binary>>},
 | 
			
		||||
    {ok, Conn} = amqp10_client:open_connection(Cfg),
 | 
			
		||||
    {ok, Sess} = amqp10_client:begin_session(Conn),
 | 
			
		||||
    Fun(Sess),
 | 
			
		||||
    amqp10_client:close_connection(Conn),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
publish(Sender, Tag, Payload) when is_binary(Payload) ->
 | 
			
		||||
    Headers = #{durable => true},
 | 
			
		||||
    Msg = amqp10_msg:set_headers(Headers,
 | 
			
		||||
| 
						 | 
				
			
			@ -344,13 +329,6 @@ declare_exchange(Config, VHost, Exchange) ->
 | 
			
		|||
    rabbit_ct_client_helpers:close_channel(Ch),
 | 
			
		||||
    rabbit_ct_client_helpers:close_connection(Conn).
 | 
			
		||||
 | 
			
		||||
delete_all_queues() ->
 | 
			
		||||
    Queues = rabbit_amqqueue:list(),
 | 
			
		||||
    lists:foreach(
 | 
			
		||||
      fun(Q) ->
 | 
			
		||||
              {ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
 | 
			
		||||
      end, Queues).
 | 
			
		||||
 | 
			
		||||
delete_queue(Name, VHost) ->
 | 
			
		||||
    QName = rabbit_misc:r(VHost, queue, Name),
 | 
			
		||||
    case rabbit_amqqueue:lookup(QName) of
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -16,7 +16,16 @@
 | 
			
		|||
         await/1, await/2, await_amqp10_event/3, await_credit/1,
 | 
			
		||||
         clear_param/2, clear_param/3, make_uri/2,
 | 
			
		||||
         make_uri/3, make_uri/5,
 | 
			
		||||
         await_shovel1/4, await_no_shovel/2]).
 | 
			
		||||
         await_shovel1/4, await_no_shovel/2,
 | 
			
		||||
         delete_all_queues/0,
 | 
			
		||||
         with_amqp10_session/2, with_amqp10_session/3,
 | 
			
		||||
         amqp10_publish/3, amqp10_publish/4,
 | 
			
		||||
         amqp10_expect_empty/2, amqp10_expect_one/2,
 | 
			
		||||
         amqp10_expect_count/3, amqp10_expect/3,
 | 
			
		||||
         amqp10_publish_expect/5,
 | 
			
		||||
         await_autodelete/2, await_autodelete1/2,
 | 
			
		||||
         invalid_param/2, invalid_param/3,
 | 
			
		||||
         valid_param/2, valid_param/3, valid_param1/3]).
 | 
			
		||||
 | 
			
		||||
make_uri(Config, Node) ->
 | 
			
		||||
    Hostname = ?config(rmq_hostname, Config),
 | 
			
		||||
| 
						 | 
				
			
			@ -170,3 +179,145 @@ restart_shovel(Config, Name) ->
 | 
			
		|||
restart_shovel(Config, Node, Name) ->
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config,
 | 
			
		||||
                        Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]).
 | 
			
		||||
 | 
			
		||||
delete_all_queues() ->
 | 
			
		||||
    Queues = rabbit_amqqueue:list(),
 | 
			
		||||
    lists:foreach(
 | 
			
		||||
      fun(Q) ->
 | 
			
		||||
              {ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
 | 
			
		||||
      end, Queues).
 | 
			
		||||
 | 
			
		||||
with_amqp10_session(Config, Fun) ->
 | 
			
		||||
    with_amqp10_session(Config, <<"/">>, Fun).
 | 
			
		||||
 | 
			
		||||
with_amqp10_session(Config, VHost, Fun) ->
 | 
			
		||||
    Hostname = ?config(rmq_hostname, Config),
 | 
			
		||||
    Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
 | 
			
		||||
    Cfg = #{address => Hostname,
 | 
			
		||||
            port => Port,
 | 
			
		||||
            sasl => {plain, <<"guest">>, <<"guest">>},
 | 
			
		||||
            hostname => <<"vhost:", VHost/binary>>},
 | 
			
		||||
    {ok, Conn} = amqp10_client:open_connection(Cfg),
 | 
			
		||||
    {ok, Sess} = amqp10_client:begin_session(Conn),
 | 
			
		||||
    Fun(Sess),
 | 
			
		||||
    amqp10_client:close_connection(Conn),
 | 
			
		||||
    ok.
 | 
			
		||||
 | 
			
		||||
amqp10_publish(Sender, Tag, Payload) when is_binary(Payload) ->
 | 
			
		||||
    Headers = #{durable => true},
 | 
			
		||||
    Msg = amqp10_msg:set_headers(Headers,
 | 
			
		||||
                                 amqp10_msg:new(Tag, Payload, false)),
 | 
			
		||||
    ok = amqp10_client:send_msg(Sender, Msg),
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_disposition, {accepted, Tag}} -> ok
 | 
			
		||||
    after 3000 ->
 | 
			
		||||
              exit(publish_disposition_not_received)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
amqp10_expect_empty(Session, Dest) ->
 | 
			
		||||
    LinkName = <<"dynamic-receiver-", Dest/binary>>,
 | 
			
		||||
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
 | 
			
		||||
                                                        Dest, settled,
 | 
			
		||||
                                                        unsettled_state),
 | 
			
		||||
    ok = amqp10_client:flow_link_credit(Receiver, 1, never),
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_msg, Receiver, _} ->
 | 
			
		||||
            throw(unexpected_msg)
 | 
			
		||||
    after 500 ->
 | 
			
		||||
            ok
 | 
			
		||||
    end,
 | 
			
		||||
    amqp10_client:detach_link(Receiver).
 | 
			
		||||
 | 
			
		||||
amqp10_publish(Session, Address, Payload, Count) ->
 | 
			
		||||
    LinkName = <<"dynamic-sender-", Address/binary>>,
 | 
			
		||||
    {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Address,
 | 
			
		||||
                                                    unsettled, unsettled_state),
 | 
			
		||||
    ok = await_amqp10_event(link, Sender, attached),
 | 
			
		||||
    ok = await_credit(Sender),
 | 
			
		||||
    [begin
 | 
			
		||||
         Tag = rabbit_data_coercion:to_binary(I),
 | 
			
		||||
         amqp10_publish(Sender, Tag, <<Payload/binary, Tag/binary>>)
 | 
			
		||||
     end || I <- lists:seq(1, Count)],
 | 
			
		||||
    amqp10_client:detach_link(Sender).
 | 
			
		||||
 | 
			
		||||
amqp10_publish_expect(Session, Source, Destination, Payload, Count) ->
 | 
			
		||||
    amqp10_publish(Session, Source, Payload, Count),
 | 
			
		||||
    amqp10_expect_count(Session, Destination, Count).
 | 
			
		||||
 | 
			
		||||
amqp10_expect_one(Session, Dest) ->
 | 
			
		||||
    LinkName = <<"dynamic-receiver-", Dest/binary>>,
 | 
			
		||||
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
 | 
			
		||||
                                                        Dest, settled,
 | 
			
		||||
                                                        unsettled_state),
 | 
			
		||||
    ok = amqp10_client:flow_link_credit(Receiver, 1, never),
 | 
			
		||||
    Msg = amqp10_expect(Receiver),
 | 
			
		||||
    amqp10_client:detach_link(Receiver),
 | 
			
		||||
    Msg.
 | 
			
		||||
 | 
			
		||||
amqp10_expect_count(Session, Dest, Count) ->
 | 
			
		||||
    LinkName = <<"dynamic-receiver-", Dest/binary>>,
 | 
			
		||||
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
 | 
			
		||||
                                                        Dest, settled,
 | 
			
		||||
                                                        unsettled_state),
 | 
			
		||||
    ok = amqp10_client:flow_link_credit(Receiver, Count, never),
 | 
			
		||||
    Msgs = amqp10_expect(Receiver, Count, []),
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_msg, Receiver, Msg} ->
 | 
			
		||||
            throw({unexpected_msg, Msg})
 | 
			
		||||
    after 500 ->
 | 
			
		||||
            ok
 | 
			
		||||
    end,
 | 
			
		||||
    amqp10_client:detach_link(Receiver),
 | 
			
		||||
    Msgs.
 | 
			
		||||
 | 
			
		||||
amqp10_expect(_, 0, Acc) ->
 | 
			
		||||
    Acc;
 | 
			
		||||
amqp10_expect(Receiver, N, Acc) ->
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_msg, Receiver, InMsg} ->
 | 
			
		||||
            amqp10_expect(Receiver, N - 1, [InMsg | Acc])
 | 
			
		||||
    after 4000 ->
 | 
			
		||||
            throw({timeout_in_expect_waiting_for_delivery, N, Acc})
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
amqp10_expect(Receiver) ->
 | 
			
		||||
    receive
 | 
			
		||||
        {amqp10_msg, Receiver, InMsg} ->
 | 
			
		||||
            InMsg
 | 
			
		||||
    after 4000 ->
 | 
			
		||||
            throw(timeout_in_expect_waiting_for_delivery)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
await_autodelete(Config, Name) ->
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      ?MODULE, await_autodelete1, [Config, Name], 10000).
 | 
			
		||||
 | 
			
		||||
await_autodelete1(_Config, Name) ->
 | 
			
		||||
    await(
 | 
			
		||||
      fun () -> not lists:member(Name, shovels_from_parameters()) end),
 | 
			
		||||
    await(
 | 
			
		||||
      fun () ->
 | 
			
		||||
              not lists:member(Name,
 | 
			
		||||
                               shovels_from_status())
 | 
			
		||||
      end).
 | 
			
		||||
 | 
			
		||||
shovels_from_parameters() ->
 | 
			
		||||
    L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>),
 | 
			
		||||
    [rabbit_misc:pget(name, Shovel) || Shovel <- L].
 | 
			
		||||
 | 
			
		||||
invalid_param(Config, Value, User) ->
 | 
			
		||||
    {error_string, _} = rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      rabbit_runtime_parameters, set,
 | 
			
		||||
      [<<"/">>, <<"shovel">>, <<"invalid">>, Value, User]).
 | 
			
		||||
 | 
			
		||||
valid_param(Config, Value, User) ->
 | 
			
		||||
    rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
			
		||||
      ?MODULE, valid_param1, [Config, Value, User]).
 | 
			
		||||
 | 
			
		||||
valid_param1(_Config, Value, User) ->
 | 
			
		||||
    ok = rabbit_runtime_parameters:set(
 | 
			
		||||
           <<"/">>, <<"shovel">>, <<"name">>, Value, User),
 | 
			
		||||
    ok = rabbit_runtime_parameters:clear(<<"/">>, <<"shovel">>, <<"name">>, <<"acting-user">>).
 | 
			
		||||
 | 
			
		||||
invalid_param(Config, Value) -> invalid_param(Config, Value, none).
 | 
			
		||||
valid_param(Config, Value) -> valid_param(Config, Value, none).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue