Commit 4f8c128081 by Péter Gömöri, 2025-05-07 17:59:51 +00:00 (committed via GitHub).
No known key found for this signature in database; GPG Key ID: B5690EEEBB952194.
3 changed files with 61 additions and 4 deletions.

View File

@ -20,7 +20,10 @@
%% queue implementation itself.
-export([pre_publish/7, flush_pre_publish_cache/2,
sync/1, needs_sync/1, flush/1,
bounds/1, next_segment_boundary/1]).
bounds/2, next_segment_boundary/1]).
%% Only used by tests
-export([bounds/1]).
%% Used to upgrade/downgrade from/to the v1 index.
-export([init_for_conversion/3]).
@ -480,7 +483,7 @@ recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin },
%% When resuming after a crash we need to double check the messages that are both
%% in the v1 and v2 index (effectively the messages below the upper bound of the
%% v1 index that are about to be written to it).
{_, V2HiSeqId, _} = bounds(State0),
{_, V2HiSeqId, _} = bounds(State0, undefined),
SkipFun = fun
(SeqId, FunState0) when SeqId < V2HiSeqId ->
case read(SeqId, SeqId + 1, FunState0) of
@ -1188,14 +1191,22 @@ flush_pre_publish_cache(TargetRamCount, State) ->
%% the test suite to pass. This can probably be made more accurate
%% in the future.
%% `bounds/1` is only used by tests
-spec bounds(State) ->
{non_neg_integer(), non_neg_integer(), State}
when State::state().
%% Backwards-compatible wrapper around bounds/2: passes 'undefined' as
%% the next-seq-id hint, so an empty index reports {0, 0, State} as
%% before. Kept exported only for the test suite (see export list).
bounds(State) ->
bounds(State, undefined).
bounds(State = #qi{ segments = Segments }) ->
-spec bounds(State, non_neg_integer() | undefined) ->
{non_neg_integer(), non_neg_integer(), State}
when State::state().
bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
?DEBUG("~0p", [State]),
%% We must special case when we are empty to make tests happy.
if
Segments =:= #{} andalso is_integer(NextSeqIdHint) ->
{NextSeqIdHint, NextSeqIdHint, State};
Segments =:= #{} ->
{0, 0, State};
true ->

View File

@ -1172,7 +1172,13 @@ expand_delta(_SeqId, #delta { count = Count,
init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms,
PersistentClient, TransientClient, VHost) ->
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState),
NextSeqIdHint =
case Terms of
non_clean_shutdown -> undefined;
_ -> proplists:get_value(next_seq_id, Terms)
end,
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState, NextSeqIdHint),
{NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} =
case Terms of

View File

@ -29,6 +29,7 @@
variable_queue_dropfetchwhile,
variable_queue_dropwhile_restart,
variable_queue_dropwhile_sync_restart,
variable_queue_restart_large_seq_id,
variable_queue_ack_limiting,
variable_queue_purge,
variable_queue_requeue,
@ -1421,6 +1422,45 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
VQ5.
%% CT entry point: runs the large-next_seq_id restart test on broker
%% node 0 via RPC and asserts it returns 'passed'.
variable_queue_restart_large_seq_id(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_restart_large_seq_id1, [Config]).
%% Node-local wrapper: sets up a fresh variable queue of the configured
%% type and hands it to the actual test body.
variable_queue_restart_large_seq_id1(Config) ->
with_fresh_variable_queue(
fun variable_queue_restart_large_seq_id2/2,
?config(variable_queue_type, Config)).
%% Regression test: restarting an empty queue must stay fast even when
%% the recovered next_seq_id is huge (as if ~100M messages had passed
%% through the queue before shutdown).
variable_queue_restart_large_seq_id2(VQ0, QName) ->
MsgCount = 1,
%% Publish one message, then fetch and ack it.
VQAfterUse = publish_fetch_and_ack(MsgCount, 0, VQ0),
%% Queue must now be empty.
true = rabbit_variable_queue:is_empty(VQAfterUse),
_ = rabbit_variable_queue:terminate(shutdown, VQAfterUse),
RecoveryTerms = variable_queue_read_terms(QName),
%% The clean shutdown must have recorded the next sequence id.
MsgCount = proplists:get_value(next_seq_id, RecoveryTerms),
%% Inflate next_seq_id to simulate 100M published-and-consumed
%% messages before the restart.
InflatedTerms = lists:keyreplace(next_seq_id, 1, RecoveryTerms,
{next_seq_id, 100_000_000}),
{InitMillis, VQRestarted} =
timer:tc(
fun() -> variable_queue_init(test_amqqueue(QName, true), InflatedTerms) end,
millisecond),
%% Init of an empty queue should take only a few milliseconds
%% regardless of next_seq_id; allow 100ms so slow CI machines do
%% not cause flakes.
{true, _} = {InitMillis < 100, InitMillis},
%% Still empty after the restart.
true = rabbit_variable_queue:is_empty(VQRestarted),
VQRestarted.
variable_queue_ack_limiting(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_ack_limiting1, [Config]).