Merge branch 'configurable_gc_threshold' of https://github.com/nyczol/rabbitmq-common into nyczol-configurable_gc_threshold

This commit is contained in:
Michael Klishin 2019-12-20 03:34:11 +03:00
commit d5f67e1f9f
1 changed files with 54 additions and 20 deletions

View File

@ -36,7 +36,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
-export([start/6, start_link/6, start/7, start_link/7]).
-export([start/6, start_link/6, start/7, start_link/7, start/8, start_link/8]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@ -46,7 +46,7 @@
send_command_flow/2, send_command_flow/3,
flush/1]).
-export([internal_send_command/4, internal_send_command/6]).
-export([msg_size/1, maybe_gc_large_msg/1]).
-export([msg_size/1, maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
%% internal
-export([enter_mainloop/2, mainloop/2, mainloop1/2]).
@ -66,10 +66,13 @@
stats_timer,
%% data pending delivery (between socket
%% flushes)
pending
pending,
%% defines how ofter gc will be executed
gc_threshold
}).
-define(HIBERNATE_AFTER, 5000).
-define(GC_THRESHOLD, 1000000).
%%---------------------------------------------------------------------------
@ -93,6 +96,16 @@
non_neg_integer(), rabbit_types:protocol(), pid(),
rabbit_types:proc_name(), boolean()) ->
rabbit_types:ok(pid()).
-spec start
(rabbit_net:socket(), rabbit_channel:channel_number(),
non_neg_integer(), rabbit_types:protocol(), pid(),
rabbit_types:proc_name(), boolean(), undefined|non_neg_integer()) ->
rabbit_types:ok(pid()).
-spec start_link
(rabbit_net:socket(), rabbit_channel:channel_number(),
non_neg_integer(), rabbit_types:protocol(), pid(),
rabbit_types:proc_name(), boolean(), undefined|non_neg_integer()) ->
rabbit_types:ok(pid()).
-spec system_code_change(_,_,_,_) -> {'ok',_}.
-spec system_continue(_,_,#wstate{}) -> any().
@ -132,6 +145,9 @@
-spec maybe_gc_large_msg
(rabbit_types:content() | rabbit_types:message()) -> non_neg_integer().
-spec maybe_gc_large_msg
(rabbit_types:content() | rabbit_types:message(),
undefined | non_neg_integer()) -> undefined | non_neg_integer().
%%---------------------------------------------------------------------------
@ -143,26 +159,38 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity) ->
start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
{ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}.
start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats, ?GC_THRESHOLD).
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats) ->
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats, ?GC_THRESHOLD).
start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats, GCThreshold) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
ReaderWantsStats, GCThreshold),
{ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}.
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats, GCThreshold) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats, GCThreshold),
{ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}.
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats
,GCThreshold) ->
(case ReaderWantsStats of
true -> fun rabbit_event:init_stats_timer/2;
false -> fun rabbit_event:init_disabled_stats_timer/2
end)(#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol,
reader = ReaderPid,
pending = []},
end)(#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol,
reader = ReaderPid,
pending = [],
gc_threshold = GCThreshold},
#wstate.stats_timer).
system_continue(Parent, Deb, State) ->
@ -335,13 +363,14 @@ internal_send_command_async(MethodRecord,
maybe_flush(State#wstate{pending = [Frame | Pending]}).
internal_send_command_async(MethodRecord, Content,
State = #wstate{channel = Channel,
frame_max = FrameMax,
protocol = Protocol,
pending = Pending}) ->
State = #wstate{channel = Channel,
frame_max = FrameMax,
protocol = Protocol,
pending = Pending,
gc_threshold = GCThreshold}) ->
Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax,
Protocol),
maybe_gc_large_msg(Content),
maybe_gc_large_msg(Content, GCThreshold),
maybe_flush(State#wstate{pending = [Frames | Pending]}).
%% When the amount of protocol method data buffered exceeds
@ -395,13 +424,18 @@ port_cmd(Sock, Data) ->
%% message is 1MB then that's ugly). So count how many bytes of
%% message we have processed, and force a GC every so often.
maybe_gc_large_msg(Content) ->
maybe_gc_large_msg(Content, ?GC_THRESHOLD).
maybe_gc_large_msg(_Content, undefined) ->
undefined;
maybe_gc_large_msg(Content, GCThreshold) ->
Size = msg_size(Content),
Current = case get(msg_size_for_gc) of
undefined -> 0;
C -> C
end,
New = Current + Size,
put(msg_size_for_gc, case New > 1000000 of
put(msg_size_for_gc, case New > GCThreshold of
true -> erlang:garbage_collect(),
0;
false -> New