diff --git a/deps/rabbitmq_web_mqtt/Makefile b/deps/rabbitmq_web_mqtt/Makefile index 0ba7dbcec2..5cc930d7ab 100644 --- a/deps/rabbitmq_web_mqtt/Makefile +++ b/deps/rabbitmq_web_mqtt/Makefile @@ -19,7 +19,7 @@ export BUILD_WITHOUT_QUIC LOCAL_DEPS = ssl DEPS = rabbit cowboy rabbitmq_mqtt -TEST_DEPS = emqtt rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_stomp rabbitmq_consistent_hash_exchange +TEST_DEPS = gun emqtt rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_stomp rabbitmq_consistent_hash_exchange PLT_APPS += rabbitmq_cli elixir cowlib ssl @@ -27,6 +27,7 @@ PLT_APPS += rabbitmq_cli elixir cowlib ssl # See rabbitmq-components.mk. BUILD_DEPS += ranch +dep_gun = hex 2.2.0 dep_emqtt = git https://github.com/emqx/emqtt.git 1.14.6 DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl index e1f7bbb55c..fae2591ad5 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl @@ -124,7 +124,7 @@ start_tcp_listener(TCPConf0, CowboyOpts) -> start_tls_listener([], _) -> ok; -start_tls_listener(TLSConf0, CowboyOpts) -> +start_tls_listener(TLSConf0, CowboyOpts0) -> _ = rabbit_networking:ensure_ssl(), {TLSConf, TLSIpStr, TLSPort} = get_tls_conf(TLSConf0), RanchRef = rabbit_networking:ranch_ref(TLSConf), @@ -135,6 +135,10 @@ start_tls_listener(TLSConf0, CowboyOpts) -> num_acceptors => get_env(num_ssl_acceptors, 10), num_conns_sups => get_env(num_conns_sup, 1) }, + CowboyOpts = CowboyOpts0#{ + %% Enable HTTP/2 Websocket if not explicitly disabled. + enable_connect_protocol => maps:get(enable_connect_protocol, CowboyOpts0, true) + }, case cowboy:start_tls(RanchRef, RanchTransportOpts, CowboyOpts) of {ok, _} -> ok; diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 801cb83f8a..9c5c62f6c3 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -56,9 +56,40 @@ upgrade(Req, Env, Handler, HandlerState) -> upgrade(Req, Env, Handler, HandlerState, #{}). +%% We create a proxy socket for HTTP/2 even if no proxy was used, +%% and add a special field 'http_version' to indicate this is HTTP/2. +upgrade(Req=#{version := 'HTTP/2', pid := Parent, peer := Peer, sock := Sock}, + Env, Handler, HandlerState, Opts) -> + %% Cowboy doesn't expose the socket when HTTP/2 is used. + %% We take it directly from the connection's state. + %% + %% @todo Ideally we would not need the real socket for + %% normal operations. But we currently need it for + %% the heartbeat processes to do their job. In the + %% future we should not rely on those processes + %% and instead do the heartbeating directly in the + %% Websocket handler. + RealSocket = element(4,element(1,sys:get_state(Parent))), + ProxyInfo = case Req of + #{proxy_header := ProxyHeader} -> + ProxyHeader#{http_version => 'HTTP/2'}; + _ -> + {SrcAddr, SrcPort} = Peer, + {DestAddr, DestPort} = Sock, + #{ + http_version => 'HTTP/2', + src_address => SrcAddr, + src_port => SrcPort, + dest_address => DestAddr, + dest_port => DestPort + } + end, + ProxySocket = {rabbit_proxy_socket, RealSocket, ProxyInfo}, + cowboy_websocket:upgrade(Req, Env, Handler, HandlerState#state{socket = ProxySocket}, Opts); upgrade(Req, Env, Handler, HandlerState, Opts) -> cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts). +%% This is only called for HTTP/1.1. takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) -> Sock = case HandlerState#state.socket of undefined -> @@ -84,7 +115,7 @@ init(Req, Opts) -> stats_timer = rabbit_event:init_stats_timer()}, WsOpts0 = proplists:get_value(ws_opts, Opts, #{}), WsOpts = maps:merge(#{compress => true}, WsOpts0), - {?MODULE, Req1, State, WsOpts} + {?MODULE, Req1, State, WsOpts#{data_delivery => relay}} end end. diff --git a/deps/rabbitmq_web_mqtt/test/web_mqtt_system_SUITE.erl b/deps/rabbitmq_web_mqtt/test/web_mqtt_system_SUITE.erl index feaaeb4168..17939336ae 100644 --- a/deps/rabbitmq_web_mqtt/test/web_mqtt_system_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/web_mqtt_system_SUITE.erl @@ -24,6 +24,7 @@ groups() -> ,unacceptable_data_type ,handle_invalid_packets ,duplicate_connect + ,wss_http2 ]} ]. @@ -36,7 +37,25 @@ init_per_suite(Config) -> {rmq_nodename_suffix, ?MODULE}, {protocol, "ws"} ]), - rabbit_ct_helpers:run_setup_steps(Config1, + Config2 = rabbit_ct_helpers:run_setup_steps(Config1), + {rmq_certsdir, CertsDir} = proplists:lookup(rmq_certsdir, Config2), + Config3 = rabbit_ct_helpers:merge_app_env( + Config2, + {rabbitmq_web_mqtt, + [{ssl_config, + [{cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])}, + {certfile, filename:join([CertsDir, "server", "cert.pem"])}, + {keyfile, filename:join([CertsDir, "server", "key.pem"])}, + %% We only want to ensure HTTP/2 Websocket is working. + {fail_if_no_peer_cert, false}, + {versions, ['tlsv1.3']}, + %% We hard code this port number here because it will be computed later by + %% rabbit_ct_broker_helpers:init_tcp_port_numbers/3 when we start the broker. + %% (The alternative is to first start the broker, stop the rabbitmq_web_amqp app, + %% configure tls_config, and then start the app again.) + {port, 21010} + ]}]}), + rabbit_ct_helpers:run_setup_steps(Config3, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). @@ -114,6 +133,26 @@ duplicate_connect(Config) -> after 500 -> ct:fail("expected web socket to exit") end. +wss_http2(Config) -> + {ok, _} = application:ensure_all_started(gun), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt_tls), + {ok, ConnPid} = gun:open("localhost", Port, #{ + transport => tls, + tls_opts => [{verify, verify_none}], + protocols => [http2], + http2_opts => #{notify_settings_changed => true}, + ws_opts => #{protocols => [{<<"mqtt">>, gun_ws_h}]} + }), + {ok, http2} = gun:await_up(ConnPid), + {notify, settings_changed, #{enable_connect_protocol := true}} + = gun:await(ConnPid, undefined), + StreamRef = gun:ws_upgrade(ConnPid, "/ws", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + gun:ws_send(ConnPid, StreamRef, {binary, rabbit_ws_test_util:mqtt_3_1_1_connect_packet()}), + {ws, {binary, _P}} = gun:await(ConnPid, StreamRef), + eventually(?_assertEqual(1, num_mqtt_connections(Config, 0))), + ok. + %% ------------------------------------------------------------------- %% Internal helpers %% ------------------------------------------------------------------- diff --git a/deps/rabbitmq_web_stomp/Makefile b/deps/rabbitmq_web_stomp/Makefile index 131f9df3ce..c3c795850f 100644 --- a/deps/rabbitmq_web_stomp/Makefile +++ b/deps/rabbitmq_web_stomp/Makefile @@ -20,7 +20,7 @@ define PROJECT_APP_EXTRA_KEYS endef DEPS = cowboy rabbit_common rabbit rabbitmq_stomp -TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers +TEST_DEPS = gun rabbitmq_ct_helpers rabbitmq_ct_client_helpers PLT_APPS += cowlib @@ -28,10 +28,12 @@ PLT_APPS += cowlib # See rabbitmq-components.mk. BUILD_DEPS += ranch +dep_gun = hex 2.2.0 + DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk include ../../rabbitmq-components.mk include ../../erlang.mk -CT_HOOKS = rabbit_ct_hook \ No newline at end of file +CT_HOOKS = rabbit_ct_hook diff --git a/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl b/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl index 01abbb7da3..032d89dc88 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl @@ -52,9 +52,40 @@ upgrade(Req, Env, Handler, HandlerState) -> upgrade(Req, Env, Handler, HandlerState, #{}). +%% We create a proxy socket for HTTP/2 even if no proxy was used, +%% and add a special field 'http_version' to indicate this is HTTP/2. +upgrade(Req=#{version := 'HTTP/2', pid := Parent, peer := Peer, sock := Sock}, + Env, Handler, HandlerState, Opts) -> + %% Cowboy doesn't expose the socket when HTTP/2 is used. + %% We take it directly from the connection's state. + %% + %% @todo Ideally we would not need the real socket for + %% normal operations. But we currently need it for + %% the heartbeat processes to do their job. In the + %% future we should not rely on those processes + %% and instead do the heartbeating directly in the + %% Websocket handler. + RealSocket = element(4,element(1,sys:get_state(Parent))), + ProxyInfo = case Req of + #{proxy_header := ProxyHeader} -> + ProxyHeader#{http_version => 'HTTP/2'}; + _ -> + {SrcAddr, SrcPort} = Peer, + {DestAddr, DestPort} = Sock, + #{ + http_version => 'HTTP/2', + src_address => SrcAddr, + src_port => SrcPort, + dest_address => DestAddr, + dest_port => DestPort + } + end, + ProxySocket = {rabbit_proxy_socket, RealSocket, ProxyInfo}, + cowboy_websocket:upgrade(Req, Env, Handler, HandlerState#state{socket = ProxySocket}, Opts); upgrade(Req, Env, Handler, HandlerState, Opts) -> cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts). +%% This is only called for HTTP/1.1. takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) -> Sock = case HandlerState#state.socket of undefined -> @@ -95,7 +126,7 @@ init(Req0, Opts) -> socket = SockInfo, peername = PeerAddr, auth_hd = cowboy_req:header(<<"authorization">>, Req) - }, WsOpts}. + }, WsOpts#{data_delivery => relay}}. websocket_init(State) -> process_flag(trap_exit, true), @@ -218,8 +249,23 @@ websocket_info({start_heartbeats, {SendTimeout, ReceiveTimeout}}, Self = self(), SendFun = fun () -> Self ! {send, <<$\n>>}, ok end, ReceiveFun = fun() -> Self ! client_timeout end, - Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout, - SendFun, ReceiveTimeout, ReceiveFun), + Heartbeat = case Sock of + {rabbit_proxy_socket, RealSocket, #{http_version := 'HTTP/2'}} -> + %% HTTP/2 Websocket may have multiple Websocket sessions + %% on a single connection (this can happen for example + %% when refreshing a page). As a result we need to attach + %% the heartbeat processes to the session. We do this via + %% a link for now. @todo In the future we will have a + %% mechanism in Cowboy to attach them to the stream. + {ok, Sender} = rabbit_heartbeat:start_heartbeat_sender(RealSocket, + SendTimeout, SendFun, {heartbeat_sender, unknown}), + {ok, Receiver} = rabbit_heartbeat:start_heartbeat_receiver(RealSocket, + ReceiveTimeout, ReceiveFun, {heartbeat_receiver, unknown}), + {Sender, Receiver}; + _ -> + rabbit_heartbeat:start(SupPid, Sock, SendTimeout, + SendFun, ReceiveTimeout, ReceiveFun) + end, {ok, State#state{heartbeat = Heartbeat}}; websocket_info(client_timeout, State) -> stop(State); @@ -248,10 +294,17 @@ websocket_info(Msg, State) -> [Msg]), {ok, State}. -terminate(_Reason, _Req, #state{proc_state = undefined}) -> - ok; -terminate(_Reason, _Req, #state{proc_state = ProcState}) -> +terminate(_Reason, _Req, State = #state{proc_state = undefined}) -> + terminate_heartbeaters(State); +terminate(_Reason, _Req, State = #state{proc_state = ProcState}) -> _ = rabbit_stomp_processor:flush_and_die(ProcState), + terminate_heartbeaters(State). + +terminate_heartbeaters(#state{heartbeat = {none, none}}) -> + ok; +terminate_heartbeaters(#state{heartbeat = {SPid, RPid}}) -> + sys:terminate(SPid, shutdown), + sys:terminate(RPid, shutdown), ok. %%---------------------------------------------------------------------------- diff --git a/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_listener.erl b/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_listener.erl index becaf8c564..22144d7076 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_listener.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_listener.erl @@ -136,16 +136,20 @@ start_tls_listener(TLSConf0, CowboyOpts0, Routes) -> TLSPort = proplists:get_value(port, TLSConf0), TLSConf = maybe_parse_ip(TLSConf0), RanchTransportOpts = #{ - socket_opts => TLSConf, + socket_opts => [{alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}|TLSConf], connection_type => supervisor, max_connections => get_max_connections(), num_acceptors => NumSslAcceptors, num_conns_sups => 1 }, - CowboyOpts = CowboyOpts0#{env => #{dispatch => Routes}, - middlewares => [cowboy_router, - rabbit_web_stomp_middleware, - cowboy_handler]}, + CowboyOpts = CowboyOpts0#{ + env => #{dispatch => Routes}, + middlewares => [cowboy_router, + rabbit_web_stomp_middleware, + cowboy_handler], + %% Enable HTTP/2 Websocket if not explicitly disabled. + enable_connect_protocol => maps:get(enable_connect_protocol, CowboyOpts0, true) + }, case ranch:start_listener(rabbit_networking:ranch_ref(TLSConf), ranch_ssl, RanchTransportOpts, diff --git a/deps/rabbitmq_web_stomp/test/cowboy_websocket_SUITE.erl b/deps/rabbitmq_web_stomp/test/cowboy_websocket_SUITE.erl index 625c0b02a8..1dd08b38fe 100644 --- a/deps/rabbitmq_web_stomp/test/cowboy_websocket_SUITE.erl +++ b/deps/rabbitmq_web_stomp/test/cowboy_websocket_SUITE.erl @@ -27,7 +27,8 @@ groups() -> pubsub_binary, sub_non_existent, disconnect, - http_auth + http_auth, + wss_http2 ]}, %% rabbitmq/rabbitmq-web-stomp#110 {default_login_enabled, [], @@ -48,7 +49,25 @@ init_per_suite(Config) -> [{rmq_nodename_suffix, ?MODULE}, {protocol, "ws"}]), rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config1, + Config2 = rabbit_ct_helpers:run_setup_steps(Config1), + {rmq_certsdir, CertsDir} = proplists:lookup(rmq_certsdir, Config2), + Config3 = rabbit_ct_helpers:merge_app_env( + Config2, + {rabbitmq_web_stomp, + [{ssl_config, + [{cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])}, + {certfile, filename:join([CertsDir, "server", "cert.pem"])}, + {keyfile, filename:join([CertsDir, "server", "key.pem"])}, + %% We only want to ensure HTTP/2 Websocket is working. + {fail_if_no_peer_cert, false}, + {versions, ['tlsv1.3']}, + %% We hard code this port number here because it will be computed later by + %% rabbit_ct_broker_helpers:init_tcp_port_numbers/3 when we start the broker. + %% (The alternative is to first start the broker, stop the rabbitmq_web_amqp app, + %% configure tls_config, and then start the app again.) + {port, 21014} + ]}]}), + rabbit_ct_helpers:run_setup_steps(Config3, rabbit_ct_broker_helpers:setup_steps()). end_per_suite(Config) -> @@ -286,3 +305,23 @@ Protocol = ?config(protocol, Config), {close, _} = rfc6455_client:close(WS3), ok. + +wss_http2(Config) -> + {ok, _} = application:ensure_all_started(gun), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_stomp_tls), + {ok, ConnPid} = gun:open("localhost", Port, #{ + transport => tls, + tls_opts => [{verify, verify_none}], + protocols => [http2], + http2_opts => #{notify_settings_changed => true}, + ws_opts => #{protocols => [{<<"v12.stomp">>, gun_ws_h}]} + }), + {ok, http2} = gun:await_up(ConnPid), + {notify, settings_changed, #{enable_connect_protocol := true}} + = gun:await(ConnPid, undefined), + StreamRef = gun:ws_upgrade(ConnPid, "/ws", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + gun:ws_send(ConnPid, StreamRef, {text, stomp:marshal("CONNECT", [{"login","guest"}, {"passcode", "guest"}])}), + {ws, {text, P}} = gun:await(ConnPid, StreamRef), + {<<"CONNECTED">>, _, <<>>} = stomp:unmarshal(P), + ok. diff --git a/deps/rabbitmq_web_stomp_examples/priv/echo.html b/deps/rabbitmq_web_stomp_examples/priv/echo.html index e94096bc0f..47baae5d65 100644 --- a/deps/rabbitmq_web_stomp_examples/priv/echo.html +++ b/deps/rabbitmq_web_stomp_examples/priv/echo.html @@ -79,7 +79,8 @@ }; // Stomp.js boilerplate - var client = Stomp.client('ws://' + window.location.hostname + ':15674/ws'); + const wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; + var client = Stomp.client(wsProtocol + '//' + window.location.hostname + ':15674/ws'); client.debug = pipe('#second'); var print_first = pipe('#first', function(data) { diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index c3bcf2d5d5..0c426bbe97 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -40,8 +40,8 @@ endif # all projects use the same versions. It avoids conflicts. dep_accept = hex 0.3.5 -dep_cowboy = hex 2.13.0 -dep_cowlib = hex 2.14.0 +dep_cowboy = hex 2.14.0 +dep_cowlib = hex 2.16.0 dep_credentials_obfuscation = hex 3.5.0 dep_cuttlefish = hex 3.5.0 dep_gen_batch_server = hex 0.8.8