feature_flags_SUITE: wait for cluster status instead of a fixed time
Extracted to clustering_utils.erl the utility functions to check cluster status
This commit is contained in:
parent
0b92354cde
commit
1b90263417
|
|
@ -345,6 +345,9 @@ rabbitmq_integration_suite(
|
|||
flaky = True,
|
||||
shard_count = 18,
|
||||
sharding_method = "case",
|
||||
additional_beam = [
|
||||
":test_clustering_utils_beam",
|
||||
],
|
||||
)
|
||||
|
||||
rabbitmq_integration_suite(
|
||||
|
|
@ -455,6 +458,9 @@ rabbitmq_integration_suite(
|
|||
runtime_deps = [
|
||||
"//deps/rabbit/test/feature_flags_SUITE_data/my_plugin:erlang_app",
|
||||
],
|
||||
additional_beam = [
|
||||
":test_clustering_utils_beam",
|
||||
],
|
||||
)
|
||||
|
||||
rabbitmq_integration_suite(
|
||||
|
|
|
|||
|
|
@ -840,6 +840,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
|
|||
erlc_opts = "//:test_erlc_opts",
|
||||
deps = ["//deps/amqp_client:erlang_app"],
|
||||
)
|
||||
erlang_bytecode(
|
||||
name = "test_clustering_utils_beam",
|
||||
testonly = True,
|
||||
srcs = ["test/clustering_utils.erl"],
|
||||
outs = ["test/clustering_utils.beam"],
|
||||
app_name = "rabbit",
|
||||
erlc_opts = "//:test_erlc_opts",
|
||||
)
|
||||
erlang_bytecode(
|
||||
name = "clustering_recovery_SUITE_beam_files",
|
||||
testonly = True,
|
||||
|
|
|
|||
|
|
@ -13,7 +13,11 @@
|
|||
|
||||
-compile(export_all).
|
||||
|
||||
-define(LOOP_RECURSION_DELAY, 100).
|
||||
-import(clustering_utils, [
|
||||
assert_cluster_status/2,
|
||||
assert_clustered/1,
|
||||
assert_not_clustered/1
|
||||
]).
|
||||
|
||||
all() ->
|
||||
[
|
||||
|
|
@ -707,85 +711,6 @@ pid_from_file(PidFile) ->
|
|||
cluster_members(Config) ->
|
||||
rabbit_ct_broker_helpers:get_node_configs(Config, nodename).
|
||||
|
||||
assert_cluster_status({All, Disc, Running}, Nodes) ->
|
||||
assert_cluster_status({All, Running, All, Disc, Running}, Nodes);
|
||||
assert_cluster_status(Status0, Nodes) ->
|
||||
Status = sort_cluster_status(Status0),
|
||||
AllNodes = case Status of
|
||||
{undef, undef, All, _, _} ->
|
||||
%% Support mixed-version clusters
|
||||
All;
|
||||
{All, _, _, _, _} ->
|
||||
All
|
||||
end,
|
||||
wait_for_cluster_status(Status, AllNodes, Nodes).
|
||||
|
||||
wait_for_cluster_status(Status, AllNodes, Nodes) ->
|
||||
Max = 10000 / ?LOOP_RECURSION_DELAY,
|
||||
wait_for_cluster_status(0, Max, Status, AllNodes, Nodes).
|
||||
|
||||
wait_for_cluster_status(N, Max, Status, _AllNodes, Nodes) when N >= Max ->
|
||||
erlang:error({cluster_status_max_tries_failed,
|
||||
[{nodes, Nodes},
|
||||
{expected_status, Status},
|
||||
{max_tried, Max},
|
||||
{status, sort_cluster_status(cluster_status(hd(Nodes)))}]});
|
||||
wait_for_cluster_status(N, Max, Status, AllNodes, Nodes) ->
|
||||
case lists:all(fun (Node) ->
|
||||
verify_status_equal(Node, Status, AllNodes)
|
||||
end, Nodes) of
|
||||
true -> ok;
|
||||
false -> timer:sleep(?LOOP_RECURSION_DELAY),
|
||||
wait_for_cluster_status(N + 1, Max, Status, AllNodes, Nodes)
|
||||
end.
|
||||
|
||||
verify_status_equal(Node, Status, AllNodes) ->
|
||||
NodeStatus = sort_cluster_status(cluster_status(Node)),
|
||||
IsClustered = case rpc:call(Node, rabbit_db_cluster, is_clustered, []) of
|
||||
{badrpc, {'EXIT', {undef, _}}} ->
|
||||
rpc:call(Node, rabbit_mnesia, is_clustered, []);
|
||||
Ret ->
|
||||
Ret
|
||||
end,
|
||||
(AllNodes =/= [Node]) =:= IsClustered andalso equal(Status, NodeStatus).
|
||||
|
||||
equal({_, _, A, B, C}, {undef, undef, A, B, C}) ->
|
||||
true;
|
||||
equal({_, _, _, _, _}, {undef, undef, _, _, _}) ->
|
||||
false;
|
||||
equal(Status0, Status1) ->
|
||||
Status0 == Status1.
|
||||
|
||||
cluster_status(Node) ->
|
||||
AllMembers = rpc:call(Node, rabbit_nodes, list_members, []),
|
||||
RunningMembers = rpc:call(Node, rabbit_nodes, list_running, []),
|
||||
|
||||
AllDbNodes = case rpc:call(Node, rabbit_db_cluster, members, []) of
|
||||
{badrpc, {'EXIT', {undef, _}}} ->
|
||||
rpc:call(Node, rabbit_mnesia, cluster_nodes, [all]);
|
||||
Ret ->
|
||||
Ret
|
||||
end,
|
||||
DiscDbNodes = rpc:call(Node, rabbit_mnesia, cluster_nodes, [disc]),
|
||||
RunningDbNodes = rpc:call(Node, rabbit_mnesia, cluster_nodes, [running]),
|
||||
|
||||
{AllMembers,
|
||||
RunningMembers,
|
||||
AllDbNodes,
|
||||
DiscDbNodes,
|
||||
RunningDbNodes}.
|
||||
|
||||
sort_cluster_status({{badrpc, {'EXIT', {undef, _}}}, {badrpc, {'EXIT', {undef, _}}}, AllM, DiscM, RunningM}) ->
|
||||
{undef, undef, lists:sort(AllM), lists:sort(DiscM), lists:sort(RunningM)};
|
||||
sort_cluster_status({All, Running, AllM, DiscM, RunningM}) ->
|
||||
{lists:sort(All), lists:sort(Running), lists:sort(AllM), lists:sort(DiscM), lists:sort(RunningM)}.
|
||||
|
||||
assert_clustered(Nodes) ->
|
||||
assert_cluster_status({Nodes, Nodes, Nodes, Nodes, Nodes}, Nodes).
|
||||
|
||||
assert_not_clustered(Node) ->
|
||||
assert_cluster_status({[Node], [Node], [Node], [Node], [Node]}, [Node]).
|
||||
|
||||
assert_failure(Fun) ->
|
||||
case catch Fun() of
|
||||
{error, _Code, Reason} -> Reason;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,95 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2018-2023 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
-module(clustering_utils).
|
||||
|
||||
-export([
|
||||
assert_cluster_status/2,
|
||||
assert_clustered/1,
|
||||
assert_not_clustered/1
|
||||
]).
|
||||
|
||||
-define(LOOP_RECURSION_DELAY, 100).
|
||||
|
||||
assert_cluster_status({All, Disc, Running}, Nodes) ->
|
||||
assert_cluster_status({All, Running, All, Disc, Running}, Nodes);
|
||||
assert_cluster_status(Status0, Nodes) ->
|
||||
Status = sort_cluster_status(Status0),
|
||||
AllNodes = case Status of
|
||||
{undef, undef, All, _, _} ->
|
||||
%% Support mixed-version clusters
|
||||
All;
|
||||
{All, _, _, _, _} ->
|
||||
All
|
||||
end,
|
||||
wait_for_cluster_status(Status, AllNodes, Nodes).
|
||||
|
||||
wait_for_cluster_status(Status, AllNodes, Nodes) ->
|
||||
Max = 10000 / ?LOOP_RECURSION_DELAY,
|
||||
wait_for_cluster_status(0, Max, Status, AllNodes, Nodes).
|
||||
|
||||
wait_for_cluster_status(N, Max, Status, _AllNodes, Nodes) when N >= Max ->
|
||||
erlang:error({cluster_status_max_tries_failed,
|
||||
[{nodes, Nodes},
|
||||
{expected_status, Status},
|
||||
{max_tried, Max},
|
||||
{status, sort_cluster_status(cluster_status(hd(Nodes)))}]});
|
||||
wait_for_cluster_status(N, Max, Status, AllNodes, Nodes) ->
|
||||
case lists:all(fun (Node) ->
|
||||
verify_status_equal(Node, Status, AllNodes)
|
||||
end, Nodes) of
|
||||
true -> ok;
|
||||
false -> timer:sleep(?LOOP_RECURSION_DELAY),
|
||||
wait_for_cluster_status(N + 1, Max, Status, AllNodes, Nodes)
|
||||
end.
|
||||
|
||||
verify_status_equal(Node, Status, AllNodes) ->
|
||||
NodeStatus = sort_cluster_status(cluster_status(Node)),
|
||||
IsClustered = case rpc:call(Node, rabbit_db_cluster, is_clustered, []) of
|
||||
{badrpc, {'EXIT', {undef, _}}} ->
|
||||
rpc:call(Node, rabbit_mnesia, is_clustered, []);
|
||||
Ret ->
|
||||
Ret
|
||||
end,
|
||||
(AllNodes =/= [Node]) =:= IsClustered andalso equal(Status, NodeStatus).
|
||||
|
||||
equal({_, _, A, B, C}, {undef, undef, A, B, C}) ->
|
||||
true;
|
||||
equal({_, _, _, _, _}, {undef, undef, _, _, _}) ->
|
||||
false;
|
||||
equal(Status0, Status1) ->
|
||||
Status0 == Status1.
|
||||
|
||||
cluster_status(Node) ->
|
||||
AllMembers = rpc:call(Node, rabbit_nodes, list_members, []),
|
||||
RunningMembers = rpc:call(Node, rabbit_nodes, list_running, []),
|
||||
|
||||
AllDbNodes = case rpc:call(Node, rabbit_db_cluster, members, []) of
|
||||
{badrpc, {'EXIT', {undef, _}}} ->
|
||||
rpc:call(Node, rabbit_mnesia, cluster_nodes, [all]);
|
||||
Ret ->
|
||||
Ret
|
||||
end,
|
||||
DiscDbNodes = rpc:call(Node, rabbit_mnesia, cluster_nodes, [disc]),
|
||||
RunningDbNodes = rpc:call(Node, rabbit_mnesia, cluster_nodes, [running]),
|
||||
|
||||
{AllMembers,
|
||||
RunningMembers,
|
||||
AllDbNodes,
|
||||
DiscDbNodes,
|
||||
RunningDbNodes}.
|
||||
|
||||
sort_cluster_status({{badrpc, {'EXIT', {undef, _}}}, {badrpc, {'EXIT', {undef, _}}}, AllM, DiscM, RunningM}) ->
|
||||
{undef, undef, lists:sort(AllM), lists:sort(DiscM), lists:sort(RunningM)};
|
||||
sort_cluster_status({All, Running, AllM, DiscM, RunningM}) ->
|
||||
{lists:sort(All), lists:sort(Running), lists:sort(AllM), lists:sort(DiscM), lists:sort(RunningM)}.
|
||||
|
||||
assert_clustered(Nodes) ->
|
||||
assert_cluster_status({Nodes, Nodes, Nodes, Nodes, Nodes}, Nodes).
|
||||
|
||||
assert_not_clustered(Node) ->
|
||||
assert_cluster_status({[Node], [Node], [Node], [Node], [Node]}, [Node]).
|
||||
|
||||
|
|
@ -937,8 +937,8 @@ do_enable_feature_flag_when_ff_file_is_unwritable(Config) ->
|
|||
enable_feature_flag_with_a_network_partition(Config) ->
|
||||
FeatureName = ff_from_testsuite,
|
||||
ClusterSize = ?config(rmq_nodes_count, Config),
|
||||
[A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs(
|
||||
Config, nodename),
|
||||
[A, B, C, D, E] = All = rabbit_ct_broker_helpers:get_node_configs(
|
||||
Config, nodename),
|
||||
True = lists:duplicate(ClusterSize, true),
|
||||
False = lists:duplicate(ClusterSize, false),
|
||||
|
||||
|
|
@ -958,7 +958,9 @@ enable_feature_flag_with_a_network_partition(Config) ->
|
|||
{E, C},
|
||||
{E, D}],
|
||||
block(NodePairs),
|
||||
timer:sleep(1000),
|
||||
|
||||
%% Wait for the network partition to happen
|
||||
clustering_utils:assert_cluster_status({All, All, [A, C, D]}, [A, C, D]),
|
||||
|
||||
%% Enabling the feature flag should fail in the specific case of
|
||||
%% `ff_from_testsuite', if the network is broken.
|
||||
|
|
@ -971,11 +973,11 @@ enable_feature_flag_with_a_network_partition(Config) ->
|
|||
|
||||
%% Repair the network and try again to enable the feature flag.
|
||||
unblock(NodePairs),
|
||||
timer:sleep(10000),
|
||||
[?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N))
|
||||
|| N <- [A, C, D]],
|
||||
[?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N))
|
||||
|| N <- [A, C, D]],
|
||||
clustering_utils:assert_cluster_status({All, All, All}, All),
|
||||
declare_arbitrary_feature_flag(Config),
|
||||
|
||||
%% Enabling the feature flag works.
|
||||
|
|
@ -1011,7 +1013,7 @@ mark_feature_flag_as_enabled_with_a_network_partition(Config) ->
|
|||
{B, D},
|
||||
{B, E}],
|
||||
block(NodePairs),
|
||||
timer:sleep(1000),
|
||||
clustering_utils:assert_cluster_status({AllNodes, AllNodes, [A, C, D, E]}, [A, C, D, E]),
|
||||
|
||||
%% Mark the feature flag as enabled on all nodes from node B. This
|
||||
%% is expected to timeout.
|
||||
|
|
|
|||
Loading…
Reference in New Issue