PoC 1: Session grants queue credits

Proof of Concept 1:
Solve internal flow control:
```
AMQP writer proc <--- sesssion proc <--- queue proc
```
by introducing a new API
```
rabbit_queue_type:sent/4
```
which avoids the queue flooding the session proc when the session
proc can't send anymore because it's blocked by either
1. AMQP session flow control (remote-incoming-window), or
2. the writer proc (when client doesn't receive fast enough causing TCP
   backpressure)
This commit is contained in:
David Ansari 2024-05-20 11:39:25 +02:00
parent 66a397d0fa
commit 193c617651
7 changed files with 135 additions and 30 deletions

View File

@ -59,6 +59,7 @@
-define(MANAGEMENT_LINK_CREDIT_RCV, 8). -define(MANAGEMENT_LINK_CREDIT_RCV, 8).
-define(MANAGEMENT_NODE_ADDRESS, <<"/management">>). -define(MANAGEMENT_NODE_ADDRESS, <<"/management">>).
-define(DEFAULT_EXCHANGE_NAME, <<>>). -define(DEFAULT_EXCHANGE_NAME, <<>>).
-define(NOTIFY_SENT_AFTER, 500).
-export([start_link/8, -export([start_link/8,
process_frame/2, process_frame/2,
@ -138,15 +139,6 @@
multi_transfer_msg :: undefined | #multi_transfer_msg{} multi_transfer_msg :: undefined | #multi_transfer_msg{}
}). }).
%% link flow control state
%% §2.6.7
-record(flow_control, {
delivery_count :: sequence_no(),
credit :: non_neg_integer(),
available :: non_neg_integer(),
drain :: boolean()
}).
-record(outgoing_link, { -record(outgoing_link, {
%% Although the source address of a link might be an exchange name and binding key %% Although the source address of a link might be an exchange name and binding key
%% or a topic filter, an outgoing link will always consume from a queue. %% or a topic filter, an outgoing link will always consume from a queue.
@ -158,9 +150,12 @@
credit_api_version :: v1 | v2, credit_api_version :: v1 | v2,
%% When credit API v1 is used, our session process holds the delivery-count %% When credit API v1 is used, our session process holds the delivery-count
delivery_count :: sequence_no() | credit_api_v2, delivery_count :: sequence_no() | credit_api_v2,
%% When credit API v2 is used, both our session process and the queue hold link flow control state. %% This field is decremented for each message we were allowed to send to the writer proc.
client_flow_control :: #flow_control{} | credit_api_v1, %% When this field reaches 0, we notify the queue that we have sent to the writer proc ?NOTIFY_SENT_AFTER
queue_flow_control :: #flow_control{} | credit_api_v1 %% messages so that the queue can sent us more. This is used for RabbitMQ internal flow control betweeen
%% AMQP writer proc <--- our sesssion proc <--- queue proc
%% to ensure that the queue doesn't overload our session proc.
notify_sent_after = ?NOTIFY_SENT_AFTER :: pos_integer()
}). }).
-record(outgoing_unsettled, { -record(outgoing_unsettled, {
@ -636,6 +631,8 @@ handle_stashed_down(#state{stashed_down = QNames,
when QNameBin =:= QNameBinDown -> when QNameBin =:= QNameBinDown ->
Detach = detach(Handle, Link, ?V_1_0_AMQP_ERROR_ILLEGAL_STATE), Detach = detach(Handle, Link, ?V_1_0_AMQP_ERROR_ILLEGAL_STATE),
Frames = [Detach | Frames0], Frames = [Detach | Frames0],
%%TODO also remove any messages belonging to this consumer
%% from outgoing_pending queue?
Links = maps:remove(Handle, Links0), Links = maps:remove(Handle, Links0),
{Frames, Links}; {Frames, Links};
(_, _, Accum) -> (_, _, Accum) ->
@ -743,6 +740,7 @@ destroy_incoming_link(Handle, Link = #incoming_link{queue_name_bin = QNameBin},
destroy_incoming_link(_, _, _, Acc) -> destroy_incoming_link(_, _, _, Acc) ->
Acc. Acc.
%%TODO also remove any messages belonging to this consumer from outgoing_pending queue?
destroy_outgoing_link(Handle, Link = #outgoing_link{queue_name_bin = QNameBin}, QNameBin, {Frames, Unsettled0, Links}) -> destroy_outgoing_link(Handle, Link = #outgoing_link{queue_name_bin = QNameBin}, QNameBin, {Frames, Unsettled0, Links}) ->
{Unsettled, _RemovedMsgIds} = remove_link_from_outgoing_unsettled_map(Handle, Unsettled0), {Unsettled, _RemovedMsgIds} = remove_link_from_outgoing_unsettled_map(Handle, Unsettled0),
{[detach(Handle, Link, ?V_1_0_AMQP_ERROR_RESOURCE_DELETED) | Frames], {[detach(Handle, Link, ?V_1_0_AMQP_ERROR_RESOURCE_DELETED) | Frames],
@ -1000,13 +998,14 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
%% all consumers will use credit API v2. %% all consumers will use credit API v2.
%% Streams always use credit API v2 since the stream client (rabbit_stream_queue) holds the link %% Streams always use credit API v2 since the stream client (rabbit_stream_queue) holds the link
%% flow control state. Hence, credit API mixed version isn't an issue for streams. %% flow control state. Hence, credit API mixed version isn't an issue for streams.
{Mode, {CreditApiVsn,
Mode,
DeliveryCount} = case rabbit_feature_flags:is_enabled(credit_api_v2) orelse DeliveryCount} = case rabbit_feature_flags:is_enabled(credit_api_v2) orelse
QType =:= rabbit_stream_queue of QType =:= rabbit_stream_queue of
true -> true ->
{{credited, ?INITIAL_DELIVERY_COUNT}, credit_api_v2}; {2, {credited, ?INITIAL_DELIVERY_COUNT}, credit_api_v2};
false -> false ->
{{credited, credit_api_v1}, {credit_api_v1, ?INITIAL_DELIVERY_COUNT}} {1, {credited, credit_api_v1}, {credit_api_v1, ?INITIAL_DELIVERY_COUNT}}
end, end,
Spec = #{no_ack => SndSettled, Spec = #{no_ack => SndSettled,
channel_pid => self(), channel_pid => self(),
@ -1040,6 +1039,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
queue_type = QType, queue_type = QType,
send_settled = SndSettled, send_settled = SndSettled,
max_message_size = MaxMessageSize, max_message_size = MaxMessageSize,
credit_api_version = CreditApiVsn,
delivery_count = DeliveryCount}, delivery_count = DeliveryCount},
OutgoingLinks = OutgoingLinks0#{HandleInt => Link}, OutgoingLinks = OutgoingLinks0#{HandleInt => Link},
State1 = State0#state{queue_states = QStates, State1 = State0#state{queue_states = QStates,
@ -1174,6 +1174,8 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
%% first detaching and then re-attaching to the same session with the same link handle (the handle %% first detaching and then re-attaching to the same session with the same link handle (the handle
%% becomes available for re-use once a link is closed): This will result in the same consumer tag, %% becomes available for re-use once a link is closed): This will result in the same consumer tag,
%% and we ideally disallow "updating" an AMQP consumer. %% and we ideally disallow "updating" an AMQP consumer.
%%TODO If such an API is not added, we also must return messages in the outgoing_pending queue
%% which haven't made it to the outgoing_unsettled map?
case rabbit_queue_type:cancel(Q, Ctag, undefined, Username, QStates0) of case rabbit_queue_type:cancel(Q, Ctag, undefined, Username, QStates0) of
{ok, QStates1} -> {ok, QStates1} ->
{Unsettled1, MsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0), {Unsettled1, MsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0),
@ -1326,11 +1328,11 @@ send_pending(#state{remote_incoming_window = RemoteIncomingWindow,
false -> false ->
{NewRemoteIncomingWindow, Buf, State1} = {NewRemoteIncomingWindow, Buf, State1} =
send_pending_delivery(Delivery, Buf1, State), send_pending_delivery(Delivery, Buf1, State),
NumTransfersSent = NewRemoteIncomingWindow - RemoteIncomingWindow, NumTransfersSent = RemoteIncomingWindow - NewRemoteIncomingWindow,
State2 = session_flow_control_sent_transfers(NumTransfersSent, State1), State2 = session_flow_control_sent_transfers(NumTransfersSent, State1),
State = State2#state{outgoing_pending = Buf}, State3 = State2#state{outgoing_pending = Buf},
%% Recurse to possibly send FLOW frames. %% Recurse to possibly send FLOW frames.
send_pending(State) send_pending(State3)
end end
end. end.
@ -1366,7 +1368,7 @@ send_pending_delivery(#pending_delivery{
{sent_all, SpaceLeft} -> {sent_all, SpaceLeft} ->
{SpaceLeft, {SpaceLeft,
Buf, Buf,
record_outgoing_unsettled(Pending, State)}; sent_pending_delivery(Pending, State)};
{sent_some, SpaceLeft, Rest} -> {sent_some, SpaceLeft, Rest} ->
{SpaceLeft, {SpaceLeft,
queue:in_r(Pending#pending_delivery{frames = Rest}, Buf), queue:in_r(Pending#pending_delivery{frames = Rest}, Buf),
@ -1407,6 +1409,34 @@ send_fun(WriterPid, Ch) ->
rabbit_amqp_writer:send_command(WriterPid, Ch, Transfer, Sections) rabbit_amqp_writer:send_command(WriterPid, Ch, Transfer, Sections)
end. end.
sent_pending_delivery(
#pending_delivery{outgoing_unsettled = #outgoing_unsettled{consumer_tag = Ctag,
queue_name = QName}
} = Pending,
#state{outgoing_links = OutgoingLinks0,
queue_states = QStates0} = S0) ->
Handle = ctag_to_handle(Ctag),
case OutgoingLinks0 of
#{Handle := Link0 = #outgoing_link{notify_sent_after = N0}} ->
{N, S3} = if N0 =:= 1 ->
{ok, QStates, Actions} = rabbit_queue_type:sent(
QName, Ctag, ?NOTIFY_SENT_AFTER, QStates0),
S1 = S0#state{queue_states = QStates},
S2 = handle_queue_actions(Actions, S1),
{?NOTIFY_SENT_AFTER, S2};
N0 > 1 ->
{N0 - 1, S0}
end,
Link = Link0#outgoing_link{notify_sent_after = N},
OutgoingLinks = OutgoingLinks0#{Handle := Link},
S = S3#state{outgoing_links = OutgoingLinks},
record_outgoing_unsettled(Pending, S);
_ ->
%% TODO make sure we don't get here by removing returning messages in outgoing_pending
%% when detaching a link
exit({no_outgoing_link_handle, Handle})
end.
record_outgoing_unsettled(#pending_delivery{queue_ack_required = true, record_outgoing_unsettled(#pending_delivery{queue_ack_required = true,
delivery_id = DeliveryId, delivery_id = DeliveryId,
outgoing_unsettled = Unsettled}, outgoing_unsettled = Unsettled},

View File

@ -101,7 +101,8 @@ init({Sock, MaxFrame, ReaderPid}) ->
max_frame_size = MaxFrame, max_frame_size = MaxFrame,
reader = ReaderPid, reader = ReaderPid,
pending = [], pending = [],
pending_size = 0}, pending_size = 0,
monitored_sessions = #{}},
process_flag(message_queue_data, off_heap), process_flag(message_queue_data, off_heap),
{ok, State}. {ok, State}.

View File

@ -63,6 +63,7 @@
make_return/2, make_return/2,
make_discard/2, make_discard/2,
make_credit/4, make_credit/4,
make_sent/2,
make_purge/0, make_purge/0,
make_purge_nodes/1, make_purge_nodes/1,
make_update_config/1, make_update_config/1,
@ -99,6 +100,8 @@
credit :: non_neg_integer(), credit :: non_neg_integer(),
delivery_count :: rabbit_queue_type:delivery_count(), delivery_count :: rabbit_queue_type:delivery_count(),
drain :: boolean()}). drain :: boolean()}).
-record(sent, {consumer_id :: consumer_id(),
num :: pos_integer()}).
-record(purge, {}). -record(purge, {}).
-record(purge_nodes, {nodes :: [node()]}). -record(purge_nodes, {nodes :: [node()]}).
-record(update_config, {config :: config()}). -record(update_config, {config :: config()}).
@ -113,6 +116,7 @@
#return{} | #return{} |
#discard{} | #discard{} |
#credit{} | #credit{} |
#sent{} |
#purge{} | #purge{} |
#purge_nodes{} | #purge_nodes{} |
#update_config{} | #update_config{} |
@ -370,6 +374,31 @@ apply(Meta, #credit{credit = LinkCreditRcv, delivery_count = DeliveryCountRcv,
%% credit for unknown consumer - just ignore %% credit for unknown consumer - just ignore
{State0, ok} {State0, ok}
end; end;
apply(Meta, #sent{consumer_id = ConsumerId, num = NumSent},
#?MODULE{consumers = Cons0,
service_queue = ServiceQueue0,
waiting_consumers = Waiting0} = State0) ->
case Cons0 of
#{ConsumerId := #consumer{pause_after = PauseAfter} = Con0} ->
Con = Con0#consumer{pause_after = PauseAfter + NumSent},
ServiceQueue = maybe_queue_consumer(ConsumerId, Con, ServiceQueue0),
State1 = State0#?MODULE{service_queue = ServiceQueue,
consumers = maps:update(ConsumerId, Con, Cons0)},
{_State, ok, _Effects} = checkout(Meta, State0, State1, []);
_ when Waiting0 /= [] ->
case lists:keytake(ConsumerId, 1, Waiting0) of
{value, {_, Con0 = #consumer{pause_after = PauseAfter}}, Waiting} ->
Con = Con0#consumer{pause_after = PauseAfter},
State = State0#?MODULE{waiting_consumers =
[{ConsumerId, Con} | Waiting]},
{State, ok};
false ->
{State0, ok}
end;
_ ->
%% ignore unknown consumer
{State0, ok}
end;
apply(_, #checkout{spec = {dequeue, _}}, apply(_, #checkout{spec = {dequeue, _}},
#?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
{State0, {error, {unsupported, single_active_consumer}}}; {State0, {error, {unsupported, single_active_consumer}}};
@ -2106,7 +2135,10 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
%% there are consumers waiting to be serviced %% there are consumers waiting to be serviced
%% process consumer checkout %% process consumer checkout
case maps:get(ConsumerId, Cons0) of case maps:get(ConsumerId, Cons0) of
#consumer{credit = 0} -> #consumer{credit = Credit,
pause_after = PauseAfter}
when Credit =:= 0 orelse
PauseAfter =:= 0 ->
%% no credit but was still on queue %% no credit but was still on queue
%% can happen when draining %% can happen when draining
%% recurse without consumer on queue %% recurse without consumer on queue
@ -2121,16 +2153,21 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
next_msg_id = Next, next_msg_id = Next,
credit = Credit, credit = Credit,
delivery_count = DelCnt0, delivery_count = DelCnt0,
pause_after = PauseAfter0,
cfg = Cfg} = Con0 -> cfg = Cfg} = Con0 ->
Checked = maps:put(Next, ConsumerMsg, Checked0), Checked = maps:put(Next, ConsumerMsg, Checked0),
DelCnt = case credit_api_v2(Cfg) of %% For now, PauseAfter must only be decremented for AMQP 1.0 consumers.
true -> add(DelCnt0, 1); {DelCnt, PauseAfter} = case credit_api_v2(Cfg) of
false -> DelCnt0 + 1 true ->
{add(DelCnt0, 1), PauseAfter0 - 1};
false ->
DelCnt0 + 1
end, end,
Con = Con0#consumer{checked_out = Checked, Con = Con0#consumer{checked_out = Checked,
next_msg_id = Next + 1, next_msg_id = Next + 1,
credit = Credit - 1, credit = Credit - 1,
delivery_count = DelCnt}, delivery_count = DelCnt,
pause_after = PauseAfter},
Size = get_header(size, get_msg_header(ConsumerMsg)), Size = get_header(size, get_msg_header(ConsumerMsg)),
State = update_or_remove_sub( State = update_or_remove_sub(
Meta, ConsumerId, Con, Meta, ConsumerId, Con,
@ -2227,9 +2264,11 @@ update_or_remove_sub(_Meta, ConsumerId,
service_queue = maybe_queue_consumer(ConsumerId, Con, ServiceQueue)}. service_queue = maybe_queue_consumer(ConsumerId, Con, ServiceQueue)}.
maybe_queue_consumer(Key, #consumer{credit = Credit, maybe_queue_consumer(Key, #consumer{credit = Credit,
pause_after = PauseAfter,
status = up, status = up,
cfg = #consumer_cfg{priority = P}}, ServiceQueue) cfg = #consumer_cfg{priority = P}}, ServiceQueue)
when Credit > 0 -> when Credit > 0 andalso
PauseAfter > 0 ->
% TODO: queue:member could surely be quite expensive, however the practical % TODO: queue:member could surely be quite expensive, however the practical
% number of unique consumers may not be large enough for it to matter % number of unique consumers may not be large enough for it to matter
case priority_queue:member(Key, ServiceQueue) of case priority_queue:member(Key, ServiceQueue) of
@ -2400,6 +2439,10 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
delivery_count = DeliveryCount, delivery_count = DeliveryCount,
drain = Drain}. drain = Drain}.
make_sent(ConsumerId, NumSent) ->
#sent{consumer_id = ConsumerId,
num = NumSent}.
-spec make_purge() -> protocol(). -spec make_purge() -> protocol().
make_purge() -> #purge{}. make_purge() -> #purge{}.

View File

@ -119,9 +119,16 @@
checked_out = #{} :: #{msg_id() => msg()}, checked_out = #{} :: #{msg_id() => msg()},
%% max number of messages that can be sent %% max number of messages that can be sent
%% decremented for each delivery %% decremented for each delivery
credit = 0 : non_neg_integer(), credit = 0 :: non_neg_integer(),
%% AMQP 1.0 §2.6.7 %% AMQP 1.0 §2.6.7
delivery_count :: rabbit_queue_type:delivery_count() delivery_count :: rabbit_queue_type:delivery_count(),
%% TODO session should send its initial NOTIFY_SENT_AFTER
%% This value is x2 so that we keep the procs busy.
%%
%% This field is used for RabbitMQ internal flow control betweeen
%% AMQP writer proc <--- sesssion proc <--- queue proc
%% to ensure we (queue proc) don't overload the session proc.
pause_after = 1000 :: non_neg_integer()
}). }).
-type consumer() :: #consumer{}. -type consumer() :: #consumer{}.

View File

@ -24,6 +24,7 @@
discard/3, discard/3,
credit_v1/4, credit_v1/4,
credit/6, credit/6,
sent/3,
handle_ra_event/4, handle_ra_event/4,
untracked_enqueue/2, untracked_enqueue/2,
purge/1, purge/1,
@ -431,6 +432,13 @@ credit(ConsumerTag, DeliveryCount, Credit, Drain, Echo,
State = State0#state{consumer_deliveries = CDels}, State = State0#state{consumer_deliveries = CDels},
{send_command(ServerId, undefined, Cmd, normal, State), []}. {send_command(ServerId, undefined, Cmd, normal, State), []}.
sent(ConsumerTag, NumSent, State0) ->
ConsumerId = consumer_id(ConsumerTag),
ServerId = pick_server(State0),
Cmd = rabbit_fifo:make_sent(ConsumerId, NumSent),
State = send_command(ServerId, undefined, Cmd, normal, State0),
{State, []}.
%% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag %% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag
%% %%
%% This is a synchronous call. I.e. the call will block until the command %% This is a synchronous call. I.e. the call will block until the command

View File

@ -47,6 +47,7 @@
settle/5, settle/5,
credit_v1/5, credit_v1/5,
credit/7, credit/7,
sent/4,
dequeue/5, dequeue/5,
fold_state/3, fold_state/3,
is_policy_applicable/2, is_policy_applicable/2,
@ -226,6 +227,10 @@
Drain :: boolean(), Echo :: boolean(), queue_state()) -> Drain :: boolean(), Echo :: boolean(), queue_state()) ->
{queue_state(), actions()}. {queue_state(), actions()}.
%% credit API v2
-callback sent(queue_name(), rabbit_types:ctag(), NumSent :: pos_integer(), queue_state()) ->
{queue_state(), actions()}.
-callback dequeue(queue_name(), NoAck :: boolean(), LimiterPid :: pid(), -callback dequeue(queue_name(), NoAck :: boolean(), LimiterPid :: pid(),
rabbit_types:ctag(), queue_state()) -> rabbit_types:ctag(), queue_state()) ->
{ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), queue_state()} | {ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), queue_state()} |
@ -676,6 +681,14 @@ credit(QName, CTag, DeliveryCount, Credit, Drain, Echo, Ctxs) ->
{State, Actions} = Mod:credit(QName, CTag, DeliveryCount, Credit, Drain, Echo, State0), {State, Actions} = Mod:credit(QName, CTag, DeliveryCount, Credit, Drain, Echo, State0),
{ok, set_ctx(QName, Ctx#ctx{state = State}, Ctxs), Actions}. {ok, set_ctx(QName, Ctx#ctx{state = State}, Ctxs), Actions}.
-spec sent(queue_name(), rabbit_types:ctag(), pos_integer(), state()) ->
{ok, state(), actions()}.
sent(QName, CTag, NumSent, Ctxs) ->
#ctx{state = State0,
module = Mod} = Ctx = get_ctx(QName, Ctxs),
{State, Actions} = Mod:sent(QName, CTag, NumSent, State0),
{ok, set_ctx(QName, Ctx#ctx{state = State}, Ctxs), Actions}.
-spec dequeue(amqqueue:amqqueue(), boolean(), -spec dequeue(amqqueue:amqqueue(), boolean(),
pid(), rabbit_types:ctag(), state()) -> pid(), rabbit_types:ctag(), state()) ->
{ok, non_neg_integer(), term(), state()} | {ok, non_neg_integer(), term(), state()} |

View File

@ -25,7 +25,7 @@
delete_immediately/1]). delete_immediately/1]).
-export([state_info/1, info/2, stat/1, infos/1, infos/2]). -export([state_info/1, info/2, stat/1, infos/1, infos/2]).
-export([settle/5, dequeue/5, consume/3, cancel/5]). -export([settle/5, dequeue/5, consume/3, cancel/5]).
-export([credit_v1/5, credit/7]). -export([credit_v1/5, credit/7, sent/4]).
-export([purge/1]). -export([purge/1]).
-export([stateless_deliver/2, deliver/3]). -export([stateless_deliver/2, deliver/3]).
-export([dead_letter_publish/5]). -export([dead_letter_publish/5]).
@ -807,6 +807,9 @@ credit_v1(_QName, CTag, Credit, Drain, QState) ->
credit(_QName, CTag, DeliveryCount, Credit, Drain, Echo, QState) -> credit(_QName, CTag, DeliveryCount, Credit, Drain, Echo, QState) ->
rabbit_fifo_client:credit(quorum_ctag(CTag), DeliveryCount, Credit, Drain, Echo, QState). rabbit_fifo_client:credit(quorum_ctag(CTag), DeliveryCount, Credit, Drain, Echo, QState).
sent(_QName, CTag, NumSent, QState) ->
rabbit_fifo_client:sent(quorum_ctag(CTag), NumSent, QState).
-spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(), pid(), -spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(), pid(),
rabbit_types:ctag(), rabbit_fifo_client:state()) -> rabbit_types:ctag(), rabbit_fifo_client:state()) ->
{empty, rabbit_fifo_client:state()} | {empty, rabbit_fifo_client:state()} |