From 3683ab9a6e205ffe42c654890ce3fce223ca8d36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 10 Jun 2022 15:24:02 +0200 Subject: [PATCH] CQ: Use v2 sets instead of gb_sets for confirms For the following flags I see an improvement of 30k/s to 34k/s on my machine: -x 1 -y 1 -A 1000 -q 1000 -c 1000 -s 1000 -f persistent -u cqv2 --queue-args=x-queue-version=2 --- .../src/rabbit_classic_queue_index_v2.erl | 12 ++--- .../src/rabbit_classic_queue_store_v2.erl | 10 ++-- deps/rabbit/src/rabbit_msg_store.erl | 21 ++++---- deps/rabbit/src/rabbit_queue_index.erl | 24 ++++----- deps/rabbit/src/rabbit_variable_queue.erl | 52 +++++++++---------- deps/rabbit/test/backing_queue_SUITE.erl | 16 +++--- deps/rabbit_common/src/rabbit_misc.erl | 5 -- 7 files changed, 67 insertions(+), 73 deletions(-) diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 7e3d1d1200..6a9b73f875 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -111,7 +111,7 @@ %% and there are outstanding unconfirmed messages. %% In that case the buffer is flushed to disk when %% the queue requests a sync (after a timeout). - confirms = gb_sets:new() :: gb_sets:set(), + confirms = sets:new([{version,2}]) :: sets:set(), %% Segments we currently know of along with the %% number of unacked messages remaining in the @@ -156,7 +156,7 @@ %% Types copied from rabbit_queue_index. --type on_sync_fun() :: fun ((gb_sets:set()) -> ok). +-type on_sync_fun() :: fun ((sets:set()) -> ok). -type contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean()). -type shutdown_terms() :: list() | 'non_clean_shutdown'. @@ -658,7 +658,7 @@ reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) -> maybe_mark_unconfirmed(MsgId, #message_properties{ needs_confirming = true }, State = #qi { confirms = Confirms }) -> - State#qi{ confirms = gb_sets:add_element(MsgId, Confirms) }; + State#qi{ confirms = sets:add_element(MsgId, Confirms) }; maybe_mark_unconfirmed(_, _, State) -> State. @@ -1055,19 +1055,19 @@ sync(State0 = #qi{ confirms = Confirms, on_sync = OnSyncFun }) -> ?DEBUG("~0p", [State0]), State = flush_buffer(State0, full, segment_entry_count()), - _ = case gb_sets:is_empty(Confirms) of + _ = case sets:is_empty(Confirms) of true -> ok; false -> OnSyncFun(Confirms) end, - State#qi{ confirms = gb_sets:new() }. + State#qi{ confirms = sets:new([{version,2}]) }. -spec needs_sync(state()) -> 'false'. needs_sync(State = #qi{ confirms = Confirms }) -> ?DEBUG("~0p", [State]), - case gb_sets:is_empty(Confirms) of + case sets:is_empty(Confirms) of true -> false; false -> confirms end. diff --git a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl index 7e21fa90bc..1af34cb8e7 100644 --- a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl @@ -122,7 +122,7 @@ %% publisher confirms will be sent at regular %% intervals after the index has been flushed %% to disk. - confirms = gb_sets:new() :: gb_sets:set(), + confirms = sets:new([{version,2}]) :: sets:set(), on_sync :: on_sync_fun() }). @@ -131,7 +131,7 @@ -type msg_location() :: {?MODULE, non_neg_integer(), non_neg_integer()}. -export_type([msg_location/0]). --type on_sync_fun() :: fun ((gb_sets:set(), 'written' | 'ignored') -> any()). +-type on_sync_fun() :: fun ((sets:set(), 'written' | 'ignored') -> any()). -spec init(rabbit_amqqueue:name(), on_sync_fun()) -> state(). @@ -248,7 +248,7 @@ maybe_cache(SeqId, MsgSize, Msg, State = #qs{ cache = Cache, maybe_mark_unconfirmed(MsgId, #message_properties{ needs_confirming = true }, State = #qs { confirms = Confirms }) -> - State#qs{ confirms = gb_sets:add_element(MsgId, Confirms) }; + State#qs{ confirms = sets:add_element(MsgId, Confirms) }; maybe_mark_unconfirmed(_, _, State) -> State. @@ -258,12 +258,12 @@ sync(State = #qs{ confirms = Confirms, on_sync = OnSyncFun }) -> ?DEBUG("~0p", [State]), flush_write_fd(State), - case gb_sets:is_empty(Confirms) of + case sets:is_empty(Confirms) of true -> State; false -> OnSyncFun(Confirms, written), - State#qs{ confirms = gb_sets:new() } + State#qs{ confirms = sets:new([{version,2}]) } end. -spec read(rabbit_variable_queue:seq_id(), msg_location(), State) diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index 5f8601bad8..52386b2fd8 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -162,7 +162,7 @@ fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A}). -type maybe_msg_id_fun() :: - 'undefined' | fun ((gb_sets:set(), 'written' | 'ignored') -> any()). + 'undefined' | fun ((sets:set(), 'written' | 'ignored') -> any()). -type maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok'). -type deletion_thunk() :: fun (() -> boolean()). @@ -907,7 +907,7 @@ handle_cast({write, CRef, MsgId, Flow}, ignore -> %% A 'remove' has already been issued and eliminated the %% 'write'. - State1 = blind_confirm(CRef, gb_sets:singleton(MsgId), + State1 = blind_confirm(CRef, sets:add_element(MsgId, sets:new([{version,2}])), ignored, State), %% If all writes get eliminated, cur_file_cache_ets could %% grow unbounded. To prevent that we delete the cache @@ -938,7 +938,7 @@ handle_cast({remove, CRef, MsgIds}, State) -> ignore -> {Removed, State2} end end, {[], State}, MsgIds), - noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds), + noreply(maybe_compact(client_confirm(CRef, sets:from_list(RemovedMsgIds), ignored, State1))); handle_cast({combine_files, Source, Destination, Reclaimed}, @@ -1066,7 +1066,7 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, cref_to_msg_ids = CTM }) -> State1 = stop_sync_timer(State), CGs = maps:fold(fun (CRef, MsgIds, NS) -> - case gb_sets:is_empty(MsgIds) of + case sets:is_empty(MsgIds) of true -> NS; false -> [{CRef, MsgIds} | NS] end @@ -1156,7 +1156,7 @@ write_message(MsgId, Msg, CRef, true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), update_pending_confirms( fun (MsgOnDiskFun, CTM) -> - MsgOnDiskFun(gb_sets:singleton(MsgId), written), + MsgOnDiskFun(sets:add_element(MsgId, sets:new([{version,2}])), written), CTM end, CRef, State1) end. @@ -1356,8 +1356,8 @@ record_pending_confirm(CRef, MsgId, State) -> update_pending_confirms( fun (_MsgOnDiskFun, CTM) -> NewMsgIds = case maps:find(CRef, CTM) of - error -> gb_sets:singleton(MsgId); - {ok, MsgIds} -> gb_sets:add(MsgId, MsgIds) + error -> sets:add_element(MsgId, sets:new([{version,2}])); + {ok, MsgIds} -> sets:add_element(MsgId, MsgIds) end, maps:put(CRef, NewMsgIds, CTM) end, CRef, State). @@ -1366,11 +1366,10 @@ client_confirm(CRef, MsgIds, ActionTaken, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTM) -> case maps:find(CRef, CTM) of - {ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds), + {ok, Gs} -> MsgOnDiskFun(sets:intersection(Gs, MsgIds), ActionTaken), - MsgIds1 = rabbit_misc:gb_sets_difference( - Gs, MsgIds), - case gb_sets:is_empty(MsgIds1) of + MsgIds1 = sets:subtract(Gs, MsgIds), + case sets:is_empty(MsgIds1) of true -> maps:remove(CRef, CTM); false -> maps:put(CRef, MsgIds1, CTM) end; diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl index 952c68147e..4287a87ab1 100644 --- a/deps/rabbit/src/rabbit_queue_index.erl +++ b/deps/rabbit/src/rabbit_queue_index.erl @@ -249,7 +249,7 @@ unacked :: non_neg_integer() }). -type seg_map() :: {map(), [segment()]}. --type on_sync_fun() :: fun ((gb_sets:set()) -> ok). +-type on_sync_fun() :: fun ((sets:set()) -> ok). -type qistate() :: #qistate { dir :: file:filename(), segments :: 'undefined' | seg_map(), journal_handle :: hdl(), @@ -257,8 +257,8 @@ max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), on_sync_msg :: on_sync_fun(), - unconfirmed :: gb_sets:set(), - unconfirmed_msg :: gb_sets:set(), + unconfirmed :: sets:set(), + unconfirmed_msg :: sets:set(), pre_publish_cache :: list(), delivered_cache :: list() }. @@ -439,9 +439,9 @@ maybe_needs_confirming(MsgProps, MsgOrId, end, ?MSG_ID_BYTES = size(MsgId), case {MsgProps#message_properties.needs_confirming, MsgOrId} of - {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), + {true, MsgId} -> UC1 = sets:add_element(MsgId, UC), State#qistate{unconfirmed = UC1}; - {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), + {true, _} -> UCM1 = sets:add_element(MsgId, UCM), State#qistate{unconfirmed_msg = UCM1}; {false, _} -> State end. @@ -474,7 +474,7 @@ needs_sync(#qistate{journal_handle = undefined}) -> needs_sync(#qistate{journal_handle = JournalHdl, unconfirmed = UC, unconfirmed_msg = UCM}) -> - case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of + case sets:is_empty(UC) andalso sets:is_empty(UCM) of true -> case file_handle_cache:needs_sync(JournalHdl) of true -> other; false -> false @@ -623,8 +623,8 @@ blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) -> max_journal_entries = MaxJournal, on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun, - unconfirmed = gb_sets:new(), - unconfirmed_msg = gb_sets:new(), + unconfirmed = sets:new([{version,2}]), + unconfirmed_msg = sets:new([{version,2}]), pre_publish_cache = [], delivered_cache = [], queue_name = Name }. @@ -1101,15 +1101,15 @@ notify_sync(State = #qistate{unconfirmed = UC, unconfirmed_msg = UCM, on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun}) -> - State1 = case gb_sets:is_empty(UC) of + State1 = case sets:is_empty(UC) of true -> State; false -> OnSyncFun(UC), - State#qistate{unconfirmed = gb_sets:new()} + State#qistate{unconfirmed = sets:new([{version,2}])} end, - case gb_sets:is_empty(UCM) of + case sets:is_empty(UCM) of true -> State1; false -> OnSyncMsgFun(UCM), - State1#qistate{unconfirmed_msg = gb_sets:new()} + State1#qistate{unconfirmed_msg = sets:new([{version,2}])} end. %%---------------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index a554168f00..4eb57a8e88 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -443,10 +443,10 @@ out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), rates :: rates(), - msgs_on_disk :: gb_sets:set(), - msg_indices_on_disk :: gb_sets:set(), - unconfirmed :: gb_sets:set(), - confirmed :: gb_sets:set(), + msgs_on_disk :: sets:set(), + msg_indices_on_disk :: sets:set(), + unconfirmed :: sets:set(), + confirmed :: sets:set(), ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), disk_read_count :: non_neg_integer(), @@ -700,10 +700,10 @@ batch_publish_delivered(Publishes, ChPid, Flow, State) -> discard(_MsgId, _ChPid, _Flow, State) -> State. drain_confirmed(State = #vqstate { confirmed = C }) -> - case gb_sets:is_empty(C) of + case sets:is_empty(C) of true -> {[], State}; %% common case - false -> {gb_sets:to_list(C), State #vqstate { - confirmed = gb_sets:new() }} + false -> {sets:to_list(C), State #vqstate { + confirmed = sets:new([{version, 2}]) }} end. dropwhile(Pred, State) -> @@ -1385,8 +1385,8 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -gb_sets_maybe_insert(false, _Val, Set) -> Set; -gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). +sets_maybe_insert(false, _Val, Set) -> Set; +sets_maybe_insert(true, Val, Set) -> sets:add_element(Val, Set). msg_status(Version, IsPersistent, IsDelivered, SeqId, Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) -> @@ -1625,10 +1625,10 @@ init(QueueVsn, IsDurable, IndexMod, IndexState, StoreState, DeltaCount, DeltaByt out_counter = 0, in_counter = 0, rates = blank_rates(Now), - msgs_on_disk = gb_sets:new(), - msg_indices_on_disk = gb_sets:new(), - unconfirmed = gb_sets:new(), - confirmed = gb_sets:new(), + msgs_on_disk = sets:new([{version,2}]), + msg_indices_on_disk = sets:new([{version,2}]), + unconfirmed = sets:new([{version,2}]), + confirmed = sets:new([{version,2}]), ack_out_counter = 0, ack_in_counter = 0, disk_read_count = 0, @@ -1987,7 +1987,7 @@ is_pending_ack_empty(State) -> count_pending_acks(State) =:= 0. is_unconfirmed_empty(#vqstate { unconfirmed = UC }) -> - gb_sets:is_empty(UC). + sets:is_empty(UC). count_pending_acks(#vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> @@ -2088,7 +2088,7 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, State2 = State1 #vqstate { delta = Delta1 }, stats_published_disk(MsgStatus1, State2) end, - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + UC1 = sets_maybe_insert(NeedsConfirming, MsgId, UC), State4#vqstate{ next_seq_id = SeqId + 1, next_deliver_seq_id = maybe_next_deliver_seq_id(SeqId, NextDeliverSeqId, IsDelivered), in_counter = InCount + 1, @@ -2119,7 +2119,7 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, id = Msg MsgStatus = msg_status(Version, IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + UC1 = sets_maybe_insert(NeedsConfirming, MsgId, UC), {SeqId, stats_published_pending_acks(MsgStatus1, State2#vqstate{ next_seq_id = SeqId + 1, @@ -2444,10 +2444,10 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, unconfirmed = UC, confirmed = C }) -> State #vqstate { - msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet), - msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet), - unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet), - confirmed = gb_sets:union(C, MsgIdSet) }. + msgs_on_disk = sets:subtract(MOD, MsgIdSet), + msg_indices_on_disk = sets:subtract(MIOD, MsgIdSet), + unconfirmed = sets:subtract(UC, MsgIdSet), + confirmed = sets:union(C, MsgIdSet) }. msgs_written_to_disk(Callback, MsgIdSet, ignored) -> Callback(?MODULE, @@ -2463,11 +2463,11 @@ msgs_written_to_disk(Callback, MsgIdSet, written) -> %% this intersection call. %% %% The same may apply to msg_indices_written_to_disk as well. - Confirmed = gb_sets:intersection(UC, MsgIdSet), - record_confirms(gb_sets:intersection(MsgIdSet, MIOD), + Confirmed = sets:intersection(UC, MsgIdSet), + record_confirms(sets:intersection(MsgIdSet, MIOD), State #vqstate { msgs_on_disk = - gb_sets:union(MOD, Confirmed) }) + sets:union(MOD, Confirmed) }) end). msg_indices_written_to_disk(Callback, MsgIdSet) -> @@ -2475,11 +2475,11 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> - Confirmed = gb_sets:intersection(UC, MsgIdSet), - record_confirms(gb_sets:intersection(MsgIdSet, MOD), + Confirmed = sets:intersection(UC, MsgIdSet), + record_confirms(sets:intersection(MsgIdSet, MOD), State #vqstate { msg_indices_on_disk = - gb_sets:union(MIOD, Confirmed) }) + sets:union(MIOD, Confirmed) }) end). msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index f55dee148a..ffd2778c44 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -368,7 +368,7 @@ on_disk_capture([_|_], _Awaiting, Pid) -> on_disk_capture(OnDisk, Awaiting, Pid) -> receive {on_disk, MsgIdsS} -> - MsgIds = gb_sets:to_list(MsgIdsS), + MsgIds = sets:to_list(MsgIdsS), on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds, Pid); stop -> @@ -481,7 +481,7 @@ test_msg_store_confirm_timer() -> ?PERSISTENT_MSG_STORE, Ref, fun (MsgIds, _ActionTaken) -> - case gb_sets:is_member(MsgId, MsgIds) of + case sets:is_element(MsgId, MsgIds) of true -> Self ! on_disk; false -> ok end @@ -1673,23 +1673,23 @@ publish_and_confirm(Q, Payload, Count) -> {ok, Acc, _Actions} = rabbit_queue_type:deliver([Q], Delivery, Acc0), Acc end, QTState0, Seqs), - wait_for_confirms(gb_sets:from_list(Seqs)), + wait_for_confirms(sets:from_list(Seqs)), QTState. wait_for_confirms(Unconfirmed) -> - case gb_sets:is_empty(Unconfirmed) of + case sets:is_empty(Unconfirmed) of true -> ok; false -> receive {'$gen_cast', {queue_event, _QName, {confirm, Confirmed, _}}} -> wait_for_confirms( - rabbit_misc:gb_sets_difference( - Unconfirmed, gb_sets:from_list(Confirmed))); + sets:subtract( + Unconfirmed, sets:from_list(Confirmed))); {'$gen_cast', {confirm, Confirmed, _}} -> wait_for_confirms( - rabbit_misc:gb_sets_difference( - Unconfirmed, gb_sets:from_list(Confirmed))) + sets:subtract( + Unconfirmed, sets:from_list(Confirmed))) after ?TIMEOUT -> flush(), exit(timeout_waiting_for_confirm) diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index d0cc1cbc8e..abda1f65bb 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -64,7 +64,6 @@ -export([append_rpc_all_nodes/4, append_rpc_all_nodes/5]). -export([os_cmd/1]). -export([is_os_process_alive/1]). --export([gb_sets_difference/2]). -export([version/0, otp_release/0, platform_and_version/0, otp_system_version/0, rabbitmq_and_erlang_versions/0, which_applications/0]). -export([sequence_error/1]). @@ -232,7 +231,6 @@ -spec format_message_queue(any(), priority_queue:q()) -> term(). -spec os_cmd(string()) -> string(). -spec is_os_process_alive(non_neg_integer()) -> boolean(). --spec gb_sets_difference(gb_sets:set(), gb_sets:set()) -> gb_sets:set(). -spec version() -> string(). -spec otp_release() -> string(). -spec otp_system_version() -> string(). @@ -1208,9 +1206,6 @@ exit_loop(Port) -> {Port, _} -> exit_loop(Port) end. -gb_sets_difference(S1, S2) -> - gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). - version() -> {ok, VSN} = application:get_key(rabbit, vsn), VSN.