CQ: Confirm related optimisations (#7516)
* CQ: Optimise shared store remove when nothing to remove The message(s) was/were already optimised out during write when the client acked faster than we could process the write message. * Optimise sets:subtract in single-confirm case This happens due to message store optimisations that tries to confirm as fast as possible on write or ack, even if that means processing a single confirm. The ack scenario is common because clients tend to not be built for multi-ack. The optimisation avoids calling an expensive sets:subtract/2 when there is a single new confirm and instead does a sets:del_element/2 of the first set.
This commit is contained in:
parent
b623f8e8dd
commit
0dce848549
|
|
@ -936,8 +936,11 @@ handle_cast({remove, CRef, MsgIds}, State) ->
|
|||
ignore -> {Removed, State2}
|
||||
end
|
||||
end, {[], State}, MsgIds),
|
||||
noreply(maybe_compact(client_confirm(CRef, sets:from_list(RemovedMsgIds, [{version, 2}]),
|
||||
ignored, State1)));
|
||||
case RemovedMsgIds of
|
||||
[] -> noreply(State1);
|
||||
_ -> noreply(maybe_compact(client_confirm(CRef, sets:from_list(RemovedMsgIds, [{version, 2}]),
|
||||
ignored, State1)))
|
||||
end;
|
||||
|
||||
handle_cast({combine_files, Source, Destination, Reclaimed},
|
||||
State = #msstate { sum_file_size = SumFileSize,
|
||||
|
|
@ -1372,7 +1375,7 @@ client_confirm(CRef, MsgIds, ActionTaken, State) ->
|
|||
case maps:find(CRef, CTM) of
|
||||
{ok, Gs} -> MsgOnDiskFun(sets:intersection(Gs, MsgIds),
|
||||
ActionTaken),
|
||||
MsgIds1 = sets:subtract(Gs, MsgIds),
|
||||
MsgIds1 = sets_subtract(Gs, MsgIds),
|
||||
case sets:is_empty(MsgIds1) of
|
||||
true -> maps:remove(CRef, CTM);
|
||||
false -> maps:put(CRef, MsgIds1, CTM)
|
||||
|
|
@ -1381,6 +1384,13 @@ client_confirm(CRef, MsgIds, ActionTaken, State) ->
|
|||
end
|
||||
end, CRef, State).
|
||||
|
||||
%% Function defined in both rabbit_msg_store and rabbit_variable_queue.
|
||||
sets_subtract(Set1, Set2) ->
|
||||
case sets:size(Set2) of
|
||||
1 -> sets:del_element(hd(sets:to_list(Set2)), Set1);
|
||||
_ -> sets:subtract(Set1, Set2)
|
||||
end.
|
||||
|
||||
blind_confirm(CRef, MsgIds, ActionTaken, State) ->
|
||||
update_pending_confirms(
|
||||
fun (MsgOnDiskFun, CTM) -> MsgOnDiskFun(MsgIds, ActionTaken), CTM end,
|
||||
|
|
|
|||
|
|
@ -2355,11 +2355,18 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
|
|||
unconfirmed = UC,
|
||||
confirmed = C }) ->
|
||||
State #vqstate {
|
||||
msgs_on_disk = sets:subtract(MOD, MsgIdSet),
|
||||
msg_indices_on_disk = sets:subtract(MIOD, MsgIdSet),
|
||||
unconfirmed = sets:subtract(UC, MsgIdSet),
|
||||
msgs_on_disk = sets_subtract(MOD, MsgIdSet),
|
||||
msg_indices_on_disk = sets_subtract(MIOD, MsgIdSet),
|
||||
unconfirmed = sets_subtract(UC, MsgIdSet),
|
||||
confirmed = sets:union(C, MsgIdSet) }.
|
||||
|
||||
%% Function defined in both rabbit_msg_store and rabbit_variable_queue.
|
||||
sets_subtract(Set1, Set2) ->
|
||||
case sets:size(Set2) of
|
||||
1 -> sets:del_element(hd(sets:to_list(Set2)), Set1);
|
||||
_ -> sets:subtract(Set1, Set2)
|
||||
end.
|
||||
|
||||
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
|
||||
Callback(?MODULE,
|
||||
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
|
||||
|
|
|
|||
Loading…
Reference in New Issue