rabbitmq-server/deps/rabbitmq_amqp_client
David Ansari 72cd7a35c2 Support Direct Reply-To for AMQP 1.0
# What?
* Support Direct Reply-To for AMQP 1.0
* Compared to AMQP 0.9.1, this PR allows for multiple volatile queues on a single
  AMQP 1.0 session. Use case: JMS clients can create multiple temporary queues on
  the same JMS/AMQP session:
  * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue()
  * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue()
* Fix missing metrics in for Direct Reply-To in AMQP 0.9.1, e.g.
  `messages_delivered_total`
* Fix missing metrics (even without using Direct Reply-To ) in AMQP 0.9.1:
  If stats level is not `fine`, global metrics `rabbitmq_global_messages_delivered_*` should still be incremented.

 # Why?
* Allow for scalable at-most-once RPC reply delivery
  Example use case: thousands of requesters connect, send a single
  request, wait for a single reply, and disconnect.
  This PR won't create any queue and won't write to the metadata store.
  Therefore, there's less pressure on the metadata store, less pressure
  on the Management API when listing all queues, less pressure on the
  metrics subsystem, etc.
* Feature parity with AMQP 0.9.1

 # How?
This PR extracts the previously channel specific Direct Reply-To code
into a new queue type: `rabbit_volatile_queue`.
"Volatile" describes the semantics, not a use-case. It signals non-durable,
zero-buffer, at-most-once, may-drop, and "not stored in Khepri."

This new queue type is then used for AMQP 1.0 and AMQP 0.9.1.

Sending to the volatile queue is stateless like previously with Direct Reply-To in AMQP 0.9.1 and like done
for the MQTT QoS 0 queue.
This allows for use cases where a single responder replies to e.g. 100k different requesters.

RabbitMQ will automatically auto grant new link-credit to the responder because the new queue type confirms immediately.

The key gets implicitly checked by the channel/session:
If the queue name (including the key) doesn’t exist, the `handle_event` callback for this queue isn’t invoked and therefore
no delivery will be sent to the responder.

This commit supports Direct Reply-To across AMQP 1.0 and 0.9.1. In other
words, the requester can be an AMQP 1.0 client while the responder is an
AMQP 0.9.1 client or vice versa.
RabbitMQ will internally convert between AMQP 0.9.1 `reply_to` and AMQP
1.0 `/queues/<queue>` address. The AMQP 0.9.1 `reply_to` property is
expected to contain a queue name. That's in line with the AMQP 0.9.1
spec:
> One of the standard message properties is Reply-To, which is designed
specifically for carrying the name of reply queues.

Compared to AMQP 0.9.1 where the requester sets the `reply_to` property
to `amq.rabbitmq.reply-to` and RabbitMQ modifies this field when
forwarding the message to the request queue, in AMQP 1.0 the requester
learns about the queue name from the broker at link attachment time.
The requester has to set the reply-to property to the server generated
queue name. That's because the server isn't allowed to modify the bare
message.

During link attachment time, the client has to set certain fields.
These fields are expected to be set by the RabbitMQ client libraries.
Here is an Erlang example:
```erl
Source = #{address => undefined,
           durable => none,
           expiry_policy => <<"link-detach">>,
           dynamic => true,
           capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver">>,
               role => {receiver, Source, self()},
               snd_settle_mode => settled,
               rcv_settle_mode => first},
{ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs),
AddressReplyQ = receive {amqp10_event, {link, Receiver, {attached, Attach}}} ->
                  #'v1_0.attach'{source = #'v1_0.source'{address = {utf8, Addr}}} = Attach,
                  Addr
end,
```

The client then sends the message by setting the reply-to address as
follows:
```erl
amqp10_client:send_msg(
  SenderRequester,
  amqp10_msg:set_properties(
    #{message_id => <<"my ID">>,
      reply_to => AddressReplyQ},
    amqp10_msg:new(<<"tag">>, <<"request">>))),
```

If the responder attaches to the queue target in the reply-to field,
RabbitMQ will check if the requester link is still attached. If the
requester detached, the link will be refused.

The responder can also attach to the anonymous null target and set the
`to` field to the `reply-to` address.

If RabbitMQ cannot deliver a reply, instead of buffering the reply,
RabbitMQ will be drop the reply and increment the following Prometheus metric:
```
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"} 0.0
```
That's in line with the MQTT QoS 0 queue type.

A reply message could be dropped for a variety of reasons:
1. The requester ran out of link-credit. It's therefore the requester's
   responsibility to grant sufficient link-credit on its receiving link.
2. RabbitMQ isn't allowed to deliver any message to due session flow
   control. It's the requster's responsibility to keep the session window
   large enough.
3. The requester doesn't consume messages fast enough causing TCP
   backpressure being applied or the RabbitMQ AMQP writer proc isn't
   scheduled quickly enough. The latter can happen for example if
   RabbitMQ runs with a single scheduler (is assigned a single CPU
   core). In either case, RabbitMQ internal flow control causes the
   volatile queue to drop messages.

Therefore, if high throughput is required while message loss is undesirable, a classic queue should be used
instead of a volatile queue since the former buffers messages while the
latter doesn't.

The main difference between the volatile queue and the MQTT QoS 0 queue
is that the former isn't written to the metadata store.

 # Breaking Change
Prior to this PR the following [documented caveat](https://www.rabbitmq.com/docs/4.0/direct-reply-to#limitations) applied:
> If the RPC server publishes with the mandatory flag set then `amq.rabbitmq.reply-to.*`
is treated as **not** a queue; i.e. if the server only publishes to this name then the message
will be considered "not routed"; a `basic.return` will be sent if the mandatory flag was set.

This PR removes this caveat.
This PR introduces the following new behaviour:
> If the RPC server publishes with the mandatory flag set, then `amq.rabbitmq.reply-to.*`
is treated as a queue (assuming this queue name is encoded correctly). However,
whether the requester is still there to consume the reply is not checked at routing time.
In other words, if the RPC server only publishes to this name, then the message will be
considered "routed" and RabbitMQ will therefore not send a `basic.return`.
2025-09-09 14:52:22 +02:00
..
include Enable AMQP 1.0 clients to manage topologies 2024-03-28 11:36:56 +01:00
src Support Direct Reply-To for AMQP 1.0 2025-09-09 14:52:22 +02:00
test Fix concurrent AMQP queue declarations (#13727) 2025-04-11 12:04:00 +02:00
LICENSE Enable AMQP 1.0 clients to manage topologies 2024-03-28 11:36:56 +01:00
LICENSE-MPL-RabbitMQ Enable AMQP 1.0 clients to manage topologies 2024-03-28 11:36:56 +01:00
Makefile make: Fix regressions following make plugins cleanup 2024-09-10 15:42:28 +02:00
README.md Enable AMQP 1.0 clients to manage topologies 2024-03-28 11:36:56 +01:00
erlang.mk Enable AMQP 1.0 clients to manage topologies 2024-03-28 11:36:56 +01:00
rabbitmq-components.mk Enable AMQP 1.0 clients to manage topologies 2024-03-28 11:36:56 +01:00

README.md

Erlang RabbitMQ AMQP 1.0 Client

The Erlang AMQP 1.0 client is a client that can communicate with any AMQP 1.0 broker. In contrast, this project (Erlang RabbitMQ AMQP 1.0 Client) can only communicate with RabbitMQ. This project wraps (i.e. depends on) the Erlang AMQP 1.0 client providing additionally the following RabbitMQ management operations:

  • declare queue
  • get queue
  • delete queue
  • purge queue
  • bind queue to exchange
  • unbind queue from exchange
  • declare exchange
  • delete exchange
  • bind exchange to exchange
  • unbind exchange from exchange

Except for get queue, these management operations are defined in the AMQP 0.9.1 protocol. To support these AMQP 0.9.1 / RabbitMQ specific operations over AMQP 1.0, this project implements a subset of the following (most recent) AMQP 1.0 extension specifications:

This project might support more (non AMQP 0.9.1) RabbitMQ operations via AMQP 1.0 in the future.

Topologies (exchanges, bindings, queues) in RabbitMQ can be created via