Reworked ack handling to be inline with the 1.1 spec. Re-introduced the prefetch-count header to make ACK more useful

This commit is contained in:
Rob Harrop 2010-11-09 14:12:59 +00:00
parent 043a1de112
commit 7f4cf91813
4 changed files with 96 additions and 25 deletions

View File

@ -41,7 +41,10 @@
-record(state, {socket, session_id, channel, connection, subscriptions,
version}).
-record(subscription, {dest_hdr, channel, ack_mode, multi_ack}).
-define(SUPPORTED_VERSIONS, ["1.0", "1.1"]).
-define(DEFAULT_QUEUE_PREFETCH, 1).
%%----------------------------------------------------------------------------
%% Public API
@ -156,10 +159,12 @@ handle_frame("ACK", Frame, State = #state{session_id = SessionId,
{ok, IdStr} ->
case rabbit_stomp_util:parse_message_id(IdStr) of
{ok, {ConsumerTag, SessionId, DeliveryTag}} ->
{_DestHdr, SubChannel} = dict:fetch(ConsumerTag, Subs),
#subscription{channel = SubChannel,
multi_ack = IsMulti} =
dict:fetch(ConsumerTag, Subs),
Method = #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false},
multiple = IsMulti},
case transactional(Frame) of
{yes, Transaction} ->
@ -250,18 +255,22 @@ do_subscribe(Destination, DestHdr, Frame,
connection = Connection,
channel = MainChannel}) ->
Channel = case Destination of
{queue, _} ->
Prefetch = rabbit_stomp_frame:integer_header(Frame, "prefetch-count",
default_prefetch(Destination)),
Channel = case Prefetch of
undefined ->
MainChannel;
_ ->
{ok, Channel1} = amqp_connection:open_channel(Connection),
amqp_channel:call(Channel1,
#'basic.qos'{prefetch_size = 0,
prefetch_count = 1,
prefetch_count = Prefetch,
global = false}),
Channel1;
_ -> MainChannel
Channel1
end,
AckMode = rabbit_stomp_util:ack_mode(Frame),
{AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame),
{ok, Queue} = ensure_queue(subscribe, Destination, Channel),
@ -281,7 +290,12 @@ do_subscribe(Destination, DestHdr, Frame,
{noreply,
State#state{subscriptions =
dict:store(ConsumerTag, {DestHdr, Channel}, Subs)}}.
dict:store(ConsumerTag,
#subscription{dest_hdr = DestHdr,
channel = Channel,
ack_mode = AckMode,
multi_ack = IsMulti},
Subs)}}.
do_send(Destination, _DestHdr,
Frame = #stomp_frame{body_iolist = BodyFragments},
@ -325,7 +339,7 @@ send_delivery(Delivery = #'basic.deliver'{consumer_tag = ConsumerTag},
Properties, Body,
State = #state{session_id = SessionId,
subscriptions = Subs}) ->
{Destination, _SubChannel} = dict:fetch(ConsumerTag, Subs),
#subscription{dest_hdr = Destination} = dict:fetch(ConsumerTag, Subs),
send_frame(
"MESSAGE",
@ -363,7 +377,7 @@ shutdown_channel_and_connection(State = #state{channel = Channel,
connection = Connection,
subscriptions = Subs}) ->
dict:fold(
fun(_ConsumerTag, {_DestHdr, SubChannel}, Acc) ->
fun(_ConsumerTag, #subscription{channel = SubChannel}, Acc) ->
case SubChannel of
Channel -> Acc;
_ ->
@ -376,6 +390,10 @@ shutdown_channel_and_connection(State = #state{channel = Channel,
amqp_connection:close(Connection),
State#state{channel = none, connection = none}.
default_prefetch({queue, _}) ->
?DEFAULT_QUEUE_PREFETCH;
default_prefetch(_) ->
undefined.
%%----------------------------------------------------------------------------
%% Transaction Support

View File

@ -65,8 +65,9 @@ consumer_tag(Frame) ->
ack_mode(Frame) ->
case rabbit_stomp_frame:header(Frame, "ack", "auto") of
"auto" -> auto;
"client" -> client
"auto" -> {auto, false};
"client" -> {client, true};
"client-individual" -> {client, false}
end.
message_properties(Frame = #stomp_frame{headers = Headers}) ->

View File

@ -9,11 +9,13 @@ class TestAck(base.BaseTest):
d = "/queue/ack-test"
# subscribe and send message
self.listener.reset()
self.conn.subscribe(destination=d, ack='client')
self.conn.send("test", destination=d)
self.assertTrue(self.listener.await(3), "initial message not received")
self.assertEquals(1, len(self.listener.messages))
self.listener.reset(2) ## expecting 2 messages
self.conn.subscribe(destination=d, ack='client',
headers={'prefetch-count': '10'})
self.conn.send("test1", destination=d)
self.conn.send("test2", destination=d)
self.assertTrue(self.listener.await(4), "initial message not received")
self.assertEquals(2, len(self.listener.messages))
# disconnect with no ack
self.conn.disconnect()
@ -22,13 +24,15 @@ class TestAck(base.BaseTest):
conn2 = self.create_connection()
try:
listener2 = base.WaitableListener()
listener2.reset(2)
conn2.set_listener('', listener2)
conn2.subscribe(destination=d, ack='client')
conn2.subscribe(destination=d, ack='client',
headers={'prefetch-count': '10'})
self.assertTrue(listener2.await(), "message not received again")
self.assertEquals(1, len(listener2.messages))
self.assertEquals(2, len(listener2.messages))
# now ack
mid = listener2.messages[0]['headers']['message-id']
# now ack only the last message - expecting cumulative behaviour
mid = listener2.messages[1]['headers']['message-id']
conn2.ack({'message-id':mid})
finally:
conn2.stop()
@ -44,6 +48,50 @@ class TestAck(base.BaseTest):
finally:
conn3.stop()
def test_ack_client_individual(self):
d = "/queue/ack-test-individual"
# subscribe and send message
self.listener.reset(2) ## expecting 2 messages
self.conn.subscribe(destination=d, ack='client-individual',
headers={'prefetch-count': '10'})
self.conn.send("test1", destination=d)
self.conn.send("test2", destination=d)
self.assertTrue(self.listener.await(4), "initial message not received")
self.assertEquals(2, len(self.listener.messages))
# disconnect with no ack
self.conn.disconnect()
# now reconnect
conn2 = self.create_connection()
try:
listener2 = base.WaitableListener()
listener2.reset(2)
conn2.set_listener('', listener2)
conn2.subscribe(destination=d, ack='client-individual',
headers={'prefetch-count': '10'})
self.assertTrue(listener2.await(), "message not received again")
self.assertEquals(2, len(listener2.messages))
# now ack only the last message - expecting individual behaviour
mid = listener2.messages[1]['headers']['message-id']
conn2.ack({'message-id':mid})
finally:
conn2.stop()
# now reconnect again, shouldn't see the message
conn3 = self.create_connection()
try:
listener3 = base.WaitableListener()
conn3.set_listener('', listener3)
conn3.subscribe(destination=d)
self.assertTrue(listener3.await(3),
"Expected to see a message. ACK not working?")
self.assertEquals("test1", listener3.messages[0]['message'])
finally:
conn3.stop()
def test_ack_client_tx(self):
d = "/queue/ack-test-tx"

View File

@ -134,15 +134,19 @@ negotiate_version_choice_duplicates_test() ->
ack_mode_auto_test() ->
Frame = #stomp_frame{headers = [{"ack", "auto"}]},
auto = rabbit_stomp_util:ack_mode(Frame).
{auto, _} = rabbit_stomp_util:ack_mode(Frame).
ack_mode_auto_default_test() ->
Frame = #stomp_frame{headers = []},
auto = rabbit_stomp_util:ack_mode(Frame).
{auto, _} = rabbit_stomp_util:ack_mode(Frame).
ack_mode_client_test() ->
Frame = #stomp_frame{headers = [{"ack", "client"}]},
client = rabbit_stomp_util:ack_mode(Frame).
{client, true} = rabbit_stomp_util:ack_mode(Frame).
ack_mode_client_individual_test() ->
Frame = #stomp_frame{headers = [{"ack", "client-individual"}]},
{client, false} = rabbit_stomp_util:ack_mode(Frame).
consumer_tag_id_test() ->
Frame = #stomp_frame{headers = [{"id", "foo"}]},