QQ: Revise checkpointing logic

To take more frequent checkpoints for large message workload

Lower the min_checkpoint_interval substantially to allow quorum queues
better control over when checkpoints are taken.

Track bytes enqueued in the aux state and suggest a checkpoint after
every 64MB enqueued (this value is scaled according to backlog just
like the indexes condition).
This should help with more timely checkpointing when very large
messages is used.

Try evaluating byte size independently of time window

also increase max size
This commit is contained in:
Karl Nilsson 2025-03-21 14:30:16 +00:00
parent bb208858a1
commit 6695282640
4 changed files with 59 additions and 26 deletions

View File

@ -932,7 +932,7 @@ which_module(5) -> ?MODULE.
smallest_index :: undefined | ra:index(), smallest_index :: undefined | ra:index(),
messages_total :: non_neg_integer(), messages_total :: non_neg_integer(),
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(), indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
unused_1 = ?NIL}). bytes_in = 0 :: non_neg_integer()}).
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}). -record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
-record(aux, {name :: atom(), -record(aux, {name :: atom(),
capacity :: term(), capacity :: term(),
@ -943,7 +943,9 @@ which_module(5) -> ?MODULE.
gc = #aux_gc{} :: #aux_gc{}, gc = #aux_gc{} :: #aux_gc{},
tick_pid :: undefined | pid(), tick_pid :: undefined | pid(),
cache = #{} :: map(), cache = #{} :: map(),
last_checkpoint :: #checkpoint{}}). last_checkpoint :: #checkpoint{},
bytes_in = 0 :: non_neg_integer(),
bytes_out = 0 :: non_neg_integer()}).
init_aux(Name) when is_atom(Name) -> init_aux(Name) when is_atom(Name) ->
%% TODO: catch specific exception throw if table already exists %% TODO: catch specific exception throw if table already exists
@ -956,7 +958,7 @@ init_aux(Name) when is_atom(Name) ->
last_checkpoint = #checkpoint{index = 0, last_checkpoint = #checkpoint{index = 0,
timestamp = erlang:system_time(millisecond), timestamp = erlang:system_time(millisecond),
messages_total = 0, messages_total = 0,
unused_1 = ?NIL}}. bytes_in = 0}}.
handle_aux(RaftState, Tag, Cmd, #aux{name = Name, handle_aux(RaftState, Tag, Cmd, #aux{name = Name,
capacity = Cap, capacity = Cap,
@ -973,13 +975,14 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux); handle_aux(RaftState, Tag, Cmd, AuxV3, RaAux);
handle_aux(leader, cast, eval, handle_aux(leader, cast, eval,
#?AUX{last_decorators_state = LastDec, #?AUX{last_decorators_state = LastDec,
bytes_in = BytesIn,
last_checkpoint = Check0} = Aux0, last_checkpoint = Check0} = Aux0,
RaAux) -> RaAux) ->
#?STATE{cfg = #cfg{resource = QName}} = MacState = #?STATE{cfg = #cfg{resource = QName}} = MacState =
ra_aux:machine_state(RaAux), ra_aux:machine_state(RaAux),
Ts = erlang:system_time(millisecond), Ts = erlang:system_time(millisecond),
{Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, false), {Check, Effects0} = do_checkpoints(Ts, Check0, RaAux, BytesIn, false),
%% this is called after each batch of commands have been applied %% this is called after each batch of commands have been applied
%% set timer for message expire %% set timer for message expire
@ -995,11 +998,16 @@ handle_aux(leader, cast, eval,
last_decorators_state = NewLast}, RaAux, Effects} last_decorators_state = NewLast}, RaAux, Effects}
end; end;
handle_aux(_RaftState, cast, eval, handle_aux(_RaftState, cast, eval,
#?AUX{last_checkpoint = Check0} = Aux0, #?AUX{last_checkpoint = Check0,
bytes_in = BytesIn} = Aux0,
RaAux) -> RaAux) ->
Ts = erlang:system_time(millisecond), Ts = erlang:system_time(millisecond),
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, false), {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, BytesIn, false),
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects}; {no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
handle_aux(_RaftState, cast, {bytes_in, {MetaSize, BodySize}},
#?AUX{bytes_in = Bytes} = Aux0,
RaAux) ->
{no_reply, Aux0#?AUX{bytes_in = Bytes + MetaSize + BodySize}, RaAux, []};
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds, handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
consumer_key = Key} = Ret, Corr, Pid}, consumer_key = Key} = Ret, Corr, Pid},
Aux0, RaAux0) -> Aux0, RaAux0) ->
@ -1129,12 +1137,13 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
handle_aux(_, _, garbage_collection, Aux, RaAux) -> handle_aux(_, _, garbage_collection, Aux, RaAux) ->
{no_reply, force_eval_gc(RaAux, Aux), RaAux}; {no_reply, force_eval_gc(RaAux, Aux), RaAux};
handle_aux(_RaState, _, force_checkpoint, handle_aux(_RaState, _, force_checkpoint,
#?AUX{last_checkpoint = Check0} = Aux, RaAux) -> #?AUX{last_checkpoint = Check0,
bytes_in = BytesIn} = Aux, RaAux) ->
Ts = erlang:system_time(millisecond), Ts = erlang:system_time(millisecond),
#?STATE{cfg = #cfg{resource = QR}} = ra_aux:machine_state(RaAux), #?STATE{cfg = #cfg{resource = QR}} = ra_aux:machine_state(RaAux),
rabbit_log:debug("~ts: rabbit_fifo: forcing checkpoint at ~b", rabbit_log:debug("~ts: rabbit_fifo: forcing checkpoint at ~b",
[rabbit_misc:rs(QR), ra_aux:last_applied(RaAux)]), [rabbit_misc:rs(QR), ra_aux:last_applied(RaAux)]),
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true), {Check, Effects} = do_checkpoints(Ts, Check0, RaAux, BytesIn, true),
{no_reply, Aux#?AUX{last_checkpoint = Check}, RaAux, Effects}; {no_reply, Aux#?AUX{last_checkpoint = Check}, RaAux, Effects};
handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) -> handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) ->
#?STATE{dlx = DlxState, #?STATE{dlx = DlxState,
@ -1578,7 +1587,9 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
apply_enqueue(#{index := RaftIdx, apply_enqueue(#{index := RaftIdx,
system_time := Ts} = Meta, From, system_time := Ts} = Meta, From,
Seq, RawMsg, Size, State0) -> Seq, RawMsg, Size, State0) ->
case maybe_enqueue(RaftIdx, Ts, From, Seq, RawMsg, Size, [], State0) of Effects0 = [{aux, {bytes_in, Size}}],
case maybe_enqueue(RaftIdx, Ts, From, Seq, RawMsg, Size,
Effects0, State0) of
{ok, State1, Effects1} -> {ok, State1, Effects1} ->
checkout(Meta, State0, State1, Effects1); checkout(Meta, State0, State1, Effects1);
{out_of_sequence, State, Effects} -> {out_of_sequence, State, Effects} ->
@ -2918,11 +2929,12 @@ priority_tag(Msg) ->
end. end.
do_checkpoints(Ts, do_checkpoints(Ts, #checkpoint{index = ChIdx,
#checkpoint{index = ChIdx, timestamp = ChTime,
timestamp = ChTime, smallest_index = LastSmallest,
smallest_index = LastSmallest, bytes_in = LastBytesIn,
indexes = MinIndexes} = Check0, RaAux, Force) -> indexes = MinIndexes} = Check0,
RaAux, BytesIn, Force) ->
LastAppliedIdx = ra_aux:last_applied(RaAux), LastAppliedIdx = ra_aux:last_applied(RaAux),
IndexesSince = LastAppliedIdx - ChIdx, IndexesSince = LastAppliedIdx - ChIdx,
#?STATE{} = MacState = ra_aux:machine_state(RaAux), #?STATE{} = MacState = ra_aux:machine_state(RaAux),
@ -2934,21 +2946,35 @@ do_checkpoints(Ts,
Smallest Smallest
end, end,
MsgsTot = messages_total(MacState), MsgsTot = messages_total(MacState),
%% more than 64MB (by default) of message data has been written to the log
%% best take a checkpoint
{CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} = {CheckMinInterval, CheckMinIndexes, CheckMaxIndexes} =
persistent_term:get(quorum_queue_checkpoint_config, persistent_term:get(quorum_queue_checkpoint_config,
{?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES, {?CHECK_MIN_INTERVAL_MS, ?CHECK_MIN_INDEXES,
?CHECK_MAX_INDEXES}), ?CHECK_MAX_INDEXES}),
%% scale the bytes limit as the backlog increases
MaxBytesFactor = max(1, MsgsTot / CheckMaxIndexes),
EnoughDataWritten = BytesIn - LastBytesIn > (?CHECK_MAX_BYTES * MaxBytesFactor),
EnoughTimeHasPassed = TimeSince > CheckMinInterval, EnoughTimeHasPassed = TimeSince > CheckMinInterval,
%% enough time has passed and enough indexes have been committed case (EnoughTimeHasPassed andalso
case (IndexesSince > MinIndexes andalso (
EnoughTimeHasPassed) orelse %% condition 1: enough indexes have been committed since the last
%% the queue is empty and some commands have been %% checkpoint
%% applied since the last checkpoint (IndexesSince > MinIndexes) orelse
(MsgsTot == 0 andalso %% condition 2: the queue is empty and _some_ commands
IndexesSince > CheckMinIndexes andalso %% have been applied since the last checkpoint
EnoughTimeHasPassed) orelse (MsgsTot == 0 andalso IndexesSince > 32)
Force of )
) orelse
%% condition 3: enough message data has been written to warrant a new
%% checkpoint, this ignores the time windowing
EnoughDataWritten orelse
%% force was requested, e.g. after a purge
Force
of
true -> true ->
%% take fewer checkpoints the more messages there are on queue %% take fewer checkpoints the more messages there are on queue
NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes), NextIndexes = min(max(MsgsTot, CheckMinIndexes), CheckMaxIndexes),
@ -2957,6 +2983,7 @@ do_checkpoints(Ts,
timestamp = Ts, timestamp = Ts,
smallest_index = NewSmallest, smallest_index = NewSmallest,
messages_total = MsgsTot, messages_total = MsgsTot,
bytes_in = BytesIn,
indexes = NextIndexes}, indexes = NextIndexes},
[{checkpoint, LastAppliedIdx, MacState} | [{checkpoint, LastAppliedIdx, MacState} |
release_cursor(LastSmallest, NewSmallest)]}; release_cursor(LastSmallest, NewSmallest)]};

View File

@ -100,8 +100,11 @@
% represents a partially applied module call % represents a partially applied module call
-define(CHECK_MIN_INTERVAL_MS, 1000). -define(CHECK_MIN_INTERVAL_MS, 1000).
-define(CHECK_MIN_INDEXES, 4096). -define(CHECK_MIN_INDEXES, 4096 * 2).
-define(CHECK_MAX_INDEXES, 666_667). -define(CHECK_MAX_INDEXES, 666_667).
%% once these many bytes have been written since the last checkpoint
%% we request a checkpoint irrespectively
-define(CHECK_MAX_BYTES, 128_000_000).
-define(USE_AVG_HALF_LIFE, 10000.0). -define(USE_AVG_HALF_LIFE, 10000.0).
%% an average QQ without any message uses about 100KB so setting this limit %% an average QQ without any message uses about 100KB so setting this limit

View File

@ -145,8 +145,9 @@
-define(DELETE_TIMEOUT, 5000). -define(DELETE_TIMEOUT, 5000).
-define(MEMBER_CHANGE_TIMEOUT, 20_000). -define(MEMBER_CHANGE_TIMEOUT, 20_000).
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096 -define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra %% setting a low default here to allow quorum queues to better chose themselves
-define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384 %% when to take a checkpoint
-define(MIN_CHECKPOINT_INTERVAL, 64).
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000). -define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000). -define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).

View File

@ -1527,6 +1527,8 @@ gh_12635(Config) ->
publish_confirm(Ch0, QQ), publish_confirm(Ch0, QQ),
publish_confirm(Ch0, QQ), publish_confirm(Ch0, QQ),
%% a QQ will not take checkpoints more frequently than every 1s
timer:sleep(1000),
%% force a checkpoint on leader %% force a checkpoint on leader
ok = rpc:call(Server0, ra, cast_aux_command, [{RaName, Server0}, force_checkpoint]), ok = rpc:call(Server0, ra, cast_aux_command, [{RaName, Server0}, force_checkpoint]),
rabbit_ct_helpers:await_condition( rabbit_ct_helpers:await_condition(