diff --git a/deps/rabbitmq_management/bin/rabbitmqadmin b/deps/rabbitmq_management/bin/rabbitmqadmin
index 6f4cb6c095..df6cc9d63c 100755
--- a/deps/rabbitmq_management/bin/rabbitmqadmin
+++ b/deps/rabbitmq_management/bin/rabbitmqadmin
@@ -34,6 +34,7 @@ VERSION = '%%VSN%%'
LISTABLE = {'connections': {'vhost': False},
'channels': {'vhost': False},
+ 'consumers': {'vhost': True},
'exchanges': {'vhost': True},
'queues': {'vhost': True},
'bindings': {'vhost': True},
diff --git a/deps/rabbitmq_management/priv/www/api/index.html b/deps/rabbitmq_management/priv/www/api/index.html
index a6ae001f0c..1fb5c14324 100644
--- a/deps/rabbitmq_management/priv/www/api/index.html
+++ b/deps/rabbitmq_management/priv/www/api/index.html
@@ -286,6 +286,22 @@ Content-Length: 0
/api/channels/channel |
Details about an individual channel. |
+
+ | X |
+ |
+ |
+ |
+ /api/consumers |
+ A list of all consumers. |
+
+
+ | X |
+ |
+ |
+ |
+ /api/consumers/vhost |
+ A list of all consumers in a given virtual host. |
+
| X |
|
diff --git a/deps/rabbitmq_management/priv/www/js/main.js b/deps/rabbitmq_management/priv/www/js/main.js
index 80e8b77e04..ab20fe18ec 100644
--- a/deps/rabbitmq_management/priv/www/js/main.js
+++ b/deps/rabbitmq_management/priv/www/js/main.js
@@ -734,11 +734,26 @@ function publish_msg(params0) {
params['properties']['delivery_mode'] = parseInt(params['delivery_mode']);
if (params['headers'] != '')
params['properties']['headers'] = params['headers'];
- var props = ['content_type', 'content_encoding', 'priority', 'correlation_id', 'reply_to', 'expiration', 'message_id', 'timestamp', 'type', 'user_id', 'app_id', 'cluster_id'];
+ var props = [['content_type', 'str'],
+ ['content_encoding', 'str'],
+ ['correlation_id', 'str'],
+ ['reply_to', 'str'],
+ ['expiration', 'str'],
+ ['message_id', 'str'],
+ ['type', 'str'],
+ ['user_id', 'str'],
+ ['app_id', 'str'],
+ ['cluster_id', 'str'],
+ ['priority', 'int'],
+ ['timestamp', 'int']];
for (var i in props) {
- var p = props[i];
- if (params['props'][p] != '')
- params['properties'][p] = params['props'][p];
+ var name = props[i][0];
+ var type = props[i][1];
+ if (params['props'][name] != undefined && params['props'][name] != '') {
+ var value = params['props'][name];
+ if (type == 'int') value = parseInt(value);
+ params['properties'][name] = value;
+ }
}
with_req('POST', path, JSON.stringify(params), function(resp) {
var result = jQuery.parseJSON(resp.responseText);
@@ -994,11 +1009,12 @@ function collapse_multifields(params0) {
var v = params0[name + '_' + id + '_mfvalue'];
var t = params0[name + '_' + id + '_mftype'];
var val = null;
+ var top_level = id_parts.length == 1;
if (t == 'list') {
val = [];
id_map[name][id] = val;
}
- else if (set(k) || set(v)) {
+ else if ((set(k) && top_level) || set(v)) {
if (t == 'boolean') {
if (v != 'true' && v != 'false')
throw(k + ' must be "true" or "false"; got ' + v);
@@ -1015,7 +1031,7 @@ function collapse_multifields(params0) {
}
}
if (val != null) {
- if (id_parts.length == 1) {
+ if (top_level) {
params[name][k] = val;
}
else {
diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl
index f8a677bbf6..3d487d89b4 100644
--- a/deps/rabbitmq_management/src/rabbit_mgmt_db.erl
+++ b/deps/rabbitmq_management/src/rabbit_mgmt_db.erl
@@ -27,11 +27,12 @@
augment_nodes/1, augment_vhosts/2,
get_channel/2, get_connection/2,
get_all_channels/1, get_all_connections/1,
+ get_all_consumers/0, get_all_consumers/1,
get_overview/2, get_overview/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, handle_pre_hibernate/1, prioritise_cast/3,
- format_message_queue/2]).
+ code_change/3, handle_pre_hibernate/1,
+ prioritise_cast/3, prioritise_call/4, format_message_queue/2]).
%% For testing
-export([override_lookups/1, reset_lookups/0]).
@@ -176,6 +177,9 @@ prioritise_cast({event, #event{type = Type,
prioritise_cast(_Msg, _Len, _State) ->
0.
+%% We want timely replies to queries even when overloaded!
+prioritise_call(_Msg, _From, _Len, _State) -> 5.
+
%%----------------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------------
@@ -203,6 +207,9 @@ get_connection(Name, R) -> safe_call({get_connection, Name, R}, not_found).
get_all_channels(R) -> safe_call({get_all_channels, R}).
get_all_connections(R) -> safe_call({get_all_connections, R}).
+get_all_consumers() -> safe_call({get_all_consumers, all}).
+get_all_consumers(V) -> safe_call({get_all_consumers, V}).
+
get_overview(User, R) -> safe_call({get_overview, User, R}).
get_overview(R) -> safe_call({get_overview, all, R}).
@@ -293,6 +300,14 @@ handle_call({get_all_connections, Ranges}, _From,
Conns = created_events(connection_stats, Tables),
reply(connection_stats(Ranges, Conns, State), State);
+handle_call({get_all_consumers, VHost},
+ _From, State = #state{tables = Tables}) ->
+ All = ets:tab2list(orddict:fetch(consumers_by_queue, Tables)),
+ {reply, [augment_msg_stats(
+ augment_consumer(Obj), State) ||
+ {{#resource{virtual_host = VHostC}, _Ch, _CTag}, Obj} <- All,
+ VHost =:= all orelse VHost =:= VHostC], State};
+
handle_call({get_overview, User, Ranges}, _From,
State = #state{tables = Tables}) ->
VHosts = case User of
@@ -333,6 +348,12 @@ handle_call({override_lookups, Lookups}, _From, State) ->
handle_call(reset_lookups, _From, State) ->
reply(ok, reset_lookups(State));
+%% Used in rabbit_mgmt_test_db where we need guarantees events have
+%% been handled before querying
+handle_call({event, Event = #event{reference = none}}, _From, State) ->
+ handle_event(Event, State),
+ reply(ok, State);
+
handle_call(_Request, _From, State) ->
reply(not_understood, State).
@@ -1034,6 +1055,7 @@ augment_channel_pid(Pid, #state{tables = Tables}) ->
{pget(connection, Ch), create}),
[{name, pget(name, Ch)},
{number, pget(number, Ch)},
+ {user, pget(user, Ch)},
{connection_name, pget(name, Conn)},
{peer_port, pget(peer_port, Conn)},
{peer_host, pget(peer_host, Conn)}].
diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl b/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl
index bcfcfc6de6..1e7100fae3 100644
--- a/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl
+++ b/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl
@@ -55,6 +55,8 @@ dispatcher() ->
{["connections", connection, "channels"], rabbit_mgmt_wm_connection_channels, []},
{["channels"], rabbit_mgmt_wm_channels, []},
{["channels", channel], rabbit_mgmt_wm_channel, []},
+ {["consumers"], rabbit_mgmt_wm_consumers, []},
+ {["consumers", vhost], rabbit_mgmt_wm_consumers, []},
{["exchanges"], rabbit_mgmt_wm_exchanges, []},
{["exchanges", vhost], rabbit_mgmt_wm_exchanges, []},
{["exchanges", vhost, exchange], rabbit_mgmt_wm_exchange, []},
diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_format.erl b/deps/rabbitmq_management/src/rabbit_mgmt_format.erl
index 617f3eedce..487ff860e6 100644
--- a/deps/rabbitmq_management/src/rabbit_mgmt_format.erl
+++ b/deps/rabbitmq_management/src/rabbit_mgmt_format.erl
@@ -277,8 +277,16 @@ to_basic_properties({struct, P}) ->
to_basic_properties(P);
to_basic_properties(Props) ->
- Fmt = fun (headers, H) -> to_amqp_table(H);
- (_K , V) -> V
+ E = fun (A, B) -> throw({error, {A, B}}) end,
+ Fmt = fun (headers, H) -> to_amqp_table(H);
+ (delivery_mode, V) when is_integer(V) -> V;
+ (delivery_mode, _V) -> E(not_int,delivery_mode);
+ (priority, V) when is_integer(V) -> V;
+ (priority, _V) -> E(not_int, priority);
+ (timestamp, V) when is_integer(V) -> V;
+ (timestamp, _V) -> E(not_int, timestamp);
+ (_, V) when is_binary(V) -> V;
+ (K, _V) -> E(not_string, K)
end,
{Res, _Ix} = lists:foldl(
fun (K, {P, Ix}) ->
diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_util.erl b/deps/rabbitmq_management/src/rabbit_mgmt_util.erl
index 2a425b10ce..235d674f61 100644
--- a/deps/rabbitmq_management/src/rabbit_mgmt_util.erl
+++ b/deps/rabbitmq_management/src/rabbit_mgmt_util.erl
@@ -28,7 +28,7 @@
-export([with_channel/4, with_channel/5]).
-export([props_to_method/2, props_to_method/4]).
-export([all_or_one_vhost/2, http_to_amqp/5, reply/3, filter_vhost/3]).
--export([filter_conn_ch_list/3, filter_user/2]).
+-export([filter_conn_ch_list/3, filter_user/2, list_login_vhosts/1]).
-export([with_decode/5, decode/1, decode/2, redirect/2, args/1]).
-export([reply_list/3, reply_list/4, sort_list/2, destination_type/1]).
-export([post_respond/1, columns/1, is_monitor/1]).
diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_consumers.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_consumers.erl
new file mode 100644
index 0000000000..d531fc4d97
--- /dev/null
+++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_consumers.erl
@@ -0,0 +1,56 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ Management Plugin.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved.
+
+-module(rabbit_mgmt_wm_consumers).
+
+-export([init/1, to_json/2, content_types_provided/2, resource_exists/2,
+ is_authorized/2]).
+
+-import(rabbit_misc, [pget/2]).
+
+-include("rabbit_mgmt.hrl").
+-include_lib("webmachine/include/webmachine.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+%%--------------------------------------------------------------------
+
+init(_Config) -> {ok, #context{}}.
+
+content_types_provided(ReqData, Context) ->
+ {[{"application/json", to_json}], ReqData, Context}.
+
+resource_exists(ReqData, Context) ->
+ {case rabbit_mgmt_util:vhost(ReqData) of
+ vhost_not_found -> false;
+ _ -> true
+ end, ReqData, Context}.
+
+to_json(ReqData, Context = #context{user = User}) ->
+ Consumers = case rabbit_mgmt_util:vhost(ReqData) of
+ none -> rabbit_mgmt_db:get_all_consumers();
+ VHost -> rabbit_mgmt_db:get_all_consumers(VHost)
+ end,
+ rabbit_mgmt_util:reply_list(
+ filter_user(Consumers, User), ReqData, Context).
+
+is_authorized(ReqData, Context) ->
+ rabbit_mgmt_util:is_authorized(ReqData, Context).
+
+filter_user(List, #user{username = Username, tags = Tags}) ->
+ case rabbit_mgmt_util:is_monitor(Tags) of
+ true -> List;
+ false -> [I || I <- List,
+ pget(user, pget(channel_details, I)) == Username]
+ end.
diff --git a/deps/rabbitmq_management/test/src/rabbit_mgmt_test_db.erl b/deps/rabbitmq_management/test/src/rabbit_mgmt_test_db.erl
index ef2f5be09f..e73cbbe850 100644
--- a/deps/rabbitmq_management/test/src/rabbit_mgmt_test_db.erl
+++ b/deps/rabbitmq_management/test/src/rabbit_mgmt_test_db.erl
@@ -204,11 +204,11 @@ delete_ch(Name, Timestamp) ->
event(channel_closed, [{pid, pid_del(Name)}], Timestamp).
event(Type, Stats, Timestamp) ->
- gen_server:cast({global, rabbit_mgmt_db},
- {event, #event{type = Type,
- props = Stats,
- reference = none,
- timestamp = sec_to_triple(Timestamp)}}).
+ ok = gen_server:call(rabbit_mgmt_db,
+ {event, #event{type = Type,
+ props = Stats,
+ reference = none,
+ timestamp = sec_to_triple(Timestamp)}}).
sec_to_triple(Sec) -> {Sec div 1000000, Sec rem 1000000, 0}.
diff --git a/deps/rabbitmq_management/test/src/rabbit_mgmt_test_http.erl b/deps/rabbitmq_management/test/src/rabbit_mgmt_test_http.erl
index 123d5d4cc6..cf4fc8ef2a 100644
--- a/deps/rabbitmq_management/test/src/rabbit_mgmt_test_http.erl
+++ b/deps/rabbitmq_management/test/src/rabbit_mgmt_test_http.erl
@@ -520,21 +520,25 @@ get_conn(Username, Password) ->
[LocalPort]),
{Conn, ConnPath, ChPath, ConnChPath}.
-permissions_connection_channel_test() ->
+permissions_connection_channel_consumer_test() ->
PermArgs = [{configure, <<".*">>}, {write, <<".*">>}, {read, <<".*">>}],
http_put("/users/user", [{password, <<"user">>},
{tags, <<"management">>}], ?NO_CONTENT),
http_put("/permissions/%2f/user", PermArgs, ?NO_CONTENT),
http_put("/users/monitor", [{password, <<"monitor">>},
- {tags, <<"monitoring">>}], ?NO_CONTENT),
+ {tags, <<"monitoring">>}], ?NO_CONTENT),
http_put("/permissions/%2f/monitor", PermArgs, ?NO_CONTENT),
+ http_put("/queues/%2f/test", [], ?NO_CONTENT),
+
{Conn1, UserConn, UserCh, UserConnCh} = get_conn("user", "user"),
{Conn2, MonConn, MonCh, MonConnCh} = get_conn("monitor", "monitor"),
{Conn3, AdmConn, AdmCh, AdmConnCh} = get_conn("guest", "guest"),
- {ok, _Ch1} = amqp_connection:open_channel(Conn1),
- {ok, _Ch2} = amqp_connection:open_channel(Conn2),
- {ok, _Ch3} = amqp_connection:open_channel(Conn3),
-
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+ {ok, Ch3} = amqp_connection:open_channel(Conn3),
+ [amqp_channel:subscribe(
+ Ch, #'basic.consume'{queue = <<"test">>}, self()) ||
+ Ch <- [Ch1, Ch2, Ch3]],
AssertLength = fun (Path, User, Len) ->
?assertEqual(Len,
length(http_get(Path, User, User, ?OK)))
@@ -543,7 +547,7 @@ permissions_connection_channel_test() ->
AssertLength(P, "user", 1),
AssertLength(P, "monitor", 3),
AssertLength(P, "guest", 3)
- end || P <- ["/connections", "/channels"]],
+ end || P <- ["/connections", "/channels", "/consumers", "/consumers/%2f"]],
AssertRead = fun(Path, UserStatus) ->
http_get(Path, "user", "user", UserStatus),
@@ -573,6 +577,22 @@ permissions_connection_channel_test() ->
http_delete("/users/monitor", ?NO_CONTENT),
http_get("/connections/foo", ?NOT_FOUND),
http_get("/channels/foo", ?NOT_FOUND),
+ http_delete("/queues/%2f/test", ?NO_CONTENT),
+ ok.
+
+consumers_test() ->
+ http_put("/queues/%2f/test", [], ?NO_CONTENT),
+ {Conn, _ConnPath, _ChPath, _ConnChPath} = get_conn("guest", "guest"),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ amqp_channel:subscribe(
+ Ch, #'basic.consume'{queue = <<"test">>,
+ no_ack = false,
+ consumer_tag = <<"my-ctag">> }, self()),
+ assert_list([[{exclusive, false},
+ {ack_required, true},
+ {consumer_tag, <<"my-ctag">>}]], http_get("/consumers")),
+ amqp_connection:close(Conn),
+ http_delete("/queues/%2f/test", ?NO_CONTENT),
ok.
unicode_test() ->
@@ -959,6 +979,15 @@ publish_fail_test() ->
{payload, [<<"not a string">>]},
{payload_encoding, <<"string">>}],
http_post("/exchanges/%2f/amq.default/publish", Msg3, ?BAD_REQUEST),
+ MsgTemplate = [{exchange, <<"">>},
+ {routing_key, <<"myqueue">>},
+ {payload, <<"Hello world">>},
+ {payload_encoding, <<"string">>}],
+ [http_post("/exchanges/%2f/amq.default/publish",
+ [{properties, [BadProp]} | MsgTemplate], ?BAD_REQUEST)
+ || BadProp <- [{priority, <<"really high">>},
+ {timestamp, <<"recently">>},
+ {expiration, 1234}]],
http_delete("/users/myuser", ?NO_CONTENT),
ok.