rabbit_node_monitor: use a leader query for cluster members on node_added event

If the membership hasn't been updated locally yet, the event is never generated
This commit is contained in:
Diana Parra Corbacho 2024-07-09 17:06:12 +02:00
parent 82d6709f91
commit 19a71d8d28
7 changed files with 165 additions and 3 deletions

View File

@ -328,6 +328,14 @@ rabbitmq_integration_suite(
size = "medium",
)
rabbitmq_integration_suite(
name = "clustering_events_SUITE",
additional_beam = [
":test_event_recorder_beam",
],
size = "medium",
)
rabbitmq_integration_suite(
name = "quorum_queue_member_reconciliation_SUITE",
size = "medium",

9
deps/rabbit/app.bzl vendored
View File

@ -831,6 +831,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "clustering_events_SUITE_beam_files",
testonly = True,
srcs = ["test/clustering_events_SUITE.erl"],
outs = ["test/clustering_events_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "clustering_management_SUITE_beam_files",

View File

@ -17,6 +17,7 @@
-export([change_node_type/1]).
-export([is_clustered/0,
members/0,
consistent_members/0,
disc_members/0,
node_type/0,
check_compatibility/1,
@ -306,6 +307,19 @@ members_using_khepri() ->
%% so we need to allow callers to be more defensive in this case.
rabbit_khepri:locally_known_nodes().
-spec consistent_members() -> Members when
Members :: [node()].
%% @doc Returns the list of cluster members.
consistent_members() ->
case rabbit_khepri:get_feature_state() of
enabled -> consistent_members_using_khepri();
_ -> members_using_mnesia()
end.
consistent_members_using_khepri() ->
rabbit_khepri:nodes().
-spec disc_members() -> Members when
Members :: [node()].
%% @private

View File

@ -414,7 +414,13 @@ post_add_member(JoiningNode, JoinedNode, Error) ->
%% @private
leave_cluster(Node) ->
retry_khepri_op(fun() -> remove_member(Node) end, 60).
case retry_khepri_op(fun() -> remove_member(Node) end, 60) of
ok ->
rabbit_event:notify(node_deleted, [{node, Node}]),
ok;
Any ->
Any
end.
%% @private

View File

@ -172,7 +172,7 @@ notify_node_up() ->
notify_joined_cluster() ->
NewMember = node(),
Nodes = alive_rabbit_nodes() -- [NewMember],
Nodes = alive_rabbit_nodes(rabbit_nodes:list_consistent_members()) -- [NewMember],
gen_server:abcast(Nodes, ?SERVER,
{joined_cluster, node(), rabbit_db_cluster:node_type()}),

View File

@ -15,7 +15,7 @@
is_running/2, is_process_running/2,
cluster_name/0, set_cluster_name/1, set_cluster_name/2, ensure_epmd/0,
all_running/0,
is_member/1, list_members/0,
is_member/1, list_members/0, list_consistent_members/0,
filter_members/1,
is_reachable/1, list_reachable/0, list_unreachable/0,
filter_reachable/1, filter_unreachable/1,
@ -182,6 +182,14 @@ is_member(Node) when is_atom(Node) ->
list_members() ->
rabbit_db_cluster:members().
-spec list_consistent_members() -> Nodes when
Nodes :: [node()].
%% @doc Returns the list of nodes in the cluster as reported by the leader.
%%
list_consistent_members() ->
rabbit_db_cluster:consistent_members().
-spec filter_members(Nodes) -> Nodes when
Nodes :: [node()].
%% @doc Filters the given list of nodes to only select those belonging to the

View File

@ -0,0 +1,117 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(clustering_events_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-import(rabbit_ct_helpers, [eventually/3]).
-import(event_recorder,
[assert_event_type/2,
assert_event_prop/2]).
-compile(export_all).
all() ->
[
{group, tests}
].
groups() ->
[
{tests, [], [
node_added_event,
node_deleted_event
]}
].
%% -------------------------------------------------------------------
%% Per Suite
%% -------------------------------------------------------------------
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
%%
%% Per Group
%%
init_per_group(_, Config) ->
Config.
end_per_group(_, Config) ->
Config.
%%
%% Per Test Case
%%
init_per_testcase(node_added_event = TestCase, Config) ->
Config1 = configure_cluster_essentials(Config, TestCase, false),
Config2 = rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
rabbit_ct_helpers:testcase_started(Config2, TestCase);
init_per_testcase(node_deleted_event = TestCase, Config) ->
Config1 = configure_cluster_essentials(Config, TestCase, true),
Config2 = rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
rabbit_ct_helpers:testcase_started(Config2, TestCase).
end_per_testcase(TestCase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, TestCase).
%%
%% Helpers
%%
configure_cluster_essentials(Config, Group, Clustered) ->
rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, 3},
{rmq_nodes_clustered, Clustered}
]).
node_added_event(Config) ->
[Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ok = event_recorder:start(Config),
join_cluster(Server2, Server1),
E = event_recorder:get_events(Config),
ok = event_recorder:stop(Config),
?assert(lists:any(fun(#event{type = node_added}) ->
true;
(_) ->
false
end, E)).
node_deleted_event(Config) ->
[Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ok = event_recorder:start(Config),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server2)],
[]),
E = event_recorder:get_events(Config),
ok = event_recorder:stop(Config),
?assert(lists:any(fun(#event{type = node_deleted}) ->
true;
(_) ->
false
end, E)).
join_cluster(Node, Cluster) ->
ok = rabbit_control_helper:command(stop_app, Node),
ok = rabbit_control_helper:command(join_cluster, Node, [atom_to_list(Cluster)], []),
rabbit_control_helper:command(start_app, Node).