rabbitmq-server/deps/rabbit/test/amqp_address_SUITE.erl

665 lines
25 KiB
Erlang
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 VMware, Inc. or its affiliates. All rights reserved.
-module(amqp_address_SUITE).
-compile([export_all,
nowarn_export_all]).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("rabbitmq_amqp_client/include/rabbitmq_amqp_client.hrl").
-import(rabbit_ct_broker_helpers,
[rpc/4]).
-import(rabbit_ct_helpers,
[eventually/1]).
-import(amqp_utils,
[flush/1,
wait_for_credit/1]).
all() ->
[
{group, v1_permitted},
{group, v1_denied}
].
groups() ->
[
{v1_permitted, [shuffle],
common_tests()
},
{v1_denied, [shuffle],
[
target_queue_absent,
source_queue_absent,
target_bad_v2_address,
source_bad_v2_address
] ++ common_tests()
}
].
common_tests() ->
[
target_exchange_routing_key,
target_exchange_routing_key_with_slash,
target_exchange_routing_key_empty,
target_exchange,
target_exchange_absent,
queue,
queue_with_slash,
target_per_message_exchange_routing_key,
target_per_message_exchange,
target_per_message_queue,
target_per_message_unset_to_address,
target_per_message_bad_to_address,
target_per_message_exchange_absent_settled,
target_per_message_exchange_absent_unsettled,
target_bad_address,
source_bad_address
].
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(amqp10_client),
rabbit_ct_helpers:log_environment(),
Config.
end_per_suite(Config) ->
Config.
init_per_group(Group, Config0) ->
PermitV1 = case Group of
v1_permitted -> true;
v1_denied -> false
end,
Config = rabbit_ct_helpers:merge_app_env(
Config0,
{rabbit,
[{permit_deprecated_features,
#{amqp_address_v1 => PermitV1}
}]
}),
rabbit_ct_helpers:run_setup_steps(
Config,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_teardown_steps(
Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% Test v2 target address
%% /exchanges/:exchange/:routing-key
target_exchange_routing_key(Config) ->
XName = <<"👉"/utf8>>,
RKey = <<"🗝️"/utf8>>,
target_exchange_routing_key0(XName, RKey, Config).
%% Test v2 target address
%% /exchanges/:exchange/:routing-key
%% where both :exchange and :routing-key contains a "/" character.
target_exchange_routing_key_with_slash(Config) ->
XName = <<"my/exchange">>,
RKey = <<"my/key">>,
target_exchange_routing_key0(XName, RKey, Config).
target_exchange_routing_key0(XName, RKey, Config) ->
TargetAddr = rabbitmq_amqp_address:exchange(XName, RKey),
QName = atom_to_binary(?FUNCTION_NAME),
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, RKey, #{}),
SrcAddr = rabbitmq_amqp_address:queue(QName),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr),
ok = wait_for_credit(Sender),
Body = <<"body">>,
Msg0 = amqp10_msg:new(<<"tag">>, Body, true),
%% Although mc_amqp:essential_properties/1 parses these annotations, they should be ignored.
Msg1 = amqp10_msg:set_message_annotations(
#{<<"x-exchange">> => <<"ignored">>,
<<"x-routing-key">> => <<"ignored">>},
Msg0),
ok = amqp10_client:send_msg(Sender, Msg1),
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertEqual([Body], amqp10_msg:body(Msg)),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName),
ok = cleanup(Init).
%% Test v2 target address
%% /exchanges/:exchange/
%% Routing key is empty.
target_exchange_routing_key_empty(Config) ->
XName = <<"amq.fanout">>,
TargetAddr = rabbitmq_amqp_address:exchange(XName, <<>>),
QName = atom_to_binary(?FUNCTION_NAME),
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, <<"ignored">>, #{}),
SrcAddr = rabbitmq_amqp_address:queue(QName),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr),
ok = wait_for_credit(Sender),
Body = <<"body">>,
Msg0 = amqp10_msg:new(<<"tag">>, Body, true),
ok = amqp10_client:send_msg(Sender, Msg0),
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertEqual([Body], amqp10_msg:body(Msg)),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = cleanup(Init).
%% Test v2 target address
%% /exchanges/:exchange
%% Routing key is empty.
target_exchange(Config) ->
XName = <<"amq.fanout">>,
TargetAddr = rabbitmq_amqp_address:exchange(XName),
QName = atom_to_binary(?FUNCTION_NAME),
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, <<"ignored">>, #{}),
SrcAddr = rabbitmq_amqp_address:queue(QName),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr),
ok = wait_for_credit(Sender),
Body = <<"body">>,
Msg0 = amqp10_msg:new(<<"tag">>, Body, true),
ok = amqp10_client:send_msg(Sender, Msg0),
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertEqual([Body], amqp10_msg:body(Msg)),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = cleanup(Init).
%% Test v2 target address
%% /exchanges/:exchange
%% where the target exchange does not exist.
target_exchange_absent(Config) ->
XName = <<"🎈"/utf8>>,
TargetAddr = rabbitmq_amqp_address:exchange(XName),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, _Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr),
receive
{amqp10_event,
{session, Session,
{ended,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no exchange '", XName:(byte_size(XName))/binary,
"' in vhost '/'">>}}}}} -> ok
after 5000 ->
Reason = {missing_event, ?LINE},
flush(Reason),
ct:fail(Reason)
end,
ok = amqp10_client:close_connection(Connection).
%% Test v2 target and source address
%% /queues/:queue
queue(Config) ->
QName = <<"🎈"/utf8>>,
queue0(QName, Config).
%% Test v2 target and source address
%% /queues/:queue
%% where :queue contains a "/" character.
queue_with_slash(Config) ->
QName = <<"my/queue">>,
queue0(QName, Config).
queue0(QName, Config) ->
Addr = rabbitmq_amqp_address:queue(QName),
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Addr),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Addr),
ok = wait_for_credit(Sender),
Body = <<"body">>,
Msg0 = amqp10_msg:new(<<"tag">>, Body, true),
ok = amqp10_client:send_msg(Sender, Msg0),
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertEqual([Body], amqp10_msg:body(Msg)),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = cleanup(Init).
%% Test v2 target address
%% /queues/:queue
%% where the target queue does not exist.
target_queue_absent(Config) ->
QName = <<"🎈"/utf8>>,
TargetAddr = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, _Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr),
receive
{amqp10_event,
{session, Session,
{ended,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary,
"' in vhost '/'">>}}}}} -> ok
after 5000 ->
Reason = {missing_event, ?LINE},
flush(Reason),
ct:fail(Reason)
end,
ok = amqp10_client:close_connection(Connection).
%% Test v2 target address 'null' and 'to'
%% /exchanges/:exchange/:routing-key
%% with varying routing keys.
target_per_message_exchange_routing_key(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
DirectX = <<"amq.direct">>,
RKey1 = <<"🗝1"/utf8>>,
RKey2 = <<"🗝2"/utf8>>,
To1 = rabbitmq_amqp_address:exchange(DirectX, RKey1),
To2 = rabbitmq_amqp_address:exchange(DirectX, RKey2),
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, DirectX, RKey1, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, DirectX, RKey2, #{}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),
Tag1 = Body1 = <<1>>,
Tag2 = Body2 = <<2>>,
%% Although mc_amqp:essential_properties/1 parses the x-exchange annotation, it should be ignored.
Msg1 = amqp10_msg:set_message_annotations(
#{<<"x-exchange">> => <<"ignored">>},
amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, Body1))),
Msg2 = amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, Body2)),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = amqp10_client:send_msg(Sender, Msg2),
ok = wait_for_settled(accepted, Tag1),
ok = wait_for_settled(accepted, Tag2),
{ok, #{message_count := 2}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = cleanup(Init).
%% Test v2 target address 'null' and 'to'
%% /exchanges/:exchange
%% with varying exchanges.
target_per_message_exchange(Config) ->
XFanout = <<"amq.fanout">>,
XHeaders = <<"amq.headers">>,
To1 = rabbitmq_amqp_address:exchange(XFanout),
To2 = rabbitmq_amqp_address:exchange(XHeaders),
QName = atom_to_binary(?FUNCTION_NAME),
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XFanout, <<>>, #{}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XHeaders, <<>>,
#{<<"my key">> => true,
<<"x-match">> => {utf8, <<"any">>}}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),
Tag1 = Body1 = <<1>>,
Tag2 = Body2 = <<2>>,
Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, Body1)),
Msg2 = amqp10_msg:set_application_properties(
#{<<"my key">> => true},
amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, Body2))),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = amqp10_client:send_msg(Sender, Msg2),
ok = wait_for_settled(accepted, Tag1),
ok = wait_for_settled(accepted, Tag2),
{ok, #{message_count := 2}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = cleanup(Init).
%% Test v2 target address 'null' and 'to'
%% /queues/:queue
target_per_message_queue(Config) ->
Q1 = <<"q1">>,
Q2 = <<"q2">>,
Q3 = <<"q3">>,
To1 = rabbitmq_amqp_address:queue(Q1),
To2 = rabbitmq_amqp_address:queue(Q2),
To3 = rabbitmq_amqp_address:queue(Q3),
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, Q1, #{}),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, Q2, #{}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),
Tag1 = Body1 = <<1>>,
Tag2 = Body2 = <<2>>,
Tag3 = Body3 = <<3>>,
Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, Body1)),
Msg2 = amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, Body2)),
Msg3 = amqp10_msg:set_properties(#{to => To3}, amqp10_msg:new(Tag3, Body3)),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = amqp10_client:send_msg(Sender, Msg2),
ok = amqp10_client:send_msg(Sender, Msg3),
ok = wait_for_settled(accepted, Tag1),
ok = wait_for_settled(accepted, Tag2),
ok = wait_for_settled(released, Tag3),
{ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1),
{ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2),
ok = cleanup(Init).
%% Test v2 target address 'null', but 'to' not set.
target_per_message_unset_to_address(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),
%% Send message with 'to' unset.
DTag = <<1>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, <<0>>)),
ExpectedError = #'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}},
ok = wait_for_settled({rejected, ExpectedError}, DTag),
ok = amqp10_client:detach_link(Sender),
receive {amqp10_event, {link, Sender, {detached, normal}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).
bad_v2_addresses() ->
[
%% valid v1, but bad v2 target addresses
<<"/topic/mytopic">>,
<<"/amq/queue/myqueue">>,
<<"myqueue">>,
<<"/queue">>,
%% bad v2 target addresses
<<>>,
<<0>>,
<<"/">>,
<<"//">>,
<<"/queues">>,
<<"/queues/">>,
<<"/queue/">>,
<<"/exchanges">>,
%% default exchange in v2 target address is disallowed
<<"/exchanges/">>,
<<"/exchanges//">>,
<<"/exchanges//mykey">>,
<<"/exchanges/amq.default">>,
<<"/exchanges/amq.default/">>,
<<"/exchanges/amq.default/mykey">>,
<<"/ex/✋"/utf8>>,
<<"/exchange">>,
<<"/exchange/">>,
<<"/exchange/amq.default">>,
<<"/exchange//key/">>,
<<"/exchange//key/mykey">>,
<<"/exchange/amq.default/key/">>,
<<"/exchange/amq.default/key/mykey">>,
%% The following addresses should be percent encoded, but aren't.
<<"/queues/missing%encoding">>,
<<"/queues/missing/encoding">>,
<<"/queues/✋"/utf8>>,
<<"/exchanges/missing%encoding">>,
<<"/exchanges/missing/encoding/routingkey">>,
<<"/exchanges/exchange/missing%encoding">>,
<<"/exchanges/✋"/utf8>>
].
%% Test v2 target address 'null' with an invalid 'to' addresses.
target_per_message_bad_to_address(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),
lists:foreach(
fun(Addr) ->
DTag = <<"some delivery tag">>,
Msg = amqp10_msg:set_properties(#{to => Addr}, amqp10_msg:new(DTag, <<0>>, false)),
ok = amqp10_client:send_msg(Sender, Msg),
receive
{amqp10_disposition, {{rejected, Error}, DTag}} ->
?assertMatch(#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address", _Rest/binary>>}},
Error)
after 5000 ->
flush(missing_disposition),
ct:fail(missing_disposition)
end
end, bad_v2_addresses()),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).
target_per_message_exchange_absent_settled(Config) ->
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
XName = <<"🎈"/utf8>>,
Address = rabbitmq_amqp_address:exchange(XName),
ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),
DTag1 = <<1>>,
Msg1 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag1, <<"m1">>)),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = wait_for_settled(released, DTag1),
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName),
DTag2 = <<2>>,
Msg2 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag2, <<"m2">>, true)),
ok = amqp10_client:send_msg(Sender, Msg2),
%% "the routing node MUST detach the link over which the message was sent with an error.
%% [...] Additionally the info field of error MUST contain an entry with symbolic key delivery-tag
%% and binary value of the delivery-tag of the message which caused the failure."
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
receive {amqp10_event, {link, Sender, {detached, Error}}} ->
?assertEqual(
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>},
info = {map, [{{symbol, <<"delivery-tag">>}, {binary, DTag2}}]}
},
Error)
after 5000 -> ct:fail("server did not close our outgoing link")
end,
ok = cleanup(Init).
target_per_message_exchange_absent_unsettled(Config) ->
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
XName = <<"🎈"/utf8>>,
Address = rabbitmq_amqp_address:exchange(XName),
ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),
DTag1 = <<"my tag">>,
Msg1 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag1, <<"hey">>)),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = wait_for_settled(released, DTag1),
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName),
%% "If the source of the link supports the rejected outcome, and the message has not
%% already been settled by the sender, then the routing node MUST reject the message.
%% In this case the error field of rejected MUST contain the error which would have been communicated
%% in the detach which would have be sent if a link to the same address had been attempted."
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
%% We test here multiple rejections implicilty checking that link flow control works correctly.
ExpectedError = #'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>}},
[begin
DTag = Body = integer_to_binary(N),
Msg = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag, Body, false)),
ok = amqp10_client:send_msg(Sender, Msg),
ok = wait_for_settled({rejected, ExpectedError}, DTag)
end || N <- lists:seq(1, 300)],
ok = cleanup(Init).
target_bad_address(Config) ->
%% bad v1 and bad v2 target address
TargetAddr = <<"/qqq/🎈"/utf8>>,
target_bad_address0(TargetAddr, Config).
target_bad_v2_address(Config) ->
lists:foreach(fun(Addr) ->
ok = target_bad_address0(Addr, Config)
end, bad_v2_addresses()).
target_bad_address0(TargetAddress, Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, _Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddress),
receive
{amqp10_event,
{session, Session,
{ended,
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok
after 5000 ->
Reason = {missing_event, ?LINE, TargetAddress},
flush(Reason),
ct:fail(Reason)
end,
ok = amqp10_client:close_connection(Connection).
%% Test v2 source address
%% /queues/:queue
%% where the source queue does not exist.
source_queue_absent(Config) ->
QName = <<"🎈"/utf8>>,
SourceAddr = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, _Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SourceAddr),
receive
{amqp10_event,
{session, Session,
{ended,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary,
"' in vhost '/'">>}}}}} -> ok
after 5000 ->
Reason = {missing_event, ?LINE},
flush(Reason),
ct:fail(Reason)
end,
ok = amqp10_client:close_connection(Connection).
source_bad_address(Config) ->
%% bad v1 and bad v2 source address
SourceAddr = <<"/qqq/🎈"/utf8>>,
source_bad_address0(SourceAddr, Config).
source_bad_v2_address(Config) ->
%% valid v1, but bad v2 source addresses
SourceAddresses = [<<"/exchange/myroutingkey">>,
<<"/topic/mytopic">>,
<<"/amq/queue/myqueue">>,
<<"myqueue">>],
lists:foreach(fun(Addr) ->
ok = source_bad_address0(Addr, Config)
end, SourceAddresses).
source_bad_address0(SourceAddress, Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, _Receiver} = amqp10_client:attach_receiver_link(Session, <<"sender">>, SourceAddress),
receive
{amqp10_event,
{session, Session,
{ended,
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok
after 5000 ->
Reason = {missing_event, ?LINE},
flush(Reason),
ct:fail(Reason)
end,
ok = amqp10_client:close_connection(Connection).
init(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"mgmt link pair">>),
{Connection, LinkPair}.
cleanup({Connection, LinkPair = #link_pair{session = Session}}) ->
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).
connection_config(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
#{address => Host,
port => Port,
container_id => <<"my container">>,
sasl => {plain, <<"guest">>, <<"guest">>}}.
wait_for_settled(State, Tag) ->
receive
{amqp10_disposition, {State, Tag}} ->
ok
after 5000 ->
Reason = {?FUNCTION_NAME, State, Tag},
flush(Reason),
ct:fail(Reason)
end.