restructured ACK code to extract the common parts for NACK

This commit is contained in:
Rob Harrop 2011-01-11 11:25:44 +00:00
parent 2a9487d4fa
commit c265a60a5b
1 changed files with 41 additions and 31 deletions

View File

@ -188,38 +188,11 @@ handle_frame("UNSUBSCRIBE", Frame, State = #state{subscriptions = Subs}) ->
handle_frame("SEND", Frame, State) ->
with_destination("SEND", Frame, State, fun do_send/4);
handle_frame("ACK", Frame, State = #state{session_id = SessionId,
subscriptions = Subs}) ->
case rabbit_stomp_frame:header(Frame, "message-id") of
{ok, IdStr} ->
case rabbit_stomp_util:parse_message_id(IdStr) of
{ok, {ConsumerTag, SessionId, DeliveryTag}} ->
#subscription{channel = SubChannel,
multi_ack = IsMulti} =
dict:fetch(ConsumerTag, Subs),
handle_frame("ACK", Frame, State) ->
with_id_and_subscription("ACK", Frame, State, fun do_ack/4);
Method = #'basic.ack'{delivery_tag = DeliveryTag,
multiple = IsMulti},
case transactional(Frame) of
{yes, Transaction} ->
extend_transaction(Transaction,
{SubChannel, Method},
State);
no ->
amqp_channel:call(SubChannel, Method),
ok(State)
end;
_ ->
error("Invalid message-id",
"ACK must include a valid 'message-id' header\n",
State)
end;
not_found ->
error("Missing message-id",
"ACK must include a 'message-id' header\n",
State)
end;
handle_frame("NACK", Frame, State) ->
ok;
handle_frame("BEGIN", Frame, State) ->
transactional_action(Frame, "BEGIN", fun begin_transaction/2, State);
@ -239,6 +212,27 @@ handle_frame(Command, _Frame, State) ->
%% Internal helpers for processing frames callbacks
%%----------------------------------------------------------------------------
with_id_and_subscription(Command, Frame,
State = #state{subscriptions = Subs}, Fun) ->
case rabbit_stomp_frame:header(Frame, "message-id") of
{ok, IdStr} ->
case rabbit_stomp_util:parse_message_id(IdStr) of
{ok, MessageId = {ConsumerTag, _SessionId, _DeliveryTag}} ->
Subscription = dict:fetch(ConsumerTag, Subs),
Fun(MessageId, Subscription, Frame, State);
_ ->
error("Invalid message-id",
"~p must include a valid 'message-id' header\n",
[Command],
State)
end;
not_found ->
error("Missing message-id",
"~p must include a 'message-id' header\n",
[Command],
State)
end.
with_destination(Command, Frame, State, Fun) ->
case rabbit_stomp_frame:header(Frame, "destination") of
{ok, DestHdr} ->
@ -358,6 +352,22 @@ do_send(Destination, _DestHdr,
ok(send_method(Method, Props, BodyFragments, State))
end.
do_ack({_ConsumerTag, _SessionId, DeliveryTag},
#subscription{channel = SubChannel, multi_ack = IsMulti},
Frame, State) ->
Method = #'basic.ack'{delivery_tag = DeliveryTag,
multiple = IsMulti},
case transactional(Frame) of
{yes, Transaction} ->
extend_transaction(Transaction,
{SubChannel, Method},
State);
no ->
amqp_channel:call(SubChannel, Method),
ok(State)
end.
negotiate_version(Frame) ->
ClientVers = re:split(
rabbit_stomp_frame:header(Frame, "accept-version", "1.0"),