diff --git a/deps/rabbit/src/rabbit_mirror_queue_slave.erl b/deps/rabbit/src/rabbit_mirror_queue_slave.erl index d0e2ceb9db..f8beb77367 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_slave.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_slave.erl @@ -385,8 +385,13 @@ handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), noreply(State); -handle_info(bump_reduce_memory_use, State) -> - noreply(State); +handle_info(bump_reduce_memory_use, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS), + BQS2 = BQ:resume(BQS1), + noreply(State#state{ + backing_queue_state = BQS2 + }); %% In the event of a short partition during sync we can detect the %% master's 'death', drop out of sync, and then receive sync messages diff --git a/deps/rabbit/src/rabbit_mirror_queue_sync.erl b/deps/rabbit/src/rabbit_mirror_queue_sync.erl index 81f008bcef..896bdd5c61 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_sync.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_sync.erl @@ -288,6 +288,9 @@ wait_for_credit(SPids) -> end. wait_for_resources(Ref, SPids) -> + erlang:garbage_collect(), + % Probably bump_reduce_memory_use messages should be handled here as well, + % otherwise the BQ is not pushing messages to disk receive {conserve_resources, memory, false} -> SPids; @@ -367,7 +370,11 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, %% If the master throws an exception {'$gen_cast', {gm, {delete_and_terminate, Reason}}} -> BQ:delete_and_terminate(Reason, BQS), - {stop, Reason, {[], TRef, undefined}} + {stop, Reason, {[], TRef, undefined}}; + bump_reduce_memory_use -> + BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS), + BQS2 = BQ:resume(BQS1), + slave_sync_loop(Args, {MA, TRef, BQS2}) end. %% We are partitioning messages by the Unacked element in the tuple.