From 7f4cf918135e668bf95b78be6e97f1f737512d3a Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 9 Nov 2010 14:12:59 +0000 Subject: [PATCH] Reworked ack handling to be inline with the 1.1 spec. Re-introduced the prefetch-count header to make ACK more useful --- .../src/rabbit_stomp_processor.erl | 40 +++++++---- deps/rabbitmq_stomp/src/rabbit_stomp_util.erl | 5 +- deps/rabbitmq_stomp/test/ack.py | 66 ++++++++++++++++--- .../test/rabbit_stomp_test_util.erl | 10 ++- 4 files changed, 96 insertions(+), 25 deletions(-) diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index a61a961554..ced348f6d2 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -41,7 +41,10 @@ -record(state, {socket, session_id, channel, connection, subscriptions, version}). +-record(subscription, {dest_hdr, channel, ack_mode, multi_ack}). + -define(SUPPORTED_VERSIONS, ["1.0", "1.1"]). +-define(DEFAULT_QUEUE_PREFETCH, 1). %%---------------------------------------------------------------------------- %% Public API @@ -156,10 +159,12 @@ handle_frame("ACK", Frame, State = #state{session_id = SessionId, {ok, IdStr} -> case rabbit_stomp_util:parse_message_id(IdStr) of {ok, {ConsumerTag, SessionId, DeliveryTag}} -> - {_DestHdr, SubChannel} = dict:fetch(ConsumerTag, Subs), + #subscription{channel = SubChannel, + multi_ack = IsMulti} = + dict:fetch(ConsumerTag, Subs), Method = #'basic.ack'{delivery_tag = DeliveryTag, - multiple = false}, + multiple = IsMulti}, case transactional(Frame) of {yes, Transaction} -> @@ -250,18 +255,22 @@ do_subscribe(Destination, DestHdr, Frame, connection = Connection, channel = MainChannel}) -> - Channel = case Destination of - {queue, _} -> + Prefetch = rabbit_stomp_frame:integer_header(Frame, "prefetch-count", + default_prefetch(Destination)), + + Channel = case Prefetch of + undefined -> + MainChannel; + _ -> {ok, Channel1} = amqp_connection:open_channel(Connection), amqp_channel:call(Channel1, #'basic.qos'{prefetch_size = 0, - prefetch_count = 1, + prefetch_count = Prefetch, global = false}), - Channel1; - _ -> MainChannel + Channel1 end, - AckMode = rabbit_stomp_util:ack_mode(Frame), + {AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame), {ok, Queue} = ensure_queue(subscribe, Destination, Channel), @@ -281,7 +290,12 @@ do_subscribe(Destination, DestHdr, Frame, {noreply, State#state{subscriptions = - dict:store(ConsumerTag, {DestHdr, Channel}, Subs)}}. + dict:store(ConsumerTag, + #subscription{dest_hdr = DestHdr, + channel = Channel, + ack_mode = AckMode, + multi_ack = IsMulti}, + Subs)}}. do_send(Destination, _DestHdr, Frame = #stomp_frame{body_iolist = BodyFragments}, @@ -325,7 +339,7 @@ send_delivery(Delivery = #'basic.deliver'{consumer_tag = ConsumerTag}, Properties, Body, State = #state{session_id = SessionId, subscriptions = Subs}) -> - {Destination, _SubChannel} = dict:fetch(ConsumerTag, Subs), + #subscription{dest_hdr = Destination} = dict:fetch(ConsumerTag, Subs), send_frame( "MESSAGE", @@ -363,7 +377,7 @@ shutdown_channel_and_connection(State = #state{channel = Channel, connection = Connection, subscriptions = Subs}) -> dict:fold( - fun(_ConsumerTag, {_DestHdr, SubChannel}, Acc) -> + fun(_ConsumerTag, #subscription{channel = SubChannel}, Acc) -> case SubChannel of Channel -> Acc; _ -> @@ -376,6 +390,10 @@ shutdown_channel_and_connection(State = #state{channel = Channel, amqp_connection:close(Connection), State#state{channel = none, connection = none}. +default_prefetch({queue, _}) -> + ?DEFAULT_QUEUE_PREFETCH; +default_prefetch(_) -> + undefined. %%---------------------------------------------------------------------------- %% Transaction Support diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl index e638afa2f3..f68fa75aea 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl @@ -65,8 +65,9 @@ consumer_tag(Frame) -> ack_mode(Frame) -> case rabbit_stomp_frame:header(Frame, "ack", "auto") of - "auto" -> auto; - "client" -> client + "auto" -> {auto, false}; + "client" -> {client, true}; + "client-individual" -> {client, false} end. message_properties(Frame = #stomp_frame{headers = Headers}) -> diff --git a/deps/rabbitmq_stomp/test/ack.py b/deps/rabbitmq_stomp/test/ack.py index 11958ae495..c165e5fb6b 100644 --- a/deps/rabbitmq_stomp/test/ack.py +++ b/deps/rabbitmq_stomp/test/ack.py @@ -9,11 +9,13 @@ class TestAck(base.BaseTest): d = "/queue/ack-test" # subscribe and send message - self.listener.reset() - self.conn.subscribe(destination=d, ack='client') - self.conn.send("test", destination=d) - self.assertTrue(self.listener.await(3), "initial message not received") - self.assertEquals(1, len(self.listener.messages)) + self.listener.reset(2) ## expecting 2 messages + self.conn.subscribe(destination=d, ack='client', + headers={'prefetch-count': '10'}) + self.conn.send("test1", destination=d) + self.conn.send("test2", destination=d) + self.assertTrue(self.listener.await(4), "initial message not received") + self.assertEquals(2, len(self.listener.messages)) # disconnect with no ack self.conn.disconnect() @@ -22,13 +24,15 @@ class TestAck(base.BaseTest): conn2 = self.create_connection() try: listener2 = base.WaitableListener() + listener2.reset(2) conn2.set_listener('', listener2) - conn2.subscribe(destination=d, ack='client') + conn2.subscribe(destination=d, ack='client', + headers={'prefetch-count': '10'}) self.assertTrue(listener2.await(), "message not received again") - self.assertEquals(1, len(listener2.messages)) + self.assertEquals(2, len(listener2.messages)) - # now ack - mid = listener2.messages[0]['headers']['message-id'] + # now ack only the last message - expecting cumulative behaviour + mid = listener2.messages[1]['headers']['message-id'] conn2.ack({'message-id':mid}) finally: conn2.stop() @@ -44,6 +48,50 @@ class TestAck(base.BaseTest): finally: conn3.stop() + def test_ack_client_individual(self): + d = "/queue/ack-test-individual" + + # subscribe and send message + self.listener.reset(2) ## expecting 2 messages + self.conn.subscribe(destination=d, ack='client-individual', + headers={'prefetch-count': '10'}) + self.conn.send("test1", destination=d) + self.conn.send("test2", destination=d) + self.assertTrue(self.listener.await(4), "initial message not received") + self.assertEquals(2, len(self.listener.messages)) + + # disconnect with no ack + self.conn.disconnect() + + # now reconnect + conn2 = self.create_connection() + try: + listener2 = base.WaitableListener() + listener2.reset(2) + conn2.set_listener('', listener2) + conn2.subscribe(destination=d, ack='client-individual', + headers={'prefetch-count': '10'}) + self.assertTrue(listener2.await(), "message not received again") + self.assertEquals(2, len(listener2.messages)) + + # now ack only the last message - expecting individual behaviour + mid = listener2.messages[1]['headers']['message-id'] + conn2.ack({'message-id':mid}) + finally: + conn2.stop() + + # now reconnect again, shouldn't see the message + conn3 = self.create_connection() + try: + listener3 = base.WaitableListener() + conn3.set_listener('', listener3) + conn3.subscribe(destination=d) + self.assertTrue(listener3.await(3), + "Expected to see a message. ACK not working?") + self.assertEquals("test1", listener3.messages[0]['message']) + finally: + conn3.stop() + def test_ack_client_tx(self): d = "/queue/ack-test-tx" diff --git a/deps/rabbitmq_stomp/test/rabbit_stomp_test_util.erl b/deps/rabbitmq_stomp/test/rabbit_stomp_test_util.erl index 06a4bde55c..dafaa9b893 100644 --- a/deps/rabbitmq_stomp/test/rabbit_stomp_test_util.erl +++ b/deps/rabbitmq_stomp/test/rabbit_stomp_test_util.erl @@ -134,15 +134,19 @@ negotiate_version_choice_duplicates_test() -> ack_mode_auto_test() -> Frame = #stomp_frame{headers = [{"ack", "auto"}]}, - auto = rabbit_stomp_util:ack_mode(Frame). + {auto, _} = rabbit_stomp_util:ack_mode(Frame). ack_mode_auto_default_test() -> Frame = #stomp_frame{headers = []}, - auto = rabbit_stomp_util:ack_mode(Frame). + {auto, _} = rabbit_stomp_util:ack_mode(Frame). ack_mode_client_test() -> Frame = #stomp_frame{headers = [{"ack", "client"}]}, - client = rabbit_stomp_util:ack_mode(Frame). + {client, true} = rabbit_stomp_util:ack_mode(Frame). + +ack_mode_client_individual_test() -> + Frame = #stomp_frame{headers = [{"ack", "client-individual"}]}, + {client, false} = rabbit_stomp_util:ack_mode(Frame). consumer_tag_id_test() -> Frame = #stomp_frame{headers = [{"id", "foo"}]},