Local shovels: slow down publishing when all messages are being rejected
This commit is contained in:
parent
1fc7390fc6
commit
6dc72fdee5
|
@ -185,7 +185,10 @@ init_source(State = #{source := #{queue_r := QName,
|
||||||
delivery_count => ?INITIAL_DELIVERY_COUNT,
|
delivery_count => ?INITIAL_DELIVERY_COUNT,
|
||||||
max_link_credit => MaxLinkCredit,
|
max_link_credit => MaxLinkCredit,
|
||||||
credit => MaxLinkCredit,
|
credit => MaxLinkCredit,
|
||||||
at_least_one_credit_req_in_flight => true}},
|
at_least_one_credit_req_in_flight => true,
|
||||||
|
complete => 0,
|
||||||
|
requeue => 0,
|
||||||
|
last_settlement_count => {0, 0}}},
|
||||||
handle_queue_actions(Actions, State2);
|
handle_queue_actions(Actions, State2);
|
||||||
{0, {error, autodelete}} ->
|
{0, {error, autodelete}} ->
|
||||||
exit({shutdown, autodelete});
|
exit({shutdown, autodelete});
|
||||||
|
@ -600,7 +603,7 @@ settle(Op, DeliveryTag, Multiple,
|
||||||
case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of
|
case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of
|
||||||
{ok, QState1, Actions} ->
|
{ok, QState1, Actions} ->
|
||||||
St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}},
|
St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}},
|
||||||
handle_queue_actions(Actions, St);
|
handle_queue_actions(Actions, increase_settled(St, Op));
|
||||||
{'protocol_error', Type, Reason, Args} ->
|
{'protocol_error', Type, Reason, Args} ->
|
||||||
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
|
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
|
||||||
[Op, Type, io_lib:format(Reason, Args)]),
|
[Op, Type, io_lib:format(Reason, Args)]),
|
||||||
|
@ -716,17 +719,22 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
|
||||||
delivery_count := QDeliveryCount,
|
delivery_count := QDeliveryCount,
|
||||||
queue_r := QName,
|
queue_r := QName,
|
||||||
current := Current = #{queue_states := QState0,
|
current := Current = #{queue_states := QState0,
|
||||||
unacked_message_q := Q}
|
unacked_message_q := Q},
|
||||||
|
complete := Complete,
|
||||||
|
requeue := Requeue,
|
||||||
|
last_settlement_count := Last
|
||||||
} = Src} = State0) ->
|
} = Src} = State0) ->
|
||||||
%% Assertion: Our (receiver) delivery-count should be always
|
%% Assertion: Our (receiver) delivery-count should be always
|
||||||
%% in sync with the delivery-count of the sending queue.
|
%% in sync with the delivery-count of the sending queue.
|
||||||
QDeliveryCount = DeliveryCount,
|
QDeliveryCount = DeliveryCount,
|
||||||
case grant_link_credit(CCredit, MaxLinkCredit, ?QUEUE:len(Q)) of
|
case grant_link_credit(CCredit, MaxLinkCredit, ?QUEUE:len(Q)) of
|
||||||
true ->
|
true ->
|
||||||
|
MaxCredit = cap_credit(MaxLinkCredit, Complete, Requeue, Last),
|
||||||
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, QDeliveryCount,
|
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, QDeliveryCount,
|
||||||
MaxLinkCredit, false, QState0),
|
MaxCredit, false, QState0),
|
||||||
State = State0#{source => Src#{credit => MaxLinkCredit,
|
State = State0#{source => Src#{credit => MaxLinkCredit,
|
||||||
at_least_one_credit_req_in_flight => true,
|
at_least_one_credit_req_in_flight => true,
|
||||||
|
last_settlement_count => {Complete, Requeue},
|
||||||
current => Current#{queue_states => QState}}},
|
current => Current#{queue_states => QState}}},
|
||||||
handle_queue_actions(Actions, State);
|
handle_queue_actions(Actions, State);
|
||||||
false ->
|
false ->
|
||||||
|
@ -738,3 +746,14 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
|
||||||
State0#{source => Src#{credit => Credit,
|
State0#{source => Src#{credit => Credit,
|
||||||
at_least_one_credit_req_in_flight => false}}
|
at_least_one_credit_req_in_flight => false}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
increase_settled(State = #{source := Src = #{complete := Complete}}, complete) ->
|
||||||
|
State#{source => Src#{complete => Complete + 1}};
|
||||||
|
increase_settled(State = #{source := Src = #{requeue := Requeue}}, requeue) ->
|
||||||
|
State#{source => Src#{requeue => Requeue + 1}}.
|
||||||
|
|
||||||
|
cap_credit(MaxLinkCredit, Complete, Requeue, {Complete, LastRequeue})
|
||||||
|
when (Requeue - LastRequeue) > (MaxLinkCredit div 2) ->
|
||||||
|
MaxLinkCredit div 2;
|
||||||
|
cap_credit(MaxLinkCredit, _, _, _) ->
|
||||||
|
MaxLinkCredit.
|
||||||
|
|
Loading…
Reference in New Issue