From 6e2e19591ac9ecdf4615cb4805a837f84453f31d Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 28 Aug 2025 12:29:20 +0200 Subject: [PATCH] Local shovels: fix handling of acks/nacks --- .../src/rabbit_local_shovel.erl | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index 8ba238675d..f0b673ac0a 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -591,22 +591,20 @@ get_user_vhost_from_amqp_param(Uri) -> settle(Op, DeliveryTag, Multiple, #{source := #{queue_r := QRef, current := Current = #{consumer_tag := CTag, - unacked_message_q := UAMQ0} + unacked_message_q := UAMQ0, + queue_states := QState0} } = Src} = State0) -> {MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple), - State = State0#{source => Src#{current => Current#{unacked_message_q => UAMQ}}}, - lists:foldl( - fun(MsgId, #{source := Src0 = #{current := Current0 = #{queue_states := QState0}}} = St0) -> - case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of - {ok, QState1, Actions} -> - St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}}, - handle_queue_actions(Actions, St); - {'protocol_error', Type, Reason, Args} -> - ?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp", - [Op, Type, io_lib:format(Reason, Args)]), - exit({shutdown, {ack_failed, Reason}}) - end - end, State, MsgIds). + case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of + {ok, QState1, Actions} -> + State = State0#{source => Src#{current => Current#{queue_states => QState1, + unacked_message_q => UAMQ}}}, + handle_queue_actions(Actions, State); + {'protocol_error', Type, Reason, Args} -> + ?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp", + [Op, Type, io_lib:format(Reason, Args)]), + exit({shutdown, {ack_failed, Reason}}) + end. %% From rabbit_channel %% Records a client-sent acknowledgement. Handles both single delivery acks