Use index tables to clean up statistics database

Daniil Fedotov 2016-09-21 13:08:11 +01:00
parent dc99947b06
commit a54cdfe118
4 changed files with 89 additions and 41 deletions
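The idea: the GC previously located rows to delete with ets:select_delete/2 match-spec scans over entire stats tables; this change maintains bag-type ETS index tables mapping each secondary key (queue, channel, exchange, node) to the primary keys filed under it, so deletion becomes an index lookup plus targeted deletes. A condensed sketch of the pattern, with table and variable names assumed for illustration only:

    %% Write path: mirror the primary key under the secondary key.
    ets:insert(base_table, {PrimKey, Stats}),
    ets:insert(base_table_queue_index, {QName, PrimKey}),

    %% Delete path: an index lookup replaces the match-spec scan.
    [ets:delete(base_table, K)
     || {_QName, K} <- ets:lookup(base_table_queue_index, QName)],
    ets:delete(base_table_queue_index, QName).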

View File

@@ -54,8 +54,19 @@
                  {node_node_coarse_stats, set},
                  {queue_msg_rates, set},
                  {vhost_msg_rates, set},
-                 {old_aggr_stats, set}
-                ]).
+                 {old_aggr_stats, set}]).

+-define(INDEX_TABLES, [consumer_stats_queue_index,
+                       consumer_stats_channel_index,
+                       old_aggr_stats_queue_index,
+                       old_aggr_stats_channel_index,
+                       channel_exchange_stats_fine_stats_exchange_index,
+                       channel_exchange_stats_fine_stats_channel_index,
+                       channel_queue_stats_deliver_stats_queue_index,
+                       channel_queue_stats_deliver_stats_channel_index,
+                       queue_exchange_stats_publish_queue_index,
+                       queue_exchange_stats_publish_exchange_index,
+                       node_node_coarse_stats_node_index]).
+
 -define(GC_EVENTS, [connection_closed, channel_closed, consumer_deleted,
                     exchange_deleted, queue_deleted, vhost_deleted,
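Each indexed base table gets one bag table per secondary key, named <base_table>_<key>_index. For instance, a consumer row keyed by {Queue, Channel, ConsumerTag} ends up mirrored in both consumer indices; the index shapes below follow insert_index/2 later in this diff, while the stored value shape is an assumption:

    %% consumer_stats:               {{QName, ChPid, CTag}, Props}
    %% consumer_stats_queue_index:   {QName, {QName, ChPid, CTag}}
    %% consumer_stats_channel_index: {ChPid, {QName, ChPid, CTag}}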

View File

@@ -18,7 +18,7 @@
 -include_lib("rabbit_common/include/rabbit.hrl").
 -include("rabbit_mgmt_metrics.hrl").

--behaviour(gen_server2).
+-behaviour(gen_server).

 -spec start_link(atom()) -> rabbit_types:ok_pid_or_error().
@@ -28,6 +28,7 @@
 -export([delete_queue/3]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
+-export([index_table/2]).

 -import(rabbit_misc, [pget/3]).
 -import(rabbit_mgmt_db, [pget/2, lookup_element/3]).
@@ -38,16 +39,16 @@ name(Table) ->
     list_to_atom((atom_to_list(Table) ++ "_metrics_collector")).

 start_link(Table) ->
-    gen_server2:start_link({local, name(Table)}, ?MODULE, [Table], []).
+    gen_server:start_link({local, name(Table)}, ?MODULE, [Table], []).

 override_lookups(Table, Lookups) ->
-    gen_server2:call(name(Table), {override_lookups, Lookups}, infinity).
+    gen_server:call(name(Table), {override_lookups, Lookups}, infinity).

 reset_lookups(Table) ->
-    gen_server2:call(name(Table), reset_lookups, infinity).
+    gen_server:call(name(Table), reset_lookups, infinity).

 delete_queue(Table, Queue, Stats) ->
-    gen_server2:cast(name(Table), {delete_queue, Queue, Stats}).
+    gen_server:cast(name(Table), {delete_queue, Queue, Stats}).

 init([Table]) ->
     {ok, RatesMode} = application:get_env(rabbitmq_management, rates_mode),
@@ -70,6 +71,8 @@ handle_call(reset_lookups, _From, State) ->
 handle_call({override_lookups, Lookups}, _From, State) ->
     {reply, ok, State#state{lookup_queue = pget(queue, Lookups),
                             lookup_exchange = pget(exchange, Lookups)}};
+handle_call({submit, Fun}, _From, State) ->
+    {reply, Fun(), State};
 handle_call(_Request, _From, State) ->
     {noreply, State}.
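The new {submit, Fun} clause evaluates an arbitrary fun inside the collector process, serialising it against that table's writes. A hypothetical caller, relying only on the "<table>_metrics_collector" registered-name convention from name/1 above:

    %% Hypothetical: count old_aggr_stats rows from inside the collector.
    Size = gen_server:call(queue_metrics_metrics_collector,
                           {submit, fun() -> ets:info(old_aggr_stats, size) end},
                           infinity).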
@@ -164,6 +167,8 @@ aggregate_entry(TS, {{Ch, X} = Id, Metrics}, #state{table = channel_exchange_met
                          pget(return_unroutable, Metrics, 0)),
     {Publish, _, _} = Diff = get_difference(Id, Stats),
     ets:insert(old_aggr_stats, ?old_aggr_stats(Id, Stats)),
+    %% Custom insert for channel only to avoid ambiguity with {Channel, Queue} key
+    ets:insert(index_table(old_aggr_stats, channel), {Ch, Id}),
     [begin
          insert_entry(channel_stats_fine_stats, Ch, TS, Diff, Size, Interval,
                       true),
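Worth spelling out why that insert is hand-rolled: here Id is {Channel, Exchange}, but the insert_index(old_aggr_stats, {Ch, Q} = Key) clause added below assumes a {Channel, Queue} key, so going through insert_with_index/3 would file the exchange name into the queue index. Roughly (names assumed):

    %% Id = {ChPid, XName} in this clause.
    %% insert_with_index(old_aggr_stats, Id, ...) would also do
    %%   ets:insert(old_aggr_stats_queue_index, {XName, Id})  %% XName is not a queue
    %% hence only the channel index is written:
    ets:insert(index_table(old_aggr_stats, channel), {ChPid, Id})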
@@ -197,7 +202,7 @@ aggregate_entry(TS, {{Ch, Q} = Id, Metrics}, #state{table = channel_queue_metric
                  pget(ack, Metrics, 0),
                  Deliver + DeliverNoAck + Get + GetNoAck),
     Diff = get_difference(Id, Stats),
-    ets:insert(old_aggr_stats, ?old_aggr_stats(Id, Stats)),
+    insert_with_index(old_aggr_stats, Id, ?old_aggr_stats(Id, Stats)),
     [begin
          insert_entry(vhost_stats_deliver_stats, vhost(Q), TS, Diff, Size,
                       Interval, true),
@@ -259,7 +264,7 @@ aggregate_entry(_TS, {Id, Exclusive, AckRequired, PrefetchCount, Args},
                   {ack_required, AckRequired},
                   {prefetch_count, PrefetchCount},
                   {arguments, Args}], {[], false}),
-    ets:insert(consumer_stats, ?consumer_stats(Id, Fmt)),
+    insert_with_index(consumer_stats, Id, ?consumer_stats(Id, Fmt)),
     ok;
 aggregate_entry(TS, {Id, Metrics}, #state{table = queue_metrics,
                                           policies = {BPolicies, _, GPolicies},
@@ -344,7 +349,7 @@ insert_entry(Table, Id, TS, Entry, Size, Interval, Incremental) ->
                  exometer_slide:new(Size * 1000, [{interval, Interval * 1000},
                                                   {incremental, Incremental}])
          end,
-    ets:insert(Table, {Key, exometer_slide:add_element(TS, Entry, Slide)}).
+    insert_with_index(Table, Key, {Key, exometer_slide:add_element(TS, Entry, Slide)}).

 get_difference(Id, Stats) ->
     case ets:lookup(old_aggr_stats, Id) of
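Note that at this call site Key is already the interval-qualified key, presumably {Id, Interval}; that is consistent with the interval-keyed clauses of insert_index/2 below matching on {{_, _}, _}, and tables with no index are harmless here because they fall through to the insert_index(_, _) -> ok catch-all. For example (values assumed):

    %% Key for channel_queue_stats_deliver_stats at a 5s interval:
    %% {{ChPid, QName}, 5}  -- matches insert_index(..., {{Ch, Q}, _} = Key)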
@@ -385,3 +390,39 @@ queue_exists(Name) ->
         _ ->
             false
     end.
+
+insert_with_index(Table, Key, Tuple) ->
+    Insert = ets:insert(Table, Tuple),
+    insert_index(Table, Key),
+    Insert.
+
+insert_index(consumer_stats, {Q, Ch, _} = Key) ->
+    ets:insert(index_table(consumer_stats, queue), {Q, Key}),
+    ets:insert(index_table(consumer_stats, channel), {Ch, Key});
+insert_index(old_aggr_stats, {Ch, Q} = Key) ->
+    ets:insert(index_table(old_aggr_stats, queue), {Q, Key}),
+    ets:insert(index_table(old_aggr_stats, channel), {Ch, Key});
+insert_index(channel_exchange_stats_fine_stats, {{Ch, Ex}, _} = Key) ->
+    ets:insert(index_table(channel_exchange_stats_fine_stats, exchange), {Ex, Key}),
+    ets:insert(index_table(channel_exchange_stats_fine_stats, channel), {Ch, Key});
+insert_index(channel_queue_stats_deliver_stats, {{Ch, Q}, _} = Key) ->
+    ets:insert(index_table(channel_queue_stats_deliver_stats, queue), {Q, Key}),
+    ets:insert(index_table(channel_queue_stats_deliver_stats, channel), {Ch, Key});
+insert_index(queue_exchange_stats_publish, {{Q, Ex}, _} = Key) ->
+    ets:insert(index_table(queue_exchange_stats_publish, queue), {Q, Key}),
+    ets:insert(index_table(queue_exchange_stats_publish, exchange), {Ex, Key});
+insert_index(node_node_coarse_stats, {{_, Node}, _} = Key) ->
+    ets:insert(index_table(node_node_coarse_stats, node), {Node, Key});
+insert_index(_, _) -> ok.
+
+index_table(consumer_stats, queue) -> consumer_stats_queue_index;
+index_table(consumer_stats, channel) -> consumer_stats_channel_index;
+index_table(old_aggr_stats, queue) -> old_aggr_stats_queue_index;
+index_table(old_aggr_stats, channel) -> old_aggr_stats_channel_index;
+index_table(channel_exchange_stats_fine_stats, exchange) -> channel_exchange_stats_fine_stats_exchange_index;
+index_table(channel_exchange_stats_fine_stats, channel) -> channel_exchange_stats_fine_stats_channel_index;
+index_table(channel_queue_stats_deliver_stats, queue) -> channel_queue_stats_deliver_stats_queue_index;
+index_table(channel_queue_stats_deliver_stats, channel) -> channel_queue_stats_deliver_stats_channel_index;
+index_table(queue_exchange_stats_publish, queue) -> queue_exchange_stats_publish_queue_index;
+index_table(queue_exchange_stats_publish, exchange) -> queue_exchange_stats_publish_exchange_index;
+index_table(node_node_coarse_stats, node) -> node_node_coarse_stats_node_index.
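index_table/2 is exported (see the new -export above) so the GC process can resolve index-table names without duplicating this mapping. A sketch of a lookup through it, with QName assumed bound, mirroring what index_delete/3 does in the next file:

    Index = rabbit_mgmt_metrics_collector:index_table(
              channel_queue_stats_deliver_stats, queue),
    Keys = [Key || {_QName, Key} <- ets:lookup(Index, QName)].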

View File

@@ -34,7 +34,7 @@ name(EventType) ->
     list_to_atom((atom_to_list(EventType) ++ "_metrics_gc")).

 start_link(EventType) ->
-    gen_server2:start_link({local, name(EventType)}, ?MODULE, [], []).
+    gen_server:start_link({local, name(EventType)}, ?MODULE, [], []).

 init(_) ->
     {ok, Policies} = application:get_env(
@@ -64,8 +64,7 @@ handle_cast({event, #event{type = exchange_deleted, props = Props}},
     remove_exchange(Name, Intervals),
     {noreply, State};
 handle_cast({event, #event{type = queue_deleted, props = Props}},
-            State = #state{basic_i = BIntervals,
-                           detailed_i = DIntervals}) ->
+            State = #state{basic_i = BIntervals, detailed_i = DIntervals}) ->
     Name = pget(name, Props),
     remove_queue(Name, BIntervals, DIntervals),
     {noreply, State};
@@ -102,30 +101,30 @@ remove_channel(Id, Intervals) ->
     delete_samples(channel_process_stats, Id, Intervals),
     delete_samples(channel_stats_fine_stats, Id, Intervals),
     delete_samples(channel_stats_deliver_stats, Id, Intervals),
-    ets:select_delete(consumer_stats, match_consumer_spec(Id)),
-    ets:select_delete(old_aggr_stats, match_spec(Id)),
-    ets:select_delete(channel_exchange_stats_fine_stats, match_interval_spec(Id)),
-    ets:select_delete(channel_queue_stats_deliver_stats, match_interval_spec(Id)),
+    index_delete(consumer_stats, channel, Id),
+    index_delete(old_aggr_stats, channel, Id),
+    index_delete(channel_exchange_stats_fine_stats, channel, Id),
+    index_delete(channel_queue_stats_deliver_stats, channel, Id),
     ok.

 remove_consumer(Props) ->
     Id = {pget(queue, Props), pget(channel, Props), pget(consumer_tag, Props)},
-    ets:delete(consumer_stats, Id).
+    ets:delete(consumer_stats, Id),
+    cleanup_index(consumer_stats, Id).

 remove_exchange(Name, Intervals) ->
     delete_samples(exchange_stats_publish_out, Name, Intervals),
     delete_samples(exchange_stats_publish_in, Name, Intervals),
-    ets:select_delete(queue_exchange_stats_publish, match_second_interval_spec({Name})),
-    ets:select_delete(channel_exchange_stats_fine_stats, match_second_interval_spec({Name})).
+    index_delete(queue_exchange_stats_publish, exchange, Name),
+    index_delete(channel_exchange_stats_fine_stats, exchange, Name).

 remove_queue(Name, BIntervals, DIntervals) ->
     ets:delete(queue_stats, Name),
     delete_samples(queue_stats_publish, Name, DIntervals),
     delete_samples(queue_stats_deliver_stats, Name, DIntervals),
-    ets:select_delete(channel_queue_stats_deliver_stats, match_second_interval_spec({Name})),
-    ets:select_delete(queue_exchange_stats_publish, match_interval_spec({Name})),
     delete_samples(queue_process_stats, Name, BIntervals),
     delete_samples(queue_msg_stats, Name, BIntervals),
     delete_samples(queue_msg_rates, Name, BIntervals),
     %% vhost message counts must be updated with the deletion of the messages in this queue
     case ets:lookup(old_aggr_stats, Name) of
         [{Name, Stats}] ->
@@ -135,8 +134,12 @@ remove_queue(Name, BIntervals, DIntervals) ->
     end,
     ets:delete(old_aggr_stats, Name),
     ets:delete(old_aggr_stats, {Name, rates}),
-    ets:select_delete(old_aggr_stats, match_second_spec({Name})),
-    ets:select_delete(consumer_stats, match_queue_consumer_spec({Name})),
+    index_delete(channel_queue_stats_deliver_stats, queue, Name),
+    index_delete(queue_exchange_stats_publish, queue, Name),
+    index_delete(old_aggr_stats, queue, Name),
+    index_delete(consumer_stats, queue, Name),
     ok.

 remove_vhost(Name, BIntervals, DIntervals) ->
@@ -145,7 +148,7 @@ remove_vhost(Name, BIntervals, DIntervals) ->
     delete_samples(vhost_stats_deliver_stats, Name, DIntervals).

 remove_node_node(Name) ->
-    ets:select_delete(node_node_coarse_stats, match_second_interval_spec({Name})).
+    index_delete(node_node_coarse_stats, node, Name).

 intervals(Type, Policies) ->
     [I || {_, I} <- proplists:get_value(Type, Policies)].
@@ -153,20 +156,12 @@ intervals(Type, Policies) ->
 delete_samples(Table, Id, Intervals) ->
     [ets:delete(Table, {Id, I}) || I <- Intervals].

-match_spec(Id) ->
-    [{{{'$1', '_'}, '_'}, [{'==', Id, '$1'}], [true]}].
+index_delete(Table, Type, Id) ->
+    IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type),
+    Keys = ets:lookup(IndexTable, Id),  %% returns full {Id, Key} index objects
+    [ets:delete(Table, Key) || {_Id, Key} <- Keys],
+    ets:delete(IndexTable, Id).

-match_second_spec(Id) ->
-    [{{{'_', '$1'}, '_'}, [{'==', Id, '$1'}], [true]}].
-
-match_interval_spec(Id) ->
-    [{{{{'$1', '_'}, '_'}, '_'}, [{'==', Id, '$1'}], [true]}].
-
-match_second_interval_spec(Id) ->
-    [{{{{'_', '$1'}, '_'}, '_'}, [{'==', Id, '$1'}], [true]}].
-
-match_consumer_spec(Id) ->
-    [{{{'_', '$1', '_'}, '_'}, [{'==', Id, '$1'}], [true]}].
-
-match_queue_consumer_spec(Id) ->
-    [{{{'$1', '_', '_'}, '_'}, [{'==', Id, '$1'}], [true]}].
+cleanup_index(consumer_stats, {Q, Ch, _} = Key) ->
+    ets:delete_object(consumer_stats_queue_index, {Q, Key}),
+    ets:delete_object(consumer_stats_channel_index, {Ch, Key}).
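The two helpers differ in granularity: index_delete/3 drops every base-table row filed under one secondary key and then the whole index entry, while cleanup_index/2 uses ets:delete_object/2 to remove only a single consumer's exact {SecondaryKey, Key} objects, leaving other consumers on the same queue or channel indexed. Illustrated with assumed values:

    %% bag index holds {QName, K1} and {QName, K2}:
    ets:delete_object(consumer_stats_queue_index, {QName, K1}),
    %% -> {QName, K2} survives
    ets:delete(consumer_stats_queue_index, QName)
    %% -> everything under QName is gone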

View File

@@ -31,6 +31,7 @@ start_link() ->

 init(_) ->
     [ets:new(Key, [public, Type, named_table]) || {Key, Type} <- ?TABLES],
+    [ets:new(Key, [public, bag, named_table]) || Key <- ?INDEX_TABLES],
     {ok, #state{}}.

 handle_call(_Request, _From, State) ->
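The index tables must be bags, not sets: one secondary key commonly maps to many primary keys (a queue with several consumers, a channel touching several queues), and a set table would keep only the last {SecondaryKey, PrimaryKey} pair. A quick shell illustration (not from this diff):

    1> T = ets:new(idx, [bag]).
    #Ref<...>   %% table identifier
    2> ets:insert(T, [{q1, k1}, {q1, k2}]).
    true
    3> ets:lookup(T, q1).
    [{q1,k1},{q1,k2}]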