CQ shared store: Delete from index on remove or roll over (#13959)
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Has been cancelled Details
Test (make) / Build and Xref (1.17, 26) (push) Has been cancelled Details
Test (make) / Build and Xref (1.17, 27) (push) Has been cancelled Details
Test (make) / Test (1.17, 27, khepri) (push) Has been cancelled Details
Test (make) / Test (1.17, 27, mnesia) (push) Has been cancelled Details
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Has been cancelled Details
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Has been cancelled Details
Test (make) / Type check (1.17, 27) (push) Has been cancelled Details

It was expensive to delete files because we had to clean up
the index, and to get the messages in the file we had to
scan it.

Instead of cleaning up the index on file delete, this
commit deletes from the index as soon as possible.
There are two scenarios: messages that are removed
from the current write file, and messages that are
removed from other files. In the latter case, we
can just delete the index entry on remove. For messages
in the current write file, we want to keep the entry
in case fanout is used, because we don't want to write
the fanout message multiple times if we can avoid it.
So we keep track of removes in the current write file
and do a cleanup of these entries on file roll over.

Compared to the previous implementation we will no
longer increase the ref_count of messages that are
not in the current write file, meaning we may do more
writes in fanout scenarios. But at the same time the
file delete operation is much cheaper.

Additionally, we prioritise delete calls in rabbit_msg_store_gc.
Without that change, if the compaction was lagging behind,
we could have file deletion requests queued behind many compaction
requests, leading to many unnecessary compactions of files
that could already be deleted.

Co-authored-by: Michal Kuratczyk <michal.kuratczyk@broadcom.com>
This commit is contained in:
Loïc Hoguin 2025-05-30 17:51:08 +02:00 committed by GitHub
parent 795e66c663
commit 0278980ba0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 73 additions and 28 deletions

View File

@ -77,8 +77,10 @@
current_file, current_file,
%% current file handle since the last fsync? %% current file handle since the last fsync?
current_file_handle, current_file_handle,
%% file handle cache %% current write file offset
current_file_offset, current_file_offset,
%% messages that were potentially removed from the current write file
current_file_removes = [],
%% TRef for our interval timer %% TRef for our interval timer
sync_timer_ref, sync_timer_ref,
%% files that had removes %% files that had removes
@ -1150,7 +1152,11 @@ write_message(MsgId, Msg, CRef,
end, CRef, State1) end, CRef, State1)
end. end.
remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) -> remove_message(MsgId, CRef,
State = #msstate{
index_ets = IndexEts,
current_file = CurrentFile,
current_file_removes = Removes }) ->
case should_mask_action(CRef, MsgId, State) of case should_mask_action(CRef, MsgId, State) of
{true, _Location} -> {true, _Location} ->
State; State;
@ -1162,22 +1168,32 @@ remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
%% ets:lookup(FileSummaryEts, File), %% ets:lookup(FileSummaryEts, File),
State; State;
{_Mask, #msg_location { ref_count = RefCount, file = File, {_Mask, #msg_location { ref_count = RefCount, file = File,
total_size = TotalSize }} total_size = TotalSize } = Entry}
when RefCount > 0 -> when RefCount > 0 ->
%% only update field, otherwise bad interaction with %% only update field, otherwise bad interaction with
%% concurrent GC %% concurrent GC
Dec = fun () -> index_update_ref_counter(IndexEts, MsgId, -1) end,
case RefCount of case RefCount of
%% don't remove from cur_file_cache_ets here because %% Don't remove from cur_file_cache_ets here because
%% there may be further writes in the mailbox for the %% there may be further writes in the mailbox for the
%% same msg. %% same msg. We will remove 0 ref_counts when rolling
1 -> ok = Dec(), %% over to the next write file.
delete_file_if_empty( 1 when File =:= CurrentFile ->
File, gc_candidate(File, index_update_ref_counter(IndexEts, MsgId, -1),
adjust_valid_total_size( State1 = State#msstate{current_file_removes =
File, -TotalSize, State))); [Entry#msg_location{ref_count=0}|Removes]},
_ -> ok = Dec(), delete_file_if_empty(
gc_candidate(File, State) File, gc_candidate(File,
adjust_valid_total_size(
File, -TotalSize, State1)));
1 ->
index_delete(IndexEts, MsgId),
delete_file_if_empty(
File, gc_candidate(File,
adjust_valid_total_size(
File, -TotalSize, State)));
_ ->
index_update_ref_counter(IndexEts, MsgId, -1),
gc_candidate(File, State)
end end
end. end.
@ -1239,7 +1255,9 @@ flush_or_roll_to_new_file(
cur_file_cache_ets = CurFileCacheEts, cur_file_cache_ets = CurFileCacheEts,
file_size_limit = FileSizeLimit }) file_size_limit = FileSizeLimit })
when Offset >= FileSizeLimit -> when Offset >= FileSizeLimit ->
State1 = internal_sync(State), %% Cleanup the index of messages that were removed before rolling over.
State0 = cleanup_index_on_roll_over(State),
State1 = internal_sync(State0),
ok = writer_close(CurHdl), ok = writer_close(CurHdl),
NextFile = CurFile + 1, NextFile = CurFile + 1,
{ok, NextHdl} = writer_open(Dir, NextFile), {ok, NextHdl} = writer_open(Dir, NextFile),
@ -1267,6 +1285,8 @@ write_large_message(MsgId, MsgBodyBin,
index_ets = IndexEts, index_ets = IndexEts,
file_summary_ets = FileSummaryEts, file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts }) -> cur_file_cache_ets = CurFileCacheEts }) ->
%% Cleanup the index of messages that were removed before rolling over.
State1 = cleanup_index_on_roll_over(State0),
{LargeMsgFile, LargeMsgHdl} = case CurOffset of {LargeMsgFile, LargeMsgHdl} = case CurOffset of
%% We haven't written in the file yet. Use it. %% We haven't written in the file yet. Use it.
0 -> 0 ->
@ -1286,13 +1306,13 @@ write_large_message(MsgId, MsgBodyBin,
ok = index_insert(IndexEts, ok = index_insert(IndexEts,
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile, #msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
offset = 0, total_size = TotalSize }), offset = 0, total_size = TotalSize }),
State1 = case CurFile of State2 = case CurFile of
%% We didn't open a new file. We must update the existing value. %% We didn't open a new file. We must update the existing value.
LargeMsgFile -> LargeMsgFile ->
[_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile, [_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile,
[{#file_summary.valid_total_size, TotalSize}, [{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]), {#file_summary.file_size, TotalSize}]),
State0; State1;
%% We opened a new file. We can insert it all at once. %% We opened a new file. We can insert it all at once.
%% We must also check whether we need to delete the previous %% We must also check whether we need to delete the previous
%% current file, because if there is no valid data this is %% current file, because if there is no valid data this is
@ -1303,7 +1323,7 @@ write_large_message(MsgId, MsgBodyBin,
valid_total_size = TotalSize, valid_total_size = TotalSize,
file_size = TotalSize, file_size = TotalSize,
locked = false }), locked = false }),
delete_file_if_empty(CurFile, State0 #msstate { current_file_handle = LargeMsgHdl, delete_file_if_empty(CurFile, State1 #msstate { current_file_handle = LargeMsgHdl,
current_file = LargeMsgFile, current_file = LargeMsgFile,
current_file_offset = TotalSize }) current_file_offset = TotalSize })
end, end,
@ -1318,11 +1338,22 @@ write_large_message(MsgId, MsgBodyBin,
%% Delete messages from the cache that were written to disk. %% Delete messages from the cache that were written to disk.
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}), true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
%% Process confirms (this won't flush; we already did) and continue. %% Process confirms (this won't flush; we already did) and continue.
State = internal_sync(State1), State = internal_sync(State2),
State #msstate { current_file_handle = NextHdl, State #msstate { current_file_handle = NextHdl,
current_file = NextFile, current_file = NextFile,
current_file_offset = 0 }. current_file_offset = 0 }.
cleanup_index_on_roll_over(State = #msstate{
index_ets = IndexEts,
current_file_removes = Removes}) ->
lists:foreach(fun(Entry) ->
%% We delete objects that have ref_count=0. If a message
%% got its ref_count increased, it will not be deleted.
%% We thus avoid extra index lookups to check for ref_count.
index_delete_object(IndexEts, Entry)
end, Removes),
State#msstate{current_file_removes=[]}.
contains_message(MsgId, From, State = #msstate{ index_ets = IndexEts }) -> contains_message(MsgId, From, State = #msstate{ index_ets = IndexEts }) ->
MsgLocation = index_lookup_positive_ref_count(IndexEts, MsgId), MsgLocation = index_lookup_positive_ref_count(IndexEts, MsgId),
gen_server2:reply(From, MsgLocation =/= not_found), gen_server2:reply(From, MsgLocation =/= not_found),
@ -1643,7 +1674,7 @@ index_update(IndexEts, Obj) ->
ok. ok.
index_update_fields(IndexEts, Key, Updates) -> index_update_fields(IndexEts, Key, Updates) ->
true = ets:update_element(IndexEts, Key, Updates), _ = ets:update_element(IndexEts, Key, Updates),
ok. ok.
index_update_ref_counter(IndexEts, Key, RefCount) -> index_update_ref_counter(IndexEts, Key, RefCount) ->
@ -1967,10 +1998,21 @@ delete_file_if_empty(File, State = #msstate {
%% We do not try to look at messages that are not the last because we do not want to %% We do not try to look at messages that are not the last because we do not want to
%% accidentally write over messages that were moved earlier. %% accidentally write over messages that were moved earlier.
compact_file(File, State = #gc_state { index_ets = IndexEts, compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts }) ->
file_summary_ets = FileSummaryEts, case ets:lookup(FileSummaryEts, File) of
dir = Dir, [] ->
msg_store = Server }) -> rabbit_log:debug("File ~tp has already been deleted; no need to compact",
[File]),
ok;
[#file_summary{file_size = FileSize}] ->
compact_file(File, FileSize, State)
end.
compact_file(File, FileSize,
State = #gc_state { index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
dir = Dir,
msg_store = Server }) ->
%% Get metadata about the file. Will be used to calculate %% Get metadata about the file. Will be used to calculate
%% how much data was reclaimed as a result of compaction. %% how much data was reclaimed as a result of compaction.
[#file_summary{file_size = FileSize}] = ets:lookup(FileSummaryEts, File), [#file_summary{file_size = FileSize}] = ets:lookup(FileSummaryEts, File),
@ -2123,9 +2165,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File
-spec delete_file(non_neg_integer(), gc_state()) -> ok | defer. -spec delete_file(non_neg_integer(), gc_state()) -> ok | defer.
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, delete_file(File, #gc_state { file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts, file_handles_ets = FileHandlesEts,
dir = Dir }) -> dir = Dir }) ->
case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of
{[_|_], _Cont} -> {[_|_], _Cont} ->
rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.", rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.",
@ -2134,7 +2176,6 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
_ -> _ ->
[#file_summary{ valid_total_size = 0, [#file_summary{ valid_total_size = 0,
file_size = FileSize }] = ets:lookup(FileSummaryEts, File), file_size = FileSize }] = ets:lookup(FileSummaryEts, File),
[] = scan_and_vacuum_message_file(File, State),
ok = file:delete(form_filename(Dir, filenum_to_name(File))), ok = file:delete(form_filename(Dir, filenum_to_name(File))),
true = ets:delete(FileSummaryEts, File), true = ets:delete(FileSummaryEts, File),
rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]), rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]),

View File

@ -12,7 +12,7 @@
-export([start_link/1, compact/2, truncate/4, delete/2, stop/1]). -export([start_link/1, compact/2, truncate/4, delete/2, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3, prioritise_cast/3]).
-record(state, -record(state,
{ pending, { pending,
@ -51,6 +51,10 @@ delete(Server, File) ->
stop(Server) -> stop(Server) ->
gen_server2:call(Server, stop, infinity). gen_server2:call(Server, stop, infinity).
%% TODO replace with priority messages for OTP28+
prioritise_cast({delete, _}, _Len, _State) -> 5;
prioritise_cast(_, _Len, _State) -> 0.
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
init([MsgStoreState]) -> init([MsgStoreState]) ->