Only ack messages when necessary
This commit is contained in:
parent
0741dfc702
commit
306c4035ee
|
|
@ -1169,7 +1169,7 @@ remove_pending_ack(KeepPersistent,
|
|||
State = #vqstate { pending_ack = PA,
|
||||
index_state = IndexState,
|
||||
msg_store_clients = MSCState }) ->
|
||||
{MsgIdsByStore, _AllMsgIds} =
|
||||
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
|
||||
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
|
||||
State1 = State #vqstate { pending_ack = dict:new(),
|
||||
ram_ack_index = gb_trees:empty() },
|
||||
|
|
@ -1181,7 +1181,7 @@ remove_pending_ack(KeepPersistent,
|
|||
State1
|
||||
end;
|
||||
false -> IndexState1 =
|
||||
rabbit_queue_index:ack(dict:fetch_keys(PA), IndexState),
|
||||
rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
|
||||
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|
||||
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
|
||||
State1 #vqstate { index_state = IndexState1 }
|
||||
|
|
@ -1190,7 +1190,7 @@ remove_pending_ack(KeepPersistent,
|
|||
ack(_MsgStoreFun, _Fun, [], State) ->
|
||||
{[], State};
|
||||
ack(MsgStoreFun, Fun, AckTags, State) ->
|
||||
{{MsgIdsByStore, AllMsgIds},
|
||||
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
|
||||
State1 = #vqstate { index_state = IndexState,
|
||||
msg_store_clients = MSCState,
|
||||
persistent_count = PCount,
|
||||
|
|
@ -1205,7 +1205,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|
|||
ram_ack_index =
|
||||
gb_trees:delete_any(SeqId, RAI)})}
|
||||
end, {accumulate_ack_init(), State}, AckTags),
|
||||
IndexState1 = rabbit_queue_index:ack(AckTags, IndexState),
|
||||
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
|
||||
[ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
|
||||
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
|
||||
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
|
||||
|
|
@ -1215,17 +1215,18 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|
|||
persistent_count = PCount1,
|
||||
ack_out_counter = AckOutCount + length(AckTags) }}.
|
||||
|
||||
accumulate_ack_init() -> {orddict:new(), []}.
|
||||
accumulate_ack_init() -> {[], orddict:new(), []}.
|
||||
|
||||
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
|
||||
msg_on_disk = false,
|
||||
index_on_disk = false,
|
||||
msg_id = MsgId },
|
||||
{MsgIdsByStore, AllMsgIds}) ->
|
||||
{MsgIdsByStore, [MsgId | AllMsgIds]};
|
||||
accumulate_ack(_SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk},
|
||||
{MsgIdsByStore, AllMsgIds}) ->
|
||||
{rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
|
||||
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
|
||||
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
|
||||
accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, IndexOnDisk},
|
||||
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
|
||||
{cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
|
||||
rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
|
||||
[MsgId | AllMsgIds]}.
|
||||
|
||||
find_persistent_count(LensByStore) ->
|
||||
|
|
|
|||
Loading…
Reference in New Issue