Collect metrics from ets tables directly
This commit is contained in:
parent
49f0c56768
commit
dc99947b06
|
|
@ -32,7 +32,7 @@
|
||||||
-import(rabbit_misc, [pget/3]).
|
-import(rabbit_misc, [pget/3]).
|
||||||
-import(rabbit_mgmt_db, [pget/2, lookup_element/3]).
|
-import(rabbit_mgmt_db, [pget/2, lookup_element/3]).
|
||||||
|
|
||||||
-record(state, {table, agent, policies, rates_mode, lookup_queue, lookup_exchange}).
|
-record(state, {table, interval, policies, rates_mode, lookup_queue, lookup_exchange}).
|
||||||
|
|
||||||
name(Table) ->
|
name(Table) ->
|
||||||
list_to_atom((atom_to_list(Table) ++ "_metrics_collector")).
|
list_to_atom((atom_to_list(Table) ++ "_metrics_collector")).
|
||||||
|
|
@ -54,10 +54,9 @@ init([Table]) ->
|
||||||
{ok, Policies} = application:get_env(
|
{ok, Policies} = application:get_env(
|
||||||
rabbitmq_management, sample_retention_policies),
|
rabbitmq_management, sample_retention_policies),
|
||||||
Policy = retention_policy(Table),
|
Policy = retention_policy(Table),
|
||||||
Interval = take_smaller(proplists:get_value(Policy, Policies)),
|
Interval = take_smaller(proplists:get_value(Policy, Policies)) * 1000,
|
||||||
{ok, Agent} = rabbit_mgmt_agent_collector_sup:start_child(self(), Table,
|
erlang:send_after(Interval, self(), collect_metrics),
|
||||||
Interval * 1000),
|
{ok, #state{table = Table, interval = Interval,
|
||||||
{ok, #state{table = Table, agent = Agent,
|
|
||||||
policies = {proplists:get_value(basic, Policies),
|
policies = {proplists:get_value(basic, Policies),
|
||||||
proplists:get_value(detailed, Policies),
|
proplists:get_value(detailed, Policies),
|
||||||
proplists:get_value(global, Policies)},
|
proplists:get_value(global, Policies)},
|
||||||
|
|
@ -81,12 +80,15 @@ handle_cast({delete_queue, Queue, {R, U, M}}, State = #state{table = queue_coars
|
||||||
[insert_entry(vhost_msg_stats, vhost(Queue), TS, NegStats, Size, Interval, true)
|
[insert_entry(vhost_msg_stats, vhost(Queue), TS, NegStats, Size, Interval, true)
|
||||||
|| {Size, Interval} <- GPolicies],
|
|| {Size, Interval} <- GPolicies],
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_cast({metrics, Timestamp, Records}, State) ->
|
|
||||||
aggregate_metrics(Timestamp, Records, State),
|
|
||||||
{noreply, State};
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
||||||
|
handle_info(collect_metrics, #state{interval = Interval} = State) ->
|
||||||
|
Timestamp = exometer_slide:timestamp(),
|
||||||
|
aggregate_metrics(Timestamp, State),
|
||||||
|
erlang:send_after(Interval, self(), collect_metrics),
|
||||||
|
{noreply, State};
|
||||||
handle_info(_Msg, State) ->
|
handle_info(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
@ -116,8 +118,15 @@ retention_policy(node_node_metrics) -> global.
|
||||||
take_smaller(Policies) ->
|
take_smaller(Policies) ->
|
||||||
lists:min([I || {_, I} <- Policies]).
|
lists:min([I || {_, I} <- Policies]).
|
||||||
|
|
||||||
aggregate_metrics(Timestamp, Records, State) ->
|
aggregate_metrics(Timestamp, State) ->
|
||||||
[aggregate_entry(Timestamp, R, State) || R <- Records].
|
Table = State#state.table,
|
||||||
|
ets:foldl(
|
||||||
|
fun(R, noacc) ->
|
||||||
|
aggregate_entry(Timestamp, R, State),
|
||||||
|
noacc
|
||||||
|
end,
|
||||||
|
noacc,
|
||||||
|
Table).
|
||||||
|
|
||||||
aggregate_entry(_TS, {Id, Metrics}, #state{table = connection_created}) ->
|
aggregate_entry(_TS, {Id, Metrics}, #state{table = connection_created}) ->
|
||||||
Ftd = rabbit_mgmt_format:format(
|
Ftd = rabbit_mgmt_format:format(
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue