Commit Graph

910 Commits

Author SHA1 Message Date
Diana Parra Corbacho 62daba4126 Local shovel: ensure no more messages are published with delete-after
once the limit is reached
2025-10-02 13:55:24 +02:00
Michael Klishin c3b064c704
Merge pull request #14648 from rabbitmq/mk-dpc-shovel-fetch-parameters-module-from-registry
Shovel worker: fetch handler module from rabbit_registry
2025-09-30 12:43:40 -04:00
Diana Parra Corbacho c9697a6b7a Shovels: make changes to shovel status backward compatible 2025-09-30 17:53:51 +02:00
Michael Klishin 2a920a4a32
Shovel worker: fetch handle module from rabbit_registry 2025-09-30 11:38:53 -04:00
Diana Parra Corbacho 33a6a20017 Shovels: tests for deletion of failed shovels 2025-09-30 10:10:35 +02:00
Diana Parra Corbacho 13201b2eb0 Shovels: return hosting node in terminated shovel status
Fixes deletion and restart
2025-09-30 08:25:56 +02:00
Diana Parra Corbacho 7f1ceb26fd Shovel: fix deletion of terminated shovels 2025-09-29 21:10:36 +02:00
Diana Parra Corbacho 99fea41352 Shovels: more detailed error message 2025-09-29 13:42:24 +02:00
Diana Parra Corbacho 9f39f60a3e Shovels: fix shovel status and deletion of failed shovels
These shovels are stuck in a restart loop and need to be listed on
shovel status, which also allows for its deletion
2025-09-29 12:15:39 +02:00
David Ansari 2e75bc6eb5
Reduce ETS copy overhead when delivering to target queues (#14570)
* Reduce ETS copy overhead when delivering to target queues

 ## What?
This commit avoids copying the full amqqueue record from ETS per incoming message
and target queue.
The amqqueue record contains 21 elements and for some queue types,
especially streams, some elements are themselves nested terms.

 ## How?

In Khepri, use a new `rabbit_khepri_queue_target` projection which
contains a subset of the full amqqueue record.

This way all relevant information to deliver to a target queue can be
looked up in a single ets:lookup_element call.

Alternative approaches are described in https://github.com/erlang/otp/issues/10211

 ## Benchmark

Fanout to 3 streams

Start broker:
```
make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \
    FULL=1 \
    RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \
    RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \
    PLUGINS="rabbitmq_management"
```

`high-credit.config` contains:
```
[
 {rabbit, [
  %% Maximum incoming-window of AMQP 1.0 session.
  %% Default: 400
  {max_incoming_window, 5000},

  %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender.
  %% Default: 128
  {max_link_credit, 2000},

  %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue.
  %% Default: 256
  {max_queue_credit, 5000},

  {loopback_users, []}
 ]},

 {rabbitmq_management_agent, [
  {disable_metrics_collector, true}
 ]}
].
```

Create the 3 streams and bindings to the fanout exchange:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3

```

Start the client:
```
quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4
```

`main` branch:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 16.3 seconds
Message rate ......................................... 61,237 messages/s
```

with this PR:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 14.2 seconds
Message rate ......................................... 70,309 messages/s
```

Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%.

* Avoid creating 5 elems tuple

* Simplify rabbit_queue_type callbacks

deliver should only take targets and init should only take the full record

* Fix flaky test

* Fix specs
2025-09-25 11:25:09 +02:00
Michael Klishin 351ec1b4c7
Merge pull request #14593 from rabbitmq/shovel-raw
Shovels: Optimise amqp10 client messages for shovel usage
2025-09-24 14:04:00 -04:00
Diana Parra Corbacho 897260ce3d Shovels: Optimise amqp10 client messages for shovel usage
AMQP10 shovels don't need the amqp10 message format, the binary
can be translated directly into a message container and also
the other way around. The new amqp10_raw_msg just stores the payload
and information required to create the transfer frame, skipping
a few unnecessary encoding/decoding operations of the AMQP10 sections.
2025-09-23 23:11:49 +02:00
Michael Klishin 668dbe2e2d
Merge pull request #14561 from rabbitmq/local-shovel-counters
Local shovels: Add global counters
2025-09-23 17:02:00 -04:00
D Corbacho 1e3e58a29b
Amqp10 shovel: retry with insufficient_credit error (#14569) 2025-09-19 14:37:39 +02:00
Diana Parra Corbacho a4b37d7886 Local shovels: Test global counters 2025-09-18 13:18:16 +02:00
Diana Parra Corbacho bb5d527d17 Local shovels: Add global counters 2025-09-18 09:17:52 +02:00
David Ansari 72cd7a35c2 Support Direct Reply-To for AMQP 1.0
# What?
* Support Direct Reply-To for AMQP 1.0
* Compared to AMQP 0.9.1, this PR allows for multiple volatile queues on a single
  AMQP 1.0 session. Use case: JMS clients can create multiple temporary queues on
  the same JMS/AMQP session:
  * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue()
  * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue()
* Fix missing metrics in for Direct Reply-To in AMQP 0.9.1, e.g.
  `messages_delivered_total`
* Fix missing metrics (even without using Direct Reply-To ) in AMQP 0.9.1:
  If stats level is not `fine`, global metrics `rabbitmq_global_messages_delivered_*` should still be incremented.

 # Why?
* Allow for scalable at-most-once RPC reply delivery
  Example use case: thousands of requesters connect, send a single
  request, wait for a single reply, and disconnect.
  This PR won't create any queue and won't write to the metadata store.
  Therefore, there's less pressure on the metadata store, less pressure
  on the Management API when listing all queues, less pressure on the
  metrics subsystem, etc.
* Feature parity with AMQP 0.9.1

 # How?
This PR extracts the previously channel specific Direct Reply-To code
into a new queue type: `rabbit_volatile_queue`.
"Volatile" describes the semantics, not a use-case. It signals non-durable,
zero-buffer, at-most-once, may-drop, and "not stored in Khepri."

This new queue type is then used for AMQP 1.0 and AMQP 0.9.1.

Sending to the volatile queue is stateless like previously with Direct Reply-To in AMQP 0.9.1 and like done
for the MQTT QoS 0 queue.
This allows for use cases where a single responder replies to e.g. 100k different requesters.

RabbitMQ will automatically auto grant new link-credit to the responder because the new queue type confirms immediately.

The key gets implicitly checked by the channel/session:
If the queue name (including the key) doesn’t exist, the `handle_event` callback for this queue isn’t invoked and therefore
no delivery will be sent to the responder.

This commit supports Direct Reply-To across AMQP 1.0 and 0.9.1. In other
words, the requester can be an AMQP 1.0 client while the responder is an
AMQP 0.9.1 client or vice versa.
RabbitMQ will internally convert between AMQP 0.9.1 `reply_to` and AMQP
1.0 `/queues/<queue>` address. The AMQP 0.9.1 `reply_to` property is
expected to contain a queue name. That's in line with the AMQP 0.9.1
spec:
> One of the standard message properties is Reply-To, which is designed
specifically for carrying the name of reply queues.

Compared to AMQP 0.9.1 where the requester sets the `reply_to` property
to `amq.rabbitmq.reply-to` and RabbitMQ modifies this field when
forwarding the message to the request queue, in AMQP 1.0 the requester
learns about the queue name from the broker at link attachment time.
The requester has to set the reply-to property to the server generated
queue name. That's because the server isn't allowed to modify the bare
message.

During link attachment time, the client has to set certain fields.
These fields are expected to be set by the RabbitMQ client libraries.
Here is an Erlang example:
```erl
Source = #{address => undefined,
           durable => none,
           expiry_policy => <<"link-detach">>,
           dynamic => true,
           capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver">>,
               role => {receiver, Source, self()},
               snd_settle_mode => settled,
               rcv_settle_mode => first},
{ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs),
AddressReplyQ = receive {amqp10_event, {link, Receiver, {attached, Attach}}} ->
                  #'v1_0.attach'{source = #'v1_0.source'{address = {utf8, Addr}}} = Attach,
                  Addr
end,
```

The client then sends the message by setting the reply-to address as
follows:
```erl
amqp10_client:send_msg(
  SenderRequester,
  amqp10_msg:set_properties(
    #{message_id => <<"my ID">>,
      reply_to => AddressReplyQ},
    amqp10_msg:new(<<"tag">>, <<"request">>))),
```

If the responder attaches to the queue target in the reply-to field,
RabbitMQ will check if the requester link is still attached. If the
requester detached, the link will be refused.

The responder can also attach to the anonymous null target and set the
`to` field to the `reply-to` address.

If RabbitMQ cannot deliver a reply, instead of buffering the reply,
RabbitMQ will be drop the reply and increment the following Prometheus metric:
```
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"} 0.0
```
That's in line with the MQTT QoS 0 queue type.

A reply message could be dropped for a variety of reasons:
1. The requester ran out of link-credit. It's therefore the requester's
   responsibility to grant sufficient link-credit on its receiving link.
2. RabbitMQ isn't allowed to deliver any message to due session flow
   control. It's the requster's responsibility to keep the session window
   large enough.
3. The requester doesn't consume messages fast enough causing TCP
   backpressure being applied or the RabbitMQ AMQP writer proc isn't
   scheduled quickly enough. The latter can happen for example if
   RabbitMQ runs with a single scheduler (is assigned a single CPU
   core). In either case, RabbitMQ internal flow control causes the
   volatile queue to drop messages.

Therefore, if high throughput is required while message loss is undesirable, a classic queue should be used
instead of a volatile queue since the former buffers messages while the
latter doesn't.

The main difference between the volatile queue and the MQTT QoS 0 queue
is that the former isn't written to the metadata store.

 # Breaking Change
Prior to this PR the following [documented caveat](https://www.rabbitmq.com/docs/4.0/direct-reply-to#limitations) applied:
> If the RPC server publishes with the mandatory flag set then `amq.rabbitmq.reply-to.*`
is treated as **not** a queue; i.e. if the server only publishes to this name then the message
will be considered "not routed"; a `basic.return` will be sent if the mandatory flag was set.

This PR removes this caveat.
This PR introduces the following new behaviour:
> If the RPC server publishes with the mandatory flag set, then `amq.rabbitmq.reply-to.*`
is treated as a queue (assuming this queue name is encoded correctly). However,
whether the requester is still there to consume the reply is not checked at routing time.
In other words, if the RPC server only publishes to this name, then the message will be
considered "routed" and RabbitMQ will therefore not send a `basic.return`.
2025-09-09 14:52:22 +02:00
Diana Parra Corbacho 0eb5046094 Local shovels: send confirms and rejections before autodelete 2025-09-02 16:49:09 +02:00
Diana Parra Corbacho 41d52835bf Local shovels: fix handling of acks/nacks from multiple queues 2025-09-02 12:09:58 +02:00
Diana Parra Corbacho 8a116500e0 Local shovels: generate multiple ack/nack as the channel does
Avoids overflowing source queue with individual ack/nacks
2025-09-02 12:09:17 +02:00
Diana Parra Corbacho 6e2e19591a Local shovels: fix handling of acks/nacks 2025-09-02 12:09:17 +02:00
D Corbacho 7f1febe70b
Local shovels: exclude tests in mixed-versions with 3.13.x (#14482)
The test suites need to be excluded at group level, so the end_per_suite
is always executed and the cluster stopped. Otherwise, clusters
remain running in CI and the following suites find the TCP ports busy.
2025-09-02 10:56:56 +02:00
Michal Kuratczyk 1b4cff21b3
Skip local shovel tests when mixed with 3.13 (#14475) 2025-09-01 15:50:44 +02:00
D Corbacho 0977ad2dde
Local shovels: skip tests in mixed-version (#14473)
Local shovels require rabbitmq_4_0_0 feature flag, so it can't run
in mixed-version clusters with 3.13.x
2025-09-01 13:25:51 +02:00
Michael Klishin 30fb9c1128
Re-arrange shovel test suites
* Use more descriptive names
 * Prefix unit test suites accordingly
 * Reuse await_credit/1
 * await_credit/1 in a flakey test
2025-08-29 18:34:51 -04:00
Michael Klishin 88ea37b290
Shovel: use a constant for the runtime parameter component 2025-08-27 17:26:07 -04:00
Michael Klishin dee15f05a1
Shovel status: minor refactoring 2025-08-26 18:43:57 -04:00
Diana Parra Corbacho 68b98bfd3b Local shovels: ack messages not routed to any queue using exchanges 2025-08-26 20:24:30 +02:00
Diana Parra Corbacho 1c72316d2e Local shovels: Handle multiple rejects on node down 2025-08-26 12:22:59 +02:00
Diana Parra Corbacho 0174f59c4e Local shovel: handle unacked messages in queue down
It affects messages routed through exchanges, when there is no target
queue configured in the shovel (it could be many queues as destination or none).
2025-08-26 11:27:14 +02:00
Michael Klishin bbb9aeaf0e
rabbit_shovel_operating_mode: coerce returned value to an atom 2025-08-25 19:59:59 -04:00
Michael Klishin c68ea370d2
rabbit_shovel_status: handle binary values 2025-08-25 19:43:10 -04:00
Michael Klishin 5de3a57af3
Shovel: don't start top-level sups if running in library mode
References #14425
2025-08-25 17:22:21 -04:00
Michael Klishin e61eb7a45e
More logging 2025-08-25 14:41:55 -04:00
Michael Klishin dbff137e4a
Address review feedback #14425 2025-08-25 14:14:45 -04:00
Michael Klishin 3043dc621f
Shovel: introduce operating modes
Sometimes you want a plugin to act as a library
and not an application. That is, for its modules
to be available at compile time and on a running
node but, say, the actual runtime parameter
handling and supervision of shovels to be
handled by another plugin.

Since we do not currently have a concept of
"library plugins" or "library dependencies",
this approach demonstrates one example of how
some plugins can be used as libraries.
2025-08-25 13:37:49 -04:00
D Corbacho e4bc525b5e
Merge pull request #14421 from rabbitmq/local-shovel-default-user-pass
Local shovels: Set default user/pass for uris like "amqp://"
2025-08-25 12:36:47 +02:00
Diana Parra Corbacho f990f7e271 Local shovels: Set default user/pass for uris like "amqp://" 2025-08-25 11:34:57 +02:00
Diana Parra Corbacho b6d831b115 Shovel amqp1.0: fix delete after validation
Queue-length is not supported, so it should fail already during validation and
not shovel startup
2025-08-23 13:26:46 +02:00
Michael Klishin c7f6cad331
Shovel: ignore expected log exceptions in local_SUITE 2025-08-20 19:23:33 -04:00
Diana Parra Corbacho 212ae64c2d Local shovels: place behind rabbitmq_4.0.0 feature flag 2025-08-19 11:52:08 +02:00
Michael Klishin d475a0e95f
Rename a Shovel key to 'shovel.local.max_credit' 2025-08-18 11:42:00 -04:00
Diana Parra Corbacho 4452872042 Local shovels: Update default credit 2025-08-18 16:50:31 +02:00
Diana Parra Corbacho a419ab3708 Shovels: re-order confirmation and unacked update 2025-08-18 16:40:03 +02:00
Diana Parra Corbacho 6bb649a9df Local shovels: single acks
For some reason, multiple acknowledgments are really slow when using credit flow v2
2025-08-14 15:40:40 +02:00
Diana Parra Corbacho 382fac3e34 Local shovels: remove stashed credit request 2025-08-14 12:39:26 +02:00
Diana Parra Corbacho 3349321c58 Local shovels: fix credit flow 2025-08-13 19:30:19 +02:00
Diana Parra Corbacho 02fcbc0dc5 Local shovels: optimisation 2025-08-13 14:13:12 +02:00
Diana Parra Corbacho edf0e3c1ff Local shovel: remove unused clause 2025-08-13 13:54:52 +02:00
Diana Parra Corbacho 07a085365e Local shovels: optimisations 2025-08-13 12:59:40 +02:00