Configure memory reduction for all queue types

Diana Corbacho 2016-12-22 16:43:19 +01:00
parent 58dd55b008
commit 70fdf59cb5
2 changed files with 30 additions and 23 deletions

View File

@@ -104,7 +104,8 @@ define PROJECT_ENV
 	    {passphrase, undefined}
 	   ]},
 	  %% rabbitmq-server-973
-	  {lazy_queue_explicit_gc_run_operation_threshold, 250},
+	  {queue_explicit_gc_run_operation_threshold, 1000},
+	  {lazy_queue_explicit_gc_run_operation_threshold, 1000},
 	  {background_gc_enabled, true},
 	  {background_gc_target_interval, 60000}
 	]
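
Both keys introduced above can be overridden per node through the application environment. A minimal sketch, assuming the classic Erlang-terms rabbitmq.config format; the values are illustrative, not recommendations:

%% rabbitmq.config (sketch)
[
 {rabbit, [
   %% operations a default-mode queue may perform before it explicitly
   %% runs its memory-reduction pass
   {queue_explicit_gc_run_operation_threshold, 1000},
   %% the same threshold for lazy queues
   {lazy_queue_explicit_gc_run_operation_threshold, 1000}
 ]}
].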

View File

@@ -434,14 +434,20 @@
 %% we define the garbage collector threshold
-%% it needs to tune the GC calls inside `reduce_memory_use`
-%% see: rabbitmq-server-973 and `maybe_execute_gc` function
--define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250).
--define(EXPLICIT_GC_RUN_OP_THRESHOLD,
+%% it needs to tune the `reduce_memory_use` calls. Thus, the garbage collection.
+%% see: rabbitmq-server-973 and rabbitmq-server-964
+-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 1000).
+-define(EXPLICIT_GC_RUN_OP_THRESHOLD(Mode),
         case get(explicit_gc_run_operation_threshold) of
             undefined ->
-                Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold,
-                                          ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD),
+                Val = case Mode of
+                          lazy -> rabbit_misc:get_env(rabbit,
+                                      lazy_queue_explicit_gc_run_operation_threshold,
+                                      ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD);
+                          _ -> rabbit_misc:get_env(rabbit,
+                                      queue_explicit_gc_run_operation_threshold,
+                                      ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD)
+                      end,
                 put(explicit_gc_run_operation_threshold, Val),
                 Val;
             Val       -> Val
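
The macro is now parameterised on the queue mode: it picks the lazy or default application key, falls back to the compiled-in default, and memoises the result in the process dictionary. A standalone sketch of that per-mode lookup (hypothetical module, using plain application:get_env/3 in place of rabbit_misc:get_env/3):

-module(gc_threshold_sketch).
-export([for_mode/1]).

-define(DEFAULT_THRESHOLD, 1000).

%% Return the explicit-GC run threshold for a queue mode, falling back to
%% the compiled-in default when the application key is unset.
for_mode(lazy) ->
    application:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold,
                        ?DEFAULT_THRESHOLD);
for_mode(_DefaultMode) ->
    application:get_env(rabbit, queue_explicit_gc_run_operation_threshold,
                        ?DEFAULT_THRESHOLD).

For example, gc_threshold_sketch:for_mode(lazy) returns 1000 unless the lazy key is set in the rabbit application environment.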
@@ -586,27 +592,27 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
         publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
                  fun maybe_write_to_disk/4,
                  State),
-    a(reduce_memory_use(maybe_update_rates(State1))).
+    a(maybe_reduce_memory_use(maybe_update_rates(State1))).

 batch_publish(Publishes, ChPid, Flow, State) ->
     {ChPid, Flow, State1} =
         lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes),
     State2 = ui(State1),
-    a(reduce_memory_use(maybe_update_rates(State2))).
+    a(maybe_reduce_memory_use(maybe_update_rates(State2))).

 publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
     {SeqId, State1} =
         publish_delivered1(Msg, MsgProps, ChPid, Flow,
                            fun maybe_write_to_disk/4,
                            State),
-    {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}.
+    {SeqId, a(maybe_reduce_memory_use(maybe_update_rates(State1)))}.

 batch_publish_delivered(Publishes, ChPid, Flow, State) ->
     {ChPid, Flow, SeqIds, State1} =
         lists:foldl(fun batch_publish_delivered1/2,
                     {ChPid, Flow, [], State}, Publishes),
     State2 = ui(State1),
-    {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}.
+    {lists:reverse(SeqIds), a(maybe_reduce_memory_use(maybe_update_rates(State2)))}.

 discard(_MsgId, _ChPid, _Flow, State) -> State.
@@ -710,7 +716,7 @@ requeue(AckTags, #vqstate { mode = default,
     {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
                                             State2),
     MsgCount = length(MsgIds2),
-    {MsgIds2, a(reduce_memory_use(
+    {MsgIds2, a(maybe_reduce_memory_use(
                  maybe_update_rates(ui(
                    State3 #vqstate { delta = Delta1,
                                      q3 = Q3a,
@@ -728,7 +734,7 @@ requeue(AckTags, #vqstate { mode = lazy,
     {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds,
                                             State1),
     MsgCount = length(MsgIds1),
-    {MsgIds1, a(reduce_memory_use(
+    {MsgIds1, a(maybe_reduce_memory_use(
                  maybe_update_rates(ui(
                    State2 #vqstate { delta = Delta1,
                                      q3 = Q3a,
@@ -778,7 +784,7 @@ set_ram_duration_target(
           (TargetRamCount =/= infinity andalso
            TargetRamCount1 >= TargetRamCount) of
           true  -> State1;
-          false -> reduce_memory_use(State1)
+          false -> maybe_reduce_memory_use(State1)
       end).

 maybe_update_rates(State = #vqstate{ in_counter = InCount,
@@ -860,7 +866,7 @@ timeout(State = #vqstate { index_state = IndexState }) ->
 handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
     State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.

-resume(State) -> a(reduce_memory_use(State)).
+resume(State) -> a(maybe_reduce_memory_use(State)).

 msg_rates(#vqstate { rates = #rates { in  = AvgIngressRate,
                                       out = AvgEgressRate } }) ->
@@ -2310,12 +2316,12 @@ ifold(Fun, Acc, Its, State) ->
 %% Phase changes
 %%----------------------------------------------------------------------------

-maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) ->
-    case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of
-        true -> garbage_collect(),
-                State#vqstate{memory_reduction_run_count = 0};
+maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCount,
+                                          mode = Mode}) ->
+    case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of
+        true -> State1 = reduce_memory_use(State),
+                State1#vqstate{memory_reduction_run_count = 0};
         false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
     end.

 reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
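
The renamed maybe_reduce_memory_use/1 keeps the counting scheme: every call bumps a per-queue run counter, and only when the counter reaches the mode-specific threshold does the expensive reduce_memory_use/1 pass run and the counter reset. A generic, standalone sketch of that rate-limiting pattern (illustrative, not the module's code):

-module(every_n_sketch).
-export([maybe_run/3]).

%% Run Fun only once every Threshold calls.
%% maybe_run(RunCount, Threshold, Fun) -> {NewRunCount, ran | skipped}
maybe_run(RunCount, Threshold, Fun) when RunCount >= Threshold ->
    Fun(),
    {0, ran};
maybe_run(RunCount, _Threshold, _Fun) ->
    {RunCount + 1, skipped}.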
@@ -2330,7 +2336,6 @@ reduce_memory_use(State = #vqstate {
                      out     = AvgEgress,
                      ack_in  = AvgAckIngress,
                      ack_out = AvgAckEgress } }) ->
-
     State1 = #vqstate { q2 = Q2, q3 = Q3 } =
         case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
             0  -> State;
@@ -2390,7 +2395,8 @@ reduce_memory_use(State = #vqstate {
             S2 ->
                 push_betas_to_deltas(S2, State1)
         end,
-    maybe_execute_gc(State3).
+    garbage_collect(),
+    State3.

 limit_ram_acks(0, State) ->
     {0, ui(State)};
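
The unconditional erlang:garbage_collect/0 call now sits at the end of reduce_memory_use/1 itself, so a full sweep of the queue process runs whenever that pass runs (for queues with a finite target_ram_count), while maybe_reduce_memory_use/1 above limits how often the pass is entered. A tiny standalone illustration (not from the commit) of what that BIF does to the calling process:

-module(gc_demo).
-export([run/0]).

run() ->
    make_garbage(),
    {memory, Before} = erlang:process_info(self(), memory),
    true = erlang:garbage_collect(),   %% same BIF as above: force a full sweep now
    {memory, After} = erlang:process_info(self(), memory),
    {bytes_before_gc, Before, bytes_after_gc, After}.

%% Build throw-away terms so the collector has something to reclaim.
make_garbage() ->
    _ = [lists:seq(1, 1000) || _ <- lists:seq(1, 100)],
    ok.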