Merge branch 'stable'

This commit is contained in:
Michael Klishin 2017-03-03 15:01:20 +03:00
commit 61b7a55152
2 changed files with 293 additions and 219 deletions

View File

@ -2,6 +2,7 @@
.*.sw?
*.beam
.erlang.mk/
*.plt
cover/
deps/
doc/

View File

@ -151,161 +151,183 @@ handle_deleted_queues(_T, _R, _P) -> ok.
aggregate_metrics(Timestamp, #state{table = Table,
policies = {_, _, GPolicies}} = State0) ->
Table = State0#state.table,
{Next, #state{old_aggr_stats = Remainders}} = ets:foldl(
fun(R, {Dict, State}) ->
aggregate_entry(Timestamp, R, Dict, State)
end, {dict:new(), State0}, Table),
{Next, Ops, #state{old_aggr_stats = Remainders}} =
ets:foldl(fun(R, {NextStats, O, State}) ->
aggregate_entry(R, NextStats, O, State)
end, {dict:new(), dict:new(), State0}, Table),
dict:fold(fun(Tbl, TblOps, Acc) ->
_ = exec_table_ops(Tbl, Timestamp, TblOps),
Acc
end, no_acc, Ops),
handle_deleted_queues(Table, Remainders, GPolicies),
State0#state{old_aggr_stats = Next}.
aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_created} = State) ->
exec_table_ops(Table, Timestamp, TableOps) ->
dict:fold(fun(_Id, {insert, Entry}, A) ->
ets:insert(Table, Entry),
A;
(Id, {insert_with_index, Entry}, A) ->
insert_with_index(Table, Id, Entry),
A;
({Id, Size, Interval, Incremental},
{insert_entry, Entry}, A) ->
insert_entry(Table, Id, Timestamp, Entry, Size,
Interval, Incremental),
A
end, no_acc, TableOps).
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = connection_created} = State) ->
Ftd = rabbit_mgmt_format:format(
Metrics,
{fun rabbit_mgmt_format:format_connection_created/1, true}),
ets:insert(connection_created_stats,
?connection_created_stats(Id, pget(name, Ftd, unknown), Ftd)),
{NextStats, State};
aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_metrics} = State) ->
ets:insert(connection_stats, ?connection_stats(Id, Metrics)),
{NextStats, State};
aggregate_entry(TS, {Id, RecvOct, SendOct, Reductions, 0}, NextStats,
Entry = ?connection_created_stats(Id, pget(name, Ftd, unknown), Ftd),
Ops = insert_op(connection_created_stats, Id, Entry, Ops0),
{NextStats, Ops, State};
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = connection_metrics} = State) ->
Entry = ?connection_stats(Id, Metrics),
Ops = insert_op(connection_stats, Id, Entry, Ops0),
{NextStats, Ops, State};
aggregate_entry({Id, RecvOct, SendOct, Reductions, 0}, NextStats, Ops0,
#state{table = connection_coarse_metrics,
policies = {BPolicies, _, GPolicies}} = State) ->
Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct),
Diff = get_difference(Id, Stats, State),
[insert_entry(vhost_stats_coarse_conn_stats, vhost({connection_created, Id}),
TS, Diff, Size, Interval, true) || {Size, Interval} <- GPolicies],
[begin
insert_entry(connection_stats_coarse_conn_stats, Id, TS,
?connection_stats_coarse_conn_stats(RecvOct, SendOct, Reductions),
Size, Interval, false)
end || {Size, Interval} <- BPolicies],
{insert_old_aggr_stats(NextStats, Id, Stats), State};
aggregate_entry(TS, {Id, RecvOct, SendOct, _Reductions, 1}, NextStats,
Ops1 = insert_entry_ops(vhost_stats_coarse_conn_stats,
vhost({connection_created, Id}), true, Diff, Ops0,
GPolicies),
Entry = ?connection_stats_coarse_conn_stats(RecvOct, SendOct, Reductions),
Ops2 = insert_entry_ops(connection_stats_coarse_conn_stats, Id, false, Entry,
Ops1, BPolicies),
{insert_old_aggr_stats(NextStats, Id, Stats), Ops2, State};
aggregate_entry({Id, RecvOct, SendOct, _Reductions, 1}, NextStats, Ops0,
#state{table = connection_coarse_metrics,
policies = {_BPolicies, _, GPolicies}} = State) ->
Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct),
Diff = get_difference(Id, Stats, State),
[insert_entry(vhost_stats_coarse_conn_stats, vhost({connection_created, Id}),
TS, Diff, Size, Interval, true) || {Size, Interval} <- GPolicies],
Ops1 = insert_entry_ops(vhost_stats_coarse_conn_stats,
vhost({connection_created, Id}), true, Diff, Ops0,
GPolicies),
rabbit_core_metrics:delete(connection_coarse_metrics, Id),
{NextStats, State};
aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_created} = State) ->
{NextStats, Ops1, State};
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = channel_created} = State) ->
Ftd = rabbit_mgmt_format:format(Metrics, {[], false}),
ets:insert(channel_created_stats,
?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd)),
{NextStats, State};
aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_metrics} = State) ->
Entry = ?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd),
Ops = insert_op(channel_created_stats, Id, Entry, Ops0),
{NextStats, Ops, State};
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = channel_metrics} = State) ->
Ftd = rabbit_mgmt_format:format(Metrics,
{fun rabbit_mgmt_format:format_channel_stats/1, true}),
ets:insert(channel_stats, ?channel_stats(Id, Ftd)),
{NextStats, State};
aggregate_entry(TS, {{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, 0}, NextStats,
Entry = ?channel_stats(Id, Ftd),
Ops = insert_op(channel_stats, Id, Entry, Ops0),
{NextStats, Ops, State};
aggregate_entry({{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, 0},
NextStats, Ops0,
#state{table = channel_exchange_metrics,
policies = {BPolicies, DPolicies, GPolicies},
rates_mode = RatesMode,
lookup_exchange = ExchangeFun} = State) ->
Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable),
{Publish, _, _} = Diff = get_difference(Id, Stats, State),
[begin
insert_entry(channel_stats_fine_stats, Ch, TS, Diff, Size, Interval,
true)
end || {Size, Interval} <- BPolicies],
[begin
insert_entry(vhost_stats_fine_stats, vhost(X), TS, Diff, Size,
Interval, true)
end || {Size, Interval} <- GPolicies],
_ = case {ExchangeFun(X), RatesMode} of
{true, basic} ->
[insert_entry(exchange_stats_publish_in, X, TS,
?exchange_stats_publish_in(Publish), Size, Interval, true)
|| {Size, Interval} <- DPolicies];
{true, _} ->
[begin
insert_entry(exchange_stats_publish_in, X, TS,
?exchange_stats_publish_in(Publish), Size, Interval, true),
insert_entry(channel_exchange_stats_fine_stats, Id, TS, Stats,
Size, Interval, false)
end || {Size, Interval} <- DPolicies];
_ ->
ok
end,
{insert_old_aggr_stats(NextStats, Id, Stats), State};
aggregate_entry(TS, {{_Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, 1}, NextStats,
Ops1 = insert_entry_ops(channel_stats_fine_stats, Ch, true, Diff, Ops0,
BPolicies),
Ops2 = insert_entry_ops(vhost_stats_fine_stats, vhost(X), true, Diff, Ops1,
GPolicies),
Ops3 = case {ExchangeFun(X), RatesMode} of
{true, basic} ->
Entry = ?exchange_stats_publish_in(Publish),
insert_entry_ops(exchange_stats_publish_in, X, true, Entry,
Ops2, DPolicies);
{true, _} ->
Entry = ?exchange_stats_publish_in(Publish),
O = insert_entry_ops(exchange_stats_publish_in, X, true,
Entry, Ops2, DPolicies),
insert_entry_ops(channel_exchange_stats_fine_stats, Id,
false, Stats, O, DPolicies);
_ ->
Ops2
end,
{insert_old_aggr_stats(NextStats, Id, Stats), Ops3, State};
aggregate_entry({{_Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, 1},
NextStats, Ops0,
#state{table = channel_exchange_metrics,
policies = {_BPolicies, DPolicies, GPolicies},
lookup_exchange = ExchangeFun} = State) ->
Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable),
{Publish, _, _} = Diff = get_difference(Id, Stats, State),
[begin
insert_entry(vhost_stats_fine_stats, vhost(X), TS, Diff, Size,
Interval, true)
end || {Size, Interval} <- GPolicies],
_ = case ExchangeFun(X) of
true ->
[insert_entry(exchange_stats_publish_in, X, TS,
?exchange_stats_publish_in(Publish), Size, Interval, true)
|| {Size, Interval} <- DPolicies];
_ ->
ok
end,
Ops1 = insert_entry_ops(vhost_stats_fine_stats, vhost(X), true, Diff, Ops0,
GPolicies),
Ops2 = case ExchangeFun(X) of
true ->
Entry = ?exchange_stats_publish_in(Publish),
insert_entry_ops(exchange_stats_publish_in, X, true, Entry,
Ops1, DPolicies);
_ ->
Ops1
end,
rabbit_core_metrics:delete(channel_exchange_metrics, Id),
{NextStats, State};
aggregate_entry(TS, {{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, Redeliver, Ack, 0},
NextStats,
{NextStats, Ops2, State};
aggregate_entry({{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck,
Redeliver, Ack, 0}, NextStats, Ops0,
#state{table = channel_queue_metrics,
policies = {BPolicies, DPolicies, GPolicies},
rates_mode = RatesMode,
lookup_queue = QueueFun} = State) ->
Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck,
Redeliver, Ack,
Deliver + DeliverNoAck + Get + GetNoAck),
Deliver + DeliverNoAck + Get + GetNoAck),
Diff = get_difference(Id, Stats, State),
[begin
insert_entry(vhost_stats_deliver_stats, vhost(Q), TS, Diff, Size,
Interval, true)
end || {Size, Interval} <- GPolicies],
[begin
insert_entry(channel_stats_deliver_stats, Ch, TS, Diff, Size, Interval,
true)
end || {Size, Interval} <- BPolicies],
_ = case {QueueFun(Q), RatesMode} of
{true, basic} ->
[insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval,
true) || {Size, Interval} <- BPolicies];
{true, _} ->
[insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval,
true) || {Size, Interval} <- BPolicies],
[insert_entry(channel_queue_stats_deliver_stats, Id, TS, Stats, Size,
Interval, false)
|| {Size, Interval} <- DPolicies];
_ ->
ok
end,
{insert_old_aggr_stats(NextStats, Id, Stats), State};
aggregate_entry(TS, {{_, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, Redeliver, Ack, 1},
NextStats,
Ops1 = insert_entry_ops(vhost_stats_deliver_stats, vhost(Q), true, Diff,
Ops0, GPolicies),
Ops2 = insert_entry_ops(channel_stats_deliver_stats, Ch, true, Diff, Ops1,
BPolicies),
Ops3 = case {QueueFun(Q), RatesMode} of
{true, basic} ->
insert_entry_ops(queue_stats_deliver_stats, Q, true, Diff,
Ops2, BPolicies);
{true, _} ->
O = insert_entry_ops(queue_stats_deliver_stats, Q, true,
Diff, Ops2, BPolicies),
insert_entry_ops(channel_queue_stats_deliver_stats, Id,
false, Stats, O, DPolicies);
_ ->
Ops2
end,
{insert_old_aggr_stats(NextStats, Id, Stats), Ops3, State};
aggregate_entry({{_, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck,
Redeliver, Ack, 1}, NextStats, Ops0,
#state{table = channel_queue_metrics,
policies = {BPolicies, _, GPolicies},
lookup_queue = QueueFun} = State) ->
Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck,
Redeliver, Ack,
Deliver + DeliverNoAck + Get + GetNoAck),
Deliver + DeliverNoAck + Get + GetNoAck),
Diff = get_difference(Id, Stats, State),
[begin
insert_entry(vhost_stats_deliver_stats, vhost(Q), TS, Diff, Size,
Interval, true)
end || {Size, Interval} <- GPolicies],
_ = case QueueFun(Q) of
true ->
[insert_entry(queue_stats_deliver_stats, Q, TS, Diff, Size, Interval,
true) || {Size, Interval} <- BPolicies];
_ ->
ok
end,
Ops1 = insert_entry_ops(vhost_stats_deliver_stats, vhost(Q), true, Diff,
Ops0, GPolicies),
Ops2 = case QueueFun(Q) of
true ->
insert_entry_ops(queue_stats_deliver_stats, Q, true, Diff,
Ops1, BPolicies);
_ ->
Ops1
end,
rabbit_core_metrics:delete(channel_queue_metrics, Id),
{NextStats, State};
aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish, ToDelete}, NextStats,
{NextStats, Ops2, State};
aggregate_entry({{_Ch, {Q, X}} = Id, Publish, ToDelete}, NextStats, Ops0,
#state{table = channel_queue_exchange_metrics,
policies = {BPolicies, _, _},
rates_mode = RatesMode,
@ -313,148 +335,157 @@ aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish, ToDelete}, NextStats,
lookup_exchange = ExchangeFun} = State) ->
Stats = ?queue_stats_publish(Publish),
Diff = get_difference(Id, Stats, State),
_ = case {QueueFun(Q), ExchangeFun(X), RatesMode, ToDelete} of
{true, false, _, _} ->
[insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true)
|| {Size, Interval} <- BPolicies];
{false, true, _, _} ->
[insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true)
|| {Size, Interval} <- BPolicies];
{true, true, basic, _} ->
[begin
insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true),
insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true)
end || {Size, Interval} <- BPolicies];
{true, true, _, 0} ->
[begin
insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true),
insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true),
insert_entry(queue_exchange_stats_publish, {Q, X}, TS, Diff, Size, Interval, true)
end || {Size, Interval} <- BPolicies];
{true, true, _, 1} ->
[begin
insert_entry(queue_stats_publish, Q, TS, Diff, Size, Interval, true),
insert_entry(exchange_stats_publish_out, X, TS, Diff, Size, Interval, true)
end || {Size, Interval} <- BPolicies];
_ ->
ok
end,
Ops1 = case {QueueFun(Q), ExchangeFun(X), RatesMode, ToDelete} of
{true, false, _, _} ->
insert_entry_ops(queue_stats_publish, Q, true, Diff,
Ops0, BPolicies);
{false, true, _, _} ->
insert_entry_ops(exchange_stats_publish_out, X, true, Diff,
Ops0, BPolicies);
{true, true, basic, _} ->
O = insert_entry_ops(queue_stats_publish, Q, true, Diff,
Ops0, BPolicies),
insert_entry_ops(exchange_stats_publish_out, X, true, Diff,
O, BPolicies);
{true, true, _, 0} ->
O1 = insert_entry_ops(queue_stats_publish, Q, true, Diff,
Ops0, BPolicies),
O2 = insert_entry_ops(exchange_stats_publish_out, X, true,
Diff, O1, BPolicies),
insert_entry_ops(queue_exchange_stats_publish, {Q, X},
true, Diff, O2, BPolicies);
{true, true, _, 1} ->
O = insert_entry_ops(queue_stats_publish, Q, true, Diff,
Ops0, BPolicies),
insert_entry_ops(exchange_stats_publish_out, X, true,
Diff, O, BPolicies);
_ ->
Ops0
end,
case ToDelete of
0 ->
{insert_old_aggr_stats(NextStats, Id, Stats), State};
{insert_old_aggr_stats(NextStats, Id, Stats), Ops1, State};
1 ->
rabbit_core_metrics:delete(channel_queue_exchange_metrics, Id),
{NextStats, State}
{NextStats, Ops1, State}
end;
aggregate_entry(TS, {Id, Reductions}, NextStats,
aggregate_entry({Id, Reductions}, NextStats, Ops0,
#state{table = channel_process_metrics,
policies = {BPolicies, _, _}} = State) ->
[begin
insert_entry(channel_process_stats, Id, TS, ?channel_process_stats(Reductions),
Size, Interval, false)
end || {Size, Interval} <- BPolicies],
{NextStats, State};
aggregate_entry(_TS, {Id, Exclusive, AckRequired, PrefetchCount, Args}, NextStats,
Entry = ?channel_process_stats(Reductions),
Ops = insert_entry_ops(channel_process_stats, Id, false,
Entry, Ops0, BPolicies),
{NextStats, Ops, State};
aggregate_entry({Id, Exclusive, AckRequired, PrefetchCount, Args},
NextStats, Ops0,
#state{table = consumer_created} = State) ->
Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive},
{ack_required, AckRequired},
{prefetch_count, PrefetchCount},
{arguments, Args}], {[], false}),
insert_with_index(consumer_stats, Id, ?consumer_stats(Id, Fmt)),
{NextStats, State};
aggregate_entry(TS, {Id, Metrics, 0}, NextStats,
Entry = ?consumer_stats(Id, Fmt),
Ops = insert_with_index_op(consumer_stats, Id, Entry, Ops0),
{NextStats, Ops ,State};
aggregate_entry({Id, Metrics, 0}, NextStats, Ops0,
#state{table = queue_metrics,
policies = {BPolicies, _, GPolicies},
lookup_queue = QueueFun} = State) ->
Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0), pget(disk_writes, Metrics, 0)),
Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0),
pget(disk_writes, Metrics, 0)),
Diff = get_difference(Id, Stats, State),
[insert_entry(vhost_msg_rates, vhost(Id), TS, Diff, Size, Interval, true)
|| {Size, Interval} <- GPolicies],
case QueueFun(Id) of
true ->
[insert_entry(queue_msg_rates, Id, TS, Stats, Size, Interval, false)
|| {Size, Interval} <- BPolicies],
Fmt = rabbit_mgmt_format:format(
Metrics,
{fun rabbit_mgmt_format:format_queue_stats/1, false}),
ets:insert(queue_stats, ?queue_stats(Id, Fmt));
false ->
ok
end,
{insert_old_aggr_stats(NextStats, Id, Stats), State};
aggregate_entry(TS, {Id, Metrics, 1}, NextStats,
Ops1 = insert_entry_ops(vhost_msg_rates, vhost(Id), true, Diff, Ops0,
GPolicies),
Ops2 = case QueueFun(Id) of
true ->
O = insert_entry_ops(queue_msg_rates, Id, false, Stats, Ops1,
BPolicies),
Fmt = rabbit_mgmt_format:format(
Metrics,
{fun rabbit_mgmt_format:format_queue_stats/1, false}),
insert_op(queue_stats, Id, ?queue_stats(Id, Fmt), O);
false ->
Ops1
end,
{insert_old_aggr_stats(NextStats, Id, Stats), Ops2, State};
aggregate_entry({Id, Metrics, 1}, NextStats, Ops0,
#state{table = queue_metrics,
policies = {_, _, GPolicies}} = State) ->
Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0), pget(disk_writes, Metrics, 0)),
Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0),
pget(disk_writes, Metrics, 0)),
Diff = get_difference(Id, Stats, State),
[insert_entry(vhost_msg_rates, vhost(Id), TS, Diff, Size, Interval, true)
|| {Size, Interval} <- GPolicies],
Ops = insert_entry_ops(vhost_msg_rates, vhost(Id), true, Diff, Ops0,
GPolicies),
rabbit_core_metrics:delete(queue_metrics, Id),
{NextStats, State};
aggregate_entry(TS, {Name, Ready, Unack, Msgs, Red}, NextStats,
{NextStats, Ops, State};
aggregate_entry({Name, Ready, Unack, Msgs, Red}, NextStats, Ops0,
#state{table = queue_coarse_metrics,
old_aggr_stats = Old,
policies = {BPolicies, _, GPolicies},
lookup_queue = QueueFun} = State) ->
Stats = ?vhost_msg_stats(Ready, Unack, Msgs),
Diff = get_difference(Name, Stats, State),
[insert_entry(vhost_msg_stats, vhost(Name), TS, Diff, Size, Interval, true)
|| {Size, Interval} <- GPolicies],
_ = case QueueFun(Name) of
true ->
[begin
insert_entry(queue_process_stats, Name, TS, ?queue_process_stats(Red),
Size, Interval, false),
insert_entry(queue_msg_stats, Name, TS, ?queue_msg_stats(Ready, Unack, Msgs),
Size, Interval, false)
end || {Size, Interval} <- BPolicies];
_ ->
ok
end,
Ops1 = insert_entry_ops(vhost_msg_stats, vhost(Name), true, Diff, Ops0,
GPolicies),
Ops2 = case QueueFun(Name) of
true ->
QPS =?queue_process_stats(Red),
O1 = insert_entry_ops(queue_process_stats, Name, false, QPS,
Ops1, BPolicies),
QMS = ?queue_msg_stats(Ready, Unack, Msgs),
insert_entry_ops(queue_msg_stats, Name, false, QMS,
O1, BPolicies);
_ ->
Ops1
end,
State1 = State#state{old_aggr_stats = dict:erase(Name, Old)},
{insert_old_aggr_stats(NextStats, Name, Stats), State1};
aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = node_metrics} = State) ->
ets:insert(node_stats, {Id, Metrics}),
{NextStats, State};
aggregate_entry(TS, {Id, Metrics}, NextStats,
{insert_old_aggr_stats(NextStats, Name, Stats), Ops2, State1};
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = node_metrics} = State) ->
Ops = insert_op(node_stats, Id, {Id, Metrics}, Ops0),
{NextStats, Ops, State};
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = node_coarse_metrics,
policies = {_, _, GPolicies}} = State) ->
Stats = ?node_coarse_stats(
pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0),
pget(mem_used, Metrics, 0), pget(disk_free, Metrics, 0),
pget(proc_used, Metrics, 0), pget(gc_num, Metrics, 0),
pget(gc_bytes_reclaimed, Metrics, 0), pget(context_switches, Metrics, 0)),
[insert_entry(node_coarse_stats, Id, TS, Stats, Size, Interval, false)
|| {Size, Interval} <- GPolicies],
{NextStats, State};
aggregate_entry(TS, {Id, Metrics}, NextStats,
pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0),
pget(mem_used, Metrics, 0), pget(disk_free, Metrics, 0),
pget(proc_used, Metrics, 0), pget(gc_num, Metrics, 0),
pget(gc_bytes_reclaimed, Metrics, 0),
pget(context_switches, Metrics, 0)),
Ops = insert_entry_ops(node_coarse_stats, Id, false, Stats, Ops0,
GPolicies),
{NextStats, Ops, State};
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = node_persister_metrics,
policies = {_, _, GPolicies}} = State) ->
Stats = ?node_persister_stats(
pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0),
pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0),
pget(io_write_bytes, Metrics, 0), pget(io_write_time, Metrics, 0),
pget(io_sync_count, Metrics, 0), pget(io_sync_time, Metrics, 0),
pget(io_seek_count, Metrics, 0), pget(io_seek_time, Metrics, 0),
pget(io_reopen_count, Metrics, 0), pget(mnesia_ram_tx_count, Metrics, 0),
pget(mnesia_disk_tx_count, Metrics, 0), pget(msg_store_read_count, Metrics, 0),
pget(msg_store_write_count, Metrics, 0),
pget(queue_index_journal_write_count, Metrics, 0),
pget(queue_index_write_count, Metrics, 0), pget(queue_index_read_count, Metrics, 0),
pget(io_file_handle_open_attempt_count, Metrics, 0),
pget(io_file_handle_open_attempt_time, Metrics, 0)),
[insert_entry(node_persister_stats, Id, TS, Stats, Size, Interval, false)
|| {Size, Interval} <- GPolicies],
{NextStats, State};
aggregate_entry(TS, {Id, Metrics}, NextStats,
pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0),
pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0),
pget(io_write_bytes, Metrics, 0), pget(io_write_time, Metrics, 0),
pget(io_sync_count, Metrics, 0), pget(io_sync_time, Metrics, 0),
pget(io_seek_count, Metrics, 0), pget(io_seek_time, Metrics, 0),
pget(io_reopen_count, Metrics, 0), pget(mnesia_ram_tx_count, Metrics, 0),
pget(mnesia_disk_tx_count, Metrics, 0), pget(msg_store_read_count, Metrics, 0),
pget(msg_store_write_count, Metrics, 0),
pget(queue_index_journal_write_count, Metrics, 0),
pget(queue_index_write_count, Metrics, 0), pget(queue_index_read_count, Metrics, 0),
pget(io_file_handle_open_attempt_count, Metrics, 0),
pget(io_file_handle_open_attempt_time, Metrics, 0)),
Ops = insert_entry_ops(node_persister_stats, Id, false, Stats, Ops0,
GPolicies),
{NextStats, Ops, State};
aggregate_entry({Id, Metrics}, NextStats, Ops0,
#state{table = node_node_metrics,
policies = {_, _, GPolicies}} = State) ->
Stats = ?node_node_coarse_stats(pget(send_bytes, Metrics, 0), pget(recv_bytes, Metrics, 0)),
CleanMetrics = lists:keydelete(recv_bytes, 1, lists:keydelete(send_bytes, 1, Metrics)),
ets:insert(node_node_stats, ?node_node_stats(Id, CleanMetrics)),
[insert_entry(node_node_coarse_stats, Id, TS, Stats, Size, Interval, false)
|| {Size, Interval} <- GPolicies],
{NextStats, State}.
Stats = ?node_node_coarse_stats(pget(send_bytes, Metrics, 0),
pget(recv_bytes, Metrics, 0)),
CleanMetrics = lists:keydelete(recv_bytes, 1,
lists:keydelete(send_bytes, 1, Metrics)),
Ops1 = insert_op(node_node_stats, Id, ?node_node_stats(Id, CleanMetrics),
Ops0),
Ops = insert_entry_ops(node_node_coarse_stats, Id, false, Stats, Ops1,
GPolicies),
{NextStats, Ops, State}.
insert_entry(Table, Id, TS, Entry, Size, Interval0, Incremental) ->
Key = {Id, Interval0},
@ -470,7 +501,40 @@ insert_entry(Table, Id, TS, Entry, Size, Interval0, Incremental) ->
{max_n, ceil(Size / Interval0) + 1},
{incremental, Incremental}])
end,
insert_with_index(Table, Key, {Key, exometer_slide:add_element(TS, Entry, Slide)}).
insert_with_index(Table, Key, {Key, exometer_slide:add_element(TS, Entry,
Slide)}).
update_op(Table, Key, Op, Ops) ->
TableOps = case dict:find(Table, Ops) of
{ok, Inner} ->
dict:store(Key, Op, Inner);
error ->
Inner = dict:new(),
dict:store(Key, Op, Inner)
end,
dict:store(Table, TableOps, Ops).
insert_with_index_op(Table, Key, Entry, Ops) ->
update_op(Table, Key, {insert_with_index, Entry}, Ops).
insert_op(Table, Key, Entry, Ops) ->
update_op(Table, Key, {insert, Entry}, Ops).
insert_entry_op(Table, Key, Entry, Ops) ->
TableOps0 = case dict:find(Table, Ops) of
{ok, Inner} -> Inner;
error -> dict:new()
end,
TableOps = dict:update(Key, fun({insert_entry, Entry0}) ->
{insert_entry, sum_entry(Entry0, Entry)}
end, {insert_entry, Entry}, TableOps0),
dict:store(Table, TableOps, Ops).
insert_entry_ops(Table, Id, Incr, Entry, Ops, Policies) ->
lists:foldl(fun({Size, Interval}, Acc) ->
Key = {Id, Size, Interval, Incr},
insert_entry_op(Table, Key, Entry, Acc)
end, Ops, Policies).
get_difference(Id, Stats, #state{old_aggr_stats = OldStats}) ->
case dict:find(Id, OldStats) of
@ -480,6 +544,15 @@ get_difference(Id, Stats, #state{old_aggr_stats = OldStats}) ->
difference(OldStat, Stats)
end.
sum_entry({A0}, {B0}) ->
{B0 + A0};
sum_entry({A0, A1}, {B0, B1}) ->
{B0 + A0, B1 + A1};
sum_entry({A0, A1, A2}, {B0, B1, B2}) ->
{B0 + A0, B1 + A1, B2 + A2};
sum_entry({A0, A1, A2, A3, A4, A5, A6}, {B0, B1, B2, B3, B4, B5, B6}) ->
{B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6}.
difference({A0}, {B0}) ->
{B0 - A0};
difference({A0, A1}, {B0, B1}) ->