diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 1cc1065111..e6ffa19529 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -59,6 +59,7 @@ -define(MANAGEMENT_LINK_CREDIT_RCV, 8). -define(MANAGEMENT_NODE_ADDRESS, <<"/management">>). -define(DEFAULT_EXCHANGE_NAME, <<>>). +-define(NOTIFY_SENT_AFTER, 500). -export([start_link/8, process_frame/2, @@ -138,15 +139,6 @@ multi_transfer_msg :: undefined | #multi_transfer_msg{} }). -%% link flow control state -%% §2.6.7 --record(flow_control, { - delivery_count :: sequence_no(), - credit :: non_neg_integer(), - available :: non_neg_integer(), - drain :: boolean() - }). - -record(outgoing_link, { %% Although the source address of a link might be an exchange name and binding key %% or a topic filter, an outgoing link will always consume from a queue. @@ -158,9 +150,12 @@ credit_api_version :: v1 | v2, %% When credit API v1 is used, our session process holds the delivery-count delivery_count :: sequence_no() | credit_api_v2, - %% When credit API v2 is used, both our session process and the queue hold link flow control state. - client_flow_control :: #flow_control{} | credit_api_v1, - queue_flow_control :: #flow_control{} | credit_api_v1 + %% This field is decremented for each message we were allowed to send to the writer proc. + %% When this field reaches 0, we notify the queue that we have sent to the writer proc ?NOTIFY_SENT_AFTER + %% messages so that the queue can sent us more. This is used for RabbitMQ internal flow control betweeen + %% AMQP writer proc <--- our sesssion proc <--- queue proc + %% to ensure that the queue doesn't overload our session proc. + notify_sent_after = ?NOTIFY_SENT_AFTER :: pos_integer() }). -record(outgoing_unsettled, { @@ -636,6 +631,8 @@ handle_stashed_down(#state{stashed_down = QNames, when QNameBin =:= QNameBinDown -> Detach = detach(Handle, Link, ?V_1_0_AMQP_ERROR_ILLEGAL_STATE), Frames = [Detach | Frames0], + %%TODO also remove any messages belonging to this consumer + %% from outgoing_pending queue? Links = maps:remove(Handle, Links0), {Frames, Links}; (_, _, Accum) -> @@ -743,6 +740,7 @@ destroy_incoming_link(Handle, Link = #incoming_link{queue_name_bin = QNameBin}, destroy_incoming_link(_, _, _, Acc) -> Acc. +%%TODO also remove any messages belonging to this consumer from outgoing_pending queue? destroy_outgoing_link(Handle, Link = #outgoing_link{queue_name_bin = QNameBin}, QNameBin, {Frames, Unsettled0, Links}) -> {Unsettled, _RemovedMsgIds} = remove_link_from_outgoing_unsettled_map(Handle, Unsettled0), {[detach(Handle, Link, ?V_1_0_AMQP_ERROR_RESOURCE_DELETED) | Frames], @@ -1000,13 +998,14 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% all consumers will use credit API v2. %% Streams always use credit API v2 since the stream client (rabbit_stream_queue) holds the link %% flow control state. Hence, credit API mixed version isn't an issue for streams. - {Mode, + {CreditApiVsn, + Mode, DeliveryCount} = case rabbit_feature_flags:is_enabled(credit_api_v2) orelse QType =:= rabbit_stream_queue of true -> - {{credited, ?INITIAL_DELIVERY_COUNT}, credit_api_v2}; + {2, {credited, ?INITIAL_DELIVERY_COUNT}, credit_api_v2}; false -> - {{credited, credit_api_v1}, {credit_api_v1, ?INITIAL_DELIVERY_COUNT}} + {1, {credited, credit_api_v1}, {credit_api_v1, ?INITIAL_DELIVERY_COUNT}} end, Spec = #{no_ack => SndSettled, channel_pid => self(), @@ -1040,6 +1039,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, queue_type = QType, send_settled = SndSettled, max_message_size = MaxMessageSize, + credit_api_version = CreditApiVsn, delivery_count = DeliveryCount}, OutgoingLinks = OutgoingLinks0#{HandleInt => Link}, State1 = State0#state{queue_states = QStates, @@ -1174,6 +1174,8 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, %% first detaching and then re-attaching to the same session with the same link handle (the handle %% becomes available for re-use once a link is closed): This will result in the same consumer tag, %% and we ideally disallow "updating" an AMQP consumer. + %%TODO If such an API is not added, we also must return messages in the outgoing_pending queue + %% which haven't made it to the outgoing_unsettled map? case rabbit_queue_type:cancel(Q, Ctag, undefined, Username, QStates0) of {ok, QStates1} -> {Unsettled1, MsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0), @@ -1326,11 +1328,11 @@ send_pending(#state{remote_incoming_window = RemoteIncomingWindow, false -> {NewRemoteIncomingWindow, Buf, State1} = send_pending_delivery(Delivery, Buf1, State), - NumTransfersSent = NewRemoteIncomingWindow - RemoteIncomingWindow, + NumTransfersSent = RemoteIncomingWindow - NewRemoteIncomingWindow, State2 = session_flow_control_sent_transfers(NumTransfersSent, State1), - State = State2#state{outgoing_pending = Buf}, + State3 = State2#state{outgoing_pending = Buf}, %% Recurse to possibly send FLOW frames. - send_pending(State) + send_pending(State3) end end. @@ -1366,7 +1368,7 @@ send_pending_delivery(#pending_delivery{ {sent_all, SpaceLeft} -> {SpaceLeft, Buf, - record_outgoing_unsettled(Pending, State)}; + sent_pending_delivery(Pending, State)}; {sent_some, SpaceLeft, Rest} -> {SpaceLeft, queue:in_r(Pending#pending_delivery{frames = Rest}, Buf), @@ -1407,6 +1409,34 @@ send_fun(WriterPid, Ch) -> rabbit_amqp_writer:send_command(WriterPid, Ch, Transfer, Sections) end. +sent_pending_delivery( + #pending_delivery{outgoing_unsettled = #outgoing_unsettled{consumer_tag = Ctag, + queue_name = QName} + } = Pending, + #state{outgoing_links = OutgoingLinks0, + queue_states = QStates0} = S0) -> + Handle = ctag_to_handle(Ctag), + case OutgoingLinks0 of + #{Handle := Link0 = #outgoing_link{notify_sent_after = N0}} -> + {N, S3} = if N0 =:= 1 -> + {ok, QStates, Actions} = rabbit_queue_type:sent( + QName, Ctag, ?NOTIFY_SENT_AFTER, QStates0), + S1 = S0#state{queue_states = QStates}, + S2 = handle_queue_actions(Actions, S1), + {?NOTIFY_SENT_AFTER, S2}; + N0 > 1 -> + {N0 - 1, S0} + end, + Link = Link0#outgoing_link{notify_sent_after = N}, + OutgoingLinks = OutgoingLinks0#{Handle := Link}, + S = S3#state{outgoing_links = OutgoingLinks}, + record_outgoing_unsettled(Pending, S); + _ -> + %% TODO make sure we don't get here by removing returning messages in outgoing_pending + %% when detaching a link + exit({no_outgoing_link_handle, Handle}) + end. + record_outgoing_unsettled(#pending_delivery{queue_ack_required = true, delivery_id = DeliveryId, outgoing_unsettled = Unsettled}, diff --git a/deps/rabbit/src/rabbit_amqp_writer.erl b/deps/rabbit/src/rabbit_amqp_writer.erl index f55762d8bd..bd42fc1c2d 100644 --- a/deps/rabbit/src/rabbit_amqp_writer.erl +++ b/deps/rabbit/src/rabbit_amqp_writer.erl @@ -101,7 +101,8 @@ init({Sock, MaxFrame, ReaderPid}) -> max_frame_size = MaxFrame, reader = ReaderPid, pending = [], - pending_size = 0}, + pending_size = 0, + monitored_sessions = #{}}, process_flag(message_queue_data, off_heap), {ok, State}. diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index ebe29cd611..fe9328cfee 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -63,6 +63,7 @@ make_return/2, make_discard/2, make_credit/4, + make_sent/2, make_purge/0, make_purge_nodes/1, make_update_config/1, @@ -99,6 +100,8 @@ credit :: non_neg_integer(), delivery_count :: rabbit_queue_type:delivery_count(), drain :: boolean()}). +-record(sent, {consumer_id :: consumer_id(), + num :: pos_integer()}). -record(purge, {}). -record(purge_nodes, {nodes :: [node()]}). -record(update_config, {config :: config()}). @@ -113,6 +116,7 @@ #return{} | #discard{} | #credit{} | + #sent{} | #purge{} | #purge_nodes{} | #update_config{} | @@ -370,6 +374,31 @@ apply(Meta, #credit{credit = LinkCreditRcv, delivery_count = DeliveryCountRcv, %% credit for unknown consumer - just ignore {State0, ok} end; +apply(Meta, #sent{consumer_id = ConsumerId, num = NumSent}, + #?MODULE{consumers = Cons0, + service_queue = ServiceQueue0, + waiting_consumers = Waiting0} = State0) -> + case Cons0 of + #{ConsumerId := #consumer{pause_after = PauseAfter} = Con0} -> + Con = Con0#consumer{pause_after = PauseAfter + NumSent}, + ServiceQueue = maybe_queue_consumer(ConsumerId, Con, ServiceQueue0), + State1 = State0#?MODULE{service_queue = ServiceQueue, + consumers = maps:update(ConsumerId, Con, Cons0)}, + {_State, ok, _Effects} = checkout(Meta, State0, State1, []); + _ when Waiting0 /= [] -> + case lists:keytake(ConsumerId, 1, Waiting0) of + {value, {_, Con0 = #consumer{pause_after = PauseAfter}}, Waiting} -> + Con = Con0#consumer{pause_after = PauseAfter}, + State = State0#?MODULE{waiting_consumers = + [{ConsumerId, Con} | Waiting]}, + {State, ok}; + false -> + {State0, ok} + end; + _ -> + %% ignore unknown consumer + {State0, ok} + end; apply(_, #checkout{spec = {dequeue, _}}, #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) -> {State0, {error, {unsupported, single_active_consumer}}}; @@ -2106,7 +2135,10 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) -> %% there are consumers waiting to be serviced %% process consumer checkout case maps:get(ConsumerId, Cons0) of - #consumer{credit = 0} -> + #consumer{credit = Credit, + pause_after = PauseAfter} + when Credit =:= 0 orelse + PauseAfter =:= 0 -> %% no credit but was still on queue %% can happen when draining %% recurse without consumer on queue @@ -2121,16 +2153,21 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) -> next_msg_id = Next, credit = Credit, delivery_count = DelCnt0, + pause_after = PauseAfter0, cfg = Cfg} = Con0 -> Checked = maps:put(Next, ConsumerMsg, Checked0), - DelCnt = case credit_api_v2(Cfg) of - true -> add(DelCnt0, 1); - false -> DelCnt0 + 1 - end, + %% For now, PauseAfter must only be decremented for AMQP 1.0 consumers. + {DelCnt, PauseAfter} = case credit_api_v2(Cfg) of + true -> + {add(DelCnt0, 1), PauseAfter0 - 1}; + false -> + DelCnt0 + 1 + end, Con = Con0#consumer{checked_out = Checked, next_msg_id = Next + 1, credit = Credit - 1, - delivery_count = DelCnt}, + delivery_count = DelCnt, + pause_after = PauseAfter}, Size = get_header(size, get_msg_header(ConsumerMsg)), State = update_or_remove_sub( Meta, ConsumerId, Con, @@ -2227,9 +2264,11 @@ update_or_remove_sub(_Meta, ConsumerId, service_queue = maybe_queue_consumer(ConsumerId, Con, ServiceQueue)}. maybe_queue_consumer(Key, #consumer{credit = Credit, + pause_after = PauseAfter, status = up, cfg = #consumer_cfg{priority = P}}, ServiceQueue) - when Credit > 0 -> + when Credit > 0 andalso + PauseAfter > 0 -> % TODO: queue:member could surely be quite expensive, however the practical % number of unique consumers may not be large enough for it to matter case priority_queue:member(Key, ServiceQueue) of @@ -2400,6 +2439,10 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> delivery_count = DeliveryCount, drain = Drain}. +make_sent(ConsumerId, NumSent) -> + #sent{consumer_id = ConsumerId, + num = NumSent}. + -spec make_purge() -> protocol(). make_purge() -> #purge{}. diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index 65f2db8a60..e58f370b24 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -119,9 +119,16 @@ checked_out = #{} :: #{msg_id() => msg()}, %% max number of messages that can be sent %% decremented for each delivery - credit = 0 : non_neg_integer(), + credit = 0 :: non_neg_integer(), %% AMQP 1.0 §2.6.7 - delivery_count :: rabbit_queue_type:delivery_count() + delivery_count :: rabbit_queue_type:delivery_count(), + %% TODO session should send its initial NOTIFY_SENT_AFTER + %% This value is x2 so that we keep the procs busy. + %% + %% This field is used for RabbitMQ internal flow control betweeen + %% AMQP writer proc <--- sesssion proc <--- queue proc + %% to ensure we (queue proc) don't overload the session proc. + pause_after = 1000 :: non_neg_integer() }). -type consumer() :: #consumer{}. diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index f1bea8db34..12bab0d42b 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -24,6 +24,7 @@ discard/3, credit_v1/4, credit/6, + sent/3, handle_ra_event/4, untracked_enqueue/2, purge/1, @@ -431,6 +432,13 @@ credit(ConsumerTag, DeliveryCount, Credit, Drain, Echo, State = State0#state{consumer_deliveries = CDels}, {send_command(ServerId, undefined, Cmd, normal, State), []}. +sent(ConsumerTag, NumSent, State0) -> + ConsumerId = consumer_id(ConsumerTag), + ServerId = pick_server(State0), + Cmd = rabbit_fifo:make_sent(ConsumerId, NumSent), + State = send_command(ServerId, undefined, Cmd, normal, State0), + {State, []}. + %% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag %% %% This is a synchronous call. I.e. the call will block until the command diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 12329ce488..3f18db211a 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -47,6 +47,7 @@ settle/5, credit_v1/5, credit/7, + sent/4, dequeue/5, fold_state/3, is_policy_applicable/2, @@ -226,6 +227,10 @@ Drain :: boolean(), Echo :: boolean(), queue_state()) -> {queue_state(), actions()}. +%% credit API v2 +-callback sent(queue_name(), rabbit_types:ctag(), NumSent :: pos_integer(), queue_state()) -> + {queue_state(), actions()}. + -callback dequeue(queue_name(), NoAck :: boolean(), LimiterPid :: pid(), rabbit_types:ctag(), queue_state()) -> {ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), queue_state()} | @@ -676,6 +681,14 @@ credit(QName, CTag, DeliveryCount, Credit, Drain, Echo, Ctxs) -> {State, Actions} = Mod:credit(QName, CTag, DeliveryCount, Credit, Drain, Echo, State0), {ok, set_ctx(QName, Ctx#ctx{state = State}, Ctxs), Actions}. +-spec sent(queue_name(), rabbit_types:ctag(), pos_integer(), state()) -> + {ok, state(), actions()}. +sent(QName, CTag, NumSent, Ctxs) -> + #ctx{state = State0, + module = Mod} = Ctx = get_ctx(QName, Ctxs), + {State, Actions} = Mod:sent(QName, CTag, NumSent, State0), + {ok, set_ctx(QName, Ctx#ctx{state = State}, Ctxs), Actions}. + -spec dequeue(amqqueue:amqqueue(), boolean(), pid(), rabbit_types:ctag(), state()) -> {ok, non_neg_integer(), term(), state()} | diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 2086f46399..2146fea683 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -25,7 +25,7 @@ delete_immediately/1]). -export([state_info/1, info/2, stat/1, infos/1, infos/2]). -export([settle/5, dequeue/5, consume/3, cancel/5]). --export([credit_v1/5, credit/7]). +-export([credit_v1/5, credit/7, sent/4]). -export([purge/1]). -export([stateless_deliver/2, deliver/3]). -export([dead_letter_publish/5]). @@ -807,6 +807,9 @@ credit_v1(_QName, CTag, Credit, Drain, QState) -> credit(_QName, CTag, DeliveryCount, Credit, Drain, Echo, QState) -> rabbit_fifo_client:credit(quorum_ctag(CTag), DeliveryCount, Credit, Drain, Echo, QState). +sent(_QName, CTag, NumSent, QState) -> + rabbit_fifo_client:sent(quorum_ctag(CTag), NumSent, QState). + -spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(), pid(), rabbit_types:ctag(), rabbit_fifo_client:state()) -> {empty, rabbit_fifo_client:state()} |