Merge bug26107

This commit is contained in:
Simon MacMullen 2014-05-19 10:39:23 +01:00
commit cd068d1a59
10 changed files with 173 additions and 23 deletions

View File

@ -34,6 +34,7 @@ VERSION = '%%VSN%%'
LISTABLE = {'connections': {'vhost': False},
'channels': {'vhost': False},
'consumers': {'vhost': True},
'exchanges': {'vhost': True},
'queues': {'vhost': True},
'bindings': {'vhost': True},

View File

@ -286,6 +286,22 @@ Content-Length: 0</pre>
<td class="path">/api/channels/<i>channel</i></td>
<td>Details about an individual channel.</td>
</tr>
<tr>
<td>X</td>
<td></td>
<td></td>
<td></td>
<td class="path">/api/consumers</td>
<td>A list of all consumers.</td>
</tr>
<tr>
<td>X</td>
<td></td>
<td></td>
<td></td>
<td class="path">/api/consumers/<i>vhost</i></td>
<td>A list of all consumers in a given virtual host.</td>
</tr>
<tr>
<td>X</td>
<td></td>

View File

@ -734,11 +734,26 @@ function publish_msg(params0) {
params['properties']['delivery_mode'] = parseInt(params['delivery_mode']);
if (params['headers'] != '')
params['properties']['headers'] = params['headers'];
var props = ['content_type', 'content_encoding', 'priority', 'correlation_id', 'reply_to', 'expiration', 'message_id', 'timestamp', 'type', 'user_id', 'app_id', 'cluster_id'];
var props = [['content_type', 'str'],
['content_encoding', 'str'],
['correlation_id', 'str'],
['reply_to', 'str'],
['expiration', 'str'],
['message_id', 'str'],
['type', 'str'],
['user_id', 'str'],
['app_id', 'str'],
['cluster_id', 'str'],
['priority', 'int'],
['timestamp', 'int']];
for (var i in props) {
var p = props[i];
if (params['props'][p] != '')
params['properties'][p] = params['props'][p];
var name = props[i][0];
var type = props[i][1];
if (params['props'][name] != undefined && params['props'][name] != '') {
var value = params['props'][name];
if (type == 'int') value = parseInt(value);
params['properties'][name] = value;
}
}
with_req('POST', path, JSON.stringify(params), function(resp) {
var result = jQuery.parseJSON(resp.responseText);
@ -994,11 +1009,12 @@ function collapse_multifields(params0) {
var v = params0[name + '_' + id + '_mfvalue'];
var t = params0[name + '_' + id + '_mftype'];
var val = null;
var top_level = id_parts.length == 1;
if (t == 'list') {
val = [];
id_map[name][id] = val;
}
else if (set(k) || set(v)) {
else if ((set(k) && top_level) || set(v)) {
if (t == 'boolean') {
if (v != 'true' && v != 'false')
throw(k + ' must be "true" or "false"; got ' + v);
@ -1015,7 +1031,7 @@ function collapse_multifields(params0) {
}
}
if (val != null) {
if (id_parts.length == 1) {
if (top_level) {
params[name][k] = val;
}
else {

View File

@ -27,11 +27,12 @@
augment_nodes/1, augment_vhosts/2,
get_channel/2, get_connection/2,
get_all_channels/1, get_all_connections/1,
get_all_consumers/0, get_all_consumers/1,
get_overview/2, get_overview/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_cast/3,
format_message_queue/2]).
code_change/3, handle_pre_hibernate/1,
prioritise_cast/3, prioritise_call/4, format_message_queue/2]).
%% For testing
-export([override_lookups/1, reset_lookups/0]).
@ -176,6 +177,9 @@ prioritise_cast({event, #event{type = Type,
prioritise_cast(_Msg, _Len, _State) ->
0.
%% We want timely replies to queries even when overloaded!
prioritise_call(_Msg, _From, _Len, _State) -> 5.
%%----------------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------------
@ -203,6 +207,9 @@ get_connection(Name, R) -> safe_call({get_connection, Name, R}, not_found).
get_all_channels(R) -> safe_call({get_all_channels, R}).
get_all_connections(R) -> safe_call({get_all_connections, R}).
get_all_consumers() -> safe_call({get_all_consumers, all}).
get_all_consumers(V) -> safe_call({get_all_consumers, V}).
get_overview(User, R) -> safe_call({get_overview, User, R}).
get_overview(R) -> safe_call({get_overview, all, R}).
@ -293,6 +300,14 @@ handle_call({get_all_connections, Ranges}, _From,
Conns = created_events(connection_stats, Tables),
reply(connection_stats(Ranges, Conns, State), State);
handle_call({get_all_consumers, VHost},
_From, State = #state{tables = Tables}) ->
All = ets:tab2list(orddict:fetch(consumers_by_queue, Tables)),
{reply, [augment_msg_stats(
augment_consumer(Obj), State) ||
{{#resource{virtual_host = VHostC}, _Ch, _CTag}, Obj} <- All,
VHost =:= all orelse VHost =:= VHostC], State};
handle_call({get_overview, User, Ranges}, _From,
State = #state{tables = Tables}) ->
VHosts = case User of
@ -333,6 +348,12 @@ handle_call({override_lookups, Lookups}, _From, State) ->
handle_call(reset_lookups, _From, State) ->
reply(ok, reset_lookups(State));
%% Used in rabbit_mgmt_test_db where we need guarantees events have
%% been handled before querying
handle_call({event, Event = #event{reference = none}}, _From, State) ->
handle_event(Event, State),
reply(ok, State);
handle_call(_Request, _From, State) ->
reply(not_understood, State).
@ -1034,6 +1055,7 @@ augment_channel_pid(Pid, #state{tables = Tables}) ->
{pget(connection, Ch), create}),
[{name, pget(name, Ch)},
{number, pget(number, Ch)},
{user, pget(user, Ch)},
{connection_name, pget(name, Conn)},
{peer_port, pget(peer_port, Conn)},
{peer_host, pget(peer_host, Conn)}].

View File

@ -55,6 +55,8 @@ dispatcher() ->
{["connections", connection, "channels"], rabbit_mgmt_wm_connection_channels, []},
{["channels"], rabbit_mgmt_wm_channels, []},
{["channels", channel], rabbit_mgmt_wm_channel, []},
{["consumers"], rabbit_mgmt_wm_consumers, []},
{["consumers", vhost], rabbit_mgmt_wm_consumers, []},
{["exchanges"], rabbit_mgmt_wm_exchanges, []},
{["exchanges", vhost], rabbit_mgmt_wm_exchanges, []},
{["exchanges", vhost, exchange], rabbit_mgmt_wm_exchange, []},

View File

@ -277,8 +277,16 @@ to_basic_properties({struct, P}) ->
to_basic_properties(P);
to_basic_properties(Props) ->
Fmt = fun (headers, H) -> to_amqp_table(H);
(_K , V) -> V
E = fun (A, B) -> throw({error, {A, B}}) end,
Fmt = fun (headers, H) -> to_amqp_table(H);
(delivery_mode, V) when is_integer(V) -> V;
(delivery_mode, _V) -> E(not_int,delivery_mode);
(priority, V) when is_integer(V) -> V;
(priority, _V) -> E(not_int, priority);
(timestamp, V) when is_integer(V) -> V;
(timestamp, _V) -> E(not_int, timestamp);
(_, V) when is_binary(V) -> V;
(K, _V) -> E(not_string, K)
end,
{Res, _Ix} = lists:foldl(
fun (K, {P, Ix}) ->

View File

@ -28,7 +28,7 @@
-export([with_channel/4, with_channel/5]).
-export([props_to_method/2, props_to_method/4]).
-export([all_or_one_vhost/2, http_to_amqp/5, reply/3, filter_vhost/3]).
-export([filter_conn_ch_list/3, filter_user/2]).
-export([filter_conn_ch_list/3, filter_user/2, list_login_vhosts/1]).
-export([with_decode/5, decode/1, decode/2, redirect/2, args/1]).
-export([reply_list/3, reply_list/4, sort_list/2, destination_type/1]).
-export([post_respond/1, columns/1, is_monitor/1]).

View File

@ -0,0 +1,56 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ Management Plugin.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved.
-module(rabbit_mgmt_wm_consumers).
-export([init/1, to_json/2, content_types_provided/2, resource_exists/2,
is_authorized/2]).
-import(rabbit_misc, [pget/2]).
-include("rabbit_mgmt.hrl").
-include_lib("webmachine/include/webmachine.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
%%--------------------------------------------------------------------
init(_Config) -> {ok, #context{}}.
content_types_provided(ReqData, Context) ->
{[{"application/json", to_json}], ReqData, Context}.
resource_exists(ReqData, Context) ->
{case rabbit_mgmt_util:vhost(ReqData) of
vhost_not_found -> false;
_ -> true
end, ReqData, Context}.
to_json(ReqData, Context = #context{user = User}) ->
Consumers = case rabbit_mgmt_util:vhost(ReqData) of
none -> rabbit_mgmt_db:get_all_consumers();
VHost -> rabbit_mgmt_db:get_all_consumers(VHost)
end,
rabbit_mgmt_util:reply_list(
filter_user(Consumers, User), ReqData, Context).
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized(ReqData, Context).
filter_user(List, #user{username = Username, tags = Tags}) ->
case rabbit_mgmt_util:is_monitor(Tags) of
true -> List;
false -> [I || I <- List,
pget(user, pget(channel_details, I)) == Username]
end.

View File

@ -204,11 +204,11 @@ delete_ch(Name, Timestamp) ->
event(channel_closed, [{pid, pid_del(Name)}], Timestamp).
event(Type, Stats, Timestamp) ->
gen_server:cast({global, rabbit_mgmt_db},
{event, #event{type = Type,
props = Stats,
reference = none,
timestamp = sec_to_triple(Timestamp)}}).
ok = gen_server:call(rabbit_mgmt_db,
{event, #event{type = Type,
props = Stats,
reference = none,
timestamp = sec_to_triple(Timestamp)}}).
sec_to_triple(Sec) -> {Sec div 1000000, Sec rem 1000000, 0}.

View File

@ -520,21 +520,25 @@ get_conn(Username, Password) ->
[LocalPort]),
{Conn, ConnPath, ChPath, ConnChPath}.
permissions_connection_channel_test() ->
permissions_connection_channel_consumer_test() ->
PermArgs = [{configure, <<".*">>}, {write, <<".*">>}, {read, <<".*">>}],
http_put("/users/user", [{password, <<"user">>},
{tags, <<"management">>}], ?NO_CONTENT),
http_put("/permissions/%2f/user", PermArgs, ?NO_CONTENT),
http_put("/users/monitor", [{password, <<"monitor">>},
{tags, <<"monitoring">>}], ?NO_CONTENT),
{tags, <<"monitoring">>}], ?NO_CONTENT),
http_put("/permissions/%2f/monitor", PermArgs, ?NO_CONTENT),
http_put("/queues/%2f/test", [], ?NO_CONTENT),
{Conn1, UserConn, UserCh, UserConnCh} = get_conn("user", "user"),
{Conn2, MonConn, MonCh, MonConnCh} = get_conn("monitor", "monitor"),
{Conn3, AdmConn, AdmCh, AdmConnCh} = get_conn("guest", "guest"),
{ok, _Ch1} = amqp_connection:open_channel(Conn1),
{ok, _Ch2} = amqp_connection:open_channel(Conn2),
{ok, _Ch3} = amqp_connection:open_channel(Conn3),
{ok, Ch1} = amqp_connection:open_channel(Conn1),
{ok, Ch2} = amqp_connection:open_channel(Conn2),
{ok, Ch3} = amqp_connection:open_channel(Conn3),
[amqp_channel:subscribe(
Ch, #'basic.consume'{queue = <<"test">>}, self()) ||
Ch <- [Ch1, Ch2, Ch3]],
AssertLength = fun (Path, User, Len) ->
?assertEqual(Len,
length(http_get(Path, User, User, ?OK)))
@ -543,7 +547,7 @@ permissions_connection_channel_test() ->
AssertLength(P, "user", 1),
AssertLength(P, "monitor", 3),
AssertLength(P, "guest", 3)
end || P <- ["/connections", "/channels"]],
end || P <- ["/connections", "/channels", "/consumers", "/consumers/%2f"]],
AssertRead = fun(Path, UserStatus) ->
http_get(Path, "user", "user", UserStatus),
@ -573,6 +577,22 @@ permissions_connection_channel_test() ->
http_delete("/users/monitor", ?NO_CONTENT),
http_get("/connections/foo", ?NOT_FOUND),
http_get("/channels/foo", ?NOT_FOUND),
http_delete("/queues/%2f/test", ?NO_CONTENT),
ok.
consumers_test() ->
http_put("/queues/%2f/test", [], ?NO_CONTENT),
{Conn, _ConnPath, _ChPath, _ConnChPath} = get_conn("guest", "guest"),
{ok, Ch} = amqp_connection:open_channel(Conn),
amqp_channel:subscribe(
Ch, #'basic.consume'{queue = <<"test">>,
no_ack = false,
consumer_tag = <<"my-ctag">> }, self()),
assert_list([[{exclusive, false},
{ack_required, true},
{consumer_tag, <<"my-ctag">>}]], http_get("/consumers")),
amqp_connection:close(Conn),
http_delete("/queues/%2f/test", ?NO_CONTENT),
ok.
unicode_test() ->
@ -959,6 +979,15 @@ publish_fail_test() ->
{payload, [<<"not a string">>]},
{payload_encoding, <<"string">>}],
http_post("/exchanges/%2f/amq.default/publish", Msg3, ?BAD_REQUEST),
MsgTemplate = [{exchange, <<"">>},
{routing_key, <<"myqueue">>},
{payload, <<"Hello world">>},
{payload_encoding, <<"string">>}],
[http_post("/exchanges/%2f/amq.default/publish",
[{properties, [BadProp]} | MsgTemplate], ?BAD_REQUEST)
|| BadProp <- [{priority, <<"really high">>},
{timestamp, <<"recently">>},
{expiration, 1234}]],
http_delete("/users/myuser", ?NO_CONTENT),
ok.