merge bug19250 into default

This commit is contained in:
Tony Garnock-Jones 2008-10-30 14:10:55 +00:00
commit 44295f44d7
1 changed files with 42 additions and 23 deletions

View File

@ -572,29 +572,18 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments},
_, State = #ch{ virtual_host = VHostPath }) ->
%% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges - including the one named "" !
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
case rabbit_amqqueue:add_binding(QueueName, ExchangeName,
ActualRoutingKey, Arguments) of
{error, queue_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
{error, exchange_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
{error, durability_settings_incompatible} ->
rabbit_misc:protocol_error(
not_allowed, "durability settings of ~s incompatible with ~s",
[rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
{ok, _BindingCount} ->
return_ok(State, NoWait, #'queue.bind_ok'{})
end;
arguments = Arguments}, _, State) ->
binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
@ -636,6 +625,36 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
%% including the one named "" !
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of
{error, queue_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
{error, exchange_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(QueueName)]);
{error, durability_settings_incompatible} ->
rabbit_misc:protocol_error(
not_allowed, "durability settings of ~s incompatible with ~s",
[rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
{ok, _BindingCount} ->
return_ok(State, NoWait, ReturnMethod)
end.
publish(Mandatory, Immediate, Message, QPids,
State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
Handled = deliver(QPids, Mandatory, Immediate, TxnKey,