Emit live indexes

This commit is contained in:
David Ansari 2025-05-23 13:27:57 +02:00
parent c9694cadd0
commit ab7a5a63bb
4 changed files with 49 additions and 4 deletions

View File

@ -999,8 +999,9 @@ message_properties(Message, Confirm, #q{ttl = TTL}) ->
calculate_msg_expiry(Msg, TTL) ->
MsgTTL = mc:ttl(Msg),
case lists:min([TTL, MsgTTL]) of
undefined -> undefined;
case min(TTL, MsgTTL) of
undefined ->
undefined;
T ->
os:system_time(microsecond) + T * 1000
end.

View File

@ -43,6 +43,7 @@
apply/3,
state_enter/2,
tick/2,
live_indexes/1,
overview/1,
get_checked_out/4,
@ -888,6 +889,25 @@ tick(Ts, #?STATE{cfg = #cfg{resource = QName}} = State) ->
[{aux, {handle_tick, [QName, overview(State), all_nodes(State)]}}]
end.
-spec live_indexes(state()) -> [ra:index()].
live_indexes(#?STATE{cfg = #cfg{filter_enabled = true},
messages = Messages,
ra_indexes = Indexes,
dlx = DlxState}) ->
{Size0, Idxs0} = rabbit_fifo_filter_q:ra_indexes(Messages),
{Size1, Idxs1} = rabbit_fifo_dlx:raft_indexes(DlxState),
Size2 = rabbit_fifo_index:size(Indexes),
Idxs2 = rabbit_fifo_index:to_list(Indexes),
%% Avoid copying the longest list
case max(max(Size0, Size1), Size2) of
Size0 -> Idxs1 ++ (Idxs2 ++ Idxs0);
Size1 -> Idxs0 ++ (Idxs2 ++ Idxs1);
Size2 -> Idxs0 ++ (Idxs1 ++ Idxs2)
end;
live_indexes(#?STATE{cfg = #cfg{filter_enabled = false}}) ->
%%TODO is this correct?
[].
-spec overview(state()) -> map().
overview(#?STATE{consumers = Cons,
enqueuers = Enqs,
@ -3148,7 +3168,7 @@ smallest_raft_index(#?STATE{messages = Messages,
SmallestDlxRaIdx = rabbit_fifo_dlx:smallest_raft_index(DlxState),
SmallestMsgsRaIdx = rabbit_fifo_q:get_lowest_index(Messages),
SmallestRaIdx = rabbit_fifo_index:smallest(Indexes),
lists:min([SmallestDlxRaIdx, SmallestMsgsRaIdx, SmallestRaIdx]).
min(min(SmallestDlxRaIdx, SmallestMsgsRaIdx), SmallestRaIdx).
make_requeue(ConsumerKey, Notify, [{MsgId, Idx, Header, Msg}], Acc) ->
lists:reverse([{append,

View File

@ -25,7 +25,8 @@
dehydrate/1,
stat/1,
update_config/4,
smallest_raft_index/1
smallest_raft_index/1,
raft_indexes/1
]).
-record(checkout, {consumer :: pid(),
@ -364,5 +365,11 @@ dehydrate(State) ->
smallest_raft_index(#?MODULE{ra_indexes = Indexes}) ->
rabbit_fifo_index:smallest(Indexes).
-spec raft_indexes(state()) ->
{non_neg_integer(), [ra:index()]}.
raft_indexes(#?MODULE{ra_indexes = Indexes}) ->
{rabbit_fifo_index:size(Indexes),
rabbit_fifo_index:to_list(Indexes)}.
annotate_msg(H, Msg) ->
rabbit_fifo:annotate_msg(H, Msg).

View File

@ -9,6 +9,7 @@
take/3,
take/4,
is_fully_scanned/2,
ra_indexes/1,
overview/1
]).
@ -143,6 +144,22 @@ is_fully_scanned({HiIdx, NoIdx}, #?MODULE{hi_max = HiMax,
HiIdx >= HiMax andalso
NoIdx >= NoMax.
-spec ra_indexes(state()) ->
{non_neg_integer(), [ra:index()]}.
ra_indexes(#?MODULE{hi = Hi,
no = No}) ->
SizeHi = gb_trees:size(Hi),
SizeNo = gb_trees:size(No),
KeysHi = gb_trees:keys(Hi),
KeysNo = gb_trees:keys(No),
Keys = case SizeHi < SizeNo of
true ->
KeysHi ++ KeysNo;
false ->
KeysNo ++ KeysHi
end,
{SizeHi + SizeNo, Keys}.
-spec overview(state()) ->
#{len := non_neg_integer(),
num_hi := non_neg_integer(),