Local shovels: optimisations

This commit is contained in:
Diana Parra Corbacho 2025-08-13 12:59:40 +02:00
parent e9d767b84b
commit 07a085365e
1 changed files with 39 additions and 39 deletions

View File

@ -100,35 +100,44 @@ connect_source(State = #{source := Src = #{resource_decl := {M, F, MFArgs},
<<>> -> MRDQ; <<>> -> MRDQ;
_ -> QName0 _ -> QName0
end, end,
Queue = rabbit_misc:r(VHost, queue, QName),
State#{source => Src#{current => #{queue_states => QState, State#{source => Src#{current => #{queue_states => QState,
next_tag => 1, next_tag => 1,
user => User, user => User,
vhost => VHost, vhost => VHost,
unacked_message_q => ?QUEUE:new()}, unacked_message_q => ?QUEUE:new()},
queue => QName}}. queue => QName,
queue_r => Queue}}.
connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs}, connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
uris := [Uri | _] uris := [Uri | _]},
},
ack_mode := AckMode}) -> ack_mode := AckMode}) ->
%% Shall we get the user from an URI or something else? %% Shall we get the user from an URI or something else?
{User, VHost} = get_user_vhost_from_amqp_param(Uri), {User, VHost} = get_user_vhost_from_amqp_param(Uri),
apply(M, F, MFArgs ++ [VHost, User]), apply(M, F, MFArgs ++ [VHost, User]),
QState = rabbit_queue_type:init(), QState = rabbit_queue_type:init(),
case AckMode of maybe_add_dest_queue(
on_confirm -> case AckMode of
State#{dest => Dest#{current => #{queue_states => QState, on_confirm ->
delivery_id => 1, State#{dest => Dest#{current => #{queue_states => QState,
vhost => VHost}, delivery_id => 1,
unacked => #{}}}; vhost => VHost},
_ -> unacked => #{}}};
State#{dest => Dest#{current => #{queue_states => QState, _ ->
vhost => VHost}, State#{dest => Dest#{current => #{queue_states => QState,
unacked => #{}}} vhost => VHost},
end. unacked => #{}}}
end).
init_source(State = #{source := #{queue := QName0, maybe_add_dest_queue(State = #{dest := Dest = #{queue := QName,
current := #{vhost := VHost}}}) ->
Queue = rabbit_misc:r(VHost, queue, QName),
State#{dest => Dest#{queue_r => Queue}};
maybe_add_dest_queue(State) ->
State.
init_source(State = #{source := #{queue_r := QName,
consumer_args := Args, consumer_args := Args,
current := #{queue_states := QState0, current := #{queue_states := QState0,
vhost := VHost} = Current} = Src, vhost := VHost} = Current} = Src,
@ -142,7 +151,6 @@ init_source(State = #{source := #{queue := QName0,
{credited, credit_api_v1} {credited, credit_api_v1}
end, end,
MaxLinkCredit = max_link_credit(), MaxLinkCredit = max_link_credit(),
QName = rabbit_misc:r(VHost, queue, QName0),
CTag = consumer_tag(Name), CTag = consumer_tag(Name),
case rabbit_amqqueue:with( case rabbit_amqqueue:with(
QName, QName,
@ -176,6 +184,7 @@ init_source(State = #{source := #{queue := QName0,
remaining => Remaining, remaining => Remaining,
remaining_unacked => Remaining, remaining_unacked => Remaining,
delivery_count => ?INITIAL_DELIVERY_COUNT, delivery_count => ?INITIAL_DELIVERY_COUNT,
max_link_credit => MaxLinkCredit,
credit => MaxLinkCredit, credit => MaxLinkCredit,
at_least_one_credit_req_in_flight => true, at_least_one_credit_req_in_flight => true,
stashed_credit_req => none}}, stashed_credit_req => none}},
@ -246,10 +255,8 @@ close_dest(_State) ->
close_source(#{source := #{current := #{queue_states := QStates0, close_source(#{source := #{current := #{queue_states := QStates0,
consumer_tag := CTag, consumer_tag := CTag,
user := User, user := User},
vhost := VHost}, queue_r := QName}}) ->
queue := QName0}}) ->
QName = rabbit_misc:r(VHost, queue, QName0),
case rabbit_amqqueue:with( case rabbit_amqqueue:with(
QName, QName,
fun(Q) -> fun(Q) ->
@ -567,13 +574,12 @@ get_user_vhost_from_amqp_param(Uri) ->
end. end.
settle(Op, DeliveryTag, Multiple, settle(Op, DeliveryTag, Multiple,
#{source := #{queue := Queue, #{source := #{queue_r := QRef,
current := Current = #{queue_states := QState0, current := Current = #{queue_states := QState0,
consumer_tag := CTag, consumer_tag := CTag,
unacked_message_q := UAMQ0, unacked_message_q := UAMQ0}
vhost := VHost}} = Src} = State0) -> } = Src} = State0) ->
{Acked, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple), {Acked, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
QRef = rabbit_misc:r(VHost, queue, Queue),
MsgIds = [Ack#pending_ack.msg_id || Ack <- Acked], MsgIds = [Ack#pending_ack.msg_id || Ack <- Acked],
case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of
{ok, QState1, Actions} -> {ok, QState1, Actions} ->
@ -622,10 +628,9 @@ collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
{AcknowledgedAcc, UAMQTail} {AcknowledgedAcc, UAMQTail}
end. end.
route(_Msg, #{queue := Queue, route(_Msg, #{queue_r := QueueR,
current := #{vhost := VHost}}) when Queue =/= none -> queue := Queue}) when Queue =/= none ->
QName = rabbit_misc:r(VHost, queue, Queue), [QueueR];
[QName];
route(Msg, #{current := #{vhost := VHost}}) -> route(Msg, #{current := #{vhost := VHost}}) ->
ExchangeName = rabbit_misc:r(VHost, exchange, mc:exchange(Msg)), ExchangeName = rabbit_misc:r(VHost, exchange, mc:exchange(Msg)),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName), Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@ -653,16 +658,15 @@ sent_delivery(#{source := #{delivery_count := DeliveryCount0,
delivery_count => DeliveryCount delivery_count => DeliveryCount
}}. }}.
maybe_grant_or_stash_credit(#{source := #{queue := QName0, maybe_grant_or_stash_credit(#{source := #{queue_r := QName,
credit := Credit, credit := Credit,
max_link_credit := MaxLinkCredit,
delivery_count := DeliveryCount, delivery_count := DeliveryCount,
at_least_one_credit_req_in_flight := HaveCreditReqInFlight, at_least_one_credit_req_in_flight := HaveCreditReqInFlight,
current := #{consumer_tag := CTag, current := #{consumer_tag := CTag,
vhost := VHost,
queue_states := QState0} = Current queue_states := QState0} = Current
} = Src, } = Src,
dest := #{unacked := Unacked}} = State0) -> dest := #{unacked := Unacked}} = State0) ->
MaxLinkCredit = max_link_credit(),
GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, maps:size(Unacked)), GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, maps:size(Unacked)),
Src1 = case HaveCreditReqInFlight andalso GrantLinkCredit of Src1 = case HaveCreditReqInFlight andalso GrantLinkCredit of
true -> true ->
@ -675,7 +679,6 @@ maybe_grant_or_stash_credit(#{source := #{queue := QName0,
end, end,
{ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of {ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of
true -> true ->
QName = rabbit_misc:r(VHost, queue, QName0),
rabbit_queue_type:credit( rabbit_queue_type:credit(
QName, CTag, DeliveryCount, MaxLinkCredit, QName, CTag, DeliveryCount, MaxLinkCredit,
false, QState0); false, QState0);
@ -701,18 +704,17 @@ grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) ->
%% Drain is ignored because local shovels do not use it. %% Drain is ignored because local shovels do not use it.
handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Drain}, handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Drain},
#{source := #{credit := CCredit, #{source := #{credit := CCredit,
max_link_credit := MaxLinkCredit,
delivery_count := QDeliveryCount, delivery_count := QDeliveryCount,
stashed_credit_req := StashedCreditReq, stashed_credit_req := StashedCreditReq,
queue := QName0, queue_r := QName,
current := Current = #{queue_states := QState0, current := Current = #{queue_states := QState0}
vhost := VHost}} = Src} = State0) -> } = Src} = State0) ->
%% Assertion: Our (receiver) delivery-count should be always %% Assertion: Our (receiver) delivery-count should be always
%% in sync with the delivery-count of the sending queue. %% in sync with the delivery-count of the sending queue.
QDeliveryCount = DeliveryCount, QDeliveryCount = DeliveryCount,
case StashedCreditReq of case StashedCreditReq of
#credit_req{delivery_count = StashedDeliveryCount} -> #credit_req{delivery_count = StashedDeliveryCount} ->
MaxLinkCredit = max_link_credit(),
QName = rabbit_misc:r(VHost, queue, QName0),
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, StashedDeliveryCount, {ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, StashedDeliveryCount,
MaxLinkCredit, false, QState0), MaxLinkCredit, false, QState0),
State = State0#{source => Src#{credit => MaxLinkCredit, State = State0#{source => Src#{credit => MaxLinkCredit,
@ -722,8 +724,6 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
handle_queue_actions(Actions, State); handle_queue_actions(Actions, State);
none when Credit =:= 0 andalso none when Credit =:= 0 andalso
CCredit > 0 -> CCredit > 0 ->
MaxLinkCredit = max_link_credit(),
QName = rabbit_misc:r(VHost, queue, QName0),
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, DeliveryCount, MaxLinkCredit, false, QState0), {ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, DeliveryCount, MaxLinkCredit, false, QState0),
State = State0#{source => Src#{credit => MaxLinkCredit, State = State0#{source => Src#{credit => MaxLinkCredit,
at_least_one_credit_req_in_flight => true, at_least_one_credit_req_in_flight => true,