Refactor Web-STOMP to be closer to Web-MQTT
This also discards a fair bit of unnecessary code that comes from the old SockJS implementation, and reduces the number of processes per connection by one.
This commit is contained in:
parent
231225ae55
commit
5f8a0fd46a
|
|
@ -1,264 +0,0 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at http://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(rabbit_ws_client).
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include_lib("rabbitmq_stomp/include/rabbit_stomp.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
||||
-export([start_link/1]).
|
||||
-export([msg/2, closed/1]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_info/2, terminate/2,
|
||||
code_change/3, handle_cast/2]).
|
||||
|
||||
-record(state, {conn, proc_state, parse_state, stats_timer, connection, heartbeat_mode, heartbeat, heartbeat_sup}).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
start_link(Params) ->
|
||||
gen_server:start_link(?MODULE, Params, []).
|
||||
|
||||
msg(Pid, Data) ->
|
||||
gen_server:cast(Pid, {msg, Data}).
|
||||
|
||||
closed(Pid) ->
|
||||
gen_server:cast(Pid, closed).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
init({SupPid, Conn, Heartbeat}) ->
|
||||
ok = file_handle_cache:obtain(),
|
||||
process_flag(trap_exit, true),
|
||||
{ok, ProcessorState} = init_processor_state(Conn),
|
||||
{ok, rabbit_event:init_stats_timer(
|
||||
#state{conn = Conn,
|
||||
proc_state = ProcessorState,
|
||||
parse_state = rabbit_stomp_frame:initial_state(),
|
||||
heartbeat_sup = SupPid,
|
||||
heartbeat = {none, none},
|
||||
heartbeat_mode = Heartbeat},
|
||||
#state.stats_timer)}.
|
||||
|
||||
init_processor_state({ConnMod, ConnProps}) ->
|
||||
SendFun = fun (_Sync, Data) ->
|
||||
ConnMod:send(ConnProps, Data),
|
||||
ok
|
||||
end,
|
||||
Info = ConnMod:info(ConnProps),
|
||||
Headers = proplists:get_value(headers, Info),
|
||||
|
||||
UseHTTPAuth = application:get_env(rabbitmq_web_stomp, use_http_auth, false),
|
||||
|
||||
StompConfig0 = #stomp_configuration{implicit_connect = false},
|
||||
|
||||
StompConfig = case UseHTTPAuth of
|
||||
true ->
|
||||
case lists:keyfind(authorization, 1, Headers) of
|
||||
false ->
|
||||
%% We fall back to the default STOMP credentials.
|
||||
UserConfig = application:get_env(rabbitmq_stomp, default_user, undefined),
|
||||
StompConfig1 = rabbit_stomp:parse_default_user(UserConfig, StompConfig0),
|
||||
StompConfig1#stomp_configuration{force_default_creds = true};
|
||||
{_, AuthHd} ->
|
||||
{basic, HTTPLogin, HTTPPassCode}
|
||||
= cow_http_hd:parse_authorization(rabbit_data_coercion:to_binary(AuthHd)),
|
||||
StompConfig0#stomp_configuration{
|
||||
default_login = HTTPLogin,
|
||||
default_passcode = HTTPPassCode,
|
||||
force_default_creds = true}
|
||||
end;
|
||||
false ->
|
||||
StompConfig0
|
||||
end,
|
||||
|
||||
Sock = proplists:get_value(socket, Info),
|
||||
{PeerAddr, _} = proplists:get_value(peername, Info),
|
||||
AdapterInfo0 = #amqp_adapter_info{additional_info=Extra}
|
||||
= amqp_connection:socket_adapter_info(Sock, {'Web STOMP', 0}),
|
||||
%% Flow control is not supported for Web-STOMP connections.
|
||||
AdapterInfo = AdapterInfo0#amqp_adapter_info{
|
||||
additional_info=[{state, running}|Extra]},
|
||||
|
||||
ProcessorState = rabbit_stomp_processor:initial_state(
|
||||
StompConfig,
|
||||
{SendFun, AdapterInfo, none, PeerAddr}),
|
||||
{ok, ProcessorState}.
|
||||
|
||||
handle_cast({msg, Data}, State = #state{proc_state = ProcessorState,
|
||||
parse_state = ParseState,
|
||||
connection = ConnPid}) ->
|
||||
case process_received_bytes(Data, ProcessorState, ParseState, ConnPid) of
|
||||
{ok, NewProcState, ParseState1, ConnPid1} ->
|
||||
{noreply, ensure_stats_timer(State#state{
|
||||
parse_state = ParseState1,
|
||||
proc_state = NewProcState,
|
||||
connection = ConnPid1})};
|
||||
{stop, Reason, NewProcState, ParseState1} ->
|
||||
{stop, Reason, State#state{
|
||||
parse_state = ParseState1,
|
||||
proc_state = NewProcState}}
|
||||
end;
|
||||
|
||||
handle_cast(closed, State) ->
|
||||
{stop, normal, State};
|
||||
|
||||
handle_cast(client_timeout, State) ->
|
||||
{stop, {shutdown, client_heartbeat_timeout}, State};
|
||||
|
||||
handle_cast(Cast, State) ->
|
||||
{stop, {odd_cast, Cast}, State}.
|
||||
|
||||
%% TODO this is a bit rubbish - after the preview release we should
|
||||
%% make the credit_flow:send/1 invocation in
|
||||
%% rabbit_stomp_processor:process_frame/2 optional.
|
||||
handle_info({bump_credit, {_, _}}, State) ->
|
||||
{noreply, State};
|
||||
|
||||
handle_info(#'basic.consume_ok'{}, State) ->
|
||||
{noreply, State};
|
||||
handle_info(#'basic.cancel_ok'{}, State) ->
|
||||
{noreply, State};
|
||||
handle_info(#'basic.ack'{delivery_tag = Tag, multiple = IsMulti}, State) ->
|
||||
ProcState = processor_state(State),
|
||||
NewProcState = rabbit_stomp_processor:flush_pending_receipts(Tag,
|
||||
IsMulti,
|
||||
ProcState),
|
||||
{noreply, processor_state(NewProcState, State)};
|
||||
handle_info({Delivery = #'basic.deliver'{},
|
||||
#amqp_msg{props = Props, payload = Payload},
|
||||
DeliveryCtx},
|
||||
State) ->
|
||||
ProcState = processor_state(State),
|
||||
NewProcState = rabbit_stomp_processor:send_delivery(Delivery,
|
||||
Props,
|
||||
Payload,
|
||||
DeliveryCtx,
|
||||
ProcState),
|
||||
{noreply, processor_state(NewProcState, State)};
|
||||
handle_info(#'basic.cancel'{consumer_tag = Ctag}, State) ->
|
||||
ProcState = processor_state(State),
|
||||
case rabbit_stomp_processor:cancel_consumer(Ctag, ProcState) of
|
||||
{ok, NewProcState, _Connection} ->
|
||||
{noreply, processor_state(NewProcState, State)};
|
||||
{stop, Reason, NewProcState} ->
|
||||
{stop, Reason, processor_state(NewProcState, State)}
|
||||
end;
|
||||
|
||||
handle_info({start_heartbeats, _},
|
||||
State = #state{heartbeat_mode = no_heartbeat}) ->
|
||||
{noreply, State};
|
||||
|
||||
handle_info({start_heartbeats, {0, 0}}, State) ->
|
||||
{noreply, State};
|
||||
handle_info({start_heartbeats, {SendTimeout, ReceiveTimeout}},
|
||||
State = #state{conn = {ConnMod, ConnProps},
|
||||
heartbeat_sup = SupPid,
|
||||
heartbeat_mode = heartbeat}) ->
|
||||
Info = ConnMod:info(ConnProps),
|
||||
Sock = proplists:get_value(socket, Info),
|
||||
Pid = self(),
|
||||
SendFun = fun () -> ConnMod:send(ConnProps, <<$\n>>), ok end,
|
||||
ReceiveFun = fun() -> gen_server2:cast(Pid, client_timeout) end,
|
||||
Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout,
|
||||
SendFun, ReceiveTimeout, ReceiveFun),
|
||||
{noreply, State#state{heartbeat = Heartbeat}};
|
||||
|
||||
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
handle_info({'EXIT', From, Reason}, State) ->
|
||||
ProcState = processor_state(State),
|
||||
case rabbit_stomp_processor:handle_exit(From, Reason, ProcState) of
|
||||
{stop, Reason, NewProcState} ->
|
||||
{stop, Reason, processor_state(NewProcState, State)};
|
||||
unknown_exit ->
|
||||
{stop, {connection_died, Reason}, State}
|
||||
end;
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
handle_info(emit_stats, State) ->
|
||||
{noreply, emit_stats(State)};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
{stop, {odd_info, Info}, State}.
|
||||
|
||||
|
||||
|
||||
handle_call(Request, _From, State) ->
|
||||
{stop, {odd_request, Request}, State}.
|
||||
|
||||
terminate(_Reason, State = #state{conn = {ConnMod, ConnProps},
|
||||
proc_state = ProcessorState}) ->
|
||||
maybe_emit_stats(State),
|
||||
ok = file_handle_cache:release(),
|
||||
rabbit_stomp_processor:flush_and_die(ProcessorState),
|
||||
ConnMod:close(ConnProps, 1000, "STOMP died"),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
|
||||
process_received_bytes(Bytes, ProcessorState, ParseState, ConnPid) ->
|
||||
case rabbit_stomp_frame:parse(Bytes, ParseState) of
|
||||
{ok, Frame, Rest} ->
|
||||
case rabbit_stomp_processor:process_frame(Frame, ProcessorState) of
|
||||
{ok, NewProcState, ConnPid1} ->
|
||||
ParseState1 = rabbit_stomp_frame:initial_state(),
|
||||
process_received_bytes(Rest, NewProcState, ParseState1, ConnPid1);
|
||||
{stop, Reason, NewProcState} ->
|
||||
{stop, Reason, NewProcState, ParseState}
|
||||
end;
|
||||
{more, ParseState1} ->
|
||||
{ok, ProcessorState, ParseState1, ConnPid}
|
||||
end.
|
||||
|
||||
processor_state(#state{ proc_state = ProcState }) -> ProcState.
|
||||
processor_state(ProcState, #state{} = State) ->
|
||||
State#state{ proc_state = ProcState}.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
ensure_stats_timer(State) ->
|
||||
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats).
|
||||
|
||||
maybe_emit_stats(State) ->
|
||||
rabbit_event:if_enabled(State, #state.stats_timer,
|
||||
fun() -> emit_stats(State) end).
|
||||
|
||||
emit_stats(State=#state{connection = C}) when C == none; C == undefined ->
|
||||
%% Avoid emitting stats on terminate when the connection has not yet been
|
||||
%% established, as this causes orphan entries on the stats database
|
||||
State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer),
|
||||
State1;
|
||||
emit_stats(State=#state{conn={ConnMod, ConnProps}, connection=ConnPid}) ->
|
||||
Info = ConnMod:info(ConnProps),
|
||||
Sock = proplists:get_value(socket, Info),
|
||||
SockInfos = case rabbit_net:getstat(Sock,
|
||||
[recv_oct, recv_cnt, send_oct, send_cnt, send_pend]) of
|
||||
{ok, SI} -> SI;
|
||||
{error, _} -> []
|
||||
end,
|
||||
Infos = [{pid, ConnPid}|SockInfos],
|
||||
rabbit_core_metrics:connection_stats(ConnPid, Infos),
|
||||
rabbit_event:notify(connection_stats, Infos),
|
||||
State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer),
|
||||
State1.
|
||||
|
|
@ -1,40 +0,0 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at http://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(rabbit_ws_client_sup).
|
||||
-behaviour(supervisor2).
|
||||
|
||||
-export([start_client/1]).
|
||||
-export([init/1]).
|
||||
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
||||
|
||||
%% --------------------------------------------------------------------------
|
||||
|
||||
start_client({Conn, Heartbeat}) ->
|
||||
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
|
||||
{ok, Client} = supervisor2:start_child(
|
||||
SupPid, client_spec(SupPid, Conn, Heartbeat)),
|
||||
{ok, SupPid, Client}.
|
||||
|
||||
|
||||
client_spec(SupPid, Conn, Heartbeat) ->
|
||||
{rabbit_ws_client, {rabbit_ws_client, start_link, [{SupPid, Conn, Heartbeat}]},
|
||||
intrinsic, ?WORKER_WAIT, worker, [rabbit_ws_client]}.
|
||||
|
||||
init(_Any) ->
|
||||
{ok, {{one_for_all, 0, 1}, []}}.
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at http://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(rabbit_ws_connection_sup).
|
||||
|
||||
-behaviour(supervisor2).
|
||||
-behaviour(ranch_protocol).
|
||||
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
|
||||
-export([start_link/4, start_keepalive_link/0]).
|
||||
-export([init/1]).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
start_link(Ref, Sock, Transport, CowboyOpts0) ->
|
||||
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
|
||||
{ok, KeepaliveSup} = supervisor2:start_child(
|
||||
SupPid,
|
||||
{rabbit_web_stomp_keepalive_sup,
|
||||
{?MODULE, start_keepalive_link, []},
|
||||
intrinsic, infinity, supervisor, [rabbit_keepalive_sup]}),
|
||||
%% In order for the Websocket handler to receive the KeepaliveSup
|
||||
%% variable, we need to pass it first through the environment and
|
||||
%% then have the middleware rabbit_web_mqtt_middleware place it
|
||||
%% in the initial handler state.
|
||||
Env = maps:get(env, CowboyOpts0),
|
||||
CowboyOpts = CowboyOpts0#{env => Env#{keepalive_sup => KeepaliveSup,
|
||||
socket => Sock}},
|
||||
Protocol = case Transport of
|
||||
ranch_tcp -> cowboy_clear;
|
||||
ranch_ssl -> cowboy_tls
|
||||
end,
|
||||
{ok, ReaderPid} = supervisor2:start_child(
|
||||
SupPid,
|
||||
{Protocol,
|
||||
{Protocol, start_link, [Ref, Sock, Transport, CowboyOpts]},
|
||||
intrinsic, ?WORKER_WAIT, worker, [Protocol]}),
|
||||
{ok, SupPid, ReaderPid}.
|
||||
|
||||
start_keepalive_link() ->
|
||||
supervisor2:start_link(?MODULE, []).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
{ok, {{one_for_all, 0, 1}, []}}.
|
||||
|
|
@ -17,6 +17,9 @@
|
|||
-module(rabbit_ws_handler).
|
||||
-behaviour(cowboy_websocket).
|
||||
|
||||
-include_lib("rabbitmq_stomp/include/rabbit_stomp.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
||||
%% Websocket.
|
||||
-export([init/2]).
|
||||
-export([websocket_init/1]).
|
||||
|
|
@ -24,19 +27,28 @@
|
|||
-export([websocket_info/2]).
|
||||
-export([terminate/3]).
|
||||
|
||||
%% SockJS interface
|
||||
-export([info/1]).
|
||||
-export([send/2]).
|
||||
-export([close/3]).
|
||||
|
||||
-record(state, {conn, pid, type}).
|
||||
-record(state, {
|
||||
frame_type,
|
||||
heartbeat_mode,
|
||||
heartbeat,
|
||||
heartbeat_sup,
|
||||
parse_state,
|
||||
proc_state,
|
||||
socket,
|
||||
peername,
|
||||
auth_hd,
|
||||
stats_timer,
|
||||
connection
|
||||
}).
|
||||
|
||||
%% Websocket.
|
||||
init(Req0, Opts) ->
|
||||
Req = case cowboy_req:header(<<"sec-websocket-protocol">>, Req0) of
|
||||
{PeerAddr, _PeerPort} = maps:get(peer, Req0),
|
||||
{_, KeepaliveSup} = lists:keyfind(keepalive_sup, 1, Opts),
|
||||
{_, Sock} = lists:keyfind(socket, 1, Opts),
|
||||
Req = case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req0) of
|
||||
undefined -> Req0;
|
||||
ProtocolHd ->
|
||||
Protocols = parse_sec_websocket_protocol_req(ProtocolHd),
|
||||
Protocols ->
|
||||
case filter_stomp_protocols(Protocols) of
|
||||
[] -> Req0;
|
||||
[StompProtocol|_] ->
|
||||
|
|
@ -44,57 +56,154 @@ init(Req0, Opts) ->
|
|||
StompProtocol, Req0)
|
||||
end
|
||||
end,
|
||||
{_, FrameType} = lists:keyfind(type, 1, Opts),
|
||||
Socket = maps:get(socket, Req0),
|
||||
Peername = cowboy_req:peer(Req),
|
||||
{ok, Sockname} = rabbit_net:sockname(Socket),
|
||||
Headers = case cowboy_req:header(<<"authorization">>, Req) of
|
||||
undefined -> [];
|
||||
AuthHd -> [{authorization, binary_to_list(AuthHd)}]
|
||||
end,
|
||||
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
|
||||
WsOpts = maps:merge(#{compress => true}, WsOpts0),
|
||||
{cowboy_websocket, Req, {Socket, Peername, Sockname, Headers, FrameType}, WsOpts}.
|
||||
{cowboy_websocket, Req, #state{
|
||||
frame_type = proplists:get_value(type, Opts, text),
|
||||
heartbeat_sup = KeepaliveSup,
|
||||
heartbeat = {none, none},
|
||||
heartbeat_mode = heartbeat,
|
||||
socket = Sock,
|
||||
peername = PeerAddr,
|
||||
auth_hd = cowboy_req:header(<<"authorization">>, Req)
|
||||
}, WsOpts}.
|
||||
|
||||
websocket_init({Socket, Peername, Sockname, Headers, FrameType}) ->
|
||||
Info = [{socket, Socket},
|
||||
{peername, Peername},
|
||||
{sockname, Sockname},
|
||||
{headers, Headers}],
|
||||
{ok, _Sup, Pid} = rabbit_ws_sup:start_client({{?MODULE,
|
||||
#{pid => self(),
|
||||
info => Info}},
|
||||
heartbeat}),
|
||||
{ok, #state{pid=Pid, type=FrameType}}.
|
||||
websocket_init(State) ->
|
||||
ok = file_handle_cache:obtain(),
|
||||
process_flag(trap_exit, true),
|
||||
{ok, ProcessorState} = init_processor_state(State),
|
||||
{ok, rabbit_event:init_stats_timer(
|
||||
State#state{proc_state = ProcessorState,
|
||||
parse_state = rabbit_stomp_frame:initial_state()},
|
||||
#state.stats_timer)}.
|
||||
|
||||
websocket_handle({text, Data}, State=#state{pid=Pid}) ->
|
||||
rabbit_ws_client:msg(Pid, Data),
|
||||
{ok, State};
|
||||
websocket_handle({binary, Data}, State=#state{pid=Pid}) ->
|
||||
rabbit_ws_client:msg(Pid, Data),
|
||||
{ok, State};
|
||||
init_processor_state(#state{socket=Sock, peername=PeerAddr, auth_hd=AuthHd}) ->
|
||||
Self = self(),
|
||||
SendFun = fun (_Sync, Data) ->
|
||||
Self ! {send, Data},
|
||||
ok
|
||||
end,
|
||||
|
||||
UseHTTPAuth = application:get_env(rabbitmq_web_stomp, use_http_auth, false),
|
||||
StompConfig0 = #stomp_configuration{implicit_connect = false},
|
||||
StompConfig = case UseHTTPAuth of
|
||||
true ->
|
||||
case AuthHd of
|
||||
undefined ->
|
||||
%% We fall back to the default STOMP credentials.
|
||||
UserConfig = application:get_env(rabbitmq_stomp, default_user, undefined),
|
||||
StompConfig1 = rabbit_stomp:parse_default_user(UserConfig, StompConfig0),
|
||||
StompConfig1#stomp_configuration{force_default_creds = true};
|
||||
_ ->
|
||||
{basic, HTTPLogin, HTTPPassCode}
|
||||
= cow_http_hd:parse_authorization(AuthHd),
|
||||
StompConfig0#stomp_configuration{
|
||||
default_login = HTTPLogin,
|
||||
default_passcode = HTTPPassCode,
|
||||
force_default_creds = true}
|
||||
end;
|
||||
false ->
|
||||
StompConfig0
|
||||
end,
|
||||
|
||||
AdapterInfo0 = #amqp_adapter_info{additional_info=Extra}
|
||||
= amqp_connection:socket_adapter_info(Sock, {'Web STOMP', 0}),
|
||||
%% Flow control is not supported for Web-STOMP connections.
|
||||
AdapterInfo = AdapterInfo0#amqp_adapter_info{
|
||||
additional_info=[{state, running}|Extra]},
|
||||
|
||||
ProcessorState = rabbit_stomp_processor:initial_state(
|
||||
StompConfig,
|
||||
{SendFun, AdapterInfo, none, PeerAddr}),
|
||||
{ok, ProcessorState}.
|
||||
|
||||
websocket_handle({text, Data}, State) ->
|
||||
handle_data(Data, State);
|
||||
websocket_handle({binary, Data}, State) ->
|
||||
handle_data(Data, State);
|
||||
websocket_handle(_Frame, State) ->
|
||||
{ok, State}.
|
||||
|
||||
websocket_info({send, Msg}, State=#state{type=FrameType}) ->
|
||||
websocket_info({send, Msg}, State=#state{frame_type=FrameType}) ->
|
||||
{reply, {FrameType, Msg}, State};
|
||||
websocket_info(Frame = {close, _, _}, State) ->
|
||||
{reply, Frame, State};
|
||||
websocket_info(_Info, State) ->
|
||||
|
||||
%% TODO this is a bit rubbish - after the preview release we should
|
||||
%% make the credit_flow:send/1 invocation in
|
||||
%% rabbit_stomp_processor:process_frame/2 optional.
|
||||
websocket_info({bump_credit, {_, _}}, State) ->
|
||||
{ok, State};
|
||||
|
||||
websocket_info(#'basic.consume_ok'{}, State) ->
|
||||
{ok, State};
|
||||
websocket_info(#'basic.cancel_ok'{}, State) ->
|
||||
{ok, State};
|
||||
websocket_info(#'basic.ack'{delivery_tag = Tag, multiple = IsMulti},
|
||||
State=#state{ proc_state = ProcState0 }) ->
|
||||
ProcState = rabbit_stomp_processor:flush_pending_receipts(Tag,
|
||||
IsMulti,
|
||||
ProcState0),
|
||||
{ok, State#state{ proc_state = ProcState }};
|
||||
websocket_info({Delivery = #'basic.deliver'{},
|
||||
#amqp_msg{props = Props, payload = Payload},
|
||||
DeliveryCtx},
|
||||
State=#state{ proc_state = ProcState0 }) ->
|
||||
ProcState = rabbit_stomp_processor:send_delivery(Delivery,
|
||||
Props,
|
||||
Payload,
|
||||
DeliveryCtx,
|
||||
ProcState0),
|
||||
{ok, State#state{ proc_state = ProcState }};
|
||||
websocket_info(#'basic.cancel'{consumer_tag = Ctag},
|
||||
State=#state{ proc_state = ProcState0 }) ->
|
||||
case rabbit_stomp_processor:cancel_consumer(Ctag, ProcState0) of
|
||||
{ok, ProcState, _Connection} ->
|
||||
{ok, State#state{ proc_state = ProcState }};
|
||||
{stop, _Reason, ProcState} ->
|
||||
stop(State#state{ proc_state = ProcState })
|
||||
end;
|
||||
|
||||
websocket_info({start_heartbeats, _},
|
||||
State = #state{heartbeat_mode = no_heartbeat}) ->
|
||||
{ok, State};
|
||||
|
||||
websocket_info({start_heartbeats, {0, 0}}, State) ->
|
||||
{ok, State};
|
||||
websocket_info({start_heartbeats, {SendTimeout, ReceiveTimeout}},
|
||||
State = #state{socket = Sock,
|
||||
heartbeat_sup = SupPid,
|
||||
heartbeat_mode = heartbeat}) ->
|
||||
Self = self(),
|
||||
SendFun = fun () -> Self ! {send, <<$\n>>}, ok end,
|
||||
ReceiveFun = fun() -> Self ! client_timeout end,
|
||||
Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout,
|
||||
SendFun, ReceiveTimeout, ReceiveFun),
|
||||
{ok, State#state{heartbeat = Heartbeat}};
|
||||
websocket_info(client_timeout, State) ->
|
||||
stop(State);
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
websocket_info({'EXIT', From, Reason},
|
||||
State=#state{ proc_state = ProcState0 }) ->
|
||||
case rabbit_stomp_processor:handle_exit(From, Reason, ProcState0) of
|
||||
{stop, _Reason, ProcState} ->
|
||||
stop(State#state{ proc_state = ProcState });
|
||||
unknown_exit ->
|
||||
stop(State)
|
||||
end;
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
websocket_info(emit_stats, State) ->
|
||||
{ok, emit_stats(State)};
|
||||
|
||||
websocket_info(Msg, State) ->
|
||||
rabbit_log_connection:info("WEB-STOMP: unexpected message ~p~n",
|
||||
[Msg]),
|
||||
{ok, State}.
|
||||
|
||||
terminate(_Reason, _Req, {_, _, _, _, _}) ->
|
||||
ok;
|
||||
terminate(_Reason, _Req, #state{pid=Pid}) ->
|
||||
rabbit_ws_client:closed(Pid),
|
||||
terminate(_Reason, _Req, _State) ->
|
||||
ok.
|
||||
|
||||
%% When moving to Cowboy 2, this code should be replaced
|
||||
%% with a simple call to cow_http_hd:parse_sec_websocket_protocol_req/1.
|
||||
|
||||
parse_sec_websocket_protocol_req(Bin) ->
|
||||
Protocols = binary:split(Bin, [<<$,>>, <<$\s>>], [global]),
|
||||
[P || P <- Protocols, P =/= <<>>].
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
%% The protocols v10.stomp, v11.stomp and v12.stomp are registered
|
||||
%% at IANA: https://www.iana.org/assignments/websocket/websocket.xhtml
|
||||
|
|
@ -108,18 +217,58 @@ filter_stomp_protocols(Protocols) ->
|
|||
end,
|
||||
Protocols))).
|
||||
|
||||
%% TODO
|
||||
%% Ideally all the STOMP interaction should be done from
|
||||
%% within the Websocket process. This could be a good refactoring
|
||||
%% once SockJS gets removed.
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
info(#{info := Info}) ->
|
||||
Info.
|
||||
handle_data(<<>>, State) ->
|
||||
{ok, ensure_stats_timer(State)};
|
||||
handle_data(Bytes, State = #state{proc_state = ProcState,
|
||||
parse_state = ParseState}) ->
|
||||
case rabbit_stomp_frame:parse(Bytes, ParseState) of
|
||||
{more, ParseState1} ->
|
||||
{ok, ensure_stats_timer(State#state{ parse_state = ParseState1 })};
|
||||
{ok, Frame, Rest} ->
|
||||
case rabbit_stomp_processor:process_frame(Frame, ProcState) of
|
||||
{ok, ProcState1, ConnPid} ->
|
||||
ParseState1 = rabbit_stomp_frame:initial_state(),
|
||||
handle_data(
|
||||
Rest,
|
||||
State #state{ parse_state = ParseState1,
|
||||
proc_state = ProcState1,
|
||||
connection = ConnPid });
|
||||
{stop, _Reason, ProcState1} ->
|
||||
io:format(user, "~p~n", [_Reason]),
|
||||
stop(State#state{ proc_state = ProcState1 })
|
||||
end
|
||||
end.
|
||||
|
||||
send(#{pid := Pid}, Data) ->
|
||||
Pid ! {send, Data},
|
||||
ok.
|
||||
stop(State = #state{proc_state = ProcState}) ->
|
||||
maybe_emit_stats(State),
|
||||
ok = file_handle_cache:release(),
|
||||
rabbit_stomp_processor:flush_and_die(ProcState),
|
||||
{reply, {close, 1000, "STOMP died"}, State}.
|
||||
|
||||
close(#{pid := Pid}, Code, Reason) ->
|
||||
Pid ! {close, Code, Reason},
|
||||
ok.
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
ensure_stats_timer(State) ->
|
||||
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats).
|
||||
|
||||
maybe_emit_stats(State) ->
|
||||
rabbit_event:if_enabled(State, #state.stats_timer,
|
||||
fun() -> emit_stats(State) end).
|
||||
|
||||
emit_stats(State=#state{connection = C}) when C == none; C == undefined ->
|
||||
%% Avoid emitting stats on terminate when the connection has not yet been
|
||||
%% established, as this causes orphan entries on the stats database
|
||||
State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer),
|
||||
State1;
|
||||
emit_stats(State=#state{socket=Sock, connection=Conn}) ->
|
||||
SockInfos = case rabbit_net:getstat(Sock,
|
||||
[recv_oct, recv_cnt, send_oct, send_cnt, send_pend]) of
|
||||
{ok, SI} -> SI;
|
||||
{error, _} -> []
|
||||
end,
|
||||
Infos = [{pid, Conn}|SockInfos],
|
||||
rabbit_core_metrics:connection_stats(Conn, Infos),
|
||||
rabbit_event:notify(connection_stats, Infos),
|
||||
State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer),
|
||||
State1.
|
||||
|
|
|
|||
|
|
@ -46,8 +46,8 @@ init() ->
|
|||
end,
|
||||
case ranch:start_listener(
|
||||
http, NumTcpAcceptors,
|
||||
ranch_tcp, TcpConf,
|
||||
rabbit_ws_protocol,
|
||||
ranch_tcp, [{connection_type, supervisor}|TcpConf],
|
||||
rabbit_ws_connection_sup,
|
||||
CowboyOpts#{env => #{dispatch => Routes},
|
||||
middlewares => [cowboy_router,
|
||||
rabbit_ws_middleware,
|
||||
|
|
@ -78,8 +78,8 @@ init() ->
|
|||
end,
|
||||
{ok, _} = ranch:start_listener(
|
||||
https, NumSslAcceptors,
|
||||
ranch_ssl, TLSConf,
|
||||
rabbit_ws_protocol,
|
||||
ranch_ssl, [{connection_type, supervisor}|TLSConf],
|
||||
rabbit_ws_connection_sup,
|
||||
CowboyOpts#{env => #{dispatch => Routes},
|
||||
middlewares => [cowboy_router,
|
||||
rabbit_ws_middleware,
|
||||
|
|
|
|||
|
|
@ -19,8 +19,12 @@
|
|||
-export([execute/2]).
|
||||
|
||||
execute(Req, Env) ->
|
||||
#{keepalive_sup := KeepaliveSup} = Env,
|
||||
Sock = maps:get(socket, Env),
|
||||
{ok, Req#{socket => Sock}, Env}.
|
||||
|
||||
|
||||
|
||||
case maps:get(handler_opts, Env, undefined) of
|
||||
undefined -> {ok, Req, Env};
|
||||
Opts when is_list(Opts) ->
|
||||
{ok, Req, Env#{handler_opts => [{keepalive_sup, KeepaliveSup},
|
||||
{socket, Sock}
|
||||
|Opts]}}
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -1,33 +0,0 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at http://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(rabbit_ws_protocol).
|
||||
-behaviour(ranch_protocol).
|
||||
|
||||
-export([start_link/4]).
|
||||
|
||||
start_link(Ref, Sock, Transport, CowboyOpts0) ->
|
||||
%% In order for the Websocket handler to receive the KeepaliveSup
|
||||
%% variable, we need to pass it first through the environment and
|
||||
%% then have the middleware rabbit_web_mqtt_middleware place it
|
||||
%% in the initial handler state.
|
||||
Env = maps:get(env, CowboyOpts0),
|
||||
CowboyOpts = CowboyOpts0#{env => Env#{socket => Sock}},
|
||||
Protocol = case Transport of
|
||||
ranch_tcp -> cowboy_clear;
|
||||
ranch_ssl -> cowboy_tls
|
||||
end,
|
||||
Protocol:start_link(Ref, Sock, Transport, CowboyOpts).
|
||||
|
|
@ -17,7 +17,7 @@
|
|||
-module(rabbit_ws_sup).
|
||||
-behaviour(supervisor2).
|
||||
|
||||
-export([start_link/0, init/1, start_client/1]).
|
||||
-export([start_link/0, init/1]).
|
||||
|
||||
-define(SUP_NAME, ?MODULE).
|
||||
|
||||
|
|
@ -28,9 +28,4 @@ start_link() ->
|
|||
supervisor2:start_link({local, ?SUP_NAME}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
{ok, {{simple_one_for_one, 0, 1},
|
||||
[{client, {rabbit_ws_client_sup, start_client, []},
|
||||
temporary, infinity, supervisor, [rabbit_ws_client_sup]}]}}.
|
||||
|
||||
start_client(Params) ->
|
||||
supervisor2:start_child(?SUP_NAME, [Params]).
|
||||
{ok, {{one_for_one, 1, 5}, []}}.
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ start_conn(State = #state{transport = Transport}, AuthInfo, Protocols) ->
|
|||
|
||||
ProtocolHd = case Protocols of
|
||||
[] -> "";
|
||||
_ -> "Sec-Websocket-Protocol: " ++ string:join(Protocols, ", ")
|
||||
_ -> "Sec-Websocket-Protocol: " ++ string:join(Protocols, ", ") ++ "\r\n"
|
||||
end,
|
||||
|
||||
Key = base64:encode_to_string(crypto:strong_rand_bytes(16)),
|
||||
|
|
|
|||
Loading…
Reference in New Issue