Optimise msg_store recovery in case of large message file

Since 4.0.0 (commit d45fbc3d) the shared message store writes large
messages into their own rdq files. This information can be utilised
when scanning rdq files during recovery to avoid reading in the whole
message body into memory unnecessarily.

This commit addresses the same issue that was addressed in 3.13.x by
commit baeefbec (ie. appending a large binary together from 4MB chunks
leaves a lot of garbage and memory fragmentation behind) but even more
efficiently.

Large messages which were written before 4.0.0, which don't fully fill
the rdq file, are still handled as before.
This commit is contained in:
Péter Gömöri 2025-02-12 15:33:03 +01:00
parent c4706616db
commit fb21a19b72
1 changed files with 45 additions and 17 deletions

View File

@ -1515,28 +1515,38 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
%% a remnant from a previous compaction, but it might %% a remnant from a previous compaction, but it might
%% simply be a coincidence. Try the next byte. %% simply be a coincidence. Try the next byte.
#{MsgIdInt := true} -> #{MsgIdInt := true} ->
<<_, Rest2/bits>> = Data, scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc);
%% Data looks to be a message. %% Data looks to be a message.
_ -> _ ->
%% Avoid sub-binary construction.
MsgId = <<MsgIdInt:128>>,
TotalSize = Size + 9, TotalSize = Size + 9,
case Fun({MsgId, TotalSize, Offset}) of case check_msg(Fun, MsgIdInt, TotalSize, Offset, Acc) of
%% Confirmed to be a message by the provided fun. {continue, NewAcc} ->
{valid, Entry} ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize, scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, [Entry|Acc]); MsgIdsFound#{MsgIdInt => true}, NewAcc);
%% Confirmed to be a message but we don't need it anymore. try_next_byte ->
previously_valid -> scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, Acc);
%% Not a message, try the next byte.
invalid ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc)
end end
end; end;
%% Large message alone in its own file
scan_data(<<Size:64, MsgIdInt:128, _Rest/bits>> = Data, Fd, Fun, Offset, FileSize, _MsgIdsFound, _Acc)
when Offset == 0,
FileSize == Size + 9 ->
{ok, CurrentPos} = file:position(Fd, cur),
case file:pread(Fd, FileSize - 1, 1) of
{ok, <<255>>} ->
TotalSize = FileSize,
case check_msg(Fun, MsgIdInt, TotalSize, Offset, []) of
{continue, NewAcc} ->
NewAcc;
try_next_byte ->
{ok, _} = file:position(Fd, CurrentPos),
scan_next_byte(Data, Fd, Fun, Offset, FileSize, #{}, [])
end;
_ ->
%% Wrong end marker
{ok, _} = file:position(Fd, CurrentPos),
scan_next_byte(Data, Fd, Fun, Offset, FileSize, #{}, [])
end;
%% This might be the start of a message. %% This might be the start of a message.
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) scan_data(<<Size:64, Rest/bits>> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Rest) < Size + 1, Size < FileSize - Offset -> when byte_size(Rest) < Size + 1, Size < FileSize - Offset ->
@ -1545,9 +1555,27 @@ scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Data) < 8 -> when byte_size(Data) < 8 ->
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc); scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
%% This is definitely not a message. Try the next byte. %% This is definitely not a message. Try the next byte.
scan_data(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) -> scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
scan_next_byte(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc).
scan_next_byte(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc). scan_data(Rest, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc).
check_msg(Fun, MsgIdInt, TotalSize, Offset, Acc) ->
%% Avoid sub-binary construction.
MsgId = <<MsgIdInt:128>>,
case Fun({MsgId, TotalSize, Offset}) of
%% Confirmed to be a message by the provided fun.
{valid, Entry} ->
{continue, [Entry|Acc]};
%% Confirmed to be a message but we don't need it anymore.
previously_valid ->
{continue, Acc};
%% Not a message, try the next byte.
invalid ->
try_next_byte
end.
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
%% Ets index %% Ets index
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------