CQ: Improve reading from shared message store

This commit replaces file combining with single-file compaction,
where data is moved towards the beginning of the file before
the index entries are updated. The file is then truncated once
all existing readers are gone. This allows us to remove the
lock that was previously required and enables reading multiple
messages at once from the shared files. It also lets us avoid
many ETS operations and greatly simplifies the code.
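A minimal sketch of the compaction flow described above, using hypothetical
helpers (live_messages/2 returning live locations sorted by offset, and
update_index/3); the real rabbit_msg_store implementation is more involved:

%% Sketch only: pack live messages towards the start of the file,
%% repointing the index only after each copy completes. Truncation
%% happens later, once no reader can hold an offset into the old tail.
compact_sketch(Fd, File, IndexState) ->
    Tail = lists:foldl(
        fun(#msg_location{offset = Offset, total_size = Size} = Loc, Tail0) ->
            case Offset =:= Tail0 of
                true ->
                    Tail0 + Size; %% Already packed; nothing to move.
                false ->
                    {ok, Data} = file:pread(Fd, Offset, Size),
                    ok = file:pwrite(Fd, Tail0, Data),
                    ok = update_index(Loc, Tail0, IndexState), %% Hypothetical.
                    Tail0 + Size
            end
        end, 0, live_messages(File, IndexState)), %% Hypothetical.
    {truncate_when_no_readers, File, Tail}.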

This commit still has some issues: reading a single message
is currently slow due to the removal of FHC from the client
code. This will be resolved by implementing read buffering in
a way similar to FHC, but without keeping files open longer
than necessary.
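A hypothetical sketch of such buffering: on a buffer miss, over-read a
larger chunk, serve subsequent nearby reads from memory, and close the
file immediately instead of caching the handle like FHC does (the chunk
size and function shapes are assumptions, not the planned API):

buffered_pread(_Path, Offset, Size, {BufOffset, BufData} = Buf)
  when Offset >= BufOffset,
       Offset + Size =< BufOffset + byte_size(BufData) ->
    %% Buffer hit: no file access at all.
    {binary:part(BufData, Offset - BufOffset, Size), Buf};
buffered_pread(Path, Offset, Size, _Buf) ->
    {ok, Fd} = file:open(Path, [read, raw, binary]),
    %% Over-read (64KiB here, arbitrarily) so that nearby messages
    %% hit the buffer; assumes the full message is available.
    {ok, BufData} = file:pread(Fd, Offset, max(Size, 65536)),
    ok = file:close(Fd),
    {binary:part(BufData, 0, Size), {Offset, BufData}}.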

The dirty recovery code also likely has a number of issues
because of the compaction changes.
Author: Loïc Hoguin, 2022-10-21 13:38:51 +02:00
parent 6bee34926d
commit 32816c0a76
GPG Key ID: C69E26E3A9DF618F (no known key found for this signature in the database)
7 changed files with 677 additions and 729 deletions

rabbit_msg_file.erl

@@ -7,7 +7,7 @@
-module(rabbit_msg_file).
-export([append/3, read/2, scan/4]).
-export([append/3, read/2, pread/2, pread/3, scan/4]).
%%----------------------------------------------------------------------------
@@ -39,6 +39,9 @@
append(FileHdl, MsgId, MsgBody)
when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES ->
%% @todo I think we are actually writing MsgId TWICE: once in
%% the header, once in the body. Might be better to reduce
%% the size of the body...
MsgBodyBin = term_to_binary(MsgBody),
MsgBodyBinSize = size(MsgBodyBin),
Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
@@ -67,6 +70,44 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.
-spec pread(io_device(), position(), msg_size()) ->
rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()},
any()).
pread(FileHdl, Offset, TotalSize) ->
Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
case file:pread(FileHdl, Offset, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
MsgId:?MSG_ID_SIZE_BYTES/binary,
MsgBodyBin:BodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
{ok, {MsgId, binary_to_term(MsgBodyBin)}};
KO -> KO
end.
-spec pread(io_device(), [{position(), msg_size()}]) ->
{ok, [msg()]} | {error, any()} | eof.
pread(FileHdl, LocNums) ->
case file:pread(FileHdl, LocNums) of
{ok, DataL} -> {ok, pread_parse(DataL)};
KO -> KO
end.
pread_parse([<<Size:?INTEGER_SIZE_BITS,
_MsgId:?MSG_ID_SIZE_BYTES/binary,
Rest0/bits>>|Tail]) ->
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
<<MsgBodyBin:BodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS,
Rest/bits>> = Rest0,
[binary_to_term(MsgBodyBin)|pread_parse([Rest|Tail])];
pread_parse([<<>>]) ->
[];
pread_parse([<<>>|Tail]) ->
pread_parse(Tail).
-spec scan(io_device(), file_size(), message_accumulator(A), A) ->
{'ok', A, position()}.
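
A usage sketch for the new pread functions above; the file name, offsets
and sizes are placeholders, and TotalSize is the on-disk size recorded in
the index:

{ok, Fd} = file:open("0.rdq", [read, raw, binary]),
%% Single message at a known location:
{ok, {MsgId, Msg}} = rabbit_msg_file:pread(Fd, Offset, TotalSize),
%% Several messages in one scatter read; only the bodies come back,
%% in the same order as the {Offset, TotalSize} pairs:
{ok, Msgs} = rabbit_msg_file:pread(Fd, [{Offset1, Size1}, {Offset2, Size2}]),
ok = file:close(Fd).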

rabbit_msg_store.erl (file diff suppressed because it is too large)

rabbit_msg_store_ets_index.erl

@@ -12,7 +12,7 @@
-behaviour(rabbit_msg_store_index).
-export([new/1, recover/1,
lookup/2, insert/2, update/2, update_fields/3, delete/2,
lookup/2, select_from_file/3, select_all_from_file/2, insert/2, update/2, update_fields/3, delete/2,
delete_object/2, clean_up_temporary_reference_count_entries_without_file/1, terminate/1]).
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
@@ -42,6 +42,18 @@ lookup(Key, State) ->
[Entry] -> Entry
end.
%% @todo We currently fetch all and then filter by file.
%% This might lead to too many lookups... How to best
%% optimize this? ets:select didn't seem great.
select_from_file(MsgIds, File, State) ->
All = [lookup(Id, State) || Id <- MsgIds],
[MsgLoc || MsgLoc=#msg_location{file=MsgFile} <- All, MsgFile =:= File].
%% Note that this function is not terribly efficient and should only be
%% used for compaction or similar.
select_all_from_file(File, State) ->
ets:match_object(State #state.table, #msg_location { file = File, _ = '_' }).
insert(Obj, State) ->
true = ets:insert_new(State #state.table, Obj),
ok.
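
For reference, the ets:select approach mentioned in the @todo above could
look roughly as follows; this is illustrative only and not part of the
commit:

select_from_file_alt(MsgIds, File, State) ->
    %% One table traversal matching every location in File, then keep
    %% only the requested ids.
    MsgIdSet = sets:from_list(MsgIds),
    Pattern = #msg_location{file = File, _ = '_'},
    [Loc || Loc = #msg_location{msg_id = MsgId}
                <- ets:select(State #state.table, [{Pattern, [], ['$_']}]),
            sets:is_element(MsgId, MsgIdSet)].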

rabbit_msg_store_gc.erl

@@ -9,7 +9,7 @@
-behaviour(gen_server2).
-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]).
-export([start_link/1, compact/2, truncate/4, delete/2, stop/1]).
-export([set_maximum_since_use/2]).
@@ -17,8 +17,8 @@
terminate/2, code_change/3, prioritise_cast/3]).
-record(state,
{ pending_no_readers,
on_action,
{ pending,
timer_ref,
msg_store_state
}).
@@ -33,22 +33,21 @@ start_link(MsgStoreState) ->
gen_server2:start_link(?MODULE, [MsgStoreState],
[{timeout, infinity}]).
-spec combine(pid(), rabbit_msg_store:file_num(),
rabbit_msg_store:file_num()) -> 'ok'.
-spec compact(pid(), rabbit_msg_store:file_num()) -> 'ok'.
combine(Server, Source, Destination) ->
gen_server2:cast(Server, {combine, Source, Destination}).
compact(Server, File) ->
gen_server2:cast(Server, {compact, File}).
-spec truncate(pid(), rabbit_msg_store:file_num(), non_neg_integer(), integer()) -> 'ok'.
truncate(Server, File, TruncateSize, ThresholdTimestamp) ->
gen_server2:cast(Server, {truncate, File, TruncateSize, ThresholdTimestamp}).
-spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'.
delete(Server, File) ->
gen_server2:cast(Server, {delete, File}).
-spec no_readers(pid(), rabbit_msg_store:file_num()) -> 'ok'.
no_readers(Server, File) ->
gen_server2:cast(Server, {no_readers, File}).
-spec stop(pid()) -> 'ok'.
stop(Server) ->
@@ -64,8 +63,7 @@ set_maximum_since_use(Pid, Age) ->
init([MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
{ok, #state { pending_no_readers = #{},
on_action = [],
{ok, #state { pending = #{},
msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -75,28 +73,44 @@ prioritise_cast(_Msg, _Len, _State) -> 0.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
handle_cast({combine, Source, Destination}, State) ->
{noreply, attempt_action(combine, [Source, Destination], State), hibernate};
handle_cast({compact, File}, State) ->
%% Since we don't compact files that have a valid size of 0,
%% we cannot have a delete queued at the same time as we are
%% asked to compact. We can always compact.
{noreply, attempt_action(compact, [File], State), hibernate};
handle_cast({delete, File}, State) ->
{noreply, attempt_action(delete, [File], State), hibernate};
handle_cast({truncate, File, TruncateSize, ThresholdTimestamp}, State = #state{pending = Pending}) ->
case Pending of
%% No need to truncate if we are going to delete.
#{File := {delete, _}} ->
{noreply, State, hibernate};
%% Attempt to truncate otherwise. If a truncate was already
%% scheduled we drop it in favor of the new truncate.
_ ->
State1 = State#state{pending = maps:remove(File, Pending)},
{noreply, attempt_action(truncate, [File, TruncateSize, ThresholdTimestamp], State1), hibernate}
end;
handle_cast({no_readers, File},
State = #state { pending_no_readers = Pending }) ->
{noreply, case maps:find(File, Pending) of
error ->
State;
{ok, {Action, Files}} ->
Pending1 = maps:remove(File, Pending),
attempt_action(
Action, Files,
State #state { pending_no_readers = Pending1 })
end, hibernate};
handle_cast({delete, File}, State = #state{pending = Pending}) ->
%% We drop any pending action because deletion takes precedence over truncation.
State1 = State#state{pending = maps:remove(File, Pending)},
{noreply, attempt_action(delete, [File], State1), hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
{noreply, State, hibernate}.
%% Run all pending actions.
handle_info({timeout, TimerRef, do_pending},
State = #state{ pending = Pending,
timer_ref = TimerRef }) ->
State1 = State#state{ pending = #{},
timer_ref = undefined },
State2 = maps:fold(fun(_File, {Action, Args}, StateFold) ->
attempt_action(Action, Args, StateFold)
end, State1, Pending),
{noreply, State2, hibernate};
handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
@@ -106,20 +120,27 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
attempt_action(Action, Files,
State = #state { pending_no_readers = Pending,
on_action = Thunks,
attempt_action(Action, Args,
State = #state { pending = Pending,
msg_store_state = MsgStoreState }) ->
case do_action(Action, Files, MsgStoreState) of
{ok, OkThunk} ->
State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
[OkThunk | Thunks])};
{defer, [File | _]} ->
Pending1 = maps:put(File, {Action, Files}, Pending),
State #state { pending_no_readers = Pending1 }
case do_action(Action, Args, MsgStoreState) of
ok ->
State;
defer ->
[File|_] = Args,
Pending1 = maps:put(File, {Action, Args}, Pending),
ensure_pending_timer(State #state { pending = Pending1 })
end.
do_action(combine, [Source, Destination], MsgStoreState) ->
rabbit_msg_store:combine_files(Source, Destination, MsgStoreState);
do_action(compact, [File], MsgStoreState) ->
rabbit_msg_store:compact_file(File, MsgStoreState);
do_action(truncate, [File, Size, ThresholdTimestamp], MsgStoreState) ->
rabbit_msg_store:truncate_file(File, Size, ThresholdTimestamp, MsgStoreState);
do_action(delete, [File], MsgStoreState) ->
rabbit_msg_store:delete_file(File, MsgStoreState).
ensure_pending_timer(State = #state{timer_ref = undefined}) ->
TimerRef = erlang:start_timer(5000, self(), do_pending),
State#state{timer_ref = TimerRef};
ensure_pending_timer(State) ->
State.
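
An illustrative call sequence against the reworked server; pids and
arguments are hypothetical:

%% Readers still hold the file: truncate_file/4 defers, the request
%% is parked in `pending` keyed by file, and the timer is armed.
ok = rabbit_msg_store_gc:truncate(GCPid, File, TruncateSize, Ts),
%% Deletion takes precedence: the parked truncate is dropped before
%% the delete is attempted.
ok = rabbit_msg_store_gc:delete(GCPid, File),
%% If the delete also defers, the do_pending timeout retries it
%% after 5000 ms.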

rabbit_variable_queue.erl

@@ -1298,27 +1298,26 @@ msg_status(Version, IsPersistent, IsDelivered, SeqId,
msg_props = MsgProps}.
beta_msg_status({Msg = #basic_message{id = MsgId},
SeqId, rabbit_queue_index, MsgProps, IsPersistent}) ->
SeqId, MsgLocation, MsgProps, IsPersistent}) ->
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent),
MS0#msg_status{msg_id = MsgId,
msg = Msg,
persist_to = queue_index,
msg_location = memory};
beta_msg_status({Msg = #basic_message{id = MsgId},
SeqId, {rabbit_classic_queue_store_v2, _, _}, MsgProps, IsPersistent}) ->
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent),
MS0#msg_status{msg_id = MsgId,
msg = Msg,
persist_to = queue_store,
msg_location = memory};
persist_to = case MsgLocation of
rabbit_queue_index -> queue_index;
{rabbit_classic_queue_store_v2, _, _} -> queue_store;
rabbit_msg_store -> msg_store
end,
msg_location = case MsgLocation of
rabbit_queue_index -> memory;
_ -> MsgLocation
end};
beta_msg_status({MsgId, SeqId, MsgLocation, MsgProps, IsPersistent}) ->
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent),
MS0#msg_status{msg_id = MsgId,
msg = undefined,
persist_to = case is_tuple(MsgLocation) of
true -> queue_store;
true -> queue_store; %% @todo I'm not sure this clause is triggered anymore.
false -> msg_store
end,
msg_location = MsgLocation}.
@@ -2591,12 +2590,12 @@ maybe_deltas_to_betas(DelsAndAcksFun,
index_mod = IndexMod,
index_state = IndexState,
store_state = StoreState,
msg_store_clients = {MCStateP, MCStateT},
ram_msg_count = RamMsgCount,
ram_bytes = RamBytes,
disk_read_count = DiskReadCount,
delta_transient_bytes = DeltaTransientBytes,
transient_threshold = TransientThreshold,
version = Version },
transient_threshold = TransientThreshold },
MemoryLimit) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -2619,15 +2618,61 @@ maybe_deltas_to_betas(DelsAndAcksFun,
lists:min([IndexMod:next_segment_boundary(DeltaSeqId),
DeltaSeqLimit, DeltaSeqIdEnd]),
{List0, IndexState1} = IndexMod:read(DeltaSeqId, DeltaSeqId1, IndexState),
{List, StoreState2} = case Version of
1 -> {List0, StoreState};
%% When using v2 we try to read all messages from disk at once
%% instead of 1 by 1 at fetch time.
2 ->
Reads = [{SeqId, MsgLocation}
|| {_, SeqId, MsgLocation, _, _} <- List0, is_tuple(MsgLocation)],
{Msgs, StoreState1} = rabbit_classic_queue_store_v2:read_many(Reads, StoreState),
{merge_read_msgs(List0, Reads, Msgs), StoreState1}
%% We try to read messages from disk all at once instead of
%% 1 by 1 at fetch time. When v1 is used and messages are
%% embedded, then the message content is already read from
%% disk at this point. For v2 embedded we must do a separate
%% call to obtain the contents and then merge the contents
%% back into the #msg_status records.
%%
%% For shared message store messages we do the same but only
%% for messages < 20000 bytes and when there are at least 10
%% messages to fetch (otherwise we do the fetch 1 by 1 right
%% before sending the messages). The values were obtained
%% through experiments: past a certain size the performance
%% drops and it is no longer worthwhile to keep the extra
%% data in memory. Since we have
%% two different shared stores for persistent/transient
%% they are treated separately when deciding whether to
%% read_many from either of them.
%%
%% Because v2 and shared stores function differently we
%% must keep different information for performing the reads.
{V2Reads0, ShPersistReads, ShTransientReads} = lists:foldl(fun
({_, SeqId, MsgLocation, _, _}, {V2ReadsAcc, ShPReadsAcc, ShTReadsAcc}) when is_tuple(MsgLocation) ->
{[{SeqId, MsgLocation}|V2ReadsAcc], ShPReadsAcc, ShTReadsAcc};
({MsgId, _, rabbit_msg_store, #message_properties{size = Size}, true}, {V2ReadsAcc, ShPReadsAcc, ShTReadsAcc}) when Size =< 20000 ->
{V2ReadsAcc, [MsgId|ShPReadsAcc], ShTReadsAcc};
({MsgId, _, rabbit_msg_store, #message_properties{size = Size}, false}, {V2ReadsAcc, ShPReadsAcc, ShTReadsAcc}) when Size =< 20000 ->
{V2ReadsAcc, ShPReadsAcc, [MsgId|ShTReadsAcc]};
(_, Acc) ->
Acc
end, {[], [], []}, List0),
%% In order to properly read and merge V2 messages we want them
%% in the older->younger order.
V2Reads = lists:reverse(V2Reads0),
%% We do read_many for v2 store unconditionally.
{V2Msgs, StoreState2} = rabbit_classic_queue_store_v2:read_many(V2Reads, StoreState),
List1 = merge_read_msgs(List0, V2Reads, V2Msgs),
%% We read from the shared message store only if there are multiple
%% messages (10+), as we wouldn't get much benefit from a smaller
%% number; otherwise we wait and read later.
%%
%% Because read_many does not use FHC we do not get an updated MCState
%% like with normal reads.
List2 = case length(ShPersistReads) < 10 of
true ->
List1;
false ->
ShPersistMsgs = rabbit_msg_store:read_many(ShPersistReads, MCStateP),
merge_sh_read_msgs(List1, ShPersistMsgs)
end,
List = case length(ShTransientReads) < 10 of
true ->
List2;
false ->
ShTransientMsgs = rabbit_msg_store:read_many(ShTransientReads, MCStateT),
merge_sh_read_msgs(List2, ShTransientMsgs)
end,
{Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =
betas_from_index_entries(List, TransientThreshold,
@@ -2670,9 +2715,21 @@ merge_read_msgs([M = {_, SeqId, _, _, _}|MTail], [{SeqId, _}|RTail], [Msg|MsgTai
[setelement(1, M, Msg)|merge_read_msgs(MTail, RTail, MsgTail)];
merge_read_msgs([M|MTail], RTail, MsgTail) ->
[M|merge_read_msgs(MTail, RTail, MsgTail)];
%% @todo We probably don't need to unwrap until the end.
merge_read_msgs([], [], []) ->
[].
%% We may not get back all of the messages we tried to read.
merge_sh_read_msgs([M = {MsgId, _, _, _, _}|MTail], Reads) ->
case Reads of
#{MsgId := Msg} ->
[setelement(1, M, Msg)|merge_sh_read_msgs(MTail, Reads)];
_ ->
[M|merge_sh_read_msgs(MTail, Reads)]
end;
merge_sh_read_msgs(MTail, _Reads) ->
MTail.
%% Flushes queue index batch caches and updates queue index state.
ui(#vqstate{index_mod = IndexMod,
index_state = IndexState,

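A toy restatement of the shared-store batching rule described in the
comments above, with the thresholds and tuple shapes taken from the diff;
MCStateP is the persistent shared-store client state:

%% Batch only persistent shared-store messages =< 20000 bytes, and
%% only when there are at least 10 of them; otherwise fall back to
%% one-by-one reads at fetch time.
ShReads = [MsgId || {MsgId, _SeqId, rabbit_msg_store,
                     #message_properties{size = Size}, true} <- List0,
                    Size =< 20000],
List = case length(ShReads) >= 10 of
           true ->
               %% read_many returns a map keyed by message id.
               Msgs = rabbit_msg_store:read_many(ShReads, MCStateP),
               merge_sh_read_msgs(List0, Msgs);
           false ->
               List0
       end.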
file_handle_cache.erl

@@ -558,6 +558,7 @@ obtain() -> obtain(1).
set_reservation() -> set_reservation(1).
release() -> release(1).
release_reservation() -> release_reservation(file).
%% @todo This isn't used.
transfer(Pid) -> transfer(Pid, 1).
obtain(Count) -> obtain(Count, socket).

file_handle_cache_stats.erl

@@ -9,7 +9,7 @@
%% stats about read / write operations that go through the fhc.
-export([init/0, update/3, update/2, update/1, get/0]).
-export([init/0, update/3, update/2, update/1, inc/2, get/0]).
-define(TABLE, ?MODULE).
@@ -46,6 +46,10 @@ update(Op) ->
ets:update_counter(?TABLE, {Op, count}, 1),
ok.
inc(Op, Count) ->
_ = ets:update_counter(?TABLE, {Op, count}, Count),
ok.
get() ->
lists:sort(ets:tab2list(?TABLE)).
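
The new inc/2 lets a batched read bump a counter once rather than once per
message; a hypothetical call site:

%% One counter update for N messages instead of N separate updates:
ok = file_handle_cache_stats:inc(io_read, length(Msgs)).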