diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_client.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_client.erl index 5da896adaf..28a3b5d9d4 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_client.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_ws_client.erl @@ -54,12 +54,12 @@ init({SupPid, Conn, Heartbeat}) -> heartbeat_mode = Heartbeat}, #state.stats_timer)}. -init_processor_state(Conn) -> +init_processor_state({ConnMod, ConnProps}) -> SendFun = fun (_Sync, Data) -> - Conn:send(Data), + ConnMod:send(ConnProps, Data), ok end, - Info = Conn:info(), + Info = ConnMod:info(ConnProps), Headers = proplists:get_value(headers, Info), UseHTTPAuth = application:get_env(rabbitmq_web_stomp, use_http_auth, false), @@ -166,13 +166,13 @@ handle_info({start_heartbeats, _}, handle_info({start_heartbeats, {0, 0}}, State) -> {noreply, State}; handle_info({start_heartbeats, {SendTimeout, ReceiveTimeout}}, - State = #state{conn = Conn, + State = #state{conn = {ConnMod, ConnProps}, heartbeat_sup = SupPid, heartbeat_mode = heartbeat}) -> - Info = Conn:info(), + Info = ConnMod:info(ConnProps), Sock = proplists:get_value(socket, Info), Pid = self(), - SendFun = fun () -> Conn:send(<<$\n>>), ok end, + SendFun = fun () -> ConnMod:send(ConnProps, <<$\n>>), ok end, ReceiveFun = fun() -> gen_server2:cast(Pid, client_timeout) end, Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout, SendFun, ReceiveTimeout, ReceiveFun), @@ -202,11 +202,12 @@ handle_info(Info, State) -> handle_call(Request, _From, State) -> {stop, {odd_request, Request}, State}. -terminate(_Reason, State = #state{conn = Conn, proc_state = ProcessorState}) -> +terminate(_Reason, State = #state{conn = {ConnMod, ConnProps}, + proc_state = ProcessorState}) -> maybe_emit_stats(State), ok = file_handle_cache:release(), rabbit_stomp_processor:flush_and_die(ProcessorState), - Conn:close(1000, "STOMP died"), + ConnMod:close(ConnProps, 1000, "STOMP died"), ok. code_change(_OldVsn, State, _Extra) -> @@ -248,8 +249,8 @@ emit_stats(State=#state{connection = C}) when C == none; C == undefined -> %% established, as this causes orphan entries on the stats database State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer), State1; -emit_stats(State=#state{conn=Conn, connection=ConnPid}) -> - Info = Conn:info(), +emit_stats(State=#state{conn={ConnMod, ConnProps}, connection=ConnPid}) -> + Info = ConnMod:info(ConnProps), Sock = proplists:get_value(socket, Info), SockInfos = case rabbit_net:getstat(Sock, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]) of diff --git a/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl b/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl index 5f2f0aeda1..a16cf83337 100644 --- a/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl +++ b/deps/rabbitmq_web_stomp/src/rabbit_ws_handler.erl @@ -55,12 +55,14 @@ init(Req0, Opts) -> {cowboy_websocket, Req, {Socket, Peername, Sockname, Headers, FrameType}}. websocket_init({Socket, Peername, Sockname, Headers, FrameType}) -> - Conn = {?MODULE, self(), [ - {socket, Socket}, - {peername, Peername}, - {sockname, Sockname}, - {headers, Headers}]}, - {ok, _Sup, Pid} = rabbit_ws_sup:start_client({Conn, heartbeat}), + Info = [{socket, Socket}, + {peername, Peername}, + {sockname, Sockname}, + {headers, Headers}], + {ok, _Sup, Pid} = rabbit_ws_sup:start_client({{?MODULE, + #{pid => self(), + info => Info}}, + heartbeat}), {ok, #state{pid=Pid, type=FrameType}}. websocket_handle({text, Data}, State=#state{pid=Pid}) -> @@ -109,13 +111,13 @@ filter_stomp_protocols(Protocols) -> %% within the Websocket process. This could be a good refactoring %% once SockJS gets removed. -info({?MODULE, _, Info}) -> +info(#{info := Info}) -> Info. -send(Data, {?MODULE, Pid, _}) -> +send(#{pid := Pid}, Data) -> Pid ! {send, Data}, ok. -close(Code, Reason, {?MODULE, Pid, _}) -> +close(#{pid := Pid}, Code, Reason) -> Pid ! {close, Code, Reason}, ok.