Local shovels: remove stashed credit request

This commit is contained in:
Diana Parra Corbacho 2025-08-14 12:39:26 +02:00
parent 3349321c58
commit 382fac3e34
1 changed files with 25 additions and 42 deletions

View File

@ -186,8 +186,7 @@ init_source(State = #{source := #{queue_r := QName,
delivery_count => ?INITIAL_DELIVERY_COUNT, delivery_count => ?INITIAL_DELIVERY_COUNT,
max_link_credit => MaxLinkCredit, max_link_credit => MaxLinkCredit,
credit => MaxLinkCredit, credit => MaxLinkCredit,
at_least_one_credit_req_in_flight => true, at_least_one_credit_req_in_flight => true}},
stashed_credit_req => none}},
handle_queue_actions(Actions, State2); handle_queue_actions(Actions, State2);
{0, {error, autodelete}} -> {0, {error, autodelete}} ->
exit({shutdown, autodelete}); exit({shutdown, autodelete});
@ -323,10 +322,10 @@ handle_dest(_Msg, State) ->
State. State.
ack(DeliveryTag, Multiple, State) -> ack(DeliveryTag, Multiple, State) ->
maybe_grant_or_stash_credit(settle(complete, DeliveryTag, Multiple, State)). maybe_grant_credit(settle(complete, DeliveryTag, Multiple, State)).
nack(DeliveryTag, Multiple, State) -> nack(DeliveryTag, Multiple, State) ->
maybe_grant_or_stash_credit(settle(discard, DeliveryTag, Multiple, State)). maybe_grant_credit(settle(discard, DeliveryTag, Multiple, State)).
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current, forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
unacked := Unacked} = Dest, unacked := Unacked} = Dest,
@ -426,7 +425,7 @@ handle_queue_actions(Actions, State) ->
end, State, Actions). end, State, Actions).
handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) -> handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
maybe_grant_or_stash_credit( maybe_grant_credit(
lists:foldl( lists:foldl(
fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) -> fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) ->
DeliveryTag = next_tag(S0), DeliveryTag = next_tag(S0),
@ -654,25 +653,17 @@ sent_delivery(#{source := #{delivery_count := DeliveryCount0,
delivery_count => DeliveryCount delivery_count => DeliveryCount
}}. }}.
maybe_grant_or_stash_credit(#{source := #{queue_r := QName, maybe_grant_credit(#{source := #{queue_r := QName,
credit := Credit, credit := Credit,
max_link_credit := MaxLinkCredit, max_link_credit := MaxLinkCredit,
delivery_count := DeliveryCount, delivery_count := DeliveryCount,
at_least_one_credit_req_in_flight := HaveCreditReqInFlight, at_least_one_credit_req_in_flight := HaveCreditReqInFlight,
current := #{consumer_tag := CTag, current := #{consumer_tag := CTag,
queue_states := QState0} = Current queue_states := QState0,
} = Src, unacked_message_q := Q} = Current
dest := #{unacked := Unacked}} = State0) -> } = Src,
GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, maps:size(Unacked)), dest := #{unacked := Unacked}} = State0) ->
Src1 = case HaveCreditReqInFlight andalso GrantLinkCredit of GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, ?QUEUE:len(Q)),
true ->
Req = #credit_req {
delivery_count = DeliveryCount
},
maps:put(stashed_credit_req, Req, Src);
false ->
Src
end,
{ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of {ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of
true -> true ->
rabbit_queue_type:credit( rabbit_queue_type:credit(
@ -685,9 +676,9 @@ maybe_grant_or_stash_credit(#{source := #{queue_r := QName,
true -> true; true -> true;
false -> HaveCreditReqInFlight false -> HaveCreditReqInFlight
end, end,
State = State0#{source => Src1#{current => Current#{queue_states => QState}, State = State0#{source => Src#{current => Current#{queue_states => QState},
at_least_one_credit_req_in_flight => CreditReqInFlight at_least_one_credit_req_in_flight => CreditReqInFlight
}}, }},
handle_queue_actions(Actions, State). handle_queue_actions(Actions, State).
max_link_credit() -> max_link_credit() ->
@ -702,30 +693,22 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
#{source := #{credit := CCredit, #{source := #{credit := CCredit,
max_link_credit := MaxLinkCredit, max_link_credit := MaxLinkCredit,
delivery_count := QDeliveryCount, delivery_count := QDeliveryCount,
stashed_credit_req := StashedCreditReq,
queue_r := QName, queue_r := QName,
current := Current = #{queue_states := QState0} current := Current = #{queue_states := QState0,
unacked_message_q := Q}
} = Src} = State0) -> } = Src} = State0) ->
%% Assertion: Our (receiver) delivery-count should be always %% Assertion: Our (receiver) delivery-count should be always
%% in sync with the delivery-count of the sending queue. %% in sync with the delivery-count of the sending queue.
QDeliveryCount = DeliveryCount, QDeliveryCount = DeliveryCount,
case StashedCreditReq of case grant_link_credit(CCredit, MaxLinkCredit, ?QUEUE:len(Q)) of
#credit_req{delivery_count = StashedDeliveryCount} -> true ->
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, StashedDeliveryCount, {ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, QDeliveryCount,
MaxLinkCredit, false, QState0), MaxLinkCredit, false, QState0),
State = State0#{source => Src#{credit => MaxLinkCredit,
at_least_one_credit_req_in_flight => true,
stashed_credit_req => none,
current => Current#{queue_states => QState}}},
handle_queue_actions(Actions, State);
none when Credit =:= 0 andalso
CCredit >= 0 ->
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, DeliveryCount, MaxLinkCredit, false, QState0),
State = State0#{source => Src#{credit => MaxLinkCredit, State = State0#{source => Src#{credit => MaxLinkCredit,
at_least_one_credit_req_in_flight => true, at_least_one_credit_req_in_flight => true,
current => Current#{queue_states => QState}}}, current => Current#{queue_states => QState}}},
handle_queue_actions(Actions, State); handle_queue_actions(Actions, State);
none -> false ->
%% Although we (the receiver) usually determine link credit, we set here %% Although we (the receiver) usually determine link credit, we set here
%% our link credit to what the queue says our link credit is (which is safer %% our link credit to what the queue says our link credit is (which is safer
%% in case credit requests got applied out of order in quorum queues). %% in case credit requests got applied out of order in quorum queues).