From 5a467934e394a32d2ea23420aa8dea7984686af2 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 18 Dec 2024 17:14:54 +0100 Subject: [PATCH] Support AMQP over WebSocket in Erlang client ## What? Implement the AMQP over WebSocket Binding Committee Specification 01 in the AMQP 1.0 Erlang client: https://docs.oasis-open.org/amqp-bindmap/amqp-wsb/v1.0/cs01/amqp-wsb-v1.0-cs01.html ## Why? 1. This allows writing integration tests for the server implementation of AMQP over WebSocket. 2. Erlang and Elixir clients can use AMQP over WebSocket in environments where firewalls prohibit access to the AMQP port. ## How? Use gun as WebSocket client. The new module `amqp10_client_socket` handles socket operations (open, close, send) for: * TCP sockets * SSL sockets * WebSockets Prior to this commit, the amqp10_client_connection process closed only the write end of the socket after it sent the AMQP close performative. This commit removed premature socket closure because: 1. There is no equivalent feature provided in Gun since sending a WebSocket close frame causes Gun to cleanly close the connection for both writing and reading. 2. It's unnecessary and can result in unexpected and confusing behaviour on the server. 3. It's better practive to keep the TCP connection fully open until the AMQP closing handshake completes. 4. When amqp10_client_frame_reader terminates, it will cleanly close the socket for both writing and reading. --- deps/amqp10_client/Makefile | 2 +- .../src/amqp10_client_connection.erl | 107 ++++++------- .../src/amqp10_client_frame_reader.erl | 147 ++++++++---------- .../src/amqp10_client_session.erl | 15 +- .../src/amqp10_client_socket.erl | 101 ++++++++++++ 5 files changed, 225 insertions(+), 147 deletions(-) create mode 100644 deps/amqp10_client/src/amqp10_client_socket.erl diff --git a/deps/amqp10_client/Makefile b/deps/amqp10_client/Makefile index ceb96f3825..e080eb583d 100644 --- a/deps/amqp10_client/Makefile +++ b/deps/amqp10_client/Makefile @@ -29,7 +29,7 @@ endef PACKAGES_DIR ?= $(abspath PACKAGES) BUILD_DEPS = rabbit_common elvis_mk -DEPS = amqp10_common credentials_obfuscation +DEPS = amqp10_common credentials_obfuscation gun TEST_DEPS = rabbit rabbitmq_ct_helpers LOCAL_DEPS = ssl inets crypto public_key diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index 0ba172ffcb..764846a21a 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -43,8 +43,6 @@ -export([format_status/1]). --type amqp10_socket() :: {tcp, gen_tcp:socket()} | {ssl, ssl:sslsocket()}. - -type milliseconds() :: non_neg_integer(). -type address() :: inet:socket_address() | inet:hostname(). @@ -60,6 +58,8 @@ address => address(), port => inet:port_number(), tls_opts => {secure_port, [ssl:tls_option()]}, + ws_path => string(), + ws_opts => gun:opts(), notify => pid() | none, % the pid to send connection events to notify_when_opened => pid() | none, notify_when_closed => pid() | none, @@ -83,14 +83,13 @@ sessions_sup :: pid() | undefined, pending_session_reqs = [] :: [term()], reader :: pid() | undefined, - socket :: amqp10_socket() | undefined, + socket :: amqp10_client_socket:socket() | undefined, idle_time_out :: non_neg_integer() | undefined, heartbeat_timer :: timer:tref() | undefined, config :: connection_config() }). --export_type([connection_config/0, - amqp10_socket/0]). +-export_type([connection_config/0]). %% ------------------------------------------------------------------- %% Public API. @@ -152,7 +151,7 @@ start_link(Sup, Config) -> set_other_procs(Pid, OtherProcs) -> gen_statem:cast(Pid, {set_other_procs, OtherProcs}). --spec socket_ready(pid(), amqp10_socket()) -> ok. +-spec socket_ready(pid(), amqp10_client_socket:socket()) -> ok. socket_ready(Pid, Socket) -> gen_statem:cast(Pid, {socket_ready, Socket}). @@ -186,10 +185,10 @@ expecting_socket(_EvtType, {socket_ready, Socket}, Sasl = credentials_obfuscation:decrypt(maps:get(sasl, Cfg)), case Sasl of none -> - ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER), + ok = amqp10_client_socket:send(Socket, ?AMQP_PROTOCOL_HEADER), {next_state, hdr_sent, State1}; _ -> - ok = socket_send(Socket, ?SASL_PROTOCOL_HEADER), + ok = amqp10_client_socket:send(Socket, ?SASL_PROTOCOL_HEADER), {next_state, sasl_hdr_sent, State1} end; expecting_socket(_EvtType, {set_other_procs, OtherProcs}, State) -> @@ -205,9 +204,12 @@ expecting_socket({call, From}, begin_session, sasl_hdr_sent(_EvtType, {protocol_header_received, 3, 1, 0, 0}, State) -> {next_state, sasl_hdr_rcvds, State}; sasl_hdr_sent({call, From}, begin_session, - #state{pending_session_reqs = PendingSessionReqs} = State) -> + #state{pending_session_reqs = PendingSessionReqs} = State) -> State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, - {keep_state, State1}. + {keep_state, State1}; +sasl_hdr_sent(info, {'DOWN', MRef, process, _Pid, _}, + #state{reader_m_ref = MRef}) -> + {stop, {shutdown, reader_down}}. sasl_hdr_rcvds(_EvtType, #'v1_0.sasl_mechanisms'{ sasl_server_mechanisms = {array, symbol, AvailableMechs}}, @@ -228,7 +230,7 @@ sasl_hdr_rcvds({call, From}, begin_session, sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, 0}}, #state{socket = Socket} = State) -> - ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER), + ok = amqp10_client_socket:send(Socket, ?AMQP_PROTOCOL_HEADER), {next_state, hdr_sent, State}; sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, C}}, #state{} = State) when C==1;C==2;C==3;C==4 -> @@ -285,7 +287,7 @@ open_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}; -open_sent(info, {'DOWN', MRef, _, _, _}, +open_sent(info, {'DOWN', MRef, process, _, _}, #state{reader_m_ref = MRef}) -> {stop, {shutdown, reader_down}}. @@ -294,46 +296,56 @@ opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) -> {ok, Tmr} = start_heartbeat_timer(T), {keep_state, State#state{heartbeat_timer = Tmr}}; opened(_EvtType, {close, Reason}, State) -> - %% We send the first close frame and wait for the reply. %% TODO: stop all sessions writing %% We could still accept incoming frames (See: 2.4.6) case send_close(State, Reason) of - ok -> {next_state, close_sent, State}; - {error, closed} -> {stop, normal, State}; - Error -> {stop, Error, State} + ok -> + %% "After writing this frame the peer SHOULD continue to read from the connection + %% until it receives the partner's close frame (in order to guard against + %% erroneously or maliciously implemented partners, a peer SHOULD implement a + %% timeout to give its partner a reasonable time to receive and process the close + %% before giving up and simply closing the underlying transport mechanism)." [§2.4.3] + {next_state, close_sent, State, {state_timeout, ?TIMEOUT, received_no_close_frame}}; + {error, closed} -> + {stop, normal, State}; + Error -> + {stop, Error, State} end; opened(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) -> %% We receive the first close frame, reply and terminate. ok = notify_closed(Config, Close), - _ = send_close(State, none), - {stop, normal, State}; + case send_close(State, none) of + ok -> {stop, normal, State}; + {error, closed} -> {stop, normal, State}; + Error -> {stop, Error, State} + end; opened({call, From}, begin_session, State) -> {Ret, State1} = handle_begin_session(From, State), {keep_state, State1, [{reply, From, Ret}]}; -opened(info, {'DOWN', MRef, _, _, _Info}, - State = #state{reader_m_ref = MRef, config = Config}) -> +opened(info, {'DOWN', MRef, process, _, _Info}, + #state{reader_m_ref = MRef, config = Config}) -> %% reader has gone down and we are not already shutting down ok = notify_closed(Config, shutdown), - {stop, normal, State}; + {stop, normal}; opened(_EvtType, Frame, State) -> logger:warning("Unexpected connection frame ~tp when in state ~tp ", - [Frame, State]), - {keep_state, State}. + [Frame, State]), + keep_state_and_data. -close_sent(_EvtType, heartbeat, State) -> - {next_state, close_sent, State}; -close_sent(_EvtType, {'EXIT', _Pid, shutdown}, State) -> +close_sent(_EvtType, heartbeat, _Data) -> + keep_state_and_data; +close_sent(_EvtType, {'EXIT', _Pid, shutdown}, _Data) -> %% monitored processes may exit during closure - {next_state, close_sent, State}; -close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _}, - #state{reader = ReaderPid} = State) -> - %% if the reader exits we probably wont receive a close frame - {stop, normal, State}; -close_sent(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) -> + keep_state_and_data; +close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _Reason}, + #state{reader = ReaderPid}) -> + %% if the reader exits we probably won't receive a close frame + {stop, normal}; +close_sent(_EvtType, #'v1_0.close'{} = Close, #state{config = Config}) -> ok = notify_closed(Config, Close), - %% TODO: we should probably set up a timer before this to ensure - %% we close down event if no reply is received - {stop, normal, State}. + {stop, normal}; +close_sent(state_timeout, received_no_close_frame, _Data) -> + {stop, normal}. set_other_procs0(OtherProcs, State) -> #{sessions_sup := SessionsSup, @@ -435,7 +447,7 @@ send_open(#state{socket = Socket, config = Config0}) -> Encoded = amqp10_framing:encode_bin(Open), Frame = amqp10_binary_generator:build_frame(0, Encoded), ?DBG("CONN <- ~tp", [Open]), - socket_send(Socket, Frame). + amqp10_client_socket:send(Socket, Frame). send_close(#state{socket = Socket}, _Reason) -> @@ -443,14 +455,7 @@ send_close(#state{socket = Socket}, _Reason) -> Encoded = amqp10_framing:encode_bin(Close), Frame = amqp10_binary_generator:build_frame(0, Encoded), ?DBG("CONN <- ~tp", [Close]), - Ret = socket_send(Socket, Frame), - case Ret of - ok -> _ = - socket_shutdown(Socket, write), - ok; - _ -> ok - end, - Ret. + amqp10_client_socket:send(Socket, Frame). send_sasl_init(State, anon) -> Frame = #'v1_0.sasl_init'{mechanism = {symbol, <<"ANONYMOUS">>}}, @@ -474,21 +479,11 @@ send(Record, FrameType, #state{socket = Socket}) -> Encoded = amqp10_framing:encode_bin(Record), Frame = amqp10_binary_generator:build_frame(0, FrameType, Encoded), ?DBG("CONN <- ~tp", [Record]), - socket_send(Socket, Frame). + amqp10_client_socket:send(Socket, Frame). send_heartbeat(#state{socket = Socket}) -> Frame = amqp10_binary_generator:build_heartbeat_frame(), - socket_send(Socket, Frame). - -socket_send({tcp, Socket}, Data) -> - gen_tcp:send(Socket, Data); -socket_send({ssl, Socket}, Data) -> - ssl:send(Socket, Data). - -socket_shutdown({tcp, Socket}, How) -> - gen_tcp:shutdown(Socket, How); -socket_shutdown({ssl, Socket}, How) -> - ssl:shutdown(Socket, How). + amqp10_client_socket:send(Socket, Frame). notify_opened(#{notify_when_opened := none}, _) -> ok; diff --git a/deps/amqp10_client/src/amqp10_client_frame_reader.erl b/deps/amqp10_client/src/amqp10_client_frame_reader.erl index 5c4c7c74d7..c54fa9aadd 100644 --- a/deps/amqp10_client/src/amqp10_client_frame_reader.erl +++ b/deps/amqp10_client/src/amqp10_client_frame_reader.erl @@ -18,7 +18,6 @@ %% API -export([start_link/2, set_connection/2, - close/1, register_session/3, unregister_session/4]). @@ -29,11 +28,6 @@ code_change/4, terminate/3]). --define(RABBIT_TCP_OPTS, [binary, - {packet, 0}, - {active, false}, - {nodelay, true}]). - -type frame_type() :: amqp | sasl. -record(frame_state, @@ -44,7 +38,7 @@ -record(state, {connection_sup :: pid(), - socket :: amqp10_client_connection:amqp10_socket() | undefined, + socket :: amqp10_client_socket:socket() | closed, buffer = <<>> :: binary(), frame_state :: #frame_state{} | undefined, connection :: pid() | undefined, @@ -72,9 +66,6 @@ start_link(Sup, Config) -> set_connection(Reader, Connection) -> gen_statem:cast(Reader, {set_connection, Connection}). -close(Reader) -> - gen_statem:cast(Reader, close). - register_session(Reader, Session, OutgoingChannel) -> gen_statem:cast(Reader, {register_session, Session, OutgoingChannel}). @@ -97,44 +88,29 @@ init([Sup, ConnConfig]) when is_map(ConnConfig) -> Address -> Addresses0 ++ [Address] end, case connect_any(Addresses, Port, ConnConfig) of - {error, Reason} -> - {stop, Reason}; - Socket -> - State = #state{connection_sup = Sup, socket = Socket, + {ok, Socket} -> + State = #state{connection_sup = Sup, + socket = Socket, connection_config = ConnConfig}, - {ok, expecting_connection_pid, State} - end. - -connect(Address, Port, #{tls_opts := {secure_port, Opts0}}) -> - Opts = rabbit_ssl_options:fix_client(Opts0), - case ssl:connect(Address, Port, ?RABBIT_TCP_OPTS ++ Opts) of - {ok, S} -> - {ssl, S}; - Err -> - Err - end; -connect(Address, Port, _) -> - case gen_tcp:connect(Address, Port, ?RABBIT_TCP_OPTS) of - {ok, S} -> - {tcp, S}; - Err -> - Err + {ok, expecting_connection_pid, State}; + {error, Reason} -> + {stop, Reason} end. connect_any([Address], Port, ConnConfig) -> - connect(Address, Port, ConnConfig); + amqp10_client_socket:connect(Address, Port, ConnConfig); connect_any([Address | Addresses], Port, ConnConfig) -> - case connect(Address, Port, ConnConfig) of - {error, _} -> - connect_any(Addresses, Port, ConnConfig); - R -> - R - end. + case amqp10_client_socket:connect(Address, Port, ConnConfig) of + {error, _} -> + connect_any(Addresses, Port, ConnConfig); + R -> + R + end. handle_event(cast, {set_connection, ConnectionPid}, expecting_connection_pid, State=#state{socket = Socket}) -> ok = amqp10_client_connection:socket_ready(ConnectionPid, Socket), - set_active_once(State), + amqp10_client_socket:set_active_once(Socket), State1 = State#state{connection = ConnectionPid}, {next_state, expecting_frame_header, State1}; handle_event(cast, {register_session, Session, OutgoingChannel}, _StateName, @@ -151,41 +127,47 @@ handle_event(cast, {unregister_session, _Session, OutgoingChannel, IncomingChann State1 = State#state{outgoing_channels = OutgoingChannels1, incoming_channels = IncomingChannels1}, {keep_state, State1}; -handle_event(cast, close, _StateName, State = #state{socket = Socket}) -> - _ = close_socket(Socket), - {stop, normal, State#state{socket = undefined}}; handle_event({call, From}, _Action, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; -handle_event(info, {Tcp, _, Packet}, StateName, #state{buffer = Buffer} = State) +handle_event(info, {Tcp, _Sock, Packet}, StateName, State) when Tcp == tcp orelse Tcp == ssl -> - Data = <>, - case handle_input(StateName, Data, State) of - {ok, NextState, Remaining, NewState0} -> - NewState = defer_heartbeat_timer(NewState0), - set_active_once(NewState), - {next_state, NextState, NewState#state{buffer = Remaining}}; - {error, Reason, NewState} -> - {stop, Reason, NewState} + handle_socket_input(Packet, StateName, State); +handle_event(info, {gun_ws, WsPid, StreamRef, WsFrame}, StateName, + #state{socket = {ws, WsPid, StreamRef}} = State) -> + case WsFrame of + {binary, Bin} -> + handle_socket_input(Bin, StateName, State); + close -> + logger:info("peer closed AMQP over WebSocket connection in state '~s'", + [StateName]), + {stop, normal, socket_closed(State)}; + {close, ReasonStatusCode, ReasonUtf8} -> + logger:info("peer closed AMQP over WebSocket connection in state '~s', reason: ~b ~ts", + [StateName, ReasonStatusCode, ReasonUtf8]), + {stop, {shutdown, {ReasonStatusCode, ReasonUtf8}}, socket_closed(State)} end; - -handle_event(info, {TcpError, _, Reason}, StateName, State) +handle_event(info, {TcpError, _Sock, Reason}, StateName, State) when TcpError == tcp_error orelse TcpError == ssl_error -> logger:warning("AMQP 1.0 connection socket errored, connection state: '~ts', reason: '~tp'", - [StateName, Reason]), - State1 = State#state{socket = undefined, - buffer = <<>>, - frame_state = undefined}, - {stop, {error, Reason}, State1}; + [StateName, Reason]), + {stop, {error, Reason}, socket_closed(State)}; handle_event(info, {TcpClosed, _}, StateName, State) when TcpClosed == tcp_closed orelse TcpClosed == ssl_closed -> - logger:warning("AMQP 1.0 connection socket was closed, connection state: '~ts'", - [StateName]), - State1 = State#state{socket = undefined, - buffer = <<>>, - frame_state = undefined}, - {stop, normal, State1}; + logger:info("AMQP 1.0 connection socket was closed, connection state: '~ts'", + [StateName]), + {stop, normal, socket_closed(State)}; +handle_event(info, {gun_down, WsPid, _Proto, Reason, _Streams}, StateName, + #state{socket = {ws, WsPid, _StreamRef}} = State) -> + logger:warning("AMQP over WebSocket process ~p lost connection in state: '~s': ~p", + [WsPid, StateName, Reason]), + {stop, Reason, socket_closed(State)}; +handle_event(info, {'DOWN', _Mref, process, WsPid, Reason}, StateName, + #state{socket = {ws, WsPid, _StreamRef}} = State) -> + logger:warning("AMQP over WebSocket process ~p terminated in state: '~s': ~p", + [WsPid, StateName, Reason]), + {stop, Reason, socket_closed(State)}; handle_event(info, heartbeat, _StateName, #state{connection = Connection}) -> amqp10_client_connection:close(Connection, @@ -193,10 +175,8 @@ handle_event(info, heartbeat, _StateName, #state{connection = Connection}) -> % do not stop as may want to read the peer's close frame keep_state_and_data. -terminate(normal, _StateName, #state{connection_sup = _Sup, socket = Socket}) -> - maybe_close_socket(Socket); -terminate(_Reason, _StateName, #state{connection_sup = _Sup, socket = Socket}) -> - maybe_close_socket(Socket). +terminate(_Reason, _StateName, #state{socket = Socket}) -> + close_socket(Socket). code_change(_Vsn, State, Data, _Extra) -> {ok, State, Data}. @@ -205,20 +185,27 @@ code_change(_Vsn, State, Data, _Extra) -> %%% Internal functions %%%=================================================================== -maybe_close_socket(undefined) -> +socket_closed(State) -> + State#state{socket = closed, + buffer = <<>>, + frame_state = undefined}. + +close_socket(closed) -> ok; -maybe_close_socket(Socket) -> - close_socket(Socket). +close_socket(Socket) -> + amqp10_client_socket:close(Socket). -close_socket({tcp, Socket}) -> - gen_tcp:close(Socket); -close_socket({ssl, Socket}) -> - ssl:close(Socket). - -set_active_once(#state{socket = {tcp, Socket}}) -> - ok = inet:setopts(Socket, [{active, once}]); -set_active_once(#state{socket = {ssl, Socket}}) -> - ok = ssl:setopts(Socket, [{active, once}]). +handle_socket_input(Input, StateName, #state{socket = Socket, + buffer = Buffer} = State0) -> + Data = <>, + case handle_input(StateName, Data, State0) of + {ok, NextStateName, Remaining, State1} -> + State = defer_heartbeat_timer(State1), + amqp10_client_socket:set_active_once(Socket), + {next_state, NextStateName, State#state{buffer = Remaining}}; + {error, Reason, State} -> + {stop, Reason, State} + end. handle_input(expecting_frame_header, <<"AMQP", Protocol/unsigned, Maj/unsigned, Min/unsigned, diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index be027e20c1..7b74180587 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -146,7 +146,7 @@ remote_outgoing_window = 0 :: non_neg_integer(), reader :: pid(), - socket :: amqp10_client_connection:amqp10_socket() | undefined, + socket :: amqp10_client_socket:socket() | undefined, links = #{} :: #{output_handle() => #link{}}, link_index = #{} :: #{{link_role(), link_name()} => output_handle()}, link_handle_index = #{} :: #{input_handle() => output_handle()}, @@ -222,7 +222,7 @@ disposition(#link_ref{role = receiver, start_link(From, Channel, Reader, ConnConfig) -> gen_statem:start_link(?MODULE, [From, Channel, Reader, ConnConfig], []). --spec socket_ready(pid(), amqp10_client_connection:amqp10_socket()) -> ok. +-spec socket_ready(pid(), amqp10_client_socket:socket()) -> ok. socket_ready(Pid, Socket) -> gen_statem:cast(Pid, {socket_ready, Socket}). @@ -1163,8 +1163,9 @@ amqp10_session_event(Evt) -> {amqp10_event, {session, self(), Evt}}. socket_send(Sock, Data) -> - case socket_send0(Sock, Data) of - ok -> ok; + case amqp10_client_socket:send(Sock, Data) of + ok -> + ok; {error, _Reason} -> throw({stop, normal}) end. @@ -1175,12 +1176,6 @@ notify_credit_exhausted(Link = #link{auto_flow = never}) -> notify_credit_exhausted(_Link) -> ok. --dialyzer({no_fail_call, socket_send0/2}). -socket_send0({tcp, Socket}, Data) -> - gen_tcp:send(Socket, Data); -socket_send0({ssl, Socket}, Data) -> - ssl:send(Socket, Data). - -spec make_link_ref(link_role(), pid(), output_handle()) -> link_ref(). make_link_ref(Role, Session, Handle) -> diff --git a/deps/amqp10_client/src/amqp10_client_socket.erl b/deps/amqp10_client/src/amqp10_client_socket.erl new file mode 100644 index 0000000000..d17167fbac --- /dev/null +++ b/deps/amqp10_client/src/amqp10_client_socket.erl @@ -0,0 +1,101 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% +-module(amqp10_client_socket). + +-feature(maybe_expr, enable). + +-export([connect/3, + set_active_once/1, + send/2, + close/1]). + +-type socket() :: {tcp, gen_tcp:socket()} | + {ssl, ssl:sslsocket()} | + {ws, pid(), gun:stream_ref()}. + +-export_type([socket/0]). + +-define(TCP_OPTS, [binary, + {packet, 0}, + {active, false}, + {nodelay, true}]). + +-spec connect(inet:hostname() | inet:ip_address(), + inet:port_number(), + amqp10_client_connection:connection_config()) -> + {ok, socket()} | {error, any()}. +connect(Host, Port, #{ws_path := Path} = Opts) -> + GunOpts = maps:merge(#{tcp_opts => [{nodelay, true}]}, + maps:get(ws_opts, Opts, #{})), + maybe + {ok, _Started} ?= application:ensure_all_started(gun), + {ok, Pid} ?= gun:open(Host, Port, GunOpts), + MRef = monitor(process, Pid), + {ok, _HttpVsn} ?= gun:await_up(Pid, MRef), + {ok, StreamRef} ?= ws_upgrade(Pid, Path), + {ok, {ws, Pid, StreamRef}} + end; +connect(Host, Port, #{tls_opts := {secure_port, Opts0}}) -> + Opts = rabbit_ssl_options:fix_client(Opts0), + case ssl:connect(Host, Port, ?TCP_OPTS ++ Opts) of + {ok, S} -> + {ok, {ssl, S}}; + Err -> + Err + end; +connect(Host, Port, _) -> + case gen_tcp:connect(Host, Port, ?TCP_OPTS) of + {ok, S} -> + {ok, {tcp, S}}; + Err -> + Err + end. + +ws_upgrade(Pid, Path) -> + StreamRef = gun:ws_upgrade(Pid, + Path, + [{<<"cache-control">>, <<"no-cache">>}], + #{protocols => [{<<"amqp">>, gun_ws_h}]}), + receive + {gun_upgrade, Pid, StreamRef, [<<"websocket">>], _Headers} -> + {ok, StreamRef}; + {gun_response, Pid, _, _, Status, Headers} -> + {error, {ws_upgrade, Status, Headers}}; + {gun_error, Pid, StreamRef, Reason} -> + {error, {ws_upgrade, Reason}} + after 5000 -> + {error, {ws_upgrade, timeout}} + end. + +-spec set_active_once(socket()) -> ok. +set_active_once({tcp, Sock}) -> + ok = inet:setopts(Sock, [{active, once}]); +set_active_once({ssl, Sock}) -> + ok = ssl:setopts(Sock, [{active, once}]); +set_active_once({ws, _Pid, _Ref}) -> + %% Gun also has an active-like mode via the flow option and gun:update_flow. + %% It will even make Gun stop reading from the socket if flow is zero. + %% If needed, we can make use of it in future. + ok. + +-spec send(socket(), iodata()) -> + ok | {error, any()}. +send({tcp, Socket}, Data) -> + gen_tcp:send(Socket, Data); +send({ssl, Socket}, Data) -> + ssl:send(Socket, Data); +send({ws, Pid, Ref}, Data) -> + gun:ws_send(Pid, Ref, {binary, Data}). + +-spec close(socket()) -> + ok | {error, any()}. +close({tcp, Socket}) -> + gen_tcp:close(Socket); +close({ssl, Socket}) -> + ssl:close(Socket); +close({ws, Pid, _Ref}) -> + gun:shutdown(Pid).