rabbit_nodes: Add list functions to clarify which nodes we are interested in
So far, we had the following functions to list nodes in a RabbitMQ cluster: * `rabbit_mnesia:cluster_nodes/1` to get members of the Mnesia cluster; the argument was used to select members (all members or only those running Mnesia and participating in the cluster) * `rabbit_nodes:all/0` to get all members of the Mnesia cluster * `rabbit_nodes:all_running/0` to get all members who currently run Mnesia Basically: * `rabbit_nodes:all/0` calls `rabbit_mnesia:cluster_nodes(all)` * `rabbit_nodes:all_running/0` calls `rabbit_mnesia:cluster_nodes(running)` We also have: * `rabbit_node_monitor:alive_nodes/1` which filters the given list of nodes to only select those currently running Mnesia * `rabbit_node_monitor:alive_rabbit_nodes/1` which filters the given list of nodes to only select those currently running RabbitMQ Most of the code uses `rabbit_mnesia:cluster_nodes/1` or the `rabbit_nodes:all*/0` functions. `rabbit_mnesia:cluster_nodes(running)` or `rabbit_nodes:all_running/0` is often used as a close approximation of "all cluster members running RabbitMQ". This list might be incorrect in times where a node is joining the clustered or is being worked on (i.e. Mnesia is running but not RabbitMQ). With Khepri, there won't be the same possible approximation because we will try to keep Khepri/Ra running even if RabbitMQ is stopped to expand/shrink the cluster. So in order to clarify what we want when we query a list of nodes, this patch introduces the following functions: * `rabbit_nodes:list_members/0` to get all cluster members, regardless of their state * `rabbit_nodes:list_reachable/0` to get all cluster members we can reach using Erlang distribution, regardless of the state of RabbitMQ * `rabbit_nodes:list_running/0` to get all cluster members who run RabbitMQ, regardless of the maintenance state * `rabbit_nodes:list_serving/0` to get all cluster members who run RabbitMQ and are accepting clients In addition to the list functions, there are the corresponding `rabbit_nodes:is_*(Node)` checks and `rabbit_nodes:filter_*(Nodes)` filtering functions. The code is modified to use these new functions. One possible significant change is that the new list functions will perform RPC calls to query the nodes' state, unlike `rabbit_mnesia:cluster_nodes(running)`.
This commit is contained in:
parent
35cf51b506
commit
d65637190a
|
@ -379,7 +379,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
|
|||
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
|
||||
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
|
||||
[Type, VhostSpec, QueueSpec]),
|
||||
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()),
|
||||
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running()),
|
||||
NumRunning = length(Running),
|
||||
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
|
||||
filter_per_type(Type, Q),
|
||||
|
@ -1056,7 +1056,7 @@ check_queue_type(_Val, _Args) ->
|
|||
|
||||
list() ->
|
||||
All = rabbit_db_queue:get_all(),
|
||||
NodesRunning = rabbit_nodes:all_running(),
|
||||
NodesRunning = rabbit_nodes:list_running(),
|
||||
lists:filter(fun (Q) ->
|
||||
Pid = amqqueue:get_pid(Q),
|
||||
St = amqqueue:get_state(Q),
|
||||
|
@ -1238,7 +1238,7 @@ is_in_virtual_host(Q, VHostName) ->
|
|||
-spec list(vhost:name()) -> [amqqueue:amqqueue()].
|
||||
list(VHostPath) ->
|
||||
All = rabbit_db_queue:get_all(VHostPath),
|
||||
NodesRunning = rabbit_nodes:all_running(),
|
||||
NodesRunning = rabbit_nodes:list_running(),
|
||||
lists:filter(fun (Q) ->
|
||||
Pid = amqqueue:get_pid(Q),
|
||||
St = amqqueue:get_state(Q),
|
||||
|
@ -1252,7 +1252,7 @@ list_down(VHostPath) ->
|
|||
false -> [];
|
||||
true ->
|
||||
Alive = sets:from_list([amqqueue:get_name(Q) || Q <- list(VHostPath)]),
|
||||
NodesRunning = rabbit_nodes:all_running(),
|
||||
NodesRunning = rabbit_nodes:list_running(),
|
||||
rabbit_db_queue:filter_all_durable(
|
||||
fun (Q) ->
|
||||
N = amqqueue:get_name(Q),
|
||||
|
@ -1356,7 +1356,7 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
|
|||
rabbit_control_misc:await_emitters_termination(Pids).
|
||||
|
||||
collect_info_all(VHostPath, Items) ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
Ref = make_ref(),
|
||||
Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ],
|
||||
rabbit_control_misc:await_emitters_termination(Pids),
|
||||
|
@ -1744,10 +1744,8 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
|
|||
node_permits_offline_promotion(Node) ->
|
||||
case node() of
|
||||
Node -> not rabbit:is_running(); %% [1]
|
||||
_ -> All = rabbit_nodes:all(),
|
||||
Running = rabbit_nodes:all_running(),
|
||||
lists:member(Node, All) andalso
|
||||
not lists:member(Node, Running) %% [2]
|
||||
_ -> NotRunning = rabbit_nodes:list_not_running(),
|
||||
lists:member(Node, NotRunning) %% [2]
|
||||
end.
|
||||
%% [1] In this case if we are a real running node (i.e. rabbitmqctl
|
||||
%% has RPCed into us) then we cannot allow promotion. If on the other
|
||||
|
|
|
@ -145,7 +145,7 @@ enabled() ->
|
|||
end.
|
||||
|
||||
leader() ->
|
||||
[Leader | _] = lists:usort(rabbit_nodes:all()),
|
||||
[Leader | _] = lists:usort(rabbit_nodes:list_members()),
|
||||
Leader.
|
||||
|
||||
%% This is the winner receiving its last notification that a node has
|
||||
|
@ -411,7 +411,7 @@ partition_value(Partition) ->
|
|||
%% only know which nodes we have been partitioned from, not which
|
||||
%% nodes are partitioned from each other.
|
||||
check_other_nodes(LocalPartitions) ->
|
||||
Nodes = rabbit_nodes:all(),
|
||||
Nodes = rabbit_nodes:list_members(),
|
||||
{Results, Bad} = rabbit_node_monitor:status(Nodes -- [node()]),
|
||||
RemotePartitions = [{Node, proplists:get_value(partitions, Res)}
|
||||
|| {Node, Res} <- Results],
|
||||
|
|
|
@ -369,7 +369,7 @@ send_drained(Pid, CTagCredit) ->
|
|||
-spec list() -> [pid()].
|
||||
|
||||
list() ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local, [], ?RPC_TIMEOUT).
|
||||
|
||||
-spec list_local() -> [pid()].
|
||||
|
|
|
@ -262,7 +262,7 @@ list() ->
|
|||
lists:foldl(
|
||||
fun (Node, Acc) ->
|
||||
Acc ++ list_on_node(Node)
|
||||
end, [], rabbit_nodes:all_running()).
|
||||
end, [], rabbit_nodes:list_running()).
|
||||
|
||||
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
|
||||
|
||||
|
@ -307,7 +307,7 @@ list_on_node_mnesia(Node) ->
|
|||
#tracked_channel{_ = '_'})
|
||||
catch exit:{aborted, {no_exists, _}} ->
|
||||
%% The table might not exist yet (or is already gone)
|
||||
%% between the time rabbit_nodes:all_running() runs and
|
||||
%% between the time rabbit_nodes:list_running() runs and
|
||||
%% returns a specific node, and
|
||||
%% mnesia:dirty_match_object() is called for that node's
|
||||
%% table.
|
||||
|
|
|
@ -433,7 +433,7 @@ get_all_tracked_connection_table_names_for_node(Node) ->
|
|||
-spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'.
|
||||
|
||||
lookup(Name) ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
lookup(Name, Nodes).
|
||||
|
||||
lookup(_, []) ->
|
||||
|
@ -468,7 +468,7 @@ list() ->
|
|||
lists:foldl(
|
||||
fun (Node, Acc) ->
|
||||
Acc ++ list_on_node(Node)
|
||||
end, [], rabbit_nodes:all_running()).
|
||||
end, [], rabbit_nodes:list_running()).
|
||||
|
||||
-spec count() -> non_neg_integer().
|
||||
|
||||
|
@ -476,7 +476,7 @@ count() ->
|
|||
lists:foldl(
|
||||
fun (Node, Acc) ->
|
||||
count_on_node(Node) + Acc
|
||||
end, 0, rabbit_nodes:all_running()).
|
||||
end, 0, rabbit_nodes:list_running()).
|
||||
|
||||
count_on_node(Node) ->
|
||||
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
|
||||
|
@ -549,7 +549,7 @@ list_on_node_mnesia(Node) ->
|
|||
#tracked_connection{_ = '_'})
|
||||
catch exit:{aborted, {no_exists, _}} ->
|
||||
%% The table might not exist yet (or is already gone)
|
||||
%% between the time rabbit_nodes:all_running() runs and
|
||||
%% between the time rabbit_nodes:list_running() runs and
|
||||
%% returns a specific node, and
|
||||
%% mnesia:dirty_match_object() is called for that node's
|
||||
%% table.
|
||||
|
|
|
@ -102,7 +102,7 @@ gc_exchanges() ->
|
|||
gc_process_and_entity(channel_exchange_metrics, GbSet).
|
||||
|
||||
gc_nodes() ->
|
||||
Nodes = rabbit_nodes:all(),
|
||||
Nodes = rabbit_nodes:list_members(),
|
||||
GbSet = gb_sets:from_list(Nodes),
|
||||
gc_entity(node_node_metrics, GbSet).
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ list_local() ->
|
|||
-spec list() -> [pid()].
|
||||
|
||||
list() ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_direct, list_local, [], ?RPC_TIMEOUT).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
|
|
@ -914,21 +914,21 @@ post_enable(#{states_per_node := _}, FeatureName, Nodes) ->
|
|||
|
||||
-ifndef(TEST).
|
||||
all_nodes() ->
|
||||
lists:usort([node() | mnesia:system_info(db_nodes)]).
|
||||
lists:usort([node() | rabbit_nodes:list_members()]).
|
||||
|
||||
running_nodes() ->
|
||||
lists:usort([node() | mnesia:system_info(running_db_nodes)]).
|
||||
lists:usort([node() | rabbit_nodes:list_running()]).
|
||||
-else.
|
||||
all_nodes() ->
|
||||
RemoteNodes = case rabbit_feature_flags:get_overriden_nodes() of
|
||||
undefined -> mnesia:system_info(db_nodes);
|
||||
undefined -> rabbit_nodes:list_members();
|
||||
Nodes -> Nodes
|
||||
end,
|
||||
lists:usort([node() | RemoteNodes]).
|
||||
|
||||
running_nodes() ->
|
||||
RemoteNodes = case rabbit_feature_flags:get_overriden_running_nodes() of
|
||||
undefined -> mnesia:system_info(running_db_nodes);
|
||||
undefined -> rabbit_nodes:list_running();
|
||||
Nodes -> Nodes
|
||||
end,
|
||||
lists:usort([node() | RemoteNodes]).
|
||||
|
|
|
@ -290,7 +290,7 @@ stop_local_quorum_queue_followers() ->
|
|||
|
||||
-spec primary_replica_transfer_candidate_nodes() -> [node()].
|
||||
primary_replica_transfer_candidate_nodes() ->
|
||||
filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
|
||||
filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running() -- [node()]).
|
||||
|
||||
-spec random_primary_replica_transfer_candidate_node([node()], [node()]) -> {ok, node()} | undefined.
|
||||
random_primary_replica_transfer_candidate_node([], _Preferred) ->
|
||||
|
|
|
@ -154,7 +154,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
|
|||
slaves_to_start_on_failure(Q, DeadGMPids) ->
|
||||
%% In case Mnesia has not caught up yet, filter out nodes we know
|
||||
%% to be dead..
|
||||
ClusterNodes = rabbit_nodes:all_running() --
|
||||
ClusterNodes = rabbit_nodes:list_running() --
|
||||
[node(P) || P <- DeadGMPids],
|
||||
{_, OldNodes, _} = actual_queue_nodes(Q),
|
||||
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
|
||||
|
@ -321,7 +321,7 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
|
|||
%% a long time without being removed.
|
||||
update_recoverable(SPids, RS) ->
|
||||
SNodes = [node(SPid) || SPid <- SPids],
|
||||
RunningNodes = rabbit_nodes:all_running(),
|
||||
RunningNodes = rabbit_nodes:list_running(),
|
||||
AddNodes = SNodes -- RS,
|
||||
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
|
||||
(RS -- DelNodes) ++ AddNodes.
|
||||
|
@ -375,17 +375,17 @@ promote_slave([SPid | SPids]) ->
|
|||
-spec initial_queue_node(amqqueue:amqqueue(), node()) -> node().
|
||||
|
||||
initial_queue_node(Q, DefNode) ->
|
||||
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:all_running()),
|
||||
{MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, rabbit_nodes:list_running()),
|
||||
MNode.
|
||||
|
||||
-spec suggested_queue_nodes(amqqueue:amqqueue()) ->
|
||||
{node(), [node()]}.
|
||||
|
||||
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_running()).
|
||||
suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:list_running()).
|
||||
suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
|
||||
|
||||
%% The third argument exists so we can pull a call to
|
||||
%% rabbit_nodes:all_running() out of a loop or transaction
|
||||
%% rabbit_nodes:list_running() out of a loop or transaction
|
||||
%% or both.
|
||||
suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) ->
|
||||
Owner = amqqueue:get_exclusive_owner(Q),
|
||||
|
|
|
@ -100,7 +100,7 @@ prepare(Node, NodeMapList) ->
|
|||
|
||||
%% Check that we are in the cluster, all old nodes are in the
|
||||
%% cluster, and no new nodes are.
|
||||
Nodes = rabbit_nodes:all(),
|
||||
Nodes = rabbit_nodes:list_members(),
|
||||
case {FromNodes -- Nodes, ToNodes -- (ToNodes -- Nodes),
|
||||
lists:member(Node, Nodes ++ ToNodes)} of
|
||||
{[], [], true} -> ok;
|
||||
|
@ -130,7 +130,7 @@ restore_backup(Backup) ->
|
|||
-spec maybe_finish() -> ok.
|
||||
|
||||
maybe_finish() ->
|
||||
AllNodes = rabbit_nodes:all(),
|
||||
AllNodes = rabbit_nodes:list_members(),
|
||||
maybe_finish(AllNodes).
|
||||
|
||||
-spec maybe_finish([node()]) -> 'ok'.
|
||||
|
@ -144,7 +144,7 @@ maybe_finish(AllNodes) ->
|
|||
finish(FromNode, ToNode, AllNodes) ->
|
||||
case node() of
|
||||
ToNode ->
|
||||
case rabbit_nodes:filter_nodes_running_rabbitmq(AllNodes) of
|
||||
case rabbit_nodes:filter_running(AllNodes) of
|
||||
[] -> finish_primary(FromNode, ToNode);
|
||||
_ -> finish_secondary(FromNode, ToNode, AllNodes)
|
||||
end;
|
||||
|
@ -257,8 +257,8 @@ update_term(_NodeMap, Term) ->
|
|||
Term.
|
||||
|
||||
rename_in_running_mnesia(FromNode, ToNode) ->
|
||||
All = rabbit_nodes:all(),
|
||||
Running = rabbit_nodes:all_running(),
|
||||
All = rabbit_nodes:list_members(),
|
||||
Running = rabbit_mnesia:cluster_nodes(running),
|
||||
case {lists:member(FromNode, Running), lists:member(ToNode, All)} of
|
||||
{false, true} -> ok;
|
||||
{true, _} -> exit({old_node_running, FromNode});
|
||||
|
|
|
@ -463,7 +463,7 @@ maybe_get_epmd_port(Name, Host) ->
|
|||
-spec active_listeners() -> [rabbit_types:listener()].
|
||||
|
||||
active_listeners() ->
|
||||
Nodes = rabbit_mnesia:cluster_nodes(running),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
lists:append([node_listeners(Node) || Node <- Nodes]).
|
||||
|
||||
-spec node_listeners(node()) -> [rabbit_types:listener()].
|
||||
|
@ -529,7 +529,7 @@ unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
|
|||
-spec connections() -> [rabbit_types:connection()].
|
||||
|
||||
connections() ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, connections_local, [], ?RPC_TIMEOUT).
|
||||
|
||||
-spec local_connections() -> [rabbit_types:connection()].
|
||||
|
@ -552,7 +552,7 @@ unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connection
|
|||
-spec non_amqp_connections() -> [rabbit_types:connection()].
|
||||
|
||||
non_amqp_connections() ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, local_non_amqp_connections, [], ?RPC_TIMEOUT).
|
||||
|
||||
-spec local_non_amqp_connections() -> [rabbit_types:connection()].
|
||||
|
|
|
@ -167,7 +167,7 @@ notify_node_up() ->
|
|||
|
||||
notify_joined_cluster() ->
|
||||
NewMember = node(),
|
||||
Nodes = rabbit_nodes:all_running() -- [NewMember],
|
||||
Nodes = rabbit_nodes:list_running() -- [NewMember],
|
||||
gen_server:abcast(Nodes, ?SERVER,
|
||||
{joined_cluster, node(), rabbit_mnesia:node_type()}),
|
||||
|
||||
|
@ -176,7 +176,7 @@ notify_joined_cluster() ->
|
|||
-spec notify_left_cluster(node()) -> 'ok'.
|
||||
|
||||
notify_left_cluster(Node) ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
|
||||
ok.
|
||||
|
||||
|
@ -413,7 +413,7 @@ handle_call(_Request, _From, State) ->
|
|||
{noreply, State}.
|
||||
|
||||
handle_cast(notify_node_up, State = #state{guid = GUID}) ->
|
||||
Nodes = rabbit_nodes:all_running() -- [node()],
|
||||
Nodes = rabbit_nodes:list_running() -- [node()],
|
||||
gen_server:abcast(Nodes, ?SERVER,
|
||||
{node_up, node(), rabbit_mnesia:node_type(), GUID}),
|
||||
%% register other active rabbits with this rabbit
|
||||
|
@ -470,7 +470,7 @@ handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
|
|||
handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
|
||||
State = #state{guid = MyGUID,
|
||||
node_guids = GUIDs}) ->
|
||||
case lists:member(Node, rabbit_nodes:all_running()) andalso
|
||||
case lists:member(Node, rabbit_nodes:list_reachable()) andalso
|
||||
maps:find(Node, GUIDs) =:= {ok, NodeGUID} of
|
||||
true -> spawn_link( %%[1]
|
||||
fun () ->
|
||||
|
@ -623,7 +623,7 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID,
|
|||
Node, node(), DownGUID, CheckGUID, MyGUID})
|
||||
end,
|
||||
_ = case maps:find(Node, GUIDs) of
|
||||
{ok, DownGUID} -> Alive = rabbit_nodes:all_running()
|
||||
{ok, DownGUID} -> Alive = rabbit_nodes:list_reachable()
|
||||
-- [node(), Node],
|
||||
[case maps:find(N, GUIDs) of
|
||||
{ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
|
||||
|
@ -822,7 +822,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
|
|||
%% going away. It's only safe to forget anything about partitions when
|
||||
%% there are no partitions.
|
||||
Down = Partitions -- alive_rabbit_nodes(),
|
||||
NoLongerPartitioned = rabbit_nodes:all_running(),
|
||||
NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running),
|
||||
Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
|
||||
[] -> [];
|
||||
_ -> Partitions
|
||||
|
@ -903,8 +903,7 @@ disconnect(Node) ->
|
|||
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% mnesia:system_info(db_nodes) (and hence
|
||||
%% rabbit_nodes:all_running()) does not return all nodes
|
||||
%% mnesia:system_info(db_nodes) does not return all nodes
|
||||
%% when partitioned, just those that we are sharing Mnesia state
|
||||
%% with. So we have a small set of replacement functions
|
||||
%% here. "rabbit" in a function's name implies we test if the rabbit
|
||||
|
@ -919,7 +918,7 @@ majority() ->
|
|||
majority([]).
|
||||
|
||||
majority(NodesDown) ->
|
||||
Nodes = rabbit_nodes:all(),
|
||||
Nodes = rabbit_nodes:list_members(),
|
||||
AliveNodes = alive_nodes(Nodes) -- NodesDown,
|
||||
length(AliveNodes) / length(Nodes) > 0.5.
|
||||
|
||||
|
@ -932,44 +931,44 @@ in_preferred_partition(PreferredNodes) ->
|
|||
in_preferred_partition(PreferredNodes, []).
|
||||
|
||||
in_preferred_partition(PreferredNodes, NodesDown) ->
|
||||
Nodes = rabbit_nodes:all(),
|
||||
Nodes = rabbit_nodes:list_members(),
|
||||
RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)],
|
||||
AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown,
|
||||
RealPreferredNodes =:= [] orelse AliveNodes =/= [].
|
||||
|
||||
all_nodes_up() ->
|
||||
Nodes = rabbit_nodes:all(),
|
||||
Nodes = rabbit_nodes:list_members(),
|
||||
length(alive_nodes(Nodes)) =:= length(Nodes).
|
||||
|
||||
-spec all_rabbit_nodes_up() -> boolean().
|
||||
|
||||
all_rabbit_nodes_up() ->
|
||||
Nodes = rabbit_nodes:all(),
|
||||
Nodes = rabbit_nodes:list_members(),
|
||||
length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
|
||||
|
||||
alive_nodes() -> alive_nodes(rabbit_nodes:all()).
|
||||
alive_nodes() -> rabbit_nodes:list_reachable().
|
||||
|
||||
-spec alive_nodes([node()]) -> [node()].
|
||||
|
||||
alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].
|
||||
alive_nodes(Nodes) -> rabbit_nodes:filter_reachable(Nodes).
|
||||
|
||||
alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_nodes:all()).
|
||||
alive_rabbit_nodes() -> rabbit_nodes:list_running().
|
||||
|
||||
-spec alive_rabbit_nodes([node()]) -> [node()].
|
||||
|
||||
alive_rabbit_nodes(Nodes) ->
|
||||
[N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
|
||||
rabbit_nodes:filter_running(Nodes).
|
||||
|
||||
%% This one is allowed to connect!
|
||||
|
||||
-spec ping_all() -> 'ok'.
|
||||
|
||||
ping_all() ->
|
||||
[net_adm:ping(N) || N <- rabbit_nodes:all()],
|
||||
[net_adm:ping(N) || N <- rabbit_nodes:list_members()],
|
||||
ok.
|
||||
|
||||
possibly_partitioned_nodes() ->
|
||||
alive_rabbit_nodes() -- rabbit_nodes:all_running().
|
||||
alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running).
|
||||
|
||||
startup_log([]) ->
|
||||
rabbit_log:info("Starting rabbit_node_monitor", []);
|
||||
|
|
|
@ -7,11 +7,22 @@
|
|||
|
||||
-module(rabbit_nodes).
|
||||
|
||||
-include_lib("kernel/include/logger.hrl").
|
||||
|
||||
-include_lib("rabbit_common/include/logging.hrl").
|
||||
|
||||
-export([names/1, diagnostics/1, make/1, make/2, parts/1, cookie_hash/0,
|
||||
is_running/2, is_process_running/2,
|
||||
cluster_name/0, set_cluster_name/1, set_cluster_name/2, ensure_epmd/0,
|
||||
all_running/0, all_running_mnesia/0,
|
||||
all_running_rabbitmq/0, filter_nodes_running_rabbitmq/1,
|
||||
all_running/0,
|
||||
is_member/1, list_members/0,
|
||||
filter_members/1,
|
||||
is_reachable/1, list_reachable/0, list_unreachable/0,
|
||||
filter_reachable/1, filter_unreachable/1,
|
||||
is_running/1, list_running/0, list_not_running/0,
|
||||
filter_running/1, filter_not_running/1,
|
||||
is_serving/1, list_serving/0, list_not_serving/0,
|
||||
filter_serving/1, filter_not_serving/1,
|
||||
name_type/0, running_count/0, total_count/0,
|
||||
await_running_count/2, is_single_node_cluster/0,
|
||||
boot/0]).
|
||||
|
@ -20,10 +31,14 @@
|
|||
if_reached_target_cluster_size/2]).
|
||||
-export([lock_id/1, lock_retries/0]).
|
||||
|
||||
-deprecated({all, 0, "Use rabbit_nodes:list_members/0 instead"}).
|
||||
-deprecated({all_running, 0, "Use rabbit_nodes:list_running/0 instead"}).
|
||||
|
||||
-include_lib("kernel/include/inet.hrl").
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
|
||||
-define(SAMPLING_INTERVAL, 1000).
|
||||
-define(FILTER_RPC_TIMEOUT, 10000).
|
||||
|
||||
-define(INTERNAL_CLUSTER_ID_PARAM_NAME, internal_cluster_id).
|
||||
|
||||
|
@ -143,28 +158,376 @@ ensure_epmd() ->
|
|||
rabbit_nodes_common:ensure_epmd().
|
||||
|
||||
-spec all() -> [node()].
|
||||
all() -> rabbit_mnesia:cluster_nodes(all).
|
||||
all() -> list_members().
|
||||
|
||||
-spec all_running() -> [node()].
|
||||
all_running() -> all_running_mnesia().
|
||||
all_running() -> list_running().
|
||||
|
||||
-spec all_running_mnesia() -> [node()].
|
||||
all_running_mnesia() -> rabbit_mnesia:cluster_nodes(running).
|
||||
-spec is_member(Node) -> IsMember when
|
||||
Node :: node(),
|
||||
IsMember :: boolean().
|
||||
%% @doc Indicates if the given node is a cluster member.
|
||||
%%
|
||||
%% @see filter_members/1.
|
||||
|
||||
-spec all_running_rabbitmq() -> [node()].
|
||||
all_running_rabbitmq() ->
|
||||
AllNodes = all(),
|
||||
filter_nodes_running_rabbitmq(AllNodes).
|
||||
is_member(Node) when is_atom(Node) ->
|
||||
[Node] =:= filter_members([Node]).
|
||||
|
||||
-spec filter_nodes_running_rabbitmq([node()]) -> [node()].
|
||||
filter_nodes_running_rabbitmq(Nodes) ->
|
||||
[N || N <- Nodes, rabbit:is_running(N)].
|
||||
-spec list_members() -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Returns the list of nodes in the cluster.
|
||||
%%
|
||||
%% @see filter_members/1.
|
||||
|
||||
list_members() ->
|
||||
mnesia:system_info(db_nodes).
|
||||
|
||||
-spec filter_members(Nodes) -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Filters the given list of nodes to only select those belonging to the
|
||||
%% cluster.
|
||||
%%
|
||||
%% The cluster being considered is the one which the node running this
|
||||
%% function belongs to.
|
||||
|
||||
filter_members(Nodes) ->
|
||||
%% Before calling {@link filter_members/2}, we filter out any node which
|
||||
%% is not part of the same cluster as the node running this function.
|
||||
Members = list_members(),
|
||||
[Node || Node <- Nodes, lists:member(Node, Members)].
|
||||
|
||||
-spec is_reachable(Node) -> IsReachable when
|
||||
Node :: node(),
|
||||
IsReachable :: boolean().
|
||||
%% @doc Indicates if the given node is reachable.
|
||||
%%
|
||||
%% @see filter_reachable/1.
|
||||
|
||||
is_reachable(Node) when is_atom(Node) ->
|
||||
[Node] =:= filter_reachable([Node]).
|
||||
|
||||
-spec list_reachable() -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Returns the list of nodes in the cluster we can reach.
|
||||
%%
|
||||
%% A reachable node is one we can connect to using Erlang distribution.
|
||||
%%
|
||||
%% @see filter_reachable/1.
|
||||
|
||||
list_reachable() ->
|
||||
Members = list_members(),
|
||||
filter_reachable(Members).
|
||||
|
||||
-spec list_unreachable() -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Returns the list of nodes in the cluster we can't reach.
|
||||
%%
|
||||
%% A reachable node is one we can connect to using Erlang distribution.
|
||||
%%
|
||||
%% @see filter_unreachable/1.
|
||||
|
||||
list_unreachable() ->
|
||||
Members = list_members(),
|
||||
filter_unreachable(Members).
|
||||
|
||||
-spec filter_reachable(Nodes) -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Filters the given list of nodes to only select those belonging to the
|
||||
%% cluster and we can reach.
|
||||
%%
|
||||
%% The cluster being considered is the one which the node running this
|
||||
%% function belongs to.
|
||||
%%
|
||||
%% A reachable node is one we can connect to using Erlang distribution.
|
||||
|
||||
filter_reachable(Nodes) ->
|
||||
Members = filter_members(Nodes),
|
||||
do_filter_reachable(Members).
|
||||
|
||||
-spec filter_unreachable(Nodes) -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Filters the given list of nodes to only select those belonging to the
|
||||
%% cluster but we can't reach.
|
||||
%%
|
||||
%% @see filter_reachable/1.
|
||||
|
||||
filter_unreachable(Nodes) ->
|
||||
Members = filter_members(Nodes),
|
||||
Reachable = do_filter_reachable(Members),
|
||||
Members -- Reachable.
|
||||
|
||||
-spec do_filter_reachable(Members) -> Members when
|
||||
Members :: [node()].
|
||||
%% @doc Filters the given list of cluster members to only select those we can
|
||||
%% reach.
|
||||
%%
|
||||
%% The given list of nodes must have been verified to only contain cluster
|
||||
%% members.
|
||||
%%
|
||||
%% @private
|
||||
|
||||
do_filter_reachable(Members) ->
|
||||
%% All clustered members we can reach, regardless of the state of RabbitMQ
|
||||
%% on those nodes.
|
||||
%%
|
||||
%% We are using `nodes/0' to get the list of nodes we should be able to
|
||||
%% reach. This list might be out-of-date, but it's the only way we can
|
||||
%% filter `Members' without trying to effectively connect to each node. If
|
||||
%% we try to connect, it breaks `rabbit_node_monitor' partial partition
|
||||
%% detection mechanism.
|
||||
Nodes = [node() | nodes()],
|
||||
lists:filter(
|
||||
fun(Member) -> lists:member(Member, Nodes) end,
|
||||
Members).
|
||||
|
||||
-spec is_running(Node) -> IsRunning when
|
||||
Node :: node(),
|
||||
IsRunning :: boolean().
|
||||
%% @doc Indicates if the given node is running.
|
||||
%%
|
||||
%% @see filter_running/1.
|
||||
|
||||
is_running(Node) when is_atom(Node) ->
|
||||
[Node] =:= filter_running([Node]).
|
||||
|
||||
-spec list_running() -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Returns the list of nodes in the cluster where RabbitMQ is running.
|
||||
%%
|
||||
%% Note that even if RabbitMQ is running, the node could reject clients if it
|
||||
%% is under maintenance.
|
||||
%%
|
||||
%% @see filter_running/1.
|
||||
%% @see rabbit:is_running/0.
|
||||
|
||||
list_running() ->
|
||||
Members = list_members(),
|
||||
filter_running(Members).
|
||||
|
||||
-spec list_not_running() -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Returns the list of nodes in the cluster where RabbitMQ is not running
|
||||
%% or we can't reach.
|
||||
%%
|
||||
%% @see filter_not_running/1.
|
||||
%% @see rabbit:is_running/0.
|
||||
|
||||
list_not_running() ->
|
||||
Members = list_members(),
|
||||
filter_not_running(Members).
|
||||
|
||||
-spec filter_running(Nodes) -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Filters the given list of nodes to only select those belonging to the
|
||||
%% cluster and where RabbitMQ is running.
|
||||
%%
|
||||
%% The cluster being considered is the one which the node running this
|
||||
%% function belongs to.
|
||||
%%
|
||||
%% Note that even if RabbitMQ is running, the node could reject clients if it
|
||||
%% is under maintenance.
|
||||
%%
|
||||
%% @see rabbit:is_running/0.
|
||||
|
||||
filter_running(Nodes) ->
|
||||
Members = filter_members(Nodes),
|
||||
do_filter_running(Members).
|
||||
|
||||
-spec filter_not_running(Nodes) -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Filters the given list of nodes to only select those belonging to the
|
||||
%% cluster and where RabbitMQ is not running or we can't reach.
|
||||
%%
|
||||
%% The cluster being considered is the one which the node running this
|
||||
%% function belongs to.
|
||||
%%
|
||||
%% @see filter_running/1.
|
||||
|
||||
filter_not_running(Nodes) ->
|
||||
Members = filter_members(Nodes),
|
||||
Running = do_filter_running(Members),
|
||||
Members -- Running.
|
||||
|
||||
-spec do_filter_running(Members) -> Members when
|
||||
Members :: [node()].
|
||||
%% @doc Filters the given list of cluster members to only select those who
|
||||
%% run `rabbit'.
|
||||
%%
|
||||
%% Those nodes could run `rabbit' without accepting clients.
|
||||
%%
|
||||
%% The given list of nodes must have been verified to only contain cluster
|
||||
%% members.
|
||||
%%
|
||||
%% @private
|
||||
|
||||
do_filter_running(Members) ->
|
||||
%% All clustered members where `rabbit' is running, regardless if they are
|
||||
%% under maintenance or not.
|
||||
Rets = erpc:multicall(
|
||||
Members, rabbit, is_running, [], ?FILTER_RPC_TIMEOUT),
|
||||
RetPerMember = lists:zip(Members, Rets),
|
||||
lists:filtermap(
|
||||
fun
|
||||
({Member, {ok, true}}) ->
|
||||
{true, Member};
|
||||
({_, {ok, false}}) ->
|
||||
false;
|
||||
({_, {error, {erpc, Reason}}})
|
||||
when Reason =:= noconnection orelse Reason =:= timeout ->
|
||||
false;
|
||||
({Member, Error}) ->
|
||||
?LOG_ERROR(
|
||||
"~s:~s: Failed to query node ~ts: ~p",
|
||||
[?MODULE, ?FUNCTION_NAME, Member, Error],
|
||||
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
|
||||
false
|
||||
end, RetPerMember).
|
||||
|
||||
-spec is_serving(Node) -> IsServing when
|
||||
Node :: node(),
|
||||
IsServing :: boolean().
|
||||
%% @doc Indicates if the given node is serving.
|
||||
%%
|
||||
%% @see filter_serving/1.
|
||||
|
||||
is_serving(Node) when is_atom(Node) ->
|
||||
[Node] =:= filter_serving([Node]).
|
||||
|
||||
-spec list_serving() -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Returns the list of nodes in the cluster who accept clients.
|
||||
%%
|
||||
%% @see filter_serving/1.
|
||||
%% @see rabbit:is_serving/0.
|
||||
|
||||
list_serving() ->
|
||||
Members = list_members(),
|
||||
filter_serving(Members).
|
||||
|
||||
-spec list_not_serving() -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Returns the list of nodes in the cluster who reject clients, don't
|
||||
%% run RabbitMQ or we can't reach.
|
||||
%%
|
||||
%% @see filter_serving/1.
|
||||
%% @see rabbit:is_serving/0.
|
||||
|
||||
list_not_serving() ->
|
||||
Members = list_members(),
|
||||
filter_not_serving(Members).
|
||||
|
||||
-spec filter_serving(Nodes) -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Filters the given list of nodes to only select those belonging to the
|
||||
%% cluster and who accept clients.
|
||||
%%
|
||||
%% The cluster being considered is the one which the node running this
|
||||
%% function belongs to.
|
||||
%%
|
||||
%% @see rabbit:is_serving/0.
|
||||
|
||||
filter_serving(Nodes) ->
|
||||
Members = filter_members(Nodes),
|
||||
do_filter_serving(Members).
|
||||
|
||||
-spec filter_not_serving(Nodes) -> Nodes when
|
||||
Nodes :: [node()].
|
||||
%% @doc Filters the given list of nodes to only select those belonging to the
|
||||
%% cluster and where RabbitMQ is rejecting clients, is not running or we can't
|
||||
%% reach.
|
||||
%%
|
||||
%% The cluster being considered is the one which the node running this
|
||||
%% function belongs to.
|
||||
%%
|
||||
%% @see filter_serving/1.
|
||||
|
||||
filter_not_serving(Nodes) ->
|
||||
Members = filter_members(Nodes),
|
||||
Serving = do_filter_serving(Members),
|
||||
Members -- Serving.
|
||||
|
||||
-spec do_filter_serving(Members) -> Members when
|
||||
Members :: [node()].
|
||||
%% @doc Filters the given list of cluster members to only select those who
|
||||
%% accept clients.
|
||||
%%
|
||||
%% The given list of nodes must have been verified to only contain cluster
|
||||
%% members.
|
||||
%%
|
||||
%% @private
|
||||
|
||||
do_filter_serving(Members) ->
|
||||
%% All clustered members serving clients. This implies that `rabbit' is
|
||||
%% running.
|
||||
Rets = erpc:multicall(
|
||||
Members, rabbit, is_serving, [], ?FILTER_RPC_TIMEOUT),
|
||||
RetPerMember0 = lists:zip(Members, Rets),
|
||||
RetPerMember1 = handle_is_serving_undefined(RetPerMember0, []),
|
||||
lists:filtermap(
|
||||
fun
|
||||
({Member, {ok, true}}) ->
|
||||
{true, Member};
|
||||
({_, {ok, false}}) ->
|
||||
false;
|
||||
({_, {error, {erpc, Reason}}})
|
||||
when Reason =:= noconnection orelse Reason =:= timeout ->
|
||||
false;
|
||||
({Member, Error}) ->
|
||||
?LOG_ERROR(
|
||||
"~s:~s: Failed to query node ~ts: ~p",
|
||||
[?MODULE, ?FUNCTION_NAME, Member, Error],
|
||||
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
|
||||
false
|
||||
end, RetPerMember1).
|
||||
|
||||
handle_is_serving_undefined(
|
||||
[{Member, {error, {exception, undef, [{rabbit, is_serving, [], _} | _]}}}
|
||||
| Rest],
|
||||
Result) ->
|
||||
%% The remote node must be RabbitMQ 3.11.x without the
|
||||
%% `rabbit:is_serving()' function. That's ok, we can perform two calls
|
||||
%% instead.
|
||||
%%
|
||||
%% This function can go away once we require a RabbitMQ version which has
|
||||
%% `rabbit:is_serving()'.
|
||||
?LOG_NOTICE(
|
||||
"~s:~s: rabbit:is_serving() unavailable on node ~ts, falling back to "
|
||||
"two RPC calls",
|
||||
[?MODULE, ?FUNCTION_NAME, Member],
|
||||
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
|
||||
try
|
||||
IsRunning = erpc:call(
|
||||
Member, rabbit, is_running, [], ?FILTER_RPC_TIMEOUT),
|
||||
case IsRunning of
|
||||
true ->
|
||||
InMaintenance = erpc:call(
|
||||
Member,
|
||||
rabbit_maintenance,
|
||||
is_being_drained_local_read, [Member],
|
||||
?FILTER_RPC_TIMEOUT),
|
||||
Result1 = [{Member, {ok, not InMaintenance}} | Result],
|
||||
handle_is_serving_undefined(Rest, Result1);
|
||||
false ->
|
||||
Result1 = [{Member, {ok, false}} | Result],
|
||||
handle_is_serving_undefined(Rest, Result1)
|
||||
end
|
||||
catch
|
||||
Class:Reason ->
|
||||
Result2 = [{Member, {Class, Reason}} | Result],
|
||||
handle_is_serving_undefined(Rest, Result2)
|
||||
end;
|
||||
handle_is_serving_undefined(
|
||||
[Ret | Rest], Result) ->
|
||||
handle_is_serving_undefined(Rest, [Ret | Result]);
|
||||
handle_is_serving_undefined(
|
||||
[], Result) ->
|
||||
lists:reverse(Result).
|
||||
|
||||
-spec running_count() -> integer().
|
||||
running_count() -> length(all_running()).
|
||||
running_count() -> length(list_running()).
|
||||
|
||||
-spec total_count() -> integer().
|
||||
total_count() -> length(rabbit_nodes:all()).
|
||||
total_count() -> length(list_members()).
|
||||
|
||||
-spec is_single_node_cluster() -> boolean().
|
||||
is_single_node_cluster() ->
|
||||
|
@ -188,7 +551,7 @@ await_running_count_with_retries(TargetCount, Retries) ->
|
|||
|
||||
-spec all_running_with_hashes() -> #{non_neg_integer() => node()}.
|
||||
all_running_with_hashes() ->
|
||||
maps:from_list([{erlang:phash2(Node), Node} || Node <- all_running()]).
|
||||
maps:from_list([{erlang:phash2(Node), Node} || Node <- list_running()]).
|
||||
|
||||
-spec target_cluster_size_hint() -> non_neg_integer().
|
||||
target_cluster_size_hint() ->
|
||||
|
|
|
@ -27,7 +27,8 @@ queue_leader_locators() ->
|
|||
{Leader :: node(), Followers :: [node()]}.
|
||||
select_leader_and_followers(Q, Size)
|
||||
when (?amqqueue_is_quorum(Q) orelse ?amqqueue_is_stream(Q)) andalso is_integer(Size) ->
|
||||
{AllNodes, _DiscNodes, RunningNodes} = rabbit_mnesia:cluster_nodes(status),
|
||||
AllNodes = rabbit_nodes:list_members(),
|
||||
RunningNodes = rabbit_nodes:filter_running(AllNodes),
|
||||
true = lists:member(node(), AllNodes),
|
||||
QueueType = amqqueue:get_type(Q),
|
||||
GetQueues0 = get_queues_for_type(QueueType),
|
||||
|
|
|
@ -95,7 +95,7 @@ all_nodes(Queue) when ?is_amqqueue(Queue) ->
|
|||
handle_is_mirrored_ha_nodes(false, _Queue) ->
|
||||
% Note: ha-mode is NOT 'nodes' - it is either exactly or all, which means
|
||||
% that any node in the cluster is eligible to be the new queue master node
|
||||
rabbit_nodes:all_running();
|
||||
rabbit_nodes:list_serving();
|
||||
handle_is_mirrored_ha_nodes(true, Queue) ->
|
||||
% Note: ha-mode is 'nodes', which explicitly specifies allowed nodes.
|
||||
% We must use suggested_queue_nodes to get that list of nodes as the
|
||||
|
@ -103,6 +103,6 @@ handle_is_mirrored_ha_nodes(true, Queue) ->
|
|||
handle_suggested_queue_nodes(rabbit_mirror_queue_misc:suggested_queue_nodes(Queue)).
|
||||
|
||||
handle_suggested_queue_nodes({_MNode, []}) ->
|
||||
rabbit_nodes:all_running();
|
||||
rabbit_nodes:list_serving();
|
||||
handle_suggested_queue_nodes({MNode, SNodes}) ->
|
||||
[MNode | SNodes].
|
||||
|
|
|
@ -367,7 +367,7 @@ filter_quorum_critical(Queues) ->
|
|||
%% '%2F_qq.1590' => leader,'%2F_qq.1363' => leader,
|
||||
%% '%2F_qq.882' => leader,'%2F_qq.1161' => leader,...}}
|
||||
ReplicaStates = maps:from_list(
|
||||
rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(),
|
||||
rabbit_misc:append_rpc_all_nodes(rabbit_nodes:list_running(),
|
||||
?MODULE, all_replica_states, [])),
|
||||
filter_quorum_critical(Queues, ReplicaStates).
|
||||
|
||||
|
@ -475,7 +475,7 @@ handle_tick(QName,
|
|||
| infos(QName, Keys)],
|
||||
rabbit_core_metrics:queue_stats(QName, Infos),
|
||||
ok = repair_leader_record(QName, Self),
|
||||
ExpectedNodes = rabbit_nodes:all(),
|
||||
ExpectedNodes = rabbit_nodes:list_members(),
|
||||
case Nodes -- ExpectedNodes of
|
||||
[] ->
|
||||
ok;
|
||||
|
@ -1059,7 +1059,7 @@ add_member(VHost, Name, Node, Timeout) ->
|
|||
{error, classic_queue_not_supported};
|
||||
{ok, Q} when ?amqqueue_is_quorum(Q) ->
|
||||
QNodes = get_nodes(Q),
|
||||
case lists:member(Node, rabbit_nodes:all_running()) of
|
||||
case lists:member(Node, rabbit_nodes:list_running()) of
|
||||
false ->
|
||||
{error, node_not_running};
|
||||
true ->
|
||||
|
@ -1199,7 +1199,7 @@ shrink_all(Node) ->
|
|||
[{rabbit_amqqueue:name(),
|
||||
{ok, pos_integer()} | {error, pos_integer(), term()}}].
|
||||
grow(Node, VhostSpec, QueueSpec, Strategy) ->
|
||||
Running = rabbit_nodes:all_running(),
|
||||
Running = rabbit_nodes:list_running(),
|
||||
[begin
|
||||
Size = length(get_nodes(Q)),
|
||||
QName = amqqueue:get_name(Q),
|
||||
|
|
|
@ -426,7 +426,7 @@ ensure_coordinator_started() ->
|
|||
end.
|
||||
|
||||
start_coordinator_cluster() ->
|
||||
Nodes = rabbit_mnesia:cluster_nodes(running),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
rabbit_log:debug("Starting stream coordinator on nodes: ~w", [Nodes]),
|
||||
case ra:start_cluster(?RA_SYSTEM, [make_ra_conf(Node, Nodes) || Node <- Nodes]) of
|
||||
{ok, Started, _} ->
|
||||
|
@ -439,7 +439,7 @@ start_coordinator_cluster() ->
|
|||
end.
|
||||
|
||||
all_coord_members() ->
|
||||
Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
|
||||
Nodes = rabbit_nodes:list_running() -- [node()],
|
||||
[{?MODULE, Node} || Node <- [node() | Nodes]].
|
||||
|
||||
version() -> 4.
|
||||
|
@ -684,8 +684,8 @@ maybe_resize_coordinator_cluster() ->
|
|||
case ra:members({?MODULE, node()}) of
|
||||
{_, Members, _} ->
|
||||
MemberNodes = [Node || {_, Node} <- Members],
|
||||
Running = rabbit_mnesia:cluster_nodes(running),
|
||||
All = rabbit_nodes:all(),
|
||||
Running = rabbit_nodes:list_running(),
|
||||
All = rabbit_nodes:list_members(),
|
||||
case Running -- MemberNodes of
|
||||
[] ->
|
||||
ok;
|
||||
|
|
|
@ -831,7 +831,7 @@ add_replica(VHost, Name, Node) ->
|
|||
{ok, Q} when ?amqqueue_is_quorum(Q) ->
|
||||
{error, quorum_queue_not_supported};
|
||||
{ok, Q} when ?amqqueue_is_stream(Q) ->
|
||||
case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
|
||||
case lists:member(Node, rabbit_nodes:list_running()) of
|
||||
false ->
|
||||
{error, node_not_running};
|
||||
true ->
|
||||
|
@ -849,7 +849,7 @@ delete_replica(VHost, Name, Node) ->
|
|||
{ok, Q} when ?amqqueue_is_quorum(Q) ->
|
||||
{error, quorum_queue_not_supported};
|
||||
{ok, Q} when ?amqqueue_is_stream(Q) ->
|
||||
case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
|
||||
case lists:member(Node, rabbit_nodes:list_running()) of
|
||||
false ->
|
||||
{error, node_not_running};
|
||||
true ->
|
||||
|
@ -909,7 +909,7 @@ max_age(Age) ->
|
|||
Age.
|
||||
|
||||
initial_cluster_size(undefined) ->
|
||||
length(rabbit_nodes:all());
|
||||
length(rabbit_nodes:list_members());
|
||||
initial_cluster_size(Val) ->
|
||||
Val.
|
||||
|
||||
|
|
|
@ -121,10 +121,10 @@ wait(TableNames, Timeout, Retries) ->
|
|||
ok ->
|
||||
ok;
|
||||
{timeout, BadTabs} ->
|
||||
AllNodes = rabbit_nodes:all(),
|
||||
AllNodes = rabbit_nodes:list_members(),
|
||||
{error, {timeout_waiting_for_tables, AllNodes, BadTabs}};
|
||||
{error, Reason} ->
|
||||
AllNodes = rabbit_nodes:all(),
|
||||
AllNodes = rabbit_nodes:list_members(),
|
||||
{error, {failed_waiting_for_tables, AllNodes, Reason}}
|
||||
end,
|
||||
case {Retries, Result} of
|
||||
|
|
|
@ -42,7 +42,7 @@ id(Node, Name) -> {Node, Name}.
|
|||
-spec count_on_all_nodes(module(), atom(), [term()], iodata()) ->
|
||||
non_neg_integer().
|
||||
count_on_all_nodes(Mod, Fun, Args, ContextMsg) ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
ResL = erpc:multicall(Nodes, Mod, Fun, Args),
|
||||
sum_rpc_multicall_result(ResL, Nodes, ContextMsg, 0).
|
||||
|
||||
|
@ -83,7 +83,7 @@ count_tracked_items_mnesia(TableNameFun, CountRecPosition, Key, ContextMsg) ->
|
|||
[ContextMsg, Key, Node, Err]),
|
||||
Acc
|
||||
end
|
||||
end, 0, rabbit_nodes:all_running()).
|
||||
end, 0, rabbit_nodes:list_running()).
|
||||
|
||||
-spec match_tracked_items_ets(atom(), tuple()) -> term().
|
||||
match_tracked_items_ets(Tab, MatchSpec) ->
|
||||
|
@ -98,7 +98,7 @@ match_tracked_items_ets(Tab, MatchSpec) ->
|
|||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, [], rabbit_nodes:all_running()).
|
||||
end, [], rabbit_nodes:list_running()).
|
||||
|
||||
match_tracked_items_local(Tab, MatchSpec) ->
|
||||
ets:match_object(Tab, MatchSpec).
|
||||
|
@ -110,7 +110,7 @@ match_tracked_items_mnesia(TableNameFun, MatchSpec) ->
|
|||
Acc ++ mnesia:dirty_match_object(
|
||||
Tab,
|
||||
MatchSpec)
|
||||
end, [], rabbit_nodes:all_running()).
|
||||
end, [], rabbit_nodes:list_running()).
|
||||
|
||||
-spec clear_tracking_table(atom()) -> ok.
|
||||
clear_tracking_table(TableName) ->
|
||||
|
@ -133,7 +133,7 @@ delete_tracking_table(TableName, Node, ContextMsg) ->
|
|||
|
||||
-spec delete_tracked_entry({atom(), atom(), list()}, atom(), function(), term()) -> ok.
|
||||
delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableName, TableNameFun, Key) ->
|
||||
ClusterNodes = rabbit_nodes:all_running(),
|
||||
ClusterNodes = rabbit_nodes:list_running(),
|
||||
ExistsInCluster =
|
||||
lists:any(fun(Node) -> rpc:call(Node, M, F, A) end, ClusterNodes),
|
||||
case ExistsInCluster of
|
||||
|
|
|
@ -34,12 +34,12 @@ await_online_synchronised_mirrors(Timeout) ->
|
|||
|
||||
online_members(Component) ->
|
||||
lists:filter(fun erlang:is_pid/1,
|
||||
rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(),
|
||||
rabbit_misc:append_rpc_all_nodes(rabbit_nodes:list_running(),
|
||||
erlang, whereis, [Component])).
|
||||
|
||||
endangered_critical_components() ->
|
||||
CriticalComponents = [rabbit_stream_coordinator],
|
||||
Nodes = rabbit_nodes:all(),
|
||||
Nodes = rabbit_nodes:list_members(),
|
||||
lists:filter(fun (Component) ->
|
||||
NumAlive = length(online_members(Component)),
|
||||
ServerIds = lists:zip(lists:duplicate(length(Nodes), Component), Nodes),
|
||||
|
|
|
@ -422,7 +422,7 @@ is_running_on_all_nodes(VHost) ->
|
|||
|
||||
-spec vhost_cluster_state(vhost:name()) -> [{atom(), atom()}].
|
||||
vhost_cluster_state(VHost) ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
lists:map(fun(Node) ->
|
||||
State = case rabbit_misc:rpc_call(Node,
|
||||
rabbit_vhost_sup_sup, is_vhost_alive,
|
||||
|
|
|
@ -52,7 +52,7 @@ init([]) ->
|
|||
|
||||
start_on_all_nodes(VHost) ->
|
||||
%% Do not try to start a vhost on booting peer nodes
|
||||
AllBooted = [Node || Node <- rabbit_nodes:all_running(), rabbit:is_booted(Node)],
|
||||
AllBooted = [Node || Node <- rabbit_nodes:list_running()],
|
||||
Nodes = [node() | AllBooted],
|
||||
Results = [{Node, start_vhost(VHost, Node)} || Node <- Nodes],
|
||||
Failures = lists:filter(fun
|
||||
|
@ -67,7 +67,7 @@ start_on_all_nodes(VHost) ->
|
|||
end.
|
||||
|
||||
delete_on_all_nodes(VHost) ->
|
||||
_ = [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
|
||||
_ = [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:list_running() ],
|
||||
ok.
|
||||
|
||||
stop_and_delete_vhost(VHost) ->
|
||||
|
|
|
@ -216,6 +216,9 @@ setup_feature_flags_file(Config) ->
|
|||
|
||||
start_controller() ->
|
||||
?LOG_INFO("Starting feature flags controller"),
|
||||
ThisNode = node(),
|
||||
ok = rabbit_feature_flags:override_nodes([ThisNode]),
|
||||
ok = rabbit_feature_flags:override_running_nodes([ThisNode]),
|
||||
{ok, Pid} = rabbit_ff_controller:start(),
|
||||
?LOG_INFO("Feature flags controller: ~tp", [Pid]),
|
||||
ok.
|
||||
|
|
|
@ -78,7 +78,7 @@ list_queues_online_and_offline(Config) ->
|
|||
|
||||
rabbit_ct_helpers:await_condition(
|
||||
fun() ->
|
||||
[A] == rpc:call(A, rabbit_mnesia, cluster_nodes, [running])
|
||||
[A] == rpc:call(A, rabbit_nodes, list_running, [])
|
||||
end, 60000),
|
||||
|
||||
GotUp = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A,
|
||||
|
|
|
@ -694,7 +694,7 @@ publish_coordinator_unavailable(Config) ->
|
|||
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
|
||||
rabbit_ct_helpers:await_condition(
|
||||
fun () ->
|
||||
N = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_mnesia, cluster_nodes, [running]),
|
||||
N = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_nodes, list_running, []),
|
||||
length(N) == 1
|
||||
end),
|
||||
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
|
||||
|
|
|
@ -35,6 +35,9 @@ groups() ->
|
|||
|
||||
init_per_suite(Config) ->
|
||||
application:set_env(rabbit, feature_flags_file, "", [{persistent, true}]),
|
||||
ThisNode = node(),
|
||||
ok = rabbit_feature_flags:override_nodes([ThisNode]),
|
||||
ok = rabbit_feature_flags:override_running_nodes([ThisNode]),
|
||||
{ok, _Pid} = rabbit_ff_controller:start(),
|
||||
Config.
|
||||
|
||||
|
|
|
@ -62,9 +62,18 @@ defmodule RabbitMQ.CLI.Core.Helpers do
|
|||
end
|
||||
|
||||
def with_nodes_in_cluster(node, fun, timeout \\ :infinity) do
|
||||
case :rpc.call(node, :rabbit_mnesia, :cluster_nodes, [:running], timeout) do
|
||||
{:badrpc, _} = err -> err
|
||||
value -> fun.(value)
|
||||
case :rpc.call(node, :rabbit_nodes, :list_running, [], timeout) do
|
||||
{:badrpc, {:EXIT, {:undef, [{:rabbit_nodes, :list_running, [], []} | _]}}} ->
|
||||
case :rpc.call(node, :rabbit_mnesia, :cluster_nodes, [:running], timeout) do
|
||||
{:badrpc, _} = err -> err
|
||||
value -> fun.(value)
|
||||
end
|
||||
|
||||
{:badrpc, _} = err ->
|
||||
err
|
||||
|
||||
value ->
|
||||
fun.(value)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
|
|||
err
|
||||
|
||||
status ->
|
||||
case :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :cluster_nodes, [:running]) do
|
||||
case :rabbit_misc.rpc_call(node_name, :rabbit_nodes, :list_running, []) do
|
||||
{:badrpc, _} = err ->
|
||||
err
|
||||
|
||||
|
|
|
@ -1661,7 +1661,13 @@ kill_node_after(Config, Node, Sleep) ->
|
|||
kill_node(Config, Node).
|
||||
|
||||
cluster_members_online(Config, Node) ->
|
||||
rpc(Config, Node, rabbit_nodes, all_running, []).
|
||||
try
|
||||
rpc(Config, Node, rabbit_nodes, list_running, [])
|
||||
catch
|
||||
error:{exception, undef, [{rabbit_nodes, list_running, [], _} | _]} ->
|
||||
Nodes = rpc(Config, Node, rabbit_nodes, all_running, []),
|
||||
lists:filter(fun rabbit:is_running/1, Nodes)
|
||||
end.
|
||||
|
||||
await_os_pid_death(Pid) ->
|
||||
case rabbit_misc:is_os_process_alive(Pid) of
|
||||
|
|
|
@ -162,5 +162,5 @@ run_four_rabbitmq_nodes(Config) ->
|
|||
Config, rabbit_mnesia, is_clustered, [])),
|
||||
ClusteredNodes = lists:sort(
|
||||
rabbit_ct_broker_helpers:rpc(
|
||||
Config, 0, rabbit_mnesia, cluster_nodes, [running])),
|
||||
Config, 0, rabbit_nodes, list_running, [])),
|
||||
?assertEqual(ClusteredNodes, RabbitMQNodes).
|
||||
|
|
|
@ -251,7 +251,7 @@ handle_pre_hibernate(State) ->
|
|||
%% rabbit_mgmt_db is hibernating the odds are rabbit_event is
|
||||
%% quiescing in some way too).
|
||||
_ = rpc:multicall(
|
||||
rabbit_nodes:all_running(), rabbit_mgmt_db_handler, gc, []),
|
||||
rabbit_nodes:list_running(), rabbit_mgmt_db_handler, gc, []),
|
||||
{hibernate, State}.
|
||||
|
||||
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
|
||||
|
|
|
@ -27,7 +27,7 @@ allowed_methods(ReqData, Context) ->
|
|||
resource_exists(ReqData, Context) ->
|
||||
case get_node(ReqData) of
|
||||
none -> {true, ReqData, Context};
|
||||
{ok, Node} -> {lists:member(Node, rabbit_nodes:all_running()),
|
||||
{ok, Node} -> {rabbit:is_running(Node),
|
||||
ReqData, Context}
|
||||
end.
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ gc_exchanges() ->
|
|||
gc_process_and_entity(channel_exchange_stats_fine_stats, GbSet).
|
||||
|
||||
gc_nodes() ->
|
||||
Nodes = rabbit_nodes:all(),
|
||||
Nodes = rabbit_nodes:list_members(),
|
||||
GbSet = gb_sets:from_list(Nodes),
|
||||
gc_entity(node_stats, GbSet),
|
||||
gc_entity(node_coarse_stats, GbSet),
|
||||
|
|
|
@ -30,7 +30,7 @@ reset() ->
|
|||
|
||||
reset_all() ->
|
||||
_ = [rpc:call(Node, rabbit_mgmt_storage, reset, [])
|
||||
|| Node <- rabbit_nodes:all_running()],
|
||||
|| Node <- rabbit_nodes:list_running()],
|
||||
ok.
|
||||
|
||||
init(_) ->
|
||||
|
|
|
@ -25,7 +25,7 @@ server_id(Node) ->
|
|||
{?ID_NAME, Node}.
|
||||
|
||||
all_node_ids() ->
|
||||
[server_id(N) || N <- rabbit_nodes:all(),
|
||||
[server_id(N) || N <- rabbit_nodes:list_members(),
|
||||
can_participate_in_clientid_tracking(N)].
|
||||
|
||||
start() ->
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
%%
|
||||
|
||||
lock(Name) ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
Retries = rabbit_nodes:lock_retries(),
|
||||
%% try to acquire a lock to avoid duplicate starts
|
||||
LockId = case global:set_lock({dynamic_shovel, Name}, Nodes, Retries) of
|
||||
|
@ -24,7 +24,7 @@ lock(Name) ->
|
|||
LockId.
|
||||
|
||||
unlock(LockId) ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
case LockId of
|
||||
undefined -> ok;
|
||||
Value -> global:del_lock({dynamic_shovel, Value}, Nodes)
|
||||
|
|
|
@ -72,12 +72,12 @@ status() ->
|
|||
|
||||
-spec cluster_status() -> [status_tuple()].
|
||||
cluster_status() ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
lists:usort(rabbit_misc:append_rpc_all_nodes(Nodes, ?MODULE, status, [])).
|
||||
|
||||
-spec cluster_status_with_nodes() -> [status_tuple()].
|
||||
cluster_status_with_nodes() ->
|
||||
Nodes = rabbit_nodes:all_running(),
|
||||
Nodes = rabbit_nodes:list_running(),
|
||||
lists:foldl(
|
||||
fun(Node, Acc) ->
|
||||
case rabbit_misc:rpc_call(Node, ?MODULE, status, []) of
|
||||
|
|
|
@ -52,7 +52,7 @@ init([]) ->
|
|||
end}
|
||||
end,
|
||||
|
||||
Nodes = rabbit_nodes:all(),
|
||||
Nodes = rabbit_nodes:list_members(),
|
||||
OsirisConf = #{nodes => Nodes},
|
||||
|
||||
ServerConfiguration =
|
||||
|
|
Loading…
Reference in New Issue