(WIP) revert the columns-passing-down-into-the-db thing, but never retrieve consumer details for lists - that's really expensive; the fine stats is much less so. Also refactor a bit.

This commit is contained in:
Simon MacMullen 2011-05-09 18:27:57 +01:00
parent 6197d4c3ae
commit 9f4f3c7f90
8 changed files with 76 additions and 82 deletions

View File

@ -20,9 +20,9 @@
-export([start_link/0]).
-export([get_queues/2, get_queue/2, get_exchanges/2, get_exchange/2,
-export([get_queues/1, get_queue/1, get_exchanges/1, get_exchange/1,
get_connections/0, get_connection/1, get_overview/1,
get_overview/0, get_channels/1, get_channel/2]).
get_overview/0, get_channels/0, get_channel/1]).
%% TODO can these not be exported any more?
-export([pget/2, add/2, rates/5]).
@ -92,16 +92,16 @@ start_link() ->
Else
end.
get_queues(Qs, Cols) -> safe_call({get_queues, Qs, Cols, list}, Qs).
get_queue(Q, Cols) -> safe_call({get_queues, [Q], Cols, detail}, [Q]).
get_exchanges(Xs, Cols) -> safe_call({get_exchanges, Xs, Cols, list}, Xs).
get_exchange(X, Cols) -> safe_call({get_exchanges, [X], Cols, detail}, [X]).
get_connections() -> safe_call(get_connections).
get_connection(Name) -> safe_call({get_connection, Name}).
get_channels(Cols) -> safe_call({get_channels, Cols}).
get_channel(Name, Cols) -> safe_call({get_channel, Name, Cols}).
get_overview(User) -> safe_call({get_overview, User}).
get_overview() -> safe_call({get_overview, all}).
get_queues(Qs) -> safe_call({get_queues, Qs}, Qs).
get_queue(Q) -> safe_call({get_queue, Q}, Q).
get_exchanges(Xs) -> safe_call({get_exchanges, Xs, list}, Xs).
get_exchange(X) -> safe_call({get_exchanges, [X], detail}, [X]).
get_connections() -> safe_call(get_connections).
get_connection(Name) -> safe_call({get_connection, Name}).
get_channels() -> safe_call(get_channels).
get_channel(Name) -> safe_call({get_channel, Name}).
get_overview(User) -> safe_call({get_overview, User}).
get_overview() -> safe_call({get_overview, all}).
safe_call(Term) -> safe_call(Term, []).
@ -204,24 +204,27 @@ init([]) ->
[{Key, ets:new(anon, [private])} ||
Key <- ?TABLES])}}.
handle_call({get_queues, Qs0, Cols, Mode}, _From,
State = #state{tables = Tables}) ->
FineSpecs = case Mode of
list -> ?FINE_STATS_QUEUE_LIST;
detail -> ?FINE_STATS_QUEUE_DETAIL
end,
Qs1 = queue_stats(Qs0, Cols, FineSpecs, Tables),
%% TODO refactor
handle_call({get_queue, Q0}, _From, State = #state{tables = Tables}) ->
Qs1 = detail_queue_stats([Q0], Tables),
Qs2 = [[{messages, add(pget(messages_ready, Q),
pget(messages_unacknowledged, Q))} | Q] || Q <- Qs1],
[Q3] = adjust_hibernated_memory_use(Qs2),
{reply, Q3, State};
handle_call({get_queues, Qs0}, _From, State = #state{tables = Tables}) ->
Qs1 = list_queue_stats(Qs0, Tables),
Qs2 = [[{messages, add(pget(messages_ready, Q),
pget(messages_unacknowledged, Q))} | Q] || Q <- Qs1],
{reply, adjust_hibernated_memory_use(Qs2), State};
handle_call({get_exchanges, Xs, Cols, Mode}, _From,
handle_call({get_exchanges, Xs, Mode}, _From,
State = #state{tables = Tables}) ->
FineSpecs = case Mode of
list -> ?FINE_STATS_EXCHANGE_LIST;
detail -> ?FINE_STATS_EXCHANGE_DETAIL
end,
{reply, exchange_stats(Xs, Cols, FineSpecs, Tables), State};
{reply, exchange_stats(Xs, FineSpecs, Tables), State};
handle_call(get_connections, _From, State = #state{tables = Tables}) ->
Conns = created_events(connection_stats, Tables),
@ -232,14 +235,13 @@ handle_call({get_connection, Name}, _From, State = #state{tables = Tables}) ->
[Res] = connection_stats(Conns, Tables),
{reply, result_or_error(Res), State};
handle_call({get_channels, Cols}, _From, State = #state{tables = Tables}) ->
handle_call(get_channels, _From, State = #state{tables = Tables}) ->
Chs = created_events(channel_stats, Tables),
{reply, channel_stats(Chs, Cols, ?FINE_STATS_CHANNEL_LIST, Tables), State};
{reply, list_channel_stats(Chs, Tables), State};
handle_call({get_channel, Name, Cols}, _From,
State = #state{tables = Tables}) ->
handle_call({get_channel, Name}, _From, State = #state{tables = Tables}) ->
Chs = created_event(Name, channel_stats, Tables),
[Res] = channel_stats(Chs, Cols, ?FINE_STATS_CHANNEL_DETAIL, Tables),
[Res] = detail_channel_stats(Chs, Tables),
{reply, result_or_error(Res), State};
handle_call({get_overview, User}, _From, State = #state{tables = Tables}) ->
@ -477,21 +479,15 @@ basic_stats_fun(Type, Tables) ->
zero_old_rates(lookup_element(Table, {pget(pid, Props), stats}))
end.
fine_stats_fun(Cols, FineSpecs, Tables) ->
case rabbit_mgmt_util:want_column(message_stats, Cols) of
true ->
FineStats = [{AttachName, AttachBy,
get_fine_stats(FineStatsType, GroupBy, Tables)}
|| {FineStatsType, GroupBy, AttachName, AttachBy}
<- FineSpecs],
fun (Props) ->
lists:foldl(fun (FineStat, StatProps) ->
fine_stat(Props, StatProps, FineStat,
Tables)
end, [], FineStats)
end;
false ->
fun nothing/1
fine_stats_fun(FineSpecs, Tables) ->
FineStats = [{AttachName, AttachBy,
get_fine_stats(FineStatsType, GroupBy, Tables)}
|| {FineStatsType, GroupBy, AttachName, AttachBy}
<- FineSpecs],
fun (Props) ->
lists:foldl(fun (FineStat, StatProps) ->
fine_stat(Props, StatProps, FineStat, Tables)
end, [], FineStats)
end.
fine_stat(Props, StatProps, {AttachName, AttachBy, Dict}, Tables) ->
@ -515,19 +511,14 @@ augment_fine_stats(Dict, Tables) when element(1, Dict) == dict ->
augment_fine_stats(Stats, _Tables) ->
Stats.
consumer_details_fun(Cols, PatternFun, Tables) ->
case rabbit_mgmt_util:want_column(consumer_details, Cols) of
true ->
Table = orddict:fetch(consumers, Tables),
fun ([]) -> [];
(Props) -> Pattern = PatternFun(Props),
[{consumer_details,
[augment_msg_stats(Obj, Tables)
|| Obj <- lists:append(
ets:match(Table, {Pattern, '$1'}))]}]
end;
false ->
fun nothing/1
consumer_details_fun(PatternFun, Tables) ->
Table = orddict:fetch(consumers, Tables),
fun ([]) -> [];
(Props) -> Pattern = PatternFun(Props),
[{consumer_details,
[augment_msg_stats(Obj, Tables)
|| Obj <- lists:append(
ets:match(Table, {Pattern, '$1'}))]}]
end.
zero_old_rates(Stats) -> [maybe_zero_rate(S) || S <- Stats].
@ -546,7 +537,6 @@ is_details(Key) -> lists:suffix("_details", atom_to_list(Key)).
details_key(Key) -> list_to_atom(atom_to_list(Key) ++ "_details").
nothing(_) -> [].
%%----------------------------------------------------------------------------
augment_msg_stats(Props, Tables) ->
@ -606,27 +596,36 @@ basic_queue_stats(Objs, Tables) ->
merge_stats(Objs, [basic_stats_fun(queue_stats, Tables),
augment_msg_stats_fun(Tables)]).
queue_stats(Objs, Cols, FineSpecs, Tables) ->
list_queue_stats(Objs, Tables) ->
merge_stats(Objs, [basic_stats_fun(queue_stats, Tables),
consumer_details_fun(
Cols, fun (Props) -> {pget(pid, Props), '_'} end,
Tables),
fine_stats_fun(Cols, FineSpecs, Tables),
fine_stats_fun(?FINE_STATS_QUEUE_LIST, Tables),
augment_msg_stats_fun(Tables)]).
exchange_stats(Objs, Cols, FineSpecs, Tables) ->
merge_stats(Objs, [fine_stats_fun(Cols, FineSpecs, Tables),
detail_queue_stats(Objs, Tables) ->
merge_stats(Objs, [basic_stats_fun(queue_stats, Tables),
consumer_details_fun(
fun (Props) -> {pget(pid, Props), '_'} end, Tables),
fine_stats_fun(?FINE_STATS_QUEUE_DETAIL, Tables),
augment_msg_stats_fun(Tables)]).
exchange_stats(Objs, FineSpecs, Tables) ->
merge_stats(Objs, [fine_stats_fun(FineSpecs, Tables),
augment_msg_stats_fun(Tables)]).
connection_stats(Objs, Tables) ->
merge_stats(Objs, [basic_stats_fun(connection_stats, Tables),
augment_msg_stats_fun(Tables)]).
channel_stats(Objs, Cols, FineSpecs, Tables) ->
list_channel_stats(Objs, Tables) ->
merge_stats(Objs, [basic_stats_fun(channel_stats, Tables),
consumer_details_fun(Cols,
fine_stats_fun(?FINE_STATS_CHANNEL_LIST, Tables),
augment_msg_stats_fun(Tables)]).
detail_channel_stats(Objs, Tables) ->
merge_stats(Objs, [basic_stats_fun(channel_stats, Tables),
consumer_details_fun(
fun (Props) -> {'_', pget(pid, Props)} end, Tables),
fine_stats_fun(Cols, FineSpecs, Tables),
fine_stats_fun(?FINE_STATS_CHANNEL_DETAIL, Tables),
augment_msg_stats_fun(Tables)]).
%%----------------------------------------------------------------------------

View File

@ -43,5 +43,4 @@ is_authorized(ReqData, Context) ->
%%--------------------------------------------------------------------
channel(ReqData) ->
rabbit_mgmt_db:get_channel(rabbit_mgmt_util:id(channel, ReqData),
rabbit_mgmt_util:columns(ReqData)).
rabbit_mgmt_db:get_channel(rabbit_mgmt_util:id(channel, ReqData)).

View File

@ -28,10 +28,9 @@ content_types_provided(ReqData, Context) ->
{[{"application/json", to_json}], ReqData, Context}.
to_json(ReqData, Context) ->
rabbit_mgmt_util:reply_list(
rabbit_mgmt_util:filter_user(
rabbit_mgmt_db:get_channels(rabbit_mgmt_util:columns(ReqData)),
ReqData, Context), ReqData, Context).
Chs = rabbit_mgmt_util:filter_user(rabbit_mgmt_db:get_channels(),
ReqData, Context),
rabbit_mgmt_util:reply_list(Chs, ReqData, Context).
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized(ReqData, Context).

View File

@ -43,7 +43,7 @@ resource_exists(ReqData, Context) ->
to_json(ReqData, Context) ->
X0 = exchange(ReqData),
[X] = rabbit_mgmt_db:get_exchange(X0, rabbit_mgmt_util:columns(ReqData)),
[X] = rabbit_mgmt_db:get_exchange(X0),
rabbit_mgmt_util:reply(X, ReqData, Context).
accept_content(ReqData, Context) ->

View File

@ -37,8 +37,7 @@ resource_exists(ReqData, Context) ->
to_json(ReqData, Context) ->
rabbit_mgmt_util:reply_list(
rabbit_mgmt_db:get_exchanges(
rabbit_mgmt_util:filter_vhost(exchanges(ReqData), ReqData, Context),
rabbit_mgmt_util:columns(ReqData)),
rabbit_mgmt_util:filter_vhost(exchanges(ReqData), ReqData, Context)),
ReqData, Context).
is_authorized(ReqData, Context) ->

View File

@ -43,7 +43,7 @@ resource_exists(ReqData, Context) ->
to_json(ReqData, Context) ->
Q0 = queue(ReqData),
[Q] = rabbit_mgmt_db:get_queue(Q0, rabbit_mgmt_util:columns(ReqData)),
Q = rabbit_mgmt_db:get_queue(Q0),
rabbit_mgmt_util:reply(Q, ReqData, Context).
accept_content(ReqData, Context) ->

View File

@ -37,8 +37,7 @@ resource_exists(ReqData, Context) ->
to_json(ReqData, Context) ->
rabbit_mgmt_util:reply_list(
rabbit_mgmt_db:get_queues(
rabbit_mgmt_util:filter_vhost(queues(ReqData), ReqData, Context),
rabbit_mgmt_util:columns(ReqData)),
rabbit_mgmt_util:filter_vhost(queues(ReqData), ReqData, Context)),
ReqData, Context).
is_authorized(ReqData, Context) ->

View File

@ -81,7 +81,7 @@ test_queues(_Conn, Chan) ->
[fun() ->
Qs = rabbit_mgmt_db:get_queues(
[rabbit_mgmt_format:queue(Q) ||
Q <- rabbit_amqqueue:list(<<"/">>)], all),
Q <- rabbit_amqqueue:list(<<"/">>)]),
Q1Info = find_by_name(Q1, Qs),
Q2Info = find_by_name(Q2, Qs),
@ -271,17 +271,16 @@ get_channel(C, Number) ->
Port = local_port(C),
rabbit_mgmt_db:get_channel(list_to_binary(
"127.0.0.1:" ++ integer_to_list(Port) ++ ":" ++
integer_to_list(Number)), all).
integer_to_list(Number))).
get_exchange(XName) ->
X = rabbit_mgmt_wm_exchange:exchange(<<"/">>, XName),
[Res] = rabbit_mgmt_db:get_exchange(X, all),
[Res] = rabbit_mgmt_db:get_exchange(X),
Res.
get_queue(QName) ->
Q = rabbit_mgmt_wm_queue:queue(<<"/">>, QName),
[Res] = rabbit_mgmt_db:get_queue(Q, all),
Res.
rabbit_mgmt_db:get_queue(Q).
declare_queue(Chan) ->
#'queue.declare_ok'{ queue = Q } =