parent
e08ee11ca3
commit
05d15c7865
|
|
@ -29,6 +29,8 @@
|
||||||
-export([become_leader/2, update_metrics/2]).
|
-export([become_leader/2, update_metrics/2]).
|
||||||
-export([rpc_delete_metrics/1]).
|
-export([rpc_delete_metrics/1]).
|
||||||
-export([format/1]).
|
-export([format/1]).
|
||||||
|
-export([io_metrics_handler/2, io_metrics_handler/3]).
|
||||||
|
-export([open_files/1]).
|
||||||
|
|
||||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||||
-include_lib("stdlib/include/qlc.hrl").
|
-include_lib("stdlib/include/qlc.hrl").
|
||||||
|
|
@ -81,7 +83,8 @@
|
||||||
garbage_collection,
|
garbage_collection,
|
||||||
leader,
|
leader,
|
||||||
online,
|
online,
|
||||||
members
|
members,
|
||||||
|
open_files
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
@ -114,7 +117,8 @@ declare(#amqqueue{name = QName,
|
||||||
case rabbit_amqqueue:internal_declare(NewQ0, false) of
|
case rabbit_amqqueue:internal_declare(NewQ0, false) of
|
||||||
{created, NewQ} ->
|
{created, NewQ} ->
|
||||||
RaMachine = ra_machine(NewQ),
|
RaMachine = ra_machine(NewQ),
|
||||||
case ra:start_cluster(RaName, RaMachine,
|
case ra:start_cluster(RaName, RaMachine, #{metrics_handler =>
|
||||||
|
{?MODULE, io_metrics_handler}},
|
||||||
[{RaName, Node} || Node <- Nodes]) of
|
[{RaName, Node} || Node <- Nodes]) of
|
||||||
{ok, _, _} ->
|
{ok, _, _} ->
|
||||||
FState = init_state(Id, QName),
|
FState = init_state(Id, QName),
|
||||||
|
|
@ -207,6 +211,16 @@ reductions(Name) ->
|
||||||
0
|
0
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
io_metrics_handler(open, Fun) ->
|
||||||
|
file_handle_cache_stats:safe_update(io_file_handle_open_attempt, Fun);
|
||||||
|
io_metrics_handler(close, Fun) ->
|
||||||
|
Fun();
|
||||||
|
io_metrics_handler(Type, Fun) ->
|
||||||
|
file_handle_cache_stats:update(Type, Fun).
|
||||||
|
|
||||||
|
io_metrics_handler(Type, Bytes, Fun) ->
|
||||||
|
file_handle_cache_stats:update(Type, Bytes, Fun).
|
||||||
|
|
||||||
recover(Queues) ->
|
recover(Queues) ->
|
||||||
[begin
|
[begin
|
||||||
case ra:restart_node({Name, node()}) of
|
case ra:restart_node({Name, node()}) of
|
||||||
|
|
@ -218,17 +232,19 @@ recover(Queues) ->
|
||||||
Err == name_not_registered ->
|
Err == name_not_registered ->
|
||||||
% queue was never started on this node
|
% queue was never started on this node
|
||||||
% so needs to be started from scratch.
|
% so needs to be started from scratch.
|
||||||
Machine = ra_machine(Q),
|
Machine = ra_machine(Q0),
|
||||||
RaNodes = [{Name, Node} || Node <- Nodes],
|
RaNodes = [{Name, Node} || Node <- Nodes],
|
||||||
% TODO: should we crash the vhost here or just log the error
|
% TODO: should we crash the vhost here or just log the error
|
||||||
% and continue?
|
% and continue?
|
||||||
ok = ra:start_node(Name, {Name, node()},
|
ok = ra:start_node(Name, {Name, node()},
|
||||||
Machine, RaNodes)
|
Machine, #{metrics_handler =>
|
||||||
|
{?MODULE, io_metrics_handler}},
|
||||||
|
RaNodes)
|
||||||
end,
|
end,
|
||||||
{_, Q} = rabbit_amqqueue:internal_declare(Q, true),
|
{_, Q} = rabbit_amqqueue:internal_declare(Q0, true),
|
||||||
Q
|
Q
|
||||||
end || #amqqueue{pid = {Name, _},
|
end || #amqqueue{pid = {Name, _},
|
||||||
quorum_nodes = Nodes} = Q <- Queues].
|
quorum_nodes = Nodes} = Q0 <- Queues].
|
||||||
|
|
||||||
stop(VHost) ->
|
stop(VHost) ->
|
||||||
_ = [ra:stop_node(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)],
|
_ = [ra:stop_node(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)],
|
||||||
|
|
@ -478,8 +494,21 @@ i(members, #amqqueue{quorum_nodes = Nodes}) ->
|
||||||
Nodes;
|
Nodes;
|
||||||
i(online, Q) -> online(Q);
|
i(online, Q) -> online(Q);
|
||||||
i(leader, Q) -> leader(Q);
|
i(leader, Q) -> leader(Q);
|
||||||
|
i(open_files, #amqqueue{pid = {Name, _},
|
||||||
|
quorum_nodes = Nodes}) ->
|
||||||
|
{Data, _} = rpc:multicall(Nodes, rabbit_quorum_queue, open_files, [Name]),
|
||||||
|
lists:flatten(Data);
|
||||||
i(_K, _Q) -> ''.
|
i(_K, _Q) -> ''.
|
||||||
|
|
||||||
|
open_files(Name) ->
|
||||||
|
case whereis(Name) of
|
||||||
|
undefined -> {node(), 0};
|
||||||
|
Pid -> case ets:lookup(ra_open_file_metrics, Pid) of
|
||||||
|
[] -> {node(), 0};
|
||||||
|
[{_, Count}] -> {node(), Count}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
leader(#amqqueue{pid = {Name, Leader}}) ->
|
leader(#amqqueue{pid = {Name, Leader}}) ->
|
||||||
case is_process_alive(Name, Leader) of
|
case is_process_alive(Name, Leader) of
|
||||||
true -> Leader;
|
true -> Leader;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue