From 5f8a0fd46a7e6d1118976e68fb8295ace2fb8688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 7 Nov 2018 16:00:54 +0100 Subject: [PATCH] Refactor Web-STOMP to be closer to Web-MQTT This also discards a fair bit of unnecessary code that comes from the old SockJS implementation, and reduces the number of processes per connection by one. --- .../src/rabbit_ws_client.erl | 264 ----------------- .../src/rabbit_ws_client_sup.erl | 40 --- .../src/rabbit_ws_connection_sup.erl | 60 ++++ .../src/rabbit_ws_handler.erl | 269 ++++++++++++++---- .../src/rabbit_ws_listener.erl | 8 +- .../src/rabbit_ws_middleware.erl | 12 +- .../src/rabbit_ws_protocol.erl | 33 --- deps/rabbitmq_web_stomp/src/rabbit_ws_sup.erl | 9 +- .../test/src/rfc6455_client.erl | 2 +- 9 files changed, 284 insertions(+), 413 deletions(-) delete mode 100644 deps/rabbitmq_web_stomp/src/rabbit_ws_client.erl delete mode 100644 deps/rabbitmq_web_stomp/src/rabbit_ws_client_sup.erl create mode 100644 deps/rabbitmq_web_stomp/src/rabbit_ws_connection_sup.erl delete mode 100644 deps/rabbitmq_web_stomp/src/rabbit_ws_protocol.erl diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_client.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_client.erl deleted file mode 100644 index 28a3b5d9d4..0000000000 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_client.erl +++ /dev/null @@ -1,264 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_ws_client). --behaviour(gen_server). - --include_lib("rabbitmq_stomp/include/rabbit_stomp.hrl"). --include_lib("amqp_client/include/amqp_client.hrl"). - --export([start_link/1]). --export([msg/2, closed/1]). - --export([init/1, handle_call/3, handle_info/2, terminate/2, - code_change/3, handle_cast/2]). - --record(state, {conn, proc_state, parse_state, stats_timer, connection, heartbeat_mode, heartbeat, heartbeat_sup}). - -%%---------------------------------------------------------------------------- - -start_link(Params) -> - gen_server:start_link(?MODULE, Params, []). - -msg(Pid, Data) -> - gen_server:cast(Pid, {msg, Data}). - -closed(Pid) -> - gen_server:cast(Pid, closed). - -%%---------------------------------------------------------------------------- - -init({SupPid, Conn, Heartbeat}) -> - ok = file_handle_cache:obtain(), - process_flag(trap_exit, true), - {ok, ProcessorState} = init_processor_state(Conn), - {ok, rabbit_event:init_stats_timer( - #state{conn = Conn, - proc_state = ProcessorState, - parse_state = rabbit_stomp_frame:initial_state(), - heartbeat_sup = SupPid, - heartbeat = {none, none}, - heartbeat_mode = Heartbeat}, - #state.stats_timer)}. - -init_processor_state({ConnMod, ConnProps}) -> - SendFun = fun (_Sync, Data) -> - ConnMod:send(ConnProps, Data), - ok - end, - Info = ConnMod:info(ConnProps), - Headers = proplists:get_value(headers, Info), - - UseHTTPAuth = application:get_env(rabbitmq_web_stomp, use_http_auth, false), - - StompConfig0 = #stomp_configuration{implicit_connect = false}, - - StompConfig = case UseHTTPAuth of - true -> - case lists:keyfind(authorization, 1, Headers) of - false -> - %% We fall back to the default STOMP credentials. - UserConfig = application:get_env(rabbitmq_stomp, default_user, undefined), - StompConfig1 = rabbit_stomp:parse_default_user(UserConfig, StompConfig0), - StompConfig1#stomp_configuration{force_default_creds = true}; - {_, AuthHd} -> - {basic, HTTPLogin, HTTPPassCode} - = cow_http_hd:parse_authorization(rabbit_data_coercion:to_binary(AuthHd)), - StompConfig0#stomp_configuration{ - default_login = HTTPLogin, - default_passcode = HTTPPassCode, - force_default_creds = true} - end; - false -> - StompConfig0 - end, - - Sock = proplists:get_value(socket, Info), - {PeerAddr, _} = proplists:get_value(peername, Info), - AdapterInfo0 = #amqp_adapter_info{additional_info=Extra} - = amqp_connection:socket_adapter_info(Sock, {'Web STOMP', 0}), - %% Flow control is not supported for Web-STOMP connections. - AdapterInfo = AdapterInfo0#amqp_adapter_info{ - additional_info=[{state, running}|Extra]}, - - ProcessorState = rabbit_stomp_processor:initial_state( - StompConfig, - {SendFun, AdapterInfo, none, PeerAddr}), - {ok, ProcessorState}. - -handle_cast({msg, Data}, State = #state{proc_state = ProcessorState, - parse_state = ParseState, - connection = ConnPid}) -> - case process_received_bytes(Data, ProcessorState, ParseState, ConnPid) of - {ok, NewProcState, ParseState1, ConnPid1} -> - {noreply, ensure_stats_timer(State#state{ - parse_state = ParseState1, - proc_state = NewProcState, - connection = ConnPid1})}; - {stop, Reason, NewProcState, ParseState1} -> - {stop, Reason, State#state{ - parse_state = ParseState1, - proc_state = NewProcState}} - end; - -handle_cast(closed, State) -> - {stop, normal, State}; - -handle_cast(client_timeout, State) -> - {stop, {shutdown, client_heartbeat_timeout}, State}; - -handle_cast(Cast, State) -> - {stop, {odd_cast, Cast}, State}. - -%% TODO this is a bit rubbish - after the preview release we should -%% make the credit_flow:send/1 invocation in -%% rabbit_stomp_processor:process_frame/2 optional. -handle_info({bump_credit, {_, _}}, State) -> - {noreply, State}; - -handle_info(#'basic.consume_ok'{}, State) -> - {noreply, State}; -handle_info(#'basic.cancel_ok'{}, State) -> - {noreply, State}; -handle_info(#'basic.ack'{delivery_tag = Tag, multiple = IsMulti}, State) -> - ProcState = processor_state(State), - NewProcState = rabbit_stomp_processor:flush_pending_receipts(Tag, - IsMulti, - ProcState), - {noreply, processor_state(NewProcState, State)}; -handle_info({Delivery = #'basic.deliver'{}, - #amqp_msg{props = Props, payload = Payload}, - DeliveryCtx}, - State) -> - ProcState = processor_state(State), - NewProcState = rabbit_stomp_processor:send_delivery(Delivery, - Props, - Payload, - DeliveryCtx, - ProcState), - {noreply, processor_state(NewProcState, State)}; -handle_info(#'basic.cancel'{consumer_tag = Ctag}, State) -> - ProcState = processor_state(State), - case rabbit_stomp_processor:cancel_consumer(Ctag, ProcState) of - {ok, NewProcState, _Connection} -> - {noreply, processor_state(NewProcState, State)}; - {stop, Reason, NewProcState} -> - {stop, Reason, processor_state(NewProcState, State)} - end; - -handle_info({start_heartbeats, _}, - State = #state{heartbeat_mode = no_heartbeat}) -> - {noreply, State}; - -handle_info({start_heartbeats, {0, 0}}, State) -> - {noreply, State}; -handle_info({start_heartbeats, {SendTimeout, ReceiveTimeout}}, - State = #state{conn = {ConnMod, ConnProps}, - heartbeat_sup = SupPid, - heartbeat_mode = heartbeat}) -> - Info = ConnMod:info(ConnProps), - Sock = proplists:get_value(socket, Info), - Pid = self(), - SendFun = fun () -> ConnMod:send(ConnProps, <<$\n>>), ok end, - ReceiveFun = fun() -> gen_server2:cast(Pid, client_timeout) end, - Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout, - SendFun, ReceiveTimeout, ReceiveFun), - {noreply, State#state{heartbeat = Heartbeat}}; - - - -%%---------------------------------------------------------------------------- -handle_info({'EXIT', From, Reason}, State) -> - ProcState = processor_state(State), - case rabbit_stomp_processor:handle_exit(From, Reason, ProcState) of - {stop, Reason, NewProcState} -> - {stop, Reason, processor_state(NewProcState, State)}; - unknown_exit -> - {stop, {connection_died, Reason}, State} - end; -%%---------------------------------------------------------------------------- - -handle_info(emit_stats, State) -> - {noreply, emit_stats(State)}; - -handle_info(Info, State) -> - {stop, {odd_info, Info}, State}. - - - -handle_call(Request, _From, State) -> - {stop, {odd_request, Request}, State}. - -terminate(_Reason, State = #state{conn = {ConnMod, ConnProps}, - proc_state = ProcessorState}) -> - maybe_emit_stats(State), - ok = file_handle_cache:release(), - rabbit_stomp_processor:flush_and_die(ProcessorState), - ConnMod:close(ConnProps, 1000, "STOMP died"), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -%%---------------------------------------------------------------------------- - - -process_received_bytes(Bytes, ProcessorState, ParseState, ConnPid) -> - case rabbit_stomp_frame:parse(Bytes, ParseState) of - {ok, Frame, Rest} -> - case rabbit_stomp_processor:process_frame(Frame, ProcessorState) of - {ok, NewProcState, ConnPid1} -> - ParseState1 = rabbit_stomp_frame:initial_state(), - process_received_bytes(Rest, NewProcState, ParseState1, ConnPid1); - {stop, Reason, NewProcState} -> - {stop, Reason, NewProcState, ParseState} - end; - {more, ParseState1} -> - {ok, ProcessorState, ParseState1, ConnPid} - end. - -processor_state(#state{ proc_state = ProcState }) -> ProcState. -processor_state(ProcState, #state{} = State) -> - State#state{ proc_state = ProcState}. - -%%---------------------------------------------------------------------------- - -ensure_stats_timer(State) -> - rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats). - -maybe_emit_stats(State) -> - rabbit_event:if_enabled(State, #state.stats_timer, - fun() -> emit_stats(State) end). - -emit_stats(State=#state{connection = C}) when C == none; C == undefined -> - %% Avoid emitting stats on terminate when the connection has not yet been - %% established, as this causes orphan entries on the stats database - State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer), - State1; -emit_stats(State=#state{conn={ConnMod, ConnProps}, connection=ConnPid}) -> - Info = ConnMod:info(ConnProps), - Sock = proplists:get_value(socket, Info), - SockInfos = case rabbit_net:getstat(Sock, - [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]) of - {ok, SI} -> SI; - {error, _} -> [] - end, - Infos = [{pid, ConnPid}|SockInfos], - rabbit_core_metrics:connection_stats(ConnPid, Infos), - rabbit_event:notify(connection_stats, Infos), - State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer), - State1. diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_client_sup.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_client_sup.erl deleted file mode 100644 index 1c5aa0b681..0000000000 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_client_sup.erl +++ /dev/null @@ -1,40 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_ws_client_sup). --behaviour(supervisor2). - --export([start_client/1]). --export([init/1]). - --include_lib("amqp_client/include/amqp_client.hrl"). - - -%% -------------------------------------------------------------------------- - -start_client({Conn, Heartbeat}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, []), - {ok, Client} = supervisor2:start_child( - SupPid, client_spec(SupPid, Conn, Heartbeat)), - {ok, SupPid, Client}. - - -client_spec(SupPid, Conn, Heartbeat) -> - {rabbit_ws_client, {rabbit_ws_client, start_link, [{SupPid, Conn, Heartbeat}]}, - intrinsic, ?WORKER_WAIT, worker, [rabbit_ws_client]}. - -init(_Any) -> - {ok, {{one_for_all, 0, 1}, []}}. diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_connection_sup.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_connection_sup.erl new file mode 100644 index 0000000000..1e3515e1ce --- /dev/null +++ b/deps/rabbitmq_web_stomp/src/rabbit_ws_connection_sup.erl @@ -0,0 +1,60 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_ws_connection_sup). + +-behaviour(supervisor2). +-behaviour(ranch_protocol). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([start_link/4, start_keepalive_link/0]). +-export([init/1]). + +%%---------------------------------------------------------------------------- + +start_link(Ref, Sock, Transport, CowboyOpts0) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, KeepaliveSup} = supervisor2:start_child( + SupPid, + {rabbit_web_stomp_keepalive_sup, + {?MODULE, start_keepalive_link, []}, + intrinsic, infinity, supervisor, [rabbit_keepalive_sup]}), + %% In order for the Websocket handler to receive the KeepaliveSup + %% variable, we need to pass it first through the environment and + %% then have the middleware rabbit_web_mqtt_middleware place it + %% in the initial handler state. + Env = maps:get(env, CowboyOpts0), + CowboyOpts = CowboyOpts0#{env => Env#{keepalive_sup => KeepaliveSup, + socket => Sock}}, + Protocol = case Transport of + ranch_tcp -> cowboy_clear; + ranch_ssl -> cowboy_tls + end, + {ok, ReaderPid} = supervisor2:start_child( + SupPid, + {Protocol, + {Protocol, start_link, [Ref, Sock, Transport, CowboyOpts]}, + intrinsic, ?WORKER_WAIT, worker, [Protocol]}), + {ok, SupPid, ReaderPid}. + +start_keepalive_link() -> + supervisor2:start_link(?MODULE, []). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl index afac785a6b..19fb7d45c2 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl @@ -17,6 +17,9 @@ -module(rabbit_ws_handler). -behaviour(cowboy_websocket). +-include_lib("rabbitmq_stomp/include/rabbit_stomp.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + %% Websocket. -export([init/2]). -export([websocket_init/1]). @@ -24,19 +27,28 @@ -export([websocket_info/2]). -export([terminate/3]). -%% SockJS interface --export([info/1]). --export([send/2]). --export([close/3]). - --record(state, {conn, pid, type}). +-record(state, { + frame_type, + heartbeat_mode, + heartbeat, + heartbeat_sup, + parse_state, + proc_state, + socket, + peername, + auth_hd, + stats_timer, + connection +}). %% Websocket. init(Req0, Opts) -> - Req = case cowboy_req:header(<<"sec-websocket-protocol">>, Req0) of + {PeerAddr, _PeerPort} = maps:get(peer, Req0), + {_, KeepaliveSup} = lists:keyfind(keepalive_sup, 1, Opts), + {_, Sock} = lists:keyfind(socket, 1, Opts), + Req = case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req0) of undefined -> Req0; - ProtocolHd -> - Protocols = parse_sec_websocket_protocol_req(ProtocolHd), + Protocols -> case filter_stomp_protocols(Protocols) of [] -> Req0; [StompProtocol|_] -> @@ -44,57 +56,154 @@ init(Req0, Opts) -> StompProtocol, Req0) end end, - {_, FrameType} = lists:keyfind(type, 1, Opts), - Socket = maps:get(socket, Req0), - Peername = cowboy_req:peer(Req), - {ok, Sockname} = rabbit_net:sockname(Socket), - Headers = case cowboy_req:header(<<"authorization">>, Req) of - undefined -> []; - AuthHd -> [{authorization, binary_to_list(AuthHd)}] - end, WsOpts0 = proplists:get_value(ws_opts, Opts, #{}), WsOpts = maps:merge(#{compress => true}, WsOpts0), - {cowboy_websocket, Req, {Socket, Peername, Sockname, Headers, FrameType}, WsOpts}. + {cowboy_websocket, Req, #state{ + frame_type = proplists:get_value(type, Opts, text), + heartbeat_sup = KeepaliveSup, + heartbeat = {none, none}, + heartbeat_mode = heartbeat, + socket = Sock, + peername = PeerAddr, + auth_hd = cowboy_req:header(<<"authorization">>, Req) + }, WsOpts}. -websocket_init({Socket, Peername, Sockname, Headers, FrameType}) -> - Info = [{socket, Socket}, - {peername, Peername}, - {sockname, Sockname}, - {headers, Headers}], - {ok, _Sup, Pid} = rabbit_ws_sup:start_client({{?MODULE, - #{pid => self(), - info => Info}}, - heartbeat}), - {ok, #state{pid=Pid, type=FrameType}}. +websocket_init(State) -> + ok = file_handle_cache:obtain(), + process_flag(trap_exit, true), + {ok, ProcessorState} = init_processor_state(State), + {ok, rabbit_event:init_stats_timer( + State#state{proc_state = ProcessorState, + parse_state = rabbit_stomp_frame:initial_state()}, + #state.stats_timer)}. -websocket_handle({text, Data}, State=#state{pid=Pid}) -> - rabbit_ws_client:msg(Pid, Data), - {ok, State}; -websocket_handle({binary, Data}, State=#state{pid=Pid}) -> - rabbit_ws_client:msg(Pid, Data), - {ok, State}; +init_processor_state(#state{socket=Sock, peername=PeerAddr, auth_hd=AuthHd}) -> + Self = self(), + SendFun = fun (_Sync, Data) -> + Self ! {send, Data}, + ok + end, + + UseHTTPAuth = application:get_env(rabbitmq_web_stomp, use_http_auth, false), + StompConfig0 = #stomp_configuration{implicit_connect = false}, + StompConfig = case UseHTTPAuth of + true -> + case AuthHd of + undefined -> + %% We fall back to the default STOMP credentials. + UserConfig = application:get_env(rabbitmq_stomp, default_user, undefined), + StompConfig1 = rabbit_stomp:parse_default_user(UserConfig, StompConfig0), + StompConfig1#stomp_configuration{force_default_creds = true}; + _ -> + {basic, HTTPLogin, HTTPPassCode} + = cow_http_hd:parse_authorization(AuthHd), + StompConfig0#stomp_configuration{ + default_login = HTTPLogin, + default_passcode = HTTPPassCode, + force_default_creds = true} + end; + false -> + StompConfig0 + end, + + AdapterInfo0 = #amqp_adapter_info{additional_info=Extra} + = amqp_connection:socket_adapter_info(Sock, {'Web STOMP', 0}), + %% Flow control is not supported for Web-STOMP connections. + AdapterInfo = AdapterInfo0#amqp_adapter_info{ + additional_info=[{state, running}|Extra]}, + + ProcessorState = rabbit_stomp_processor:initial_state( + StompConfig, + {SendFun, AdapterInfo, none, PeerAddr}), + {ok, ProcessorState}. + +websocket_handle({text, Data}, State) -> + handle_data(Data, State); +websocket_handle({binary, Data}, State) -> + handle_data(Data, State); websocket_handle(_Frame, State) -> {ok, State}. -websocket_info({send, Msg}, State=#state{type=FrameType}) -> +websocket_info({send, Msg}, State=#state{frame_type=FrameType}) -> {reply, {FrameType, Msg}, State}; -websocket_info(Frame = {close, _, _}, State) -> - {reply, Frame, State}; -websocket_info(_Info, State) -> + +%% TODO this is a bit rubbish - after the preview release we should +%% make the credit_flow:send/1 invocation in +%% rabbit_stomp_processor:process_frame/2 optional. +websocket_info({bump_credit, {_, _}}, State) -> + {ok, State}; + +websocket_info(#'basic.consume_ok'{}, State) -> + {ok, State}; +websocket_info(#'basic.cancel_ok'{}, State) -> + {ok, State}; +websocket_info(#'basic.ack'{delivery_tag = Tag, multiple = IsMulti}, + State=#state{ proc_state = ProcState0 }) -> + ProcState = rabbit_stomp_processor:flush_pending_receipts(Tag, + IsMulti, + ProcState0), + {ok, State#state{ proc_state = ProcState }}; +websocket_info({Delivery = #'basic.deliver'{}, + #amqp_msg{props = Props, payload = Payload}, + DeliveryCtx}, + State=#state{ proc_state = ProcState0 }) -> + ProcState = rabbit_stomp_processor:send_delivery(Delivery, + Props, + Payload, + DeliveryCtx, + ProcState0), + {ok, State#state{ proc_state = ProcState }}; +websocket_info(#'basic.cancel'{consumer_tag = Ctag}, + State=#state{ proc_state = ProcState0 }) -> + case rabbit_stomp_processor:cancel_consumer(Ctag, ProcState0) of + {ok, ProcState, _Connection} -> + {ok, State#state{ proc_state = ProcState }}; + {stop, _Reason, ProcState} -> + stop(State#state{ proc_state = ProcState }) + end; + +websocket_info({start_heartbeats, _}, + State = #state{heartbeat_mode = no_heartbeat}) -> + {ok, State}; + +websocket_info({start_heartbeats, {0, 0}}, State) -> + {ok, State}; +websocket_info({start_heartbeats, {SendTimeout, ReceiveTimeout}}, + State = #state{socket = Sock, + heartbeat_sup = SupPid, + heartbeat_mode = heartbeat}) -> + Self = self(), + SendFun = fun () -> Self ! {send, <<$\n>>}, ok end, + ReceiveFun = fun() -> Self ! client_timeout end, + Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout, + SendFun, ReceiveTimeout, ReceiveFun), + {ok, State#state{heartbeat = Heartbeat}}; +websocket_info(client_timeout, State) -> + stop(State); + +%%---------------------------------------------------------------------------- +websocket_info({'EXIT', From, Reason}, + State=#state{ proc_state = ProcState0 }) -> + case rabbit_stomp_processor:handle_exit(From, Reason, ProcState0) of + {stop, _Reason, ProcState} -> + stop(State#state{ proc_state = ProcState }); + unknown_exit -> + stop(State) + end; +%%---------------------------------------------------------------------------- + +websocket_info(emit_stats, State) -> + {ok, emit_stats(State)}; + +websocket_info(Msg, State) -> + rabbit_log_connection:info("WEB-STOMP: unexpected message ~p~n", + [Msg]), {ok, State}. -terminate(_Reason, _Req, {_, _, _, _, _}) -> - ok; -terminate(_Reason, _Req, #state{pid=Pid}) -> - rabbit_ws_client:closed(Pid), +terminate(_Reason, _Req, _State) -> ok. -%% When moving to Cowboy 2, this code should be replaced -%% with a simple call to cow_http_hd:parse_sec_websocket_protocol_req/1. - -parse_sec_websocket_protocol_req(Bin) -> - Protocols = binary:split(Bin, [<<$,>>, <<$\s>>], [global]), - [P || P <- Protocols, P =/= <<>>]. +%%---------------------------------------------------------------------------- %% The protocols v10.stomp, v11.stomp and v12.stomp are registered %% at IANA: https://www.iana.org/assignments/websocket/websocket.xhtml @@ -108,18 +217,58 @@ filter_stomp_protocols(Protocols) -> end, Protocols))). -%% TODO -%% Ideally all the STOMP interaction should be done from -%% within the Websocket process. This could be a good refactoring -%% once SockJS gets removed. +%%---------------------------------------------------------------------------- -info(#{info := Info}) -> - Info. +handle_data(<<>>, State) -> + {ok, ensure_stats_timer(State)}; +handle_data(Bytes, State = #state{proc_state = ProcState, + parse_state = ParseState}) -> + case rabbit_stomp_frame:parse(Bytes, ParseState) of + {more, ParseState1} -> + {ok, ensure_stats_timer(State#state{ parse_state = ParseState1 })}; + {ok, Frame, Rest} -> + case rabbit_stomp_processor:process_frame(Frame, ProcState) of + {ok, ProcState1, ConnPid} -> + ParseState1 = rabbit_stomp_frame:initial_state(), + handle_data( + Rest, + State #state{ parse_state = ParseState1, + proc_state = ProcState1, + connection = ConnPid }); + {stop, _Reason, ProcState1} -> + io:format(user, "~p~n", [_Reason]), + stop(State#state{ proc_state = ProcState1 }) + end + end. -send(#{pid := Pid}, Data) -> - Pid ! {send, Data}, - ok. +stop(State = #state{proc_state = ProcState}) -> + maybe_emit_stats(State), + ok = file_handle_cache:release(), + rabbit_stomp_processor:flush_and_die(ProcState), + {reply, {close, 1000, "STOMP died"}, State}. -close(#{pid := Pid}, Code, Reason) -> - Pid ! {close, Code, Reason}, - ok. +%%---------------------------------------------------------------------------- + +ensure_stats_timer(State) -> + rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats). + +maybe_emit_stats(State) -> + rabbit_event:if_enabled(State, #state.stats_timer, + fun() -> emit_stats(State) end). + +emit_stats(State=#state{connection = C}) when C == none; C == undefined -> + %% Avoid emitting stats on terminate when the connection has not yet been + %% established, as this causes orphan entries on the stats database + State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer), + State1; +emit_stats(State=#state{socket=Sock, connection=Conn}) -> + SockInfos = case rabbit_net:getstat(Sock, + [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]) of + {ok, SI} -> SI; + {error, _} -> [] + end, + Infos = [{pid, Conn}|SockInfos], + rabbit_core_metrics:connection_stats(Conn, Infos), + rabbit_event:notify(connection_stats, Infos), + State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer), + State1. diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_listener.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_listener.erl index e131bd2f13..47d5bc3edd 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_listener.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_ws_listener.erl @@ -46,8 +46,8 @@ init() -> end, case ranch:start_listener( http, NumTcpAcceptors, - ranch_tcp, TcpConf, - rabbit_ws_protocol, + ranch_tcp, [{connection_type, supervisor}|TcpConf], + rabbit_ws_connection_sup, CowboyOpts#{env => #{dispatch => Routes}, middlewares => [cowboy_router, rabbit_ws_middleware, @@ -78,8 +78,8 @@ init() -> end, {ok, _} = ranch:start_listener( https, NumSslAcceptors, - ranch_ssl, TLSConf, - rabbit_ws_protocol, + ranch_ssl, [{connection_type, supervisor}|TLSConf], + rabbit_ws_connection_sup, CowboyOpts#{env => #{dispatch => Routes}, middlewares => [cowboy_router, rabbit_ws_middleware, diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_middleware.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_middleware.erl index 87cb6e8ef1..e1393244bb 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_middleware.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_ws_middleware.erl @@ -19,8 +19,12 @@ -export([execute/2]). execute(Req, Env) -> + #{keepalive_sup := KeepaliveSup} = Env, Sock = maps:get(socket, Env), - {ok, Req#{socket => Sock}, Env}. - - - + case maps:get(handler_opts, Env, undefined) of + undefined -> {ok, Req, Env}; + Opts when is_list(Opts) -> + {ok, Req, Env#{handler_opts => [{keepalive_sup, KeepaliveSup}, + {socket, Sock} + |Opts]}} + end. diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_protocol.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_protocol.erl deleted file mode 100644 index aa7f96f030..0000000000 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_protocol.erl +++ /dev/null @@ -1,33 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_ws_protocol). --behaviour(ranch_protocol). - --export([start_link/4]). - -start_link(Ref, Sock, Transport, CowboyOpts0) -> - %% In order for the Websocket handler to receive the KeepaliveSup - %% variable, we need to pass it first through the environment and - %% then have the middleware rabbit_web_mqtt_middleware place it - %% in the initial handler state. - Env = maps:get(env, CowboyOpts0), - CowboyOpts = CowboyOpts0#{env => Env#{socket => Sock}}, - Protocol = case Transport of - ranch_tcp -> cowboy_clear; - ranch_ssl -> cowboy_tls - end, - Protocol:start_link(Ref, Sock, Transport, CowboyOpts). diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_sup.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_sup.erl index 16647cbced..6184c805c6 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_sup.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_ws_sup.erl @@ -17,7 +17,7 @@ -module(rabbit_ws_sup). -behaviour(supervisor2). --export([start_link/0, init/1, start_client/1]). +-export([start_link/0, init/1]). -define(SUP_NAME, ?MODULE). @@ -28,9 +28,4 @@ start_link() -> supervisor2:start_link({local, ?SUP_NAME}, ?MODULE, []). init([]) -> - {ok, {{simple_one_for_one, 0, 1}, - [{client, {rabbit_ws_client_sup, start_client, []}, - temporary, infinity, supervisor, [rabbit_ws_client_sup]}]}}. - -start_client(Params) -> - supervisor2:start_child(?SUP_NAME, [Params]). + {ok, {{one_for_one, 1, 5}, []}}. diff --git a/deps/rabbitmq_web_stomp/test/src/rfc6455_client.erl b/deps/rabbitmq_web_stomp/test/src/rfc6455_client.erl index 0eac030fbf..624cf2b277 100644 --- a/deps/rabbitmq_web_stomp/test/src/rfc6455_client.erl +++ b/deps/rabbitmq_web_stomp/test/src/rfc6455_client.erl @@ -107,7 +107,7 @@ start_conn(State = #state{transport = Transport}, AuthInfo, Protocols) -> ProtocolHd = case Protocols of [] -> ""; - _ -> "Sec-Websocket-Protocol: " ++ string:join(Protocols, ", ") + _ -> "Sec-Websocket-Protocol: " ++ string:join(Protocols, ", ") ++ "\r\n" end, Key = base64:encode_to_string(crypto:strong_rand_bytes(16)),