rabbitmq-server/deps/rabbitmq_recent_history_exc.../test/system_SUITE.erl

391 lines
15 KiB
Erlang

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module(system_SUITE).
-compile(export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_recent_history.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
all() ->
[
{group, mnesia_store},
{group, khepri_store},
{group, khepri_migration}
].
groups() ->
[
{mnesia_store, [], [
{non_parallel_tests, [], all_tests()}
]},
{khepri_store, [], [
{non_parallel_tests, [], all_tests()}
]},
{khepri_migration, [], [
from_mnesia_to_khepri
]}
].
all_tests() ->
[
default_length_test,
length_argument_test,
wrong_argument_type_test,
no_store_test,
e2e_test,
multinode_test,
lifecycle_test
].
%% -------------------------------------------------------------------
%% Test suite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
inets:start(),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(mnesia_store, Config) ->
case rabbit_ct_broker_helpers:configured_metadata_store(Config) of
khepri -> {skip, "These tests target Mnesia"};
_ -> Config
end;
init_per_group(khepri_store, Config) ->
case rabbit_ct_broker_helpers:configured_metadata_store(Config) of
mnesia -> {skip, "These tests target Khepri"};
_ -> Config
end;
init_per_group(_, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE},
{rmq_nodes_count, 2}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_group(mnesia_store, Config) ->
Config;
end_per_group(khepri_store, Config) ->
Config;
end_per_group(_, Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) ->
TestCaseName = rabbit_ct_helpers:config_to_testcase_name(Config, Testcase),
rabbit_ct_helpers:set_config(Config, {test_resource_name,
re:replace(TestCaseName, "/", "-", [global, {return, list}])}).
end_per_testcase(_Testcase, Config) ->
Config.
%% -------------------------------------------------------------------
%% Test cases.
%% -------------------------------------------------------------------
default_length_test(Config) ->
Qs = qs(),
test0(Config, fun () ->
#'basic.publish'{exchange = make_exchange_name(Config, "0")}
end,
fun() ->
#amqp_msg{props = #'P_basic'{}, payload = <<>>}
end, [], Qs, 100, length(Qs) * ?KEEP_NB).
length_argument_test(Config) ->
Qs = qs(),
test0(Config, fun () ->
#'basic.publish'{exchange = make_exchange_name(Config, "0")}
end,
fun() ->
#amqp_msg{props = #'P_basic'{}, payload = <<>>}
end, [{<<"x-recent-history-length">>, long, 30}], Qs, 100, length(Qs) * 30).
wrong_argument_type_test(Config) ->
wrong_argument_type_test0(Config, -30),
wrong_argument_type_test0(Config, 0).
no_store_test(Config) ->
Qs = qs(),
test0(Config, fun () ->
#'basic.publish'{exchange = make_exchange_name(Config, "0")}
end,
fun() ->
H = [{<<"x-recent-history-no-store">>, bool, true}],
#amqp_msg{props = #'P_basic'{headers = H}, payload = <<>>}
end, [], Qs, 100, 0).
e2e_test(Config) ->
MsgCount = 10,
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
#'exchange.declare_ok'{} =
amqp_channel:call(Chan,
#'exchange.declare' {
exchange = make_exchange_name(Config, "1"),
type = <<"x-recent-history">>,
auto_delete = true
}),
#'exchange.declare_ok'{} =
amqp_channel:call(Chan,
#'exchange.declare' {
exchange = make_exchange_name(Config, "2"),
type = <<"direct">>,
auto_delete = true
}),
#'queue.declare_ok'{queue = Q} =
amqp_channel:call(Chan, #'queue.declare' {
queue = <<"q">>
}),
#'queue.bind_ok'{} =
amqp_channel:call(Chan, #'queue.bind' {
queue = Q,
exchange = make_exchange_name(Config, "2"),
routing_key = <<"">>
}),
#'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
[amqp_channel:call(Chan,
#'basic.publish'{exchange = make_exchange_name(Config, "1")},
#amqp_msg{props = #'P_basic'{}, payload = <<>>}) ||
_ <- lists:duplicate(MsgCount, const)],
amqp_channel:call(Chan, #'tx.commit'{}),
amqp_channel:call(Chan,
#'exchange.bind' {
source = make_exchange_name(Config, "1"),
destination = make_exchange_name(Config, "2"),
routing_key = <<"">>
}),
%% Wait for all messages to be queued.
?awaitMatch(#'queue.declare_ok'{message_count = MsgCount, queue = Q},
amqp_channel:call(Chan, #'queue.declare' {
passive = true,
queue = Q
}),
30000),
amqp_channel:call(Chan, #'exchange.delete' { exchange = make_exchange_name(Config, "1") }),
amqp_channel:call(Chan, #'exchange.delete' { exchange = make_exchange_name(Config, "2") }),
amqp_channel:call(Chan, #'queue.delete' { queue = Q }),
rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok.
multinode_test(Config) ->
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1),
#'exchange.declare_ok'{} =
amqp_channel:call(Chan,
#'exchange.declare' {
exchange = make_exchange_name(Config, "1"),
type = <<"x-recent-history">>,
auto_delete = false
}),
#'queue.declare_ok'{queue = Q} =
amqp_channel:call(Chan, #'queue.declare' {
queue = <<"q">>
}),
#'queue.bind_ok'{} =
amqp_channel:call(Chan, #'queue.bind' {
queue = Q,
exchange = make_exchange_name(Config, "1"),
routing_key = <<"">>
}),
amqp_channel:call(Chan, #'queue.delete' { queue = Q }),
rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
rabbit_ct_broker_helpers:restart_broker(Config, 1),
{Conn2, Chan2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
#'queue.declare_ok'{queue = Q2} =
amqp_channel:call(Chan2, #'queue.declare' {
queue = <<"q2">>
}),
#'queue.bind_ok'{} =
amqp_channel:call(Chan2, #'queue.bind' {
queue = Q2,
exchange = make_exchange_name(Config, "1"),
routing_key = <<"">>
}),
amqp_channel:call(Chan2, #'exchange.delete' { exchange = make_exchange_name(Config, "2") }),
amqp_channel:call(Chan2, #'queue.delete' { queue = Q2 }),
rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Chan2),
ok.
lifecycle_test(Config) ->
%% Ensure that the boot and cleanup steps run as expected and return 'ok'.
ok = rabbit_ct_broker_helpers:rpc(
Config,
rabbit, stop_apps, [[rabbitmq_recent_history_exchange]]),
ok = rabbit_ct_broker_helpers:rpc(
Config,
rabbit, start_apps, [[rabbitmq_recent_history_exchange]]),
ok.
test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues, MsgCount, ExpectedCount) ->
Chan = rabbit_ct_client_helpers:open_channel(Config),
#'exchange.declare_ok'{} =
amqp_channel:call(Chan,
#'exchange.declare' {
exchange = make_exchange_name(Config, "0"),
type = <<"x-recent-history">>,
auto_delete = true,
arguments = DeclareArgs
}),
#'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
[amqp_channel:call(Chan,
MakeMethod(),
MakeMsg()) || _ <- lists:duplicate(MsgCount, const)],
amqp_channel:call(Chan, #'tx.commit'{}),
[#'queue.declare_ok'{} =
amqp_channel:call(Chan, #'queue.declare' {
queue = Q, exclusive = true }) || Q <- Queues],
[#'queue.bind_ok'{} =
amqp_channel:call(Chan, #'queue.bind' { queue = Q,
exchange = make_exchange_name(Config, "0"),
routing_key = <<"">>})
|| Q <- Queues],
%% Wait a few seconds for all messages to be queued.
timer:sleep(3000),
Counts =
[begin
#'queue.declare_ok'{message_count = M} =
amqp_channel:call(Chan, #'queue.declare' {queue = Q,
exclusive = true }),
M
end || Q <- Queues],
?assertEqual(ExpectedCount, lists:sum(Counts)),
amqp_channel:call(Chan, #'exchange.delete' { exchange = make_exchange_name(Config, "0") }),
[amqp_channel:call(Chan, #'queue.delete' { queue = Q }) || Q <- Queues],
rabbit_ct_client_helpers:close_channel(Chan),
ok.
wrong_argument_type_test0(Config, Length) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
Chan = amqp_connection:open_channel(Conn),
DeclareArgs = [{<<"x-recent-history-length">>, long, Length}],
process_flag(trap_exit, true),
?assertExit(_, amqp_channel:call(Chan,
#'exchange.declare' {
exchange = make_exchange_name(Config, "0"),
type = <<"x-recent-history">>,
auto_delete = true,
arguments = DeclareArgs
})),
amqp_connection:close(Conn),
ok.
qs() ->
[<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>].
make_exchange_name(Config, Suffix) ->
B = rabbit_ct_helpers:get_config(Config, test_resource_name),
erlang:list_to_binary("x-" ++ B ++ "-" ++ Suffix).
from_mnesia_to_khepri(Config) ->
MsgCount = 10,
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
#'exchange.declare_ok'{} =
amqp_channel:call(Chan,
#'exchange.declare' {
exchange = make_exchange_name(Config, "1"),
type = <<"x-recent-history">>,
auto_delete = true
}),
#'exchange.declare_ok'{} =
amqp_channel:call(Chan,
#'exchange.declare' {
exchange = make_exchange_name(Config, "2"),
type = <<"direct">>,
auto_delete = true
}),
#'queue.declare_ok'{queue = Q} =
amqp_channel:call(Chan, #'queue.declare' {
queue = <<"q">>
}),
#'queue.bind_ok'{} =
amqp_channel:call(Chan, #'queue.bind' {
queue = Q,
exchange = make_exchange_name(Config, "2"),
routing_key = <<"">>
}),
#'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
[amqp_channel:call(Chan,
#'basic.publish'{exchange = make_exchange_name(Config, "1")},
#amqp_msg{props = #'P_basic'{}, payload = <<>>}) ||
_ <- lists:duplicate(MsgCount, const)],
amqp_channel:call(Chan, #'tx.commit'{}),
amqp_channel:call(Chan,
#'exchange.bind' {
source = make_exchange_name(Config, "1"),
destination = make_exchange_name(Config, "2"),
routing_key = <<"">>
}),
case rabbit_ct_broker_helpers:enable_feature_flag(Config, khepri_db) of
ok ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config, rabbit_recent_history_exchange_raft_based_metadata_store) of
ok ->
#'queue.declare_ok'{message_count = Count, queue = Q} =
amqp_channel:call(Chan, #'queue.declare' {
passive = true,
queue = Q
}),
?assertEqual(MsgCount, Count),
amqp_channel:call(Chan, #'exchange.delete' { exchange = make_exchange_name(Config, "1") }),
amqp_channel:call(Chan, #'exchange.delete' { exchange = make_exchange_name(Config, "2") }),
amqp_channel:call(Chan, #'queue.delete' { queue = Q }),
rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok;
Skip ->
Skip
end;
Skip ->
Skip
end.