rabbitmq-server/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl

555 lines
22 KiB
Erlang

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(amqp091_dynamic_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-import(rabbit_ct_helpers, [eventually/1]).
-import(shovel_test_utils, [await_autodelete/2,
invalid_param/2, invalid_param/3,
valid_param/2, valid_param/3,
with_amqp091_ch/2, amqp091_publish_expect/5,
amqp091_publish/4, amqp091_expect_empty/2,
amqp091_expect/3]).
-compile(export_all).
-export([spawn_suspender_proc/1]).
all() ->
[
{group, core_tests},
{group, quorum_queue_tests},
{group, stream_queue_tests}
].
groups() ->
[
{core_tests, [], [
set_properties_using_proplist,
set_properties_using_map,
set_empty_properties_using_proplist,
set_empty_properties_using_map,
headers,
restart,
validation,
security_validation,
get_connection_name,
credit_flow
]},
{quorum_queue_tests, [], [
quorum_queues
]},
{stream_queue_tests, [], [
stream_queues
]}
].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE},
{ignored_crashes, [
"server_initiated_close,404",
"writer,send_failed,closed"
]}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(quorum_queue_tests, Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
false -> Config;
_ -> {skip, "quorum queue tests are skipped in mixed mode"}
end;
init_per_group(stream_queue_tests, Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
false -> Config;
_ -> {skip, "stream queue tests are skipped in mixed mode"}
end;
init_per_group(_, Config) ->
Config.
end_per_group(_, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, cleanup1, [Config]),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------
quorum_queues(Config) ->
with_amqp091_ch(Config,
fun (Ch) ->
shovel_test_utils:set_param(
Config,
<<"test">>, [
{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"src-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}},
{<<"dest-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}}
]),
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
end).
stream_queues(Config) ->
with_amqp091_ch(Config,
fun (Ch) ->
shovel_test_utils:set_param(
Config,
<<"test">>, [
{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"src-queue-args">>, #{<<"x-queue-type">> => <<"stream">>}},
{<<"src-consumer-args">>, #{<<"x-stream-offset">> => <<"first">>}}
]),
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
end).
set_properties_using_map(Config) ->
with_amqp091_ch(Config,
fun (Ch) ->
Ps = [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"publish-properties">>, #{<<"cluster_id">> => <<"x">>}}],
shovel_test_utils:set_param(Config, <<"test">>, Ps),
#amqp_msg{props = #'P_basic'{cluster_id = Cluster}} =
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>),
<<"x">> = Cluster
end).
set_properties_using_proplist(Config) ->
with_amqp091_ch(Config,
fun (Ch) ->
Ps = [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"publish-properties">>, [{<<"cluster_id">>, <<"x">>}]}],
shovel_test_utils:set_param(Config, <<"test">>, Ps),
#amqp_msg{props = #'P_basic'{cluster_id = Cluster}} =
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>),
<<"x">> = Cluster
end).
set_empty_properties_using_map(Config) ->
with_amqp091_ch(Config,
fun (Ch) ->
Ps = [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"publish-properties">>, #{}}],
shovel_test_utils:set_param(Config, <<"test">>, Ps),
#amqp_msg{props = #'P_basic'{}} =
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>)
end).
set_empty_properties_using_proplist(Config) ->
with_amqp091_ch(Config,
fun (Ch) ->
Ps = [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"publish-properties">>, []}],
shovel_test_utils:set_param(Config, <<"test">>, Ps),
#amqp_msg{props = #'P_basic'{}} =
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi">>)
end).
headers(Config) ->
with_amqp091_ch(Config,
fun(Ch) ->
%% No headers by default
shovel_test_utils:set_param(Config,
<<"test">>,
[{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>}]),
?assertMatch(#amqp_msg{props = #'P_basic'{headers = H0}}
when H0 == undefined orelse H0 == [],
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi1">>)),
shovel_test_utils:set_param(Config,
<<"test">>,
[{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"add-forward-headers">>, true},
{<<"add-timestamp-header">>, true}]),
Timestmp = os:system_time(seconds),
#amqp_msg{props = #'P_basic'{headers = Headers}} =
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi2">>),
[{<<"x-shovelled">>, _, [{table, ShovelledHeader}]},
{<<"x-shovelled-timestamp">>, long, TS}] = Headers,
%% We assume that the message was shovelled within a 2 second
%% window.
true = TS >= Timestmp andalso TS =< Timestmp + 2,
{<<"shovel-type">>, _, <<"dynamic">>} =
lists:keyfind(<<"shovel-type">>, 1, ShovelledHeader),
{<<"shovel-vhost">>, _, <<"/">>} =
lists:keyfind(<<"shovel-vhost">>, 1, ShovelledHeader),
{<<"shovel-name">>, _, <<"test">>} =
lists:keyfind(<<"shovel-name">>, 1, ShovelledHeader),
shovel_test_utils:set_param(Config,
<<"test">>,
[{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"add-timestamp-header">>, true}]),
#amqp_msg{props = #'P_basic'{headers = [{<<"x-shovelled-timestamp">>,
long, _}]}} =
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi3">>),
shovel_test_utils:set_param(Config,
<<"test">>,
[{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"add-forward-headers">>, true}]),
#amqp_msg{props = #'P_basic'{headers = [{<<"x-shovelled">>,
_, _}]}} =
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hi4">>)
end).
restart(Config) ->
with_amqp091_ch(Config,
fun (Ch) ->
shovel_test_utils:set_param(Config,
<<"test">>, [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>}]),
%% The catch is because connections link to the shovel,
%% so one connection will die, kill the shovel, kill
%% the other connection, then we can't close it
Conns = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_direct, list, []),
[catch amqp_connection:close(C) || C <- Conns],
amqp091_publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
end).
validation(Config) ->
URIs = [{<<"src-uri">>, <<"amqp://">>},
{<<"dest-uri">>, <<"amqp://">>}],
%% Need valid src and dest URIs
invalid_param(Config, []),
invalid_param(Config,
[{<<"src-queue">>, <<"test">>},
{<<"src-uri">>, <<"derp">>},
{<<"dest-uri">>, <<"amqp://">>}]),
invalid_param(Config,
[{<<"src-queue">>, <<"test">>},
{<<"src-uri">>, [<<"derp">>]},
{<<"dest-uri">>, <<"amqp://">>}]),
invalid_param(Config,
[{<<"src-queue">>, <<"test">>},
{<<"dest-uri">>, <<"amqp://">>}]),
%% Also need src exchange or queue
invalid_param(Config,
URIs),
valid_param(Config,
[{<<"src-exchange">>, <<"test">>} | URIs]),
QURIs = [{<<"src-queue">>, <<"test">>} | URIs],
valid_param(Config, QURIs),
%% But not both
invalid_param(Config,
[{<<"src-exchange">>, <<"test">>} | QURIs]),
%% Check these are of right type
invalid_param(Config,
[{<<"prefetch-count">>, <<"three">>} | QURIs]),
invalid_param(Config,
[{<<"reconnect-delay">>, <<"three">>} | QURIs]),
invalid_param(Config,
[{<<"ack-mode">>, <<"whenever">>} | QURIs]),
invalid_param(Config,
[{<<"src-delete-after">>, <<"whenever">>} | QURIs]),
%% Check properties have to look property-ish
valid_param(Config,
[{<<"src-exchange">>, <<"test">>},
{<<"publish-properties">>, [{<<"cluster_id">>, <<"rabbit@localhost">>},
{<<"delivery_mode">>, 2}]}
| URIs]),
valid_param(Config,
#{<<"publish-properties">> => #{<<"cluster_id">> => <<"rabbit@localhost">>,
<<"delivery_mode">> => 2},
<<"src-exchange">> => <<"test">>,
<<"src-uri">> => <<"amqp://">>,
<<"dest-uri">> => <<"amqp://">>}),
invalid_param(Config,
[{<<"publish-properties">>, [{<<"nonexistent">>, <<>>}]}]),
invalid_param(Config,
#{<<"publish-properties">> => #{<<"nonexistent">> => <<>>}}),
invalid_param(Config,
[{<<"publish-properties">>, [{<<"cluster_id">>, 2}]}]),
invalid_param(Config,
[{<<"publish-properties">>, <<"something">>}]),
%% Can't use explicit message count and no-ack together
invalid_param(Config,
[{<<"src-delete-after">>, 1},
{<<"ack-mode">>, <<"no-ack">>} | QURIs]),
%% superseded by src-delete-after
invalid_param(Config,
[{<<"delete-after">>, 1},
{<<"ack-mode">>, <<"no-ack">>} | QURIs]),
ok.
security_validation(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, security_validation_add_user, []),
Qs = [{<<"src-queue">>, <<"test">>},
{<<"dest-queue">>, <<"test2">>}],
A = lookup_user(Config, <<"a">>),
valid_param(Config, [{<<"src-uri">>, <<"amqp://localhost:5672/a">>},
{<<"dest-uri">>, <<"amqp://localhost:5672/b">>} | Qs], A),
%% src-uri and dest-uri are not valid URIs
invalid_param(Config,
[{<<"src-uri">>, <<"an arbitrary string">>},
{<<"dest-uri">>, <<"\o/ \o/ \o/">>} | Qs], A),
%% missing src-queue and dest-queue
invalid_param(Config,
[{<<"src-uri">>, <<"amqp://localhost/a">>},
{<<"dest-uri">>, <<"amqp://localhost/b">>}], A),
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, security_validation_remove_user, []),
ok.
security_validation_add_user() ->
[begin
rabbit_vhost:add(U, <<"acting-user">>),
rabbit_auth_backend_internal:add_user(U, <<>>, <<"acting-user">>),
rabbit_auth_backend_internal:set_permissions(
U, U, <<".*">>, <<".*">>, <<".*">>, <<"acting-user">>)
end || U <- [<<"a">>, <<"b">>]],
ok.
security_validation_remove_user() ->
[begin
rabbit_vhost:delete(U, <<"acting-user">>),
rabbit_auth_backend_internal:delete_user(U, <<"acting-user">>)
end || U <- [<<"a">>, <<"b">>]],
ok.
get_connection_name(_Config) ->
<<"Shovel static_shovel_name_as_atom">> = rabbit_shovel_worker:get_connection_name(static_shovel_name_as_atom),
<<"Shovel dynamic_shovel_name_as_binary">> = rabbit_shovel_worker:get_connection_name({<<"/">>, <<"dynamic_shovel_name_as_binary">>}),
<<"Shovel">> = rabbit_shovel_worker:get_connection_name({<<"/">>, {unexpected, tuple}}),
<<"Shovel">> = rabbit_shovel_worker:get_connection_name({one, two, three}),
<<"Shovel">> = rabbit_shovel_worker:get_connection_name(<<"anything else">>).
credit_flow(Config) ->
OrigCredit = set_default_credit(Config, {20, 10}),
with_amqp091_ch(Config,
fun (Ch) ->
try
shovel_test_utils:set_param_nowait(
Config,
<<"test">>, [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"src-prefetch-count">>, 50},
{<<"ack-mode">>, <<"on-publish">>},
{<<"src-delete-after">>, <<"never">>}]),
shovel_test_utils:await_shovel(Config, <<"test">>),
running = shovel_test_utils:get_shovel_status(Config, <<"test">>),
ShovelPid = find_shovel_pid(Config),
#{dest :=
#{current :=
{_DestConn, DestChan, _DestUri}}} =
get_shovel_state(ShovelPid),
WriterPid = find_writer_pid_for_channel(Config, DestChan),
%% When the broker-side channel is blocked by flow
%% control, it stops reading from the tcp
%% socket. After all the OS, BEAM and process buffers
%% are full, gen_tcp:send/2 will block the writer
%% process. Simulate this by suspending the writer process.
true = suspend_process(Config, WriterPid),
%% Publish 1000 messages to the src queue
amqp_channel:call(Ch, #'confirm.select'{}),
publish_count(Ch, <<>>, <<"src">>, <<"hello">>, 1000),
amqp_channel:wait_for_confirms(Ch),
%% Wait until the shovel is blocked
shovel_test_utils:await(
fun() ->
case shovel_test_utils:get_shovel_status(Config, <<"test">>) of
flow -> true;
Status -> Status
end
end,
5000),
%% There should be only one process with a message buildup
Top = [{WriterPid, MQLen, _}, {_, P, _} | _] =
rabbit_ct_broker_helpers:rpc(
Config, 0, recon, proc_count, [message_queue_len, 10]),
ct:pal("Top processes by message queue length: ~p", [Top]),
?assert(P < 3),
%% The writer process should have only a limited
%% message queue. The shovel stops sending messages
%% when the channel and shovel process used up all
%% their initial credit (that is 20 + 20).
2 * 20 = MQLen = proc_info(WriterPid, message_queue_len),
%% Most messages should still be in the queue either ready or unacked
ExpDestCnt = 0,
#{messages := ExpDestCnt} = message_count(Config, <<"dest">>),
ExpSrcCnt = 1000 - MQLen,
#{messages := ExpSrcCnt,
messages_unacknowledged := 50} = message_count(Config, <<"src">>),
%% After the writer process is resumed all messages
%% should be shoveled to the dest queue, and process
%% message queues should be empty
resume_process(Config),
shovel_test_utils:await(
fun() ->
#{messages := Cnt} = message_count(Config, <<"src">>),
Cnt =:= 0
end,
5000),
#{messages := 1000} = message_count(Config, <<"dest">>),
[{_, P, _}] =
rabbit_ct_broker_helpers:rpc(
Config, 0, recon, proc_count, [message_queue_len, 1]),
?assert(P < 3),
%% Status only transitions from flow to running
%% after a 1 second state-change-interval
timer:sleep(1000),
running = shovel_test_utils:get_shovel_status(Config, <<"test">>)
after
resume_process(Config),
set_default_credit(Config, OrigCredit)
end
end).
%%----------------------------------------------------------------------------
publish_count(Ch, X, Key, M, Count) ->
[begin
amqp091_publish(Ch, X, Key, M)
end || _ <- lists:seq(1, Count)].
expect_count(Ch, Q, M, Count) ->
[begin
amqp091_expect(Ch, Q, M)
end || _ <- lists:seq(1, Count)],
amqp091_expect_empty(Ch, Q).
lookup_user(Config, Name) ->
{ok, User} = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_access_control, check_user_login, [Name, []]),
User.
set_default_credit(Config, Value) ->
Key = credit_flow_default_credit,
OrigValue = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
ok = rabbit_ct_broker_helpers:rpc(Config, persistent_term, put, [Key, Value]),
OrigValue.
message_count(Config, QueueName) ->
Resource = rabbit_misc:r(<<"/">>, queue, QueueName),
{ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [Resource]),
maps:from_list(
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info,
[Q, [messages, messages_unacknowledged]])).
%% A process can be only suspended by another process on the same node
suspend_process(Config, Pid) ->
true = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, spawn_suspender_proc, [Pid]),
suspended = proc_info(Pid, status),
true.
%% When the suspender process terminates, the suspended process is also resumed
resume_process(Config) ->
case rabbit_ct_broker_helpers:rpc(Config, 0, erlang, whereis, [suspender]) of
undefined ->
false;
SusPid ->
exit(SusPid, kill)
end.
spawn_suspender_proc(Pid) ->
undefined = whereis(suspender),
ReqPid = self(),
SusPid =
spawn(
fun() ->
register(suspender, self()),
Res = catch (true = erlang:suspend_process(Pid)),
ReqPid ! {suspend_res, self(), Res},
%% wait indefinitely
receive stop -> ok end
end),
receive
{suspend_res, SusPid, Res} -> Res
after
5000 -> timeout
end.
find_shovel_pid(Config) ->
[ShovelPid] = [P || P <- rabbit_ct_broker_helpers:rpc(
Config, 0, erlang, processes, []),
rabbit_shovel_worker ==
(catch element(1, erpc:call(node(P), proc_lib, initial_call, [P])))],
ShovelPid.
get_shovel_state(ShovelPid) ->
gen_server2:with_state(ShovelPid, fun rabbit_shovel_worker:get_internal_config/1).
find_writer_pid_for_channel(Config, ChanPid) ->
{amqp_channel, ChanName} = process_name(ChanPid),
[WriterPid] = [P || P <- rabbit_ct_broker_helpers:rpc(
Config, 0, erlang, processes, []),
{rabbit_writer, ChanName} == process_name(P)],
WriterPid.
process_name(Pid) ->
try proc_info(Pid, dictionary) of
Dict ->
proplists:get_value(process_name, Dict)
catch _:_ ->
undefined
end.
proc_info(Pid, Item) ->
{Item, Value} = erpc:call(node(Pid), erlang, process_info, [Pid, Item]),
Value.
cleanup1(_Config) ->
[rabbit_runtime_parameters:clear(rabbit_misc:pget(vhost, P),
rabbit_misc:pget(component, P),
rabbit_misc:pget(name, P),
<<"acting-user">>) ||
P <- rabbit_runtime_parameters:list()],
[rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>)
|| Q <- rabbit_amqqueue:list()].