Channel metrics. Implements aggregation

This commit is contained in:
Diana Corbacho 2016-08-26 15:57:42 +01:00
parent ab61eda23a
commit 9748a1be0d
7 changed files with 412 additions and 60 deletions

View File

@ -24,7 +24,7 @@
-define(FINE_STATS_TYPES, [channel_queue_stats, channel_exchange_stats,
channel_queue_exchange_stats]).
-define(TABLES, [queue_stats, channel_stats,
-define(TABLES, [queue_stats,
consumers_by_queue, consumers_by_channel,
node_stats, node_node_stats,
%% What the previous info item was for any given

View File

@ -73,9 +73,25 @@
-define(NEW_TABLES, [connection_stats_coarse_conn_stats,
vhost_stats_coarse_conn_stats,
connection_created_stats,
connection_stats
connection_stats,
channel_created_stats,
channel_stats,
channel_stats_fine_stats,
channel_exchange_stats_fine_stats,
channel_queue_stats_deliver_stats,
vhost_stats_fine_stats,
queue_stats_deliver_stats,
vhost_stats_deliver_stats,
channel_stats_deliver_stats,
channel_process_stats,
queue_stats_publish,
queue_exchange_stats_publish,
exchange_stats_publish_out,
old_aggr_stats
]).
%% TODO Define new records!!!
-define(GC_EVENTS, [connection_closed]).
%% TODO remove unused tables
@ -163,7 +179,7 @@
]).
-define(PROC_STATS_TABLES,
[channel_stats]).
[]).
%% Records are only used to retrieve the field position and to facilitate
%% keeping track of the data

View File

@ -61,10 +61,12 @@
-record(slide, {size = 0 :: integer(), % ms window
n = 0 :: integer(), % number of elements in buf1
max_n :: undefined | integer(), % max no of elements
incremental = false :: boolean(),
interval :: integer(),
last = 0 :: integer(), % millisecond timestamp
buf1 = [] :: list(),
buf2 = [] :: list()}).
buf2 = [] :: list(),
total :: any()}).
-spec timestamp() -> timestamp().
@ -102,6 +104,7 @@ new(Size, Opts) ->
max_n = proplists:get_value(max_n, Opts, infinity),
interval = proplists:get_value(interval, Opts, infinity),
last = timestamp(),
incremental = proplists:get_value(incremental, Opts, false),
buf1 = [],
buf2 = []}.
@ -154,9 +157,30 @@ add_element(TS, Evt, Slide) ->
%%
%% A zero-sized window stores nothing; report "no swap" to the caller.
add_element(_TS, _Evt, Slide, Wrap) when Slide#slide.size == 0 ->
    add_ret(Wrap, false, Slide);
%% Incremental slide, still inside the current sampling interval: fold
%% the event into the running total but do not emit a sample yet.
%% NOTE(review): this clause returns the record directly rather than via
%% add_ret/3, so a Wrap=true caller gets the bare slide here — confirm
%% that is intended.
add_element(TS, Evt, #slide{last = Last, interval = Interval, total = Total0, incremental = true}
            = Slide, _Wrap)
  when (TS - Last) < Interval ->
    Total = add_to_total(Evt, Total0),
    Slide#slide{total = Total};
%% Non-incremental slide inside the current interval: drop the event.
add_element(TS, _Evt, #slide{last = Last, interval = Interval} = Slide, _Wrap)
  when (TS - Last) < Interval->
    Slide;
%% Incremental slide, interval elapsed: accumulate into the total and
%% append a {Timestamp, Total} sample, swapping buf1 into buf2 when the
%% window is older than Sz ms or buf1 holds more than max_n samples.
add_element(TS, Evt, #slide{last = Last, size = Sz, incremental = true,
                            n = N, max_n = MaxN, total = Total0,
                            buf1 = Buf1} = Slide, Wrap) ->
    N1 = N+1,
    Total = add_to_total(Evt, Total0),
    if TS - Last > Sz; N1 > MaxN ->
            %% swap
            add_ret(Wrap, true, Slide#slide{last = TS,
                                            n = 1,
                                            buf1 = [{TS, Total}],
                                            buf2 = Buf1,
                                            total = Total});
       true ->
            add_ret(Wrap, false, Slide#slide{n = N1, buf1 = [{TS, Total} | Buf1],
                                             last = TS, total = Total})
    end;
add_element(TS, Evt, #slide{last = Last, size = Sz,
n = N, max_n = MaxN,
buf1 = Buf1} = Slide, Wrap) ->
@ -172,6 +196,15 @@ add_element(TS, Evt, #slide{last = Last, size = Sz,
last = TS})
end.
%% Fold an event tuple into the running total.
%% An 'undefined' total means nothing has been accumulated yet, so the
%% event itself becomes the total; otherwise both tuples have the same
%% arity (1, 3 or 7 — the stat shapes used by the aggregator) and are
%% summed element-wise.
add_to_total(Evt, undefined) ->
    Evt;
add_to_total({N0}, {T0}) ->
    {N0 + T0};
add_to_total({N0, N1, N2}, {T0, T1, T2}) ->
    {N0 + T0, N1 + T1, N2 + T2};
add_to_total({N0, N1, N2, N3, N4, N5, N6}, {T0, T1, T2, T3, T4, T5, T6}) ->
    {N0 + T0, N1 + T1, N2 + T2, N3 + T3, N4 + T4, N5 + T5, N6 + T6}.
add_ret(false, _, Slide) ->
Slide;
add_ret(true, Flag, Slide) ->
@ -189,13 +222,13 @@ to_list(#slide{size = Sz, n = N, max_n = MaxN, buf1 = Buf1, buf2 = Buf2}) ->
-spec last_two(#slide{}) -> [{timestamp(), value()}].
%% @doc Returns the newest 2 elements on the sample
last_two(#slide{buf1 = [H1, H2 | _], buf2 = Buf2}) ->
last_two(#slide{buf1 = [H1, H2 | _]}) ->
[H1, H2];
last_two(#slide{buf1 = [H1], buf2 = [H2 | _]}) ->
[H1, H2];
last_two(#slide{buf1 = [H1], buf2 = []}) ->
[H1];
last_two(#slide{buf1 = [], buf2 = [H1, H2 | T]}) ->
last_two(#slide{buf1 = [], buf2 = [H1, H2 | _]}) ->
[H1, H2];
last_two(#slide{buf1 = [], buf2 = [H1]}) ->
[H1];
@ -211,7 +244,7 @@ last_two(_) ->
%% @end
foldl(_Timestamp, _Fun, _Acc, #slide{size = Sz}) when Sz == 0 ->
[];
foldl(Timestamp, Fun, Acc, #slide{size = Sz, n = N, max_n = MaxN,
foldl(Timestamp, Fun, Acc, #slide{n = N, max_n = MaxN,
buf1 = Buf1, buf2 = Buf2}) ->
Start = Timestamp,
lists:foldr(

View File

@ -208,7 +208,7 @@ handle_call({augment_nodes, Nodes, Ranges}, _From,
handle_call({get_channel, Name, Ranges}, _From,
#state{interval = Interval} = State) ->
case created_event(Name, channel_stats) of
case created_stats(Name, channel_created_stats) of
not_found -> reply(not_found, State);
Ch -> [Result] = detail_channel_stats(Ranges, [Ch], Interval),
reply(Result, State)
@ -224,7 +224,7 @@ handle_call({get_connection, Name, Ranges}, _From,
handle_call({get_all_channels, Ranges}, _From,
#state{interval = Interval} = State) ->
Chans = created_events(channel_stats),
Chans = created_stats(channel_created_stats),
reply(list_channel_stats(Ranges, Chans, Interval), State);
handle_call({get_all_connections, Ranges}, _From,
@ -402,18 +402,67 @@ connection_stats(Ranges, Objs, Interval) ->
end || Obj <- Objs].
%% Format list-level stats for the given channels: the raw props stored
%% in channel_stats, augmented details, and the per-channel message and
%% process rates. (Fixes a diff artifact: the superseded merge_stats
%% body was left interleaved with its replacement, and the commented-out
%% old implementation is dropped.)
list_channel_stats(Ranges, Objs, Interval) ->
    [begin
         Id = id_lookup(channel_stats, Obj),
         Props = lookup_element(channel_stats, Id), %% TODO needs formatting?
         %% TODO rest of stats!
         Stats = [{message_stats,
                   rabbit_mgmt_stats:format(pick_range(fine_stats, Ranges),
                                            channel_stats_fine_stats,
                                            Id, Interval) ++
                       rabbit_mgmt_stats:format(pick_range(deliver_get, Ranges),
                                                channel_stats_deliver_stats,
                                                Id, Interval)} |
                  rabbit_mgmt_stats:format(pick_range(process_stats, Ranges),
                                           channel_process_stats,
                                           Id, Interval)],
         Details = augment_details(Obj, []),
         combine(Props, Obj) ++ Details ++ Stats
     end || Obj <- Objs].
%% Format detail-level stats for the given channels: everything the list
%% view shows plus per-exchange publish and per-queue delivery
%% breakdowns. (Fixes a diff artifact: the superseded merge_stats body
%% was left interleaved with its replacement, and the commented-out old
%% implementation is dropped.)
detail_channel_stats(Ranges, Objs, Interval) ->
    [begin
         Id = id_lookup(channel_stats, Obj),
         Props = lookup_element(channel_stats, Id), %% TODO needs formatting?
         %% TODO rest of stats!
         Stats = [{message_stats,
                   rabbit_mgmt_stats:format(pick_range(fine_stats, Ranges),
                                            channel_stats_fine_stats,
                                            Id, Interval) ++
                       rabbit_mgmt_stats:format(pick_range(deliver_get, Ranges),
                                                channel_stats_deliver_stats,
                                                Id, Interval)} |
                  rabbit_mgmt_stats:format(pick_range(process_stats, Ranges),
                                           channel_process_stats,
                                           Id, Interval)],
         Details = augment_details(Obj, []),
         StatsD = [{publishes, new_detail_stats(channel_exchange_stats_fine_stats,
                                                fine_stats, first(Id), Ranges,
                                                Interval)},
                   {deliveries, new_detail_stats(channel_queue_stats_deliver_stats,
                                                 fine_stats, first(Id), Ranges,
                                                 Interval)}],
         combine(Props, Obj) ++ Details ++ Stats ++ StatsD
     end || Obj <- Objs].
%% Build one proplist per key currently stored in Table that matches the
%% partial identifier Id: the formatted samples under 'stats' plus the
%% reverted/formatted key fields of the entry.
new_detail_stats(Table, Type, Id, Ranges, Interval) ->
    FormatKey =
        fun(Key) ->
                Formatted = rabbit_mgmt_stats:format(pick_range(Type, Ranges),
                                                     Table, Key, Interval),
                [{stats, Formatted} | format_detail_id(revert(Id, Key))]
        end,
    lists:map(FormatKey, rabbit_mgmt_stats:get_new_keys(Table, Id)).
vhost_stats(Ranges, Objs, Interval) ->
merge_stats(Objs, [simple_stats_fun(Ranges, vhost_stats, Interval)]).
@ -607,14 +656,6 @@ created_stats(Name, Type) ->
[Elem] -> Elem
end.
created_event(Name, Type) ->
case ets:select(Type, [{{{'_', '$1'}, '$2', '$3'}, [{'==', 'create', '$1'},
{'==', Name, '$3'}],
['$2']}]) of
[] -> not_found;
[Elem] -> Elem
end.
%% Return the stored props (third tuple element) of every row in the
%% created-events table Type.
%% TODO better tab2list?
created_stats(Type) ->
    MatchSpec = {{'_', '_', '$3'}, [], ['$3']},
    ets:select(Type, [MatchSpec]).
@ -690,7 +731,7 @@ augment_queue_msg_stats_fun() ->
end.
augment_channel_pid(Pid) ->
Ch = lookup_element(channel_stats, {Pid, create}),
Ch = lookup_element(channel_created_stats, Pid, 3),
Conn = lookup_element(connection_created_stats, pget(connection, Ch), 3),
[{name, pget(name, Ch)},
{number, pget(number, Ch)},

View File

@ -17,6 +17,8 @@
-record(state, {table, agent, policies}).
-include_lib("rabbit_common/include/rabbit.hrl").
-spec start_link(atom()) -> rabbit_types:ok_pid_or_error().
-export([name/1]).
@ -25,6 +27,7 @@
code_change/3]).
-import(rabbit_misc, [pget/3]).
-import(rabbit_mgmt_db, [pget/2, lookup_element/3]).
name(Table) ->
list_to_atom((atom_to_list(Table) ++ "_metrics_collector")).
@ -61,7 +64,14 @@ code_change(_OldVsn, State, _Extra) ->
%% Retention-policy bucket for each raw metrics table handled by this
%% collector; the bucket selects the {MaxAge, Interval} pairs from the
%% sample_retention_policies application environment.
%% (Fixes a diff artifact: the superseded clause for
%% connection_metrics_simple_metrics, terminated with '.', was left in
%% place and orphaned the clauses that follow.)
retention_policy(connection_created) -> basic; %% really nothing
retention_policy(connection_metrics) -> basic;
retention_policy(connection_metrics_simple_metrics) -> basic;
retention_policy(channel_created) -> basic;
retention_policy(channel_metrics) -> basic;
retention_policy(channel_queue_exchange_metrics) -> detailed;
retention_policy(channel_exchange_metrics) -> detailed;
retention_policy(channel_queue_metrics) -> detailed;
retention_policy(channel_process_metrics) -> basic;
retention_policy(channel_process_stats) -> basic.
take_smaller(Policies) ->
lists:min([I || {_, I} <- Policies]).
@ -70,32 +80,108 @@ aggregate_metrics(Timestamp, Table, Policies, Records) ->
[aggregate_entry(Timestamp, Table, Policies, R) || R <- Records].
aggregate_entry(_TS, connection_created, _, {Id, Metrics}) ->
case ets:member(connection_created_stats, Id) of
true ->
ok;
false ->
Ftd = rabbit_mgmt_format:format(
Metrics,
{fun rabbit_mgmt_format:format_connection_created/1, true}),
ets:insert(connection_created_stats, {Id, pget(name, Ftd, unknown), Ftd})
end;
Ftd = rabbit_mgmt_format:format(
Metrics,
{fun rabbit_mgmt_format:format_connection_created/1, true}),
ets:insert(connection_created_stats, {Id, pget(name, Ftd, unknown), Ftd});
aggregate_entry(_TS, connection_metrics, _, {Id, Metrics}) ->
ets:insert(connection_stats, {Id, Metrics});
aggregate_entry(TS, connection_metrics_simple_metrics, Policies,
{Id, RecvOct, SendOct, Reductions}) ->
[begin
insert_entry(connection_stats_coarse_conn_stats, Id, TS,
{RecvOct, SendOct, Reductions}, Size, Interval),
insert_entry(vhost_stats_coarse_conn_stats, Id, TS,
{RecvOct, SendOct, Reductions}, Size, Interval)
end || {Size, Interval} <- Policies].
{RecvOct, SendOct, Reductions}, Size, Interval, false),
insert_entry(vhost_stats_coarse_conn_stats,
vhost({connection_created_stats, Id}), TS,
{RecvOct, SendOct, Reductions}, Size, Interval, false)
end || {Size, Interval} <- Policies];
%% Store the formatted created-event for a channel, keyed by pid.
aggregate_entry(_TS, channel_created, _, {Id, Metrics}) ->
    Ftd = rabbit_mgmt_format:format(Metrics, {[], false}),
    ets:insert(channel_created_stats, {Id, pget(name, Ftd, unknown), Ftd});
%% Store the latest raw per-channel metrics, keyed by pid.
aggregate_entry(_TS, channel_metrics, _, {Id, Metrics}) ->
    ets:insert(channel_stats, {Id, Metrics});
%% Per channel/exchange publish counters: the raw values are cumulative,
%% so the per-interval delta (Diff) feeds the incremental aggregations
%% while the cumulative Stats feed the per-pair slide.
aggregate_entry(TS, channel_exchange_metrics, Policies, {{Ch, X} = Id, Metrics}) ->
    %% TODO publish_in only for exchange_stats (aggr)
    %% TODO check queue and exchange exists
    Stats = {pget(publish, Metrics, 0), pget(confirm, Metrics, 0),
             pget(return_unroutable, Metrics, 0)},
    Diff = get_difference(Id, Stats),
    %% Remember the cumulative values so the next event can be diffed.
    ets:insert(old_aggr_stats, {Id, Stats}),
    [begin
         insert_entry(channel_stats_fine_stats, Ch, TS, Diff, Size, Interval,
                      true),
         insert_entry(vhost_stats_fine_stats, vhost(X), TS, Diff, Size,
                      Interval, true),
         insert_entry(channel_exchange_stats_fine_stats, Id,
                      TS, Stats, Size, Interval, false)
     end || {Size, Interval} <- Policies];
%% Per channel/queue deliver-get counters; same cumulative-vs-delta
%% split as above. The 7th element is the derived deliver_get total.
aggregate_entry(TS, channel_queue_metrics, Policies, {{Ch, Q} = Id, Metrics}) ->
    %% TODO check queue and exchange exists
    Deliver = pget(deliver, Metrics, 0),
    DeliverNoAck = pget(deliver_no_ack, Metrics, 0),
    Get = pget(get, Metrics, 0),
    GetNoAck = pget(get_no_ack, Metrics, 0),
    Stats = {Get, GetNoAck, Deliver, DeliverNoAck, pget(redeliver, Metrics, 0),
             pget(ack, Metrics, 0), Deliver + DeliverNoAck + Get + GetNoAck},
    Diff = get_difference(Id, Stats),
    ets:insert(old_aggr_stats, {Id, Stats}),
    [begin
         insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval,
                      true),
         insert_entry(vhost_stats_deliver_stats, vhost(Q), TS, Diff, Size,
                      Interval, true),
         insert_entry(channel_stats_deliver_stats, Ch, TS, Diff, Size, Interval,
                      true),
         insert_entry(channel_queue_stats_deliver_stats, Id, TS, Stats, Size,
                      Interval, false)
     end || {Size, Interval} <- Policies];
%% Queue/exchange publish counter. NOTE(review): here the key used for
%% the delta table is the {Q, X} pair only, not the channel — confirm
%% that channel-less keying is intended.
aggregate_entry(TS, channel_queue_exchange_metrics, Policies, {{_Ch, {Q, X} = Id}, Publish}) ->
    %% TODO check queue and exchange exists
    Stats = {Publish},
    Diff = get_difference(Id, Stats),
    ets:insert(old_aggr_stats, {Id, Stats}),
    [begin
         insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true),
         insert_entry(queue_exchange_stats_publish, Id, TS, Diff, Size, Interval, true),
         insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true)
     end || {Size, Interval} <- Policies];
%% Channel process reductions, stored non-incrementally per policy.
aggregate_entry(TS, channel_process_metrics, Policies, {Id, Reductions}) ->
    [begin
         insert_entry(channel_process_stats, Id, TS, {Reductions}, Size, Interval,
                      false)
     end || {Size, Interval} <- Policies];
%% Any other event type is ignored.
aggregate_entry(_, _, _, _) ->
    ok.
%% Add Entry (at millisecond timestamp TS) to the sliding window stored
%% in Table under {Id, Interval}, creating the window on first use.
%% Size and Interval arrive in seconds; exometer_slide works in ms.
%% (Fixes a diff artifact: the superseded 6-arity head and the old
%% exometer_slide:new/2 call were left interleaved with their
%% replacements, producing invalid syntax.)
insert_entry(Table, Id, TS, Entry, Size, Interval, Incremental) ->
    Key = {Id, Interval},
    Slide = case ets:lookup(Table, Key) of
                [{Key, S}] ->
                    S;
                [] ->
                    exometer_slide:new(Size * 1000,
                                       [{interval, Interval * 1000},
                                        {incremental, Incremental}])
            end,
    ets:insert(Table, {Key, exometer_slide:add_element(TS, Entry, Slide)}).
%% Return the per-interval delta between the previously recorded
%% cumulative stats for Id (in old_aggr_stats) and the new cumulative
%% Stats; with no previous record the stats are already the delta.
get_difference(Id, Stats) ->
    case ets:lookup(old_aggr_stats, Id) of
        [{Id, Previous}] ->
            difference(Previous, Stats);
        [] ->
            Stats
    end.

%% Element-wise subtraction (new minus old) of two stat tuples of equal
%% arity — 1, 3 or 7, the shapes used by the aggregator.
difference({O0}, {N0}) ->
    {N0 - O0};
difference({O0, O1, O2}, {N0, N1, N2}) ->
    {N0 - O0, N1 - O1, N2 - O2};
difference({O0, O1, O2, O3, O4, O5, O6}, {N0, N1, N2, N3, N4, N5, N6}) ->
    {N0 - O0, N1 - O1, N2 - O2, N3 - O3, N4 - O4, N5 - O5, N6 - O6}.
%% Resolve the vhost a metric key belongs to: directly from a
%% #resource{} (queue/exchange names), from a {queue_stats, Resource}
%% pair, or — for pid-keyed tables — by looking the pid up in its
%% created-events table and reading the vhost from the stored props.
vhost(#resource{virtual_host = VHost}) ->
    VHost;
vhost({queue_stats, #resource{virtual_host = VHost}}) ->
    VHost;
vhost({TName, Pid}) ->
    pget(vhost, lookup_element(TName, Pid, 3)).

View File

@ -22,7 +22,8 @@
-export([blank/1, is_blank/3, record/5, format/5, sum/1, gc/3,
free/1, delete_stats/2, get_keys/2]).
-export([format/4]).
-export([format/4,
get_new_keys/2]).
-import(rabbit_misc, [pget/2]).
@ -103,6 +104,14 @@ delete_complex_stats_loop(Table, [{Id} | Ids]) ->
get_keys(Table, Id0) ->
ets:select(rabbit_mgmt_stats_tables:key_index(Table), match_spec_keys(Id0)).
%% Select every {Channel, Target} key in Table whose components satisfy
%% the match condition derived from the (possibly partial) id Id0.
get_new_keys(Table, Id0) ->
    ets:select(Table, match_spec_new_keys(Id0)).

%% Match spec over rows keyed {{Chan, Target}, Interval}: bind the two
%% key components, filter them with to_match_condition/1, and return the
%% {Chan, Target} pair.
match_spec_new_keys(Id) ->
    Head = {{{'$1', '$2'}, '_'}, '_'},
    Condition = to_match_condition(Id),
    Result = {{'$1', '$2'}},
    [{Head, [Condition], [Result]}].
%%----------------------------------------------------------------------------
%% Event-time
%%----------------------------------------------------------------------------
@ -139,7 +148,7 @@ format(Range, Table, Id, Interval) ->
calculate_instant_rate(Table, Id, RangePoint) ->
case ets:lookup(Table, {Id, select_smaller_sample(Table)}) of
[] ->
[];
{new_empty(Table, 0), new_empty(Table, 0.0)};
[{_, Slide}] ->
case exometer_slide:last_two(Slide) of
[] ->
@ -209,10 +218,25 @@ append_missing_samples(MissingSamples, Sample, Samples, Totals) ->
missing_samples(Next, Incr, TS) ->
lists:seq(Next, TS, Incr).
%% connection_stats_coarse_conn_stats
%% connection_stats_coarse_conn_stats, channel_stats_fine_stats,
%% vhost_stats_fine_stats, channel_exchange_stats_fine_stats
append_full_sample(TS, {V1, V2, V3}, {S1, S2, S3}, {T1, T2, T3}) ->
{{append_sample(V1, TS, S1), append_sample(V2, TS, S2), append_sample(V3, TS, S3)},
{V1 + T1, V2 + T2, V3 + T3}}.
{V1 + T1, V2 + T2, V3 + T3}};
%% channel_queue_stats_deliver_stats, queue_stats_deliver_stats,
%% vhost_stats_deliver_stats, channel_stats_deliver_stats
append_full_sample(TS, {V1, V2, V3, V4, V5, V6, V7},
{S1, S2, S3, S4, S5, S6, S7},
{T1, T2, T3, T4, T5, T6, T7}) ->
{{append_sample(V1, TS, S1), append_sample(V2, TS, S2),
append_sample(V3, TS, S3), append_sample(V4, TS, S4),
append_sample(V5, TS, S5), append_sample(V6, TS, S6),
append_sample(V7, TS, S7)},
{V1 + T1, V2 + T2, V3 + T3, V4 + T4, V5 + T5, V6 + T6, V7 + T7}};
%% channel_process_stats, queue_stats_publish, queue_exchange_stats_publish,
%% exchange_stats_publish_out
append_full_sample(TS, {V1}, {S1}, {T1}) ->
{{append_sample(V1, TS, S1)}, {V1 + T1}}.
select_range_sample(Table, #range{first = First, last = Last}) ->
Range = Last - First,
@ -223,14 +247,6 @@ select_range_sample(Table, #range{first = First, last = Last}) ->
{_, Sample} = select_largest_below(T, TablePolicies, Range),
Sample.
select_sample(Table, Interval) ->
{ok, Policies} = application:get_env(
rabbitmq_management, sample_retention_policies),
Policy = retention_policy(Table),
TablePolicies = proplists:get_value(Policy, Policies),
[V | Values] = lists:sort([I || {_, I} <- TablePolicies]),
select_largest_below(V, Values, Interval).
select_smaller_sample(Table) ->
{ok, Policies} = application:get_env(
rabbitmq_management, sample_retention_policies),
@ -247,6 +263,28 @@ select_largest_below(_, [H | T], Interval) ->
select_largest_below(H, T, Interval).
%% Retention-policy bucket for each aggregated stats table; the bucket
%% selects the {MaxAge, Interval} list from the
%% sample_retention_policies application environment.
retention_policy(connection_stats_coarse_conn_stats) ->
    basic;
retention_policy(channel_stats_fine_stats) ->
    basic;
retention_policy(channel_queue_stats_deliver_stats) ->
    detailed;
retention_policy(channel_exchange_stats_fine_stats) ->
    detailed;
retention_policy(channel_process_stats) ->
    basic;
retention_policy(vhost_stats_fine_stats) ->
    global;
retention_policy(vhost_stats_deliver_stats) ->
    global;
retention_policy(channel_stats_deliver_stats) ->
    basic;
retention_policy(queue_stats_deliver_stats) ->
    basic;
retention_policy(queue_stats_publish) ->
    basic;
retention_policy(queue_exchange_stats_publish) ->
    basic;
%% BUG FIX: the atom was misspelled 'exhcange_stats_publish_out', so
%% retention_policy(exchange_stats_publish_out) crashed with
%% function_clause.
retention_policy(exchange_stats_publish_out) ->
    basic.
format_rate(connection_stats_coarse_conn_stats, {TR, TS, TRe}, {RR, RS, RRe}) ->
@ -257,6 +295,56 @@ format_rate(connection_stats_coarse_conn_stats, {TR, TS, TRe}, {RR, RS, RRe}) ->
{recv_oct_details, [{rate, RR}]},
{reductions, TRe},
{reductions_details, [{rate, RRe}]}
];
%% Fine-grained publish counters: {publish, confirm, return_unroutable}.
%% T* are totals, R* the instantaneous rates.
format_rate(Type, {TP, TC, TRe}, {RP, RC, RRe})
  when Type =:= channel_stats_fine_stats;
       Type =:= vhost_stats_fine_stats;
       Type =:= channel_exchange_stats_fine_stats ->
    [
     {publish, TP},
     {publish_details, [{rate, RP}]},
     {confirm, TC},
     {confirm_details, [{rate, RC}]},
     {return_unroutable, TRe},
     {return_unroutable_details, [{rate, RRe}]}
    ];
%% Deliver/get counters: {get, get_no_ack, deliver, deliver_no_ack,
%% redeliver, ack, deliver_get}.
format_rate(Type, {TG, TGN, TD, TDN, TR, TA, TDG},
            {RG, RGN, RD, RDN, RR, RA, RDG})
  when Type =:= channel_queue_stats_deliver_stats;
       Type =:= channel_stats_deliver_stats;
       Type =:= vhost_stats_deliver_stats;
       Type =:= queue_stats_deliver_stats ->
    [
     {get, TG},
     {get_details, [{rate, RG}]},
     {get_no_ack, TGN},
     %% BUG FIX: the rate was the total TGN; it must be the rate RGN.
     {get_no_ack_details, [{rate, RGN}]},
     {deliver, TD},
     {deliver_details, [{rate, RD}]},
     {deliver_no_ack, TDN},
     {deliver_no_ack_details, [{rate, RDN}]},
     {redeliver, TR},
     {redeliver_details, [{rate, RR}]},
     {ack, TA},
     {ack_details, [{rate, RA}]},
     {deliver_get, TDG},
     {deliver_get_details, [{rate, RDG}]}
    ];
%% Channel process reductions.
format_rate(channel_process_stats, {TR}, {RR}) ->
    [
     {reductions, TR},
     {reductions_details, [{rate, RR}]}
    ];
%% Outbound publishes per exchange.
format_rate(exchange_stats_publish_out, {TP}, {RP}) ->
    [
     {publish_out, TP},
     {publish_out_details, [{rate, RP}]}
    ];
%% Publishes per queue / per queue-exchange pair.
format_rate(Type, {TP}, {RP}) when Type =:= queue_stats_publish;
                                   Type =:= queue_exchange_stats_publish ->
    [
     {publish, TP},
     {publish_details, [{rate, RP}]}
    ].
format_rate(connection_stats_coarse_conn_stats, {TR, TS, TRe}, {RR, RS, RRe},
@ -271,6 +359,72 @@ format_rate(connection_stats_coarse_conn_stats, {TR, TS, TRe}, {RR, RS, RRe},
{reductions, TRe},
{reductions_details, [{rate, RRe},
{samples, SRe}] ++ average(SRe, STRe, Length)}
];
%% Ranged variant: as format_rate/3 but also emits the sample series S*
%% and (via average/3) min/max/avg computed from the sample totals ST*.
format_rate(Type, {TP, TC, TRe}, {RP, RC, RRe},
            {SP, SC, SRe}, {STP, STC, STRe}, Length)
  when Type =:= channel_stats_fine_stats;
       Type =:= vhost_stats_fine_stats;
       Type =:= channel_exchange_stats_fine_stats ->
    [
     {publish, TP},
     {publish_details, [{rate, RP},
                        {samples, SP}] ++ average(SP, STP, Length)},
     {confirm, TC},
     {confirm_details, [{rate, RC},
                        {samples, SC}] ++ average(SC, STC, Length)},
     {return_unroutable, TRe},
     {return_unroutable_details, [{rate, RRe},
                                  {samples, SRe}] ++ average(SRe, STRe, Length)}
    ];
format_rate(Type, {TG, TGN, TD, TDN, TR, TA, TDG}, {RG, RGN, RD, RDN, RR, RA, RDG},
            {SG, SGN, SD, SDN, SR, SA, SDG}, {STG, STGN, STD, STDN, STR, STA, STDG},
            Length)
  when Type =:= channel_queue_stats_deliver_stats;
       Type =:= channel_stats_deliver_stats;
       Type =:= vhost_stats_deliver_stats;
       Type =:= queue_stats_deliver_stats ->
    [
     {get, TG},
     {get_details, [{rate, RG},
                    {samples, SG}] ++ average(SG, STG, Length)},
     {get_no_ack, TGN},
     %% BUG FIX: the rate was the total TGN; it must be the rate RGN.
     {get_no_ack_details, [{rate, RGN},
                           {samples, SGN}] ++ average(SGN, STGN, Length)},
     {deliver, TD},
     {deliver_details, [{rate, RD},
                        {samples, SD}] ++ average(SD, STD, Length)},
     {deliver_no_ack, TDN},
     {deliver_no_ack_details, [{rate, RDN},
                               {samples, SDN}] ++ average(SDN, STDN, Length)},
     {redeliver, TR},
     {redeliver_details, [{rate, RR},
                          {samples, SR}] ++ average(SR, STR, Length)},
     {ack, TA},
     {ack_details, [{rate, RA},
                    {samples, SA}] ++ average(SA, STA, Length)},
     {deliver_get, TDG},
     {deliver_get_details, [{rate, RDG},
                            {samples, SDG}] ++ average(SDG, STDG, Length)}
    ];
format_rate(channel_process_stats, {TR}, {RR}, {SR}, {STR}, Length) ->
    [
     {reductions, TR},
     {reductions_details, [{rate, RR},
                           {samples, SR}] ++ average(SR, STR, Length)}
    ];
format_rate(exchange_stats_publish_out, {TP}, {RP}, {SP}, {STP}, Length) ->
    [
     {publish_out, TP},
     {publish_out_details, [{rate, RP},
                            {samples, SP}] ++ average(SP, STP, Length)}
    ];
format_rate(Type, {TP}, {RP}, {SP}, {STP}, Length)
  when Type =:= queue_stats_publish;
       Type =:= queue_exchange_stats_publish ->
    [
     %% BUG FIX: these tables count publishes; the keys were copy-pasted
     %% as publish_out/publish_out_details, inconsistent with the
     %% format_rate/3 clause for the same tables.
     {publish, TP},
     {publish_details, [{rate, RP},
                        {samples, SP}] ++ average(SP, STP, Length)}
    ].
average(_Samples, _Total, Length) when Length =< 1->
@ -303,13 +457,35 @@ rate_from_last_increment(Total, [H | _T]) ->
%% Per-second rates from two consecutive {Timestamp, StatsTuple} samples
%% (newest first); supports the 1-, 3- and 7-element stat shapes.
%% (Fixes a diff artifact: the superseded 3-element body line, ending in
%% '.', was left in place and orphaned the clauses that follow.)
rate_from_difference({TS0, {A0, A1, A2}}, {TS1, {B0, B1, B2}}) ->
    Interval = TS0 - TS1,
    {rate(A0 - B0, Interval), rate(A1 - B1, Interval), rate(A2 - B2, Interval)};
rate_from_difference({TS0, {A0, A1, A2, A3, A4, A5, A6}},
                     {TS1, {B0, B1, B2, B3, B4, B5, B6}}) ->
    Interval = TS0 - TS1,
    {rate(A0 - B0, Interval), rate(A1 - B1, Interval), rate(A2 - B2, Interval),
     rate(A3 - B3, Interval), rate(A4 - B4, Interval), rate(A5 - B5, Interval),
     rate(A6 - B6, Interval)};
rate_from_difference({TS0, {A0}}, {TS1, {B0}}) ->
    Interval = TS0 - TS1,
    {rate(A0 - B0, Interval)}.

%% Convert a count over Interval milliseconds into a per-second rate.
rate(V, Interval) ->
    V * 1000 / Interval.
%% Build an "empty" stats tuple of the right arity for a table, with V
%% as the fill value (callers pass 0 for totals and 0.0 for rates).
%% (Fixes a diff artifact: the superseded single-clause definition,
%% terminated with '.', was left above the new clauses, turning them
%% into a duplicate definition of new_empty/2.)
new_empty(Type, V) when Type =:= connection_stats_coarse_conn_stats;
                        Type =:= channel_stats_fine_stats;
                        Type =:= channel_exchange_stats_fine_stats;
                        Type =:= vhost_stats_fine_stats ->
    {V, V, V};
new_empty(Type, V) when Type =:= channel_queue_stats_deliver_stats;
                        Type =:= queue_stats_deliver_stats;
                        Type =:= vhost_stats_deliver_stats;
                        Type =:= channel_stats_deliver_stats ->
    {V, V, V, V, V, V, V};
new_empty(Type, V) when Type =:= channel_process_stats;
                        Type =:= queue_stats_publish;
                        Type =:= queue_exchange_stats_publish;
                        Type =:= exchange_stats_publish_out ->
    {V}.
format(no_range, Table, Id, Interval, Type) ->
Now = time_compat:os_system_time(milli_seconds),

View File

@ -7,7 +7,7 @@
{env, [{listener, [{port, 15672}]},
{http_log_dir, none},
{load_definitions, none},
{rates_mode, basic},
{rates_mode, detailed},
{sample_retention_policies,
%% List of {MaxAgeInSeconds, SampleEveryNSeconds}
[{global, [{605, 5}, {3660, 60}, {29400, 600}, {86400, 1800}]},