From ea68a41ba56feec2a30fa4c78c2b64674b75f13b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20G=C3=B6m=C3=B6ri?= Date: Fri, 9 May 2025 22:00:50 +0200 Subject: [PATCH 1/2] Add tests for rabbit_classic_queue_index_v2:bounds/2 (cherry picked from commit 55e3c458c289a94addb7508dc2ee837aebbe91b6) --- .../src/rabbit_classic_queue_index_v2.erl | 10 ---------- deps/rabbit/test/backing_queue_SUITE.erl | 20 ++++++++++--------- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 70c2579dcf..ee5ca8af66 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -22,9 +22,6 @@ sync/1, needs_sync/1, flush/1, bounds/2, next_segment_boundary/1]). -%% Only used by tests --export([bounds/1]). - %% Used to upgrade/downgrade from/to the v1 index. -export([init_for_conversion/3]). -export([init_args/1]). @@ -1191,13 +1188,6 @@ flush_pre_publish_cache(TargetRamCount, State) -> %% the test suite to pass. This can probably be made more accurate %% in the future. -%% `bounds/1` is only used by tests --spec bounds(State) -> - {non_neg_integer(), non_neg_integer(), State} - when State::state(). -bounds(State) -> - bounds(State, undefined). - -spec bounds(State, non_neg_integer() | undefined) -> {non_neg_integer(), non_neg_integer(), State} when State::state(). diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index adda1cdf8b..1871307bff 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -801,7 +801,9 @@ bq_queue_index1(_Config) -> TwoSegs = SegmentSize + SegmentSize, MostOfASegment = trunc(SegmentSize*0.75), SeqIdsA = lists:seq(0, MostOfASegment-1), + NextSeqIdA = MostOfASegment, SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment), + NextSeqIdB = 2 * MostOfASegment + 1, SeqIdsC = lists:seq(0, trunc(SegmentSize/2)), SeqIdsD = lists:seq(0, SegmentSize*4), @@ -809,17 +811,17 @@ bq_queue_index1(_Config) -> with_empty_test_queue( fun (Qi0, QName) -> - {0, 0, Qi1} = IndexMod:bounds(Qi0), + {0, 0, Qi1} = IndexMod:bounds(Qi0, undefined), {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), - {0, SegmentSize, Qi3} = IndexMod:bounds(Qi2), + {0, SegmentSize, Qi3} = IndexMod:bounds(Qi2, NextSeqIdA), {ReadA, Qi4} = IndexMod:read(0, SegmentSize, Qi3), ok = VerifyReadWithPublishedFun(false, ReadA, lists:reverse(SeqIdsMsgIdsA)), %% should get length back as 0, as all the msgs were transient {0, 0, Qi6} = restart_test_queue(Qi4, QName), - {0, 0, Qi7} = IndexMod:bounds(Qi6), + {NextSeqIdA, NextSeqIdA, Qi7} = IndexMod:bounds(Qi6, NextSeqIdA), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), - {0, TwoSegs, Qi9} = IndexMod:bounds(Qi8), + {0, TwoSegs, Qi9} = IndexMod:bounds(Qi8, NextSeqIdB), {ReadB, Qi10} = IndexMod:read(0, SegmentSize, Qi9), ok = VerifyReadWithPublishedFun(true, ReadB, lists:reverse(SeqIdsMsgIdsB)), @@ -827,7 +829,7 @@ bq_queue_index1(_Config) -> LenB = length(SeqIdsB), BytesB = LenB * 10, {LenB, BytesB, Qi12} = restart_test_queue(Qi10, QName), - {0, TwoSegs, Qi13} = IndexMod:bounds(Qi12), + {0, TwoSegs, Qi13} = IndexMod:bounds(Qi12, NextSeqIdB), Qi15 = case IndexMod of rabbit_queue_index -> Qi14 = IndexMod:deliver(SeqIdsB, Qi13), @@ -841,7 +843,7 @@ bq_queue_index1(_Config) -> {_DeletedSegments, Qi16} = IndexMod:ack(SeqIdsB, Qi15), Qi17 = IndexMod:flush(Qi16), %% Everything will have gone now because #pubs == #acks - {0, 0, Qi18} = IndexMod:bounds(Qi17), + {NextSeqIdB, NextSeqIdB, Qi18} = IndexMod:bounds(Qi17, NextSeqIdB), %% should get length back as 0 because all persistent %% msgs have been acked {0, 0, Qi19} = restart_test_queue(Qi18, QName), @@ -996,7 +998,7 @@ v2_delete_segment_file_completely_acked1(_Config) -> %% Publish a full segment file. {Qi1, SeqIdsMsgIds} = queue_index_publish(SeqIds, true, Qi0), SegmentSize = length(SeqIdsMsgIds), - {0, SegmentSize, Qi2} = IndexMod:bounds(Qi1), + {0, SegmentSize, Qi2} = IndexMod:bounds(Qi1, undefined), %% Confirm that the file exists on disk. Path = IndexMod:segment_file(0, Qi2), true = filelib:is_file(Path), @@ -1024,7 +1026,7 @@ v2_delete_segment_file_partially_acked1(_Config) -> %% Publish a partial segment file. {Qi1, SeqIdsMsgIds} = queue_index_publish(SeqIds, true, Qi0), SeqIdsLen = length(SeqIdsMsgIds), - {0, SegmentSize, Qi2} = IndexMod:bounds(Qi1), + {0, SegmentSize, Qi2} = IndexMod:bounds(Qi1, undefined), %% Confirm that the file exists on disk. Path = IndexMod:segment_file(0, Qi2), true = filelib:is_file(Path), @@ -1054,7 +1056,7 @@ v2_delete_segment_file_partially_acked_with_holes1(_Config) -> {Qi1, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, true, Qi0), {Qi2, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi1), SeqIdsLen = length(SeqIdsMsgIdsA) + length(SeqIdsMsgIdsB), - {0, SegmentSize, Qi3} = IndexMod:bounds(Qi2), + {0, SegmentSize, Qi3} = IndexMod:bounds(Qi2, undefined), %% Confirm that the file exists on disk. Path = IndexMod:segment_file(0, Qi3), true = filelib:is_file(Path), From 383818e27f8c0c0bfaeb76e2d0fb565d67a19edc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20G=C3=B6m=C3=B6ri?= Date: Fri, 9 May 2025 22:14:30 +0200 Subject: [PATCH 2/2] Fix comment about CQ v1->v2 index recovery (cherry picked from commit ec455d5cff2e101f5b756e784a13afafa22baeae) --- deps/rabbit/src/rabbit_classic_queue_index_v2.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index ee5ca8af66..3dc4d2f9bc 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -479,7 +479,7 @@ recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin }, {LoSeqId, HiSeqId, _} = rabbit_queue_index:bounds(V1State), %% When resuming after a crash we need to double check the messages that are both %% in the v1 and v2 index (effectively the messages below the upper bound of the - %% v1 index that are about to be written to it). + %% v2 index that are about to be written to it). {_, V2HiSeqId, _} = bounds(State0, undefined), SkipFun = fun (SeqId, FunState0) when SeqId < V2HiSeqId ->