QQ: remove use of rabbit_fifo_index
Peer Discovery AWS Integration Test / Integration Test (push) Has been cancelled Details

From the main state machine (still used in the dlx module).

This can be done as we no longer need to super efficiently query
the smallest raft index.

Removing it will reduce peak memory use somewhat as well as
simplifying the code.
This commit is contained in:
Karl Nilsson 2025-09-26 09:14:16 +01:00
parent 45121a900b
commit 0053af8146
3 changed files with 36 additions and 140 deletions

View File

@ -60,7 +60,6 @@
query_messages_checked_out/1,
query_messages_total/1,
query_processes/1,
query_ra_indexes/1,
query_waiting_consumers/1,
query_consumer_count/1,
query_consumers/1,
@ -309,12 +308,10 @@ apply(Meta, #modify{consumer_key = ConsumerKey,
apply(#{index := Idx} = Meta,
#requeue{consumer_key = ConsumerKey,
msg_id = MsgId,
index = OldIdx,
index = _OldIdx,
header = Header0},
#?STATE{consumers = Cons,
messages = Messages,
ra_indexes = Indexes0,
enqueue_count = EnqCount} = State00) ->
messages = Messages} = State00) ->
%% the actual consumer key was looked up in the aux handler so we
%% dont need to use find_consumer/2 here
case Cons of
@ -326,12 +323,9 @@ apply(#{index := Idx} = Meta,
State0 = add_bytes_return(Header, State00),
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked0),
credit = increase_credit(Con0, 1)},
State1 = State0#?STATE{ra_indexes = rabbit_fifo_index:delete(OldIdx,
Indexes0),
messages = rabbit_fifo_q:in(no,
State1 = State0#?STATE{messages = rabbit_fifo_q:in(no,
?MSG(Idx, Header),
Messages),
enqueue_count = EnqCount + 1},
Messages)},
State2 = update_or_remove_con(Meta, ConsumerKey, Con, State1),
{State3, Effects} = activate_next_consumer({State2, []}),
checkout(Meta, State0, State3, Effects);
@ -467,31 +461,10 @@ apply(#{index := Idx} = Meta,
checkout(Meta, State0, State2, [{monitor, process, Pid} | Effs], Reply);
apply(#{index := Index}, #purge{},
#?STATE{messages_total = Total,
returns = Returns,
ra_indexes = Indexes0,
msg_bytes_enqueue = MsgBytesEnqueue
} = State0) ->
NumReady = messages_ready(State0),
Indexes = case Total of
NumReady ->
%% All messages are either in 'messages' queue or
%% 'returns' queue.
%% No message is awaiting acknowledgement.
%% Optimization: empty all 'ra_indexes'.
rabbit_fifo_index:empty();
_ ->
%% Some messages are checked out to consumers
%% awaiting acknowledgement.
%% Therefore we cannot empty all 'ra_indexes'.
%% We only need to delete the indexes from the 'returns'
%% queue because messages of the 'messages' queue are
%% not part of the 'ra_indexes'.
lqueue:fold(fun(?MSG(I, _), Acc) ->
rabbit_fifo_index:delete(I, Acc)
end, Indexes0, Returns)
end,
State1 = State0#?STATE{ra_indexes = Indexes,
messages = rabbit_fifo_q:new(),
State1 = State0#?STATE{messages = rabbit_fifo_q:new(),
messages_total = Total - NumReady,
returns = lqueue:new(),
msg_bytes_enqueue = 0
@ -557,7 +530,7 @@ apply(#{system_time := Ts} = Meta,
apply(#{system_time := Ts} = Meta,
{down, Pid, noconnection},
#?STATE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
enqueuers = Enqs0} = State0) ->
%% A node has been disconnected. This doesn't necessarily mean that
%% any processes on this node are down, they _may_ come back so here
%% we just mark them as suspected (effectively deactivated)
@ -716,7 +689,8 @@ snapshot_installed(_Meta, #?MODULE{cfg = #cfg{resource = QR},
convert_v7_to_v8(#{} = _Meta, StateV7) ->
StateV8 = StateV7,
StateV8.
StateV8#?STATE{discarded_bytes = 0,
unused_0 = ?NIL}.
purge_node(Meta, Node, State, Effects) ->
lists:foldl(fun(Pid, {S0, E0}) ->
@ -856,7 +830,6 @@ tick(Ts, #?STATE{cfg = #cfg{resource = QName}} = State) ->
-spec overview(state()) -> map().
overview(#?STATE{consumers = Cons,
enqueuers = Enqs,
% enqueue_count = EnqCount,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes,
cfg = Cfg,
@ -939,9 +912,7 @@ which_module(8) -> ?MODULE.
-record(snapshot, {index :: ra:index(),
timestamp :: milliseconds(),
% smallest_index :: undefined | ra:index(),
messages_total = 0 :: non_neg_integer(),
% indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
bytes_out = 0 :: non_neg_integer()}).
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
-record(?AUX, {name :: atom(),
@ -1227,9 +1198,6 @@ query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) ->
maps:keys(maps:merge(Enqs, Cons)).
query_ra_indexes(#?STATE{ra_indexes = RaIndexes}) ->
RaIndexes.
query_waiting_consumers(#?STATE{waiting_consumers = WaitingConsumers}) ->
WaitingConsumers.
@ -1585,14 +1553,12 @@ apply_enqueue(#{index := RaftIdx,
decr_total(#?STATE{messages_total = Tot} = State) ->
State#?STATE{messages_total = Tot - 1}.
drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
drop_head(#?STATE{} = State0, Effects) ->
case take_next_msg(State0) of
{?MSG(Idx, Header) = Msg, State1} ->
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
State2 = State1#?STATE{ra_indexes = Indexes},
State3 = decr_total(add_bytes_drop(Header, State2)),
{?MSG(_Idx, Header) = Msg, State1} ->
State = decr_total(add_bytes_drop(Header, State1)),
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
dlx = DlxState} = State = State3,
dlx = DlxState} = State,
{_, DlxEffects} = rabbit_fifo_dlx:discard([Msg], maxlen, DLH, DlxState),
{State, combine_effects(DlxEffects, Effects)};
empty ->
@ -1660,7 +1626,6 @@ update_expiry_header(ExpiryTs, Header) ->
maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg,
{_MetaSize, BodySize},
Effects, #?STATE{msg_bytes_enqueue = Enqueue,
enqueue_count = EnqCount,
messages = Messages,
messages_total = Total} = State0) ->
% direct enqueue without tracking
@ -1670,7 +1635,6 @@ maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg,
Msg = ?MSG(RaftIdx, Header),
PTag = priority_tag(RawMsg),
State = State0#?STATE{msg_bytes_enqueue = Enqueue + Size,
enqueue_count = EnqCount + 1,
messages_total = Total + 1,
messages = rabbit_fifo_q:in(PTag, Msg, Messages)
},
@ -1678,7 +1642,6 @@ maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg,
maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
{_MetaSize, BodySize} = Size,
Effects0, #?STATE{msg_bytes_enqueue = Enqueue,
enqueue_count = EnqCount,
enqueuers = Enqueuers0,
messages = Messages,
messages_total = Total} = State0) ->
@ -1704,7 +1667,6 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
end,
PTag = priority_tag(RawMsg),
State = State0#?STATE{msg_bytes_enqueue = Enqueue + BodySize,
enqueue_count = EnqCount + 1,
messages_total = Total + 1,
messages = rabbit_fifo_q:in(PTag, Msg, Messages),
enqueuers = Enqueuers0#{From => Enq},
@ -1747,47 +1709,42 @@ return(Meta, ConsumerKey,
% used to process messages that are finished
complete(Meta, ConsumerKey, [MsgId],
#consumer{checked_out = Checked0} = Con0,
#?STATE{ra_indexes = Indexes0,
msg_bytes_checkout = BytesCheckout,
#?STATE{msg_bytes_checkout = BytesCheckout,
messages_total = Tot} = State0,
Effects) ->
case maps:take(MsgId, Checked0) of
{?MSG(Idx, Hdr), Checked} ->
{?MSG(_Idx, Hdr), Checked} ->
SettledSize = get_header(size, Hdr),
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, 1)},
State1 = update_or_remove_con(Meta, ConsumerKey, Con, State0),
{State1#?STATE{ra_indexes = Indexes,
msg_bytes_checkout = BytesCheckout - SettledSize,
messages_total = Tot - 1},
{State1#?STATE{msg_bytes_checkout = BytesCheckout - SettledSize,
messages_total = Tot - 1},
[{aux, {bytes_out, SettledSize}} | Effects]};
error ->
{State0, Effects}
end;
complete(Meta, ConsumerKey, MsgIds,
#consumer{checked_out = Checked0} = Con0,
#?STATE{ra_indexes = Indexes0,
msg_bytes_checkout = BytesCheckout,
#?STATE{msg_bytes_checkout = BytesCheckout,
messages_total = Tot} = State0, Effects) ->
{SettledSize, Checked, Indexes}
{SettledSize, Checked}
= lists:foldl(
fun (MsgId, {S0, Ch0, Idxs}) ->
fun (MsgId, {S0, Ch0}) ->
case maps:take(MsgId, Ch0) of
{?MSG(Idx, Hdr), Ch} ->
{?MSG(_Idx, Hdr), Ch} ->
S = get_header(size, Hdr) + S0,
{S, Ch, rabbit_fifo_index:delete(Idx, Idxs)};
{S, Ch};
error ->
{S0, Ch0, Idxs}
{S0, Ch0}
end
end, {0, Checked0, Indexes0}, MsgIds),
end, {0, Checked0}, MsgIds),
Len = map_size(Checked0) - map_size(Checked),
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, Len)},
State1 = update_or_remove_con(Meta, ConsumerKey, Con, State0),
{State1#?STATE{ra_indexes = Indexes,
msg_bytes_checkout = BytesCheckout - SettledSize,
messages_total = Tot - Len},
{State1#?STATE{msg_bytes_checkout = BytesCheckout - SettledSize,
messages_total = Tot - Len},
[{aux, {bytes_out, SettledSize}} | Effects]}.
increase_credit(#consumer{cfg = #consumer_cfg{lifetime = once},
@ -2064,9 +2021,7 @@ add_delivery_effects(Effects0, AccMap, State) ->
end, Effects0, AccMap).
take_next_msg(#?STATE{returns = Returns0,
messages = Messages0,
ra_indexes = Indexes0
} = State) ->
messages = Messages0} = State) ->
case lqueue:out(Returns0) of
{{value, NextMsg}, Returns} ->
{NextMsg, State#?STATE{returns = Returns}};
@ -2074,11 +2029,8 @@ take_next_msg(#?STATE{returns = Returns0,
case rabbit_fifo_q:out(Messages0) of
empty ->
empty;
{?MSG(RaftIdx, _) = Msg, Messages} ->
%% add index here
Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
{Msg, State#?STATE{messages = Messages,
ra_indexes = Indexes}}
{?MSG(_RaftIdx, _) = Msg, Messages} ->
{Msg, State#?STATE{messages = Messages}}
end
end.
@ -2216,18 +2168,15 @@ expire_msgs(RaCmdTs, Result, State, Effects) ->
end.
expire(RaCmdTs, State0, Effects) ->
{?MSG(Idx, Header) = Msg,
{?MSG(_Idx, Header) = Msg,
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
dlx = DlxState0,
ra_indexes = Indexes0,
messages_total = Tot,
msg_bytes_enqueue = MsgBytesEnqueue} = State1} =
take_next_msg(State0),
{DlxState, DlxEffects} = rabbit_fifo_dlx:discard([Msg], expired,
DLH, DlxState0),
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
State = State1#?STATE{dlx = DlxState,
ra_indexes = Indexes,
messages_total = Tot - 1,
msg_bytes_enqueue =
MsgBytesEnqueue - get_header(size, Header)},

View File

@ -187,9 +187,11 @@
messages_total = 0 :: non_neg_integer(),
% queue of returned msg_in_ids - when checking out it picks from
returns = lqueue:new() :: lqueue:lqueue(term()),
% a counter of enqueues - used to trigger shadow copy points
% discareded bytes - a counter that is incremented every time a command
% is procesesed that does not need to be kept (live indexes).
% Approximate, used for triggering snapshots
% reset to 0 when release_cursor gets stored
enqueue_count = 0 :: non_neg_integer(),
discarded_bytes = 0,
% a map containing all the live processes that have ever enqueued
% a message to this queue
enqueuers = #{} :: #{pid() => #enqueuer{}},
@ -198,7 +200,7 @@
% rabbit_fifo_index can be slow when calculating the smallest
% index when there are large gaps but should be faster than gb_trees
% for normal appending operations as it's backed by a map
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
unused_0 = ?NIL,
unused_1 = ?NIL,
% consumers need to reflect consumer state at time of snapshot
consumers = #{} :: #{consumer_key() => consumer()},
@ -211,6 +213,8 @@
%% one is picked if active consumer is cancelled or dies
%% used only when single active consumer is on
waiting_consumers = [] :: [{consumer_key(), consumer()}],
%% records the timestamp whenever the queue was last considered
%% active in terms of consumer activity
last_active :: option(non_neg_integer()),
msg_cache :: option({ra:index(), raw_msg()}),
unused_2 = ?NIL

View File

@ -61,7 +61,6 @@ all_tests() ->
scenario32,
upgrade,
messages_total,
ra_indexes,
simple_prefetch,
simple_prefetch_without_checkout_cancel,
simple_prefetch_01,
@ -910,30 +909,6 @@ messages_total(_Config) ->
end)
end, [], Size).
ra_indexes(_Config) ->
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> false end),
Size = 256,
run_proper(
fun () ->
?FORALL({Length, Bytes, DeliveryLimit, SingleActive},
frequency([{5, {undefined, undefined, undefined, false}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
oneof([range(1, 3), undefined]),
oneof([true, false])
}}]),
begin
Config = config(?FUNCTION_NAME,
Length,
Bytes,
SingleActive,
DeliveryLimit),
?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops, Config)),
collect({log_size, length(O)},
ra_indexes_prop(Config, O)))
end)
end, [], Size).
simple_prefetch(_Config) ->
Size = 500,
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
@ -1592,38 +1567,6 @@ messages_total_invariant() ->
end
end.
ra_indexes_prop(Conf0, Commands) ->
Conf = Conf0#{release_cursor_interval => 100},
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
InitState = test_init(Conf),
run_log(InitState, Entries, ra_indexes_invariant()),
true.
ra_indexes_invariant() ->
%% The raft indexes contained in the `ra_indexes` `rabbit_fifo_index` must
%% be the same as all indexes checked out by consumers plus those in the
%% `returns` queue.
fun(#rabbit_fifo{ra_indexes = Index,
consumers = C,
returns = R}) ->
RIdxs = lqueue:fold(fun(?MSG(I, _), Acc) -> [I | Acc] end, [], R),
CIdxs = maps:fold(fun(_, #consumer{checked_out = Ch}, Acc0) ->
maps:fold(fun(_, ?MSG(I, _), Acc) ->
[I | Acc]
end, Acc0, Ch)
end, [], C),
ActualIdxs = lists:sort(RIdxs ++ CIdxs),
IndexIdxs = lists:sort(rabbit_fifo_index:to_list(Index)),
case ActualIdxs == IndexIdxs of
true -> true;
false ->
ct:pal("ra_indexes invariant failed Expected ~b Got ~b",
[ActualIdxs, IndexIdxs]),
false
end
end.
simple_prefetch_prop(Conf0, Commands, WithCheckoutCancel) ->
Conf = Conf0#{release_cursor_interval => 100},
Indexes = lists:seq(1, length(Commands)),