Merge pull request #13363 from rabbitmq/loic-dynamic-buffer

Add dynamic socket buffer functionality to rabbit_reader
This commit is contained in:
Loïc Hoguin 2025-02-27 17:30:32 +01:00 committed by GitHub
commit 6c3789ee9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 94 additions and 13 deletions

View File

@ -59,7 +59,10 @@
buf :: list(),
buf_len :: non_neg_integer(),
tracked_channels = maps:new() :: #{channel_number() => Session :: pid()},
stats_timer :: rabbit_event:state()
stats_timer :: rabbit_event:state(),
%% dynamic buffer
dynamic_buffer_size = 128,
dynamic_buffer_moving_average = 0.0
}).
-type state() :: #v1{}.

View File

@ -111,9 +111,10 @@ recvloop(Deb, State0 = #v1{recv_len = RecvLen,
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
case rabbit_net:recv(Sock) of
{data, Data} ->
recvloop(Deb, State#v1{buf = [Data | Buf],
buf_len = BufLen + size(Data),
pending_recv = false});
State1 = maybe_resize_buffer(State, Data),
recvloop(Deb, State1#v1{buf = [Data | Buf],
buf_len = BufLen + size(Data),
pending_recv = false});
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
@ -130,6 +131,37 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.
maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
dynamic_buffer_moving_average=MovingAvg0}, Data) ->
LowDynamicBuffer = 128,
HighDynamicBuffer = 131072,
DataLen = byte_size(Data),
MovingAvg = (MovingAvg0 * 7 + DataLen) / 8,
if
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
ok -> State#v1{
dynamic_buffer_size=BufferSize,
dynamic_buffer_moving_average=MovingAvg
};
{error, Reason} ->
throw({inet_error, Reason})
end;
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
ok -> State#v1{
dynamic_buffer_size=BufferSize,
dynamic_buffer_moving_average=MovingAvg
};
{error, Reason} ->
throw({inet_error, Reason})
end;
true ->
State#v1{dynamic_buffer_moving_average=MovingAvg}
end.
-spec handle_other(any(), state()) -> state() | stop.
handle_other(emit_stats, State) ->
emit_stats(State);

View File

@ -32,7 +32,7 @@
close_connection/2, close_connections/2, close_all_connections/1,
close_all_user_connections/2,
force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1,
handshake/2, tcp_host/1,
handshake/2, handshake/3, tcp_host/1,
ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
listener_of_protocol/1, stop_ranch_listener_of_protocol/1,
list_local_connections_of_protocol/1]).
@ -551,6 +551,9 @@ failed_to_recv_proxy_header(Ref, Error) ->
exit({shutdown, failed_to_recv_proxy_header}).
handshake(Ref, ProxyProtocolEnabled) ->
handshake(Ref, ProxyProtocolEnabled, static_buffer).
handshake(Ref, ProxyProtocolEnabled, BufferStrategy) ->
case ProxyProtocolEnabled of
true ->
case ranch:recv_proxy_header(Ref, 3000) of
@ -560,23 +563,29 @@ handshake(Ref, ProxyProtocolEnabled) ->
failed_to_recv_proxy_header(Ref, Error);
{ok, ProxyInfo} ->
{ok, Sock} = ranch:handshake(Ref),
ok = tune_buffer_size(Sock),
ok = tune_buffer_size(Sock, BufferStrategy),
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
end;
false ->
{ok, Sock} = ranch:handshake(Ref),
ok = tune_buffer_size(Sock),
ok = tune_buffer_size(Sock, BufferStrategy),
{ok, Sock}
end.
tune_buffer_size(Sock) ->
case tune_buffer_size1(Sock) of
tune_buffer_size(Sock, dynamic_buffer) ->
case rabbit_net:setopts(Sock, [{buffer, 128}]) of
ok -> ok;
{error, _} -> rabbit_net:fast_close(Sock),
exit(normal)
end;
tune_buffer_size(Sock, static_buffer) ->
case tune_buffer_size_static(Sock) of
ok -> ok;
{error, _} -> rabbit_net:fast_close(Sock),
exit(normal)
end.
tune_buffer_size1(Sock) ->
tune_buffer_size_static(Sock) ->
case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
rabbit_net:setopts(Sock, [{buffer, BufSz}]);

View File

@ -99,7 +99,11 @@
%% throttling state, for both
%% credit- and resource-driven flow control
throttle,
proxy_socket}).
proxy_socket,
%% dynamic buffer
dynamic_buffer_size = 128,
dynamic_buffer_moving_average = 0.0
}).
-record(throttle, {
%% never | timestamp()
@ -155,7 +159,8 @@ shutdown(Pid, Explanation) ->
init(Parent, HelperSups, Ref) ->
?LG_PROCESS_TYPE(reader),
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(rabbit, proxy_protocol, false)),
application:get_env(rabbit, proxy_protocol, false),
dynamic_buffer),
Deb = sys:debug_options([]),
start_connection(Parent, HelperSups, Ref, Deb, Sock).
@ -512,8 +517,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
end,
case Recv of
{data, Data} ->
State1 = maybe_resize_buffer(State, Data),
recvloop(Deb, [Data | Buf], BufLen + size(Data),
State#v1{pending_recv = false});
State1#v1{pending_recv = false});
closed when State#v1.connection_state =:= closed ->
State;
closed when CS =:= pre_init andalso Buf =:= [] ->
@ -536,6 +542,37 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
end
end.
maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
dynamic_buffer_moving_average=MovingAvg0}, Data) ->
LowDynamicBuffer = 128,
HighDynamicBuffer = 131072,
DataLen = byte_size(Data),
MovingAvg = (MovingAvg0 * 7 + DataLen) / 8,
if
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
ok -> State#v1{
dynamic_buffer_size=BufferSize,
dynamic_buffer_moving_average=MovingAvg
};
Error ->
stop(Error, State)
end;
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
ok -> State#v1{
dynamic_buffer_size=BufferSize,
dynamic_buffer_moving_average=MovingAvg
};
Error ->
stop(Error, State)
end;
true ->
State#v1{dynamic_buffer_moving_average=MovingAvg}
end.
-spec stop(_, #v1{}) -> no_return().
stop(tcp_healthcheck, State) ->
%% The connection was closed before any packet was received. It's