diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index 8644b8c61f..73335b6fff 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -186,8 +186,7 @@ init_source(State = #{source := #{queue_r := QName, delivery_count => ?INITIAL_DELIVERY_COUNT, max_link_credit => MaxLinkCredit, credit => MaxLinkCredit, - at_least_one_credit_req_in_flight => true, - stashed_credit_req => none}}, + at_least_one_credit_req_in_flight => true}}, handle_queue_actions(Actions, State2); {0, {error, autodelete}} -> exit({shutdown, autodelete}); @@ -323,10 +322,10 @@ handle_dest(_Msg, State) -> State. ack(DeliveryTag, Multiple, State) -> - maybe_grant_or_stash_credit(settle(complete, DeliveryTag, Multiple, State)). + maybe_grant_credit(settle(complete, DeliveryTag, Multiple, State)). nack(DeliveryTag, Multiple, State) -> - maybe_grant_or_stash_credit(settle(discard, DeliveryTag, Multiple, State)). + maybe_grant_credit(settle(discard, DeliveryTag, Multiple, State)). forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current, unacked := Unacked} = Dest, @@ -426,7 +425,7 @@ handle_queue_actions(Actions, State) -> end, State, Actions). handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) -> - maybe_grant_or_stash_credit( + maybe_grant_credit( lists:foldl( fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) -> DeliveryTag = next_tag(S0), @@ -654,25 +653,17 @@ sent_delivery(#{source := #{delivery_count := DeliveryCount0, delivery_count => DeliveryCount }}. -maybe_grant_or_stash_credit(#{source := #{queue_r := QName, - credit := Credit, - max_link_credit := MaxLinkCredit, - delivery_count := DeliveryCount, - at_least_one_credit_req_in_flight := HaveCreditReqInFlight, - current := #{consumer_tag := CTag, - queue_states := QState0} = Current - } = Src, - dest := #{unacked := Unacked}} = State0) -> - GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, maps:size(Unacked)), - Src1 = case HaveCreditReqInFlight andalso GrantLinkCredit of - true -> - Req = #credit_req { - delivery_count = DeliveryCount - }, - maps:put(stashed_credit_req, Req, Src); - false -> - Src - end, +maybe_grant_credit(#{source := #{queue_r := QName, + credit := Credit, + max_link_credit := MaxLinkCredit, + delivery_count := DeliveryCount, + at_least_one_credit_req_in_flight := HaveCreditReqInFlight, + current := #{consumer_tag := CTag, + queue_states := QState0, + unacked_message_q := Q} = Current + } = Src, + dest := #{unacked := Unacked}} = State0) -> + GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, ?QUEUE:len(Q)), {ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of true -> rabbit_queue_type:credit( @@ -685,9 +676,9 @@ maybe_grant_or_stash_credit(#{source := #{queue_r := QName, true -> true; false -> HaveCreditReqInFlight end, - State = State0#{source => Src1#{current => Current#{queue_states => QState}, - at_least_one_credit_req_in_flight => CreditReqInFlight - }}, + State = State0#{source => Src#{current => Current#{queue_states => QState}, + at_least_one_credit_req_in_flight => CreditReqInFlight + }}, handle_queue_actions(Actions, State). max_link_credit() -> @@ -702,30 +693,22 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra #{source := #{credit := CCredit, max_link_credit := MaxLinkCredit, delivery_count := QDeliveryCount, - stashed_credit_req := StashedCreditReq, queue_r := QName, - current := Current = #{queue_states := QState0} + current := Current = #{queue_states := QState0, + unacked_message_q := Q} } = Src} = State0) -> %% Assertion: Our (receiver) delivery-count should be always %% in sync with the delivery-count of the sending queue. QDeliveryCount = DeliveryCount, - case StashedCreditReq of - #credit_req{delivery_count = StashedDeliveryCount} -> - {ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, StashedDeliveryCount, - MaxLinkCredit, false, QState0), - State = State0#{source => Src#{credit => MaxLinkCredit, - at_least_one_credit_req_in_flight => true, - stashed_credit_req => none, - current => Current#{queue_states => QState}}}, - handle_queue_actions(Actions, State); - none when Credit =:= 0 andalso - CCredit >= 0 -> - {ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, DeliveryCount, MaxLinkCredit, false, QState0), + case grant_link_credit(CCredit, MaxLinkCredit, ?QUEUE:len(Q)) of + true -> + {ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, QDeliveryCount, + MaxLinkCredit, false, QState0), State = State0#{source => Src#{credit => MaxLinkCredit, at_least_one_credit_req_in_flight => true, current => Current#{queue_states => QState}}}, handle_queue_actions(Actions, State); - none -> + false -> %% Although we (the receiver) usually determine link credit, we set here %% our link credit to what the queue says our link credit is (which is safer %% in case credit requests got applied out of order in quorum queues).