rabbitmq-server/deps/rabbit/test/clustering_recovery_SUITE.erl

548 lines
20 KiB
Erlang

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(clustering_recovery_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile(nowarn_export_all).
-compile(export_all).
-import(clustering_utils, [
assert_status/2,
assert_cluster_status/2,
assert_clustered/1,
assert_not_clustered/1
]).
all() ->
    %% Every testcase lives under one metadata-store group; init_per_group/2
    %% skips the group whose store is not the configured one.
    [{group, Store} || Store <- [mnesia_store, khepri_store]].
groups() ->
    %% Nesting: store group -> clustering group -> cluster-size group -> cases.
    %% The mnesia store only runs the quorum queue force-shrink testcases; the
    %% Khepri store runs the standalone-boot/partition testcases on 3 nodes and
    %% the rolling restart/forget testcases on 5 nodes.
    MnesiaCases = [force_shrink_quorum_queue,
                   force_shrink_all_quorum_queues],
    Khepri3Cases = [force_standalone_boot,
                    force_standalone_boot_and_restart,
                    force_standalone_boot_and_restart_with_quorum_queues,
                    recover_after_partition_with_leader],
    Khepri5Cases = [rolling_restart,
                    rolling_kill_restart,
                    forget_down_node],
    Clustered3 = fun(Cases) ->
                         {clustered_3_nodes, [], [{cluster_size_3, [], Cases}]}
                 end,
    Clustered5 = {clustered_5_nodes, [], [{cluster_size_5, [], Khepri5Cases}]},
    [{mnesia_store, [], [Clustered3(MnesiaCases)]},
     {khepri_store, [], [Clustered3(Khepri3Cases), Clustered5]}].
suite() ->
    %% If a testcase hangs, give up after 10 minutes instead of waiting for
    %% the 30-minute default timetrap.
    Timetrap = {timetrap, {minutes, 10}},
    [Timetrap].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
    %% Log the test environment, then run the suite-wide setup step that
    %% configures the Erlang distribution proxy.
    rabbit_ct_helpers:log_environment(),
    SetupSteps = [fun rabbit_ct_broker_helpers:configure_dist_proxy/1],
    rabbit_ct_helpers:run_setup_steps(Config, SetupSteps).
end_per_suite(Config) ->
    %% Suite-wide teardown through the common test helpers.
    Result = rabbit_ct_helpers:run_teardown_steps(Config),
    Result.
init_per_group(khepri_store, Config) ->
    skip_if_metadata_store(mnesia, "These tests target Khepri", Config);
init_per_group(mnesia_store, Config) ->
    skip_if_metadata_store(khepri, "These tests target mnesia", Config);
init_per_group(Group, Config)
  when Group =:= clustered_3_nodes; Group =:= clustered_5_nodes ->
    %% Both clustering groups start their nodes as one cluster.
    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
init_per_group(cluster_size_3, Config) ->
    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]);
init_per_group(cluster_size_5, Config) ->
    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 5}]).

%% Returns `{skip, Reason}' when the configured metadata store is `Unwanted',
%% otherwise returns `Config' unchanged.
skip_if_metadata_store(Unwanted, Reason, Config) ->
    case rabbit_ct_broker_helpers:configured_metadata_store(Config) of
        Unwanted -> {skip, Reason};
        _Other -> Config
    end.
end_per_group(_Group, Config) ->
    %% Nothing to undo: init_per_group/2 only adds config entries (or skips).
    Config.
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase),
    NodesPerTestcase = ?config(rmq_nodes_count, Config),
    Index = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
    %% Node names carry the testcase name, and the TCP port base is shifted by
    %% `Index * NodesPerTestcase' nodes (presumably so successive testcases do
    %% not reuse each other's ports). Pid files are kept on exit.
    PerTestcase = [{rmq_nodename_suffix, Testcase},
                   {tcp_ports_base, {skip_n_nodes, Index * NodesPerTestcase}},
                   {keep_pid_file_on_exit, true}],
    TestcaseConfig = rabbit_ct_helpers:set_config(Config, PerTestcase),
    %% Broker setup steps run before client setup steps.
    Steps = rabbit_ct_broker_helpers:setup_steps() ++
            rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_steps(TestcaseConfig, Steps).
end_per_testcase(Testcase, Config) ->
    %% Client teardown steps run before broker teardown steps, i.e. the
    %% reverse of init_per_testcase/2.
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    TornDown = rabbit_ct_helpers:run_steps(Config, TeardownSteps),
    rabbit_ct_helpers:testcase_finished(TornDown, Testcase).
%% -------------------------------------------------------------------
%% Testcases
%% -------------------------------------------------------------------
force_shrink_all_quorum_queues(Config) ->
    %% 3-node cluster. Declare and publish to three quorum queues on the first
    %% node, then stop the other two nodes; subscribing must now fail. After
    %% force-shrinking all quorum queues to the remaining member, every queue
    %% must deliver its messages again.
    [Survivor, Down1, Down2] =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    QNames = [quorum_queue_name(I) || I <- [1, 2, 3]],
    QuorumArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
    lists:foreach(
      fun(QName) ->
              declare_and_publish_to_queue(Config, Survivor, QName, QuorumArgs)
      end, QNames),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Down1),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Down2),
    [FirstQName | _] = QNames,
    Ch = rabbit_ct_client_helpers:open_channel(Config, Survivor),
    %% With the other two nodes down, the server is expected to close the
    %% connection with reply code 541 when we try to consume.
    ?assertExit(
       {{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
       amqp_channel:subscribe(Ch, #'basic.consume'{queue = FirstQName,
                                                   consumer_tag = <<"ctag">>},
                              self())),
    ok = rabbit_ct_broker_helpers:rpc(
           Config, Survivor, rabbit_quorum_queue,
           force_all_queues_shrink_member_to_current_member, []),
    lists:foreach(
      fun(QName) -> ok = consume_from_queue(Config, Survivor, QName) end,
      QNames),
    ok.
force_shrink_quorum_queue(Config) ->
    %% 3-node cluster. Declare and publish to one quorum queue on the first
    %% node, then stop the other two nodes; subscribing must now fail. After
    %% force-shrinking that queue (vhost "/") to the remaining member, its
    %% messages must be consumable again.
    [Survivor, Down1, Down2] =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    QName = quorum_queue_name(1),
    QuorumArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
    declare_and_publish_to_queue(Config, Survivor, QName, QuorumArgs),
    lists:foreach(
      fun(Node) -> ok = rabbit_ct_broker_helpers:stop_node(Config, Node) end,
      [Down1, Down2]),
    Ch = rabbit_ct_client_helpers:open_channel(Config, Survivor),
    %% With the other two nodes down, the server is expected to close the
    %% connection with reply code 541 when we try to consume.
    ?assertExit(
       {{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
       amqp_channel:subscribe(Ch, #'basic.consume'{queue = QName,
                                                   consumer_tag = <<"ctag">>},
                              self())),
    ok = rabbit_ct_broker_helpers:rpc(
           Config, Survivor, rabbit_quorum_queue,
           force_shrink_member_to_current_member, [<<"/">>, QName]),
    ok = consume_from_queue(Config, Survivor, QName).
force_standalone_boot(Config) ->
    %% Test for the disaster recovery procedure command: with the other two
    %% nodes stopped, force a standalone Khepri boot on the first node and
    %% check the cluster status then only lists that node.
    [Rabbit, Hare, Bunny] = AllNodes =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    assert_cluster_status({AllNodes, AllNodes, AllNodes}, AllNodes),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Hare),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Bunny),
    ok = force_standalone_khepri_boot(Rabbit),
    Standalone = [Rabbit],
    assert_cluster_status(
      {Standalone, Standalone, Standalone, Standalone, Standalone},
      Standalone),
    ok.
force_standalone_boot_and_restart(Config) ->
    %% Test for disaster recovery procedure
    %%
    %% 3-node cluster. Declare and publish to a classic queue on node 1.
    %% Stop the two remaining nodes. Force standalone boot on the node
    %% left. Restart it. Consume all the messages.
    [First, Second, Third] = AllNodes =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    assert_cluster_status({AllNodes, AllNodes, AllNodes}, AllNodes),
    QName = classic_queue_name(First),
    ClassicArgs = [{<<"x-queue-type">>, longstr, <<"classic">>}],
    declare_and_publish_to_queue(Config, First, QName, ClassicArgs),
    lists:foreach(
      fun(Node) -> ok = rabbit_ct_broker_helpers:stop_node(Config, Node) end,
      [Second, Third]),
    ok = force_standalone_khepri_boot(First),
    Alone = [First],
    assert_cluster_status({Alone, Alone, Alone, Alone, Alone}, Alone),
    ok = rabbit_ct_broker_helpers:stop_node(Config, First),
    ok = rabbit_ct_broker_helpers:start_node(Config, First),
    consume_from_queue(Config, First, QName),
    ok.
force_standalone_boot_and_restart_with_quorum_queues(Config) ->
    %% Test for disaster recovery procedure
    %%
    %% 3-node cluster. Declare and publish to two quorum queues on node 1.
    %% Stop the two remaining nodes. Force standalone boot on the node
    %% left and force-shrink every quorum queue to its local member.
    %% Restart it. Consume all the messages from both queues.
    [Rabbit, Hare, Bunny] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    assert_cluster_status({[Rabbit, Hare, Bunny], [Rabbit, Hare, Bunny], [Rabbit, Hare, Bunny]},
                          [Rabbit, Hare, Bunny]),
    QName1 = quorum_queue_name(1),
    QName2 = quorum_queue_name(2),
    Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
    declare_and_publish_to_queue(Config, Rabbit, QName1, Args),
    declare_and_publish_to_queue(Config, Rabbit, QName2, Args),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Hare),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Bunny),
    ok = force_standalone_khepri_boot(Rabbit),
    ok = rabbit_ct_broker_helpers:rpc(Config, Rabbit, rabbit_quorum_queue,
                                      force_all_queues_shrink_member_to_current_member, []),
    assert_cluster_status({[Rabbit], [Rabbit], [Rabbit], [Rabbit], [Rabbit]},
                          [Rabbit]),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Rabbit),
    ok = rabbit_ct_broker_helpers:start_node(Config, Rabbit),
    %% Assert each consume like the force_shrink testcases do, so a failure
    %% is reported at the queue that caused it.
    ok = consume_from_queue(Config, Rabbit, QName1),
    ok = consume_from_queue(Config, Rabbit, QName2),
    ok.
recover_after_partition_with_leader(Config) ->
    %% Partition the Khepri leader away from all its followers, keep the
    %% partition in place for a while, then heal it and check that the
    %% nodes agree on a single leader again.
    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    %% We use intermediate Erlang nodes between the common_test control node
    %% and the RabbitMQ nodes, using `peer' standard_io communication. The goal
    %% is to make sure the common_test control node doesn't interfere with the
    %% nodes the RabbitMQ nodes can see, despite the blocking of the Erlang
    %% distribution connection.
    Proxies0 = [begin
                    {ok, Proxy, PeerNode} = peer:start_link(
                                              #{name => peer:random_name(),
                                                connection => standard_io,
                                                wait_boot => 120000}),
                    ct:pal("Proxy ~0p -> ~0p", [Proxy, PeerNode]),
                    Proxy
                end || _ <- Nodes],
    Proxies = maps:from_list(lists:zip(Nodes, Proxies0)),
    ct:pal("Proxies: ~p", [Proxies]),
    Config1 = [{proxies, Proxies} | Config],
    ct:pal("Prevent automatic reconnection on the common_test node"),
    application:set_env(kernel, dist_auto_connect, never),
    try
        partition_leader_and_recover(Config1, Nodes)
    after
        %% Always restore the setting, even when an assertion above fails:
        %% otherwise the common_test node would stay configured to never
        %% auto-connect to the RabbitMQ nodes for the rest of the run
        %% (including end_per_testcase/2).
        ct:pal("Restore automatic reconnection on the common_test node"),
        application:unset_env(kernel, dist_auto_connect)
    end.

%% Body of recover_after_partition_with_leader/1, run while automatic
%% reconnection is disabled on the common_test node. `Config1' must carry the
%% `proxies' map (RabbitMQ node -> peer proxy); `Nodes' are the RabbitMQ nodes.
%% Returns `ok'.
partition_leader_and_recover(Config1, Nodes) ->
    NodeA = hd(Nodes),
    ct:pal("Disconnect the common_test node from RabbitMQ nodes"),
    lists:foreach(fun erlang:disconnect_node/1, Nodes),
    ct:pal(
      "Ensure RabbitMQ nodes only know about the RabbitMQ nodes "
      "(and their proxy)"),
    lists:foreach(
      fun(Node) ->
              ?awaitMatch(
                 Nodes,
                 get_connected_nodes(Config1, Node),
                 30000)
      end, Nodes),
    ct:pal("Wait for a Khepri leader to be elected"),
    ?awaitMatch({ok, _}, get_leader_node(Config1, NodeA), 30000),
    ct:pal("Query the Khepri leader nodename"),
    {ok, Leader} = get_leader_node(Config1, NodeA),
    Followers = Nodes -- [Leader],
    ct:pal("Leader: ~0p~nFollowers: ~p", [Leader, Followers]),
    lists:foreach(
      fun(Follower) ->
              ct:pal(
                ?LOW_IMPORTANCE,
                "Blocking traffic between ~ts and ~ts",
                [Leader, Follower]),
              ?assertEqual(
                 ok,
                 proxied_rpc(
                   Config1, Leader, inet_tcp_proxy_dist, block, [Follower])),
              ?assertEqual(
                 ok,
                 proxied_rpc(
                   Config1, Follower, inet_tcp_proxy_dist, block, [Leader]))
      end, Followers),
    ct:pal(
      "Ensure the leader node is disconnected from other RabbitMQ nodes"),
    ?awaitMatch(
       [Leader],
       get_connected_nodes(Config1, Leader),
       30000),
    ct:pal(
      "Ensure the follower nodes are disconnected from the leader node"),
    lists:foreach(
      fun(Follower) ->
              ?awaitMatch(
                 Followers,
                 get_connected_nodes(Config1, Follower),
                 30000)
      end, Followers),
    ct:pal("Wait for each side of the partition to have its own leader"),
    Follower1 = hd(Followers),
    ?awaitMatch(
       false,
       begin
           LeaderA = get_leader_node(Config1, Leader),
           LeaderB = get_leader_node(Config1, Follower1),
           ct:pal("LeaderA: ~0p~nLeaderB: ~0p", [LeaderA, LeaderB]),
           LeaderA =:= LeaderB
       end,
       30000),
    ct:pal("Waiting for 2 minutes"),
    timer:sleep(120000),
    ct:pal("Query Khepri status for each RabbitMQ node"),
    PerNodeStatus1 = get_per_node_khepri_status(Config1),
    ct:pal("Per-node Khepri status (during partition):~n~p", [PerNodeStatus1]),
    lists:foreach(
      fun(Follower) ->
              ct:pal(
                ?LOW_IMPORTANCE,
                "Unblocking traffic between ~ts and ~ts",
                [Leader, Follower]),
              ?assertEqual(
                 ok,
                 proxied_rpc(
                   Config1, Leader, inet_tcp_proxy_dist, allow, [Follower])),
              ?assertEqual(
                 ok,
                 proxied_rpc(
                   Config1, Follower, inet_tcp_proxy_dist, allow, [Leader]))
      end, Followers),
    ct:pal("Wait for the whole cluster to agree on the same leader"),
    ?awaitMatch(
       true,
       begin
           LeaderA = get_leader_node(Config1, Leader),
           LeaderB = get_leader_node(Config1, Follower1),
           ct:pal("LeaderA: ~0p~nLeaderB: ~0p", [LeaderA, LeaderB]),
           LeaderA =:= LeaderB
       end,
       30000),
    ct:pal("Query Khepri status for each RabbitMQ node"),
    PerNodeStatus2 = get_per_node_khepri_status(Config1),
    ct:pal("Per-node Khepri status (after recovery):~n~p", [PerNodeStatus2]),
    ok.
proxied_rpc(Config, Node, Module, Function, Args) ->
    %% Performs `rabbit_ct_broker_helpers:rpc/5' against `Node' from the peer
    %% proxy registered for it in the `proxies' config map, rather than from
    %% the common_test node itself.
    ProxyMap = ?config(proxies, Config),
    ProxyPid = maps:get(Node, ProxyMap),
    RpcArgs = [Config, Node, Module, Function, Args],
    peer:call(ProxyPid, rabbit_ct_broker_helpers, rpc, RpcArgs).
get_leader_node(Config, Node) ->
    %% Asks `Node' (through its proxy) for the leader of the Khepri store as
    %% recorded in `ra_leaderboard'. Returns `{ok, LeaderNode}', or
    %% `{error, no_leader}' when no leader is known.
    StoreId = rabbit_khepri:get_store_id(),
    case proxied_rpc(Config, Node, ra_leaderboard, lookup_leader, [StoreId]) of
        undefined ->
            {error, no_leader};
        {StoreId, LeaderNode} ->
            {ok, LeaderNode}
    end.
get_connected_nodes(Config, Node) ->
    %% Returns the sorted list made of `Node' itself plus every node it is
    %% connected to, excluding the peer proxy node used to reach it.
    ProxyPid = maps:get(Node, ?config(proxies, Config)),
    ProxyNode = peer:call(ProxyPid, erlang, node, []),
    Connected = proxied_rpc(Config, Node, erlang, nodes, []),
    lists:sort([Node | lists:delete(ProxyNode, Connected)]).
get_per_node_khepri_status(Config) ->
    %% Map of each RabbitMQ node to the result of `rabbit_khepri:status/0' on
    %% that node, queried through the node's proxy.
    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Pairs = [{Node, proxied_rpc(Config, Node, rabbit_khepri, status, [])}
             || Node <- Nodes],
    maps:from_list(Pairs).
rolling_restart(Config) ->
    %% Publish to one classic queue per node, stop and start the nodes one at
    %% a time, then check the cluster re-formed and every queue still holds
    %% the messages published to it.
    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    ClassicArgs = [{<<"x-queue-type">>, longstr, <<"classic">>}],
    lists:foreach(
      fun(Node) ->
              declare_and_publish_to_queue(
                Config, Node, classic_queue_name(Node), ClassicArgs)
      end, Nodes),
    lists:foreach(
      fun(Node) ->
              ok = rabbit_ct_broker_helpers:stop_node(Config, Node),
              ok = rabbit_ct_broker_helpers:start_node(Config, Node)
      end, Nodes),
    assert_cluster_status({Nodes, Nodes, Nodes}, Nodes),
    lists:foreach(
      fun(Node) ->
              consume_from_queue(Config, Node, classic_queue_name(Node))
      end, Nodes),
    ok.
rolling_kill_restart(Config) ->
    %% Publish to one classic queue per node, then kill and restart the nodes
    %% one at a time. At most one node may fail to start again:
    %% - if every node came back, the cluster must be whole and every queue
    %%   must still hold its messages;
    %% - otherwise the failed node is forgotten and the remaining nodes must
    %%   form the cluster.
    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    ClassicArgs = [{<<"x-queue-type">>, longstr, <<"classic">>}],
    lists:foreach(
      fun(Node) ->
              declare_and_publish_to_queue(
                Config, Node, classic_queue_name(Node), ClassicArgs)
      end, Nodes),
    StartResults =
        [begin
             ok = rabbit_ct_broker_helpers:kill_node(Config, Node),
             {Node, rabbit_ct_broker_helpers:start_node(Config, Node)}
         end || Node <- Nodes],
    Failed = lists:filter(fun({_Node, Result}) -> Result =/= ok end,
                          StartResults),
    ?assert(length(Failed) =< 1),
    case Failed of
        [] ->
            assert_cluster_status({Nodes, Nodes, Nodes}, Nodes),
            lists:foreach(
              fun(Node) ->
                      consume_from_queue(Config, Node, classic_queue_name(Node))
              end, Nodes);
        [{FailedNode, {error, _}}] ->
            [Forgetter | _] = Remaining = Nodes -- [FailedNode],
            ok = forget_cluster_node(Forgetter, FailedNode),
            assert_cluster_status({Remaining, Remaining, Remaining}, Remaining)
    end,
    ok.
forget_down_node(Config) ->
    %% Stop the first node, have the second node remove it from the cluster
    %% with `forget_cluster_node', then check the cluster status only lists
    %% the other nodes.
    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    [Stopped, Forgetter | _] = Nodes,
    ok = rabbit_ct_broker_helpers:stop_node(Config, Stopped),
    ok = forget_cluster_node(Forgetter, Stopped),
    [_ | Remaining] = Nodes,
    assert_cluster_status({Remaining, Remaining, Remaining}, Remaining),
    ok.
%% -------------------------------------------------------------------
%% Internal utils
%% -------------------------------------------------------------------
declare_and_publish_to_queue(Config, Node, QName, Args) ->
    %% Declares the durable queue `QName' (with `Args') on `Node' and publishes
    %% 10 messages to it with publisher confirms, over a short-lived
    %% connection. Both the declare reply and the confirm outcome are asserted,
    %% so a failed setup is reported here rather than later as a
    %% `deliver_timeout' when consuming. Returns the result of closing the
    %% connection and channel.
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Node),
    #'queue.declare_ok'{} = declare(Ch, QName, Args),
    true = publish_many(Ch, QName, 10),
    rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
quorum_queue_name(Number) ->
    %% Binary queue name `quorum_queue_<Number>', e.g. 1 -> <<"quorum_queue_1">>.
    Chars = io_lib:format("quorum_queue_~p", [Number]),
    iolist_to_binary(Chars).
classic_queue_name(Node) ->
    %% Binary queue name `classic_queue_<Node>', with `Node' printed by ~p
    %% (so atoms needing quotes keep their quotes).
    Chars = io_lib:format("classic_queue_~p", [Node]),
    iolist_to_binary(Chars).
declare(Ch, Name, Args) ->
    %% Declares a durable queue `Name' with the given arguments and returns
    %% the reply of the synchronous `queue.declare' call.
    Method = #'queue.declare'{queue = Name,
                              durable = true,
                              arguments = Args},
    amqp_channel:call(Ch, Method).
consume_from_queue(Config, Node, QName) ->
    %% Opens a fresh connection to `Node', subscribes to `QName' and waits for
    %% 10 deliveries (the count declare_and_publish_to_queue/4 publishes),
    %% then returns the result of closing the connection and channel.
    Expected = 10,
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Node),
    subscribe(Ch, QName),
    consume(Expected),
    rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
force_standalone_khepri_boot(Node) ->
    %% Runs the `force_standalone_khepri_boot' CLI command against `Node' with
    %% no arguments and returns the helper's result.
    Command = force_standalone_khepri_boot,
    rabbit_control_helper:command(Command, Node, []).
forget_cluster_node(Node, Removee) ->
    %% Runs the `forget_cluster_node' CLI command on `Node' to remove
    %% `Removee'; the node name is passed as a string argument, and the
    %% fourth argument (presumably options) is left empty.
    RemoveeArg = atom_to_list(Removee),
    rabbit_control_helper:command(forget_cluster_node, Node, [RemoveeArg], []).
publish_many(Ch, QName, N) ->
    %% Enables publisher confirms on `Ch', casts `N' messages with
    %% delivery_mode 2 using `QName' as the routing key, then returns the
    %% result of waiting for the confirms.
    amqp_channel:call(Ch, #'confirm.select'{}),
    Publish = #'basic.publish'{routing_key = QName},
    Msg = #amqp_msg{props = #'P_basic'{delivery_mode = 2}},
    lists:foreach(fun(_) -> amqp_channel:cast(Ch, Publish, Msg) end,
                  lists:seq(1, N)),
    amqp_channel:wait_for_confirms(Ch).
subscribe(Ch, QName) ->
    %% Registers the calling process as consumer "ctag" on `QName' and waits
    %% up to 30 seconds for the matching `basic.consume_ok'; exits with
    %% `consume_ok_timeout' otherwise.
    Tag = <<"ctag">>,
    Consume = #'basic.consume'{queue = QName, consumer_tag = Tag},
    amqp_channel:subscribe(Ch, Consume, self()),
    receive
        #'basic.consume_ok'{consumer_tag = Tag} -> ok
    after 30000 ->
            exit(consume_ok_timeout)
    end.
consume(N) ->
    %% Waits for `N' deliveries to consumer "ctag", allowing up to 30 seconds
    %% for each one; exits with `deliver_timeout' if one does not arrive.
    case N of
        0 ->
            ok;
        _ ->
            receive
                {#'basic.deliver'{consumer_tag = <<"ctag">>}, _Msg} ->
                    consume(N - 1)
            after 30000 ->
                    exit(deliver_timeout)
            end
    end.