Local shovels: single acks

For some reason, multiple acknowledgments are really slow when using credit flow v2
This commit is contained in:
Diana Parra Corbacho 2025-08-14 15:24:02 +02:00
parent 382fac3e34
commit 6bb649a9df
1 changed files with 15 additions and 20 deletions

View File

@ -59,12 +59,6 @@
msg_id msg_id
}). }).
%% This is a significantly reduced version of its rabbit_amqp_session counterpart.
%% Local shovels always use the maximum credit allowed.
-record(credit_req, {
delivery_count :: sequence_no()
}).
parse(_Name, {source, Source}) -> parse(_Name, {source, Source}) ->
Queue = parse_parameter(queue, fun parse_binary/1, Queue = parse_parameter(queue, fun parse_binary/1,
proplists:get_value(queue, Source)), proplists:get_value(queue, Source)),
@ -574,21 +568,23 @@ get_user_vhost_from_amqp_param(Uri) ->
settle(Op, DeliveryTag, Multiple, settle(Op, DeliveryTag, Multiple,
#{source := #{queue_r := QRef, #{source := #{queue_r := QRef,
current := Current = #{queue_states := QState0, current := Current = #{consumer_tag := CTag,
consumer_tag := CTag,
unacked_message_q := UAMQ0} unacked_message_q := UAMQ0}
} = Src} = State0) -> } = Src} = State0) ->
{MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple), {MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of State = State0#{source => Src#{current => Current#{unacked_message_q => UAMQ}}},
{ok, QState1, Actions} -> lists:foldl(
State = State0#{source => Src#{current => Current#{queue_states => QState1, fun(MsgId, #{source := Src0 = #{current := Current0 = #{queue_states := QState0}}} = St0) ->
unacked_message_q => UAMQ}}}, case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of
handle_queue_actions(Actions, State); {ok, QState1, Actions} ->
{'protocol_error', Type, Reason, Args} -> St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}},
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp", handle_queue_actions(Actions, St);
[Op, Type, io_lib:format(Reason, Args)]), {'protocol_error', Type, Reason, Args} ->
exit({shutdown, {ack_failed, Reason}}) ?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
end. [Op, Type, io_lib:format(Reason, Args)]),
exit({shutdown, {ack_failed, Reason}})
end
end, State, MsgIds).
%% From rabbit_channel %% From rabbit_channel
%% Records a client-sent acknowledgement. Handles both single delivery acks %% Records a client-sent acknowledgement. Handles both single delivery acks
@ -661,8 +657,7 @@ maybe_grant_credit(#{source := #{queue_r := QName,
current := #{consumer_tag := CTag, current := #{consumer_tag := CTag,
queue_states := QState0, queue_states := QState0,
unacked_message_q := Q} = Current unacked_message_q := Q} = Current
} = Src, } = Src} = State0) ->
dest := #{unacked := Unacked}} = State0) ->
GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, ?QUEUE:len(Q)), GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, ?QUEUE:len(Q)),
{ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of {ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of
true -> true ->