From e31131dca1edcad6cf43559684f76a7fb673861e Mon Sep 17 00:00:00 2001 From: ANycz Date: Wed, 11 Dec 2019 21:35:53 +0100 Subject: [PATCH] make garbage collector threshold configurable --- deps/rabbit_common/src/rabbit_writer.erl | 74 +++++++++++++++++------- 1 file changed, 54 insertions(+), 20 deletions(-) diff --git a/deps/rabbit_common/src/rabbit_writer.erl b/deps/rabbit_common/src/rabbit_writer.erl index 7b8757d9bb..3913c53688 100644 --- a/deps/rabbit_common/src/rabbit_writer.erl +++ b/deps/rabbit_common/src/rabbit_writer.erl @@ -36,7 +36,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/6, start_link/6, start/7, start_link/7]). +-export([start/6, start_link/6, start/7, start_link/7, start/8, start_link/8]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -46,7 +46,7 @@ send_command_flow/2, send_command_flow/3, flush/1]). -export([internal_send_command/4, internal_send_command/6]). --export([msg_size/1, maybe_gc_large_msg/1]). +-export([msg_size/1, maybe_gc_large_msg/1, maybe_gc_large_msg/2]). %% internal -export([enter_mainloop/2, mainloop/2, mainloop1/2]). @@ -66,10 +66,13 @@ stats_timer, %% data pending delivery (between socket %% flushes) - pending + pending, + %% defines how ofter gc will be executed + gc_threshold }). -define(HIBERNATE_AFTER, 5000). +-define(GC_THRESHOLD, 1000000). %%--------------------------------------------------------------------------- @@ -93,6 +96,16 @@ non_neg_integer(), rabbit_types:protocol(), pid(), rabbit_types:proc_name(), boolean()) -> rabbit_types:ok(pid()). +-spec start + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name(), boolean(), undefined|non_neg_integer()) -> + rabbit_types:ok(pid()). +-spec start_link + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), rabbit_types:protocol(), pid(), + rabbit_types:proc_name(), boolean(), undefined|non_neg_integer()) -> + rabbit_types:ok(pid()). -spec system_code_change(_,_,_,_) -> {'ok',_}. -spec system_continue(_,_,#wstate{}) -> any(). @@ -132,6 +145,9 @@ -spec maybe_gc_large_msg (rabbit_types:content() | rabbit_types:message()) -> non_neg_integer(). +-spec maybe_gc_large_msg + (rabbit_types:content() | rabbit_types:message(), + undefined | non_neg_integer()) -> undefined | non_neg_integer(). %%--------------------------------------------------------------------------- @@ -143,26 +159,38 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) -> start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, ReaderWantsStats) -> - State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, - ReaderWantsStats), - {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}. + start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + ReaderWantsStats, ?GC_THRESHOLD). start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, ReaderWantsStats) -> + start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + ReaderWantsStats, ?GC_THRESHOLD). + +start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + ReaderWantsStats, GCThreshold) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, - ReaderWantsStats), + ReaderWantsStats, GCThreshold), + {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}. + +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, + ReaderWantsStats, GCThreshold) -> + State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, + ReaderWantsStats, GCThreshold), {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}. -initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> +initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats + ,GCThreshold) -> (case ReaderWantsStats of true -> fun rabbit_event:init_stats_timer/2; false -> fun rabbit_event:init_disabled_stats_timer/2 - end)(#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol, - reader = ReaderPid, - pending = []}, + end)(#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol, + reader = ReaderPid, + pending = [], + gc_threshold = GCThreshold}, #wstate.stats_timer). system_continue(Parent, Deb, State) -> @@ -335,13 +363,14 @@ internal_send_command_async(MethodRecord, maybe_flush(State#wstate{pending = [Frame | Pending]}). internal_send_command_async(MethodRecord, Content, - State = #wstate{channel = Channel, - frame_max = FrameMax, - protocol = Protocol, - pending = Pending}) -> + State = #wstate{channel = Channel, + frame_max = FrameMax, + protocol = Protocol, + pending = Pending, + gc_threshold = GCThreshold}) -> Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol), - maybe_gc_large_msg(Content), + maybe_gc_large_msg(Content, GCThreshold), maybe_flush(State#wstate{pending = [Frames | Pending]}). %% When the amount of protocol method data buffered exceeds @@ -395,13 +424,18 @@ port_cmd(Sock, Data) -> %% message is 1MB then that's ugly). So count how many bytes of %% message we have processed, and force a GC every so often. maybe_gc_large_msg(Content) -> + maybe_gc_large_msg(Content, ?GC_THRESHOLD). + +maybe_gc_large_msg(_Content, undefined) -> + undefined; +maybe_gc_large_msg(Content, GCThreshold) -> Size = msg_size(Content), Current = case get(msg_size_for_gc) of undefined -> 0; C -> C end, New = Current + Size, - put(msg_size_for_gc, case New > 1000000 of + put(msg_size_for_gc, case New > GCThreshold of true -> erlang:garbage_collect(), 0; false -> New