Local shovels: optimisation
This commit is contained in:
parent
edf0e3c1ff
commit
02fcbc0dc5
|
@ -579,8 +579,7 @@ settle(Op, DeliveryTag, Multiple,
|
|||
consumer_tag := CTag,
|
||||
unacked_message_q := UAMQ0}
|
||||
} = Src} = State0) ->
|
||||
{Acked, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
|
||||
MsgIds = [Ack#pending_ack.msg_id || Ack <- Acked],
|
||||
{MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
|
||||
case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of
|
||||
{ok, QState1, Actions} ->
|
||||
State = State0#{source => Src#{current => Current#{queue_states => QState1,
|
||||
|
@ -603,10 +602,11 @@ collect_acks(UAMQ, DeliveryTag, Multiple) ->
|
|||
|
||||
collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
|
||||
case ?QUEUE:out(UAMQ) of
|
||||
{{value, UnackedMsg = #pending_ack{delivery_tag = CurrentDT}},
|
||||
{{value, UnackedMsg = #pending_ack{delivery_tag = CurrentDT,
|
||||
msg_id = Id}},
|
||||
UAMQTail} ->
|
||||
if CurrentDT == DeliveryTag ->
|
||||
{[UnackedMsg | AcknowledgedAcc],
|
||||
{[Id | AcknowledgedAcc],
|
||||
case RemainingAcc of
|
||||
[] -> UAMQTail;
|
||||
_ -> ?QUEUE:join(
|
||||
|
@ -614,7 +614,7 @@ collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
|
|||
UAMQTail)
|
||||
end};
|
||||
Multiple ->
|
||||
collect_acks([UnackedMsg | AcknowledgedAcc], RemainingAcc,
|
||||
collect_acks([Id | AcknowledgedAcc], RemainingAcc,
|
||||
UAMQTail, DeliveryTag, Multiple);
|
||||
true ->
|
||||
collect_acks(AcknowledgedAcc, [UnackedMsg | RemainingAcc],
|
||||
|
|
Loading…
Reference in New Issue