Distributed querying for channels and channel

channel queries now perform an additional query to get the consumer detials once
the channel pids are known
This commit is contained in:
kjnilsson 2016-09-16 09:41:19 +01:00
parent 7b8eddb765
commit 04a9004b8d
4 changed files with 150 additions and 14 deletions

View File

@ -233,6 +233,10 @@ handle_call({get_all_consumers, VHost}, _From, State) ->
{reply, [augment_msg_stats(augment_consumer(C)) ||
C <- consumers_by_vhost(VHost)], State};
handle_call({get_consumers, ChPids}, _From, State) ->
{reply, [augment_msg_stats(augment_consumer(C)) ||
C <- consumers_by_channel_pids(ChPids)], State};
handle_call({get_overview, User, Ranges}, _From,
#state{interval = Interval} = State) ->
VHosts = case User of
@ -490,7 +494,8 @@ detail_channel_stats(Ranges, Objs, Interval) ->
{deliveries, detail_stats(channel_queue_stats_deliver_stats,
fine_stats, first(Id), Ranges,
Interval)}],
augment_msg_stats(combine(Props, Obj)) ++ Consumers ++ Stats ++ StatsD
[{pid, Id}] ++
augment_msg_stats(combine(Props, Obj)) ++ Consumers ++ Stats ++ StatsD
end || Obj <- Objs].
augment_consumer({{Q, Ch, CTag}, Props}) ->
@ -610,6 +615,17 @@ consumers_by_vhost(VHost) ->
[{'orelse', {'==', 'all', VHost}, {'==', VHost, '$1'}}],
['$_']}]).
consumers_by_channel_pids(ChPids) ->
lists:foldl(fun (ChPid, Agg) ->
consumers_by_channel_pid(ChPid) ++ Agg
end, [], ChPids).
consumers_by_channel_pid(ChPid) ->
ets:select(consumer_stats,
[{{{'_', '$1', '_'}, '_'},
[{'==', ChPid, '$1'}],
['$_']}]).
%%----------------------------------------------------------------------------
%% Internal, query-time augmentation
%%----------------------------------------------------------------------------

View File

@ -20,6 +20,8 @@
-export([resource_exists/2]).
-export([variances/2]).
-import(rabbit_misc, [pset/3]).
-include("rabbit_mgmt.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
@ -58,5 +60,14 @@ is_authorized(ReqData, Context) ->
%%--------------------------------------------------------------------
channel(ReqData) ->
rabbit_mgmt_db:get_channel(rabbit_mgmt_util:id(channel, ReqData),
rabbit_mgmt_util:range(ReqData)).
MemberPids = pg2:get_members(management_db),
{PidResults, _} = delegate:call(MemberPids, "delegate_management_",
{get_channel, rabbit_mgmt_util:id(channel, ReqData),
rabbit_mgmt_util:range(ReqData)}),
ChannelPids = [rabbit_misc:pget(pid, R) || {_, [_|_] = R} <- PidResults],
{PidConsumers, _} = delegate:call(MemberPids, "delegate_management_",
{get_consumers, ChannelPids}),
Consumers = lists:append([Cs || {_, Cs} <- PidConsumers]),
[Channel] = [R || {_, [_|_] = R} <- PidResults],
pset(consumer_details, Consumers, Channel).

View File

@ -51,6 +51,9 @@ is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized(ReqData, Context).
augmented(ReqData, Context) ->
MemberPids = pg2:get_members(management_db),
{PidResults, _} = delegate:call(MemberPids, "delegate_management_",
{get_all_channels, rabbit_mgmt_util:range(ReqData)}),
rabbit_mgmt_util:filter_conn_ch_list(
rabbit_mgmt_db:get_all_channels(
rabbit_mgmt_util:range(ReqData)), ReqData, Context).
lists:append([R || {_, R} <- PidResults]), ReqData, Context).

View File

@ -44,7 +44,13 @@ groups() ->
queue_consumer_channel_closed,
queues_single,
queues_multiple,
queues_removed
queues_removed,
channels_multiple_on_different_nodes,
channels_cancelled,
channel,
channel_other_node,
channel_with_consumer_on_other_node,
channel_with_consumer_on_same_node
]}
].
@ -125,11 +131,11 @@ multi_node_case1_test(Config) ->
restart_node(Config, 1),
Q2 = wait_for(Config, "/queues/%2f/ha-queue"),
http_delete(Config, "/queues/%2f/ha-queue", ?NO_CONTENT),
http_delete(Config, "/policies/%2f/HA", ?NO_CONTENT),
assert_node(Nodename1, pget(node, Q2)),
assert_single_node(Nodename2, pget(slave_nodes, Q2)),
assert_single_node(Nodename2, pget(synchronised_slave_nodes, Q2)),
http_delete(Config, "/queues/%2f/ha-queue", ?NO_CONTENT),
http_delete(Config, "/policies/%2f/HA", ?NO_CONTENT),
passed.
@ -147,13 +153,13 @@ ha_queue_hosted_on_other_node(Config) ->
timer:sleep(2000), % wait for metrics to be pushed :(
Res = http_get(Config, "/queues/%2f/ha-queue"),
amqp_channel:close(Chan),
http_delete(Config, "/queues/%2f/ha-queue", ?NO_CONTENT),
http_delete(Config, "/policies/%2f/HA", ?NO_CONTENT),
% assert some basic data is there
[Cons] = pget(consumer_details, Res),
[_|_] = pget(channel_details, Cons), % channel details proplist must not be empty
0 = pget(prefetch_count, Cons), % check one of the augmented properties
<<"ha-queue">> = pget(name, Res),
http_delete(Config, "/queues/%2f/ha-queue", ?NO_CONTENT),
http_delete(Config, "/policies/%2f/HA", ?NO_CONTENT),
ok.
ha_queue_with_multiple_consumers(Config) ->
@ -171,6 +177,8 @@ ha_queue_with_multiple_consumers(Config) ->
timer:sleep(3000), % wait for metrics to be pushed
Res = http_get(Config, "/queues/%2f/ha-queue"),
amqp_channel:close(Chan),
http_delete(Config, "/queues/%2f/ha-queue", ?NO_CONTENT),
http_delete(Config, "/policies/%2f/HA", ?NO_CONTENT),
% assert some basic data is there
[C1, C2] = pget(consumer_details, Res),
% channel details proplist must not be empty
@ -180,8 +188,6 @@ ha_queue_with_multiple_consumers(Config) ->
0 = pget(prefetch_count, C1),
0 = pget(prefetch_count, C2),
<<"ha-queue">> = pget(name, Res),
http_delete(Config, "/queues/%2f/ha-queue", ?NO_CONTENT),
http_delete(Config, "/policies/%2f/HA", ?NO_CONTENT),
ok.
queue_on_other_node(Config) ->
@ -194,12 +200,12 @@ queue_on_other_node(Config) ->
timer:sleep(2000), % wait for metrics to be pushed :(
Res = http_get(Config, "/queues/%2f/some-queue"),
amqp_channel:close(Chan),
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
% assert some basic data is present
[Cons] = pget(consumer_details, Res),
[_|_] = pget(channel_details, Cons), % channel details proplist must not be empty
0 = pget(prefetch_count, Cons), % check one of the augmented properties
<<"some-queue">> = pget(name, Res),
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
ok.
queue_with_multiple_consumers(Config) ->
@ -296,13 +302,113 @@ queues_removed(Config) ->
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
timer:sleep(2000), % wait for metrics to be pushed :(
Res = http_get(Config, "/queues/%2f"),
ct:pal("Res: ~p~n", [Res]),
% assert single queue is returned
[] = Res,
ok.
channels_multiple_on_different_nodes(Config) ->
Nodename2 = get_node_config(Config, 1, nodename),
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
http_put(Config, "/queues/%2f/some-queue", QArgs, ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
{ok, Chan2} = amqp_connection:open_channel(Conn),
consume(Chan, <<"some-queue">>),
timer:sleep(2000), % wait for metrics to be pushed :(
Res = http_get(Config, "/channels"),
amqp_channel:close(Chan),
amqp_channel:close(Chan2),
amqp_connection:close(Conn),
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
% assert two channels are present
[_,_] = Res,
ok.
channels_cancelled(Config) ->
Nodename2 = get_node_config(Config, 1, nodename),
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
http_put(Config, "/queues/%2f/some-queue", QArgs, ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
{ok, Chan2} = amqp_connection:open_channel(Conn),
consume(Chan, <<"some-queue">>),
timer:sleep(2000), % wait for metrics to be pushed
amqp_channel:close(Chan2),
amqp_connection:close(Conn),
timer:sleep(2000), % wait for metrics to be pushed
Res = http_get(Config, "/channels"),
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
amqp_channel:close(Chan),
% assert one channel is present
[_] = Res,
ok.
channel(Config) ->
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
[{_, ChData}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, tab2list, [channel_created]),
ChName = http_uri:encode(binary_to_list(pget(name, ChData))),
timer:sleep(2000), % wait for metrics to be pushed :(
Res = http_get(Config, "/channels/" ++ ChName ),
amqp_channel:close(Chan),
% assert channel is non empty
[_|_] = Res,
ok.
channel_other_node(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
{ok, _} = amqp_connection:open_channel(Conn),
[{_, ChData}] = rabbit_ct_broker_helpers:rpc(Config, 1, ets, tab2list,
[channel_created]),
ChName = http_uri:encode(binary_to_list(pget(name, ChData))),
timer:sleep(2000), % wait for metrics to be pushed :(
Res = http_get(Config, "/channels/" ++ ChName ),
amqp_connection:close(Conn),
% assert channel is non empty
[_|_] = Res,
ok.
channel_with_consumer_on_other_node(Config) ->
Nodename2 = get_node_config(Config, 1, nodename),
QArgs = [{node, list_to_binary(atom_to_list(Nodename2))}],
http_put(Config, "/queues/%2f/some-queue", QArgs, ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
ChName = get_channel_name(Config, 0),
consume(Chan, <<"some-queue">>),
timer:sleep(2000), % wait for metrics to be pushed
Res = http_get(Config, "/channels/" ++ ChName ),
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
amqp_channel:close(Chan),
% assert channel is non empty
[_|_] = Res,
[_] = pget(consumer_details, Res),
ok.
channel_with_consumer_on_same_node(Config) ->
Nodename1 = get_node_config(Config, 0, nodename),
QArgs = [{node, list_to_binary(atom_to_list(Nodename1))}],
http_put(Config, "/queues/%2f/some-queue", QArgs, ?CREATED),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
ChName = get_channel_name(Config, 0),
consume(Chan, <<"some-queue">>),
timer:sleep(2000), % wait for metrics to be pushed
Res = http_get(Config, "/channels/" ++ ChName ),
amqp_channel:close(Chan),
http_delete(Config, "/queues/%2f/some-queue", ?NO_CONTENT),
% assert channel is non empty
[_|_] = Res,
[_] = pget(consumer_details, Res),
ok.
%%----------------------------------------------------------------------------
get_channel_name(Config, Node) ->
[{_, ChData}|_] = rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list,
[channel_created]),
http_uri:encode(binary_to_list(pget(name, ChData))).
consume(Channel, Queue) ->
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:call(Channel, #'basic.consume'{queue = Queue}),