diff --git a/deps/rabbit/include/rabbit_amqp_reader.hrl b/deps/rabbit/include/rabbit_amqp_reader.hrl index 732bc9f043..4b1500d00e 100644 --- a/deps/rabbit/include/rabbit_amqp_reader.hrl +++ b/deps/rabbit/include/rabbit_amqp_reader.hrl @@ -59,7 +59,10 @@ buf :: list(), buf_len :: non_neg_integer(), tracked_channels = maps:new() :: #{channel_number() => Session :: pid()}, - stats_timer :: rabbit_event:state() + stats_timer :: rabbit_event:state(), + %% dynamic buffer + dynamic_buffer_size = 128, + dynamic_buffer_moving_average = 0.0 }). -type state() :: #v1{}. diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 3e5d5cc08d..b92ba8d3ce 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -111,9 +111,10 @@ recvloop(Deb, State0 = #v1{recv_len = RecvLen, mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> case rabbit_net:recv(Sock) of {data, Data} -> - recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); + State1 = maybe_resize_buffer(State, Data), + recvloop(Deb, State1#v1{buf = [Data | Buf], + buf_len = BufLen + size(Data), + pending_recv = false}); closed when State#v1.connection_state =:= closed -> ok; closed -> @@ -130,6 +131,37 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end end. +maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0, + dynamic_buffer_moving_average=MovingAvg0}, Data) -> + LowDynamicBuffer = 128, + HighDynamicBuffer = 131072, + DataLen = byte_size(Data), + MovingAvg = (MovingAvg0 * 7 + DataLen) / 8, + if + BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 -> + BufferSize = min(BufferSize0 * 2, HighDynamicBuffer), + case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of + ok -> State#v1{ + dynamic_buffer_size=BufferSize, + dynamic_buffer_moving_average=MovingAvg + }; + {error, Reason} -> + throw({inet_error, Reason}) + end; + BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 -> + BufferSize = max(BufferSize0 div 2, LowDynamicBuffer), + case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of + ok -> State#v1{ + dynamic_buffer_size=BufferSize, + dynamic_buffer_moving_average=MovingAvg + }; + {error, Reason} -> + throw({inet_error, Reason}) + end; + true -> + State#v1{dynamic_buffer_moving_average=MovingAvg} + end. + -spec handle_other(any(), state()) -> state() | stop. handle_other(emit_stats, State) -> emit_stats(State); diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index 16576f9b6b..a2a01ab822 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -32,7 +32,7 @@ close_connection/2, close_connections/2, close_all_connections/1, close_all_user_connections/2, force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1, - handshake/2, tcp_host/1, + handshake/2, handshake/3, tcp_host/1, ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1, listener_of_protocol/1, stop_ranch_listener_of_protocol/1, list_local_connections_of_protocol/1]). @@ -551,6 +551,9 @@ failed_to_recv_proxy_header(Ref, Error) -> exit({shutdown, failed_to_recv_proxy_header}). handshake(Ref, ProxyProtocolEnabled) -> + handshake(Ref, ProxyProtocolEnabled, static_buffer). + +handshake(Ref, ProxyProtocolEnabled, BufferStrategy) -> case ProxyProtocolEnabled of true -> case ranch:recv_proxy_header(Ref, 3000) of @@ -560,23 +563,29 @@ handshake(Ref, ProxyProtocolEnabled) -> failed_to_recv_proxy_header(Ref, Error); {ok, ProxyInfo} -> {ok, Sock} = ranch:handshake(Ref), - ok = tune_buffer_size(Sock), + ok = tune_buffer_size(Sock, BufferStrategy), {ok, {rabbit_proxy_socket, Sock, ProxyInfo}} end; false -> {ok, Sock} = ranch:handshake(Ref), - ok = tune_buffer_size(Sock), + ok = tune_buffer_size(Sock, BufferStrategy), {ok, Sock} end. -tune_buffer_size(Sock) -> - case tune_buffer_size1(Sock) of +tune_buffer_size(Sock, dynamic_buffer) -> + case rabbit_net:setopts(Sock, [{buffer, 128}]) of + ok -> ok; + {error, _} -> rabbit_net:fast_close(Sock), + exit(normal) + end; +tune_buffer_size(Sock, static_buffer) -> + case tune_buffer_size_static(Sock) of ok -> ok; {error, _} -> rabbit_net:fast_close(Sock), exit(normal) end. -tune_buffer_size1(Sock) -> +tune_buffer_size_static(Sock) -> case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), rabbit_net:setopts(Sock, [{buffer, BufSz}]); diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 723ca4b5df..276b6fa03f 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -99,7 +99,11 @@ %% throttling state, for both %% credit- and resource-driven flow control throttle, - proxy_socket}). + proxy_socket, + %% dynamic buffer + dynamic_buffer_size = 128, + dynamic_buffer_moving_average = 0.0 +}). -record(throttle, { %% never | timestamp() @@ -155,7 +159,8 @@ shutdown(Pid, Explanation) -> init(Parent, HelperSups, Ref) -> ?LG_PROCESS_TYPE(reader), {ok, Sock} = rabbit_networking:handshake(Ref, - application:get_env(rabbit, proxy_protocol, false)), + application:get_env(rabbit, proxy_protocol, false), + dynamic_buffer), Deb = sys:debug_options([]), start_connection(Parent, HelperSups, Ref, Deb, Sock). @@ -512,8 +517,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, end, case Recv of {data, Data} -> + State1 = maybe_resize_buffer(State, Data), recvloop(Deb, [Data | Buf], BufLen + size(Data), - State#v1{pending_recv = false}); + State1#v1{pending_recv = false}); closed when State#v1.connection_state =:= closed -> State; closed when CS =:= pre_init andalso Buf =:= [] -> @@ -536,6 +542,37 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, end end. +maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0, + dynamic_buffer_moving_average=MovingAvg0}, Data) -> + LowDynamicBuffer = 128, + HighDynamicBuffer = 131072, + DataLen = byte_size(Data), + MovingAvg = (MovingAvg0 * 7 + DataLen) / 8, + if + BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 -> + BufferSize = min(BufferSize0 * 2, HighDynamicBuffer), + case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of + ok -> State#v1{ + dynamic_buffer_size=BufferSize, + dynamic_buffer_moving_average=MovingAvg + }; + Error -> + stop(Error, State) + end; + BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 -> + BufferSize = max(BufferSize0 div 2, LowDynamicBuffer), + case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of + ok -> State#v1{ + dynamic_buffer_size=BufferSize, + dynamic_buffer_moving_average=MovingAvg + }; + Error -> + stop(Error, State) + end; + true -> + State#v1{dynamic_buffer_moving_average=MovingAvg} + end. + -spec stop(_, #v1{}) -> no_return(). stop(tcp_healthcheck, State) -> %% The connection was closed before any packet was received. It's