Merge with default

This commit is contained in:
Rob Harrop 2011-01-24 23:50:22 +00:00
commit 29699ea26a
12 changed files with 147 additions and 85 deletions

View File

@ -7,8 +7,6 @@ TEST_APPS=rabbit_stomp
TEST_SCRIPTS=./test/test.py
UNIT_TEST_COMMANDS=eunit:test([rabbit_stomp_test_util,rabbit_stomp_test_frame],[verbose])
TEST_ARGS=-rabbit_stomp listeners "[{\"0.0.0.0\",61613}]"
include ../include.mk
testdeps:

View File

@ -32,21 +32,26 @@ You need to install the rabbit\_stomp.ez and amqp\_client.ez packages.
## Running the STOMP adapter
### Configuring the server to start the plugin automatically
Most RabbitMQ server packages are set up to cause the server to pick
up configuration from `/etc/rabbitmq/rabbitmq.conf`. To tell the
server to start your plugin, first make sure it is compiled, and then
add the following text to `/etc/rabbitmq/rabbitmq.conf`:
SERVER_START_ARGS='-rabbit_stomp listeners [{"0.0.0.0",61613}]'
Then restart the server with
sudo /etc/init.d/rabbitmq-server restart
When no configuration is specified the STOMP Adapter will listen on
localhost by default.
all interfaces on port 61613.
To change this, edit your [Configuration file](http://www.rabbitmq.com/install.html#configfile),
to contain a tcp_listeners variable for the rabbit_stomp application.
For example, a complete configuration file which changes the listener
port to 12345 would look like:
[
{rabbit_stomp, [{tcp_listeners, [12345]} ]}
].
while one which changes the listener to listen only on localhost (for
both IPv4 and IPv6) would look like:
[
{rabbit_stomp, [{tcp_listeners, [{"127.0.0.1", 61613},
{"::1", 61613} ]} ]}
].
### Checking that the adapter is running

View File

@ -12,5 +12,5 @@
]},
{registered, []},
{mod, {rabbit_stomp, []}},
{env, [{listeners, [{"127.0.0.1", 61613}]}]},
{env, [{tcp_listeners, [61613]}]},
{applications, [kernel, stdlib, rabbit]}]}.

View File

@ -45,7 +45,7 @@ stop(_State) ->
ok.
parse_listener_configuration() ->
case application:get_env(listeners) of
case application:get_env(tcp_listeners) of
undefined -> throw({error, {stomp_configuration_not_found}});
{ok, Listeners} -> Listeners
end.

View File

@ -42,7 +42,7 @@
connection, subscriptions, version,
start_heartbeat_fun}).
-record(subscription, {dest_hdr, channel, multi_ack}).
-record(subscription, {dest_hdr, channel, multi_ack, description}).
-define(SUPPORTED_VERSIONS, ["1.0", "1.1"]).
-define(DEFAULT_QUEUE_PREFETCH, 1).
@ -124,6 +124,8 @@ handle_cast(client_timeout, State) ->
handle_info(#'basic.consume_ok'{}, State) ->
{noreply, State};
handle_info(#'basic.cancel_ok'{}, State) ->
{noreply, State};
handle_info({Delivery = #'basic.deliver'{},
#amqp_msg{props = Props, payload = Payload}}, State) ->
{noreply, send_delivery(Delivery, Props, Payload, State)}.
@ -234,20 +236,42 @@ cancel_subscription({error, _}, State) ->
"UNSUBSCRIBE must include a 'destination' or 'id' header\n",
State);
cancel_subscription({ok, ConsumerTag}, State = #state{subscriptions = Subs}) ->
cancel_subscription({ok, ConsumerTag, Description},
State = #state{channel = MainChannel,
subscriptions = Subs}) ->
case dict:find(ConsumerTag, Subs) of
error ->
error("No subscription found",
"UNSUBSCRIBE must refer to an existing subscription\n",
"UNSUBSCRIBE must refer to an existing subscription.\n"
"Subscription to ~p not found.\n",
[Description],
State);
{ok, #subscription{channel = Channel}} ->
ok(send_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = true},
Channel,
State#state{subscriptions =
dict:erase(ConsumerTag, Subs)}))
{ok, #subscription{channel = SubChannel}} ->
case amqp_channel:call(SubChannel,
#'basic.cancel'{
consumer_tag = ConsumerTag}) of
#'basic.cancel_ok'{consumer_tag = ConsumerTag} ->
NewSubs = dict:erase(ConsumerTag, Subs),
ensure_subchannel_closed(SubChannel,
MainChannel,
State#state{
subscriptions = NewSubs});
_ ->
error("Failed to cancel subscription",
"UNSUBSCRIBE to ~p failed.\n",
[Description],
State)
end
end.
ensure_subchannel_closed(SubChannel, MainChannel, State)
when SubChannel == MainChannel ->
ok(State);
ensure_subchannel_closed(SubChannel, _MainChannel, State) ->
amqp_channel:close(SubChannel),
ok(State).
with_destination(Command, Frame, State, Fun) ->
case rabbit_stomp_frame:header(Frame, "destination") of
{ok, DestHdr} ->
@ -320,7 +344,7 @@ do_subscribe(Destination, DestHdr, Frame,
{ok, Queue} = ensure_queue(subscribe, Destination, Channel),
{ok, ConsumerTag} = rabbit_stomp_util:consumer_tag(Frame),
{ok, ConsumerTag, Description} = rabbit_stomp_util:consumer_tag(Frame),
amqp_channel:subscribe(Channel,
#'basic.consume'{
@ -338,7 +362,8 @@ do_subscribe(Destination, DestHdr, Frame,
dict:store(ConsumerTag,
#subscription{dest_hdr = DestHdr,
channel = Channel,
multi_ack = IsMulti},
multi_ack = IsMulti,
description = Description},
Subs)}).
do_send(Destination, _DestHdr,
@ -400,7 +425,7 @@ send_delivery(Delivery = #'basic.deliver'{consumer_tag = ConsumerTag},
State);
error ->
send_error("Subscription not found",
"There is no current subscription '~s'.",
"There is no current subscription with tag '~s'.",
[ConsumerTag],
State)
end.
@ -414,6 +439,9 @@ send_method(Method, State = #state{channel = Channel}) ->
send_method(Method, Properties, BodyFragments,
State = #state{channel = Channel}) ->
send_method(Method, Channel, Properties, BodyFragments, State).
send_method(Method, Channel, Properties, BodyFragments, State) ->
amqp_channel:call(Channel, Method, #amqp_msg{
props = Properties,
payload = lists:reverse(BodyFragments)}),
@ -508,8 +536,7 @@ abort_transaction(Transaction, State0) ->
perform_transaction_action({Method}, State) ->
send_method(Method, State);
perform_transaction_action({Channel, Method}, State) ->
amqp_channel:call(Channel, Method),
State;
send_method(Method, Channel, State);
perform_transaction_action({Method, Props, BodyFragments}, State) ->
send_method(Method, Props, BodyFragments, State).

View File

@ -52,25 +52,24 @@ init([Listeners]) ->
{ok, {{one_for_all, 10, 10}, ChildSpecs}}.
make_listener_specs(Listeners) ->
lists:foldl(
fun({Host, Port}, Acc) ->
{IPAddress, Name} = rabbit_networking:check_tcp_listener_address(
rabbit_stomp_listener_sup,
Host,
Port),
[{Name,
[make_listener_spec(Spec)
|| Spec <- lists:append([rabbit_networking:check_tcp_listener_address(
rabbit_stomp_listener_sup, Listener)
|| Listener <- Listeners])].
make_listener_spec({IPAddress, Port, Family, Name}) ->
{Name,
{tcp_listener_sup, start_link,
[IPAddress, Port,
[binary,
[Family,
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 128}],
{?MODULE, listener_started, []},
{?MODULE, listener_stopped, []},
{?MODULE, start_client, []}, "STOMP Listener"]},
transient, infinity, supervisor, [tcp_listener_sup]} | Acc]
end, [], Listeners).
transient, infinity, supervisor, [tcp_listener_sup]}.
listener_started(IPAddress, Port) ->
rabbit_networking:tcp_listener_started(stomp, IPAddress, Port).

View File

@ -60,11 +60,11 @@
consumer_tag(Frame) ->
case rabbit_stomp_frame:header(Frame, "id") of
{ok, Str} ->
{ok, list_to_binary("T_" ++ Str)};
{ok, list_to_binary("T_" ++ Str), "id='" ++ Str ++ "'"};
not_found ->
case rabbit_stomp_frame:header(Frame, "destination") of
{ok, DestHdr} ->
{ok, list_to_binary("Q_" ++ DestHdr)};
{ok, list_to_binary("Q_" ++ DestHdr), "destination='" ++ DestHdr ++ "'"};
not_found ->
{error, missing_destination_header}
end

View File

@ -49,7 +49,7 @@ class BaseTest(unittest.TestCase):
self.assertEquals("foo", msg['message'])
self.assertEquals(dest, msg['headers']['destination'])
def assertListener(self, errMsg, numMsgs=0, numErrs=0, numRcts=0, timeout=3):
def assertListener(self, errMsg, numMsgs=0, numErrs=0, numRcts=0, timeout=1):
if numMsgs + numErrs + numRcts > 0:
self.assertTrue(self.listener.await(timeout), errMsg + " (#awaiting)")
else:
@ -58,7 +58,7 @@ class BaseTest(unittest.TestCase):
self.assertEquals(numErrs, len(self.listener.errors), errMsg + " (#errors)")
self.assertEquals(numRcts, len(self.listener.receipts), errMsg + " (#receipts)")
def assertListenerAfter(self, verb, errMsg="", numMsgs=0, numErrs=0, numRcts=0, timeout=3):
def assertListenerAfter(self, verb, errMsg="", numMsgs=0, numErrs=0, numRcts=0, timeout=1):
num = numMsgs + numErrs + numRcts
self.listener.reset(num if num>0 else 1)
verb()
@ -77,13 +77,13 @@ class WaitableListener(object):
def on_receipt(self, headers, message):
if self.debug:
print '(on_message) message:', message, 'headers:', headers
print '(on_receipt) message:', message, 'headers:', headers
self.receipts.append({'message' : message, 'headers' : headers})
self.latch.countdown()
def on_error(self, headers, message):
if self.debug:
print '(on_message) message:', message, 'headers:', headers
print '(on_error) message:', message, 'headers:', headers
self.errors.append({'message' : message, 'headers' : headers})
self.latch.countdown()
@ -95,9 +95,10 @@ class WaitableListener(object):
def reset(self, count=1):
if self.debug:
print '(reset listener) #messages:', len(self.messages),
print '#errors', len(self.errors),
print '#receipts', len(self.receipts), 'Now expecting:', count
print '(reset listener)',
print '#messages:', len(self.messages),
print '#errors:', len(self.errors),
print '#receipts:', len(self.receipts), 'Now expecting:', count
self.messages = []
self.errors = []
self.receipts = []
@ -116,6 +117,7 @@ class Latch(object):
def countdown(self):
self.cond.acquire()
if self.count > 0:
self.count -= 1
if self.count == 0:
self.cond.notify_all()

View File

@ -1,6 +1,7 @@
import unittest
import stomp
import base
import time
class TestExchange(base.BaseTest):
@ -32,7 +33,7 @@ class TestExchange(base.BaseTest):
self.assertEquals("not_found", err['headers']['message'])
self.assertEquals("no exchange 'does.not.exist' in vhost '/'\n",
err['message'])
time.sleep(1)
self.assertFalse(self.conn.is_connected())
def __test_exchange_send_rec(self, exchange, route = None):

View File

@ -65,4 +65,3 @@ class TestErrors(base.BaseTest):
dtype + " destination\n",
err['message'])

View File

@ -7,21 +7,46 @@ class TestLifecycle(base.BaseTest):
def test_unsubscribe_exchange_destination(self):
''' Test UNSUBSCRIBE command with exchange'''
self.unsub_test(self.sub_and_send("/exchange/amq.fanout"))
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d))
def test_unsubscribe_exchange_destination_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with exchange'''
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
def test_unsubscribe_queue_destination(self):
''' Test UNSUBSCRIBE command with queue'''
self.unsub_test(self.sub_and_send("/queue/unsub01"))
d = "/queue/unsub01"
self.unsub_test(d, self.sub_and_send(d))
def test_unsubscribe_queue_destination_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with queue'''
d = "/queue/unsub02"
self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
def test_unsubscribe_exchange_id(self):
''' Test UNSUBSCRIBE command with exchange by id'''
self.unsub_test(self.subid_and_send("/exchange/amq.fanout", "exchid"))
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d, subid="exchid"))
def test_unsubscribe_exchange_id_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with exchange by id'''
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d, subid="exchid", receipt="unsub.rct"), numRcts=1)
def test_unsubscribe_queue_id(self):
''' Test UNSUBSCRIBE command with queue by id'''
self.unsub_test(self.subid_and_send("/queue/unsub02", "queid"))
d = "/queue/unsub03"
self.unsub_test(d, self.sub_and_send(d, subid="queid"))
def test_unsubscribe_queue_id_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with queue by id'''
d = "/queue/unsub04"
self.unsub_test(d, self.sub_and_send(d, subid="queid", receipt="unsub.rct"), numRcts=1)
def test_connect_version_1_1(self):
''' Test CONNECT with version 1.1'''
self.conn.disconnect()
new_conn = self.create_connection(version="1.1,1.0")
try:
@ -30,6 +55,7 @@ class TestLifecycle(base.BaseTest):
new_conn.disconnect()
def test_heartbeat_disconnects_client(self):
''' Test heartbeat disconnection'''
self.conn.disconnect()
new_conn = self.create_connection(heartbeat="1500,0")
try:
@ -42,9 +68,8 @@ class TestLifecycle(base.BaseTest):
if new_conn.is_connected():
new_conn.disconnect()
def test_unsupported_version(self):
''' Test unsupported version on CONNECT command'''
self.conn.disconnect()
new_conn = stomp.Connection(user="guest",
passcode="guest",
@ -62,29 +87,35 @@ class TestLifecycle(base.BaseTest):
new_conn.disconnect()
def test_disconnect(self):
''' Run DISCONNECT command '''
''' Test DISCONNECT command'''
self.conn.disconnect()
self.assertFalse(self.conn.is_connected())
def unsub_test(self, verbs):
def unsub_test(self, dest, verbs, numRcts=0):
def afterfun():
self.conn.send("after-test", destination=dest)
subverb, unsubverb = verbs
self.assertListenerAfter(subverb,
numMsgs=1, errMsg="FAILED to subscribe and send")
self.assertListenerAfter(unsubverb,
self.assertListenerAfter(subverb, numMsgs=1,
errMsg="FAILED to subscribe and send")
self.assertListenerAfter(unsubverb, numRcts=numRcts,
errMsg="Incorrect responses from UNSUBSCRIBE")
self.assertListenerAfter(afterfun,
errMsg="Still receiving messages")
def subid_and_send(self, dest, subid):
def sub_and_send(self, dest, subid="", receipt=""):
def subfun():
if subid=="":
self.conn.subscribe(destination=dest)
else:
self.conn.subscribe(destination=dest, id=subid)
self.conn.send("test", destination=dest)
def unsubfun():
self.conn.unsubscribe(id=subid)
return subfun, unsubfun
def sub_and_send(self, dest):
def subfun():
self.conn.subscribe(destination=dest)
self.conn.send("test", destination=dest)
def unsubfun():
if subid=="" and receipt=="":
self.conn.unsubscribe(destination=dest)
elif receipt=="":
self.conn.unsubscribe(id=subid)
elif subid=="":
self.conn.unsubscribe(destination=dest, receipt=receipt)
else:
self.conn.unsubscribe(id=subid, receipt=receipt)
return subfun, unsubfun

View File

@ -150,11 +150,11 @@ ack_mode_client_individual_test() ->
consumer_tag_id_test() ->
Frame = #stomp_frame{headers = [{"id", "foo"}]},
{ok, <<"T_foo">>} = rabbit_stomp_util:consumer_tag(Frame).
{ok, <<"T_foo">>, _} = rabbit_stomp_util:consumer_tag(Frame).
consumer_tag_destination_test() ->
Frame = #stomp_frame{headers = [{"destination", "foo"}]},
{ok, <<"Q_foo">>} = rabbit_stomp_util:consumer_tag(Frame).
{ok, <<"Q_foo">>, _} = rabbit_stomp_util:consumer_tag(Frame).
consumer_tag_invalid_test() ->
Frame = #stomp_frame{headers = []},