Optimisations

This commit is contained in:
Diana Corbacho 2016-01-20 10:49:26 +00:00
parent b99df47451
commit 25b5da5dbd
10 changed files with 470 additions and 383 deletions

View File

@ -264,8 +264,9 @@ handle_call({get_overview, User, Ranges}, _From,
X <- rabbit_exchange:list(V)])}, X <- rabbit_exchange:list(V)])},
{connections, F(created_events(connection_stats))}, {connections, F(created_events(connection_stats))},
{channels, F(created_events(channel_stats))}], {channels, F(created_events(channel_stats))}],
FormatMessage = format_samples(Ranges, MessageStats, Interval), Now = time_compat:os_system_time(milli_seconds),
FormatQueue = format_samples(Ranges, QueueStats, Interval), FormatMessage = format_samples(Ranges, MessageStats, Interval, Now),
FormatQueue = format_samples(Ranges, QueueStats, Interval, Now),
[rabbit_mgmt_stats:free(S) || {S, _, _} <- MessageStats], [rabbit_mgmt_stats:free(S) || {S, _, _} <- MessageStats],
[rabbit_mgmt_stats:free(S) || {S, _, _} <- QueueStats], [rabbit_mgmt_stats:free(S) || {S, _, _} <- QueueStats],
reply([{message_stats, FormatMessage}, reply([{message_stats, FormatMessage},
@ -365,11 +366,12 @@ second(Id) ->
list_queue_stats(Ranges, Objs, Interval) -> list_queue_stats(Ranges, Objs, Interval) ->
adjust_hibernated_memory_use( adjust_hibernated_memory_use(
merge_stats(Objs, queue_funs(Ranges, Interval))). merge_queue_stats(Objs, queue_funs(Ranges, Interval))).
detail_queue_stats(Ranges, Objs, Interval) -> detail_queue_stats(Ranges, Objs, Interval) ->
adjust_hibernated_memory_use( adjust_hibernated_memory_use(
merge_stats(Objs, [consumer_details_fun( merge_queue_stats(Objs,
[consumer_details_fun(
fun (Props) -> id_lookup(queue_stats, Props) end, fun (Props) -> id_lookup(queue_stats, Props) end,
consumers_by_queue), consumers_by_queue),
detail_stats_fun(Ranges, ?QUEUE_DETAILS, Interval) detail_stats_fun(Ranges, ?QUEUE_DETAILS, Interval)
@ -378,7 +380,7 @@ detail_queue_stats(Ranges, Objs, Interval) ->
queue_funs(Ranges, Interval) -> queue_funs(Ranges, Interval) ->
[basic_stats_fun(queue_stats), [basic_stats_fun(queue_stats),
simple_stats_fun(Ranges, queue_stats, Interval), simple_stats_fun(Ranges, queue_stats, Interval),
augment_msg_stats_fun()]. augment_queue_msg_stats_fun()].
list_exchange_stats(Ranges, Objs, Interval) -> list_exchange_stats(Ranges, Objs, Interval) ->
merge_stats(Objs, [simple_stats_fun(Ranges, exchange_stats, Interval), merge_stats(Objs, [simple_stats_fun(Ranges, exchange_stats, Interval),
@ -423,14 +425,29 @@ merge_stats(Objs, Funs) ->
%% * augment_msg_stats_fun() only needs the original object. Otherwise, %% * augment_msg_stats_fun() only needs the original object. Otherwise,
%% must fold over a very longs list %% must fold over a very longs list
%% * All other funs only require the Type that is in the original Obj %% * All other funs only require the Type that is in the original Obj
[lists:foldl(fun (Fun, Props) -> combine(Fun(Obj), Props) end, Obj, Funs) [combine_all_funs(Funs, Obj, Obj) || Obj <- Objs].
|| Obj <- Objs].
combine_all_funs([Fun | Funs], Obj, Props) ->
combine_all_funs(Funs, Obj, combine(Fun(Obj), Props));
combine_all_funs([], _Obj, Props) ->
Props.
merge_queue_stats(Objs, Funs) ->
%% Don't pass the props to the Fun in combine, as it contains the results
%% from previous funs and:
%% * augment_msg_stats_fun() only needs the original object. Otherwise,
%% must fold over a very longs list
%% * All other funs only require the Type that is in the original Obj
[begin
{pid, Pid} = lists:keyfind(pid, 1, Obj),
{Pid, combine_all_funs(Funs, Obj, rabbit_mgmt_format:strip_queue_pids(Obj))}
end || Obj <- Objs].
combine(New, Old) -> combine(New, Old) ->
case pget(state, Old) of case pget(state, Old) of
unknown -> New ++ Old; unknown -> New ++ Old;
live -> New ++ proplists:delete(state, Old); live -> New ++ lists:keydelete(state, 1, Old);
_ -> proplists:delete(state, New) ++ Old _ -> lists:keydelete(state, 1, New) ++ Old
end. end.
%% i.e. the non-calculated stats %% i.e. the non-calculated stats
@ -442,12 +459,12 @@ basic_stats_fun(Type) ->
%% i.e. coarse stats, and fine stats aggregated up to a single number per thing %% i.e. coarse stats, and fine stats aggregated up to a single number per thing
simple_stats_fun(Ranges, Type, Interval) -> simple_stats_fun(Ranges, Type, Interval) ->
{Msg, Other} = read_simple_stats(Type),
Now = time_compat:os_system_time(milli_seconds),
fun (Props) -> fun (Props) ->
Id = id_lookup(Type, Props), Id = id_lookup(Type, Props),
ManyStats = read_simple_stats(Type, Id), OtherStats = format_samples(Ranges, {Id, Other}, Interval, Now),
{Msg, Other} = extract_msg_stats(ManyStats), case format_samples(Ranges, {Id, Msg}, Interval, Now) of
OtherStats = format_samples(Ranges, Other, Interval),
case format_samples(Ranges, Msg, Interval) of
[] -> [] ->
OtherStats; OtherStats;
MsgStats -> MsgStats ->
@ -457,9 +474,10 @@ simple_stats_fun(Ranges, Type, Interval) ->
%% i.e. fine stats that are broken out per sub-thing %% i.e. fine stats that are broken out per sub-thing
detail_stats_fun(Ranges, {IdType, FineSpecs}, Interval) -> detail_stats_fun(Ranges, {IdType, FineSpecs}, Interval) ->
Now = time_compat:os_system_time(milli_seconds),
fun (Props) -> fun (Props) ->
Id = id_lookup(IdType, Props), Id = id_lookup(IdType, Props),
[detail_stats(Ranges, Name, AggregatedStatsType, IdFun(Id), Interval) [detail_stats(Ranges, Name, AggregatedStatsType, IdFun(Id), Interval, Now)
|| {Name, AggregatedStatsType, IdFun} <- FineSpecs] || {Name, AggregatedStatsType, IdFun} <- FineSpecs]
end. end.
@ -486,23 +504,24 @@ detail_and_basic_stats_fun(Type, Ranges, {IdType, FineSpecs}, Interval) ->
[{K, Items2}] [{K, Items2}]
end. end.
read_simple_stats(Type, Id) -> read_simple_stats(EventType) ->
Tables = rabbit_mgmt_stats_tables:aggr_tables(Type), lists:partition(
[{Table, rabbit_mgmt_stats_tables:type_from_table(Table), Id} fun({_, Type}) ->
|| Table <- Tables]. lists:member(Type, [fine_stats, deliver_get, queue_msg_rates])
end, rabbit_mgmt_stats_tables:aggr_tables(EventType)).
read_detail_stats(Type, Id) -> read_detail_stats(EventType, Id) ->
Tables = rabbit_mgmt_stats_tables:aggr_tables(Type), Tables = rabbit_mgmt_stats_tables:aggr_tables(EventType),
Keys = [{Table, Key} || Table <- Tables, Keys = [{Table, Type, Key} || {Table, Type} <- Tables,
Key <- rabbit_mgmt_stats:get_keys(Table, Id)], Key <- rabbit_mgmt_stats:get_keys(Table, Id)],
lists:foldl( lists:foldl(
fun ({Table, Id0}, L) -> fun ({_Table, _Type, Id0} = Entry, L) ->
NewId = revert(Id, Id0), NewId = revert(Id, Id0),
case lists:keyfind(NewId, 1, L) of case lists:keyfind(NewId, 1, L) of
false -> false ->
[{NewId, [{Table, rabbit_mgmt_stats_tables:type_from_table(Table), Id0}]} | L]; [{NewId, [Entry]} | L];
{NewId, KVs} -> {NewId, KVs} ->
lists:keyreplace(NewId, 1, L, {NewId, [{Table, rabbit_mgmt_stats_tables:type_from_table(Table), Id0} | KVs]}) lists:keyreplace(NewId, 1, L, {NewId, [Entry | KVs]})
end end
end, [], Keys). end, [], Keys).
@ -511,14 +530,9 @@ revert({'_', _}, {Id, _}) ->
revert({_, '_'}, {_, Id}) -> revert({_, '_'}, {_, Id}) ->
Id. Id.
extract_msg_stats(ManyStats) -> detail_stats(Ranges, Name, AggregatedStatsType, Id, Interval, Now) ->
lists:partition(fun({_, Type, _}) ->
lists:member(Type, [fine_stats, deliver_get, queue_msg_rates])
end, ManyStats).
detail_stats(Ranges, Name, AggregatedStatsType, Id, Interval) ->
{Name, {Name,
[[{stats, format_samples(Ranges, KVs, Interval)} | format_detail_id(G)] [[{stats, format_samples(Ranges, KVs, Interval, Now)} | format_detail_id(G)]
|| {G, KVs} <- read_detail_stats(AggregatedStatsType, Id)]}. || {G, KVs} <- read_detail_stats(AggregatedStatsType, Id)]}.
format_detail_id(ChPid) when is_pid(ChPid) -> format_detail_id(ChPid) when is_pid(ChPid) ->
@ -528,16 +542,31 @@ format_detail_id(#resource{name = Name, virtual_host = Vhost, kind = Kind}) ->
format_detail_id(Node) when is_atom(Node) -> format_detail_id(Node) when is_atom(Node) ->
[{name, Node}]. [{name, Node}].
format_samples(Ranges, ManyStats, Interval) -> format_samples(Ranges, {Id, ManyStats}, Interval, Now) ->
lists:append( lists:append(foldl_stats_format(ManyStats, Id, Ranges, Interval, Now, []));
lists:append( format_samples(Ranges, ManyStats, Interval, Now) ->
[case rabbit_mgmt_stats:is_blank(Table, Id, Record) of lists:append(foldl_stats_format(ManyStats, Ranges, Interval, Now, [])).
foldl_stats_format([{Table, Record} | T], Id, Ranges, Interval, Now, Acc) ->
foldl_stats_format(T, Id, Ranges, Interval, Now,
stats_format(Table, Id, Record, Ranges, Interval, Now, Acc));
foldl_stats_format([], _Id, _Ranges, _Interval, _Now, Acc) ->
Acc.
foldl_stats_format([{Table, Record, Id} | T], Ranges, Interval, Now, Acc) ->
foldl_stats_format(T, Ranges, Interval, Now,
stats_format(Table, Id, Record, Ranges, Interval, Now, Acc));
foldl_stats_format([], _Ranges, _Interval, _Now, Acc) ->
Acc.
stats_format(Table, Id, Record, Ranges, Interval, Now, Acc) ->
case rabbit_mgmt_stats:is_blank(Table, Id, Record) of
true -> true ->
[]; Acc;
false -> false ->
rabbit_mgmt_stats:format(pick_range(Record, Ranges), [rabbit_mgmt_stats:format(pick_range(Record, Ranges),
Table, Id, Interval, Record) Table, Id, Interval, Record, Now) | Acc]
end || {Table, Record, Id} <- ManyStats])). end.
pick_range(queue_msg_counts, {RangeL, _RangeM, _RangeD, _RangeN}) -> pick_range(queue_msg_counts, {RangeL, _RangeM, _RangeD, _RangeN}) ->
RangeL; RangeL;
@ -557,16 +586,16 @@ pick_range(K, {_RangeL, _RangeM, _RangeD, RangeN})
%% hibernation, so to do it when we receive a queue stats event would %% hibernation, so to do it when we receive a queue stats event would
%% be fiddly and racy. This should be quite cheap though. %% be fiddly and racy. This should be quite cheap though.
adjust_hibernated_memory_use(Qs) -> adjust_hibernated_memory_use(Qs) ->
Pids = [pget(pid, Q) || Pids = [Pid || {Pid, Q} <- Qs, pget(idle_since, Q, not_idle) =/= not_idle],
Q <- Qs, pget(idle_since, Q, not_idle) =/= not_idle],
%% We use delegate here not for ordering reasons but because we %% We use delegate here not for ordering reasons but because we
%% want to get the right amount of parallelism and minimise %% want to get the right amount of parallelism and minimise
%% cross-cluster communication. %% cross-cluster communication.
{Mem, _BadNodes} = delegate:invoke(Pids, {erlang, process_info, [memory]}), {Mem, _BadNodes} = delegate:invoke(Pids, {erlang, process_info, [memory]}),
[case lists:keyfind(pget(pid, Q), 1, Mem) of MemDict = dict:from_list([{P, M} || {P, M = {memory, _}} <- Mem]),
{_, {memory, _} = Memory} -> [Memory|proplists:delete(memory, Q)]; [case dict:find(Pid, MemDict) of
_ -> Q error -> Q;
end || Q <- Qs]. {ok, Memory} -> [Memory|proplists:delete(memory, Q)]
end || {Pid, Q} <- Qs].
created_event(Name, Type) -> created_event(Name, Type) ->
case ets:select(Type, [{{{'_', '$1'}, '$2', '$3'}, [{'==', 'create', '$1'}, case ets:select(Type, [{{{'_', '$1'}, '$2', '$3'}, [{'==', 'create', '$1'},
@ -597,7 +626,7 @@ consumer_details_fun(KeyFun, TableName) ->
augment_consumer(Obj) -> augment_consumer(Obj) ->
[{queue, rabbit_mgmt_format:resource(pget(queue, Obj))} | [{queue, rabbit_mgmt_format:resource(pget(queue, Obj))} |
proplists:delete(queue, Obj)]. lists:keydelete(queue, 1, Obj)].
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
%% Internal, query-time summing for overview %% Internal, query-time summing for overview
@ -618,22 +647,32 @@ augment_msg_stats(Props) ->
augment_msg_stats_fun() -> augment_msg_stats_fun() ->
fun(Props) -> fun(Props) ->
lists:foldl(fun({_, none}, Acc) -> augment_details(Props, [])
Acc; end.
({_, unknown}, Acc) ->
Acc; augment_details([{_, none} | T], Acc) ->
({connection, Value}, Acc) -> augment_details(T, Acc);
[{connection_details, augment_connection_pid(Value)} augment_details([{_, unknown} | T], Acc) ->
| Acc]; augment_details(T, Acc);
({channel, Value}, Acc) -> augment_details([{connection, Value} | T], Acc) ->
[{channel_details, augment_channel_pid(Value)} augment_details(T, [{connection_details, augment_connection_pid(Value)} | Acc]);
| Acc]; augment_details([{channel, Value} | T], Acc) ->
({owner_pid, Value}, Acc) -> augment_details(T, [{channel_details, augment_channel_pid(Value)} | Acc]);
[{owner_pid_details, augment_connection_pid(Value)} augment_details([{owner_pid, Value} | T], Acc) ->
| Acc]; augment_details(T, [{owner_pid_details, augment_connection_pid(Value)} | Acc]);
(_, Acc) -> augment_details([_ | T], Acc) ->
Acc augment_details(T, Acc);
end, [], Props) augment_details([], Acc) ->
Acc.
augment_queue_msg_stats_fun() ->
fun(Props) ->
case lists:keyfind(owner_pid, 1, Props) of
{owner_pid, Value} when is_pid(Value) ->
[{owner_pid_details, augment_connection_pid(Value)}];
_ ->
[]
end
end. end.
augment_channel_pid(Pid) -> augment_channel_pid(Pid) ->

View File

@ -28,6 +28,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1]). code_change/3, handle_pre_hibernate/1]).
-export([prioritise_cast/3]).
%% For testing %% For testing
-export([override_lookups/1, reset_lookups/0]). -export([override_lookups/1, reset_lookups/0]).
@ -36,6 +38,18 @@
%% See the comment on rabbit_mgmt_db for the explanation of %% See the comment on rabbit_mgmt_db for the explanation of
%% events and stats. %% events and stats.
-define(DROP_LENGTH, 1000).
prioritise_cast({event, #event{props = Props}}, Len, _State)
when Len > ?DROP_LENGTH ->
case pget(idle_since, Props) of
unknown ->
drop;
_ -> 0
end;
prioritise_cast(_Msg, _Len, _State) ->
0.
%% Although this gen_server could process all types of events through the %% Although this gen_server could process all types of events through the
%% handle_cast, rabbit_mgmt_db_handler (in the management agent) forwards %% handle_cast, rabbit_mgmt_db_handler (in the management agent) forwards
%% the prioritiy events channel_stats and queue_stats to their own gen_servers %% the prioritiy events channel_stats and queue_stats to their own gen_servers
@ -72,7 +86,7 @@ init([Ref]) ->
{ok, RatesMode} = application:get_env(rabbitmq_management, rates_mode), {ok, RatesMode} = application:get_env(rabbitmq_management, rates_mode),
rabbit_node_monitor:subscribe(self()), rabbit_node_monitor:subscribe(self()),
rabbit_log:info("Statistics event collector started.~n"), rabbit_log:info("Statistics event collector started.~n"),
?TABLES = [ets:new(Key, [public, ordered_set, named_table]) || Key <- ?TABLES], ?TABLES = [ets:new(Key, [public, set, named_table]) || Key <- ?TABLES],
?AGGR_TABLES = [rabbit_mgmt_stats:blank(Name) || Name <- ?AGGR_TABLES], ?AGGR_TABLES = [rabbit_mgmt_stats:blank(Name) || Name <- ?AGGR_TABLES],
{ok, reset_lookups( {ok, reset_lookups(
#state{interval = Interval, #state{interval = Interval,

View File

@ -169,7 +169,8 @@ handle_stats(TName, Stats, Timestamp, Funs, RatesKeys, NoAggRatesKeys,
append_set_of_samples( append_set_of_samples(
Stats, Timestamp, OldStats, IdSamples, RatesKeys, NoAggRatesKeys, State), Stats, Timestamp, OldStats, IdSamples, RatesKeys, NoAggRatesKeys, State),
StripKeys = [id_name(TName)] ++ RatesKeys ++ ?FINE_STATS_TYPES, StripKeys = [id_name(TName)] ++ RatesKeys ++ ?FINE_STATS_TYPES,
Stats1 = [{K, V} || {K, V} <- Stats, not lists:member(K, StripKeys)], Stats1 = [{K, V} || {K, V} <- Stats, not lists:member(K, StripKeys),
V =/= unknown],
Stats2 = rabbit_mgmt_format:format(Stats1, Funs), Stats2 = rabbit_mgmt_format:format(Stats1, Funs),
ets:insert(TName, {{Id, stats}, Stats2, Timestamp}), ets:insert(TName, {{Id, stats}, Stats2, Timestamp}),
ok. ok.
@ -253,7 +254,7 @@ handle_fine_stat(Id, Stats, Timestamp, OldStats, State) ->
delete_samples(Type, Id0) -> delete_samples(Type, Id0) ->
[rabbit_mgmt_stats:delete_stats(Table, Id0) [rabbit_mgmt_stats:delete_stats(Table, Id0)
|| Table <- rabbit_mgmt_stats_tables:aggr_tables(Type)]. || {Table, _} <- rabbit_mgmt_stats_tables:aggr_tables(Type)].
append_set_of_samples(Stats, TS, OldStats, Id, Keys, NoAggKeys, State) -> append_set_of_samples(Stats, TS, OldStats, Id, Keys, NoAggKeys, State) ->
%% Refactored to avoid duplicated calls to ignore_coarse_sample, ceil and %% Refactored to avoid duplicated calls to ignore_coarse_sample, ceil and

View File

@ -30,6 +30,8 @@
format_arguments/1, format_connection_created/1, format_arguments/1, format_connection_created/1,
format_accept_content/1, format_args/1]). format_accept_content/1, format_args/1]).
-export([strip_queue_pids/1]).
-import(rabbit_misc, [pget/2, pset/3]). -import(rabbit_misc, [pget/2, pset/3]).
-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl").
@ -45,6 +47,16 @@ format(Stats, {Fs, false}) ->
lists:concat([Fs(Stat) || {_Name, Value} = Stat <- Stats, lists:concat([Fs(Stat) || {_Name, Value} = Stat <- Stats,
Value =/= unknown]). Value =/= unknown]).
format_queue_stats({exclusive_consumer_pid, _}) ->
[];
format_queue_stats({slave_pids, ''}) ->
[];
format_queue_stats({slave_pids, Pids}) ->
[{slave_nodes, [node(Pid) || Pid <- Pids]}];
format_queue_stats({synchronised_slave_pids, ''}) ->
[];
format_queue_stats({synchronised_slave_pids, Pids}) ->
[{synchronised_slave_nodes, [node(Pid) || Pid <- Pids]}];
format_queue_stats({backing_queue_status, Value}) -> format_queue_stats({backing_queue_status, Value}) ->
[{backing_queue_status, properties(Value)}]; [{backing_queue_status, properties(Value)}];
format_queue_stats({idle_since, Value}) -> format_queue_stats({idle_since, Value}) ->
@ -359,43 +371,59 @@ to_basic_properties(Props) ->
a2b(A) -> a2b(A) ->
list_to_binary(atom_to_list(A)). list_to_binary(atom_to_list(A)).
strip_queue_pids(Item) ->
strip_queue_pids(Item, []).
strip_queue_pids([{_, unknown} | T], Acc) ->
strip_queue_pids(T, Acc);
strip_queue_pids([{pid, Pid} | T], Acc) when is_pid(Pid) ->
strip_queue_pids(T, [{node, node(Pid)} | Acc]);
strip_queue_pids([{pid, _} | T], Acc) ->
strip_queue_pids(T, Acc);
strip_queue_pids([{owner_pid, _} | T], Acc) ->
strip_queue_pids(T, Acc);
strip_queue_pids([Any | T], Acc) ->
strip_queue_pids(T, [Any | Acc]);
strip_queue_pids([], Acc) ->
Acc.
%% Items can be connections, channels, consumers or queues, hence remove takes %% Items can be connections, channels, consumers or queues, hence remove takes
%% various items. %% various items.
strip_pids(Item = [T | _]) when is_tuple(T) -> strip_pids(Item = [T | _]) when is_tuple(T) ->
lists:foldr( strip_pids(Item, []);
fun({_, unknown}, Acc) ->
Acc;
({pid, Pid}, Acc) when is_pid(Pid) ->
[{node, node(Pid)} | Acc];
({pid, _}, Acc) ->
Acc;
({connection, _}, Acc) ->
Acc;
({owner_pid, _}, Acc) ->
Acc;
({channel, _}, Acc) ->
Acc;
({exclusive_consumer_pid, _}, Acc) ->
Acc;
({slave_pids, ''}, Acc) ->
Acc;
({slave_pids, Pids}, Acc) ->
[{slave_nodes, [node(Pid) || Pid <- Pids]} | Acc];
({synchronised_slave_pids, ''}, Acc) ->
Acc;
({synchronised_slave_pids, Pids}, Acc) ->
[{synchronised_slave_nodes, [node(Pid) || Pid <- Pids]} | Acc];
(Any, Acc) ->
[Any | Acc]
end, [], Item);
strip_pids(Items) -> [strip_pids(I) || I <- Items]. strip_pids(Items) -> [strip_pids(I) || I <- Items].
strip_pids([{_, unknown} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{pid, Pid} | T], Acc) when is_pid(Pid) ->
strip_pids(T, [{node, node(Pid)} | Acc]);
strip_pids([{pid, _} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{connection, _} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{owner_pid, _} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{channel, _} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{exclusive_consumer_pid, _} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{slave_pids, ''} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{slave_pids, Pids} | T], Acc) ->
strip_pids(T, [{slave_nodes, [node(Pid) || Pid <- Pids]} | Acc]);
strip_pids([{synchronised_slave_pids, ''} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{synchronised_slave_pids, Pids} | T], Acc) ->
strip_pids(T, [{synchronised_slave_nodes, [node(Pid) || Pid <- Pids]} | Acc]);
strip_pids([Any | T], Acc) ->
strip_pids(T, [Any | Acc]);
strip_pids([], Acc) ->
Acc.
%% Format for JSON replies. Transforms '' into null %% Format for JSON replies. Transforms '' into null
format_nulls(Items) when is_list(Items) -> format_nulls(Items) when is_list(Items) ->
lists:foldr(fun (Pair, Acc) -> [format_null_item(Pair) || Pair <- Items];
[format_null_item(Pair) | Acc]
end, [], Items);
format_nulls(Item) -> format_nulls(Item) ->
format_null_item(Item). format_null_item(Item).

View File

@ -19,7 +19,7 @@
-include("rabbit_mgmt.hrl"). -include("rabbit_mgmt.hrl").
-include("rabbit_mgmt_metrics.hrl"). -include("rabbit_mgmt_metrics.hrl").
-export([blank/1, is_blank/3, record/5, format/5, sum/1, gc/3, -export([blank/1, is_blank/3, record/5, format/6, sum/1, gc/3,
free/1, delete_stats/2, get_keys/2]). free/1, delete_stats/2, get_keys/2]).
-import(rabbit_misc, [pget/2]). -import(rabbit_misc, [pget/2]).
@ -53,8 +53,8 @@ blank(Name) ->
ets:new(rabbit_mgmt_stats_tables:index(Name), ets:new(rabbit_mgmt_stats_tables:index(Name),
[bag, public, named_table]), [bag, public, named_table]),
ets:new(rabbit_mgmt_stats_tables:key_index(Name), ets:new(rabbit_mgmt_stats_tables:key_index(Name),
[ordered_set, public, named_table]), [set, public, named_table]),
ets:new(Name, [ordered_set, public, named_table]). ets:new(Name, [set, public, named_table]).
is_blank(Table, Id, Record) -> is_blank(Table, Id, Record) ->
case ets:lookup(Table, {Id, total}) of case ets:lookup(Table, {Id, total}) of
@ -96,15 +96,14 @@ record({Id, _TS} = Key, Pos, Diff, Record, Table) ->
%% Query-time %% Query-time
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
format(no_range, Table, Id, Interval, Type) -> format(no_range, Table, Id, Interval, Type, Now) ->
Counts = get_value(Table, Id, total, Type), Counts = get_value(Table, Id, total, Type),
Now = time_compat:os_system_time(milli_seconds),
RangePoint = ((Now div Interval) * Interval) - Interval, RangePoint = ((Now div Interval) * Interval) - Interval,
{Record, Factor} = format_rate_with( {Record, Factor} = format_rate_with(
Table, Id, RangePoint, Interval, Interval, Type), Table, Id, RangePoint, Interval, Interval, Type),
format_rate(Type, Record, Counts, Factor); format_rate(Type, Record, Counts, Factor);
format(Range, Table, Id, Interval, Type) -> format(Range, Table, Id, Interval, Type, _Now) ->
Base = get_value(Table, Id, base, Type), Base = get_value(Table, Id, base, Type),
RangePoint = Range#range.last - Interval, RangePoint = Range#range.last - Interval,
{Samples, Counts} = extract_samples(Range, Base, Table, Id, Type), {Samples, Counts} = extract_samples(Range, Base, Table, Id, Type),
@ -172,12 +171,21 @@ format_rate_with(Table, Id, RangePoint, Incr, Interval, Type) ->
%% still arriving for the last... %% still arriving for the last...
second_largest(Table, Id) -> second_largest(Table, Id) ->
case ets:lookup(rabbit_mgmt_stats_tables:index(Table), Id) of case ets:lookup(rabbit_mgmt_stats_tables:index(Table), Id) of
Match when length(Match) >= 2 -> [_, _ | _] = List ->
ets:lookup(Table, lists:nth(length(Match) - 1, lists:sort(Match))); ets:lookup(Table, sl(List, 0, 0));
_ -> _ ->
unknown unknown
end. end.
sl([{_, TS} = H | T], L1, L2) when TS > L1 ->
sl(T, H, L2);
sl([{_, TS} = H | T], L1, L2) when TS > L2 ->
sl(T, L1, H);
sl([_ | T], L1, L2) ->
sl(T, L1, L2);
sl([], _L1, L2) ->
L2.
%% What we want to do here is: given the #range{}, provide a set of %% What we want to do here is: given the #range{}, provide a set of
%% samples such that we definitely provide a set of samples which %% samples such that we definitely provide a set of samples which
%% covers the exact range requested, despite the fact that we might %% covers the exact range requested, despite the fact that we might
@ -494,41 +502,41 @@ match_spec_keys(Id) ->
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
format_rate(deliver_get, {_, D, DN, G, GN}, {_, TD, TDN, TG, TGN}, Factor) -> format_rate(deliver_get, {_, D, DN, G, GN}, {_, TD, TDN, TG, TGN}, Factor) ->
[ [
[{deliver, TD}, {deliver_details, [{rate, apply_factor(D, Factor)}]}], {deliver, TD}, {deliver_details, [{rate, apply_factor(D, Factor)}]},
[{deliver_no_ack, TDN}, {deliver_no_ack, TDN},
{deliver_no_ack_details, [{rate, apply_factor(DN, Factor)}]}], {deliver_no_ack_details, [{rate, apply_factor(DN, Factor)}]},
[{get, TG}, {get_details, [{rate, apply_factor(G, Factor)}]}], {get, TG}, {get_details, [{rate, apply_factor(G, Factor)}]},
[{get_no_ack, TGN}, {get_no_ack, TGN},
{get_no_ack_details, [{rate, apply_factor(GN, Factor)}]}] {get_no_ack_details, [{rate, apply_factor(GN, Factor)}]}
]; ];
format_rate(fine_stats, {_, P, PI, PO, A, D, C, RU, R}, format_rate(fine_stats, {_, P, PI, PO, A, D, C, RU, R},
{_, TP, TPI, TPO, TA, TD, TC, TRU, TR}, Factor) -> {_, TP, TPI, TPO, TA, TD, TC, TRU, TR}, Factor) ->
[ [
[{publish, TP}, {publish_details, [{rate, apply_factor(P, Factor)}]}], {publish, TP}, {publish_details, [{rate, apply_factor(P, Factor)}]},
[{publish_in, TPI}, {publish_in, TPI},
{publish_in_details, [{rate, apply_factor(PI, Factor)}]}], {publish_in_details, [{rate, apply_factor(PI, Factor)}]},
[{publish_out, TPO}, {publish_out, TPO},
{publish_out_details, [{rate, apply_factor(PO, Factor)}]}], {publish_out_details, [{rate, apply_factor(PO, Factor)}]},
[{ack, TA}, {ack_details, [{rate, apply_factor(A, Factor)}]}], {ack, TA}, {ack_details, [{rate, apply_factor(A, Factor)}]},
[{deliver_get, TD}, {deliver_get_details, [{rate, apply_factor(D, Factor)}]}], {deliver_get, TD}, {deliver_get_details, [{rate, apply_factor(D, Factor)}]},
[{confirm, TC}, {confirm_details, [{rate, apply_factor(C, Factor)}]}], {confirm, TC}, {confirm_details, [{rate, apply_factor(C, Factor)}]},
[{return_unroutable, TRU}, {return_unroutable, TRU},
{return_unroutable_details, [{rate, apply_factor(RU, Factor)}]}], {return_unroutable_details, [{rate, apply_factor(RU, Factor)}]},
[{redeliver, TR}, {redeliver_details, [{rate, apply_factor(R, Factor)}]}] {redeliver, TR}, {redeliver_details, [{rate, apply_factor(R, Factor)}]}
]; ];
format_rate(queue_msg_rates, {_, R, W}, {_, TR, TW}, Factor) -> format_rate(queue_msg_rates, {_, R, W}, {_, TR, TW}, Factor) ->
[ [
[{disk_reads, TR}, {disk_reads_details, [{rate, apply_factor(R, Factor)}]}], {disk_reads, TR}, {disk_reads_details, [{rate, apply_factor(R, Factor)}]},
[{disk_writes, TW}, {disk_writes_details, [{rate, apply_factor(W, Factor)}]}] {disk_writes, TW}, {disk_writes_details, [{rate, apply_factor(W, Factor)}]}
]; ];
format_rate(queue_msg_counts, {_, M, MR, MU}, {_, TM, TMR, TMU}, Factor) -> format_rate(queue_msg_counts, {_, M, MR, MU}, {_, TM, TMR, TMU}, Factor) ->
[ [
[{messages, TM}, {messages, TM},
{messages_details, [{rate, apply_factor(M, Factor)}]}], {messages_details, [{rate, apply_factor(M, Factor)}]},
[{messages_ready, TMR}, {messages_ready, TMR},
{messages_ready_details, [{rate, apply_factor(MR, Factor)}]}], {messages_ready_details, [{rate, apply_factor(MR, Factor)}]},
[{messages_unacknowledged, TMU}, {messages_unacknowledged, TMU},
{messages_unacknowledged_details, [{rate, apply_factor(MU, Factor)}]}] {messages_unacknowledged_details, [{rate, apply_factor(MU, Factor)}]}
]; ];
format_rate(coarse_node_stats, format_rate(coarse_node_stats,
{_, M, F, S, P, D, IR, IB, IA, IWC, IWB, IWAT, IS, ISAT, ISC, {_, M, F, S, P, D, IR, IB, IA, IWC, IWB, IWAT, IS, ISAT, ISC,
@ -537,135 +545,135 @@ format_rate(coarse_node_stats,
TISAT, TISC, TISEAT, TIRC, TMRTC, TMDTC, TMSRC, TMSWC, TQIJWC, TISAT, TISC, TISEAT, TIRC, TMRTC, TMDTC, TMSRC, TMSWC, TQIJWC,
TQIWC, TQIRC}, Factor) -> TQIWC, TQIRC}, Factor) ->
[ [
[{mem_used, TM}, {mem_used, TM},
{mem_used_details, [{rate, apply_factor(M, Factor)}]}], {mem_used_details, [{rate, apply_factor(M, Factor)}]},
[{fd_used, TF}, {fd_used, TF},
{fd_used_details, [{rate, apply_factor(F, Factor)}]}], {fd_used_details, [{rate, apply_factor(F, Factor)}]},
[{sockets_used, TS}, {sockets_used, TS},
{sockets_used_details, [{rate, apply_factor(S, Factor)}]}], {sockets_used_details, [{rate, apply_factor(S, Factor)}]},
[{proc_used, TP}, {proc_used, TP},
{proc_used_details, [{rate, apply_factor(P, Factor)}]}], {proc_used_details, [{rate, apply_factor(P, Factor)}]},
[{disk_free, TD}, {disk_free, TD},
{disk_free_details, [{rate, apply_factor(D, Factor)}]}], {disk_free_details, [{rate, apply_factor(D, Factor)}]},
[{io_read_count, TIR}, {io_read_count, TIR},
{io_read_count_details, [{rate, apply_factor(IR, Factor)}]}], {io_read_count_details, [{rate, apply_factor(IR, Factor)}]},
[{io_read_bytes, TIB}, {io_read_bytes, TIB},
{io_read_bytes_details, [{rate, apply_factor(IB, Factor)}]}], {io_read_bytes_details, [{rate, apply_factor(IB, Factor)}]},
[{io_read_avg_time, TIA}, {io_read_avg_time, TIA},
{io_read_avg_time_details, [{rate, apply_factor(IA, Factor)}]}], {io_read_avg_time_details, [{rate, apply_factor(IA, Factor)}]},
[{io_write_count, TIWC}, {io_write_count, TIWC},
{io_write_count_details, [{rate, apply_factor(IWC, Factor)}]}], {io_write_count_details, [{rate, apply_factor(IWC, Factor)}]},
[{io_write_bytes, TIWB}, {io_write_bytes, TIWB},
{io_write_bytes_details, [{rate, apply_factor(IWB, Factor)}]}], {io_write_bytes_details, [{rate, apply_factor(IWB, Factor)}]},
[{io_write_avg_time, TIWAT}, {io_write_avg_time, TIWAT},
{io_write_avg_time_details, [{rate, apply_factor(IWAT, Factor)}]}], {io_write_avg_time_details, [{rate, apply_factor(IWAT, Factor)}]},
[{io_sync_count, TIS}, {io_sync_count, TIS},
{io_sync_count_details, [{rate, apply_factor(IS, Factor)}]}], {io_sync_count_details, [{rate, apply_factor(IS, Factor)}]},
[{io_sync_avg_time, TISAT}, {io_sync_avg_time, TISAT},
{io_sync_avg_time_details, [{rate, apply_factor(ISAT, Factor)}]}], {io_sync_avg_time_details, [{rate, apply_factor(ISAT, Factor)}]},
[{io_seek_count, TISC}, {io_seek_count, TISC},
{io_seek_count_details, [{rate, apply_factor(ISC, Factor)}]}], {io_seek_count_details, [{rate, apply_factor(ISC, Factor)}]},
[{io_seek_avg_time, TISEAT}, {io_seek_avg_time, TISEAT},
{io_seek_avg_time_details, [{rate, apply_factor(ISEAT, Factor)}]}], {io_seek_avg_time_details, [{rate, apply_factor(ISEAT, Factor)}]},
[{io_reopen_count, TIRC}, {io_reopen_count, TIRC},
{io_reopen_count_details, [{rate, apply_factor(IRC, Factor)}]}], {io_reopen_count_details, [{rate, apply_factor(IRC, Factor)}]},
[{mnesia_ram_tx_count, TMRTC}, {mnesia_ram_tx_count, TMRTC},
{mnesia_ram_tx_count_details, [{rate, apply_factor(MRTC, Factor)}]}], {mnesia_ram_tx_count_details, [{rate, apply_factor(MRTC, Factor)}]},
[{mnesia_disk_tx_count, TMDTC}, {mnesia_disk_tx_count, TMDTC},
{mnesia_disk_tx_count_details, [{rate, apply_factor(MDTC, Factor)}]}], {mnesia_disk_tx_count_details, [{rate, apply_factor(MDTC, Factor)}]},
[{msg_store_read_count, TMSRC}, {msg_store_read_count, TMSRC},
{msg_store_read_count_details, [{rate, apply_factor(MSRC, Factor)}]}], {msg_store_read_count_details, [{rate, apply_factor(MSRC, Factor)}]},
[{msg_store_write_count, TMSWC}, {msg_store_write_count, TMSWC},
{msg_store_write_count_details, [{rate, apply_factor(MSWC, Factor)}]}], {msg_store_write_count_details, [{rate, apply_factor(MSWC, Factor)}]},
[{queue_index_journal_write_count, TQIJWC}, {queue_index_journal_write_count, TQIJWC},
{queue_index_journal_write_count_details, [{rate, apply_factor(QIJWC, Factor)}]}], {queue_index_journal_write_count_details, [{rate, apply_factor(QIJWC, Factor)}]},
[{queue_index_write_count, TQIWC}, {queue_index_write_count, TQIWC},
{queue_index_write_count_details, [{rate, apply_factor(QIWC, Factor)}]}], {queue_index_write_count_details, [{rate, apply_factor(QIWC, Factor)}]},
[{queue_index_read_count, TQIRC}, {queue_index_read_count, TQIRC},
{queue_index_read_count_details, [{rate, apply_factor(QIRC, Factor)}]}] {queue_index_read_count_details, [{rate, apply_factor(QIRC, Factor)}]}
]; ];
format_rate(coarse_node_node_stats, {_, S, R}, {_, TS, TR}, Factor) -> format_rate(coarse_node_node_stats, {_, S, R}, {_, TS, TR}, Factor) ->
[ [
[{send_bytes, TS}, {send_bytes, TS},
{send_bytes_details, [{rate, apply_factor(S, Factor)}]}], {send_bytes_details, [{rate, apply_factor(S, Factor)}]},
[{send_bytes, TR}, {send_bytes, TR},
{send_bytes_details, [{rate, apply_factor(R, Factor)}]}] {send_bytes_details, [{rate, apply_factor(R, Factor)}]}
]; ];
format_rate(coarse_conn_stats, {_, R, S}, {_, TR, TS}, Factor) -> format_rate(coarse_conn_stats, {_, R, S}, {_, TR, TS}, Factor) ->
[ [
[{send_oct, TS}, {send_oct, TS},
{send_oct_details, [{rate, apply_factor(S, Factor)}]}], {send_oct_details, [{rate, apply_factor(S, Factor)}]},
[{recv_oct, TR}, {recv_oct, TR},
{recv_oct_details, [{rate, apply_factor(R, Factor)}]}] {recv_oct_details, [{rate, apply_factor(R, Factor)}]}
]. ].
format_rate(deliver_get, {_, D, DN, G, GN}, {_, TD, TDN, TG, TGN}, format_rate(deliver_get, {_, D, DN, G, GN}, {_, TD, TDN, TG, TGN},
{_, SD, SDN, SG, SGN}, Factor) -> {_, SD, SDN, SG, SGN}, Factor) ->
Length = length(SD), Length = length(SD),
[ [
[{deliver, TD}, {deliver_details, [{rate, apply_factor(D, Factor)}, {deliver, TD}, {deliver_details, [{rate, apply_factor(D, Factor)},
{samples, SD}] ++ average(SD, Length)}], {samples, SD}] ++ average(SD, Length)},
[{deliver_no_ack, TDN}, {deliver_no_ack, TDN},
{deliver_no_ack_details, [{rate, apply_factor(DN, Factor)}, {deliver_no_ack_details, [{rate, apply_factor(DN, Factor)},
{samples, SDN}] ++ average(SDN, Length)}], {samples, SDN}] ++ average(SDN, Length)},
[{get, TG}, {get_details, [{rate, apply_factor(G, Factor)}, {get, TG}, {get_details, [{rate, apply_factor(G, Factor)},
{samples, SG}] ++ average(SG, Length)}], {samples, SG}] ++ average(SG, Length)},
[{get_no_ack, TGN}, {get_no_ack, TGN},
{get_no_ack_details, [{rate, apply_factor(GN, Factor)}, {get_no_ack_details, [{rate, apply_factor(GN, Factor)},
{samples, SGN}] ++ average(SGN, Length)}] {samples, SGN}] ++ average(SGN, Length)}
]; ];
format_rate(fine_stats, {_, P, PI, PO, A, D, C, RU, R}, format_rate(fine_stats, {_, P, PI, PO, A, D, C, RU, R},
{_, TP, TPI, TPO, TA, TD, TC, TRU, TR}, {_, TP, TPI, TPO, TA, TD, TC, TRU, TR},
{_, SP, SPI, SPO, SA, SD, SC, SRU, SR}, Factor) -> {_, SP, SPI, SPO, SA, SD, SC, SRU, SR}, Factor) ->
Length = length(SP), Length = length(SP),
[ [
[{publish, TP}, {publish, TP},
{publish_details, [{rate, apply_factor(P, Factor)}, {publish_details, [{rate, apply_factor(P, Factor)},
{samples, SP}] ++ average(SP, Length)}], {samples, SP}] ++ average(SP, Length)},
[{publish_in, TPI}, {publish_in, TPI},
{publish_in_details, [{rate, apply_factor(PI, Factor)}, {publish_in_details, [{rate, apply_factor(PI, Factor)},
{samples, SPI}] ++ average(SPI, Length)}], {samples, SPI}] ++ average(SPI, Length)},
[{publish_out, TPO}, {publish_out, TPO},
{publish_out_details, [{rate, apply_factor(PO, Factor)}, {publish_out_details, [{rate, apply_factor(PO, Factor)},
{samples, SPO}] ++ average(SPO, Length)}], {samples, SPO}] ++ average(SPO, Length)},
[{ack, TA}, {ack_details, [{rate, apply_factor(A, Factor)}, {ack, TA}, {ack_details, [{rate, apply_factor(A, Factor)},
{samples, SA}] ++ average(SA, Length)}], {samples, SA}] ++ average(SA, Length)},
[{deliver_get, TD}, {deliver_get, TD},
{deliver_get_details, [{rate, apply_factor(D, Factor)}, {deliver_get_details, [{rate, apply_factor(D, Factor)},
{samples, SD}] ++ average(SD, Length)}], {samples, SD}] ++ average(SD, Length)},
[{confirm, TC}, {confirm, TC},
{confirm_details, [{rate, apply_factor(C, Factor)}, {confirm_details, [{rate, apply_factor(C, Factor)},
{samples, SC}] ++ average(SC, Length)}], {samples, SC}] ++ average(SC, Length)},
[{return_unroutable, TRU}, {return_unroutable, TRU},
{return_unroutable_details, [{rate, apply_factor(RU, Factor)}, {return_unroutable_details, [{rate, apply_factor(RU, Factor)},
{samples, SRU}] ++ average(SRU, Length)}], {samples, SRU}] ++ average(SRU, Length)},
[{redeliver, TR}, {redeliver, TR},
{redeliver_details, [{rate, apply_factor(R, Factor)}, {redeliver_details, [{rate, apply_factor(R, Factor)},
{samples, SR}] ++ average(SR, Length)}] {samples, SR}] ++ average(SR, Length)}
]; ];
format_rate(queue_msg_rates, {_, R, W}, {_, TR, TW}, {_, SR, SW}, Factor) -> format_rate(queue_msg_rates, {_, R, W}, {_, TR, TW}, {_, SR, SW}, Factor) ->
Length = length(SR), Length = length(SR),
[ [
[{disk_reads, TR}, {disk_reads, TR},
{disk_reads_details, [{rate, apply_factor(R, Factor)}, {disk_reads_details, [{rate, apply_factor(R, Factor)},
{samples, SR}] ++ average(SR, Length)}], {samples, SR}] ++ average(SR, Length)},
[{disk_writes, TW}, {disk_writes, TW},
{disk_writes_details, [{rate, apply_factor(W, Factor)}, {disk_writes_details, [{rate, apply_factor(W, Factor)},
{samples, SW}] ++ average(SW, Length)}] {samples, SW}] ++ average(SW, Length)}
]; ];
format_rate(queue_msg_counts, {_, M, MR, MU}, {_, TM, TMR, TMU}, format_rate(queue_msg_counts, {_, M, MR, MU}, {_, TM, TMR, TMU},
{_, SM, SMR, SMU}, Factor) -> {_, SM, SMR, SMU}, Factor) ->
Length = length(SM), Length = length(SM),
[ [
[{messages, TM}, {messages, TM},
{messages_details, [{rate, apply_factor(M, Factor)}, {messages_details, [{rate, apply_factor(M, Factor)},
{samples, SM}] ++ average(SM, Length)}], {samples, SM}] ++ average(SM, Length)},
[{messages_ready, TMR}, {messages_ready, TMR},
{messages_ready_details, [{rate, apply_factor(MR, Factor)}, {messages_ready_details, [{rate, apply_factor(MR, Factor)},
{samples, SMR}] ++ average(SMR, Length)}], {samples, SMR}] ++ average(SMR, Length)},
[{messages_unacknowledged, TMU}, {messages_unacknowledged, TMU},
{messages_unacknowledged_details, [{rate, apply_factor(MU, Factor)}, {messages_unacknowledged_details, [{rate, apply_factor(MU, Factor)},
{samples, SMU}] ++ average(SMU, Length)}] {samples, SMU}] ++ average(SMU, Length)}
]; ];
format_rate(coarse_node_stats, format_rate(coarse_node_stats,
{_, M, F, S, P, D, IR, IB, IA, IWC, IWB, IWAT, IS, ISAT, ISC, {_, M, F, S, P, D, IR, IB, IA, IWC, IWB, IWAT, IS, ISAT, ISC,
@ -678,97 +686,97 @@ format_rate(coarse_node_stats,
SQIWC, SQIRC}, Factor) -> SQIWC, SQIRC}, Factor) ->
Length = length(SM), Length = length(SM),
[ [
[{mem_used, TM}, {mem_used, TM},
{mem_used_details, [{rate, apply_factor(M, Factor)}, {mem_used_details, [{rate, apply_factor(M, Factor)},
{samples, SM}] ++ average(SM, Length)}], {samples, SM}] ++ average(SM, Length)},
[{fd_used, TF}, {fd_used, TF},
{fd_used_details, [{rate, apply_factor(F, Factor)}, {fd_used_details, [{rate, apply_factor(F, Factor)},
{samples, SF}] ++ average(SF, Length)}], {samples, SF}] ++ average(SF, Length)},
[{sockets_used, TS}, {sockets_used, TS},
{sockets_used_details, [{rate, apply_factor(S, Factor)}, {sockets_used_details, [{rate, apply_factor(S, Factor)},
{samples, SS}] ++ average(SS, Length)}], {samples, SS}] ++ average(SS, Length)},
[{proc_used, TP}, {proc_used, TP},
{proc_used_details, [{rate, apply_factor(P, Factor)}, {proc_used_details, [{rate, apply_factor(P, Factor)},
{samples, SP}] ++ average(SP, Length)}], {samples, SP}] ++ average(SP, Length)},
[{disk_free, TD}, {disk_free, TD},
{disk_free_details, [{rate, apply_factor(D, Factor)}, {disk_free_details, [{rate, apply_factor(D, Factor)},
{samples, SD}] ++ average(SD, Length)}], {samples, SD}] ++ average(SD, Length)},
[{io_read_count, TIR}, {io_read_count, TIR},
{io_read_count_details, [{rate, apply_factor(IR, Factor)}, {io_read_count_details, [{rate, apply_factor(IR, Factor)},
{samples, SIR}] ++ average(SIR, Length)}], {samples, SIR}] ++ average(SIR, Length)},
[{io_read_bytes, TIB}, {io_read_bytes, TIB},
{io_read_bytes_details, [{rate, apply_factor(IB, Factor)}, {io_read_bytes_details, [{rate, apply_factor(IB, Factor)},
{samples, SIB}] ++ average(SIB, Length)}], {samples, SIB}] ++ average(SIB, Length)},
[{io_read_avg_time, TIA}, {io_read_avg_time, TIA},
{io_read_avg_time_details, [{rate, apply_factor(IA, Factor)}, {io_read_avg_time_details, [{rate, apply_factor(IA, Factor)},
{samples, SIA}] ++ average(SIA, Length)}], {samples, SIA}] ++ average(SIA, Length)},
[{io_write_count, TIWC}, {io_write_count, TIWC},
{io_write_count_details, [{rate, apply_factor(IWC, Factor)}, {io_write_count_details, [{rate, apply_factor(IWC, Factor)},
{samples, SIWC}] ++ average(SIWC, Length)}], {samples, SIWC}] ++ average(SIWC, Length)},
[{io_write_bytes, TIWB}, {io_write_bytes, TIWB},
{io_write_bytes_details, [{rate, apply_factor(IWB, Factor)}, {io_write_bytes_details, [{rate, apply_factor(IWB, Factor)},
{samples, SIWB}] ++ average(SIWB, Length)}], {samples, SIWB}] ++ average(SIWB, Length)},
[{io_write_avg_time, TIWAT}, {io_write_avg_time, TIWAT},
{io_write_avg_time_details, [{rate, apply_factor(IWAT, Factor)}, {io_write_avg_time_details, [{rate, apply_factor(IWAT, Factor)},
{samples, SIWAT}] ++ average(SIWAT, Length)}], {samples, SIWAT}] ++ average(SIWAT, Length)},
[{io_sync_count, TIS}, {io_sync_count, TIS},
{io_sync_count_details, [{rate, apply_factor(IS, Factor)}, {io_sync_count_details, [{rate, apply_factor(IS, Factor)},
{samples, SIS}] ++ average(SIS, Length)}], {samples, SIS}] ++ average(SIS, Length)},
[{io_sync_avg_time, TISAT}, {io_sync_avg_time, TISAT},
{io_sync_avg_time_details, [{rate, apply_factor(ISAT, Factor)}, {io_sync_avg_time_details, [{rate, apply_factor(ISAT, Factor)},
{samples, SISAT}] ++ average(SISAT, Length)}], {samples, SISAT}] ++ average(SISAT, Length)},
[{io_seek_count, TISC}, {io_seek_count, TISC},
{io_seek_count_details, [{rate, apply_factor(ISC, Factor)}, {io_seek_count_details, [{rate, apply_factor(ISC, Factor)},
{samples, SISC}] ++ average(SISC, Length)}], {samples, SISC}] ++ average(SISC, Length)},
[{io_seek_avg_time, TISEAT}, {io_seek_avg_time, TISEAT},
{io_seek_avg_time_details, [{rate, apply_factor(ISEAT, Factor)}, {io_seek_avg_time_details, [{rate, apply_factor(ISEAT, Factor)},
{samples, SISEAT}] ++ average(SISEAT, Length)}], {samples, SISEAT}] ++ average(SISEAT, Length)},
[{io_reopen_count, TIRC}, {io_reopen_count, TIRC},
{io_reopen_count_details, [{rate, apply_factor(IRC, Factor)}, {io_reopen_count_details, [{rate, apply_factor(IRC, Factor)},
{samples, SIRC}] ++ average(SIRC, Length)}], {samples, SIRC}] ++ average(SIRC, Length)},
[{mnesia_ram_tx_count, TMRTC}, {mnesia_ram_tx_count, TMRTC},
{mnesia_ram_tx_count_details, [{rate, apply_factor(MRTC, Factor)}, {mnesia_ram_tx_count_details, [{rate, apply_factor(MRTC, Factor)},
{samples, SMRTC}] ++ average(SMRTC, Length)}], {samples, SMRTC}] ++ average(SMRTC, Length)},
[{mnesia_disk_tx_count, TMDTC}, {mnesia_disk_tx_count, TMDTC},
{mnesia_disk_tx_count_details, [{rate, apply_factor(MDTC, Factor)}, {mnesia_disk_tx_count_details, [{rate, apply_factor(MDTC, Factor)},
{samples, SMDTC}] ++ average(SMDTC, Length)}], {samples, SMDTC}] ++ average(SMDTC, Length)},
[{msg_store_read_count, TMSRC}, {msg_store_read_count, TMSRC},
{msg_store_read_count_details, [{rate, apply_factor(MSRC, Factor)}, {msg_store_read_count_details, [{rate, apply_factor(MSRC, Factor)},
{samples, SMSRC}] ++ average(SMSRC, Length)}], {samples, SMSRC}] ++ average(SMSRC, Length)},
[{msg_store_write_count, TMSWC}, {msg_store_write_count, TMSWC},
{msg_store_write_count_details, [{rate, apply_factor(MSWC, Factor)}, {msg_store_write_count_details, [{rate, apply_factor(MSWC, Factor)},
{samples, SMSWC}] ++ average(SMSWC, Length)}], {samples, SMSWC}] ++ average(SMSWC, Length)},
[{queue_index_journal_write_count, TQIJWC}, {queue_index_journal_write_count, TQIJWC},
{queue_index_journal_write_count_details, {queue_index_journal_write_count_details,
[{rate, apply_factor(QIJWC, Factor)}, [{rate, apply_factor(QIJWC, Factor)},
{samples, SQIJWC}] ++ average(SQIJWC, Length)}], {samples, SQIJWC}] ++ average(SQIJWC, Length)},
[{queue_index_write_count, TQIWC}, {queue_index_write_count, TQIWC},
{queue_index_write_count_details, [{rate, apply_factor(QIWC, Factor)}, {queue_index_write_count_details, [{rate, apply_factor(QIWC, Factor)},
{samples, SQIWC}] ++ average(SQIWC, Length)}], {samples, SQIWC}] ++ average(SQIWC, Length)},
[{queue_index_read_count, TQIRC}, {queue_index_read_count, TQIRC},
{queue_index_read_count_details, [{rate, apply_factor(QIRC, Factor)}, {queue_index_read_count_details, [{rate, apply_factor(QIRC, Factor)},
{samples, SQIRC}] ++ average(SQIRC, Length)}] {samples, SQIRC}] ++ average(SQIRC, Length)}
]; ];
format_rate(coarse_node_node_stats, {_, S, R}, {_, TS, TR}, {_, SS, SR}, format_rate(coarse_node_node_stats, {_, S, R}, {_, TS, TR}, {_, SS, SR},
Factor) -> Factor) ->
Length = length(SS), Length = length(SS),
[ [
[{send_bytes, TS}, {send_bytes, TS},
{send_bytes_details, [{rate, apply_factor(S, Factor)}, {send_bytes_details, [{rate, apply_factor(S, Factor)},
{samples, SS}] ++ average(SS, Length)}], {samples, SS}] ++ average(SS, Length)},
[{send_bytes, TR}, {send_bytes, TR},
{send_bytes_details, [{rate, apply_factor(R, Factor)}, {send_bytes_details, [{rate, apply_factor(R, Factor)},
{samples, SR}] ++ average(SR, Length)}] {samples, SR}] ++ average(SR, Length)}
]; ];
format_rate(coarse_conn_stats, {_, R, S}, {_, TR, TS}, {_, SR, SS}, Factor) -> format_rate(coarse_conn_stats, {_, R, S}, {_, TR, TS}, {_, SR, SS}, Factor) ->
Length = length(SS), Length = length(SS),
[ [
[{send_oct, TS}, {send_oct, TS},
{send_oct_details, [{rate, apply_factor(S, Factor)}, {send_oct_details, [{rate, apply_factor(S, Factor)},
{samples, SS}] ++ average(SS, Length)}], {samples, SS}] ++ average(SS, Length)},
[{recv_oct, TR}, {recv_oct, TR},
{recv_oct_details, [{rate, apply_factor(R, Factor)}, {recv_oct_details, [{rate, apply_factor(R, Factor)},
{samples, SR}] ++ average(SR, Length)}] {samples, SR}] ++ average(SR, Length)}
]. ].
apply_factor(_, 0.0) -> apply_factor(_, 0.0) ->

View File

@ -49,10 +49,8 @@
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
start_link(Table) -> start_link(Table) ->
Ref = make_ref(), case gen_server2:start_link({global, name(Table)}, ?MODULE, [Table], []) of
case gen_server2:start_link({global, name(Table)}, ?MODULE, [Ref, Table], []) of
{ok, Pid} -> register(name(Table), Pid), %% [1] {ok, Pid} -> register(name(Table), Pid), %% [1]
rabbit:force_event_refresh(Ref),
{ok, Pid}; {ok, Pid};
Else -> Else Else -> Else
end. end.
@ -63,7 +61,7 @@ start_link(Table) ->
%% Internal, gen_server2 callbacks %% Internal, gen_server2 callbacks
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
init([_, Table]) -> init([Table]) ->
{ok, Interval} = application:get_env(rabbit, collect_statistics_interval), {ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
rabbit_log:info("Statistics garbage collector started for table ~p.~n", [Table]), rabbit_log:info("Statistics garbage collector started for table ~p.~n", [Table]),
{ok, set_gc_timer(#state{interval = Interval, {ok, set_gc_timer(#state{interval = Interval,

View File

@ -131,71 +131,71 @@ aggr_table(node_node_stats, coarse_conn_stats) ->
aggr_table(connection_stats, coarse_conn_stats) -> aggr_table(connection_stats, coarse_conn_stats) ->
aggr_connection_stats_coarse_conn_stats. aggr_connection_stats_coarse_conn_stats.
-spec aggr_tables(event_type()) -> [table_name()]. -spec aggr_tables(event_type()) -> [{table_name(), type()}].
aggr_tables(queue_stats) -> aggr_tables(queue_stats) ->
[aggr_queue_stats_fine_stats, [{aggr_queue_stats_fine_stats, fine_stats},
aggr_queue_stats_deliver_get, {aggr_queue_stats_deliver_get, deliver_get},
aggr_queue_stats_queue_msg_counts]; {aggr_queue_stats_queue_msg_counts, queue_msg_counts}];
aggr_tables(queue_exchange_stats) -> aggr_tables(queue_exchange_stats) ->
[aggr_queue_exchange_stats_deliver_get, [{aggr_queue_exchange_stats_deliver_get, deliver_get},
aggr_queue_exchange_stats_fine_stats, {aggr_queue_exchange_stats_fine_stats, fine_stats},
aggr_queue_exchange_stats_queue_msg_rates, {aggr_queue_exchange_stats_queue_msg_rates, queue_msg_rates},
aggr_queue_exchange_stats_queue_msg_counts, {aggr_queue_exchange_stats_queue_msg_counts, queue_msg_counts},
aggr_queue_exchange_stats_coarse_node_stats, {aggr_queue_exchange_stats_coarse_node_stats, coarse_node_stats},
aggr_queue_exchange_stats_coarse_node_node_stats, {aggr_queue_exchange_stats_coarse_node_node_stats, coarse_node_node_stats},
aggr_queue_exchange_stats_coarse_conn_stats]; {aggr_queue_exchange_stats_coarse_conn_stats, coarse_conn_stats}];
aggr_tables(vhost_stats) -> aggr_tables(vhost_stats) ->
[aggr_vhost_stats_deliver_get, [{aggr_vhost_stats_deliver_get, deliver_get},
aggr_vhost_stats_fine_stats, {aggr_vhost_stats_fine_stats, fine_stats},
aggr_vhost_stats_queue_msg_rates, {aggr_vhost_stats_queue_msg_rates, queue_msg_rates},
aggr_vhost_stats_queue_msg_counts, {aggr_vhost_stats_queue_msg_counts, queue_msg_counts},
aggr_vhost_stats_coarse_node_stats, {aggr_vhost_stats_coarse_node_stats, coarse_node_stats},
aggr_vhost_stats_coarse_node_node_stats, {aggr_vhost_stats_coarse_node_node_stats, coarse_node_node_stats},
aggr_vhost_stats_coarse_conn_stats]; {aggr_vhost_stats_coarse_conn_stats, coarse_conn_stats}];
aggr_tables(channel_queue_stats) -> aggr_tables(channel_queue_stats) ->
[aggr_channel_queue_stats_deliver_get, [{aggr_channel_queue_stats_deliver_get, deliver_get},
aggr_channel_queue_stats_fine_stats, {aggr_channel_queue_stats_fine_stats, fine_stats},
aggr_channel_queue_stats_queue_msg_rates, {aggr_channel_queue_stats_queue_msg_rates, queue_msg_rates},
aggr_channel_queue_stats_queue_msg_counts, {aggr_channel_queue_stats_queue_msg_counts, queue_msg_counts},
aggr_channel_queue_stats_coarse_node_stats, {aggr_channel_queue_stats_coarse_node_stats, coarse_node_stats},
aggr_channel_queue_stats_coarse_node_node_stats, {aggr_channel_queue_stats_coarse_node_node_stats, coarse_node_node_stats},
aggr_channel_queue_stats_coarse_conn_stats]; {aggr_channel_queue_stats_coarse_conn_stats, coarse_conn_stats}];
aggr_tables(channel_stats) -> aggr_tables(channel_stats) ->
[aggr_channel_stats_deliver_get, [{aggr_channel_stats_deliver_get, deliver_get},
aggr_channel_stats_fine_stats, {aggr_channel_stats_fine_stats, fine_stats},
aggr_channel_stats_queue_msg_rates, {aggr_channel_stats_queue_msg_rates, queue_msg_rates},
aggr_channel_stats_queue_msg_counts, {aggr_channel_stats_queue_msg_counts, queue_msg_counts},
aggr_channel_stats_coarse_node_stats, {aggr_channel_stats_coarse_node_stats, coarse_node_stats},
aggr_channel_stats_coarse_node_node_stats, {aggr_channel_stats_coarse_node_node_stats, coarse_node_node_stats},
aggr_channel_stats_coarse_conn_stats]; {aggr_channel_stats_coarse_conn_stats, coarse_conn_stats}];
aggr_tables(channel_exchange_stats) -> aggr_tables(channel_exchange_stats) ->
[aggr_channel_exchange_stats_deliver_get, [{aggr_channel_exchange_stats_deliver_get, deliver_get},
aggr_channel_exchange_stats_fine_stats, {aggr_channel_exchange_stats_fine_stats, fine_stats},
aggr_channel_exchange_stats_queue_msg_rates, {aggr_channel_exchange_stats_queue_msg_rates, queue_msg_rates},
aggr_channel_exchange_stats_queue_msg_counts, {aggr_channel_exchange_stats_queue_msg_counts, queue_msg_counts},
aggr_channel_exchange_stats_coarse_node_stats, {aggr_channel_exchange_stats_coarse_node_stats, coarse_node_stats},
aggr_channel_exchange_stats_coarse_node_node_stats, {aggr_channel_exchange_stats_coarse_node_node_stats, coarse_node_node_stats},
aggr_channel_exchange_stats_coarse_conn_stats]; {aggr_channel_exchange_stats_coarse_conn_stats, coarse_conn_stats}];
aggr_tables(exchange_stats) -> aggr_tables(exchange_stats) ->
[aggr_exchange_stats_fine_stats]; [{aggr_exchange_stats_fine_stats, fine_stats}];
aggr_tables(node_stats) -> aggr_tables(node_stats) ->
[aggr_node_stats_deliver_get, [{aggr_node_stats_deliver_get, deliver_get},
aggr_node_stats_fine_stats, {aggr_node_stats_fine_stats, fine_stats},
aggr_node_stats_queue_msg_rates, {aggr_node_stats_queue_msg_rates, queue_msg_rates},
aggr_node_stats_queue_msg_counts, {aggr_node_stats_queue_msg_counts, queue_msg_counts},
aggr_node_stats_coarse_node_stats, {aggr_node_stats_coarse_node_stats, coarse_node_stats},
aggr_node_stats_coarse_node_node_stats, {aggr_node_stats_coarse_node_node_stats, coarse_node_node_stats},
aggr_node_stats_coarse_conn_stats]; {aggr_node_stats_coarse_conn_stats, coarse_conn_stats}];
aggr_tables(node_node_stats) -> aggr_tables(node_node_stats) ->
[aggr_node_node_stats_deliver_get, [{aggr_node_node_stats_deliver_get, deliver_get},
aggr_node_node_stats_fine_stats, {aggr_node_node_stats_fine_stats, fine_stats},
aggr_node_node_stats_queue_msg_rates, {aggr_node_node_stats_queue_msg_rates, queue_msg_rates},
aggr_node_node_stats_queue_msg_counts, {aggr_node_node_stats_queue_msg_counts, queue_msg_counts},
aggr_node_node_stats_coarse_node_stats, {aggr_node_node_stats_coarse_node_stats, coarse_node_stats},
aggr_node_node_stats_coarse_node_node_stats, {aggr_node_node_stats_coarse_node_node_stats, coarse_node_node_stats},
aggr_node_node_stats_coarse_conn_stats]; {aggr_node_node_stats_coarse_conn_stats, coarse_conn_stats}];
aggr_tables(connection_stats) -> aggr_tables(connection_stats) ->
[aggr_connection_stats_coarse_conn_stats]. [{aggr_connection_stats_coarse_conn_stats, coarse_conn_stats}].
-spec type_from_table(table_name()) -> type(). -spec type_from_table(table_name()) -> type().
type_from_table(aggr_queue_stats_deliver_get) -> type_from_table(aggr_queue_stats_deliver_get) ->

View File

@ -46,7 +46,7 @@ resource_exists(ReqData, Context) ->
to_json(ReqData, Context) -> to_json(ReqData, Context) ->
[Q] = rabbit_mgmt_db:augment_queues( [Q] = rabbit_mgmt_db:augment_queues(
[queue(ReqData)], rabbit_mgmt_util:range_ceil(ReqData), full), [queue(ReqData)], rabbit_mgmt_util:range_ceil(ReqData), full),
rabbit_mgmt_util:reply(rabbit_mgmt_format:strip_pids(Q), ReqData, Context). rabbit_mgmt_util:reply(Q, ReqData, Context).
accept_content(ReqData, Context) -> accept_content(ReqData, Context) ->
rabbit_mgmt_util:http_to_amqp( rabbit_mgmt_util:http_to_amqp(

View File

@ -48,10 +48,9 @@ is_authorized(ReqData, Context) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
augmented(ReqData, Context) -> augmented(ReqData, Context) ->
rabbit_mgmt_format:strip_pids(
rabbit_mgmt_db:augment_queues( rabbit_mgmt_db:augment_queues(
rabbit_mgmt_util:filter_vhost(basic(ReqData), ReqData, Context), rabbit_mgmt_util:filter_vhost(basic(ReqData), ReqData, Context),
rabbit_mgmt_util:range_ceil(ReqData), basic)). rabbit_mgmt_util:range_ceil(ReqData), basic).
basic(ReqData) -> basic(ReqData) ->
[rabbit_mgmt_format:queue(Q) || Q <- queues0(ReqData)] ++ [rabbit_mgmt_format:queue(Q) || Q <- queues0(ReqData)] ++

View File

@ -174,11 +174,11 @@ format_samples(Samples) ->
[[{sample, S}, {timestamp, TS * 1000}] || {TS, S} <- Samples]. [[{sample, S}, {timestamp, TS * 1000}] || {TS, S} <- Samples].
select_messages(List) -> select_messages(List) ->
case lists:filter(fun(E) -> case lists:filter(fun({K, _}) ->
proplists:is_defined(messages, E) (K == messages) or (K == messages_details)
end, List) of end, List) of
[Messages] ->
Messages;
[] -> [] ->
not_found not_found;
Messages ->
Messages
end. end.