From 19a71d8d28d098a91f1536afccfa546b11eee73f Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Tue, 9 Jul 2024 17:06:12 +0200 Subject: [PATCH] rabbit_node_monitor: use a leader query for cluster members on node_added event If the membership hasn't been updated locally yet, the event is never generated --- deps/rabbit/BUILD.bazel | 8 ++ deps/rabbit/app.bzl | 9 ++ deps/rabbit/src/rabbit_db_cluster.erl | 14 +++ deps/rabbit/src/rabbit_khepri.erl | 8 +- deps/rabbit/src/rabbit_node_monitor.erl | 2 +- deps/rabbit/src/rabbit_nodes.erl | 10 +- deps/rabbit/test/clustering_events_SUITE.erl | 117 +++++++++++++++++++ 7 files changed, 165 insertions(+), 3 deletions(-) create mode 100644 deps/rabbit/test/clustering_events_SUITE.erl diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 42ea7ac7b8..7df4bb1793 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -328,6 +328,14 @@ rabbitmq_integration_suite( size = "medium", ) +rabbitmq_integration_suite( + name = "clustering_events_SUITE", + additional_beam = [ + ":test_event_recorder_beam", + ], + size = "medium", +) + rabbitmq_integration_suite( name = "quorum_queue_member_reconciliation_SUITE", size = "medium", diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index a943f47da2..44095b8a7d 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -831,6 +831,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/amqp_client:erlang_app"], ) + erlang_bytecode( + name = "clustering_events_SUITE_beam_files", + testonly = True, + srcs = ["test/clustering_events_SUITE.erl"], + outs = ["test/clustering_events_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], + ) erlang_bytecode( name = "clustering_management_SUITE_beam_files", diff --git a/deps/rabbit/src/rabbit_db_cluster.erl b/deps/rabbit/src/rabbit_db_cluster.erl index 551291cd5b..dfb6e70324 100644 --- a/deps/rabbit/src/rabbit_db_cluster.erl +++ b/deps/rabbit/src/rabbit_db_cluster.erl @@ -17,6 +17,7 @@ -export([change_node_type/1]). -export([is_clustered/0, members/0, + consistent_members/0, disc_members/0, node_type/0, check_compatibility/1, @@ -306,6 +307,19 @@ members_using_khepri() -> %% so we need to allow callers to be more defensive in this case. rabbit_khepri:locally_known_nodes(). +-spec consistent_members() -> Members when + Members :: [node()]. +%% @doc Returns the list of cluster members. + +consistent_members() -> + case rabbit_khepri:get_feature_state() of + enabled -> consistent_members_using_khepri(); + _ -> members_using_mnesia() + end. + +consistent_members_using_khepri() -> + rabbit_khepri:nodes(). + -spec disc_members() -> Members when Members :: [node()]. %% @private diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index bb798465ab..a36189e947 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -414,7 +414,13 @@ post_add_member(JoiningNode, JoinedNode, Error) -> %% @private leave_cluster(Node) -> - retry_khepri_op(fun() -> remove_member(Node) end, 60). + case retry_khepri_op(fun() -> remove_member(Node) end, 60) of + ok -> + rabbit_event:notify(node_deleted, [{node, Node}]), + ok; + Any -> + Any + end. %% @private diff --git a/deps/rabbit/src/rabbit_node_monitor.erl b/deps/rabbit/src/rabbit_node_monitor.erl index 9c56f92916..2f5d5cbced 100644 --- a/deps/rabbit/src/rabbit_node_monitor.erl +++ b/deps/rabbit/src/rabbit_node_monitor.erl @@ -172,7 +172,7 @@ notify_node_up() -> notify_joined_cluster() -> NewMember = node(), - Nodes = alive_rabbit_nodes() -- [NewMember], + Nodes = alive_rabbit_nodes(rabbit_nodes:list_consistent_members()) -- [NewMember], gen_server:abcast(Nodes, ?SERVER, {joined_cluster, node(), rabbit_db_cluster:node_type()}), diff --git a/deps/rabbit/src/rabbit_nodes.erl b/deps/rabbit/src/rabbit_nodes.erl index 879d36cf9d..03c56afb17 100644 --- a/deps/rabbit/src/rabbit_nodes.erl +++ b/deps/rabbit/src/rabbit_nodes.erl @@ -15,7 +15,7 @@ is_running/2, is_process_running/2, cluster_name/0, set_cluster_name/1, set_cluster_name/2, ensure_epmd/0, all_running/0, - is_member/1, list_members/0, + is_member/1, list_members/0, list_consistent_members/0, filter_members/1, is_reachable/1, list_reachable/0, list_unreachable/0, filter_reachable/1, filter_unreachable/1, @@ -182,6 +182,14 @@ is_member(Node) when is_atom(Node) -> list_members() -> rabbit_db_cluster:members(). +-spec list_consistent_members() -> Nodes when + Nodes :: [node()]. +%% @doc Returns the list of nodes in the cluster as reported by the leader. +%% + +list_consistent_members() -> + rabbit_db_cluster:consistent_members(). + -spec filter_members(Nodes) -> Nodes when Nodes :: [node()]. %% @doc Filters the given list of nodes to only select those belonging to the diff --git a/deps/rabbit/test/clustering_events_SUITE.erl b/deps/rabbit/test/clustering_events_SUITE.erl new file mode 100644 index 0000000000..a12c0b5af4 --- /dev/null +++ b/deps/rabbit/test/clustering_events_SUITE.erl @@ -0,0 +1,117 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(clustering_events_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + +-import(rabbit_ct_helpers, [eventually/3]). +-import(event_recorder, + [assert_event_type/2, + assert_event_prop/2]). + +-compile(export_all). + +all() -> + [ + {group, tests} + ]. + +groups() -> + [ + {tests, [], [ + node_added_event, + node_deleted_event + ]} + ]. + +%% ------------------------------------------------------------------- +%% Per Suite +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +%% +%% Per Group +%% + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +%% +%% Per Test Case +%% +init_per_testcase(node_added_event = TestCase, Config) -> + Config1 = configure_cluster_essentials(Config, TestCase, false), + Config2 = rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + rabbit_ct_helpers:testcase_started(Config2, TestCase); +init_per_testcase(node_deleted_event = TestCase, Config) -> + Config1 = configure_cluster_essentials(Config, TestCase, true), + Config2 = rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + rabbit_ct_helpers:testcase_started(Config2, TestCase). + +end_per_testcase(TestCase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, TestCase). + +%% +%% Helpers +%% +configure_cluster_essentials(Config, Group, Clustered) -> + rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Group}, + {rmq_nodes_count, 3}, + {rmq_nodes_clustered, Clustered} + ]). + +node_added_event(Config) -> + [Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ok = event_recorder:start(Config), + join_cluster(Server2, Server1), + E = event_recorder:get_events(Config), + ok = event_recorder:stop(Config), + ?assert(lists:any(fun(#event{type = node_added}) -> + true; + (_) -> + false + end, E)). + +node_deleted_event(Config) -> + [Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ok = event_recorder:start(Config), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server2)], + []), + E = event_recorder:get_events(Config), + ok = event_recorder:stop(Config), + ?assert(lists:any(fun(#event{type = node_deleted}) -> + true; + (_) -> + false + end, E)). + +join_cluster(Node, Cluster) -> + ok = rabbit_control_helper:command(stop_app, Node), + ok = rabbit_control_helper:command(join_cluster, Node, [atom_to_list(Cluster)], []), + rabbit_control_helper:command(start_app, Node).