CQ: Use v2 sets instead of gb_sets for confirms

For the following flags I see an improvement of
30k/s to 34k/s on my machine:

-x 1 -y 1 -A 1000 -q 1000 -c 1000 -s 1000 -f persistent
-u cqv2 --queue-args=x-queue-version=2
This commit is contained in:
Loïc Hoguin 2022-06-10 15:24:02 +02:00
parent a31be66af5
commit 3683ab9a6e
No known key found for this signature in database
GPG Key ID: C69E26E3A9DF618F
7 changed files with 67 additions and 73 deletions

View File

@ -111,7 +111,7 @@
%% and there are outstanding unconfirmed messages.
%% In that case the buffer is flushed to disk when
%% the queue requests a sync (after a timeout).
confirms = gb_sets:new() :: gb_sets:set(),
confirms = sets:new([{version,2}]) :: sets:set(),
%% Segments we currently know of along with the
%% number of unacked messages remaining in the
@ -156,7 +156,7 @@
%% Types copied from rabbit_queue_index.
-type on_sync_fun() :: fun ((gb_sets:set()) -> ok).
-type on_sync_fun() :: fun ((sets:set()) -> ok).
-type contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean()).
-type shutdown_terms() :: list() | 'non_clean_shutdown'.
@ -658,7 +658,7 @@ reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) ->
maybe_mark_unconfirmed(MsgId, #message_properties{ needs_confirming = true },
State = #qi { confirms = Confirms }) ->
State#qi{ confirms = gb_sets:add_element(MsgId, Confirms) };
State#qi{ confirms = sets:add_element(MsgId, Confirms) };
maybe_mark_unconfirmed(_, _, State) ->
State.
@ -1055,19 +1055,19 @@ sync(State0 = #qi{ confirms = Confirms,
on_sync = OnSyncFun }) ->
?DEBUG("~0p", [State0]),
State = flush_buffer(State0, full, segment_entry_count()),
_ = case gb_sets:is_empty(Confirms) of
_ = case sets:is_empty(Confirms) of
true ->
ok;
false ->
OnSyncFun(Confirms)
end,
State#qi{ confirms = gb_sets:new() }.
State#qi{ confirms = sets:new([{version,2}]) }.
-spec needs_sync(state()) -> 'false'.
needs_sync(State = #qi{ confirms = Confirms }) ->
?DEBUG("~0p", [State]),
case gb_sets:is_empty(Confirms) of
case sets:is_empty(Confirms) of
true -> false;
false -> confirms
end.

View File

@ -122,7 +122,7 @@
%% publisher confirms will be sent at regular
%% intervals after the index has been flushed
%% to disk.
confirms = gb_sets:new() :: gb_sets:set(),
confirms = sets:new([{version,2}]) :: sets:set(),
on_sync :: on_sync_fun()
}).
@ -131,7 +131,7 @@
-type msg_location() :: {?MODULE, non_neg_integer(), non_neg_integer()}.
-export_type([msg_location/0]).
-type on_sync_fun() :: fun ((gb_sets:set(), 'written' | 'ignored') -> any()).
-type on_sync_fun() :: fun ((sets:set(), 'written' | 'ignored') -> any()).
-spec init(rabbit_amqqueue:name(), on_sync_fun()) -> state().
@ -248,7 +248,7 @@ maybe_cache(SeqId, MsgSize, Msg, State = #qs{ cache = Cache,
maybe_mark_unconfirmed(MsgId, #message_properties{ needs_confirming = true },
State = #qs { confirms = Confirms }) ->
State#qs{ confirms = gb_sets:add_element(MsgId, Confirms) };
State#qs{ confirms = sets:add_element(MsgId, Confirms) };
maybe_mark_unconfirmed(_, _, State) ->
State.
@ -258,12 +258,12 @@ sync(State = #qs{ confirms = Confirms,
on_sync = OnSyncFun }) ->
?DEBUG("~0p", [State]),
flush_write_fd(State),
case gb_sets:is_empty(Confirms) of
case sets:is_empty(Confirms) of
true ->
State;
false ->
OnSyncFun(Confirms, written),
State#qs{ confirms = gb_sets:new() }
State#qs{ confirms = sets:new([{version,2}]) }
end.
-spec read(rabbit_variable_queue:seq_id(), msg_location(), State)

View File

@ -162,7 +162,7 @@
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A}).
-type maybe_msg_id_fun() ::
'undefined' | fun ((gb_sets:set(), 'written' | 'ignored') -> any()).
'undefined' | fun ((sets:set(), 'written' | 'ignored') -> any()).
-type maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok').
-type deletion_thunk() :: fun (() -> boolean()).
@ -907,7 +907,7 @@ handle_cast({write, CRef, MsgId, Flow},
ignore ->
%% A 'remove' has already been issued and eliminated the
%% 'write'.
State1 = blind_confirm(CRef, gb_sets:singleton(MsgId),
State1 = blind_confirm(CRef, sets:add_element(MsgId, sets:new([{version,2}])),
ignored, State),
%% If all writes get eliminated, cur_file_cache_ets could
%% grow unbounded. To prevent that we delete the cache
@ -938,7 +938,7 @@ handle_cast({remove, CRef, MsgIds}, State) ->
ignore -> {Removed, State2}
end
end, {[], State}, MsgIds),
noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds),
noreply(maybe_compact(client_confirm(CRef, sets:from_list(RemovedMsgIds),
ignored, State1)));
handle_cast({combine_files, Source, Destination, Reclaimed},
@ -1066,7 +1066,7 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
cref_to_msg_ids = CTM }) ->
State1 = stop_sync_timer(State),
CGs = maps:fold(fun (CRef, MsgIds, NS) ->
case gb_sets:is_empty(MsgIds) of
case sets:is_empty(MsgIds) of
true -> NS;
false -> [{CRef, MsgIds} | NS]
end
@ -1156,7 +1156,7 @@ write_message(MsgId, Msg, CRef,
true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
update_pending_confirms(
fun (MsgOnDiskFun, CTM) ->
MsgOnDiskFun(gb_sets:singleton(MsgId), written),
MsgOnDiskFun(sets:add_element(MsgId, sets:new([{version,2}])), written),
CTM
end, CRef, State1)
end.
@ -1356,8 +1356,8 @@ record_pending_confirm(CRef, MsgId, State) ->
update_pending_confirms(
fun (_MsgOnDiskFun, CTM) ->
NewMsgIds = case maps:find(CRef, CTM) of
error -> gb_sets:singleton(MsgId);
{ok, MsgIds} -> gb_sets:add(MsgId, MsgIds)
error -> sets:add_element(MsgId, sets:new([{version,2}]));
{ok, MsgIds} -> sets:add_element(MsgId, MsgIds)
end,
maps:put(CRef, NewMsgIds, CTM)
end, CRef, State).
@ -1366,11 +1366,10 @@ client_confirm(CRef, MsgIds, ActionTaken, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTM) ->
case maps:find(CRef, CTM) of
{ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds),
{ok, Gs} -> MsgOnDiskFun(sets:intersection(Gs, MsgIds),
ActionTaken),
MsgIds1 = rabbit_misc:gb_sets_difference(
Gs, MsgIds),
case gb_sets:is_empty(MsgIds1) of
MsgIds1 = sets:subtract(Gs, MsgIds),
case sets:is_empty(MsgIds1) of
true -> maps:remove(CRef, CTM);
false -> maps:put(CRef, MsgIds1, CTM)
end;

View File

@ -249,7 +249,7 @@
unacked :: non_neg_integer()
}).
-type seg_map() :: {map(), [segment()]}.
-type on_sync_fun() :: fun ((gb_sets:set()) -> ok).
-type on_sync_fun() :: fun ((sets:set()) -> ok).
-type qistate() :: #qistate { dir :: file:filename(),
segments :: 'undefined' | seg_map(),
journal_handle :: hdl(),
@ -257,8 +257,8 @@
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
on_sync_msg :: on_sync_fun(),
unconfirmed :: gb_sets:set(),
unconfirmed_msg :: gb_sets:set(),
unconfirmed :: sets:set(),
unconfirmed_msg :: sets:set(),
pre_publish_cache :: list(),
delivered_cache :: list()
}.
@ -439,9 +439,9 @@ maybe_needs_confirming(MsgProps, MsgOrId,
end,
?MSG_ID_BYTES = size(MsgId),
case {MsgProps#message_properties.needs_confirming, MsgOrId} of
{true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
{true, MsgId} -> UC1 = sets:add_element(MsgId, UC),
State#qistate{unconfirmed = UC1};
{true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
{true, _} -> UCM1 = sets:add_element(MsgId, UCM),
State#qistate{unconfirmed_msg = UCM1};
{false, _} -> State
end.
@ -474,7 +474,7 @@ needs_sync(#qistate{journal_handle = undefined}) ->
needs_sync(#qistate{journal_handle = JournalHdl,
unconfirmed = UC,
unconfirmed_msg = UCM}) ->
case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of
case sets:is_empty(UC) andalso sets:is_empty(UCM) of
true -> case file_handle_cache:needs_sync(JournalHdl) of
true -> other;
false -> false
@ -623,8 +623,8 @@ blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
max_journal_entries = MaxJournal,
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun,
unconfirmed = gb_sets:new(),
unconfirmed_msg = gb_sets:new(),
unconfirmed = sets:new([{version,2}]),
unconfirmed_msg = sets:new([{version,2}]),
pre_publish_cache = [],
delivered_cache = [],
queue_name = Name }.
@ -1101,15 +1101,15 @@ notify_sync(State = #qistate{unconfirmed = UC,
unconfirmed_msg = UCM,
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun}) ->
State1 = case gb_sets:is_empty(UC) of
State1 = case sets:is_empty(UC) of
true -> State;
false -> OnSyncFun(UC),
State#qistate{unconfirmed = gb_sets:new()}
State#qistate{unconfirmed = sets:new([{version,2}])}
end,
case gb_sets:is_empty(UCM) of
case sets:is_empty(UCM) of
true -> State1;
false -> OnSyncMsgFun(UCM),
State1#qistate{unconfirmed_msg = gb_sets:new()}
State1#qistate{unconfirmed_msg = sets:new([{version,2}])}
end.
%%----------------------------------------------------------------------------

View File

@ -443,10 +443,10 @@
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
msgs_on_disk :: gb_sets:set(),
msg_indices_on_disk :: gb_sets:set(),
unconfirmed :: gb_sets:set(),
confirmed :: gb_sets:set(),
msgs_on_disk :: sets:set(),
msg_indices_on_disk :: sets:set(),
unconfirmed :: sets:set(),
confirmed :: sets:set(),
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
disk_read_count :: non_neg_integer(),
@ -700,10 +700,10 @@ batch_publish_delivered(Publishes, ChPid, Flow, State) ->
discard(_MsgId, _ChPid, _Flow, State) -> State.
drain_confirmed(State = #vqstate { confirmed = C }) ->
case gb_sets:is_empty(C) of
case sets:is_empty(C) of
true -> {[], State}; %% common case
false -> {gb_sets:to_list(C), State #vqstate {
confirmed = gb_sets:new() }}
false -> {sets:to_list(C), State #vqstate {
confirmed = sets:new([{version, 2}]) }}
end.
dropwhile(Pred, State) ->
@ -1385,8 +1385,8 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
sets_maybe_insert(false, _Val, Set) -> Set;
sets_maybe_insert(true, Val, Set) -> sets:add_element(Val, Set).
msg_status(Version, IsPersistent, IsDelivered, SeqId,
Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) ->
@ -1625,10 +1625,10 @@ init(QueueVsn, IsDurable, IndexMod, IndexState, StoreState, DeltaCount, DeltaByt
out_counter = 0,
in_counter = 0,
rates = blank_rates(Now),
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
confirmed = gb_sets:new(),
msgs_on_disk = sets:new([{version,2}]),
msg_indices_on_disk = sets:new([{version,2}]),
unconfirmed = sets:new([{version,2}]),
confirmed = sets:new([{version,2}]),
ack_out_counter = 0,
ack_in_counter = 0,
disk_read_count = 0,
@ -1987,7 +1987,7 @@ is_pending_ack_empty(State) ->
count_pending_acks(State) =:= 0.
is_unconfirmed_empty(#vqstate { unconfirmed = UC }) ->
gb_sets:is_empty(UC).
sets:is_empty(UC).
count_pending_acks(#vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA }) ->
@ -2088,7 +2088,7 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
State2 = State1 #vqstate { delta = Delta1 },
stats_published_disk(MsgStatus1, State2)
end,
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
UC1 = sets_maybe_insert(NeedsConfirming, MsgId, UC),
State4#vqstate{ next_seq_id = SeqId + 1,
next_deliver_seq_id = maybe_next_deliver_seq_id(SeqId, NextDeliverSeqId, IsDelivered),
in_counter = InCount + 1,
@ -2119,7 +2119,7 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, id = Msg
MsgStatus = msg_status(Version, IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
{MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
UC1 = sets_maybe_insert(NeedsConfirming, MsgId, UC),
{SeqId,
stats_published_pending_acks(MsgStatus1,
State2#vqstate{ next_seq_id = SeqId + 1,
@ -2444,10 +2444,10 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = UC,
confirmed = C }) ->
State #vqstate {
msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet),
msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet),
unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
confirmed = gb_sets:union(C, MsgIdSet) }.
msgs_on_disk = sets:subtract(MOD, MsgIdSet),
msg_indices_on_disk = sets:subtract(MIOD, MsgIdSet),
unconfirmed = sets:subtract(UC, MsgIdSet),
confirmed = sets:union(C, MsgIdSet) }.
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
Callback(?MODULE,
@ -2463,11 +2463,11 @@ msgs_written_to_disk(Callback, MsgIdSet, written) ->
%% this intersection call.
%%
%% The same may apply to msg_indices_written_to_disk as well.
Confirmed = gb_sets:intersection(UC, MsgIdSet),
record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
Confirmed = sets:intersection(UC, MsgIdSet),
record_confirms(sets:intersection(MsgIdSet, MIOD),
State #vqstate {
msgs_on_disk =
gb_sets:union(MOD, Confirmed) })
sets:union(MOD, Confirmed) })
end).
msg_indices_written_to_disk(Callback, MsgIdSet) ->
@ -2475,11 +2475,11 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
Confirmed = gb_sets:intersection(UC, MsgIdSet),
record_confirms(gb_sets:intersection(MsgIdSet, MOD),
Confirmed = sets:intersection(UC, MsgIdSet),
record_confirms(sets:intersection(MsgIdSet, MOD),
State #vqstate {
msg_indices_on_disk =
gb_sets:union(MIOD, Confirmed) })
sets:union(MIOD, Confirmed) })
end).
msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->

View File

@ -368,7 +368,7 @@ on_disk_capture([_|_], _Awaiting, Pid) ->
on_disk_capture(OnDisk, Awaiting, Pid) ->
receive
{on_disk, MsgIdsS} ->
MsgIds = gb_sets:to_list(MsgIdsS),
MsgIds = sets:to_list(MsgIdsS),
on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds,
Pid);
stop ->
@ -481,7 +481,7 @@ test_msg_store_confirm_timer() ->
?PERSISTENT_MSG_STORE,
Ref,
fun (MsgIds, _ActionTaken) ->
case gb_sets:is_member(MsgId, MsgIds) of
case sets:is_element(MsgId, MsgIds) of
true -> Self ! on_disk;
false -> ok
end
@ -1673,23 +1673,23 @@ publish_and_confirm(Q, Payload, Count) ->
{ok, Acc, _Actions} = rabbit_queue_type:deliver([Q], Delivery, Acc0),
Acc
end, QTState0, Seqs),
wait_for_confirms(gb_sets:from_list(Seqs)),
wait_for_confirms(sets:from_list(Seqs)),
QTState.
wait_for_confirms(Unconfirmed) ->
case gb_sets:is_empty(Unconfirmed) of
case sets:is_empty(Unconfirmed) of
true -> ok;
false ->
receive
{'$gen_cast',
{queue_event, _QName, {confirm, Confirmed, _}}} ->
wait_for_confirms(
rabbit_misc:gb_sets_difference(
Unconfirmed, gb_sets:from_list(Confirmed)));
sets:subtract(
Unconfirmed, sets:from_list(Confirmed)));
{'$gen_cast', {confirm, Confirmed, _}} ->
wait_for_confirms(
rabbit_misc:gb_sets_difference(
Unconfirmed, gb_sets:from_list(Confirmed)))
sets:subtract(
Unconfirmed, sets:from_list(Confirmed)))
after ?TIMEOUT ->
flush(),
exit(timeout_waiting_for_confirm)

View File

@ -64,7 +64,6 @@
-export([append_rpc_all_nodes/4, append_rpc_all_nodes/5]).
-export([os_cmd/1]).
-export([is_os_process_alive/1]).
-export([gb_sets_difference/2]).
-export([version/0, otp_release/0, platform_and_version/0, otp_system_version/0,
rabbitmq_and_erlang_versions/0, which_applications/0]).
-export([sequence_error/1]).
@ -232,7 +231,6 @@
-spec format_message_queue(any(), priority_queue:q()) -> term().
-spec os_cmd(string()) -> string().
-spec is_os_process_alive(non_neg_integer()) -> boolean().
-spec gb_sets_difference(gb_sets:set(), gb_sets:set()) -> gb_sets:set().
-spec version() -> string().
-spec otp_release() -> string().
-spec otp_system_version() -> string().
@ -1208,9 +1206,6 @@ exit_loop(Port) ->
{Port, _} -> exit_loop(Port)
end.
gb_sets_difference(S1, S2) ->
gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
version() ->
{ok, VSN} = application:get_key(rabbit, vsn),
VSN.