rabbitmq-server/deps/rabbit/test/cluster_SUITE.erl

353 lines
12 KiB
Erlang

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(cluster_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("amqqueue.hrl").
%% Every function in this suite is exported; the matching warning is silenced.
-compile(nowarn_export_all).
-compile(export_all).
%% Value used in every `receive ... after ?TIMEOUT` of this suite (milliseconds).
-define(TIMEOUT, 30000).
%% Queue name shared by queue_cleanup1/2 and declare_on_dead_queue1/2.
-define(CLUSTER_QUEUE_NAME_DOC_ONLY, undefined).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
%% Testcases listed in both direction sub-groups returned by groups/0.
-define(CLUSTER_TESTCASES, [
delegates_async,
delegates_sync,
queue_cleanup,
declare_on_dead_queue
]).
%% Common Test callback: the top-level groups run by this suite.
all() ->
    TopLevelGroups = [cluster_tests, stop_app_tests],
    [{group, GroupName} || GroupName <- TopLevelGroups].
%% Common Test callback: group definitions.
%% `cluster_tests` holds two sub-groups that run the same testcases
%% (?CLUSTER_TESTCASES); `stop_app_tests` holds a single testcase.
groups() ->
    DirectionGroups = [{SubGroup, [], ?CLUSTER_TESTCASES}
                       || SubGroup <- [from_cluster_node1, from_cluster_node2]],
    [
        {cluster_tests, [], DirectionGroups},
        {stop_app_tests, [], [credentials_obfuscation]}
    ].
%% Common Test callback: no per-group properties are set for any group.
group(_GroupName) ->
    [].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
%% Suite setup: calls rabbit_ct_helpers:log_environment/0, then returns the
%% result of rabbit_ct_helpers:run_setup_steps/1 on the incoming Config.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    SuiteConfig = rabbit_ct_helpers:run_setup_steps(Config),
    SuiteConfig.
%% Suite teardown: returns the result of
%% rabbit_ct_helpers:run_teardown_steps/1 on Config.
end_per_suite(Config) ->
    TornDown = rabbit_ct_helpers:run_teardown_steps(Config),
    TornDown.
%% Group setup.
%% For a top-level group (one listed in all/0) the config is extended with a
%% node-name suffix and a node count of 2, then the broker and client setup
%% steps run, followed by init_per_group1/2.
%% For any other (nested) group only init_per_group1/2 runs.
init_per_group(Group, Config) ->
    DirectionStep = fun(C) -> init_per_group1(Group, C) end,
    case lists:member({group, Group}, all()) of
        false ->
            rabbit_ct_helpers:run_steps(Config, [DirectionStep]);
        true ->
            NodeSettings = [{rmq_nodename_suffix, Group},
                            {rmq_nodes_count, 2}],
            ClusterConfig = rabbit_ct_helpers:set_config(Config, NodeSettings),
            Steps = rabbit_ct_broker_helpers:setup_steps() ++
                    rabbit_ct_client_helpers:setup_steps() ++
                    [DirectionStep],
            rabbit_ct_helpers:run_steps(ClusterConfig, Steps)
    end.
%% Stores `test_direction` for the two direction sub-groups.
%% The value is a {FromIndex, ToIndex} pair of node indexes that the
%% testcases read back with ?config/2. Other groups leave Config untouched.
init_per_group1(from_cluster_node1, Config) ->
    Direction = {0, 1},
    rabbit_ct_helpers:set_config(Config, {test_direction, Direction});
init_per_group1(from_cluster_node2, Config) ->
    Direction = {1, 0},
    rabbit_ct_helpers:set_config(Config, {test_direction, Direction});
init_per_group1(_OtherGroup, Config) ->
    Config.
%% Group teardown.
%% Only top-level groups (those listed in all/0) run the client and broker
%% teardown steps; nested groups return Config unchanged.
end_per_group(Group, Config) ->
    case lists:member({group, Group}, all()) of
        false ->
            Config;
        true ->
            TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                            rabbit_ct_broker_helpers:teardown_steps(),
            rabbit_ct_helpers:run_steps(Config, TeardownSteps)
    end.
%% Testcase setup.
%% queue_cleanup is skipped as soon as any node reports the `khepri_db`
%% feature flag as enabled; every other testcase is simply registered as
%% started through rabbit_ct_helpers:testcase_started/2.
init_per_testcase(queue_cleanup = Testcase, Config) ->
    KhepriFlags = rabbit_ct_broker_helpers:rpc_all(
                    Config, rabbit_feature_flags, is_enabled, [khepri_db]),
    KhepriEnabled = lists:any(fun(Enabled) -> Enabled end, KhepriFlags),
    case KhepriEnabled of
        true ->
            {skip, "Invalid testcase using Khepri. All queues are durable"};
        false ->
            rabbit_ct_helpers:testcase_started(Config, Testcase)
    end;
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase).
%% Testcase teardown: returns the result of
%% rabbit_ct_helpers:testcase_finished/2.
end_per_testcase(Testcase, Config) ->
    Finished = rabbit_ct_helpers:testcase_finished(Config, Testcase),
    Finished.
%% ---------------------------------------------------------------------------
%% Cluster-dependent tests.
%% ---------------------------------------------------------------------------
%% Testcase: runs delegates_async1/2 on the "from" node of the configured
%% test direction, passing the "to" node name as the secondary node.
%% This module's code path is added to the "to" node beforehand.
delegates_async(Config) ->
    {SourceIdx, TargetIdx} = ?config(test_direction, Config),
    SourceNode = rabbit_ct_broker_helpers:get_node_config(
                   Config, SourceIdx, nodename),
    TargetNode = rabbit_ct_broker_helpers:get_node_config(
                   Config, TargetIdx, nodename),
    rabbit_ct_broker_helpers:add_code_path_to_node(TargetNode, ?MODULE),
    Args = [Config, TargetNode],
    passed = rabbit_ct_broker_helpers:rpc(
               Config, SourceNode, ?MODULE, delegates_async1, Args).
%% Runs on the "from" node. Sends `{invoked, Caller}` through
%% delegate:invoke_no_result/2 to one responder spawned locally and one
%% spawned on SecondaryNode, then waits for both `response` messages.
%% Returns `passed`.
delegates_async1(_Config, SecondaryNode) ->
    Caller = self(),
    Notify = fun(Target) -> Target ! {invoked, Caller} end,
    Responder = make_responder(
                  fun({invoked, ReplyTo}) -> ReplyTo ! response end),
    LocalPid = spawn(Responder),
    ok = delegate:invoke_no_result(LocalPid, Notify),
    RemotePid = spawn(SecondaryNode, Responder),
    ok = delegate:invoke_no_result(RemotePid, Notify),
    await_response(2),
    passed.
%% Testcase: runs delegates_sync1/2 on the "from" node of the configured
%% test direction, passing the "to" node name as the secondary node.
%% This module's code path is added to the "to" node beforehand.
delegates_sync(Config) ->
    {SourceIdx, TargetIdx} = ?config(test_direction, Config),
    SourceNode = rabbit_ct_broker_helpers:get_node_config(
                   Config, SourceIdx, nodename),
    TargetNode = rabbit_ct_broker_helpers:get_node_config(
                   Config, TargetIdx, nodename),
    rabbit_ct_broker_helpers:add_code_path_to_node(TargetNode, ?MODULE),
    Args = [Config, TargetNode],
    passed = rabbit_ct_broker_helpers:rpc(
               Config, SourceNode, ?MODULE, delegates_sync1, Args).
%% Runs on the "from" node: exercises delegate:invoke/2 against responder
%% processes spawned on this node and on SecondaryNode.
%% Returns `passed`; any unmet expectation fails with a badmatch or a throw.
delegates_sync1(_Config, SecondaryNode) ->
%% Sender issues a real gen_server:call; BadSender always exits instead.
Sender = fun (Pid) -> gen_server:call(Pid, invoked, infinity) end,
BadSender = fun (_Pid) -> exit(exception) end,
%% Both responders reply to a single '$gen_call' message. They differ only in
%% the term thrown by make_responder/2 when nothing arrives within ?TIMEOUT.
Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
gen_server:reply(From, response)
end),
BadResponder = make_responder(fun ({'$gen_call', From, invoked}) ->
gen_server:reply(From, response)
end, bad_responder_died),
%% Single-pid invocations: the reply is returned as-is, locally and remotely.
response = delegate:invoke(spawn(Responder), Sender),
response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
%% Single-pid invocations with a sender that exits must exit in the caller.
must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end),
must_exit(fun () ->
delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
%% Two responders of each kind on each node for the list-based invocations.
LocalGoodPids = spawn_responders(node(), Responder, 2),
RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
LocalBadPids = spawn_responders(node(), BadResponder, 2),
RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2),
%% A list invocation yields {Successes, Failures}, each a list of
%% {Pid, Result}. Here every pid must succeed with `response`.
{GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
true = lists:all(fun ({_, response}) -> true end, GoodRes),
GoodResPids = [Pid || {Pid, _} <- GoodRes],
%% The set of pids in the result must equal the set that was invoked.
Good = lists:usort(LocalGoodPids ++ RemoteGoodPids),
Good = lists:usort(GoodResPids),
%% With BadSender every pid must be reported as {exit, exception, _}.
{[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender),
true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes),
BadResPids = [Pid || {Pid, _} <- BadRes],
Bad = lists:usort(LocalBadPids ++ RemoteBadPids),
Bad = lists:usort(BadResPids),
%% Pids built from strings naming node nonode@nohost: each one must be
%% reported as a nodedown exit for that node.
MagicalPids = [rabbit_misc:string_to_pid(Str) ||
Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]],
{[], BadNodes} = delegate:invoke(MagicalPids, Sender),
true = lists:all(
fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end,
BadNodes),
BadNodesPids = [Pid || {Pid, _} <- BadNodes],
Magical = lists:usort(MagicalPids),
Magical = lists:usort(BadNodesPids),
passed.
%% Testcase: runs queue_cleanup1/2 on the "from" node of the configured test
%% direction, passing the "to" node name as the secondary node.
%% This module's code path is added to the "to" node beforehand.
queue_cleanup(Config) ->
    {SourceIdx, TargetIdx} = ?config(test_direction, Config),
    SourceNode = rabbit_ct_broker_helpers:get_node_config(
                   Config, SourceIdx, nodename),
    TargetNode = rabbit_ct_broker_helpers:get_node_config(
                   Config, TargetIdx, nodename),
    rabbit_ct_broker_helpers:add_code_path_to_node(TargetNode, ?MODULE),
    Args = [Config, TargetNode],
    passed = rabbit_ct_broker_helpers:rpc(
               Config, SourceNode, ?MODULE, queue_cleanup1, Args).
%% Runs on the "from" node: declares ?CLEANUP_QUEUE_NAME, calls rabbit:stop/0
%% and rabbit:start/0, then checks that a passive declare of the same queue is
%% answered with a channel.close carrying ?NOT_FOUND.
%% Returns `passed`; throws if an expected reply is not received within ?TIMEOUT.
queue_cleanup1(_Config, _SecondaryNode) ->
{_Writer, Ch} = test_spawn(),
%% Declare with the record defaults for every field except the name.
%% NOTE(review): presumably this yields a non-durable queue, which is why the
%% testcase is skipped under Khepri in init_per_testcase/2 -- confirm.
rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
ok
after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
rabbit_channel:shutdown(Ch),
%% Stop and start `rabbit` on the node this function runs on.
rabbit:stop(),
rabbit:start(),
%% A fresh channel is needed; the previous one was shut down before the stop.
{_Writer2, Ch2} = test_spawn(),
%% Passive declare: the expected outcome below is NOT_FOUND, i.e. the queue
%% declared before the restart is no longer there.
rabbit_channel:do(Ch2, #'queue.declare'{ passive = true,
queue = ?CLEANUP_QUEUE_NAME }),
receive
#'channel.close'{reply_code = ?NOT_FOUND} ->
ok
after ?TIMEOUT -> throw(failed_to_receive_channel_exit)
end,
rabbit_channel:shutdown(Ch2),
passed.
%% Testcase: runs declare_on_dead_queue1/2 on the "from" node of the
%% configured test direction, passing the "to" node name as the secondary node.
%% This module's code path is added to the "to" node beforehand.
declare_on_dead_queue(Config) ->
    {SourceIdx, TargetIdx} = ?config(test_direction, Config),
    SourceNode = rabbit_ct_broker_helpers:get_node_config(
                   Config, SourceIdx, nodename),
    TargetNode = rabbit_ct_broker_helpers:get_node_config(
                   Config, TargetIdx, nodename),
    rabbit_ct_broker_helpers:add_code_path_to_node(TargetNode, ?MODULE),
    Args = [Config, TargetNode],
    passed = rabbit_ct_broker_helpers:rpc(
               Config, SourceNode, ?MODULE, declare_on_dead_queue1, Args).
%% Runs on the "from" node. A helper process on SecondaryNode declares the
%% queue, kills its process and reports the killed pid back. This process then
%% re-declares the queue until it is backed by a different, live pid
%% (dead_queue_loop/2) and finally deletes it, expecting zero messages.
%% Returns `passed`; throws if the helper does not report within ?TIMEOUT.
declare_on_dead_queue1(_Config, SecondaryNode) ->
    QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME),
    Parent = self(),
    DeclareAndKill =
        fun() ->
            {new, Declared} = rabbit_amqqueue:declare(
                                QueueName, false, false, [], none,
                                <<"acting-user">>),
            %% QueueName is already bound: this asserts the declared name.
            QueueName = ?amqqueue_field_name(Declared),
            VictimPid = ?amqqueue_field_pid(Declared),
            exit(VictimPid, kill),
            Parent ! {self(), killed, VictimPid}
        end,
    HelperPid = spawn(SecondaryNode, DeclareAndKill),
    receive
        {HelperPid, killed, DeadPid} ->
            Revived = dead_queue_loop(QueueName, DeadPid),
            {ok, 0} = rabbit_amqqueue:delete(
                        Revived, false, false, <<"acting-user">>),
            passed
    after ?TIMEOUT ->
        throw(failed_to_create_and_kill_queue)
    end.
%% Builds a zero-arity fun suitable for spawn/1,2: it waits for one message
%% and applies FMsg to it. Without a message within ?TIMEOUT it throws
%% `timeout`.
make_responder(FMsg) ->
    make_responder(FMsg, timeout).

%% Same as make_responder/1, but the term thrown on timeout is Throw.
make_responder(FMsg, Throw) ->
    fun() ->
        receive
            Incoming -> FMsg(Incoming)
        after ?TIMEOUT ->
            throw(Throw)
        end
    end.
%% Spawns Count processes on Node, each running Responder, and returns their
%% pids in spawn order.
spawn_responders(Node, Responder, Count) ->
    Slots = lists:seq(1, Count),
    [spawn(Node, Responder) || _Slot <- Slots].
%% Consumes Count `response` messages from the caller's mailbox and returns
%% `ok`. Throws `timeout` if the next one does not arrive within ?TIMEOUT.
await_response(0) ->
    ok;
await_response(Remaining) ->
    receive
        response -> await_response(Remaining - 1)
    after ?TIMEOUT ->
        throw(timeout)
    end.
%% Asserts that Fun() exits. Returns `ok` when it does and throws
%% `exit_not_thrown` when it returns normally. Errors and throws raised by
%% Fun are not caught here.
must_exit(Fun) ->
    try Fun() of
        _Returned -> throw(exit_not_thrown)
    catch
        exit:_ -> ok
    end.
%% Re-declares QueueName every 25 ms until the queue record no longer carries
%% OldPid, asserts that the new pid is alive and returns the queue record.
%% Each declare must report the queue as `existing`.
dead_queue_loop(QueueName, OldPid) ->
    {existing, Q} = rabbit_amqqueue:declare(
                      QueueName, false, false, [], none, <<"acting-user">>),
    case ?amqqueue_field_pid(Q) of
        OldPid ->
            %% Still the killed pid: wait a little and ask again.
            timer:sleep(25),
            dead_queue_loop(QueueName, OldPid);
        NewPid ->
            true = rabbit_misc:is_process_alive(NewPid),
            Q
    end.
%% Creates a test channel, opens it and waits for `channel.open_ok` in the
%% caller's mailbox. Returns {Writer, Channel}; throws if the confirmation
%% does not arrive within ?TIMEOUT.
test_spawn() ->
    {Writer, _Limiter, Ch} = rabbit_ct_broker_helpers:test_channel(),
    ok = rabbit_channel:do(Ch, #'channel.open'{}),
    receive
        #'channel.open_ok'{} -> {Writer, Ch}
    after ?TIMEOUT ->
        throw(failed_to_receive_channel_open_ok)
    end.
%% Calls test_spawn_remote/0 on Node through rpc:call/4 and returns whatever
%% the call returns.
test_spawn(Node) ->
    NoArgs = [],
    rpc:call(Node, ?MODULE, test_spawn_remote, NoArgs).
%% Opens a test channel from a separate long-lived process, so that the
%% channel does not end up linked to the short-lived process spun up by the
%% RPC server to run this function.
%% Returns the first message received by the caller, expected to be the
%% {Writer, Channel} pair sent by the owner process. Throws if nothing
%% arrives within ?TIMEOUT.
test_spawn_remote() ->
    Requester = self(),
    ChannelOwner =
        fun() ->
            {Writer, Ch} = test_spawn(),
            Requester ! {Writer, Ch},
            link(Ch),
            %% Stay alive until any message arrives.
            receive
                _ -> ok
            end
        end,
    spawn(ChannelOwner),
    receive
        Res -> Res
    after ?TIMEOUT ->
        throw(failed_to_receive_result)
    end.
%% Queue resource in vhost <<"/">> whose name is derived from Config and Name
%% by rabbit_ct_helpers:config_to_testcase_name/2.
queue_name(Config, Name) ->
    TestcaseName = rabbit_ct_helpers:config_to_testcase_name(Config, Name),
    queue_name(iolist_to_binary(TestcaseName)).

%% Queue resource named Name in vhost <<"/">>.
queue_name(Name) ->
    VHost = <<"/">>,
    rabbit_misc:r(VHost, queue, Name).
%% Testcase: a secret obfuscated on either node must still be readable on
%% both nodes after broker restarts.
%%
%% Steps:
%%   1. obfuscate the same value on node 0 and on node 1;
%%   2. restart the broker on node 1;
%%   3. check every (decrypting node, originating node) combination;
%%   4. obfuscate again on node 1, restart the broker on node 0 and check
%%      that node 0 can read the new value.
credentials_obfuscation(Config) ->
    Value = <<"amqp://something">>,
    Obfuscated0 = obfuscate_secret(Config, 0, Value),
    Obfuscated1 = obfuscate_secret(Config, 1, Value),

    ok = rabbit_ct_broker_helpers:restart_broker(Config, 1),

    %% Full 2x2 matrix. The last line used to repeat the (node 1, Obfuscated1)
    %% check, so "node 1 reads what node 0 obfuscated" was never exercised.
    ?assertEqual(Value, deobfuscate_secret(Config, 0, Obfuscated0)),
    ?assertEqual(Value, deobfuscate_secret(Config, 1, Obfuscated1)),
    ?assertEqual(Value, deobfuscate_secret(Config, 0, Obfuscated1)),
    ?assertEqual(Value, deobfuscate_secret(Config, 1, Obfuscated0)),

    Obfuscated2 = obfuscate_secret(Config, 1, Value),

    ok = rabbit_ct_broker_helpers:restart_broker(Config, 0),

    ?assertEqual(Value, deobfuscate_secret(Config, 0, Obfuscated2)),
    ok.
%% Calls credentials_obfuscation:encrypt/1 on the given node and returns the
%% result, asserting that it has the shape {encrypted, _}.
obfuscate_secret(Config, Node, Value) ->
    Result = rabbit_ct_broker_helpers:rpc(
               Config, Node, credentials_obfuscation, encrypt, [Value]),
    {encrypted, _} = Result,
    Result.
%% Calls credentials_obfuscation:decrypt/1 on the given node and returns its
%% result unchanged.
deobfuscate_secret(Config, Node, Encrypted) ->
    Args = [Encrypted],
    rabbit_ct_broker_helpers:rpc(
      Config, Node, credentials_obfuscation, decrypt, Args).