CQ: Remove ability to change shared store index module

The message store will now always use the ETS index.
This change enables optimisations that would otherwise
not be possible, including
81b2c39834953d9e1bd28938b7a6e472498fdf13.

A small functional change is included in this commit:
we now always use ets:update_counter to update the
ref_count, instead of a mix of update_{counter,fields}.

When upgrading to 4.0, the index will be rebuilt for
all users that were using a custom index module.
Loïc Hoguin 2024-06-11 15:01:08 +02:00
parent d45fbc3da4
commit 41ce4da5ca
9 changed files with 156 additions and 321 deletions
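
As a rough sketch of the ref_count handling described in the commit
message (an illustrative module, not code from this commit; the record
mirrors the #msg_location{} layout used by rabbit_msg_store):

-module(refcount_demo).
-export([run/0]).

%% Same field layout as the store's #msg_location{} record.
-record(msg_location, {msg_id, ref_count = 0, file, offset, total_size}).

run() ->
    IndexEts = ets:new(demo_index,
                       [set, public, {keypos, #msg_location.msg_id}]),
    true = ets:insert_new(IndexEts,
                          #msg_location{msg_id = <<"m1">>, ref_count = 1}),
    %% Atomic in-place increment of ref_count. Unlike a read followed by
    %% ets:update_element/3 with the recomputed value, there is no
    %% read-modify-write window for a concurrent GC process to race against.
    _ = ets:update_counter(IndexEts, <<"m1">>,
                           {#msg_location.ref_count, +1}),
    [#msg_location{ref_count = 2}] = ets:lookup(IndexEts, <<"m1">>),
    %% The 4-arity form also takes a default object that is inserted when
    %% the key is absent, as used when counting references during recovery.
    _ = ets:update_counter(IndexEts, <<"m2">>,
                           {#msg_location.ref_count, +1},
                           #msg_location{msg_id = <<"m2">>}),
    true = ets:delete(IndexEts),
    ok.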


@@ -39,7 +39,6 @@ _APP_ENV = """[
{vm_memory_calculation_strategy, rss},
{memory_monitor_interval, 2500},
{disk_free_limit, 50000000}, %% 50MB
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client


@@ -19,7 +19,6 @@ define PROJECT_ENV
{vm_memory_calculation_strategy, rss},
{memory_monitor_interval, 2500},
{disk_free_limit, 50000000}, %% 50MB
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client

deps/rabbit/app.bzl vendored

@@ -170,7 +170,6 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_ets_index.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
"src/rabbit_networking_store.erl",
@@ -431,7 +430,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_ets_index.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
"src/rabbit_networking_store.erl",
@@ -711,7 +709,6 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_ets_index.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
"src/rabbit_networking_store.erl",


@@ -67,11 +67,8 @@
{
%% store directory
dir :: file:filename(),
%% the module for index ops,
%% rabbit_msg_store_ets_index by default
index_module,
%% where are messages?
index_state,
%% index table
index_ets,
%% current file name as number
current_file,
%% current file handle since the last fsync?
@@ -113,8 +110,7 @@
{ server,
client_ref,
reader,
index_state,
index_module,
index_ets,
dir,
file_handles_ets,
cur_file_cache_ets,
@@ -127,8 +123,7 @@
-record(gc_state,
{ dir,
index_module,
index_state,
index_ets,
file_summary_ets,
file_handles_ets,
msg_store
@@ -145,8 +140,7 @@
-export_type([gc_state/0, file_num/0]).
-type gc_state() :: #gc_state { dir :: file:filename(),
index_module :: atom(),
index_state :: any(),
index_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
file_handles_ets :: ets:tid(),
msg_store :: server()
@@ -159,8 +153,7 @@
server :: server(),
client_ref :: client_ref(),
reader :: undefined | {non_neg_integer(), file:fd()},
index_state :: any(),
index_module :: atom(),
index_ets :: any(),
%% Stored as binary() as opposed to file:filename() to save memory.
dir :: binary(),
file_handles_ets :: ets:tid(),
@@ -404,7 +397,7 @@ successfully_recovered_state(Server) ->
-spec client_init(server(), client_ref(), maybe_msg_id_fun()) -> client_msstate().
client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
{IState, IModule, Dir, FileHandlesEts, CurFileCacheEts, FlyingEts} =
{IndexEts, Dir, FileHandlesEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
Server, {new_client_state, Ref, self(), MsgOnDiskFun},
infinity),
@@ -413,8 +406,7 @@ client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
#client_msstate { server = Server,
client_ref = Ref,
reader = undefined,
index_state = IState,
index_module = IModule,
index_ets = IndexEts,
dir = rabbit_file:filename_to_binary(Dir),
file_handles_ets = FileHandlesEts,
cur_file_cache_ets = CurFileCacheEts,
@@ -465,14 +457,14 @@ write(MsgRef, MsgId, Msg, CState) -> client_write(MsgRef, MsgId, Msg, noflow, CState).
-spec read(rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}.
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
read(MsgId, CState = #client_msstate { index_ets = IndexEts,
cur_file_cache_ets = CurFileCacheEts }) ->
file_handle_cache_stats:update(msg_store_read),
%% Check the cur file cache
case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
%% @todo It's probably a bug if we don't get a positive ref count.
case index_lookup_positive_ref_count(MsgId, CState) of
case index_lookup_positive_ref_count(IndexEts, MsgId) of
not_found -> {not_found, CState};
MsgLocation -> client_read3(MsgLocation, CState)
end;
@@ -483,11 +475,6 @@ read(MsgId,
-spec read_many([rabbit_types:msg_id()], client_msstate())
-> {#{rabbit_types:msg_id() => msg()}, client_msstate()}.
%% We disable read_many when the index module is not ETS for the time being.
%% We can introduce the new index module callback as a breaking change in 4.0.
read_many(_, CState = #client_msstate{ index_module = IndexMod })
when IndexMod =/= rabbit_msg_store_ets_index ->
{#{}, CState};
read_many(MsgIds, CState) ->
file_handle_cache_stats:inc(msg_store_read, length(MsgIds)),
%% We receive MsgIds in roughly the younger->older order so
@@ -506,8 +493,8 @@ read_many_cache([], CState, Acc) ->
{Acc, CState}.
%% We will read from disk one file at a time in no particular order.
read_many_disk([MsgId|Tail], CState, Acc) ->
case index_lookup_positive_ref_count(MsgId, CState) of
read_many_disk([MsgId|Tail], CState = #client_msstate{ index_ets = IndexEts }, Acc) ->
case index_lookup_positive_ref_count(IndexEts, MsgId) of
%% We ignore this message if it's not found and will try
%% to read it individually later instead. We can't call
%% the message store and expect good performance here.
@@ -519,6 +506,7 @@ read_many_disk([], CState, Acc) ->
{Acc, CState}.
read_many_file2(MsgIds0, CState = #client_msstate{ dir = Dir,
index_ets = IndexEts,
file_handles_ets = FileHandlesEts,
reader = Reader0,
client_ref = Ref }, Acc0, File) ->
@@ -528,7 +516,7 @@ read_many_file2(MsgIds0, CState = #client_msstate{ dir = Dir,
%% It's possible that we get no results here if compaction
%% was in progress. That's OK: we will try again with those
%% MsgIds to get them from the new file.
MsgLocations0 = index_select_from_file(MsgIds0, File, CState),
MsgLocations0 = index_select_from_file(IndexEts, MsgIds0, File),
case MsgLocations0 of
[] ->
read_many_file3(MsgIds0, CState, Acc0, File);
@@ -557,10 +545,6 @@ read_many_file2(MsgIds0, CState = #client_msstate{ dir = Dir,
read_many_file3(MsgIds, CState#client_msstate{ reader = Reader }, Acc, File)
end.
index_select_from_file(MsgIds, File, #client_msstate { index_module = Index,
index_state = State }) ->
Index:select_from_file(MsgIds, File, State).
consolidate_reads([#msg_location{offset=NextOffset, total_size=NextSize}|Locs], [{Offset, Size}|Acc])
when Offset + Size =:= NextOffset ->
consolidate_reads(Locs, [{Offset, Size + NextSize}|Acc]);
@@ -622,14 +606,15 @@ client_write(MsgRef, MsgId, Msg, Flow,
%% And the file only gets deleted after all data was copied, index
%% was updated and file handles got closed.
client_read3(#msg_location { msg_id = MsgId, file = File },
CState = #client_msstate { file_handles_ets = FileHandlesEts,
CState = #client_msstate { index_ets = IndexEts,
file_handles_ets = FileHandlesEts,
client_ref = Ref }) ->
%% We immediately mark the handle open so that we don't get the
%% file truncated while we are reading from it. The file may still
%% be truncated past that point but that's OK because we do a second
%% index lookup to ensure that we get the updated message location.
mark_handle_open(FileHandlesEts, File, Ref),
case index_lookup(MsgId, CState) of
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, ref_count = RefCount } = MsgLocation when RefCount > 0 ->
{Msg, CState1} = read_from_disk(MsgLocation, CState),
mark_handle_closed(FileHandlesEts, File, Ref),
@@ -714,9 +699,6 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
Dir = filename:join(BaseDir, atom_to_list(Type)),
Name = filename:join(filename:basename(BaseDir), atom_to_list(Type)),
{ok, IndexModule} = application:get_env(rabbit, msg_store_index_module),
rabbit_log:info("Message store ~tp: using ~tp to provide index", [Name, IndexModule]),
AttemptFileSummaryRecovery =
case ClientRefs of
%% Transient.
@@ -732,8 +714,8 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
%% we start recovering messages from the files on disk.
{FileSummaryRecovered, FileSummaryEts} =
recover_file_summary(AttemptFileSummaryRecovery, Dir),
{CleanShutdown, IndexState, ClientRefs1} =
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
{CleanShutdown, IndexEts, ClientRefs1} =
recover_index_and_client_refs(FileSummaryRecovered,
ClientRefs, Dir, Name),
Clients = maps:from_list(
[{CRef, {undefined, undefined}} ||
@@ -760,8 +742,7 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
{ok, GCPid} = rabbit_msg_store_gc:start_link(
#gc_state { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
msg_store = self()
@@ -771,8 +752,7 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
?CREDIT_DISC_BOUND),
State = #msstate { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
index_ets = IndexEts,
current_file = 0,
current_file_handle = undefined,
current_file_offset = 0,
@@ -835,15 +815,14 @@ handle_call(successfully_recovered_state, _From, State) ->
handle_call({new_client_state, CRef, CPid, MsgOnDiskFun}, _From,
State = #msstate { dir = Dir,
index_state = IndexState,
index_module = IndexModule,
index_ets = IndexEts,
file_handles_ets = FileHandlesEts,
cur_file_cache_ets = CurFileCacheEts,
flying_ets = FlyingEts,
clients = Clients }) ->
Clients1 = maps:put(CRef, {CPid, MsgOnDiskFun}, Clients),
erlang:monitor(process, CPid),
reply({IndexState, IndexModule, Dir, FileHandlesEts,
reply({IndexEts, Dir, FileHandlesEts,
CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
@@ -871,7 +850,8 @@ handle_cast({client_delete, CRef},
noreply(clear_client(CRef, State1));
handle_cast({write, CRef, MsgRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
State = #msstate { index_ets = IndexEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
credit_disc_bound = CreditDiscBound }) ->
case Flow of
@@ -900,7 +880,7 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow},
%% current file then the cache entry will be removed by
%% the normal logic for that in write_message/4 and
%% flush_or_roll_to_new_file/2.
case index_lookup(MsgId, State) of
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File }
when File == State #msstate.current_file ->
ok;
@@ -962,8 +942,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
terminate(Reason, State = #msstate { index_state = IndexState,
index_module = IndexModule,
terminate(Reason, State = #msstate { index_ets = IndexEts,
current_file_handle = CurHdl,
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
@@ -998,9 +977,8 @@ terminate(Reason, State = #msstate { index_state = IndexState,
end,
[true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
CurFileCacheEts, FlyingEts]],
IndexModule:terminate(IndexState),
case store_recovery_terms([{client_refs, maps:keys(Clients)},
{index_module, IndexModule}], Dir) of
index_terminate(IndexEts, Dir),
case store_recovery_terms([{client_refs, maps:keys(Clients)}], Dir) of
ok ->
rabbit_log:info("Message store for directory '~ts' is stopped", [Dir]),
ok;
@@ -1009,8 +987,7 @@ terminate(Reason, State = #msstate { index_state = IndexState,
" for directory ~tp~nError: ~tp",
[Dir, RTErr])
end,
State3 #msstate { index_state = undefined,
current_file_handle = undefined,
State3 #msstate { current_file_handle = undefined,
current_file_offset = 0 }.
code_change(_OldVsn, State, _Extra) ->
@@ -1111,14 +1088,15 @@ write_action({false, not_found}, _MsgId, State) ->
write_action({Mask, #msg_location { ref_count = 0, file = File,
total_size = TotalSize }},
MsgId, State = #msstate { current_file = CurrentFile,
index_ets = IndexEts,
file_summary_ets = FileSummaryEts }) ->
case {Mask, ets:lookup(FileSummaryEts, File)} of
%% Never increase the ref_count for a file that is about to get deleted.
{_, [#file_summary{valid_total_size = 0}]} when File =/= CurrentFile ->
ok = index_delete(MsgId, State),
ok = index_delete(IndexEts, MsgId),
{write, State};
{false, [#file_summary { locked = true }]} ->
ok = index_delete(MsgId, State),
ok = index_delete(IndexEts, MsgId),
{write, State};
{false_if_increment, [#file_summary { locked = true }]} ->
%% The msg for MsgId is older than the client death
@@ -1127,13 +1105,13 @@ write_action({Mask, #msg_location { ref_count = 0, file = File,
%% ignore this write.
{ignore, File, State};
{_Mask, [#file_summary {}]} ->
ok = index_update_ref_count(MsgId, 1, State),
ok = index_update_ref_counter(IndexEts, MsgId, +1), %% Effectively set to 1.
State1 = adjust_valid_total_size(File, TotalSize, State),
{confirm, File, State1}
end;
write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
MsgId, State) ->
ok = index_update_ref_count(MsgId, RefCount + 1, State),
write_action({_Mask, #msg_location { file = File }},
MsgId, State = #msstate{ index_ets = IndexEts }) ->
ok = index_update_ref_counter(IndexEts, MsgId, +1),
%% We already know about it, just update counter. Only update
%% field otherwise bad interaction with concurrent GC
{confirm, File, State}.
@@ -1160,7 +1138,7 @@ write_message(MsgId, Msg, CRef,
end, CRef, State1)
end.
remove_message(MsgId, CRef, State) ->
remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) ->
case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
@@ -1176,8 +1154,7 @@ remove_message(MsgId, CRef, State) ->
when RefCount > 0 ->
%% only update field, otherwise bad interaction with
%% concurrent GC
Dec = fun () -> index_update_ref_count(
MsgId, RefCount - 1, State) end,
Dec = fun () -> index_update_ref_counter(IndexEts, MsgId, -1) end,
case RefCount of
%% don't remove from cur_file_cache_ets here because
%% there may be further writes in the mailbox for the
@@ -1228,11 +1205,12 @@ write_small_message(MsgId, MsgBodyBin,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
current_file_offset = CurOffset,
index_ets = IndexEts,
file_summary_ets = FileSummaryEts }) ->
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, MsgBodyBin),
ok = index_insert(
ok = index_insert(IndexEts,
#msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize }, State),
offset = CurOffset, total_size = TotalSize }),
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]),
@@ -1274,6 +1252,7 @@ write_large_message(MsgId, MsgBodyBin,
current_file_handle = CurHdl,
current_file = CurFile,
current_file_offset = CurOffset,
index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts }) ->
{LargeMsgFile, LargeMsgHdl} = case CurOffset of
@@ -1292,9 +1271,9 @@ write_large_message(MsgId, MsgBodyBin,
TotalSize = writer_direct_write(LargeMsgHdl, MsgId, MsgBodyBin),
ok = writer_close(LargeMsgHdl),
%% Update ets with the new information.
ok = index_insert(
ok = index_insert(IndexEts,
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
offset = 0, total_size = TotalSize }, State0),
offset = 0, total_size = TotalSize }),
_ = case CurFile of
%% We didn't open a new file. We must update the existing value.
LargeMsgFile ->
@@ -1325,8 +1304,8 @@ write_large_message(MsgId, MsgBodyBin,
current_file = NextFile,
current_file_offset = 0 }.
contains_message(MsgId, From, State) ->
MsgLocation = index_lookup_positive_ref_count(MsgId, State),
contains_message(MsgId, From, State = #msstate{ index_ets = IndexEts }) ->
MsgLocation = index_lookup_positive_ref_count(IndexEts, MsgId),
gen_server2:reply(From, MsgLocation =/= not_found),
State.
@@ -1380,9 +1359,9 @@ record_pending_confirm(CRef, MsgId, State) ->
%% rewrite the msg - rewriting it would make it younger than the death
%% msg and thus should be ignored. Note that this (correctly) returns
%% false when testing to remove the death msg itself.
should_mask_action(CRef, MsgId,
State = #msstate{dying_clients = DyingClients}) ->
case {maps:find(CRef, DyingClients), index_lookup(MsgId, State)} of
should_mask_action(CRef, MsgId, #msstate{
index_ets = IndexEts, dying_clients = DyingClients}) ->
case {maps:find(CRef, DyingClients), index_lookup(IndexEts, MsgId)} of
{error, Location} ->
{false, Location};
{{ok, _}, not_found} ->
@@ -1562,85 +1541,129 @@ scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).
%%----------------------------------------------------------------------------
%% index
%% Ets index
%%----------------------------------------------------------------------------
index_lookup_positive_ref_count(Key, State) ->
case index_lookup(Key, State) of
-define(INDEX_TABLE_NAME, rabbit_msg_store_ets_index).
-define(INDEX_FILE_NAME, "msg_store_index.ets").
index_new(Dir) ->
_ = file:delete(filename:join(Dir, ?INDEX_FILE_NAME)),
ets:new(?INDEX_TABLE_NAME, [set, public, {keypos, #msg_location.msg_id}]).
index_recover(Dir) ->
Path = filename:join(Dir, ?INDEX_FILE_NAME),
case ets:file2tab(Path) of
{ok, IndexEts} -> _ = file:delete(Path),
{ok, IndexEts};
Error -> Error
end.
index_lookup(IndexEts, Key) ->
case ets:lookup(IndexEts, Key) of
[] -> not_found;
[Entry] -> Entry
end.
index_lookup_positive_ref_count(IndexEts, Key) ->
case index_lookup(IndexEts, Key) of
not_found -> not_found;
#msg_location { ref_count = 0 } -> not_found;
#msg_location {} = MsgLocation -> MsgLocation
end.
index_update_ref_count(Key, RefCount, State) ->
index_update_fields(Key, {#msg_location.ref_count, RefCount}, State).
%% @todo We currently fetch all and then filter by file.
%% This might lead to too many lookups... How to best
%% optimize this? ets:select didn't seem great.
index_select_from_file(IndexEts, MsgIds, File) ->
All = [index_lookup(IndexEts, Id) || Id <- MsgIds],
[MsgLoc || MsgLoc=#msg_location{file=MsgFile} <- All, MsgFile =:= File].
index_lookup(Key, #gc_state { index_module = Index,
index_state = State }) ->
Index:lookup(Key, State);
index_insert(IndexEts, Obj) ->
true = ets:insert_new(IndexEts, Obj),
ok.
index_lookup(Key, #client_msstate { index_module = Index,
index_state = State }) ->
Index:lookup(Key, State);
index_update(IndexEts, Obj) ->
true = ets:insert(IndexEts, Obj),
ok.
index_lookup(Key, #msstate { index_module = Index, index_state = State }) ->
Index:lookup(Key, State).
index_update_fields(IndexEts, Key, Updates) ->
true = ets:update_element(IndexEts, Key, Updates),
ok.
index_insert(Obj, #msstate { index_module = Index, index_state = State }) ->
Index:insert(Obj, State).
index_update_ref_counter(IndexEts, Key, RefCount) ->
_ = ets:update_counter(IndexEts, Key, RefCount),
ok.
index_update(Obj, #msstate { index_module = Index, index_state = State }) ->
Index:update(Obj, State).
index_update_ref_counter(IndexEts, Key, RefCount, Default) ->
_ = ets:update_counter(IndexEts, Key, RefCount, Default),
ok.
index_update_fields(Key, Updates, #msstate{ index_module = Index,
index_state = State }) ->
Index:update_fields(Key, Updates, State);
index_update_fields(Key, Updates, #gc_state{ index_module = Index,
index_state = State }) ->
Index:update_fields(Key, Updates, State).
index_delete(IndexEts, Key) ->
true = ets:delete(IndexEts, Key),
ok.
index_delete(Key, #msstate { index_module = Index, index_state = State }) ->
Index:delete(Key, State).
index_delete_object(IndexEts, Obj) ->
true = ets:delete_object(IndexEts, Obj),
ok.
index_delete_object(Obj, #gc_state{ index_module = Index,
index_state = State }) ->
Index:delete_object(Obj, State).
index_clean_up_temporary_reference_count_entries(IndexEts) ->
MatchHead = #msg_location { file = undefined, _ = '_' },
ets:select_delete(IndexEts, [{MatchHead, [], [true]}]),
ok.
index_clean_up_temporary_reference_count_entries(
#msstate { index_module = Index,
index_state = State }) ->
Index:clean_up_temporary_reference_count_entries_without_file(State).
index_terminate(IndexEts, Dir) ->
case ets:tab2file(IndexEts, filename:join(Dir, ?INDEX_FILE_NAME),
[{extended_info, [object_count]}]) of
ok -> ok;
{error, Err} ->
rabbit_log:error("Unable to save message store index"
" for directory ~tp.~nError: ~tp",
[Dir, Err])
end,
ets:delete(IndexEts).
%%----------------------------------------------------------------------------
%% shutdown and recovery
%%----------------------------------------------------------------------------
recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Name) ->
{false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Name) ->
recover_index_and_client_refs(_Recover, undefined, Dir, _Name) ->
{false, index_new(Dir), []};
recover_index_and_client_refs(false, _ClientRefs, Dir, Name) ->
rabbit_log:warning("Message store ~tp: rebuilding indices from scratch", [Name]),
{false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Name) ->
{false, index_new(Dir), []};
recover_index_and_client_refs(true, ClientRefs, Dir, Name) ->
Fresh = fun (ErrorMsg, ErrorArgs) ->
rabbit_log:warning("Message store ~tp : " ++ ErrorMsg ++ "~n"
"rebuilding indices from scratch",
[Name | ErrorArgs]),
{false, IndexModule:new(Dir), []}
{false, index_new(Dir), []}
end,
case read_recovery_terms(Dir) of
{false, Error} ->
Fresh("failed to read recovery terms: ~tp", [Error]);
{true, Terms} ->
RecClientRefs = proplists:get_value(client_refs, Terms, []),
RecIndexModule = proplists:get_value(index_module, Terms),
%% We expect the index module to either be unset or be set
%% to rabbit_msg_store_ets_index. This is needed for graceful
%% upgrade to RabbitMQ 4.0 and above. Starting from 4.0
%% however RabbitMQ will not save the index module in the
%% recovery terms, so this check can be removed in 4.1 or later.
%% In effect, users that had a custom index module in 3.13
%% are forced into a dirty recovery that switches them to
%% ets. Others can proceed as normal.
RecIndexModule = proplists:get_value(index_module, Terms,
rabbit_msg_store_ets_index),
case (lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
andalso IndexModule =:= RecIndexModule) of
true -> case IndexModule:recover(Dir) of
{ok, IndexState1} ->
{true, IndexState1, ClientRefs};
andalso RecIndexModule =:= rabbit_msg_store_ets_index) of
true -> case index_recover(Dir) of
{ok, IndexEts} ->
{true, IndexEts, ClientRefs};
{error, Error} ->
Fresh("failed to recover index: ~tp", [Error])
end;
false when RecIndexModule =/= rabbit_msg_store_ets_index ->
Fresh("custom index backends have been removed; using ETS index", []);
false -> Fresh("recovery terms differ from present", [])
end
end.
@@ -1676,7 +1699,7 @@ recover_file_summary(true, Dir) ->
{error, _Error} -> recover_file_summary(false, Dir)
end.
count_msg_refs(Gen, Seed, State) ->
count_msg_refs(Gen, Seed, State = #msstate{ index_ets = IndexEts }) ->
case Gen(Seed) of
finished ->
ok;
@@ -1686,14 +1709,12 @@ count_msg_refs(Gen, Seed, State) ->
%% This clause is kept for v1 compatibility purposes.
%% It can be removed once we no longer support converting from v1 data.
{MsgId, 1, Next} ->
%% @todo Remove index_module and avoid this element/2.
_ = ets:update_counter(element(2, State#msstate.index_state), MsgId, +1,
index_update_ref_counter(IndexEts, MsgId, +1,
#msg_location{msg_id=MsgId, file=undefined, ref_count=1}),
count_msg_refs(Gen, Next, State);
{MsgIds, Next} ->
lists:foreach(fun(MsgId) ->
%% @todo Remove index_module and avoid this element/2.
ets:update_counter(element(2, State#msstate.index_state), MsgId, +1,
index_update_ref_counter(IndexEts, MsgId, +1,
#msg_location{msg_id=MsgId, file=undefined, ref_count=1})
end, MsgIds),
count_msg_refs(Gen, Next, State)
@@ -1719,7 +1740,7 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
State1, Files)}
end.
build_index_worker(Gatherer, State = #msstate { dir = Dir },
build_index_worker(Gatherer, #msstate { index_ets = IndexEts, dir = Dir },
File, Files) ->
FileName = filenum_to_name(File),
rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)",
@@ -1735,12 +1756,11 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
%% Fan-out may result in the same message data in multiple
%% files so we have to guard against it.
case index_lookup(MsgId, State) of
case index_lookup(IndexEts, MsgId) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
ok = index_update(IndexEts, StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize },
State),
total_size = TotalSize }),
{[Obj | VMAcc], VTSAcc + TotalSize};
_ ->
{VMAcc, VTSAcc}
@@ -1780,11 +1800,12 @@ enqueue_build_index_workers(Gatherer, [File|Files], State) ->
enqueue_build_index_workers(Gatherer, Files, State).
reduce_index(Gatherer, LastFile,
State = #msstate { file_summary_ets = FileSummaryEts }) ->
State = #msstate { index_ets = IndexEts,
file_summary_ets = FileSummaryEts }) ->
case gatherer:out(Gatherer) of
empty ->
ok = gatherer:stop(Gatherer),
ok = index_clean_up_temporary_reference_count_entries(State),
ok = index_clean_up_temporary_reference_count_entries(IndexEts),
Offset = case ets:lookup(FileSummaryEts, LastFile) of
[] -> 0;
[#file_summary { file_size = FileSize }] -> FileSize
@@ -1899,7 +1920,8 @@ delete_file_if_empty(File, State = #msstate {
%% We do not try to look at messages that are not the last because we do not want to
%% accidentally write over messages that were moved earlier.
compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
compact_file(File, State = #gc_state { index_ets = IndexEts,
file_summary_ets = FileSummaryEts,
dir = Dir,
msg_store = Server }) ->
%% Get metadata about the file. Will be used to calculate
@@ -1926,9 +1948,8 @@ compact_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
%% be there for readers. Note that it's OK if we crash at any point before we
%% update the index because the old data is still there until we truncate.
lists:foreach(fun ({UpdateMsgId, UpdateOffset}) ->
ok = index_update_fields(UpdateMsgId,
[{#msg_location.offset, UpdateOffset}],
State)
ok = index_update_fields(IndexEts, UpdateMsgId,
[{#msg_location.offset, UpdateOffset}])
end, IndexUpdates),
%% We can truncate only if there are no files opened before this timestamp.
ThresholdTimestamp = erlang:monotonic_time(),
@@ -2073,17 +2094,17 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
ok
end.
scan_and_vacuum_message_file(File, State = #gc_state{ dir = Dir }) ->
scan_and_vacuum_message_file(File, #gc_state{ index_ets = IndexEts, dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
case index_lookup(MsgId, State) of
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
ok = index_delete_object(Entry, State),
index_delete_object(IndexEts, Entry),
Acc;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
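
The index_new/index_recover/index_terminate functions above persist the
index table across clean restarts via ets:tab2file/3 and ets:file2tab/1;
the deleted rabbit_msg_store_ets_index module shown below performed the
same round trip. A stripped-down sketch of that cycle (illustrative module
and file names; happy path only):

-module(index_persist_demo).
-export([run/1]).

run(Dir) ->
    Path = filename:join(Dir, "demo_index.ets"),
    IndexEts = ets:new(demo_index, [set, public]),
    true = ets:insert(IndexEts, {<<"m1">>, 1}),
    %% Clean shutdown: dump the table to disk. The object count lets
    %% file2tab detect a partially written dump on the next start.
    ok = ets:tab2file(IndexEts, Path, [{extended_info, [object_count]}]),
    true = ets:delete(IndexEts),
    %% Clean start: reload the dump, then delete the file so that a
    %% later unclean stop cannot recover from a stale index.
    {ok, Recovered} = ets:file2tab(Path),
    _ = file:delete(Path),
    [{<<"m1">>, 1}] = ets:lookup(Recovered, <<"m1">>),
    true = ets:delete(Recovered),
    ok.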


@@ -1,87 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_msg_store_ets_index).
-include_lib("rabbit_common/include/rabbit_msg_store.hrl").
-behaviour(rabbit_msg_store_index).
-export([new/1, recover/1,
lookup/2, select_from_file/3, insert/2, update/2, update_fields/3, delete/2,
delete_object/2, clean_up_temporary_reference_count_entries_without_file/1, terminate/1]).
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
-define(FILENAME, "msg_store_index.ets").
-record(state,
{table,
%% Stored as binary() as opposed to file:filename() to save memory.
dir :: binary()}).
new(Dir) ->
_ = file:delete(filename:join(Dir, ?FILENAME)),
Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]),
#state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }.
recover(Dir) ->
Path = filename:join(Dir, ?FILENAME),
case ets:file2tab(Path) of
{ok, Tid} -> _ = file:delete(Path),
{ok, #state { table = Tid, dir = rabbit_file:filename_to_binary(Dir) }};
Error -> Error
end.
lookup(Key, State) ->
case ets:lookup(State #state.table, Key) of
[] -> not_found;
[Entry] -> Entry
end.
%% @todo We currently fetch all and then filter by file.
%% This might lead to too many lookups... How to best
%% optimize this? ets:select didn't seem great.
select_from_file(MsgIds, File, State) ->
All = [lookup(Id, State) || Id <- MsgIds],
[MsgLoc || MsgLoc=#msg_location{file=MsgFile} <- All, MsgFile =:= File].
insert(Obj, State) ->
true = ets:insert_new(State #state.table, Obj),
ok.
update(Obj, State) ->
true = ets:insert(State #state.table, Obj),
ok.
update_fields(Key, Updates, State) ->
true = ets:update_element(State #state.table, Key, Updates),
ok.
delete(Key, State) ->
true = ets:delete(State #state.table, Key),
ok.
delete_object(Obj, State) ->
true = ets:delete_object(State #state.table, Obj),
ok.
clean_up_temporary_reference_count_entries_without_file(State) ->
MatchHead = #msg_location { file = undefined, _ = '_' },
ets:select_delete(State #state.table, [{MatchHead, [], [true]}]),
ok.
terminate(#state { table = MsgLocations, dir = DirBin }) ->
Dir = rabbit_file:binary_to_filename(DirBin),
case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
[{extended_info, [object_count]}]) of
ok -> ok;
{error, Err} ->
rabbit_log:error("Unable to save message store index"
" for directory ~tp.~nError: ~tp",
[Dir, Err])
end,
ets:delete(MsgLocations).


@@ -91,7 +91,7 @@ end_per_suite(Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
ClusterSize = 2,
ClusterSize = 1,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}


@@ -59,7 +59,6 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_json.erl",
"src/rabbit_log.erl",
"src/rabbit_misc.erl",
"src/rabbit_msg_store_index.erl",
"src/rabbit_net.erl",
"src/rabbit_nodes_common.erl",
"src/rabbit_numerical.erl",
@@ -155,7 +154,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_json.erl",
"src/rabbit_log.erl",
"src/rabbit_misc.erl",
"src/rabbit_msg_store_index.erl",
"src/rabbit_net.erl",
"src/rabbit_nodes_common.erl",
"src/rabbit_numerical.erl",
@@ -246,7 +244,6 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_json.erl",
"src/rabbit_log.erl",
"src/rabbit_misc.erl",
"src/rabbit_msg_store_index.erl",
"src/rabbit_net.erl",
"src/rabbit_nodes_common.erl",
"src/rabbit_numerical.erl",


@@ -1,89 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_msg_store_index).
-include("rabbit_msg_store.hrl").
%% Behaviour module to provide pluggable message store index.
%% The index is used to locate messages on disk and for reference counting.
%% The message store makes several additional assumptions about the
%% performance and atomicity of some operations. See the comments for each
%% callback.
-type(dir() :: string()).
-type(index_state() :: any()).
-type(fieldpos() :: non_neg_integer()).
-type(fieldvalue() :: any()).
-type(msg_location() :: #msg_location{}).
%% There are two ways of starting an index:
%% - `new` - starts a clean index
%% - `recover` - attempts to read a saved index
%% In both cases the old saved state should be deleted from the directory.
%% Initialize a fresh index state for msg store directory.
-callback new(dir()) -> index_state().
%% Try to recover a gracefully stopped index state.
-callback recover(dir()) -> rabbit_types:ok_or_error2(index_state(), any()).
%% Gracefully shutdown the index.
%% Should save the index state, which will be loaded by the 'recover' function.
-callback terminate(index_state()) -> any().
%% Lookup an entry in the index.
%% Is called concurrently by msg_store, its clients and GC processes.
%% This function is called multiple times for each message store operation.
%% The message store tries to avoid writing messages to disk if consumers can
%% process them fast, so there will be a lot of lookups for non-existent
%% entries, which should be as fast as possible.
-callback lookup(rabbit_types:msg_id(), index_state()) -> ('not_found' | msg_location()).
%% Insert an entry into the index.
%% Is called by a msg_store process only.
%% This function can exit if there is already an entry with the same ID.
-callback insert(msg_location(), index_state()) -> 'ok'.
%% Update an entry in the index.
%% Is called by a msg_store process only.
%% The function is called during message store recovery after crash.
%% The difference between the update and insert functions is that update
%% should not fail if the entry already exists, and should be atomic.
-callback update(msg_location(), index_state()) -> 'ok'.
%% Update positional fields in the entry tuple.
%% Is called by msg_store and GC processes concurrently.
%% This function can exit if there is no entry with the specified ID.
%% This function is called to update reference-counters and file locations.
%% File locations are updated from a GC process, reference-counters are
%% updated from a message store process.
%% This function should be atomic.
-callback update_fields(rabbit_types:msg_id(), ({fieldpos(), fieldvalue()} |
[{fieldpos(), fieldvalue()}]),
index_state()) -> 'ok'.
%% Delete an entry from the index by ID.
%% Is called from a msg_store process only.
%% This function should be atomic.
-callback delete(rabbit_types:msg_id(), index_state()) -> 'ok'.
%% Delete an exactly matching entry from the index.
%% Is called by GC process only.
%% This function should match the exact object, to avoid deleting a
%% zero-reference object whose reference counter is being concurrently
%% updated.
%% This function should be atomic.
-callback delete_object(msg_location(), index_state()) -> 'ok'.
%% Delete temporary reference count entries with the 'file' record field equal to 'undefined'.
%% Is called during index rebuild from scratch (e.g. after a non-clean stop).
%% During recovery after non-clean stop or file corruption, reference-counters
%% are added to the index with `undefined` value for the `file` field.
%% If a message is found in a message store file, its file field is updated.
%% Reference counters that still have no message location after recovery
%% should be deleted.
-callback clean_up_temporary_reference_count_entries_without_file(index_state()) -> 'ok'.


@@ -686,7 +686,6 @@ rabbit:
- rabbit_mirror_queue_misc
- rabbit_mnesia
- rabbit_msg_store
- rabbit_msg_store_ets_index
- rabbit_msg_store_gc
- rabbit_networking
- rabbit_networking_store
@@ -802,7 +801,6 @@ rabbit_common:
- rabbit_json
- rabbit_log
- rabbit_misc
- rabbit_msg_store_index
- rabbit_net
- rabbit_nodes_common
- rabbit_numerical