diff --git a/deps/rabbit/src/rabbit_fifo_index.erl b/deps/rabbit/src/rabbit_fifo_index.erl index 8a8fbbdb9e..852724c35a 100644 --- a/deps/rabbit/src/rabbit_fifo_index.erl +++ b/deps/rabbit/src/rabbit_fifo_index.erl @@ -7,7 +7,8 @@ delete/2, size/1, smallest/1, - map/2 + map/2, + to_list/1 ]). -compile({no_auto_import, [size/1]}). @@ -87,6 +88,10 @@ smallest(#?MODULE{smallest = Smallest}) -> map(F, #?MODULE{data = Data} = State) -> State#?MODULE{data = maps:map(F, Data)}. +% Note: the ordering of the list is undefined. Sort the list for ordering. +-spec to_list(state()) -> [integer()]. +to_list(#?MODULE{data = Data}) -> + maps:keys(Data). %% internal diff --git a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl index 273597982f..31d3842493 100644 --- a/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_prop_SUITE.erl @@ -64,6 +64,7 @@ all_tests() -> scenario32, upgrade, messages_total, + ra_indexes, simple_prefetch, simple_prefetch_without_checkout_cancel, simple_prefetch_01, @@ -910,6 +911,30 @@ messages_total(_Config) -> end) end, [], Size). +ra_indexes(_Config) -> + meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> false end), + Size = 256, + run_proper( + fun () -> + ?FORALL({Length, Bytes, DeliveryLimit, SingleActive}, + frequency([{5, {undefined, undefined, undefined, false}}, + {5, {oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]), + oneof([range(1, 3), undefined]), + oneof([true, false]) + }}]), + begin + Config = config(?FUNCTION_NAME, + Length, + Bytes, + SingleActive, + DeliveryLimit), + ?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops, Config)), + collect({log_size, length(O)}, + ra_indexes_prop(Config, O))) + end) + end, [], Size). + simple_prefetch(_Config) -> Size = 500, meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end), @@ -1464,6 +1489,38 @@ messages_total_invariant() -> end end. +ra_indexes_prop(Conf0, Commands) -> + Conf = Conf0#{release_cursor_interval => 100}, + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + InitState = test_init(Conf), + run_log(InitState, Entries, ra_indexes_invariant()), + true. + +ra_indexes_invariant() -> + %% The raft indexes contained in the `ra_indexes` `rabbit_fifo_index` must + %% be the same as all indexes checked out by consumers plus those in the + %% `returns` queue. + fun(#rabbit_fifo{ra_indexes = Index, + consumers = C, + returns = R}) -> + RIdxs = lqueue:fold(fun(?MSG(I, _), Acc) -> [I | Acc] end, [], R), + CIdxs = maps:fold(fun(_, #consumer{checked_out = Ch}, Acc0) -> + maps:fold(fun(_, ?MSG(I, _), Acc) -> + [I | Acc] + end, Acc0, Ch) + end, [], C), + ActualIdxs = lists:sort(RIdxs ++ CIdxs), + IndexIdxs = lists:sort(rabbit_fifo_index:to_list(Index)), + case ActualIdxs == IndexIdxs of + true -> true; + false -> + ct:pal("ra_indexes invariant failed Expected ~b Got ~b", + [ActualIdxs, IndexIdxs]), + false + end + end. + simple_prefetch_prop(Conf0, Commands, WithCheckoutCancel) -> Conf = Conf0#{release_cursor_interval => 100}, Indexes = lists:seq(1, length(Commands)),