rabbitmq-server/deps/rabbitmq_mqtt/test/cluster_SUITE.erl

163 lines
5.2 KiB
Erlang

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(cluster_SUITE).
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
-import(util, [expect_publishes/3,
connect/3,
connect/4,
await_exit/1]).
-import(rabbit_ct_broker_helpers,
[setup_steps/0,
teardown_steps/0,
get_node_config/3,
rabbitmqctl/3,
rpc/4, rpc/5,
stop_node/2
]).
-import(rabbit_ct_helpers,
[eventually/3]).
-define(OPTS, [{connect_timeout, 1},
{ack_timeout, 1}]).
all() ->
[
{group, v4},
{group, v5}
].
groups() ->
[
{v4, [], cluster_size_5()},
{v5, [], cluster_size_5()}
].
cluster_size_5() ->
[
connection_id_tracking,
connection_id_tracking_on_nodedown
].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
merge_app_env(Config) ->
rabbit_ct_helpers:merge_app_env(
Config,
{rabbit, [
{collect_statistics, basic},
{collect_statistics_interval, 100}
]}).
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(Group, Config) ->
rabbit_ct_helpers:set_config(
Config, [{rmq_nodes_count, 5},
{mqtt_version, Group},
{start_rmq_with_plugins_disabled, true}]).
end_per_group(_, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Testcase},
{rmq_nodes_clustered, true}
]),
Config2 = rabbit_ct_helpers:run_setup_steps(
Config1,
[fun merge_app_env/1] ++
setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
util:enable_plugin(Config2, rabbitmq_mqtt),
Config2.
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% -------------------------------------------------------------------
%% Test cases
%% -------------------------------------------------------------------
%% Note about running this testsuite in a mixed-versions cluster:
%% All even-numbered nodes will use the same code base when using a
%% secondary Umbrella. Odd-numbered nodes might use an incompatible code
%% base. When cluster-wide client ID tracking was introduced, it was not
%% put behind a feature flag because there was no need for one. Here, we
%% don't have a way to ensure that all nodes participate in client ID
%% tracking. However, those using the same code should. That's why we
%% limit our RPC calls to those nodes.
%%
%% That's also the reason why we use a 5-node cluster: with node 2 and
%% 4 which might not participate, it leaves nodes 1, 3 and 5: thus 3
%% nodes, the minimum to use Ra in proper conditions.
connection_id_tracking(Config) ->
Id = <<"duplicate-id">>,
C1 = connect(Id, Config, 0, ?OPTS),
{ok, _, _} = emqtt:subscribe(C1, <<"TopicA">>, qos0),
ok = emqtt:publish(C1, <<"TopicA">>, <<"Payload">>),
ok = expect_publishes(C1, <<"TopicA">>, [<<"Payload">>]),
%% there's one connection
assert_connection_count(Config, 4, 1),
%% connect to the same node (A or 0)
process_flag(trap_exit, true),
C2 = connect(Id, Config, 0, ?OPTS),
await_exit(C1),
assert_connection_count(Config, 4, 1),
%% connect to a different node (C or 2)
C3 = connect(Id, Config, 2, ?OPTS),
await_exit(C2),
assert_connection_count(Config, 4, 1),
ok = emqtt:disconnect(C3).
connection_id_tracking_on_nodedown(Config) ->
C = connect(<<"simpleClient">>, Config, ?OPTS),
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0),
ok = emqtt:publish(C, <<"TopicA">>, <<"Payload">>),
ok = expect_publishes(C, <<"TopicA">>, [<<"Payload">>]),
assert_connection_count(Config, 4, 1),
process_flag(trap_exit, true),
ok = stop_node(Config, 0),
await_exit(C),
ok = eventually(?_assertEqual([], util:all_connection_pids(Config)), 500, 4).
%%
%% Helpers
%%
assert_connection_count(_Config, 0, NumElements) ->
ct:fail("failed to match connection count ~b", [NumElements]);
assert_connection_count(Config, Retries, NumElements) ->
case util:all_connection_pids(Config) of
Pids when length(Pids) =:= NumElements ->
ok;
Pids ->
ct:pal("Waiting for ~b connections, got following connections: ~p",
[NumElements, Pids]),
timer:sleep(500),
assert_connection_count(Config, Retries-1, NumElements)
end.