This commit is contained in:
Emile Joubert 2011-12-01 12:51:02 +00:00
parent 4f304d6f8a
commit bada7600e5
2 changed files with 14 additions and 14 deletions

View File

@ -357,8 +357,8 @@ cancel_subscription({ok, ConsumerTag, Description}, Frame,
maybe_delete_durable_sub(DestHdr, Frame, State = #state{channel = Channel}) -> maybe_delete_durable_sub(DestHdr, Frame, State = #state{channel = Channel}) ->
case rabbit_stomp_util:parse_destination(DestHdr) of case rabbit_stomp_util:parse_destination(DestHdr) of
{ok, {topic, Name}} -> {ok, {topic, Name}} ->
case rabbit_stomp_frame:boolean_header case rabbit_stomp_frame:boolean_header(Frame,
(Frame, ?HEADER_PERSISTENT, false) of ?HEADER_PERSISTENT, false) of
true -> true ->
{ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID), {ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
QName = QName =
@ -442,8 +442,8 @@ do_login(Username0, Password0, VirtualHost0, Heartbeat, AdapterInfo,
ensure_heartbeats(Heartbeat, State), ensure_heartbeats(Heartbeat, State),
ok("CONNECTED", ok("CONNECTED",
[{?HEADER_SESSION, SessionId}, [{?HEADER_SESSION, SessionId},
{?HEADER_HEART_BEAT, io_lib:format("~B,~B", {?HEADER_HEART_BEAT,
[SendTimeout, ReceiveTimeout])}, io_lib:format("~B,~B", [SendTimeout, ReceiveTimeout])},
{?HEADER_VERSION, Version}], {?HEADER_VERSION, Version}],
"", "",
State1#state{session_id = SessionId, State1#state{session_id = SessionId,
@ -589,11 +589,9 @@ create_nack_method(DeliveryTag, #subscription{multi_ack = IsMulti}) ->
multiple = IsMulti}. multiple = IsMulti}.
negotiate_version(Frame) -> negotiate_version(Frame) ->
ClientVers = re:split( ClientVers = re:split(rabbit_stomp_frame:header(
rabbit_stomp_frame:header(Frame, ?HEADER_ACCEPT_VERSION, Frame, ?HEADER_ACCEPT_VERSION, "1.0"),
"1.0"), ",", [{return, list}]),
",",
[{return, list}]),
rabbit_stomp_util:negotiate_version(ClientVers, ?SUPPORTED_VERSIONS). rabbit_stomp_util:negotiate_version(ClientVers, ?SUPPORTED_VERSIONS).
@ -654,8 +652,9 @@ ensure_reply_to(Frame = #stomp_frame{headers = Headers}, State) ->
{ReplyQueue, State1} = {ReplyQueue, State1} =
ensure_reply_queue(TempQueueId, State), ensure_reply_queue(TempQueueId, State),
{Frame#stomp_frame{ {Frame#stomp_frame{
headers = lists:keyreplace(?HEADER_REPLY_TO, 1, Headers, headers = lists:keyreplace(
{?HEADER_REPLY_TO, ReplyQueue})}, ?HEADER_REPLY_TO, 1, Headers,
{?HEADER_REPLY_TO, ReplyQueue})},
State1}; State1};
_ -> _ ->
{Frame, State} {Frame, State}
@ -894,8 +893,8 @@ ensure_queue(subscribe, {topic, Name}, Frame, Channel) ->
%% subscriptions. Durable subscriptions get shared, named, durable %% subscriptions. Durable subscriptions get shared, named, durable
%% queues. %% queues.
Method = Method =
case rabbit_stomp_frame:boolean_header case rabbit_stomp_frame:boolean_header(Frame,
(Frame, ?HEADER_PERSISTENT, false) of ?HEADER_PERSISTENT, false) of
true -> true ->
{ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID), {ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
QName = rabbit_stomp_util:durable_subscription_queue(Name, Id), QName = rabbit_stomp_util:durable_subscription_queue(Name, Id),

View File

@ -55,7 +55,8 @@ consumer_tag(Frame) ->
not_found -> not_found ->
case rabbit_stomp_frame:header(Frame, ?HEADER_DESTINATION) of case rabbit_stomp_frame:header(Frame, ?HEADER_DESTINATION) of
{ok, DestHdr} -> {ok, DestHdr} ->
{ok, list_to_binary("Q_" ++ DestHdr), "destination='" ++ DestHdr ++ "'"}; {ok, list_to_binary("Q_" ++ DestHdr),
"destination='" ++ DestHdr ++ "'"};
not_found -> not_found ->
{error, missing_destination_header} {error, missing_destination_header}
end end