diff --git a/deps/rabbitmq_stomp/.hgignore b/deps/rabbitmq_stomp/.hgignore index 589d0ea2de..67187c170b 100644 --- a/deps/rabbitmq_stomp/.hgignore +++ b/deps/rabbitmq_stomp/.hgignore @@ -5,3 +5,5 @@ ^cover/ ^erl_crash.dump$ \.pyc$ +^test/certs/ +^test/ebin/test.config diff --git a/deps/rabbitmq_stomp/ebin/rabbitmq_stomp.app.in b/deps/rabbitmq_stomp/ebin/rabbitmq_stomp.app.in index a968ec64cb..9e985ab5ab 100644 --- a/deps/rabbitmq_stomp/ebin/rabbitmq_stomp.app.in +++ b/deps/rabbitmq_stomp/ebin/rabbitmq_stomp.app.in @@ -5,6 +5,7 @@ {registered, []}, {mod, {rabbit_stomp, []}}, {env, [{tcp_listeners, [61613]}, + {ssl_listeners, []}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/deps/rabbitmq_stomp/package.mk b/deps/rabbitmq_stomp/package.mk index b850fad29c..6e41d4ed07 100644 --- a/deps/rabbitmq_stomp/package.mk +++ b/deps/rabbitmq_stomp/package.mk @@ -3,11 +3,38 @@ DEPS:=rabbitmq-server rabbitmq-erlang-client STANDALONE_TEST_COMMANDS:=eunit:test([rabbit_stomp_test_util,rabbit_stomp_test_frame],[verbose]) WITH_BROKER_TEST_SCRIPTS:=$(PACKAGE_DIR)/test/src/test.py +RABBITMQ_TEST_PATH=../../rabbitmq-test +CERTS_DIR:=$(abspath test/certs) +CAN_RUN_SSL:=$(shell if [ -d $(RABBITMQ_TEST_PATH) ]; then echo "true"; else echo "false"; fi) + +TEST_CONFIG_PATH=$(TEST_EBIN_DIR)/test.config +WITH_BROKER_TEST_CONFIG:=$(TEST_EBIN_DIR)/test + +ifeq ($(CAN_RUN_SSL),true) + +WITH_BROKER_TEST_SCRIPTS += $(PACKAGE_DIR)/test/src/test_ssl.py + +$(TEST_CONFIG_PATH): $(CERTS_DIR) + sed -e "s|%%CERTS_DIR%%|$(CERTS_DIR)|g" < test/src/ssl.config > $@ + echo $(WITH_BROKER_TEST_CONFIG) + +$(CERTS_DIR): + mkdir -p $(CERTS_DIR) + make -C $(RABBITMQ_TEST_PATH)/certs all PASSWORD=test DIR=$(CERTS_DIR) + +else +$(TEST_CONFIG_PATH): + echo "[]." >> $@ +endif + define package_rules -$(PACKAGE_DIR)+pre-test:: +$(PACKAGE_DIR)+pre-test:: $(TEST_CONFIG_PATH) make -C $(PACKAGE_DIR)/deps/stomppy +$(PACKAGE_DIR)+clean:: + rm -rf $(CERTS_DIR) + $(PACKAGE_DIR)+clean-with-deps:: make -C $(PACKAGE_DIR)/deps/stomppy distclean diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp.erl b/deps/rabbitmq_stomp/src/rabbit_stomp.erl index 0f5e226ebf..c769dbd8bc 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp.erl @@ -46,6 +46,11 @@ stop(_State) -> parse_listener_configuration() -> case application:get_env(tcp_listeners) of - undefined -> throw({error, {stomp_configuration_not_found}}); - {ok, Listeners} -> Listeners + undefined -> + throw({error, {stomp_configuration_not_found}}); + {ok, Listeners} -> + case application:get_env(ssl_listeners) of + undefined -> {Listeners, []}; + {ok, SslListeners} -> {Listeners, SslListeners} + end end. diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index 872c047ce6..6b2be9956c 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -356,12 +356,18 @@ do_login(_, _, _, _, _, _, State) -> adapter_info(Sock, Version) -> {ok, {Addr, Port}} = rabbit_net:sockname(Sock), {ok, {PeerAddr, PeerPort}} = rabbit_net:peername(Sock), - #adapter_info{protocol = {'STOMP', Version}, + #adapter_info{protocol = {adapter_protocol(Sock), Version}, address = Addr, port = Port, peer_address = PeerAddr, peer_port = PeerPort}. +adapter_protocol(Sock) -> + case rabbit_net:is_ssl(Sock) of + true -> "STOMP/SSL"; + false -> "STOMP" + end. + do_subscribe(Destination, DestHdr, Frame, State = #state{subscriptions = Subs, connection = Connection, @@ -660,7 +666,7 @@ ensure_heartbeats(Heartbeats, X <- re:split(Heartbeats, ",", [{return, list}])], SendFun = fun() -> - catch gen_tcp:send(Sock, <<0>>) + catch rabbit_net:send(Sock, <<0>>) end, Pid = self(), diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl index 711f9cba5e..fd393756e5 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl @@ -44,12 +44,11 @@ start_link(ProcessorPid) -> init(ProcessorPid) -> receive {go, Sock} -> - ok = inet:setopts(Sock, [{active, false}]), - - {ok, {PeerAddress, PeerPort}} = inet:peername(Sock), + {ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock), PeerAddressS = inet_parse:ntoa(PeerAddress), error_logger:info_msg("starting STOMP connection ~p from ~s:~p~n", [self(), PeerAddressS, PeerPort]), + ParseState = rabbit_stomp_frame:initial_state(), try ?MODULE:mainloop( diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_sup.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_sup.erl index a65922db47..65832cc0ad 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_sup.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_sup.erl @@ -33,41 +33,72 @@ -export([start_link/1, init/1]). --export([listener_started/2, listener_stopped/2, start_client/1]). +-export([listener_started/3, listener_stopped/3, + start_client/1, start_ssl_client/2]). start_link(Listeners) -> supervisor:start_link({local, ?MODULE}, ?MODULE, [Listeners]). -init([Listeners]) -> +init([{Listeners, SslListeners}]) -> {ok, SocketOpts} = application:get_env(rabbitmq_stomp, tcp_listen_options), + + SslOpts = case SslListeners of + [] -> none; + _ -> rabbit_networking:ensure_ssl() + end, + {ok, {{one_for_all, 10, 10}, [{rabbit_stomp_client_sup_sup, {rabbit_client_sup, start_link, [{local, rabbit_stomp_client_sup_sup}, {rabbit_stomp_client_sup, start_link,[]}]}, transient, infinity, supervisor, [rabbit_client_sup]} | - [{Name, - {tcp_listener_sup, start_link, - [IPAddress, Port, - [Family | SocketOpts], - {?MODULE, listener_started, []}, - {?MODULE, listener_stopped, []}, - {?MODULE, start_client, []}, "STOMP Listener"]}, - transient, infinity, supervisor, [tcp_listener_sup]} || - Listener <- Listeners, - {IPAddress, Port, Family, Name} <- - rabbit_networking:check_tcp_listener_address( - rabbit_stomp_listener_sup, Listener)]]}}. + listener_specs(fun tcp_listener_spec/1, [SocketOpts], Listeners) ++ + listener_specs(fun ssl_listener_spec/1, + [SocketOpts, SslOpts], SslListeners)]}}. -listener_started(IPAddress, Port) -> - rabbit_networking:tcp_listener_started(stomp, IPAddress, Port). -listener_stopped(IPAddress, Port) -> - rabbit_networking:tcp_listener_stopped(stomp, IPAddress, Port). + +listener_specs(Fun, Args, Listeners) -> + [Fun([Address | Args]) || + Listener <- Listeners, + Address <- rabbit_networking:check_tcp_listener_address( + rabbit_stomp_listener_sup, Listener)]. + +tcp_listener_spec([Address, SocketOpts]) -> + listener_spec(Address, SocketOpts, stomp, + {?MODULE, start_client, []}, "STOMP TCP Listener"). + +ssl_listener_spec([Address, SocketOpts, SslOpts]) -> + listener_spec(Address, SocketOpts, 'stomp/ssl', + {?MODULE, start_ssl_client, [SslOpts]}, "STOMP SSL Listener"). + +listener_spec({IPAddress, Port, Family, Name}, + SocketOpts, Protocol, OnConnect, Label) -> + {Name, + {tcp_listener_sup, start_link, + [IPAddress, Port, + [Family | SocketOpts], + {?MODULE, listener_started, [Protocol]}, + {?MODULE, listener_stopped, [Protocol]}, + OnConnect, Label]}, + transient, infinity, supervisor, [tcp_listener_sup]}. + +listener_started(Protocol, IPAddress, Port) -> + rabbit_networking:tcp_listener_started(Protocol, IPAddress, Port). + +listener_stopped(Protocol, IPAddress, Port) -> + rabbit_networking:tcp_listener_stopped(Protocol, IPAddress, Port). start_client(Sock) -> {ok, SupPid, ReaderPid} = supervisor:start_child(rabbit_stomp_client_sup_sup, [Sock]), - ok = gen_tcp:controlling_process(Sock, ReaderPid), + ok = rabbit_net:controlling_process(Sock, ReaderPid), ReaderPid ! {go, Sock}, SupPid. + +start_ssl_client(SslOpts, Sock) -> + Transform = rabbit_networking:ssl_transform_fun(SslOpts), + {ok, SslSock} = Transform(Sock), + start_client(SslSock). + diff --git a/deps/rabbitmq_stomp/test/src/ssl.config b/deps/rabbitmq_stomp/test/src/ssl.config new file mode 100644 index 0000000000..67ec5c4ae7 --- /dev/null +++ b/deps/rabbitmq_stomp/test/src/ssl.config @@ -0,0 +1,12 @@ +[ + {rabbitmq_stomp, [ + {ssl_listeners, [61614]} + ]}, + {rabbit, [ + {ssl_options, [{cacertfile,"%%CERTS_DIR%%/testca/cacert.pem"}, + {certfile,"%%CERTS_DIR%%/server/cert.pem"}, + {keyfile,"%%CERTS_DIR%%/server/key.pem"}, + {verify,verify_peer}, + {fail_if_no_peer_cert,false}]} + ]} +]. diff --git a/deps/rabbitmq_stomp/test/src/ssl_lifecycle.py b/deps/rabbitmq_stomp/test/src/ssl_lifecycle.py new file mode 100644 index 0000000000..554d94983d --- /dev/null +++ b/deps/rabbitmq_stomp/test/src/ssl_lifecycle.py @@ -0,0 +1,51 @@ +import unittest +import os + +import stomp +import base + +class TestSslClient(unittest.TestCase): + + def __ssl_connect(self): + ssl_key_file = os.path.abspath("test/certs/client/key.pem") + ssl_cert_file = os.path.abspath("test/certs/client/cert.pem") + ssl_ca_certs = os.path.abspath("test/certs/testca/cacert.pem") + + conn = stomp.Connection(user="guest", passcode="guest", + host_and_ports = [ ('localhost', 61614) ], + use_ssl = True, ssl_key_file = ssl_key_file, + ssl_cert_file = ssl_cert_file, + ssl_ca_certs = ssl_ca_certs) + + conn.start() + conn.connect() + return conn + + def test_ssl_connect(self): + conn = self.__ssl_connect() + conn.stop() + + def test_ssl_send_receive(self): + conn = self.__ssl_connect() + + try: + listener = base.WaitableListener() + + conn.set_listener('', listener) + + d = "/topic/ssl.test" + conn.subscribe(destination=d, receipt="sub") + + self.assertTrue(listener.await(1)) + + self.assertEquals("sub", + listener.receipts[0]['headers']['receipt-id']) + + listener.reset(1) + conn.send("Hello SSL!", destination=d) + + self.assertTrue(listener.await()) + + self.assertEquals("Hello SSL!", listener.messages[0]['message']) + finally: + conn.disconnect() diff --git a/deps/rabbitmq_stomp/test/src/test.py b/deps/rabbitmq_stomp/test/src/test.py index a16b62e5da..4a5edb6d42 100755 --- a/deps/rabbitmq_stomp/test/src/test.py +++ b/deps/rabbitmq_stomp/test/src/test.py @@ -1,30 +1,9 @@ #!/usr/bin/env python -import unittest -import sys -import os - -def add_deps_to_path(): - deps_dir = os.path.realpath(os.path.join(__file__, "..", "..", "..", "deps")) - sys.path.append(os.path.join(deps_dir, "stomppy", "stomppy")) - -def run_unittests(): - add_deps_to_path() - modules = ['parsing', 'destinations', 'lifecycle', 'transactions', - 'ack', 'errors'] - - suite = unittest.TestSuite() - for m in modules: - mod = __import__(m) - for name in dir(mod): - obj = getattr(mod, name) - if name.startswith("Test") and issubclass(obj, unittest.TestCase): - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(obj)) - - ts = unittest.TextTestRunner().run(unittest.TestSuite(suite)) - if ts.errors or ts.failures: - sys.exit(1) +import test_runner if __name__ == '__main__': - run_unittests() + modules = ['parsing', 'destinations', 'lifecycle', 'transactions', + 'ack', 'errors'] + test_runner.run_unittests(modules) diff --git a/deps/rabbitmq_stomp/test/src/test_runner.py b/deps/rabbitmq_stomp/test/src/test_runner.py new file mode 100644 index 0000000000..7216865e7c --- /dev/null +++ b/deps/rabbitmq_stomp/test/src/test_runner.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python + +import unittest +import sys +import os + +def add_deps_to_path(): + deps_dir = os.path.realpath(os.path.join(__file__, "..", "..", "..", "deps")) + sys.path.append(os.path.join(deps_dir, "stomppy", "stomppy")) + +def run_unittests(modules): + add_deps_to_path() + + suite = unittest.TestSuite() + for m in modules: + mod = __import__(m) + for name in dir(mod): + obj = getattr(mod, name) + if name.startswith("Test") and issubclass(obj, unittest.TestCase): + suite.addTest(unittest.TestLoader().loadTestsFromTestCase(obj)) + + ts = unittest.TextTestRunner().run(unittest.TestSuite(suite)) + if ts.errors or ts.failures: + sys.exit(1) + diff --git a/deps/rabbitmq_stomp/test/src/test_ssl.py b/deps/rabbitmq_stomp/test/src/test_ssl.py new file mode 100755 index 0000000000..b62755f31e --- /dev/null +++ b/deps/rabbitmq_stomp/test/src/test_ssl.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python + +import test_runner + +if __name__ == '__main__': + modules = ['ssl_lifecycle'] + test_runner.run_unittests(modules) +