Enforce AMQP 1.0 channel-max (#12221)

* Enforce AMQP 1.0 channel-max

Enforce AMQP 1.0 field `channel-max` in the `open` frame by introducing
a new more user friendly setting called `session_max`:
> The channel-max value is the highest channel number that can be used on the connection.
> This value plus one is the maximum number of sessions that can be simultaneously active on the connection.

We set the default value of `session_max` to 64 so that, by
default, RabbitMQ 4.0 allows a maximum of 64 AMQP 1.0 sessions per AMQP 1.0 connection.

More than 64 AMQP 1.0 sessions per connection make little sense.
See also https://www.rabbitmq.com/blog/2024/09/02/amqp-flow-control#session

Limiting the maximum number of sessions per connection can be useful to
protect against
* applications that accidentally open new sessions without ending old sessions
  (session leaks)
* too many metrics being exposed, for example in the future via the
  "/metrics/per-object" Prometheus endpoint with timeseries per session
  being emitted.

This commit does not make use of the existing `channel_max` setting
because:
1. Given that `channel_max = 0` means "no limit", there is no way for an
   operator to limit the number of sessions per connection to 1.
2. Operators might want to set different limits for maximum number of
   AMQP 0.9.1 channels and maximum number of AMQP 1.0 sessions.
3. The default of `channel_max` is very high: It allows using more than
   2,000 AMQP 0.9.1 channels per connection. Lowering this default might
   break existing AMQP 0.9.1 applications.

This commit also fixes a bug in the AMQP 1.0 Erlang client which, prior
to this commit, used channel number 1 for the first session. That's wrong
if a broker allows a maximum of 1 session by replying with `channel-max = 0`
in the `open` frame. Additionally, the spec recommends:
> To make it easier to monitor AMQP sessions, it is RECOMMENDED that implementations always assign the lowest available unused channel number.

Note that in AMQP 0.9.1, channel number 0 has a special meaning:
> The channel number is 0 for all frames which are global to the connection and 1-65535 for frames that
refer to specific channels.

* Apply PR feedback
This commit is contained in:
David Ansari 2024-09-05 17:45:27 +02:00 committed by GitHub
parent 30c82d1396
commit c2ce905797
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 111 additions and 50 deletions

View File

@ -76,7 +76,7 @@
}.
-record(state,
{next_channel = 1 :: pos_integer(),
{next_channel = 0 :: non_neg_integer(),
connection_sup :: pid(),
reader_m_ref :: reference() | undefined,
sessions_sup :: pid() | undefined,

View File

@ -719,14 +719,14 @@ insufficient_credit(Config) ->
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
end,
BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) ->
{Ch, [#'v1_0.begin'{remote_channel = {ushort, 1},
BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) ->
{Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch},
next_outgoing_id = {uint, 1},
incoming_window = {uint, 1000},
outgoing_window = {uint, 1000}}
]}
end,
AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false,
AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false,
name = Name}, <<>>}) ->
{Ch, [#'v1_0.attach'{name = Name,
handle = {uint, 99},
@ -759,14 +759,14 @@ multi_transfer_without_delivery_id(Config) ->
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
end,
BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) ->
{Ch, [#'v1_0.begin'{remote_channel = {ushort, 1},
BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) ->
{Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch},
next_outgoing_id = {uint, 1},
incoming_window = {uint, 1000},
outgoing_window = {uint, 1000}}
]}
end,
AttachStep = fun({1 = Ch, #'v1_0.attach'{role = true,
AttachStep = fun({0 = Ch, #'v1_0.attach'{role = true,
name = Name}, <<>>}) ->
{Ch, [#'v1_0.attach'{name = Name,
handle = {uint, 99},
@ -775,7 +775,7 @@ multi_transfer_without_delivery_id(Config) ->
]}
end,
LinkCreditStep = fun({1 = Ch, #'v1_0.flow'{}, <<>>}) ->
LinkCreditStep = fun({0 = Ch, #'v1_0.flow'{}, <<>>}) ->
{Ch, {multi, [[#'v1_0.transfer'{handle = {uint, 99},
delivery_id = {uint, 12},
more = true},

View File

@ -43,6 +43,7 @@ _APP_ENV = """[
{frame_max, 131072},
%% see rabbitmq-server#1593
{channel_max, 2047},
{session_max, 64},
{ranch_connection_max, infinity},
{heartbeat, 60},
{msg_store_file_size_limit, 16777216},

View File

@ -23,6 +23,7 @@ define PROJECT_ENV
{frame_max, 131072},
%% see rabbitmq-server#1593
{channel_max, 2047},
{session_max, 64},
{ranch_connection_max, infinity},
{heartbeat, 60},
{msg_store_file_size_limit, 16777216},

View File

@ -948,6 +948,13 @@ end}.
end
}.
%% Sets the maximum number of AMQP 1.0 sessions that can be simultaneously
%% active on an AMQP 1.0 connection.
%%
%% Must be an integer between 1 and 65535 (enforced by the
%% "positive_16_bit_integer" validator). The broker uses this value minus one
%% as the upper bound for the AMQP 1.0 `channel-max` field, i.e. the highest
%% channel number that may be used on the connection.
%%
%% {session_max, 1},
{mapping, "session_max", "rabbit.session_max",
[{datatype, integer}, {validators, ["positive_16_bit_integer"]}]}.
%% Set the max permissible number of client connections per node.
%% `infinity` means "no limit".
%%
@ -2429,7 +2436,7 @@ end}.
{mapping, "raft.segment_max_entries", "ra.segment_max_entries", [
{datatype, integer},
{validators, ["non_zero_positive_integer", "non_zero_positive_16_bit_integer"]}
{validators, ["non_zero_positive_integer", "positive_16_bit_integer"]}
]}.
{translation, "ra.segment_max_entries",
@ -2736,7 +2743,7 @@ fun(Int) when is_integer(Int) ->
Int >= 1
end}.
{validator, "non_zero_positive_16_bit_integer", "number should be between 1 and 65535",
{validator, "positive_16_bit_integer", "number should be between 1 and 65535",
fun(Int) when is_integer(Int) ->
(Int >= 1) and (Int =< 65535)
end}.

View File

@ -114,6 +114,7 @@ unpack_from_0_9_1(
timeout = ?NORMAL_TIMEOUT,
incoming_max_frame_size = ?INITIAL_MAX_FRAME_SIZE,
outgoing_max_frame_size = ?INITIAL_MAX_FRAME_SIZE,
%% "Prior to any explicit negotiation, [...] the maximum channel number is 0." [2.4.1]
channel_max = 0,
auth_mechanism = sasl_init_unprocessed,
auth_state = unauthenticated}}.
@ -292,7 +293,7 @@ handle_session_exit(ChannelNum, SessionPid, Reason, State0) ->
"Session error: ~tp",
[Reason])
end,
handle_exception(State, SessionPid, R)
handle_exception(State, ChannelNum, R)
end,
maybe_close(S).
@ -318,19 +319,19 @@ error_frame(Condition, Fmt, Args) ->
handle_exception(State = #v1{connection_state = closed}, Channel,
#'v1_0.error'{description = {utf8, Desc}}) ->
rabbit_log_connection:error(
"Error on AMQP 1.0 connection ~tp (~tp), channel ~tp:~n~tp",
"Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), closed, Channel, Desc]),
State;
handle_exception(State = #v1{connection_state = CS}, Channel,
Error = #'v1_0.error'{description = {utf8, Desc}})
when ?IS_RUNNING(State) orelse CS =:= closing ->
rabbit_log_connection:error(
"Error on AMQP 1.0 connection ~tp (~tp), channel ~tp:~n~tp",
"Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp",
[self(), CS, Channel, Desc]),
close(Error, State);
handle_exception(State, Channel, Error) ->
handle_exception(State, _Channel, Error) ->
silent_close_delay(),
throw({handshake_error, State#v1.connection_state, Channel, Error}).
throw({handshake_error, State#v1.connection_state, Error}).
is_connection_frame(#'v1_0.open'{}) -> true;
is_connection_frame(#'v1_0.close'{}) -> true;
@ -341,21 +342,30 @@ handle_frame(Mode, Channel, Body, State) ->
handle_frame0(Mode, Channel, Body, State)
catch
_:#'v1_0.error'{} = Reason ->
handle_exception(State, 0, Reason);
handle_exception(State, Channel, Reason);
_:{error, {not_allowed, Username}} ->
%% section 2.8.15 in http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf
handle_exception(State, 0, error_frame(
?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"Access for user '~ts' was refused: insufficient permissions",
[Username]));
handle_exception(State,
Channel,
error_frame(
?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"Access for user '~ts' was refused: insufficient permissions",
[Username]));
_:Reason:Trace ->
handle_exception(State, 0, error_frame(
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
"Reader error: ~tp~n~tp",
[Reason, Trace]))
handle_exception(State,
Channel,
error_frame(
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
"Reader error: ~tp~n~tp",
[Reason, Trace]))
end.
%% Nothing specifies that connection methods have to be on a particular channel.
handle_frame0(amqp, Channel, _Body,
#v1{connection = #v1_connection{channel_max = ChannelMax}})
when Channel > ChannelMax ->
protocol_error(?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
"channel number (~b) exceeds maximum channel number (~b)",
[Channel, ChannelMax]);
handle_frame0(_Mode, Channel, Body,
State = #v1{connection_state = CS})
when CS =:= closing orelse
@ -466,20 +476,25 @@ handle_connection_frame(
SendTimeoutSec, SendFun,
ReceiveTimeoutSec, ReceiveFun),
{ok, IncomingMaxFrameSize} = application:get_env(rabbit, frame_max),
%% TODO enforce channel_max
ChannelMax = case ClientChannelMax of
undefined ->
%% default as per 2.7.1
16#ff_ff;
{ushort, N} ->
N
end,
{ok, SessionMax} = application:get_env(rabbit, session_max),
%% "The channel-max value is the highest channel number that can be used on the connection.
%% This value plus one is the maximum number of sessions that can be simultaneously active
%% on the connection." [2.7.1]
ChannelMax = SessionMax - 1,
%% Assert config is valid.
true = ChannelMax >= 0 andalso ChannelMax =< 16#ff_ff,
EffectiveChannelMax = case ClientChannelMax of
undefined ->
ChannelMax;
{ushort, N} ->
min(N, ChannelMax)
end,
State1 = State0#v1{connection_state = running,
connection = Connection#v1_connection{
vhost = Vhost,
incoming_max_frame_size = IncomingMaxFrameSize,
outgoing_max_frame_size = OutgoingMaxFrameSize,
channel_max = ChannelMax,
channel_max = EffectiveChannelMax,
properties = Properties,
timeout = ReceiveTimeoutMillis},
heartbeater = Heartbeater},
@ -504,7 +519,7 @@ handle_connection_frame(
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
{symbol, <<"ANONYMOUS-RELAY">>}],
Open = #'v1_0.open'{
channel_max = ClientChannelMax,
channel_max = {ushort, EffectiveChannelMax},
max_frame_size = {uint, IncomingMaxFrameSize},
%% "the value in idle-time-out SHOULD be half the peer's actual timeout threshold" [2.4.5]
idle_time_out = {uint, ReceiveTimeoutMillis div 2},

View File

@ -376,10 +376,13 @@ process_frame(Pid, FrameBody) ->
gen_server:cast(Pid, {frame_body, FrameBody}).
init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
#'v1_0.begin'{next_outgoing_id = ?UINT(RemoteNextOutgoingId),
incoming_window = ?UINT(RemoteIncomingWindow),
outgoing_window = ?UINT(RemoteOutgoingWindow),
handle_max = HandleMax0}}) ->
#'v1_0.begin'{
%% "If a session is locally initiated, the remote-channel MUST NOT be set." [2.7.2]
remote_channel = undefined,
next_outgoing_id = ?UINT(RemoteNextOutgoingId),
incoming_window = ?UINT(RemoteIncomingWindow),
outgoing_window = ?UINT(RemoteOutgoingWindow),
handle_max = HandleMax0}}) ->
process_flag(trap_exit, true),
process_flag(message_queue_data, off_heap),
@ -406,11 +409,14 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
?UINT(Max) -> Max;
_ -> ?DEFAULT_MAX_HANDLE
end,
Reply = #'v1_0.begin'{remote_channel = {ushort, ChannelNum},
handle_max = ?UINT(HandleMax),
next_outgoing_id = ?UINT(NextOutgoingId),
incoming_window = ?UINT(IncomingWindow),
outgoing_window = ?UINT_OUTGOING_WINDOW},
Reply = #'v1_0.begin'{
%% "When an endpoint responds to a remotely initiated session, the remote-channel
%% MUST be set to the channel on which the remote session sent the begin." [2.7.2]
remote_channel = {ushort, ChannelNum},
handle_max = ?UINT(HandleMax),
next_outgoing_id = ?UINT(NextOutgoingId),
incoming_window = ?UINT(IncomingWindow),
outgoing_window = ?UINT_OUTGOING_WINDOW},
rabbit_amqp_writer:send_command(WriterPid, ChannelNum, Reply),
{ok, #state{next_incoming_id = RemoteNextOutgoingId,

View File

@ -441,7 +441,7 @@ log_connection_exception(Severity, Name, ConnectedAt, {connection_closed_abruptl
log_connection_exception_with_severity(Severity, Fmt,
[self(), Name, ConnDuration]);
%% failed connection.tune negotiations
log_connection_exception(Severity, Name, ConnectedAt, {handshake_error, tuning, _Channel,
log_connection_exception(Severity, Name, ConnectedAt, {handshake_error, tuning,
{exit, #amqp_error{explanation = Explanation},
_Method, _Stacktrace}}) ->
ConnDuration = connection_duration(ConnectedAt),
@ -873,11 +873,11 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol,
" user: '~ts', state: ~tp):~n~ts",
[self(), ConnName, User#user.username, tuning, ErrMsg]),
send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
handle_exception(State, Channel, Reason) ->
handle_exception(State, _Channel, Reason) ->
%% We don't trust the client at this point - force them to wait
%% for a bit so they can't DOS us with repeated failed logins etc.
timer:sleep(?SILENT_CLOSE_DELAY * 1000),
throw({handshake_error, State#v1.connection_state, Channel, Reason}).
throw({handshake_error, State#v1.connection_state, Reason}).
%% we've "lost sync" with the client and hence must not accept any
%% more input

View File

@ -140,7 +140,8 @@ groups() ->
incoming_window_closed_rabbitmq_internal_flow_classic_queue,
incoming_window_closed_rabbitmq_internal_flow_quorum_queue,
tcp_back_pressure_rabbitmq_internal_flow_classic_queue,
tcp_back_pressure_rabbitmq_internal_flow_quorum_queue
tcp_back_pressure_rabbitmq_internal_flow_quorum_queue,
session_max
]},
{cluster_size_3, [shuffle],
@ -4168,7 +4169,7 @@ trace(Config) ->
<<"connection">> := <<"127.0.0.1:", _/binary>>,
<<"node">> := Node,
<<"vhost">> := <<"/">>,
<<"channel">> := 1,
<<"channel">> := 0,
<<"user">> := <<"guest">>,
<<"properties">> := #{<<"correlation_id">> := CorrelationId},
<<"routed_queues">> := [Q]},
@ -4183,7 +4184,7 @@ trace(Config) ->
<<"connection">> := <<"127.0.0.1:", _/binary>>,
<<"node">> := Node,
<<"vhost">> := <<"/">>,
<<"channel">> := 2,
<<"channel">> := 1,
<<"user">> := <<"guest">>,
<<"properties">> := #{<<"correlation_id">> := CorrelationId},
<<"redelivered">> := 0},
@ -5621,6 +5622,28 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).
%% Verifies that the broker enforces the `session_max` setting:
%% with `session_max = 1` the first session (channel 0) must begin successfully,
%% while beginning a second session (channel 1) must cause the broker to close
%% the connection with a framing error.
%%
%% The broker-wide `session_max` value is modified for the duration of the test.
%% It is restored in an `after` section so that a failing assertion or a timeout
%% does not leak `session_max = 1` into subsequent test cases.
session_max(Config) ->
    App = rabbit,
    Par = session_max,
    {ok, Default} = rpc(Config, application, get_env, [App, Par]),
    %% Let's allow only 1 session per connection.
    ok = rpc(Config, application, set_env, [App, Par, 1]),
    try
        OpnConf = connection_config(Config),
        {ok, Connection} = amqp10_client:open_connection(OpnConf),
        %% The 1st session should succeed.
        {ok, _Session1} = amqp10_client:begin_session_sync(Connection),
        %% The 2nd session should fail.
        %% begin_session/1 is used (rather than begin_session_sync/1) because no
        %% successful begin is expected: the broker closes the whole connection.
        {ok, _Session2} = amqp10_client:begin_session(Connection),
        receive {amqp10_event, {connection, Connection, {closed, Reason}}} ->
                    ?assertEqual(
                       {framing_error, <<"channel number (1) exceeds maximum channel number (0)">>},
                       Reason)
        after 5000 -> ct:fail(missing_closed)
        end
    after
        %% Always restore the original setting, even if the test failed above.
        ok = rpc(Config, application, set_env, [App, Par, Default])
    end.
%% internal
%%

View File

@ -429,6 +429,14 @@ tcp_listen_options.exit_on_close = false",
"channel_max_per_node = infinity",
[{rabbit,[{channel_max_per_node, infinity}]}],
[]},
{session_max_1,
"session_max = 1",
[{rabbit,[{session_max, 1}]}],
[]},
{session_max,
"session_max = 65000",
[{rabbit,[{session_max, 65000}]}],
[]},
{consumer_max_per_channel,
"consumer_max_per_channel = 16",
[{rabbit,[{consumer_max_per_channel, 16}]}],