Add dynamic buffer functionality to rabbit_reader

The `buffer` socket option will be changed dynamically
based on how much data is received.

This is restricted to AMQP protocols (old and 1.0).

The algorithm is a little different than Cowboy 2.13.
The moving average is less reactive (div 8 instead of 2)
and floats are used so that using smaller lower buffer
values is possible (otherwise the rounding prevents
increasing buffer sizes). The lower buffer size was
set to 128 as a result.

Compared to the previous which was to set `buffer` to
`rcvbuf` effectively, often to 131072 on Linux for
example, the performance sees a slight improvement
in various scenarios for all message sizes using
AMQP-0.9.1 and a lower memory usage as well. But
the difference is small in the benchmarks we have
run (5% to 10%), whereas Cowboy saw a huge improvement
because its default was very small (1460).

For AMQP-1.0 this seems to be no worse but we didn't
detect a clear improvement. We saw scenarios where
small message sizes showed improvement, and large
message sizes showed a regression. But we are even
less confident with these results. David (AMQP-1.0
native developer) ran a few tests and didn't see a
regression.

The dynamic buffer code is currently identical for
old and 1.0 AMQP. But we might tweak them differently
in the future so they're left as duplicate for now.
This is because different protocols have different
behaviors and so the algorithm may need to be tweaked
differently for each protocol.
This commit is contained in:
Loïc Hoguin 2025-02-18 14:48:00 +01:00
parent cdc042a2fd
commit 53444107b5
No known key found for this signature in database
GPG Key ID: C69E26E3A9DF618F
4 changed files with 94 additions and 13 deletions

View File

@ -59,7 +59,10 @@
buf :: list(),
buf_len :: non_neg_integer(),
tracked_channels = maps:new() :: #{channel_number() => Session :: pid()},
stats_timer :: rabbit_event:state()
stats_timer :: rabbit_event:state(),
%% dynamic buffer
dynamic_buffer_size = 128,
dynamic_buffer_moving_average = 0.0
}).
-type state() :: #v1{}.

View File

@ -111,7 +111,8 @@ recvloop(Deb, State0 = #v1{recv_len = RecvLen,
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
case rabbit_net:recv(Sock) of
{data, Data} ->
recvloop(Deb, State#v1{buf = [Data | Buf],
State1 = maybe_resize_buffer(State, Data),
recvloop(Deb, State1#v1{buf = [Data | Buf],
buf_len = BufLen + size(Data),
pending_recv = false});
closed when State#v1.connection_state =:= closed ->
@ -130,6 +131,37 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.
maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
dynamic_buffer_moving_average=MovingAvg0}, Data) ->
LowDynamicBuffer = 128,
HighDynamicBuffer = 131072,
DataLen = byte_size(Data),
MovingAvg = (MovingAvg0 * 7 + DataLen) / 8,
if
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
ok -> State#v1{
dynamic_buffer_size=BufferSize,
dynamic_buffer_moving_average=MovingAvg
};
{error, Reason} ->
throw({inet_error, Reason})
end;
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
ok -> State#v1{
dynamic_buffer_size=BufferSize,
dynamic_buffer_moving_average=MovingAvg
};
{error, Reason} ->
throw({inet_error, Reason})
end;
true ->
State#v1{dynamic_buffer_moving_average=MovingAvg}
end.
-spec handle_other(any(), state()) -> state() | stop.
handle_other(emit_stats, State) ->
emit_stats(State);

View File

@ -32,7 +32,7 @@
close_connection/2, close_connections/2, close_all_connections/1,
close_all_user_connections/2,
force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1,
handshake/2, tcp_host/1,
handshake/2, handshake/3, tcp_host/1,
ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
listener_of_protocol/1, stop_ranch_listener_of_protocol/1,
list_local_connections_of_protocol/1]).
@ -551,6 +551,9 @@ failed_to_recv_proxy_header(Ref, Error) ->
exit({shutdown, failed_to_recv_proxy_header}).
handshake(Ref, ProxyProtocolEnabled) ->
handshake(Ref, ProxyProtocolEnabled, static_buffer).
handshake(Ref, ProxyProtocolEnabled, BufferStrategy) ->
case ProxyProtocolEnabled of
true ->
case ranch:recv_proxy_header(Ref, 3000) of
@ -560,23 +563,29 @@ handshake(Ref, ProxyProtocolEnabled) ->
failed_to_recv_proxy_header(Ref, Error);
{ok, ProxyInfo} ->
{ok, Sock} = ranch:handshake(Ref),
ok = tune_buffer_size(Sock),
ok = tune_buffer_size(Sock, BufferStrategy),
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
end;
false ->
{ok, Sock} = ranch:handshake(Ref),
ok = tune_buffer_size(Sock),
ok = tune_buffer_size(Sock, BufferStrategy),
{ok, Sock}
end.
tune_buffer_size(Sock) ->
case tune_buffer_size1(Sock) of
tune_buffer_size(Sock, dynamic_buffer) ->
case rabbit_net:setopts(Sock, [{buffer, 128}]) of
ok -> ok;
{error, _} -> rabbit_net:fast_close(Sock),
exit(normal)
end;
tune_buffer_size(Sock, static_buffer) ->
case tune_buffer_size_static(Sock) of
ok -> ok;
{error, _} -> rabbit_net:fast_close(Sock),
exit(normal)
end.
tune_buffer_size1(Sock) ->
tune_buffer_size_static(Sock) ->
case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
rabbit_net:setopts(Sock, [{buffer, BufSz}]);

View File

@ -99,7 +99,11 @@
%% throttling state, for both
%% credit- and resource-driven flow control
throttle,
proxy_socket}).
proxy_socket,
%% dynamic buffer
dynamic_buffer_size = 128,
dynamic_buffer_moving_average = 0.0
}).
-record(throttle, {
%% never | timestamp()
@ -155,7 +159,8 @@ shutdown(Pid, Explanation) ->
init(Parent, HelperSups, Ref) ->
?LG_PROCESS_TYPE(reader),
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(rabbit, proxy_protocol, false)),
application:get_env(rabbit, proxy_protocol, false),
dynamic_buffer),
Deb = sys:debug_options([]),
start_connection(Parent, HelperSups, Ref, Deb, Sock).
@ -512,8 +517,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
end,
case Recv of
{data, Data} ->
State1 = maybe_resize_buffer(State, Data),
recvloop(Deb, [Data | Buf], BufLen + size(Data),
State#v1{pending_recv = false});
State1#v1{pending_recv = false});
closed when State#v1.connection_state =:= closed ->
State;
closed when CS =:= pre_init andalso Buf =:= [] ->
@ -536,6 +542,37 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
end
end.
maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
dynamic_buffer_moving_average=MovingAvg0}, Data) ->
LowDynamicBuffer = 128,
HighDynamicBuffer = 131072,
DataLen = byte_size(Data),
MovingAvg = (MovingAvg0 * 7 + DataLen) / 8,
if
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
ok -> State#v1{
dynamic_buffer_size=BufferSize,
dynamic_buffer_moving_average=MovingAvg
};
Error ->
stop(Error, State)
end;
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
ok -> State#v1{
dynamic_buffer_size=BufferSize,
dynamic_buffer_moving_average=MovingAvg
};
Error ->
stop(Error, State)
end;
true ->
State#v1{dynamic_buffer_moving_average=MovingAvg}
end.
-spec stop(_, #v1{}) -> no_return().
stop(tcp_healthcheck, State) ->
%% The connection was closed before any packet was received. It's