This is my attempt at HTTP messaging. There are many like it, but this one is mine.
This commit is contained in:
parent
f8acbe1334
commit
9e0cf7e6b3
|
|
@ -195,6 +195,10 @@ function fmt_idle_long(obj) {
|
|||
}
|
||||
}
|
||||
|
||||
function fmt_escape_html(txt) {
|
||||
return txt.replace(/</g, '<').replace(/>/g, '>');
|
||||
}
|
||||
|
||||
function alt_rows(i) {
|
||||
return (i % 2 == 0) ? ' class="alt1"' : ' class="alt2"';
|
||||
}
|
||||
|
|
|
|||
|
|
@ -167,7 +167,12 @@ function dispatcher() {
|
|||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
this.post('#/queues/get', function() {
|
||||
var path = fill_path_template('/queues/:vhost/:name/get',
|
||||
this.params);
|
||||
get_msg(path);
|
||||
return false;
|
||||
});
|
||||
this.post('#/bindings', function() {
|
||||
if (sync_post(this, '/bindings/:vhost/e/:source/:destination_type/:destination'))
|
||||
update();
|
||||
|
|
@ -533,16 +538,30 @@ function toggle_visibility(item) {
|
|||
}
|
||||
}
|
||||
|
||||
function get_msg(path) {
|
||||
with_req('POST', path, function(resp) {
|
||||
var msg = jQuery.parseJSON(resp.responseText);
|
||||
$('#msg-wrapper').slideUp(200);
|
||||
replace_content('msg-wrapper', format('get-ok', {'msg': msg}));
|
||||
$('#msg-wrapper').slideDown(200);
|
||||
}, function(resp) {
|
||||
show_popup('info', 'Queue is empty');
|
||||
});
|
||||
}
|
||||
|
||||
function with_reqs(reqs, acc, fun) {
|
||||
if (keys(reqs).length > 0) {
|
||||
var key = keys(reqs)[0];
|
||||
with_req('../api' + reqs[key], function(resp) {
|
||||
with_req('GET', reqs[key], function(resp) {
|
||||
acc[key] = jQuery.parseJSON(resp.responseText);
|
||||
var remainder = {};
|
||||
for (var k in reqs) {
|
||||
if (k != key) remainder[k] = reqs[k];
|
||||
}
|
||||
with_reqs(remainder, acc, fun);
|
||||
}, function(resp) {
|
||||
var html = format('404', {});
|
||||
replace_content('main', html);
|
||||
});
|
||||
}
|
||||
else {
|
||||
|
|
@ -583,10 +602,10 @@ function update_status(status) {
|
|||
replace_content('status', html);
|
||||
}
|
||||
|
||||
function with_req(path, fun) {
|
||||
function with_req(method, path, fun, not_found_fun) {
|
||||
var json;
|
||||
var req = xmlHttpRequest();
|
||||
req.open( "GET", path, true );
|
||||
req.open(method, '../api' + path, true );
|
||||
req.onreadystatechange = function () {
|
||||
if (req.readyState == 4) {
|
||||
if (req.status == 200) {
|
||||
|
|
@ -603,8 +622,7 @@ function with_req(path, fun) {
|
|||
update_status('error');
|
||||
}
|
||||
else if (req.status == 404) {
|
||||
var html = format('404', {});
|
||||
replace_content('main', html);
|
||||
not_found_fun(req);
|
||||
}
|
||||
else {
|
||||
debug("Got response code " + req.status);
|
||||
|
|
|
|||
|
|
@ -123,6 +123,23 @@
|
|||
|
||||
<%= format('add-binding', {'mode': 'queue', 'parent': queue}) %>
|
||||
|
||||
<div class="section-hidden">
|
||||
<h2>Get Messages</h2>
|
||||
<div class="hider">
|
||||
<p>
|
||||
Warning: getting a message from a queue is a destructive action.
|
||||
Clicking "Get Message" will consume a message from the queue.
|
||||
</p>
|
||||
<form action="#/queues/get" method="post">
|
||||
<input type="hidden" name="vhost" value="<%= queue.vhost %>"/>
|
||||
<input type="hidden" name="name" value="<%= queue.name %>"/>
|
||||
<input type="submit" value="Get Message" />
|
||||
</form>
|
||||
<div id="msg-wrapper"></div>
|
||||
<span class="br"></span>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="section-hidden">
|
||||
<h2>Delete / Purge</h2>
|
||||
<div class="hider">
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ dispatcher() ->
|
|||
{["queues", vhost, queue], rabbit_mgmt_wm_queue, []},
|
||||
{["queues", vhost, destination, "bindings"], rabbit_mgmt_wm_bindings, [queue]},
|
||||
{["queues", vhost, queue, "contents"], rabbit_mgmt_wm_queue_purge, []},
|
||||
{["queues", vhost, queue, "get"], rabbit_mgmt_wm_queue_get, []},
|
||||
{["bindings"], rabbit_mgmt_wm_bindings, [all]},
|
||||
{["bindings", vhost], rabbit_mgmt_wm_bindings, [all]},
|
||||
{["bindings", vhost, "e", source, dtype, destination], rabbit_mgmt_wm_bindings, [source_destination]},
|
||||
|
|
|
|||
|
|
@ -20,9 +20,10 @@
|
|||
-export([node_and_pid/1, protocol/1, resource/1, permissions/1, queue/1]).
|
||||
-export([exchange/1, user/1, internal_user/1, binding/1, url/2]).
|
||||
-export([pack_binding_props/2, unpack_binding_props/1, tokenise/1]).
|
||||
-export([to_amqp_table/1, listener/1, properties/1]).
|
||||
-export([to_amqp_table/1, listener/1, properties/1, basic_properties/1]).
|
||||
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
-include_lib("rabbit_common/include/rabbit_framing.hrl").
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
|
|
@ -70,9 +71,10 @@ properties(unknown) -> unknown;
|
|||
properties(Table) -> {struct, [{Name, tuple(Value)} ||
|
||||
{Name, Value} <- Table]}.
|
||||
|
||||
amqp_table(unknown) -> unknown;
|
||||
amqp_table(Table) -> {struct, [{Name, amqp_value(Type, Value)} ||
|
||||
{Name, Type, Value} <- Table]}.
|
||||
amqp_table(unknown) -> unknown;
|
||||
amqp_table(undefined) -> amqp_table([]);
|
||||
amqp_table(Table) -> {struct, [{Name, amqp_value(Type, Value)} ||
|
||||
{Name, Type, Value} <- Table]}.
|
||||
|
||||
amqp_value(array, Val) -> [amqp_value(T, V) || {T, V} <- Val];
|
||||
amqp_value(table, Val) -> amqp_table(Val);
|
||||
|
|
@ -247,3 +249,15 @@ binding(#binding{source = S,
|
|||
{properties_key, pack_binding_props(Key, Args)}],
|
||||
[{fun (Res) -> resource(source, Res) end, [source]},
|
||||
{fun amqp_table/1, [arguments]}]).
|
||||
|
||||
basic_properties(Props = #'P_basic'{}) ->
|
||||
{Res, _Idx} = lists:foldl(fun (K, {L, Idx}) ->
|
||||
V = element(Idx, Props),
|
||||
NewL = case V of
|
||||
undefined -> L;
|
||||
_ -> [{K, V}|L]
|
||||
end,
|
||||
{NewL, Idx + 1}
|
||||
end, {[], 2},
|
||||
record_info(fields, 'P_basic')),
|
||||
format(Res, [{fun amqp_table/1, [headers]}]).
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@
|
|||
-export([is_authorized_vhost/2, is_authorized/3, is_authorized_user/3]).
|
||||
-export([bad_request/3, id/2, parse_bool/1]).
|
||||
-export([with_decode/4, with_decode_opts/4, not_found/3, amqp_request/4]).
|
||||
-export([with_amqp_request/4, with_amqp_request/5]).
|
||||
-export([props_to_method/2]).
|
||||
-export([all_or_one_vhost/2, http_to_amqp/5, reply/3, filter_vhost/3]).
|
||||
-export([filter_user/3, with_decode/5, redirect/2, args/1]).
|
||||
|
|
@ -304,9 +305,19 @@ parse_bool(V) -> throw({error, {not_boolean, V}}).
|
|||
amqp_request(VHost, ReqData, Context, Method) ->
|
||||
amqp_request(VHost, ReqData, Context, node(), Method).
|
||||
|
||||
amqp_request(VHost, ReqData,
|
||||
Context = #context{ user = #user { username = Username },
|
||||
password = Password }, Node, Method) ->
|
||||
amqp_request(VHost, ReqData, Context, Node, Method) ->
|
||||
with_amqp_request(VHost, ReqData, Context, Node,
|
||||
fun (Ch) ->
|
||||
amqp_channel:call(Ch, Method),
|
||||
{true, ReqData, Context}
|
||||
end).
|
||||
|
||||
with_amqp_request(VHost, ReqData, Context, Fun) ->
|
||||
with_amqp_request(VHost, ReqData, Context, node(), Fun).
|
||||
|
||||
with_amqp_request(VHost, ReqData,
|
||||
Context = #context{ user = #user { username = Username },
|
||||
password = Password }, Node, Fun) ->
|
||||
try
|
||||
Params = #amqp_params{username = Username,
|
||||
password = Password,
|
||||
|
|
@ -315,10 +326,10 @@ amqp_request(VHost, ReqData,
|
|||
case amqp_connection:start(direct, Params) of
|
||||
{ok, Conn} ->
|
||||
{ok, Ch} = amqp_connection:open_channel(Conn),
|
||||
amqp_channel:call(Ch, Method),
|
||||
Res = Fun(Ch),
|
||||
amqp_channel:close(Ch),
|
||||
amqp_connection:close(Conn),
|
||||
{true, ReqData, Context};
|
||||
Res;
|
||||
{error, auth_failure} ->
|
||||
not_authorised(<<"">>, ReqData, Context);
|
||||
{error, {nodedown, N}} ->
|
||||
|
|
|
|||
|
|
@ -701,6 +701,32 @@ sorting_test() ->
|
|||
http_delete("/vhosts/vh1", ?NO_CONTENT),
|
||||
ok.
|
||||
|
||||
get_test() ->
|
||||
%% Real world example...
|
||||
Headers = [{<<"x-forwarding">>, array,
|
||||
[{table,
|
||||
[{<<"uri">>, longstr,
|
||||
<<"amqp://localhost/%2f/upstream">>}]}]}],
|
||||
http_put("/queues/%2f/myqueue", [], ?NO_CONTENT),
|
||||
{ok, Conn} = amqp_connection:start(network),
|
||||
{ok, Ch} = amqp_connection:open_channel(Conn),
|
||||
amqp_channel:cast(Ch, #'basic.publish'{exchange = <<>>,
|
||||
routing_key = <<"myqueue">>},
|
||||
#amqp_msg{props = #'P_basic'{headers = Headers},
|
||||
payload = <<"Hello world">>}),
|
||||
amqp_connection:close(Conn),
|
||||
Msg = http_post("/queues/%2f/myqueue/get", [], ?OK),
|
||||
|
||||
false = pget(redelivered, Msg),
|
||||
<<>> = pget(exchange, Msg),
|
||||
<<"myqueue">> = pget(routing_key, Msg),
|
||||
<<"Hello world">> = pget(payload, Msg),
|
||||
[{'x-forwarding',
|
||||
[[{uri,<<"amqp://localhost/%2f/upstream">>}]]}] =
|
||||
pget(headers, pget(properties, Msg)),
|
||||
http_post("/queues/%2f/myqueue/get", [], ?NOT_FOUND),
|
||||
ok.
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
http_get(Path) ->
|
||||
http_get(Path, ?OK).
|
||||
|
|
|
|||
Loading…
Reference in New Issue