Reduce ETS copy overhead when delivering to target queues (#14570)

* Reduce ETS copy overhead when delivering to target queues

 ## What?
This commit avoids copying the full amqqueue record from ETS per incoming message
and target queue.
The amqqueue record contains 21 elements and for some queue types,
especially streams, some elements are themselves nested terms.

 ## How?

In Khepri, use a new `rabbit_khepri_queue_target` projection which
contains a subset of the full amqqueue record.

This way all relevant information to deliver to a target queue can be
looked up in a single ets:lookup_element call.

Alternative approaches are described in https://github.com/erlang/otp/issues/10211

 ## Benchmark

Fanout to 3 streams

Start broker:
```
make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \
    FULL=1 \
    RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \
    RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \
    PLUGINS="rabbitmq_management"
```

`high-credit.config` contains:
```
[
 {rabbit, [
  %% Maximum incoming-window of AMQP 1.0 session.
  %% Default: 400
  {max_incoming_window, 5000},

  %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender.
  %% Default: 128
  {max_link_credit, 2000},

  %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue.
  %% Default: 256
  {max_queue_credit, 5000},

  {loopback_users, []}
 ]},

 {rabbitmq_management_agent, [
  {disable_metrics_collector, true}
 ]}
].
```

Create the 3 streams and bindings to the fanout exchange:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3

```

Start the client:
```
quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4
```

`main` branch:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 16.3 seconds
Message rate ......................................... 61,237 messages/s
```

with this PR:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 14.2 seconds
Message rate ......................................... 70,309 messages/s
```

Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%.

* Avoid creating 5 elems tuple

* Simplify rabbit_queue_type callbacks

deliver should only take targets and init should only take the full record

* Fix flaky test

* Fix specs
This commit is contained in:
David Ansari 2025-09-25 11:25:09 +02:00 committed by GitHub
parent ccd384c974
commit 2e75bc6eb5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 252 additions and 178 deletions

View File

@ -14,6 +14,7 @@
new/9,
new_with_version/9,
new_with_version/10,
new_target/2,
fields/0,
fields/1,
field_vhost/0,
@ -39,6 +40,7 @@
% options
get_options/1,
set_options/2,
get_extra_bcc/1,
% pid
get_pid/1,
set_pid/2,
@ -77,13 +79,15 @@
qnode/1,
to_printable/1,
to_printable/2,
macros/0]).
macros/0
]).
-define(record_version, amqqueue_v2).
-define(is_backwards_compat_classic(T),
(T =:= classic orelse T =:= ?amqqueue_v1_type)).
-type amqqueue_options() :: map() | ets:match_pattern().
-type extra_bcc() :: rabbit_misc:resource_name() | none.
-record(amqqueue, {
%% immutable
@ -120,6 +124,17 @@
type_state = #{} :: map() | ets:match_pattern()
}).
%% A subset of the amqqueue record containing just the necessary fields
%% to deliver a message to a target queue.
-record(queue_target,
{name :: rabbit_amqqueue:name(),
target :: {rabbit_queue_type:queue_type(),
pid() | ra_server_id() | none,
extra_bcc()}
}).
-opaque target() :: #queue_target{}.
-type amqqueue() :: amqqueue_v2().
-type amqqueue_v2() :: #amqqueue{
name :: rabbit_amqqueue:name(),
@ -175,6 +190,7 @@
amqqueue_v2/0,
amqqueue_pattern/0,
amqqueue_v2_pattern/0,
target/0,
ra_server_id/0]).
-spec new(rabbit_amqqueue:name(),
@ -328,6 +344,15 @@ new_with_version(?record_version,
options = Options,
type = ensure_type_compat(Type)}.
-spec new_target(rabbit_amqqueue:name(),
{rabbit_queue_type:queue_type(),
pid() | ra_server_id() | none,
extra_bcc()}) ->
target().
new_target(Name, Target) when tuple_size(Target) =:= 3 ->
#queue_target{name = Name,
target = Target}.
-spec is_amqqueue(any()) -> boolean().
is_amqqueue(#amqqueue{}) -> true.
@ -361,15 +386,21 @@ set_arguments(#amqqueue{} = Queue, Args) ->
% options
-spec get_options(amqqueue()) -> amqqueue_options().
get_options(#amqqueue{options = Options}) ->
Options.
-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().
set_options(#amqqueue{} = Queue, Options) ->
Queue#amqqueue{options = Options}.
-spec get_extra_bcc(amqqueue() | target()) ->
extra_bcc().
get_extra_bcc(#amqqueue{options = #{extra_bcc := ExtraBcc}}) ->
ExtraBcc;
get_extra_bcc(#amqqueue{}) ->
none;
get_extra_bcc(#queue_target{target = {_Type, _Pid, ExtraBcc}}) ->
ExtraBcc.
% decorators
@ -418,9 +449,10 @@ set_operator_policy(#amqqueue{} = Queue, Policy) ->
% name
-spec get_name(amqqueue()) -> rabbit_amqqueue:name().
-spec get_name(amqqueue() | target()) -> rabbit_amqqueue:name().
get_name(#amqqueue{name = Name}) -> Name.
get_name(#amqqueue{name = Name}) -> Name;
get_name(#queue_target{name = Name}) -> Name.
-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue().
@ -429,9 +461,10 @@ set_name(#amqqueue{} = Queue, Name) ->
% pid
-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
-spec get_pid(amqqueue_v2() | target()) -> pid() | ra_server_id() | none.
get_pid(#amqqueue{pid = Pid}) -> Pid.
get_pid(#amqqueue{pid = Pid}) -> Pid;
get_pid(#queue_target{target = {_Type, Pid, _ExtraBcc}}) -> Pid.
-spec set_pid(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2().
@ -488,9 +521,10 @@ set_state(#amqqueue{} = Queue, State) ->
%% New in v2.
-spec get_type(amqqueue()) -> atom().
-spec get_type(amqqueue() | target()) -> atom().
get_type(#amqqueue{type = Type}) -> Type.
get_type(#amqqueue{type = Type}) -> Type;
get_type(#queue_target{target = {Type, _Pid, _ExtraBcc}}) -> Type.
-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined.
@ -629,8 +663,6 @@ to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) ->
<<"virtual_host">> => VHost,
<<"type">> => Type}.
% private
macros() ->
io:format(
"-define(is_~ts(Q), is_record(Q, amqqueue, ~b)).~n~n",

View File

@ -2528,7 +2528,7 @@ incoming_link_transfer(
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
Opts = #{correlation => {HandleInt, DeliveryId}},
Qs0 = rabbit_amqqueue:lookup_many(QNames),
Qs0 = rabbit_db_queue:get_targets(QNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
Mc = ensure_mc_cluster_compat(Mc2),
case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of
@ -2674,8 +2674,8 @@ process_routing_confirm([], _SenderSettles = false, DeliveryId, U) ->
Disposition = released(DeliveryId),
{U, [Disposition]};
process_routing_confirm([_|_] = Qs, SenderSettles, DeliveryId, U0) ->
QNames = rabbit_amqqueue:queue_names(Qs),
false = maps:is_key(DeliveryId, U0),
QNames = rabbit_amqqueue:queue_names(Qs),
Map = maps:from_keys(QNames, ok),
U = U0#{DeliveryId => {Map, SenderSettles, false}},
rabbit_global_counters:messages_routed(?PROTOCOL, map_size(Map)),

View File

@ -11,7 +11,7 @@
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1]).
-export([pseudo_queue/2, pseudo_queue/3]).
-export([exists/1, lookup/1, lookup/2, lookup_many/1, lookup_durable_queue/1,
-export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1,
not_found_or_absent_dirty/1,
with/2, with/3, with_or_die/2,
assert_equivalence/5,
@ -367,14 +367,6 @@ lookup(Name) when is_record(Name, resource) ->
lookup_durable_queue(QName) ->
rabbit_db_queue:get_durable(QName).
-spec lookup_many(rabbit_exchange:route_return()) ->
[amqqueue:amqqueue() | {amqqueue:amqqueue(), route_infos()}].
lookup_many([]) ->
%% optimisation
[];
lookup_many(Names) when is_list(Names) ->
rabbit_db_queue:get_many(Names).
-spec lookup(binary(), binary()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
rabbit_types:error('not_found').
@ -2051,68 +2043,57 @@ get_quorum_nodes(Q) ->
end.
-spec prepend_extra_bcc(Qs) ->
Qs when Qs :: [amqqueue:amqqueue() |
{amqqueue:amqqueue(), route_infos()}].
Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}].
prepend_extra_bcc([]) ->
[];
prepend_extra_bcc([Q0] = Qs) ->
Q = queue(Q0),
case amqqueue:get_options(Q) of
#{extra_bcc := BCCName} ->
case get_bcc_queue(Q, BCCName) of
{ok, BCCQueue} ->
[BCCQueue | Qs];
{error, not_found} ->
Qs
end;
_ ->
Qs
case amqqueue:get_extra_bcc(Q) of
none ->
Qs;
Name ->
lookup_extra_bcc(Q, Name) ++ Qs
end;
prepend_extra_bcc(Qs) ->
BCCQueues =
lists:filtermap(
ExtraQs = lists:filtermap(
fun(Q0) ->
Q = queue(Q0),
case amqqueue:get_options(Q) of
#{extra_bcc := BCCName} ->
case get_bcc_queue(Q, BCCName) of
{ok, BCCQ} ->
{true, BCCQ};
{error, not_found} ->
false
end;
_ ->
case amqqueue:get_extra_bcc(Q) of
none ->
false;
Name ->
case lookup_extra_bcc(Q, Name) of
[ExtraQ] ->
{true, ExtraQ};
[] ->
false
end
end
end, Qs),
lists:usort(BCCQueues) ++ Qs.
lists:usort(ExtraQs) ++ Qs.
-spec queue(Q | {Q, route_infos()}) ->
Q when Q :: amqqueue:amqqueue().
queue(Q)
when ?is_amqqueue(Q) ->
Q when Q :: amqqueue:target().
queue({Q, RouteInfos}) when is_map(RouteInfos) ->
Q;
queue({Q, RouteInfos})
when ?is_amqqueue(Q) andalso is_map(RouteInfos) ->
queue(Q) ->
Q.
-spec queue_names([Q | {Q, route_infos()}]) ->
[name()] when Q :: amqqueue:amqqueue().
queue_names(Queues)
when is_list(Queues) ->
lists:map(fun(Q) when ?is_amqqueue(Q) ->
[name()] when Q :: amqqueue:target().
queue_names(Queues) ->
lists:map(fun({Q, RouteInfos}) when is_map(RouteInfos) ->
amqqueue:get_name(Q);
({Q, RouteInfos})
when ?is_amqqueue(Q) andalso is_map(RouteInfos) ->
(Q) ->
amqqueue:get_name(Q)
end, Queues).
-spec get_bcc_queue(amqqueue:amqqueue(), binary()) ->
{ok, amqqueue:amqqueue()} | {error, not_found}.
get_bcc_queue(Q, BCCName) ->
-spec lookup_extra_bcc(amqqueue:target(), binary()) ->
[amqqueue:target()].
lookup_extra_bcc(Q, BCCName) ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
BCCQueueName = rabbit_misc:r(VHost, queue, BCCName),
lookup(BCCQueueName).
rabbit_db_queue:get_targets([BCCQueueName]).
is_queue_args_combination_permitted(Q) ->
Durable = amqqueue:is_durable(Q),

View File

@ -1222,7 +1222,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_user_id_header(Message0, User),
Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx),
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
Queues = rabbit_amqqueue:lookup_many(QNames),
Queues = rabbit_db_queue:get_targets(QNames),
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
Username, TraceState),
%% TODO: call delivery_to_queues with plain args
@ -2126,7 +2126,12 @@ deliver_to_queues(XName,
rabbit_misc:protocol_error(
resource_error,
"Stream coordinator unavailable for ~ts",
[rabbit_misc:rs(Resource)])
[rabbit_misc:rs(Resource)]);
{error, Reason} ->
rabbit_misc:protocol_error(
resource_error,
"failed to deliver message: ~tp",
[Reason])
end.
process_routing_mandatory(_Mandatory = true,

View File

@ -295,8 +295,8 @@ init(Q) when ?amqqueue_is_classic(Q) ->
close(_State) ->
ok.
-spec update(amqqueue:amqqueue(), state()) -> state().
update(Q, #?STATE{pid = Pid} = State) when ?amqqueue_is_classic(Q) ->
-spec update(amqqueue:amqqueue() | amqqueue:target(), state()) -> state().
update(Q, #?STATE{pid = Pid} = State) ->
case amqqueue:get_pid(Q) of
Pid ->
State;
@ -473,10 +473,10 @@ settlement_action(Type, QRef, MsgSeqs, Acc) ->
supports_stateful_delivery() -> true.
-spec deliver([{amqqueue:amqqueue(), state()}],
-spec deliver([{amqqueue:target(), state()}],
Delivery :: mc:state(),
rabbit_queue_type:delivery_options()) ->
{[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.
{[{amqqueue:target(), state()}], rabbit_queue_type:actions()}.
deliver(Qs0, Msg0, Options) ->
%% add guid to content here instead of in rabbit_basic:message/3,
%% as classic queues are the only ones that need it

View File

@ -18,7 +18,7 @@
-export([
get/1,
get_many/1,
get_targets/1,
get_all/0,
get_all/1,
get_all_by_type/1,
@ -85,6 +85,7 @@
-define(MNESIA_DURABLE_TABLE, rabbit_durable_queue).
-define(KHEPRI_PROJECTION, rabbit_khepri_queue).
-define(KHEPRI_TARGET_PROJECTION, rabbit_khepri_queue_target).
%% -------------------------------------------------------------------
%% get_all().
@ -469,58 +470,62 @@ internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) ->
rabbit_db_binding:delete_for_destination_in_mnesia(QueueName, OnlyDurable).
%% -------------------------------------------------------------------
%% get_many().
%% get_targets().
%% -------------------------------------------------------------------
-spec get_many(rabbit_exchange:route_return()) ->
[amqqueue:amqqueue() | {amqqueue:amqqueue(), rabbit_exchange:route_infos()}].
get_many(Names) when is_list(Names) ->
-spec get_targets(rabbit_exchange:route_return()) ->
[amqqueue:target() | {amqqueue:target(), rabbit_exchange:route_infos()}].
get_targets(Names) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> get_many_in_ets(?MNESIA_TABLE, Names) end,
khepri => fun() -> get_many_in_khepri(Names) end
#{mnesia => fun() -> lookup_targets(mnesia, Names) end,
khepri => fun() -> lookup_targets(khepri, Names) end
}).
get_many_in_khepri(Names) ->
try
get_many_in_ets(?KHEPRI_PROJECTION, Names)
catch
error:badarg ->
[]
end.
get_many_in_ets(Table, [{Name, RouteInfos}])
when is_map(RouteInfos) ->
case ets_lookup(Table, Name) of
[] -> [];
[Q] -> [{Q, RouteInfos}]
end;
get_many_in_ets(Table, [Name]) ->
ets_lookup(Table, Name);
get_many_in_ets(Table, Names) when is_list(Names) ->
lookup_targets(Store, Names) ->
lists:filtermap(fun({Name, RouteInfos})
when is_map(RouteInfos) ->
case ets_lookup(Table, Name) of
[] -> false;
[Q] -> {true, {Q, RouteInfos}}
case lookup_target(Store, Name) of
not_found -> false;
Target -> {true, {Target, RouteInfos}}
end;
(Name) ->
case ets_lookup(Table, Name) of
[] -> false;
[Q] -> {true, Q}
case lookup_target(Store, Name) of
not_found -> false;
Target -> {true, Target}
end
end, Names).
ets_lookup(Table, QName = #resource{name = QNameBin}) ->
case rabbit_volatile_queue:is(QNameBin) of
lookup_target(Store, #resource{name = NameBin} = Name) ->
case rabbit_volatile_queue:is(NameBin) of
true ->
%% This queue record is not stored in the database.
%% We create it on the fly.
case rabbit_volatile_queue:new(QName) of
error -> [];
Q -> [Q]
%% This queue is not stored in the database. We create it on the fly.
case rabbit_volatile_queue:new_target(Name) of
error -> not_found;
Target -> Target
end;
false ->
ets:lookup(Table, QName)
lookup_target0(Store, Name)
end.
lookup_target0(khepri, Name) ->
try ets:lookup_element(?KHEPRI_TARGET_PROJECTION, Name, 2, not_found) of
not_found ->
not_found;
Target ->
amqqueue:new_target(Name, Target)
catch
error:badarg ->
not_found
end;
lookup_target0(mnesia, Name) ->
case ets:lookup(?MNESIA_TABLE, Name) of
[] ->
not_found;
[Q] ->
Type = amqqueue:get_type(Q),
Pid = amqqueue:get_pid(Q),
ExtraBcc = amqqueue:get_extra_bcc(Q),
amqqueue:new_target(Name, {Type, Pid, ExtraBcc})
end.
%% -------------------------------------------------------------------
@ -610,6 +615,14 @@ get_many_durable_in_khepri(Names) ->
[]
end.
get_many_in_ets(Table, Names) ->
lists:filtermap(fun(Name) ->
case ets:lookup(Table, Name) of
[] -> false;
[Q] -> {true, Q}
end
end, Names).
%% -------------------------------------------------------------------
%% update().
%% -------------------------------------------------------------------

View File

@ -44,7 +44,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
Routed0 = rabbit_exchange:route(DLX, DLMsg, #{return_binding_keys => true}),
{Cycles, Routed} = detect_cycles(Reason, DLMsg, Routed0),
lists:foreach(fun log_cycle_once/1, Cycles),
Qs0 = rabbit_amqqueue:lookup_many(Routed),
Qs0 = rabbit_db_queue:get_targets(Routed),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
_ = rabbit_queue_type:deliver(Qs, DLMsg, #{}, stateless),
ok.

View File

@ -342,7 +342,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
{Cycles, RouteToQs1} = rabbit_dead_letter:detect_cycles(
Reason, Msg, RouteToQs0),
State1 = log_cycles(Cycles, [RKey], State0),
RouteToQs2 = rabbit_amqqueue:lookup_many(RouteToQs1),
RouteToQs2 = rabbit_db_queue:get_targets(RouteToQs1),
RouteToQs = rabbit_amqqueue:prepend_extra_bcc(RouteToQs2),
State2 = case RouteToQs of
[] ->
@ -496,7 +496,7 @@ redeliver0(#pending{delivery = Msg0,
%% queues that do not exist. Therefore, filter out non-existent target queues.
RouteToQs0 = queue_names(
rabbit_amqqueue:prepend_extra_bcc(
rabbit_amqqueue:lookup_many(
rabbit_db_queue:get_targets(
rabbit_exchange:route(DLX, Msg)))),
case {RouteToQs0, Settled} of
{[], [_|_]} ->
@ -529,7 +529,10 @@ redeliver0(#pending{delivery = Msg0,
rejected = []},
State = State0#state{pendings = maps:update(OutSeq, Pend, Pendings)},
Options = #{correlation => OutSeq},
deliver_to_queues(Msg, Options, rabbit_amqqueue:lookup_many(RouteToQs), State)
deliver_to_queues(Msg,
Options,
rabbit_db_queue:get_targets(RouteToQs),
State)
end
end.
@ -569,8 +572,7 @@ cancel_timer(#state{timer = TRef} = State)
cancel_timer(State) ->
State.
queue_names(Qs)
when is_list(Qs) ->
queue_names(Qs) ->
lists:map(fun amqqueue:get_name/1, Qs).
format_status(#{state := #state{

View File

@ -1310,6 +1310,7 @@ delete_or_fail(Path) ->
register_projections() ->
RegFuns = [fun register_rabbit_exchange_projection/0,
fun register_rabbit_queue_projection/0,
fun register_rabbit_queue_target_projection/0,
fun register_rabbit_vhost_projection/0,
fun register_rabbit_users_projection/0,
fun register_rabbit_global_runtime_parameters_projection/0,
@ -1351,7 +1352,25 @@ register_rabbit_queue_projection() ->
_VHost = ?KHEPRI_WILDCARD_STAR,
_Name = ?KHEPRI_WILDCARD_STAR),
KeyPos = 2, %% #amqqueue.name
register_simple_projection(Name, PathPattern, KeyPos, true).
register_simple_projection(Name, PathPattern, KeyPos, false).
%% This projection exists to avoid looking up the full amqqueue record
%% per message delivered to a target queue.
register_rabbit_queue_target_projection() ->
PathPattern = rabbit_db_queue:khepri_queue_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
_Name = ?KHEPRI_WILDCARD_STAR),
Fun = fun(_Path, Q) ->
Name = amqqueue:get_name(Q),
Type = amqqueue:get_type(Q),
Pid = amqqueue:get_pid(Q),
ExtraBcc = amqqueue:get_extra_bcc(Q),
{Name, {Type, Pid, ExtraBcc}}
end,
Opts = #{keypos => 1,
read_concurrency => true},
Projection = khepri_projection:new(rabbit_khepri_queue_target, Fun, Opts),
khepri:register_projection(?STORE_ID, PathPattern, Projection).
register_rabbit_vhost_projection() ->
Name = rabbit_khepri_vhost,

View File

@ -208,8 +208,9 @@
{ok, queue_state()} | {error, Reason :: term()}.
-callback close(queue_state()) -> ok.
%% update the queue type state from amqqrecord
-callback update(amqqueue:amqqueue(), queue_state()) -> queue_state().
-callback update(amqqueue:amqqueue() | amqqueue:target(), queue_state()) ->
queue_state().
-callback consume(amqqueue:amqqueue(),
consume_spec(),
@ -232,10 +233,10 @@
-callback supports_stateful_delivery() -> boolean().
-callback deliver([{amqqueue:amqqueue(), queue_state()}],
-callback deliver([{amqqueue:target(), queue_state()}],
Message :: mc:state(),
Options :: delivery_options()) ->
{[{amqqueue:amqqueue(), queue_state()}], actions()}.
{[{amqqueue:target(), queue_state()}], actions()}.
-callback settle(queue_name(), settle_op(), rabbit_types:ctag(),
[non_neg_integer()], queue_state()) ->
@ -621,12 +622,12 @@ publish_at_most_once(#resource{} = XName, Msg) ->
publish_at_most_once(X, Msg)
when element(1, X) == exchange -> % hacky but good enough
QNames = rabbit_exchange:route(X, Msg, #{return_binding_keys => true}),
Qs = rabbit_amqqueue:lookup_many(QNames),
Qs = rabbit_db_queue:get_targets(QNames),
_ = deliver(Qs, Msg, #{}, stateless),
ok.
-spec deliver([amqqueue:amqqueue() |
{amqqueue:amqqueue(), rabbit_exchange:route_infos()}],
-spec deliver([amqqueue:target() |
{amqqueue:target(), rabbit_exchange:route_infos()}],
Message :: mc:state(),
delivery_options(),
stateless | state()) ->
@ -688,14 +689,13 @@ deliver0(Qs, Message0, Options, #?STATE{} = State0) ->
end, State0, Xs),
{ok, State, Actions}.
queue_binding_keys(Q)
when ?is_amqqueue(Q) ->
{Q, #{}};
queue_binding_keys({Q, #{binding_keys := BindingKeys}})
when ?is_amqqueue(Q) andalso is_map(BindingKeys) ->
when is_map(BindingKeys) ->
{Q, BindingKeys};
queue_binding_keys({Q, _RouteInfos})
when ?is_amqqueue(Q) ->
queue_binding_keys({Q, RouteInfos})
when is_map(RouteInfos) ->
{Q, #{}};
queue_binding_keys(Q) ->
{Q, #{}}.
add_binding_keys(Message, BindingKeys)
@ -775,9 +775,15 @@ removed_from_rabbit_registry(_Type) -> ok.
get_ctx(QOrQref, State) ->
get_ctx_with(QOrQref, State, undefined).
get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState)
when ?is_amqqueue(Q) ->
Ref = qref(Q),
get_ctx_with(#resource{kind = queue} = QRef, Contexts, undefined) ->
case get_ctx(QRef, Contexts, undefined) of
undefined ->
exit({queue_context_not_found, QRef});
Ctx ->
Ctx
end;
get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) ->
Ref = amqqueue:get_name(Q),
case Contexts of
#{Ref := #ctx{module = Mod,
state = State} = Ctx} ->
@ -785,25 +791,20 @@ get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState)
_ when InitState == undefined ->
%% not found and no initial state passed - initialize new state
Mod = amqqueue:get_type(Q),
case Mod:init(Q) of
{error, Reason} ->
exit({Reason, Ref});
{ok, QState} ->
maybe
{ok, Q1} ?= to_queue(Q),
{ok, QState} ?= Mod:init(Q1),
#ctx{module = Mod,
state = QState}
else
{error, Reason} ->
exit({Reason, Ref})
end;
_ ->
%% not found - initialize with supplied initial state
Mod = amqqueue:get_type(Q),
#ctx{module = Mod,
state = InitState}
end;
get_ctx_with(#resource{kind = queue} = QRef, Contexts, undefined) ->
case get_ctx(QRef, Contexts, undefined) of
undefined ->
exit({queue_context_not_found, QRef});
Ctx ->
Ctx
end.
get_ctx(QRef, #?STATE{ctxs = Contexts}, Default) ->
@ -817,9 +818,15 @@ set_ctx(QRef, Ctx, #?STATE{ctxs = Contexts} = State) ->
qref(#resource{kind = queue} = QName) ->
QName;
qref(Q) when ?is_amqqueue(Q) ->
qref(Q) ->
amqqueue:get_name(Q).
to_queue(Q) when ?is_amqqueue(Q) ->
{ok, Q};
to_queue(Target) ->
QName = amqqueue:get_name(Target),
rabbit_amqqueue:lookup(QName).
-spec known_queue_type_modules() -> [module()].
known_queue_type_modules() ->
Registered = rabbit_registry:lookup_all(queue),

View File

@ -234,10 +234,10 @@ init(Q) when ?is_amqqueue(Q) ->
close(State) ->
rabbit_fifo_client:close(State).
-spec update(amqqueue:amqqueue(), rabbit_fifo_client:state()) ->
-spec update(amqqueue:amqqueue() | amqqueue:target(), rabbit_fifo_client:state()) ->
rabbit_fifo_client:state().
update(Q, State) when ?amqqueue_is_quorum(Q) ->
%% QQ state maintains it's own updates
update(_Q, State) ->
%% QQ state maintains its own updates
State.
-spec handle_event(rabbit_amqqueue:name(),
@ -1140,7 +1140,6 @@ deliver(QSs, Msg0, Options) ->
end
end, {[], []}, QSs).
state_info(S) ->
#{pending_raft_commands => rabbit_fifo_client:pending_size(S),
cached_segments => rabbit_fifo_client:num_cached_segments(S)}.

View File

@ -169,12 +169,8 @@ restart_stream(QRes) ->
{ok, node()} |
{error, term()} |
{timeout, term()}.
restart_stream(QRes, Options)
when element(1, QRes) == resource ->
restart_stream(hd(rabbit_amqqueue:lookup_many([QRes])), Options);
restart_stream(Q, Options)
when ?is_amqqueue(Q) andalso
?amqqueue_is_stream(Q) ->
when ?amqqueue_is_stream(Q) ->
?LOG_INFO("restarting stream ~s in vhost ~s with options ~p",
[maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]),
#{name := StreamId} = amqqueue:get_type_state(Q),
@ -183,6 +179,13 @@ restart_stream(Q, Options)
{ok, node(LeaderPid)};
Err ->
Err
end;
restart_stream(QRes, Options) ->
case rabbit_amqqueue:lookup(QRes) of
{ok, Q} ->
restart_stream(Q, Options);
Err ->
Err
end.
delete_stream(Q, ActingUser)

View File

@ -1024,8 +1024,7 @@ close(#stream_client{readers = Readers,
rabbit_core_metrics:consumer_deleted(self(), CTag, QName)
end, Readers).
update(Q, State)
when ?is_amqqueue(Q) ->
update(_Q, State) ->
State.
update_leader_pid(Pid, #stream_client{leader = Pid} = State) ->

View File

@ -16,6 +16,7 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-export([new/1,
new_target/1,
new_name/0,
is/1,
key_from_name/1,
@ -94,6 +95,16 @@ new(#resource{virtual_host = Vhost,
new0(Name, Pid, Vhost) ->
amqqueue:new(Name, Pid, false, true, none, [], Vhost, #{}, ?MODULE).
-spec new_target(rabbit_amqqueue:name()) ->
amqqueue:target() | error.
new_target(#resource{name = NameBin} = Name) ->
case pid_from_name(NameBin) of
{ok, Pid} when is_pid(Pid) ->
amqqueue:new_target(Name, {?MODULE, Pid, none});
_ ->
error
end.
-spec is(rabbit_misc:resource_name()) ->
boolean().
is(<<?PREFIX, _/binary>>) ->

View File

@ -31,7 +31,7 @@ all_tests() ->
[
create_or_get,
get,
get_many,
get_targets,
get_all,
get_all_by_vhost,
get_all_by_type,
@ -131,20 +131,24 @@ get1(_Config) ->
rabbit_db_queue:get(rabbit_misc:r(?VHOST, queue, <<"test-queue2">>))),
passed.
get_many(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_many1, [Config]).
get_targets(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_targets1, [Config]).
get_many1(_Config) ->
get_targets1(_Config) ->
QName = rabbit_misc:r(?VHOST, queue, <<"test-queue">>),
QName2 = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
Q = new_queue(QName, rabbit_classic_queue),
Q2 = new_queue(QName2, rabbit_classic_queue),
ok = rabbit_db_queue:set(Q),
?assertEqual([Q], rabbit_db_queue:get_many([QName])),
?assertEqual([Q], rabbit_db_queue:get_many([QName, QName2])),
?assertEqual([], rabbit_db_queue:get_many([QName2])),
Target = {rabbit_classic_queue, none, none},
QTarget = amqqueue:new_target(QName, Target),
QTarget2 = amqqueue:new_target(QName2, Target),
?assertEqual([QTarget], rabbit_db_queue:get_targets([QName])),
?assertEqual([QTarget], rabbit_db_queue:get_targets([QName, QName2])),
?assertEqual([], rabbit_db_queue:get_targets([QName2])),
ok = rabbit_db_queue:set(Q2),
?assertEqual(lists:sort([Q, Q2]), lists:sort(rabbit_db_queue:get_many([QName, QName2]))),
?assertEqual(lists:sort([QTarget, QTarget2]),
lists:sort(rabbit_db_queue:get_targets([QName, QName2]))),
passed.
get_all(Config) ->

View File

@ -214,7 +214,7 @@ queue_on_other_node(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
{ok, Chan} = amqp_connection:open_channel(Conn),
_ = queue_declare(Chan, <<"some-queue">>),
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
eventually(?_assertEqual(1, length(http_get(Config, "/queues/%2F"))), 2000, 5),
{ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)),
consume(Chan2, <<"some-queue">>),
@ -899,7 +899,6 @@ wait_for_queue(Config, Path, Keys) ->
wait_for_queue(_Config, Path, Keys, 0) ->
exit({timeout, {Path, Keys}});
wait_for_queue(Config, Path, Keys, Count) ->
Res = http_get(Config, Path),
case present(Keys, Res) of

View File

@ -1697,7 +1697,7 @@ deliver_to_queues(Message,
RoutedToQNames,
State0 = #state{queue_states = QStates0,
cfg = #cfg{proto_ver = ProtoVer}}) ->
Qs0 = rabbit_amqqueue:lookup_many(RoutedToQNames),
Qs0 = rabbit_db_queue:get_targets(RoutedToQNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
case rabbit_queue_type:deliver(Qs, Message, Options, QStates0) of
{ok, QStates, Actions} ->

View File

@ -133,7 +133,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
supports_stateful_delivery() ->
false.
-spec deliver([{amqqueue:amqqueue(), stateless}],
-spec deliver([{amqqueue:target(), stateless}],
Msg :: mc:state(),
rabbit_queue_type:delivery_options()) ->
{[], rabbit_queue_type:actions()}.

View File

@ -91,12 +91,12 @@ add_binding(_Tx, #exchange{ name = XName },
{ok, X} ->
Msgs = get_msgs_from_cache(XName),
[begin
Qs = rabbit_exchange:route(X, Msg),
case rabbit_amqqueue:lookup_many(Qs) of
QNames = rabbit_exchange:route(X, Msg),
case rabbit_db_queue:get_targets(QNames) of
[] ->
destination_not_found_error(Qs);
QPids ->
deliver_messages(QPids, [Msg])
destination_not_found_error(QNames);
Qs ->
deliver_messages(Qs, [Msg])
end
end || Msg <- Msgs]
end,

View File

@ -382,7 +382,7 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
end,
Msg = set_annotations(Msg0, Dest),
RoutedQNames = route(Msg, Dest),
Queues = rabbit_amqqueue:lookup_many(RoutedQNames),
Queues = rabbit_db_queue:get_targets(RoutedQNames),
messages_received(AckMode),
case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of
{ok, QState1, Actions} ->