Decrease memory usage of queue_type state

Prior to this commit, 1 MQTT publisher publishing to 1 Million target
classic queues requires around 680 MB of process memory.

After this commit, it requires around 290 MB of process memory.

This commit requires feature flag classic_queue_type_delivery_support
and introduces a new one called no_queue_name_in_classic_queue_client.

Instead of storing the binary queue name 4 times, this commit now stores
it only 1 time.

The monitor_registry is removed since only classic queue clients monitor
their classic queue server processes.

The classic queue client does not store the queue name anymore. Instead
the queue name is included in messages handled by the classic queue
client.

Storing the queue name in the record ctx was unnecessary.

More potential future memory optimisations:
* When routing to destination queues, looking up the queue record,
  delivering to queue: Use streaming / batching instead of fetching all
  at once
* Only fetch ETS columns that are necessary instead of whole queue
  records
* Do not hold the same vhost binary in memory many times. Instead,
  maintain a mapping.
* Remove unnecessary tuple fields.
This commit is contained in:
David Ansari 2022-10-18 10:29:15 +00:00
parent 4b1c2c870b
commit af68fb4484
15 changed files with 162 additions and 279 deletions

View File

@ -1763,7 +1763,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates) ->
non_neg_integer(), rabbit_types:ctag(), boolean(),
rabbit_framing:amqp_table(), any(), rabbit_types:username(),
rabbit_queue_type:state()) ->
{ok, rabbit_queue_type:state(), rabbit_queue_type:actions()} |
{ok, rabbit_queue_type:state()} |
{error, term()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
basic_consume(Q, NoAck, ChPid, LimiterPid,

View File

@ -743,27 +743,6 @@ handle_cast({mandatory_received, _MsgSeqNo}, State) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
noreply_coalesce(State);
handle_cast({reject_publish, _MsgSeqNo, QPid} = Evt, State) ->
%% For backwards compatibility
QRef = rabbit_queue_type:find_name_from_pid(QPid, State#ch.queue_states),
case QRef of
undefined ->
%% ignore if no queue could be found for the given pid
noreply(State);
_ ->
handle_cast({queue_event, QRef, Evt}, State)
end;
handle_cast({confirm, _MsgSeqNo, QPid} = Evt, State) ->
%% For backwards compatibility
QRef = rabbit_queue_type:find_name_from_pid(QPid, State#ch.queue_states),
case QRef of
undefined ->
%% ignore if no queue could be found for the given pid
noreply(State);
_ ->
handle_cast({queue_event, QRef, Evt}, State)
end;
handle_cast({queue_event, QRef, Evt},
#ch{queue_states = QueueStates0} = State0) ->
case rabbit_queue_type:handle_event(QRef, Evt, QueueStates0) of
@ -786,11 +765,6 @@ handle_cast({queue_event, QRef, Evt},
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
end.
handle_info({ra_event, {Name, _} = From, Evt}, State) ->
%% For backwards compatibility
QRef = find_queue_name_from_quorum_name(Name, State#ch.queue_states),
handle_cast({queue_event, QRef, {From, Evt}}, State);
handle_info({bump_credit, Msg}, State) ->
%% A rabbit_amqqueue_process is granting credit to our channel. If
%% our channel was being blocked by this process, and no other
@ -811,11 +785,11 @@ handle_info(emit_stats, State) ->
%% stats timer.
{noreply, send_confirms_and_nacks(State1), hibernate};
handle_info({'DOWN', _MRef, process, QPid, Reason},
handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason},
#ch{queue_states = QStates0,
queue_monitors = _QMons} = State0) ->
credit_flow:peer_down(QPid),
case rabbit_queue_type:handle_down(QPid, Reason, QStates0) of
case rabbit_queue_type:handle_down(QPid, QName, Reason, QStates0) of
{ok, QState1, Actions} ->
State1 = State0#ch{queue_states = QState1},
State = handle_queue_actions(Actions, State1),
@ -1813,7 +1787,7 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
Username, QueueStates0),
Q}
end) of
{{ok, QueueStates, Actions}, Q} when ?is_amqqueue(Q) ->
{{ok, QueueStates}, Q} when ?is_amqqueue(Q) ->
rabbit_global_counters:consumer_created(amqp091),
CM1 = maps:put(
ActualConsumerTag,
@ -1822,10 +1796,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
State1 = State#ch{consumer_mapping = CM1,
queue_states = QueueStates},
State2 = handle_queue_actions(Actions, State1),
{ok, case NoWait of
true -> consumer_monitor(ActualConsumerTag, State2);
false -> State2
true -> consumer_monitor(ActualConsumerTag, State1);
false -> State1
end};
{{error, exclusive_consume_unavailable} = E, _Q} ->
E;
@ -2891,20 +2864,6 @@ handle_queue_actions(Actions, #ch{} = State0) ->
end, State0, Actions).
find_queue_name_from_quorum_name(Name, QStates) ->
Fun = fun(K, _V, undefined) ->
{ok, Q} = rabbit_amqqueue:lookup(K),
case amqqueue:get_pid(Q) of
{Name, _} ->
amqqueue:get_name(Q);
_ ->
undefined
end;
(_, _, Acc) ->
Acc
end,
rabbit_queue_type:fold_state(Fun, undefined, QStates).
maybe_increase_global_publishers(#ch{publishing_mode = true} = State0) ->
State0;
maybe_increase_global_publishers(State0) ->

View File

@ -4,14 +4,20 @@
-include("amqqueue.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
%% TODO possible to use sets / maps instead of lists?
%% Check performance with QoS 1 and 1 million target queues.
-record(msg_status, {pending :: [pid()],
confirmed = [] :: [pid()]}).
-define(STATE, ?MODULE).
-record(?STATE, {pid :: undefined | pid(), %% the current master pid
qref :: term(), %% TODO
unconfirmed = #{} ::
#{non_neg_integer() => #msg_status{}}}).
-record(?STATE, {
%% the current master pid
pid :: undefined | pid(),
%% undefined if feature flag no_queue_name_in_classic_queue_client enabled
qref :: term(),
unconfirmed = #{} :: #{non_neg_integer() => #msg_status{}},
monitored = #{} :: #{pid() => ok}
}).
-opaque state() :: #?STATE{}.
@ -156,9 +162,14 @@ stat(Q) ->
-spec init(amqqueue:amqqueue()) -> {ok, state()}.
init(Q) when ?amqqueue_is_classic(Q) ->
QName = amqqueue:get_name(Q),
QRef = case rabbit_feature_flags:is_enabled(no_queue_name_in_classic_queue_client) of
true ->
undefined;
false ->
amqqueue:get_name(Q)
end,
{ok, #?STATE{pid = amqqueue:get_pid(Q),
qref = QName}}.
qref = QRef}}.
-spec close(state()) -> ok.
close(_State) ->
@ -174,7 +185,7 @@ update(Q, #?STATE{pid = Pid} = State) when ?amqqueue_is_classic(Q) ->
State#?STATE{pid = NewPid}
end.
consume(Q, Spec, State) when ?amqqueue_is_classic(Q) ->
consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
QRef = amqqueue:get_name(Q),
#{no_ack := NoAck,
@ -194,9 +205,9 @@ consume(Q, Spec, State) when ?amqqueue_is_classic(Q) ->
ExclusiveConsume, Args, OkMsg, ActingUser},
infinity]}) of
ok ->
%% ask the host process to monitor this pid
%% TODO: track pids as they change
{ok, State#?STATE{pid = QPid}, [{monitor, QPid, QRef}]};
State = ensure_monitor(QPid, QRef, State0),
{ok, State#?STATE{pid = QPid}};
Err ->
Err
end.
@ -233,8 +244,10 @@ credit(CTag, Credit, Drain, State) ->
[{credit, ChPid, CTag, Credit, Drain}]}),
{State, []}.
handle_event({confirm, MsgSeqNos, Pid}, #?STATE{qref = QRef,
unconfirmed = U0} = State) ->
handle_event({confirm, MsgSeqNos, Pid}, #?STATE{qref = QRef} = State) ->
%% backwards compatibility when feature flag no_queue_name_in_classic_queue_client disabled
handle_event({confirm, MsgSeqNos, Pid, QRef}, State);
handle_event({confirm, MsgSeqNos, Pid, QRef}, #?STATE{unconfirmed = U0} = State) ->
%% confirms should never result in rejections
{Unconfirmed, ConfirmedSeqNos, []} =
settle_seq_nos(MsgSeqNos, Pid, U0, confirm),
@ -247,17 +260,20 @@ handle_event({confirm, MsgSeqNos, Pid}, #?STATE{qref = QRef,
{ok, State#?STATE{unconfirmed = Unconfirmed}, Actions};
handle_event({deliver, _, _, _} = Delivery, #?STATE{} = State) ->
{ok, State, [Delivery]};
handle_event({reject_publish, SeqNo, _QPid},
#?STATE{qref = QRef,
unconfirmed = U0} = State) ->
handle_event({reject_publish, SeqNo, QPid}, #?STATE{qref = QRef} = State) ->
%% backwards compatibility when feature flag no_queue_name_in_classic_queue_client disabled
handle_event({reject_publish, SeqNo, QPid, QRef}, State);
handle_event({reject_publish, SeqNo, _QPid, QRef},
#?STATE{unconfirmed = U0} = State) ->
%% It does not matter which queue rejected the message,
%% if any queue did, it should not be confirmed.
{U, Rejected} = reject_seq_no(SeqNo, U0),
Actions = [{rejected, QRef, Rejected}],
{ok, State#?STATE{unconfirmed = U}, Actions};
handle_event({down, Pid, Info}, #?STATE{qref = QRef,
pid = MasterPid,
unconfirmed = U0} = State0) ->
handle_event({down, Pid, QRef, Info}, #?STATE{monitored = Monitored,
pid = MasterPid,
unconfirmed = U0} = State0) ->
State = State0#?STATE{monitored = maps:remove(Pid, Monitored)},
Actions0 = case Pid =:= MasterPid of
true ->
[{queue_down, QRef}];
@ -279,7 +295,7 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef,
Actions = settlement_action(
settled, QRef, Settled,
settlement_action(rejected, QRef, Rejected, Actions0)),
{ok, State0#?STATE{unconfirmed = Unconfirmed}, Actions};
{ok, State#?STATE{unconfirmed = Unconfirmed}, Actions};
true ->
%% any abnormal exit should be considered a full reject of the
%% oustanding message ids - If the message didn't get to all
@ -294,7 +310,7 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef,
end
end, [], U0),
U = maps:without(MsgIds, U0),
{ok, State0#?STATE{unconfirmed = U},
{ok, State#?STATE{unconfirmed = U},
[{rejected, QRef, MsgIds} | Actions0]}
end;
handle_event({send_drained, _} = Action, State) ->
@ -319,7 +335,7 @@ deliver(Qs0, #delivery{flow = Flow,
Msg = Msg0#basic_message{id = rabbit_guid:gen()},
Delivery = Delivery0#delivery{message = Msg},
{MPids, SPids, Qs, Actions} = qpids(Qs0, Confirm, MsgNo),
{MPids, SPids, Qs} = qpids(Qs0, Confirm, MsgNo),
case Flow of
%% Here we are tracking messages sent by the rabbit_channel
%% process. We are accessing the rabbit_channel process
@ -334,7 +350,7 @@ deliver(Qs0, #delivery{flow = Flow,
SMsg = {deliver, Delivery, true},
delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}),
delegate:invoke_no_result(SPids, {gen_server2, cast, [SMsg]}),
{Qs, Actions}.
{Qs, []}.
-spec dequeue(NoAck :: boolean(), LimiterPid :: pid(),
@ -382,14 +398,16 @@ purge(Q) when ?is_amqqueue(Q) ->
qpids(Qs, Confirm, MsgNo) ->
lists:foldl(
fun ({Q, S0}, {MPidAcc, SPidAcc, Qs0, Actions0}) ->
fun ({Q, S0}, {MPidAcc, SPidAcc, Qs0}) ->
QPid = amqqueue:get_pid(Q),
SPids = amqqueue:get_slave_pids(Q),
QRef = amqqueue:get_name(Q),
Actions = [{monitor, QPid, QRef}
| [{monitor, P, QRef} || P <- SPids]] ++ Actions0,
S1 = ensure_monitor(QPid, QRef, S0),
S2 = lists:foldl(fun(SPid, Acc) ->
ensure_monitor(SPid, QRef, Acc)
end, S1, SPids),
%% confirm record only if necessary
S = case S0 of
S = case S2 of
#?STATE{unconfirmed = U0} ->
Rec = [QPid | SPids],
U = case Confirm of
@ -398,14 +416,14 @@ qpids(Qs, Confirm, MsgNo) ->
true ->
U0#{MsgNo => #msg_status{pending = Rec}}
end,
S0#?STATE{pid = QPid,
S2#?STATE{pid = QPid,
unconfirmed = U};
stateless ->
S0
S2
end,
{[QPid | MPidAcc], SPidAcc ++ SPids,
[{Q, S} | Qs0], Actions}
end, {[], [], [], []}, Qs).
[{Q, S} | Qs0]}
end, {[], [], []}, Qs).
%% internal-ish
-spec wait_for_promoted_or_stopped(amqqueue:amqqueue()) ->
@ -522,59 +540,43 @@ update_msg_status(confirm, Pid, #msg_status{pending = P,
update_msg_status(down, Pid, #msg_status{pending = P} = S) ->
S#msg_status{pending = lists:delete(Pid, P)}.
ensure_monitor(_, _, State = stateless) ->
State;
ensure_monitor(Pid, _, State = #?STATE{monitored = Monitored})
when is_map_key(Pid, Monitored) ->
State;
ensure_monitor(Pid, QName, State = #?STATE{monitored = Monitored}) ->
_ = erlang:monitor(process, Pid, [{tag, {'DOWN', QName}}]),
State#?STATE{monitored = Monitored#{Pid => ok}}.
%% part of channel <-> queue api
confirm_to_sender(Pid, QName, MsgSeqNos) ->
%% the stream queue included the queue type refactoring and thus requires
%% a different message format
case rabbit_queue_type:is_supported() of
true ->
gen_server:cast(Pid,
{queue_event, QName,
{confirm, MsgSeqNos, self()}});
false ->
gen_server2:cast(Pid, {confirm, MsgSeqNos, self()})
end.
Msg = case rabbit_feature_flags:is_enabled(no_queue_name_in_classic_queue_client) of
true ->
{confirm, MsgSeqNos, self(), QName};
false ->
{confirm, MsgSeqNos, self()}
end,
gen_server:cast(Pid, {queue_event, QName, Msg}).
send_rejection(Pid, QName, MsgSeqNo) ->
case rabbit_queue_type:is_supported() of
true ->
gen_server:cast(Pid, {queue_event, QName,
{reject_publish, MsgSeqNo, self()}});
false ->
gen_server2:cast(Pid, {reject_publish, MsgSeqNo, self()})
end.
Msg = case rabbit_feature_flags:is_enabled(no_queue_name_in_classic_queue_client) of
true ->
{reject_publish, MsgSeqNo, self(), QName};
false ->
{reject_publish, MsgSeqNo, self()}
end,
gen_server:cast(Pid, {queue_event, QName, Msg}).
deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) ->
case has_classic_queue_type_delivery_support() of
true ->
Deliver = {deliver, CTag, AckRequired, [Message]},
Evt = {queue_event, QName, Deliver},
gen_server:cast(Pid, Evt);
false ->
Deliver = {deliver, CTag, AckRequired, Message},
gen_server2:cast(Pid, Deliver)
end.
Deliver = {deliver, CTag, AckRequired, [Message]},
Evt = {queue_event, QName, Deliver},
gen_server:cast(Pid, Evt).
send_drained(Pid, QName, CTagCredits) ->
case has_classic_queue_type_delivery_support() of
true ->
gen_server:cast(Pid, {queue_event, QName,
{send_drained, CTagCredits}});
false ->
gen_server2:cast(Pid, {send_drained, CTagCredits})
end.
gen_server:cast(Pid, {queue_event, QName,
{send_drained, CTagCredits}}).
send_credit_reply(Pid, QName, Len) when is_integer(Len) ->
case rabbit_queue_type:is_supported() of
true ->
gen_server:cast(Pid, {queue_event, QName,
{send_credit_reply, Len}});
false ->
gen_server2:cast(Pid, {send_credit_reply, Len})
end.
has_classic_queue_type_delivery_support() ->
%% some queue_events were missed in the initial queue_type implementation
%% this feature flag enables those and completes the initial queue type
%% API for classic queues
rabbit_feature_flags:is_enabled(classic_queue_type_delivery_support).
gen_server:cast(Pid, {queue_event, QName,
{send_credit_reply, Len}}).

View File

@ -112,18 +112,26 @@
{classic_queue_type_delivery_support,
#{desc => "Bug fix for classic queue deliveries using mixed versions",
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/5931",
stability => stable,
%%TODO remove compatibility code
stability => required,
depends_on => [stream_queue]
}}).
-rabbit_feature_flag(
{restart_streams,
#{desc => "Support for restarting streams with optional preferred next leader argument. "
"Used to implement stream leader rebalancing",
"Used to implement stream leader rebalancing",
stability => stable,
depends_on => [stream_queue]
}}).
-rabbit_feature_flag(
{no_queue_name_in_classic_queue_client,
#{desc => "Remove queue name from classic queue type client to save memory",
stability => stable,
depends_on => [classic_queue_type_delivery_support]
}}).
%% -------------------------------------------------------------------
%% Direct exchange routing v2.
%% -------------------------------------------------------------------

View File

@ -27,7 +27,7 @@
handle_ra_event/3,
untracked_enqueue/2,
purge/1,
cluster_name/1,
queue_name/1,
update_machine_state/2,
pending_size/1,
stat/1,
@ -47,13 +47,13 @@
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
-type actions() :: [action()].
-type cluster_name() :: rabbit_types:r(queue).
-type queue_name() :: rabbit_types:r(queue).
-record(consumer, {last_msg_id :: seq() | -1,
ack = false :: boolean(),
delivery_count = 0 :: non_neg_integer()}).
-record(cfg, {cluster_name :: cluster_name(),
-record(cfg, {queue_name :: queue_name(),
servers = [] :: [ra:server_id()],
soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
block_handler = fun() -> ok end :: fun(() -> term()),
@ -87,33 +87,33 @@
%% @doc Create the initial state for a new rabbit_fifo sessions. A state is needed
%% to interact with a rabbit_fifo queue using @module.
%% @param ClusterName the id of the cluster to interact with
%% @param QueueName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
-spec init(cluster_name(), [ra:server_id()]) -> state().
init(ClusterName, Servers) ->
init(ClusterName, Servers, ?SOFT_LIMIT).
-spec init(queue_name(), [ra:server_id()]) -> state().
init(QueueName, Servers) ->
init(QueueName, Servers, ?SOFT_LIMIT).
%% @doc Create the initial state for a new rabbit_fifo sessions. A state is needed
%% to interact with a rabbit_fifo queue using @module.
%% @param ClusterName the id of the cluster to interact with
%% @param QueueName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
%% @param MaxPending size defining the max number of pending commands.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
-spec init(queue_name(), [ra:server_id()], non_neg_integer()) -> state().
init(QueueName = #resource{}, Servers, SoftLimit) ->
Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
#state{cfg = #cfg{cluster_name = ClusterName,
#state{cfg = #cfg{queue_name = QueueName,
servers = Servers,
soft_limit = SoftLimit,
timeout = Timeout * 1000}}.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
-spec init(queue_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
init(QueueName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
%% net ticktime is in seconds
Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
#state{cfg = #cfg{cluster_name = ClusterName,
#state{cfg = #cfg{queue_name = QueueName,
servers = Servers,
block_handler = BlockFun,
unblock_handler = UnblockFun,
@ -237,7 +237,7 @@ enqueue(Msg, State) ->
| {empty, state()} | {error | timeout, term()}.
dequeue(ConsumerTag, Settlement,
#state{cfg = #cfg{timeout = Timeout,
cluster_name = QName}} = State0) ->
queue_name = QName}} = State0) ->
Node = pick_server(State0),
ConsumerId = consumer_id(ConsumerTag),
case ra:process_command(Node,
@ -502,9 +502,9 @@ stat(Leader, Timeout) ->
end.
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
ClusterName.
-spec queue_name(state()) -> queue_name().
queue_name(#state{cfg = #cfg{queue_name = QueueName}}) ->
QueueName.
update_machine_state(Server, Conf) ->
case ra:process_command(Server, rabbit_fifo:make_update_config(Conf), ?COMMAND_TIMEOUT) of
@ -561,7 +561,7 @@ update_machine_state(Server, Conf) ->
{internal, Correlators :: [term()], actions(), state()} |
{rabbit_fifo:client_msg(), state()} | eol.
handle_ra_event(From, {applied, Seqs},
#state{cfg = #cfg{cluster_name = QRef,
#state{cfg = #cfg{queue_name = QRef,
soft_limit = SftLmt,
unblock_handler = UnblockFun}} = State0) ->
@ -738,7 +738,7 @@ maybe_auto_ack(false, {deliver, Tag, _Ack, Msgs} = Deliver, State0) ->
{ok, State, [Deliver] ++ Actions}.
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
#state{cfg = #cfg{cluster_name = QName},
#state{cfg = #cfg{queue_name = QName},
consumer_deliveries = CDels0} = State0)
when is_map_key(Tag, CDels0) ->
QRef = qref(Leader),
@ -905,7 +905,7 @@ add_command(Cid, discard, MsgIds, Acc) ->
set_timer(#state{leader = Leader0,
cfg = #cfg{servers = [Server | _],
cluster_name = QName}} = State) ->
queue_name = QName}} = State) ->
Leader = case Leader0 of
undefined -> Server;
_ ->

View File

@ -177,10 +177,10 @@ handle_info({'DOWN', Ref, process, _, _},
rabbit_log:debug("~ts terminating itself because leader of ~ts is down...",
[?MODULE, rabbit_misc:rs(QRef)]),
supervisor:terminate_child(rabbit_fifo_dlx_sup, self());
handle_info({'DOWN', _MRef, process, QPid, Reason},
handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason},
#state{queue_type_state = QTypeState0} = State0) ->
%% received from target classic queue
case rabbit_queue_type:handle_down(QPid, Reason, QTypeState0) of
case rabbit_queue_type:handle_down(QPid, QName, Reason, QTypeState0) of
{ok, QTypeState, Actions} ->
State = State0#state{queue_type_state = QTypeState},
{noreply, handle_queue_actions(Actions, State)};

View File

@ -15,7 +15,6 @@
discover/1,
feature_flag_name/1,
default/0,
is_supported/0,
is_enabled/1,
is_compatible/4,
declare/2,
@ -34,7 +33,7 @@
new/2,
consume/3,
cancel/5,
handle_down/3,
handle_down/4,
handle_event/3,
module/2,
deliver/3,
@ -42,7 +41,6 @@
credit/5,
dequeue/5,
fold_state/3,
find_name_from_pid/2,
is_policy_applicable/2,
is_server_named_allowed/1,
arguments/1,
@ -51,7 +49,6 @@
]).
-type queue_name() :: rabbit_types:r(queue).
-type queue_ref() :: queue_name() | atom().
-type queue_state() :: term().
-type msg_tag() :: term().
-type arguments() :: queue_arguments | consumer_arguments.
@ -66,13 +63,8 @@
%% TODO resolve all registered queue types from registry
-define(QUEUE_TYPES, [rabbit_classic_queue, rabbit_quorum_queue, rabbit_stream_queue]).
-define(QREF(QueueReference),
(is_tuple(QueueReference) andalso element(1, QueueReference) == resource)
orelse is_atom(QueueReference)).
%% anything that the host process needs to do on behalf of the queue type
%% session, like knowing when to notify on monitor down
%% anything that the host process needs to do on behalf of the queue type session
-type action() ::
{monitor, Pid :: pid(), queue_ref()} |
%% indicate to the queue type module that a message has been delivered
%% fully to the queue
{settled, Success :: boolean(), [msg_tag()]} |
@ -85,7 +77,6 @@
term().
-record(ctx, {module :: module(),
name :: queue_name(),
%% "publisher confirm queue accounting"
%% queue type implementation should emit a:
%% {settle, Success :: boolean(), msg_tag()}
@ -97,8 +88,7 @@
state :: queue_state()}).
-record(?STATE, {ctxs = #{} :: #{queue_ref() => #ctx{}},
monitor_registry = #{} :: #{pid() => queue_ref()}
-record(?STATE, {ctxs = #{} :: #{queue_name() => #ctx{}}
}).
-opaque state() :: #?STATE{}.
@ -245,12 +235,6 @@ feature_flag_name(_) ->
default() ->
rabbit_classic_queue.
%% is the queue type API supported in the cluster
is_supported() ->
%% the stream_queue feature enables
%% the queue_type internal message API
rabbit_feature_flags:is_enabled(stream_queue).
%% is a specific queue type implementation enabled
-spec is_enabled(module()) -> boolean().
is_enabled(Type) ->
@ -297,7 +281,7 @@ stat(Q) ->
Mod = amqqueue:get_type(Q),
Mod:stat(Q).
-spec remove(queue_ref(), state()) -> state().
-spec remove(queue_name(), state()) -> state().
remove(QRef, #?STATE{ctxs = Ctxs0} = State) ->
case maps:take(QRef, Ctxs0) of
error ->
@ -319,11 +303,6 @@ info(Q, Items) ->
fold_state(Fun, Acc, #?STATE{ctxs = Ctxs}) ->
maps:fold(Fun, Acc, Ctxs).
%% slight hack to help provide backwards compatibility in the channel
%% better than scanning the entire queue state
find_name_from_pid(Pid, #?STATE{monitor_registry = Mons}) ->
maps:get(Pid, Mons, undefined).
state_info(#ctx{state = S,
module = Mod}) ->
Mod:state_info(S);
@ -399,13 +378,13 @@ new(Q, State) when ?is_amqqueue(Q) ->
set_ctx(Q, Ctx, State).
-spec consume(amqqueue:amqqueue(), consume_spec(), state()) ->
{ok, state(), actions()} | {error, term()}.
{ok, state()} | {error, term()}.
consume(Q, Spec, State) ->
#ctx{state = CtxState0} = Ctx = get_ctx(Q, State),
Mod = amqqueue:get_type(Q),
case Mod:consume(Q, Spec, CtxState0) of
{ok, CtxState, Actions} ->
return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions);
{ok, CtxState} ->
{ok, set_ctx(Q, Ctx#ctx{state = CtxState}, State)};
Err ->
Err
end.
@ -453,26 +432,20 @@ recover(VHost, Qs) ->
{R0 ++ R, F0 ++ F}
end, {[], []}, ByType).
-spec handle_down(pid(), term(), state()) ->
{ok, state(), actions()} | {eol, state(), queue_ref()} | {error, term()}.
handle_down(Pid, Info, #?STATE{monitor_registry = Reg0} = State0) ->
%% lookup queue ref in monitor registry
case maps:take(Pid, Reg0) of
{QRef, Reg} ->
case handle_event(QRef, {down, Pid, Info}, State0) of
{ok, State, Actions} ->
{ok, State#?STATE{monitor_registry = Reg}, Actions};
eol ->
{eol, State0#?STATE{monitor_registry = Reg}, QRef};
Err ->
Err
end;
error ->
{ok, State0, []}
-spec handle_down(pid(), queue_name(), term(), state()) ->
{ok, state(), actions()} | {eol, state(), queue_name()} | {error, term()}.
handle_down(Pid, QName, Info, State0) ->
case handle_event(QName, {down, Pid, QName, Info}, State0) of
{ok, State, Actions} ->
{ok, State, Actions};
eol ->
{eol, State0, QName};
Err ->
Err
end.
%% messages sent from queues
-spec handle_event(queue_ref(), term(), state()) ->
-spec handle_event(queue_name(), term(), state()) ->
{ok, state(), actions()} | eol | {error, term()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
handle_event(QRef, Evt, Ctxs) ->
@ -483,7 +456,7 @@ handle_event(QRef, Evt, Ctxs) ->
state = State0} = Ctx ->
case Mod:handle_event(Evt, State0) of
{ok, State, Actions} ->
return_ok(set_ctx(QRef, Ctx#ctx{state = State}, Ctxs), Actions);
{ok, set_ctx(QRef, Ctx#ctx{state = State}, Ctxs), Actions};
Err ->
Err
end;
@ -491,7 +464,7 @@ handle_event(QRef, Evt, Ctxs) ->
{ok, Ctxs, []}
end.
-spec module(queue_ref(), state()) ->
-spec module(queue_name(), state()) ->
{ok, module()} | {error, not_found}.
module(QRef, State) ->
%% events can arrive after a queue state has been cleared up
@ -515,7 +488,7 @@ deliver(Qs, Delivery, State) ->
end.
deliver0(Qs, Delivery, stateless) ->
_ = lists:map(fun(Q) ->
lists:foreach(fun(Q) ->
Mod = amqqueue:get_type(Q),
_ = Mod:deliver([{Q, stateless}], Delivery)
end, Qs),
@ -542,15 +515,13 @@ deliver0(Qs, Delivery, #?STATE{} = State0) ->
Ctx = get_ctx_with(Q, Acc, S),
set_ctx(qref(Q), Ctx#ctx{state = S}, Acc)
end, State0, Xs),
return_ok(State, Actions).
{ok, State, Actions}.
-spec settle(queue_ref(), settle_op(), rabbit_types:ctag(),
-spec settle(queue_name(), settle_op(), rabbit_types:ctag(),
[non_neg_integer()], state()) ->
{ok, state(), actions()} |
{'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}.
settle(QRef, Op, CTag, MsgIds, Ctxs)
when ?QREF(QRef) ->
settle(#resource{kind = queue} = QRef, Op, CTag, MsgIds, Ctxs) ->
case get_ctx(QRef, Ctxs, undefined) of
undefined ->
%% if we receive a settlement and there is no queue state it means
@ -566,7 +537,7 @@ settle(QRef, Op, CTag, MsgIds, Ctxs)
end
end.
-spec credit(amqqueue:amqqueue() | queue_ref(),
-spec credit(amqqueue:amqqueue() | queue_name(),
rabbit_types:ctag(), non_neg_integer(),
boolean(), state()) -> {ok, state(), actions()}.
credit(Q, CTag, Credit, Drain, Ctxs) ->
@ -609,24 +580,20 @@ get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState)
_ when InitState == undefined ->
%% not found and no initial state passed - initialize new state
Mod = amqqueue:get_type(Q),
Name = amqqueue:get_name(Q),
case Mod:init(Q) of
{error, Reason} ->
exit({Reason, Ref});
{ok, QState} ->
#ctx{module = Mod,
name = Name,
state = QState}
end;
_ ->
%% not found - initialize with supplied initial state
Mod = amqqueue:get_type(Q),
Name = amqqueue:get_name(Q),
#ctx{module = Mod,
name = Name,
state = InitState}
end;
get_ctx_with(QRef, Contexts, undefined) when ?QREF(QRef) ->
get_ctx_with(#resource{kind = queue} = QRef, Contexts, undefined) ->
case get_ctx(QRef, Contexts, undefined) of
undefined ->
exit({queue_context_not_found, QRef});
@ -639,10 +606,6 @@ get_ctx(QRef, #?STATE{ctxs = Contexts}, Default) ->
%% if we use a QRef it should always be initialised
maps:get(Ref, Contexts, Default).
set_ctx(Q, Ctx, #?STATE{ctxs = Contexts} = State)
when ?is_amqqueue(Q) ->
Ref = qref(Q),
State#?STATE{ctxs = maps:put(Ref, Ctx, Contexts)};
set_ctx(QRef, Ctx, #?STATE{ctxs = Contexts} = State) ->
Ref = qref(QRef),
State#?STATE{ctxs = maps:put(Ref, Ctx, Contexts)}.
@ -651,27 +614,3 @@ qref(#resource{kind = queue} = QName) ->
QName;
qref(Q) when ?is_amqqueue(Q) ->
amqqueue:get_name(Q).
return_ok(State0, []) ->
{ok, State0, []};
return_ok(State0, Actions0) ->
{State, Actions} =
lists:foldl(
fun({monitor, Pid, QRef},
{#?STATE{monitor_registry = M0} = S0, A0}) ->
case M0 of
#{Pid := QRef} ->
%% already monitored by the qref
{S0, A0};
#{Pid := _} ->
%% TODO: allow multiple Qrefs to monitor the same pid
exit(return_ok_duplicate_monitored_pid);
_ ->
_ = erlang:monitor(process, Pid),
M = M0#{Pid => QRef},
{S0#?STATE{monitor_registry = M}, A0}
end;
(Act, {S, A0}) ->
{S, [Act | A0]}
end, {State0, []}, Actions0),
{ok, State, lists:reverse(Actions)}.

View File

@ -816,7 +816,7 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, Prefetch,
Args, none, ActingUser),
{ok, QState, []};
{ok, QState};
{error, Error} ->
Error;
{timeout, _} ->
@ -831,7 +831,7 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, Prefetch,
Args, none, ActingUser),
{ok, QState, []}
{ok, QState}
end.
cancel(_Q, ConsumerTag, OkMsg, _ActingUser, State) ->
@ -893,7 +893,7 @@ deliver(QSs, #delivery{message = #basic_message{content = Content0} = Msg,
case deliver(Confirm, Delivery, S0) of
{reject_publish, S} ->
Seq = Delivery#delivery.msg_seq_no,
QName = rabbit_fifo_client:cluster_name(S),
QName = rabbit_fifo_client:queue_name(S),
{[{Q, S} | Qs], [{rejected, QName, [Seq]} | Actions]};
{_, S} ->
{[{Q, S} | Qs], Actions}
@ -1325,18 +1325,8 @@ dlh(undefined, _, Strategy, _, QName) ->
"because dead-letter-exchange is not configured.",
[rabbit_misc:rs(QName), Strategy]),
undefined;
dlh(Exchange, RoutingKey, <<"at-least-once">>, reject_publish, QName) ->
%% Feature flag stream_queue includes the rabbit_queue_type refactor
%% which is required by rabbit_fifo_dlx_worker.
case rabbit_queue_type:is_supported() of
true ->
at_least_once;
false ->
rabbit_log:warning("Falling back to dead-letter-strategy at-most-once for ~ts "
"because feature flag stream_queue is disabled.",
[rabbit_misc:rs(QName)]),
dlh_at_most_once(Exchange, RoutingKey, QName)
end;
dlh(_, _, <<"at-least-once">>, reject_publish, _) ->
at_least_once;
dlh(Exchange, RoutingKey, <<"at-least-once">>, drop_head, QName) ->
rabbit_log:warning("Falling back to dead-letter-strategy at-most-once for ~ts "
"because configured dead-letter-strategy at-least-once is incompatible with "
@ -1593,7 +1583,7 @@ maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
queue_name(RaFifoState) ->
rabbit_fifo_client:cluster_name(RaFifoState).
rabbit_fifo_client:queue_name(RaFifoState).
get_default_quorum_initial_group_size(Arguments) ->
case rabbit_misc:table_lookup(Arguments, <<"x-quorum-initial-group-size">>) of

View File

@ -321,9 +321,8 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
listening_offset = NextOffset,
log = Seg0,
max = Max},
Actions = [],
{ok, State#stream_client{local_pid = LocalPid,
readers = Readers0#{Tag => Str0}}, Actions}
readers = Readers0#{Tag => Str0}}}
end.
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,

View File

@ -1616,12 +1616,11 @@ wait_for_confirms(Unconfirmed) ->
true -> ok;
false ->
receive
{'$gen_cast',
{queue_event, _QName, {confirm, Confirmed, _}}} ->
{'$gen_cast', {queue_event, _QName, {confirm, Confirmed, _}}} ->
wait_for_confirms(
sets:subtract(
Unconfirmed, sets:from_list(Confirmed)));
{'$gen_cast', {confirm, Confirmed, _}} ->
{'$gen_cast', {queue_event, QName, {confirm, Confirmed, _, QName}}} ->
wait_for_confirms(
sets:subtract(
Unconfirmed, sets:from_list(Confirmed)))

View File

@ -58,7 +58,7 @@
%% data pending delivery (between socket
%% flushes)
pending,
%% defines how ofter gc will be executed
%% defines how often gc will be executed
writer_gc_threshold
}).

View File

@ -11,8 +11,7 @@
-export([info/2, initial_state/2, initial_state/4,
process_frame/2, serialise/2, send_will/1,
terminate/1, handle_pre_hibernate/0,
handle_ra_event/2, handle_down/2, handle_queue_event/2,
handle_deprecated_delivery/2]).
handle_ra_event/2, handle_down/2, handle_queue_event/2]).
%%TODO Use single queue per MQTT subscriber connection?
%% * when publishing we store in x-mqtt-publish-qos header the publishing QoS
@ -958,7 +957,7 @@ consume(Q, QoS, #proc_state{
ok_msg => undefined,
acting_user => Username},
case rabbit_queue_type:consume(Q, Spec, QStates0) of
{ok, QStates, _Actions = []} ->
{ok, QStates} ->
% rabbit_global_counters:consumer_created(mqtt),
PState = PState0#proc_state{queue_states = QStates},
{ok, PState};
@ -1088,6 +1087,8 @@ publish_to_queues(
deliver_to_queues(Delivery,
RoutedToQNames,
PState0 = #proc_state{queue_states = QStates0}) ->
%% TODO only lookup fields that are needed using ets:select / match?
%% TODO Use ETS continuations to be more space efficient
Qs0 = rabbit_amqqueue:lookup(RoutedToQNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
case rabbit_queue_type:deliver(Qs, Delivery, QStates0) of
@ -1210,10 +1211,9 @@ handle_ra_event(Evt, PState) ->
rabbit_log:debug("unhandled ra_event: ~w ", [Evt]),
PState.
handle_down({'DOWN', _MRef, process, QPid, Reason},
handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
PState0 = #proc_state{queue_states = QStates0}) ->
%% spike handles only QoS0
case rabbit_queue_type:handle_down(QPid, Reason, QStates0) of
case rabbit_queue_type:handle_down(QPid, QName, Reason, QStates0) of
{ok, QStates1, Actions} ->
PState = PState0#proc_state{queue_states = QStates1},
handle_queue_actions(Actions, PState);
@ -1222,11 +1222,6 @@ handle_down({'DOWN', _MRef, process, QPid, Reason},
PState0#proc_state{queue_states = QStates}
end.
%% Handle deprecated delivery from classic queue. This function is to be
%% removed when feature flag classic_queue_type_delivery_support becomes required.
handle_deprecated_delivery({deliver, ?CONSUMER_TAG, AckRequired, Msg}, PState) ->
{ok, deliver_one_to_client(Msg, AckRequired, PState)}.
handle_queue_event({queue_event, QName, Evt},
PState0 = #proc_state{queue_states = QStates0,
unacked_client_pubs = U0}) ->

View File

@ -128,9 +128,6 @@ handle_cast(QueueEvent = {queue_event, _, _},
State = #state{proc_state = PState}) ->
callback_reply(State, rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState));
handle_cast(Delivery = {deliver, _, _, _}, State = #state{proc_state = PState}) ->
callback_reply(State, rabbit_mqtt_processor:handle_deprecated_delivery(Delivery, PState));
handle_cast(Msg, State) ->
{stop, {mqtt_unexpected_cast, Msg}, State}.
@ -207,7 +204,7 @@ handle_info({ra_event, _From, Evt},
PState = rabbit_mqtt_processor:handle_ra_event(Evt, PState0),
{noreply, pstate(State, PState), ?HIBERNATE_AFTER};
handle_info({'DOWN', _MRef, process, _Pid, _Reason} = Evt,
handle_info({{'DOWN', _QName}, _MRef, process, _Pid, _Reason} = Evt,
#state{proc_state = PState0} = State) ->
PState = rabbit_mqtt_processor:handle_down(Evt, PState0),
{noreply, pstate(State, PState), ?HIBERNATE_AFTER};

View File

@ -36,9 +36,6 @@
keepalive :: rabbit_mqtt_keepalive:state()
}).
%%TODO move from deprecated callback results to new callback results
%% see cowboy_websocket.erl
%%TODO call rabbit_networking:register_non_amqp_connection/1 so that we are notified
%% when need to force load the 'connection_created' event for the management plugin, see
%% https://github.com/rabbitmq/rabbitmq-management-agent/issues/58
@ -170,10 +167,6 @@ websocket_info({'$gen_cast', QueueEvent = {queue_event, _, _}},
[State#state.conn_name, Reason]),
stop(State#state{proc_state = PState})
end;
websocket_info({'$gen_cast', Delivery = {deliver, _, _, _}},
State = #state{proc_state = PState0}) ->
{ok, PState} = rabbit_mqtt_processor:handle_deprecated_delivery(Delivery, PState0),
{[], State#state{proc_state = PState}, hibernate};
websocket_info({'$gen_cast', duplicate_id}, State = #state{ proc_state = ProcState,
conn_name = ConnName }) ->
rabbit_log_connection:warning("Web MQTT disconnecting a client with duplicate ID '~s' (~p)",

View File

@ -275,7 +275,9 @@ def rabbitmq_integration_suite(
# user_limits
# Starting from 3.12.0:
# feature_flags_v2
"RABBITMQ_FEATURE_FLAGS": "quorum_queue,implicit_default_bindings,virtual_host_metadata,maintenance_mode_status,user_limits,feature_flags_v2",
# stream_queue
# classic_queue_type_delivery_support
"RABBITMQ_FEATURE_FLAGS": "quorum_queue,implicit_default_bindings,virtual_host_metadata,maintenance_mode_status,user_limits,feature_flags_v2,stream_queue,classic_queue_type_delivery_support",
"RABBITMQ_RUN": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/rabbitmq-for-tests-run".format(package),
"RABBITMQCTL": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/broker-for-tests-home/sbin/rabbitmqctl".format(package),
"RABBITMQ_PLUGINS": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/broker-for-tests-home/sbin/rabbitmq-plugins".format(package),