rabbit_db: Move generic init steps from `rabbit_mnesia`
[Why] When a single node or a cluster is initialized, we go through a few steps which are not Mnesia-specific or even related. Things such as synchronizing feature flag states or emit a "cluster joined" notification with `rabbit_node_monitor`. When we will introduce Khepri, we will have to go through the same generic steps. Therefore, it makes sense to drive those steps from `rabbit_db` and `rabbit_db_cluster` and only call into `rabbit_mnesia` when needed. [How] The generic code is moved from `rabbit_mnesia` to `rabbit_db*` and `rabbit_peer_discovery` modules.
This commit is contained in:
parent
9c358dd9f3
commit
eb6327a09a
|
|
@ -42,12 +42,23 @@ init() ->
|
|||
?LOG_DEBUG(
|
||||
"DB: this node is virgin: ~ts", [IsVirgin],
|
||||
#{domain => ?RMQLOG_DOMAIN_DB}),
|
||||
|
||||
ensure_dir_exists(),
|
||||
case init_using_mnesia() of
|
||||
rabbit_peer_discovery:log_configured_backend(),
|
||||
rabbit_peer_discovery:maybe_init(),
|
||||
|
||||
pre_init(IsVirgin),
|
||||
|
||||
Ret = run(
|
||||
#{mnesia => fun init_using_mnesia/0}),
|
||||
case Ret of
|
||||
ok ->
|
||||
?LOG_DEBUG(
|
||||
"DB: initialization successeful",
|
||||
#{domain => ?RMQLOG_DOMAIN_DB}),
|
||||
|
||||
post_init(IsVirgin),
|
||||
|
||||
ok;
|
||||
Error ->
|
||||
?LOG_DEBUG(
|
||||
|
|
@ -56,6 +67,17 @@ init() ->
|
|||
Error
|
||||
end.
|
||||
|
||||
pre_init(IsVirgin) ->
|
||||
Members = rabbit_db_cluster:members(),
|
||||
OtherMembers = rabbit_nodes:nodes_excl_me(Members),
|
||||
rabbit_db_cluster:ensure_feature_flags_are_in_sync(OtherMembers, IsVirgin).
|
||||
|
||||
post_init(false = _IsVirgin) ->
|
||||
rabbit_peer_discovery:maybe_register();
|
||||
post_init(true = _IsVirgin) ->
|
||||
%% Registration handled by rabbit_peer_discovery.
|
||||
ok.
|
||||
|
||||
init_using_mnesia() ->
|
||||
?LOG_DEBUG(
|
||||
"DB: initialize Mnesia",
|
||||
|
|
@ -115,8 +137,11 @@ force_load_on_next_boot_using_mnesia() ->
|
|||
%% @see is_virgin_node/1.
|
||||
|
||||
is_virgin_node() ->
|
||||
ThisNode = node(),
|
||||
is_virgin_node(ThisNode).
|
||||
run(
|
||||
#{mnesia => fun is_virgin_node_using_mnesia/0}).
|
||||
|
||||
is_virgin_node_using_mnesia() ->
|
||||
rabbit_mnesia:is_virgin_node().
|
||||
|
||||
-spec is_virgin_node(Node) -> IsVirgin | undefined when
|
||||
Node :: node(),
|
||||
|
|
@ -129,14 +154,11 @@ is_virgin_node() ->
|
|||
%% @returns `true' if the node is virgin, `false' if it is not, or `undefined'
|
||||
%% if the given node is remote and we couldn't determine it.
|
||||
|
||||
is_virgin_node(Node) when Node =:= node() ->
|
||||
is_virgin_node();
|
||||
is_virgin_node(Node) when is_atom(Node) ->
|
||||
is_virgin_node_with_mnesia(Node).
|
||||
|
||||
is_virgin_node_with_mnesia(Node) when Node =:= node() ->
|
||||
rabbit_mnesia:is_virgin_node();
|
||||
is_virgin_node_with_mnesia(Node) ->
|
||||
try
|
||||
erpc:call(Node, rabbit_mnesia, is_virgin_node, [], ?TIMEOUT)
|
||||
erpc:call(Node, ?MODULE, is_virgin_node, [], ?TIMEOUT)
|
||||
catch
|
||||
_:_ ->
|
||||
undefined
|
||||
|
|
@ -149,7 +171,8 @@ is_virgin_node_with_mnesia(Node) ->
|
|||
%% @returns the directory path.
|
||||
|
||||
dir() ->
|
||||
mnesia_dir().
|
||||
run(
|
||||
#{mnesia => fun mnesia_dir/0}).
|
||||
|
||||
mnesia_dir() ->
|
||||
rabbit_mnesia:dir().
|
||||
|
|
@ -183,9 +206,9 @@ ensure_dir_exists() ->
|
|||
run(Funs)
|
||||
when is_map(Funs) andalso is_map_key(mnesia, Funs) ->
|
||||
#{mnesia := MnesiaFun} = Funs,
|
||||
run_with_mnesia(MnesiaFun).
|
||||
run_using_mnesia(MnesiaFun).
|
||||
|
||||
run_with_mnesia(Fun) ->
|
||||
run_using_mnesia(Fun) ->
|
||||
Fun().
|
||||
|
||||
list_in_mnesia(Table, Match) ->
|
||||
|
|
|
|||
|
|
@ -11,7 +11,8 @@
|
|||
|
||||
-include_lib("rabbit_common/include/logging.hrl").
|
||||
|
||||
-export([join/2,
|
||||
-export([ensure_feature_flags_are_in_sync/2,
|
||||
join/2,
|
||||
forget_member/2]).
|
||||
-export([change_node_type/1]).
|
||||
-export([is_clustered/0,
|
||||
|
|
@ -36,6 +37,14 @@
|
|||
%% Cluster formation.
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin) ->
|
||||
Ret = rabbit_feature_flags:sync_feature_flags_with_cluster(
|
||||
Nodes, NodeIsVirgin),
|
||||
case Ret of
|
||||
ok -> ok;
|
||||
{error, Reason} -> throw({error, {incompatible_feature_flags, Reason}})
|
||||
end.
|
||||
|
||||
-spec join(RemoteNode, NodeType) -> Ret when
|
||||
RemoteNode :: node(),
|
||||
NodeType :: rabbit_db_cluster:node_type(),
|
||||
|
|
|
|||
|
|
@ -99,8 +99,6 @@ init() ->
|
|||
rabbit_log:info("Node database directory at ~ts is empty. "
|
||||
"Assuming we need to join an existing cluster or initialise from scratch...",
|
||||
[dir()]),
|
||||
rabbit_peer_discovery:log_configured_backend(),
|
||||
rabbit_peer_discovery:maybe_init(),
|
||||
rabbit_peer_discovery:maybe_create_cluster(
|
||||
fun create_cluster_callback/2);
|
||||
false ->
|
||||
|
|
@ -115,9 +113,7 @@ init() ->
|
|||
true ->
|
||||
init_db_and_upgrade(cluster_nodes(all), NodeType,
|
||||
NodeType =:= ram, _Retry = true)
|
||||
end,
|
||||
rabbit_peer_discovery:maybe_init(),
|
||||
rabbit_peer_discovery:maybe_register()
|
||||
end
|
||||
end,
|
||||
%% We intuitively expect the global name server to be synced when
|
||||
%% Mnesia is up. In fact that's not guaranteed to be the case -
|
||||
|
|
@ -132,7 +128,6 @@ create_cluster_callback(none, NodeType) ->
|
|||
true -> NodeType
|
||||
end,
|
||||
init_db_and_upgrade(DiscNodes, NodeType1, true, _Retry = true),
|
||||
rabbit_node_monitor:notify_joined_cluster(),
|
||||
ok;
|
||||
create_cluster_callback(RemoteNode, NodeType) ->
|
||||
{ok, {_, DiscNodes, _}} = discover_cluster0(RemoteNode),
|
||||
|
|
@ -141,7 +136,6 @@ create_cluster_callback(RemoteNode, NodeType) ->
|
|||
true -> NodeType
|
||||
end,
|
||||
init_db_and_upgrade(DiscNodes, NodeType1, true, _Retry = true),
|
||||
rabbit_node_monitor:notify_joined_cluster(),
|
||||
ok.
|
||||
|
||||
%% Make the node join a cluster. The node will be reset automatically
|
||||
|
|
@ -517,23 +511,6 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
|
|||
|
||||
NodeIsVirgin = is_virgin_node(),
|
||||
rabbit_log:debug("Does data directory looks like that of a blank (uninitialised) node? ~tp", [NodeIsVirgin]),
|
||||
%% We want to synchronize feature flags first before we wait for
|
||||
%% tables (which is needed to ensure the local view of the tables
|
||||
%% matches the rest of the cluster). The reason is that some
|
||||
%% feature flags may add or remove tables. In this case the list
|
||||
%% of tables returned by `rabbit_table:definitions()' usually
|
||||
%% depends on the state of feature flags but this state is local.
|
||||
%%
|
||||
%% For instance, a feature flag may remove a table (so it's gone
|
||||
%% from the cluster). If we were to wait for that table locally
|
||||
%% before synchronizing feature flags, we would wait forever;
|
||||
%% indeed the feature flag being disabled before sync,
|
||||
%% `rabbit_table:definitions()' would return the old table.
|
||||
%%
|
||||
%% Feature flags need to be synced before any change to Mnesia
|
||||
%% membership. If enabling feature flags fails, Mnesia could remain
|
||||
%% in an inconsistent state that prevents later joining the nodes.
|
||||
ensure_feature_flags_are_in_sync(rabbit_nodes:nodes_excl_me(ClusterNodes), NodeIsVirgin),
|
||||
Nodes = change_extra_db_nodes(ClusterNodes, CheckOtherNodes),
|
||||
%% Note that we use `system_info' here and not the cluster status
|
||||
%% since when we start rabbit for the first time the cluster
|
||||
|
|
@ -623,14 +600,6 @@ ensure_mnesia_not_running() ->
|
|||
throw({error, mnesia_unexpectedly_running})
|
||||
end.
|
||||
|
||||
ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin) ->
|
||||
Ret = rabbit_feature_flags:sync_feature_flags_with_cluster(
|
||||
Nodes, NodeIsVirgin),
|
||||
case Ret of
|
||||
ok -> ok;
|
||||
{error, Reason} -> throw({error, {incompatible_feature_flags, Reason}})
|
||||
end.
|
||||
|
||||
ensure_schema_integrity() ->
|
||||
case rabbit_table:check_schema_integrity(_Retry = true) of
|
||||
ok ->
|
||||
|
|
|
|||
|
|
@ -259,7 +259,7 @@ join_discovered_peers_with_retries(
|
|||
"Starting as a blank standalone node...",
|
||||
[string:join(lists:map(fun atom_to_list/1, TryNodes), ",")],
|
||||
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
|
||||
CreateClusterCallback(none, disc);
|
||||
init_single_node(CreateClusterCallback);
|
||||
join_discovered_peers_with_retries(
|
||||
TryNodes, NodeType, RetriesLeft, DelayInterval, CreateClusterCallback) ->
|
||||
case find_reachable_peer_to_cluster_with(TryNodes) of
|
||||
|
|
@ -268,7 +268,7 @@ join_discovered_peers_with_retries(
|
|||
"Peer discovery: Node '~ts' selected for auto-clustering",
|
||||
[Node],
|
||||
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
|
||||
CreateClusterCallback(Node, NodeType);
|
||||
create_cluster(Node, NodeType, CreateClusterCallback);
|
||||
none ->
|
||||
RetriesLeft1 = RetriesLeft - 1,
|
||||
?LOG_INFO(
|
||||
|
|
@ -298,6 +298,33 @@ find_reachable_peer_to_cluster_with([Node | Nodes]) when Node =/= node() ->
|
|||
find_reachable_peer_to_cluster_with([Node | Nodes]) when Node =:= node() ->
|
||||
find_reachable_peer_to_cluster_with(Nodes).
|
||||
|
||||
init_single_node(CreateClusterCallback) ->
|
||||
IsVirgin = rabbit_db:is_virgin_node(),
|
||||
rabbit_db_cluster:ensure_feature_flags_are_in_sync([], IsVirgin),
|
||||
CreateClusterCallback(none, disc),
|
||||
ok.
|
||||
|
||||
create_cluster(RemoteNode, NodeType, CreateClusterCallback) ->
|
||||
%% We want to synchronize feature flags first before we update the cluster
|
||||
%% membership. This is needed to ensure the local list of Mnesia tables
|
||||
%% matches the rest of the cluster for example, in case a feature flag
|
||||
%% adds or removes tables.
|
||||
%%
|
||||
%% For instance, a feature flag may remove a table (so it's gone from the
|
||||
%% cluster). If we were to wait for that table locally before
|
||||
%% synchronizing feature flags, we would wait forever; indeed the feature
|
||||
%% flag being disabled before sync, `rabbit_table:definitions()' would
|
||||
%% return the old table.
|
||||
%%
|
||||
%% Feature flags need to be synced before any change to Mnesia membership.
|
||||
%% If enabling feature flags fails, Mnesia could remain in an inconsistent
|
||||
%% state that prevents later joining the nodes.
|
||||
IsVirgin = rabbit_db:is_virgin_node(),
|
||||
rabbit_db_cluster:ensure_feature_flags_are_in_sync([RemoteNode], IsVirgin),
|
||||
CreateClusterCallback(RemoteNode, NodeType),
|
||||
rabbit_node_monitor:notify_joined_cluster(),
|
||||
ok.
|
||||
|
||||
%% This module doesn't currently sanity-check the return value of
|
||||
%% `Backend:list_nodes()`. Therefore, it could return something invalid:
|
||||
%% thus the `{œk, any()} in the spec.
|
||||
|
|
|
|||
Loading…
Reference in New Issue