parent
92cd4bc4f2
commit
40172b86a6
|
|
@ -565,11 +565,32 @@ fetch(AckRequired, State) ->
|
|||
{Res, a(State3)}
|
||||
end.
|
||||
|
||||
ack([], State) ->
|
||||
{[], State};
|
||||
ack(AckTags, State) ->
|
||||
{MsgIds, State1} = ack(fun msg_store_remove/3,
|
||||
fun (_, State0) -> State0 end,
|
||||
AckTags, State),
|
||||
{MsgIds, a(State1)}.
|
||||
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
|
||||
State1 = #vqstate { index_state = IndexState,
|
||||
msg_store_clients = MSCState,
|
||||
persistent_count = PCount,
|
||||
ack_out_counter = AckOutCount }} =
|
||||
lists:foldl(
|
||||
fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
|
||||
ram_ack_index = RAI }}) ->
|
||||
AckEntry = dict:fetch(SeqId, PA),
|
||||
{accumulate_ack(SeqId, AckEntry, Acc),
|
||||
State2 #vqstate {
|
||||
pending_ack = dict:erase(SeqId, PA),
|
||||
ram_ack_index = gb_trees:delete_any(SeqId, RAI)}}
|
||||
end, {accumulate_ack_init(), State}, AckTags),
|
||||
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
|
||||
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|
||||
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
|
||||
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
|
||||
orddict:new(), MsgIdsByStore)),
|
||||
{lists:reverse(AllMsgIds),
|
||||
a(State1 #vqstate { index_state = IndexState1,
|
||||
persistent_count = PCount1,
|
||||
ack_out_counter = AckOutCount + length(AckTags) })}.
|
||||
|
||||
requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
|
||||
q3 = Q3,
|
||||
|
|
@ -1216,34 +1237,6 @@ remove_pending_ack(KeepPersistent,
|
|||
State1 #vqstate { index_state = IndexState1 }
|
||||
end.
|
||||
|
||||
ack(_MsgStoreFun, _Fun, [], State) ->
|
||||
{[], State};
|
||||
ack(MsgStoreFun, Fun, AckTags, State) ->
|
||||
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
|
||||
State1 = #vqstate { index_state = IndexState,
|
||||
msg_store_clients = MSCState,
|
||||
persistent_count = PCount,
|
||||
ack_out_counter = AckOutCount }} =
|
||||
lists:foldl(
|
||||
fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
|
||||
ram_ack_index = RAI }}) ->
|
||||
AckEntry = dict:fetch(SeqId, PA),
|
||||
{accumulate_ack(SeqId, AckEntry, Acc),
|
||||
Fun(AckEntry, State2 #vqstate {
|
||||
pending_ack = dict:erase(SeqId, PA),
|
||||
ram_ack_index =
|
||||
gb_trees:delete_any(SeqId, RAI)})}
|
||||
end, {accumulate_ack_init(), State}, AckTags),
|
||||
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
|
||||
[ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
|
||||
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
|
||||
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
|
||||
orddict:new(), MsgIdsByStore)),
|
||||
{lists:reverse(AllMsgIds),
|
||||
State1 #vqstate { index_state = IndexState1,
|
||||
persistent_count = PCount1,
|
||||
ack_out_counter = AckOutCount + length(AckTags) }}.
|
||||
|
||||
accumulate_ack_init() -> {[], orddict:new(), []}.
|
||||
|
||||
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
|
||||
|
|
|
|||
Loading…
Reference in New Issue