make garbage collector threshold configurable

This commit is contained in:
ANycz 2019-12-11 23:11:15 +01:00
parent 178757932e
commit 3daa6be55c
3 changed files with 24 additions and 10 deletions

View File

@ -129,7 +129,9 @@ define PROJECT_ENV
%% interval at which the channel can perform periodic actions
{channel_tick_interval, 60000},
%% Default max message size is 128 MB
{max_message_size, 134217728}
{max_message_size, 134217728},
%% Default is ~ 1MB
{gc_threshold, 1000000}
]
endef

View File

@ -22,7 +22,8 @@
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
header_routes/1, parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
-export([build_content/2, from_content/1, msg_size/1,
maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
-export([add_header/4]).
%%----------------------------------------------------------------------------
@ -311,6 +312,11 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
maybe_gc_large_msg(Content) ->
rabbit_writer:maybe_gc_large_msg(Content).
maybe_gc_large_msg(Content, undefined) ->
rabbit_writer:msg_size(Content);
maybe_gc_large_msg(Content, GCThreshold) ->
rabbit_writer:maybe_gc_large_msg(Content, GCThreshold).
msg_size(Content) ->
rabbit_writer:msg_size(Content).

View File

@ -169,7 +169,9 @@
delivery_flow,
interceptor_state,
queue_states,
tick_timer
tick_timer,
%% defines how ofter gc will be executed
gc_threshold
}).
-define(QUEUE, lqueue).
@ -508,6 +510,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
MaxMessageSize = get_max_message_size(),
ConsumerTimeout = get_consumer_timeout(),
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
{ok, GCThreshold} = application:get_env(rabbit, gc_threshold),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
@ -543,7 +546,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = #{}
queue_states = #{},
gc_threshold = GCThreshold
},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
@ -1110,8 +1114,8 @@ extract_variable_map_from_amqp_params([Value]) ->
extract_variable_map_from_amqp_params(_) ->
#{}.
check_msg_size(Content, MaxMessageSize) ->
Size = rabbit_basic:maybe_gc_large_msg(Content),
check_msg_size(Content, MaxMessageSize, GCThreshold) ->
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
case Size of
S when S > MaxMessageSize ->
ErrorMessage = case MaxMessageSize of
@ -1309,9 +1313,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
},
tx = Tx,
confirm_enabled = ConfirmEnabled,
delivery_flow = Flow
delivery_flow = Flow,
gc_threshold = GCThreshold
}) ->
check_msg_size(Content, MaxMessageSize),
check_msg_size(Content, MaxMessageSize, GCThreshold),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User, AuthzContext),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@ -2689,7 +2694,8 @@ handle_deliver(ConsumerTag, AckRequired,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}},
State = #ch{cfg = #conf{writer_pid = WriterPid},
next_tag = DeliveryTag}) ->
next_tag = DeliveryTag,
gc_threshold = GCThreshold}) ->
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,
redelivered = Redelivered,
@ -2702,7 +2708,7 @@ handle_deliver(ConsumerTag, AckRequired,
false ->
ok = rabbit_writer:send_command(WriterPid, Deliver, Content)
end,
rabbit_basic:maybe_gc_large_msg(Content),
rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
record_sent(deliver, ConsumerTag, AckRequired, Msg, State).
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,