From 6bb649a9df4638a712e1fe12286a02807d0a6560 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 14 Aug 2025 15:24:02 +0200 Subject: [PATCH] Local shovels: single acks For some reason, multiple acknowledgments are really slow when using credit flow v2 --- .../src/rabbit_local_shovel.erl | 35 ++++++++----------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index 73335b6fff..6a5176964d 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -59,12 +59,6 @@ msg_id }). -%% This is a significantly reduced version of its rabbit_amqp_session counterpart. -%% Local shovels always use the maximum credit allowed. --record(credit_req, { - delivery_count :: sequence_no() -}). - parse(_Name, {source, Source}) -> Queue = parse_parameter(queue, fun parse_binary/1, proplists:get_value(queue, Source)), @@ -574,21 +568,23 @@ get_user_vhost_from_amqp_param(Uri) -> settle(Op, DeliveryTag, Multiple, #{source := #{queue_r := QRef, - current := Current = #{queue_states := QState0, - consumer_tag := CTag, + current := Current = #{consumer_tag := CTag, unacked_message_q := UAMQ0} } = Src} = State0) -> {MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple), - case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of - {ok, QState1, Actions} -> - State = State0#{source => Src#{current => Current#{queue_states => QState1, - unacked_message_q => UAMQ}}}, - handle_queue_actions(Actions, State); - {'protocol_error', Type, Reason, Args} -> - ?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp", - [Op, Type, io_lib:format(Reason, Args)]), - exit({shutdown, {ack_failed, Reason}}) - end. + State = State0#{source => Src#{current => Current#{unacked_message_q => UAMQ}}}, + lists:foldl( + fun(MsgId, #{source := Src0 = #{current := Current0 = #{queue_states := QState0}}} = St0) -> + case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of + {ok, QState1, Actions} -> + St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}}, + handle_queue_actions(Actions, St); + {'protocol_error', Type, Reason, Args} -> + ?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp", + [Op, Type, io_lib:format(Reason, Args)]), + exit({shutdown, {ack_failed, Reason}}) + end + end, State, MsgIds). %% From rabbit_channel %% Records a client-sent acknowledgement. Handles both single delivery acks @@ -661,8 +657,7 @@ maybe_grant_credit(#{source := #{queue_r := QName, current := #{consumer_tag := CTag, queue_states := QState0, unacked_message_q := Q} = Current - } = Src, - dest := #{unacked := Unacked}} = State0) -> + } = Src} = State0) -> GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, ?QUEUE:len(Q)), {ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of true ->