Support AMQP over WebSocket in Erlang client
## What? Implement the AMQP over WebSocket Binding Committee Specification 01 in the AMQP 1.0 Erlang client: https://docs.oasis-open.org/amqp-bindmap/amqp-wsb/v1.0/cs01/amqp-wsb-v1.0-cs01.html ## Why? 1. This allows writing integration tests for the server implementation of AMQP over WebSocket. 2. Erlang and Elixir clients can use AMQP over WebSocket in environments where firewalls prohibit access to the AMQP port. ## How? Use gun as WebSocket client. The new module `amqp10_client_socket` handles socket operations (open, close, send) for: * TCP sockets * SSL sockets * WebSockets Prior to this commit, the amqp10_client_connection process closed only the write end of the socket after it sent the AMQP close performative. This commit removed premature socket closure because: 1. There is no equivalent feature provided in Gun since sending a WebSocket close frame causes Gun to cleanly close the connection for both writing and reading. 2. It's unnecessary and can result in unexpected and confusing behaviour on the server. 3. It's better practive to keep the TCP connection fully open until the AMQP closing handshake completes. 4. When amqp10_client_frame_reader terminates, it will cleanly close the socket for both writing and reading.
This commit is contained in:
parent
75c4c3977d
commit
5a467934e3
|
@ -29,7 +29,7 @@ endef
|
|||
PACKAGES_DIR ?= $(abspath PACKAGES)
|
||||
|
||||
BUILD_DEPS = rabbit_common elvis_mk
|
||||
DEPS = amqp10_common credentials_obfuscation
|
||||
DEPS = amqp10_common credentials_obfuscation gun
|
||||
TEST_DEPS = rabbit rabbitmq_ct_helpers
|
||||
LOCAL_DEPS = ssl inets crypto public_key
|
||||
|
||||
|
|
|
@ -43,8 +43,6 @@
|
|||
|
||||
-export([format_status/1]).
|
||||
|
||||
-type amqp10_socket() :: {tcp, gen_tcp:socket()} | {ssl, ssl:sslsocket()}.
|
||||
|
||||
-type milliseconds() :: non_neg_integer().
|
||||
|
||||
-type address() :: inet:socket_address() | inet:hostname().
|
||||
|
@ -60,6 +58,8 @@
|
|||
address => address(),
|
||||
port => inet:port_number(),
|
||||
tls_opts => {secure_port, [ssl:tls_option()]},
|
||||
ws_path => string(),
|
||||
ws_opts => gun:opts(),
|
||||
notify => pid() | none, % the pid to send connection events to
|
||||
notify_when_opened => pid() | none,
|
||||
notify_when_closed => pid() | none,
|
||||
|
@ -83,14 +83,13 @@
|
|||
sessions_sup :: pid() | undefined,
|
||||
pending_session_reqs = [] :: [term()],
|
||||
reader :: pid() | undefined,
|
||||
socket :: amqp10_socket() | undefined,
|
||||
socket :: amqp10_client_socket:socket() | undefined,
|
||||
idle_time_out :: non_neg_integer() | undefined,
|
||||
heartbeat_timer :: timer:tref() | undefined,
|
||||
config :: connection_config()
|
||||
}).
|
||||
|
||||
-export_type([connection_config/0,
|
||||
amqp10_socket/0]).
|
||||
-export_type([connection_config/0]).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% Public API.
|
||||
|
@ -152,7 +151,7 @@ start_link(Sup, Config) ->
|
|||
set_other_procs(Pid, OtherProcs) ->
|
||||
gen_statem:cast(Pid, {set_other_procs, OtherProcs}).
|
||||
|
||||
-spec socket_ready(pid(), amqp10_socket()) -> ok.
|
||||
-spec socket_ready(pid(), amqp10_client_socket:socket()) -> ok.
|
||||
socket_ready(Pid, Socket) ->
|
||||
gen_statem:cast(Pid, {socket_ready, Socket}).
|
||||
|
||||
|
@ -186,10 +185,10 @@ expecting_socket(_EvtType, {socket_ready, Socket},
|
|||
Sasl = credentials_obfuscation:decrypt(maps:get(sasl, Cfg)),
|
||||
case Sasl of
|
||||
none ->
|
||||
ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER),
|
||||
ok = amqp10_client_socket:send(Socket, ?AMQP_PROTOCOL_HEADER),
|
||||
{next_state, hdr_sent, State1};
|
||||
_ ->
|
||||
ok = socket_send(Socket, ?SASL_PROTOCOL_HEADER),
|
||||
ok = amqp10_client_socket:send(Socket, ?SASL_PROTOCOL_HEADER),
|
||||
{next_state, sasl_hdr_sent, State1}
|
||||
end;
|
||||
expecting_socket(_EvtType, {set_other_procs, OtherProcs}, State) ->
|
||||
|
@ -207,7 +206,10 @@ sasl_hdr_sent(_EvtType, {protocol_header_received, 3, 1, 0, 0}, State) ->
|
|||
sasl_hdr_sent({call, From}, begin_session,
|
||||
#state{pending_session_reqs = PendingSessionReqs} = State) ->
|
||||
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
|
||||
{keep_state, State1}.
|
||||
{keep_state, State1};
|
||||
sasl_hdr_sent(info, {'DOWN', MRef, process, _Pid, _},
|
||||
#state{reader_m_ref = MRef}) ->
|
||||
{stop, {shutdown, reader_down}}.
|
||||
|
||||
sasl_hdr_rcvds(_EvtType, #'v1_0.sasl_mechanisms'{
|
||||
sasl_server_mechanisms = {array, symbol, AvailableMechs}},
|
||||
|
@ -228,7 +230,7 @@ sasl_hdr_rcvds({call, From}, begin_session,
|
|||
|
||||
sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, 0}},
|
||||
#state{socket = Socket} = State) ->
|
||||
ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER),
|
||||
ok = amqp10_client_socket:send(Socket, ?AMQP_PROTOCOL_HEADER),
|
||||
{next_state, hdr_sent, State};
|
||||
sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, C}},
|
||||
#state{} = State) when C==1;C==2;C==3;C==4 ->
|
||||
|
@ -285,7 +287,7 @@ open_sent({call, From}, begin_session,
|
|||
#state{pending_session_reqs = PendingSessionReqs} = State) ->
|
||||
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
|
||||
{keep_state, State1};
|
||||
open_sent(info, {'DOWN', MRef, _, _, _},
|
||||
open_sent(info, {'DOWN', MRef, process, _, _},
|
||||
#state{reader_m_ref = MRef}) ->
|
||||
{stop, {shutdown, reader_down}}.
|
||||
|
||||
|
@ -294,46 +296,56 @@ opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) ->
|
|||
{ok, Tmr} = start_heartbeat_timer(T),
|
||||
{keep_state, State#state{heartbeat_timer = Tmr}};
|
||||
opened(_EvtType, {close, Reason}, State) ->
|
||||
%% We send the first close frame and wait for the reply.
|
||||
%% TODO: stop all sessions writing
|
||||
%% We could still accept incoming frames (See: 2.4.6)
|
||||
case send_close(State, Reason) of
|
||||
ok -> {next_state, close_sent, State};
|
||||
{error, closed} -> {stop, normal, State};
|
||||
Error -> {stop, Error, State}
|
||||
ok ->
|
||||
%% "After writing this frame the peer SHOULD continue to read from the connection
|
||||
%% until it receives the partner's close frame (in order to guard against
|
||||
%% erroneously or maliciously implemented partners, a peer SHOULD implement a
|
||||
%% timeout to give its partner a reasonable time to receive and process the close
|
||||
%% before giving up and simply closing the underlying transport mechanism)." [§2.4.3]
|
||||
{next_state, close_sent, State, {state_timeout, ?TIMEOUT, received_no_close_frame}};
|
||||
{error, closed} ->
|
||||
{stop, normal, State};
|
||||
Error ->
|
||||
{stop, Error, State}
|
||||
end;
|
||||
opened(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) ->
|
||||
%% We receive the first close frame, reply and terminate.
|
||||
ok = notify_closed(Config, Close),
|
||||
_ = send_close(State, none),
|
||||
{stop, normal, State};
|
||||
case send_close(State, none) of
|
||||
ok -> {stop, normal, State};
|
||||
{error, closed} -> {stop, normal, State};
|
||||
Error -> {stop, Error, State}
|
||||
end;
|
||||
opened({call, From}, begin_session, State) ->
|
||||
{Ret, State1} = handle_begin_session(From, State),
|
||||
{keep_state, State1, [{reply, From, Ret}]};
|
||||
opened(info, {'DOWN', MRef, _, _, _Info},
|
||||
State = #state{reader_m_ref = MRef, config = Config}) ->
|
||||
opened(info, {'DOWN', MRef, process, _, _Info},
|
||||
#state{reader_m_ref = MRef, config = Config}) ->
|
||||
%% reader has gone down and we are not already shutting down
|
||||
ok = notify_closed(Config, shutdown),
|
||||
{stop, normal, State};
|
||||
{stop, normal};
|
||||
opened(_EvtType, Frame, State) ->
|
||||
logger:warning("Unexpected connection frame ~tp when in state ~tp ",
|
||||
[Frame, State]),
|
||||
{keep_state, State}.
|
||||
keep_state_and_data.
|
||||
|
||||
close_sent(_EvtType, heartbeat, State) ->
|
||||
{next_state, close_sent, State};
|
||||
close_sent(_EvtType, {'EXIT', _Pid, shutdown}, State) ->
|
||||
close_sent(_EvtType, heartbeat, _Data) ->
|
||||
keep_state_and_data;
|
||||
close_sent(_EvtType, {'EXIT', _Pid, shutdown}, _Data) ->
|
||||
%% monitored processes may exit during closure
|
||||
{next_state, close_sent, State};
|
||||
close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _},
|
||||
#state{reader = ReaderPid} = State) ->
|
||||
%% if the reader exits we probably wont receive a close frame
|
||||
{stop, normal, State};
|
||||
close_sent(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) ->
|
||||
keep_state_and_data;
|
||||
close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _Reason},
|
||||
#state{reader = ReaderPid}) ->
|
||||
%% if the reader exits we probably won't receive a close frame
|
||||
{stop, normal};
|
||||
close_sent(_EvtType, #'v1_0.close'{} = Close, #state{config = Config}) ->
|
||||
ok = notify_closed(Config, Close),
|
||||
%% TODO: we should probably set up a timer before this to ensure
|
||||
%% we close down event if no reply is received
|
||||
{stop, normal, State}.
|
||||
{stop, normal};
|
||||
close_sent(state_timeout, received_no_close_frame, _Data) ->
|
||||
{stop, normal}.
|
||||
|
||||
set_other_procs0(OtherProcs, State) ->
|
||||
#{sessions_sup := SessionsSup,
|
||||
|
@ -435,7 +447,7 @@ send_open(#state{socket = Socket, config = Config0}) ->
|
|||
Encoded = amqp10_framing:encode_bin(Open),
|
||||
Frame = amqp10_binary_generator:build_frame(0, Encoded),
|
||||
?DBG("CONN <- ~tp", [Open]),
|
||||
socket_send(Socket, Frame).
|
||||
amqp10_client_socket:send(Socket, Frame).
|
||||
|
||||
|
||||
send_close(#state{socket = Socket}, _Reason) ->
|
||||
|
@ -443,14 +455,7 @@ send_close(#state{socket = Socket}, _Reason) ->
|
|||
Encoded = amqp10_framing:encode_bin(Close),
|
||||
Frame = amqp10_binary_generator:build_frame(0, Encoded),
|
||||
?DBG("CONN <- ~tp", [Close]),
|
||||
Ret = socket_send(Socket, Frame),
|
||||
case Ret of
|
||||
ok -> _ =
|
||||
socket_shutdown(Socket, write),
|
||||
ok;
|
||||
_ -> ok
|
||||
end,
|
||||
Ret.
|
||||
amqp10_client_socket:send(Socket, Frame).
|
||||
|
||||
send_sasl_init(State, anon) ->
|
||||
Frame = #'v1_0.sasl_init'{mechanism = {symbol, <<"ANONYMOUS">>}},
|
||||
|
@ -474,21 +479,11 @@ send(Record, FrameType, #state{socket = Socket}) ->
|
|||
Encoded = amqp10_framing:encode_bin(Record),
|
||||
Frame = amqp10_binary_generator:build_frame(0, FrameType, Encoded),
|
||||
?DBG("CONN <- ~tp", [Record]),
|
||||
socket_send(Socket, Frame).
|
||||
amqp10_client_socket:send(Socket, Frame).
|
||||
|
||||
send_heartbeat(#state{socket = Socket}) ->
|
||||
Frame = amqp10_binary_generator:build_heartbeat_frame(),
|
||||
socket_send(Socket, Frame).
|
||||
|
||||
socket_send({tcp, Socket}, Data) ->
|
||||
gen_tcp:send(Socket, Data);
|
||||
socket_send({ssl, Socket}, Data) ->
|
||||
ssl:send(Socket, Data).
|
||||
|
||||
socket_shutdown({tcp, Socket}, How) ->
|
||||
gen_tcp:shutdown(Socket, How);
|
||||
socket_shutdown({ssl, Socket}, How) ->
|
||||
ssl:shutdown(Socket, How).
|
||||
amqp10_client_socket:send(Socket, Frame).
|
||||
|
||||
notify_opened(#{notify_when_opened := none}, _) ->
|
||||
ok;
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
%% API
|
||||
-export([start_link/2,
|
||||
set_connection/2,
|
||||
close/1,
|
||||
register_session/3,
|
||||
unregister_session/4]).
|
||||
|
||||
|
@ -29,11 +28,6 @@
|
|||
code_change/4,
|
||||
terminate/3]).
|
||||
|
||||
-define(RABBIT_TCP_OPTS, [binary,
|
||||
{packet, 0},
|
||||
{active, false},
|
||||
{nodelay, true}]).
|
||||
|
||||
-type frame_type() :: amqp | sasl.
|
||||
|
||||
-record(frame_state,
|
||||
|
@ -44,7 +38,7 @@
|
|||
|
||||
-record(state,
|
||||
{connection_sup :: pid(),
|
||||
socket :: amqp10_client_connection:amqp10_socket() | undefined,
|
||||
socket :: amqp10_client_socket:socket() | closed,
|
||||
buffer = <<>> :: binary(),
|
||||
frame_state :: #frame_state{} | undefined,
|
||||
connection :: pid() | undefined,
|
||||
|
@ -72,9 +66,6 @@ start_link(Sup, Config) ->
|
|||
set_connection(Reader, Connection) ->
|
||||
gen_statem:cast(Reader, {set_connection, Connection}).
|
||||
|
||||
close(Reader) ->
|
||||
gen_statem:cast(Reader, close).
|
||||
|
||||
register_session(Reader, Session, OutgoingChannel) ->
|
||||
gen_statem:cast(Reader, {register_session, Session, OutgoingChannel}).
|
||||
|
||||
|
@ -97,34 +88,19 @@ init([Sup, ConnConfig]) when is_map(ConnConfig) ->
|
|||
Address -> Addresses0 ++ [Address]
|
||||
end,
|
||||
case connect_any(Addresses, Port, ConnConfig) of
|
||||
{error, Reason} ->
|
||||
{stop, Reason};
|
||||
Socket ->
|
||||
State = #state{connection_sup = Sup, socket = Socket,
|
||||
{ok, Socket} ->
|
||||
State = #state{connection_sup = Sup,
|
||||
socket = Socket,
|
||||
connection_config = ConnConfig},
|
||||
{ok, expecting_connection_pid, State}
|
||||
end.
|
||||
|
||||
connect(Address, Port, #{tls_opts := {secure_port, Opts0}}) ->
|
||||
Opts = rabbit_ssl_options:fix_client(Opts0),
|
||||
case ssl:connect(Address, Port, ?RABBIT_TCP_OPTS ++ Opts) of
|
||||
{ok, S} ->
|
||||
{ssl, S};
|
||||
Err ->
|
||||
Err
|
||||
end;
|
||||
connect(Address, Port, _) ->
|
||||
case gen_tcp:connect(Address, Port, ?RABBIT_TCP_OPTS) of
|
||||
{ok, S} ->
|
||||
{tcp, S};
|
||||
Err ->
|
||||
Err
|
||||
{ok, expecting_connection_pid, State};
|
||||
{error, Reason} ->
|
||||
{stop, Reason}
|
||||
end.
|
||||
|
||||
connect_any([Address], Port, ConnConfig) ->
|
||||
connect(Address, Port, ConnConfig);
|
||||
amqp10_client_socket:connect(Address, Port, ConnConfig);
|
||||
connect_any([Address | Addresses], Port, ConnConfig) ->
|
||||
case connect(Address, Port, ConnConfig) of
|
||||
case amqp10_client_socket:connect(Address, Port, ConnConfig) of
|
||||
{error, _} ->
|
||||
connect_any(Addresses, Port, ConnConfig);
|
||||
R ->
|
||||
|
@ -134,7 +110,7 @@ connect_any([Address | Addresses], Port, ConnConfig) ->
|
|||
handle_event(cast, {set_connection, ConnectionPid}, expecting_connection_pid,
|
||||
State=#state{socket = Socket}) ->
|
||||
ok = amqp10_client_connection:socket_ready(ConnectionPid, Socket),
|
||||
set_active_once(State),
|
||||
amqp10_client_socket:set_active_once(Socket),
|
||||
State1 = State#state{connection = ConnectionPid},
|
||||
{next_state, expecting_frame_header, State1};
|
||||
handle_event(cast, {register_session, Session, OutgoingChannel}, _StateName,
|
||||
|
@ -151,41 +127,47 @@ handle_event(cast, {unregister_session, _Session, OutgoingChannel, IncomingChann
|
|||
State1 = State#state{outgoing_channels = OutgoingChannels1,
|
||||
incoming_channels = IncomingChannels1},
|
||||
{keep_state, State1};
|
||||
handle_event(cast, close, _StateName, State = #state{socket = Socket}) ->
|
||||
_ = close_socket(Socket),
|
||||
{stop, normal, State#state{socket = undefined}};
|
||||
|
||||
handle_event({call, From}, _Action, _State, _Data) ->
|
||||
{keep_state_and_data, [{reply, From, ok}]};
|
||||
|
||||
handle_event(info, {Tcp, _, Packet}, StateName, #state{buffer = Buffer} = State)
|
||||
handle_event(info, {Tcp, _Sock, Packet}, StateName, State)
|
||||
when Tcp == tcp orelse Tcp == ssl ->
|
||||
Data = <<Buffer/binary, Packet/binary>>,
|
||||
case handle_input(StateName, Data, State) of
|
||||
{ok, NextState, Remaining, NewState0} ->
|
||||
NewState = defer_heartbeat_timer(NewState0),
|
||||
set_active_once(NewState),
|
||||
{next_state, NextState, NewState#state{buffer = Remaining}};
|
||||
{error, Reason, NewState} ->
|
||||
{stop, Reason, NewState}
|
||||
handle_socket_input(Packet, StateName, State);
|
||||
handle_event(info, {gun_ws, WsPid, StreamRef, WsFrame}, StateName,
|
||||
#state{socket = {ws, WsPid, StreamRef}} = State) ->
|
||||
case WsFrame of
|
||||
{binary, Bin} ->
|
||||
handle_socket_input(Bin, StateName, State);
|
||||
close ->
|
||||
logger:info("peer closed AMQP over WebSocket connection in state '~s'",
|
||||
[StateName]),
|
||||
{stop, normal, socket_closed(State)};
|
||||
{close, ReasonStatusCode, ReasonUtf8} ->
|
||||
logger:info("peer closed AMQP over WebSocket connection in state '~s', reason: ~b ~ts",
|
||||
[StateName, ReasonStatusCode, ReasonUtf8]),
|
||||
{stop, {shutdown, {ReasonStatusCode, ReasonUtf8}}, socket_closed(State)}
|
||||
end;
|
||||
|
||||
handle_event(info, {TcpError, _, Reason}, StateName, State)
|
||||
handle_event(info, {TcpError, _Sock, Reason}, StateName, State)
|
||||
when TcpError == tcp_error orelse TcpError == ssl_error ->
|
||||
logger:warning("AMQP 1.0 connection socket errored, connection state: '~ts', reason: '~tp'",
|
||||
[StateName, Reason]),
|
||||
State1 = State#state{socket = undefined,
|
||||
buffer = <<>>,
|
||||
frame_state = undefined},
|
||||
{stop, {error, Reason}, State1};
|
||||
{stop, {error, Reason}, socket_closed(State)};
|
||||
handle_event(info, {TcpClosed, _}, StateName, State)
|
||||
when TcpClosed == tcp_closed orelse TcpClosed == ssl_closed ->
|
||||
logger:warning("AMQP 1.0 connection socket was closed, connection state: '~ts'",
|
||||
logger:info("AMQP 1.0 connection socket was closed, connection state: '~ts'",
|
||||
[StateName]),
|
||||
State1 = State#state{socket = undefined,
|
||||
buffer = <<>>,
|
||||
frame_state = undefined},
|
||||
{stop, normal, State1};
|
||||
{stop, normal, socket_closed(State)};
|
||||
handle_event(info, {gun_down, WsPid, _Proto, Reason, _Streams}, StateName,
|
||||
#state{socket = {ws, WsPid, _StreamRef}} = State) ->
|
||||
logger:warning("AMQP over WebSocket process ~p lost connection in state: '~s': ~p",
|
||||
[WsPid, StateName, Reason]),
|
||||
{stop, Reason, socket_closed(State)};
|
||||
handle_event(info, {'DOWN', _Mref, process, WsPid, Reason}, StateName,
|
||||
#state{socket = {ws, WsPid, _StreamRef}} = State) ->
|
||||
logger:warning("AMQP over WebSocket process ~p terminated in state: '~s': ~p",
|
||||
[WsPid, StateName, Reason]),
|
||||
{stop, Reason, socket_closed(State)};
|
||||
|
||||
handle_event(info, heartbeat, _StateName, #state{connection = Connection}) ->
|
||||
amqp10_client_connection:close(Connection,
|
||||
|
@ -193,10 +175,8 @@ handle_event(info, heartbeat, _StateName, #state{connection = Connection}) ->
|
|||
% do not stop as may want to read the peer's close frame
|
||||
keep_state_and_data.
|
||||
|
||||
terminate(normal, _StateName, #state{connection_sup = _Sup, socket = Socket}) ->
|
||||
maybe_close_socket(Socket);
|
||||
terminate(_Reason, _StateName, #state{connection_sup = _Sup, socket = Socket}) ->
|
||||
maybe_close_socket(Socket).
|
||||
terminate(_Reason, _StateName, #state{socket = Socket}) ->
|
||||
close_socket(Socket).
|
||||
|
||||
code_change(_Vsn, State, Data, _Extra) ->
|
||||
{ok, State, Data}.
|
||||
|
@ -205,20 +185,27 @@ code_change(_Vsn, State, Data, _Extra) ->
|
|||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
maybe_close_socket(undefined) ->
|
||||
socket_closed(State) ->
|
||||
State#state{socket = closed,
|
||||
buffer = <<>>,
|
||||
frame_state = undefined}.
|
||||
|
||||
close_socket(closed) ->
|
||||
ok;
|
||||
maybe_close_socket(Socket) ->
|
||||
close_socket(Socket).
|
||||
close_socket(Socket) ->
|
||||
amqp10_client_socket:close(Socket).
|
||||
|
||||
close_socket({tcp, Socket}) ->
|
||||
gen_tcp:close(Socket);
|
||||
close_socket({ssl, Socket}) ->
|
||||
ssl:close(Socket).
|
||||
|
||||
set_active_once(#state{socket = {tcp, Socket}}) ->
|
||||
ok = inet:setopts(Socket, [{active, once}]);
|
||||
set_active_once(#state{socket = {ssl, Socket}}) ->
|
||||
ok = ssl:setopts(Socket, [{active, once}]).
|
||||
handle_socket_input(Input, StateName, #state{socket = Socket,
|
||||
buffer = Buffer} = State0) ->
|
||||
Data = <<Buffer/binary, Input/binary>>,
|
||||
case handle_input(StateName, Data, State0) of
|
||||
{ok, NextStateName, Remaining, State1} ->
|
||||
State = defer_heartbeat_timer(State1),
|
||||
amqp10_client_socket:set_active_once(Socket),
|
||||
{next_state, NextStateName, State#state{buffer = Remaining}};
|
||||
{error, Reason, State} ->
|
||||
{stop, Reason, State}
|
||||
end.
|
||||
|
||||
handle_input(expecting_frame_header,
|
||||
<<"AMQP", Protocol/unsigned, Maj/unsigned, Min/unsigned,
|
||||
|
|
|
@ -146,7 +146,7 @@
|
|||
remote_outgoing_window = 0 :: non_neg_integer(),
|
||||
|
||||
reader :: pid(),
|
||||
socket :: amqp10_client_connection:amqp10_socket() | undefined,
|
||||
socket :: amqp10_client_socket:socket() | undefined,
|
||||
links = #{} :: #{output_handle() => #link{}},
|
||||
link_index = #{} :: #{{link_role(), link_name()} => output_handle()},
|
||||
link_handle_index = #{} :: #{input_handle() => output_handle()},
|
||||
|
@ -222,7 +222,7 @@ disposition(#link_ref{role = receiver,
|
|||
start_link(From, Channel, Reader, ConnConfig) ->
|
||||
gen_statem:start_link(?MODULE, [From, Channel, Reader, ConnConfig], []).
|
||||
|
||||
-spec socket_ready(pid(), amqp10_client_connection:amqp10_socket()) -> ok.
|
||||
-spec socket_ready(pid(), amqp10_client_socket:socket()) -> ok.
|
||||
socket_ready(Pid, Socket) ->
|
||||
gen_statem:cast(Pid, {socket_ready, Socket}).
|
||||
|
||||
|
@ -1163,8 +1163,9 @@ amqp10_session_event(Evt) ->
|
|||
{amqp10_event, {session, self(), Evt}}.
|
||||
|
||||
socket_send(Sock, Data) ->
|
||||
case socket_send0(Sock, Data) of
|
||||
ok -> ok;
|
||||
case amqp10_client_socket:send(Sock, Data) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, _Reason} ->
|
||||
throw({stop, normal})
|
||||
end.
|
||||
|
@ -1175,12 +1176,6 @@ notify_credit_exhausted(Link = #link{auto_flow = never}) ->
|
|||
notify_credit_exhausted(_Link) ->
|
||||
ok.
|
||||
|
||||
-dialyzer({no_fail_call, socket_send0/2}).
|
||||
socket_send0({tcp, Socket}, Data) ->
|
||||
gen_tcp:send(Socket, Data);
|
||||
socket_send0({ssl, Socket}, Data) ->
|
||||
ssl:send(Socket, Data).
|
||||
|
||||
-spec make_link_ref(link_role(), pid(), output_handle()) ->
|
||||
link_ref().
|
||||
make_link_ref(Role, Session, Handle) ->
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
%%
|
||||
-module(amqp10_client_socket).
|
||||
|
||||
-feature(maybe_expr, enable).
|
||||
|
||||
-export([connect/3,
|
||||
set_active_once/1,
|
||||
send/2,
|
||||
close/1]).
|
||||
|
||||
-type socket() :: {tcp, gen_tcp:socket()} |
|
||||
{ssl, ssl:sslsocket()} |
|
||||
{ws, pid(), gun:stream_ref()}.
|
||||
|
||||
-export_type([socket/0]).
|
||||
|
||||
-define(TCP_OPTS, [binary,
|
||||
{packet, 0},
|
||||
{active, false},
|
||||
{nodelay, true}]).
|
||||
|
||||
-spec connect(inet:hostname() | inet:ip_address(),
|
||||
inet:port_number(),
|
||||
amqp10_client_connection:connection_config()) ->
|
||||
{ok, socket()} | {error, any()}.
|
||||
connect(Host, Port, #{ws_path := Path} = Opts) ->
|
||||
GunOpts = maps:merge(#{tcp_opts => [{nodelay, true}]},
|
||||
maps:get(ws_opts, Opts, #{})),
|
||||
maybe
|
||||
{ok, _Started} ?= application:ensure_all_started(gun),
|
||||
{ok, Pid} ?= gun:open(Host, Port, GunOpts),
|
||||
MRef = monitor(process, Pid),
|
||||
{ok, _HttpVsn} ?= gun:await_up(Pid, MRef),
|
||||
{ok, StreamRef} ?= ws_upgrade(Pid, Path),
|
||||
{ok, {ws, Pid, StreamRef}}
|
||||
end;
|
||||
connect(Host, Port, #{tls_opts := {secure_port, Opts0}}) ->
|
||||
Opts = rabbit_ssl_options:fix_client(Opts0),
|
||||
case ssl:connect(Host, Port, ?TCP_OPTS ++ Opts) of
|
||||
{ok, S} ->
|
||||
{ok, {ssl, S}};
|
||||
Err ->
|
||||
Err
|
||||
end;
|
||||
connect(Host, Port, _) ->
|
||||
case gen_tcp:connect(Host, Port, ?TCP_OPTS) of
|
||||
{ok, S} ->
|
||||
{ok, {tcp, S}};
|
||||
Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
ws_upgrade(Pid, Path) ->
|
||||
StreamRef = gun:ws_upgrade(Pid,
|
||||
Path,
|
||||
[{<<"cache-control">>, <<"no-cache">>}],
|
||||
#{protocols => [{<<"amqp">>, gun_ws_h}]}),
|
||||
receive
|
||||
{gun_upgrade, Pid, StreamRef, [<<"websocket">>], _Headers} ->
|
||||
{ok, StreamRef};
|
||||
{gun_response, Pid, _, _, Status, Headers} ->
|
||||
{error, {ws_upgrade, Status, Headers}};
|
||||
{gun_error, Pid, StreamRef, Reason} ->
|
||||
{error, {ws_upgrade, Reason}}
|
||||
after 5000 ->
|
||||
{error, {ws_upgrade, timeout}}
|
||||
end.
|
||||
|
||||
-spec set_active_once(socket()) -> ok.
|
||||
set_active_once({tcp, Sock}) ->
|
||||
ok = inet:setopts(Sock, [{active, once}]);
|
||||
set_active_once({ssl, Sock}) ->
|
||||
ok = ssl:setopts(Sock, [{active, once}]);
|
||||
set_active_once({ws, _Pid, _Ref}) ->
|
||||
%% Gun also has an active-like mode via the flow option and gun:update_flow.
|
||||
%% It will even make Gun stop reading from the socket if flow is zero.
|
||||
%% If needed, we can make use of it in future.
|
||||
ok.
|
||||
|
||||
-spec send(socket(), iodata()) ->
|
||||
ok | {error, any()}.
|
||||
send({tcp, Socket}, Data) ->
|
||||
gen_tcp:send(Socket, Data);
|
||||
send({ssl, Socket}, Data) ->
|
||||
ssl:send(Socket, Data);
|
||||
send({ws, Pid, Ref}, Data) ->
|
||||
gun:ws_send(Pid, Ref, {binary, Data}).
|
||||
|
||||
-spec close(socket()) ->
|
||||
ok | {error, any()}.
|
||||
close({tcp, Socket}) ->
|
||||
gen_tcp:close(Socket);
|
||||
close({ssl, Socket}) ->
|
||||
ssl:close(Socket);
|
||||
close({ws, Pid, _Ref}) ->
|
||||
gun:shutdown(Pid).
|
Loading…
Reference in New Issue