rabbitmq-server/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

244 lines
11 KiB
Erlang

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(amqp10_dynamic_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile(export_all).
-import(shovel_test_utils, [with_amqp10_session/2,
amqp10_publish/3, amqp10_publish/5,
amqp10_expect_empty/2,
await_amqp10_event/3, amqp10_expect_one/2,
amqp10_expect_count/3, amqp10_publish/4,
amqp10_publish_expect/5, amqp10_declare_queue/3,
await_autodelete/2]).
-define(PARAM, <<"test">>).
%% Common Test callback: the suite is made up of exactly two groups, both
%% of which are defined in groups/0.
all() ->
    [{group, GroupName} || GroupName <- [non_parallel_tests, with_map_config]].
%% Common Test callback: group definitions.
%%
%% `non_parallel_tests' holds the complete list of test cases, while
%% `with_map_config' re-runs two of them (init_per_group/2 flags that
%% group with `{map_config, true}'). Neither group sets any group
%% properties.
groups() ->
    AllCases = [simple,
                change_definition,
                simple_amqp10_dest,
                amqp091_to_amqp10_with_dead_lettering,
                test_amqp10_delete_after_queue_length],
    MapConfigCases = [simple,
                      simple_amqp10_dest],
    [{non_parallel_tests, [], AllCases},
     {with_map_config, [], MapConfigCases}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
%% Suite-level setup.
%%
%% Starts the amqp10_client application (and its dependencies) in the VM
%% running the suite, logs the environment, tags the broker node name with
%% this module's name and finally runs the broker setup steps followed by
%% the client setup steps. Returns whatever run_setup_steps/2 returns.
init_per_suite(Config0) ->
    {ok, _Started} = application:ensure_all_started(amqp10_client),
    rabbit_ct_helpers:log_environment(),
    NodeOpts = [{rmq_nodename_suffix, ?MODULE}],
    Config1 = rabbit_ct_helpers:set_config(Config0, NodeOpts),
    BrokerSteps = rabbit_ct_broker_helpers:setup_steps(),
    ClientSteps = rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_setup_steps(Config1, BrokerSteps ++ ClientSteps).
%% Suite-level teardown.
%%
%% Stops the amqp10_client application (the result is deliberately not
%% checked) and then runs the client teardown steps followed by the broker
%% teardown steps, i.e. the reverse of the order used in init_per_suite/1.
end_per_suite(Config) ->
    application:stop(amqp10_client),
    ClientSteps = rabbit_ct_client_helpers:teardown_steps(),
    BrokerSteps = rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(Config, ClientSteps ++ BrokerSteps).
%% Group-level setup.
%%
%% Stores a `map_config' flag in the CT config: `true' for the
%% `with_map_config' group and `false' for every other group. The flag is
%% read by test_amqp10_destination/6 to decide whether nested shovel
%% settings are passed as maps or as key/value lists.
init_per_group(Group, Config) ->
    UseMaps = (Group =:= with_map_config),
    rabbit_ct_helpers:set_config(Config, [{map_config, UseMaps}]).
%% Group-level teardown: nothing to clean up, the config is handed back
%% untouched regardless of the group.
end_per_group(_Group, Config) ->
    Config.
%% Per-testcase setup.
%%
%% Derives three queue names from the test case name ("<case>_src",
%% "<case>_dest" and "<case>_dest2", as binaries), prepends them to the
%% config under the keys `srcq', `destq' and `destq2', and reports the
%% test case as started.
init_per_testcase(Testcase, Config0) ->
    Base = atom_to_list(Testcase),
    QueueEntries = [{Key, list_to_binary(Base ++ Suffix)}
                    || {Key, Suffix} <- [{srcq, "_src"},
                                         {destq, "_dest"},
                                         {destq2, "_dest2"}]],
    rabbit_ct_helpers:testcase_started(QueueEntries ++ Config0, Testcase).
%% Per-testcase teardown.
%%
%% Clears the shovel parameter named by ?PARAM, asks node 0 (via rpc) to run
%% shovel_test_utils:delete_all_queues/0, and reports the test case as
%% finished.
end_per_testcase(Testcase, Config) ->
    shovel_test_utils:clear_param(Config, ?PARAM),
    Node = 0,
    rabbit_ct_broker_helpers:rpc(Config, Node,
                                 shovel_test_utils, delete_all_queues, []),
    rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------
%% AMQP 1.0 source -> AMQP 1.0 destination.
%%
%% Runs the shared destination checks with `src-protocol' set to "amqp10",
%% which means the source queue is configured through `src-address'.
simple(Config) ->
    SrcQ = ?config(srcq, Config),
    DestQ = ?config(destq, Config),
    Scenario = fun(Session) ->
                       test_amqp10_destination(Config, SrcQ, DestQ, Session,
                                               <<"amqp10">>,
                                               <<"src-address">>)
               end,
    shovel_test_utils:with_amqp10_session(Config, Scenario).
%% AMQP 0-9-1 source -> AMQP 1.0 destination.
%%
%% Runs the shared destination checks with `src-protocol' set to "amqp091",
%% which means the source queue is configured through `src-queue'.
simple_amqp10_dest(Config) ->
    SrcQ = ?config(srcq, Config),
    DestQ = ?config(destq, Config),
    Scenario = fun(Session) ->
                       test_amqp10_destination(Config, SrcQ, DestQ, Session,
                                               <<"amqp091">>,
                                               <<"src-queue">>)
               end,
    shovel_test_utils:with_amqp10_session(Config, Scenario).
%% Dead-lettered messages must travel through an AMQP 0-9-1 -> AMQP 1.0
%% shovel.
%%
%% A helper queue "tmp" is declared with `x-max-length' 0 and a dead-letter
%% configuration that targets the source queue through the default ("")
%% exchange. After the regular destination checks have set up the shovel,
%% a message published to "tmp" is expected to show up on the destination
%% queue.
amqp091_to_amqp10_with_dead_lettering(Config) ->
    DestQ = ?config(destq, Config),
    SrcQ = ?config(srcq, Config),
    TmpQ = <<"tmp">>,
    TmpAddress = <<"/queues/", TmpQ/binary>>,
    TmpQueueArgs = #{<<"x-max-length">> => {uint, 0},
                     <<"x-dead-letter-exchange">> => {utf8, <<"">>},
                     <<"x-dead-letter-routing-key">> => {utf8, SrcQ}},
    shovel_test_utils:with_amqp10_session(
      Config,
      fun(Session) ->
              shovel_test_utils:amqp10_declare_queue(Session, TmpQ,
                                                     TmpQueueArgs),
              {ok, TmpSender} = amqp10_client:attach_sender_link(
                                  Session,
                                  <<"sender-tmp">>,
                                  TmpAddress,
                                  unsettled,
                                  unsettled_state),
              ok = shovel_test_utils:await_amqp10_event(link, TmpSender,
                                                        attached),
              shovel_test_utils:amqp10_expect_empty(Session, TmpQ),
              test_amqp10_destination(Config, SrcQ, DestQ, Session,
                                      <<"amqp091">>, <<"src-queue">>),
              %% A message published to tmp should be dead-lettered to the
              %% source queue and from there shovelled to the destination.
              _ = shovel_test_utils:amqp10_publish_expect(Session, TmpQ, DestQ,
                                                          <<"hello">>, 1)
      end).
%% Shared scenario for shovels whose destination speaks AMQP 1.0.
%%
%% Config      - CT config; its `map_config' flag selects whether nested
%%               settings are given as maps (`true') or key/value lists.
%% Src, Dest   - source and destination queue names.
%% Sess        - AMQP 1.0 session used to publish and consume.
%% Protocol    - value for `src-protocol'.
%% ProtocolSrc - parameter key under which Src is configured
%%               (callers in this suite pass `src-address' or `src-queue').
%%
%% Declares the shovel under ?PARAM, sends one message through it and then
%% checks the properties, application properties and message annotations of
%% the message received on Dest.
test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
    %% Nested settings are written once as key/value lists and converted to
    %% maps only when the group asked for map-style configuration.
    Nested = case ?config(map_config, Config) of
                 true -> fun maps:from_list/1;
                 _ -> fun(Pairs) -> Pairs end
             end,
    Definition =
        [{<<"src-protocol">>, Protocol},
         {ProtocolSrc, Src},
         {<<"dest-protocol">>, <<"amqp10">>},
         {<<"dest-address">>, Dest},
         {<<"dest-add-forward-headers">>, true},
         {<<"dest-add-timestamp-header">>, true},
         {<<"dest-application-properties">>,
          Nested([{<<"app-prop-key">>, <<"app-prop-value">>}])},
         {<<"dest-properties">>,
          Nested([{<<"user_id">>, <<"guest">>}])},
         {<<"dest-message-annotations">>,
          Nested([{<<"x-message-ann-key">>, <<"message-ann-value">>}])}],
    shovel_test_utils:set_param(Config, ?PARAM, Definition),
    [Msg] = shovel_test_utils:amqp10_publish_expect(Sess, Src, Dest,
                                                    <<"hello">>, 1),
    AppProps = amqp10_msg:application_properties(Msg),
    Anns = amqp10_msg:message_annotations(Msg),
    Props = amqp10_msg:properties(Msg),
    %% Properties, application properties and message annotations are no
    %% longer added or overridden by the shovel: the AMQP 1.0 message is
    %% immutable. Only the forward headers and the timestamp are attached,
    %% and they travel as message annotations.
    ?assertNot(maps:is_key(user_id, Props)),
    ?assertNot(maps:is_key(<<"app-prop-key">>, AppProps)),
    ?assertEqual(undefined,
                 maps:get(<<"delivery_mode">>, AppProps, undefined)),
    ?assertNot(maps:is_key(<<"x-message-ann-key">>, Anns)),
    ?assertMatch(#{<<"x-opt-shovel-name">> := <<"test">>,
                   <<"x-opt-shovel-type">> := <<"dynamic">>,
                   <<"x-opt-shovelled-by">> := _,
                   <<"x-opt-shovelled-timestamp">> := _},
                 Anns).
%% Re-defining and removing a dynamic shovel must take effect immediately.
%%
%% Steps, all within one AMQP 1.0 session:
%%   1. define the shovel Src -> Dest and check a message arrives on Dest;
%%   2. re-define the same parameter as Src -> Dest2 and check the next
%%      message arrives on Dest2 while Dest stays empty;
%%   3. clear the parameter and check that a message published to Src is
%%      received from Src itself, with Dest and Dest2 both empty.
%%
%% The parameter is always addressed through ?PARAM, so the name that is
%% set here, cleared here and cleared again in end_per_testcase/2 cannot
%% drift apart.
change_definition(Config) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    Dest2 = ?config(destq2, Config),
    %% (Re)declares the shovel; only the destination address varies.
    ShovelTo = fun(DestAddress) ->
                       shovel_test_utils:set_param(
                         Config, ?PARAM,
                         [{<<"src-address">>, Src},
                          {<<"src-protocol">>, <<"amqp10">>},
                          {<<"dest-protocol">>, <<"amqp10">>},
                          {<<"dest-address">>, DestAddress}])
               end,
    shovel_test_utils:with_amqp10_session(
      Config,
      fun(Sess) ->
              ShovelTo(Dest),
              shovel_test_utils:amqp10_publish_expect(Sess, Src, Dest,
                                                      <<"hello1">>, 1),
              ShovelTo(Dest2),
              shovel_test_utils:amqp10_publish_expect(Sess, Src, Dest2,
                                                      <<"hello2">>, 1),
              shovel_test_utils:amqp10_expect_empty(Sess, Dest),
              shovel_test_utils:clear_param(Config, ?PARAM),
              %% With the shovel gone the message stays on the source queue.
              shovel_test_utils:amqp10_publish_expect(Sess, Src, Src,
                                                      <<"hello3">>, 1),
              shovel_test_utils:amqp10_expect_empty(Sess, Dest),
              shovel_test_utils:amqp10_expect_empty(Sess, Dest2)
      end).
%% `src-delete-after' = "queue-length" must be rejected for an AMQP 1.0
%% source.
%%
%% The shovel parameter is set directly through
%% rabbit_runtime_parameters:set/5 on node 0, so that the raw result can be
%% inspected: it has to be an `{error_string, Reason}' tuple whose reason
%% contains "Validation failed".
test_amqp10_delete_after_queue_length(Config) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    Uri = shovel_test_utils:make_uri(Config, 0),
    Definition = [{<<"src-uri">>, Uri},
                  {<<"dest-uri">>, [Uri]},
                  {<<"src-protocol">>, <<"amqp10">>},
                  {<<"src-address">>, Src},
                  {<<"src-delete-after">>, <<"queue-length">>},
                  {<<"dest-protocol">>, <<"amqp10">>},
                  {<<"dest-address">>, Dest}],
    SetArgs = [<<"/">>, <<"shovel">>, <<"test">>, Definition, none],
    Result = rabbit_ct_broker_helpers:rpc(Config, 0,
                                          rabbit_runtime_parameters, set,
                                          SetArgs),
    ?assertMatch({error_string, _}, Result),
    {error_string, Reason} = Result,
    ?assertMatch(match,
                 re:run(Reason, "Validation failed.*", [{capture, none}])).
%%----------------------------------------------------------------------------
%% Sends Msg over the Sender link and waits for the calling process to
%% receive an `accepted' disposition carrying the message's delivery tag.
%%
%% Returns `ok' once that disposition arrives; exits with
%% `publish_disposition_not_received' if it has not arrived within
%% 3000 ms.
publish(Sender, Msg) ->
    ok = amqp10_client:send_msg(Sender, Msg),
    DeliveryTag = amqp10_msg:delivery_tag(Msg),
    Accepted = {amqp10_disposition, {accepted, DeliveryTag}},
    receive
        Accepted ->
            ok
    after 3000 ->
            exit(publish_disposition_not_received)
    end.
%% Publishes Msg to Source over a short-lived sender link and then expects
%% exactly one message on Dest.
%%
%% The link is named "dynamic-sender-<Dest>", is attached in `unsettled'
%% mode and is detached again once the publish has been confirmed by
%% publish/2. Returns the result of amqp10_expect_one/2.
publish_expect_msg(Session, Source, Dest, Msg) ->
    {ok, Sender} = amqp10_client:attach_sender_link(
                     Session,
                     <<"dynamic-sender-", Dest/binary>>,
                     Source,
                     unsettled,
                     unsettled_state),
    ok = shovel_test_utils:await_amqp10_event(link, Sender, attached),
    publish(Sender, Msg),
    amqp10_client:detach_link(Sender),
    shovel_test_utils:amqp10_expect_one(Session, Dest).
%% Looks up the user Name on node 0 by calling
%% rabbit_access_control:check_user_login/2 with an empty second argument.
%%
%% Returns the user term; any result other than `{ok, User}' fails with a
%% badmatch.
lookup_user(Config, Name) ->
    LoginArgs = [Name, []],
    {ok, User} = rabbit_ct_broker_helpers:rpc(Config, 0,
                                              rabbit_access_control,
                                              check_user_login,
                                              LoginArgs),
    User.