AMQP1.0 client: Support HTTP/2 Websocket

This commit is contained in:
Loïc Hoguin 2025-09-26 11:04:13 +02:00
parent 2eb8ce5e0a
commit 0587a18227
No known key found for this signature in database
GPG Key ID: C69E26E3A9DF618F
1 changed files with 18 additions and 3 deletions

View File

@ -29,13 +29,27 @@
amqp10_client_connection:connection_config()) ->
{ok, socket()} | {error, any()}.
connect(Host, Port, #{ws_path := Path} = Opts) ->
GunOpts0 = maps:get(ws_opts, Opts, #{}),
HTTP2Opts = maps:get(http2_opts, GunOpts0, #{}),
GunOpts1 = GunOpts0#{http2_opts => HTTP2Opts#{notify_settings_changed => true}},
GunOpts = maps:merge(#{tcp_opts => [{nodelay, true}]},
maps:get(ws_opts, Opts, #{})),
GunOpts1),
maybe
{ok, _Started} ?= application:ensure_all_started(gun),
{ok, Pid} ?= gun:open(Host, Port, GunOpts),
MRef = monitor(process, Pid),
{ok, _HttpVsn} ?= gun:await_up(Pid, MRef),
{ok, HttpVsn} ?= gun:await_up(Pid, MRef),
ok ?= case HttpVsn of
http ->
ok;
http2 ->
receive
{gun_notify, Pid, settings_changed, #{enable_connect_protocol := true}} ->
ok
after 5000 ->
{error, {ws_enable_connect_protocol, timeout}}
end
end,
{ok, StreamRef} ?= ws_upgrade(Pid, Path),
{ok, {ws, Pid, StreamRef}}
end;
@ -97,5 +111,6 @@ close({tcp, Socket}) ->
gen_tcp:close(Socket);
close({ssl, Socket}) ->
ssl:close(Socket);
close({ws, Pid, _Ref}) ->
close({ws, Pid, Ref}) ->
gun:ws_send(Pid, Ref, close),
gun:shutdown(Pid).