Make empty CQ init faster in case of clean shutdown

At CQ startup variable_queue went through each seqid from 0 to
next_seq_id looking for the first message even if there were no
messages in the queue (no segment files).

In case of a clean shutdown, the value of next_seq_id is stored in the
recovery terms. The queue index can use this value to provide better
seqid bounds in the absence of segment files.

Before this patch starting an empty classic queue with next_seq_id =
100_000_000 used to take about 26 seconds. With this patch it takes
less than 1ms.
This commit is contained in:
Péter Gömöri 2025-04-30 18:22:43 +02:00
parent c458cba923
commit 150172f008
3 changed files with 61 additions and 4 deletions

View File

@ -20,7 +20,10 @@
%% queue implementation itself.
-export([pre_publish/7, flush_pre_publish_cache/2,
sync/1, needs_sync/1, flush/1,
bounds/1, next_segment_boundary/1]).
bounds/2, next_segment_boundary/1]).
%% Only used by tests
-export([bounds/1]).
%% Used to upgrade/downgrade from/to the v1 index.
-export([init_for_conversion/3]).
@ -480,7 +483,7 @@ recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin },
%% When resuming after a crash we need to double check the messages that are both
%% in the v1 and v2 index (effectively the messages below the upper bound of the
%% v1 index that are about to be written to it).
{_, V2HiSeqId, _} = bounds(State0),
{_, V2HiSeqId, _} = bounds(State0, undefined),
SkipFun = fun
(SeqId, FunState0) when SeqId < V2HiSeqId ->
case read(SeqId, SeqId + 1, FunState0) of
@ -1188,14 +1191,22 @@ flush_pre_publish_cache(TargetRamCount, State) ->
%% the test suite to pass. This can probably be made more accurate
%% in the future.
%% `bounds/1` is exported for the test suites only. It is the hint-less
%% variant: it delegates to `bounds/2` with `undefined` as the
%% next_seq_id hint.
-spec bounds(State) ->
    {non_neg_integer(), non_neg_integer(), State}
    when State::state().

bounds(State) ->
    NoHint = undefined,
    bounds(State, NoHint).
bounds(State = #qi{ segments = Segments }) ->
-spec bounds(State, non_neg_integer() | undefined) ->
{non_neg_integer(), non_neg_integer(), State}
when State::state().
bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
?DEBUG("~0p", [State]),
%% We must special case when we are empty to make tests happy.
if
Segments =:= #{} andalso is_integer(NextSeqIdHint) ->
{NextSeqIdHint, NextSeqIdHint, State};
Segments =:= #{} ->
{0, 0, State};
true ->

View File

@ -1172,7 +1172,13 @@ expand_delta(_SeqId, #delta { count = Count,
init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms,
PersistentClient, TransientClient, VHost) ->
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState),
NextSeqIdHint =
case Terms of
non_clean_shutdown -> undefined;
_ -> proplists:get_value(next_seq_id, Terms)
end,
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState, NextSeqIdHint),
{NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} =
case Terms of

View File

@ -29,6 +29,7 @@
variable_queue_dropfetchwhile,
variable_queue_dropwhile_restart,
variable_queue_dropwhile_sync_restart,
variable_queue_restart_large_seq_id,
variable_queue_ack_limiting,
variable_queue_purge,
variable_queue_requeue,
@ -1421,6 +1422,45 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
VQ5.
%% Test case entry point: runs variable_queue_restart_large_seq_id1/1
%% on broker node 0 via rpc and asserts that it returns `passed`.
variable_queue_restart_large_seq_id(Config) ->
    RpcArgs = [Config],
    passed = rabbit_ct_broker_helpers:rpc(
               Config, 0,
               ?MODULE, variable_queue_restart_large_seq_id1, RpcArgs).
%% Runs the actual test body against a fresh variable queue of the
%% type configured under `variable_queue_type`.
variable_queue_restart_large_seq_id1(Config) ->
    QueueType = ?config(variable_queue_type, Config),
    TestFun = fun variable_queue_restart_large_seq_id2/2,
    with_fresh_variable_queue(TestFun, QueueType).
%% Checks that initialising an empty queue stays fast even when the
%% terms passed to init carry a very large next_seq_id.
%% Returns the re-initialised queue state.
variable_queue_restart_large_seq_id2(VQ0, QName) ->
    Count = 1,
    %% One message is published, fetched and acked, so the queue
    %% must be empty afterwards.
    VQ1 = publish_fetch_and_ack(Count, 0, VQ0),
    true = rabbit_variable_queue:is_empty(VQ1),
    _VQ2 = rabbit_variable_queue:terminate(shutdown, VQ1),
    %% The terms read for QName must carry next_seq_id =:= Count.
    Terms = variable_queue_read_terms(QName),
    Count = proplists:get_value(next_seq_id, Terms),
    %% Swap in a huge next_seq_id, as if 100M messages had already
    %% been published and consumed.
    HugeSeqIdEntry = {next_seq_id, 100_000_000},
    Terms2 = lists:keyreplace(next_seq_id, 1, Terms, HugeSeqIdEntry),
    InitFun =
        fun() ->
                variable_queue_init(test_amqqueue(QName, true), Terms2)
        end,
    {ElapsedMs, VQ3} = timer:tc(InitFun, millisecond),
    %% Init is expected to take only a few milliseconds; the 100ms
    %% limit leaves headroom so the test does not flake on slow
    %% servers. The elapsed time is kept in the matched tuple so that
    %% it shows up in the badmatch on failure.
    {true, _} = {ElapsedMs < 100, ElapsedMs},
    %% The re-initialised queue must still be empty.
    true = rabbit_variable_queue:is_empty(VQ3),
    VQ3.
%% Test case entry point: runs variable_queue_ack_limiting1/1 on
%% broker node 0 via rpc and asserts that it returns `passed`.
variable_queue_ack_limiting(Config) ->
    RpcArgs = [Config],
    passed = rabbit_ct_broker_helpers:rpc(
               Config, 0,
               ?MODULE, variable_queue_ack_limiting1, RpcArgs).