Fix CQ shared store files not deleted with large messages

We must consider whether the previous current file is empty
(has data written, but was already removed) when writing
large messages and opening a file specifically for the large
message. If we don't, then the file will never get deleted
as we only consider files for deletion when a message gets
removed (and there are none).

This is only an issue for large messages. Small messages
write a message than roll over to a new file, so there is
at least one valid message. Large messages close the current
file first, regardless of there being a valid message.
This commit is contained in:
Loïc Hoguin 2025-02-26 11:30:22 +01:00
parent 9dd6fa7fdd
commit 6cf69e2a19
No known key found for this signature in database
GPG Key ID: C69E26E3A9DF618F
1 changed files with 11 additions and 4 deletions

View File

@ -1274,19 +1274,26 @@ write_large_message(MsgId, MsgBodyBin,
ok = index_insert(IndexEts,
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
offset = 0, total_size = TotalSize }),
_ = case CurFile of
State1 = case CurFile of
%% We didn't open a new file. We must update the existing value.
LargeMsgFile ->
[_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]);
{#file_summary.file_size, TotalSize}]),
State0;
%% We opened a new file. We can insert it all at once.
%% We must also check whether we need to delete the previous
%% current file, because if there is no valid data this is
%% the only time we will consider it (outside recovery).
_ ->
true = ets:insert_new(FileSummaryEts, #file_summary {
file = LargeMsgFile,
valid_total_size = TotalSize,
file_size = TotalSize,
locked = false })
locked = false }),
delete_file_if_empty(CurFile, State0 #msstate { current_file_handle = LargeMsgHdl,
current_file = LargeMsgFile,
current_file_offset = TotalSize })
end,
%% Roll over to the next file.
NextFile = LargeMsgFile + 1,
@ -1299,7 +1306,7 @@ write_large_message(MsgId, MsgBodyBin,
%% Delete messages from the cache that were written to disk.
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
%% Process confirms (this won't flush; we already did) and continue.
State = internal_sync(State0),
State = internal_sync(State1),
State #msstate { current_file_handle = NextHdl,
current_file = NextFile,
current_file_offset = 0 }.