436 lines
15 KiB
Erlang
436 lines
15 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
%%
|
|
%% The Initial Developer of the Original Code is AWeber Communications.
|
|
%% Copyright (c) 2015-2016 AWeber Communications
|
|
%% Copyright (c) 2007-2024 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
%%
|
|
|
|
-module(system_SUITE).
|
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-include("rabbit_peer_discovery_etcd.hrl").
|
|
|
|
-define(ETCD_GIT_REPO, "https://github.com/etcd-io/etcd.git").
|
|
-define(ETCD_GIT_REF, "v3.5.13").
|
|
|
|
-export([all/0,
|
|
groups/0,
|
|
init_per_suite/1,
|
|
end_per_suite/1,
|
|
init_per_group/2,
|
|
end_per_group/2,
|
|
init_per_testcase/2,
|
|
end_per_testcase/2,
|
|
|
|
init_opens_a_connection_test/1,
|
|
registration_with_locking_test/1,
|
|
start_one_member_at_a_time/1,
|
|
start_members_concurrently/1]).
|
|
|
|
%% Common Test callback: the suite consists of two groups, listed in the
%% order `v3_client', then `clustering'.
all() ->
    [{group, GroupName} || GroupName <- [v3_client, clustering]].
|
|
|
|
%% Common Test callback: maps each group name to its test cases. Both
%% groups are declared with an empty property list.
groups() ->
    V3ClientCases = [init_opens_a_connection_test,
                     registration_with_locking_test],
    ClusteringCases = [start_one_member_at_a_time,
                       start_members_concurrently],
    [{v3_client, [], V3ClientCases},
     {clustering, [], ClusteringCases}].
|
|
|
|
%% Common Test callback: logs the environment, then runs the etcd setup
%% steps in order (clone the sources, compile them, start the daemon).
%% The return value is whatever rabbit_ct_helpers:run_setup_steps/2 returns.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    EtcdSetupSteps = [fun clone_etcd/1,
                      fun compile_etcd/1,
                      fun start_etcd/1],
    rabbit_ct_helpers:run_setup_steps(Config, EtcdSetupSteps).
|
|
|
|
%% Common Test callback: runs the suite teardown, whose only suite-specific
%% step is stopping the etcd daemon.
end_per_suite(Config) ->
    EtcdTeardownSteps = [fun stop_etcd/1],
    rabbit_ct_helpers:run_teardown_steps(Config, EtcdTeardownSteps).
|
|
|
|
%% Common Test callback run before each group.
%%
%% For the `clustering' group, the config is extended with a node count of
%% 3, `rmq_nodes_clustered' set to false, and the `rabbit' application
%% environment key `forced_feature_flags_on_init' set to the empty list.
%% Any other group gets the config back unchanged.
init_per_group(clustering, Config) ->
    NodeSettings = [{rmq_nodes_count, 3},
                    {rmq_nodes_clustered, false}],
    RabbitEnv = {rabbit, [{forced_feature_flags_on_init, []}]},
    ConfigWithNodes = rabbit_ct_helpers:set_config(Config, NodeSettings),
    rabbit_ct_helpers:merge_app_env(ConfigWithNodes, RabbitEnv);
init_per_group(_Group, Config) ->
    Config.
|
|
|
|
%% Common Test callback run after each group: nothing to clean up, so the
%% config is handed back as-is.
end_per_group(_Group, Config) -> Config.
|
|
|
|
%% Common Test callback run before each test case.
%%
%% For the two clustering test cases, this:
%%   1. records the test case as started;
%%   2. sets a per-test-case node name suffix and a TCP port base offset
%%      derived from the test case number and the cluster size;
%%   3. enables debug-level file logging in the `rabbit' app environment;
%%   4. runs the broker and client setup steps;
%%   5. checks that the started nodes are compatible with the test case
%%      (see check_nodes_compatibility/2).
%%
%% Returns the updated config, or `{skip, Reason}' if the setup steps asked
%% to skip or the nodes are not compatible. Any other test case gets the
%% config back unchanged.
init_per_testcase(Testcase, Config)
  when Testcase =:= start_one_member_at_a_time orelse
       Testcase =:= start_members_concurrently ->
    rabbit_ct_helpers:testcase_started(Config, Testcase),
    TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
    ClusterSize = ?config(rmq_nodes_count, Config),
    Config1 = rabbit_ct_helpers:set_config(
                Config,
                [{rmq_nodename_suffix, Testcase},
                 {tcp_ports_base, {skip_n_nodes,
                                   TestNumber * ClusterSize}}
                ]),
    Config2 = rabbit_ct_helpers:merge_app_env(
                Config1, {rabbit, [{log, [{file, [{level, debug}]}]}]}),
    Config3 = rabbit_ct_helpers:run_steps(
                Config2,
                rabbit_ct_broker_helpers:setup_steps() ++
                rabbit_ct_client_helpers:setup_steps()),
    case Config3 of
        _ when is_list(Config3) ->
            check_nodes_compatibility(Config3, Testcase);
        {skip, _} ->
            %% A setup step already decided to skip; pass that on.
            Config3
    end;
init_per_testcase(_Testcase, Config) ->
    Config.

%% Decides whether the started nodes can run a clustering test case.
%%
%% Returns `Config' when all nodes run the same `khepri_machine' Ra machine
%% version and the call to rabbit_peer_discovery_backend:api_version/0 on
%% all nodes does not raise `undef'. Otherwise the nodes are torn down and
%% `{skip, Reason}' is returned.
check_nodes_compatibility(Config, Testcase) ->
    try
        SameMacVer = (
          rabbit_ct_broker_helpers:
          do_nodes_run_same_ra_machine_version(
            Config, khepri_machine)),
        case SameMacVer of
            true ->
                %% Only the side effect matters: an `undef' here is caught
                %% below and turned into a skip.
                _ = rabbit_ct_broker_helpers:rpc_all(
                      Config,
                      rabbit_peer_discovery_backend, api_version, []),
                Config;
            false ->
                teardown_and_skip(
                  Config, Testcase,
                  "Nodes are using different Khepri Ra machine "
                  "versions; clustering will likely fail")
        end
    catch
        error:{exception, undef,
               [{rabbit_peer_discovery_backend, api_version, _, _}
                | _]} ->
            teardown_and_skip(
              Config, Testcase,
              "Some nodes use the old discover->register order; "
              "the testcase would likely fail")
    end.

%% Runs the client and broker teardown steps, records the test case as
%% finished and returns `{skip, Reason}'.
teardown_and_skip(Config, Testcase, Reason) ->
    Config1 = rabbit_ct_helpers:run_steps(
                Config,
                rabbit_ct_client_helpers:teardown_steps() ++
                rabbit_ct_broker_helpers:teardown_steps()),
    rabbit_ct_helpers:testcase_finished(Config1, Testcase),
    {skip, Reason}.
|
|
|
|
%% Common Test callback run after each test case.
%%
%% The clustering test cases started brokers and clients in
%% init_per_testcase/2, so their teardown steps are run (client first,
%% then broker) before the test case is recorded as finished. Other test
%% cases get the config back unchanged.
end_per_testcase(Testcase, Config)
  when Testcase =:= start_one_member_at_a_time;
       Testcase =:= start_members_concurrently ->
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    CleanedConfig = rabbit_ct_helpers:run_steps(Config, TeardownSteps),
    rabbit_ct_helpers:testcase_finished(CleanedConfig, Testcase);
end_per_testcase(_Testcase, Config) ->
    Config.
|
|
|
|
%% Setup step: makes the etcd sources at ?ETCD_GIT_REF available under
%% `<data_dir>/etcd'.
%%
%% If that directory already exists, the reference is checked out in it;
%% otherwise the repository is cloned at that reference.
%%
%% Returns the config extended with `etcd_srcdir' on success, or
%% `{skip, Reason}' if the Git command fails.
clone_etcd(Config) ->
    DataDir = ?config(data_dir, Config),
    EtcdSrcdir = filename:join(DataDir, "etcd"),
    Cmd = case filelib:is_dir(EtcdSrcdir) of
              true ->
                  ct:pal(
                    "Checking out etcd Git reference, ref = ~s",
                    [?ETCD_GIT_REF]),
                  ["git", "-C", EtcdSrcdir,
                   "checkout", ?ETCD_GIT_REF];
              false ->
                  ct:pal(
                    "Cloning etcd Git repository, ref = ~s",
                    [?ETCD_GIT_REF]),
                  ["git", "clone",
                   "--branch", ?ETCD_GIT_REF,
                   ?ETCD_GIT_REPO, EtcdSrcdir]
          end,
    case rabbit_ct_helpers:exec(Cmd) of
        {ok, _} ->
            rabbit_ct_helpers:set_config(Config, {etcd_srcdir, EtcdSrcdir});
        {error, _} ->
            {skip, "Failed to clone etcd"};
        {error, _, _} ->
            %% do_start_etcd/1 in this suite matches a three-element error
            %% tuple from the same helper. Accept that shape too, so a
            %% failed command skips the suite instead of crashing with a
            %% `case_clause' error.
            {skip, "Failed to clone etcd"}
    end.
|
|
|
|
%% Setup step: builds etcd from the sources located by clone_etcd/1.
%%
%% Runs the platform's build script (`build.bat' on Windows, `build.sh'
%% elsewhere) from the source directory, with GOPATH pointing inside the
%% source tree and GOFLAGS set to "-modcacherw".
%%
%% Returns the config extended with `etcd_bin' (path to the built
%% executable) on success, or `{skip, Reason}' if the build command fails.
%% The assertion fails the step if the build reports success but the
%% executable is missing.
compile_etcd(Config) ->
    EtcdSrcdir = ?config(etcd_srcdir, Config),
    ct:pal("Compiling etcd in ~ts", [EtcdSrcdir]),
    Script0 = case os:type() of
                  {win32, _} -> "build.bat";
                  _          -> "build.sh"
              end,
    Script1 = filename:join(EtcdSrcdir, Script0),
    Cmd = [Script1],
    GOPATH = filename:join(EtcdSrcdir, "go"),
    GOFLAGS = "-modcacherw",
    %% NOTE(review): `false' presumably unsets BINDIR for the child process
    %% (the open_port/2 `env' convention) -- confirm against
    %% rabbit_ct_helpers:exec/2.
    Options = [{cd, EtcdSrcdir},
               {env, [{"BINDIR", false},
                      {"GOPATH", GOPATH},
                      {"GOFLAGS", GOFLAGS}]}],
    case rabbit_ct_helpers:exec(Cmd, Options) of
        {ok, _} ->
            EtcdExe = case os:type() of
                          {win32, _} -> "etcd.exe";
                          _          -> "etcd"
                      end,
            EtcdBin = filename:join([EtcdSrcdir, "bin", EtcdExe]),
            ?assert(filelib:is_regular(EtcdBin)),
            rabbit_ct_helpers:set_config(Config, {etcd_bin, EtcdBin});
        {error, _} ->
            {skip, "Failed to compile etcd"};
        {error, _, _} ->
            %% do_start_etcd/1 in this suite matches a three-element error
            %% tuple from the same helper. Accept that shape too, so a
            %% failed build skips the suite instead of crashing with a
            %% `case_clause' error.
            {skip, "Failed to compile etcd"}
    end.
|
|
|
|
%% Setup step: launches a single-member etcd cluster on localhost and waits
%% until it accepts client connections.
%%
%% The daemon listens for clients on port 2379 and for peers on port 2380,
%% and stores its data under `<priv_dir>/data.etcd'. It is run from a
%% separate, unlinked control process whose pid is stored under `etcd_pid';
%% stop_etcd/1 later kills that process.
%%
%% Returns the config extended with `etcd_pid' and `etcd_endpoints', or
%% `{skip, Reason}' if the daemon does not become reachable.
start_etcd(Config) ->
    ct:pal("Starting etcd daemon"),
    EtcdBin = ?config(etcd_bin, Config),
    PrivDir = ?config(priv_dir, Config),
    DataDir = filename:join(PrivDir, "data.etcd"),
    MemberName = ?MODULE_STRING,
    Host = "localhost",
    ClientPort = 2379,
    PeerPort = 2380,
    ClientUrl = rabbit_misc:format("http://~s:~b", [Host, ClientPort]),
    PeerUrl = rabbit_misc:format("http://~s:~b", [Host, PeerPort]),
    InitialCluster = MemberName ++ "=" ++ PeerUrl,
    Cmd = [EtcdBin,
           "--data-dir", DataDir,
           "--name", MemberName,
           "--initial-advertise-peer-urls", PeerUrl,
           "--listen-peer-urls", PeerUrl,
           "--advertise-client-urls", ClientUrl,
           "--listen-client-urls", ClientUrl,
           "--initial-cluster", InitialCluster,
           "--initial-cluster-state", "new",
           "--initial-cluster-token", "test-token",
           "--log-level", "debug", "--log-outputs", "stdout"],
    ControlPid = spawn(fun() -> do_start_etcd(Cmd) end),

    Endpoints = [rabbit_misc:format("~s:~b", [Host, ClientPort])],
    ConfigWithEtcd = rabbit_ct_helpers:set_config(
                       Config,
                       [{etcd_pid, ControlPid},
                        {etcd_endpoints, Endpoints}]),

    %% Raise the primary log level to `critical' while polling for the
    %% daemon; the previous level is restored in the `after' section
    %% whatever the outcome.
    #{level := PreviousLevel} = logger:get_primary_config(),
    logger:set_primary_config(level, critical),
    try wait_for_etcd(Endpoints) of
        _ ->
            ConfigWithEtcd
    catch
        exit:{test_case_failed, _} ->
            stop_etcd(ConfigWithEtcd),
            {skip, "Failed to start etcd"}
    after
        logger:set_primary_config(level, PreviousLevel)
    end.
|
|
|
|
%% Body of the etcd control process: runs the daemon command and, once it
%% returns, logs the captured output (with the error reason on failure).
do_start_etcd(Cmd) ->
    {Format, Args} =
        case rabbit_ct_helpers:exec(Cmd) of
            {ok, Output} ->
                {"etcd daemon exited:~n~s", [Output]};
            {error, Why, Output} ->
                {"etcd daemon exited with error ~0p:~n~s", [Why, Output]}
        end,
    ct:pal(Format, Args).
|
|
|
|
%% Blocks until etcd accepts a client connection on `EtcdEndpoints'.
%%
%% Three waits, each bounded by the same 60-second timeout:
%%   1. until eetcd:open/2 succeeds for the connection named `test';
%%   2. until eetcd_conn_sup reports exactly one connection;
%%   3. after closing `test', until it reports none.
%% The `test' connection is closed even if the second wait fails.
wait_for_etcd(EtcdEndpoints) ->
    application:ensure_all_started(eetcd),

    Timeout = 60000,
    ConnectionCountIs =
        fun(Expected) ->
                fun() -> Expected =:= length(eetcd_conn_sup:info()) end
        end,
    CanOpen =
        fun() ->
                Ret = eetcd:open(test, EtcdEndpoints),
                ct:pal("Ret = ~p", [Ret]),
                case Ret of
                    {ok, _Pid} -> true;
                    _          -> false
                end
        end,

    rabbit_ct_helpers:await_condition(CanOpen, Timeout),
    try
        rabbit_ct_helpers:await_condition(ConnectionCountIs(1), Timeout)
    after
        eetcd:close(test)
    end,
    rabbit_ct_helpers:await_condition(ConnectionCountIs(0), Timeout).
|
|
|
|
%% Teardown step: kills the etcd control process recorded under `etcd_pid',
%% if there is one, and returns the config unchanged.
stop_etcd(Config) ->
    ControlPid = rabbit_ct_helpers:get_config(Config, etcd_pid),
    case ControlPid of
        undefined ->
            ok;
        _ when is_pid(ControlPid) ->
            ct:pal(
              "Stopping etcd daemon by killing control process ~p",
              [ControlPid]),
            erlang:exit(ControlPid, kill)
    end,
    Config.
|
|
|
|
%%
|
|
%% Test cases
|
|
%%
|
|
|
|
%% Test case: starting the v3 client opens exactly one eetcd connection,
%% and no connection is left once the client is stopped.
init_opens_a_connection_test(Config) ->
    {ok, Client} = start_client(?config(etcd_endpoints, Config)),
    OneConnection = fun() -> 1 =:= length(eetcd_conn_sup:info()) end,
    try
        rabbit_ct_helpers:await_condition(OneConnection, 90000)
    after
        %% Stop the client whether or not the connection showed up.
        gen_statem:stop(Client)
    end,
    ?assertEqual(0, length(eetcd_conn_sup:info())).
|
|
|
|
|
|
%% Test case: a node can take the lock, register itself, release the lock,
%% and then shows up as the only entry returned by list_nodes/1.
%%
%% Everything after the client is started runs inside `try ... after', so
%% the client is stopped even when an early wait, the lock or the unlock
%% fails. This matches init_opens_a_connection_test/1.
registration_with_locking_test(Config) ->
    Endpoints = ?config(etcd_endpoints, Config),
    {ok, Pid} = start_client(Endpoints),
    try
        Condition1 = fun() ->
                             1 =:= length(eetcd_conn_sup:info())
                     end,
        rabbit_ct_helpers:await_condition(Condition1, 90000),

        {ok, LockOwnerKey} =
            rabbitmq_peer_discovery_etcd_v3_client:lock(Pid, node()),
        rabbitmq_peer_discovery_etcd_v3_client:register(Pid),
        ?assertEqual(
           ok,
           rabbitmq_peer_discovery_etcd_v3_client:unlock(Pid, LockOwnerKey)),

        %% The wait passes once list_nodes/1 returns a single entry whose
        %% second element is this node.
        Condition2 =
            fun() ->
                    case rabbitmq_peer_discovery_etcd_v3_client:list_nodes(Pid) of
                        [{_, N}] when N =:= node() ->
                            true;
                        _ ->
                            false
                    end
            end,
        rabbit_ct_helpers:await_condition(Condition2, 45000)
    after
        gen_statem:stop(Pid)
    end.
|
|
|
|
%% Test case: with etcd peer discovery configured, start the nodes one
%% after the other and check they end up in a single cluster.
start_one_member_at_a_time(Config) ->
    DiscoveryConfig = configure_peer_discovery(Config),
    NodeNames = rabbit_ct_broker_helpers:get_node_configs(
                  DiscoveryConfig, nodename),
    StartNode =
        fun(NodeName) ->
                ?assertEqual(
                   ok,
                   rabbit_ct_broker_helpers:start_node(
                     DiscoveryConfig, NodeName))
        end,
    lists:foreach(StartNode, NodeNames),
    assert_full_cluster(DiscoveryConfig).
|
|
|
|
%% Test case: with etcd peer discovery configured, start all nodes at the
%% same time and check they end up in a single cluster.
%%
%% One linked worker per node waits for a `go' message, starts its node and
%% reports `started' to the test process. The workers are linked, so a
%% failed assertion in one of them also terminates the test process.
start_members_concurrently(Config) ->
    DiscoveryConfig = configure_peer_discovery(Config),
    NodeNames = rabbit_ct_broker_helpers:get_node_configs(
                  DiscoveryConfig, nodename),
    TestProc = self(),
    StartWhenTold =
        fun(NodeName) ->
                receive
                    go ->
                        ?assertEqual(
                           ok,
                           rabbit_ct_broker_helpers:start_node(
                             DiscoveryConfig, NodeName)),
                        TestProc ! started
                end
        end,
    Workers = [spawn_link(fun() -> StartWhenTold(NodeName) end)
               || NodeName <- NodeNames],

    %% Release every worker, then collect one `started' message per worker.
    _ = [Worker ! go || Worker <- Workers],
    _ = [receive started -> ok end || _Worker <- Workers],

    assert_full_cluster(DiscoveryConfig).
|
|
|
|
%% Prepares the test nodes for a peer discovery run and returns the config
%% carrying the etcd peer discovery settings.
%%
%% Steps, each applied to every node before moving to the next step:
%%   1. assert the node currently sees only itself as an online member;
%%   2. stop the broker, reset the node, then stop the node;
%%   3. rewrite the node's configuration file with the etcd backend
%%      settings (endpoints, "rabbitmq" key prefix, and a cluster name
%%      taken from this function's name).
configure_peer_discovery(Config) ->
    NodeNames = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

    AssertStandalone =
        fun(NodeName) ->
                OnlineMembers =
                    lists:sort(
                      rabbit_ct_broker_helpers:cluster_members_online(
                        Config, NodeName)),
                ?assertEqual([NodeName], OnlineMembers)
        end,
    lists:foreach(AssertStandalone, NodeNames),

    ResetAndStop =
        fun(NodeName) ->
                ?assertEqual(
                   ok,
                   rabbit_ct_broker_helpers:stop_broker(Config, NodeName)),
                ?assertEqual(
                   ok,
                   rabbit_ct_broker_helpers:reset_node(Config, NodeName)),
                ?assertEqual(
                   ok,
                   rabbit_ct_broker_helpers:stop_node(Config, NodeName))
        end,
    lists:foreach(ResetAndStop, NodeNames),

    EtcdSettings = [{endpoints, ?config(etcd_endpoints, Config)},
                    {etcd_prefix, "rabbitmq"},
                    {cluster_name, atom_to_list(?FUNCTION_NAME)}],
    ClusterFormation = [{peer_discovery_backend, rabbit_peer_discovery_etcd},
                        {peer_discovery_etcd, EtcdSettings}],
    DiscoveryConfig = rabbit_ct_helpers:merge_app_env(
                        Config,
                        {rabbit, [{cluster_formation, ClusterFormation}]}),

    RewriteConfigFile =
        fun(NodeName) ->
                ?assertEqual(
                   ok,
                   rabbit_ct_broker_helpers:rewrite_node_config_file(
                     DiscoveryConfig, NodeName))
        end,
    lists:foreach(RewriteConfigFile, NodeNames),

    DiscoveryConfig.
|
|
|
|
%% Asserts that every configured node reports all configured nodes as its
%% online cluster members (both lists are compared sorted).
assert_full_cluster(Config) ->
    NodeNames = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    AllNodesSorted = lists:sort(NodeNames),
    AssertSeesEveryone =
        fun(NodeName) ->
                OnlineMembers =
                    lists:sort(
                      rabbit_ct_broker_helpers:cluster_members_online(
                        Config, NodeName)),
                ?assertEqual(AllNodesSorted, OnlineMembers)
        end,
    lists:foreach(AssertSeesEveryone, NodeNames).
|
|
|
|
%%
|
|
%% Helpers
|
|
%%
|
|
|
|
%% Starts the etcd v3 client for the given endpoints. A client that is
%% already running is reported as a success, so callers always get
%% `{ok, Pid}'.
start_client(Endpoints) ->
    Settings = #{endpoints => Endpoints},
    case rabbitmq_peer_discovery_etcd_v3_client:start(Settings) of
        {ok, _Pid} = Started ->
            Started;
        {error, {already_started, RunningPid}} ->
            {ok, RunningPid}
    end.
|