From c1ac5e6ea95752fbd4c4723ca9b980a0f6111e00 Mon Sep 17 00:00:00 2001 From: Diana Corbacho Date: Thu, 25 Aug 2016 18:58:36 +0100 Subject: [PATCH] Fix vhost stats --- .../include/rabbit_mgmt_metrics.hrl | 1 + .../src/rabbit_mgmt_db.erl | 19 ++++- .../src/rabbit_mgmt_metrics_collector.erl | 82 ++++++++++--------- .../src/rabbit_mgmt_metrics_gc.erl | 2 + .../src/rabbit_mgmt_stats.erl | 42 ++++++++-- 5 files changed, 101 insertions(+), 45 deletions(-) diff --git a/deps/rabbitmq_management/include/rabbit_mgmt_metrics.hrl b/deps/rabbitmq_management/include/rabbit_mgmt_metrics.hrl index 4a820008d9..9d39c24498 100644 --- a/deps/rabbitmq_management/include/rabbit_mgmt_metrics.hrl +++ b/deps/rabbitmq_management/include/rabbit_mgmt_metrics.hrl @@ -91,6 +91,7 @@ consumer_stats, queue_stats, queue_msg_stats, + vhost_msg_stats, queue_process_stats, node_stats, node_coarse_stats, diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl index 6e5625e6b1..70db557858 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl @@ -544,7 +544,24 @@ new_detail_stats(Table, Type, Id, Ranges, Interval) -> end || Key <- rabbit_mgmt_stats:get_new_keys(Table, Id)]. vhost_stats(Ranges, Objs, Interval) -> - merge_stats(Objs, [simple_stats_fun(Ranges, vhost_stats, Interval)]). + [begin + Id = id_lookup(vhost_stats, Obj), + Stats = rabbit_mgmt_stats:format(pick_range(coarse_conn_stats, Ranges), + vhost_stats_coarse_conn_stats, + Id, Interval) + ++ rabbit_mgmt_stats:format(pick_range(queue_msg_rates, Ranges), + vhost_msg_stats, Id, Interval), + StatsD = [{message_stats, rabbit_mgmt_stats:format(pick_range(fine_stats, Ranges), + vhost_stats_fine_stats, + Id, Interval) + ++ rabbit_mgmt_stats:format(pick_range(deliver_get, Ranges), + vhost_stats_deliver_stats, + Id, Interval)}], + Details = augment_details(Obj, []), + Obj ++ Details ++ Stats ++ StatsD + end || Obj <- Objs]. + +%% merge_stats(Objs, [simple_stats_fun(Ranges, vhost_stats, Interval)]). node_stats(Ranges, Objs, Interval) -> [begin diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_metrics_collector.erl b/deps/rabbitmq_management/src/rabbit_mgmt_metrics_collector.erl index e3e5e22e73..043e38cc12 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_metrics_collector.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_metrics_collector.erl @@ -40,20 +40,22 @@ init([Table]) -> {ok, Policies} = application:get_env( rabbitmq_management, sample_retention_policies), Policy = retention_policy(Table), - TablePolicies = proplists:get_value(Policy, Policies), - Interval = take_smaller(TablePolicies), + Interval = take_smaller(proplists:get_value(Policy, Policies)), {ok, Agent} = rabbit_mgmt_agent_collector_sup:start_child(self(), Table, Interval * 1000), - {ok, #state{table = Table, agent = Agent, policies = TablePolicies, + {ok, #state{table = Table, agent = Agent, + policies = {proplists:get_value(basic, Policies), + proplists:get_value(detailed, Policies), + proplists:get_value(global, Policies)}, rates_mode = RatesMode}}. handle_call(_Request, _From, State) -> {noreply, State}. handle_cast({metrics, Timestamp, Records}, State = #state{table = Table, - policies = TablePolicies, + policies = Policies, rates_mode = RatesMode}) -> - aggregate_metrics(Timestamp, Table, TablePolicies, Records, RatesMode), + aggregate_metrics(Timestamp, Table, Policies, Records, RatesMode), {noreply, State}. handle_info(_Msg, State) -> @@ -95,22 +97,23 @@ aggregate_entry(_TS, connection_created, _, {Id, Metrics}, _) -> ets:insert(connection_created_stats, {Id, pget(name, Ftd, unknown), Ftd}); aggregate_entry(_TS, connection_metrics, _, {Id, Metrics}, _) -> ets:insert(connection_stats, {Id, Metrics}); -aggregate_entry(TS, connection_coarse_metrics, Policies, +aggregate_entry(TS, connection_coarse_metrics, {BPolicies, _, GPolicies}, {Id, RecvOct, SendOct, Reductions}, _) -> - %% VHOSTS are aggregated?? + Stats = {RecvOct, SendOct}, + Diff = get_difference(Id, Stats), + ets:insert(old_aggr_stats, {Id, Stats}), + [insert_entry(vhost_stats_coarse_conn_stats, vhost({connection_created_stats, Id}), + TS, Diff, Size, Interval, true) || {Size, Interval} <- GPolicies], [begin insert_entry(connection_stats_coarse_conn_stats, Id, TS, - {RecvOct, SendOct, Reductions}, Size, Interval, false), - insert_entry(vhost_stats_coarse_conn_stats, - vhost({connection_created_stats, Id}), TS, {RecvOct, SendOct, Reductions}, Size, Interval, false) - end || {Size, Interval} <- Policies]; + end || {Size, Interval} <- BPolicies]; aggregate_entry(_TS, channel_created, _, {Id, Metrics}, _) -> Ftd = rabbit_mgmt_format:format(Metrics, {[], false}), ets:insert(channel_created_stats, {Id, pget(name, Ftd, unknown), Ftd}); aggregate_entry(_TS, channel_metrics, _, {Id, Metrics}, _) -> ets:insert(channel_stats, {Id, Metrics}); -aggregate_entry(TS, channel_exchange_metrics, Policies, {{Ch, X} = Id, Metrics}, +aggregate_entry(TS, channel_exchange_metrics, {_, DPolicies, _}, {{Ch, X} = Id, Metrics}, RatesMode) -> Stats = {pget(publish, Metrics, 0), pget(confirm, Metrics, 0), pget(return_unroutable, Metrics, 0)}, @@ -121,22 +124,22 @@ aggregate_entry(TS, channel_exchange_metrics, Policies, {{Ch, X} = Id, Metrics}, true), insert_entry(vhost_stats_fine_stats, vhost(X), TS, Diff, Size, Interval, true) - end || {Size, Interval} <- Policies], + end || {Size, Interval} <- DPolicies], case {exchange_exists(X), RatesMode} of {true, basic} -> [insert_entry(exchange_stats_publish_in, X, TS, {Publish}, Size, Interval, - true) || {Size, Interval} <- Policies]; + true) || {Size, Interval} <- DPolicies]; {true, _} -> [begin insert_entry(exchange_stats_publish_in, X, TS, {Publish}, Size, Interval, true), insert_entry(channel_exchange_stats_fine_stats, Id, TS, Stats, Size, Interval, false) - end || {Size, Interval} <- Policies]; + end || {Size, Interval} <- DPolicies]; _ -> ok end; -aggregate_entry(TS, channel_queue_metrics, Policies, {{Ch, Q} = Id, Metrics}, +aggregate_entry(TS, channel_queue_metrics, {_, DPolicies, _}, {{Ch, Q} = Id, Metrics}, RatesMode) -> Deliver = pget(deliver, Metrics, 0), DeliverNoAck = pget(deliver_no_ack, Metrics, 0), @@ -151,53 +154,52 @@ aggregate_entry(TS, channel_queue_metrics, Policies, {{Ch, Q} = Id, Metrics}, Interval, true), insert_entry(channel_stats_deliver_stats, Ch, TS, Diff, Size, Interval, true) - end || {Size, Interval} <- Policies], + end || {Size, Interval} <- DPolicies], case {queue_exists(Q), RatesMode} of {true, basic} -> [insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval, - true) || {Size, Interval} <- Policies]; + true) || {Size, Interval} <- DPolicies]; {true, _} -> [begin insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval, true), insert_entry(channel_queue_stats_deliver_stats, Id, TS, Stats, Size, Interval, false) - end || {Size, Interval} <- Policies]; + end || {Size, Interval} <- DPolicies]; _ -> ok end; -aggregate_entry(TS, channel_queue_exchange_metrics, Policies, +aggregate_entry(TS, channel_queue_exchange_metrics, {_, DPolicies, _}, {{_Ch, {Q, X} = Id}, Publish}, RatesMode) -> Stats = {Publish}, Diff = get_difference(Id, Stats), ets:insert(old_aggr_stats, {Id, Stats}), - %% channel_exch, queue_exch, echange_stats case {queue_exists(Q), exchange_exists(Q), RatesMode} of {true, false, _} -> [insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true) - || {Size, Interval} <- Policies]; + || {Size, Interval} <- DPolicies]; {false, true, _} -> [insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true) - || {Size, Interval} <- Policies]; + || {Size, Interval} <- DPolicies]; {true, true, basic} -> [begin insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true), insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true) - end || {Size, Interval} <- Policies]; + end || {Size, Interval} <- DPolicies]; {true, true, _} -> [begin insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true), insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true), insert_entry(queue_exchange_stats_publish, Id, TS, Diff, Size, Interval, true) - end || {Size, Interval} <- Policies]; + end || {Size, Interval} <- DPolicies]; _ -> ok end; -aggregate_entry(TS, channel_process_metrics, Policies, {Id, Reductions}, _) -> +aggregate_entry(TS, channel_process_metrics, {BPolicies, _, _}, {Id, Reductions}, _) -> [begin insert_entry(channel_process_stats, Id, TS, {Reductions}, Size, Interval, false) - end || {Size, Interval} <- Policies]; + end || {Size, Interval} <- BPolicies]; aggregate_entry(_TS, consumer_created, _, {Id, Exclusive, AckRequired, PrefetchCount, Args}, _) -> Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive}, @@ -216,9 +218,13 @@ aggregate_entry(_TS, queue_metrics, _, {Id, Metrics}, _) -> false -> ok end; -aggregate_entry(TS, queue_coarse_metrics, Policies, {Name, Ready, Unack, Msgs, - Red}, _) -> - %% TODO vhost stats ready, unack, msg +aggregate_entry(TS, queue_coarse_metrics, {BPolicies, _, GPolicies}, + {Name, Ready, Unack, Msgs, Red}, _) -> + Stats = {Ready, Unack, Msgs}, + Diff = get_difference(Name, Stats), + ets:insert(old_aggr_stats, {Name, Stats}), + [insert_entry(vhost_msg_stats, vhost(Name), TS, Diff, Size, Interval, true) + || {Size, Interval} <- GPolicies], case queue_exists(Name) of true -> [begin @@ -226,20 +232,20 @@ aggregate_entry(TS, queue_coarse_metrics, Policies, {Name, Ready, Unack, Msgs, Size, Interval, false), insert_entry(queue_msg_stats, Name, TS, {Ready, Unack, Msgs}, Size, Interval, false) - end || {Size, Interval} <- Policies]; + end || {Size, Interval} <- BPolicies]; _ -> ok end; aggregate_entry(_TS, node_metrics, _, {Id, Metrics}, _) -> ets:insert(node_stats, {Id, Metrics}); -aggregate_entry(TS, node_coarse_metrics, Policies, {Id, Metrics}, _) -> +aggregate_entry(TS, node_coarse_metrics, {_, _, GPolicies}, {Id, Metrics}, _) -> Stats = {pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0), pget(mem_used, Metrics, 0), pget(disk_free, Metrics, 0), pget(proc_used, Metrics, 0), pget(gc_num, Metrics, 0), pget(gc_bytes_reclaimed, Metrics, 0), pget(context_switches, Metrics, 0)}, [insert_entry(node_coarse_stats, Id, TS, Stats, Size, Interval, false) - || {Size, Interval} <- Policies]; -aggregate_entry(TS, node_persister_metrics, Policies, {Id, Metrics}, _) -> + || {Size, Interval} <- GPolicies]; +aggregate_entry(TS, node_persister_metrics, {_, _, GPolicies}, {Id, Metrics}, _) -> Stats = {pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0), pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0), pget(io_write_bytes, Metrics, 0), pget(io_write_time, Metrics, 0), @@ -253,13 +259,13 @@ aggregate_entry(TS, node_persister_metrics, Policies, {Id, Metrics}, _) -> pget(io_file_handle_open_attempt_count, Metrics, 0), pget(io_file_handle_open_attempt_time, Metrics, 0)}, [insert_entry(node_persister_stats, Id, TS, Stats, Size, Interval, false) - || {Size, Interval} <- Policies]; -aggregate_entry(TS, node_node_metrics, Policies, {Id, Metrics}, _) -> + || {Size, Interval} <- GPolicies]; +aggregate_entry(TS, node_node_metrics, {_, _, GPolicies}, {Id, Metrics}, _) -> Stats = {pget(send_bytes, Metrics, 0), pget(recv_bytes, Metrics, 0)}, CleanMetrics = lists:keydelete(recv_bytes, 1, lists:keydelete(send_bytes, 1, Metrics)), ets:insert(node_node_stats, {Id, CleanMetrics}), [insert_entry(node_node_coarse_stats, Id, TS, Stats, Size, Interval, false) - || {Size, Interval} <- Policies]. + || {Size, Interval} <- GPolicies]. insert_entry(Table, Id, TS, Entry, Size, Interval, Incremental) -> Key = {Id, Interval}, @@ -282,6 +288,8 @@ get_difference(Id, Stats) -> difference({A0}, {B0}) -> {B0 - A0}; +difference({A0, A1}, {B0, B1}) -> + {B0 - A0, B1 - A0}; difference({A0, A1, A2}, {B0, B1, B2}) -> {B0 - A0, B1 - A1, B2 - A2}; difference({A0, A1, A2, A3, A4, A5, A6}, {B0, B1, B2, B3, B4, B5, B6}) -> diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_metrics_gc.erl b/deps/rabbitmq_management/src/rabbit_mgmt_metrics_gc.erl index dd7e38c1b4..4be3021af9 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_metrics_gc.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_metrics_gc.erl @@ -92,6 +92,7 @@ code_change(_OldVsn, State, _Extra) -> remove_connection(Id, Intervals) -> ets:delete(connection_created_stats, Id), ets:delete(connection_stats, Id), + ets:delete(old_aggr_stats, Id), delete_samples(connection_stats_coarse_conn_stats, Id, Intervals), delete_samples(vhost_stats_coarse_conn_stats, Id, Intervals). @@ -125,6 +126,7 @@ remove_queue(Name, BIntervals, DIntervals) -> ets:select_delete(queue_exchange_stats_publish, match_interval_spec({Name})), delete_samples(queue_process_stats, Name, BIntervals), delete_samples(queue_msg_stats, Name, BIntervals), + ets:delete(old_aggr_stats, Name), ets:select_delete(old_aggr_stats, match_second_spec({Name})), ets:select_delete(consumer_stats, match_queue_consumer_spec({Name})), ok. diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_stats.erl b/deps/rabbitmq_management/src/rabbit_mgmt_stats.erl index 713612339d..0354428bcb 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_stats.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_stats.erl @@ -220,7 +220,7 @@ missing_samples(Next, Incr, TS) -> %% connection_stats_coarse_conn_stats, channel_stats_fine_stats, %% vhost_stats_fine_stats, channel_exchange_stats_fine_stats, -%% queue_msg_stats +%% queue_msg_stats, vhost_msg_stats append_full_sample(TS, {V1, V2, V3}, {S1, S2, S3}, {T1, T2, T3}) -> {{append_sample(V1, TS, S1), append_sample(V2, TS, S2), append_sample(V3, TS, S3)}, {V1 + T1, V2 + T2, V3 + T3}}; @@ -269,7 +269,7 @@ append_full_sample(TS, {V1 + T1, V2 + T2, V3 + T3, V4 + T4, V5 + T5, V6 + T6, V7 + T7, V8 + T8, V9 + T9, V10 + T10, V11 + T11, V12 + T12, V13 + T13, V14 + T14, V15 + T15, V16 + T16, V17 + T17, V18 + T18, V19 + T19, V20 + T20}}; -%% node_node_coarse_stats +%% node_node_coarse_stats, vhost_stats_coarse_connection_stats append_full_sample(TS, {V1, V2}, {S1, S2}, {T1, T2}) -> {{append_sample(V1, TS, S1), append_sample(V2, TS, S2)}, {V1 + T1, V2 + T2}}. @@ -311,6 +311,8 @@ retention_policy(vhost_stats_fine_stats) -> global; retention_policy(vhost_stats_deliver_stats) -> global; +retention_policy(vhost_stats_coarse_conn_stats) -> + global; retention_policy(channel_stats_deliver_stats) -> basic; retention_policy(queue_stats_deliver_stats) -> @@ -327,6 +329,8 @@ retention_policy(queue_process_stats) -> basic; retention_policy(queue_msg_stats) -> basic; +retention_policy(vhost_msg_stats) -> + global; retention_policy(node_coarse_stats) -> global; retention_policy(node_persister_stats) -> @@ -343,6 +347,13 @@ format_rate(connection_stats_coarse_conn_stats, {TR, TS, TRe}, {RR, RS, RRe}) -> {reductions, TRe}, {reductions_details, [{rate, RRe}]} ]; +format_rate(vhost_stats_coarse_conn_stats, {TR, TS}, {RR, RS}) -> + [ + {send_oct, TS}, + {send_oct_details, [{rate, RS}]}, + {recv_oct, TR}, + {recv_oct_details, [{rate, RR}]} + ]; format_rate(Type, {TP, TC, TRe}, {RP, RC, RRe}) when Type =:= channel_stats_fine_stats; Type =:= vhost_stats_fine_stats; @@ -399,7 +410,8 @@ format_rate(Type, {TP}, {RP}) when Type =:= queue_stats_publish; {publish, TP}, {publish_details, [{rate, RP}]} ]; -format_rate(queue_msg_stats, {TR, TU, TM}, {RR, RU, RM}) -> +format_rate(Type, {TR, TU, TM}, {RR, RU, RM}) when Type =:= queue_msg_stats; + Type =:= vhost_msg_stats -> [ {messages_ready, TR}, {messages_ready_details, [{rate, RR}]}, @@ -503,6 +515,16 @@ format_rate(connection_stats_coarse_conn_stats, {TR, TS, TRe}, {RR, RS, RRe}, {reductions_details, [{rate, RRe}, {samples, SRe}] ++ average(SRe, STRe, Length)} ]; +format_rate(vhost_stats_coarse_conn_stats, {TR, TS}, {RR, RS}, {SR, SS}, + {STR, STS}, Length) -> + [ + {send_oct, TS}, + {send_oct_details, [{rate, RS}, + {samples, SS}] ++ average(SS, STS, Length)}, + {recv_oct, TR}, + {recv_oct_details, [{rate, RR}, + {samples, SR}] ++ average(SR, STR, Length)} + ]; format_rate(Type, {TP, TC, TRe}, {RP, RC, RRe}, {SP, SC, SRe}, {STP, STC, STRe}, Length) when Type =:= channel_stats_fine_stats; @@ -577,8 +599,9 @@ format_rate(Type, {TP}, {RP}, {SP}, {STP}, Length) {publish_out_details, [{rate, RP}, {samples, SP}] ++ average(SP, STP, Length)} ]; -format_rate(queue_msg_stats, {TR, TU, TM}, {RR, RU, RM}, - {SR, SU, SM}, {STR, STU, STM}, Length) -> +format_rate(Type, {TR, TU, TM}, {RR, RU, RM}, {SR, SU, SM}, {STR, STU, STM}, + Length) when Type =:= queue_msg_stats; + Type =:= vhost_msg_stats -> [ {messages_ready, TR}, {messages_ready_details, [{rate, RR}, @@ -745,6 +768,9 @@ rate_from_last_increment(Total, [H | _T]) -> rate_from_difference({TS0, {A0, A1, A2}}, {TS1, {B0, B1, B2}}) -> Interval = TS0 - TS1, {rate(A0 - B0, Interval), rate(A1 - B1, Interval), rate(A2 - B2, Interval)}; +rate_from_difference({TS0, {A0, A1}}, {TS1, {B0, B1}}) -> + Interval = TS0 - TS1, + {rate(A0 - B0, Interval), rate(A1 - B1, Interval)}; rate_from_difference({TS0, {A0, A1, A2, A3, A4, A5, A6}}, {TS1, {B0, B1, B2, B3, B4, B5, B6}}) -> Interval = TS0 - TS1, @@ -780,7 +806,8 @@ new_empty(Type, V) when Type =:= connection_stats_coarse_conn_stats; Type =:= channel_stats_fine_stats; Type =:= channel_exchange_stats_fine_stats; Type =:= vhost_stats_fine_stats; - Type =:= queue_msg_stats -> + Type =:= queue_msg_stats; + Type =:= vhost_msg_stats -> {V, V, V}; new_empty(Type, V) when Type =:= channel_queue_stats_deliver_stats; Type =:= queue_stats_deliver_stats; @@ -798,7 +825,8 @@ new_empty(node_coarse_stats, V) -> {V, V, V, V, V, V, V, V}; new_empty(node_persister_stats, V) -> {V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V}; -new_empty(node_node_coarse_stats, V) -> +new_empty(Type, V) when Type =:= node_node_coarse_stats; + Type =:= vhost_stats_coarse_conn_stats -> {V, V}. format(no_range, Table, Id, Interval, Type) ->