Merge pull request #14673 from rabbitmq/shovel-tests
Trigger a 4.3.x alpha release build / trigger_alpha_build (push) Waiting to run
Details
Test (make) / Build and Xref (1.18, 26) (push) Waiting to run
Details
Test (make) / Build and Xref (1.18, 27) (push) Waiting to run
Details
Test (make) / Build and Xref (1.18, 28) (push) Waiting to run
Details
Test (make) / Test (1.18, 28, khepri) (push) Waiting to run
Details
Test (make) / Test (1.18, 28, mnesia) (push) Waiting to run
Details
Test (make) / Test mixed clusters (1.18, 28, khepri) (push) Waiting to run
Details
Test (make) / Test mixed clusters (1.18, 28, mnesia) (push) Waiting to run
Details
Test (make) / Type check (1.18, 28) (push) Waiting to run
Details
Trigger a 4.3.x alpha release build / trigger_alpha_build (push) Waiting to run
Details
Test (make) / Build and Xref (1.18, 26) (push) Waiting to run
Details
Test (make) / Build and Xref (1.18, 27) (push) Waiting to run
Details
Test (make) / Build and Xref (1.18, 28) (push) Waiting to run
Details
Test (make) / Test (1.18, 28, khepri) (push) Waiting to run
Details
Test (make) / Test (1.18, 28, mnesia) (push) Waiting to run
Details
Test (make) / Test mixed clusters (1.18, 28, khepri) (push) Waiting to run
Details
Test (make) / Test mixed clusters (1.18, 28, mnesia) (push) Waiting to run
Details
Test (make) / Type check (1.18, 28) (push) Waiting to run
Details
Shovels: start local shovels with predeclared queues when queue arguments don't match
This commit is contained in:
commit
656a4bfa65
|
|
@ -594,7 +594,12 @@ decl_queue(QName, QArgs, VHost, User) ->
|
|||
Method = #'queue.declare'{queue = QName,
|
||||
durable = true,
|
||||
arguments = Args},
|
||||
decl_fun([Method], VHost, User).
|
||||
try
|
||||
decl_fun([#'queue.declare'{queue = QName,
|
||||
passive = true}], VHost, User)
|
||||
catch exit:{amqp_error, not_found, _, _} ->
|
||||
decl_fun([Method], VHost, User)
|
||||
end.
|
||||
|
||||
dest_check_queue(none, _, _, _) ->
|
||||
ok;
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ groups() ->
|
|||
restart,
|
||||
change_definition,
|
||||
autodelete,
|
||||
autodelete_with_rejections,
|
||||
validation,
|
||||
security_validation,
|
||||
get_connection_name,
|
||||
|
|
@ -519,6 +520,35 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
|
|||
expect_count(Ch, <<"src">>, <<"hello">>, ExpSrc)
|
||||
end.
|
||||
|
||||
autodelete_with_rejections(Config) ->
|
||||
Src = <<"src">>,
|
||||
Dest = <<"dst">>,
|
||||
Args = [{<<"x-max-length">>, long, 5},
|
||||
{<<"x-overflow">>, longstr, <<"reject-publish">>}],
|
||||
with_ch(Config,
|
||||
fun (Ch) ->
|
||||
amqp_channel:call(Ch, #'queue.declare'{queue = Dest,
|
||||
durable = true,
|
||||
arguments = Args}),
|
||||
shovel_test_utils:set_param(Config, <<"test">>,
|
||||
[{<<"src-protocol">>, <<"local">>},
|
||||
{<<"src-queue">>, Src},
|
||||
{<<"src-delete-after">>, 10},
|
||||
{<<"dest-protocol">>, <<"local">>},
|
||||
{<<"dest-predeclared">>, true},
|
||||
{<<"dest-queue">>, Dest}
|
||||
]),
|
||||
publish_count(Ch, <<>>, Src, <<"hello">>, 10),
|
||||
await_autodelete(Config, <<"test">>),
|
||||
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
|
||||
eventually(
|
||||
?_assertMatch(
|
||||
Expected,
|
||||
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
|
||||
Config, 0,
|
||||
["list_queues", "name", "messages_ready", "--no-table-headers"]))))
|
||||
end).
|
||||
|
||||
validation(Config) ->
|
||||
URIs = [{<<"src-uri">>, <<"amqp://">>},
|
||||
{<<"dest-uri">>, <<"amqp://">>}],
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@
|
|||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
|
||||
-compile(export_all).
|
||||
|
||||
-import(shovel_test_utils, [with_amqp10_session/2,
|
||||
|
|
@ -16,9 +17,11 @@
|
|||
amqp10_expect_empty/2,
|
||||
await_amqp10_event/3, amqp10_expect_one/2,
|
||||
amqp10_expect_count/3, amqp10_publish/4,
|
||||
amqp10_publish_expect/5,
|
||||
amqp10_publish_expect/5, amqp10_declare_queue/3,
|
||||
await_autodelete/2]).
|
||||
|
||||
-define(PARAM, <<"test">>).
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, non_parallel_tests},
|
||||
|
|
@ -34,6 +37,7 @@ groups() ->
|
|||
autodelete_amqp091_src_on_publish,
|
||||
autodelete_amqp091_dest_on_confirm,
|
||||
autodelete_amqp091_dest_on_publish,
|
||||
autodelete_with_rejections,
|
||||
simple_amqp10_dest,
|
||||
simple_amqp10_src,
|
||||
amqp091_to_amqp10_with_dead_lettering,
|
||||
|
|
@ -83,6 +87,8 @@ init_per_testcase(Testcase, Config0) ->
|
|||
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
||||
|
||||
end_per_testcase(Testcase, Config) ->
|
||||
shovel_test_utils:clear_param(Config, ?PARAM),
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, shovel_test_utils, delete_all_queues, []),
|
||||
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
|
|
@ -113,11 +119,9 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
|
|||
TmpQ = <<"tmp">>,
|
||||
with_amqp10_session(Config,
|
||||
fun (Sess) ->
|
||||
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TmpQ,
|
||||
#{arguments =>#{<<"x-max-length">> => {uint, 0},
|
||||
<<"x-dead-letter-exchange">> => {utf8, <<"">>},
|
||||
<<"x-dead-letter-routing-key">> => {utf8, Src}}}),
|
||||
amqp10_declare_queue(Sess, TmpQ, #{<<"x-max-length">> => {uint, 0},
|
||||
<<"x-dead-letter-exchange">> => {utf8, <<"">>},
|
||||
<<"x-dead-letter-routing-key">> => {utf8, Src}}),
|
||||
{ok, Sender} = amqp10_client:attach_sender_link(Sess,
|
||||
<<"sender-tmp">>,
|
||||
<<"/queues/", TmpQ/binary>>,
|
||||
|
|
@ -132,7 +136,7 @@ amqp091_to_amqp10_with_dead_lettering(Config) ->
|
|||
|
||||
test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
|
||||
MapConfig = ?config(map_config, Config),
|
||||
shovel_test_utils:set_param(Config, <<"test">>,
|
||||
shovel_test_utils:set_param(Config, ?PARAM,
|
||||
[{<<"src-protocol">>, Protocol},
|
||||
{ProtocolSrc, Src},
|
||||
{<<"dest-protocol">>, <<"amqp10">>},
|
||||
|
|
@ -186,18 +190,18 @@ simple_amqp10_src(Config) ->
|
|||
fun (Sess) ->
|
||||
shovel_test_utils:set_param(
|
||||
Config,
|
||||
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
|
||||
{<<"src-address">>, Src},
|
||||
{<<"dest-protocol">>, <<"amqp091">>},
|
||||
{<<"dest-queue">>, Dest},
|
||||
{<<"add-forward-headers">>, true},
|
||||
{<<"dest-add-timestamp-header">>, true},
|
||||
{<<"publish-properties">>,
|
||||
case MapConfig of
|
||||
true -> #{<<"cluster_id">> => <<"x">>};
|
||||
_ -> [{<<"cluster_id">>, <<"x">>}]
|
||||
end}
|
||||
]),
|
||||
?PARAM, [{<<"src-protocol">>, <<"amqp10">>},
|
||||
{<<"src-address">>, Src},
|
||||
{<<"dest-protocol">>, <<"amqp091">>},
|
||||
{<<"dest-queue">>, Dest},
|
||||
{<<"add-forward-headers">>, true},
|
||||
{<<"dest-add-timestamp-header">>, true},
|
||||
{<<"publish-properties">>,
|
||||
case MapConfig of
|
||||
true -> #{<<"cluster_id">> => <<"x">>};
|
||||
_ -> [{<<"cluster_id">>, <<"x">>}]
|
||||
end}
|
||||
]),
|
||||
_Msg = amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
|
||||
% the fidelity loss is quite high when consuming using the amqp10
|
||||
% plugin. For example custom headers aren't current translated.
|
||||
|
|
@ -213,18 +217,18 @@ amqp10_to_amqp091_application_properties(Config) ->
|
|||
fun (Sess) ->
|
||||
shovel_test_utils:set_param(
|
||||
Config,
|
||||
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
|
||||
{<<"src-address">>, Src},
|
||||
{<<"dest-protocol">>, <<"amqp091">>},
|
||||
{<<"dest-queue">>, Dest},
|
||||
{<<"add-forward-headers">>, true},
|
||||
{<<"dest-add-timestamp-header">>, true},
|
||||
{<<"publish-properties">>,
|
||||
case MapConfig of
|
||||
true -> #{<<"cluster_id">> => <<"x">>};
|
||||
_ -> [{<<"cluster_id">>, <<"x">>}]
|
||||
end}
|
||||
]),
|
||||
?PARAM, [{<<"src-protocol">>, <<"amqp10">>},
|
||||
{<<"src-address">>, Src},
|
||||
{<<"dest-protocol">>, <<"amqp091">>},
|
||||
{<<"dest-queue">>, Dest},
|
||||
{<<"add-forward-headers">>, true},
|
||||
{<<"dest-add-timestamp-header">>, true},
|
||||
{<<"publish-properties">>,
|
||||
case MapConfig of
|
||||
true -> #{<<"cluster_id">> => <<"x">>};
|
||||
_ -> [{<<"cluster_id">>, <<"x">>}]
|
||||
end}
|
||||
]),
|
||||
|
||||
MsgSent = amqp10_msg:set_application_properties(
|
||||
#{<<"key">> => <<"value">>},
|
||||
|
|
@ -247,13 +251,13 @@ change_definition(Config) ->
|
|||
Dest2 = ?config(destq2, Config),
|
||||
with_amqp10_session(Config,
|
||||
fun (Sess) ->
|
||||
shovel_test_utils:set_param(Config, <<"test">>,
|
||||
shovel_test_utils:set_param(Config, ?PARAM,
|
||||
[{<<"src-address">>, Src},
|
||||
{<<"src-protocol">>, <<"amqp10">>},
|
||||
{<<"dest-protocol">>, <<"amqp10">>},
|
||||
{<<"dest-address">>, Dest}]),
|
||||
amqp10_publish_expect(Sess, Src, Dest, <<"hello1">>, 1),
|
||||
shovel_test_utils:set_param(Config, <<"test">>,
|
||||
shovel_test_utils:set_param(Config, ?PARAM,
|
||||
[{<<"src-address">>, Src},
|
||||
{<<"src-protocol">>, <<"amqp10">>},
|
||||
{<<"dest-protocol">>, <<"amqp10">>},
|
||||
|
|
@ -296,14 +300,14 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
|
|||
amqp10_publish(Session, Src, <<"hello">>, 100),
|
||||
shovel_test_utils:set_param_nowait(
|
||||
Config,
|
||||
<<"test">>, [{<<"src-address">>, Src},
|
||||
{<<"src-protocol">>, <<"amqp10">>},
|
||||
{<<"src-delete-after">>, After},
|
||||
{<<"src-prefetch-count">>, 5},
|
||||
{<<"dest-address">>, Dest},
|
||||
{<<"dest-protocol">>, <<"amqp10">>},
|
||||
{<<"ack-mode">>, AckMode}
|
||||
]),
|
||||
?PARAM, [{<<"src-address">>, Src},
|
||||
{<<"src-protocol">>, <<"amqp10">>},
|
||||
{<<"src-delete-after">>, After},
|
||||
{<<"src-prefetch-count">>, 5},
|
||||
{<<"dest-address">>, Dest},
|
||||
{<<"dest-protocol">>, <<"amqp10">>},
|
||||
{<<"ack-mode">>, AckMode}
|
||||
]),
|
||||
await_autodelete(Config, <<"test">>),
|
||||
amqp10_expect_count(Session, Dest, ExpDest),
|
||||
amqp10_expect_count(Session, Src, ExpSrc)
|
||||
|
|
@ -316,14 +320,14 @@ autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) ->
|
|||
amqp10_publish(Session, Src, <<"hello">>, 100),
|
||||
shovel_test_utils:set_param_nowait(
|
||||
Config,
|
||||
<<"test">>, [{<<"src-queue">>, Src},
|
||||
{<<"src-protocol">>, <<"amqp091">>},
|
||||
{<<"src-delete-after">>, After},
|
||||
{<<"src-prefetch-count">>, 5},
|
||||
{<<"dest-address">>, Dest},
|
||||
{<<"dest-protocol">>, <<"amqp10">>},
|
||||
{<<"ack-mode">>, AckMode}
|
||||
]),
|
||||
?PARAM, [{<<"src-queue">>, Src},
|
||||
{<<"src-protocol">>, <<"amqp091">>},
|
||||
{<<"src-delete-after">>, After},
|
||||
{<<"src-prefetch-count">>, 5},
|
||||
{<<"dest-address">>, Dest},
|
||||
{<<"dest-protocol">>, <<"amqp10">>},
|
||||
{<<"ack-mode">>, AckMode}
|
||||
]),
|
||||
await_autodelete(Config, <<"test">>),
|
||||
amqp10_expect_count(Session, Dest, ExpDest),
|
||||
amqp10_expect_count(Session, Src, ExpSrc)
|
||||
|
|
@ -336,19 +340,47 @@ autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
|
|||
amqp10_publish(Session, Src, <<"hello">>, 100),
|
||||
shovel_test_utils:set_param_nowait(
|
||||
Config,
|
||||
<<"test">>, [{<<"src-address">>, Src},
|
||||
{<<"src-protocol">>, <<"amqp10">>},
|
||||
{<<"src-delete-after">>, After},
|
||||
{<<"src-prefetch-count">>, 5},
|
||||
{<<"dest-queue">>, Dest},
|
||||
{<<"dest-protocol">>, <<"amqp091">>},
|
||||
{<<"ack-mode">>, AckMode}
|
||||
]),
|
||||
?PARAM, [{<<"src-address">>, Src},
|
||||
{<<"src-protocol">>, <<"amqp10">>},
|
||||
{<<"src-delete-after">>, After},
|
||||
{<<"src-prefetch-count">>, 5},
|
||||
{<<"dest-queue">>, Dest},
|
||||
{<<"dest-protocol">>, <<"amqp091">>},
|
||||
{<<"ack-mode">>, AckMode}
|
||||
]),
|
||||
await_autodelete(Config, <<"test">>),
|
||||
amqp10_expect_count(Session, Dest, ExpDest),
|
||||
amqp10_expect_count(Session, Src, ExpSrc)
|
||||
end.
|
||||
|
||||
autodelete_with_rejections(Config) ->
|
||||
Src = ?config(srcq, Config),
|
||||
Dest = ?config(destq, Config),
|
||||
with_amqp10_session(
|
||||
Config,
|
||||
fun (Sess) ->
|
||||
amqp10_declare_queue(Sess, Dest, #{<<"x-max-length">> => {uint, 5},
|
||||
<<"x-overflow">> => {utf8, <<"reject-publish">>}}),
|
||||
|
||||
shovel_test_utils:set_param(Config, ?PARAM,
|
||||
[{<<"src-protocol">>, <<"local">>},
|
||||
{<<"src-queue">>, Src},
|
||||
{<<"src-delete-after">>, 10},
|
||||
{<<"dest-protocol">>, <<"local">>},
|
||||
{<<"dest-predeclared">>, true},
|
||||
{<<"dest-queue">>, Dest}
|
||||
]),
|
||||
amqp10_publish(Sess, Src, <<"hello">>, 10),
|
||||
await_autodelete(Config, <<"test">>),
|
||||
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
|
||||
?awaitMatch(
|
||||
Expected,
|
||||
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
|
||||
Config, 0,
|
||||
["list_queues", "name", "messages_ready", "--no-table-headers"])),
|
||||
30_000)
|
||||
end).
|
||||
|
||||
test_amqp10_delete_after_queue_length(Config) ->
|
||||
Src = ?config(srcq, Config),
|
||||
Dest = ?config(destq, Config),
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@
|
|||
amqp10_expect_empty/2,
|
||||
amqp10_publish/4, amqp10_expect_one/2,
|
||||
amqp10_expect_count/3, amqp10_expect/3,
|
||||
amqp10_publish_expect/5,
|
||||
amqp10_publish_expect/5, amqp10_subscribe/2,
|
||||
await_autodelete/2]).
|
||||
|
||||
-define(PARAM, <<"test">>).
|
||||
|
|
@ -54,6 +54,7 @@ groups() ->
|
|||
local_to_local_delete_after_queue_length,
|
||||
local_to_local_delete_after_queue_length_zero,
|
||||
local_to_local_delete_after_number,
|
||||
local_to_local_delete_after_with_rejections,
|
||||
local_to_local_no_ack,
|
||||
local_to_local_quorum_no_ack,
|
||||
local_to_local_stream_no_ack,
|
||||
|
|
@ -586,6 +587,34 @@ local_to_local_delete_after_number(Config) ->
|
|||
amqp10_expect_empty(Sess, Dest)
|
||||
end).
|
||||
|
||||
local_to_local_delete_after_with_rejections(Config) ->
|
||||
Src = ?config(srcq, Config),
|
||||
Dest = ?config(destq, Config),
|
||||
VHost = <<"/">>,
|
||||
declare_queue(Config, VHost, Dest, [{<<"x-max-length">>, long, 5},
|
||||
{<<"x-overflow">>, longstr, <<"reject-publish">>}]),
|
||||
with_amqp10_session(Config,
|
||||
fun (Sess) ->
|
||||
shovel_test_utils:set_param(Config, ?PARAM,
|
||||
[{<<"src-protocol">>, <<"local">>},
|
||||
{<<"src-queue">>, Src},
|
||||
{<<"src-delete-after">>, 10},
|
||||
{<<"dest-protocol">>, <<"local">>},
|
||||
{<<"dest-predeclared">>, true},
|
||||
{<<"dest-queue">>, Dest}
|
||||
]),
|
||||
amqp10_publish(Sess, Src, <<"tag1">>, 10),
|
||||
?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000),
|
||||
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
|
||||
?awaitMatch(
|
||||
Expected,
|
||||
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
|
||||
Config, 0,
|
||||
["list_queues", "name", "messages_ready", "--no-table-headers"])),
|
||||
30_000)
|
||||
|
||||
end).
|
||||
|
||||
local_to_local_no_ack(Config) ->
|
||||
Src = ?config(srcq, Config),
|
||||
Dest = ?config(destq, Config),
|
||||
|
|
@ -637,7 +666,7 @@ local_to_local_stream_no_ack(Config) ->
|
|||
{<<"dest-queue">>, Dest},
|
||||
{<<"ack-mode">>, <<"no-ack">>}
|
||||
]),
|
||||
Receiver = subscribe(Sess, Dest),
|
||||
Receiver = amqp10_subscribe(Sess, Dest),
|
||||
amqp10_publish(Sess, Src, <<"tag1">>, 10),
|
||||
?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}],
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
|
|
@ -699,7 +728,7 @@ local_to_local_stream_on_confirm(Config) ->
|
|||
{<<"dest-queue">>, Dest},
|
||||
{<<"ack-mode">>, <<"on-confirm">>}
|
||||
]),
|
||||
Receiver = subscribe(Sess, Dest),
|
||||
Receiver = amqp10_subscribe(Sess, Dest),
|
||||
amqp10_publish(Sess, Src, <<"tag1">>, 10),
|
||||
?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}],
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
|
|
@ -761,7 +790,7 @@ local_to_local_stream_on_publish(Config) ->
|
|||
{<<"dest-queue">>, Dest},
|
||||
{<<"ack-mode">>, <<"on-publish">>}
|
||||
]),
|
||||
Receiver = subscribe(Sess, Dest),
|
||||
Receiver = amqp10_subscribe(Sess, Dest),
|
||||
amqp10_publish(Sess, Src, <<"tag1">>, 10),
|
||||
?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 10}, _}],
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
|
|
@ -1011,7 +1040,7 @@ local_to_local_stream_credit_flow(Config, AckMode) ->
|
|||
{<<"ack-mode">>, AckMode}
|
||||
]),
|
||||
|
||||
Receiver = subscribe(Sess, Dest),
|
||||
Receiver = amqp10_subscribe(Sess, Dest),
|
||||
amqp10_publish(Sess, Src, <<"tag1">>, 1000),
|
||||
?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1000}, _}],
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
|
|
@ -1067,14 +1096,6 @@ local_to_local_counters(Config) ->
|
|||
end).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
subscribe(Session, Dest) ->
|
||||
LinkName = <<"dynamic-receiver-", Dest/binary>>,
|
||||
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
|
||||
Dest, settled,
|
||||
unsettled_state),
|
||||
ok = amqp10_client:flow_link_credit(Receiver, 10, 1),
|
||||
Receiver.
|
||||
|
||||
declare_queue(Config, VHost, QName) ->
|
||||
declare_queue(Config, VHost, QName, []).
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,8 @@
|
|||
|
||||
-import(shovel_test_utils, [set_param/3,
|
||||
with_amqp10_session/2,
|
||||
amqp10_publish_expect/5]).
|
||||
amqp10_publish_expect/5,
|
||||
amqp10_declare_queue/3]).
|
||||
|
||||
-define(PARAM, <<"test">>).
|
||||
|
||||
|
|
@ -48,7 +49,13 @@ groups() ->
|
|||
|
||||
tests() ->
|
||||
[
|
||||
simple
|
||||
simple,
|
||||
simple_classic_no_ack,
|
||||
simple_classic_on_confirm,
|
||||
simple_classic_on_publish,
|
||||
simple_quorum_no_ack,
|
||||
simple_quorum_on_confirm,
|
||||
simple_quorum_on_publish
|
||||
].
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
|
|
@ -189,19 +196,49 @@ end_per_testcase(Testcase, Config) ->
|
|||
%% Testcases.
|
||||
%% -------------------------------------------------------------------
|
||||
simple(Config) ->
|
||||
Name = <<"test">>,
|
||||
Src = ?config(srcq, Config),
|
||||
Dest = ?config(destq, Config),
|
||||
with_amqp10_session(
|
||||
Config,
|
||||
fun (Sess) ->
|
||||
set_param(Config, Name, ?config(shovel_args, Config)),
|
||||
set_param(Config, ?PARAM, ?config(shovel_args, Config)),
|
||||
amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 1),
|
||||
Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
|
||||
Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, ?PARAM}]),
|
||||
?assertMatch([_|_], Status),
|
||||
?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status))
|
||||
end).
|
||||
|
||||
simple_classic_no_ack(Config) ->
|
||||
simple_queue_type_ack_mode(Config, <<"classic">>, <<"no-ack">>).
|
||||
|
||||
simple_classic_on_confirm(Config) ->
|
||||
simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-confirm">>).
|
||||
|
||||
simple_classic_on_publish(Config) ->
|
||||
simple_queue_type_ack_mode(Config, <<"classic">>, <<"on-publish">>).
|
||||
|
||||
simple_quorum_no_ack(Config) ->
|
||||
simple_queue_type_ack_mode(Config, <<"quorum">>, <<"no-ack">>).
|
||||
|
||||
simple_quorum_on_confirm(Config) ->
|
||||
simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-confirm">>).
|
||||
|
||||
simple_quorum_on_publish(Config) ->
|
||||
simple_queue_type_ack_mode(Config, <<"quorum">>, <<"on-publish">>).
|
||||
|
||||
simple_queue_type_ack_mode(Config, Type, AckMode) ->
|
||||
Src = ?config(srcq, Config),
|
||||
Dest = ?config(destq, Config),
|
||||
with_amqp10_session(
|
||||
Config,
|
||||
fun (Sess) ->
|
||||
amqp10_declare_queue(Sess, Src, #{<<"x-queue-type">> => {utf8, Type}}),
|
||||
amqp10_declare_queue(Sess, Dest, #{<<"x-queue-type">> => {utf8, Type}}),
|
||||
ExtraArgs = [{<<"ack-mode">>, AckMode}],
|
||||
ShovelArgs = ?config(shovel_args, Config) ++ ExtraArgs,
|
||||
set_param(Config, ?PARAM, ShovelArgs),
|
||||
amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 10)
|
||||
end).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
maybe_skip_local_protocol(Config) ->
|
||||
|
|
|
|||
|
|
@ -22,7 +22,8 @@
|
|||
amqp10_publish/3, amqp10_publish/4,
|
||||
amqp10_expect_empty/2, amqp10_expect_one/2,
|
||||
amqp10_expect_count/3, amqp10_expect/3,
|
||||
amqp10_publish_expect/5,
|
||||
amqp10_publish_expect/5, amqp10_declare_queue/3,
|
||||
amqp10_subscribe/2, amqp10_expect/2,
|
||||
await_autodelete/2, await_autodelete1/2,
|
||||
invalid_param/2, invalid_param/3,
|
||||
valid_param/2, valid_param/3, valid_param1/3]).
|
||||
|
|
@ -200,7 +201,8 @@ with_amqp10_session(Config, VHost, Fun) ->
|
|||
{ok, Conn} = amqp10_client:open_connection(Cfg),
|
||||
{ok, Sess} = amqp10_client:begin_session(Conn),
|
||||
Fun(Sess),
|
||||
amqp10_client:close_connection(Conn),
|
||||
ok = amqp10_client:end_session(Sess),
|
||||
ok = amqp10_client:close_connection(Conn),
|
||||
ok.
|
||||
|
||||
amqp10_publish(Sender, Tag, Payload) when is_binary(Payload) ->
|
||||
|
|
@ -254,21 +256,8 @@ amqp10_expect_one(Session, Dest) ->
|
|||
amqp10_client:detach_link(Receiver),
|
||||
Msg.
|
||||
|
||||
amqp10_expect_count(Session, Dest, Count) ->
|
||||
LinkName = <<"dynamic-receiver-", Dest/binary>>,
|
||||
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
|
||||
Dest, settled,
|
||||
unsettled_state),
|
||||
ok = amqp10_client:flow_link_credit(Receiver, Count, never),
|
||||
Msgs = amqp10_expect(Receiver, Count, []),
|
||||
receive
|
||||
{amqp10_msg, Receiver, Msg} ->
|
||||
throw({unexpected_msg, Msg})
|
||||
after 500 ->
|
||||
ok
|
||||
end,
|
||||
amqp10_client:detach_link(Receiver),
|
||||
Msgs.
|
||||
amqp10_expect(Receiver, N) ->
|
||||
amqp10_expect(Receiver, N, []).
|
||||
|
||||
amqp10_expect(_, 0, Acc) ->
|
||||
Acc;
|
||||
|
|
@ -288,6 +277,35 @@ amqp10_expect(Receiver) ->
|
|||
throw(timeout_in_expect_waiting_for_delivery)
|
||||
end.
|
||||
|
||||
amqp10_declare_queue(Sess, QName, Args) ->
|
||||
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"mgmt link pair">>),
|
||||
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{arguments => Args}),
|
||||
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair).
|
||||
|
||||
amqp10_subscribe(Session, Dest) ->
|
||||
LinkName = <<"dynamic-receiver-", Dest/binary>>,
|
||||
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
|
||||
Dest, settled,
|
||||
unsettled_state),
|
||||
ok = amqp10_client:flow_link_credit(Receiver, 10, 1),
|
||||
Receiver.
|
||||
|
||||
amqp10_expect_count(Session, Dest, Count) ->
|
||||
LinkName = <<"dynamic-receiver-", Dest/binary>>,
|
||||
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
|
||||
Dest, settled,
|
||||
unsettled_state),
|
||||
ok = amqp10_client:flow_link_credit(Receiver, Count, never),
|
||||
Msgs = amqp10_expect(Receiver, Count, []),
|
||||
receive
|
||||
{amqp10_msg, Receiver, Msg} ->
|
||||
throw({unexpected_msg, Msg})
|
||||
after 500 ->
|
||||
ok
|
||||
end,
|
||||
amqp10_client:detach_link(Receiver),
|
||||
Msgs.
|
||||
|
||||
await_autodelete(Config, Name) ->
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
?MODULE, await_autodelete1, [Config, Name], 10000).
|
||||
|
|
|
|||
Loading…
Reference in New Issue