Use a readable queue name, and don't have it expire immediately.

This commit is contained in:
Simon MacMullen 2011-02-11 18:26:00 +00:00
parent 2b875b4c48
commit d2ae33229c
1 changed files with 11 additions and 5 deletions

View File

@ -69,7 +69,7 @@ validate(_X) ->
create(?TX, X = #exchange{ name = Downstream, arguments = Args }) ->
{array, UpstreamURIs0} =
rabbit_misc:table_lookup(Args, <<"upstreams">>),
UpstreamURIs = [binary_to_list(U) || {longstr, U} <- UpstreamURIs0],
UpstreamURIs = [U || {longstr, U} <- UpstreamURIs0],
{longstr, Type} = rabbit_misc:table_lookup(Args, <<"type">>),
{ok, Module} = rabbit_registry:lookup_module(
exchange, rabbit_exchange:check_type(Type)),
@ -192,8 +192,8 @@ terminate(_Reason, #state { downstream_channel = DCh,
connect_upstream(UpstreamURI) ->
Props0 = uri_parser:parse(
UpstreamURI, [{host, undefined}, {path, "/"},
{port, undefined}, {'query', []}]),
binary_to_list(UpstreamURI), [{host, undefined}, {path, "/"},
{port, undefined}, {'query', []}]),
[VHostEnc, XEnc] = string:tokens(
proplists:get_value(path, Props0), "/"),
VHost = httpd_util:decode_hex(VHostEnc),
@ -203,8 +203,14 @@ connect_upstream(UpstreamURI) ->
virtual_host = list_to_binary(VHost)},
{ok, Conn} = amqp_connection:start(network, Params),
{ok, Ch} = amqp_connection:open_channel(Conn),
#'queue.declare_ok' {queue = Q} =
amqp_channel:call(Ch, #'queue.declare'{ exclusive = true}),
XBin = list_to_binary(X),
%% TODO: this should really be our own URI. And the x-expires should be
%% configurable.
Node = list_to_binary(atom_to_list(node())),
Q = <<"federation: ", XBin/binary, " -> ", Node/binary>>,
amqp_channel:call(
Ch, #'queue.declare'{
queue = Q, arguments = [{<<"x-expires">>, long, 86400000}] }),
amqp_channel:subscribe(Ch, #'basic.consume'{ queue = Q,
no_ack = true }, %% FIXME
self()),