The start of a publish mechanism.

This commit is contained in:
Simon MacMullen 2011-02-23 17:59:34 +00:00
parent a59ecdcc4b
commit 1383290ef9
8 changed files with 129 additions and 19 deletions

View File

@ -95,6 +95,7 @@ form.inline-form-right { float: right; }
input, select { padding: 0.2em; }
input[type=text] { font: 1.1em Lucidatypewriter, Courier, monospace; }
input[type=text].wide { width: 300px; }
textarea { width: 600px; height: 200px; }
.mand { color: #f88; padding: 0 5px;}
table.form { margin-bottom: 0.5em; }

View File

@ -142,6 +142,10 @@ function dispatcher() {
go_to('#/exchanges');
return false;
});
this.post('#/exchanges/publish', function() {
publish_msg(this.params);
return false;
});
path('#/queues', {'queues': '/queues', 'vhosts': '/vhosts', 'nodes': '/nodes'}, 'queues');
this.get('#/queues/:vhost/:name', function() {
@ -536,6 +540,18 @@ function toggle_visibility(item) {
}
}
function publish_msg(params) {
var path = fill_path_template('/exchanges/:vhost/:name/publish', params);
with_req('POST', path, JSON.stringify(params), function(resp) {
var result = jQuery.parseJSON(resp.responseText);
if (result.routed) {
show_popup('info', 'Message published.');
} else {
show_popup('warn', 'Message published, but not routed.');
}
});
}
function get_msgs(params) {
var path = fill_path_template('/queues/:vhost/:name/get', params);
with_req('POST', path, JSON.stringify(params), function(resp) {

View File

@ -94,6 +94,30 @@
</div>
</div>
<div class="section-hidden">
<h2>Publish Message</h2>
<div class="hider">
<form action="#/exchanges/publish" method="post">
<input type="hidden" name="vhost" value="<%= exchange.vhost %>"/>
<input type="hidden" name="name" value="<%= fmt_exchange_url(exchange.name) %>"/>
<input type="hidden" name="properties" value=""/>
<table class="form">
<tr>
<th><label>Routing key:</label></th>
<td><input type="text" name="routing_key" value=""/></td>
</tr>
<tr>
<th><label>Payload:</label></th>
<td><textarea name="payload"></textarea></td>
</tr>
</table>
<input type="submit" value="Publish Message" />
</form>
<div id="msg-wrapper"></div>
<span class="br"></span>
</div>
</div>
<% if (exchange.name != "") { %>
<div class="section-hidden">
<h2>Delete This Exchange</h2>

View File

@ -30,6 +30,7 @@ dispatcher() ->
{["exchanges"], rabbit_mgmt_wm_exchanges, []},
{["exchanges", vhost], rabbit_mgmt_wm_exchanges, []},
{["exchanges", vhost, exchange], rabbit_mgmt_wm_exchange, []},
{["exchanges", vhost, exchange, "publish"], rabbit_mgmt_wm_exchange_publish, []},
{["exchanges", vhost, exchange, "bindings", "source"], rabbit_mgmt_wm_bindings, [exchange_source]},
{["exchanges", vhost, exchange, "bindings", "destination"], rabbit_mgmt_wm_bindings, [exchange_destination]},
{["queues"], rabbit_mgmt_wm_queues, []},

View File

@ -20,12 +20,12 @@
-export([is_authorized_vhost/2, is_authorized/3, is_authorized_user/3]).
-export([bad_request/3, id/2, parse_bool/1, parse_int/1]).
-export([with_decode/4, with_decode_opts/4, not_found/3, amqp_request/4]).
-export([with_amqp_request/4, with_amqp_request/5]).
-export([with_channel/4, with_channel/5]).
-export([props_to_method/2]).
-export([all_or_one_vhost/2, http_to_amqp/5, reply/3, filter_vhost/3]).
-export([filter_user/3, with_decode/5, redirect/2, args/1]).
-export([reply_list/3, reply_list/4, sort_list/4, destination_type/1]).
-export([relativise/2]).
-export([relativise/2, post_respond/3]).
-include("rabbit_mgmt.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
@ -317,18 +317,18 @@ amqp_request(VHost, ReqData, Context, Method) ->
amqp_request(VHost, ReqData, Context, node(), Method).
amqp_request(VHost, ReqData, Context, Node, Method) ->
with_amqp_request(VHost, ReqData, Context, Node,
with_channel(VHost, ReqData, Context, Node,
fun (Ch) ->
amqp_channel:call(Ch, Method),
{true, ReqData, Context}
end).
with_amqp_request(VHost, ReqData, Context, Fun) ->
with_amqp_request(VHost, ReqData, Context, node(), Fun).
with_channel(VHost, ReqData, Context, Fun) ->
with_channel(VHost, ReqData, Context, node(), Fun).
with_amqp_request(VHost, ReqData,
Context = #context{ user = #user { username = Username },
password = Password }, Node, Fun) ->
with_channel(VHost, ReqData,
Context = #context{ user = #user { username = Username },
password = Password }, Node, Fun) ->
try
Params = #amqp_params{username = Username,
password = Password,
@ -404,4 +404,8 @@ relativise0(From, To) ->
relativise(From, To, Diff) ->
string:join(lists:duplicate(length(From) - Diff, "..") ++ To, "/").
post_respond(Response, ReqData, Context) ->
{JSON, _, _} = reply(Response, ReqData, Context),
{true, wrq:set_resp_header(
"content-type", "application/json",
wrq:append_to_response_body(JSON, ReqData)), Context}.

View File

@ -17,7 +17,7 @@
-export([init/1, resource_exists/2, to_json/2,
content_types_provided/2, content_types_accepted/2,
is_authorized/2, allowed_methods/2, accept_content/2,
delete_resource/2, exchange/2]).
delete_resource/2, exchange/1, exchange/2]).
-include("rabbit_mgmt.hrl").
-include_lib("webmachine/include/webmachine.hrl").

View File

@ -0,0 +1,70 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ Management Plugin.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
-module(rabbit_mgmt_wm_exchange_publish).
-export([init/1, resource_exists/2, post_is_create/2, is_authorized/2,
allowed_methods/2, process_post/2]).
-include("rabbit_mgmt.hrl").
-include_lib("webmachine/include/webmachine.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
%%--------------------------------------------------------------------
init(_Config) -> {ok, #context{}}.
allowed_methods(ReqData, Context) ->
{['POST'], ReqData, Context}.
resource_exists(ReqData, Context) ->
{case rabbit_mgmt_wm_exchange:exchange(ReqData) of
not_found -> false;
_ -> true
end, ReqData, Context}.
post_is_create(ReqData, Context) ->
{false, ReqData, Context}.
process_post(ReqData, Context) ->
VHost = rabbit_mgmt_util:vhost(ReqData),
X = rabbit_mgmt_util:id(exchange, ReqData),
rabbit_mgmt_util:with_decode(
[routing_key, properties, payload], ReqData, Context,
fun([RoutingKey, Properties, Payload]) ->
rabbit_mgmt_util:with_channel(
VHost, ReqData, Context,
fun (Ch) ->
amqp_channel:register_confirm_handler(Ch, self()),
amqp_channel:register_return_handler(Ch, self()),
amqp_channel:call(Ch, #'confirm.select'{}),
Props = rabbit_mgmt_format:to_amqp_table(Properties),
amqp_channel:cast(Ch, #'basic.publish'{
exchange = X,
routing_key = RoutingKey,
mandatory = true},
#amqp_msg{props = Props,
payload = Payload}),
Routed = receive
{#'basic.return'{}, _} -> false;
#'basic.ack'{} -> true
end,
rabbit_mgmt_util:post_respond(
[{routed, Routed}], ReqData, Context)
end)
end).
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized_vhost(ReqData, Context).
%%--------------------------------------------------------------------

View File

@ -43,13 +43,13 @@ process_post(ReqData, Context) ->
rabbit_mgmt_util:with_decode(
[requeue, count], ReqData, Context,
fun([RequeueBin, CountBin]) ->
rabbit_mgmt_util:with_amqp_request(
rabbit_mgmt_util:with_channel(
VHost, ReqData, Context,
fun (Ch) ->
NoAck = not rabbit_mgmt_util:parse_bool(RequeueBin),
Count = rabbit_mgmt_util:parse_int(CountBin),
post_respond(basic_gets(Count, Ch, Q, NoAck),
ReqData, Context)
rabbit_mgmt_util:post_respond(
basic_gets(Count, Ch, Q, NoAck), ReqData, Context)
end)
end).
@ -90,12 +90,6 @@ basic_get(Ch, Q, NoAck) ->
none
end.
post_respond(Response, ReqData, Context) ->
{JSON, _, _} = rabbit_mgmt_util:reply(Response, ReqData, Context),
{true, wrq:set_resp_header(
"content-type", "application/json",
wrq:append_to_response_body(JSON, ReqData)), Context}.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized_vhost(ReqData, Context).