Merge pull request #2973 from luos/mirror-sync-reduce-memory-use
Classic queue mirrors should also continue pushing messages to disk when syncing
(cherry picked from commit a728662fd7)
This commit is contained in:
parent
55176066cf
commit
d62ebea692
|
|
@ -385,8 +385,13 @@ handle_info({bump_credit, Msg}, State) ->
|
||||||
credit_flow:handle_bump_msg(Msg),
|
credit_flow:handle_bump_msg(Msg),
|
||||||
noreply(State);
|
noreply(State);
|
||||||
|
|
||||||
handle_info(bump_reduce_memory_use, State) ->
|
handle_info(bump_reduce_memory_use, State = #state{backing_queue = BQ,
|
||||||
noreply(State);
|
backing_queue_state = BQS}) ->
|
||||||
|
BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS),
|
||||||
|
BQS2 = BQ:resume(BQS1),
|
||||||
|
noreply(State#state{
|
||||||
|
backing_queue_state = BQS2
|
||||||
|
});
|
||||||
|
|
||||||
%% In the event of a short partition during sync we can detect the
|
%% In the event of a short partition during sync we can detect the
|
||||||
%% master's 'death', drop out of sync, and then receive sync messages
|
%% master's 'death', drop out of sync, and then receive sync messages
|
||||||
|
|
|
||||||
|
|
@ -288,6 +288,9 @@ wait_for_credit(SPids) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
wait_for_resources(Ref, SPids) ->
|
wait_for_resources(Ref, SPids) ->
|
||||||
|
erlang:garbage_collect(),
|
||||||
|
% Probably bump_reduce_memory_use messages should be handled here as well,
|
||||||
|
% otherwise the BQ is not pushing messages to disk
|
||||||
receive
|
receive
|
||||||
{conserve_resources, memory, false} ->
|
{conserve_resources, memory, false} ->
|
||||||
SPids;
|
SPids;
|
||||||
|
|
@ -367,7 +370,11 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
|
||||||
%% If the master throws an exception
|
%% If the master throws an exception
|
||||||
{'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
|
{'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
|
||||||
BQ:delete_and_terminate(Reason, BQS),
|
BQ:delete_and_terminate(Reason, BQS),
|
||||||
{stop, Reason, {[], TRef, undefined}}
|
{stop, Reason, {[], TRef, undefined}};
|
||||||
|
bump_reduce_memory_use ->
|
||||||
|
BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS),
|
||||||
|
BQS2 = BQ:resume(BQS1),
|
||||||
|
slave_sync_loop(Args, {MA, TRef, BQS2})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% We are partitioning messages by the Unacked element in the tuple.
|
%% We are partitioning messages by the Unacked element in the tuple.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue