2025-10-07 22:30:20 +08:00
|
|
|
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
|
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
|
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
|
%%
|
|
|
|
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
|
|
|
%%
|
|
|
|
|
|
|
|
|
|
-module(shovel_dynamic_SUITE).
|
|
|
|
|
%% Test cases common to all protocols.
|
|
|
|
|
|
|
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
|
|
|
|
|
|
|
|
|
|
-compile(export_all).
|
|
|
|
|
|
2025-10-16 17:43:01 +08:00
|
|
|
-import(rabbit_ct_helpers, [eventually/3]).
|
2025-10-14 21:11:46 +08:00
|
|
|
-import(shovel_test_utils, [await_autodelete/2,
|
|
|
|
|
set_param/3,
|
|
|
|
|
set_param_nowait/3,
|
2025-10-16 20:31:42 +08:00
|
|
|
clear_param/2,
|
2025-10-07 22:30:20 +08:00
|
|
|
with_amqp10_session/2,
|
2025-10-08 23:43:53 +08:00
|
|
|
amqp10_publish_expect/5,
|
2025-10-14 21:11:46 +08:00
|
|
|
amqp10_declare_queue/3,
|
|
|
|
|
amqp10_publish/4,
|
2025-10-16 20:31:42 +08:00
|
|
|
amqp10_publish_msg/4,
|
|
|
|
|
amqp10_expect/3,
|
|
|
|
|
amqp10_expect_one/2,
|
|
|
|
|
amqp10_expect_count/3,
|
|
|
|
|
amqp10_expect_empty/2,
|
|
|
|
|
amqp10_subscribe/2,
|
|
|
|
|
make_uri/2, make_uri/3,
|
|
|
|
|
make_uri/5,
|
|
|
|
|
await_no_shovel/2
|
2025-10-14 21:11:46 +08:00
|
|
|
]).
|
2025-10-07 22:30:20 +08:00
|
|
|
|
|
|
|
|
-define(PARAM, <<"test">>).
|
|
|
|
|
|
|
|
|
|
%% Common Test callback: runs every protocol-combination group declared
%% in groups/0.
all() ->
    [{group, Name} || Name <- [amqp091,
                               amqp10,
                               local,
                               amqp091_to_amqp10,
                               amqp091_to_local,
                               amqp10_to_amqp091,
                               amqp10_to_local,
                               local_to_amqp091,
                               local_to_amqp10]].
|
|
|
|
|
|
|
|
|
|
%% Common Test callback: every group (one per source/destination protocol
%% combination) runs the same list of test cases, with no group properties.
groups() ->
    [{Name, [], tests()} || Name <- [amqp091,
                                     amqp10,
                                     local,
                                     amqp091_to_amqp10,
                                     amqp091_to_local,
                                     amqp10_to_amqp091,
                                     amqp10_to_local,
                                     local_to_amqp091,
                                     local_to_amqp10]].
|
|
|
|
|
|
|
|
|
|
%% Returns the test cases shared by every group, in execution order.
tests() ->
    Simple = [simple,
              simple_classic_no_ack,
              simple_classic_on_confirm,
              simple_classic_on_publish,
              simple_quorum_no_ack,
              simple_quorum_on_confirm,
              simple_quorum_on_publish,
              simple_stream_on_confirm,
              simple_stream_on_publish],
    %% Credit flow tests are the simple tests run with a high number of
    %% messages, in an attempt to trigger the different credit flow
    %% mechanisms. Having the same test twice (simple/credit) helps to
    %% isolate a problem.
    CreditFlow = [credit_flow_classic_no_ack,
                  credit_flow_classic_on_confirm,
                  credit_flow_classic_on_publish,
                  credit_flow_quorum_no_ack,
                  credit_flow_quorum_on_confirm,
                  credit_flow_quorum_on_publish,
                  credit_flow_stream_on_confirm,
                  credit_flow_stream_on_publish],
    Autodelete = [delete_after_never,
                  autodelete_classic_on_confirm,
                  autodelete_quorum_on_confirm,
                  autodelete_classic_on_publish,
                  autodelete_quorum_on_publish,
                  autodelete_no_ack,
                  autodelete_classic_on_confirm_no_transfer,
                  autodelete_quorum_on_confirm_no_transfer,
                  autodelete_classic_on_publish_no_transfer,
                  autodelete_quorum_on_publish_no_transfer],
    %% AMQP091 and local shovels requeue messages on reject while AMQP10
    %% discards messages on reject. For that reason
    %% autodelete_classic_on_confirm_with_rejections and
    %% autodelete_quorum_on_confirm_with_rejections stay out of this list
    %% until a decision on which behavior to adopt for both is made and
    %% implemented.
    Rejections = [autodelete_classic_on_publish_with_rejections,
                  autodelete_quorum_on_publish_with_rejections],
    Other = [no_vhost_access,
             no_user_access,
             application_properties,
             delete_src_queue,
             shovel_status,
             change_definition,
             disk_alarm],
    Simple ++ CreditFlow ++ Autodelete ++ Rejections ++ Other.
|
|
|
|
|
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
%% Testsuite setup/teardown.
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
%% Suite setup: starts the AMQP 1.0 client application, records the node
%% name suffix and the crash-log patterns to ignore in the config, then
%% runs the broker and client helper setup steps.
init_per_suite(Config0) ->
    {ok, _} = application:ensure_all_started(amqp10_client),
    rabbit_ct_helpers:log_environment(),
    IgnoredCrashes = ["server_initiated_close,404",
                      "writer,send_failed,closed",
                      "source_queue_down",
                      "dest_queue_down",
                      "inbound_link_detached"],
    SuiteSettings = [{rmq_nodename_suffix, ?MODULE},
                     {ignored_crashes, IgnoredCrashes}],
    Config1 = rabbit_ct_helpers:set_config(Config0, SuiteSettings),
    SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
        rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_setup_steps(Config1, SetupSteps).
|
|
|
|
|
|
|
|
|
|
%% Suite teardown: stops the AMQP 1.0 client application, then runs the
%% client and broker helper teardown steps (client steps first).
end_per_suite(Config) ->
    application:stop(amqp10_client),
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
        rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(Config, TeardownSteps).
|
|
|
|
|
|
|
|
|
|
%% Group setup: stores the source/destination protocol of the group and the
%% shovel parameter keys used to name the source and destination
%% (`src-queue`/`dest-queue` for amqp091 and local, `src-address`/
%% `dest-address` for amqp10) in the config.
init_per_group(amqp091, Config) ->
    set_group_endpoints(Config, <<"amqp091">>, <<"amqp091">>,
                        <<"src-queue">>, <<"dest-queue">>);
init_per_group(amqp10, Config) ->
    set_group_endpoints(Config, <<"amqp10">>, <<"amqp10">>,
                        <<"src-address">>, <<"dest-address">>);
init_per_group(local, Config) ->
    set_group_endpoints(Config, <<"local">>, <<"local">>,
                        <<"src-queue">>, <<"dest-queue">>);
init_per_group(amqp091_to_amqp10, Config) ->
    set_group_endpoints(Config, <<"amqp091">>, <<"amqp10">>,
                        <<"src-queue">>, <<"dest-address">>);
init_per_group(amqp091_to_local, Config) ->
    set_group_endpoints(Config, <<"amqp091">>, <<"local">>,
                        <<"src-queue">>, <<"dest-queue">>);
init_per_group(amqp10_to_amqp091, Config) ->
    set_group_endpoints(Config, <<"amqp10">>, <<"amqp091">>,
                        <<"src-address">>, <<"dest-queue">>);
init_per_group(amqp10_to_local, Config) ->
    set_group_endpoints(Config, <<"amqp10">>, <<"local">>,
                        <<"src-address">>, <<"dest-queue">>);
init_per_group(local_to_amqp091, Config) ->
    set_group_endpoints(Config, <<"local">>, <<"amqp091">>,
                        <<"src-queue">>, <<"dest-queue">>);
init_per_group(local_to_amqp10, Config) ->
    set_group_endpoints(Config, <<"local">>, <<"amqp10">>,
                        <<"src-queue">>, <<"dest-address">>).

%% Adds the four per-group settings to Config, in the order
%% src_protocol, dest_protocol, src_address, dest_address.
set_group_endpoints(Config, SrcProtocol, DestProtocol, SrcAddress, DestAddress) ->
    rabbit_ct_helpers:set_config(
      Config,
      [{src_protocol, SrcProtocol},
       {dest_protocol, DestProtocol},
       {src_address, SrcAddress},
       {dest_address, DestAddress}]).
|
2025-10-07 22:30:20 +08:00
|
|
|
|
|
|
|
|
%% Group teardown: nothing to clean up, the config is returned untouched.
end_per_group(_Group, Config) ->
    Config.
|
|
|
|
|
|
|
|
|
|
%% Test case setup: derives source queue, destination queue and alternative
%% vhost names from the test case name, builds the base shovel definition
%% from the group's protocol settings, and stores all of it in the config
%% under srcq, destq, shovel_args and alt_vhost.
init_per_testcase(Testcase, Config0) ->
    Prefix = atom_to_list(Testcase),
    [SrcQ, DestQ, VHost] = [list_to_binary(Prefix ++ Suffix)
                            || Suffix <- ["_src", "_dest", "_vhost"]],
    ShovelArgs = [{<<"src-protocol">>, ?config(src_protocol, Config0)},
                  {<<"dest-protocol">>, ?config(dest_protocol, Config0)},
                  {?config(src_address, Config0), SrcQ},
                  {?config(dest_address, Config0), DestQ}],
    TestcaseSettings = [{srcq, SrcQ},
                        {destq, DestQ},
                        {shovel_args, ShovelArgs},
                        {alt_vhost, VHost}],
    Config = rabbit_ct_helpers:set_config(Config0, TestcaseSettings),
    rabbit_ct_helpers:testcase_started(Config, Testcase).
|
|
|
|
|
|
|
|
|
|
%% Test case teardown: clears the shovel parameter, deletes all queues on
%% node 0 and removes the alternative vhost. The result of the vhost
%% deletion is ignored.
end_per_testcase(Testcase, Config) ->
    shovel_test_utils:clear_param(Config, ?PARAM),
    rabbit_ct_broker_helpers:rpc(Config, 0,
                                 shovel_test_utils, delete_all_queues, []),
    AltVHost = ?config(alt_vhost, Config),
    _ = rabbit_ct_broker_helpers:delete_vhost(Config, AltVHost),
    rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
|
|
|
|
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
%% Testcases.
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
%% Declares the shovel with the base definition, publishes one message to
%% the source and expects it on the destination, then checks that the
%% status entry looked up for the shovel reports one forwarded message.
simple(Config) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    Scenario =
        fun (Session) ->
                set_param(Config, ?PARAM, ?config(shovel_args, Config)),
                amqp10_publish_expect(Session, Src, Dest, <<"hello">>, 1),
                ShovelName = {<<"/">>, ?PARAM},
                Status = rabbit_ct_broker_helpers:rpc(
                           Config, 0, rabbit_shovel_status, lookup, [ShovelName]),
                ?assertMatch([_|_], Status),
                ?assertMatch(#{metrics := #{forwarded := 1}},
                             maps:from_list(Status))
        end,
    with_amqp10_session(Config, Scenario).
|
|
|
|
|
|
2025-10-08 23:43:53 +08:00
|
|
|
%% 10 messages between classic queues with ack-mode `no-ack`.
simple_classic_no_ack(Config) ->
    QueueType = <<"classic">>,
    AckMode = <<"no-ack">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 10).
|
2025-10-08 23:43:53 +08:00
|
|
|
|
|
|
|
|
%% 10 messages between classic queues with ack-mode `on-confirm`.
simple_classic_on_confirm(Config) ->
    QueueType = <<"classic">>,
    AckMode = <<"on-confirm">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 10).
|
2025-10-08 23:43:53 +08:00
|
|
|
|
|
|
|
|
%% 10 messages between classic queues with ack-mode `on-publish`.
simple_classic_on_publish(Config) ->
    QueueType = <<"classic">>,
    AckMode = <<"on-publish">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 10).
|
2025-10-08 23:43:53 +08:00
|
|
|
|
|
|
|
|
%% 10 messages between quorum queues with ack-mode `no-ack`.
simple_quorum_no_ack(Config) ->
    QueueType = <<"quorum">>,
    AckMode = <<"no-ack">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 10).
|
2025-10-08 23:43:53 +08:00
|
|
|
|
|
|
|
|
%% 10 messages between quorum queues with ack-mode `on-confirm`.
simple_quorum_on_confirm(Config) ->
    QueueType = <<"quorum">>,
    AckMode = <<"on-confirm">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 10).
|
2025-10-08 23:43:53 +08:00
|
|
|
|
|
|
|
|
%% 10 messages between quorum queues with ack-mode `on-publish`.
simple_quorum_on_publish(Config) ->
    QueueType = <<"quorum">>,
    AckMode = <<"on-publish">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 10).
|
2025-10-08 23:43:53 +08:00
|
|
|
|
2025-10-16 20:31:42 +08:00
|
|
|
%% 10 messages between stream queues with ack-mode `on-confirm`.
simple_stream_on_confirm(Config) ->
    AckMode = <<"on-confirm">>,
    simple_stream(Config, AckMode, 10).
|
|
|
|
|
|
|
|
|
|
%% 10 messages between stream queues with ack-mode `on-publish`.
simple_stream_on_publish(Config) ->
    AckMode = <<"on-publish">>,
    simple_stream(Config, AckMode, 10).
|
|
|
|
|
|
|
|
|
|
%% Shovels NMsgs messages from one stream queue to another using AckMode.
%%
%% Both queues are declared as streams, the receiver is attached to the
%% destination before publishing, and the test waits (up to 30 s) for the
%% shovel status to report NMsgs forwarded before consuming them.
simple_stream(Config, AckMode, NMsgs) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    StreamArgs = #{<<"x-queue-type">> => {utf8, <<"stream">>}},
    Scenario =
        fun (Session) ->
                amqp10_declare_queue(Session, Src, StreamArgs),
                amqp10_declare_queue(Session, Dest, StreamArgs),
                ShovelArgs = ?config(shovel_args, Config) ++
                    [{<<"ack-mode">>, AckMode}],
                set_param(Config, ?PARAM, ShovelArgs),
                Receiver = amqp10_subscribe(Session, Dest),
                amqp10_publish(Session, Src, <<"tag1">>, NMsgs),
                ?awaitMatch(
                   [{_Name, dynamic, {running, _}, #{forwarded := NMsgs}, _}],
                   rabbit_ct_broker_helpers:rpc(
                     Config, 0, rabbit_shovel_status, status, []),
                   30_000),
                _ = amqp10_expect(Receiver, NMsgs, []),
                amqp10_client:detach_link(Receiver)
        end,
    with_amqp10_session(Config, Scenario).
|
|
|
|
|
|
|
|
|
|
%% 5000 messages between classic queues with ack-mode `no-ack`.
credit_flow_classic_no_ack(Config) ->
    QueueType = <<"classic">>,
    AckMode = <<"no-ack">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 5000).
|
|
|
|
|
|
|
|
|
|
%% 5000 messages between classic queues with ack-mode `on-confirm`.
credit_flow_classic_on_confirm(Config) ->
    QueueType = <<"classic">>,
    AckMode = <<"on-confirm">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 5000).
|
|
|
|
|
|
|
|
|
|
%% 5000 messages between classic queues with ack-mode `on-publish`.
credit_flow_classic_on_publish(Config) ->
    QueueType = <<"classic">>,
    AckMode = <<"on-publish">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 5000).
|
|
|
|
|
|
|
|
|
|
%% 5000 messages between quorum queues with ack-mode `no-ack`.
credit_flow_quorum_no_ack(Config) ->
    QueueType = <<"quorum">>,
    AckMode = <<"no-ack">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 5000).
|
|
|
|
|
|
|
|
|
|
%% 5000 messages between quorum queues with ack-mode `on-confirm`.
credit_flow_quorum_on_confirm(Config) ->
    QueueType = <<"quorum">>,
    AckMode = <<"on-confirm">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 5000).
|
|
|
|
|
|
|
|
|
|
%% 5000 messages between quorum queues with ack-mode `on-publish`.
credit_flow_quorum_on_publish(Config) ->
    QueueType = <<"quorum">>,
    AckMode = <<"on-publish">>,
    simple_queue_type_ack_mode(Config, QueueType, AckMode, 5000).
|
|
|
|
|
|
|
|
|
|
%% 5000 messages between stream queues with ack-mode `on-confirm`.
credit_flow_stream_on_confirm(Config) ->
    AckMode = <<"on-confirm">>,
    simple_stream(Config, AckMode, 5000).
|
|
|
|
|
|
|
|
|
|
%% 5000 messages between stream queues with ack-mode `on-publish`.
credit_flow_stream_on_publish(Config) ->
    AckMode = <<"on-publish">>,
    simple_stream(Config, AckMode, 5000).
|
|
|
|
|
|
|
|
|
|
%% Declares source and destination queues of the given Type, starts the
%% shovel with the given AckMode, then publishes NMsgs messages to the
%% source and expects them on the destination.
simple_queue_type_ack_mode(Config, Type, AckMode, NMsgs) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    Scenario =
        fun (Session) ->
                QueueArgs = #{<<"x-queue-type">> => {utf8, Type}},
                amqp10_declare_queue(Session, Src, QueueArgs),
                amqp10_declare_queue(Session, Dest, QueueArgs),
                ShovelArgs = ?config(shovel_args, Config) ++
                    [{<<"ack-mode">>, AckMode}],
                set_param(Config, ?PARAM, ShovelArgs),
                amqp10_publish_expect(Session, Src, Dest, <<"hello">>, NMsgs)
        end,
    with_amqp10_session(Config, Scenario).
|
|
|
|
|
|
|
|
|
|
%% Starts the shovel with `src-delete-after` set to `never`, moves 5000
%% messages through it, and waits (up to 30 s) for the status to show the
%% shovel still running with 5000 messages forwarded.
delete_after_never(Config) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    Scenario =
        fun (Session) ->
                ShovelArgs = ?config(shovel_args, Config) ++
                    [{<<"src-delete-after">>, <<"never">>}],
                set_param(Config, ?PARAM, ShovelArgs),
                amqp10_publish_expect(Session, Src, Dest, <<"carrots">>, 5000),
                ?awaitMatch(
                   [{_Name, dynamic, {running, _}, #{forwarded := 5000}, _}],
                   rabbit_ct_broker_helpers:rpc(
                     Config, 0, rabbit_shovel_status, status, []),
                   30_000)
        end,
    with_amqp10_session(Config, Scenario).
|
2025-10-14 21:11:46 +08:00
|
|
|
|
|
|
|
|
%% Classic queues, `on-confirm`, delete-after 0: all 100 messages are
%% expected to remain on the source and none on the destination.
autodelete_classic_on_confirm_no_transfer(Config) ->
    {After, ExpSrc, ExpDest} = {0, 100, 0},
    autodelete(Config, <<"classic">>, <<"on-confirm">>, After, ExpSrc, ExpDest).
|
|
|
|
|
|
|
|
|
|
%% Quorum queues, `on-confirm`, delete-after 0: all 100 messages are
%% expected to remain on the source and none on the destination.
autodelete_quorum_on_confirm_no_transfer(Config) ->
    {After, ExpSrc, ExpDest} = {0, 100, 0},
    autodelete(Config, <<"quorum">>, <<"on-confirm">>, After, ExpSrc, ExpDest).
|
|
|
|
|
|
|
|
|
|
%% Classic queues, `on-publish`, delete-after 0: all 100 messages are
%% expected to remain on the source and none on the destination.
autodelete_classic_on_publish_no_transfer(Config) ->
    {After, ExpSrc, ExpDest} = {0, 100, 0},
    autodelete(Config, <<"classic">>, <<"on-publish">>, After, ExpSrc, ExpDest).
|
|
|
|
|
|
|
|
|
|
%% Quorum queues, `on-publish`, delete-after 0: all 100 messages are
%% expected to remain on the source and none on the destination.
autodelete_quorum_on_publish_no_transfer(Config) ->
    {After, ExpSrc, ExpDest} = {0, 100, 0},
    autodelete(Config, <<"quorum">>, <<"on-publish">>, After, ExpSrc, ExpDest).
|
|
|
|
|
|
|
|
|
|
%% Classic queues, `on-confirm`, delete-after 50: 50 messages are expected
%% on each of the source and the destination.
autodelete_classic_on_confirm(Config) ->
    {After, ExpSrc, ExpDest} = {50, 50, 50},
    autodelete(Config, <<"classic">>, <<"on-confirm">>, After, ExpSrc, ExpDest).
|
|
|
|
|
|
|
|
|
|
%% Quorum queues, `on-confirm`, delete-after 50: 50 messages are expected
%% on each of the source and the destination.
autodelete_quorum_on_confirm(Config) ->
    {After, ExpSrc, ExpDest} = {50, 50, 50},
    autodelete(Config, <<"quorum">>, <<"on-confirm">>, After, ExpSrc, ExpDest).
|
|
|
|
|
|
|
|
|
|
%% Classic queues, `on-publish`, delete-after 50: 50 messages are expected
%% on each of the source and the destination.
autodelete_classic_on_publish(Config) ->
    {After, ExpSrc, ExpDest} = {50, 50, 50},
    autodelete(Config, <<"classic">>, <<"on-publish">>, After, ExpSrc, ExpDest).
|
|
|
|
|
|
|
|
|
|
%% Quorum queues, `on-publish`, delete-after 50: 50 messages are expected
%% on each of the source and the destination.
autodelete_quorum_on_publish(Config) ->
    {After, ExpSrc, ExpDest} = {50, 50, 50},
    autodelete(Config, <<"quorum">>, <<"on-publish">>, After, ExpSrc, ExpDest).
|
|
|
|
|
|
|
|
|
|
%% Setting a shovel that combines ack-mode `no-ack` with a numeric
%% `src-delete-after` is expected to be refused: the runtime parameter call
%% must return an `{error_string, _}` tuple.
autodelete_no_ack(Config) ->
    ShovelArgs = ?config(shovel_args, Config) ++
        [{<<"ack-mode">>, <<"no-ack">>},
         {<<"src-delete-after">>, 100}],
    Uri = make_uri(Config, 0),
    Definition = [{<<"src-uri">>, Uri},
                  {<<"dest-uri">>, [Uri]}] ++ ShovelArgs,
    SetArgs = [<<"/">>, <<"shovel">>, ?PARAM, Definition, none],
    ?assertMatch({error_string, _},
                 rabbit_ct_broker_helpers:rpc(
                   Config, 0, rabbit_runtime_parameters, set, SetArgs)).
|
|
|
|
|
|
|
|
|
|
%% Shared body of the autodelete test cases.
%%
%% Declares both queues with the given Type, publishes 100 messages to the
%% source, starts a shovel with AckMode and `src-delete-after` = After
%% without waiting for it, waits for the autodelete, and finally expects
%% ExpSrc messages on the source and ExpDest on the destination.
autodelete(Config, Type, AckMode, After, ExpSrc, ExpDest) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    Scenario =
        fun (Session) ->
                QueueArgs = #{<<"x-queue-type">> => {utf8, Type}},
                amqp10_declare_queue(Session, Src, QueueArgs),
                amqp10_declare_queue(Session, Dest, QueueArgs),
                amqp10_publish(Session, Src, <<"hello">>, 100),
                ShovelArgs = ?config(shovel_args, Config) ++
                    [{<<"ack-mode">>, AckMode},
                     {<<"src-delete-after">>, After}],
                set_param_nowait(Config, ?PARAM, ShovelArgs),
                await_autodelete(Config, ?PARAM),
                amqp10_expect_count(Session, Src, ExpSrc),
                amqp10_expect_count(Session, Dest, ExpDest)
        end,
    with_amqp10_session(Config, Scenario).
|
|
|
|
|
|
2025-10-16 17:43:01 +08:00
|
|
|
%% Classic queues, `on-confirm`, destination capped at 5 messages: expects
%% 5 messages left on the source and 5 on the destination.
autodelete_classic_on_confirm_with_rejections(Config) ->
    {ExpSrc, ExpDest} = {5, 5},
    autodelete_with_rejections(Config, <<"classic">>, <<"on-confirm">>,
                               ExpSrc, ExpDest).
|
|
|
|
|
|
|
|
|
|
%% Quorum queues, `on-confirm`: whatever did not reach the destination is
%% expected to still be on the source (100 minus the destination count).
autodelete_quorum_on_confirm_with_rejections(Config) ->
    RemainingOnSrc = fun(OnDest) -> 100 - OnDest end,
    autodelete_with_quorum_rejections(Config, <<"on-confirm">>, RemainingOnSrc).
|
|
|
|
|
|
|
|
|
|
%% Classic queues, `on-publish`, destination capped at 5 messages: expects
%% the source to be empty and 5 messages on the destination.
autodelete_classic_on_publish_with_rejections(Config) ->
    {ExpSrc, ExpDest} = {0, 5},
    autodelete_with_rejections(Config, <<"classic">>, <<"on-publish">>,
                               ExpSrc, ExpDest).
|
|
|
|
|
|
|
|
|
|
%% Quorum queues, `on-publish`: the source is expected to end up empty
%% regardless of how many messages reached the destination.
autodelete_quorum_on_publish_with_rejections(Config) ->
    RemainingOnSrc = fun(_OnDest) -> 0 end,
    autodelete_with_quorum_rejections(Config, <<"on-publish">>, RemainingOnSrc).
|
|
|
|
|
|
|
|
|
|
%% Autodelete scenario where the destination rejects part of the traffic.
%%
%% The destination is declared with `x-max-length` 5 and overflow
%% `reject-publish`; 10 messages are published to the source and the shovel
%% is configured with `src-delete-after` 10. After the autodelete, the
%% queue listing must (within 45 s) show ExpSrc messages on the source and
%% ExpDest on the destination, and the same counts are then consumed.
autodelete_with_rejections(Config, Type, AckMode, ExpSrc, ExpDest) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    Scenario =
        fun (Session) ->
                amqp10_declare_queue(
                  Session, Src, #{<<"x-queue-type">> => {utf8, Type}}),
                amqp10_declare_queue(
                  Session, Dest,
                  #{<<"x-queue-type">> => {utf8, Type},
                    <<"x-overflow">> => {utf8, <<"reject-publish">>},
                    <<"x-max-length">> => {ulong, 5}}),
                amqp10_publish(Session, Src, <<"hello">>, 10),
                ShovelArgs = ?config(shovel_args, Config) ++
                    [{<<"ack-mode">>, AckMode},
                     {<<"src-delete-after">>, 10}],
                set_param_nowait(Config, ?PARAM, ShovelArgs),
                await_autodelete(Config, ?PARAM),
                %% Rows are [Name, MessageCount] with both fields as binaries.
                Expected = lists:sort([[Src, integer_to_binary(ExpSrc)],
                                       [Dest, integer_to_binary(ExpDest)]]),
                ListQueuesCmd = ["list_queues", "name", "messages",
                                 "--no-table-headers"],
                ?awaitMatch(
                   Expected,
                   lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
                                Config, 0, ListQueuesCmd)),
                   45_000),
                amqp10_expect_count(Session, Src, ExpSrc),
                amqp10_expect_count(Session, Dest, ExpDest)
        end,
    with_amqp10_session(Config, Scenario).
|
|
|
|
|
|
|
|
|
|
%% Quorum-queue variant of the rejection scenario.
%%
%% The destination is capped at 5 messages with overflow `reject-publish`;
%% 100 messages are published and the shovel uses `src-delete-after` 50.
%% The destination count is not fixed: the test waits until at least 5
%% messages are listed there, reads the actual count, and derives the
%% expected source count from it through ExpSrcFun.
autodelete_with_quorum_rejections(Config, AckMode, ExpSrcFun) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    Type = <<"quorum">>,
    Scenario =
        fun (Session) ->
                amqp10_declare_queue(
                  Session, Src, #{<<"x-queue-type">> => {utf8, Type}}),
                amqp10_declare_queue(
                  Session, Dest,
                  #{<<"x-queue-type">> => {utf8, Type},
                    <<"x-overflow">> => {utf8, <<"reject-publish">>},
                    <<"x-max-length">> => {ulong, 5}}),
                amqp10_publish(Session, Src, <<"hello">>, 100),
                ShovelArgs = ?config(shovel_args, Config) ++
                    [{<<"ack-mode">>, AckMode},
                     {<<"src-delete-after">>, 50}],
                set_param_nowait(Config, ?PARAM, ShovelArgs),
                await_autodelete(Config, ?PARAM),
                eventually(
                  ?_assert(list_queue_messages(Config, Dest) >= 5),
                  1000, 45),
                ExpDest = list_queue_messages(Config, Dest),
                amqp10_expect_count(Session, Src, ExpSrcFun(ExpDest)),
                amqp10_expect_count(Session, Dest, ExpDest)
        end,
    with_amqp10_session(Config, Scenario).
|
|
|
|
|
|
2025-10-16 20:31:42 +08:00
|
|
|
%% Creates the alternative vhost, declares a shovel in "/" whose source and
%% destination URIs point at that vhost, and expects the parameter to be
%% accepted while no shovel ends up running.
no_vhost_access(Config) ->
    AltVHost = ?config(alt_vhost, Config),
    ok = rabbit_ct_broker_helpers:add_vhost(Config, AltVHost),
    Uri = make_uri(Config, 0, AltVHost),
    ShovelArgs = ?config(shovel_args, Config) ++
        [{<<"src-uri">>, Uri},
         {<<"dest-uri">>, [Uri]}],
    SetArgs = [<<"/">>, <<"shovel">>, ?PARAM, ShovelArgs, none],
    ok = rabbit_ct_broker_helpers:rpc(
           Config, 0, rabbit_runtime_parameters, set, SetArgs),
    await_no_shovel(Config, ?PARAM).
|
|
|
|
|
|
|
|
|
|
%% Declares a shovel whose URIs carry user `guest` with the password
%% `forgotmypassword`, and expects the parameter to be accepted while no
%% shovel ends up running.
no_user_access(Config) ->
    Uri = make_uri(Config, 0,
                   <<"guest">>, <<"forgotmypassword">>, <<"%2F">>),
    UriArgs = [{<<"src-uri">>, Uri},
               {<<"dest-uri">>, [Uri]}],
    ShovelArgs = UriArgs ++ ?config(shovel_args, Config),
    SetArgs = [<<"/">>, <<"shovel">>, ?PARAM, ShovelArgs, none],
    ok = rabbit_ct_broker_helpers:rpc(
           Config, 0, rabbit_runtime_parameters, set, SetArgs),
    await_no_shovel(Config, ?PARAM).
|
|
|
|
|
|
|
|
|
|
%% Publishes a durable AMQP 1.0 message carrying the application property
%% `key` = `value` to the source and checks that the message received from
%% the destination still has that property.
application_properties(Config) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    Scenario =
        fun (Session) ->
                set_param(Config, ?PARAM, ?config(shovel_args, Config)),
                Tag = <<"tag1">>,
                Plain = amqp10_msg:new(Tag, <<"hello">>, false),
                Durable = amqp10_msg:set_headers(#{durable => true}, Plain),
                Msg = amqp10_msg:set_application_properties(
                        #{<<"key">> => <<"value">>}, Durable),
                amqp10_publish_msg(Session, Src, Tag, Msg),
                Received = amqp10_expect_one(Session, Dest),
                AppProps = amqp10_msg:application_properties(Received),
                ?assertMatch(#{<<"key">> := <<"value">>}, AppProps)
        end,
    with_amqp10_session(Config, Scenario).
|
|
|
|
|
|
|
|
|
|
%% Deletes the source queue under a running shovel and checks recovery.
%%
%% After one message has been forwarded, the source queue is deleted on the
%% broker. The test then waits until both queues are listed again and the
%% status shows a running shovel with `forwarded` back at 0 (presumably a
%% restarted shovel that re-declared its source), and finally verifies that
%% a message is forwarded again.
delete_src_queue(Config) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    ShovelStatus =
        fun () ->
                rabbit_ct_broker_helpers:rpc(
                  Config, 0, rabbit_shovel_status, status, [])
        end,
    Scenario =
        fun (Session) ->
                set_param(Config, ?PARAM, ?config(shovel_args, Config)),
                _ = amqp10_publish_expect(Session, Src, Dest, <<"hello">>, 1),
                ?awaitMatch(
                   [{_Name, dynamic, {running, _}, #{forwarded := 1}, _}],
                   ShovelStatus(),
                   30_000),
                rabbit_ct_broker_helpers:rpc(
                  Config, 0, ?MODULE, delete_queue, [Src, <<"/">>]),
                ListQueuesCmd = ["list_queues", "name", "messages",
                                 "--no-table-headers"],
                ?awaitMatch(
                   [[Dest, _],
                    [Src, _]],
                   lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
                                Config, 0, ListQueuesCmd)),
                   45_000),
                ?awaitMatch(
                   [{_Name, dynamic, {running, _}, #{forwarded := 0}, _}],
                   ShovelStatus(),
                   30_000),
                _ = amqp10_publish_expect(Session, Src, Dest, <<"hello">>, 1)
        end,
    with_amqp10_session(Config, Scenario).
|
|
|
|
|
|
|
|
|
|
%% Checks the info reported for a running shovel: the source and
%% destination protocols must match the group configuration, and the
%% source/destination names must be listed under the atom form of the
%% address keys (first `-` replaced by `_`, e.g. `src-queue` -> src_queue).
shovel_status(Config) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    SrcProtocol = ?config(src_protocol, Config),
    DestProtocol = ?config(dest_protocol, Config),
    set_param(Config, ?PARAM, ?config(shovel_args, Config)),
    Status = rabbit_ct_broker_helpers:rpc(
               Config, 0, rabbit_shovel_status, status, []),
    ?assertMatch([{_, dynamic, {running, _}, _, _}], Status),
    [{_, dynamic, {running, Info}, _, _}] = Status,
    ?assertMatch(SrcProtocol, proplists:get_value(src_protocol, Info)),
    ?assertMatch(DestProtocol, proplists:get_value(dest_protocol, Info)),
    InfoKey = fun (AddressKey) ->
                      binary_to_atom(
                        binary:replace(AddressKey, <<"-">>, <<"_">>))
              end,
    SrcAddress = InfoKey(?config(src_address, Config)),
    DestAddress = InfoKey(?config(dest_address, Config)),
    ?assertMatch(Src, proplists:get_value(SrcAddress, Info)),
    ?assertMatch(Dest, proplists:get_value(DestAddress, Info)),
    ok.
|
|
|
|
|
|
|
|
|
|
%% Re-points a running shovel to a second destination and then removes it.
%%
%% 1. With the original definition a message goes from source to Dest.
%% 2. After replacing the destination entry with Dest2 a message goes to
%%    Dest2 and Dest stays empty.
%% 3. After clearing the parameter a message published to the source is
%%    read back from the source itself, and both destinations stay empty.
change_definition(Config) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    Dest2 = <<Dest/binary, "_2">>,
    DestAddress = ?config(dest_address, Config),
    Scenario =
        fun (Session) ->
                ShovelArgs = ?config(shovel_args, Config),
                set_param(Config, ?PARAM, ShovelArgs),
                amqp10_publish_expect(Session, Src, Dest, <<"hello">>, 1),

                ShovelArgs2 = [{DestAddress, Dest2}
                               | proplists:delete(DestAddress, ShovelArgs)],
                set_param(Config, ?PARAM, ShovelArgs2),
                amqp10_publish_expect(Session, Src, Dest2, <<"hello">>, 1),
                amqp10_expect_empty(Session, Dest),

                clear_param(Config, ?PARAM),
                amqp10_publish_expect(Session, Src, Src, <<"hello">>, 1),
                amqp10_expect_empty(Session, Dest),
                amqp10_expect_empty(Session, Dest2)
        end,
    with_amqp10_session(Config, Scenario).
|
|
|
|
|
|
2025-10-30 17:26:36 +08:00
|
|
|
%% Checks shovel behavior around a disk alarm on node 0.
%%
%% Ten messages are published to the source before the alarm is set. With
%% the alarm in effect the shovel is declared and the destination is
%% expected to stay empty; once the alarm is cleared, all ten messages are
%% expected on the destination.
%%
%% The alarm is cleared in an `after` clause, so a failure while it is in
%% effect cannot leave it set for the test cases that run afterwards.
disk_alarm(Config) ->
    Src = ?config(srcq, Config),
    Dest = ?config(destq, Config),
    with_amqp10_session(
      Config,
      fun (Sess) ->
              ShovelArgs = ?config(shovel_args, Config),
              %% Publish first: the alarm is only raised afterwards.
              amqp10_publish(Sess, Src, <<"hello">>, 10),
              rabbit_ct_broker_helpers:set_alarm(Config, 0, disk),
              try
                  set_param(Config, ?PARAM, ShovelArgs),
                  amqp10_expect_empty(Sess, Dest)
              after
                  rabbit_ct_broker_helpers:clear_alarm(Config, 0, disk)
              end,
              amqp10_expect_count(Sess, Dest, 10)
      end).
|
|
|
|
|
|
2025-10-14 21:11:46 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Tries to enable the 'rabbitmq_4.0.0' feature flag on the single broker
%% node. Returns Config when that succeeds and a Common Test `{skip, _}`
%% tuple for any other result.
maybe_skip_local_protocol(Config) ->
    [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Outcome = rabbit_ct_broker_helpers:enable_feature_flag(
                Config, [Node], 'rabbitmq_4.0.0'),
    case Outcome =:= ok of
        true ->
            Config;
        false ->
            {skip, "This group requires rabbitmq_4.0.0 feature flag"}
    end.
|
2025-10-16 17:43:01 +08:00
|
|
|
|
|
|
|
|
%% Returns, as an integer, the message count that `rabbitmqctl list_queues`
%% on node 0 reports for the queue named QName. Crashes with a badmatch
%% unless exactly one listed queue has that name.
list_queue_messages(Config, QName) ->
    ListQueuesCmd = ["list_queues", "name", "messages", "--no-table-headers"],
    Rows = rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ListQueuesCmd),
    IsTarget = fun ([Name, _Count]) -> Name == QName end,
    [[_Name, Messages]] = lists:filter(IsTarget, Rows),
    binary_to_integer(Messages).
|
2025-10-16 20:31:42 +08:00
|
|
|
|
|
|
|
|
%% Deletes the queue Name in VHost when the lookup finds it, asserting
%% that the deletion returns `{ok, _}`; any other lookup result yields `ok`.
%% Called on the broker node through rpc by delete_src_queue/1.
delete_queue(Name, VHost) ->
    Resource = rabbit_misc:r(VHost, queue, Name),
    Lookup = rabbit_amqqueue:lookup(Resource),
    case Lookup of
        {ok, Queue} ->
            {ok, _} = rabbit_amqqueue:delete(Queue, false, false, <<"dummy">>);
        _NotFound ->
            ok
    end.
|