Fix vhost stats

This commit is contained in:
Diana Corbacho 2016-08-25 18:58:36 +01:00
parent f2b455e451
commit c1ac5e6ea9
5 changed files with 101 additions and 45 deletions

View File

@ -91,6 +91,7 @@
consumer_stats,
queue_stats,
queue_msg_stats,
vhost_msg_stats,
queue_process_stats,
node_stats,
node_coarse_stats,

View File

@ -544,7 +544,24 @@ new_detail_stats(Table, Type, Id, Ranges, Interval) ->
end || Key <- rabbit_mgmt_stats:get_new_keys(Table, Id)].
vhost_stats(Ranges, Objs, Interval) ->
merge_stats(Objs, [simple_stats_fun(Ranges, vhost_stats, Interval)]).
[begin
Id = id_lookup(vhost_stats, Obj),
Stats = rabbit_mgmt_stats:format(pick_range(coarse_conn_stats, Ranges),
vhost_stats_coarse_conn_stats,
Id, Interval)
++ rabbit_mgmt_stats:format(pick_range(queue_msg_rates, Ranges),
vhost_msg_stats, Id, Interval),
StatsD = [{message_stats, rabbit_mgmt_stats:format(pick_range(fine_stats, Ranges),
vhost_stats_fine_stats,
Id, Interval)
++ rabbit_mgmt_stats:format(pick_range(deliver_get, Ranges),
vhost_stats_deliver_stats,
Id, Interval)}],
Details = augment_details(Obj, []),
Obj ++ Details ++ Stats ++ StatsD
end || Obj <- Objs].
%% merge_stats(Objs, [simple_stats_fun(Ranges, vhost_stats, Interval)]).
node_stats(Ranges, Objs, Interval) ->
[begin

View File

@ -40,20 +40,22 @@ init([Table]) ->
{ok, Policies} = application:get_env(
rabbitmq_management, sample_retention_policies),
Policy = retention_policy(Table),
TablePolicies = proplists:get_value(Policy, Policies),
Interval = take_smaller(TablePolicies),
Interval = take_smaller(proplists:get_value(Policy, Policies)),
{ok, Agent} = rabbit_mgmt_agent_collector_sup:start_child(self(), Table,
Interval * 1000),
{ok, #state{table = Table, agent = Agent, policies = TablePolicies,
{ok, #state{table = Table, agent = Agent,
policies = {proplists:get_value(basic, Policies),
proplists:get_value(detailed, Policies),
proplists:get_value(global, Policies)},
rates_mode = RatesMode}}.
handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast({metrics, Timestamp, Records}, State = #state{table = Table,
policies = TablePolicies,
policies = Policies,
rates_mode = RatesMode}) ->
aggregate_metrics(Timestamp, Table, TablePolicies, Records, RatesMode),
aggregate_metrics(Timestamp, Table, Policies, Records, RatesMode),
{noreply, State}.
handle_info(_Msg, State) ->
@ -95,22 +97,23 @@ aggregate_entry(_TS, connection_created, _, {Id, Metrics}, _) ->
ets:insert(connection_created_stats, {Id, pget(name, Ftd, unknown), Ftd});
aggregate_entry(_TS, connection_metrics, _, {Id, Metrics}, _) ->
ets:insert(connection_stats, {Id, Metrics});
aggregate_entry(TS, connection_coarse_metrics, Policies,
aggregate_entry(TS, connection_coarse_metrics, {BPolicies, _, GPolicies},
{Id, RecvOct, SendOct, Reductions}, _) ->
%% VHOSTS are aggregated??
Stats = {RecvOct, SendOct},
Diff = get_difference(Id, Stats),
ets:insert(old_aggr_stats, {Id, Stats}),
[insert_entry(vhost_stats_coarse_conn_stats, vhost({connection_created_stats, Id}),
TS, Diff, Size, Interval, true) || {Size, Interval} <- GPolicies],
[begin
insert_entry(connection_stats_coarse_conn_stats, Id, TS,
{RecvOct, SendOct, Reductions}, Size, Interval, false),
insert_entry(vhost_stats_coarse_conn_stats,
vhost({connection_created_stats, Id}), TS,
{RecvOct, SendOct, Reductions}, Size, Interval, false)
end || {Size, Interval} <- Policies];
end || {Size, Interval} <- BPolicies];
aggregate_entry(_TS, channel_created, _, {Id, Metrics}, _) ->
Ftd = rabbit_mgmt_format:format(Metrics, {[], false}),
ets:insert(channel_created_stats, {Id, pget(name, Ftd, unknown), Ftd});
aggregate_entry(_TS, channel_metrics, _, {Id, Metrics}, _) ->
ets:insert(channel_stats, {Id, Metrics});
aggregate_entry(TS, channel_exchange_metrics, Policies, {{Ch, X} = Id, Metrics},
aggregate_entry(TS, channel_exchange_metrics, {_, DPolicies, _}, {{Ch, X} = Id, Metrics},
RatesMode) ->
Stats = {pget(publish, Metrics, 0), pget(confirm, Metrics, 0),
pget(return_unroutable, Metrics, 0)},
@ -121,22 +124,22 @@ aggregate_entry(TS, channel_exchange_metrics, Policies, {{Ch, X} = Id, Metrics},
true),
insert_entry(vhost_stats_fine_stats, vhost(X), TS, Diff, Size,
Interval, true)
end || {Size, Interval} <- Policies],
end || {Size, Interval} <- DPolicies],
case {exchange_exists(X), RatesMode} of
{true, basic} ->
[insert_entry(exchange_stats_publish_in, X, TS, {Publish}, Size, Interval,
true) || {Size, Interval} <- Policies];
true) || {Size, Interval} <- DPolicies];
{true, _} ->
[begin
insert_entry(exchange_stats_publish_in, X, TS, {Publish}, Size, Interval,
true),
insert_entry(channel_exchange_stats_fine_stats, Id, TS, Stats,
Size, Interval, false)
end || {Size, Interval} <- Policies];
end || {Size, Interval} <- DPolicies];
_ ->
ok
end;
aggregate_entry(TS, channel_queue_metrics, Policies, {{Ch, Q} = Id, Metrics},
aggregate_entry(TS, channel_queue_metrics, {_, DPolicies, _}, {{Ch, Q} = Id, Metrics},
RatesMode) ->
Deliver = pget(deliver, Metrics, 0),
DeliverNoAck = pget(deliver_no_ack, Metrics, 0),
@ -151,53 +154,52 @@ aggregate_entry(TS, channel_queue_metrics, Policies, {{Ch, Q} = Id, Metrics},
Interval, true),
insert_entry(channel_stats_deliver_stats, Ch, TS, Diff, Size, Interval,
true)
end || {Size, Interval} <- Policies],
end || {Size, Interval} <- DPolicies],
case {queue_exists(Q), RatesMode} of
{true, basic} ->
[insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval,
true) || {Size, Interval} <- Policies];
true) || {Size, Interval} <- DPolicies];
{true, _} ->
[begin
insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval,
true),
insert_entry(channel_queue_stats_deliver_stats, Id, TS, Stats, Size,
Interval, false)
end || {Size, Interval} <- Policies];
end || {Size, Interval} <- DPolicies];
_ ->
ok
end;
aggregate_entry(TS, channel_queue_exchange_metrics, Policies,
aggregate_entry(TS, channel_queue_exchange_metrics, {_, DPolicies, _},
{{_Ch, {Q, X} = Id}, Publish}, RatesMode) ->
Stats = {Publish},
Diff = get_difference(Id, Stats),
ets:insert(old_aggr_stats, {Id, Stats}),
%% channel_exch, queue_exch, echange_stats
case {queue_exists(Q), exchange_exists(Q), RatesMode} of
{true, false, _} ->
[insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true)
|| {Size, Interval} <- Policies];
|| {Size, Interval} <- DPolicies];
{false, true, _} ->
[insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true)
|| {Size, Interval} <- Policies];
|| {Size, Interval} <- DPolicies];
{true, true, basic} ->
[begin
insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true),
insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true)
end || {Size, Interval} <- Policies];
end || {Size, Interval} <- DPolicies];
{true, true, _} ->
[begin
insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true),
insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true),
insert_entry(queue_exchange_stats_publish, Id, TS, Diff, Size, Interval, true)
end || {Size, Interval} <- Policies];
end || {Size, Interval} <- DPolicies];
_ ->
ok
end;
aggregate_entry(TS, channel_process_metrics, Policies, {Id, Reductions}, _) ->
aggregate_entry(TS, channel_process_metrics, {BPolicies, _, _}, {Id, Reductions}, _) ->
[begin
insert_entry(channel_process_stats, Id, TS, {Reductions}, Size, Interval,
false)
end || {Size, Interval} <- Policies];
end || {Size, Interval} <- BPolicies];
aggregate_entry(_TS, consumer_created, _, {Id, Exclusive, AckRequired,
PrefetchCount, Args}, _) ->
Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive},
@ -216,9 +218,13 @@ aggregate_entry(_TS, queue_metrics, _, {Id, Metrics}, _) ->
false ->
ok
end;
aggregate_entry(TS, queue_coarse_metrics, Policies, {Name, Ready, Unack, Msgs,
Red}, _) ->
%% TODO vhost stats ready, unack, msg
aggregate_entry(TS, queue_coarse_metrics, {BPolicies, _, GPolicies},
{Name, Ready, Unack, Msgs, Red}, _) ->
Stats = {Ready, Unack, Msgs},
Diff = get_difference(Name, Stats),
ets:insert(old_aggr_stats, {Name, Stats}),
[insert_entry(vhost_msg_stats, vhost(Name), TS, Diff, Size, Interval, true)
|| {Size, Interval} <- GPolicies],
case queue_exists(Name) of
true ->
[begin
@ -226,20 +232,20 @@ aggregate_entry(TS, queue_coarse_metrics, Policies, {Name, Ready, Unack, Msgs,
Size, Interval, false),
insert_entry(queue_msg_stats, Name, TS, {Ready, Unack, Msgs},
Size, Interval, false)
end || {Size, Interval} <- Policies];
end || {Size, Interval} <- BPolicies];
_ ->
ok
end;
aggregate_entry(_TS, node_metrics, _, {Id, Metrics}, _) ->
ets:insert(node_stats, {Id, Metrics});
aggregate_entry(TS, node_coarse_metrics, Policies, {Id, Metrics}, _) ->
aggregate_entry(TS, node_coarse_metrics, {_, _, GPolicies}, {Id, Metrics}, _) ->
Stats = {pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0),
pget(mem_used, Metrics, 0), pget(disk_free, Metrics, 0),
pget(proc_used, Metrics, 0), pget(gc_num, Metrics, 0),
pget(gc_bytes_reclaimed, Metrics, 0), pget(context_switches, Metrics, 0)},
[insert_entry(node_coarse_stats, Id, TS, Stats, Size, Interval, false)
|| {Size, Interval} <- Policies];
aggregate_entry(TS, node_persister_metrics, Policies, {Id, Metrics}, _) ->
|| {Size, Interval} <- GPolicies];
aggregate_entry(TS, node_persister_metrics, {_, _, GPolicies}, {Id, Metrics}, _) ->
Stats = {pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0),
pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0),
pget(io_write_bytes, Metrics, 0), pget(io_write_time, Metrics, 0),
@ -253,13 +259,13 @@ aggregate_entry(TS, node_persister_metrics, Policies, {Id, Metrics}, _) ->
pget(io_file_handle_open_attempt_count, Metrics, 0),
pget(io_file_handle_open_attempt_time, Metrics, 0)},
[insert_entry(node_persister_stats, Id, TS, Stats, Size, Interval, false)
|| {Size, Interval} <- Policies];
aggregate_entry(TS, node_node_metrics, Policies, {Id, Metrics}, _) ->
|| {Size, Interval} <- GPolicies];
aggregate_entry(TS, node_node_metrics, {_, _, GPolicies}, {Id, Metrics}, _) ->
Stats = {pget(send_bytes, Metrics, 0), pget(recv_bytes, Metrics, 0)},
CleanMetrics = lists:keydelete(recv_bytes, 1, lists:keydelete(send_bytes, 1, Metrics)),
ets:insert(node_node_stats, {Id, CleanMetrics}),
[insert_entry(node_node_coarse_stats, Id, TS, Stats, Size, Interval, false)
|| {Size, Interval} <- Policies].
|| {Size, Interval} <- GPolicies].
insert_entry(Table, Id, TS, Entry, Size, Interval, Incremental) ->
Key = {Id, Interval},
@ -282,6 +288,8 @@ get_difference(Id, Stats) ->
difference({A0}, {B0}) ->
{B0 - A0};
difference({A0, A1}, {B0, B1}) ->
{B0 - A0, B1 - A0};
difference({A0, A1, A2}, {B0, B1, B2}) ->
{B0 - A0, B1 - A1, B2 - A2};
difference({A0, A1, A2, A3, A4, A5, A6}, {B0, B1, B2, B3, B4, B5, B6}) ->

View File

@ -92,6 +92,7 @@ code_change(_OldVsn, State, _Extra) ->
remove_connection(Id, Intervals) ->
ets:delete(connection_created_stats, Id),
ets:delete(connection_stats, Id),
ets:delete(old_aggr_stats, Id),
delete_samples(connection_stats_coarse_conn_stats, Id, Intervals),
delete_samples(vhost_stats_coarse_conn_stats, Id, Intervals).
@ -125,6 +126,7 @@ remove_queue(Name, BIntervals, DIntervals) ->
ets:select_delete(queue_exchange_stats_publish, match_interval_spec({Name})),
delete_samples(queue_process_stats, Name, BIntervals),
delete_samples(queue_msg_stats, Name, BIntervals),
ets:delete(old_aggr_stats, Name),
ets:select_delete(old_aggr_stats, match_second_spec({Name})),
ets:select_delete(consumer_stats, match_queue_consumer_spec({Name})),
ok.

View File

@ -220,7 +220,7 @@ missing_samples(Next, Incr, TS) ->
%% connection_stats_coarse_conn_stats, channel_stats_fine_stats,
%% vhost_stats_fine_stats, channel_exchange_stats_fine_stats,
%% queue_msg_stats
%% queue_msg_stats, vhost_msg_stats
append_full_sample(TS, {V1, V2, V3}, {S1, S2, S3}, {T1, T2, T3}) ->
{{append_sample(V1, TS, S1), append_sample(V2, TS, S2), append_sample(V3, TS, S3)},
{V1 + T1, V2 + T2, V3 + T3}};
@ -269,7 +269,7 @@ append_full_sample(TS,
{V1 + T1, V2 + T2, V3 + T3, V4 + T4, V5 + T5, V6 + T6, V7 + T7, V8 + T8,
V9 + T9, V10 + T10, V11 + T11, V12 + T12, V13 + T13, V14 + T14, V15 + T15,
V16 + T16, V17 + T17, V18 + T18, V19 + T19, V20 + T20}};
%% node_node_coarse_stats
%% node_node_coarse_stats, vhost_stats_coarse_connection_stats
append_full_sample(TS, {V1, V2}, {S1, S2}, {T1, T2}) ->
{{append_sample(V1, TS, S1), append_sample(V2, TS, S2)}, {V1 + T1, V2 + T2}}.
@ -311,6 +311,8 @@ retention_policy(vhost_stats_fine_stats) ->
global;
retention_policy(vhost_stats_deliver_stats) ->
global;
retention_policy(vhost_stats_coarse_conn_stats) ->
global;
retention_policy(channel_stats_deliver_stats) ->
basic;
retention_policy(queue_stats_deliver_stats) ->
@ -327,6 +329,8 @@ retention_policy(queue_process_stats) ->
basic;
retention_policy(queue_msg_stats) ->
basic;
retention_policy(vhost_msg_stats) ->
global;
retention_policy(node_coarse_stats) ->
global;
retention_policy(node_persister_stats) ->
@ -343,6 +347,13 @@ format_rate(connection_stats_coarse_conn_stats, {TR, TS, TRe}, {RR, RS, RRe}) ->
{reductions, TRe},
{reductions_details, [{rate, RRe}]}
];
format_rate(vhost_stats_coarse_conn_stats, {TR, TS}, {RR, RS}) ->
[
{send_oct, TS},
{send_oct_details, [{rate, RS}]},
{recv_oct, TR},
{recv_oct_details, [{rate, RR}]}
];
format_rate(Type, {TP, TC, TRe}, {RP, RC, RRe})
when Type =:= channel_stats_fine_stats;
Type =:= vhost_stats_fine_stats;
@ -399,7 +410,8 @@ format_rate(Type, {TP}, {RP}) when Type =:= queue_stats_publish;
{publish, TP},
{publish_details, [{rate, RP}]}
];
format_rate(queue_msg_stats, {TR, TU, TM}, {RR, RU, RM}) ->
format_rate(Type, {TR, TU, TM}, {RR, RU, RM}) when Type =:= queue_msg_stats;
Type =:= vhost_msg_stats ->
[
{messages_ready, TR},
{messages_ready_details, [{rate, RR}]},
@ -503,6 +515,16 @@ format_rate(connection_stats_coarse_conn_stats, {TR, TS, TRe}, {RR, RS, RRe},
{reductions_details, [{rate, RRe},
{samples, SRe}] ++ average(SRe, STRe, Length)}
];
format_rate(vhost_stats_coarse_conn_stats, {TR, TS}, {RR, RS}, {SR, SS},
{STR, STS}, Length) ->
[
{send_oct, TS},
{send_oct_details, [{rate, RS},
{samples, SS}] ++ average(SS, STS, Length)},
{recv_oct, TR},
{recv_oct_details, [{rate, RR},
{samples, SR}] ++ average(SR, STR, Length)}
];
format_rate(Type, {TP, TC, TRe}, {RP, RC, RRe},
{SP, SC, SRe}, {STP, STC, STRe}, Length)
when Type =:= channel_stats_fine_stats;
@ -577,8 +599,9 @@ format_rate(Type, {TP}, {RP}, {SP}, {STP}, Length)
{publish_out_details, [{rate, RP},
{samples, SP}] ++ average(SP, STP, Length)}
];
format_rate(queue_msg_stats, {TR, TU, TM}, {RR, RU, RM},
{SR, SU, SM}, {STR, STU, STM}, Length) ->
format_rate(Type, {TR, TU, TM}, {RR, RU, RM}, {SR, SU, SM}, {STR, STU, STM},
Length) when Type =:= queue_msg_stats;
Type =:= vhost_msg_stats ->
[
{messages_ready, TR},
{messages_ready_details, [{rate, RR},
@ -745,6 +768,9 @@ rate_from_last_increment(Total, [H | _T]) ->
rate_from_difference({TS0, {A0, A1, A2}}, {TS1, {B0, B1, B2}}) ->
Interval = TS0 - TS1,
{rate(A0 - B0, Interval), rate(A1 - B1, Interval), rate(A2 - B2, Interval)};
rate_from_difference({TS0, {A0, A1}}, {TS1, {B0, B1}}) ->
Interval = TS0 - TS1,
{rate(A0 - B0, Interval), rate(A1 - B1, Interval)};
rate_from_difference({TS0, {A0, A1, A2, A3, A4, A5, A6}},
{TS1, {B0, B1, B2, B3, B4, B5, B6}}) ->
Interval = TS0 - TS1,
@ -780,7 +806,8 @@ new_empty(Type, V) when Type =:= connection_stats_coarse_conn_stats;
Type =:= channel_stats_fine_stats;
Type =:= channel_exchange_stats_fine_stats;
Type =:= vhost_stats_fine_stats;
Type =:= queue_msg_stats ->
Type =:= queue_msg_stats;
Type =:= vhost_msg_stats ->
{V, V, V};
new_empty(Type, V) when Type =:= channel_queue_stats_deliver_stats;
Type =:= queue_stats_deliver_stats;
@ -798,7 +825,8 @@ new_empty(node_coarse_stats, V) ->
{V, V, V, V, V, V, V, V};
new_empty(node_persister_stats, V) ->
{V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V};
new_empty(node_node_coarse_stats, V) ->
new_empty(Type, V) when Type =:= node_node_coarse_stats;
Type =:= vhost_stats_coarse_conn_stats ->
{V, V}.
format(no_range, Table, Id, Interval, Type) ->