Merge bug25466

This commit is contained in:
Simon MacMullen 2013-03-07 15:04:19 +00:00
commit 412e594fa0
2 changed files with 79 additions and 52 deletions

View File

@ -443,7 +443,17 @@ with_destination(Command, Frame, State, Fun) ->
{ok, DestHdr} -> {ok, DestHdr} ->
case rabbit_routing_util:parse_endpoint(DestHdr) of case rabbit_routing_util:parse_endpoint(DestHdr) of
{ok, Destination} -> {ok, Destination} ->
Fun(Destination, DestHdr, Frame, State); case Fun(Destination, DestHdr, Frame, State) of
{error, invalid_endpoint} ->
error("Invalid destination",
"'~s' is not a valid destination for '~s'~n",
[DestHdr, Command],
State);
{error, Reason} ->
throw(Reason);
Result ->
Result
end;
{error, {invalid_destination, Type, Content}} -> {error, {invalid_destination, Type, Content}} ->
error("Invalid destination", error("Invalid destination",
"'~s' is not a valid ~p destination~n", "'~s' is not a valid ~p destination~n",
@ -540,66 +550,75 @@ do_subscribe(Destination, DestHdr, Frame,
global = false}), global = false}),
Channel1 Channel1
end, end,
{AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame), {AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame),
case ensure_endpoint(source, Destination, Frame, Channel, RouteState) of
{ok, Queue, RouteState1} = {ok, Queue, RouteState1} ->
ensure_endpoint(source, Destination, Frame, Channel, RouteState), {ok, ConsumerTag, Description} =
rabbit_stomp_util:consumer_tag(Frame),
{ok, ConsumerTag, Description} = rabbit_stomp_util:consumer_tag(Frame), amqp_channel:subscribe(Channel,
#'basic.consume'{
amqp_channel:subscribe(Channel, queue = Queue,
#'basic.consume'{ consumer_tag = ConsumerTag,
queue = Queue, no_local = false,
consumer_tag = ConsumerTag, no_ack = (AckMode == auto),
no_local = false, exclusive = false},
no_ack = (AckMode == auto), self()),
exclusive = false}, ExchangeAndKey = rabbit_routing_util:parse_routing(Destination),
self()), ok = rabbit_routing_util:ensure_binding(
ExchangeAndKey = rabbit_routing_util:parse_routing(Destination), Queue, ExchangeAndKey, Channel),
ok = rabbit_routing_util:ensure_binding(Queue, ExchangeAndKey, Channel), ok(State#state{subscriptions =
dict:store(
ok(State#state{subscriptions = ConsumerTag,
dict:store(ConsumerTag, #subscription{dest_hdr = DestHdr,
#subscription{dest_hdr = DestHdr, channel = Channel,
channel = Channel, ack_mode = AckMode,
ack_mode = AckMode, multi_ack = IsMulti,
multi_ack = IsMulti, description = Description},
description = Description}, Subs),
Subs), route_state = RouteState1});
route_state = RouteState1}). {error, _} = Err ->
Err
end.
do_send(Destination, _DestHdr, do_send(Destination, _DestHdr,
Frame = #stomp_frame{body_iolist = BodyFragments}, Frame = #stomp_frame{body_iolist = BodyFragments},
State = #state{channel = Channel, route_state = RouteState}) -> State = #state{channel = Channel, route_state = RouteState}) ->
{ok, _Q, RouteState1} = ensure_endpoint(dest, Destination, Frame, Channel,
RouteState),
{Frame1, State1} = case ensure_endpoint(dest, Destination, Frame, Channel, RouteState) of
ensure_reply_to(Frame, State#state{route_state = RouteState1}),
Props = rabbit_stomp_util:message_properties(Frame1), {ok, _Q, RouteState1} ->
{Exchange, RoutingKey} = {Frame1, State1} =
rabbit_routing_util:parse_routing(Destination), ensure_reply_to(Frame, State#state{route_state = RouteState1}),
Method = #'basic.publish'{ Props = rabbit_stomp_util:message_properties(Frame1),
exchange = list_to_binary(Exchange),
routing_key = list_to_binary(RoutingKey),
mandatory = false,
immediate = false},
case transactional(Frame1) of {Exchange, RoutingKey} =
{yes, Transaction} -> rabbit_routing_util:parse_routing(Destination),
extend_transaction(Transaction,
fun(StateN) -> Method = #'basic.publish'{
maybe_record_receipt(Frame1, StateN) exchange = list_to_binary(Exchange),
end, routing_key = list_to_binary(RoutingKey),
{Method, Props, BodyFragments}, mandatory = false,
State1); immediate = false},
no ->
ok(send_method(Method, Props, BodyFragments, case transactional(Frame1) of
maybe_record_receipt(Frame1, State1))) {yes, Transaction} ->
extend_transaction(
Transaction,
fun(StateN) ->
maybe_record_receipt(Frame1, StateN)
end,
{Method, Props, BodyFragments},
State1);
no ->
ok(send_method(Method, Props, BodyFragments,
maybe_record_receipt(Frame1, State1)))
end;
{error, _} = Err ->
Err
end. end.
create_ack_method(DeliveryTag, #subscription{multi_ack = IsMulti}) -> create_ack_method(DeliveryTag, #subscription{multi_ack = IsMulti}) ->

View File

@ -35,7 +35,8 @@ all_tests() ->
fun test_subscribe_ack/3, fun test_subscribe_ack/3,
fun test_send/3, fun test_send/3,
fun test_delete_queue_subscribe/3, fun test_delete_queue_subscribe/3,
fun test_temp_destination_queue/3]] fun test_temp_destination_queue/3,
fun test_temp_destination_in_send/3]]
|| Version <- ?SUPPORTED_VERSIONS], || Version <- ?SUPPORTED_VERSIONS],
ok. ok.
@ -200,6 +201,13 @@ test_temp_destination_queue(Channel, Client, _Version) ->
{ok, _Client1, _, [<<"pong">>]} = stomp_receive(Client, "MESSAGE"), {ok, _Client1, _, [<<"pong">>]} = stomp_receive(Client, "MESSAGE"),
ok. ok.
test_temp_destination_in_send(Channel, Client, _Version) ->
rabbit_stomp_client:send( Client, "SEND", [{"destination", "/temp-queue/foo"}],
["poing"]),
{ok, _Client1, Hdrs, _} = stomp_receive(Client, "ERROR"),
"Invalid destination" = proplists:get_value("message", Hdrs),
ok.
stomp_receive(Client, Command) -> stomp_receive(Client, Command) ->
{#stomp_frame{command = Command, {#stomp_frame{command = Command,
headers = Hdrs, headers = Hdrs,