Support Direct Reply-To for AMQP 1.0

# What?
* Support Direct Reply-To for AMQP 1.0
* Compared to AMQP 0.9.1, this PR allows for multiple volatile queues on a single
  AMQP 1.0 session. Use case: JMS clients can create multiple temporary queues on
  the same JMS/AMQP session:
  * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue()
  * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue()
* Fix missing metrics in for Direct Reply-To in AMQP 0.9.1, e.g.
  `messages_delivered_total`
* Fix missing metrics (even without using Direct Reply-To ) in AMQP 0.9.1:
  If stats level is not `fine`, global metrics `rabbitmq_global_messages_delivered_*` should still be incremented.

 # Why?
* Allow for scalable at-most-once RPC reply delivery
  Example use case: thousands of requesters connect, send a single
  request, wait for a single reply, and disconnect.
  This PR won't create any queue and won't write to the metadata store.
  Therefore, there's less pressure on the metadata store, less pressure
  on the Management API when listing all queues, less pressure on the
  metrics subsystem, etc.
* Feature parity with AMQP 0.9.1

 # How?
This PR extracts the previously channel specific Direct Reply-To code
into a new queue type: `rabbit_volatile_queue`.
"Volatile" describes the semantics, not a use-case. It signals non-durable,
zero-buffer, at-most-once, may-drop, and "not stored in Khepri."

This new queue type is then used for AMQP 1.0 and AMQP 0.9.1.

Sending to the volatile queue is stateless like previously with Direct Reply-To in AMQP 0.9.1 and like done
for the MQTT QoS 0 queue.
This allows for use cases where a single responder replies to e.g. 100k different requesters.

RabbitMQ will automatically auto grant new link-credit to the responder because the new queue type confirms immediately.

The key gets implicitly checked by the channel/session:
If the queue name (including the key) doesn’t exist, the `handle_event` callback for this queue isn’t invoked and therefore
no delivery will be sent to the responder.

This commit supports Direct Reply-To across AMQP 1.0 and 0.9.1. In other
words, the requester can be an AMQP 1.0 client while the responder is an
AMQP 0.9.1 client or vice versa.
RabbitMQ will internally convert between AMQP 0.9.1 `reply_to` and AMQP
1.0 `/queues/<queue>` address. The AMQP 0.9.1 `reply_to` property is
expected to contain a queue name. That's in line with the AMQP 0.9.1
spec:
> One of the standard message properties is Reply-To, which is designed
specifically for carrying the name of reply queues.

Compared to AMQP 0.9.1 where the requester sets the `reply_to` property
to `amq.rabbitmq.reply-to` and RabbitMQ modifies this field when
forwarding the message to the request queue, in AMQP 1.0 the requester
learns about the queue name from the broker at link attachment time.
The requester has to set the reply-to property to the server generated
queue name. That's because the server isn't allowed to modify the bare
message.

During link attachment time, the client has to set certain fields.
These fields are expected to be set by the RabbitMQ client libraries.
Here is an Erlang example:
```erl
Source = #{address => undefined,
           durable => none,
           expiry_policy => <<"link-detach">>,
           dynamic => true,
           capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver">>,
               role => {receiver, Source, self()},
               snd_settle_mode => settled,
               rcv_settle_mode => first},
{ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs),
AddressReplyQ = receive {amqp10_event, {link, Receiver, {attached, Attach}}} ->
                  #'v1_0.attach'{source = #'v1_0.source'{address = {utf8, Addr}}} = Attach,
                  Addr
end,
```

The client then sends the message by setting the reply-to address as
follows:
```erl
amqp10_client:send_msg(
  SenderRequester,
  amqp10_msg:set_properties(
    #{message_id => <<"my ID">>,
      reply_to => AddressReplyQ},
    amqp10_msg:new(<<"tag">>, <<"request">>))),
```

If the responder attaches to the queue target in the reply-to field,
RabbitMQ will check if the requester link is still attached. If the
requester detached, the link will be refused.

The responder can also attach to the anonymous null target and set the
`to` field to the `reply-to` address.

If RabbitMQ cannot deliver a reply, instead of buffering the reply,
RabbitMQ will be drop the reply and increment the following Prometheus metric:
```
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"} 0.0
```
That's in line with the MQTT QoS 0 queue type.

A reply message could be dropped for a variety of reasons:
1. The requester ran out of link-credit. It's therefore the requester's
   responsibility to grant sufficient link-credit on its receiving link.
2. RabbitMQ isn't allowed to deliver any message to due session flow
   control. It's the requster's responsibility to keep the session window
   large enough.
3. The requester doesn't consume messages fast enough causing TCP
   backpressure being applied or the RabbitMQ AMQP writer proc isn't
   scheduled quickly enough. The latter can happen for example if
   RabbitMQ runs with a single scheduler (is assigned a single CPU
   core). In either case, RabbitMQ internal flow control causes the
   volatile queue to drop messages.

Therefore, if high throughput is required while message loss is undesirable, a classic queue should be used
instead of a volatile queue since the former buffers messages while the
latter doesn't.

The main difference between the volatile queue and the MQTT QoS 0 queue
is that the former isn't written to the metadata store.

 # Breaking Change
Prior to this PR the following [documented caveat](https://www.rabbitmq.com/docs/4.0/direct-reply-to#limitations) applied:
> If the RPC server publishes with the mandatory flag set then `amq.rabbitmq.reply-to.*`
is treated as **not** a queue; i.e. if the server only publishes to this name then the message
will be considered "not routed"; a `basic.return` will be sent if the mandatory flag was set.

This PR removes this caveat.
This PR introduces the following new behaviour:
> If the RPC server publishes with the mandatory flag set, then `amq.rabbitmq.reply-to.*`
is treated as a queue (assuming this queue name is encoded correctly). However,
whether the requester is still there to consume the reply is not checked at routing time.
In other words, if the RPC server only publishes to this name, then the message will be
considered "routed" and RabbitMQ will therefore not send a `basic.return`.
This commit is contained in:
David Ansari 2025-08-06 14:57:27 +02:00
parent 81117133be
commit 72cd7a35c2
35 changed files with 2082 additions and 481 deletions

View File

@ -28,6 +28,7 @@ jobs:
- parallel-ct-set-2
- parallel-ct-set-3
- parallel-ct-set-4
- parallel-ct-set-5
- ct-amqp_client
- ct-clustering_management
- eunit ct-dead_lettering

View File

@ -92,8 +92,8 @@
-type max_message_size() :: undefined | non_neg_integer().
-type footer_opt() :: crc32 | adler32.
-type attach_args() :: #{name => binary(),
role => attach_role(),
-type attach_args() :: #{name := binary(),
role := attach_role(),
snd_settle_mode => snd_settle_mode(),
rcv_settle_mode => rcv_settle_mode(),
filter => filter(),
@ -739,13 +739,19 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
make_source(#{role := {sender, _}}) ->
#'v1_0.source'{};
make_source(#{role := {receiver, Source, _Pid},
filter := Filter}) ->
make_source(#{role := {receiver, Source, _Pid}} = AttachArgs) ->
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
ExpiryPolicy = case Source of
#{expiry_policy := Policy} when is_binary(Policy) ->
{symbol, Policy};
_ ->
undefined
end,
Dynamic = maps:get(dynamic, Source, false),
TranslatedFilter = translate_filters(Filter),
TranslatedFilter = translate_filters(maps:get(filter, AttachArgs, #{})),
#'v1_0.source'{address = make_address(Source),
durable = {uint, Durable},
expiry_policy = ExpiryPolicy,
dynamic = Dynamic,
filter = TranslatedFilter,
capabilities = make_capabilities(Source)}.

16
deps/rabbit/Makefile vendored
View File

@ -236,15 +236,19 @@ define ct_master.erl
{ok, Pid2, _} = peer:start(StartOpts#{name => "rabbit_shard2"}),
{ok, Pid3, _} = peer:start(StartOpts#{name => "rabbit_shard3"}),
{ok, Pid4, _} = peer:start(StartOpts#{name => "rabbit_shard4"}),
{ok, Pid5, _} = peer:start(StartOpts#{name => "rabbit_shard5"}),
peer:call(Pid1, net_kernel, set_net_ticktime, [5]),
peer:call(Pid2, net_kernel, set_net_ticktime, [5]),
peer:call(Pid3, net_kernel, set_net_ticktime, [5]),
peer:call(Pid4, net_kernel, set_net_ticktime, [5]),
peer:call(Pid5, net_kernel, set_net_ticktime, [5]),
peer:call(Pid1, persistent_term, put, [rabbit_ct_tcp_port_base, 16000]),
peer:call(Pid2, persistent_term, put, [rabbit_ct_tcp_port_base, 20000]),
peer:call(Pid3, persistent_term, put, [rabbit_ct_tcp_port_base, 24000]),
peer:call(Pid4, persistent_term, put, [rabbit_ct_tcp_port_base, 28000]),
peer:call(Pid5, persistent_term, put, [rabbit_ct_tcp_port_base, 32000]),
[{[_], {ok, Results}}] = ct_master_fork:run("$1"),
peer:stop(Pid5),
peer:stop(Pid4),
peer:stop(Pid3),
peer:stop(Pid2),
@ -258,7 +262,7 @@ endef
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange
@ -276,13 +280,16 @@ PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit ra
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue
PARALLEL_CT_SET_5_A = rabbit_direct_reply_to_prop direct_reply_to_amqpl direct_reply_to_amqp
PARALLEL_CT_SET_1 = $(sort $(PARALLEL_CT_SET_1_A) $(PARALLEL_CT_SET_1_B) $(PARALLEL_CT_SET_1_C) $(PARALLEL_CT_SET_1_D))
PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARALLEL_CT_SET_2_C) $(PARALLEL_CT_SET_2_D))
PARALLEL_CT_SET_3 = $(sort $(PARALLEL_CT_SET_3_A) $(PARALLEL_CT_SET_3_B) $(PARALLEL_CT_SET_3_C) $(PARALLEL_CT_SET_3_D))
PARALLEL_CT_SET_4 = $(sort $(PARALLEL_CT_SET_4_A) $(PARALLEL_CT_SET_4_B) $(PARALLEL_CT_SET_4_C) $(PARALLEL_CT_SET_4_D))
PARALLEL_CT_SET_5 = $(PARALLEL_CT_SET_5_A)
SEQUENTIAL_CT_SUITES = amqp_client clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue rabbit_fifo_prop
PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4)
PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4) $(PARALLEL_CT_SET_5)
ifeq ($(filter-out $(SEQUENTIAL_CT_SUITES) $(PARALLEL_CT_SUITES),$(CT_SUITES)),)
parallel-ct-sanity-check:
@ -308,16 +315,19 @@ define tpl_parallel_ct_test_spec
{node, shard2, 'rabbit_shard2@localhost'}.
{node, shard3, 'rabbit_shard3@localhost'}.
{node, shard4, 'rabbit_shard4@localhost'}.
{node, shard5, 'rabbit_shard5@localhost'}.
{define, 'Set1', [$(call comma_list,$(addsuffix _SUITE,$1))]}.
{define, 'Set2', [$(call comma_list,$(addsuffix _SUITE,$2))]}.
{define, 'Set3', [$(call comma_list,$(addsuffix _SUITE,$3))]}.
{define, 'Set4', [$(call comma_list,$(addsuffix _SUITE,$4))]}.
{define, 'Set5', [$(call comma_list,$(addsuffix _SUITE,$5))]}.
{suites, shard1, "test/", 'Set1'}.
{suites, shard2, "test/", 'Set2'}.
{suites, shard3, "test/", 'Set3'}.
{suites, shard4, "test/", 'Set4'}.
{suites, shard5, "test/", 'Set5'}.
endef
define parallel_ct_set_target
@ -330,7 +340,7 @@ parallel-ct-set-$(1): test-build
$$(call erlang,$$(call ct_master.erl,ct.set-$(1).spec),-sname parallel_ct_$(PROJECT)@localhost -hidden -kernel net_ticktime 5)
endef
$(foreach set,1 2 3 4,$(eval $(call parallel_ct_set_target,$(set))))
$(foreach set,1 2 3 4 5,$(eval $(call parallel_ct_set_target,$(set))))
# --------------------------------------------------------------------
# Compilation.

View File

@ -228,7 +228,14 @@ convert_from(mc_amqp, Sections, Env) ->
%% drop it, what else can we do?
undefined
end,
ReplyTo = case unwrap_shortstr(ReplyTo0) of
<<"/queues/", Queue/binary>> ->
try cow_uri:urldecode(Queue)
catch error:_ -> undefined
end;
Other ->
Other
end,
BP = #'P_basic'{message_id = MsgId091,
delivery_mode = DelMode,
expiration = Expiration,
@ -237,7 +244,7 @@ convert_from(mc_amqp, Sections, Env) ->
[] -> undefined;
AllHeaders -> AllHeaders
end,
reply_to = unwrap_shortstr(ReplyTo0),
reply_to = ReplyTo,
type = Type,
app_id = unwrap_shortstr(GroupId),
priority = Priority,
@ -349,7 +356,7 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
delivery_mode = DelMode,
headers = Headers0,
user_id = UserId,
reply_to = ReplyTo,
reply_to = ReplyTo0,
type = Type,
priority = Priority,
app_id = AppId,
@ -382,6 +389,13 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
ttl = wrap(uint, Ttl),
%% TODO: check Priority is a ubyte?
priority = wrap(ubyte, Priority)},
ReplyTo = case ReplyTo0 of
undefined ->
undefined;
_ ->
Queue = uri_string:quote(ReplyTo0),
{utf8, <<"/queues/", Queue/binary>>}
end,
CorrId = case mc_util:urn_string_to_uuid(CorrId0) of
{ok, CorrUUID} ->
{uuid, CorrUUID};
@ -400,7 +414,7 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
user_id = wrap(binary, UserId),
to = undefined,
% subject = wrap(utf8, RKey),
reply_to = wrap(utf8, ReplyTo),
reply_to = ReplyTo,
correlation_id = CorrId,
content_type = wrap(symbol, ContentType),
content_encoding = wrap(symbol, ContentEncoding),

View File

@ -1,60 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(pid_recomposition).
%% API
-export([
to_binary/1,
from_binary/1,
decompose/1,
recompose/1
]).
-define(TTB_PREFIX, 131).
-define(NEW_PID_EXT, 88).
-define(ATOM_UTF8_EXT, 118).
-define(SMALL_ATOM_UTF8_EXT, 119).
-spec decompose(pid()) -> #{atom() => any()}.
decompose(Pid) ->
from_binary(term_to_binary(Pid, [{minor_version, 2}])).
-spec from_binary(binary()) -> #{atom() => any()}.
from_binary(Bin) ->
<<?TTB_PREFIX, ?NEW_PID_EXT, PidData/binary>> = Bin,
{Node, Rest2} = case PidData of
<<?ATOM_UTF8_EXT, AtomLen:16/integer, Node0:AtomLen/binary, Rest1/binary>> ->
{Node0, Rest1};
<<?SMALL_ATOM_UTF8_EXT, AtomLen/integer, Node0:AtomLen/binary, Rest1/binary>> ->
{Node0, Rest1}
end,
<<ID:32/integer, Serial:32/integer, Creation:32/integer>> = Rest2,
#{
node => binary_to_atom(Node, utf8),
id => ID,
serial => Serial,
creation => Creation
}.
-spec to_binary(#{atom() => any()}) -> binary().
to_binary(#{node := Node, id := ID, serial := Serial, creation := Creation}) ->
BinNode = atom_to_binary(Node),
NodeLen = byte_size(BinNode),
<<?TTB_PREFIX:8/unsigned,
?NEW_PID_EXT:8/unsigned,
?ATOM_UTF8_EXT:8/unsigned,
NodeLen:16/unsigned,
BinNode/binary,
ID:32,
Serial:32,
Creation:32>>.
-spec recompose(#{atom() => any()}) -> pid().
recompose(M) ->
binary_to_term(to_binary(M)).

View File

@ -1157,7 +1157,6 @@ pg_local_amqp_connection() ->
pg_local_scope(Prefix) ->
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).
-spec update_cluster_tags() -> 'ok'.
update_cluster_tags() ->

View File

@ -87,8 +87,14 @@ handle_http_req(<<"GET">>,
QName,
fun(Q) ->
{ok, NumMsgs, NumConsumers} = rabbit_amqqueue:stat(Q),
case rabbit_volatile_queue:is(QNameBin) andalso
not rabbit_volatile_queue:exists(QName) of
true ->
{error, not_found};
false ->
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
{ok, {<<"200">>, RespPayload, PermCaches}}
end
end) of
{ok, Result} ->
Result;

View File

@ -81,6 +81,10 @@
?V_1_0_SYMBOL_REJECTED,
?V_1_0_SYMBOL_RELEASED,
?V_1_0_SYMBOL_MODIFIED]).
%% The queue process monitors our session process. When our session process
%% terminates (abnormally) any messages checked out to our session process
%% will be requeued. That's why the we only support RELEASED as the default outcome.
-define(DEFAULT_OUTCOME, #'v1_0.released'{}).
-define(DEFAULT_EXCHANGE_NAME, <<>>).
-define(PROTOCOL, amqp10).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@ -88,6 +92,7 @@
-define(CREDIT_REPLY_TIMEOUT, 30_000).
%% Capability defined in amqp-bindmap-jms-v1.0-wd10 [5.2] and sent by Qpid JMS client.
-define(CAP_TEMPORARY_QUEUE, <<"temporary-queue">>).
-define(CAP_VOLATILE_QUEUE, <<"rabbitmq:volatile-queue">>).
-export([start_link/9,
process_frame/2,
@ -96,7 +101,8 @@
check_resource_access/4,
check_read_permitted_on_topic/4,
reset_authz/2,
info/1
info/1,
is_local/1
]).
-export([init/1,
@ -500,19 +506,15 @@ terminate(_Reason, #state{incoming_links = IncomingLinks,
maps:foreach(
fun (_, Link) ->
rabbit_global_counters:publisher_deleted(?PROTOCOL),
maybe_delete_dynamic_queue(Link, Cfg)
maybe_delete_dynamic_classic_queue(Link, Cfg)
end, IncomingLinks),
maps:foreach(
fun (_, Link) ->
rabbit_global_counters:consumer_deleted(?PROTOCOL),
maybe_delete_dynamic_queue(Link, Cfg)
maybe_delete_dynamic_classic_queue(Link, Cfg)
end, OutgoingLinks),
ok = rabbit_queue_type:close(QStates).
-spec list_local() -> [pid()].
list_local() ->
pg:which_groups(pg_scope()).
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
@ -523,6 +525,16 @@ conserve_resources(Pid, Source, {_, Conserve, _}) ->
reset_authz(Pid, User) ->
gen_server:cast(Pid, {reset_authz, User}).
handle_call({has_state, QName, QType},
_From,
#state{queue_states = QStates} = State) ->
Reply = case rabbit_queue_type:module(QName, QStates) of
{ok, QType} ->
true;
_ ->
false
end,
reply(Reply, State);
handle_call(infos, _From, State) ->
reply(infos(State), State);
handle_call(Msg, _From, State) ->
@ -645,15 +657,25 @@ log_error_and_close_session(
WriterPid, Ch, #'v1_0.end'{error = Error}),
{stop, {shutdown, Error}, State}.
%% Batch confirms / rejects to publishers.
%% If we receive consecutive confirms/rejects from queues, we batch them
%% to send fewer disposition frames to publishers.
noreply_coalesce(#state{stashed_rejected = [],
stashed_settled = [],
stashed_down = [],
stashed_eol = []} = State) ->
noreply(State);
noreply_coalesce(State) ->
noreply_coalesce(#state{outgoing_pending = Pending} = State) ->
case queue:is_empty(Pending) of
true ->
Timeout = 0,
{noreply, State, Timeout}.
{noreply, State, Timeout};
false ->
%% We prioritise processing the Pending queue over batching confirms/rejects
%% because we must ensure to grant the next batch of link credit to the
%% volatile queue before processing the next delivery in our mailbox to
%% avoid the volatile queue dropping messages.
noreply(State)
end.
noreply(State0) ->
State = send_buffered(State0),
@ -1165,13 +1187,17 @@ handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
{OutgoingLinks, Unsettled, Pending, QStates} =
case maps:take(HandleInt, OutgoingLinks0) of
{#outgoing_link{queue_name = QName,
queue_type = QType,
dynamic = Dynamic}, OutgoingLinks1} ->
Ctag = handle_to_ctag(HandleInt),
{Unsettled1, Pending1} = remove_outgoing_link(Ctag, Unsettled0, Pending0),
case Dynamic of
true ->
delete_dynamic_queue(QName, Cfg),
true when QType =:= rabbit_classic_queue ->
delete_dynamic_classic_queue(QName, Cfg),
{OutgoingLinks1, Unsettled1, Pending1, QStates0};
true when QType =:= rabbit_volatile_queue ->
QStates1 = rabbit_queue_type:remove(QName, QStates0),
{OutgoingLinks1, Unsettled1, Pending1, QStates1};
false ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
@ -1196,7 +1222,7 @@ handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
end,
IncomingLinks = case maps:take(HandleInt, IncomingLinks0) of
{IncomingLink, IncomingLinks1} ->
maybe_delete_dynamic_queue(IncomingLink, Cfg),
maybe_delete_dynamic_classic_queue(IncomingLink, Cfg),
IncomingLinks1;
error ->
IncomingLinks0
@ -1410,7 +1436,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
name = LinkName = {utf8, LinkNameBin},
handle = Handle = ?UINT(HandleInt),
source = Source0 = #'v1_0.source'{filter = DesiredFilter},
source = Source0,
snd_settle_mode = SndSettleMode,
rcv_settle_mode = RcvSettleMode,
max_message_size = MaybeMaxMessageSize,
@ -1433,11 +1459,14 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
%% client only for durable messages.
{false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED}
end,
case ensure_source(Source0, LinkNameBin, Vhost, User, ContainerId,
ReaderPid, PermCache0, TopicPermCache0) of
case ensure_source(Source0, SndSettled, LinkNameBin,
Vhost, User, ContainerId, ReaderPid,
PermCache0, TopicPermCache0) of
{error, Reason} ->
link_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach refused: ~tp", [Reason]);
{ok, QName = #resource{name = QNameBin}, Source, PermCache1, TopicPermCache} ->
{ok, QName = #resource{name = QNameBin},
Source = #'v1_0.source'{filter = DesiredFilter},
PermCache1, TopicPermCache} ->
PermCache = check_resource_access(QName, read, User, PermCache1),
case rabbit_amqqueue:with(
QName,
@ -1511,14 +1540,8 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT),
snd_settle_mode = EffectiveSndSettleMode,
rcv_settle_mode = RcvSettleMode,
%% The queue process monitors our session process. When our session process
%% terminates (abnormally) any messages checked out to our session process
%% will be requeued. That's why the we only support RELEASED as the default outcome.
source = Source#'v1_0.source'{
default_outcome = #'v1_0.released'{},
outcomes = outcomes(Source),
%% "the sending endpoint sets the filter actually in place" [3.5.3]
filter = EffectiveFilter},
source = Source#'v1_0.source'{filter = EffectiveFilter},
role = ?AMQP_ROLE_SENDER,
%% Echo back that we will respect the client's requested max-message-size.
max_message_size = MaybeMaxMessageSize,
@ -2705,6 +2728,7 @@ maybe_grant_mgmt_link_credit(Credit, _, _) ->
{Credit, []}.
-spec ensure_source(#'v1_0.source'{},
boolean(),
binary(),
rabbit_types:vhost(),
rabbit_types:user(),
@ -2720,38 +2744,69 @@ maybe_grant_mgmt_link_credit(Credit, _, _) ->
{error, term()}.
ensure_source(#'v1_0.source'{
address = undefined,
durable = Durable,
expiry_policy = ExpiryPolicy,
timeout = Timeout,
dynamic = true,
%% We will reply with the actual node properties.
dynamic_node_properties = _IgnoreDesiredProperties,
capabilities = {array, symbol, Caps}
} = Source0,
LinkName, Vhost, User, ContainerId,
SndSettled, LinkName, Vhost, User, ContainerId,
ConnPid, PermCache0, TopicPermCache) ->
case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of
true ->
{QNameBin, Address, Props, PermCache} =
declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0),
Source = Source0#'v1_0.source'{
FFEnabled = rabbit_volatile_queue:ff_enabled(),
case maps:from_keys(Caps, true) of
#{{symbol, ?CAP_VOLATILE_QUEUE} := true}
when (Durable =:= undefined orelse Durable =:= ?V_1_0_TERMINUS_DURABILITY_NONE) andalso
ExpiryPolicy =:= ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH andalso
(Timeout =:= undefined orelse Timeout =:= {uint, 0}) andalso
SndSettled andalso
FFEnabled ->
%% create volatile queue
QNameBin = rabbit_volatile_queue:new_name(),
Source = #'v1_0.source'{
address = {utf8, queue_address(QNameBin)},
durable = Durable,
expiry_policy = ExpiryPolicy,
timeout = {uint, 0},
dynamic = true,
dynamic_node_properties = dynamic_node_properties(),
distribution_mode = ?V_1_0_STD_DIST_MODE_MOVE,
capabilities = rabbit_amqp_util:capabilities([?CAP_VOLATILE_QUEUE])
},
QName = rabbit_misc:queue_resource(Vhost, QNameBin),
{ok, QName, Source, PermCache0, TopicPermCache};
#{{symbol, ?CAP_TEMPORARY_QUEUE} := true} ->
%% create exclusive classic queue
{QNameBin, Address, PermCache} =
declare_exclusive_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0),
Source = #'v1_0.source'{
address = {utf8, Address},
%% While Khepri stores queue records durably, the terminus
%% - i.e. the existence of this receiver - is not stored durably.
durable = ?V_1_0_TERMINUS_DURABILITY_NONE,
expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH,
timeout = {uint, 0},
dynamic_node_properties = Props,
dynamic = true,
dynamic_node_properties = dynamic_node_properties(),
distribution_mode = ?V_1_0_STD_DIST_MODE_MOVE,
default_outcome = ?DEFAULT_OUTCOME,
outcomes = outcomes(Source0),
capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE])
},
QName = queue_resource(Vhost, QNameBin),
{ok, QName, Source, PermCache, TopicPermCache};
false ->
exit_not_implemented("Dynamic source not supported: ~p", [Source0])
_ ->
exit_not_implemented("Dynamic source not supported: ~tp", [Source0])
end;
ensure_source(Source = #'v1_0.source'{dynamic = true}, _, _, _, _, _, _, _) ->
exit_not_implemented("Dynamic source not supported: ~p", [Source]);
ensure_source(Source = #'v1_0.source'{address = Address,
ensure_source(Source = #'v1_0.source'{dynamic = true}, _, _, _, _, _, _, _, _) ->
exit_not_implemented("Dynamic source not supported: ~tp", [Source]);
ensure_source(Source0 = #'v1_0.source'{address = Address,
durable = Durable},
_LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache, TopicPermCache) ->
_SndSettle, _LinkName, Vhost, User, _ContainerId,
_ConnPid, PermCache, TopicPermCache) ->
Source = Source0#'v1_0.source'{default_outcome = ?DEFAULT_OUTCOME,
outcomes = outcomes(Source0)},
case Address of
{utf8, <<"/queues/", QNameBinQuoted/binary>>} ->
%% The only possible v2 source address format is:
@ -2843,9 +2898,9 @@ ensure_target(#'v1_0.target'{
LinkName, Vhost, User, ContainerId, ConnPid, PermCache0) ->
case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of
true ->
{QNameBin, Address, Props, PermCache1} =
declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0),
{ok, Exchange, PermCache} = check_exchange(?DEFAULT_EXCHANGE_NAME, User, Vhost, PermCache1),
{ok, Exchange, PermCache1} = check_exchange(?DEFAULT_EXCHANGE_NAME, User, Vhost, PermCache0),
{QNameBin, Address, PermCache} =
declare_exclusive_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache1),
Target = #'v1_0.target'{
address = {utf8, Address},
%% While Khepri stores queue records durably,
@ -2854,7 +2909,7 @@ ensure_target(#'v1_0.target'{
expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH,
timeout = {uint, 0},
dynamic = true,
dynamic_node_properties = Props,
dynamic_node_properties = dynamic_node_properties(),
capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE])
},
{ok, Exchange, QNameBin, QNameBin, Target, PermCache};
@ -2908,8 +2963,7 @@ check_exchange(XNameBin, User, Vhost, PermCache0) ->
end,
{ok, Exchange, PermCache};
{error, not_found} ->
link_error(?V_1_0_AMQP_ERROR_NOT_FOUND,
"no ~ts", [rabbit_misc:rs(XName)])
link_error_not_found(XName)
end.
address_v1_permitted() ->
@ -3467,17 +3521,15 @@ error_if_absent(Kind, Vhost, Name) when is_list(Name) ->
error_if_absent(Kind, Vhost, Name) when is_binary(Name) ->
error_if_absent(rabbit_misc:r(Vhost, Kind, Name)).
error_if_absent(Resource = #resource{kind = Kind}) ->
Mod = case Kind of
exchange -> rabbit_exchange;
queue -> rabbit_amqqueue
end,
case Mod:exists(Resource) of
true ->
ok;
false ->
link_error(?V_1_0_AMQP_ERROR_NOT_FOUND,
"no ~ts", [rabbit_misc:rs(Resource)])
error_if_absent(#resource{kind = exchange} = Name) ->
case rabbit_exchange:exists(Name) of
true -> ok;
false -> link_error_not_found(Name)
end;
error_if_absent(#resource{kind = queue} = Name) ->
case rabbit_amqqueue:exists(Name) of
true -> ok;
false -> link_error_not_found(Name)
end.
generate_queue_name_v1() ->
@ -3533,30 +3585,40 @@ declare_queue(QNameBin,
end,
{ok, PermCache}.
declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0) ->
declare_exclusive_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0) ->
QNameBin = generate_queue_name_dynamic(ContainerId, LinkName),
Address = queue_address(QNameBin),
{ok, PermCache} = declare_queue(QNameBin, Vhost, User, true, ConnPid, PermCache0),
QNameBinQuoted = uri_string:quote(QNameBin),
Address = <<"/queues/", QNameBinQuoted/binary>>,
Props = {map, [{{symbol, <<"lifetime-policy">>},
{QNameBin, Address, PermCache}.
-spec queue_address(unicode:unicode_binary()) ->
unicode:unicode_binary().
queue_address(QueueName) ->
QueueNameQuoted = uri_string:quote(QueueName),
<<"/queues/", QueueNameQuoted/binary>>.
dynamic_node_properties() ->
{map, [{{symbol, <<"lifetime-policy">>},
{described, ?V_1_0_SYMBOL_DELETE_ON_CLOSE, {list, []}}},
{{symbol, <<"supported-dist-modes">>},
{array, symbol, [?V_1_0_STD_DIST_MODE_MOVE]}}]},
{QNameBin, Address, Props, PermCache}.
{array, symbol, [?V_1_0_STD_DIST_MODE_MOVE]}}]}.
maybe_delete_dynamic_queue(#incoming_link{dynamic = true,
maybe_delete_dynamic_classic_queue(
#incoming_link{dynamic = true,
queue_name_bin = QNameBin},
Cfg = #cfg{vhost = Vhost}) ->
QName = queue_resource(Vhost, QNameBin),
delete_dynamic_queue(QName, Cfg);
maybe_delete_dynamic_queue(#outgoing_link{dynamic = true,
delete_dynamic_classic_queue(QName, Cfg);
maybe_delete_dynamic_classic_queue(
#outgoing_link{dynamic = true,
queue_type = rabbit_classic_queue,
queue_name = QName},
Cfg) ->
delete_dynamic_queue(QName, Cfg);
maybe_delete_dynamic_queue(_, _) ->
delete_dynamic_classic_queue(QName, Cfg);
maybe_delete_dynamic_classic_queue(_, _) ->
ok.
delete_dynamic_queue(QName, #cfg{user = #user{username = Username}}) ->
delete_dynamic_classic_queue(QName, #cfg{user = #user{username = Username}}) ->
%% No real need to check for 'configure' access again since this queue is owned by
%% this connection and the user had 'configure' access when the queue got declared.
_ = rabbit_amqqueue:with(
@ -3888,13 +3950,37 @@ error_not_found(Resource) ->
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, Description}}.
-spec link_error_not_found(rabbit_types:r(exchange | queue)) -> no_return().
link_error_not_found(Resource) ->
link_error(?V_1_0_AMQP_ERROR_NOT_FOUND,
"no ~ts",
[rabbit_misc:rs(Resource)]).
is_valid_max(Val) ->
is_integer(Val) andalso
Val > 0 andalso
Val =< ?UINT_MAX.
-spec list_local() -> [pid()].
list_local() ->
pg:which_groups(pg_scope()).
%% Returns true if Pid is a local AMQP session process.
-spec is_local(pid()) -> boolean().
is_local(Pid) ->
pg:get_local_members(pg_scope(), Pid) =/= [].
-spec pg_scope() -> atom().
pg_scope() ->
rabbit:pg_local_scope(amqp_session).
Key = pg_scope_amqp_session,
case persistent_term:get(Key, undefined) of
undefined ->
Val = rabbit:pg_local_scope(amqp_session),
persistent_term:put(Key, Val),
Val;
Val ->
Val
end.
-spec cap_credit(rabbit_queue_type:credit(), pos_integer()) ->
rabbit_queue_type:credit().

View File

@ -129,6 +129,11 @@
msg_id
}).
-record(direct_reply, {
consumer_tag :: rabbit_types:ctag(),
queue :: rabbit_misc:resource_name()
}).
-record(ch, {cfg :: #conf{},
%% limiter state, see rabbit_limiter
limiter,
@ -159,8 +164,7 @@
%% a list of tags for published messages that were
%% rejected but are yet to be sent to the client
rejected,
%% used by "one shot RPC" (amq.
reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()},
direct_reply :: none | #direct_reply{},
%% see rabbitmq-server#114
delivery_flow :: flow | noflow,
interceptor_state,
@ -297,48 +301,14 @@ shutdown(Pid) ->
send_command(Pid, Msg) ->
gen_server2:cast(Pid, {command, Msg}).
-spec deliver_reply(binary(), mc:state()) -> 'ok'.
deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) ->
Nodes = nodes_with_hashes(),
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
{ok, Pid, Key} ->
delegate:invoke_no_result(
Pid, {?MODULE, deliver_reply_local, [Key, Message]});
{error, _} ->
ok
end.
%% We want to ensure people can't use this mechanism to send a message
%% to an arbitrary process and kill it!
-spec deliver_reply_local(pid(), binary(), mc:state()) -> 'ok'.
%% Delete this function when feature flag rabbitmq_4.2.0 becomes required.
-spec deliver_reply_local(pid(), binary(), mc:state()) -> ok.
deliver_reply_local(Pid, Key, Message) ->
case pg_local:in_group(rabbit_channels, Pid) of
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
false -> ok
end.
declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
exists;
declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
Nodes = nodes_with_hashes(),
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
{ok, Pid, Key} ->
Msg = {declare_fast_reply_to, Key},
rabbit_misc:with_exit_handler(
rabbit_misc:const(not_found),
fun() -> gen_server2:call(Pid, Msg, infinity) end);
{error, _} ->
not_found
end;
declare_fast_reply_to(_) ->
not_found.
nodes_with_hashes() ->
#{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}.
-spec list() -> [pid()].
list() ->
@ -530,7 +500,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
unconfirmed = rabbit_confirms:init(),
rejected = [],
confirmed = [],
reply_consumer = none,
direct_reply = none,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = rabbit_queue_type:init()
@ -587,6 +557,29 @@ handle_call({{info, Items}, Deadline}, _From, State) ->
reply({error, Error}, State)
end;
handle_call({has_state, #resource{virtual_host = Vhost,
name = Name}, rabbit_volatile_queue},
_From,
#ch{cfg = #conf{virtual_host = Vhost},
direct_reply = #direct_reply{queue = Name}} = State) ->
reply(true, State);
handle_call({has_state, _QName, _QType}, _From, State) ->
reply(false, State);
%% Delete below clause when feature flag rabbitmq_4.2.0 becomes required.
handle_call({declare_fast_reply_to, Key}, _From, State = #ch{direct_reply = Reply}) ->
Result = case Reply of
none ->
not_found;
#direct_reply{queue = QNameBin} ->
case rabbit_volatile_queue:key_from_name(QNameBin) of
{ok, Key} ->
exists;
_ ->
not_found
end
end,
reply(Result, State);
handle_call(refresh_config, _From,
State = #ch{cfg = #conf{virtual_host = VHost} = Cfg}) ->
reply(ok, State#ch{cfg = Cfg#conf{trace_state = rabbit_trace:init(VHost)}});
@ -595,13 +588,6 @@ handle_call(refresh_interceptors, _From, State) ->
IState = rabbit_channel_interceptor:init(State),
reply(ok, State#ch{interceptor_state = IState});
handle_call({declare_fast_reply_to, Key}, _From,
State = #ch{reply_consumer = Consumer}) ->
reply(case Consumer of
{_, _, Key} -> exists;
_ -> not_found
end, State);
handle_call(list_queue_states, _From, State = #ch{queue_states = QueueStates}) ->
%% For testing of cleanup only
%% HACK
@ -665,29 +651,31 @@ handle_cast({command, Msg}, State) ->
ok = send(Msg, State),
noreply(State);
handle_cast({deliver_reply, _K, _Del},
State = #ch{cfg = #conf{state = closing}}) ->
noreply(State);
handle_cast({deliver_reply, _K, _Msg}, State = #ch{reply_consumer = none}) ->
noreply(State);
%% Delete below clause when feature flag rabbitmq_4.2.0 becomes required.
handle_cast({deliver_reply, Key, Mc},
State = #ch{cfg = #conf{writer_pid = WriterPid,
#ch{cfg = #conf{state = ChanState,
writer_pid = WriterPid,
msg_interceptor_ctx = MsgIcptCtx},
next_tag = DeliveryTag,
reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
direct_reply = #direct_reply{consumer_tag = Ctag,
queue = QNameBin}} = State)
when ChanState =/= closing ->
case rabbit_volatile_queue:key_from_name(QNameBin) of
{ok, Key} ->
ExchName = mc:exchange(Mc),
[RoutingKey | _] = mc:routing_keys(Mc),
Content = outgoing_content(Mc, MsgIcptCtx),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.deliver'{consumer_tag = ConsumerTag,
Deliver = #'basic.deliver'{consumer_tag = Ctag,
delivery_tag = DeliveryTag,
redelivered = false,
exchange = ExchName,
routing_key = RoutingKey},
Content),
Content = outgoing_content(Mc, MsgIcptCtx),
ok = rabbit_writer:send_command(WriterPid, Deliver, Content);
_ ->
ok
end,
noreply(State);
handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) ->
handle_cast({deliver_reply, _, _}, State) ->
noreply(State);
% Note: https://www.pivotaltracker.com/story/show/166962656
@ -1084,20 +1072,19 @@ strip_cr_lf(NameBin) ->
binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]).
maybe_set_fast_reply_to(
C = #content{properties = P = #'P_basic'{reply_to =
<<"amq.rabbitmq.reply-to">>}},
#ch{reply_consumer = ReplyConsumer}) ->
case ReplyConsumer of
none -> rabbit_misc:protocol_error(
precondition_failed,
"fast reply consumer does not exist", []);
{_, Suf, _K} -> Rep = <<"amq.rabbitmq.reply-to.", Suf/binary>>,
rabbit_binary_generator:clear_encoded_content(
C#content{properties = P#'P_basic'{reply_to = Rep}})
override_reply_to(
C0 = #content{properties = P = #'P_basic'{reply_to = <<"amq.rabbitmq.reply-to">>}},
#ch{direct_reply = Reply}) ->
case Reply of
#direct_reply{queue = QNameBin} ->
C = C0#content{properties = P#'P_basic'{reply_to = QNameBin}},
rabbit_binary_generator:clear_encoded_content(C);
none ->
rabbit_misc:protocol_error(precondition_failed,
"fast reply consumer does not exist", [])
end;
maybe_set_fast_reply_to(C, _State) ->
C.
override_reply_to(Content, _) ->
Content.
record_rejects([], State) ->
State;
@ -1198,9 +1185,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_internal_exchange(Exchange),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
maybe_set_fast_reply_to(
rabbit_binary_parser:ensure_content_decoded(Content), State1),
DecContent0 = rabbit_binary_parser:ensure_content_decoded(Content),
DecContent = #content{properties = Props} = override_reply_to(DecContent0, State1),
check_expiration_header(Props),
DoConfirm = Tx =/= none orelse ConfirmEnabled,
{DeliveryOptions, State} =
@ -1217,7 +1203,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
case mc_amqpl:message(ExchangeName,
RoutingKey,
DecodedContent) of
DecContent) of
{error, Reason} ->
rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]);
{ok, Message0} ->
@ -1225,7 +1211,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_user_id_header(Message0, User),
Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx),
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
[deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames],
Queues = rabbit_amqqueue:lookup_many(QNames),
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
Username, TraceState),
@ -1301,29 +1286,45 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
consumer_tag = CTag0,
no_ack = NoAck,
nowait = NoWait},
_, State = #ch{reply_consumer = ReplyConsumer,
cfg = #conf{max_consumers = MaxConsumers},
_, State = #ch{direct_reply = DirectReply0,
cfg = #conf{virtual_host = Vhost,
user = #user{username = Username},
max_consumers = MaxConsumers},
queue_states = QStates0,
consumer_mapping = ConsumerMapping}) ->
CurrentConsumers = maps:size(ConsumerMapping),
case maps:find(CTag0, ConsumerMapping) of
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
rabbit_misc:protocol_error(
not_allowed, "reached maximum (~B) of consumers per channel", [MaxConsumers]);
not_allowed,
"reached maximum (~B) of consumers per channel", [MaxConsumers]);
error ->
case {ReplyConsumer, NoAck} of
case {DirectReply0, NoAck} of
{none, true} ->
CTag = case CTag0 of
<<>> -> rabbit_guid:binary(
rabbit_guid:gen_secure(), "amq.ctag");
Other -> Other
<<>> ->
rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.ctag");
Other ->
Other
end,
%% Precalculate both suffix and key
{Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix(self()),
Consumer = {CTag, Suffix, Key},
State1 = State#ch{reply_consumer = Consumer},
QNameBin = rabbit_volatile_queue:new_name(),
QName = rabbit_misc:queue_resource(Vhost, QNameBin),
Q = rabbit_volatile_queue:new(QName),
Spec = #{no_ack => true,
channel_pid => self(),
mode => {simple_prefetch, 0},
consumer_tag => CTag,
ok_msg => undefined,
acting_user => Username},
{ok, QStates} = rabbit_queue_type:consume(Q, Spec, QStates0),
State1 = State#ch{direct_reply = #direct_reply{consumer_tag = CTag,
queue = QNameBin},
queue_states = QStates},
case NoWait of
true -> {noreply, State1};
false -> Rep = #'basic.consume_ok'{consumer_tag = CTag},
true ->
{noreply, State1};
false ->
Rep = #'basic.consume_ok'{consumer_tag = CTag},
{reply, Rep, State1}
end;
{_, false} ->
@ -1341,17 +1342,25 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
end;
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
_, State = #ch{reply_consumer = {ConsumerTag, _, _}}) ->
State1 = State#ch{reply_consumer = none},
_, State = #ch{cfg = #conf{virtual_host = Vhost},
direct_reply = #direct_reply{consumer_tag = ConsumerTag,
queue = QNameBin},
queue_states = QStates}) ->
QName = rabbit_misc:queue_resource(Vhost, QNameBin),
QStates1 = rabbit_queue_type:remove(QName, QStates),
State1 = State#ch{direct_reply = none,
queue_states = QStates1},
case NoWait of
true -> {noreply, State1};
false -> Rep = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
true ->
{noreply, State1};
false ->
Rep = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
{reply, Rep, State1}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ConsumerTag,
no_local = _, % FIXME: implement
no_local = _Unsupported,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait,
@ -1873,21 +1882,30 @@ record_sent(Type, QueueType, Tag, AckRequired,
next_tag = DeliveryTag
}) ->
rabbit_global_counters:messages_delivered(amqp091, QueueType, 1),
?INCR_STATS(queue_stats, QName, 1,
case {Type, AckRequired} of
Measure = case {Type, AckRequired} of
{get, true} ->
rabbit_global_counters:messages_delivered_get_manual_ack(amqp091, QueueType, 1),
rabbit_global_counters:messages_delivered_get_manual_ack(
amqp091, QueueType, 1),
get;
{get, false} ->
rabbit_global_counters:messages_delivered_get_auto_ack(amqp091, QueueType, 1),
rabbit_global_counters:messages_delivered_get_auto_ack(
amqp091, QueueType, 1),
get_no_ack;
{deliver, true} ->
rabbit_global_counters:messages_delivered_consume_manual_ack(amqp091, QueueType, 1),
rabbit_global_counters:messages_delivered_consume_manual_ack(
amqp091, QueueType, 1),
deliver;
{deliver, false} ->
rabbit_global_counters:messages_delivered_consume_auto_ack(amqp091, QueueType, 1),
rabbit_global_counters:messages_delivered_consume_auto_ack(
amqp091, QueueType, 1),
deliver_no_ack
end, State),
end,
case rabbit_volatile_queue:is(QName#resource.name) of
true ->
ok;
false ->
?INCR_STATS(queue_stats, QName, 1, Measure, State)
end,
case Redelivered of
true ->
rabbit_global_counters:messages_redelivered(amqp091, QueueType, 1),
@ -2074,8 +2092,15 @@ deliver_to_queues(XName,
case rabbit_event:stats_level(State, #ch.stats_timer) of
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
lists:foreach(fun(QName) ->
?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish)
lists:foreach(
fun(#resource{name = QNameBin} = QName) ->
case rabbit_volatile_queue:is(QNameBin) of
true ->
ok;
false ->
?INCR_STATS(queue_exchange_stats,
{QName, XName}, 1, publish)
end
end, QueueNames);
_ ->
ok
@ -2380,13 +2405,16 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
%% Note that all declares to these are effectively passive. If it
%% exists it by definition has one consumer.
handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
_/binary>> = QueueNameBin},
_/binary>> = QueueNameBin0},
_ConnPid, _AuthzContext, _CollectorPid, VHost, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
case declare_fast_reply_to(StrippedQueueNameBin) of
exists -> {ok, QueueName, 0, 1};
not_found -> rabbit_amqqueue:not_found(QueueName)
QueueNameBin = strip_cr_lf(QueueNameBin0),
QueueName = rabbit_misc:queue_resource(VHost, QueueNameBin),
Q = rabbit_volatile_queue:new(QueueName),
case rabbit_queue_type:declare(Q, node()) of
{existing, _} ->
{ok, QueueName, 0, 1};
{absent, _, _} ->
rabbit_amqqueue:not_found(QueueName)
end;
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
@ -2489,7 +2517,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true},
ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
QueueName = rabbit_misc:queue_resource(VHostPath, StrippedQueueNameBin),
Fun = fun (Q0) ->
QStat = maybe_stat(NoWait, Q0),
{QStat, Q0}
@ -2600,7 +2628,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
handle_deliver(CTag, Ack, Msgs, State) when is_list(Msgs) ->
handle_deliver(CTag, Ack, Msgs, State) ->
lists:foldl(fun(Msg, S) ->
handle_deliver0(CTag, Ack, Msg, S)
end, State, Msgs).

View File

@ -54,6 +54,7 @@
consume/3,
cancel/3,
handle_event/3,
supports_stateful_delivery/0,
deliver/3,
settle/5,
credit_v1/5,
@ -62,8 +63,7 @@
info/2,
state_info/1,
capabilities/0,
notify_decorators/1,
is_stateful/0
notify_decorators/1
]).
-export([delete_crashed/1,
@ -471,6 +471,8 @@ settlement_action(_Type, _QRef, [], Acc) ->
settlement_action(Type, QRef, MsgSeqs, Acc) ->
[{Type, QRef, MsgSeqs} | Acc].
supports_stateful_delivery() -> true.
-spec deliver([{amqqueue:amqqueue(), state()}],
Delivery :: mc:state(),
rabbit_queue_type:delivery_options()) ->
@ -684,8 +686,6 @@ notify_decorators(Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}).
is_stateful() -> true.
reject_seq_no(SeqNo, U0) ->
reject_seq_no(SeqNo, U0, []).

View File

@ -211,3 +211,10 @@
stability => stable,
depends_on => ['rabbitmq_4.0.0']
}}).
-rabbit_feature_flag(
{'rabbitmq_4.2.0',
#{desc => "Allows rolling upgrades to 4.2.x",
stability => stable,
depends_on => ['rabbitmq_4.1.0']
}}).

View File

@ -490,26 +490,39 @@ get_many_in_khepri(Names) ->
get_many_in_ets(Table, [{Name, RouteInfos}])
when is_map(RouteInfos) ->
case ets:lookup(Table, Name) of
case ets_lookup(Table, Name) of
[] -> [];
[Q] -> [{Q, RouteInfos}]
end;
get_many_in_ets(Table, [Name]) ->
ets:lookup(Table, Name);
ets_lookup(Table, Name);
get_many_in_ets(Table, Names) when is_list(Names) ->
lists:filtermap(fun({Name, RouteInfos})
when is_map(RouteInfos) ->
case ets:lookup(Table, Name) of
case ets_lookup(Table, Name) of
[] -> false;
[Q] -> {true, {Q, RouteInfos}}
end;
(Name) ->
case ets:lookup(Table, Name) of
case ets_lookup(Table, Name) of
[] -> false;
[Q] -> {true, Q}
end
end, Names).
ets_lookup(Table, QName = #resource{name = QNameBin}) ->
case rabbit_volatile_queue:is(QNameBin) of
true ->
%% This queue record is not stored in the database.
%% We create it on the fly.
case rabbit_volatile_queue:new(QName) of
error -> [];
Q -> [Q]
end;
false ->
ets:lookup(Table, QName)
end.
%% -------------------------------------------------------------------
%% get().
%% -------------------------------------------------------------------
@ -517,11 +530,19 @@ get_many_in_ets(Table, Names) when is_list(Names) ->
-spec get(QName) -> Ret when
QName :: rabbit_amqqueue:name(),
Ret :: {ok, Queue :: amqqueue:amqqueue()} | {error, not_found}.
get(Name) ->
get(#resource{name = NameBin} = Name) ->
case rabbit_volatile_queue:is(NameBin) of
true ->
case rabbit_volatile_queue:new(Name) of
error -> {error, not_found};
Q -> {ok, Q}
end;
false ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> get_in_mnesia(Name) end,
khepri => fun() -> get_in_khepri(Name) end
}).
})
end.
get_in_mnesia(Name) ->
rabbit_mnesia:dirty_read({?MNESIA_TABLE, Name}).
@ -776,11 +797,16 @@ update_durable_in_khepri(UpdateFun, FilterFun) ->
%%
%% @private
exists(QName) ->
exists(#resource{name = NameBin} = Name) ->
case rabbit_volatile_queue:is(NameBin) of
true ->
rabbit_volatile_queue:exists(Name);
false ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> exists_in_mnesia(QName) end,
khepri => fun() -> exists_in_khepri(QName) end
}).
#{mnesia => fun() -> exists_in_mnesia(Name) end,
khepri => fun() -> exists_in_khepri(Name) end
})
end.
exists_in_mnesia(QName) ->
ets:member(?MNESIA_TABLE, QName).

View File

@ -31,7 +31,7 @@
-type match_result() :: [rabbit_types:binding_destination() |
{rabbit_amqqueue:name(), rabbit_types:binding_key()}].
-define(COMPILED_TOPIC_SPLIT_PATTERN, dot_binary_pattern).
-define(COMPILED_TOPIC_SPLIT_PATTERN, cp_dot).
%% -------------------------------------------------------------------
%% set().

View File

@ -1,51 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_direct_reply_to).
-export([compute_key_and_suffix/1,
decode_reply_to/2]).
%% This pid encoding function produces values that are of mostly fixed size
%% regardless of the node name length.
-spec compute_key_and_suffix(pid()) ->
{binary(), binary()}.
compute_key_and_suffix(Pid) ->
Key = base64:encode(rabbit_guid:gen()),
PidParts0 = #{node := Node} = pid_recomposition:decompose(Pid),
%% Note: we hash the entire node name. This is sufficient for our needs of shortening node name
%% in the TTB-encoded pid, and helps avoid doing the node name split for every single cluster member
%% in rabbit_channel:nodes_with_hashes/0.
%%
%% We also use a synthetic node prefix because the hash alone will be sufficient to
NodeHash = erlang:phash2(Node),
PidParts = maps:update(node, rabbit_nodes_common:make("reply", integer_to_list(NodeHash)), PidParts0),
RecomposedEncoded = base64:encode(pid_recomposition:to_binary(PidParts)),
Suffix = <<RecomposedEncoded/binary, ".", Key/binary>>,
{Key, Suffix}.
-spec decode_reply_to(binary(), #{non_neg_integer() => node()}) ->
{ok, pid(), binary()} | {error, any()}.
decode_reply_to(Bin, CandidateNodes) ->
try
[PidEnc, Key] = binary:split(Bin, <<".">>),
RawPidBin = base64:decode(PidEnc),
PidParts0 = #{node := ShortenedNodename} = pid_recomposition:from_binary(RawPidBin),
{_, NodeHash} = rabbit_nodes_common:parts(ShortenedNodename),
case maps:get(list_to_integer(NodeHash), CandidateNodes, undefined) of
undefined ->
{error, target_node_not_found};
Candidate ->
PidParts = maps:update(node, Candidate, PidParts0),
{ok, pid_recomposition:recompose(PidParts), Key}
end
catch
error:_ ->
{error, unrecognized_format}
end.

View File

@ -30,8 +30,7 @@
-type route_opts() :: #{return_binding_keys => boolean()}.
-type route_infos() :: #{binding_keys => #{rabbit_types:binding_key() => true}}.
-type route_return() :: list(rabbit_amqqueue:name() |
{rabbit_amqqueue:name(), route_infos()} |
{virtual_reply_queue, binary()}).
{rabbit_amqqueue:name(), route_infos()}).
%%----------------------------------------------------------------------------
@ -373,7 +372,7 @@ info_all(VHostPath, Items, Ref, AggregatorPid) ->
AggregatorPid, Ref, fun(X) -> info(X, Items) end, list(VHostPath)).
-spec route(rabbit_types:exchange(), mc:state()) ->
[rabbit_amqqueue:name() | {virtual_reply_queue, binary()}].
[rabbit_amqqueue:name()].
route(Exchange, Message) ->
route(Exchange, Message, #{}).
@ -384,15 +383,7 @@ route(#exchange{name = #resource{name = ?DEFAULT_EXCHANGE_NAME,
Message, _Opts) ->
RKs0 = mc:routing_keys(Message),
RKs = lists:usort(RKs0),
[begin
case virtual_reply_queue(RK) of
false ->
rabbit_misc:r(VHost, queue, RK);
true ->
{virtual_reply_queue, RK}
end
end
|| RK <- RKs];
[rabbit_misc:r(VHost, queue, RK) || RK <- RKs];
route(X = #exchange{name = XName,
decorators = Decorators},
Message, Opts) ->
@ -407,9 +398,6 @@ route(X = #exchange{name = XName,
maps:keys(QNamesToBKeys)
end.
virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true;
virtual_reply_queue(_) -> false.
route1(_, _, _, {[], _, QNames}) ->
QNames;
route1(Message, Decorators, Opts,

View File

@ -132,20 +132,19 @@
boot_step() ->
[begin
%% Protocol counters
Protocol = #{protocol => Proto},
init(Protocol),
Labels = #{protocol => Proto},
init(Labels),
rabbit_msg_size_metrics:init(Proto),
%% Protocol & Queue Type counters
init(Protocol#{queue_type => rabbit_classic_queue}),
init(Protocol#{queue_type => rabbit_quorum_queue}),
init(Protocol#{queue_type => rabbit_stream_queue})
init(Labels#{queue_type => rabbit_classic_queue}),
init(Labels#{queue_type => rabbit_quorum_queue}),
init(Labels#{queue_type => rabbit_stream_queue}),
init(Labels#{queue_type => rabbit_volatile_queue})
end || Proto <- [amqp091, amqp10]],
%% Dead Letter counters
%%
%% Streams never dead letter.
%%
init(#{queue_type => rabbit_volatile_queue, dead_letter_strategy => disabled},
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER]),
%% Source classic queue dead letters.
init(#{queue_type => rabbit_classic_queue, dead_letter_strategy => disabled},
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
@ -155,7 +154,6 @@ boot_step() ->
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]),
%%
%% Source quorum queue dead letters.
%% Only quorum queues can dead letter due to delivery-limit exceeded.
%% Only quorum queues support dead letter strategy at-least-once.

64
deps/rabbit/src/rabbit_pid_codec.erl vendored Normal file
View File

@ -0,0 +1,64 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_pid_codec).
-export([decompose/1,
decompose_from_binary/1,
recompose/1,
recompose_to_binary/1]).
-define(NEW_PID_EXT, 88).
-define(ATOM_UTF8_EXT, 118).
-define(SMALL_ATOM_UTF8_EXT, 119).
-define(TTB_PREFIX, 131).
-type decomposed_pid() :: #{node := node(),
id := non_neg_integer(),
serial := non_neg_integer(),
creation := non_neg_integer()}.
-spec decompose(pid()) -> decomposed_pid().
decompose(Pid) ->
Bin = term_to_binary(Pid, [{minor_version, 2}]),
decompose_from_binary(Bin).
-spec decompose_from_binary(binary()) -> decomposed_pid().
decompose_from_binary(Bin) ->
<<?TTB_PREFIX, ?NEW_PID_EXT, PidData/binary>> = Bin,
{Node, Rest} = case PidData of
<<?ATOM_UTF8_EXT, Len:16/integer, Node0:Len/binary, Rest1/binary>> ->
{Node0, Rest1};
<<?SMALL_ATOM_UTF8_EXT, Len/integer, Node0:Len/binary, Rest1/binary>> ->
{Node0, Rest1}
end,
<<ID:32/integer, Serial:32/integer, Creation:32/integer>> = Rest,
#{node => binary_to_atom(Node, utf8),
id => ID,
serial => Serial,
creation => Creation}.
-spec recompose_to_binary(decomposed_pid()) -> binary().
recompose_to_binary(#{node := Node,
id := ID,
serial := Serial,
creation := Creation}) ->
BinNode = atom_to_binary(Node),
NodeLen = byte_size(BinNode),
<<?TTB_PREFIX:8/unsigned,
?NEW_PID_EXT:8/unsigned,
?ATOM_UTF8_EXT:8/unsigned,
NodeLen:16/unsigned,
BinNode/binary,
ID:32,
Serial:32,
Creation:32>>.
-spec recompose(decomposed_pid()) -> pid().
recompose(Map) ->
Bin = recompose_to_binary(Map),
binary_to_term(Bin).

View File

@ -203,8 +203,6 @@
-callback policy_changed(amqqueue:amqqueue()) -> ok.
-callback is_stateful() -> boolean().
%% intitialise and return a queue type specific session context
-callback init(amqqueue:amqqueue()) ->
{ok, queue_state()} | {error, Reason :: term()}.
@ -216,7 +214,7 @@
-callback consume(amqqueue:amqqueue(),
consume_spec(),
queue_state()) ->
{ok, queue_state(), actions()} |
{ok, queue_state()} |
{error, Type :: atom(), Format :: string(), FormatArgs :: [term()]}.
-callback cancel(amqqueue:amqqueue(),
@ -232,6 +230,8 @@
{ok, queue_state(), actions()} | {error, term()} | {eol, actions()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
-callback supports_stateful_delivery() -> boolean().
-callback deliver([{amqqueue:amqqueue(), queue_state()}],
Message :: mc:state(),
Options :: delivery_options()) ->
@ -577,8 +577,10 @@ handle_down(Pid, QName, Info, State0) ->
{ok, state(), actions()} | {eol, actions()} | {error, term()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
handle_event(QRef, Evt, Ctxs) ->
%% events can arrive after a queue state has been cleared up
%% so need to be defensive here
%% We are defensive here and do not want to crash because
%% 1. Events can arrive after a queue state has been cleared up, and
%% 2. Direct Reply-to responder might send to a non-existing queue name
%% by using correctly encoded channel/session pid but wrong key.
case get_ctx(QRef, Ctxs, undefined) of
#ctx{module = Mod,
state = State0} = Ctx ->
@ -660,7 +662,7 @@ deliver0(Qs, Message0, Options, #?STATE{} = State0) ->
fun (Elem, Acc) ->
{Q, BKeys} = queue_binding_keys(Elem),
Mod = amqqueue:get_type(Q),
QState = case Mod:is_stateful() of
QState = case Mod:supports_stateful_delivery() of
true ->
#ctx{state = S} = get_ctx(Q, State0),
S;
@ -743,7 +745,8 @@ credit(QName, CTag, DeliveryCount, Credit, Drain, Ctxs) ->
-spec dequeue(amqqueue:amqqueue(), boolean(),
pid(), rabbit_types:ctag(), state()) ->
{ok, non_neg_integer(), term(), state()} |
{empty, state()} | rabbit_types:error(term()) |
{empty, state()} |
rabbit_types:error(term()) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
#ctx{state = State0} = Ctx = get_ctx(Q, Ctxs),

View File

@ -29,7 +29,8 @@
-export([settle/5, dequeue/5, consume/3, cancel/3]).
-export([credit_v1/5, credit/6]).
-export([purge/1]).
-export([deliver/3]).
-export([supports_stateful_delivery/0,
deliver/3]).
-export([dead_letter_publish/5]).
-export([cluster_state/1, status/2]).
-export([update_consumer_handler/8, update_consumer/9]).
@ -70,8 +71,7 @@
-export([is_enabled/0,
is_compatible/3,
declare/2,
is_stateful/0]).
declare/2]).
-export([validate_policy/1, merge_policy_value/3]).
-export([force_shrink_member_to_current_member/2,
@ -1109,6 +1109,8 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) ->
{queue, QName},
{user_who_performed_action, ActingUser}]).
supports_stateful_delivery() -> true.
deliver0(QName, undefined, Msg, QState0) ->
case rabbit_fifo_client:enqueue(QName, Msg, QState0) of
{ok, _, _} = Res -> Res;
@ -1123,10 +1125,10 @@ deliver(QSs, Msg0, Options) ->
Correlation = maps:get(correlation, Options, undefined),
Msg = mc:prepare(store, Msg0),
lists:foldl(
fun({Q, stateless}, {Qs, Actions}) ->
fun({Q, stateless}, Acc) ->
QRef = amqqueue:get_pid(Q),
ok = rabbit_fifo_client:untracked_enqueue([QRef], Msg),
{Qs, Actions};
Acc;
({Q, S0}, {Qs, Actions}) ->
QName = amqqueue:get_name(Q),
case deliver0(QName, Correlation, Msg, S0) of
@ -2083,8 +2085,6 @@ notify_decorators(QName, F, A) ->
ok
end.
is_stateful() -> true.
force_shrink_member_to_current_member(VHost, Name) ->
Node = node(),
QName = rabbit_misc:r(VHost, queue, Name),

View File

@ -23,6 +23,7 @@
consume/3,
cancel/3,
handle_event/3,
supports_stateful_delivery/0,
deliver/3,
settle/5,
credit_v1/5,
@ -39,8 +40,7 @@
stat/1,
format/2,
capabilities/0,
notify_decorators/1,
is_stateful/0]).
notify_decorators/1]).
-export([list_with_minimum_quorum/0]).
@ -536,6 +536,8 @@ credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain,
{State, []}
end.
supports_stateful_delivery() -> true.
deliver(QSs, Msg, Options) ->
lists:foldl(
fun({Q, stateless}, {Qs, Actions}) ->
@ -1431,8 +1433,6 @@ list_with_minimum_quorum() ->
map_size(RunningMembers) =< map_size(Members) div 2 + 1
end, rabbit_amqqueue:list_local_stream_queues()).
is_stateful() -> true.
get_nodes(Q) when ?is_amqqueue(Q) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes.

View File

@ -62,8 +62,6 @@ tap_in(Msg, QNames, ConnName, ChannelNum, Username, TraceX) ->
RoutedQs = lists:map(fun(#resource{kind = queue, name = Name}) ->
{longstr, Name};
({#resource{kind = queue, name = Name}, _}) ->
{longstr, Name};
({virtual_reply_queue, Name}) ->
{longstr, Name}
end, QNames),
trace(TraceX, Msg, <<"publish">>, XName,

View File

@ -0,0 +1,409 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
%% This queue type is volatile:
%% * Queue metadata is not stored in the metadata store.
%% * Messages in this queue are effectively transient and delivered at most once.
%% * Messages are not buffered.
%% * Messages are dropped immediately if consumer ran out of link credit.
-module(rabbit_volatile_queue).
-behaviour(rabbit_queue_type).
-include_lib("rabbit_common/include/rabbit.hrl").
-export([new/1,
new_name/0,
is/1,
key_from_name/1,
pid_from_name/2,
exists/1,
ff_enabled/0,
local_cast/2,
local_call/2]).
%% rabbit_queue_type callbacks
-export([declare/2,
supports_stateful_delivery/0,
deliver/3,
credit/6,
init/1,
close/1,
update/2,
consume/3,
cancel/3,
handle_event/3,
is_enabled/0,
is_compatible/3,
is_recoverable/1,
purge/1,
policy_changed/1,
stat/1,
format/2,
capabilities/0,
notify_decorators/1,
stop/1,
list_with_minimum_quorum/0,
drain/1,
revive/0,
queue_vm_stats_sups/0,
queue_vm_ets/0,
delete/4,
recover/2,
settle/5,
credit_v1/5,
dequeue/5,
state_info/1,
info/2,
policy_apply_to_name/0
]).
-define(STATE, ?MODULE).
-record(?STATE, {
name :: rabbit_amqqueue:name(),
ctag :: undefined | rabbit_types:ctag(),
delivery_count :: undefined | rabbit_queue_type:delivery_count(),
credit :: undefined | rabbit_queue_type:credit(),
dropped = 0 :: non_neg_integer()
}).
-opaque state() :: #?STATE{}.
-export_type([state/0]).
-define(PREFIX, "amq.rabbitmq.reply-to.").
-define(CP_DOT, cp_dot).
-spec new(rabbit_amqqueue:name()) ->
amqqueue:amqqueue() | error.
new(#resource{virtual_host = Vhost,
name = <<"amq.rabbitmq.reply-to">>} = Name) ->
new0(Name, self(), Vhost);
new(#resource{virtual_host = Vhost,
name = NameBin} = Name) ->
case pid_from_name(NameBin, nodes_with_hashes()) of
{ok, Pid} when is_pid(Pid) ->
new0(Name, Pid, Vhost);
_ ->
error
end.
new0(Name, Pid, Vhost) ->
amqqueue:new(Name, Pid, false, true, none, [], Vhost, #{}, ?MODULE).
-spec is(rabbit_misc:resource_name()) ->
boolean().
is(<<?PREFIX, _/binary>>) ->
true;
is(Name) when is_binary(Name) ->
false.
init(Q) ->
{ok, #?STATE{name = amqqueue:get_name(Q)}}.
consume(_Q, Spec, State) ->
#{no_ack := true,
consumer_tag := Ctag,
mode := Mode} = Spec,
{DeliveryCount, Credit} = case Mode of
{credited, InitialDC} ->
{InitialDC, 0};
{simple_prefetch, 0} ->
{undefined, undefined}
end,
{ok, State#?STATE{ctag = Ctag,
delivery_count = DeliveryCount,
credit = Credit}}.
declare(Q, _Node) ->
#resource{name = NameBin} = Name = amqqueue:get_name(Q),
case NameBin of
<<"amq.rabbitmq.reply-to">> ->
{existing, Q};
_ ->
case exists(Name) of
true ->
{existing, Q};
false ->
{absent, Q, stopped}
end
end.
-spec exists(rabbit_amqqueue:name()) -> boolean().
exists(#resource{kind = queue,
name = QNameBin} = QName) ->
case pid_from_name(QNameBin, nodes_with_hashes()) of
{ok, Pid} when is_pid(Pid) ->
case ff_enabled() of
true ->
Request = {has_state, QName, ?MODULE},
MFA = {?MODULE, local_call, [Request]},
try delegate:invoke(Pid, MFA)
catch _:_ -> false
end;
false ->
case key_from_name(QNameBin) of
{ok, Key} ->
Msg = {declare_fast_reply_to, Key},
try gen_server:call(Pid, Msg, infinity) of
exists -> true;
_ -> false
catch exit:_ -> false
end;
error ->
false
end
end;
_ ->
false
end.
supports_stateful_delivery() ->
false.
deliver(Qs, Msg, #{correlation := Corr})
when Corr =/= undefined ->
Corrs = [Corr],
Actions = lists:map(fun({Q, stateless}) ->
deliver0(Q, Msg),
{settled, amqqueue:get_name(Q), Corrs}
end, Qs),
{[], Actions};
deliver(Qs, Msg, #{}) ->
lists:foreach(fun({Q, stateless}) ->
deliver0(Q, Msg)
end, Qs),
{[], []}.
deliver0(Q, Msg) ->
QName = amqqueue:get_name(Q),
QPid = amqqueue:get_pid(Q),
case ff_enabled() of
true ->
Request = {queue_event, QName, {deliver, Msg}},
MFA = {?MODULE, local_cast, [Request]},
delegate:invoke_no_result(QPid, MFA);
false ->
case key_from_name(QName#resource.name) of
{ok, Key} ->
MFA = {rabbit_channel, deliver_reply_local, [Key, Msg]},
delegate:invoke_no_result(QPid, MFA);
error ->
ok
end
end.
-spec local_cast(pid(), term()) -> ok.
local_cast(Pid, Request) ->
%% Ensure clients can't send a message to an arbitrary process and kill it.
case is_local(Pid) of
true -> gen_server:cast(Pid, Request);
false -> ok
end.
-spec local_call(pid(), term()) -> term().
local_call(Pid, Request) ->
%% Ensure clients can't send a message to an arbitrary process and kill it.
case is_local(Pid) of
true -> gen_server:call(Pid, Request);
false -> exit({unknown_pid, Pid})
end.
is_local(Pid) ->
rabbit_amqp_session:is_local(Pid) orelse
pg_local:in_group(rabbit_channels, Pid).
handle_event(QName, {deliver, Msg}, #?STATE{name = QName,
ctag = Ctag,
credit = undefined} = State) ->
{ok, State, deliver_actions(QName, Ctag, Msg)};
handle_event(QName, {deliver, Msg}, #?STATE{name = QName,
ctag = Ctag,
delivery_count = DeliveryCount,
credit = Credit} = State0)
when Credit > 0 ->
State = State0#?STATE{delivery_count = serial_number:add(DeliveryCount, 1),
credit = Credit - 1},
{ok, State, deliver_actions(QName, Ctag, Msg)};
handle_event(QName, {deliver, _Msg}, #?STATE{name = QName,
dropped = Dropped} = State) ->
rabbit_global_counters:messages_dead_lettered(maxlen, ?MODULE, disabled, 1),
{ok, State#?STATE{dropped = Dropped + 1}, []}.
deliver_actions(QName, Ctag, Mc) ->
Msgs = [{QName, self(), undefined, _Redelivered = false, Mc}],
[{deliver, Ctag, _AckRequired = false, Msgs}].
credit(_QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain,
#?STATE{delivery_count = DeliveryCountSnd} = State) ->
LinkCreditSnd = amqp10_util:link_credit_snd(
DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd),
{DeliveryCount, Credit} = case Drain of
true ->
{serial_number:add(DeliveryCountSnd, LinkCreditSnd), 0};
false ->
{DeliveryCountSnd, LinkCreditSnd}
end,
{State#?STATE{delivery_count = DeliveryCount,
credit = Credit},
[{credit_reply, CTag, DeliveryCount, Credit, _Available = 0, Drain}]}.
close(#?STATE{}) ->
ok.
update(_, #?STATE{} = State) ->
State.
cancel(_, _, #?STATE{} = State) ->
{ok, State}.
is_enabled() ->
true.
ff_enabled() ->
rabbit_feature_flags:is_enabled('rabbitmq_4.2.0').
is_compatible(_, _, _) ->
true.
is_recoverable(_) ->
false.
purge(_) ->
{ok, 0}.
policy_changed(_) ->
ok.
notify_decorators(_) ->
ok.
stat(_) ->
{ok, 0, 1}.
format(_, _) ->
[].
capabilities() ->
#{unsupported_policies => [],
queue_arguments => [],
consumer_arguments => [],
amqp_capabilities => [],
server_named => false,
rebalance_module => undefined,
can_redeliver => false ,
is_replicable => false}.
stop(_) ->
ok.
list_with_minimum_quorum() ->
[].
drain(_) ->
ok.
revive() ->
ok.
queue_vm_stats_sups() ->
{[], []}.
queue_vm_ets() ->
{[], []}.
delete(_, _, _, _) ->
{ok, 0}.
recover(_, _) ->
{[], []}.
settle(_, _, _, _, #?STATE{} = State) ->
{State, []}.
credit_v1(_, _, _, _, #?STATE{} = State) ->
{State, []}.
dequeue(_, _, _, _, #?STATE{name = Name}) ->
{protocol_error, not_implemented,
"basic.get not supported by volatile ~ts",
[rabbit_misc:rs(Name)]}.
state_info(#?STATE{}) ->
#{}.
info(_, _) ->
[].
policy_apply_to_name() ->
<<>>.
-spec new_name() ->
rabbit_misc:resource_name().
new_name() ->
EncodedPid = encode_pid(self()),
EncodedKey = base64:encode(rabbit_guid:gen()),
<<?PREFIX, EncodedPid/binary, ".", EncodedKey/binary>>.
%% This pid encoding function produces values that are of mostly fixed size
%% regardless of the node name length.
encode_pid(Pid) ->
PidParts0 = #{node := Node} = rabbit_pid_codec:decompose(Pid),
%% Note: we hash the entire node name. This is sufficient for our needs of shortening node name
%% in the TTB-encoded pid, and helps avoid doing the node name split for every single cluster member
%% in rabbit_nodes:all_running_with_hashes/0.
%%
%% We also use a synthetic node prefix because the hash alone will be sufficient to
NodeHash = erlang:phash2(Node),
PidParts = maps:update(node,
rabbit_nodes_common:make("reply", integer_to_list(NodeHash)),
PidParts0),
base64:encode(rabbit_pid_codec:recompose_to_binary(PidParts)).
-spec pid_from_name(rabbit_misc:resource_name(),
#{non_neg_integer() => node()}) ->
{ok, pid()} | error.
pid_from_name(<<?PREFIX, Bin/binary>>, CandidateNodes) ->
Cp = case persistent_term:get(?CP_DOT, undefined) of
undefined ->
P = binary:compile_pattern(<<".">>),
persistent_term:put(?CP_DOT, P),
P;
P ->
P
end,
try
[PidBase64, _KeyBase64] = binary:split(Bin, Cp),
PidBin = base64:decode(PidBase64),
PidParts0 = #{node := ShortenedNodename} = rabbit_pid_codec:decompose_from_binary(PidBin),
{_, NodeHash} = rabbit_nodes_common:parts(ShortenedNodename),
case maps:get(list_to_integer(NodeHash), CandidateNodes, undefined) of
undefined ->
error;
Candidate ->
PidParts = maps:update(node, Candidate, PidParts0),
{ok, rabbit_pid_codec:recompose(PidParts)}
end
catch error:_ -> error
end;
pid_from_name(_, _) ->
error.
%% Returns the base 64 encoded key.
-spec key_from_name(rabbit_misc:resource_name()) ->
{ok, binary()} | error.
key_from_name(<<?PREFIX, Suffix/binary>>) ->
case binary:split(Suffix, <<".">>) of
[_Pid, Key] ->
{ok, Key};
_ ->
error
end;
key_from_name(_) ->
error.
nodes_with_hashes() ->
#{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}.

View File

@ -56,7 +56,8 @@ groups() ->
[
%% authz
attach_source_queue,
attach_source_queue_dynamic,
attach_source_queue_dynamic_exclusive,
attach_source_queue_dynamic_volatile,
attach_target_exchange,
attach_target_topic_exchange,
attach_target_queue,
@ -447,7 +448,7 @@ attach_source_queue(Config) ->
end,
ok = close_connection_sync(Conn).
attach_source_queue_dynamic(Config) ->
attach_source_queue_dynamic_exclusive(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
@ -480,6 +481,41 @@ attach_source_queue_dynamic(Config) ->
end,
ok = close_connection_sync(Connection).
attach_source_queue_dynamic_volatile(Config) ->
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0'),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
%% missing read permission on volatile queue
ok = set_permissions(Config, <<".*">>, <<".*">>, <<>>),
Source = #{address => undefined,
durable => none,
expiry_policy => <<"link-detach">>,
dynamic => true,
capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver">>,
role => {receiver, Source, self()},
snd_settle_mode => settled,
rcv_settle_mode => first},
{ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs),
receive {amqp10_event,
{session, Session,
{ended, Error}}} ->
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
description = {utf8, Description}} = Error,
?assertEqual(
match,
re:run(Description,
<<"^read access to queue 'amq\.rabbitmq\.reply-to\..*' in vhost "
"'test vhost' refused for user 'test user'$">>,
[{capture, none}]))
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
attach_target_exchange(Config) ->
XName = <<"amq.fanout">>,
Address1 = rabbitmq_amqp_address:exchange(XName),

View File

@ -37,6 +37,7 @@
wait_for_credit/1,
wait_for_accepts/1,
send_messages/3, send_messages/4,
receive_messages/2,
detach_link_sync/1,
end_session_sync/1,
wait_for_session_end/1,
@ -3428,8 +3429,7 @@ max_message_size_server_to_client(Config) ->
role => {receiver, #{address => Address,
durable => configuration}, self()},
snd_settle_mode => unsettled,
rcv_settle_mode => first,
filter => #{}},
rcv_settle_mode => first},
{ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs),
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertEqual([PayloadSmallEnough], amqp10_msg:body(Msg)),
@ -5009,8 +5009,7 @@ dynamic_source_rpc(Config) ->
AttachArgs = #{name => <<"rpc-client-receiver🥕"/utf8>>,
role => {receiver, Source, self()},
snd_settle_mode => unsettled,
rcv_settle_mode => first,
filter => #{}},
rcv_settle_mode => first},
{ok, ReceiverClient} = amqp10_client:attach_link(SessionClient, AttachArgs),
RespAddr = receive {amqp10_event, {link, ReceiverClient, {attached, Attach}}} ->
#'v1_0.attach'{
@ -5081,8 +5080,7 @@ dynamic_terminus_delete(Config) ->
durable => none},
RcvAttachArgs = #{role => {receiver, Terminus, self()},
snd_settle_mode => unsettled,
rcv_settle_mode => first,
filter => #{}},
rcv_settle_mode => first},
SndAttachArgs = #{role => {sender, Terminus},
snd_settle_mode => mixed,
rcv_settle_mode => first},
@ -5768,7 +5766,6 @@ footer_checksum(FooterOpt, Config) ->
durable => configuration}, self()},
snd_settle_mode => settled,
rcv_settle_mode => first,
filter => #{},
footer_opt => FooterOpt},
SndAttachArgs = #{name => <<"my sender">>,
role => {sender, #{address => Addr,
@ -6873,19 +6870,6 @@ drain_queue(Session, Address, N) ->
ok = amqp10_client:detach_link(Receiver),
{ok, Msgs}.
receive_messages(Receiver, N) ->
receive_messages0(Receiver, N, []).
receive_messages0(_Receiver, 0, Acc) ->
lists:reverse(Acc);
receive_messages0(Receiver, N, Acc) ->
receive
{amqp10_msg, Receiver, Msg} ->
receive_messages0(Receiver, N - 1, [Msg | Acc])
after 30000 ->
ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}})
end.
count_received_messages(Receiver) ->
count_received_messages0(Receiver, 0).

View File

@ -18,6 +18,7 @@
wait_for_accepts/1,
send_message/2,
send_messages/3, send_messages/4,
receive_messages/2,
detach_link_sync/1,
end_session_sync/1,
wait_for_session_end/1,
@ -118,6 +119,19 @@ send_messages(Sender, Left, Settled, BodySuffix) ->
ok = send_message(Sender, Msg),
send_messages(Sender, Left - 1, Settled, BodySuffix).
receive_messages(Receiver, Num) ->
receive_messages0(Receiver, Num, []).
receive_messages0(_Receiver, 0, Acc) ->
lists:reverse(Acc);
receive_messages0(Receiver, N, Acc) ->
receive
{amqp10_msg, Receiver, Msg} ->
receive_messages0(Receiver, N - 1, [Msg | Acc])
after 20_000 ->
ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}})
end.
detach_link_sync(Link) ->
ok = amqp10_client:detach_link(Link),
ok = wait_for_link_detach(Link).

View File

@ -0,0 +1,631 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(direct_reply_to_amqp_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-compile([nowarn_export_all,
export_all]).
-import(amqp_utils,
[init/1, init/2,
connection_config/1, connection_config/2,
flush/1,
wait_for_credit/1,
wait_for_accepts/1,
send_messages/3,
receive_messages/2,
detach_link_sync/1,
end_session_sync/1,
close_connection_sync/1]).
all() ->
[
{group, cluster_size_1},
{group, cluster_size_3}
].
groups() ->
[
{cluster_size_1, [],
[
responder_attaches_queue_target,
many_replies,
many_volatile_queues_same_session
]},
{cluster_size_3, [shuffle],
[
rpc_new_to_old_node,
rpc_old_to_new_node
]}
].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(rabbitmq_amqp_client),
rabbit_ct_helpers:log_environment(),
Config.
end_per_suite(Config) ->
Config.
init_per_group(Group, Config) ->
Nodes = case Group of
cluster_size_1 -> 1;
cluster_size_3 -> 3
end,
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodes_count, Nodes},
{rmq_nodename_suffix, Suffix}]),
Config2 = rabbit_ct_helpers:run_setup_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
case rabbit_ct_broker_helpers:enable_feature_flag(Config2, 'rabbitmq_4.2.0') of
ok ->
Config2;
{skip, _} = Skip ->
Skip
end.
end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_teardown_steps(
Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% Responder attaches to a "/queues/amq.rabbitmq.reply-to.<suffix>" target.
responder_attaches_queue_target(Config) ->
RequestQueue = atom_to_binary(?FUNCTION_NAME),
AddrRequestQueue = rabbitmq_amqp_address:queue(RequestQueue),
{ConnResponder, SessionResponder, LinkPairResponder} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPairResponder, RequestQueue, #{}),
OpnConfRequester0 = connection_config(Config),
OpnConfRequester = OpnConfRequester0#{container_id := <<"requester">>,
notify_with_performative => true},
{ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester),
{ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester),
{ok, ReceiverRequester} = amqp10_client:attach_link(SessionRequester, attach_args()),
AddrVolQ = receive {amqp10_event, {link, ReceiverRequester, {attached, Attach}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, AddressVolatileQueue},
dynamic = true}} = Attach,
AddressVolatileQueue
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:flow_link_credit(ReceiverRequester, 3, never),
{ok, SenderRequester} = amqp10_client:attach_sender_link_sync(
SessionRequester, <<"sender requester">>, AddrRequestQueue),
ok = wait_for_credit(SenderRequester),
RpcId = <<"RPC message ID">>,
ok = amqp10_client:send_msg(
SenderRequester,
amqp10_msg:set_properties(
#{message_id => RpcId,
reply_to => AddrVolQ},
amqp10_msg:new(<<"t1">>, <<"request-1">>))),
receive {amqp10_disposition, {accepted, <<"t1">>}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
{ok, ReceiverResponder} = amqp10_client:attach_receiver_link(
SessionResponder, <<"receiver responder">>,
AddrRequestQueue, unsettled),
{ok, RequestMsg} = amqp10_client:get_msg(ReceiverResponder),
ok = amqp10_client:accept_msg(ReceiverResponder, RequestMsg),
?assertEqual(<<"request-1">>, amqp10_msg:body_bin(RequestMsg)),
#{message_id := RpcId,
reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg),
?assertMatch(<<"/queues/amq.rabbitmq.reply-to.", _/binary>>, ReplyToAddr),
%% The metadata store should store only the request queue.
?assertEqual(1, rabbit_ct_broker_helpers:rpc(Config, rabbit_db_queue, count, [])),
{ok, #{queue := ReplyQ}} = rabbitmq_amqp_address:to_map(ReplyToAddr),
?assertMatch({ok, #{vhost := <<"/">>,
durable := false,
type := <<"rabbit_volatile_queue">>,
message_count := 0,
consumer_count := 1}},
rabbitmq_amqp_client:get_queue(LinkPairResponder, ReplyQ)),
{ok, SenderResponder1} = amqp10_client:attach_sender_link_sync(
SessionResponder, <<"sender responder unsettled">>,
ReplyToAddr, unsettled),
{ok, SenderResponder2} = amqp10_client:attach_sender_link_sync(
SessionResponder, <<"sender responder settled">>,
ReplyToAddr, settled),
ok = wait_for_credit(SenderResponder1),
ok = wait_for_credit(SenderResponder2),
flush(attached),
?assertMatch(#{publishers := 3,
consumers := 2},
maps:get(#{protocol => amqp10}, get_global_counters(Config))),
%% Multiple responders stream back multiple replies for the single request.
%% "AMQP is commonly used in publish/subscribe systems where copies of a single
%% original message are distributed to zero or many subscribers. AMQP permits
%% zero or multiple responses to a message with the reply-to property set,
%% which can be correlated using the correlation-id property."
%% [http-over-amqp-v1.0-wd06]
ok = amqp10_client:send_msg(
SenderResponder1,
amqp10_msg:set_properties(
#{message_id => <<"reply 1">>,
correlation_id => RpcId},
amqp10_msg:new(<<1>>, <<"reply-1">>))),
receive {amqp10_disposition, {accepted, <<1>>}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:send_msg(
SenderResponder1,
amqp10_msg:set_properties(
#{message_id => <<"reply 2">>,
to => ReplyToAddr,
correlation_id => RpcId},
amqp10_msg:new(<<2>>, <<"reply-2">>))),
receive {amqp10_disposition, {accepted, <<2>>}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:send_msg(
SenderResponder2,
amqp10_msg:set_properties(
#{message_id => <<"reply 3">>,
to => ReplyToAddr,
correlation_id => RpcId},
amqp10_msg:new(<<3>>, <<"reply-3">>, true))),
receive {amqp10_msg, ReceiverRequester, ReplyMsg1} ->
?assertEqual(<<"reply-1">>,
amqp10_msg:body_bin(ReplyMsg1)),
?assertMatch(#{message_id := <<"reply 1">>,
correlation_id := RpcId},
amqp10_msg:properties(ReplyMsg1))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, ReceiverRequester, ReplyMsg2} ->
?assertEqual(<<"reply-2">>,
amqp10_msg:body_bin(ReplyMsg2)),
?assertMatch(#{message_id := <<"reply 2">>,
correlation_id := RpcId},
amqp10_msg:properties(ReplyMsg2))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, ReceiverRequester, ReplyMsg3} ->
?assertEqual(<<"reply-3">>,
amqp10_msg:body_bin(ReplyMsg3)),
?assertMatch(#{message_id := <<"reply 3">>,
correlation_id := RpcId},
amqp10_msg:properties(ReplyMsg3))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
%% RabbitMQ should drop the 4th reply due to insufficient link credit.
ok = amqp10_client:send_msg(
SenderResponder2,
amqp10_msg:set_properties(
#{message_id => <<"reply 4">>,
to => ReplyToAddr,
correlation_id => RpcId},
amqp10_msg:new(<<4>>, <<"reply-4">>, true))),
receive {amqp10_msg, _, _} ->
ct:fail({unxpected_msg, ?LINE})
after 5 -> ok
end,
%% Test drain
flush(pre_drain),
ok = amqp10_client:flow_link_credit(ReceiverRequester, 100_000, never, true),
receive {amqp10_event, {link, ReceiverRequester, credit_exhausted}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
%% RabbitMQ should also drop the 5th reply due to insufficient link credit.
ok = amqp10_client:send_msg(
SenderResponder2,
amqp10_msg:set_properties(
#{message_id => <<"reply 5">>,
to => ReplyToAddr,
correlation_id => RpcId},
amqp10_msg:new(<<5>>, <<"reply-5">>, true))),
receive {amqp10_msg, _, _} ->
ct:fail({unxpected_msg, ?LINE})
after 5 -> ok
end,
%% When the requester detaches, the volatile queue is gone.
ok = detach_link_sync(ReceiverRequester),
flush(detached),
?assertMatch(#{publishers := 3,
consumers := 1},
maps:get(#{protocol => amqp10}, get_global_counters(Config))),
%% Therefore, HTTP GET on that queue should return 404.
{error, Resp} = rabbitmq_amqp_client:get_queue(LinkPairResponder, ReplyQ),
?assertMatch(#{subject := <<"404">>}, amqp10_msg:properties(Resp)),
%% Also, RabbitMQ should refuse attaching to the volatile queue target.
{ok, SenderResponder3} = amqp10_client:attach_sender_link_sync(
SessionResponder, <<"sender responder 3">>,
ReplyToAddr),
receive {amqp10_event, {link, SenderResponder3, {detached, Error1}}} ->
?assertMatch(
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no queue 'amq.rabbitmq.reply-to.", _/binary>>}},
Error1)
after 9000 -> ct:fail({missing_event, ?LINE})
end,
%% RabbitMQ should also refuse attaching to the volatile queue target
%% when the requester ends the session.
ok = end_session_sync(SessionRequester),
{ok, SenderResponder4} = amqp10_client:attach_sender_link_sync(
SessionResponder, <<"sender responder 4">>,
ReplyToAddr),
receive {amqp10_event, {link, SenderResponder4, {detached, Error2}}} ->
?assertMatch(
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_NOT_FOUND},
Error2)
after 9000 -> ct:fail({missing_event, ?LINE})
end,
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPairResponder, RequestQueue),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPairResponder),
ok = close_connection_sync(ConnResponder),
ok = close_connection_sync(ConnRequester),
Counters = get_global_counters(Config),
?assertMatch(#{messages_delivered_total := 3},
maps:get(#{protocol => amqp10,
queue_type => rabbit_volatile_queue}, Counters)),
?assertMatch(#{messages_dead_lettered_maxlen_total := 2},
maps:get(#{dead_letter_strategy => disabled,
queue_type => rabbit_volatile_queue}, Counters)),
?assertMatch(#{publishers := 0,
consumers := 0,
%% RabbitMQ received 6 msgs in total (1 request + 5 replies)
messages_received_total := 6},
maps:get(#{protocol => amqp10}, Counters)).
%% Test that responder can send many messages to requester.
%% Load test the volatile queue.
many_replies(Config) ->
Num = 3000,
RequestQueue = atom_to_binary(?FUNCTION_NAME),
AddrRequestQueue = rabbitmq_amqp_address:queue(RequestQueue),
{ConnResponder, SessionResponder, LinkPair} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, RequestQueue, #{}),
OpnConfRequester0 = connection_config(Config),
OpnConfRequester = OpnConfRequester0#{container_id := <<"requester">>,
notify_with_performative => true},
{ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester),
{ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester),
{ok, ReceiverRequester} = amqp10_client:attach_link(SessionRequester, attach_args()),
AddrVolQ = receive {amqp10_event, {link, ReceiverRequester, {attached, Attach}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, AddressVolatileQueue},
dynamic = true}} = Attach,
AddressVolatileQueue
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:flow_link_credit(ReceiverRequester, Num, never),
{ok, SenderRequester} = amqp10_client:attach_sender_link_sync(
SessionRequester, <<"sender requester">>, AddrRequestQueue),
ok = wait_for_credit(SenderRequester),
ok = amqp10_client:send_msg(
SenderRequester,
amqp10_msg:set_properties(
#{reply_to => AddrVolQ},
amqp10_msg:new(<<"t1">>, <<"request-1">>))),
receive {amqp10_disposition, {accepted, <<"t1">>}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
{ok, ReceiverResponder} = amqp10_client:attach_receiver_link(
SessionResponder, <<"receiver responder">>,
AddrRequestQueue, unsettled),
{ok, RequestMsg} = amqp10_client:get_msg(ReceiverResponder),
ok = amqp10_client:accept_msg(ReceiverResponder, RequestMsg),
#{reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg),
{ok, SenderResponder} = amqp10_client:attach_sender_link_sync(
SessionResponder, <<"sender responder">>, ReplyToAddr),
ok = wait_for_credit(SenderResponder),
flush(attached),
ok = send_messages(SenderResponder, Num, true),
Msgs = receive_messages(ReceiverRequester, Num),
receive {amqp10_event, {link, ReceiverRequester, credit_exhausted}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
lists:foldl(fun(Msg, N) ->
Bin = integer_to_binary(N),
?assertEqual(Bin, amqp10_msg:body_bin(Msg)),
N - 1
end, Num, Msgs),
[ok = detach_link_sync(R) || R <- [ReceiverRequester, SenderRequester,
ReceiverResponder, SenderResponder]],
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, RequestQueue),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = close_connection_sync(ConnResponder),
ok = close_connection_sync(ConnRequester).
%% In contrast to AMQP 0.9.1, we expect RabbitMQ to allow for multiple volatile queues
%% on the same AMQP 1.0 session because for example a JMS app can create multiple
%% temporary queues on the same session:
%% https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue()
%% https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue()
many_volatile_queues_same_session(Config) ->
RequestQueue = atom_to_binary(?FUNCTION_NAME),
AddrRequestQueue = rabbitmq_amqp_address:queue(RequestQueue),
{ConnResponder, SessionResponder, LinkPair} = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, RequestQueue, #{}),
OpnConfRequester0 = connection_config(Config),
OpnConfRequester = OpnConfRequester0#{container_id := <<"requester">>,
notify_with_performative => true},
{ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester),
{ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester),
{ok, Receiver1Requester} = amqp10_client:attach_link(SessionRequester, attach_args(<<"r1">>)),
{ok, Receiver2Requester} = amqp10_client:attach_link(SessionRequester, attach_args(<<"r2">>)),
AddrVolQ1 = receive {amqp10_event, {link, Receiver1Requester, {attached, Attach1}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, AddressVolatileQueue1},
dynamic = true}} = Attach1,
AddressVolatileQueue1
after 9000 -> ct:fail({missing_event, ?LINE})
end,
AddrVolQ2 = receive {amqp10_event, {link, Receiver2Requester, {attached, Attach2}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, AddressVolatileQueue2},
dynamic = true}} = Attach2,
AddressVolatileQueue2
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:flow_link_credit(Receiver1Requester, 1, never),
ok = amqp10_client:flow_link_credit(Receiver2Requester, 1, never),
{ok, SenderRequester} = amqp10_client:attach_sender_link_sync(
SessionRequester, <<"sender requester">>, AddrRequestQueue),
ok = wait_for_credit(SenderRequester),
ok = amqp10_client:send_msg(
SenderRequester,
amqp10_msg:set_properties(
#{message_id => <<"RPC receiver 1">>,
reply_to => AddrVolQ1},
amqp10_msg:new(<<"t1">>, <<"request-1">>))),
ok = amqp10_client:send_msg(
SenderRequester,
amqp10_msg:set_properties(
#{message_id => <<"RPC receiver 2">>,
reply_to => AddrVolQ2},
amqp10_msg:new(<<"t2">>, <<"request-2">>))),
ok = wait_for_accepts(2),
{ok, ReceiverResponder} = amqp10_client:attach_receiver_link(
SessionResponder, <<"receiver responder">>,
AddrRequestQueue, settled),
{ok, RequestMsg1} = amqp10_client:get_msg(ReceiverResponder),
{ok, RequestMsg2} = amqp10_client:get_msg(ReceiverResponder),
#{message_id := Id1,
reply_to := ReplyToAddr1} = amqp10_msg:properties(RequestMsg1),
#{message_id := Id2,
reply_to := ReplyToAddr2} = amqp10_msg:properties(RequestMsg2),
?assertMatch(<<"/queues/amq.rabbitmq.reply-to.", _/binary>>, ReplyToAddr1),
?assertMatch(<<"/queues/amq.rabbitmq.reply-to.", _/binary>>, ReplyToAddr2),
?assertNotEqual(ReplyToAddr1, ReplyToAddr2),
%% The metadata store should store only the request queue.
?assertEqual(1, rabbit_ct_broker_helpers:rpc(Config, rabbit_db_queue, count, [])),
{ok, SenderResponder} = amqp10_client:attach_sender_link_sync(
SessionResponder, <<"sender responder">>,
null, mixed),
ok = wait_for_credit(SenderResponder),
flush(attached),
ok = amqp10_client:send_msg(
SenderResponder,
amqp10_msg:set_properties(
#{message_id => <<"reply 1">>,
to => ReplyToAddr1,
correlation_id => Id1},
amqp10_msg:new(<<1>>, <<"reply-1">>, true))),
ok = amqp10_client:send_msg(
SenderResponder,
amqp10_msg:set_properties(
#{message_id => <<"reply 2">>,
to => ReplyToAddr2,
correlation_id => Id2},
amqp10_msg:new(<<2>>, <<"reply-2">>, false))),
ok = wait_for_accepts(1),
receive {amqp10_msg, Receiver2Requester, ReplyMsg2} ->
?assertEqual(<<"reply-2">>,
amqp10_msg:body_bin(ReplyMsg2)),
?assertMatch(#{message_id := <<"reply 2">>,
correlation_id := <<"RPC receiver 2">>},
amqp10_msg:properties(ReplyMsg2))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Receiver1Requester, ReplyMsg1} ->
?assertEqual(<<"reply-1">>,
amqp10_msg:body_bin(ReplyMsg1)),
?assertMatch(#{message_id := <<"reply 1">>,
correlation_id := <<"RPC receiver 1">>},
amqp10_msg:properties(ReplyMsg1))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
[ok = detach_link_sync(R) || R <- [Receiver1Requester, Receiver2Requester, SenderRequester,
ReceiverResponder, SenderResponder]],
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, RequestQueue),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = close_connection_sync(ConnResponder),
ok = close_connection_sync(ConnRequester).
%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
rpc_new_to_old_node(Config) ->
rpc(0, 1, Config).
rpc_old_to_new_node(Config) ->
rpc(1, 0, Config).
rpc(RequesterNode, ResponderNode, Config) ->
RequestQueue = atom_to_binary(?FUNCTION_NAME),
AddrRequestQueue = rabbitmq_amqp_address:queue(RequestQueue),
{ConnResponder, SessionResponder, _} = init(ResponderNode, Config),
OpnConfRequester0 = connection_config(RequesterNode, Config),
OpnConfRequester = OpnConfRequester0#{container_id := <<"requester">>,
notify_with_performative => true},
{ok, ConnRequester} = amqp10_client:open_connection(OpnConfRequester),
{ok, SessionRequester} = amqp10_client:begin_session_sync(ConnRequester),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(
SessionRequester, <<"link pair requester">>),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, RequestQueue, #{}),
{ok, ReceiverRequester} = amqp10_client:attach_link(SessionRequester, attach_args()),
AddrVolQ = receive {amqp10_event, {link, ReceiverRequester, {attached, Attach}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, AddressVolatileQueue},
dynamic = true}} = Attach,
AddressVolatileQueue
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:flow_link_credit(ReceiverRequester, 2, never),
{ok, SenderRequester} = amqp10_client:attach_sender_link_sync(
SessionRequester, <<"sender requester">>, AddrRequestQueue),
ok = wait_for_credit(SenderRequester),
RpcId = <<"RPC message ID">>,
ok = amqp10_client:send_msg(
SenderRequester,
amqp10_msg:set_properties(
#{message_id => RpcId,
reply_to => AddrVolQ},
amqp10_msg:new(<<"t1">>, <<"request-1">>))),
receive {amqp10_disposition, {accepted, <<"t1">>}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = rabbit_ct_broker_helpers:await_metadata_store_consistent(Config, ResponderNode),
{ok, ReceiverResponder} = amqp10_client:attach_receiver_link(
SessionResponder, <<"receiver responder">>,
RequestQueue, unsettled),
{ok, RequestMsg} = amqp10_client:get_msg(ReceiverResponder),
?assertEqual(<<"request-1">>, amqp10_msg:body_bin(RequestMsg)),
#{message_id := RpcId,
reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg),
?assertMatch(<<"/queues/amq.rabbitmq.reply-to.", _/binary>>, ReplyToAddr),
{ok, SenderResponderAnon} = amqp10_client:attach_sender_link_sync(
SessionResponder,
<<"sender responder anonymous terminus">>,
null, unsettled),
ok = wait_for_credit(SenderResponderAnon),
flush(attached),
%% Responder streams back two replies for the single request.
ok = amqp10_client:send_msg(
SenderResponderAnon,
amqp10_msg:set_properties(
#{message_id => <<"reply 1">>,
to => ReplyToAddr,
correlation_id => RpcId},
amqp10_msg:new(<<1>>, <<"reply-1">>))),
receive {amqp10_disposition, {accepted, <<1>>}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:send_msg(
SenderResponderAnon,
amqp10_msg:set_properties(
#{message_id => <<"reply 2">>,
to => ReplyToAddr,
correlation_id => RpcId},
amqp10_msg:new(<<2>>, <<"reply-2">>))),
receive {amqp10_disposition, {accepted, <<2>>}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:accept_msg(ReceiverResponder, RequestMsg),
%% The metadata store should store only the request queue.
?assertEqual(1, rabbit_ct_broker_helpers:rpc(Config, rabbit_db_queue, count, [])),
receive {amqp10_msg, ReceiverRequester, ReplyMsg1} ->
?assertEqual(<<"reply-1">>,
amqp10_msg:body_bin(ReplyMsg1)),
?assertMatch(#{message_id := <<"reply 1">>,
correlation_id := RpcId},
amqp10_msg:properties(ReplyMsg1))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, ReceiverRequester, ReplyMsg2} ->
?assertEqual(<<"reply-2">>,
amqp10_msg:body_bin(ReplyMsg2)),
?assertMatch(#{message_id := <<"reply 2">>,
correlation_id := RpcId},
amqp10_msg:properties(ReplyMsg2))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, RequestQueue),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = close_connection_sync(ConnResponder),
ok = close_connection_sync(ConnRequester).
attach_args() ->
attach_args(<<"receiver requester">>).
attach_args(Name) ->
Source = #{address => undefined,
durable => none,
expiry_policy => <<"link-detach">>,
dynamic => true,
capabilities => [<<"rabbitmq:volatile-queue">>]},
#{name => Name,
role => {receiver, Source, self()},
snd_settle_mode => settled,
rcv_settle_mode => first}.
get_global_counters(Config) ->
rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []).

View File

@ -5,18 +5,27 @@
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(amqpl_direct_reply_to_SUITE).
-module(direct_reply_to_amqpl_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-compile([nowarn_export_all,
export_all]).
-import(rabbit_ct_helpers, [eventually/1]).
-import(rabbit_ct_helpers,
[eventually/1]).
-import(amqp_utils,
[init/1,
close/1,
connection_config/1,
wait_for_credit/1,
end_session_sync/1,
close_connection_sync/1]).
-define(TIMEOUT, 30_000).
-define(TIMEOUT, 9000).
%% This is the pseudo queue that is specially interpreted by RabbitMQ.
-define(REPLY_QUEUE, <<"amq.rabbitmq.reply-to">>).
@ -24,19 +33,26 @@
all() ->
[
{group, cluster_size_1},
{group, cluster_size_1_ff_disabled},
{group, cluster_size_3}
].
groups() ->
[
{cluster_size_1, [shuffle],
cluster_size_1_common() ++
[
trace,
failure_ack_mode,
failure_multiple_consumers,
failure_reuse_consumer_tag,
failure_publish
amqpl_amqp_amqpl,
amqp_amqpl_amqp
]},
%% Delete this group when feature flag rabbitmq_4.2.0 becomes required.
{cluster_size_1_ff_disabled, [],
cluster_size_1_common() ++
[
enable_ff % must run last
]},
{cluster_size_3, [shuffle],
[
rpc_new_to_old_node,
@ -44,11 +60,21 @@ groups() ->
]}
].
cluster_size_1_common() ->
[
trace,
failure_ack_mode,
failure_multiple_consumers,
failure_reuse_consumer_tag,
failure_publish
].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(rabbitmq_amqp_client),
rabbit_ct_helpers:log_environment(),
Config.
@ -57,15 +83,27 @@ end_per_suite(Config) ->
init_per_group(Group, Config) ->
Nodes = case Group of
cluster_size_1 -> 1;
cluster_size_3 -> 3
cluster_size_3 ->
3;
_ ->
1
end,
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodes_count, Nodes},
Config1 = case Group of
cluster_size_1_ff_disabled ->
rabbit_ct_helpers:merge_app_env(
Config,
{rabbit,
[{forced_feature_flags_on_init,
{rel, [], ['rabbitmq_4.2.0']}}]});
_ ->
Config
end,
Suffix = rabbit_ct_helpers:testcase_absname(Config1, "", "-"),
Config2 = rabbit_ct_helpers:set_config(
Config1, [{rmq_nodes_count, Nodes},
{rmq_nodename_suffix, Suffix}]),
rabbit_ct_helpers:run_setup_steps(
Config1,
Config2,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
@ -81,21 +119,129 @@ init_per_testcase(Testcase, Config) ->
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% Test enabling the feature flag while a client consumes from the volatile queue.
%% Delete this test case when feature flag rabbitmq_4.2.0 becomes required.
enable_ff(Config) ->
RequestQueue = <<"request queue">>,
RequestPayload = <<"my request">>,
CorrelationId = <<"my correlation ID">>,
RequesterCh = rabbit_ct_client_helpers:open_channel(Config),
ResponderCh = rabbit_ct_client_helpers:open_channel(Config),
amqp_channel:subscribe(RequesterCh,
#'basic.consume'{queue = ?REPLY_QUEUE,
no_ack = true},
self()),
CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
end,
#'queue.declare_ok'{} = amqp_channel:call(
RequesterCh,
#'queue.declare'{queue = RequestQueue}),
#'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}),
amqp_channel:register_confirm_handler(RequesterCh, self()),
%% Send the request.
amqp_channel:cast(
RequesterCh,
#'basic.publish'{routing_key = RequestQueue},
#amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE,
correlation_id = CorrelationId},
payload = RequestPayload}),
receive #'basic.ack'{} -> ok
end,
%% Receive the request.
{#'basic.get_ok'{},
#amqp_msg{props = #'P_basic'{reply_to = ReplyTo,
correlation_id = CorrelationId},
payload = RequestPayload}
} = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}),
?assertEqual(#'queue.declare_ok'{queue = ReplyTo,
message_count = 0,
consumer_count = 1},
amqp_channel:call(ResponderCh,
#'queue.declare'{queue = ReplyTo})),
%% Send the first reply.
amqp_channel:cast(
ResponderCh,
#'basic.publish'{routing_key = ReplyTo},
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
payload = <<"reply 1">>}),
%% Receive the frst reply.
receive {#'basic.deliver'{consumer_tag = CTag,
redelivered = false,
exchange = <<>>,
routing_key = ReplyTo},
#amqp_msg{payload = P1,
props = #'P_basic'{correlation_id = CorrelationId}}} ->
?assertEqual(<<"reply 1">>, P1)
after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
end,
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0'),
?assertEqual(#'queue.declare_ok'{queue = ReplyTo,
message_count = 0,
consumer_count = 1},
amqp_channel:call(ResponderCh,
#'queue.declare'{queue = ReplyTo})),
%% Send the second reply.
amqp_channel:cast(
ResponderCh,
#'basic.publish'{routing_key = ReplyTo},
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
payload = <<"reply 2">>}),
%% Receive the second reply.
receive {#'basic.deliver'{consumer_tag = CTag},
#amqp_msg{payload = P2,
props = #'P_basic'{correlation_id = CorrelationId}}} ->
?assertEqual(<<"reply 2">>, P2)
after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
end,
%% Requester cancels consumption.
?assertMatch(#'basic.cancel_ok'{consumer_tag = CTag},
amqp_channel:call(RequesterCh, #'basic.cancel'{consumer_tag = CTag})),
%% Responder checks again if the requester is still there.
%% This time, the requester and its queue should be gone.
try amqp_channel:call(ResponderCh, #'queue.declare'{queue = ReplyTo}) of
_ ->
ct:fail("expected queue.declare to fail")
catch exit:Reason ->
?assertMatch(
{{_, {_, _, <<"NOT_FOUND - no queue '",
ReplyTo:(byte_size(ReplyTo))/binary,
"' in vhost '/'">>}}, _},
Reason)
end,
%% Clean up.
#'queue.delete_ok'{} = amqp_channel:call(RequesterCh,
#'queue.delete'{queue = RequestQueue}),
ok = rabbit_ct_client_helpers:close_channel(RequesterCh).
%% Test case for
%% https://github.com/rabbitmq/rabbitmq-server/discussions/11662
trace(Config) ->
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]),
Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
TraceQueue = <<"tests.amqpl_direct_reply_to.trace.tracing">>,
RequestQueue = <<"tests.amqpl_direct_reply_to.trace.requests">>,
RequestPayload = <<"my request">>,
ReplyPayload = <<"my reply">>,
CorrelationId = <<"my correlation ID">>,
TraceQueue = <<"trace-queue">>,
RequestQueue = <<"request-queue">>,
RequestPayload = <<"my-request">>,
ReplyPayload = <<"my-reply">>,
CorrelationId = <<"my-correlation-id">>,
Qs = [RequestQueue, TraceQueue],
Ch = rabbit_ct_client_helpers:open_channel(Config),
RequesterCh = rabbit_ct_client_helpers:open_channel(Config, 0),
ResponderCh = rabbit_ct_client_helpers:open_channel(Config, 0),
RequesterCh = rabbit_ct_client_helpers:open_channel(Config),
ResponderCh = rabbit_ct_client_helpers:open_channel(Config),
[#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q0}) || Q0 <- Qs],
#'queue.bind_ok'{} = amqp_channel:call(
@ -266,6 +412,201 @@ failure_publish(Config) ->
end,
ok = rabbit_ct_client_helpers:close_connection(Conn).
%% Test that Direct Reply-To works when the requester is an AMQP 0.9.1 client
%% and the responder is an AMQP 1.0 client.
amqpl_amqp_amqpl(Config) ->
RequestQ = atom_to_binary(?FUNCTION_NAME),
AddrRequestQ = rabbitmq_amqp_address:queue(RequestQ),
Id = <<"🥕"/utf8>>,
RequestPayload = <<"request payload">>,
ReplyPayload = <<"reply payload">>,
Chan = rabbit_ct_client_helpers:open_channel(Config),
amqp_channel:subscribe(Chan, #'basic.consume'{queue = ?REPLY_QUEUE,
no_ack = true}, self()),
CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
end,
%% Send the request via AMQP 0.9.1
#'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{queue = RequestQ}),
amqp_channel:cast(Chan,
#'basic.publish'{routing_key = RequestQ},
#amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE,
message_id = Id},
payload = RequestPayload}),
%% Receive the request via AMQP 1.0.
{_, Session, LinkPair} = Init = init(Config),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"receiver">>, AddrRequestQ),
{ok, RequestMsg} = amqp10_client:get_msg(Receiver, ?TIMEOUT),
?assertEqual(RequestPayload, amqp10_msg:body_bin(RequestMsg)),
#{message_id := Id,
reply_to := ReplyToAddr} = amqp10_msg:properties(RequestMsg),
%% AMQP 1.0 responder checks whether the AMQP 0.9.1 requester is still there.
{ok, #{queue := ReplyQ}} = rabbitmq_amqp_address:to_map(ReplyToAddr),
?assertMatch({ok, #{vhost := <<"/">>,
durable := false,
type := <<"rabbit_volatile_queue">>,
message_count := 0,
consumer_count := 1}},
rabbitmq_amqp_client:get_queue(LinkPair, ReplyQ)),
%% Send the reply via AMQP 1.0.
{ok, Sender} = amqp10_client:attach_sender_link_sync(
Session, <<"sender">>, ReplyToAddr),
ok = wait_for_credit(Sender),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_headers(
#{priority => 3,
durable => true},
amqp10_msg:set_properties(
#{message_id => <<"reply ID">>,
correlation_id => Id},
amqp10_msg:set_application_properties(
#{<<"my key">> => <<"my val">>},
amqp10_msg:new(<<1>>, ReplyPayload, true))))),
%% Receive the reply via AMQP 0.9.1
receive {Deliver, #amqp_msg{payload = ReplyPayload,
props = #'P_basic'{headers = Headers} = Props}} ->
?assertMatch(#'basic.deliver'{
consumer_tag = CTag,
redelivered = false,
exchange = <<>>,
routing_key = <<"amq.rabbitmq.reply-to.", _/binary>>},
Deliver),
?assertMatch(#'P_basic'{
message_id = <<"reply ID">>,
correlation_id = Id,
priority = 3,
delivery_mode = 2},
Props),
?assertEqual({value, {<<"my key">>, longstr, <<"my val">>}},
lists:keysearch(<<"my key">>, 1, Headers))
after ?TIMEOUT -> ct:fail(missing_reply)
end,
%% AMQP 0.9.1 requester cancels consumption.
?assertMatch(#'basic.cancel_ok'{consumer_tag = CTag},
amqp_channel:call(Chan, #'basic.cancel'{consumer_tag = CTag})),
%% This time, when the AMQP 1.0 responder checks whether the AMQP 0.9.1 requester
%% is still there, an error should be returned.
{error, Resp} = rabbitmq_amqp_client:get_queue(LinkPair, ReplyQ),
?assertMatch(#{subject := <<"404">>}, amqp10_msg:properties(Resp)),
?assertEqual(#'v1_0.amqp_value'{content = {utf8, <<"queue '", ReplyQ/binary, "' in vhost '/' not found">>}},
amqp10_msg:body(Resp)),
#'queue.delete_ok'{} = amqp_channel:call(Chan, #'queue.delete'{queue = RequestQ}),
ok = close(Init),
ok = rabbit_ct_client_helpers:close_channel(Chan).
%% Test that Direct Reply-To works when the requester is an AMQP 1.0 client
%% and the responder is an AMQP 0.9.1 client.
amqp_amqpl_amqp(Config) ->
RequestQ = atom_to_binary(?FUNCTION_NAME),
AddrRequestQ = rabbitmq_amqp_address:queue(RequestQ),
Id = <<"🥕"/utf8>>,
RequestPayload = <<"request payload">>,
ReplyPayload = <<"reply payload">>,
Chan = rabbit_ct_client_helpers:open_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{queue = RequestQ}),
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{container_id := <<"requester">>,
notify_with_performative => true},
{ok, Conn} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Conn),
Source = #{address => undefined,
durable => none,
expiry_policy => <<"link-detach">>,
dynamic => true,
capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver">>,
role => {receiver, Source, self()},
snd_settle_mode => settled,
rcv_settle_mode => first},
{ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs),
AddrReplyQ = receive {amqp10_event, {link, Receiver, {attached, Attach}}} ->
#'v1_0.attach'{
source = #'v1_0.source'{
address = {utf8, Addr},
dynamic = true}} = Attach,
Addr
after 9000 -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:flow_link_credit(Receiver, 1, never),
%% Send the request via AMQP 1.0
{ok, SenderRequester} = amqp10_client:attach_sender_link_sync(
Session, <<"sender">>, AddrRequestQ),
ok = wait_for_credit(SenderRequester),
ok = amqp10_client:send_msg(
SenderRequester,
amqp10_msg:set_properties(
#{message_id => Id,
reply_to => AddrReplyQ},
amqp10_msg:new(<<"t1">>, RequestPayload))),
receive {amqp10_disposition, {accepted, <<"t1">>}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
%% Receive the request via AMQP 0.9.1
{#'basic.get_ok'{},
#amqp_msg{props = #'P_basic'{reply_to = ReplyQ,
message_id = Id},
payload = RequestPayload}
} = amqp_channel:call(Chan, #'basic.get'{queue = RequestQ}),
%% Test what the docs state:
%% "If the RPC server is going to perform some expensive computation it might wish
%% to check if the client has gone away. To do this the server can declare the
%% generated reply name first on a disposable channel in order to determine whether
%% it still exists."
?assertEqual(#'queue.declare_ok'{queue = ReplyQ,
message_count = 0,
consumer_count = 1},
amqp_channel:call(Chan, #'queue.declare'{queue = ReplyQ})),
%% Send the reply via AMQP 0.9.1
amqp_channel:cast(
Chan,
#'basic.publish'{routing_key = ReplyQ},
#amqp_msg{props = #'P_basic'{message_id = <<"msg ID reply">>,
correlation_id = Id},
payload = ReplyPayload}),
%% Receive the reply via AMQP 1.0
receive {amqp10_msg, Receiver, Reply} ->
?assertEqual(ReplyPayload,
amqp10_msg:body_bin(Reply)),
?assertMatch(#{message_id := <<"msg ID reply">>,
correlation_id := Id},
amqp10_msg:properties(Reply))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
ok = end_session_sync(Session),
ok = close_connection_sync(Conn),
#'queue.delete_ok'{} = amqp_channel:call(Chan, #'queue.delete'{queue = RequestQ}),
%% AMQP 0.9.1 responder checks again if the AMQP 1.0 requester is still there.
%% This time, the requester and its queue should be gone.
try amqp_channel:call(Chan, #'queue.declare'{queue = ReplyQ}) of
_ ->
ct:fail("expected queue.declare to fail")
catch exit:Reason ->
?assertMatch(
{{_, {_, _, <<"NOT_FOUND - no queue '",
ReplyQ:(byte_size(ReplyQ))/binary,
"' in vhost '/'">>}}, _},
Reason)
end.
%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
rpc_new_to_old_node(Config) ->
rpc(0, 1, Config).
@ -337,7 +678,6 @@ rpc(RequesterNode, ResponderNode, Config) ->
payload = <<"reply 1">>}),
%% Let's assume the RPC server sends multiple replies for a single request.
%% (This is a bit unusual but should work.)
%% Setting the reply address in CC should work.
amqp_channel:cast(
ResponderCh,
@ -366,7 +706,7 @@ rpc(RequesterNode, ResponderNode, Config) ->
end,
%% The requester sends a reply to itself.
%% (Really odd, but should work.)
%% (Odd, but should work.)
amqp_channel:cast(
RequesterCh,
#'basic.publish'{routing_key = ReplyTo},

View File

@ -335,7 +335,7 @@ amqpl_amqp_bin_amqpl(_Config) ->
delivery_mode = 2,
priority = 98,
correlation_id = <<"corr">> ,
reply_to = <<"reply-to">>,
reply_to = <<"reply/to">>,
expiration = <<"1">>,
message_id = <<"msg-id">>,
timestamp = 99,
@ -407,7 +407,7 @@ amqpl_amqp_bin_amqpl(_Config) ->
Hdr10),
?assertMatch(#'v1_0.properties'{content_encoding = {symbol, <<"gzip">>},
content_type = {symbol, <<"text/plain">>},
reply_to = {utf8, <<"reply-to">>},
reply_to = {utf8, <<"/queues/reply%2Fto">>},
creation_time = {timestamp, 99000},
user_id = {binary, <<"banana">>},
group_id = {utf8, <<"rmq">>}
@ -452,6 +452,9 @@ amqpl_amqp_bin_amqpl(_Config) ->
?assertEqual(RoutingHeaders,
maps:remove(<<"timestamp_in_ms">>, RoutingHeaders2)),
#content{properties = #'P_basic'{reply_to = ReplyTo}} = mc:protocol_state(MsgL2),
?assertEqual(<<"reply/to">>, ReplyTo),
ok = persistent_term:put(message_interceptors, []).
amqpl_cc_amqp_bin_amqpl(_Config) ->

View File

@ -182,9 +182,9 @@ smoke(Config) ->
?assertEqual(#{
messages_acknowledged_total => 3,
messages_delivered_consume_auto_ack_total => 0,
messages_delivered_consume_manual_ack_total => 0,
messages_delivered_consume_manual_ack_total => 2,
messages_delivered_get_auto_ack_total => 0,
messages_delivered_get_manual_ack_total => 0,
messages_delivered_get_manual_ack_total => 2,
messages_delivered_total => 4,
messages_get_empty_total => 2,
messages_redelivered_total => 1

View File

@ -51,8 +51,8 @@ prop_decode_reply_to(_) ->
PidParts = #{node => Node, id => 0, serial => 0, creation => 0},
IxParts = PidParts#{node := rabbit_nodes_common:make("banana", Ix)},
IxPartsEnc = base64:encode(pid_recomposition:to_binary(IxParts)),
IxBin = <<IxPartsEnc/binary, ".", Key/binary>>,
IxPartsEnc = base64:encode(rabbit_pid_codec:recompose_to_binary(IxParts)),
QNameBin = <<"amq.rabbitmq.reply-to.", IxPartsEnc/binary, ".", Key/binary>>,
NodeMap = maps:from_list(NodeList),
NoNodeMap = maps:from_list(NoNodeList),
@ -60,10 +60,12 @@ prop_decode_reply_to(_) ->
%% There is non-zero chance Random is a valid encoded Pid.
NonB64 = <<0, Random/binary>>,
{ok, pid_recomposition:recompose(PidParts), Key} =:=
rabbit_direct_reply_to:decode_reply_to(IxBin, NodeMap)
andalso {error, target_node_not_found} =:=
rabbit_direct_reply_to:decode_reply_to(IxBin, NoNodeMap)
andalso {error, unrecognized_format} =:=
rabbit_direct_reply_to:decode_reply_to(NonB64, NodeMap)
{ok, rabbit_pid_codec:recompose(PidParts)} =:=
rabbit_volatile_queue:pid_from_name(QNameBin, NodeMap)
andalso {ok, Key} =:=
rabbit_volatile_queue:key_from_name(QNameBin)
andalso error =:=
rabbit_volatile_queue:pid_from_name(QNameBin, NoNodeMap)
andalso error =:=
rabbit_volatile_queue:pid_from_name(NonB64, NodeMap)
end).

View File

@ -18,9 +18,8 @@
%% consistent route, to prevent them being reordered. In fact all
%% AMQP-ish things (such as queue declaration results and basic.get)
%% must take the same route as well, to ensure that clients see causal
%% ordering correctly. Therefore we have a rather generic mechanism
%% here rather than just a message-reflector. That's also why we pick
%% the delegate process to use based on a hash of the source pid.
%% ordering correctly. Therefore we can't use erpc. That's also why we
%% pick the delegate process to use based on a hash of the source pid.
%%
%% When a function is invoked using delegate:invoke/2,
%% or delegate:invoke_no_result/2 on a group of pids, the pids are first split

View File

@ -8,7 +8,13 @@
-export([exchange/1,
exchange/2,
queue/1]).
queue/1,
from_map/1,
to_map/1]).
-type address_map() :: #{queue := unicode:unicode_binary()} |
#{exchange := unicode:unicode_binary(),
routing_key => unicode:unicode_binary()}.
-spec exchange(unicode:unicode_binary()) ->
unicode:unicode_binary().
@ -28,3 +34,57 @@ exchange(ExchangeName, RoutingKey) ->
queue(QueueName) ->
QueueNameQuoted = uri_string:quote(QueueName),
<<"/queues/", QueueNameQuoted/binary>>.
-spec from_map(address_map()) ->
unicode:unicode_binary().
from_map(#{exchange := Exchange, routing_key := RoutingKey}) ->
exchange(Exchange, RoutingKey);
from_map(#{exchange := Exchange}) ->
exchange(Exchange);
from_map(#{queue := Queue}) ->
queue(Queue).
-spec to_map(unicode:unicode_binary()) ->
{ok, address_map()} | error.
to_map(<<"/exchanges/", Rest/binary>>) ->
case binary:split(Rest, <<"/">>, [global]) of
[ExchangeQuoted]
when ExchangeQuoted =/= <<>> ->
Exchange = uri_string:unquote(ExchangeQuoted),
{ok, #{exchange => Exchange}};
[ExchangeQuoted, RoutingKeyQuoted]
when ExchangeQuoted =/= <<>> ->
Exchange = uri_string:unquote(ExchangeQuoted),
RoutingKey = uri_string:unquote(RoutingKeyQuoted),
{ok, #{exchange => Exchange,
routing_key => RoutingKey}};
_ ->
error
end;
to_map(<<"/queues/">>) ->
error;
to_map(<<"/queues/", QueueQuoted/binary>>) ->
Queue = uri_string:unquote(QueueQuoted),
{ok, #{queue => Queue}};
to_map(_) ->
error.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
address_test() ->
M1 = #{queue => <<"my queue">>},
M2 = #{queue => <<"🥕"/utf8>>},
M3 = #{exchange => <<"my exchange">>},
M4 = #{exchange => <<"🥕"/utf8>>},
M5 = #{exchange => <<"my exchange">>,
routing_key => <<"my routing key">>},
M6 = #{exchange => <<"🥕"/utf8>>,
routing_key => <<"🍰"/utf8>>},
lists:foreach(fun(Map) ->
{ok, Map} = to_map(from_map(Map))
end, [M1, M2, M3, M4, M5, M6]),
error = to_map(<<"/queues/">>),
error = to_map(<<"/exchanges/">>),
error = to_map(<<"/exchanges//key">>).
-endif.

View File

@ -26,9 +26,9 @@
%% Stateless rabbit_queue_type callbacks.
-export([
is_stateful/0,
declare/2,
delete/4,
supports_stateful_delivery/0,
deliver/3,
is_enabled/0,
is_compatible/3,
@ -75,11 +75,6 @@
-define(INFO_KEYS, [type, name, durable, auto_delete, arguments,
pid, owner_pid, state, messages]).
-spec is_stateful() ->
boolean().
is_stateful() ->
false.
-spec declare(amqqueue:amqqueue(), node()) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} |
@ -135,6 +130,9 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
Err
end.
supports_stateful_delivery() ->
false.
-spec deliver([{amqqueue:amqqueue(), stateless}],
Msg :: mc:state(),
rabbit_queue_type:delivery_options()) ->

View File

@ -121,13 +121,14 @@ amqp10_destination(Config, AckMode) ->
{amqp10_msg, Receiver, InMsg} ->
[<<42>>] = amqp10_msg:body(InMsg),
Ts = Timestamp * 1000,
ReplyTo = <<"/queues/", ?UNSHOVELLED/binary>>,
?assertMatch(
#{content_type := ?UNSHOVELLED,
content_encoding := ?UNSHOVELLED,
correlation_id := ?UNSHOVELLED,
user_id := <<"guest">>,
message_id := ?UNSHOVELLED,
reply_to := ?UNSHOVELLED,
reply_to := ReplyTo,
%% Message timestamp is no longer overwritten
creation_time := Ts},
amqp10_msg:properties(InMsg)),

View File

@ -137,13 +137,14 @@ local_destination(Config, AckMode) ->
receive
{amqp10_msg, Receiver, InMsg} ->
ReplyTo = <<"/queues/", ?UNSHOVELLED/binary>>,
[<<42>>] = amqp10_msg:body(InMsg),
#{content_type := ?UNSHOVELLED,
content_encoding := ?UNSHOVELLED,
correlation_id := ?UNSHOVELLED,
user_id := <<"guest">>,
message_id := ?UNSHOVELLED,
reply_to := ?UNSHOVELLED
reply_to := ReplyTo
} = amqp10_msg:properties(InMsg),
#{<<"header1">> := 1,
<<"header2">> := <<"h2">>