Implement detailed stats for queue

Made strip_pids recursive
kjnilsson 2016-09-30 09:26:14 +01:00
parent 13275fd1ad
commit d6b4c6087d
7 changed files with 94 additions and 53 deletions
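A quick illustration of what "Made strip_pids recursive" means in practice: the sketch below is not part of the commit, the input values are invented, and it assumes the existing strip_pids/1 entry point that dispatches to strip_pids/2.

%% Illustration only: with the recursive clauses added in rabbit_mgmt_format
%% below, pid-bearing keys such as channel_pid are now stripped even when they
%% sit inside nested proplists (e.g. consumer_details), not just at the top level.
rabbit_mgmt_format:strip_pids(
  [{name, <<"q1">>},
   {consumer_details,
    [[{channel_pid, self()},        % dropped by the new channel_pid + recursion clauses
      {consumer_tag, <<"ctag-1">>}]]}]).
%% The result contains no channel_pid at any depth; key order within each
%% proplist may differ, since strip_pids/2 builds its output via an accumulator.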

View File

@@ -44,6 +44,7 @@
-type range() :: #range{} | no_range.
-type ranges() :: {range(), range(), range(), range()}.
-type fun_or_mfa() :: fun((pid()) -> any()) | mfa().
-type lookup_key() :: atom() | {atom(), any()}.
%% The management database listens to events broadcast via the
%% rabbit_event mechanism, and responds to queries from the various
@@ -275,7 +276,7 @@ handle_call({get_overview, User, Ranges}, _From,
reply([{message_stats, MessageStats},
{queue_totals, QueueStats},
{object_totals, ObjectTotals},
{statistics_db_event_queue, event_queue()}],
{statistics_db_event_queue, event_queue()}], % TODO: event queue?
State);
handle_call(_Request, _From, State) ->
@@ -384,14 +385,13 @@ detail_queue_stats(Ranges, Objs, Interval) ->
Props = dict:fetch(queue_stats, QueueData),
Stats = queue_stats(QueueData, Ranges, Interval),
Consumers = [{consumer_details, dict:fetch(consumer_stats, QueueData)}],
% TODO deliveries and incoming details stats
StatsD = [{deliveries, detail_stats(channel_queue_stats_deliver_stats,
deliver_get, second(Id), Ranges,
Interval)},
{incoming, detail_stats(queue_exchange_stats_publish,
fine_stats, first(Id), Ranges,
Interval)}],
% TODO: augment_msg_stats - does this ever do anything?
StatsD = [{deliveries,
detail_stats_delegated(QueueData, channel_queue_stats_deliver_stats,
deliver_get, second(Id), Ranges, Interval)},
{incoming,
detail_stats_delegated(QueueData, queue_exchange_stats_publish,
fine_stats, first(Id), Ranges, Interval)}],
% TODO: augment_msg_stats - does this ever actually add anything useful?
{Pid, augment_msg_stats(combine(Props, Obj)) ++ Stats ++ StatsD ++ Consumers}
end || Obj <- Objs]),
@@ -399,7 +399,13 @@ detail_queue_stats(Ranges, Objs, Interval) ->
ChPids = lists:usort(get_pids_for_missing_channel_details(QueueStats)),
ChDets = get_channel_detail_lookup(ChPids),
Merged = merge_channel_details(QueueStats, ChDets),
[rabbit_mgmt_format:clean_consumer_details(QS) || QS <- Merged].
Merged.
detail_stats_delegated(Lookup, Table, Type, Id, Ranges, Interval) ->
[begin
S = format_range(Lookup, {Table, Key}, pick_range(Type, Ranges), Interval),
[{stats, S} | format_detail_id(revert(Id, Key))]
end || {{T, Key}, _} <- dict:to_list(Lookup), T =:= Table].
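A minimal, self-contained sketch (not from the commit; keys and sample values are placeholders) of the {Table, Key} filtering that detail_stats_delegated/6 performs over the delegated lookup dict:

%% Only entries keyed on the requested table survive the comprehension.
Lookup = dict:from_list(
           [{{channel_queue_stats_deliver_stats, key_a}, {small_sample, samples}},
            {{queue_exchange_stats_publish,      key_b}, {small_sample, samples}}]).
[key_a] = [Key || {{T, Key}, _} <- dict:to_list(Lookup),
                  T =:= channel_queue_stats_deliver_stats].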
queue_stats(QueueData, Ranges, Interval) ->
message_stats(format_range(QueueData, queue_stats_publish,
@@ -419,10 +425,14 @@ channel_stats(ChannelData, Ranges, Interval) ->
format_range(ChannelData, channel_process_stats,
pick_range(process_stats, Ranges), Interval).
-spec format_range(slide_data(), atom(), range(), integer()) -> [any()].
format_range(Data, Table, Range, Interval) ->
InstantRateFun = fun() -> element(1, dict:fetch(Table, Data)) end,
SamplesFun = fun() -> element(2, dict:fetch(Table, Data)) end,
-spec format_range(slide_data(), lookup_key(), range(), integer()) -> [any()].
format_range(Data, Key, Range, Interval) ->
Table = case Key of
{T, _} -> T;
T -> T
end,
InstantRateFun = fun() -> element(1, dict:fetch(Key, Data)) end,
SamplesFun = fun() -> element(2, dict:fetch(Key, Data)) end,
rabbit_mgmt_stats:format_range(Range, Table, Interval, InstantRateFun,
SamplesFun).
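The new lookup_key() type means format_range/4 accepts either a bare table atom or a {Table, Key} tuple; a hypothetical helper (not in the commit) mirroring the same resolution:

%% An atom is already a table name; a tuple carries the per-channel/per-exchange
%% detail key alongside the table it belongs to.
table_of({Table, _Key})             -> Table;
table_of(Table) when is_atom(Table) -> Table.
%% table_of(queue_stats_publish)                 -> queue_stats_publish
%% table_of({queue_exchange_stats_publish, Key}) -> queue_exchange_stats_publish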
@@ -534,7 +544,7 @@ detail_channel_stats(Ranges, Objs, Interval) ->
Interval)}],
augment_msg_stats(combine(Props, Obj)) ++ Consumers ++ Stats ++ StatsD
end || Obj <- Objs],
[rabbit_mgmt_format:clean_consumer_details(QS) || QS <- ChannelStats].
rabbit_mgmt_format:strip_pids(ChannelStats).
vhost_stats(Ranges, Objs, Interval) ->
Ids = [id_lookup(vhost_stats, Obj) || Obj <- Objs],
@@ -657,10 +667,6 @@ created_stats(Name, Type) ->
created_stats_delegated(Name, Type) ->
Data = delegate_invoke(fun (_) -> created_stats(Name, Type) end),
% case ets:select(Type, [{{'_', '$2', '$3'}, [{'==', Name, '$2'}], ['$3']}]) of
% [] -> not_found;
% [Elem] -> Elem
% end end),
case [X || X <- Data, X =/= not_found] of
[] -> not_found;
[X] -> X
@@ -759,8 +765,25 @@ queue_raw_message_data(Ranges, Id) ->
raw_message_data(queue_process_stats, pick_range(process_stats, Ranges), Id),
raw_message_data(queue_msg_stats, pick_range(queue_msg_counts, Ranges), Id)].
queue_raw_deliver_stats_data(Ranges, Id) ->
[raw_message_data2(channel_queue_stats_deliver_stats,
pick_range(deliver_get, Ranges), Key)
|| Key <- rabbit_mgmt_stats:get_keys(channel_queue_stats_deliver_stats, second(Id))] ++
[raw_message_data2(queue_exchange_stats_publish,
pick_range(fine_stats, Ranges), Key)
|| Key <- rabbit_mgmt_stats:get_keys(queue_exchange_stats_publish, first(Id))].
raw_message_data2(Table, no_range, Id) ->
SmallSample = rabbit_mgmt_stats:lookup_smaller_sample(Table, Id),
{{Table, Id}, {SmallSample, not_found}};
raw_message_data2(Table, Range, Id) ->
SmallSample = rabbit_mgmt_stats:lookup_smaller_sample(Table, Id),
Samples = rabbit_mgmt_stats:lookup_samples(Table, Id, Range),
{{Table, Id}, {SmallSample, Samples}}.
detail_queue_data(Ranges, Id) ->
dict:from_list(queue_raw_message_data(Ranges, Id) ++
queue_raw_deliver_stats_data(Ranges, Id) ++
[{queue_stats, lookup_element(queue_stats, Id)},
{consumer_stats, get_queue_consumer_stats(Id)}]).
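A hedged, self-contained sketch (placeholder keys and sample values) of the lookup dict detail_queue_data/2 now assembles: the new {Table, Key} entries hold the delegated detail samples, and a request without a range is recorded as not_found in the samples slot, as raw_message_data2/3 above shows.

Data = dict:from_list(
         [{queue_stats,    [{name, <<"q1">>}]},   % raw queue props, as before
          {consumer_stats, []},
          {{channel_queue_stats_deliver_stats, key_a}, {small_sample, not_found}},
          {{queue_exchange_stats_publish,      key_b}, {small_sample, samples}}]).
{small_sample, not_found} =
    dict:fetch({channel_queue_stats_deliver_stats, key_a}, Data).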
@@ -830,25 +853,8 @@ consumers_by_vhost(VHost) ->
[{'orelse', {'==', 'all', VHost}, {'==', VHost, '$1'}}],
['$_']}]).
consumers_by_channel_pids(ChPids) ->
lists:foldl(fun (ChPid, Agg) ->
consumers_by_channel_pid(ChPid) ++ Agg
end, [], ChPids).
consumers_by_channel_pid(ChPid) ->
ets:select(consumer_stats,
[{{{'_', '$1', '_'}, '_'},
[{'==', ChPid, '$1'}],
['$_']}]).
augment_msg_stats(Props) ->
rabbit_mgmt_format:strip_pids(
(augment_msg_stats_fun())(Props) ++ Props).
augment_msg_stats_fun() ->
fun(Props) ->
augment_details(Props, [])
end.
augment_details(Props, []) ++ Props.
augment_details([{_, none} | T], Acc) ->
augment_details(T, Acc);

View File

@@ -33,7 +33,7 @@
-export([strip_queue_pids/1]).
-export([clean_consumer_details/1, clean_consumer/1]).
-export([clean_consumer_details/1, clean_channel_details/1]).
-import(rabbit_misc, [pget/2, pset/3]).
@@ -441,6 +441,8 @@ strip_pids([{owner_pid, _} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{channel, _} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{channel_pid, _} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{exclusive_consumer_pid, _} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{slave_pids, ''} | T], Acc) ->
@@ -451,6 +453,10 @@ strip_pids([{synchronised_slave_pids, ''} | T], Acc) ->
strip_pids(T, Acc);
strip_pids([{synchronised_slave_pids, Pids} | T], Acc) ->
strip_pids(T, [{synchronised_slave_nodes, [node(Pid) || Pid <- Pids]} | Acc]);
strip_pids([{K, [P|_] = Nested} | T], Acc) when is_tuple(P) -> % recurse
strip_pids(T, [{K, strip_pids(Nested)} | Acc]);
strip_pids([{K, [L|_] = Nested} | T], Acc) when is_list(L) -> % recurse
strip_pids(T, [{K, strip_pids(Nested)} | Acc]);
strip_pids([Any | T], Acc) ->
strip_pids(T, [Any | Acc]);
strip_pids([], Acc) ->
@@ -477,20 +483,22 @@ format_null_item([{_K, _V} | _T] = L) ->
format_null_item(Value) ->
Value.
-spec clean_consumer_details(proplists:proplist()) -> proplists:proplist().
clean_consumer_details(Obj) ->
case pget(consumer_details, Obj) of
undefined -> Obj;
Cds ->
Cons = [clean_consumer(Con) || Con <- Cds],
Cons = [clean_channel_details(Con) || Con <- Cds],
pset(consumer_details, Cons, Obj)
end.
clean_consumer(Consumer) ->
Consumer0 = lists:keydelete(channel_pid, 1, Consumer),
case pget(channel_details, Consumer0) of
undefined -> Consumer0;
-spec clean_channel_details(proplists:proplist()) -> proplists:proplist().
clean_channel_details(Obj) ->
Obj0 = lists:keydelete(channel_pid, 1, Obj),
case pget(channel_details, Obj0) of
undefined -> Obj0;
Chd ->
pset(channel_details,
lists:keydelete(pid, 1, Chd),
Consumer0)
Obj0)
end.
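An illustrative call (not part of the commit) showing what the renamed clean_channel_details/1 does to a consumer row: channel_pid is dropped outright and the pid inside channel_details is removed, everything else is kept.

rabbit_mgmt_format:clean_channel_details(
  [{consumer_tag, <<"ctag-1">>},
   {channel_pid, self()},
   {channel_details, [{pid, self()}, {name, <<"ch-1">>}]}]).
%% roughly: [{consumer_tag, <<"ctag-1">>},
%%           {channel_details, [{name, <<"ch-1">>}]}]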

View File

@@ -195,9 +195,9 @@ aggregate_entry(TS, {{Ch, X} = Id, Metrics}, #state{table = channel_exchange_metrics,
ok
end;
aggregate_entry(TS, {{Ch, Q} = Id, Metrics}, #state{table = channel_queue_metrics,
policies = {_, DPolicies, _},
rates_mode = RatesMode,
lookup_queue = QueueFun}) ->
policies = {_, DPolicies, _},
rates_mode = RatesMode,
lookup_queue = QueueFun}) ->
Deliver = pget(deliver, Metrics, 0),
DeliverNoAck = pget(deliver_no_ack, Metrics, 0),
Get = pget(get, Metrics, 0),

View File

@@ -51,7 +51,7 @@ to_json(ReqData, Context = #context{user = User}) ->
VHost -> VHost
end,
Consumers = lists:map(fun rabbit_mgmt_format:clean_consumer/1,
Consumers = rabbit_mgmt_format:strip_pids(
rabbit_mgmt_db:get_all_consumers(Arg)),
rabbit_mgmt_util:reply_list(
filter_user(Consumers, User), ReqData, Context).

View File

@@ -96,4 +96,3 @@ queue(VHost, QName) ->
end.
qs_true(Key, ReqData) -> <<"true">> =:= element(1, cowboy_req:qs_val(list_to_binary(Key), ReqData)).

View File

@@ -46,7 +46,8 @@ resource_exists(ReqData, Context) ->
to_json(ReqData, Context) ->
try
rabbit_mgmt_util:reply_list_or_paginate(
augmented(ReqData, Context), ReqData, Context)
rabbit_mgmt_format:strip_pids(
augmented(ReqData, Context)), ReqData, Context)
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData, Context)

View File

@@ -43,6 +43,7 @@ groups() ->
queue_with_multiple_consumers,
queue_consumer_cancelled,
queue_consumer_channel_closed,
queue,
queues_single,
queues_multiple,
queues_removed,
@@ -74,6 +75,7 @@ merge_app_env(Config) ->
]}),
rabbit_ct_helpers:merge_app_env(Config1,
{rabbitmq_management, [
{rates_mode, detailed},
{sample_retention_policies,
%% List of {MaxAgeInSeconds, SampleEveryNSeconds}
[{global, [{605, 5}, {3660, 60}, {29400, 600}, {86400, 1800}]},
@@ -302,6 +304,28 @@ queue_consumer_channel_closed(Config) ->
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
ok.
queue(Config) ->
% Nodename2 = get_node_config(Config, 1, nodename),
% QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
http_put(Config, "/queues/%2f/some-queue", [], ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
{ok, Chan2} = amqp_connection:open_channel(Conn),
publish(Chan, <<"some-queue">>),
basic_get(Chan, <<"some-queue">>),
publish(Chan2, <<"some-queue">>),
basic_get(Chan2, <<"some-queue">>),
force_stats(),
timer:sleep(10000),
dump_table(Config, channel_queue_metrics),
Res = http_get(Config, "/queues/%2f/some-queue"),
ct:pal("Res: ~p", [Res]),
rabbit_ct_client_helpers:close_connection(Conn),
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
% assert single queue is returned
[_|_] = pget(deliveries, Res),
ok.
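If more of the detailed payload needs inspecting later, a possible (hypothetical, not in the commit) extension of the queue/1 test body above, where Res is already bound, would read the other sections produced by detail_queue_stats/3:

%% incoming holds the per-exchange publish detail rows, consumer_details the
%% consumers attached to the queue; both are keys in the Res proplist.
Incoming  = pget(incoming, Res),
Consumers = pget(consumer_details, Res),
ct:pal("incoming: ~p consumer_details: ~p", [Incoming, Consumers]),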
queues_single(Config) ->
http_put(Config, "/queues/%2f/some-queue", [], ?CREATED),
force_stats(),
@@ -442,7 +466,6 @@ consumers(Config) ->
{ok, Chan2} = amqp_connection:open_channel(Conn),
consume(Chan, <<"some-queue">>),
consume(Chan2, <<"some-queue">>),
trace_fun(Config, [{rabbit_mgmt_db, delegate_invoke}]),
force_stats(),
Res = http_get(Config, "/consumers"),
ct:pal("Consumers: ~p", [Res]),
@@ -550,7 +573,6 @@ nodes(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
{ok, Chan2} = amqp_connection:open_channel(Conn),
publish(Chan2, <<"some-queue">>),
trace_fun(Config, [{rabbit_mgmt_external_stats, emit_update}]),
force_stats(),
% force_stats(),
Res = http_get(Config, "/nodes"),
@@ -613,6 +635,10 @@ publish(Channel, Key) ->
Publish = #'basic.publish'{routing_key = Key},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}).
basic_get(Channel, Queue) ->
Publish = #'basic.get'{queue = Queue},
amqp_channel:call(Channel, Publish).
publish_to(Channel, Exchange, Key) ->
Payload = <<"foobar">>,
Publish = #'basic.publish'{routing_key = Key,
@@ -684,7 +710,8 @@ trace_fun(Config, MFs) ->
dbg:n(Nodename1),
dbg:n(Nodename2),
dbg:p(all,c),
[ dbg:tpl(M, F, cx) || {M, F} <- MFs].
[ dbg:tpl(M, F, cx) || {M, F} <- MFs],
[ dbg:tpl(M, F, A, cx) || {M, F, A} <- MFs].
dump_table(Config, Table) ->
Data = rabbit_ct_broker_helpers:rpc(Config, 0, ets, tab2list, [Table]),