Merge pull request #11112 from rabbitmq/loic-faster-cq-shared-store-gc

4.x: Additional CQv2 message store optimisations
Loïc Hoguin 2024-06-17 15:30:25 +02:00 committed by GitHub
commit d959c8a43d
11 changed files with 319 additions and 409 deletions


@ -39,7 +39,6 @@ _APP_ENV = """[
{vm_memory_calculation_strategy, rss},
{memory_monitor_interval, 2500},
{disk_free_limit, 50000000}, %% 50MB
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client


@ -19,7 +19,6 @@ define PROJECT_ENV
{vm_memory_calculation_strategy, rss},
{memory_monitor_interval, 2500},
{disk_free_limit, 50000000}, %% 50MB
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client

deps/rabbit/app.bzl vendored

@ -170,7 +170,6 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_ets_index.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
"src/rabbit_networking_store.erl",
@ -431,7 +430,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_ets_index.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
"src/rabbit_networking_store.erl",
@ -711,7 +709,6 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_ets_index.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
"src/rabbit_networking_store.erl",


@ -1125,8 +1125,11 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
empty ->
ok = gatherer:stop(Gatherer),
finished;
%% From v1 index walker. @todo Remove when no longer possible to convert from v1.
{value, {MsgId, Count}} ->
{MsgId, Count, {next, Gatherer}}
{MsgId, Count, {next, Gatherer}};
{value, MsgIds} ->
{MsgIds, {next, Gatherer}}
end.
queue_index_walker_reader(#resource{ virtual_host = VHost } = Name, Gatherer) ->
@ -1153,27 +1156,30 @@ queue_index_walker_segment(F, Gatherer) ->
{ok, <<?MAGIC:32,?VERSION:8,
FromSeqId:64/unsigned,ToSeqId:64/unsigned,
_/bits>>} ->
queue_index_walker_segment(Fd, Gatherer, 0, ToSeqId - FromSeqId);
queue_index_walker_segment(Fd, Gatherer, 0, ToSeqId - FromSeqId, []);
_ ->
%% Invalid segment file. Skip.
ok
end,
ok = file:close(Fd).
queue_index_walker_segment(_, _, N, N) ->
queue_index_walker_segment(_, Gatherer, N, N, Acc) ->
%% We reached the end of the segment file.
gatherer:sync_in(Gatherer, Acc),
ok;
queue_index_walker_segment(Fd, Gatherer, N, Total) ->
queue_index_walker_segment(Fd, Gatherer, N, Total, Acc) ->
case file:read(Fd, ?ENTRY_SIZE) of
%% We found a non-ack persistent entry. Gather it.
{ok, <<1,_:7,1:1,_,1,Id:16/binary,_/bits>>} ->
gatherer:sync_in(Gatherer, {Id, 1}),
queue_index_walker_segment(Fd, Gatherer, N + 1, Total);
queue_index_walker_segment(Fd, Gatherer, N + 1, Total, [Id|Acc]);
%% We found an ack, a transient entry or a non-entry. Skip it.
{ok, _} ->
queue_index_walker_segment(Fd, Gatherer, N + 1, Total);
queue_index_walker_segment(Fd, Gatherer, N + 1, Total, Acc);
%% We reached the end of a partial segment file.
eof when Acc =:= [] ->
ok;
eof ->
gatherer:sync_in(Gatherer, Acc),
ok
end.
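The walker above now hands the gatherer one list of message ids per segment file instead of one message per entry, which cuts the inter-process messaging done while rebuilding the index. A generic sketch of the batching pattern, with illustrative names (only the gatherer:sync_in/2 call is from the code above):

%% Accumulate locally, hand over once per segment.
produce_segment(Entries, Gatherer) ->
    case [Id || {persistent, Id} <- Entries] of
        [] -> ok;                                   %% nothing worth gathering
        Batch -> gatherer:sync_in(Gatherer, Batch)  %% one message, not N
    end.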


@ -24,23 +24,21 @@
%%----------------------------------------------------------------------------
-include_lib("rabbit_common/include/rabbit_msg_store.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
%% We flush to disk when the write buffer gets above the max size,
%% or at an interval to make sure we don't keep the data in memory
%% too long. Confirms are sent after the data is flushed to disk.
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB.
-define(SYNC_INTERVAL, 200). %% Milliseconds.
-type(msg() :: any()).
-record(msg_location, {msg_id, ref_count, file, offset, total_size}).
%% We flush to disk at an interval to make sure we don't keep
%% the data in memory too long. Confirms are sent after the
%% data is flushed to disk.
-define(SYNC_INTERVAL, 200). %% Milliseconds.
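A minimal sketch of the interval-driven flush this comment describes, assuming a gen_server owns the write buffer; buffer_append/1, buffer_flush/0 and send_confirms/1 are illustrative stand-ins, not rabbit_msg_store APIs:

handle_cast({write, Msg}, State = #{pending := Pending, sync_timer := Timer}) ->
    ok = buffer_append(Msg),            %% stand-in for the buffered file write
    Timer1 = case Timer of
                 undefined -> erlang:send_after(?SYNC_INTERVAL, self(), sync);
                 _ -> Timer             %% a sync is already scheduled
             end,
    {noreply, State#{pending := [Msg | Pending], sync_timer := Timer1}}.

handle_info(sync, State = #{pending := Pending}) ->
    ok = buffer_flush(),                        %% the data reaches disk here
    ok = send_confirms(lists:reverse(Pending)), %% confirms only after the flush
    {noreply, State#{pending := [], sync_timer := undefined}}.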
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
-define(WRITE_MODE, [write]).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
%% We keep track of flying messages for writes and removes. The idea is that
%% when a remove comes in before we could process the write, we skip the
@ -73,11 +71,8 @@
{
%% store directory
dir :: file:filename(),
%% the module for index ops,
%% rabbit_msg_store_ets_index by default
index_module,
%% where are messages?
index_state,
%% index table
index_ets,
%% current file name as number
current_file,
%% current file handle since the last fsync?
@ -119,8 +114,7 @@
{ server,
client_ref,
reader,
index_state,
index_module,
index_ets,
dir,
file_handles_ets,
cur_file_cache_ets,
@ -133,8 +127,7 @@
-record(gc_state,
{ dir,
index_module,
index_state,
index_ets,
file_summary_ets,
file_handles_ets,
msg_store
@ -151,8 +144,7 @@
-export_type([gc_state/0, file_num/0]).
-type gc_state() :: #gc_state { dir :: file:filename(),
index_module :: atom(),
index_state :: any(),
index_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
file_handles_ets :: ets:tid(),
msg_store :: server()
@ -165,8 +157,7 @@
server :: server(),
client_ref :: client_ref(),
reader :: undefined | {non_neg_integer(), file:fd()},
index_state :: any(),
index_module :: atom(),
index_ets :: any(),
%% Stored as binary() as opposed to file:filename() to save memory.
dir :: binary(),
file_handles_ets :: ets:tid(),
@ -410,7 +401,7 @@ successfully_recovered_state(Server) ->
-spec client_init(server(), client_ref(), maybe_msg_id_fun()) -> client_msstate().
client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
{IState, IModule, Dir, FileHandlesEts, CurFileCacheEts, FlyingEts} =
{IndexEts, Dir, FileHandlesEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
Server, {new_client_state, Ref, self(), MsgOnDiskFun},
infinity),
@ -419,8 +410,7 @@ client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
#client_msstate { server = Server,
client_ref = Ref,
reader = undefined,
index_state = IState,
index_module = IModule,
index_ets = IndexEts,
dir = rabbit_file:filename_to_binary(Dir),
file_handles_ets = FileHandlesEts,
cur_file_cache_ets = CurFileCacheEts,
@ -471,14 +461,14 @@ write(MsgRef, MsgId, Msg, CState) -> client_write(MsgRef, MsgId, Msg, noflow, CS
-spec read(rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}.
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
read(MsgId, CState = #client_msstate { index_ets = IndexEts,
cur_file_cache_ets = CurFileCacheEts }) ->
file_handle_cache_stats:update(msg_store_read),
%% Check the cur file cache
case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
%% @todo It's probably a bug if we don't get a positive ref count.
case index_lookup_positive_ref_count(MsgId, CState) of
case index_lookup_positive_ref_count(IndexEts, MsgId) of
not_found -> {not_found, CState};
MsgLocation -> client_read3(MsgLocation, CState)
end;
@ -489,11 +479,6 @@ read(MsgId,
-spec read_many([rabbit_types:msg_id()], client_msstate())
-> {#{rabbit_types:msg_id() => msg()}, client_msstate()}.
%% We disable read_many when the index module is not ETS for the time being.
%% We can introduce the new index module callback as a breaking change in 4.0.
read_many(_, CState = #client_msstate{ index_module = IndexMod })
when IndexMod =/= rabbit_msg_store_ets_index ->
{#{}, CState};
read_many(MsgIds, CState) ->
file_handle_cache_stats:inc(msg_store_read, length(MsgIds)),
%% We receive MsgIds in roughly the younger->older order so
@ -512,8 +497,8 @@ read_many_cache([], CState, Acc) ->
{Acc, CState}.
%% We will read from disk one file at a time in no particular order.
read_many_disk([MsgId|Tail], CState, Acc) ->
case index_lookup_positive_ref_count(MsgId, CState) of
read_many_disk([MsgId|Tail], CState = #client_msstate{ index_ets = IndexEts }, Acc) ->
case index_lookup_positive_ref_count(IndexEts, MsgId) of
%% We ignore this message if it's not found and will try
%% to read it individually later instead. We can't call
%% the message store and expect good performance here.
@ -525,6 +510,7 @@ read_many_disk([], CState, Acc) ->
{Acc, CState}.
read_many_file2(MsgIds0, CState = #client_msstate{ dir = Dir,
index_ets = IndexEts,
file_handles_ets = FileHandlesEts,
reader = Reader0,
client_ref = Ref }, Acc0, File) ->
@ -534,7 +520,7 @@ read_many_file2(MsgIds0, CState = #client_msstate{ dir = Dir,
%% It's possible that we get no results here if compaction
%% was in progress. That's OK: we will try again with those
%% MsgIds to get them from the new file.
MsgLocations0 = index_select_from_file(MsgIds0, File, CState),
MsgLocations0 = index_select_from_file(IndexEts, MsgIds0, File),
case MsgLocations0 of
[] ->
read_many_file3(MsgIds0, CState, Acc0, File);
@ -563,10 +549,6 @@ read_many_file2(MsgIds0, CState = #client_msstate{ dir = Dir,
read_many_file3(MsgIds, CState#client_msstate{ reader = Reader }, Acc, File)
end.
index_select_from_file(MsgIds, File, #client_msstate { index_module = Index,
index_state = State }) ->
Index:select_from_file(MsgIds, File, State).
consolidate_reads([#msg_location{offset=NextOffset, total_size=NextSize}|Locs], [{Offset, Size}|Acc])
when Offset + Size =:= NextOffset ->
consolidate_reads(Locs, [{Offset, Size + NextSize}|Acc]);
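An illustrative reimplementation of the merge rule above on plain {Offset, Size} pairs (the real function consolidates #msg_location{} records into {Offset, Size} read requests): physically contiguous locations collapse into a single larger read.

merge([{O2, S2} | T], [{O1, S1} | Acc]) when O1 + S1 =:= O2 ->
    merge(T, [{O1, S1 + S2} | Acc]);   %% contiguous: grow the pending read
merge([Loc | T], Acc) ->
    merge(T, [Loc | Acc]);             %% gap: start a new read
merge([], Acc) ->
    lists:reverse(Acc).
%% merge([{0,100}, {100,50}, {150,25}], []) =:= [{0,175}].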
@ -628,14 +610,15 @@ client_write(MsgRef, MsgId, Msg, Flow,
%% And the file only gets deleted after all data was copied, index
%% was updated and file handles got closed.
client_read3(#msg_location { msg_id = MsgId, file = File },
CState = #client_msstate { file_handles_ets = FileHandlesEts,
CState = #client_msstate { index_ets = IndexEts,
file_handles_ets = FileHandlesEts,
client_ref = Ref }) ->
%% We immediately mark the handle open so that we don't get the
%% file truncated while we are reading from it. The file may still
%% be truncated past that point but that's OK because we do a second
%% index lookup to ensure that we get the updated message location.
mark_handle_open(FileHandlesEts, File, Ref),
case index_lookup(MsgId, CState) of
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, ref_count = RefCount } = MsgLocation when RefCount > 0 ->
{Msg, CState1} = read_from_disk(MsgLocation, CState),
mark_handle_closed(FileHandlesEts, File, Ref),
@ -720,9 +703,6 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
Dir = filename:join(BaseDir, atom_to_list(Type)),
Name = filename:join(filename:basename(BaseDir), atom_to_list(Type)),
{ok, IndexModule} = application:get_env(rabbit, msg_store_index_module),
rabbit_log:info("Message store ~tp: using ~tp to provide index", [Name, IndexModule]),
AttemptFileSummaryRecovery =
case ClientRefs of
%% Transient.
@ -738,8 +718,8 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
%% we start recovering messages from the files on disk.
{FileSummaryRecovered, FileSummaryEts} =
recover_file_summary(AttemptFileSummaryRecovery, Dir),
{CleanShutdown, IndexState, ClientRefs1} =
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
{CleanShutdown, IndexEts, ClientRefs1} =
recover_index_and_client_refs(FileSummaryRecovered,
ClientRefs, Dir, Name),
Clients = maps:from_list(
[{CRef, {undefined, undefined}} ||
@ -766,8 +746,7 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
{ok, GCPid} = rabbit_msg_store_gc:start_link(
#gc_state { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
msg_store = self()
@ -777,8 +756,7 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
?CREDIT_DISC_BOUND),
State = #msstate { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
index_ets = IndexEts,
current_file = 0,
current_file_handle = undefined,
current_file_offset = 0,
@ -841,15 +819,14 @@ handle_call(successfully_recovered_state, _From, State) ->
handle_call({new_client_state, CRef, CPid, MsgOnDiskFun}, _From,
State = #msstate { dir = Dir,
index_state = IndexState,
index_module = IndexModule,
index_ets = IndexEts,
file_handles_ets = FileHandlesEts,
cur_file_cache_ets = CurFileCacheEts,
flying_ets = FlyingEts,
clients = Clients }) ->
Clients1 = maps:put(CRef, {CPid, MsgOnDiskFun}, Clients),
erlang:monitor(process, CPid),
reply({IndexState, IndexModule, Dir, FileHandlesEts,
reply({IndexEts, Dir, FileHandlesEts,
CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
@ -877,7 +854,8 @@ handle_cast({client_delete, CRef},
noreply(clear_client(CRef, State1));
handle_cast({write, CRef, MsgRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
State = #msstate { index_ets = IndexEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
credit_disc_bound = CreditDiscBound }) ->
case Flow of
@ -905,8 +883,8 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow},
%% or the non-current files. If the message *is* in the
%% current file then the cache entry will be removed by
%% the normal logic for that in write_message/4 and
%% maybe_roll_to_new_file/2.
case index_lookup(MsgId, State) of
%% flush_or_roll_to_new_file/2.
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File }
when File == State #msstate.current_file ->
ok;
@ -968,8 +946,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
terminate(Reason, State = #msstate { index_state = IndexState,
index_module = IndexModule,
terminate(Reason, State = #msstate { index_ets = IndexEts,
current_file_handle = CurHdl,
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
@ -1004,9 +981,8 @@ terminate(Reason, State = #msstate { index_state = IndexState,
end,
[true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
CurFileCacheEts, FlyingEts]],
IndexModule:terminate(IndexState),
case store_recovery_terms([{client_refs, maps:keys(Clients)},
{index_module, IndexModule}], Dir) of
index_terminate(IndexEts, Dir),
case store_recovery_terms([{client_refs, maps:keys(Clients)}], Dir) of
ok ->
rabbit_log:info("Message store for directory '~ts' is stopped", [Dir]),
ok;
@ -1015,8 +991,7 @@ terminate(Reason, State = #msstate { index_state = IndexState,
" for directory ~tp~nError: ~tp",
[Dir, RTErr])
end,
State3 #msstate { index_state = undefined,
current_file_handle = undefined,
State3 #msstate { current_file_handle = undefined,
current_file_offset = 0 }.
code_change(_OldVsn, State, _Extra) ->
@ -1117,14 +1092,15 @@ write_action({false, not_found}, _MsgId, State) ->
write_action({Mask, #msg_location { ref_count = 0, file = File,
total_size = TotalSize }},
MsgId, State = #msstate { current_file = CurrentFile,
index_ets = IndexEts,
file_summary_ets = FileSummaryEts }) ->
case {Mask, ets:lookup(FileSummaryEts, File)} of
%% Never increase the ref_count for a file that is about to get deleted.
{_, [#file_summary{valid_total_size = 0}]} when File =/= CurrentFile ->
ok = index_delete(MsgId, State),
ok = index_delete(IndexEts, MsgId),
{write, State};
{false, [#file_summary { locked = true }]} ->
ok = index_delete(MsgId, State),
ok = index_delete(IndexEts, MsgId),
{write, State};
{false_if_increment, [#file_summary { locked = true }]} ->
%% The msg for MsgId is older than the client death
@ -1133,13 +1109,13 @@ write_action({Mask, #msg_location { ref_count = 0, file = File,
%% ignore this write.
{ignore, File, State};
{_Mask, [#file_summary {}]} ->
ok = index_update_ref_count(MsgId, 1, State),
ok = index_update_ref_counter(IndexEts, MsgId, +1), %% Effectively set to 1.
State1 = adjust_valid_total_size(File, TotalSize, State),
{confirm, File, State1}
end;
write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
MsgId, State) ->
ok = index_update_ref_count(MsgId, RefCount + 1, State),
write_action({_Mask, #msg_location { file = File }},
MsgId, State = #msstate{ index_ets = IndexEts }) ->
ok = index_update_ref_counter(IndexEts, MsgId, +1),
%% We already know about it; just update the counter. Only update
%% that field, otherwise there is a bad interaction with concurrent GC.
{confirm, File, State}.
@ -1166,7 +1142,7 @@ write_message(MsgId, Msg, CRef,
end, CRef, State1)
end.
remove_message(MsgId, CRef, State) ->
remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
@ -1182,8 +1158,7 @@ remove_message(MsgId, CRef, State) ->
when RefCount > 0 ->
%% only update the field, otherwise there is a bad interaction
%% with concurrent GC
Dec = fun () -> index_update_ref_count(
MsgId, RefCount - 1, State) end,
Dec = fun () -> index_update_ref_counter(IndexEts, MsgId, -1) end,
case RefCount of
%% don't remove from cur_file_cache_ets here because
%% there may be further writes in the mailbox for the
@ -1214,28 +1189,127 @@ gc_candidate(File, State = #msstate{ gc_candidates = Candidates,
gc_candidate(File, State = #msstate{ gc_candidates = Candidates }) ->
State#msstate{ gc_candidates = Candidates#{ File => true }}.
write_message(MsgId, Msg,
State0 = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
current_file_offset = CurOffset,
file_summary_ets = FileSummaryEts }) ->
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, Msg),
State = case MaybeFlush of
flush -> internal_sync(State0);
ok -> State0
end,
ok = index_insert(
%% This value must be sufficiently smaller than ?SCAN_BLOCK_SIZE
%% to ensure we only ever need 2 reads when scanning files.
%% Hence the choice of 4MB here and 4MiB there: the 194,304-byte
%% difference in size is more than enough to ensure that property.
-define(LARGE_MESSAGE_THRESHOLD, 4000000). %% 4MB.
write_message(MsgId, MsgBody, State) ->
MsgBodyBin = term_to_binary(MsgBody),
%% Large messages get written to their own files.
if
byte_size(MsgBodyBin) >= ?LARGE_MESSAGE_THRESHOLD ->
write_large_message(MsgId, MsgBodyBin, State);
true ->
write_small_message(MsgId, MsgBodyBin, State)
end.
write_small_message(MsgId, MsgBodyBin,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
current_file_offset = CurOffset,
index_ets = IndexEts,
file_summary_ets = FileSummaryEts }) ->
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, MsgBodyBin),
ok = index_insert(IndexEts,
#msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize }, State),
offset = CurOffset, total_size = TotalSize }),
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]),
maybe_roll_to_new_file(CurOffset + TotalSize,
flush_or_roll_to_new_file(CurOffset + TotalSize, MaybeFlush,
State #msstate {
current_file_offset = CurOffset + TotalSize }).
contains_message(MsgId, From, State) ->
MsgLocation = index_lookup_positive_ref_count(MsgId, State),
flush_or_roll_to_new_file(
Offset, _MaybeFlush,
State = #msstate { dir = Dir,
current_file_handle = CurHdl,
current_file = CurFile,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
file_size_limit = FileSizeLimit })
when Offset >= FileSizeLimit ->
State1 = internal_sync(State),
ok = writer_close(CurHdl),
NextFile = CurFile + 1,
{ok, NextHdl} = writer_open(Dir, NextFile),
true = ets:insert_new(FileSummaryEts, #file_summary {
file = NextFile,
valid_total_size = 0,
file_size = 0,
locked = false }),
%% Delete messages from the cache that were written to disk.
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile,
current_file_offset = 0 };
%% If we need to flush, do so here.
flush_or_roll_to_new_file(_, flush, State) ->
internal_sync(State);
flush_or_roll_to_new_file(_, _, State) ->
State.
write_large_message(MsgId, MsgBodyBin,
State0 = #msstate { dir = Dir,
current_file_handle = CurHdl,
current_file = CurFile,
current_file_offset = CurOffset,
index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts }) ->
{LargeMsgFile, LargeMsgHdl} = case CurOffset of
%% We haven't written in the file yet. Use it.
0 ->
{CurFile, CurHdl};
%% Flush the current file and close it. Open a new file.
_ ->
ok = writer_flush(CurHdl),
ok = writer_close(CurHdl),
LargeMsgFile0 = CurFile + 1,
{ok, LargeMsgHdl0} = writer_open(Dir, LargeMsgFile0),
{LargeMsgFile0, LargeMsgHdl0}
end,
%% Write the message directly and close the file.
TotalSize = writer_direct_write(LargeMsgHdl, MsgId, MsgBodyBin),
ok = writer_close(LargeMsgHdl),
%% Update ets with the new information.
ok = index_insert(IndexEts,
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
offset = 0, total_size = TotalSize }),
_ = case CurFile of
%% We didn't open a new file. We must update the existing value.
LargeMsgFile ->
[_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]);
%% We opened a new file. We can insert it all at once.
_ ->
true = ets:insert_new(FileSummaryEts, #file_summary {
file = LargeMsgFile,
valid_total_size = TotalSize,
file_size = TotalSize,
locked = false })
end,
%% Roll over to the next file.
NextFile = LargeMsgFile + 1,
{ok, NextHdl} = writer_open(Dir, NextFile),
true = ets:insert_new(FileSummaryEts, #file_summary {
file = NextFile,
valid_total_size = 0,
file_size = 0,
locked = false }),
%% Delete messages from the cache that were written to disk.
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
%% Process confirms (this won't flush; we already did) and continue.
State = internal_sync(State0),
State #msstate { current_file_handle = NextHdl,
current_file = NextFile,
current_file_offset = 0 }.
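The resulting on-disk layout (sketch): the large message ends up at offset 0 of a file containing only that message (the current file if it was still empty, otherwise a freshly opened one), and writing then resumes in a new current file:

%% N.rdq     -> small messages (buffered writes)
%% (N+1).rdq -> the large message alone, at offset 0, closed immediately
%% (N+2).rdq -> new current file for subsequent writes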
contains_message(MsgId, From, State = #msstate{ index_ets = IndexEts }) ->
MsgLocation = index_lookup_positive_ref_count(IndexEts, MsgId),
gen_server2:reply(From, MsgLocation =/= not_found),
State.
@ -1289,9 +1363,9 @@ record_pending_confirm(CRef, MsgId, State) ->
%% rewrite the msg - rewriting it would make it younger than the death
%% msg and thus should be ignored. Note that this (correctly) returns
%% false when testing to remove the death msg itself.
should_mask_action(CRef, MsgId,
State = #msstate{dying_clients = DyingClients}) ->
case {maps:find(CRef, DyingClients), index_lookup(MsgId, State)} of
should_mask_action(CRef, MsgId, #msstate{
index_ets = IndexEts, dying_clients = DyingClients}) ->
case {maps:find(CRef, DyingClients), index_lookup(IndexEts, MsgId)} of
{error, Location} ->
{false, Location};
{{ok, _}, not_found} ->
@ -1331,8 +1405,7 @@ writer_recover(Dir, Num, Offset) ->
ok = file:truncate(Fd),
{ok, #writer{fd = Fd, buffer = prim_buffer:new()}}.
writer_append(#writer{buffer = Buffer}, MsgId, MsgBody) ->
MsgBodyBin = term_to_binary(MsgBody),
writer_append(#writer{buffer = Buffer}, MsgId, MsgBodyBin) ->
MsgBodyBinSize = byte_size(MsgBodyBin),
EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
%% We send an iovec to the buffer instead of building a binary.
@ -1360,6 +1433,21 @@ writer_flush(#writer{fd = Fd, buffer = Buffer}) ->
file:write(Fd, prim_buffer:read_iovec(Buffer, Size))
end.
%% For large messages we don't buffer anything. Large messages
%% are kept within their own files.
%%
%% This is basically the same as writer_append except no buffering.
writer_direct_write(#writer{fd = Fd}, MsgId, MsgBodyBin) ->
MsgBodyBinSize = byte_size(MsgBodyBin),
EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
ok = file:write(Fd, [
<<EntrySize:64>>,
MsgId,
MsgBodyBin,
<<255>> %% OK marker.
]),
EntrySize + 9.
writer_close(#writer{fd = Fd}) ->
file:close(Fd).
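For reference, an illustrative reader for the framing writer_direct_write/3 produces, assuming the layout shown above (an 8-byte size prefix, a 16-byte message id, the body, then the 255 OK marker); this is a sketch, not an upstream function:

read_entry(Fd, Offset) ->
    {ok, <<EntrySize:64>>} = file:pread(Fd, Offset, 8),
    BodySize = EntrySize - 16,          %% EntrySize covers MsgId + body
    {ok, <<MsgId:16/binary, Body:BodySize/binary, 255>>} =
        file:pread(Fd, Offset + 8, EntrySize + 1),
    %% The next entry starts right after the OK marker.
    {MsgId, Body, Offset + EntrySize + 9}.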
@ -1457,85 +1545,129 @@ scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).
%%----------------------------------------------------------------------------
%% index
%% Ets index
%%----------------------------------------------------------------------------
index_lookup_positive_ref_count(Key, State) ->
case index_lookup(Key, State) of
-define(INDEX_TABLE_NAME, rabbit_msg_store_ets_index).
-define(INDEX_FILE_NAME, "msg_store_index.ets").
index_new(Dir) ->
_ = file:delete(filename:join(Dir, ?INDEX_FILE_NAME)),
ets:new(?INDEX_TABLE_NAME, [set, public, {keypos, #msg_location.msg_id}]).
index_recover(Dir) ->
Path = filename:join(Dir, ?INDEX_FILE_NAME),
case ets:file2tab(Path) of
{ok, IndexEts} -> _ = file:delete(Path),
{ok, IndexEts};
Error -> Error
end.
index_lookup(IndexEts, Key) ->
case ets:lookup(IndexEts, Key) of
[] -> not_found;
[Entry] -> Entry
end.
index_lookup_positive_ref_count(IndexEts, Key) ->
case index_lookup(IndexEts, Key) of
not_found -> not_found;
#msg_location { ref_count = 0 } -> not_found;
#msg_location {} = MsgLocation -> MsgLocation
end.
index_update_ref_count(Key, RefCount, State) ->
index_update_fields(Key, {#msg_location.ref_count, RefCount}, State).
%% @todo We currently fetch all and then filter by file.
%% This might lead to too many lookups... How to best
%% optimize this? ets:select didn't seem great.
index_select_from_file(IndexEts, MsgIds, File) ->
All = [index_lookup(IndexEts, Id) || Id <- MsgIds],
[MsgLoc || MsgLoc=#msg_location{file=MsgFile} <- All, MsgFile =:= File].
index_lookup(Key, #gc_state { index_module = Index,
index_state = State }) ->
Index:lookup(Key, State);
index_insert(IndexEts, Obj) ->
true = ets:insert_new(IndexEts, Obj),
ok.
index_lookup(Key, #client_msstate { index_module = Index,
index_state = State }) ->
Index:lookup(Key, State);
index_update(IndexEts, Obj) ->
true = ets:insert(IndexEts, Obj),
ok.
index_lookup(Key, #msstate { index_module = Index, index_state = State }) ->
Index:lookup(Key, State).
index_update_fields(IndexEts, Key, Updates) ->
true = ets:update_element(IndexEts, Key, Updates),
ok.
index_insert(Obj, #msstate { index_module = Index, index_state = State }) ->
Index:insert(Obj, State).
index_update_ref_counter(IndexEts, Key, RefCount) ->
_ = ets:update_counter(IndexEts, Key, RefCount),
ok.
index_update(Obj, #msstate { index_module = Index, index_state = State }) ->
Index:update(Obj, State).
index_update_ref_counter(IndexEts, Key, RefCount, Default) ->
_ = ets:update_counter(IndexEts, Key, RefCount, Default),
ok.
index_update_fields(Key, Updates, #msstate{ index_module = Index,
index_state = State }) ->
Index:update_fields(Key, Updates, State);
index_update_fields(Key, Updates, #gc_state{ index_module = Index,
index_state = State }) ->
Index:update_fields(Key, Updates, State).
index_delete(IndexEts, Key) ->
true = ets:delete(IndexEts, Key),
ok.
index_delete(Key, #msstate { index_module = Index, index_state = State }) ->
Index:delete(Key, State).
index_delete_object(IndexEts, Obj) ->
true = ets:delete_object(IndexEts, Obj),
ok.
index_delete_object(Obj, #gc_state{ index_module = Index,
index_state = State }) ->
Index:delete_object(Obj, State).
index_clean_up_temporary_reference_count_entries(IndexEts) ->
MatchHead = #msg_location { file = undefined, _ = '_' },
ets:select_delete(IndexEts, [{MatchHead, [], [true]}]),
ok.
index_clean_up_temporary_reference_count_entries(
#msstate { index_module = Index,
index_state = State }) ->
Index:clean_up_temporary_reference_count_entries_without_file(State).
index_terminate(IndexEts, Dir) ->
case ets:tab2file(IndexEts, filename:join(Dir, ?INDEX_FILE_NAME),
[{extended_info, [object_count]}]) of
ok -> ok;
{error, Err} ->
rabbit_log:error("Unable to save message store index"
" for directory ~tp.~nError: ~tp",
[Dir, Err])
end,
ets:delete(IndexEts).
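With the pluggable behaviour gone, the index is a single public set table keyed on #msg_location.msg_id, so client processes can resolve message locations with a plain ets:lookup/2 and no gen_server2 round-trip. A sketch of what a client-side lookup amounts to:

locate(IndexEts, MsgId) ->
    case ets:lookup(IndexEts, MsgId) of
        [#msg_location{file = File, offset = Offset, total_size = Size}] ->
            {File, Offset, Size};
        [] ->
            not_found
    end.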
%%----------------------------------------------------------------------------
%% shutdown and recovery
%%----------------------------------------------------------------------------
recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Name) ->
{false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Name) ->
recover_index_and_client_refs(_Recover, undefined, Dir, _Name) ->
{false, index_new(Dir), []};
recover_index_and_client_refs(false, _ClientRefs, Dir, Name) ->
rabbit_log:warning("Message store ~tp: rebuilding indices from scratch", [Name]),
{false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Name) ->
{false, index_new(Dir), []};
recover_index_and_client_refs(true, ClientRefs, Dir, Name) ->
Fresh = fun (ErrorMsg, ErrorArgs) ->
rabbit_log:warning("Message store ~tp : " ++ ErrorMsg ++ "~n"
"rebuilding indices from scratch",
[Name | ErrorArgs]),
{false, IndexModule:new(Dir), []}
{false, index_new(Dir), []}
end,
case read_recovery_terms(Dir) of
{false, Error} ->
Fresh("failed to read recovery terms: ~tp", [Error]);
{true, Terms} ->
RecClientRefs = proplists:get_value(client_refs, Terms, []),
RecIndexModule = proplists:get_value(index_module, Terms),
%% We expect the index module to either be unset or be set
%% to rabbit_msg_store_ets_index. This is needed for graceful
%% upgrade to RabbitMQ 4.0 and above. Starting from 4.0
%% however RabbitMQ will not save the index module in the
%% recovery terms, so this check can be removed in 4.1 or later.
%% What this effectively does is that for users that had a
%% custom index module in 3.13 we force a dirty recovery
%% to switch them to ets. Others can proceed as normal.
RecIndexModule = proplists:get_value(index_module, Terms,
rabbit_msg_store_ets_index),
case (lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
andalso IndexModule =:= RecIndexModule) of
true -> case IndexModule:recover(Dir) of
{ok, IndexState1} ->
{true, IndexState1, ClientRefs};
andalso RecIndexModule =:= rabbit_msg_store_ets_index) of
true -> case index_recover(Dir) of
{ok, IndexEts} ->
{true, IndexEts, ClientRefs};
{error, Error} ->
Fresh("failed to recover index: ~tp", [Error])
end;
false when RecIndexModule =/= rabbit_msg_store_ets_index ->
Fresh("custom index backends have been removed; using ETS index", []);
false -> Fresh("recovery terms differ from present", [])
end
end.
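As an illustration of the compatibility check above (the refs are hypothetical): a 3.13 node that used the default index saved both terms, a 4.x node saves only the client refs, and the rabbit_msg_store_ets_index default makes both pass.

%% Recovery terms written by 3.13 (hypothetical refs):
%%   [{client_refs, [<<"ref-a">>, <<"ref-b">>]},
%%    {index_module, rabbit_msg_store_ets_index}]
%% Recovery terms written by 4.x:
%%   [{client_refs, [<<"ref-a">>, <<"ref-b">>]}]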
@ -1571,28 +1703,24 @@ recover_file_summary(true, Dir) ->
{error, _Error} -> recover_file_summary(false, Dir)
end.
count_msg_refs(Gen, Seed, State) ->
count_msg_refs(Gen, Seed, State = #msstate{ index_ets = IndexEts }) ->
case Gen(Seed) of
finished ->
ok;
%% @todo This is currently required by tests but can't happen otherwise?
{_MsgId, 0, Next} ->
count_msg_refs(Gen, Next, State);
{MsgId, Delta, Next} ->
ok = case index_lookup(MsgId, State) of
not_found ->
index_insert(#msg_location { msg_id = MsgId,
file = undefined,
ref_count = Delta },
State);
#msg_location { ref_count = RefCount } = StoreEntry ->
NewRefCount = RefCount + Delta,
case NewRefCount of
0 -> index_delete(MsgId, State);
_ -> index_update(StoreEntry #msg_location {
ref_count = NewRefCount },
State)
end
end,
%% This clause is kept for v1 compatibility purposes.
%% It can be removed once we no longer support converting from v1 data.
{MsgId, 1, Next} ->
index_update_ref_counter(IndexEts, MsgId, +1,
#msg_location{msg_id=MsgId, file=undefined, ref_count=1}),
count_msg_refs(Gen, Next, State);
{MsgIds, Next} ->
lists:foreach(fun(MsgId) ->
index_update_ref_counter(IndexEts, MsgId, +1,
#msg_location{msg_id=MsgId, file=undefined, ref_count=1})
end, MsgIds),
count_msg_refs(Gen, Next, State)
end.
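The calls above lean on the upsert semantics of ets:update_counter/4: when the key is missing, the Default object is inserted first and the increment is then applied to it. A self-contained illustration with a plain counter tuple:

demo() ->
    T = ets:new(counts, [set]),
    1 = ets:update_counter(T, key, 1, {key, 0}), %% missing: insert {key, 0}, then +1
    2 = ets:update_counter(T, key, 1, {key, 0}), %% present: just +1
    ok.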
@ -1616,26 +1744,27 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
State1, Files)}
end.
build_index_worker(Gatherer, State = #msstate { dir = Dir },
build_index_worker(Gatherer, #msstate { index_ets = IndexEts, dir = Dir },
File, Files) ->
FileName = filenum_to_name(File),
rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)",
[form_filename(Dir, FileName), length(Files)]),
{ok, Messages0, FileSize} =
%% The scan function already dealt with duplicate messages
%% within the file. We then get messages in reverse order.
{ok, Messages, FileSize} =
scan_file_for_valid_messages(Dir, FileName),
%% The scan gives us messages end-of-file first so we reverse the list
%% in case a compaction had occurred before shutdown to not have to repeat it.
Messages = lists:reverse(Messages0),
%% Valid messages are in file order so the last message is
%% the last message from the list.
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
%% We only keep the first message in the file. Duplicates (due to compaction) get ignored.
case index_lookup(MsgId, State) of
%% Fan-out may result in the same message data in multiple
%% files so we have to guard against it.
case index_lookup(IndexEts, MsgId) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
ok = index_update(IndexEts, StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize },
State),
total_size = TotalSize }),
{[Obj | VMAcc], VTSAcc + TotalSize};
_ ->
{VMAcc, VTSAcc}
@ -1649,7 +1778,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
[] -> case ValidMessages of
[] -> 0;
_ -> {_MsgId, TotalSize, Offset} =
hd(ValidMessages),
lists:last(ValidMessages),
Offset + TotalSize
end;
[_|_] -> FileSize
@ -1675,11 +1804,12 @@ enqueue_build_index_workers(Gatherer, [File|Files], State) ->
enqueue_build_index_workers(Gatherer, Files, State).
reduce_index(Gatherer, LastFile,
State = #msstate { file_summary_ets = FileSummaryEts }) ->
State = #msstate { index_ets = IndexEts,
file_summary_ets = FileSummaryEts }) ->
case gatherer:out(Gatherer) of
empty ->
ok = gatherer:stop(Gatherer),
ok = index_clean_up_temporary_reference_count_entries(State),
ok = index_clean_up_temporary_reference_count_entries(IndexEts),
Offset = case ets:lookup(FileSummaryEts, LastFile) of
[] -> 0;
[#file_summary { file_size = FileSize }] -> FileSize
@ -1706,33 +1836,6 @@ rebuild_index(Gatherer, Files, State) ->
%% garbage collection / compaction / aggregation -- internal
%%----------------------------------------------------------------------------
maybe_roll_to_new_file(
Offset,
State = #msstate { dir = Dir,
current_file_handle = CurHdl,
current_file = CurFile,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
file_size_limit = FileSizeLimit })
when Offset >= FileSizeLimit ->
State1 = internal_sync(State),
ok = writer_close(CurHdl),
NextFile = CurFile + 1,
{ok, NextHdl} = writer_open(Dir, NextFile),
true = ets:insert_new(FileSummaryEts, #file_summary {
file = NextFile,
valid_total_size = 0,
file_size = 0,
locked = false }),
%% We only delete messages from the cache that were written to disk
%% in the previous file.
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile,
current_file_offset = 0 };
maybe_roll_to_new_file(_, State) ->
State.
%% We keep track of files that have seen removes and
%% check those periodically for compaction. We only
%% compact files that have less than half valid data.
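A sketch of the candidate predicate this comment describes, using the #file_summary fields shown elsewhere in this diff (illustrative, not the exact upstream helper):

has_compaction_potential(#file_summary{valid_total_size = ValidSize,
                                       file_size = FileSize}) ->
    ValidSize > 0 andalso ValidSize < FileSize div 2.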
@ -1821,7 +1924,8 @@ delete_file_if_empty(File, State = #msstate {
%% We do not try to look at messages that are not the last because we do not want to
%% accidentally write over messages that were moved earlier.
compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
compact_file(File, State = #gc_state { index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
dir = Dir,
msg_store = Server }) ->
%% Get metadata about the file. Will be used to calculate
@ -1848,9 +1952,8 @@ compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
%% be there for readers. Note that it's OK if we crash at any point before we
%% update the index because the old data is still there until we truncate.
lists:foreach(fun ({UpdateMsgId, UpdateOffset}) ->
ok = index_update_fields(UpdateMsgId,
[{#msg_location.offset, UpdateOffset}],
State)
ok = index_update_fields(IndexEts, UpdateMsgId,
[{#msg_location.offset, UpdateOffset}])
end, IndexUpdates),
%% We can truncate only if there are no files opened before this timestamp.
ThresholdTimestamp = erlang:monotonic_time(),
@ -1995,17 +2098,17 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
ok
end.
scan_and_vacuum_message_file(File, State = #gc_state{ dir = Dir }) ->
scan_and_vacuum_message_file(File, #gc_state{ index_ets = IndexEts, dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
case index_lookup(MsgId, State) of
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
ok = index_delete_object(Entry, State),
index_delete_object(IndexEts, Entry),
Acc;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->


@ -1,87 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_msg_store_ets_index).
-include_lib("rabbit_common/include/rabbit_msg_store.hrl").
-behaviour(rabbit_msg_store_index).
-export([new/1, recover/1,
lookup/2, select_from_file/3, insert/2, update/2, update_fields/3, delete/2,
delete_object/2, clean_up_temporary_reference_count_entries_without_file/1, terminate/1]).
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
-define(FILENAME, "msg_store_index.ets").
-record(state,
{table,
%% Stored as binary() as opposed to file:filename() to save memory.
dir :: binary()}).
new(Dir) ->
_ = file:delete(filename:join(Dir, ?FILENAME)),
Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]),
#state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }.
recover(Dir) ->
Path = filename:join(Dir, ?FILENAME),
case ets:file2tab(Path) of
{ok, Tid} -> _ = file:delete(Path),
{ok, #state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }};
Error -> Error
end.
lookup(Key, State) ->
case ets:lookup(State #state.table, Key) of
[] -> not_found;
[Entry] -> Entry
end.
%% @todo We currently fetch all and then filter by file.
%% This might lead to too many lookups... How to best
%% optimize this? ets:select didn't seem great.
select_from_file(MsgIds, File, State) ->
All = [lookup(Id, State) || Id <- MsgIds],
[MsgLoc || MsgLoc=#msg_location{file=MsgFile} <- All, MsgFile =:= File].
insert(Obj, State) ->
true = ets:insert_new(State #state.table, Obj),
ok.
update(Obj, State) ->
true = ets:insert(State #state.table, Obj),
ok.
update_fields(Key, Updates, State) ->
true = ets:update_element(State #state.table, Key, Updates),
ok.
delete(Key, State) ->
true = ets:delete(State #state.table, Key),
ok.
delete_object(Obj, State) ->
true = ets:delete_object(State #state.table, Obj),
ok.
clean_up_temporary_reference_count_entries_without_file(State) ->
MatchHead = #msg_location { file = undefined, _ = '_' },
ets:select_delete(State #state.table, [{MatchHead, [], [true]}]),
ok.
terminate(#state { table = MsgLocations, dir = DirBin }) ->
Dir = rabbit_file:binary_to_filename(DirBin),
case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
[{extended_info, [object_count]}]) of
ok -> ok;
{error, Err} ->
rabbit_log:error("Unable to save message store index"
" for directory ~tp.~nError: ~tp",
[Dir, Err])
end,
ets:delete(MsgLocations).


@ -91,7 +91,7 @@ end_per_suite(Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
ClusterSize = 2,
ClusterSize = 1,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}


@ -59,7 +59,6 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_json.erl",
"src/rabbit_log.erl",
"src/rabbit_misc.erl",
"src/rabbit_msg_store_index.erl",
"src/rabbit_net.erl",
"src/rabbit_nodes_common.erl",
"src/rabbit_numerical.erl",
@ -155,7 +154,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_json.erl",
"src/rabbit_log.erl",
"src/rabbit_misc.erl",
"src/rabbit_msg_store_index.erl",
"src/rabbit_net.erl",
"src/rabbit_nodes_common.erl",
"src/rabbit_numerical.erl",
@ -246,7 +244,6 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_json.erl",
"src/rabbit_log.erl",
"src/rabbit_misc.erl",
"src/rabbit_msg_store_index.erl",
"src/rabbit_net.erl",
"src/rabbit_nodes_common.erl",
"src/rabbit_numerical.erl",
@ -286,7 +283,6 @@ def all_srcs(name = "all_srcs"):
"include/rabbit_framing.hrl",
"include/rabbit_memory.hrl",
"include/rabbit_misc.hrl",
"include/rabbit_msg_store.hrl",
"include/resource.hrl",
],
)


@ -1,12 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-include("rabbit.hrl").
-type(msg() :: any()).
-record(msg_location, {msg_id, ref_count, file, offset, total_size}).


@ -1,89 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_msg_store_index).
-include("rabbit_msg_store.hrl").
%% Behaviour module to provide a pluggable message store index.
%% The index is used to locate messages on disk and for reference-counting.
%% The message store has several additional assumptions about performance and
%% atomicity of some operations. See the comments for each callback.
-type(dir() :: string()).
-type(index_state() :: any()).
-type(fieldpos() :: non_neg_integer()).
-type(fieldvalue() :: any()).
-type(msg_location() :: #msg_location{}).
%% There are two ways of starting an index:
%% - `new` - starts a clean index
%% - `recover` - attempts to read a saved index
%% In both cases the old saved state should be deleted from the directory.
%% Initialize a fresh index state for msg store directory.
-callback new(dir()) -> index_state().
%% Try to recover gracefully stopped index state.
-callback recover(dir()) -> rabbit_types:ok_or_error2(index_state(), any()).
%% Gracefully shutdown the index.
%% Should save the index state, which will be loaded by the 'recover' function.
-callback terminate(index_state()) -> any().
%% Lookup an entry in the index.
%% Is called concurrently by msg_store, its clients and GC processes.
%% This function is called multiple times for each message store operation.
%% The message store tries to avoid writing messages to disk if consumers can
%% process them fast, so there will be a lot of lookups for non-existent
%% entries, which should be as fast as possible.
-callback lookup(rabbit_types:msg_id(), index_state()) -> ('not_found' | msg_location()).
%% Insert an entry into the index.
%% Is called by a msg_store process only.
%% This function can exit if there is already an entry with the same ID.
-callback insert(msg_location(), index_state()) -> 'ok'.
%% Update an entry in the index.
%% Is called by a msg_store process only.
%% The function is called during message store recovery after crash.
%% The difference between the update and insert functions is that update
%% should not fail if the entry already exists, and should be atomic.
-callback update(msg_location(), index_state()) -> 'ok'.
%% Update positional fields in the entry tuple.
%% Is called by msg_store and GC processes concurrently.
%% This function can exit if there is no entry with the specified ID.
%% This function is called to update reference-counters and file locations.
%% File locations are updated from a GC process, reference-counters are
%% updated from a message store process.
%% This function should be atomic.
-callback update_fields(rabbit_types:msg_id(), ({fieldpos(), fieldvalue()} |
[{fieldpos(), fieldvalue()}]),
index_state()) -> 'ok'.
%% Delete an entry from the index by ID.
%% Is called from a msg_store process only.
%% This function should be atomic.
-callback delete(rabbit_types:msg_id(), index_state()) -> 'ok'.
%% Delete an exactly matching entry from the index.
%% Is called by GC process only.
%% This function should match the exact object to avoid deleting a
%% zero-reference object whose reference-counter is being concurrently updated.
%% This function should be atomic.
-callback delete_object(msg_location(), index_state()) -> 'ok'.
%% Delete temporary reference count entries with the 'file' record field equal to 'undefined'.
%% Is called during index rebuild from scratch (e.g. after non-clean stop)
%% During recovery after a non-clean stop or file corruption, reference-counters
%% are added to the index with an `undefined` value for the `file` field.
%% If the message is found in a message store file, its file field is updated.
%% Any reference-counters still missing a message location after recovery
%% should be deleted.
-callback clean_up_temporary_reference_count_entries_without_file(index_state()) -> 'ok'.


@ -686,7 +686,6 @@ rabbit:
- rabbit_mirror_queue_misc
- rabbit_mnesia
- rabbit_msg_store
- rabbit_msg_store_ets_index
- rabbit_msg_store_gc
- rabbit_networking
- rabbit_networking_store
@ -802,7 +801,6 @@ rabbit_common:
- rabbit_json
- rabbit_log
- rabbit_misc
- rabbit_msg_store_index
- rabbit_net
- rabbit_nodes_common
- rabbit_numerical