Merge bug22005 into default.

This commit is contained in:
Steve Powell 2011-06-07 16:55:22 +01:00
commit 80f865601b
12 changed files with 198 additions and 52 deletions

View File

@ -5,3 +5,5 @@
^cover/
^erl_crash.dump$
\.pyc$
^test/certs/
^test/ebin/test.config

View File

@ -5,6 +5,7 @@
{registered, []},
{mod, {rabbit_stomp, []}},
{env, [{tcp_listeners, [61613]},
{ssl_listeners, []},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},

View File

@ -3,11 +3,38 @@ DEPS:=rabbitmq-server rabbitmq-erlang-client
STANDALONE_TEST_COMMANDS:=eunit:test([rabbit_stomp_test_util,rabbit_stomp_test_frame],[verbose])
WITH_BROKER_TEST_SCRIPTS:=$(PACKAGE_DIR)/test/src/test.py
RABBITMQ_TEST_PATH=../../rabbitmq-test
CERTS_DIR:=$(abspath test/certs)
CAN_RUN_SSL:=$(shell if [ -d $(RABBITMQ_TEST_PATH) ]; then echo "true"; else echo "false"; fi)
TEST_CONFIG_PATH=$(TEST_EBIN_DIR)/test.config
WITH_BROKER_TEST_CONFIG:=$(TEST_EBIN_DIR)/test
ifeq ($(CAN_RUN_SSL),true)
WITH_BROKER_TEST_SCRIPTS += $(PACKAGE_DIR)/test/src/test_ssl.py
$(TEST_CONFIG_PATH): $(CERTS_DIR)
sed -e "s|%%CERTS_DIR%%|$(CERTS_DIR)|g" < test/src/ssl.config > $@
echo $(WITH_BROKER_TEST_CONFIG)
$(CERTS_DIR):
mkdir -p $(CERTS_DIR)
make -C $(RABBITMQ_TEST_PATH)/certs all PASSWORD=test DIR=$(CERTS_DIR)
else
$(TEST_CONFIG_PATH):
echo "[]." >> $@
endif
define package_rules
$(PACKAGE_DIR)+pre-test::
$(PACKAGE_DIR)+pre-test:: $(TEST_CONFIG_PATH)
make -C $(PACKAGE_DIR)/deps/stomppy
$(PACKAGE_DIR)+clean::
rm -rf $(CERTS_DIR)
$(PACKAGE_DIR)+clean-with-deps::
make -C $(PACKAGE_DIR)/deps/stomppy distclean

View File

@ -46,6 +46,11 @@ stop(_State) ->
parse_listener_configuration() ->
case application:get_env(tcp_listeners) of
undefined -> throw({error, {stomp_configuration_not_found}});
{ok, Listeners} -> Listeners
undefined ->
throw({error, {stomp_configuration_not_found}});
{ok, Listeners} ->
case application:get_env(ssl_listeners) of
undefined -> {Listeners, []};
{ok, SslListeners} -> {Listeners, SslListeners}
end
end.

View File

@ -356,12 +356,18 @@ do_login(_, _, _, _, _, _, State) ->
adapter_info(Sock, Version) ->
{ok, {Addr, Port}} = rabbit_net:sockname(Sock),
{ok, {PeerAddr, PeerPort}} = rabbit_net:peername(Sock),
#adapter_info{protocol = {'STOMP', Version},
#adapter_info{protocol = {adapter_protocol(Sock), Version},
address = Addr,
port = Port,
peer_address = PeerAddr,
peer_port = PeerPort}.
adapter_protocol(Sock) ->
case rabbit_net:is_ssl(Sock) of
true -> "STOMP/SSL";
false -> "STOMP"
end.
do_subscribe(Destination, DestHdr, Frame,
State = #state{subscriptions = Subs,
connection = Connection,
@ -660,7 +666,7 @@ ensure_heartbeats(Heartbeats,
X <- re:split(Heartbeats, ",", [{return, list}])],
SendFun = fun() ->
catch gen_tcp:send(Sock, <<0>>)
catch rabbit_net:send(Sock, <<0>>)
end,
Pid = self(),

View File

@ -44,12 +44,11 @@ start_link(ProcessorPid) ->
init(ProcessorPid) ->
receive
{go, Sock} ->
ok = inet:setopts(Sock, [{active, false}]),
{ok, {PeerAddress, PeerPort}} = inet:peername(Sock),
{ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock),
PeerAddressS = inet_parse:ntoa(PeerAddress),
error_logger:info_msg("starting STOMP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
ParseState = rabbit_stomp_frame:initial_state(),
try
?MODULE:mainloop(

View File

@ -33,41 +33,72 @@
-export([start_link/1, init/1]).
-export([listener_started/2, listener_stopped/2, start_client/1]).
-export([listener_started/3, listener_stopped/3,
start_client/1, start_ssl_client/2]).
start_link(Listeners) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Listeners]).
init([Listeners]) ->
init([{Listeners, SslListeners}]) ->
{ok, SocketOpts} = application:get_env(rabbitmq_stomp, tcp_listen_options),
SslOpts = case SslListeners of
[] -> none;
_ -> rabbit_networking:ensure_ssl()
end,
{ok, {{one_for_all, 10, 10},
[{rabbit_stomp_client_sup_sup,
{rabbit_client_sup, start_link,
[{local, rabbit_stomp_client_sup_sup},
{rabbit_stomp_client_sup, start_link,[]}]},
transient, infinity, supervisor, [rabbit_client_sup]} |
[{Name,
{tcp_listener_sup, start_link,
[IPAddress, Port,
[Family | SocketOpts],
{?MODULE, listener_started, []},
{?MODULE, listener_stopped, []},
{?MODULE, start_client, []}, "STOMP Listener"]},
transient, infinity, supervisor, [tcp_listener_sup]} ||
Listener <- Listeners,
{IPAddress, Port, Family, Name} <-
rabbit_networking:check_tcp_listener_address(
rabbit_stomp_listener_sup, Listener)]]}}.
listener_specs(fun tcp_listener_spec/1, [SocketOpts], Listeners) ++
listener_specs(fun ssl_listener_spec/1,
[SocketOpts, SslOpts], SslListeners)]}}.
listener_started(IPAddress, Port) ->
rabbit_networking:tcp_listener_started(stomp, IPAddress, Port).
listener_stopped(IPAddress, Port) ->
rabbit_networking:tcp_listener_stopped(stomp, IPAddress, Port).
listener_specs(Fun, Args, Listeners) ->
[Fun([Address | Args]) ||
Listener <- Listeners,
Address <- rabbit_networking:check_tcp_listener_address(
rabbit_stomp_listener_sup, Listener)].
tcp_listener_spec([Address, SocketOpts]) ->
listener_spec(Address, SocketOpts, stomp,
{?MODULE, start_client, []}, "STOMP TCP Listener").
ssl_listener_spec([Address, SocketOpts, SslOpts]) ->
listener_spec(Address, SocketOpts, 'stomp/ssl',
{?MODULE, start_ssl_client, [SslOpts]}, "STOMP SSL Listener").
listener_spec({IPAddress, Port, Family, Name},
SocketOpts, Protocol, OnConnect, Label) ->
{Name,
{tcp_listener_sup, start_link,
[IPAddress, Port,
[Family | SocketOpts],
{?MODULE, listener_started, [Protocol]},
{?MODULE, listener_stopped, [Protocol]},
OnConnect, Label]},
transient, infinity, supervisor, [tcp_listener_sup]}.
listener_started(Protocol, IPAddress, Port) ->
rabbit_networking:tcp_listener_started(Protocol, IPAddress, Port).
listener_stopped(Protocol, IPAddress, Port) ->
rabbit_networking:tcp_listener_stopped(Protocol, IPAddress, Port).
start_client(Sock) ->
{ok, SupPid, ReaderPid} =
supervisor:start_child(rabbit_stomp_client_sup_sup, [Sock]),
ok = gen_tcp:controlling_process(Sock, ReaderPid),
ok = rabbit_net:controlling_process(Sock, ReaderPid),
ReaderPid ! {go, Sock},
SupPid.
start_ssl_client(SslOpts, Sock) ->
Transform = rabbit_networking:ssl_transform_fun(SslOpts),
{ok, SslSock} = Transform(Sock),
start_client(SslSock).

12
deps/rabbitmq_stomp/test/src/ssl.config vendored Normal file
View File

@ -0,0 +1,12 @@
[
{rabbitmq_stomp, [
{ssl_listeners, [61614]}
]},
{rabbit, [
{ssl_options, [{cacertfile,"%%CERTS_DIR%%/testca/cacert.pem"},
{certfile,"%%CERTS_DIR%%/server/cert.pem"},
{keyfile,"%%CERTS_DIR%%/server/key.pem"},
{verify,verify_peer},
{fail_if_no_peer_cert,false}]}
]}
].

View File

@ -0,0 +1,51 @@
import unittest
import os
import stomp
import base
class TestSslClient(unittest.TestCase):
def __ssl_connect(self):
ssl_key_file = os.path.abspath("test/certs/client/key.pem")
ssl_cert_file = os.path.abspath("test/certs/client/cert.pem")
ssl_ca_certs = os.path.abspath("test/certs/testca/cacert.pem")
conn = stomp.Connection(user="guest", passcode="guest",
host_and_ports = [ ('localhost', 61614) ],
use_ssl = True, ssl_key_file = ssl_key_file,
ssl_cert_file = ssl_cert_file,
ssl_ca_certs = ssl_ca_certs)
conn.start()
conn.connect()
return conn
def test_ssl_connect(self):
conn = self.__ssl_connect()
conn.stop()
def test_ssl_send_receive(self):
conn = self.__ssl_connect()
try:
listener = base.WaitableListener()
conn.set_listener('', listener)
d = "/topic/ssl.test"
conn.subscribe(destination=d, receipt="sub")
self.assertTrue(listener.await(1))
self.assertEquals("sub",
listener.receipts[0]['headers']['receipt-id'])
listener.reset(1)
conn.send("Hello SSL!", destination=d)
self.assertTrue(listener.await())
self.assertEquals("Hello SSL!", listener.messages[0]['message'])
finally:
conn.disconnect()

View File

@ -1,30 +1,9 @@
#!/usr/bin/env python
import unittest
import sys
import os
def add_deps_to_path():
deps_dir = os.path.realpath(os.path.join(__file__, "..", "..", "..", "deps"))
sys.path.append(os.path.join(deps_dir, "stomppy", "stomppy"))
def run_unittests():
add_deps_to_path()
modules = ['parsing', 'destinations', 'lifecycle', 'transactions',
'ack', 'errors']
suite = unittest.TestSuite()
for m in modules:
mod = __import__(m)
for name in dir(mod):
obj = getattr(mod, name)
if name.startswith("Test") and issubclass(obj, unittest.TestCase):
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(obj))
ts = unittest.TextTestRunner().run(unittest.TestSuite(suite))
if ts.errors or ts.failures:
sys.exit(1)
import test_runner
if __name__ == '__main__':
run_unittests()
modules = ['parsing', 'destinations', 'lifecycle', 'transactions',
'ack', 'errors']
test_runner.run_unittests(modules)

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python
import unittest
import sys
import os
def add_deps_to_path():
deps_dir = os.path.realpath(os.path.join(__file__, "..", "..", "..", "deps"))
sys.path.append(os.path.join(deps_dir, "stomppy", "stomppy"))
def run_unittests(modules):
add_deps_to_path()
suite = unittest.TestSuite()
for m in modules:
mod = __import__(m)
for name in dir(mod):
obj = getattr(mod, name)
if name.startswith("Test") and issubclass(obj, unittest.TestCase):
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(obj))
ts = unittest.TextTestRunner().run(unittest.TestSuite(suite))
if ts.errors or ts.failures:
sys.exit(1)

8
deps/rabbitmq_stomp/test/src/test_ssl.py vendored Executable file
View File

@ -0,0 +1,8 @@
#!/usr/bin/env python
import test_runner
if __name__ == '__main__':
modules = ['ssl_lifecycle']
test_runner.run_unittests(modules)