Local shovels: fix handling of acks/nacks
This commit is contained in:
parent
7f1febe70b
commit
6e2e19591a
|
@ -591,22 +591,20 @@ get_user_vhost_from_amqp_param(Uri) ->
|
|||
settle(Op, DeliveryTag, Multiple,
|
||||
#{source := #{queue_r := QRef,
|
||||
current := Current = #{consumer_tag := CTag,
|
||||
unacked_message_q := UAMQ0}
|
||||
unacked_message_q := UAMQ0,
|
||||
queue_states := QState0}
|
||||
} = Src} = State0) ->
|
||||
{MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
|
||||
State = State0#{source => Src#{current => Current#{unacked_message_q => UAMQ}}},
|
||||
lists:foldl(
|
||||
fun(MsgId, #{source := Src0 = #{current := Current0 = #{queue_states := QState0}}} = St0) ->
|
||||
case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of
|
||||
case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of
|
||||
{ok, QState1, Actions} ->
|
||||
St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}},
|
||||
handle_queue_actions(Actions, St);
|
||||
State = State0#{source => Src#{current => Current#{queue_states => QState1,
|
||||
unacked_message_q => UAMQ}}},
|
||||
handle_queue_actions(Actions, State);
|
||||
{'protocol_error', Type, Reason, Args} ->
|
||||
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
|
||||
[Op, Type, io_lib:format(Reason, Args)]),
|
||||
exit({shutdown, {ack_failed, Reason}})
|
||||
end
|
||||
end, State, MsgIds).
|
||||
end.
|
||||
|
||||
%% From rabbit_channel
|
||||
%% Records a client-sent acknowledgement. Handles both single delivery acks
|
||||
|
|
Loading…
Reference in New Issue