Merge pull request #6467 from rabbitmq/cmq-optimisations
Classic Queue Mirroring: reduce memory usage during dead-lettering of many messages
This commit is contained in:
commit
e163ab4473
|
|
@ -104,7 +104,7 @@ group_by_queue_and_reason(Tables) ->
|
|||
ensure_xdeath_event_count(Augmented, N),
|
||||
Key, SeenKeys, Acc),
|
||||
{sets:add_element(Key, SeenKeys), Acc1}
|
||||
end, {sets:new(), []}, Tables),
|
||||
end, {sets:new([{version, 2}]), []}, Tables),
|
||||
Grouped.
|
||||
|
||||
update_x_death_header(Info, undefined) ->
|
||||
|
|
|
|||
|
|
@ -122,7 +122,7 @@ init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) ->
|
|||
backing_queue_state = BQS,
|
||||
seen_status = #{},
|
||||
confirmed = [],
|
||||
known_senders = sets:new(),
|
||||
known_senders = sets:new([{version, 2}]),
|
||||
wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000)};
|
||||
{error, Reason} ->
|
||||
%% The GM can shutdown before the coordinator has started up
|
||||
|
|
|
|||
|
|
@ -18,10 +18,10 @@
|
|||
-export([set_maximum_since_use/2, info/1, go/2]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
|
||||
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
|
||||
code_change/3, handle_pre_hibernate/1, format_message_queue/2]).
|
||||
|
||||
-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]).
|
||||
-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2,
|
||||
prioritise_cast/3, prioritise_info/3]).
|
||||
|
||||
-behaviour(gen_server2).
|
||||
-behaviour(gm).
|
||||
|
|
@ -66,6 +66,19 @@
|
|||
set_maximum_since_use(QPid, Age) ->
|
||||
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
|
||||
|
||||
|
||||
prioritise_cast(Msg, _Len, _State) ->
|
||||
case Msg of
|
||||
{run_backing_queue, _Mod, _Fun} -> 6;
|
||||
_ -> 0
|
||||
end.
|
||||
|
||||
prioritise_info(Msg, _Len, _State) ->
|
||||
case Msg of
|
||||
sync_timeout -> 6;
|
||||
_ -> 0
|
||||
end.
|
||||
|
||||
info(QPid) -> gen_server2:call(QPid, info, infinity).
|
||||
|
||||
init(Q) when ?is_amqqueue(Q) ->
|
||||
|
|
@ -446,29 +459,6 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
|
|||
BQS3 = BQ:handle_pre_hibernate(BQS2),
|
||||
{hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
|
||||
|
||||
prioritise_call(Msg, _From, _Len, _State) ->
|
||||
case Msg of
|
||||
info -> 9;
|
||||
{gm_deaths, _Dead} -> 5;
|
||||
_ -> 0
|
||||
end.
|
||||
|
||||
prioritise_cast(Msg, _Len, _State) ->
|
||||
case Msg of
|
||||
{set_ram_duration_target, _Duration} -> 8;
|
||||
{set_maximum_since_use, _Age} -> 8;
|
||||
{run_backing_queue, _Mod, _Fun} -> 6;
|
||||
{gm, _Msg} -> 5;
|
||||
_ -> 0
|
||||
end.
|
||||
|
||||
prioritise_info(Msg, _Len, _State) ->
|
||||
case Msg of
|
||||
update_ram_duration -> 8;
|
||||
sync_timeout -> 6;
|
||||
_ -> 0
|
||||
end.
|
||||
|
||||
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
|
||||
|
||||
%% ---------------------------------------------------------------------------
|
||||
|
|
@ -878,7 +868,7 @@ maybe_enqueue_message(
|
|||
|
||||
get_sender_queue(ChPid, SQ) ->
|
||||
case maps:find(ChPid, SQ) of
|
||||
error -> {queue:new(), sets:new(), running};
|
||||
error -> {queue:new(), sets:new([{version, 2}]), running};
|
||||
{ok, Val} -> Val
|
||||
end.
|
||||
|
||||
|
|
|
|||
|
|
@ -1326,10 +1326,8 @@ find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
|
|||
function_exported_or_default(Mod, Fun, Arity, Default) ->
|
||||
case erlang:function_exported(Mod, Fun, Arity) of
|
||||
true -> case Arity of
|
||||
3 -> fun (Msg, GS2State = #gs2_state { queue = Queue,
|
||||
state = State }) ->
|
||||
Length = priority_queue:len(Queue),
|
||||
case catch Mod:Fun(Msg, Length, State) of
|
||||
3 -> fun (Msg, GS2State = #gs2_state { state = State }) ->
|
||||
case catch Mod:Fun(Msg, 0, State) of
|
||||
drop ->
|
||||
drop;
|
||||
Res when is_integer(Res) ->
|
||||
|
|
@ -1338,10 +1336,8 @@ function_exported_or_default(Mod, Fun, Arity, Default) ->
|
|||
handle_common_termination(Err, Msg, GS2State)
|
||||
end
|
||||
end;
|
||||
4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue,
|
||||
state = State }) ->
|
||||
Length = priority_queue:len(Queue),
|
||||
case catch Mod:Fun(Msg, From, Length, State) of
|
||||
4 -> fun (Msg, From, GS2State = #gs2_state { state = State }) ->
|
||||
case catch Mod:Fun(Msg, From, 0, State) of
|
||||
Res when is_integer(Res) ->
|
||||
Res;
|
||||
Err ->
|
||||
|
|
|
|||
Loading…
Reference in New Issue