Support AMQP over WebSocket (OSS part)

This commit is contained in:
David Ansari 2025-01-14 14:11:56 +01:00
parent 5a467934e3
commit 579c58603e
33 changed files with 379 additions and 594 deletions

View File

@ -5,13 +5,14 @@
[RabbitMQ](https://rabbitmq.com) is a [feature rich](https://www.rabbitmq.com/docs),
multi-protocol messaging and streaming broker. It supports:
* AMQP 0-9-1
* AMQP 1.0
* AMQP 0-9-1
* [RabbitMQ Stream Protocol](https://www.rabbitmq.com/docs/streams)
* MQTT 3.1, 3.1.1, and 5.0
* STOMP 1.0 through 1.2
* [MQTT over WebSockets](https://www.rabbitmq.com/docs/web-mqtt)
* [STOMP over WebSockets](https://www.rabbitmq.com/docs/web-stomp)
* [MQTT over WebSocket](https://www.rabbitmq.com/docs/web-mqtt)
* [STOMP over WebSocket](https://www.rabbitmq.com/docs/web-stomp)
* AMQP 1.0 over WebSocket (supported in [VMware Tanzu RabbitMQ](https://www.vmware.com/products/app-platform/tanzu-rabbitmq))
## Installation

View File

@ -43,14 +43,6 @@
node
] ++ ?AUTH_EVENT_KEYS).
-define(INFO_ITEMS,
[connection_state,
recv_oct,
recv_cnt,
send_oct,
send_cnt
] ++ ?ITEMS).
%% for rabbit_event connection_created
-define(CONNECTION_EVENT_KEYS,
[type,

View File

@ -0,0 +1,11 @@
-define(SIMPLE_METRICS, [pid,
recv_oct,
send_oct,
reductions]).
-define(OTHER_METRICS, [recv_cnt,
send_cnt,
send_pend,
state,
channels,
garbage_collection]).

View File

@ -0,0 +1,63 @@
%% same values as in rabbit_reader
-define(NORMAL_TIMEOUT, 3_000).
-define(CLOSING_TIMEOUT, 30_000).
-define(SILENT_CLOSE_DELAY, 3_000).
%% Allow for potentially large sets of tokens during the SASL exchange.
%% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915
-define(INITIAL_MAX_FRAME_SIZE, 8192).
-type protocol() :: amqp | sasl.
-type channel_number() :: non_neg_integer().
-type callback() :: handshake |
{frame_header, protocol()} |
{frame_body, protocol(), DataOffset :: pos_integer(), channel_number()}.
-record(v1_connection,
{name :: binary(),
container_id = none :: none | binary(),
vhost = none :: none | rabbit_types:vhost(),
%% server host
host :: inet:ip_address() | inet:hostname(),
%% client host
peer_host :: inet:ip_address() | inet:hostname(),
%% server port
port :: inet:port_number(),
%% client port
peer_port :: inet:port_number(),
connected_at :: integer(),
user = unauthenticated :: unauthenticated | rabbit_types:user(),
timeout = ?NORMAL_TIMEOUT :: non_neg_integer(),
incoming_max_frame_size = ?INITIAL_MAX_FRAME_SIZE :: pos_integer(),
outgoing_max_frame_size = ?INITIAL_MAX_FRAME_SIZE :: unlimited | pos_integer(),
%% "Prior to any explicit negotiation, [...] the maximum channel number is 0." [2.4.1]
channel_max = 0 :: non_neg_integer(),
auth_mechanism = sasl_init_unprocessed :: sasl_init_unprocessed | {binary(), module()},
auth_state = unauthenticated :: term(),
credential_timer :: undefined | reference(),
properties :: undefined | {map, list(tuple())}
}).
-record(v1,
{parent :: pid(),
helper_sup :: pid(),
writer = none :: none | pid(),
heartbeater = none :: none | rabbit_heartbeat:heartbeaters(),
session_sup = none :: none | pid(),
websocket :: boolean(),
sock :: none | rabbit_net:socket(),
proxy_socket :: undefined | {rabbit_proxy_socket, any(), any()},
connection :: none | #v1_connection{},
connection_state :: waiting_amqp3100 | received_amqp3100 | waiting_sasl_init |
securing | waiting_amqp0100 | waiting_open | running |
closing | closed,
callback :: callback(),
recv_len = 8 :: non_neg_integer(),
pending_recv :: boolean(),
buf :: list(),
buf_len :: non_neg_integer(),
tracked_channels = maps:new() :: #{channel_number() => Session :: pid()},
stats_timer :: rabbit_event:state()
}).
-type state() :: #v1{}.

View File

@ -10,8 +10,9 @@
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_amqp_reader.hrl").
-include("rabbit_amqp.hrl").
-include("rabbit_amqp_metrics.hrl").
-include("rabbit_amqp_reader.hrl").
-export([init/1,
info/2,
@ -22,110 +23,38 @@
system_terminate/4,
system_code_change/4]).
-export([advertise_sasl_mechanism/1,
handle_input/2,
handle_other/2,
ensure_stats_timer/1]).
-import(rabbit_amqp_util, [protocol_error/3]).
%% same values as in rabbit_reader
-define(NORMAL_TIMEOUT, 3_000).
-define(CLOSING_TIMEOUT, 30_000).
-define(SILENT_CLOSE_DELAY, 3_000).
%% Allow for potentially large sets of tokens during the SASL exchange.
%% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915
-define(INITIAL_MAX_FRAME_SIZE, 8192).
-type protocol() :: amqp | sasl.
-type channel_number() :: non_neg_integer().
-record(v1_connection,
{name :: binary(),
container_id :: none | binary(),
vhost :: none | rabbit_types:vhost(),
%% server host
host :: inet:ip_address() | inet:hostname(),
%% client host
peer_host :: inet:ip_address() | inet:hostname(),
%% server port
port :: inet:port_number(),
%% client port
peer_port :: inet:port_number(),
connected_at :: integer(),
user :: unauthenticated | rabbit_types:user(),
timeout :: non_neg_integer(),
incoming_max_frame_size :: pos_integer(),
outgoing_max_frame_size :: unlimited | pos_integer(),
channel_max :: non_neg_integer(),
auth_mechanism :: sasl_init_unprocessed | {binary(), module()},
auth_state :: term(),
credential_timer :: undefined | reference(),
properties :: undefined | {map, list(tuple())}
}).
-record(v1,
{
parent :: pid(),
helper_sup :: pid(),
writer :: none | pid(),
heartbeater :: none | rabbit_heartbeat:heartbeaters(),
session_sup :: rabbit_types:option(pid()),
sock :: rabbit_net:socket(),
proxy_socket :: undefined | {rabbit_proxy_socket, any(), any()},
connection :: #v1_connection{},
connection_state :: received_amqp3100 | waiting_sasl_init | securing |
waiting_amqp0100 | waiting_open | running |
closing | closed,
callback :: handshake |
{frame_header, protocol()} |
{frame_body, protocol(), DataOffset :: pos_integer(), channel_number()},
recv_len :: non_neg_integer(),
pending_recv :: boolean(),
buf :: list(),
buf_len :: non_neg_integer(),
tracked_channels :: #{channel_number() => Session :: pid()},
stats_timer :: rabbit_event:state()
}).
-type state() :: #v1{}.
-define(IS_RUNNING(State), State#v1.connection_state =:= running).
%%--------------------------------------------------------------------------
unpack_from_0_9_1(
{Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer},
Parent) ->
logger:update_process_metadata(#{connection => ConnectionName}),
#v1{parent = Parent,
websocket = false,
sock = Sock,
callback = {frame_header, sasl},
recv_len = 8,
pending_recv = PendingRecv,
heartbeater = none,
helper_sup = SupPid,
buf = Buf,
buf_len = BufLen,
proxy_socket = ProxySocket,
tracked_channels = maps:new(),
writer = none,
connection_state = received_amqp3100,
stats_timer = StatsTimer,
connection = #v1_connection{
name = ConnectionName,
container_id = none,
vhost = none,
host = Host,
peer_host = PeerHost,
port = Port,
peer_port = PeerPort,
connected_at = ConnectedAt,
user = unauthenticated,
timeout = ?NORMAL_TIMEOUT,
incoming_max_frame_size = ?INITIAL_MAX_FRAME_SIZE,
outgoing_max_frame_size = ?INITIAL_MAX_FRAME_SIZE,
%% "Prior to any explicit negotiation, [...] the maximum channel number is 0." [2.4.1]
channel_max = 0,
auth_mechanism = sasl_init_unprocessed,
auth_state = unauthenticated}}.
connected_at = ConnectedAt}}.
-spec system_continue(pid(), [sys:dbg_opt()], state()) -> no_return() | ok.
system_continue(Parent, Deb, State) ->
@ -152,8 +81,6 @@ set_credential(Pid, Credential) ->
%%--------------------------------------------------------------------------
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
recvloop(Deb, State = #v1{pending_recv = true}) ->
mainloop(Deb, State);
recvloop(Deb, State = #v1{sock = Sock,
@ -166,8 +93,7 @@ recvloop(Deb, State = #v1{sock = Sock,
{error, Reason} ->
throw({inet_error, Reason})
end;
recvloop(Deb, State0 = #v1{callback = Callback,
recv_len = RecvLen,
recvloop(Deb, State0 = #v1{recv_len = RecvLen,
buf = Buf,
buf_len = BufLen}) ->
Bin = case Buf of
@ -177,7 +103,7 @@ recvloop(Deb, State0 = #v1{callback = Callback,
{Data, Rest} = split_binary(Bin, RecvLen),
State1 = State0#v1{buf = [Rest],
buf_len = BufLen - RecvLen},
State = handle_input(Callback, Data, State1),
State = handle_input(Data, State1),
recvloop(Deb, State).
-spec mainloop([sys:dbg_opt()], state()) ->
@ -204,6 +130,7 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.
-spec handle_other(any(), state()) -> state() | stop.
handle_other(emit_stats, State) ->
emit_stats(State);
handle_other(ensure_stats_timer, State) ->
@ -236,7 +163,7 @@ handle_other(heartbeat_timeout, State) ->
Error = error_frame(?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED,
"no frame received from client within idle timeout threshold", []),
handle_exception(State, 0, Error);
handle_other({'$gen_call', From, {shutdown, Explanation}},
handle_other({rabbit_call, From, {shutdown, Explanation}},
State = #v1{connection = #v1_connection{properties = Properties}}) ->
Ret = case Explanation =:= "Node was put into maintenance mode" andalso
ignore_maintenance(Properties) of
@ -245,7 +172,7 @@ handle_other({'$gen_call', From, {shutdown, Explanation}},
end,
gen_server:reply(From, ok),
Ret;
handle_other({'$gen_call', From, {info, Items}}, State) ->
handle_other({rabbit_call, From, {info, Items}}, State) ->
Reply = try infos(Items, State) of
Infos ->
{ok, Infos}
@ -254,6 +181,9 @@ handle_other({'$gen_call', From, {info, Items}}, State) ->
end,
gen_server:reply(From, Reply),
State;
handle_other({'$gen_call', From, Req}, State) ->
%% Delete this function clause when feature flag 'rabbitmq_4.1.0' becomes required.
handle_other({rabbit_call, From, Req}, State);
handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) ->
case ?IS_RUNNING(State) of
true ->
@ -290,8 +220,7 @@ terminate(_, _) ->
%%--------------------------------------------------------------------------
%% error handling / termination
close(Error, State = #v1{sock = Sock,
connection = #v1_connection{timeout = Timeout}}) ->
close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) ->
%% Client properties will be emitted in the connection_closed event by rabbit_reader.
ClientProperties = i(client_properties, State),
put(client_properties, ClientProperties),
@ -301,7 +230,7 @@ close(Error, State = #v1{sock = Sock,
false -> ?CLOSING_TIMEOUT
end,
_TRef = erlang:send_after(Time, self(), terminate_connection),
ok = send_on_channel0(Sock, #'v1_0.close'{error = Error}),
ok = send_on_channel0(State, #'v1_0.close'{error = Error}, amqp10_framing),
State#v1{connection_state = closed}.
handle_session_exit(ChannelNum, SessionPid, Reason, State0) ->
@ -491,12 +420,9 @@ handle_connection_frame(
end,
{ok, ReceiveTimeoutSec} = application:get_env(rabbit, heartbeat),
ReceiveTimeoutMillis = ReceiveTimeoutSec * 1000,
SendFun = fun() ->
Frame = amqp10_binary_generator:build_heartbeat_frame(),
catch rabbit_net:send(Sock, Frame)
end,
Parent = self(),
ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
Reader = self(),
ReceiveFun = fun() -> Reader ! heartbeat_timeout end,
SendFun = heartbeat_send_fun(Reader, State0),
%% TODO: only start heartbeat receive timer at next next frame
Heartbeater = rabbit_heartbeat:start(
HelperSupPid, Sock, ConnectionName,
@ -556,16 +482,21 @@ handle_connection_frame(
container_id = {utf8, rabbit_nodes:cluster_name()},
offered_capabilities = rabbit_amqp_util:capabilities(Caps),
properties = server_properties()},
ok = send_on_channel0(Sock, Open),
ok = send_on_channel0(State, Open, amqp10_framing),
State;
handle_connection_frame(#'v1_0.close'{}, State0) ->
State = State0#v1{connection_state = closing},
close(undefined, State).
start_writer(#v1{helper_sup = SupPid,
websocket = WebSocket,
sock = Sock} = State) ->
Socket = case WebSocket of
true -> websocket;
false -> Sock
end,
ChildSpec = #{id => writer,
start => {rabbit_amqp_writer, start_link, [Sock, self()]},
start => {rabbit_amqp_writer, start_link, [Socket, self()]},
restart => transient,
significant => true,
shutdown => ?WORKER_WAIT,
@ -620,15 +551,15 @@ handle_sasl_frame(#'v1_0.sasl_response'{response = {binary, Response}},
handle_sasl_frame(Performative, State) ->
throw({unexpected_1_0_sasl_frame, Performative, State}).
handle_input(handshake,
<<"AMQP",0,1,0,0>>,
#v1{connection_state = waiting_amqp0100,
sock = Sock,
-spec handle_input(binary(), state()) -> state().
handle_input(Handshake = <<"AMQP",0,1,0,0>>,
#v1{callback = handshake,
connection_state = waiting_amqp0100,
connection = #v1_connection{user = #user{}},
helper_sup = HelperSup
} = State0) ->
%% At this point, client already got successfully authenticated by SASL.
send_handshake(Sock, <<"AMQP",0,1,0,0>>),
send(State0, Handshake),
ChildSpec = #{id => session_sup,
start => {rabbit_amqp_session_sup, start_link, [self()]},
restart => transient,
@ -643,9 +574,9 @@ handle_input(handshake,
%% sending any other frames." [2.4.1]
connection_state = waiting_open},
switch_callback(State, {frame_header, amqp}, 8);
handle_input({frame_header, Mode},
Header = <<Size:32, DOff:8, Type:8, Channel:16>>,
State0) when DOff >= 2 ->
handle_input(Header = <<Size:32, DOff:8, Type:8, Channel:16>>,
State0 = #v1{callback = {frame_header, Mode}})
when DOff >= 2 ->
case {Mode, Type} of
{amqp, 0} -> ok;
{sasl, 1} -> ok;
@ -665,19 +596,16 @@ handle_input({frame_header, Mode},
switch_callback(State0, {frame_body, Mode, DOff, Channel}, Size - 8)
end,
ensure_stats_timer(State);
handle_input({frame_header, _Mode}, Malformed, _State) ->
handle_input(Malformed, #v1{callback = {frame_header, _Mode}}) ->
throw({bad_1_0_header, Malformed});
handle_input({frame_body, Mode, DOff, Channel},
FrameBin,
State) ->
handle_input(FrameBin, State0 = #v1{callback = {frame_body, Mode, DOff, Channel}}) ->
%% Figure 2.16
%% DOff = 4-byte words minus 8 bytes we've already read
ExtendedHeaderSize = (DOff * 32 - 64),
<<_IgnoreExtendedHeader:ExtendedHeaderSize, FrameBody/binary>> = FrameBin,
handle_frame(Mode, Channel, FrameBody,
switch_callback(State, {frame_header, Mode}, 8));
handle_input(Callback, Data, _State) ->
State = switch_callback(State0, {frame_header, Mode}, 8),
handle_frame(Mode, Channel, FrameBody, State);
handle_input(Data, #v1{callback = Callback}) ->
throw({bad_input, Callback, Data}).
-spec init(tuple()) -> no_return().
@ -689,26 +617,42 @@ init(PackedState) ->
%% By invoking recvloop here we become 1.0.
recvloop(sys:debug_options([]), State).
-spec advertise_sasl_mechanism(state()) -> state().
advertise_sasl_mechanism(State0 = #v1{connection_state = received_amqp3100,
sock = Sock}) ->
send_handshake(Sock, <<"AMQP",3,1,0,0>>),
send(State0, <<"AMQP",3,1,0,0>>),
Ms0 = [{symbol, atom_to_binary(M)} || M <- auth_mechanisms(Sock)],
Ms1 = {array, symbol, Ms0},
Ms = #'v1_0.sasl_mechanisms'{sasl_server_mechanisms = Ms1},
ok = send_on_channel0(Sock, Ms, rabbit_amqp_sasl),
ok = send_on_channel0(State0, Ms, rabbit_amqp_sasl),
State = State0#v1{connection_state = waiting_sasl_init},
switch_callback(State, {frame_header, sasl}, 8).
send_handshake(Sock, Handshake) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, Handshake) end).
send_on_channel0(State, Performative, Framing) ->
Data = rabbit_amqp_writer:assemble_frame(0, Performative, Framing),
send(State, Data).
send_on_channel0(Sock, Method) ->
send_on_channel0(Sock, Method, amqp10_framing).
send(#v1{websocket = true}, Data) ->
self() ! {send_ws, self(), Data},
ok;
send(#v1{websocket = false,
sock = Sock}, Data) ->
rabbit_misc:throw_on_error(
inet_error,
fun() -> rabbit_net:send(Sock, Data) end).
send_on_channel0(Sock, Method, Framing) ->
ok = rabbit_amqp_writer:internal_send_command(Sock, Method, Framing).
%% End 1-0
heartbeat_send_fun(Reader, #v1{websocket = true}) ->
fun() ->
Frame = amqp10_binary_generator:build_heartbeat_frame(),
Reader ! {send_ws, self(), Frame},
ok
end;
heartbeat_send_fun(_, #v1{websocket = false,
sock = Sock}) ->
fun() ->
Frame = amqp10_binary_generator:build_heartbeat_frame(),
catch rabbit_net:send(Sock, Frame)
end.
auth_mechanism_to_module(TypeBin, Sock) ->
case rabbit_registry:binary_to_type(TypeBin) of
@ -742,8 +686,7 @@ auth_mechanisms(Sock) ->
auth_phase(
Response,
State = #v1{sock = Sock,
connection = Conn = #v1_connection{auth_mechanism = {Name, AuthMechanism},
State = #v1{connection = Conn = #v1_connection{auth_mechanism = {Name, AuthMechanism},
auth_state = AuthState}}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Username, Msg, Args} ->
@ -753,7 +696,7 @@ auth_phase(
auth_fail(Username, State),
silent_close_delay(),
Outcome = #'v1_0.sasl_outcome'{code = ?V_1_0_SASL_CODE_AUTH},
ok = send_on_channel0(Sock, Outcome, rabbit_amqp_sasl),
ok = send_on_channel0(State, Outcome, rabbit_amqp_sasl),
protocol_error(
?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "~ts login refused: ~ts",
[Name, io_lib:format(Msg, Args)]);
@ -762,12 +705,12 @@ auth_phase(
protocol_error(?V_1_0_AMQP_ERROR_DECODE_ERROR, Msg, Args);
{challenge, Challenge, AuthState1} ->
Challenge = #'v1_0.sasl_challenge'{challenge = {binary, Challenge}},
ok = send_on_channel0(Sock, Challenge, rabbit_amqp_sasl),
ok = send_on_channel0(State, Challenge, rabbit_amqp_sasl),
State1 = State#v1{connection = Conn#v1_connection{auth_state = AuthState1}},
switch_callback(State1, {frame_header, sasl}, 8);
{ok, User} ->
Outcome = #'v1_0.sasl_outcome'{code = ?V_1_0_SASL_CODE_OK},
ok = send_on_channel0(Sock, Outcome, rabbit_amqp_sasl),
ok = send_on_channel0(State, Outcome, rabbit_amqp_sasl),
State1 = State#v1{connection_state = waiting_amqp0100,
connection = Conn#v1_connection{user = User,
auth_state = authenticated}},
@ -967,17 +910,11 @@ silent_close_delay() ->
-spec info(rabbit_types:connection(), rabbit_types:info_keys()) ->
rabbit_types:infos().
info(Pid, InfoItems) ->
KnownItems = [session_pids | ?INFO_ITEMS],
case InfoItems -- KnownItems of
[] ->
case gen_server:call(Pid, {info, InfoItems}, infinity) of
{ok, InfoList} ->
InfoList;
{error, Error} ->
throw(Error)
end;
UnknownItems ->
throw({bad_argument, UnknownItems})
case gen_server:call(Pid, {info, InfoItems}, infinity) of
{ok, InfoList} ->
InfoList;
{error, Reason} ->
throw(Reason)
end.
infos(Items, State) ->
@ -987,8 +924,12 @@ i(pid, #v1{}) ->
self();
i(type, #v1{}) ->
network;
i(protocol, #v1{}) ->
{1, 0};
i(protocol, #v1{websocket = WebSocket}) ->
Vsn = {1, 0},
case WebSocket of
true -> {'Web AMQP', Vsn};
false -> Vsn
end;
i(connection, #v1{connection = Val}) ->
Val;
i(node, #v1{}) ->

View File

@ -1,17 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-define(SIMPLE_METRICS, [pid,
recv_oct,
send_oct,
reductions]).
-define(OTHER_METRICS, [recv_cnt,
send_cnt,
send_pend,
state,
channels,
garbage_collection]).

View File

@ -16,7 +16,7 @@
send_command/4,
send_command_sync/3,
send_command_and_notify/5,
internal_send_command/3]).
assemble_frame/3]).
%% gen_server callbacks
-export([init/1,
@ -26,7 +26,7 @@
format_status/1]).
-record(state, {
sock :: rabbit_net:socket(),
sock :: rabbit_net:socket() | websocket,
reader :: rabbit_types:connection(),
pending :: iolist(),
%% This field is just an optimisation to minimize the cost of erlang:iolist_size/1
@ -85,13 +85,6 @@ send_command_and_notify(Writer, QueuePid, ChannelNum, Performative, Payload) ->
Request = {send_command_and_notify, QueuePid, self(), ChannelNum, Performative, Payload},
maybe_send(Writer, Request).
-spec internal_send_command(rabbit_net:socket(),
performative(),
amqp10_framing | rabbit_amqp_sasl) -> ok.
internal_send_command(Sock, Performative, Protocol) ->
Data = assemble_frame(0, Performative, Protocol),
ok = tcp_send(Sock, Data).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% gen_server callbacks %%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -125,13 +118,16 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) ->
State = flush(State1),
{reply, ok, State}.
handle_info(timeout, State0) ->
State = flush(State0),
{noreply, State};
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
no_reply(State);
handle_info(emit_stats, State0 = #state{reader = ReaderPid}) ->
ReaderPid ! ensure_stats_timer,
State = rabbit_event:reset_stats_timer(State0, #state.stats_timer),
no_reply(State);
handle_info(timeout, State0) ->
State = flush(State0),
{noreply, State};
handle_info({{'DOWN', session}, _MRef, process, SessionPid, _Reason},
State0 = #state{monitored_sessions = Sessions}) ->
credit_flow:peer_down(SessionPid),
@ -203,6 +199,9 @@ internal_send_command_async(Channel, Performative, Payload,
assemble_frame(Channel, Performative) ->
assemble_frame(Channel, Performative, amqp10_framing).
-spec assemble_frame(rabbit_types:channel_number(),
performative(),
amqp10_framing | rabbit_amqp_sasl) -> iolist().
assemble_frame(Channel, Performative, amqp10_framing) ->
?TRACE("channel ~b <-~n ~tp",
[Channel, amqp10_framing:pprint(Performative)]),
@ -220,11 +219,6 @@ assemble_frame_with_payload(Channel, Performative, Payload) ->
PerfIoData = amqp10_framing:encode_bin(Performative),
amqp10_binary_generator:build_frame(Channel, [PerfIoData, Payload]).
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(
inet_error,
fun() -> rabbit_net:send(Sock, Data) end).
%% Flush when more than 2.5 * 1460 bytes (TCP over Ethernet MSS) = 3650 bytes of data
%% has accumulated. The idea is to get the TCP data sections full (i.e. fill 1460 bytes)
%% as often as possible to reduce the overhead of TCP/IP headers.
@ -238,6 +232,13 @@ maybe_flush(State = #state{pending_size = PendingSize}) ->
flush(State = #state{pending = []}) ->
State;
flush(State = #state{sock = websocket,
reader = Reader,
pending = Pending}) ->
credit_flow:send(Reader),
Reader ! {send_ws, self(), lists:reverse(Pending)},
State#state{pending = [],
pending_size = 0};
flush(State0 = #state{sock = Sock,
pending = Pending}) ->
case rabbit_net:send(Sock, lists:reverse(Pending)) of

View File

@ -42,7 +42,7 @@
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_amqp_reader.hrl").
-include("rabbit_amqp_metrics.hrl").
-export([start_link/2, info/2, force_event_refresh/2,
shutdown/2]).
@ -146,10 +146,9 @@ start_link(HelperSups, Ref) ->
Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSups, Ref]),
{ok, Pid}.
-spec shutdown(pid(), string()) -> 'ok'.
-spec shutdown(pid(), string()) -> ok.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
gen_call(Pid, {shutdown, Explanation}, infinity).
-spec init(pid(), {pid(), pid()}, ranch:ref()) ->
no_return().
@ -176,11 +175,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
info(Pid, Items) ->
case gen_server:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
case gen_call(Pid, {info, Items}, infinity) of
{ok, InfoList} -> InfoList;
{error, Reason} -> throw(Reason)
end.
-spec force_event_refresh(pid(), reference()) -> 'ok'.
@ -296,7 +294,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
?store_proc_name(Name),
ConnectedAt = os:system_time(milli_seconds),
ConnectedAt = os:system_time(millisecond),
State = #v1{parent = Parent,
ranch_ref = RanchRef,
sock = RealSocket,
@ -604,18 +602,21 @@ handle_other(heartbeat_timeout,
State = #v1{connection = #connection{timeout_sec = T}}) ->
maybe_emit_stats(State),
throw({heartbeat_timeout, T});
handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
handle_other({rabbit_call, From, {shutdown, Explanation}}, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
case ForceTermination of
force -> stop;
normal -> NewState
end;
handle_other({'$gen_call', From, {info, Items}}, State) ->
handle_other({rabbit_call, From, {info, Items}}, State) ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
State;
handle_other({'$gen_call', From, Req}, State) ->
%% Delete this function clause when feature flag 'rabbitmq_4.1.0' becomes required.
handle_other({rabbit_call, From, Req}, State);
handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
when ?IS_RUNNING(State) ->
rabbit_event:notify(
@ -1842,3 +1843,19 @@ connection_duration(ConnectedAt) ->
true ->
io_lib:format("~Bms", [DurationMillis])
end.
gen_call(Pid, Req, Timeout) ->
case rabbit_feature_flags:is_enabled('rabbitmq_4.1.0') of
true ->
%% We use gen:call/4 with label rabbit_call instead of gen_server:call/3 with label '$gen_call'
%% because cowboy_websocket does not let rabbit_web_amqp_handler handle '$gen_call' messages:
%% https://github.com/ninenines/cowboy/blob/2.12.0/src/cowboy_websocket.erl#L427-L430
case catch gen:call(Pid, rabbit_call, Req, Timeout) of
{ok, Res} ->
Res;
{'EXIT', Reason} ->
exit({Reason, {?MODULE, ?FUNCTION_NAME, [Pid, Req, Timeout]}})
end;
false ->
gen_server:call(Pid, Req, Timeout)
end.

View File

@ -19,7 +19,8 @@
-import(rabbit_ct_helpers,
[eventually/1]).
-import(amqp_utils,
[flush/1,
[connection_config/1,
flush/1,
wait_for_credit/1]).
-define(TIMEOUT, 30_000).
@ -647,14 +648,6 @@ cleanup({Connection, LinkPair = #link_pair{session = Session}}) ->
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).
connection_config(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
#{address => Host,
port => Port,
container_id => <<"my container">>,
sasl => {plain, <<"guest">>, <<"guest">>}}.
wait_for_settled(State, Tag) ->
receive
{amqp10_disposition, {State, Tag}} ->

View File

@ -24,7 +24,8 @@
[assert_event_type/2,
assert_event_prop/2]).
-import(amqp_utils,
[flush/1,
[web_amqp/1,
flush/1,
wait_for_credit/1,
close_connection_sync/1]).
@ -584,15 +585,9 @@ target_per_message_topic(Config) ->
authn_failure_event(Config) ->
ok = event_recorder:start(Config),
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Vhost = ?config(test_vhost, Config),
User = ?config(test_user, Config),
OpnConf = #{address => Host,
port => Port,
container_id => <<"my container">>,
sasl => {plain, User, <<"wrong password">>},
hostname => <<"vhost:", Vhost/binary>>},
OpnConf0 = connection_config(Config),
OpnConf = maps:update(sasl, {plain, User, <<"wrong password">>}, OpnConf0),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, {closed, sasl_auth_failure}}} -> ok
@ -603,11 +598,15 @@ authn_failure_event(Config) ->
[E | _] = event_recorder:get_events(Config),
ok = event_recorder:stop(Config),
Proto = case web_amqp(Config) of
true -> {'Web AMQP', {1, 0}};
false -> {1, 0}
end,
assert_event_type(user_authentication_failure, E),
assert_event_prop([{name, <<"test user">>},
{auth_mechanism, <<"PLAIN">>},
{ssl, false},
{protocol, {1, 0}}],
{protocol, Proto}],
E).
sasl_anonymous_success(Config) ->
@ -1037,14 +1036,10 @@ connection_config(Config) ->
connection_config(Config, Vhost).
connection_config(Config, Vhost) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Cfg = amqp_utils:connection_config(Config),
User = Password = ?config(test_user, Config),
#{address => Host,
port => Port,
container_id => <<"my container">>,
sasl => {plain, User, Password},
hostname => <<"vhost:", Vhost/binary>>}.
Cfg#{hostname => <<"vhost:", Vhost/binary>>,
sasl := {plain, User, Password}}.
set_permissions(Config, ConfigurePerm, WritePerm, ReadPerm) ->
ok = rabbit_ct_broker_helpers:set_permissions(Config,

View File

@ -30,6 +30,7 @@
-import(amqp_utils,
[init/1, init/2,
connection_config/1, connection_config/2,
web_amqp/1,
flush/1,
wait_for_credit/1,
wait_for_accepts/1,
@ -1898,17 +1899,20 @@ events(Config) ->
ok = event_recorder:stop(Config),
ct:pal("Recorded events: ~p", [Events]),
Protocol = {protocol, {1, 0}},
Proto = case web_amqp(Config) of
true -> {'Web AMQP', {1, 0}};
false -> {1, 0}
end,
AuthProps = [{name, <<"guest">>},
{auth_mechanism, <<"PLAIN">>},
{ssl, false},
Protocol],
{protocol, Proto}],
?assertMatch(
{value, _},
find_event(user_authentication_success, AuthProps, Events)),
Node = get_node_config(Config, 0, nodename),
ConnectionCreatedProps = [Protocol,
ConnectionCreatedProps = [{protocol, Proto},
{node, Node},
{vhost, <<"/">>},
{user, <<"guest">>},
@ -3969,7 +3973,7 @@ leader_transfer_send(QName, QType, Config) ->
end.
%% rabbitmqctl list_connections
%% should list both AMQP 1.0 and AMQP 0.9.1 connections.
%% should list both (Web) AMQP 1.0 and AMQP 0.9.1 connections.
list_connections(Config) ->
%% Close any open AMQP 0.9.1 connections from previous test cases.
[ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, Node) || Node <- [0, 1, 2]],
@ -3993,10 +3997,13 @@ list_connections(Config) ->
%% Remove any whitespaces.
Protocols1 = [binary:replace(Subject, <<" ">>, <<>>, [global]) || Subject <- Protocols0],
Protocols = lists:sort(Protocols1),
?assertEqual([<<"{0,9,1}">>,
<<"{1,0}">>,
<<"{1,0}">>],
Protocols),
Expected = case web_amqp(Config) of
true ->
[<<"{'WebAMQP',{1,0}}">>, <<"{'WebAMQP',{1,0}}">>, <<"{0,9,1}">>];
false ->
[<<"{0,9,1}">>, <<"{1,0}">>, <<"{1,0}">>]
end,
?assertEqual(Expected, Protocols),
%% CLI should list AMQP 1.0 container-id
{ok, StdOut1} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "container_id"]),
@ -4640,7 +4647,12 @@ idle_time_out_on_server(Config) ->
rabbit_ct_broker_helpers:setup_meck(Config),
Mod = rabbit_net,
ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]),
ok = rpc(Config, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]}]),
ok = rpc(Config, meck, expect, [Mod, getstat, fun(_Sock, [recv_oct]) ->
{ok, [{recv_oct, 999}]};
(Sock, Opts) ->
meck:passthrough([Sock, Opts])
end]),
%% The server "SHOULD try to gracefully close the connection using a close
%% frame with an error explaining why" [2.4.5].
%% Since we chose a heartbeat value of 1 second, the server should easily
@ -4677,13 +4689,15 @@ idle_time_out_on_client(Config) ->
%% All good, the server sent us frames every second.
%% Mock the server to not send anything.
%% Mocking gen_tcp:send/2 allows this test to work for
%% * AMQP: https://github.com/rabbitmq/rabbitmq-server/blob/v4.1.0-beta.3/deps/rabbit_common/src/rabbit_net.erl#L174
%% * AMQP over WebSocket: https://github.com/ninenines/ranch/blob/2.1.0/src/ranch_tcp.erl#L191
rabbit_ct_broker_helpers:setup_meck(Config),
Mod = rabbit_net,
ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]),
Mod = gen_tcp,
ok = rpc(Config, meck, new, [Mod, [unstick, no_link, passthrough]]),
ok = rpc(Config, meck, expect, [Mod, send, 2, ok]),
%% Our client should time out within less than 5 seconds given that the
%% idle-time-out is 1 second.
%% Our client should time out soon given that the idle-time-out is 1 second.
receive
{amqp10_event,
{connection, Connection,
@ -4709,9 +4723,19 @@ handshake_timeout(Config) ->
Par = ?FUNCTION_NAME,
{ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]),
ok = rpc(Config, application, set_env, [App, Par, 200]),
Port = get_node_config(Config, 0, tcp_port_amqp),
{ok, Socket} = gen_tcp:connect("localhost", Port, [{active, false}]),
?assertEqual({error, closed}, gen_tcp:recv(Socket, 0, 400)),
case web_amqp(Config) of
true ->
Port = get_node_config(Config, 0, tcp_port_web_amqp),
Uri = "ws://127.0.0.1:" ++ integer_to_list(Port) ++ "/ws",
Ws = rfc6455_client:new(Uri, self(), undefined, ["amqp"]),
{ok, [{http_response, Resp}]} = rfc6455_client:open(Ws),
?assertNotEqual(nomatch, string:prefix(Resp, "HTTP/1.1 101 Switching Protocols")),
?assertMatch({close, _}, rfc6455_client:recv(Ws, 1000));
false ->
Port = get_node_config(Config, 0, tcp_port_amqp),
{ok, Socket} = gen_tcp:connect("localhost", Port, [{active, false}]),
?assertEqual({error, closed}, gen_tcp:recv(Socket, 0, 1000))
end,
ok = rpc(Config, application, set_env, [App, Par, DefaultVal]).
credential_expires(Config) ->
@ -5905,20 +5929,35 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
end,
flush(receiver_attached),
{_GenStatemState,
#{reader := ReaderPid,
socket := {tcp, Socket}}} = formatted_state(Session),
{_GenStatemStateSession, StateSession} = formatted_state(Session),
Socket = case web_amqp(Config) of
true ->
#{socket := {ws, GunPid, _GunStreamRef}} = StateSession,
{_GenStatemStateGun, StateGun} = formatted_state(GunPid),
%% https://github.com/ninenines/gun/blob/2.1.0/src/gun.erl#L315
element(12, StateGun);
false ->
#{socket := {tcp, Sock}} = StateSession,
Sock
end,
?assert(is_port(Socket)),
%% Provoke TCP back-pressure from client to server by using very small buffers.
%% Provoke TCP back-pressure from client to server by:
%% 1. using very small buffers
ok = inet:setopts(Socket, [{recbuf, 256},
{buffer, 256}]),
%% Suspend the receiving client such that it stops reading from its socket
%% causing TCP back-pressure to the server being applied.
true = erlang:suspend_process(ReaderPid),
%% 2. stopping reading from the socket
Mod = inet,
ok = meck:new(Mod, [unstick, no_link, passthrough]),
ok = meck:expect(Mod, setopts, fun(_Sock, [{active, once}]) ->
ok;
(Sock, Opts) ->
meck:passthrough([Sock, Opts])
end),
ok = amqp10_client:flow_link_credit(Receiver, Num, never),
%% We give the queue time to send messages to the session proc and writer proc.
timer:sleep(1000),
timer:sleep(2000),
%% Here, we do a bit of white box testing: We assert that RabbitMQ has some form of internal
%% flow control by checking that the queue sent some but, more importantly, not all its
@ -5932,7 +5971,9 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
ok = inet:setopts(Socket, [{recbuf, 65536},
{buffer, 65536}]),
%% When we resume the receiving client, we expect to receive all messages.
true = erlang:resume_process(ReaderPid),
?assert(meck:validate(Mod)),
ok = meck:unload(Mod),
ok = Mod:setopts(Socket, [{active, once}]),
receive_messages(Receiver, Num),
ok = detach_link_sync(Receiver),

View File

@ -271,10 +271,7 @@ application_properties_section(Config) ->
{ok, Receiver0} = amqp10_client:attach_receiver_link(
Session, <<"receiver 0">>, Address,
unsettled, configuration, Filter0),
%% Wait for the attach so the detach command won't fail
receive {amqp10_event,
{link, Receiver0, {attached, #'v1_0.attach'{}}}} ->
ok
receive {amqp10_event, {link, Receiver0, {attached, #'v1_0.attach'{}}}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:flow_link_credit(Receiver0, 10, never),
@ -597,6 +594,9 @@ string_modifier(Config) ->
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, Address,
settled, configuration, Filter2),
receive {amqp10_event, {link, Receiver2, attached}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:flow_link_credit(Receiver2, 10, never),
ok = assert_no_msg_received(?LINE),
ok = detach_link_sync(Receiver2),

View File

@ -101,10 +101,10 @@ v2_local(Config) ->
%% use wireshark with one of the Java tests to record those
amqp_1_0_frame(header_sasl) ->
hex_frame_to_binary("414d515003010000");
amqp_1_0_frame(header_amqp) ->
hex_frame_to_binary("414d515000010000");
amqp_1_0_frame(sasl_init) ->
hex_frame_to_binary("0000001902010000005341c00c01a309414e4f4e594d4f5553");
amqp_1_0_frame(header_amqp) ->
hex_frame_to_binary("414d515000010000");
amqp_1_0_frame(open) ->
hex_frame_to_binary("0000003f02000000005310c03202a12438306335323662332d653530662d343835352d613564302d336466643738623537633730a1096c6f63616c686f7374").

View File

@ -11,6 +11,7 @@
-export([init/1, init/2,
connection_config/1, connection_config/2,
web_amqp/1,
flush/1,
wait_for_credit/1,
wait_for_accepts/1,
@ -35,11 +36,21 @@ connection_config(Config) ->
connection_config(Node, Config) ->
Host = proplists:get_value(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp),
#{address => Host,
port => Port,
container_id => <<"my container">>,
sasl => {plain, <<"guest">>, <<"guest">>}}.
Cfg = #{address => Host,
container_id => <<"my container">>,
sasl => {plain, <<"guest">>, <<"guest">>}},
case web_amqp(Config) of
true ->
Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_web_amqp),
Cfg#{port => Port,
ws_path => "/ws"};
false ->
Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp),
Cfg#{port => Port}
end.
web_amqp(Config) ->
proplists:get_value(web_amqp, Config, false).
flush(Prefix) ->
receive

View File

@ -93,7 +93,9 @@ classic_queue_flow_control_enabled(Config) ->
?assertMatch({0, _}, gen_server2_queue(QPid)),
%% The connection gets into flow state
?assertEqual([{state, flow}], rabbit_reader:info(ConnPid, [state])),
?assertEqual(
[{state, flow}],
rabbit_ct_broker_helpers:rpc(Config, rabbit_reader, info, [ConnPid, [state]])),
Dict = proc_info(ConnPid, dictionary),
?assertMatch([_|_], proplists:get_value(credit_blocked, Dict)),
@ -111,7 +113,9 @@ classic_queue_flow_control_disabled(Config) ->
?assertMatch({0, _}, gen_server2_queue(QPid)),
%% The connection dos not get into flow state
?assertEqual([{state, running}], rabbit_reader:info(ConnPid, [state])),
?assertEqual(
[{state, running}],
rabbit_ct_broker_helpers:rpc(Config, rabbit_reader, info, [ConnPid, [state]])),
Dict = proc_info(ConnPid, dictionary),
?assertMatch([], proplists:get_value(credit_blocked, Dict, []))

View File

@ -164,6 +164,9 @@ $(if $(RABBITMQ_NODE_PORT), {tcp_listeners$(comma) [$(RABBITMQ_NODE_PORT)]}
{rabbitmq_management, [
$(if $(RABBITMQ_NODE_PORT), {listener$(comma) [{port$(comma) $(shell echo "$$(($(RABBITMQ_NODE_PORT) + 10000))")}]},)
]},
{rabbitmq_web_amqp, [
$(if $(RABBITMQ_NODE_PORT), {tcp_config$(comma) [{port$(comma) $(shell echo "$$((15678 + $(RABBITMQ_NODE_PORT) - 5672))")}]},)
]},
{rabbitmq_mqtt, [
$(if $(RABBITMQ_NODE_PORT), {tcp_listeners$(comma) [$(shell echo "$$((1883 + $(RABBITMQ_NODE_PORT) - 5672))")]},)
]},

View File

@ -33,9 +33,7 @@
%%
%% Grepping the project files for `credit_flow` will reveal the places
%% where this module is currently used, with extra comments on what's
%% going on at each instance. Note that credit flow between mirrors
%% synchronization has not been documented, since this doesn't affect
%% client publishes.
%% going on at each instance.
-define(DEFAULT_CREDIT, persistent_term:get(credit_flow_default_credit)).
@ -116,18 +114,18 @@ send(From) -> send(From, ?DEFAULT_CREDIT).
send(From, {InitialCredit, _MoreCreditAfter}) ->
?UPDATE({credit_from, From}, InitialCredit, C,
if C == 1 -> block(From),
0;
true -> C - 1
if C =:= 1 -> block(From),
0;
true -> C - 1
end).
ack(To) -> ack(To, ?DEFAULT_CREDIT).
ack(To, {_InitialCredit, MoreCreditAfter}) ->
?UPDATE({credit_to, To}, MoreCreditAfter, C,
if C == 1 -> grant(To, MoreCreditAfter),
MoreCreditAfter;
true -> C - 1
if C =:= 1 -> grant(To, MoreCreditAfter),
MoreCreditAfter;
true -> C - 1
end).
handle_bump_msg({From, MoreCredit}) ->
@ -193,10 +191,15 @@ unblock(From) ->
?TRACE_UNBLOCKED(self(), From),
?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
case blocked() of
false -> case erase(credit_deferred) of
undefined -> ok;
Credits -> _ = [To ! Msg || {To, Msg} <- Credits],
ok
end;
true -> ok
false ->
case erase(credit_deferred) of
undefined ->
ok;
Credits ->
lists:foreach(fun({To, Msg}) ->
To ! Msg
end, Credits)
end;
true ->
ok
end.

View File

@ -223,7 +223,7 @@ connection_string(Sock, Direction) ->
end.
socket_ends(Sock, Direction) when ?IS_SSL(Sock);
is_port(Sock) ->
is_port(Sock) ->
{From, To} = sock_funs(Direction),
case {From(Sock), To(Sock)} of
{{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->

View File

@ -261,10 +261,12 @@ defmodule RabbitMQ.CLI.Core.Listeners do
def protocol_label(:"stomp/ssl"), do: "STOMP over TLS"
def protocol_label(:http), do: "HTTP API"
def protocol_label(:https), do: "HTTP API over TLS (HTTPS)"
def protocol_label(:"http/web-mqtt"), do: "MQTT over WebSockets"
def protocol_label(:"https/web-mqtt"), do: "MQTT over WebSockets and TLS (HTTPS)"
def protocol_label(:"http/web-stomp"), do: "STOMP over WebSockets"
def protocol_label(:"https/web-stomp"), do: "STOMP over WebSockets and TLS (HTTPS)"
def protocol_label(:"http/web-amqp"), do: "AMQP over WebSocket"
def protocol_label(:"https/web-amqp"), do: "AMQP over WebSocket and TLS (HTTPS)"
def protocol_label(:"http/web-mqtt"), do: "MQTT over WebSocket"
def protocol_label(:"https/web-mqtt"), do: "MQTT over WebSocket and TLS (HTTPS)"
def protocol_label(:"http/web-stomp"), do: "STOMP over WebSocket"
def protocol_label(:"https/web-stomp"), do: "STOMP over WebSocket and TLS (HTTPS)"
def protocol_label(:"http/prometheus"), do: "Prometheus exporter API over HTTP"
def protocol_label(:"https/prometheus"), do: "Prometheus exporter API over TLS (HTTPS)"
def protocol_label(:clustering), do: "inter-node and CLI tool communication"

View File

@ -37,8 +37,8 @@ defmodule CoreListenersTest do
assert protocol_label(:"stomp/ssl") == "STOMP over TLS"
assert protocol_label(:http) == "HTTP API"
assert protocol_label(:https) == "HTTP API over TLS (HTTPS)"
assert protocol_label(:"https/web-stomp") == "STOMP over WebSockets and TLS (HTTPS)"
assert protocol_label(:"https/web-mqtt") == "MQTT over WebSockets and TLS (HTTPS)"
assert protocol_label(:"https/web-stomp") == "STOMP over WebSocket and TLS (HTTPS)"
assert protocol_label(:"https/web-mqtt") == "MQTT over WebSocket and TLS (HTTPS)"
assert protocol_label(:"http/prometheus") == "Prometheus exporter API over HTTP"
assert protocol_label(:"https/prometheus") == "Prometheus exporter API over TLS (HTTPS)"

View File

@ -5,7 +5,7 @@ DEPS = rabbit_common rabbitmq_ct_helpers amqp_client
DEP_PLUGINS = rabbit_common/mk/rabbitmq-build.mk
PLT_APPS = common_test
PLT_APPS += common_test crypto
include ../../rabbitmq-components.mk
include ../../erlang.mk

View File

@ -23,8 +23,8 @@ new(WsUrl, PPid, AuthInfo, Protocols) ->
new(WsUrl, PPid, AuthInfo, Protocols, <<>>).
new(WsUrl, PPid, AuthInfo, Protocols, TcpPreface) ->
crypto:start(),
application:ensure_all_started(ssl),
_ = crypto:start(),
_ = application:ensure_all_started(ssl),
{Transport, Url} = case WsUrl of
"ws://" ++ Rest -> {gen_tcp, Rest};
"wss://" ++ SslRest -> {ssl, SslRest}
@ -113,7 +113,7 @@ start_conn(State = #state{transport = Transport}, AuthInfo, Protocols, TcpPrefac
{ok, Socket0} = gen_tcp:connect(State#state.host, State#state.port,
[binary,
{packet, 0}]),
gen_tcp:send(Socket0, TcpPreface),
ok = gen_tcp:send(Socket0, TcpPreface),
case Transport of
gen_tcp -> {ok, Socket0};
ssl -> Transport:connect(Socket0, [{verify, verify_none}])
@ -173,7 +173,7 @@ do_recv(State = #state{phase = Phase, data = Data, socket = Socket, transport =
<<F:1, _:3, O:4, 0:1, 127:7, L2:64, Payload:L2/binary, Rest/binary>> ->
{F, O, Payload, Rest};
<<_:1, _:3, _:4, 1:1, _/binary>> ->
<<_:1, _:3, _:4, 1:1, _/bitstring>> ->
%% According o rfc6455 5.1 the server must not mask any frames.
die(Socket, Transport, PPid, {1006, "Protocol error"}, normal);
_ ->
@ -200,7 +200,7 @@ do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid, transport =
end,
case Phase of
open -> %% echo
do_close(State, WsReason),
_ = do_close(State, WsReason),
Transport:close(Socket);
closing ->
ok
@ -260,7 +260,7 @@ loop(State = #state{socket = Socket, transport = Transport, ppid = PPid, data =
error({unknown_message, Other, Socket})
end.
-spec die(any(), any(), pid(), any(), any()) -> no_return().
die(Socket, Transport, PPid, WsReason, Reason) ->
Transport:shutdown(Socket, read_write),
PPid ! {rfc6455, close, self(), WsReason},
@ -285,9 +285,6 @@ split(SubStr, Str, Limit, Acc, Default) ->
split(SubStr, R, Limit-1, [L | Acc], Default).
apply_mask(Mask, Data) when is_number(Mask) ->
apply_mask(<<Mask:32>>, Data);
apply_mask(<<0:32>>, Data) ->
Data;
apply_mask(Mask, Data) ->

View File

@ -194,6 +194,8 @@
tcp_port_erlang_dist_proxy,
tcp_port_mqtt,
tcp_port_mqtt_tls,
tcp_port_web_amqp,
tcp_port_web_amqp_tls,
tcp_port_web_mqtt,
tcp_port_web_mqtt_tls,
tcp_port_stomp,
@ -547,6 +549,13 @@ update_tcp_ports_in_rmq_config(NodeConfig, [tcp_port_mqtt_tls = Key | Rest]) ->
NodeConfig1 = rabbit_ct_helpers:merge_app_env(NodeConfig,
{rabbitmq_mqtt, [{ssl_listeners, [?config(Key, NodeConfig)]}]}),
update_tcp_ports_in_rmq_config(NodeConfig1, Rest);
update_tcp_ports_in_rmq_config(NodeConfig, [tcp_port_web_amqp_tls | Rest]) ->
%% Skip this one, because we need more than just a port to configure
update_tcp_ports_in_rmq_config(NodeConfig, Rest);
update_tcp_ports_in_rmq_config(NodeConfig, [tcp_port_web_amqp = Key | Rest]) ->
NodeConfig1 = rabbit_ct_helpers:merge_app_env(NodeConfig,
{rabbitmq_web_amqp, [{tcp_config, [{port, ?config(Key, NodeConfig)}]}]}),
update_tcp_ports_in_rmq_config(NodeConfig1, Rest);
update_tcp_ports_in_rmq_config(NodeConfig, [tcp_port_web_mqtt_tls | Rest]) ->
%% Skip this one, because we need more than just a port to configure
update_tcp_ports_in_rmq_config(NodeConfig, Rest);

View File

@ -55,7 +55,8 @@ dispatcher_add(function(sammy) {
};
// First, get the connection details to check the protocol
var connectionDetails = JSON.parse(sync_get(connectionPath));
if (connectionDetails.protocol === 'AMQP 1-0') {
if (connectionDetails.protocol === 'AMQP 1-0' ||
connectionDetails.protocol === 'Web AMQP 1-0') {
reqs['sessions'] = connectionPath + '/sessions';
} else {
reqs['channels'] = connectionPath + '/channels';

View File

@ -84,7 +84,8 @@
</div>
</div>
<% if (connection.protocol === 'AMQP 1-0') { %>
<% if (connection.protocol === 'AMQP 1-0' ||
connection.protocol === 'Web AMQP 1-0') { %>
<div class="section" id="connection-sessions-section">
<h2 class="updatable" >Sessions (<%=(sessions.length)%>)</h2>

View File

@ -35,10 +35,13 @@ resource_exists(ReqData, Context) ->
to_json(ReqData, Context) ->
Conn = conn(ReqData),
case proplists:get_value(protocol, Conn) of
{1, 0} ->
Vsn = {1, 0},
Protocol = proplists:get_value(protocol, Conn),
case Protocol =:= Vsn orelse
Protocol =:= {'Web AMQP', Vsn} of
true ->
ConnPid = proplists:get_value(pid, Conn),
try rabbit_amqp_reader:info(ConnPid, [session_pids]) of
try rabbit_reader:info(ConnPid, [session_pids]) of
[{session_pids, Pids}] ->
rabbit_mgmt_util:reply_list(session_infos(Pids),
["channel_number"],
@ -52,7 +55,7 @@ to_json(ReqData, Context) ->
[ConnPid, Type, Reason0]))),
rabbit_mgmt_util:internal_server_error(Reason, ReqData, Context)
end;
_ ->
false ->
rabbit_mgmt_util:bad_request(<<"connection does not use AMQP 1.0">>,
ReqData,
Context)

View File

@ -17,8 +17,8 @@
open_unmanaged_connection/1]).
-import(rabbit_ct_broker_helpers, [rpc/4]).
-import(rabbit_ct_helpers,
[eventually/3,
eventually/1]).
[eventually/1,
eventually/3]).
-import(rabbit_mgmt_test_util, [assert_list/2, assert_item/2, test_item/2,
assert_keys/2, assert_no_keys/2,
decode_body/1,

View File

@ -1,6 +1,6 @@
<!doctype html>
<html><head>
<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>
<script src="https://code.jquery.com/jquery-3.7.1.min.js"></script>
<script src="mqttws31.js" type="text/javascript"></script>
<style>

View File

@ -3,7 +3,7 @@
<head>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
<title>RabbitMQ Web MQTT Example</title>
<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>
<script src="https://code.jquery.com/jquery-3.7.1.min.js"></script>
<script src="mqttws31.js" type="text/javascript"></script>
<style>
.box {

View File

@ -1,287 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rfc6455_client).
-export([new/2, new/3, new/4, new/5, open/1, recv/1, send/2, close/1, close/2]).
-record(state, {host, port, addr, path, ppid, socket, data, phase, transport}).
%% --------------------------------------------------------------------------
new(WsUrl, PPid) ->
new(WsUrl, PPid, undefined, [], <<>>).
new(WsUrl, PPid, AuthInfo) ->
new(WsUrl, PPid, AuthInfo, [], <<>>).
new(WsUrl, PPid, AuthInfo, Protocols) ->
new(WsUrl, PPid, AuthInfo, Protocols, <<>>).
new(WsUrl, PPid, AuthInfo, Protocols, TcpPreface) ->
crypto:start(),
application:ensure_all_started(ssl),
{Transport, Url} = case WsUrl of
"ws://" ++ Rest -> {gen_tcp, Rest};
"wss://" ++ SslRest -> {ssl, SslRest}
end,
[Addr, Path] = split("/", Url, 1),
[Host0, MaybePort] = split(":", Addr, 1, empty),
Host = case inet:parse_ipv4_address(Host0) of
{ok, IP} -> IP;
_ -> Host0
end,
Port = case MaybePort of
empty -> 80;
V -> {I, ""} = string:to_integer(V), I
end,
State = #state{host = Host,
port = Port,
addr = Addr,
path = "/" ++ Path,
ppid = PPid,
transport = Transport},
spawn_link(fun () ->
start_conn(State, AuthInfo, Protocols, TcpPreface)
end).
open(WS) ->
receive
{rfc6455, open, WS, Opts} ->
{ok, Opts};
{rfc6455, close, WS, R} ->
{close, R}
end.
recv(WS) ->
receive
{rfc6455, recv, WS, Payload} ->
{ok, Payload};
{rfc6455, recv_binary, WS, Payload} ->
{binary, Payload};
{rfc6455, close, WS, R} ->
{close, R}
end.
send(WS, IoData) ->
WS ! {send, IoData},
ok.
close(WS) ->
close(WS, {1000, ""}).
close(WS, WsReason) ->
WS ! {close, WsReason},
receive
{rfc6455, close, WS, R} ->
{close, R}
end.
%% --------------------------------------------------------------------------
start_conn(State = #state{transport = Transport}, AuthInfo, Protocols, TcpPreface) ->
{ok, Socket} = case TcpPreface of
<<>> ->
TlsOpts = case Transport of
ssl -> [{verify, verify_none}];
_ -> []
end,
Transport:connect(State#state.host, State#state.port,
[binary, {packet, 0}] ++ TlsOpts);
_ ->
{ok, Socket0} = gen_tcp:connect(State#state.host, State#state.port,
[binary,
{packet, 0}]),
gen_tcp:send(Socket0, TcpPreface),
case Transport of
gen_tcp -> {ok, Socket0};
ssl -> Transport:connect(Socket0, [{verify, verify_none}])
end
end,
AuthHd = case AuthInfo of
undefined -> "";
_ ->
Login = proplists:get_value(login, AuthInfo),
Passcode = proplists:get_value(passcode, AuthInfo),
"Authorization: Basic "
++ base64:encode_to_string(Login ++ ":" ++ Passcode)
++ "\r\n"
end,
ProtocolHd = case Protocols of
[] -> "";
_ -> "Sec-Websocket-Protocol: " ++ string:join(Protocols, ", ") ++ "\r\n"
end,
Key = base64:encode_to_string(crypto:strong_rand_bytes(16)),
Transport:send(Socket,
"GET " ++ State#state.path ++ " HTTP/1.1\r\n" ++
"Host: " ++ State#state.addr ++ "\r\n" ++
"Upgrade: websocket\r\n" ++
"Connection: Upgrade\r\n" ++
AuthHd ++
ProtocolHd ++
"Sec-WebSocket-Key: " ++ Key ++ "\r\n" ++
"Origin: null\r\n" ++
"Sec-WebSocket-Version: 13\r\n\r\n"),
loop(State#state{socket = Socket,
data = <<>>,
phase = opening}).
do_recv(State = #state{phase = opening, ppid = PPid, data = Data}) ->
case split("\r\n\r\n", binary_to_list(Data), 1, empty) of
[_Http, empty] -> State;
[Http, Data1] ->
%% TODO: don't ignore http response data, verify key
PPid ! {rfc6455, open, self(), [{http_response, Http}]},
State#state{phase = open,
data = Data1}
end;
do_recv(State = #state{phase = Phase, data = Data, socket = Socket, transport = Transport, ppid = PPid})
when Phase =:= open orelse Phase =:= closing ->
R = case Data of
<<F:1, _:3, O:4, 0:1, L:7, Payload:L/binary, Rest/binary>>
when L < 126 ->
{F, O, Payload, Rest};
<<F:1, _:3, O:4, 0:1, 126:7, L2:16, Payload:L2/binary, Rest/binary>> ->
{F, O, Payload, Rest};
<<F:1, _:3, O:4, 0:1, 127:7, L2:64, Payload:L2/binary, Rest/binary>> ->
{F, O, Payload, Rest};
<<_:1, _:3, _:4, 1:1, _/binary>> ->
%% According o rfc6455 5.1 the server must not mask any frames.
die(Socket, Transport, PPid, {1006, "Protocol error"}, normal);
_ ->
moredata
end,
case R of
moredata ->
State;
_ -> do_recv2(State, R)
end.
do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid, transport = Transport}, R) ->
case R of
{1, 1, Payload, Rest} ->
PPid ! {rfc6455, recv, self(), Payload},
State#state{data = Rest};
{1, 2, Payload, Rest} ->
PPid ! {rfc6455, recv_binary, self(), Payload},
State#state{data = Rest};
{1, 8, Payload, _Rest} ->
WsReason = case Payload of
<<WC:16, WR/binary>> -> {WC, WR};
<<>> -> {1005, "No status received"}
end,
case Phase of
open -> %% echo
do_close(State, WsReason),
Transport:close(Socket);
closing ->
ok
end,
die(Socket, Transport, PPid, WsReason, normal);
{_, _, _, _Rest} ->
io:format("Unknown frame type~n"),
die(Socket, Transport, PPid, {1006, "Unknown frame type"}, normal)
end.
encode_frame(F, O, Payload) ->
Mask = crypto:strong_rand_bytes(4),
MaskedPayload = apply_mask(Mask, iolist_to_binary(Payload)),
L = byte_size(MaskedPayload),
IoData = case L of
_ when L < 126 ->
[<<F:1, 0:3, O:4, 1:1, L:7>>, Mask, MaskedPayload];
_ when L < 65536 ->
[<<F:1, 0:3, O:4, 1:1, 126:7, L:16>>, Mask, MaskedPayload];
_ ->
[<<F:1, 0:3, O:4, 1:1, 127:7, L:64>>, Mask, MaskedPayload]
end,
iolist_to_binary(IoData).
do_send(State = #state{socket = Socket, transport = Transport}, Payload) ->
Transport:send(Socket, encode_frame(1, 1, Payload)),
State.
do_close(State = #state{socket = Socket, transport = Transport}, {Code, Reason}) ->
Payload = iolist_to_binary([<<Code:16>>, Reason]),
Transport:send(Socket, encode_frame(1, 8, Payload)),
State#state{phase = closing}.
loop(State = #state{socket = Socket, transport = Transport, ppid = PPid, data = Data,
phase = Phase}) ->
receive
{In, Socket, Bin} when In =:= tcp; In =:= ssl ->
State1 = State#state{data = iolist_to_binary([Data, Bin])},
loop(do_recv(State1));
{send, Payload} when Phase == open ->
loop(do_send(State, Payload));
{Closed, Socket} when Closed =:= tcp_closed; Closed =:= ssl_closed ->
die(Socket, Transport, PPid, {1006, "Connection closed abnormally"}, normal);
{close, WsReason} when Phase == open ->
loop(do_close(State, WsReason));
{Error, Socket, Reason} when Error =:= tcp_error; Error =:= ssl_error ->
die(Socket, Transport, PPid, {1006, "Connection closed abnormally"}, Reason);
Other ->
error({unknown_message, Other, Socket})
end.
die(Socket, Transport, PPid, WsReason, Reason) ->
Transport:shutdown(Socket, read_write),
PPid ! {rfc6455, close, self(), WsReason},
exit(Reason).
%% --------------------------------------------------------------------------
split(SubStr, Str, Limit) ->
split(SubStr, Str, Limit, "").
split(SubStr, Str, Limit, Default) ->
Acc = split(SubStr, Str, Limit, [], Default),
lists:reverse(Acc).
split(_SubStr, Str, 0, Acc, _Default) -> [Str | Acc];
split(SubStr, Str, Limit, Acc, Default) ->
{L, R} = case string:str(Str, SubStr) of
0 -> {Str, Default};
I -> {string:substr(Str, 1, I-1),
string:substr(Str, I+length(SubStr))}
end,
split(SubStr, R, Limit-1, [L | Acc], Default).
apply_mask(Mask, Data) when is_number(Mask) ->
apply_mask(<<Mask:32>>, Data);
apply_mask(<<0:32>>, Data) ->
Data;
apply_mask(Mask, Data) ->
iolist_to_binary(lists:reverse(apply_mask2(Mask, Data, []))).
apply_mask2(M = <<Mask:32>>, <<Data:32, Rest/binary>>, Acc) ->
T = Data bxor Mask,
apply_mask2(M, Rest, [<<T:32>> | Acc]);
apply_mask2(<<Mask:24, _:8>>, <<Data:24>>, Acc) ->
T = Data bxor Mask,
[<<T:24>> | Acc];
apply_mask2(<<Mask:16, _:16>>, <<Data:16>>, Acc) ->
T = Data bxor Mask,
[<<T:16>> | Acc];
apply_mask2(<<Mask:8, _:24>>, <<Data:8>>, Acc) ->
T = Data bxor Mask,
[<<T:8>> | Acc];
apply_mask2(_, <<>>, Acc) ->
Acc.

View File

@ -1,6 +1,6 @@
<!doctype html>
<html><head>
<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>
<script src="https://code.jquery.com/jquery-3.7.1.min.js"></script>
<script src="stomp.js"></script>
<style>

View File

@ -1,6 +1,6 @@
<!DOCTYPE html>
<html><head>
<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>
<script src="https://code.jquery.com/jquery-3.7.1.min.js"></script>
<script src="stomp.js"></script>
<style>
.box {

View File

@ -1,6 +1,6 @@
<!DOCTYPE html>
<html><head>
<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>
<script src="https://code.jquery.com/jquery-3.7.1.min.js"></script>
<script src="stomp.js"></script>
<style>
.box {