Make aggregate_entry a pure function returning a set of

ets operations.

This avoids inserting into the same exometer slide data structure
multiple times during each collection and thus reduces the amount of
unrealised rate data that is kept around until the next interval.

[#140575873]
This commit is contained in:
kjnilsson 2017-03-02 08:13:17 +00:00
parent bc59afd97e
commit 558770374b
2 changed files with 293 additions and 219 deletions

View File

@ -2,6 +2,7 @@
.*.sw? .*.sw?
*.beam *.beam
.erlang.mk/ .erlang.mk/
*.plt
cover/ cover/
deps/ deps/
doc/ doc/

View File

@ -151,161 +151,183 @@ handle_deleted_queues(_T, _R, _P) -> ok.
aggregate_metrics(Timestamp, #state{table = Table, aggregate_metrics(Timestamp, #state{table = Table,
policies = {_, _, GPolicies}} = State0) -> policies = {_, _, GPolicies}} = State0) ->
Table = State0#state.table, Table = State0#state.table,
{Next, #state{old_aggr_stats = Remainders}} = ets:foldl( {Next, Ops, #state{old_aggr_stats = Remainders}} =
fun(R, {Dict, State}) -> ets:foldl(fun(R, {NextStats, O, State}) ->
aggregate_entry(Timestamp, R, Dict, State) aggregate_entry(R, NextStats, O, State)
end, {dict:new(), State0}, Table), end, {dict:new(), dict:new(), State0}, Table),
dict:fold(fun(Tbl, TblOps, Acc) ->
_ = exec_table_ops(Tbl, Timestamp, TblOps),
Acc
end, no_acc, Ops),
handle_deleted_queues(Table, Remainders, GPolicies), handle_deleted_queues(Table, Remainders, GPolicies),
State0#state{old_aggr_stats = Next}. State0#state{old_aggr_stats = Next}.
aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_created} = State) -> exec_table_ops(Table, Timestamp, TableOps) ->
dict:fold(fun(_Id, {insert, Entry}, A) ->
ets:insert(Table, Entry),
A;
(Id, {insert_with_index, Entry}, A) ->
insert_with_index(Table, Id, Entry),
A;
({Id, Size, Interval, Incremental},
{insert_entry, Entry}, A) ->
insert_entry(Table, Id, Timestamp, Entry, Size,
Interval, Incremental),
A
end, no_acc, TableOps).
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = connection_created} = State) ->
Ftd = rabbit_mgmt_format:format( Ftd = rabbit_mgmt_format:format(
Metrics, Metrics,
{fun rabbit_mgmt_format:format_connection_created/1, true}), {fun rabbit_mgmt_format:format_connection_created/1, true}),
ets:insert(connection_created_stats, Entry = ?connection_created_stats(Id, pget(name, Ftd, unknown), Ftd),
?connection_created_stats(Id, pget(name, Ftd, unknown), Ftd)), Ops = insert_op(connection_created_stats, Id, Entry, Ops0),
{NextStats, State}; {NextStats, Ops, State};
aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_metrics} = State) -> aggregate_entry({Id, Metrics}, NextStats, Ops0,
ets:insert(connection_stats, ?connection_stats(Id, Metrics)), #state{table = connection_metrics} = State) ->
{NextStats, State}; Entry = ?connection_stats(Id, Metrics),
aggregate_entry(TS, {Id, RecvOct, SendOct, Reductions, 0}, NextStats, Ops = insert_op(connection_stats, Id, Entry, Ops0),
{NextStats, Ops, State};
aggregate_entry({Id, RecvOct, SendOct, Reductions, 0}, NextStats, Ops0,
#state{table = connection_coarse_metrics, #state{table = connection_coarse_metrics,
policies = {BPolicies, _, GPolicies}} = State) -> policies = {BPolicies, _, GPolicies}} = State) ->
Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct), Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct),
Diff = get_difference(Id, Stats, State), Diff = get_difference(Id, Stats, State),
[insert_entry(vhost_stats_coarse_conn_stats, vhost({connection_created, Id}),
TS, Diff, Size, Interval, true) || {Size, Interval} <- GPolicies], Ops1 = insert_entry_ops(vhost_stats_coarse_conn_stats,
[begin vhost({connection_created, Id}), true, Diff, Ops0,
insert_entry(connection_stats_coarse_conn_stats, Id, TS, GPolicies),
?connection_stats_coarse_conn_stats(RecvOct, SendOct, Reductions),
Size, Interval, false) Entry = ?connection_stats_coarse_conn_stats(RecvOct, SendOct, Reductions),
end || {Size, Interval} <- BPolicies], Ops2 = insert_entry_ops(connection_stats_coarse_conn_stats, Id, false, Entry,
{insert_old_aggr_stats(NextStats, Id, Stats), State}; Ops1, BPolicies),
aggregate_entry(TS, {Id, RecvOct, SendOct, _Reductions, 1}, NextStats, {insert_old_aggr_stats(NextStats, Id, Stats), Ops2, State};
aggregate_entry({Id, RecvOct, SendOct, _Reductions, 1}, NextStats, Ops0,
#state{table = connection_coarse_metrics, #state{table = connection_coarse_metrics,
policies = {_BPolicies, _, GPolicies}} = State) -> policies = {_BPolicies, _, GPolicies}} = State) ->
Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct), Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct),
Diff = get_difference(Id, Stats, State), Diff = get_difference(Id, Stats, State),
[insert_entry(vhost_stats_coarse_conn_stats, vhost({connection_created, Id}), Ops1 = insert_entry_ops(vhost_stats_coarse_conn_stats,
TS, Diff, Size, Interval, true) || {Size, Interval} <- GPolicies], vhost({connection_created, Id}), true, Diff, Ops0,
GPolicies),
rabbit_core_metrics:delete(connection_coarse_metrics, Id), rabbit_core_metrics:delete(connection_coarse_metrics, Id),
{NextStats, State}; {NextStats, Ops1, State};
aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_created} = State) -> aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = channel_created} = State) ->
Ftd = rabbit_mgmt_format:format(Metrics, {[], false}), Ftd = rabbit_mgmt_format:format(Metrics, {[], false}),
ets:insert(channel_created_stats, Entry = ?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd),
?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd)), Ops = insert_op(channel_created_stats, Id, Entry, Ops0),
{NextStats, State}; {NextStats, Ops, State};
aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_metrics} = State) -> aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = channel_metrics} = State) ->
Ftd = rabbit_mgmt_format:format(Metrics, Ftd = rabbit_mgmt_format:format(Metrics,
{fun rabbit_mgmt_format:format_channel_stats/1, true}), {fun rabbit_mgmt_format:format_channel_stats/1, true}),
ets:insert(channel_stats, ?channel_stats(Id, Ftd)),
{NextStats, State}; Entry = ?channel_stats(Id, Ftd),
aggregate_entry(TS, {{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, 0}, NextStats, Ops = insert_op(channel_stats, Id, Entry, Ops0),
{NextStats, Ops, State};
aggregate_entry({{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, 0},
NextStats, Ops0,
#state{table = channel_exchange_metrics, #state{table = channel_exchange_metrics,
policies = {BPolicies, DPolicies, GPolicies}, policies = {BPolicies, DPolicies, GPolicies},
rates_mode = RatesMode, rates_mode = RatesMode,
lookup_exchange = ExchangeFun} = State) -> lookup_exchange = ExchangeFun} = State) ->
Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable), Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable),
{Publish, _, _} = Diff = get_difference(Id, Stats, State), {Publish, _, _} = Diff = get_difference(Id, Stats, State),
[begin
insert_entry(channel_stats_fine_stats, Ch, TS, Diff, Size, Interval, Ops1 = insert_entry_ops(channel_stats_fine_stats, Ch, true, Diff, Ops0,
true) BPolicies),
end || {Size, Interval} <- BPolicies], Ops2 = insert_entry_ops(vhost_stats_fine_stats, vhost(X), true, Diff, Ops1,
[begin GPolicies),
insert_entry(vhost_stats_fine_stats, vhost(X), TS, Diff, Size, Ops3 = case {ExchangeFun(X), RatesMode} of
Interval, true) {true, basic} ->
end || {Size, Interval} <- GPolicies], Entry = ?exchange_stats_publish_in(Publish),
_ = case {ExchangeFun(X), RatesMode} of insert_entry_ops(exchange_stats_publish_in, X, true, Entry,
{true, basic} -> Ops2, DPolicies);
[insert_entry(exchange_stats_publish_in, X, TS, {true, _} ->
?exchange_stats_publish_in(Publish), Size, Interval, true) Entry = ?exchange_stats_publish_in(Publish),
|| {Size, Interval} <- DPolicies]; O = insert_entry_ops(exchange_stats_publish_in, X, true,
{true, _} -> Entry, Ops2, DPolicies),
[begin insert_entry_ops(channel_exchange_stats_fine_stats, Id,
insert_entry(exchange_stats_publish_in, X, TS, false, Stats, O, DPolicies);
?exchange_stats_publish_in(Publish), Size, Interval, true), _ ->
insert_entry(channel_exchange_stats_fine_stats, Id, TS, Stats, Ops2
Size, Interval, false) end,
end || {Size, Interval} <- DPolicies]; {insert_old_aggr_stats(NextStats, Id, Stats), Ops3, State};
_ -> aggregate_entry({{_Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, 1},
ok NextStats, Ops0,
end,
{insert_old_aggr_stats(NextStats, Id, Stats), State};
aggregate_entry(TS, {{_Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, 1}, NextStats,
#state{table = channel_exchange_metrics, #state{table = channel_exchange_metrics,
policies = {_BPolicies, DPolicies, GPolicies}, policies = {_BPolicies, DPolicies, GPolicies},
lookup_exchange = ExchangeFun} = State) -> lookup_exchange = ExchangeFun} = State) ->
Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable), Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable),
{Publish, _, _} = Diff = get_difference(Id, Stats, State), {Publish, _, _} = Diff = get_difference(Id, Stats, State),
[begin Ops1 = insert_entry_ops(vhost_stats_fine_stats, vhost(X), true, Diff, Ops0,
insert_entry(vhost_stats_fine_stats, vhost(X), TS, Diff, Size, GPolicies),
Interval, true) Ops2 = case ExchangeFun(X) of
end || {Size, Interval} <- GPolicies], true ->
_ = case ExchangeFun(X) of Entry = ?exchange_stats_publish_in(Publish),
true -> insert_entry_ops(exchange_stats_publish_in, X, true, Entry,
[insert_entry(exchange_stats_publish_in, X, TS, Ops1, DPolicies);
?exchange_stats_publish_in(Publish), Size, Interval, true) _ ->
|| {Size, Interval} <- DPolicies]; Ops1
_ -> end,
ok
end,
rabbit_core_metrics:delete(channel_exchange_metrics, Id), rabbit_core_metrics:delete(channel_exchange_metrics, Id),
{NextStats, State}; {NextStats, Ops2, State};
aggregate_entry(TS, {{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, Redeliver, Ack, 0}, aggregate_entry({{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck,
NextStats, Redeliver, Ack, 0}, NextStats, Ops0,
#state{table = channel_queue_metrics, #state{table = channel_queue_metrics,
policies = {BPolicies, DPolicies, GPolicies}, policies = {BPolicies, DPolicies, GPolicies},
rates_mode = RatesMode, rates_mode = RatesMode,
lookup_queue = QueueFun} = State) -> lookup_queue = QueueFun} = State) ->
Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck, Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck,
Redeliver, Ack, Redeliver, Ack,
Deliver + DeliverNoAck + Get + GetNoAck), Deliver + DeliverNoAck + Get + GetNoAck),
Diff = get_difference(Id, Stats, State), Diff = get_difference(Id, Stats, State),
[begin
insert_entry(vhost_stats_deliver_stats, vhost(Q), TS, Diff, Size, Ops1 = insert_entry_ops(vhost_stats_deliver_stats, vhost(Q), true, Diff,
Interval, true) Ops0, GPolicies),
end || {Size, Interval} <- GPolicies],
[begin Ops2 = insert_entry_ops(channel_stats_deliver_stats, Ch, true, Diff, Ops1,
insert_entry(channel_stats_deliver_stats, Ch, TS, Diff, Size, Interval, BPolicies),
true)
end || {Size, Interval} <- BPolicies], Ops3 = case {QueueFun(Q), RatesMode} of
_ = case {QueueFun(Q), RatesMode} of {true, basic} ->
{true, basic} -> insert_entry_ops(queue_stats_deliver_stats, Q, true, Diff,
[insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval, Ops2, BPolicies);
true) || {Size, Interval} <- BPolicies]; {true, _} ->
{true, _} -> O = insert_entry_ops(queue_stats_deliver_stats, Q, true,
[insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval, Diff, Ops2, BPolicies),
true) || {Size, Interval} <- BPolicies], insert_entry_ops(channel_queue_stats_deliver_stats, Id,
[insert_entry(channel_queue_stats_deliver_stats, Id, TS, Stats, Size, false, Stats, O, DPolicies);
Interval, false) _ ->
|| {Size, Interval} <- DPolicies]; Ops2
_ -> end,
ok {insert_old_aggr_stats(NextStats, Id, Stats), Ops3, State};
end, aggregate_entry({{_, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck,
{insert_old_aggr_stats(NextStats, Id, Stats), State}; Redeliver, Ack, 1}, NextStats, Ops0,
aggregate_entry(TS, {{_, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, Redeliver, Ack, 1},
NextStats,
#state{table = channel_queue_metrics, #state{table = channel_queue_metrics,
policies = {BPolicies, _, GPolicies}, policies = {BPolicies, _, GPolicies},
lookup_queue = QueueFun} = State) -> lookup_queue = QueueFun} = State) ->
Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck, Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck,
Redeliver, Ack, Redeliver, Ack,
Deliver + DeliverNoAck + Get + GetNoAck), Deliver + DeliverNoAck + Get + GetNoAck),
Diff = get_difference(Id, Stats, State), Diff = get_difference(Id, Stats, State),
[begin
insert_entry(vhost_stats_deliver_stats, vhost(Q), TS, Diff, Size, Ops1 = insert_entry_ops(vhost_stats_deliver_stats, vhost(Q), true, Diff,
Interval, true) Ops0, GPolicies),
end || {Size, Interval} <- GPolicies], Ops2 = case QueueFun(Q) of
_ = case QueueFun(Q) of true ->
true -> insert_entry_ops(queue_stats_deliver_stats, Q, true, Diff,
[insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval, Ops1, BPolicies);
true) || {Size, Interval} <- BPolicies]; _ ->
_ -> Ops1
ok end,
end,
rabbit_core_metrics:delete(channel_queue_metrics, Id), rabbit_core_metrics:delete(channel_queue_metrics, Id),
{NextStats, State}; {NextStats, Ops2, State};
aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish, ToDelete}, NextStats, aggregate_entry({{_Ch, {Q, X}} = Id, Publish, ToDelete}, NextStats, Ops0,
#state{table = channel_queue_exchange_metrics, #state{table = channel_queue_exchange_metrics,
policies = {BPolicies, _, _}, policies = {BPolicies, _, _},
rates_mode = RatesMode, rates_mode = RatesMode,
@ -313,148 +335,157 @@ aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish, ToDelete}, NextStats,
lookup_exchange = ExchangeFun} = State) -> lookup_exchange = ExchangeFun} = State) ->
Stats = ?queue_stats_publish(Publish), Stats = ?queue_stats_publish(Publish),
Diff = get_difference(Id, Stats, State), Diff = get_difference(Id, Stats, State),
_ = case {QueueFun(Q), ExchangeFun(X), RatesMode, ToDelete} of Ops1 = case {QueueFun(Q), ExchangeFun(X), RatesMode, ToDelete} of
{true, false, _, _} -> {true, false, _, _} ->
[insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true) insert_entry_ops(queue_stats_publish, Q, true, Diff,
|| {Size, Interval} <- BPolicies]; Ops0, BPolicies);
{false, true, _, _} -> {false, true, _, _} ->
[insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true) insert_entry_ops(exchange_stats_publish_out, X, true, Diff,
|| {Size, Interval} <- BPolicies]; Ops0, BPolicies);
{true, true, basic, _} -> {true, true, basic, _} ->
[begin O = insert_entry_ops(queue_stats_publish, Q, true, Diff,
insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true), Ops0, BPolicies),
insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true) insert_entry_ops(exchange_stats_publish_out, X, true, Diff,
end || {Size, Interval} <- BPolicies]; O, BPolicies);
{true, true, _, 0} -> {true, true, _, 0} ->
[begin O1 = insert_entry_ops(queue_stats_publish, Q, true, Diff,
insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true), Ops0, BPolicies),
insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true), O2 = insert_entry_ops(exchange_stats_publish_out, X, true,
insert_entry(queue_exchange_stats_publish, {Q, X}, TS, Diff, Size, Interval, true) Diff, O1, BPolicies),
end || {Size, Interval} <- BPolicies]; insert_entry_ops(queue_exchange_stats_publish, {Q, X},
{true, true, _, 1} -> true, Diff, O2, BPolicies);
[begin {true, true, _, 1} ->
insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true), O = insert_entry_ops(queue_stats_publish, Q, true, Diff,
insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true) Ops0, BPolicies),
end || {Size, Interval} <- BPolicies]; insert_entry_ops(exchange_stats_publish_out, X, true,
_ -> Diff, O, BPolicies);
ok _ ->
end, Ops0
end,
case ToDelete of case ToDelete of
0 -> 0 ->
{insert_old_aggr_stats(NextStats, Id, Stats), State}; {insert_old_aggr_stats(NextStats, Id, Stats), Ops1, State};
1 -> 1 ->
rabbit_core_metrics:delete(channel_queue_exchange_metrics, Id), rabbit_core_metrics:delete(channel_queue_exchange_metrics, Id),
{NextStats, State} {NextStats, Ops1, State}
end; end;
aggregate_entry(TS, {Id, Reductions}, NextStats, aggregate_entry({Id, Reductions}, NextStats, Ops0,
#state{table = channel_process_metrics, #state{table = channel_process_metrics,
policies = {BPolicies, _, _}} = State) -> policies = {BPolicies, _, _}} = State) ->
[begin Entry = ?channel_process_stats(Reductions),
insert_entry(channel_process_stats, Id, TS, ?channel_process_stats(Reductions), Ops = insert_entry_ops(channel_process_stats, Id, false,
Size, Interval, false) Entry, Ops0, BPolicies),
end || {Size, Interval} <- BPolicies], {NextStats, Ops, State};
{NextStats, State}; aggregate_entry({Id, Exclusive, AckRequired, PrefetchCount, Args},
aggregate_entry(_TS, {Id, Exclusive, AckRequired, PrefetchCount, Args}, NextStats, NextStats, Ops0,
#state{table = consumer_created} = State) -> #state{table = consumer_created} = State) ->
Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive}, Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive},
{ack_required, AckRequired}, {ack_required, AckRequired},
{prefetch_count, PrefetchCount}, {prefetch_count, PrefetchCount},
{arguments, Args}], {[], false}), {arguments, Args}], {[], false}),
insert_with_index(consumer_stats, Id, ?consumer_stats(Id, Fmt)), Entry = ?consumer_stats(Id, Fmt),
{NextStats, State}; Ops = insert_with_index_op(consumer_stats, Id, Entry, Ops0),
aggregate_entry(TS, {Id, Metrics, 0}, NextStats, {NextStats, Ops ,State};
aggregate_entry({Id, Metrics, 0}, NextStats, Ops0,
#state{table = queue_metrics, #state{table = queue_metrics,
policies = {BPolicies, _, GPolicies}, policies = {BPolicies, _, GPolicies},
lookup_queue = QueueFun} = State) -> lookup_queue = QueueFun} = State) ->
Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0), pget(disk_writes, Metrics, 0)), Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0),
pget(disk_writes, Metrics, 0)),
Diff = get_difference(Id, Stats, State), Diff = get_difference(Id, Stats, State),
[insert_entry(vhost_msg_rates, vhost(Id), TS, Diff, Size, Interval, true) Ops1 = insert_entry_ops(vhost_msg_rates, vhost(Id), true, Diff, Ops0,
|| {Size, Interval} <- GPolicies], GPolicies),
case QueueFun(Id) of Ops2 = case QueueFun(Id) of
true -> true ->
[insert_entry(queue_msg_rates, Id, TS, Stats, Size, Interval, false) O = insert_entry_ops(queue_msg_rates, Id, false, Stats, Ops1,
|| {Size, Interval} <- BPolicies], BPolicies),
Fmt = rabbit_mgmt_format:format( Fmt = rabbit_mgmt_format:format(
Metrics, Metrics,
{fun rabbit_mgmt_format:format_queue_stats/1, false}), {fun rabbit_mgmt_format:format_queue_stats/1, false}),
ets:insert(queue_stats, ?queue_stats(Id, Fmt)); insert_op(queue_stats, Id, ?queue_stats(Id, Fmt), O);
false -> false ->
ok Ops1
end, end,
{insert_old_aggr_stats(NextStats, Id, Stats), State}; {insert_old_aggr_stats(NextStats, Id, Stats), Ops2, State};
aggregate_entry(TS, {Id, Metrics, 1}, NextStats, aggregate_entry({Id, Metrics, 1}, NextStats, Ops0,
#state{table = queue_metrics, #state{table = queue_metrics,
policies = {_, _, GPolicies}} = State) -> policies = {_, _, GPolicies}} = State) ->
Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0), pget(disk_writes, Metrics, 0)), Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0),
pget(disk_writes, Metrics, 0)),
Diff = get_difference(Id, Stats, State), Diff = get_difference(Id, Stats, State),
[insert_entry(vhost_msg_rates, vhost(Id), TS, Diff, Size, Interval, true) Ops = insert_entry_ops(vhost_msg_rates, vhost(Id), true, Diff, Ops0,
|| {Size, Interval} <- GPolicies], GPolicies),
rabbit_core_metrics:delete(queue_metrics, Id), rabbit_core_metrics:delete(queue_metrics, Id),
{NextStats, State}; {NextStats, Ops, State};
aggregate_entry(TS, {Name, Ready, Unack, Msgs, Red}, NextStats, aggregate_entry({Name, Ready, Unack, Msgs, Red}, NextStats, Ops0,
#state{table = queue_coarse_metrics, #state{table = queue_coarse_metrics,
old_aggr_stats = Old, old_aggr_stats = Old,
policies = {BPolicies, _, GPolicies}, policies = {BPolicies, _, GPolicies},
lookup_queue = QueueFun} = State) -> lookup_queue = QueueFun} = State) ->
Stats = ?vhost_msg_stats(Ready, Unack, Msgs), Stats = ?vhost_msg_stats(Ready, Unack, Msgs),
Diff = get_difference(Name, Stats, State), Diff = get_difference(Name, Stats, State),
[insert_entry(vhost_msg_stats, vhost(Name), TS, Diff, Size, Interval, true) Ops1 = insert_entry_ops(vhost_msg_stats, vhost(Name), true, Diff, Ops0,
|| {Size, Interval} <- GPolicies], GPolicies),
_ = case QueueFun(Name) of Ops2 = case QueueFun(Name) of
true -> true ->
[begin QPS =?queue_process_stats(Red),
insert_entry(queue_process_stats, Name, TS, ?queue_process_stats(Red), O1 = insert_entry_ops(queue_process_stats, Name, false, QPS,
Size, Interval, false), Ops1, BPolicies),
insert_entry(queue_msg_stats, Name, TS, ?queue_msg_stats(Ready, Unack, Msgs), QMS = ?queue_msg_stats(Ready, Unack, Msgs),
Size, Interval, false) insert_entry_ops(queue_msg_stats, Name, false, QMS,
end || {Size, Interval} <- BPolicies]; O1, BPolicies);
_ -> _ ->
ok Ops1
end, end,
State1 = State#state{old_aggr_stats = dict:erase(Name, Old)}, State1 = State#state{old_aggr_stats = dict:erase(Name, Old)},
{insert_old_aggr_stats(NextStats, Name, Stats), State1}; {insert_old_aggr_stats(NextStats, Name, Stats), Ops2, State1};
aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = node_metrics} = State) -> aggregate_entry({Id, Metrics}, NextStats, Ops0,
ets:insert(node_stats, {Id, Metrics}), #state{table = node_metrics} = State) ->
{NextStats, State}; Ops = insert_op(node_stats, Id, {Id, Metrics}, Ops0),
aggregate_entry(TS, {Id, Metrics}, NextStats, {NextStats, Ops, State};
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = node_coarse_metrics, #state{table = node_coarse_metrics,
policies = {_, _, GPolicies}} = State) -> policies = {_, _, GPolicies}} = State) ->
Stats = ?node_coarse_stats( Stats = ?node_coarse_stats(
pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0), pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0),
pget(mem_used, Metrics, 0), pget(disk_free, Metrics, 0), pget(mem_used, Metrics, 0), pget(disk_free, Metrics, 0),
pget(proc_used, Metrics, 0), pget(gc_num, Metrics, 0), pget(proc_used, Metrics, 0), pget(gc_num, Metrics, 0),
pget(gc_bytes_reclaimed, Metrics, 0), pget(context_switches, Metrics, 0)), pget(gc_bytes_reclaimed, Metrics, 0),
[insert_entry(node_coarse_stats, Id, TS, Stats, Size, Interval, false) pget(context_switches, Metrics, 0)),
|| {Size, Interval} <- GPolicies], Ops = insert_entry_ops(node_coarse_stats, Id, false, Stats, Ops0,
{NextStats, State}; GPolicies),
aggregate_entry(TS, {Id, Metrics}, NextStats, {NextStats, Ops, State};
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = node_persister_metrics, #state{table = node_persister_metrics,
policies = {_, _, GPolicies}} = State) -> policies = {_, _, GPolicies}} = State) ->
Stats = ?node_persister_stats( Stats = ?node_persister_stats(
pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0), pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0),
pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0), pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0),
pget(io_write_bytes, Metrics, 0), pget(io_write_time, Metrics, 0), pget(io_write_bytes, Metrics, 0), pget(io_write_time, Metrics, 0),
pget(io_sync_count, Metrics, 0), pget(io_sync_time, Metrics, 0), pget(io_sync_count, Metrics, 0), pget(io_sync_time, Metrics, 0),
pget(io_seek_count, Metrics, 0), pget(io_seek_time, Metrics, 0), pget(io_seek_count, Metrics, 0), pget(io_seek_time, Metrics, 0),
pget(io_reopen_count, Metrics, 0), pget(mnesia_ram_tx_count, Metrics, 0), pget(io_reopen_count, Metrics, 0), pget(mnesia_ram_tx_count, Metrics, 0),
pget(mnesia_disk_tx_count, Metrics, 0), pget(msg_store_read_count, Metrics, 0), pget(mnesia_disk_tx_count, Metrics, 0), pget(msg_store_read_count, Metrics, 0),
pget(msg_store_write_count, Metrics, 0), pget(msg_store_write_count, Metrics, 0),
pget(queue_index_journal_write_count, Metrics, 0), pget(queue_index_journal_write_count, Metrics, 0),
pget(queue_index_write_count, Metrics, 0), pget(queue_index_read_count, Metrics, 0), pget(queue_index_write_count, Metrics, 0), pget(queue_index_read_count, Metrics, 0),
pget(io_file_handle_open_attempt_count, Metrics, 0), pget(io_file_handle_open_attempt_count, Metrics, 0),
pget(io_file_handle_open_attempt_time, Metrics, 0)), pget(io_file_handle_open_attempt_time, Metrics, 0)),
[insert_entry(node_persister_stats, Id, TS, Stats, Size, Interval, false) Ops = insert_entry_ops(node_persister_stats, Id, false, Stats, Ops0,
|| {Size, Interval} <- GPolicies], GPolicies),
{NextStats, State}; {NextStats, Ops, State};
aggregate_entry(TS, {Id, Metrics}, NextStats, aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = node_node_metrics, #state{table = node_node_metrics,
policies = {_, _, GPolicies}} = State) -> policies = {_, _, GPolicies}} = State) ->
Stats = ?node_node_coarse_stats(pget(send_bytes, Metrics, 0), pget(recv_bytes, Metrics, 0)), Stats = ?node_node_coarse_stats(pget(send_bytes, Metrics, 0),
CleanMetrics = lists:keydelete(recv_bytes, 1, lists:keydelete(send_bytes, 1, Metrics)), pget(recv_bytes, Metrics, 0)),
ets:insert(node_node_stats, ?node_node_stats(Id, CleanMetrics)), CleanMetrics = lists:keydelete(recv_bytes, 1,
[insert_entry(node_node_coarse_stats, Id, TS, Stats, Size, Interval, false) lists:keydelete(send_bytes, 1, Metrics)),
|| {Size, Interval} <- GPolicies], Ops1 = insert_op(node_node_stats, Id, ?node_node_stats(Id, CleanMetrics),
{NextStats, State}. Ops0),
Ops = insert_entry_ops(node_node_coarse_stats, Id, false, Stats, Ops1,
GPolicies),
{NextStats, Ops, State}.
insert_entry(Table, Id, TS, Entry, Size, Interval0, Incremental) -> insert_entry(Table, Id, TS, Entry, Size, Interval0, Incremental) ->
Key = {Id, Interval0}, Key = {Id, Interval0},
@ -470,7 +501,40 @@ insert_entry(Table, Id, TS, Entry, Size, Interval0, Incremental) ->
{max_n, ceil(Size / Interval0) + 1}, {max_n, ceil(Size / Interval0) + 1},
{incremental, Incremental}]) {incremental, Incremental}])
end, end,
insert_with_index(Table, Key, {Key, exometer_slide:add_element(TS, Entry, Slide)}). insert_with_index(Table, Key, {Key, exometer_slide:add_element(TS, Entry,
Slide)}).
update_op(Table, Key, Op, Ops) ->
TableOps = case dict:find(Table, Ops) of
{ok, Inner} ->
dict:store(Key, Op, Inner);
error ->
Inner = dict:new(),
dict:store(Key, Op, Inner)
end,
dict:store(Table, TableOps, Ops).
insert_with_index_op(Table, Key, Entry, Ops) ->
update_op(Table, Key, {insert_with_index, Entry}, Ops).
insert_op(Table, Key, Entry, Ops) ->
update_op(Table, Key, {insert, Entry}, Ops).
insert_entry_op(Table, Key, Entry, Ops) ->
TableOps0 = case dict:find(Table, Ops) of
{ok, Inner} -> Inner;
error -> dict:new()
end,
TableOps = dict:update(Key, fun({insert_entry, Entry0}) ->
{insert_entry, sum_entry(Entry0, Entry)}
end, {insert_entry, Entry}, TableOps0),
dict:store(Table, TableOps, Ops).
insert_entry_ops(Table, Id, Incr, Entry, Ops, Policies) ->
lists:foldl(fun({Size, Interval}, Acc) ->
Key = {Id, Size, Interval, Incr},
insert_entry_op(Table, Key, Entry, Acc)
end, Ops, Policies).
get_difference(Id, Stats, #state{old_aggr_stats = OldStats}) -> get_difference(Id, Stats, #state{old_aggr_stats = OldStats}) ->
case dict:find(Id, OldStats) of case dict:find(Id, OldStats) of
@ -480,6 +544,15 @@ get_difference(Id, Stats, #state{old_aggr_stats = OldStats}) ->
difference(OldStat, Stats) difference(OldStat, Stats)
end. end.
sum_entry({A0}, {B0}) ->
{B0 + A0};
sum_entry({A0, A1}, {B0, B1}) ->
{B0 + A0, B1 + A1};
sum_entry({A0, A1, A2}, {B0, B1, B2}) ->
{B0 + A0, B1 + A1, B2 + A2};
sum_entry({A0, A1, A2, A3, A4, A5, A6}, {B0, B1, B2, B3, B4, B5, B6}) ->
{B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6}.
difference({A0}, {B0}) -> difference({A0}, {B0}) ->
{B0 - A0}; {B0 - A0};
difference({A0, A1}, {B0, B1}) -> difference({A0, A1}, {B0, B1}) ->