diff --git a/deps/rabbitmq_stomp/deps/stomppy/rabbit.patch b/deps/rabbitmq_stomp/deps/stomppy/rabbit.patch index 838b7667df..d2f9ffd857 100644 --- a/deps/rabbitmq_stomp/deps/stomppy/rabbit.patch +++ b/deps/rabbitmq_stomp/deps/stomppy/rabbit.patch @@ -1,6 +1,6 @@ diff -r 16a4000624a7 stomp/connect.py --- a/stomp/connect.py Sun May 02 18:15:34 2010 +0100 -+++ b/stomp/connect.py Wed Jun 08 13:20:12 2011 +0100 ++++ b/stomp/connect.py Mon Aug 01 17:25:05 2011 +0100 @@ -88,7 +88,10 @@ ssl_key_file = None, ssl_cert_file = None, @@ -57,15 +57,34 @@ diff -r 16a4000624a7 stomp/connect.py if self.__socket is not None: if self.__ssl: # -@@ -403,7 +429,6 @@ +@@ -390,20 +416,20 @@ + # + try: + self.__socket = self.__socket.unwrap() +- except Exception: ++ except Exception as e: + # + # unwrap seems flaky on Win with the backported ssl mod, so catch any exception and log it + # +- _, e, _ = sys.exc_info() +- log.warn(e) ++ log.warning("socket unwrap() threw exception: %s" % e) + elif hasattr(socket, 'SHUT_RDWR'): + self.__socket.shutdown(socket.SHUT_RDWR) # - if self.__socket is not None: +- # split this into a separate check, because sometimes the socket is nulled between shutdown and this call ++ # caution, because sometimes the socket is nulled between shutdown and this call + # +- if self.__socket is not None: ++ try: self.__socket.close() - self.__current_host_and_port = None ++ except Exception as e: ++ log.warning("socket close() threw exception: %s" % e) def __convert_dict(self, payload): """ -@@ -449,6 +474,9 @@ +@@ -449,6 +475,9 @@ raise KeyError("Command %s requires header %r" % (command, required_header_key)) self.__send_frame(command, headers, payload) @@ -75,7 +94,7 @@ diff -r 16a4000624a7 stomp/connect.py def __send_frame(self, command, headers={}, payload=''): """ Send a STOMP frame. -@@ -680,4 +708,4 @@ +@@ -680,4 +709,4 @@ sleep_exp += 1 if not self.__socket: diff --git a/deps/rabbitmq_stomp/package.mk b/deps/rabbitmq_stomp/package.mk index 457f3793a6..96b3e37bb2 100644 --- a/deps/rabbitmq_stomp/package.mk +++ b/deps/rabbitmq_stomp/package.mk @@ -2,6 +2,7 @@ RELEASABLE:=true DEPS:=rabbitmq-server rabbitmq-erlang-client STANDALONE_TEST_COMMANDS:=eunit:test([rabbit_stomp_test_util,rabbit_stomp_test_frame],[verbose]) WITH_BROKER_TEST_SCRIPTS:=$(PACKAGE_DIR)/test/src/test.py $(PACKAGE_DIR)/test/src/test_connect_options.py +WITH_BROKER_TEST_COMMANDS:=rabbit_stomp_amqqueue_test:all_tests() RABBITMQ_TEST_PATH=$(PACKAGE_DIR)/../../rabbitmq-test ABS_PACKAGE_DIR:=$(abspath $(PACKAGE_DIR)) diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index 364121127c..ac70f1e7f4 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -42,7 +42,7 @@ -record(state, {socket, session_id, channel, connection, subscriptions, version, start_heartbeat_fun, pending_receipts, - config}). + config, reply_queues}). -record(subscription, {dest_hdr, channel, multi_ack, description}). @@ -76,7 +76,8 @@ init([Sock, StartHeartbeatFun, Configuration]) -> version = none, start_heartbeat_fun = StartHeartbeatFun, pending_receipts = undefined, - config = Configuration}, + config = Configuration, + reply_queues = dict:new()}, hibernate, {backoff, 1000, 1000, 10000} }. @@ -108,7 +109,10 @@ handle_cast(_Request, State = #state{channel = none, handle_cast({Command, Frame}, State) -> process_request( fun(StateN) -> - handle_frame(Command, Frame, StateN) + case validate_frame(Command, Frame, StateN) of + R = {error, _, _, _} -> R; + _ -> handle_frame(Command, Frame, StateN) + end end, fun(StateM) -> ensure_receipt(Frame, StateM) @@ -200,6 +204,23 @@ process_connect(Implicit, fun(StateM) -> StateM end, State). +%%---------------------------------------------------------------------------- +%% Frame Validation +%%---------------------------------------------------------------------------- + +validate_frame(Command, Frame, State) + when Command =:= "SUBSCRIBE" orelse Command =:= "UNSUBSCRIBE" -> + Hdr = fun(Name) -> rabbit_stomp_frame:header(Frame, Name) end, + case {Hdr("persistent"), Hdr("id")} of + {{ok, "true"}, not_found} -> + error("Missing Header", + "Header 'id' is required for durable subscriptions", State); + _ -> + ok(State) + end; +validate_frame(_Command, _Frame, State) -> + ok(State). + %%---------------------------------------------------------------------------- %% Frame handlers %%---------------------------------------------------------------------------- @@ -213,7 +234,7 @@ handle_frame("SUBSCRIBE", Frame, State) -> handle_frame("UNSUBSCRIBE", Frame, State) -> ConsumerTag = rabbit_stomp_util:consumer_tag(Frame), - cancel_subscription(ConsumerTag, State); + cancel_subscription(ConsumerTag, Frame, State); handle_frame("SEND", Frame, State) -> with_destination("SEND", Frame, State, fun do_send/4); @@ -277,12 +298,12 @@ ack_action(Command, Frame, %% Internal helpers for processing frames callbacks %%---------------------------------------------------------------------------- -cancel_subscription({error, _}, State) -> +cancel_subscription({error, _}, _Frame, State) -> error("Missing destination or id", "UNSUBSCRIBE must include a 'destination' or 'id' header\n", State); -cancel_subscription({ok, ConsumerTag, Description}, +cancel_subscription({ok, ConsumerTag, Description}, Frame, State = #state{channel = MainChannel, subscriptions = Subs}) -> case dict:find(ConsumerTag, Subs) of @@ -292,14 +313,14 @@ cancel_subscription({ok, ConsumerTag, Description}, "Subscription to ~p not found.\n", [Description], State); - {ok, #subscription{channel = SubChannel}} -> + {ok, #subscription{dest_hdr = DestHdr, channel = SubChannel}} -> case amqp_channel:call(SubChannel, #'basic.cancel'{ consumer_tag = ConsumerTag}) of #'basic.cancel_ok'{consumer_tag = ConsumerTag} -> + ok = ensure_subchannel_closed(SubChannel, MainChannel), NewSubs = dict:erase(ConsumerTag, Subs), - ensure_subchannel_closed(SubChannel, - MainChannel, + maybe_delete_durable_sub(DestHdr, Frame, State#state{ subscriptions = NewSubs}); _ -> @@ -310,13 +331,32 @@ cancel_subscription({ok, ConsumerTag, Description}, end end. -ensure_subchannel_closed(SubChannel, MainChannel, State) - when SubChannel == MainChannel -> - ok(State); +maybe_delete_durable_sub(DestHdr, Frame, State = #state{channel = Channel}) -> + case rabbit_stomp_util:parse_destination(DestHdr) of + {ok, {topic, Name}} -> + case rabbit_stomp_frame:boolean_header(Frame, + "persistent", false) of + true -> + {ok, Id} = rabbit_stomp_frame:header(Frame, "id"), + QName = + rabbit_stomp_util:durable_subscription_queue(Name, Id), + amqp_channel:call(Channel, #'queue.delete'{queue = QName, + nowait = false}), + ok(State); + false -> + ok(State) + end; + _ -> + ok(State) + end. -ensure_subchannel_closed(SubChannel, _MainChannel, State) -> +ensure_subchannel_closed(SubChannel, MainChannel) + when SubChannel == MainChannel -> + ok; + +ensure_subchannel_closed(SubChannel, _MainChannel) -> amqp_channel:close(SubChannel), - ok(State). + ok. with_destination(Command, Frame, State, Fun) -> case rabbit_stomp_frame:header(Frame, "destination") of @@ -333,7 +373,8 @@ with_destination(Command, Frame, State, Fun) -> error("Unknown destination", "'~s' is not a valid destination.\n" ++ "Valid destination types are: " ++ - "/exchange, /topic or /queue.\n", + string:join(rabbit_stomp_util:valid_dest_prefixes(),", ") ++ + ".\n", [Content], State) end; @@ -443,7 +484,7 @@ do_subscribe(Destination, DestHdr, Frame, {AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame), - {ok, Queue} = ensure_queue(subscribe, Destination, Channel), + {ok, Queue} = ensure_queue(subscribe, Destination, Frame, Channel), {ok, ConsumerTag, Description} = rabbit_stomp_util:consumer_tag(Frame), @@ -469,9 +510,11 @@ do_subscribe(Destination, DestHdr, Frame, do_send(Destination, _DestHdr, Frame = #stomp_frame{body_iolist = BodyFragments}, State = #state{channel = Channel}) -> - {ok, _Q} = ensure_queue(send, Destination, Channel), + {ok, _Q} = ensure_queue(send, Destination, Frame, Channel), - Props = rabbit_stomp_util:message_properties(Frame), + {Frame1, State1} = ensure_reply_to(Frame, State), + + Props = rabbit_stomp_util:message_properties(Frame1), {Exchange, RoutingKey} = rabbit_stomp_util:parse_routing_information(Destination), @@ -482,17 +525,17 @@ do_send(Destination, _DestHdr, mandatory = false, immediate = false}, - case transactional(Frame) of + case transactional(Frame1) of {yes, Transaction} -> extend_transaction(Transaction, fun(StateN) -> - maybe_record_receipt(Frame, StateN) + maybe_record_receipt(Frame1, StateN) end, {Method, Props, BodyFragments}, - State); + State1); no -> ok(send_method(Method, Props, BodyFragments, - maybe_record_receipt(Frame, State))) + maybe_record_receipt(Frame1, State1))) end. create_ack_method(DeliveryTag, #subscription{multi_ack = IsMulti}) -> @@ -572,6 +615,63 @@ default_prefetch({queue, _}) -> default_prefetch(_) -> undefined. +%%---------------------------------------------------------------------------- +%% Reply-To +%%---------------------------------------------------------------------------- +ensure_reply_to(Frame = #stomp_frame{headers = Headers}, State) -> + case rabbit_stomp_frame:header(Frame, "reply-to") of + not_found -> + {Frame, State}; + {ok, ReplyTo} -> + {ok, Destination} = rabbit_stomp_util:parse_destination(ReplyTo), + case Destination of + {temp_queue, TempQueueId} -> + {ReplyQueue, State1} = + ensure_reply_queue(TempQueueId, State), + {Frame#stomp_frame{ + headers = lists:keyreplace("reply-to", 1, Headers, + {"reply-to", ReplyQueue})}, + State1}; + _ -> + {Frame, State} + end + end. + +ensure_reply_queue(TempQueueId, State = #state{channel = Channel, + reply_queues = RQS, + subscriptions = Subs}) -> + case dict:find(TempQueueId, RQS) of + {ok, RQ} -> + {RQ, RQS}; + error -> + #'queue.declare_ok'{queue = Queue} = + amqp_channel:call(Channel, + #'queue.declare'{auto_delete = true, + exclusive = true}), + + #'basic.consume_ok'{consumer_tag = ConsumerTag} = + amqp_channel:subscribe(Channel, + #'basic.consume'{ + queue = Queue, + no_ack = true, + nowait = false}, + self()), + + Destination = "/reply-queue/" ++ binary_to_list(Queue), + + %% synthesise a subscription to the reply queue destination + Subs1 = dict:store(ConsumerTag, + #subscription{dest_hdr = Destination, + channel = Channel, + multi_ack = false}, + Subs), + + {Destination, State#state{ + reply_queues = dict:store(TempQueueId, Queue, RQS), + subscriptions = Subs1}} + end. + + %%---------------------------------------------------------------------------- %% Receipt Handling %%---------------------------------------------------------------------------- @@ -747,16 +847,16 @@ millis_to_seconds(M) -> %% Queue and Binding Setup %%---------------------------------------------------------------------------- -ensure_queue(subscribe, {exchange, _}, Channel) -> +ensure_queue(subscribe, {exchange, _}, _Frame, Channel) -> %% Create anonymous, exclusive queue for SUBSCRIBE on /exchange destinations #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{auto_delete = true, exclusive = true}), {ok, Queue}; -ensure_queue(send, {exchange, _}, _Channel) -> +ensure_queue(send, {exchange, _}, _Frame, _Channel) -> %% Don't create queues on SEND for /exchange destinations {ok, undefined}; -ensure_queue(_, {queue, Name}, Channel) -> +ensure_queue(_, {queue, Name}, _Frame, Channel) -> %% Always create named queue for /queue destinations Queue = list_to_binary(Name), amqp_channel:cast(Channel, @@ -764,15 +864,30 @@ ensure_queue(_, {queue, Name}, Channel) -> queue = Queue, nowait = true}), {ok, Queue}; -ensure_queue(subscribe, {topic, _}, Channel) -> - %% Create anonymous, exclusive queue for SUBSCRIBE on /topic destinations +ensure_queue(subscribe, {topic, Name}, Frame, Channel) -> + %% Create queue for SUBSCRIBE on /topic destinations Queues are + %% anonymous, auto_delete and exclusive for transient + %% subscriptions. Durable subscriptions get shared, named, durable + %% queues. + Method = + case rabbit_stomp_frame:boolean_header(Frame, "persistent", false) of + true -> + {ok, Id} = rabbit_stomp_frame:header(Frame, "id"), + QName = rabbit_stomp_util:durable_subscription_queue(Name, Id), + #'queue.declare'{durable = true, queue = QName}; + false -> + #'queue.declare'{auto_delete = true, exclusive = true} + end, + #'queue.declare_ok'{queue = Queue} = - amqp_channel:call(Channel, #'queue.declare'{auto_delete = true, - exclusive = true}), + amqp_channel:call(Channel, Method), {ok, Queue}; -ensure_queue(send, {topic, _}, _Channel) -> +ensure_queue(send, {topic, _}, _Frame, _Channel) -> %% Don't create queues on SEND for /topic destinations - {ok, undefined}. + {ok, undefined}; +ensure_queue(_, {Type, Name}, _Frame, _Channel) + when Type =:= reply_queue orelse Type =:= amqqueue -> + {ok, list_to_binary(Name)}. ensure_queue_binding(QueueBin, {"", Queue}, _Channel) -> %% i.e., we should only be asked to bind to the default exchange a diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl index 9da7882d6f..1b8b07d53c 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl @@ -32,10 +32,11 @@ -module(rabbit_stomp_util). -export([parse_destination/1, parse_routing_information/1, - parse_message_id/1]). + parse_message_id/1, durable_subscription_queue/2]). -export([longstr_field/2]). -export([ack_mode/1, consumer_tag/1, message_headers/4, message_properties/1]). -export([negotiate_version/2]). +-export([valid_dest_prefixes/0]). -include_lib("amqp_client/include/amqp_client.hrl"). -include("rabbit_stomp_frame.hrl"). @@ -43,6 +44,14 @@ -define(QUEUE_PREFIX, "/queue"). -define(TOPIC_PREFIX, "/topic"). -define(EXCHANGE_PREFIX, "/exchange"). +-define(AMQQUEUE_PREFIX, "/amq/queue"). +-define(TEMP_QUEUE_PREFIX, "/temp-queue"). +%% reply queues names can have slashes in the content so no further +%% parsing happens. +-define(REPLY_QUEUE_PREFIX, "/reply-queue/"). + +-define(VALID_DEST_PREFIXES, [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX, + ?AMQQUEUE_PREFIX, ?TEMP_QUEUE_PREFIX, ?REPLY_QUEUE_PREFIX]). -define(MESSAGE_ID_SEPARATOR, "@@"). -define(HEADER_CONTENT_TYPE, "content-type"). @@ -112,10 +121,10 @@ message_headers(Destination, SessionId, maybe_header(Header, element(Index, Props), Acc) end, case ConsumerTag of - <<"Q_", _/binary>> -> - Basic; <<"T_", Id/binary>> -> - [{"subscription", binary_to_list(Id)} | Basic] + [{"subscription", binary_to_list(Id)} | Basic]; + _ -> + Basic end, [{?HEADER_CONTENT_TYPE, #'P_basic'.content_type}, {?HEADER_CONTENT_ENCODING, #'P_basic'.content_encoding}, @@ -224,6 +233,13 @@ parse_destination(?QUEUE_PREFIX ++ Rest) -> parse_simple_destination(queue, Rest); parse_destination(?TOPIC_PREFIX ++ Rest) -> parse_simple_destination(topic, Rest); +parse_destination(?AMQQUEUE_PREFIX ++ Rest) -> + parse_simple_destination(amqqueue, Rest); +parse_destination(?TEMP_QUEUE_PREFIX ++ Rest) -> + parse_simple_destination(temp_queue, Rest); +parse_destination(?REPLY_QUEUE_PREFIX ++ Rest) -> + %% reply queue names might have slashes + {ok, {reply_queue, Rest}}; parse_destination(?EXCHANGE_PREFIX ++ Rest) -> case parse_content(Rest) of %% One cannot refer to the default exchange this way; it has @@ -240,10 +256,17 @@ parse_routing_information({exchange, {Name, undefined}}) -> {Name, ""}; parse_routing_information({exchange, {Name, Pattern}}) -> {Name, Pattern}; -parse_routing_information({queue, Name}) -> - {"", Name}; parse_routing_information({topic, Name}) -> - {"amq.topic", Name}. + {"amq.topic", Name}; +parse_routing_information({Type, Name}) + when Type =:= queue orelse Type =:= reply_queue orelse Type =:= amqqueue -> + {"", Name}. + +valid_dest_prefixes() -> ?VALID_DEST_PREFIXES. + +durable_subscription_queue(Destination, SubscriptionId) -> + <<(list_to_binary("stomp.dsub." ++ Destination ++ "."))/binary, + (erlang:md5(SubscriptionId))/binary>>. %% ---- Destination parsing helpers ---- diff --git a/deps/rabbitmq_stomp/test/src/ack.py b/deps/rabbitmq_stomp/test/src/ack.py index 14f5cc6d59..1ca5465117 100644 --- a/deps/rabbitmq_stomp/test/src/ack.py +++ b/deps/rabbitmq_stomp/test/src/ack.py @@ -67,7 +67,7 @@ class TestAck(base.BaseTest): conn2 = self.create_connection() try: listener2 = base.WaitableListener() - listener2.reset(2) + listener2.reset(2) ## expecting 2 messages conn2.set_listener('', listener2) conn2.subscribe(destination=d, ack='client-individual', headers={'prefetch-count': '10'}) @@ -84,10 +84,12 @@ class TestAck(base.BaseTest): conn3 = self.create_connection() try: listener3 = base.WaitableListener() + listener3.reset(1) ## expecting a single message conn3.set_listener('', listener3) conn3.subscribe(destination=d) - self.assertTrue(listener3.await(3), + self.assertTrue(listener3.await(20), "Expected to see a message. ACK not working?") + self.assertEquals(1, len(listener3.messages)) self.assertEquals("test1", listener3.messages[0]['message']) finally: conn3.stop() diff --git a/deps/rabbitmq_stomp/test/src/base.py b/deps/rabbitmq_stomp/test/src/base.py index 8fd1a398c4..f91cc0af8f 100644 --- a/deps/rabbitmq_stomp/test/src/base.py +++ b/deps/rabbitmq_stomp/test/src/base.py @@ -95,18 +95,24 @@ class WaitableListener(object): def reset(self, count=1): if self.debug: - print '(reset listener)', - print '#messages:', len(self.messages), - print '#errors:', len(self.errors), - print '#receipts:', len(self.receipts), 'Now expecting:', count + self.print_state('(reset listener--old state)') self.messages = [] self.errors = [] self.receipts = [] self.latch = Latch(count) + if self.debug: + self.print_state('(reset listener--new state)') def await(self, timeout=10): return self.latch.await(timeout) + def print_state(self, hdr=""): + print hdr, + print '#messages:', len(self.messages), + print '#errors:', len(self.errors), + print '#receipts:', len(self.receipts), + print 'Remaining count:', self.latch.get_count() + class Latch(object): def __init__(self, count=1): @@ -134,3 +140,9 @@ class Latch(object): finally: self.cond.release() + def get_count(self): + try: + self.cond.acquire() + return self.count + finally: + self.cond.release() diff --git a/deps/rabbitmq_stomp/test/src/destinations.py b/deps/rabbitmq_stomp/test/src/destinations.py index 754f202ba0..cf3bec7ee1 100644 --- a/deps/rabbitmq_stomp/test/src/destinations.py +++ b/deps/rabbitmq_stomp/test/src/destinations.py @@ -165,7 +165,7 @@ class TestQueue(base.BaseTest): self.conn.send('third', destination=d, receipt='b', transaction=tx) self.conn.commit(transaction=tx) - self.assertTrue("Missing messages/confirms", self.listener.await(20)) + self.assertTrue(self.listener.await(40), "Missing messages/confirms") expected = set(['a', 'b']) missing = expected.difference(self.__gather_receipts()) @@ -253,3 +253,166 @@ class TestTopic(base.BaseTest): conn1.stop() conn2.stop() +class TestReplyQueue(base.BaseTest): + + def test_reply_queue(self): + ''' Test with two separate clients. Client 1 sends + message to a known destination with a defined reply + queue. Client 2 receives on known destination and replies + on the reply destination. Client 1 gets the reply message''' + + known = '/queue/known' + reply = '/temp-queue/0' + + ## Client 1 uses pre-supplied connection and listener + ## Set up client 2 + conn2, listener2 = self.create_subscriber_connection(known) + + try: + self.conn.send("test", destination=known, + headers = {"reply-to": reply}) + + self.assertTrue(listener2.await(5)) + self.assertEquals(1, len(listener2.messages)) + + reply_to = listener2.messages[0]['headers']['reply-to'] + self.assertTrue(reply_to.startswith('/reply-queue/')) + + conn2.send("reply", destination=reply_to) + self.assertTrue(self.listener.await(5)) + self.assertEquals("reply", self.listener.messages[0]['message']) + finally: + conn2.stop() + +class TestDurableSubscription(base.BaseTest): + + ID = 'test.subscription' + + def __subscribe(self, dest, conn=None, id=None): + if not conn: + conn = self.conn + if not id: + id = TestDurableSubscription.ID + + conn.subscribe(destination=dest, + headers ={'persistent': 'true', + 'receipt': 1, + 'id': id}) + + def __assert_receipt(self, listener=None): + if not listener: + listener = self.listener + + self.assertTrue(listener.await(5)) + self.assertEquals(1, len(self.listener.receipts)) + + def __assert_message(self, msg, listener=None): + if not listener: + listener = self.listener + + self.assertTrue(listener.await(5)) + self.assertEquals(1, len(listener.messages)) + self.assertEquals(msg, listener.messages[0]['message']) + + def test_durability(self): + d = '/topic/durable' + + self.__subscribe(d) + self.__assert_receipt() + + # send first message without unsubscribing + self.listener.reset(1) + self.conn.send("first", destination=d) + self.__assert_message("first") + + # now unsubscribe (disconnect only) + self.conn.unsubscribe(id=TestDurableSubscription.ID) + + # send again + self.listener.reset(1) + self.conn.send("second", destination=d) + + # resubscribe and expect message + self.__subscribe(d) + self.__assert_message("second") + + # now unsubscribe (cancel) + self.conn.unsubscribe(id=TestDurableSubscription.ID, + headers={'persistent': 'true'}) + + # send again + self.listener.reset(1) + self.conn.send("third", destination=d) + + # resubscribe and expect no message + self.__subscribe(d) + self.assertTrue(self.listener.await(3)) + self.assertEquals(0, len(self.listener.messages)) + self.assertEquals(1, len(self.listener.receipts)) + + def test_share_subscription(self): + d = '/topic/durable-shared' + + conn2 = self.create_connection() + conn2.set_listener('', self.listener) + + try: + self.__subscribe(d) + self.__assert_receipt() + self.listener.reset(1) + self.__subscribe(d, conn2) + self.__assert_receipt() + + self.listener.reset(100) + + # send 100 messages + for x in xrange(0, 100): + self.conn.send("msg" + str(x), destination=d) + + self.assertTrue(self.listener.await(5)) + self.assertEquals(100, len(self.listener.messages)) + finally: + conn2.stop() + + def test_separate_ids(self): + d = '/topic/durable-separate' + + conn2 = self.create_connection() + listener2 = base.WaitableListener() + conn2.set_listener('', listener2) + + try: + # ensure durable subscription exists for each ID + self.__subscribe(d) + self.__assert_receipt() + self.__subscribe(d, conn2, "other.id") + self.__assert_receipt(listener2) + self.conn.unsubscribe(id=TestDurableSubscription.ID) + conn2.unsubscribe(id="other.id") + + self.listener.reset(101) + listener2.reset(101) ## 100 messages and 1 receipt + + # send 100 messages + for x in xrange(0, 100): + self.conn.send("msg" + str(x), destination=d) + + self.__subscribe(d) + self.__subscribe(d, conn2, "other.id") + + for l in [self.listener, listener2]: + self.assertTrue(l.await(10)) + self.assertEquals(100, len(l.messages)) + + finally: + conn2.stop() + + def test_durable_subscribe_no_id(self): + d = '/topic/durable-invalid' + + self.conn.subscribe(destination=d, headers={'persistent':'true'}), + self.listener.await(3) + self.assertEquals(1, len(self.listener.errors)) + self.assertEquals("Missing Header", self.listener.errors[0]['headers']['message']) + + diff --git a/deps/rabbitmq_stomp/test/src/rabbit_stomp_amqqueue_test.erl b/deps/rabbitmq_stomp/test/src/rabbit_stomp_amqqueue_test.erl new file mode 100644 index 0000000000..3207cc0688 --- /dev/null +++ b/deps/rabbitmq_stomp/test/src/rabbit_stomp_amqqueue_test.erl @@ -0,0 +1,125 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ Management Console. +%% +%% The Initial Developers of the Original Code are Rabbit Technologies Ltd. +%% +%% Copyright (C) 2011 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-module(rabbit_stomp_amqqueue_test). +-export([all_tests/0]). + +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_stomp_frame.hrl"). + +-define(QUEUE, <<"TestQueue">>). +-define(DESTINATION, "/amq/queue/TestQueue"). + +all_tests() -> + [ok = run_test(TestFun) || TestFun <- [fun test_subscribe_error/2, + fun test_subscribe/2, + fun test_send/2]], + ok. + +run_test(TestFun) -> + {ok, Connection} = amqp_connection:start(#amqp_params_direct{}), + {ok, Channel} = amqp_connection:open_channel(Connection), + {ok, Sock} = stomp_connect(), + + Result = (catch TestFun(Channel, Sock)), + + stomp_disconnect(Sock), + amqp_channel:close(Channel), + amqp_connection:close(Connection), + Result. + +test_subscribe_error(_Channel, Sock) -> + %% SUBSCRIBE to missing queue + stomp_send(Sock, "SUBSCRIBE", [{"destination", ?DESTINATION}]), + #stomp_frame{command = "ERROR", headers = Hdrs} = stomp_recv(Sock), + "not_found" = proplists:get_value("message", Hdrs), + ok. + +test_subscribe(Channel, Sock) -> + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE, + auto_delete = true}), + + %% subscribe and wait for receipt + stomp_send(Sock, "SUBSCRIBE", [{"destination", ?DESTINATION}, + {"receipt", "foo"}]), + #stomp_frame{command = "RECEIPT"} = stomp_recv(Sock), + + + %% send from amqp + Method = #'basic.publish'{ + exchange = <<"">>, + routing_key = ?QUEUE}, + + amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{}, + payload = <<"hello">>}), + + #stomp_frame{command = "MESSAGE", + body_iolist = [<<"hello">>]} = stomp_recv(Sock), + + ok. + +test_send(Channel, Sock) -> + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE, + auto_delete = true}), + + %% subscribe and wait for receipt + stomp_send(Sock, "SUBSCRIBE", [{"destination", ?DESTINATION}, + {"receipt", "foo"}]), + #stomp_frame{command = "RECEIPT"} = stomp_recv(Sock), + + + %% send from stomp + stomp_send(Sock, "SEND", [{"destination", ?DESTINATION}], ["hello"]), + + #stomp_frame{command = "MESSAGE", + body_iolist = [<<"hello">>]} = stomp_recv(Sock), + + ok. + +stomp_connect() -> + {ok, Sock} = gen_tcp:connect(localhost, 61613, [{active, false}, binary]), + stomp_send(Sock, "CONNECT"), + #stomp_frame{command = "CONNECTED"} = stomp_recv(Sock), + {ok, Sock}. + +stomp_disconnect(Sock) -> + stomp_send(Sock, "DISCONNECT"). + +stomp_send(Sock, Command) -> + stomp_send(Sock, Command, []). + +stomp_send(Sock, Command, Headers) -> + stomp_send(Sock, Command, Headers, []). + +stomp_send(Sock, Command, Headers, Body) -> + gen_tcp:send(Sock, rabbit_stomp_frame:serialize( + #stomp_frame{command = list_to_binary(Command), + headers = Headers, + body_iolist = Body})). + +stomp_recv(Sock) -> + {ok, Payload} = gen_tcp:recv(Sock, 0), + {ok, Frame, _Rest} = + rabbit_stomp_frame:parse(Payload, + rabbit_stomp_frame:initial_state()), + Frame. + diff --git a/deps/rabbitmq_stomp/test/src/rabbit_stomp_test_util.erl b/deps/rabbitmq_stomp/test/src/rabbit_stomp_test_util.erl index fa1fb2ff19..2b720943c8 100644 --- a/deps/rabbitmq_stomp/test/src/rabbit_stomp_test_util.erl +++ b/deps/rabbitmq_stomp/test/src/rabbit_stomp_test_util.erl @@ -195,10 +195,19 @@ valid_topic_test() -> valid_exchange_test() -> {ok, {exchange, {"test", undefined}}} = parse_destination("/exchange/test"). +valid_temp_queue_test() -> + {ok, {temp_queue, "test"}} = parse_destination("/temp-queue/test"). + +valid_reply_queue_test() -> + {ok, {reply_queue, "test"}} = parse_destination("/reply-queue/test"). + valid_exchange_with_pattern_test() -> {ok, {exchange, {"test", "pattern"}}} = parse_destination("/exchange/test/pattern"). +valid_amqqueue_test() -> + {ok, {amqqueue, "test"}} = parse_destination("/amq/queue/test"). + queue_with_no_name_test() -> {error, {invalid_destination, queue, ""}} = parse_destination("/queue"). @@ -213,6 +222,10 @@ exchange_default_name_test() -> {error, {invalid_destination, exchange, "//foo"}} = parse_destination("/exchange//foo"). +amqqueue_with_no_name_test() -> + {error, {invalid_destination, amqqueue, ""}} = + parse_destination("/amq/queue"). + queue_with_no_name_slash_test() -> {error, {invalid_destination, queue, "/"}} = parse_destination("/queue/"). diff --git a/deps/rabbitmq_stomp/test/src/reliability.py b/deps/rabbitmq_stomp/test/src/reliability.py index 71068f777c..08476d5b6b 100644 --- a/deps/rabbitmq_stomp/test/src/reliability.py +++ b/deps/rabbitmq_stomp/test/src/reliability.py @@ -1,6 +1,7 @@ import base import stomp import unittest +import time class TestReliability(base.BaseTest): @@ -20,10 +21,14 @@ class TestReliability(base.BaseTest): for x in range(0, count): pub_conn.send(msg + str(x), destination=d) - + time.sleep(2.0) pub_conn.close_socket() - self.assertTrue(listener.await(10)) - self.assertEquals(count, len(listener.messages)) + + if listener.await(30): + self.assertEquals(count, len(listener.messages)) + else: + listener.print_state("Final state of listener:") + self.fail("Did not receive %s messages in time" % count) finally: if pub_conn.is_connected(): pub_conn.disconnect()