Reduce GC operations by keeping the old stats in each collector
process instead of ETS.
parent eef35ab636
commit 7a3a2ef89b
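Summary of the change: before this commit every collector diffed each new sample against the previous interval's values stored in the shared old_aggr_stats ETS table, inserting and deleting rows on every collection cycle. Afterwards the previous values travel in each collector's own gen_server state as a dict, so the per-cycle bookkeeping stays process-local. A minimal standalone sketch of the pattern (module and function names here are illustrative, not from the commit):

    %% old_stats_sketch.erl -- illustrative only, not part of this commit.
    -module(old_stats_sketch).
    -export([diff_and_store/3]).

    %% Diff a new sample against the remembered one for Id and remember
    %% the new sample, all inside the caller's own state.
    diff_and_store(Id, Stats, Old) ->
        Diff = case dict:find(Id, Old) of
                   error      -> Stats;                % first sample for Id
                   {ok, Prev} -> subtract(Prev, Stats)
               end,
        {Diff, dict:store(Id, Stats, Old)}.

    subtract({A}, {X})       -> {X - A};
    subtract({A, B}, {X, Y}) -> {X - A, Y - B}.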
@@ -26,7 +26,7 @@
 -export([name/1]).
 -export([start_link/1]).
 -export([override_lookups/2, reset_lookups/1]).
--export([delete_queue/3]).
+-export([delete_queue/2]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
 -export([index_table/2]).
@@ -34,7 +34,8 @@
 -import(rabbit_misc, [pget/3]).
 -import(rabbit_mgmt_db, [pget/2, lookup_element/3]).
 
--record(state, {table, interval, policies, rates_mode, lookup_queue, lookup_exchange}).
+-record(state, {table, interval, policies, rates_mode, lookup_queue,
+                lookup_exchange, old_aggr_stats}).
 
 %% Data is stored in ETS tables:
 %% * One ETS table per metric (queue_stats, channel_stats_deliver_stats...)
@@ -61,8 +62,8 @@ override_lookups(Table, Lookups) ->
 reset_lookups(Table) ->
     gen_server:call(name(Table), reset_lookups, infinity).
 
-delete_queue(Table, Queue, Stats) ->
-    gen_server:cast(name(Table), {delete_queue, Queue, Stats}).
+delete_queue(Table, Queue) ->
+    gen_server:cast(name(Table), {delete_queue, Queue}).
 
 init([Table]) ->
     {ok, RatesMode} = application:get_env(rabbitmq_management, rates_mode),
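With the stats held in the collector's state, callers of delete_queue no longer look the old values up themselves; the cast shrinks from three arguments to two. The GC module's call site (in a later hunk) becomes simply:

    %% Usage as it appears further down in this commit.
    rabbit_mgmt_metrics_collector:delete_queue(queue_coarse_metrics, Name)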
@@ -77,33 +78,40 @@ init([Table]) ->
                     proplists:get_value(global, Policies)},
                 rates_mode = RatesMode,
                 lookup_queue = fun queue_exists/1,
+                old_aggr_stats = dict:new(),
                 lookup_exchange = fun exchange_exists/1}}.
 
 handle_call(reset_lookups, _From, State) ->
     {reply, ok, State#state{lookup_queue = fun queue_exists/1,
-                            lookup_exchange = fun exchange_exists/1}};
+                            lookup_exchange = fun exchange_exists/1}};
 handle_call({override_lookups, Lookups}, _From, State) ->
     {reply, ok, State#state{lookup_queue = pget(queue, Lookups),
-                            lookup_exchange = pget(exchange, Lookups)}};
+                            lookup_exchange = pget(exchange, Lookups)}};
 handle_call({submit, Fun}, _From, State) ->
     {reply, Fun(), State};
 handle_call(_Request, _From, State) ->
     {noreply, State}.
 
-handle_cast({delete_queue, Queue, {R, U, M}}, State = #state{table = queue_coarse_metrics,
-                                                             policies = {_, _, GPolicies}}) ->
+handle_cast({delete_queue, Queue}, State = #state{table = queue_coarse_metrics,
+                                                  old_aggr_stats = Old,
+                                                  policies = {_, _, GPolicies}}) ->
     TS = exometer_slide:timestamp(),
-    NegStats = ?vhost_msg_stats(-R, -U, -M),
-    [insert_entry(vhost_msg_stats, vhost(Queue), TS, NegStats, Size, Interval, true)
-     || {Size, Interval} <- GPolicies],
-    {noreply, State};
+    case dict:find(Queue, Old) of
+        error ->
+            {noreply, State};
+        {ok, {R, U, M}} ->
+            NegStats = ?vhost_msg_stats(-R, -U, -M),
+            [insert_entry(vhost_msg_stats, vhost(Queue), TS, NegStats, Size, Interval, true)
+             || {Size, Interval} <- GPolicies],
+            {noreply, State}
+    end;
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
 
-handle_info(collect_metrics, #state{interval = Interval} = State) ->
+handle_info(collect_metrics, #state{interval = Interval} = State0) ->
     Timestamp = exometer_slide:timestamp(),
-    aggregate_metrics(Timestamp, State),
+    State = aggregate_metrics(Timestamp, State0),
     erlang:send_after(Interval, self(), collect_metrics),
     {noreply, State};
 handle_info(_Msg, State) ->
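dict:find/2 returns error rather than an empty list, which lets the delete path skip queues the collector has never sampled. A hedged sketch of just that branch (the function name is made up for illustration):

    %% Illustrative only: emit a negative vhost sample only when a
    %% previous reading for the queue exists in the state-held dict.
    negate_if_known(Queue, Old) ->
        case dict:find(Queue, Old) of
            error           -> none;                    % never sampled, nothing to retract
            {ok, {R, U, M}} -> {retract, {-R, -U, -M}}  % reverse the last reading
        end.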
@@ -135,55 +143,57 @@ retention_policy(node_node_metrics) -> global.
 take_smaller(Policies) ->
     lists:min([I || {_, I} <- Policies]).
 
+insert_old_aggr_stats(NextStats, Id, Stat) ->
+    dict:store(Id, Stat, NextStats).
+
 aggregate_metrics(Timestamp, State) ->
     Table = State#state.table,
-    ets:foldl(
-      fun(R, noacc) ->
-              _ = aggregate_entry(Timestamp, R, State),
-              noacc
-      end,
-      noacc,
-      Table).
+    Res = ets:foldl(
+            fun(R, Dict) ->
+                    aggregate_entry(Timestamp, R, Dict, State)
+            end, dict:new(), Table),
+    State#state{old_aggr_stats = Res}.
 
-aggregate_entry(_TS, {Id, Metrics}, #state{table = connection_created}) ->
+aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_created}) ->
     Ftd = rabbit_mgmt_format:format(
             Metrics,
             {fun rabbit_mgmt_format:format_connection_created/1, true}),
     ets:insert(connection_created_stats,
-               ?connection_created_stats(Id, pget(name, Ftd, unknown), Ftd));
-aggregate_entry(_TS, {Id, Metrics}, #state{table = connection_metrics}) ->
-    ets:insert(connection_stats, ?connection_stats(Id, Metrics));
-aggregate_entry(TS, {Id, RecvOct, SendOct, Reductions},
+               ?connection_created_stats(Id, pget(name, Ftd, unknown), Ftd)),
+    NextStats;
+aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_metrics}) ->
+    ets:insert(connection_stats, ?connection_stats(Id, Metrics)),
+    NextStats;
+aggregate_entry(TS, {Id, RecvOct, SendOct, Reductions}, NextStats,
                 #state{table = connection_coarse_metrics,
-                       policies = {BPolicies, _, GPolicies}}) ->
+                       policies = {BPolicies, _, GPolicies}} = State) ->
     Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct),
-    Diff = get_difference(Id, Stats),
-    ets:insert(old_aggr_stats, ?old_aggr_stats(Id, Stats)),
+    Diff = get_difference(Id, Stats, State),
     [insert_entry(vhost_stats_coarse_conn_stats, vhost({connection_created_stats, Id}),
                   TS, Diff, Size, Interval, true) || {Size, Interval} <- GPolicies],
     [begin
          insert_entry(connection_stats_coarse_conn_stats, Id, TS,
                       ?connection_stats_coarse_conn_stats(RecvOct, SendOct, Reductions),
                       Size, Interval, false)
-     end || {Size, Interval} <- BPolicies];
-aggregate_entry(_TS, {Id, Metrics}, #state{table = channel_created}) ->
+     end || {Size, Interval} <- BPolicies],
+    insert_old_aggr_stats(NextStats, Id, Stats);
+aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_created}) ->
     Ftd = rabbit_mgmt_format:format(Metrics, {[], false}),
     ets:insert(channel_created_stats,
-               ?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd));
-aggregate_entry(_TS, {Id, Metrics}, #state{table = channel_metrics}) ->
+               ?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd)),
+    NextStats;
+aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_metrics}) ->
     Ftd = rabbit_mgmt_format:format(Metrics,
                                     {fun rabbit_mgmt_format:format_channel_stats/1, true}),
-    ets:insert(channel_stats, ?channel_stats(Id, Ftd));
-aggregate_entry(TS, {{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable},
+    ets:insert(channel_stats, ?channel_stats(Id, Ftd)),
+    NextStats;
+aggregate_entry(TS, {{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable}, NextStats,
                 #state{table = channel_exchange_metrics,
                        policies = {BPolicies, DPolicies, GPolicies},
                        rates_mode = RatesMode,
-                       lookup_exchange = ExchangeFun}) ->
+                       lookup_exchange = ExchangeFun} = State) ->
     Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable),
-    {Publish, _, _} = Diff = get_difference(Id, Stats),
-    ets:insert(old_aggr_stats, ?old_aggr_stats(Id, Stats)),
-    %% Custom insert for channel only to avoid ambiguity with {Channel, Queue} key
-    ets:insert(index_table(old_aggr_stats, channel), {Ch, Id}),
+    {Publish, _, _} = Diff = get_difference(Id, Stats, State),
     [begin
          insert_entry(channel_stats_fine_stats, Ch, TS, Diff, Size, Interval,
                       true)
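The fold now threads the next old_aggr_stats dict through as its accumulator instead of a throwaway noacc, which is why every aggregate_entry clause gains a NextStats argument and must return it. A self-contained illustration of the accumulator threading (table name and row shape are invented):

    %% Illustrative only: thread a dict accumulator through ets:foldl/3.
    fold_demo() ->
        T = ets:new(demo_metrics, [set]),
        true = ets:insert(T, [{q1, 10}, {q2, 5}]),
        ets:foldl(fun({Id, Val}, Dict) -> dict:store(Id, Val, Dict) end,
                  dict:new(), T).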
@@ -206,17 +216,18 @@ aggregate_entry(TS, {{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable},
              end || {Size, Interval} <- DPolicies];
         _ ->
             ok
-    end;
+    end,
+    insert_old_aggr_stats(NextStats, Id, Stats);
 aggregate_entry(TS, {{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, Redeliver, Ack},
+                NextStats,
                 #state{table = channel_queue_metrics,
                        policies = {BPolicies, DPolicies, GPolicies},
                        rates_mode = RatesMode,
-                       lookup_queue = QueueFun}) ->
+                       lookup_queue = QueueFun} = State) ->
     Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck,
                                        Redeliver, Ack,
                                        Deliver + DeliverNoAck + Get + GetNoAck),
-    Diff = get_difference(Id, Stats),
-    insert_with_index(old_aggr_stats, Id, ?old_aggr_stats(Id, Stats)),
+    Diff = get_difference(Id, Stats, State),
     [begin
          insert_entry(vhost_stats_deliver_stats, vhost(Q), TS, Diff, Size,
                       Interval, true)
@@ -226,27 +237,27 @@ aggregate_entry(TS, {{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, Redeliv
                       true)
      end || {Size, Interval} <- BPolicies],
     case {QueueFun(Q), RatesMode} of
-        {true, basic} ->
-            [insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval,
-                          true) || {Size, Interval} <- BPolicies];
-        {true, _} ->
-            [insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval,
-                          true) || {Size, Interval} <- BPolicies],
-            [insert_entry(channel_queue_stats_deliver_stats, Id, TS, Stats, Size,
-                          Interval, false)
-             || {Size, Interval} <- DPolicies];
-        _ ->
-            ok
-    end;
-aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish},
+        {true, basic} ->
+            [insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval,
+                          true) || {Size, Interval} <- BPolicies];
+        {true, _} ->
+            [insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval,
+                          true) || {Size, Interval} <- BPolicies],
+            [insert_entry(channel_queue_stats_deliver_stats, Id, TS, Stats, Size,
+                          Interval, false)
+             || {Size, Interval} <- DPolicies];
+        _ ->
+            ok
+    end,
+    insert_old_aggr_stats(NextStats, Id, Stats);
+aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish}, NextStats,
                 #state{table = channel_queue_exchange_metrics,
                        policies = {BPolicies, _, _},
                        rates_mode = RatesMode,
                        lookup_queue = QueueFun,
-                       lookup_exchange = ExchangeFun}) ->
+                       lookup_exchange = ExchangeFun} = State) ->
     Stats = ?queue_stats_publish(Publish),
-    Diff = get_difference(Id, Stats),
-    insert_with_index(old_aggr_stats, Id, ?old_aggr_stats(Id, Stats)),
+    Diff = get_difference(Id, Stats, State),
     case {QueueFun(Q), ExchangeFun(Q), RatesMode} of
         {true, false, _} ->
             [insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true)
@@ -267,46 +278,48 @@ aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish},
              end || {Size, Interval} <- BPolicies];
         _ ->
             ok
-    end;
-aggregate_entry(TS, {Id, Reductions}, #state{table = channel_process_metrics,
+    end,
+    insert_old_aggr_stats(NextStats, Id, Stats);
+aggregate_entry(TS, {Id, Reductions}, NextStats, #state{table = channel_process_metrics,
                                               policies = {BPolicies, _, _}}) ->
     [begin
          insert_entry(channel_process_stats, Id, TS, ?channel_process_stats(Reductions),
                       Size, Interval, false)
-     end || {Size, Interval} <- BPolicies];
-aggregate_entry(_TS, {Id, Exclusive, AckRequired, PrefetchCount, Args},
+     end || {Size, Interval} <- BPolicies],
+    NextStats;
+aggregate_entry(_TS, {Id, Exclusive, AckRequired, PrefetchCount, Args}, NextStats,
                 #state{table = consumer_created}) ->
     Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive},
                                      {ack_required, AckRequired},
                                      {prefetch_count, PrefetchCount},
                                      {arguments, Args}], {[], false}),
     insert_with_index(consumer_stats, Id, ?consumer_stats(Id, Fmt)),
-    ok;
-aggregate_entry(TS, {Id, Metrics}, #state{table = queue_metrics,
-                                          policies = {BPolicies, _, GPolicies},
-                                          lookup_queue = QueueFun}) ->
+    NextStats;
+aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = queue_metrics,
+                                                     policies = {BPolicies, _, GPolicies},
+                                                     lookup_queue = QueueFun} = State) ->
     Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0), pget(disk_writes, Metrics, 0)),
-    Diff = get_difference({Id, rates}, Stats),
-    ets:insert(old_aggr_stats, ?old_aggr_stats({Id, rates}, Stats)),
+    Diff = get_difference(Id, Stats, State),
     [insert_entry(vhost_msg_rates, vhost(Id), TS, Diff, Size, Interval, true)
      || {Size, Interval} <- GPolicies],
     case QueueFun(Id) of
-        true ->
-            [insert_entry(queue_msg_rates, Id, TS, Stats, Size, Interval, false)
-             || {Size, Interval} <- BPolicies],
-            Fmt = rabbit_mgmt_format:format(
-                    Metrics,
-                    {fun rabbit_mgmt_format:format_queue_stats/1, false}),
-            ets:insert(queue_stats, ?queue_stats(Id, Fmt));
-        false ->
-            ok
-    end;
-aggregate_entry(TS, {Name, Ready, Unack, Msgs, Red}, #state{table = queue_coarse_metrics,
-                                                            policies = {BPolicies, _, GPolicies},
-                                                            lookup_queue = QueueFun}) ->
+        true ->
+            [insert_entry(queue_msg_rates, Id, TS, Stats, Size, Interval, false)
+             || {Size, Interval} <- BPolicies],
+            Fmt = rabbit_mgmt_format:format(
+                    Metrics,
+                    {fun rabbit_mgmt_format:format_queue_stats/1, false}),
+            ets:insert(queue_stats, ?queue_stats(Id, Fmt));
+        false ->
+            ok
+    end,
+    insert_old_aggr_stats(NextStats, Id, Stats);
+aggregate_entry(TS, {Name, Ready, Unack, Msgs, Red}, NextStats,
+                #state{table = queue_coarse_metrics,
+                       policies = {BPolicies, _, GPolicies},
+                       lookup_queue = QueueFun} = State) ->
     Stats = ?vhost_msg_stats(Ready, Unack, Msgs),
-    Diff = get_difference(Name, Stats),
-    ets:insert(old_aggr_stats, ?old_aggr_stats(Name, Stats)),
+    Diff = get_difference(Name, Stats, State),
     [insert_entry(vhost_msg_stats, vhost(Name), TS, Diff, Size, Interval, true)
      || {Size, Interval} <- GPolicies],
     case QueueFun(Name) of
@@ -319,10 +332,12 @@ aggregate_entry(TS, {Name, Ready, Unack, Msgs, Red}, #state{table = queue_coarse
              end || {Size, Interval} <- BPolicies];
         _ ->
             ok
-    end;
-aggregate_entry(_TS, {Id, Metrics}, #state{table = node_metrics}) ->
-    ets:insert(node_stats, {Id, Metrics});
-aggregate_entry(TS, {Id, Metrics}, #state{table = node_coarse_metrics,
+    end,
+    insert_old_aggr_stats(NextStats, Name, Stats);
+aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = node_metrics}) ->
+    ets:insert(node_stats, {Id, Metrics}),
+    NextStats;
+aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_coarse_metrics,
                                            policies = {_, _, GPolicies}}) ->
     Stats = ?node_coarse_stats(
               pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0),
@@ -330,9 +345,10 @@ aggregate_entry(TS, {Id, Metrics}, #state{table = node_coarse_metrics,
               pget(proc_used, Metrics, 0), pget(gc_num, Metrics, 0),
               pget(gc_bytes_reclaimed, Metrics, 0), pget(context_switches, Metrics, 0)),
     [insert_entry(node_coarse_stats, Id, TS, Stats, Size, Interval, false)
-     || {Size, Interval} <- GPolicies];
-aggregate_entry(TS, {Id, Metrics}, #state{table = node_persister_metrics,
-                                          policies = {_, _, GPolicies}}) ->
+     || {Size, Interval} <- GPolicies],
+    NextStats;
+aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_persister_metrics,
+                                               policies = {_, _, GPolicies}}) ->
     Stats = ?node_persister_stats(
               pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0),
               pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0),
@@ -347,14 +363,16 @@ aggregate_entry(TS, {Id, Metrics}, #state{table = node_persister_metrics,
               pget(io_file_handle_open_attempt_count, Metrics, 0),
               pget(io_file_handle_open_attempt_time, Metrics, 0)),
     [insert_entry(node_persister_stats, Id, TS, Stats, Size, Interval, false)
-     || {Size, Interval} <- GPolicies];
-aggregate_entry(TS, {Id, Metrics}, #state{table = node_node_metrics,
-                                          policies = {_, _, GPolicies}}) ->
+     || {Size, Interval} <- GPolicies],
+    NextStats;
+aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_node_metrics,
+                                                     policies = {_, _, GPolicies}}) ->
     Stats = ?node_node_coarse_stats(pget(send_bytes, Metrics, 0), pget(recv_bytes, Metrics, 0)),
     CleanMetrics = lists:keydelete(recv_bytes, 1, lists:keydelete(send_bytes, 1, Metrics)),
     ets:insert(node_node_stats, ?node_node_stats(Id, CleanMetrics)),
     [insert_entry(node_node_coarse_stats, Id, TS, Stats, Size, Interval, false)
-     || {Size, Interval} <- GPolicies].
+     || {Size, Interval} <- GPolicies],
+    NextStats.
 
 insert_entry(Table, Id, TS, Entry, Size, Interval, Incremental) ->
     Key = {Id, Interval},
@@ -371,12 +389,12 @@ insert_entry(Table, Id, TS, Entry, Size, Interval, Incremental) ->
           end,
     insert_with_index(Table, Key, {Key, exometer_slide:add_element(TS, Entry, Slide)}).
 
-get_difference(Id, Stats) ->
-    case ets:lookup(old_aggr_stats, Id) of
-        [] ->
-            Stats;
-        [{Id, OldStats}] ->
-            difference(OldStats, Stats)
+get_difference(Id, Stats, #state{old_aggr_stats = OldStats}) ->
+    case dict:find(Id, OldStats) of
+        error ->
+            Stats;
+        {ok, OldStat} ->
+            difference(OldStat, Stats)
     end.
 
 difference({A0}, {B0}) ->
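Note the different result shapes driving the reshaped case above: ets:lookup/2 answers with a list of matching objects, dict:find/2 with error or {ok, Value}. Side by side (illustrative):

    %% Illustrative only: the two lookup result shapes.
    lookup_shapes(Id, Tab, Dict) ->
        {ets:lookup(Tab, Id),   % [] | [{Id, OldStats}]
         dict:find(Id, Dict)}.  % error | {ok, OldStats}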
@@ -397,18 +415,18 @@ vhost({TName, Pid}) ->
 
 exchange_exists(Name) ->
     case rabbit_exchange:lookup(Name) of
-        {ok, _} ->
-            true;
-        _ ->
-            false
+        {ok, _} ->
+            true;
+        _ ->
+            false
     end.
 
 queue_exists(Name) ->
     case rabbit_amqqueue:lookup(Name) of
-        {ok, _} ->
-            true;
-        _ ->
-            false
+        {ok, _} ->
+            true;
+        _ ->
+            false
     end.
 
 insert_with_index(Table, Key, Tuple) ->
@@ -60,17 +60,17 @@ handle_cast({event, #event{type = consumer_deleted, props = Props}}, State) ->
     remove_consumer(Props),
     {noreply, State};
 handle_cast({event, #event{type = exchange_deleted, props = Props}},
-            State = #state{basic_i = BIntervals}) ->
+            State = #state{basic_i = BIntervals}) ->
     Name = pget(name, Props),
     remove_exchange(Name, BIntervals),
     {noreply, State};
 handle_cast({event, #event{type = queue_deleted, props = Props}},
-            State = #state{basic_i = BIntervals}) ->
+            State = #state{basic_i = BIntervals}) ->
     Name = pget(name, Props),
     remove_queue(Name, BIntervals),
     {noreply, State};
 handle_cast({event, #event{type = vhost_deleted, props = Props}},
-            State = #state{global_i = GIntervals}) ->
+            State = #state{global_i = GIntervals}) ->
     Name = pget(name, Props),
     remove_vhost(Name, GIntervals),
     {noreply, State};
@@ -91,7 +91,6 @@ code_change(_OldVsn, State, _Extra) ->
 remove_connection(Id, BIntervals, GIntervals) ->
     ets:delete(connection_created_stats, Id),
     ets:delete(connection_stats, Id),
-    ets:delete(old_aggr_stats, Id),
     delete_samples(connection_stats_coarse_conn_stats, Id, BIntervals),
     delete_samples(vhost_stats_coarse_conn_stats, Id, GIntervals),
     ok.
@@ -103,7 +102,6 @@ remove_channel(Id, BIntervals) ->
     delete_samples(channel_stats_fine_stats, Id, BIntervals),
     delete_samples(channel_stats_deliver_stats, Id, BIntervals),
     index_delete(consumer_stats, channel, Id),
-    index_delete(old_aggr_stats, channel, Id),
     index_delete(channel_exchange_stats_fine_stats, channel, Id),
     index_delete(channel_queue_stats_deliver_stats, channel, Id),
     ok.
@@ -129,18 +127,9 @@ remove_queue(Name, BIntervals) ->
     delete_samples(queue_msg_stats, Name, BIntervals),
     delete_samples(queue_msg_rates, Name, BIntervals),
-    %% vhost message counts must be updated with the deletion of the messages in this queue
-    case ets:lookup(old_aggr_stats, Name) of
-        [{Name, Stats}] ->
-            rabbit_mgmt_metrics_collector:delete_queue(queue_coarse_metrics, Name, Stats);
-        [] ->
-            ok
-    end,
-    ets:delete(old_aggr_stats, Name),
-    ets:delete(old_aggr_stats, {Name, rates}),
 
+    rabbit_mgmt_metrics_collector:delete_queue(queue_coarse_metrics, Name),
     index_delete(channel_queue_stats_deliver_stats, queue, Name),
     index_delete(queue_exchange_stats_publish, queue, Name),
-    index_delete(old_aggr_stats, queue, Name),
     index_delete(consumer_stats, queue, Name),
 
     ok.
@@ -177,14 +166,6 @@ cleanup_index(consumer_stats, {Q, Ch, _} = Key) ->
     delete_index(consumer_stats, queue, {Q, Key}),
     delete_index(consumer_stats, channel, {Ch, Key}),
     ok;
-cleanup_index(old_aggr_stats, {Ch, {Q, _X}} = Key) ->
-    delete_index(old_aggr_stats, queue, {Q, Key}),
-    delete_index(old_aggr_stats, channel, {Ch, Key}),
-    ok;
-cleanup_index(old_aggr_stats, {Ch, Q} = Key) ->
-    delete_index(old_aggr_stats, queue, {Q, Key}),
-    delete_index(old_aggr_stats, channel, {Ch, Key}),
-    ok;
 cleanup_index(channel_exchange_stats_fine_stats, {{Ch, Ex}, _} = Key) ->
     delete_index(channel_exchange_stats_fine_stats, exchange, {Ex, Key}),
     delete_index(channel_exchange_stats_fine_stats, channel, {Ch, Key}),
@@ -603,6 +603,7 @@ overview(Config) ->
     timer:sleep(5000), % TODO force stat emission
+    force_stats(), % channel count needs a bit longer for 2nd chan
     Res = http_get(Config, "/overview"),
     ct:pal("Res ~p", [Res]),
     amqp_channel:close(Chan),
     rabbit_ct_client_helpers:close_connection(Conn),
     http_delete(Config, "/queues/%2f/queue-n1", ?NO_CONTENT),
@@ -180,6 +180,8 @@ fine_stats_aggregation_test1(_Config) ->
                          {[{x, 2}], [{q1, x, 10}, {q2, x, 2}], []},
                          {[{x, 3}], [{q1, x, 25}, {q2, x, 2}], []}]),
+    timer:sleep(5001),
+    ct:pal("ets: ~p", [ets:tab2list(queue_coarse_metrics)]),
 
     fine_stats_aggregation_test0(true, First),
     delete_q(q2),
     timer:sleep(5000),
@@ -402,16 +404,15 @@ detailed_stats_absent(Name, List) ->
 
 filter_detailed_stats(Name, List) ->
     lists:foldl(fun(L, Acc) ->
-                        {[{stats, Stats}], [{_, Details}]} = lists:partition(fun({K, _}) ->
-                                                                                  K == stats
-                                                                          end, L),
-                        case (pget(name, Details) =:= a2b(Name)) of
-                            true ->
-                                [Stats | Acc];
-                            false ->
-                                Acc
-                        end
-                end, [], List).
+                        {[{stats, Stats}], [{_, Details}]} =
+                            lists:partition(fun({K, _}) -> K == stats end, L),
+                        case (pget(name, Details) =:= a2b(Name)) of
+                            true ->
+                                [Stats | Acc];
+                            false ->
+                                Acc
+                        end
+                end, [], List).
 
 expand(in) -> incoming;
 expand(out) -> outgoing;