From c265a60a5b962cfa3fd21c313ea00f9d9dc99496 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 11 Jan 2011 11:25:44 +0000 Subject: [PATCH] restructured ACK code to extract the common parts for NACK --- .../src/rabbit_stomp_processor.erl | 72 +++++++++++-------- 1 file changed, 41 insertions(+), 31 deletions(-) diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index 3b501b741e..e5586f25bc 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -188,38 +188,11 @@ handle_frame("UNSUBSCRIBE", Frame, State = #state{subscriptions = Subs}) -> handle_frame("SEND", Frame, State) -> with_destination("SEND", Frame, State, fun do_send/4); -handle_frame("ACK", Frame, State = #state{session_id = SessionId, - subscriptions = Subs}) -> - case rabbit_stomp_frame:header(Frame, "message-id") of - {ok, IdStr} -> - case rabbit_stomp_util:parse_message_id(IdStr) of - {ok, {ConsumerTag, SessionId, DeliveryTag}} -> - #subscription{channel = SubChannel, - multi_ack = IsMulti} = - dict:fetch(ConsumerTag, Subs), +handle_frame("ACK", Frame, State) -> + with_id_and_subscription("ACK", Frame, State, fun do_ack/4); - Method = #'basic.ack'{delivery_tag = DeliveryTag, - multiple = IsMulti}, - - case transactional(Frame) of - {yes, Transaction} -> - extend_transaction(Transaction, - {SubChannel, Method}, - State); - no -> - amqp_channel:call(SubChannel, Method), - ok(State) - end; - _ -> - error("Invalid message-id", - "ACK must include a valid 'message-id' header\n", - State) - end; - not_found -> - error("Missing message-id", - "ACK must include a 'message-id' header\n", - State) - end; +handle_frame("NACK", Frame, State) -> + ok; handle_frame("BEGIN", Frame, State) -> transactional_action(Frame, "BEGIN", fun begin_transaction/2, State); @@ -239,6 +212,27 @@ handle_frame(Command, _Frame, State) -> %% Internal helpers for processing frames callbacks %%---------------------------------------------------------------------------- +with_id_and_subscription(Command, Frame, + State = #state{subscriptions = Subs}, Fun) -> + case rabbit_stomp_frame:header(Frame, "message-id") of + {ok, IdStr} -> + case rabbit_stomp_util:parse_message_id(IdStr) of + {ok, MessageId = {ConsumerTag, _SessionId, _DeliveryTag}} -> + Subscription = dict:fetch(ConsumerTag, Subs), + Fun(MessageId, Subscription, Frame, State); + _ -> + error("Invalid message-id", + "~p must include a valid 'message-id' header\n", + [Command], + State) + end; + not_found -> + error("Missing message-id", + "~p must include a 'message-id' header\n", + [Command], + State) + end. + with_destination(Command, Frame, State, Fun) -> case rabbit_stomp_frame:header(Frame, "destination") of {ok, DestHdr} -> @@ -358,6 +352,22 @@ do_send(Destination, _DestHdr, ok(send_method(Method, Props, BodyFragments, State)) end. +do_ack({_ConsumerTag, _SessionId, DeliveryTag}, + #subscription{channel = SubChannel, multi_ack = IsMulti}, + Frame, State) -> + Method = #'basic.ack'{delivery_tag = DeliveryTag, + multiple = IsMulti}, + + case transactional(Frame) of + {yes, Transaction} -> + extend_transaction(Transaction, + {SubChannel, Method}, + State); + no -> + amqp_channel:call(SubChannel, Method), + ok(State) + end. + negotiate_version(Frame) -> ClientVers = re:split( rabbit_stomp_frame:header(Frame, "accept-version", "1.0"),