Merge pull request #13250 from cloudamqp/large_message_rdq_scan

4.x: Optimise msg_store recovery in case of large message file
This commit is contained in:
Michael Klishin 2025-02-19 15:49:29 -05:00 committed by GitHub
commit 0eb65c2f86
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 45 additions and 17 deletions

View File

@ -1515,28 +1515,38 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
%% a remnant from a previous compaction, but it might
%% simply be a coincidence. Try the next byte.
#{MsgIdInt := true} ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc);
scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
%% Data looks to be a message.
_ ->
%% Avoid sub-binary construction.
MsgId = <<MsgIdInt:128>>,
TotalSize = Size + 9,
case Fun({MsgId, TotalSize, Offset}) of
%% Confirmed to be a message by the provided fun.
{valid, Entry} ->
case check_msg(Fun, MsgIdInt, TotalSize, Offset, Acc) of
{continue, NewAcc} ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, [Entry|Acc]);
%% Confirmed to be a message but we don't need it anymore.
previously_valid ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, Acc);
%% Not a message, try the next byte.
invalid ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc)
MsgIdsFound#{MsgIdInt => true}, NewAcc);
try_next_byte ->
scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
end
end;
%% Large message alone in its own file.
%%
%% This clause applies only when scanning is at offset 0 and the size
%% prefix accounts for the entire file: 8 bytes of size, Size bytes of
%% message id and body, and 1 trailing end-marker byte
%% (FileSize == Size + 9). It reads just the last byte of the file and
%% checks that it is the 255 end marker, then asks check_msg/5 whether
%% the candidate is a message.
%%
%% The incoming MsgIdsFound and Acc are ignored; an empty map and an
%% empty list are used instead.
scan_data(<<Size:64, MsgIdInt:128, _Rest/bits>> = Data, Fd, Fun, Offset, FileSize, _MsgIdsFound, _Acc)
when Offset == 0,
FileSize == Size + 9 ->
%% Remember the current file position so that it can be restored if the
%% candidate turns out not to be a message.
%% NOTE(review): presumably required because file:pread/3 may move the
%% position of a file opened in raw mode - confirm.
{ok, CurrentPos} = file:position(Fd, cur),
%% Read only the final byte of the file.
case file:pread(Fd, FileSize - 1, 1) of
{ok, <<255>>} ->
TotalSize = FileSize,
case check_msg(Fun, MsgIdInt, TotalSize, Offset, []) of
{continue, NewAcc} ->
%% The candidate covers the whole file: return the accumulator
%% directly instead of continuing the scan.
NewAcc;
try_next_byte ->
%% Rejected by Fun: restore the position and resume the regular
%% byte-by-byte scan from the next byte.
{ok, _} = file:position(Fd, CurrentPos),
scan_next_byte(Data, Fd, Fun, Offset, FileSize, #{}, [])
end;
_ ->
%% Wrong end marker (or the read did not return exactly one byte):
%% restore the position and resume the regular scan.
{ok, _} = file:position(Fd, CurrentPos),
scan_next_byte(Data, Fd, Fun, Offset, FileSize, #{}, [])
end;
%% This might be the start of a message.
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Rest) < Size + 1, Size < FileSize - Offset ->
@ -1545,9 +1555,27 @@ scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Data) < 8 ->
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
%% This is definitely not a message. Try the next byte.
scan_data(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc).
%% Discard the first byte of the buffer and resume scanning one byte
%% further along. All other scan arguments are passed through unchanged.
scan_next_byte(<<_Skipped:8, Tail/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
    NextOffset = Offset + 1,
    scan_data(Tail, Fd, Fun, NextOffset, FileSize, MsgIdsFound, Acc).
%% Ask the caller-supplied Fun whether the candidate found at Offset is a
%% message.
%%
%% Fun is called with {MsgId, TotalSize, Offset}, where MsgId is the
%% 16-byte binary form of MsgIdInt.
%%
%% Returns:
%%   {continue, [Entry | Acc]} when Fun answers {valid, Entry};
%%   {continue, Acc}           when Fun answers previously_valid
%%                             (a message, but no longer needed);
%%   try_next_byte             when Fun answers invalid.
check_msg(Fun, MsgIdInt, TotalSize, Offset, Acc) ->
    %% Build the id from the integer to avoid sub-binary construction.
    Candidate = {<<MsgIdInt:128>>, TotalSize, Offset},
    case Fun(Candidate) of
        invalid ->
            try_next_byte;
        previously_valid ->
            {continue, Acc};
        {valid, Entry} ->
            {continue, [Entry | Acc]}
    end.
%%----------------------------------------------------------------------------
%% Ets index
%%----------------------------------------------------------------------------