From 0587a18227e4ca7a31f4dcb38956441c23d93779 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 26 Sep 2025 11:04:13 +0200 Subject: [PATCH] AMQP1.0 client: Support HTTP/2 Websocket --- .../src/amqp10_client_socket.erl | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_socket.erl b/deps/amqp10_client/src/amqp10_client_socket.erl index d17167fbac..e31b0a884c 100644 --- a/deps/amqp10_client/src/amqp10_client_socket.erl +++ b/deps/amqp10_client/src/amqp10_client_socket.erl @@ -29,13 +29,27 @@ amqp10_client_connection:connection_config()) -> {ok, socket()} | {error, any()}. connect(Host, Port, #{ws_path := Path} = Opts) -> + GunOpts0 = maps:get(ws_opts, Opts, #{}), + HTTP2Opts = maps:get(http2_opts, GunOpts0, #{}), + GunOpts1 = GunOpts0#{http2_opts => HTTP2Opts#{notify_settings_changed => true}}, GunOpts = maps:merge(#{tcp_opts => [{nodelay, true}]}, - maps:get(ws_opts, Opts, #{})), + GunOpts1), maybe {ok, _Started} ?= application:ensure_all_started(gun), {ok, Pid} ?= gun:open(Host, Port, GunOpts), MRef = monitor(process, Pid), - {ok, _HttpVsn} ?= gun:await_up(Pid, MRef), + {ok, HttpVsn} ?= gun:await_up(Pid, MRef), + ok ?= case HttpVsn of + http -> + ok; + http2 -> + receive + {gun_notify, Pid, settings_changed, #{enable_connect_protocol := true}} -> + ok + after 5000 -> + {error, {ws_enable_connect_protocol, timeout}} + end + end, {ok, StreamRef} ?= ws_upgrade(Pid, Path), {ok, {ws, Pid, StreamRef}} end; @@ -97,5 +111,6 @@ close({tcp, Socket}) -> gen_tcp:close(Socket); close({ssl, Socket}) -> ssl:close(Socket); -close({ws, Pid, _Ref}) -> +close({ws, Pid, Ref}) -> + gun:ws_send(Pid, Ref, close), gun:shutdown(Pid).