diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index 06e993c8d3..cd84bed56e 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -936,8 +936,11 @@ handle_cast({remove, CRef, MsgIds}, State) -> ignore -> {Removed, State2} end end, {[], State}, MsgIds), - noreply(maybe_compact(client_confirm(CRef, sets:from_list(RemovedMsgIds, [{version, 2}]), - ignored, State1))); + case RemovedMsgIds of + [] -> noreply(State1); + _ -> noreply(maybe_compact(client_confirm(CRef, sets:from_list(RemovedMsgIds, [{version, 2}]), + ignored, State1))) + end; handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, @@ -1372,7 +1375,7 @@ client_confirm(CRef, MsgIds, ActionTaken, State) -> case maps:find(CRef, CTM) of {ok, Gs} -> MsgOnDiskFun(sets:intersection(Gs, MsgIds), ActionTaken), - MsgIds1 = sets:subtract(Gs, MsgIds), + MsgIds1 = sets_subtract(Gs, MsgIds), case sets:is_empty(MsgIds1) of true -> maps:remove(CRef, CTM); false -> maps:put(CRef, MsgIds1, CTM) @@ -1381,6 +1384,13 @@ client_confirm(CRef, MsgIds, ActionTaken, State) -> end end, CRef, State). +%% Function defined in both rabbit_msg_store and rabbit_variable_queue. +sets_subtract(Set1, Set2) -> + case sets:size(Set2) of + 1 -> sets:del_element(hd(sets:to_list(Set2)), Set1); + _ -> sets:subtract(Set1, Set2) + end. + blind_confirm(CRef, MsgIds, ActionTaken, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTM) -> MsgOnDiskFun(MsgIds, ActionTaken), CTM end, diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index 378a8671c4..9cf3af8716 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -2355,11 +2355,18 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, unconfirmed = UC, confirmed = C }) -> State #vqstate { - msgs_on_disk = sets:subtract(MOD, MsgIdSet), - msg_indices_on_disk = sets:subtract(MIOD, MsgIdSet), - unconfirmed = sets:subtract(UC, MsgIdSet), + msgs_on_disk = sets_subtract(MOD, MsgIdSet), + msg_indices_on_disk = sets_subtract(MIOD, MsgIdSet), + unconfirmed = sets_subtract(UC, MsgIdSet), confirmed = sets:union(C, MsgIdSet) }. +%% Function defined in both rabbit_msg_store and rabbit_variable_queue. +sets_subtract(Set1, Set2) -> + case sets:size(Set2) of + 1 -> sets:del_element(hd(sets:to_list(Set2)), Set1); + _ -> sets:subtract(Set1, Set2) + end. + msgs_written_to_disk(Callback, MsgIdSet, ignored) -> Callback(?MODULE, fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);