CQ: Fix shared store crashes

Crashes could happen because compaction would wrongly write
over valid messages, or truncate over valid messages: when
scanning the files for messages it could encounter leftover
data that looked like a message, which caused compaction to
miss the real messages hidden within that range.

To avoid this we ensure that compaction cannot leave leftover
data behind. We get this guarantee by blanking the holes in
the file before we start copying messages closer to the start
of the file. This requires a few more writes, but it means
that at any point the only data in the files is valid
messages.
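
To illustrate the idea (a sketch only, not the code from the
diff below; blank_gap, Fd, GapStart and GapEnd are hypothetical
names), blanking a hole boils down to overwriting the gap
between two messages with zero bytes:

    %% Illustration: zero out the hole between two messages.
    blank_gap(Fd, GapStart, GapEnd) when GapStart < GapEnd ->
        Size = GapEnd - GapStart,
        %% <<0:Size/unit:8>> builds Size zero bytes.
        ok = file:pwrite(Fd, GapStart, <<0:Size/unit:8>>);
    blank_gap(_Fd, _GapStart, _GapEnd) ->
        %% Contiguous messages leave no hole to blank.
        ok.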

Note that it's possible that some of the messages in the files
are no longer referenced; that's OK. We filter them out after
scanning the file.
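
As a sketch of that filtering (illustration only; it ignores
the ref_count bookkeeping the real code also does, and the
helper name keep_scanned is made up):

    keep_scanned(File, ScannedMsgIds, State) ->
        lists:filtermap(
            fun (MsgId) ->
                %% Keep a scanned message only if the index still
                %% maps its id to this file.
                case index_lookup(MsgId, State) of
                    #msg_location{ file = File } = Entry -> {true, Entry};
                    _ -> false
                end
            end, ScannedMsgIds).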

This was also a good time to merge two almost identical scan
functions and to be more explicit about which messages should
be dropped after scanning the file: the messages no longer in
the ets index, and the fan-out messages that ended up rewritten
in a more recent file.
Loïc Hoguin 2024-04-24 17:28:50 +02:00
parent e48b0c5214
commit fcd011f9cf
2 changed files with 46 additions and 25 deletions


@@ -1830,8 +1830,15 @@ compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
     %% Open the file.
     FileName = filenum_to_name(File),
     {ok, Fd} = file:open(form_filename(Dir, FileName), [read, write, binary, raw]),
-    %% Load the messages.
-    Messages = load_and_vacuum_message_file(File, State),
+    %% Load the messages. It's possible to get 0 messages here;
+    %% that's OK. That means we have little to do as the file is
+    %% about to be deleted.
+    {Messages, _} = scan_and_vacuum_message_file(File, State),
+    %% Blank holes. We must do this first otherwise the file is left
+    %% with data that may confuse the code (for example data that looks
+    %% like a message, isn't a message, but spans over a real message).
+    %% We blank more than is likely required but better safe than sorry.
+    blank_holes_in_file(Fd, Messages),
     %% Compact the file.
     {ok, TruncateSize, IndexUpdates} = do_compact_file(Fd, 0, Messages, lists:reverse(Messages), []),
     %% Sync and close the file.
@@ -1876,6 +1883,32 @@ compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
     garbage_collect(),
     ok.
 
+%% We must special case the blanking of the beginning of the file.
+blank_holes_in_file(Fd, [#msg_location{ offset = Offset }|Tail])
+        when Offset =/= 0 ->
+    Bytes = <<0:Offset/unit:8>>,
+    ok = file:pwrite(Fd, 0, Bytes),
+    blank_holes_in_file1(Fd, Tail);
+blank_holes_in_file(Fd, Messages) ->
+    blank_holes_in_file1(Fd, Messages).
+
+blank_holes_in_file1(Fd, [
+        #msg_location{ offset = OneOffset, total_size = OneSize },
+        #msg_location{ offset = TwoOffset } = Two
+        |Tail]) when OneOffset + OneSize < TwoOffset ->
+    Offset = OneOffset + OneSize,
+    Size = TwoOffset - Offset,
+    Bytes = <<0:Size/unit:8>>,
+    ok = file:pwrite(Fd, Offset, Bytes),
+    blank_holes_in_file1(Fd, [Two|Tail]);
+%% We either have only one message left, or contiguous messages.
+blank_holes_in_file1(Fd, [_|Tail]) ->
+    blank_holes_in_file1(Fd, Tail);
+%% No need to blank the hole past the last message as we will
+%% not write there (no confusion possible) and truncate afterwards.
+blank_holes_in_file1(_, []) ->
+    ok.
+
 %% If the message at the end fits into the hole we have found, we copy it there.
 %% We will do the ets:updates after the data is synced to disk.
 do_compact_file(Fd, Offset, Start = [#msg_location{ offset = StartMsgOffset }|_],
@@ -1962,27 +1995,7 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
             ok
     end.
 
-load_and_vacuum_message_file(File, State = #gc_state{ dir = Dir }) ->
-    %% Messages here will be end-of-file at start-of-list
-    {ok, Messages, _FileSize} =
-        scan_file_for_valid_messages(Dir, filenum_to_name(File)),
-    %% foldl will reverse so will end up with msgs in ascending offset order
-    lists:foldl(
-      fun ({MsgId, TotalSize, Offset}, Acc) ->
-              case index_lookup(MsgId, State) of
-                  #msg_location { file = File, total_size = TotalSize,
-                                  offset = Offset, ref_count = 0 } = Entry ->
-                      ok = index_delete_object(Entry, State),
-                      Acc;
-                  #msg_location { file = File, total_size = TotalSize,
-                                  offset = Offset } = Entry ->
-                      [ Entry | Acc ];
-                  _ ->
-                      Acc
-              end
-      end, [], Messages).
-
-scan_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
+scan_and_vacuum_message_file(File, State = #gc_state{ dir = Dir }) ->
     %% Messages here will be end-of-file at start-of-list
     {ok, Messages, _FileSize} =
         scan_file_for_valid_messages(Dir, filenum_to_name(File)),
@@ -1997,7 +2010,15 @@ scan_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
                  #msg_location { file = File, total_size = TotalSize,
                                  offset = Offset } = Entry ->
                      {[ Entry | List ], TotalSize + Size};
-                 _ ->
+                 %% Fan-out may remove the entry but also write a new
+                 %% entry in a different file when it needs to write
+                 %% a message and the existing reference is in a file
+                 %% that's about to be deleted. So we explicitly accept
+                 %% these cases and ignore this message.
+                 #msg_location { file = OtherFile, total_size = TotalSize }
+                   when File =/= OtherFile ->
+                     Acc;
+                 not_found ->
                      Acc
              end
      end, {[], 0}, Messages).


@@ -1,4 +1,4 @@
-% This Source Code Form is subject to the terms of the Mozilla Public
+%% This Source Code Form is subject to the terms of the Mozilla Public
 %% License, v. 2.0. If a copy of the MPL was not distributed with this
 %% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 %%