CQ: Improve reading from shared message store
This commit replaces file combining with single-file compaction where data is moved near the beginning of the file before updating the index entries. The file is then truncated when all existing readers are gone. This allows removing the lock that existed before and enables reading multiple messages at once from the shared files. This also helps us avoid many ets operations and simplify the code greatly. This commit still has some issues: reading a single message is currently slow due to the removal of FHC in the client code. This will be resolved by implementing read buffering in a similar way as FHC but without keeping files open more than necessary. The dirty recovery code also likely has a number of issues because of the compaction changes.
This commit is contained in:
parent
6bee34926d
commit
32816c0a76
|
@ -7,7 +7,7 @@
|
||||||
|
|
||||||
-module(rabbit_msg_file).
|
-module(rabbit_msg_file).
|
||||||
|
|
||||||
-export([append/3, read/2, scan/4]).
|
-export([append/3, read/2, pread/2, pread/3, scan/4]).
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@ -39,6 +39,9 @@
|
||||||
|
|
||||||
append(FileHdl, MsgId, MsgBody)
|
append(FileHdl, MsgId, MsgBody)
|
||||||
when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES ->
|
when is_binary(MsgId) andalso size(MsgId) =:= ?MSG_ID_SIZE_BYTES ->
|
||||||
|
%% @todo I think we are actually writing MsgId TWICE: once in
|
||||||
|
%% the header, once in the body. Might be better to reduce
|
||||||
|
%% the size of the body...
|
||||||
MsgBodyBin = term_to_binary(MsgBody),
|
MsgBodyBin = term_to_binary(MsgBody),
|
||||||
MsgBodyBinSize = size(MsgBodyBin),
|
MsgBodyBinSize = size(MsgBodyBin),
|
||||||
Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
|
Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
|
||||||
|
@ -67,6 +70,44 @@ read(FileHdl, TotalSize) ->
|
||||||
KO -> KO
|
KO -> KO
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec pread(io_device(), position(), msg_size()) ->
|
||||||
|
rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()},
|
||||||
|
any()).
|
||||||
|
|
||||||
|
pread(FileHdl, Offset, TotalSize) ->
|
||||||
|
Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
|
||||||
|
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
|
||||||
|
case file:pread(FileHdl, Offset, TotalSize) of
|
||||||
|
{ok, <<Size:?INTEGER_SIZE_BITS,
|
||||||
|
MsgId:?MSG_ID_SIZE_BYTES/binary,
|
||||||
|
MsgBodyBin:BodyBinSize/binary,
|
||||||
|
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
|
||||||
|
{ok, {MsgId, binary_to_term(MsgBodyBin)}};
|
||||||
|
KO -> KO
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec pread(io_device(), [{position(), msg_size()}]) ->
|
||||||
|
{ok, [msg()]} | {error, any()} | eof.
|
||||||
|
|
||||||
|
pread(FileHdl, LocNums) ->
|
||||||
|
case file:pread(FileHdl, LocNums) of
|
||||||
|
{ok, DataL} -> {ok, pread_parse(DataL)};
|
||||||
|
KO -> KO
|
||||||
|
end.
|
||||||
|
|
||||||
|
pread_parse([<<Size:?INTEGER_SIZE_BITS,
|
||||||
|
_MsgId:?MSG_ID_SIZE_BYTES/binary,
|
||||||
|
Rest0/bits>>|Tail]) ->
|
||||||
|
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
|
||||||
|
<<MsgBodyBin:BodyBinSize/binary,
|
||||||
|
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS,
|
||||||
|
Rest/bits>> = Rest0,
|
||||||
|
[binary_to_term(MsgBodyBin)|pread_parse([Rest|Tail])];
|
||||||
|
pread_parse([<<>>]) ->
|
||||||
|
[];
|
||||||
|
pread_parse([<<>>|Tail]) ->
|
||||||
|
pread_parse(Tail).
|
||||||
|
|
||||||
-spec scan(io_device(), file_size(), message_accumulator(A), A) ->
|
-spec scan(io_device(), file_size(), message_accumulator(A), A) ->
|
||||||
{'ok', A, position()}.
|
{'ok', A, position()}.
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -12,7 +12,7 @@
|
||||||
-behaviour(rabbit_msg_store_index).
|
-behaviour(rabbit_msg_store_index).
|
||||||
|
|
||||||
-export([new/1, recover/1,
|
-export([new/1, recover/1,
|
||||||
lookup/2, insert/2, update/2, update_fields/3, delete/2,
|
lookup/2, select_from_file/3, select_all_from_file/2, insert/2, update/2, update_fields/3, delete/2,
|
||||||
delete_object/2, clean_up_temporary_reference_count_entries_without_file/1, terminate/1]).
|
delete_object/2, clean_up_temporary_reference_count_entries_without_file/1, terminate/1]).
|
||||||
|
|
||||||
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
|
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
|
||||||
|
@ -42,6 +42,18 @@ lookup(Key, State) ->
|
||||||
[Entry] -> Entry
|
[Entry] -> Entry
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @todo We currently fetch all and then filter by file.
|
||||||
|
%% This might lead to too many lookups... How to best
|
||||||
|
%% optimize this? ets:select didn't seem great.
|
||||||
|
select_from_file(MsgIds, File, State) ->
|
||||||
|
All = [lookup(Id, State) || Id <- MsgIds],
|
||||||
|
[MsgLoc || MsgLoc=#msg_location{file=MsgFile} <- All, MsgFile =:= File].
|
||||||
|
|
||||||
|
%% Note that this function is not terribly efficient and should only be
|
||||||
|
%% used for compaction or similar.
|
||||||
|
select_all_from_file(File, State) ->
|
||||||
|
ets:match_object(State #state.table, #msg_location { file = File, _ = '_' }).
|
||||||
|
|
||||||
insert(Obj, State) ->
|
insert(Obj, State) ->
|
||||||
true = ets:insert_new(State #state.table, Obj),
|
true = ets:insert_new(State #state.table, Obj),
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
|
|
||||||
-behaviour(gen_server2).
|
-behaviour(gen_server2).
|
||||||
|
|
||||||
-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]).
|
-export([start_link/1, compact/2, truncate/4, delete/2, stop/1]).
|
||||||
|
|
||||||
-export([set_maximum_since_use/2]).
|
-export([set_maximum_since_use/2]).
|
||||||
|
|
||||||
|
@ -17,8 +17,8 @@
|
||||||
terminate/2, code_change/3, prioritise_cast/3]).
|
terminate/2, code_change/3, prioritise_cast/3]).
|
||||||
|
|
||||||
-record(state,
|
-record(state,
|
||||||
{ pending_no_readers,
|
{ pending,
|
||||||
on_action,
|
timer_ref,
|
||||||
msg_store_state
|
msg_store_state
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -33,22 +33,21 @@ start_link(MsgStoreState) ->
|
||||||
gen_server2:start_link(?MODULE, [MsgStoreState],
|
gen_server2:start_link(?MODULE, [MsgStoreState],
|
||||||
[{timeout, infinity}]).
|
[{timeout, infinity}]).
|
||||||
|
|
||||||
-spec combine(pid(), rabbit_msg_store:file_num(),
|
-spec compact(pid(), rabbit_msg_store:file_num()) -> 'ok'.
|
||||||
rabbit_msg_store:file_num()) -> 'ok'.
|
|
||||||
|
|
||||||
combine(Server, Source, Destination) ->
|
compact(Server, File) ->
|
||||||
gen_server2:cast(Server, {combine, Source, Destination}).
|
gen_server2:cast(Server, {compact, File}).
|
||||||
|
|
||||||
|
-spec truncate(pid(), rabbit_msg_store:file_num(), non_neg_integer(), integer()) -> 'ok'.
|
||||||
|
|
||||||
|
truncate(Server, File, TruncateSize, ThresholdTimestamp) ->
|
||||||
|
gen_server2:cast(Server, {truncate, File, TruncateSize, ThresholdTimestamp}).
|
||||||
|
|
||||||
-spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'.
|
-spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'.
|
||||||
|
|
||||||
delete(Server, File) ->
|
delete(Server, File) ->
|
||||||
gen_server2:cast(Server, {delete, File}).
|
gen_server2:cast(Server, {delete, File}).
|
||||||
|
|
||||||
-spec no_readers(pid(), rabbit_msg_store:file_num()) -> 'ok'.
|
|
||||||
|
|
||||||
no_readers(Server, File) ->
|
|
||||||
gen_server2:cast(Server, {no_readers, File}).
|
|
||||||
|
|
||||||
-spec stop(pid()) -> 'ok'.
|
-spec stop(pid()) -> 'ok'.
|
||||||
|
|
||||||
stop(Server) ->
|
stop(Server) ->
|
||||||
|
@ -64,8 +63,7 @@ set_maximum_since_use(Pid, Age) ->
|
||||||
init([MsgStoreState]) ->
|
init([MsgStoreState]) ->
|
||||||
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
|
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
|
||||||
[self()]),
|
[self()]),
|
||||||
{ok, #state { pending_no_readers = #{},
|
{ok, #state { pending = #{},
|
||||||
on_action = [],
|
|
||||||
msg_store_state = MsgStoreState }, hibernate,
|
msg_store_state = MsgStoreState }, hibernate,
|
||||||
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
|
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
|
||||||
|
|
||||||
|
@ -75,28 +73,44 @@ prioritise_cast(_Msg, _Len, _State) -> 0.
|
||||||
handle_call(stop, _From, State) ->
|
handle_call(stop, _From, State) ->
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
handle_cast({combine, Source, Destination}, State) ->
|
handle_cast({compact, File}, State) ->
|
||||||
{noreply, attempt_action(combine, [Source, Destination], State), hibernate};
|
%% Since we don't compact files that have a valid size of 0,
|
||||||
|
%% we cannot have a delete queued at the same time as we are
|
||||||
|
%% asked to compact. We can always compact.
|
||||||
|
{noreply, attempt_action(compact, [File], State), hibernate};
|
||||||
|
|
||||||
handle_cast({delete, File}, State) ->
|
handle_cast({truncate, File, TruncateSize, ThresholdTimestamp}, State = #state{pending = Pending}) ->
|
||||||
{noreply, attempt_action(delete, [File], State), hibernate};
|
case Pending of
|
||||||
|
%% No need to truncate if we are going to delete.
|
||||||
|
#{File := {delete, _}} ->
|
||||||
|
{noreply, State, hibernate};
|
||||||
|
%% Attempt to truncate otherwise. If a truncate was already
|
||||||
|
%% scheduled we drop it in favor of the new truncate.
|
||||||
|
_ ->
|
||||||
|
State1 = State#state{pending = maps:remove(File, Pending)},
|
||||||
|
{noreply, attempt_action(truncate, [File, TruncateSize, ThresholdTimestamp], State1), hibernate}
|
||||||
|
end;
|
||||||
|
|
||||||
handle_cast({no_readers, File},
|
handle_cast({delete, File}, State = #state{pending = Pending}) ->
|
||||||
State = #state { pending_no_readers = Pending }) ->
|
%% We drop any pending action because deletion takes precedence over truncation.
|
||||||
{noreply, case maps:find(File, Pending) of
|
State1 = State#state{pending = maps:remove(File, Pending)},
|
||||||
error ->
|
{noreply, attempt_action(delete, [File], State1), hibernate};
|
||||||
State;
|
|
||||||
{ok, {Action, Files}} ->
|
|
||||||
Pending1 = maps:remove(File, Pending),
|
|
||||||
attempt_action(
|
|
||||||
Action, Files,
|
|
||||||
State #state { pending_no_readers = Pending1 })
|
|
||||||
end, hibernate};
|
|
||||||
|
|
||||||
handle_cast({set_maximum_since_use, Age}, State) ->
|
handle_cast({set_maximum_since_use, Age}, State) ->
|
||||||
ok = file_handle_cache:set_maximum_since_use(Age),
|
ok = file_handle_cache:set_maximum_since_use(Age),
|
||||||
{noreply, State, hibernate}.
|
{noreply, State, hibernate}.
|
||||||
|
|
||||||
|
%% Run all pending actions.
|
||||||
|
handle_info({timeout, TimerRef, do_pending},
|
||||||
|
State = #state{ pending = Pending,
|
||||||
|
timer_ref = TimerRef }) ->
|
||||||
|
State1 = State#state{ pending = #{},
|
||||||
|
timer_ref = undefined },
|
||||||
|
State2 = maps:fold(fun(_File, {Action, Args}, StateFold) ->
|
||||||
|
attempt_action(Action, Args, StateFold)
|
||||||
|
end, State1, Pending),
|
||||||
|
{noreply, State2, hibernate};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
{stop, {unhandled_info, Info}, State}.
|
{stop, {unhandled_info, Info}, State}.
|
||||||
|
|
||||||
|
@ -106,20 +120,27 @@ terminate(_Reason, State) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
attempt_action(Action, Files,
|
attempt_action(Action, Args,
|
||||||
State = #state { pending_no_readers = Pending,
|
State = #state { pending = Pending,
|
||||||
on_action = Thunks,
|
|
||||||
msg_store_state = MsgStoreState }) ->
|
msg_store_state = MsgStoreState }) ->
|
||||||
case do_action(Action, Files, MsgStoreState) of
|
case do_action(Action, Args, MsgStoreState) of
|
||||||
{ok, OkThunk} ->
|
ok ->
|
||||||
State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
|
State;
|
||||||
[OkThunk | Thunks])};
|
defer ->
|
||||||
{defer, [File | _]} ->
|
[File|_] = Args,
|
||||||
Pending1 = maps:put(File, {Action, Files}, Pending),
|
Pending1 = maps:put(File, {Action, Args}, Pending),
|
||||||
State #state { pending_no_readers = Pending1 }
|
ensure_pending_timer(State #state { pending = Pending1 })
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_action(combine, [Source, Destination], MsgStoreState) ->
|
do_action(compact, [File], MsgStoreState) ->
|
||||||
rabbit_msg_store:combine_files(Source, Destination, MsgStoreState);
|
rabbit_msg_store:compact_file(File, MsgStoreState);
|
||||||
|
do_action(truncate, [File, Size, ThresholdTimestamp], MsgStoreState) ->
|
||||||
|
rabbit_msg_store:truncate_file(File, Size, ThresholdTimestamp, MsgStoreState);
|
||||||
do_action(delete, [File], MsgStoreState) ->
|
do_action(delete, [File], MsgStoreState) ->
|
||||||
rabbit_msg_store:delete_file(File, MsgStoreState).
|
rabbit_msg_store:delete_file(File, MsgStoreState).
|
||||||
|
|
||||||
|
ensure_pending_timer(State = #state{timer_ref = undefined}) ->
|
||||||
|
TimerRef = erlang:start_timer(5000, self(), do_pending),
|
||||||
|
State#state{timer_ref = TimerRef};
|
||||||
|
ensure_pending_timer(State) ->
|
||||||
|
State.
|
||||||
|
|
|
@ -1298,27 +1298,26 @@ msg_status(Version, IsPersistent, IsDelivered, SeqId,
|
||||||
msg_props = MsgProps}.
|
msg_props = MsgProps}.
|
||||||
|
|
||||||
beta_msg_status({Msg = #basic_message{id = MsgId},
|
beta_msg_status({Msg = #basic_message{id = MsgId},
|
||||||
SeqId, rabbit_queue_index, MsgProps, IsPersistent}) ->
|
SeqId, MsgLocation, MsgProps, IsPersistent}) ->
|
||||||
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent),
|
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent),
|
||||||
MS0#msg_status{msg_id = MsgId,
|
MS0#msg_status{msg_id = MsgId,
|
||||||
msg = Msg,
|
msg = Msg,
|
||||||
persist_to = queue_index,
|
persist_to = case MsgLocation of
|
||||||
msg_location = memory};
|
rabbit_queue_index -> queue_index;
|
||||||
|
{rabbit_classic_queue_store_v2, _, _} -> queue_store;
|
||||||
beta_msg_status({Msg = #basic_message{id = MsgId},
|
rabbit_msg_store -> msg_store
|
||||||
SeqId, {rabbit_classic_queue_store_v2, _, _}, MsgProps, IsPersistent}) ->
|
end,
|
||||||
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent),
|
msg_location = case MsgLocation of
|
||||||
MS0#msg_status{msg_id = MsgId,
|
rabbit_queue_index -> memory;
|
||||||
msg = Msg,
|
_ -> MsgLocation
|
||||||
persist_to = queue_store,
|
end};
|
||||||
msg_location = memory};
|
|
||||||
|
|
||||||
beta_msg_status({MsgId, SeqId, MsgLocation, MsgProps, IsPersistent}) ->
|
beta_msg_status({MsgId, SeqId, MsgLocation, MsgProps, IsPersistent}) ->
|
||||||
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent),
|
MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent),
|
||||||
MS0#msg_status{msg_id = MsgId,
|
MS0#msg_status{msg_id = MsgId,
|
||||||
msg = undefined,
|
msg = undefined,
|
||||||
persist_to = case is_tuple(MsgLocation) of
|
persist_to = case is_tuple(MsgLocation) of
|
||||||
true -> queue_store;
|
true -> queue_store; %% @todo I'm not sure this clause is triggered anymore.
|
||||||
false -> msg_store
|
false -> msg_store
|
||||||
end,
|
end,
|
||||||
msg_location = MsgLocation}.
|
msg_location = MsgLocation}.
|
||||||
|
@ -2591,12 +2590,12 @@ maybe_deltas_to_betas(DelsAndAcksFun,
|
||||||
index_mod = IndexMod,
|
index_mod = IndexMod,
|
||||||
index_state = IndexState,
|
index_state = IndexState,
|
||||||
store_state = StoreState,
|
store_state = StoreState,
|
||||||
|
msg_store_clients = {MCStateP, MCStateT},
|
||||||
ram_msg_count = RamMsgCount,
|
ram_msg_count = RamMsgCount,
|
||||||
ram_bytes = RamBytes,
|
ram_bytes = RamBytes,
|
||||||
disk_read_count = DiskReadCount,
|
disk_read_count = DiskReadCount,
|
||||||
delta_transient_bytes = DeltaTransientBytes,
|
delta_transient_bytes = DeltaTransientBytes,
|
||||||
transient_threshold = TransientThreshold,
|
transient_threshold = TransientThreshold },
|
||||||
version = Version },
|
|
||||||
MemoryLimit) ->
|
MemoryLimit) ->
|
||||||
#delta { start_seq_id = DeltaSeqId,
|
#delta { start_seq_id = DeltaSeqId,
|
||||||
count = DeltaCount,
|
count = DeltaCount,
|
||||||
|
@ -2619,15 +2618,61 @@ maybe_deltas_to_betas(DelsAndAcksFun,
|
||||||
lists:min([IndexMod:next_segment_boundary(DeltaSeqId),
|
lists:min([IndexMod:next_segment_boundary(DeltaSeqId),
|
||||||
DeltaSeqLimit, DeltaSeqIdEnd]),
|
DeltaSeqLimit, DeltaSeqIdEnd]),
|
||||||
{List0, IndexState1} = IndexMod:read(DeltaSeqId, DeltaSeqId1, IndexState),
|
{List0, IndexState1} = IndexMod:read(DeltaSeqId, DeltaSeqId1, IndexState),
|
||||||
{List, StoreState2} = case Version of
|
%% We try to read messages from disk all at once instead of
|
||||||
1 -> {List0, StoreState};
|
%% 1 by 1 at fetch time. When v1 is used and messages are
|
||||||
%% When using v2 we try to read all messages from disk at once
|
%% embedded, then the message content is already read from
|
||||||
%% instead of 1 by 1 at fetch time.
|
%% disk at this point. For v2 embedded we must do a separate
|
||||||
2 ->
|
%% call to obtain the contents and then merge the contents
|
||||||
Reads = [{SeqId, MsgLocation}
|
%% back into the #msg_status records.
|
||||||
|| {_, SeqId, MsgLocation, _, _} <- List0, is_tuple(MsgLocation)],
|
%%
|
||||||
{Msgs, StoreState1} = rabbit_classic_queue_store_v2:read_many(Reads, StoreState),
|
%% For shared message store messages we do the same but only
|
||||||
{merge_read_msgs(List0, Reads, Msgs), StoreState1}
|
%% for messages < 20000 bytes and when there are at least 10
|
||||||
|
%% messages to fetch (otherwise we do the fetch 1 by 1 right
|
||||||
|
%% before sending the messages). The values have been
|
||||||
|
%% obtained through experiments because after a certain size
|
||||||
|
%% the performance drops and it become no longer interesting
|
||||||
|
%% to keep the extra data in memory. Since we have
|
||||||
|
%% two different shared stores for persistent/transient
|
||||||
|
%% they are treated separately when deciding whether to
|
||||||
|
%% read_many from either of them.
|
||||||
|
%%
|
||||||
|
%% Because v2 and shared stores function differently we
|
||||||
|
%% must keep different information for performing the reads.
|
||||||
|
{V2Reads0, ShPersistReads, ShTransientReads} = lists:foldl(fun
|
||||||
|
({_, SeqId, MsgLocation, _, _}, {V2ReadsAcc, ShPReadsAcc, ShTReadsAcc}) when is_tuple(MsgLocation) ->
|
||||||
|
{[{SeqId, MsgLocation}|V2ReadsAcc], ShPReadsAcc, ShTReadsAcc};
|
||||||
|
({MsgId, _, rabbit_msg_store, #message_properties{size = Size}, true}, {V2ReadsAcc, ShPReadsAcc, ShTReadsAcc}) when Size =< 20000 ->
|
||||||
|
{V2ReadsAcc, [MsgId|ShPReadsAcc], ShTReadsAcc};
|
||||||
|
({MsgId, _, rabbit_msg_store, #message_properties{size = Size}, false}, {V2ReadsAcc, ShPReadsAcc, ShTReadsAcc}) when Size =< 20000 ->
|
||||||
|
{V2ReadsAcc, ShPReadsAcc, [MsgId|ShTReadsAcc]};
|
||||||
|
(_, Acc) ->
|
||||||
|
Acc
|
||||||
|
end, {[], [], []}, List0),
|
||||||
|
%% In order to properly read and merge V2 messages we want them
|
||||||
|
%% in the older->younger order.
|
||||||
|
V2Reads = lists:reverse(V2Reads0),
|
||||||
|
%% We do read_many for v2 store unconditionally.
|
||||||
|
{V2Msgs, StoreState2} = rabbit_classic_queue_store_v2:read_many(V2Reads, StoreState),
|
||||||
|
List1 = merge_read_msgs(List0, V2Reads, V2Msgs),
|
||||||
|
%% We read from the shared message store only if there are multiple messages
|
||||||
|
%% (10+ as we wouldn't get much benefits from smaller number of messages)
|
||||||
|
%% otherwise we wait and read later.
|
||||||
|
%%
|
||||||
|
%% Because read_many does not use FHC we do not get an updated MCState
|
||||||
|
%% like with normal reads.
|
||||||
|
List2 = case length(ShPersistReads) < 10 of
|
||||||
|
true ->
|
||||||
|
List1;
|
||||||
|
false ->
|
||||||
|
ShPersistMsgs = rabbit_msg_store:read_many(ShPersistReads, MCStateP),
|
||||||
|
merge_sh_read_msgs(List1, ShPersistMsgs)
|
||||||
|
end,
|
||||||
|
List = case length(ShTransientReads) < 10 of
|
||||||
|
true ->
|
||||||
|
List2;
|
||||||
|
false ->
|
||||||
|
ShTransientMsgs = rabbit_msg_store:read_many(ShTransientReads, MCStateT),
|
||||||
|
merge_sh_read_msgs(List2, ShTransientMsgs)
|
||||||
end,
|
end,
|
||||||
{Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =
|
{Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =
|
||||||
betas_from_index_entries(List, TransientThreshold,
|
betas_from_index_entries(List, TransientThreshold,
|
||||||
|
@ -2670,9 +2715,21 @@ merge_read_msgs([M = {_, SeqId, _, _, _}|MTail], [{SeqId, _}|RTail], [Msg|MsgTai
|
||||||
[setelement(1, M, Msg)|merge_read_msgs(MTail, RTail, MsgTail)];
|
[setelement(1, M, Msg)|merge_read_msgs(MTail, RTail, MsgTail)];
|
||||||
merge_read_msgs([M|MTail], RTail, MsgTail) ->
|
merge_read_msgs([M|MTail], RTail, MsgTail) ->
|
||||||
[M|merge_read_msgs(MTail, RTail, MsgTail)];
|
[M|merge_read_msgs(MTail, RTail, MsgTail)];
|
||||||
|
%% @todo We probably don't need to unwrap until the end.
|
||||||
merge_read_msgs([], [], []) ->
|
merge_read_msgs([], [], []) ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
|
%% We may not get as many messages as we tried reading.
|
||||||
|
merge_sh_read_msgs([M = {MsgId, _, _, _, _}|MTail], Reads) ->
|
||||||
|
case Reads of
|
||||||
|
#{MsgId := Msg} ->
|
||||||
|
[setelement(1, M, Msg)|merge_sh_read_msgs(MTail, Reads)];
|
||||||
|
_ ->
|
||||||
|
[M|merge_sh_read_msgs(MTail, Reads)]
|
||||||
|
end;
|
||||||
|
merge_sh_read_msgs(MTail, _Reads) ->
|
||||||
|
MTail.
|
||||||
|
|
||||||
%% Flushes queue index batch caches and updates queue index state.
|
%% Flushes queue index batch caches and updates queue index state.
|
||||||
ui(#vqstate{index_mod = IndexMod,
|
ui(#vqstate{index_mod = IndexMod,
|
||||||
index_state = IndexState,
|
index_state = IndexState,
|
||||||
|
|
|
@ -558,6 +558,7 @@ obtain() -> obtain(1).
|
||||||
set_reservation() -> set_reservation(1).
|
set_reservation() -> set_reservation(1).
|
||||||
release() -> release(1).
|
release() -> release(1).
|
||||||
release_reservation() -> release_reservation(file).
|
release_reservation() -> release_reservation(file).
|
||||||
|
%% @todo This isn't used.
|
||||||
transfer(Pid) -> transfer(Pid, 1).
|
transfer(Pid) -> transfer(Pid, 1).
|
||||||
|
|
||||||
obtain(Count) -> obtain(Count, socket).
|
obtain(Count) -> obtain(Count, socket).
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
|
|
||||||
%% stats about read / write operations that go through the fhc.
|
%% stats about read / write operations that go through the fhc.
|
||||||
|
|
||||||
-export([init/0, update/3, update/2, update/1, get/0]).
|
-export([init/0, update/3, update/2, update/1, inc/2, get/0]).
|
||||||
|
|
||||||
-define(TABLE, ?MODULE).
|
-define(TABLE, ?MODULE).
|
||||||
|
|
||||||
|
@ -46,6 +46,10 @@ update(Op) ->
|
||||||
ets:update_counter(?TABLE, {Op, count}, 1),
|
ets:update_counter(?TABLE, {Op, count}, 1),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
inc(Op, Count) ->
|
||||||
|
_ = ets:update_counter(?TABLE, {Op, count}, Count),
|
||||||
|
ok.
|
||||||
|
|
||||||
get() ->
|
get() ->
|
||||||
lists:sort(ets:tab2list(?TABLE)).
|
lists:sort(ets:tab2list(?TABLE)).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue