Merge bug24304 into default

This commit is contained in:
Steve Powell 2011-08-10 10:24:55 +01:00
commit 25556a657b
10 changed files with 531 additions and 53 deletions

View File

@ -1,6 +1,6 @@
diff -r 16a4000624a7 stomp/connect.py
--- a/stomp/connect.py Sun May 02 18:15:34 2010 +0100
+++ b/stomp/connect.py Wed Jun 08 13:20:12 2011 +0100
+++ b/stomp/connect.py Mon Aug 01 17:25:05 2011 +0100
@@ -88,7 +88,10 @@
ssl_key_file = None,
ssl_cert_file = None,
@ -57,15 +57,34 @@ diff -r 16a4000624a7 stomp/connect.py
if self.__socket is not None:
if self.__ssl:
#
@@ -403,7 +429,6 @@
@@ -390,20 +416,20 @@
#
try:
self.__socket = self.__socket.unwrap()
- except Exception:
+ except Exception as e:
#
# unwrap seems flaky on Win with the backported ssl mod, so catch any exception and log it
#
- _, e, _ = sys.exc_info()
- log.warn(e)
+ log.warning("socket unwrap() threw exception: %s" % e)
elif hasattr(socket, 'SHUT_RDWR'):
self.__socket.shutdown(socket.SHUT_RDWR)
#
if self.__socket is not None:
- # split this into a separate check, because sometimes the socket is nulled between shutdown and this call
+ # caution, because sometimes the socket is nulled between shutdown and this call
#
- if self.__socket is not None:
+ try:
self.__socket.close()
- self.__current_host_and_port = None
+ except Exception as e:
+ log.warning("socket close() threw exception: %s" % e)
def __convert_dict(self, payload):
"""
@@ -449,6 +474,9 @@
@@ -449,6 +475,9 @@
raise KeyError("Command %s requires header %r" % (command, required_header_key))
self.__send_frame(command, headers, payload)
@ -75,7 +94,7 @@ diff -r 16a4000624a7 stomp/connect.py
def __send_frame(self, command, headers={}, payload=''):
"""
Send a STOMP frame.
@@ -680,4 +708,4 @@
@@ -680,4 +709,4 @@
sleep_exp += 1
if not self.__socket:

View File

@ -2,6 +2,7 @@ RELEASABLE:=true
DEPS:=rabbitmq-server rabbitmq-erlang-client
STANDALONE_TEST_COMMANDS:=eunit:test([rabbit_stomp_test_util,rabbit_stomp_test_frame],[verbose])
WITH_BROKER_TEST_SCRIPTS:=$(PACKAGE_DIR)/test/src/test.py $(PACKAGE_DIR)/test/src/test_connect_options.py
WITH_BROKER_TEST_COMMANDS:=rabbit_stomp_amqqueue_test:all_tests()
RABBITMQ_TEST_PATH=$(PACKAGE_DIR)/../../rabbitmq-test
ABS_PACKAGE_DIR:=$(abspath $(PACKAGE_DIR))

View File

@ -42,7 +42,7 @@
-record(state, {socket, session_id, channel,
connection, subscriptions, version,
start_heartbeat_fun, pending_receipts,
config}).
config, reply_queues}).
-record(subscription, {dest_hdr, channel, multi_ack, description}).
@ -76,7 +76,8 @@ init([Sock, StartHeartbeatFun, Configuration]) ->
version = none,
start_heartbeat_fun = StartHeartbeatFun,
pending_receipts = undefined,
config = Configuration},
config = Configuration,
reply_queues = dict:new()},
hibernate,
{backoff, 1000, 1000, 10000}
}.
@ -108,7 +109,10 @@ handle_cast(_Request, State = #state{channel = none,
handle_cast({Command, Frame}, State) ->
process_request(
fun(StateN) ->
handle_frame(Command, Frame, StateN)
case validate_frame(Command, Frame, StateN) of
R = {error, _, _, _} -> R;
_ -> handle_frame(Command, Frame, StateN)
end
end,
fun(StateM) ->
ensure_receipt(Frame, StateM)
@ -200,6 +204,23 @@ process_connect(Implicit,
fun(StateM) -> StateM end,
State).
%%----------------------------------------------------------------------------
%% Frame Validation
%%----------------------------------------------------------------------------
validate_frame(Command, Frame, State)
when Command =:= "SUBSCRIBE" orelse Command =:= "UNSUBSCRIBE" ->
Hdr = fun(Name) -> rabbit_stomp_frame:header(Frame, Name) end,
case {Hdr("persistent"), Hdr("id")} of
{{ok, "true"}, not_found} ->
error("Missing Header",
"Header 'id' is required for durable subscriptions", State);
_ ->
ok(State)
end;
validate_frame(_Command, _Frame, State) ->
ok(State).
%%----------------------------------------------------------------------------
%% Frame handlers
%%----------------------------------------------------------------------------
@ -213,7 +234,7 @@ handle_frame("SUBSCRIBE", Frame, State) ->
handle_frame("UNSUBSCRIBE", Frame, State) ->
ConsumerTag = rabbit_stomp_util:consumer_tag(Frame),
cancel_subscription(ConsumerTag, State);
cancel_subscription(ConsumerTag, Frame, State);
handle_frame("SEND", Frame, State) ->
with_destination("SEND", Frame, State, fun do_send/4);
@ -277,12 +298,12 @@ ack_action(Command, Frame,
%% Internal helpers for processing frames callbacks
%%----------------------------------------------------------------------------
cancel_subscription({error, _}, State) ->
cancel_subscription({error, _}, _Frame, State) ->
error("Missing destination or id",
"UNSUBSCRIBE must include a 'destination' or 'id' header\n",
State);
cancel_subscription({ok, ConsumerTag, Description},
cancel_subscription({ok, ConsumerTag, Description}, Frame,
State = #state{channel = MainChannel,
subscriptions = Subs}) ->
case dict:find(ConsumerTag, Subs) of
@ -292,14 +313,14 @@ cancel_subscription({ok, ConsumerTag, Description},
"Subscription to ~p not found.\n",
[Description],
State);
{ok, #subscription{channel = SubChannel}} ->
{ok, #subscription{dest_hdr = DestHdr, channel = SubChannel}} ->
case amqp_channel:call(SubChannel,
#'basic.cancel'{
consumer_tag = ConsumerTag}) of
#'basic.cancel_ok'{consumer_tag = ConsumerTag} ->
ok = ensure_subchannel_closed(SubChannel, MainChannel),
NewSubs = dict:erase(ConsumerTag, Subs),
ensure_subchannel_closed(SubChannel,
MainChannel,
maybe_delete_durable_sub(DestHdr, Frame,
State#state{
subscriptions = NewSubs});
_ ->
@ -310,13 +331,32 @@ cancel_subscription({ok, ConsumerTag, Description},
end
end.
ensure_subchannel_closed(SubChannel, MainChannel, State)
when SubChannel == MainChannel ->
ok(State);
maybe_delete_durable_sub(DestHdr, Frame, State = #state{channel = Channel}) ->
case rabbit_stomp_util:parse_destination(DestHdr) of
{ok, {topic, Name}} ->
case rabbit_stomp_frame:boolean_header(Frame,
"persistent", false) of
true ->
{ok, Id} = rabbit_stomp_frame:header(Frame, "id"),
QName =
rabbit_stomp_util:durable_subscription_queue(Name, Id),
amqp_channel:call(Channel, #'queue.delete'{queue = QName,
nowait = false}),
ok(State);
false ->
ok(State)
end;
_ ->
ok(State)
end.
ensure_subchannel_closed(SubChannel, _MainChannel, State) ->
ensure_subchannel_closed(SubChannel, MainChannel)
when SubChannel == MainChannel ->
ok;
ensure_subchannel_closed(SubChannel, _MainChannel) ->
amqp_channel:close(SubChannel),
ok(State).
ok.
with_destination(Command, Frame, State, Fun) ->
case rabbit_stomp_frame:header(Frame, "destination") of
@ -333,7 +373,8 @@ with_destination(Command, Frame, State, Fun) ->
error("Unknown destination",
"'~s' is not a valid destination.\n" ++
"Valid destination types are: " ++
"/exchange, /topic or /queue.\n",
string:join(rabbit_stomp_util:valid_dest_prefixes(),", ") ++
".\n",
[Content],
State)
end;
@ -443,7 +484,7 @@ do_subscribe(Destination, DestHdr, Frame,
{AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame),
{ok, Queue} = ensure_queue(subscribe, Destination, Channel),
{ok, Queue} = ensure_queue(subscribe, Destination, Frame, Channel),
{ok, ConsumerTag, Description} = rabbit_stomp_util:consumer_tag(Frame),
@ -469,9 +510,11 @@ do_subscribe(Destination, DestHdr, Frame,
do_send(Destination, _DestHdr,
Frame = #stomp_frame{body_iolist = BodyFragments},
State = #state{channel = Channel}) ->
{ok, _Q} = ensure_queue(send, Destination, Channel),
{ok, _Q} = ensure_queue(send, Destination, Frame, Channel),
Props = rabbit_stomp_util:message_properties(Frame),
{Frame1, State1} = ensure_reply_to(Frame, State),
Props = rabbit_stomp_util:message_properties(Frame1),
{Exchange, RoutingKey} =
rabbit_stomp_util:parse_routing_information(Destination),
@ -482,17 +525,17 @@ do_send(Destination, _DestHdr,
mandatory = false,
immediate = false},
case transactional(Frame) of
case transactional(Frame1) of
{yes, Transaction} ->
extend_transaction(Transaction,
fun(StateN) ->
maybe_record_receipt(Frame, StateN)
maybe_record_receipt(Frame1, StateN)
end,
{Method, Props, BodyFragments},
State);
State1);
no ->
ok(send_method(Method, Props, BodyFragments,
maybe_record_receipt(Frame, State)))
maybe_record_receipt(Frame1, State1)))
end.
create_ack_method(DeliveryTag, #subscription{multi_ack = IsMulti}) ->
@ -572,6 +615,63 @@ default_prefetch({queue, _}) ->
default_prefetch(_) ->
undefined.
%%----------------------------------------------------------------------------
%% Reply-To
%%----------------------------------------------------------------------------
ensure_reply_to(Frame = #stomp_frame{headers = Headers}, State) ->
case rabbit_stomp_frame:header(Frame, "reply-to") of
not_found ->
{Frame, State};
{ok, ReplyTo} ->
{ok, Destination} = rabbit_stomp_util:parse_destination(ReplyTo),
case Destination of
{temp_queue, TempQueueId} ->
{ReplyQueue, State1} =
ensure_reply_queue(TempQueueId, State),
{Frame#stomp_frame{
headers = lists:keyreplace("reply-to", 1, Headers,
{"reply-to", ReplyQueue})},
State1};
_ ->
{Frame, State}
end
end.
ensure_reply_queue(TempQueueId, State = #state{channel = Channel,
reply_queues = RQS,
subscriptions = Subs}) ->
case dict:find(TempQueueId, RQS) of
{ok, RQ} ->
{RQ, RQS};
error ->
#'queue.declare_ok'{queue = Queue} =
amqp_channel:call(Channel,
#'queue.declare'{auto_delete = true,
exclusive = true}),
#'basic.consume_ok'{consumer_tag = ConsumerTag} =
amqp_channel:subscribe(Channel,
#'basic.consume'{
queue = Queue,
no_ack = true,
nowait = false},
self()),
Destination = "/reply-queue/" ++ binary_to_list(Queue),
%% synthesise a subscription to the reply queue destination
Subs1 = dict:store(ConsumerTag,
#subscription{dest_hdr = Destination,
channel = Channel,
multi_ack = false},
Subs),
{Destination, State#state{
reply_queues = dict:store(TempQueueId, Queue, RQS),
subscriptions = Subs1}}
end.
%%----------------------------------------------------------------------------
%% Receipt Handling
%%----------------------------------------------------------------------------
@ -747,16 +847,16 @@ millis_to_seconds(M) ->
%% Queue and Binding Setup
%%----------------------------------------------------------------------------
ensure_queue(subscribe, {exchange, _}, Channel) ->
ensure_queue(subscribe, {exchange, _}, _Frame, Channel) ->
%% Create anonymous, exclusive queue for SUBSCRIBE on /exchange destinations
#'queue.declare_ok'{queue = Queue} =
amqp_channel:call(Channel, #'queue.declare'{auto_delete = true,
exclusive = true}),
{ok, Queue};
ensure_queue(send, {exchange, _}, _Channel) ->
ensure_queue(send, {exchange, _}, _Frame, _Channel) ->
%% Don't create queues on SEND for /exchange destinations
{ok, undefined};
ensure_queue(_, {queue, Name}, Channel) ->
ensure_queue(_, {queue, Name}, _Frame, Channel) ->
%% Always create named queue for /queue destinations
Queue = list_to_binary(Name),
amqp_channel:cast(Channel,
@ -764,15 +864,30 @@ ensure_queue(_, {queue, Name}, Channel) ->
queue = Queue,
nowait = true}),
{ok, Queue};
ensure_queue(subscribe, {topic, _}, Channel) ->
%% Create anonymous, exclusive queue for SUBSCRIBE on /topic destinations
ensure_queue(subscribe, {topic, Name}, Frame, Channel) ->
%% Create queue for SUBSCRIBE on /topic destinations Queues are
%% anonymous, auto_delete and exclusive for transient
%% subscriptions. Durable subscriptions get shared, named, durable
%% queues.
Method =
case rabbit_stomp_frame:boolean_header(Frame, "persistent", false) of
true ->
{ok, Id} = rabbit_stomp_frame:header(Frame, "id"),
QName = rabbit_stomp_util:durable_subscription_queue(Name, Id),
#'queue.declare'{durable = true, queue = QName};
false ->
#'queue.declare'{auto_delete = true, exclusive = true}
end,
#'queue.declare_ok'{queue = Queue} =
amqp_channel:call(Channel, #'queue.declare'{auto_delete = true,
exclusive = true}),
amqp_channel:call(Channel, Method),
{ok, Queue};
ensure_queue(send, {topic, _}, _Channel) ->
ensure_queue(send, {topic, _}, _Frame, _Channel) ->
%% Don't create queues on SEND for /topic destinations
{ok, undefined}.
{ok, undefined};
ensure_queue(_, {Type, Name}, _Frame, _Channel)
when Type =:= reply_queue orelse Type =:= amqqueue ->
{ok, list_to_binary(Name)}.
ensure_queue_binding(QueueBin, {"", Queue}, _Channel) ->
%% i.e., we should only be asked to bind to the default exchange a

View File

@ -32,10 +32,11 @@
-module(rabbit_stomp_util).
-export([parse_destination/1, parse_routing_information/1,
parse_message_id/1]).
parse_message_id/1, durable_subscription_queue/2]).
-export([longstr_field/2]).
-export([ack_mode/1, consumer_tag/1, message_headers/4, message_properties/1]).
-export([negotiate_version/2]).
-export([valid_dest_prefixes/0]).
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_stomp_frame.hrl").
@ -43,6 +44,14 @@
-define(QUEUE_PREFIX, "/queue").
-define(TOPIC_PREFIX, "/topic").
-define(EXCHANGE_PREFIX, "/exchange").
-define(AMQQUEUE_PREFIX, "/amq/queue").
-define(TEMP_QUEUE_PREFIX, "/temp-queue").
%% reply queues names can have slashes in the content so no further
%% parsing happens.
-define(REPLY_QUEUE_PREFIX, "/reply-queue/").
-define(VALID_DEST_PREFIXES, [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX,
?AMQQUEUE_PREFIX, ?TEMP_QUEUE_PREFIX, ?REPLY_QUEUE_PREFIX]).
-define(MESSAGE_ID_SEPARATOR, "@@").
-define(HEADER_CONTENT_TYPE, "content-type").
@ -112,10 +121,10 @@ message_headers(Destination, SessionId,
maybe_header(Header, element(Index, Props), Acc)
end,
case ConsumerTag of
<<"Q_", _/binary>> ->
Basic;
<<"T_", Id/binary>> ->
[{"subscription", binary_to_list(Id)} | Basic]
[{"subscription", binary_to_list(Id)} | Basic];
_ ->
Basic
end,
[{?HEADER_CONTENT_TYPE, #'P_basic'.content_type},
{?HEADER_CONTENT_ENCODING, #'P_basic'.content_encoding},
@ -224,6 +233,13 @@ parse_destination(?QUEUE_PREFIX ++ Rest) ->
parse_simple_destination(queue, Rest);
parse_destination(?TOPIC_PREFIX ++ Rest) ->
parse_simple_destination(topic, Rest);
parse_destination(?AMQQUEUE_PREFIX ++ Rest) ->
parse_simple_destination(amqqueue, Rest);
parse_destination(?TEMP_QUEUE_PREFIX ++ Rest) ->
parse_simple_destination(temp_queue, Rest);
parse_destination(?REPLY_QUEUE_PREFIX ++ Rest) ->
%% reply queue names might have slashes
{ok, {reply_queue, Rest}};
parse_destination(?EXCHANGE_PREFIX ++ Rest) ->
case parse_content(Rest) of
%% One cannot refer to the default exchange this way; it has
@ -240,10 +256,17 @@ parse_routing_information({exchange, {Name, undefined}}) ->
{Name, ""};
parse_routing_information({exchange, {Name, Pattern}}) ->
{Name, Pattern};
parse_routing_information({queue, Name}) ->
{"", Name};
parse_routing_information({topic, Name}) ->
{"amq.topic", Name}.
{"amq.topic", Name};
parse_routing_information({Type, Name})
when Type =:= queue orelse Type =:= reply_queue orelse Type =:= amqqueue ->
{"", Name}.
valid_dest_prefixes() -> ?VALID_DEST_PREFIXES.
durable_subscription_queue(Destination, SubscriptionId) ->
<<(list_to_binary("stomp.dsub." ++ Destination ++ "."))/binary,
(erlang:md5(SubscriptionId))/binary>>.
%% ---- Destination parsing helpers ----

View File

@ -67,7 +67,7 @@ class TestAck(base.BaseTest):
conn2 = self.create_connection()
try:
listener2 = base.WaitableListener()
listener2.reset(2)
listener2.reset(2) ## expecting 2 messages
conn2.set_listener('', listener2)
conn2.subscribe(destination=d, ack='client-individual',
headers={'prefetch-count': '10'})
@ -84,10 +84,12 @@ class TestAck(base.BaseTest):
conn3 = self.create_connection()
try:
listener3 = base.WaitableListener()
listener3.reset(1) ## expecting a single message
conn3.set_listener('', listener3)
conn3.subscribe(destination=d)
self.assertTrue(listener3.await(3),
self.assertTrue(listener3.await(20),
"Expected to see a message. ACK not working?")
self.assertEquals(1, len(listener3.messages))
self.assertEquals("test1", listener3.messages[0]['message'])
finally:
conn3.stop()

View File

@ -95,18 +95,24 @@ class WaitableListener(object):
def reset(self, count=1):
if self.debug:
print '(reset listener)',
print '#messages:', len(self.messages),
print '#errors:', len(self.errors),
print '#receipts:', len(self.receipts), 'Now expecting:', count
self.print_state('(reset listener--old state)')
self.messages = []
self.errors = []
self.receipts = []
self.latch = Latch(count)
if self.debug:
self.print_state('(reset listener--new state)')
def await(self, timeout=10):
return self.latch.await(timeout)
def print_state(self, hdr=""):
print hdr,
print '#messages:', len(self.messages),
print '#errors:', len(self.errors),
print '#receipts:', len(self.receipts),
print 'Remaining count:', self.latch.get_count()
class Latch(object):
def __init__(self, count=1):
@ -134,3 +140,9 @@ class Latch(object):
finally:
self.cond.release()
def get_count(self):
try:
self.cond.acquire()
return self.count
finally:
self.cond.release()

View File

@ -165,7 +165,7 @@ class TestQueue(base.BaseTest):
self.conn.send('third', destination=d, receipt='b', transaction=tx)
self.conn.commit(transaction=tx)
self.assertTrue("Missing messages/confirms", self.listener.await(20))
self.assertTrue(self.listener.await(40), "Missing messages/confirms")
expected = set(['a', 'b'])
missing = expected.difference(self.__gather_receipts())
@ -253,3 +253,166 @@ class TestTopic(base.BaseTest):
conn1.stop()
conn2.stop()
class TestReplyQueue(base.BaseTest):
def test_reply_queue(self):
''' Test with two separate clients. Client 1 sends
message to a known destination with a defined reply
queue. Client 2 receives on known destination and replies
on the reply destination. Client 1 gets the reply message'''
known = '/queue/known'
reply = '/temp-queue/0'
## Client 1 uses pre-supplied connection and listener
## Set up client 2
conn2, listener2 = self.create_subscriber_connection(known)
try:
self.conn.send("test", destination=known,
headers = {"reply-to": reply})
self.assertTrue(listener2.await(5))
self.assertEquals(1, len(listener2.messages))
reply_to = listener2.messages[0]['headers']['reply-to']
self.assertTrue(reply_to.startswith('/reply-queue/'))
conn2.send("reply", destination=reply_to)
self.assertTrue(self.listener.await(5))
self.assertEquals("reply", self.listener.messages[0]['message'])
finally:
conn2.stop()
class TestDurableSubscription(base.BaseTest):
ID = 'test.subscription'
def __subscribe(self, dest, conn=None, id=None):
if not conn:
conn = self.conn
if not id:
id = TestDurableSubscription.ID
conn.subscribe(destination=dest,
headers ={'persistent': 'true',
'receipt': 1,
'id': id})
def __assert_receipt(self, listener=None):
if not listener:
listener = self.listener
self.assertTrue(listener.await(5))
self.assertEquals(1, len(self.listener.receipts))
def __assert_message(self, msg, listener=None):
if not listener:
listener = self.listener
self.assertTrue(listener.await(5))
self.assertEquals(1, len(listener.messages))
self.assertEquals(msg, listener.messages[0]['message'])
def test_durability(self):
d = '/topic/durable'
self.__subscribe(d)
self.__assert_receipt()
# send first message without unsubscribing
self.listener.reset(1)
self.conn.send("first", destination=d)
self.__assert_message("first")
# now unsubscribe (disconnect only)
self.conn.unsubscribe(id=TestDurableSubscription.ID)
# send again
self.listener.reset(1)
self.conn.send("second", destination=d)
# resubscribe and expect message
self.__subscribe(d)
self.__assert_message("second")
# now unsubscribe (cancel)
self.conn.unsubscribe(id=TestDurableSubscription.ID,
headers={'persistent': 'true'})
# send again
self.listener.reset(1)
self.conn.send("third", destination=d)
# resubscribe and expect no message
self.__subscribe(d)
self.assertTrue(self.listener.await(3))
self.assertEquals(0, len(self.listener.messages))
self.assertEquals(1, len(self.listener.receipts))
def test_share_subscription(self):
d = '/topic/durable-shared'
conn2 = self.create_connection()
conn2.set_listener('', self.listener)
try:
self.__subscribe(d)
self.__assert_receipt()
self.listener.reset(1)
self.__subscribe(d, conn2)
self.__assert_receipt()
self.listener.reset(100)
# send 100 messages
for x in xrange(0, 100):
self.conn.send("msg" + str(x), destination=d)
self.assertTrue(self.listener.await(5))
self.assertEquals(100, len(self.listener.messages))
finally:
conn2.stop()
def test_separate_ids(self):
d = '/topic/durable-separate'
conn2 = self.create_connection()
listener2 = base.WaitableListener()
conn2.set_listener('', listener2)
try:
# ensure durable subscription exists for each ID
self.__subscribe(d)
self.__assert_receipt()
self.__subscribe(d, conn2, "other.id")
self.__assert_receipt(listener2)
self.conn.unsubscribe(id=TestDurableSubscription.ID)
conn2.unsubscribe(id="other.id")
self.listener.reset(101)
listener2.reset(101) ## 100 messages and 1 receipt
# send 100 messages
for x in xrange(0, 100):
self.conn.send("msg" + str(x), destination=d)
self.__subscribe(d)
self.__subscribe(d, conn2, "other.id")
for l in [self.listener, listener2]:
self.assertTrue(l.await(10))
self.assertEquals(100, len(l.messages))
finally:
conn2.stop()
def test_durable_subscribe_no_id(self):
d = '/topic/durable-invalid'
self.conn.subscribe(destination=d, headers={'persistent':'true'}),
self.listener.await(3)
self.assertEquals(1, len(self.listener.errors))
self.assertEquals("Missing Header", self.listener.errors[0]['headers']['message'])

View File

@ -0,0 +1,125 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ Management Console.
%%
%% The Initial Developers of the Original Code are Rabbit Technologies Ltd.
%%
%% Copyright (C) 2011 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
%% Contributor(s): ______________________________________.
%%
-module(rabbit_stomp_amqqueue_test).
-export([all_tests/0]).
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_stomp_frame.hrl").
-define(QUEUE, <<"TestQueue">>).
-define(DESTINATION, "/amq/queue/TestQueue").
all_tests() ->
[ok = run_test(TestFun) || TestFun <- [fun test_subscribe_error/2,
fun test_subscribe/2,
fun test_send/2]],
ok.
run_test(TestFun) ->
{ok, Connection} = amqp_connection:start(#amqp_params_direct{}),
{ok, Channel} = amqp_connection:open_channel(Connection),
{ok, Sock} = stomp_connect(),
Result = (catch TestFun(Channel, Sock)),
stomp_disconnect(Sock),
amqp_channel:close(Channel),
amqp_connection:close(Connection),
Result.
test_subscribe_error(_Channel, Sock) ->
%% SUBSCRIBE to missing queue
stomp_send(Sock, "SUBSCRIBE", [{"destination", ?DESTINATION}]),
#stomp_frame{command = "ERROR", headers = Hdrs} = stomp_recv(Sock),
"not_found" = proplists:get_value("message", Hdrs),
ok.
test_subscribe(Channel, Sock) ->
#'queue.declare_ok'{} =
amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE,
auto_delete = true}),
%% subscribe and wait for receipt
stomp_send(Sock, "SUBSCRIBE", [{"destination", ?DESTINATION},
{"receipt", "foo"}]),
#stomp_frame{command = "RECEIPT"} = stomp_recv(Sock),
%% send from amqp
Method = #'basic.publish'{
exchange = <<"">>,
routing_key = ?QUEUE},
amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{},
payload = <<"hello">>}),
#stomp_frame{command = "MESSAGE",
body_iolist = [<<"hello">>]} = stomp_recv(Sock),
ok.
test_send(Channel, Sock) ->
#'queue.declare_ok'{} =
amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE,
auto_delete = true}),
%% subscribe and wait for receipt
stomp_send(Sock, "SUBSCRIBE", [{"destination", ?DESTINATION},
{"receipt", "foo"}]),
#stomp_frame{command = "RECEIPT"} = stomp_recv(Sock),
%% send from stomp
stomp_send(Sock, "SEND", [{"destination", ?DESTINATION}], ["hello"]),
#stomp_frame{command = "MESSAGE",
body_iolist = [<<"hello">>]} = stomp_recv(Sock),
ok.
stomp_connect() ->
{ok, Sock} = gen_tcp:connect(localhost, 61613, [{active, false}, binary]),
stomp_send(Sock, "CONNECT"),
#stomp_frame{command = "CONNECTED"} = stomp_recv(Sock),
{ok, Sock}.
stomp_disconnect(Sock) ->
stomp_send(Sock, "DISCONNECT").
stomp_send(Sock, Command) ->
stomp_send(Sock, Command, []).
stomp_send(Sock, Command, Headers) ->
stomp_send(Sock, Command, Headers, []).
stomp_send(Sock, Command, Headers, Body) ->
gen_tcp:send(Sock, rabbit_stomp_frame:serialize(
#stomp_frame{command = list_to_binary(Command),
headers = Headers,
body_iolist = Body})).
stomp_recv(Sock) ->
{ok, Payload} = gen_tcp:recv(Sock, 0),
{ok, Frame, _Rest} =
rabbit_stomp_frame:parse(Payload,
rabbit_stomp_frame:initial_state()),
Frame.

View File

@ -195,10 +195,19 @@ valid_topic_test() ->
valid_exchange_test() ->
{ok, {exchange, {"test", undefined}}} = parse_destination("/exchange/test").
valid_temp_queue_test() ->
{ok, {temp_queue, "test"}} = parse_destination("/temp-queue/test").
valid_reply_queue_test() ->
{ok, {reply_queue, "test"}} = parse_destination("/reply-queue/test").
valid_exchange_with_pattern_test() ->
{ok, {exchange, {"test", "pattern"}}} =
parse_destination("/exchange/test/pattern").
valid_amqqueue_test() ->
{ok, {amqqueue, "test"}} = parse_destination("/amq/queue/test").
queue_with_no_name_test() ->
{error, {invalid_destination, queue, ""}} = parse_destination("/queue").
@ -213,6 +222,10 @@ exchange_default_name_test() ->
{error, {invalid_destination, exchange, "//foo"}} =
parse_destination("/exchange//foo").
amqqueue_with_no_name_test() ->
{error, {invalid_destination, amqqueue, ""}} =
parse_destination("/amq/queue").
queue_with_no_name_slash_test() ->
{error, {invalid_destination, queue, "/"}} = parse_destination("/queue/").

View File

@ -1,6 +1,7 @@
import base
import stomp
import unittest
import time
class TestReliability(base.BaseTest):
@ -20,10 +21,14 @@ class TestReliability(base.BaseTest):
for x in range(0, count):
pub_conn.send(msg + str(x), destination=d)
time.sleep(2.0)
pub_conn.close_socket()
self.assertTrue(listener.await(10))
self.assertEquals(count, len(listener.messages))
if listener.await(30):
self.assertEquals(count, len(listener.messages))
else:
listener.print_state("Final state of listener:")
self.fail("Did not receive %s messages in time" % count)
finally:
if pub_conn.is_connected():
pub_conn.disconnect()