See discussion #14244
These changes will allow a user to make an HTTP API request to...
```
/api/ldap/validate/simple-bind
```
...with an appropriate JSON body, and the plugin will attempt a
connection to the specified LDAP server using the provided credentials.
This allows validation that a connection can be made to an LDAP server
from a RabbitMQ cluster environment.
* Add code and tests for `eldap:simple_bind` validation.
* Add support for testing TLS connections to OpenLDAP
* Add support for validating TLS related configuration via `/ldap/validate/simple-bind`
* Add support for various TLS options:
* versions
* depth
* multiple CA cert pem data
* Fall back to system certs if neither `cacertfile` nor `cacerts_pem_data` are provided to the `simple-bind` validation.
* Add `ssl_hostname_verification` support.
* Return 422 when auth fails
* Add more informative information when connection fails
* Add more tests with invalid input
* Catch parsing errors
* Tests for edge-cases for password / user_dn
* Add test for use_ssl + use_starttls combo
* Add test for ssl_options.depth validation
* Add validation tests for server_name_indication
(cherry picked from commit 9a4cb9c881)
This change adds config options for controlling the `is_enabled/0`
queue type callback for classic queues, quorum queues and stream
queues. This is useful for operators to be able to control whether
users can declare these queues - i.e. to prevent users from declaring
queue types which cannot be supported operationally.
Setting
queue_types.stream.enabled = false
In a config file for example prevents stream queues from being declared.
(cherry picked from commit 976f0cf175)
'user_provided_plugins_data_dir' make it immediately
obvious that the data directory is for 3rd party
plugins (user-provided plugins), and it is
not set up or used internally by RabbitMQ.
(cherry picked from commit f2131c6549)
This is mainly for community plugins which want to migrate away from
Mnesia into a new subdir. (They might not even use Khepri, or use
Khepri but a different store id.)
Fixes#11304
(cherry picked from commit 3442c91986)
Follow-up to:
* #3442
* 93db480bc4
* #13899
* #14326
Simply changing `+=` to `?=` doesn't work as there are other important
statements added to `RMQ_ERLC_OPTS` that can't be left out when `make
RMQ_ERLC_OPTS=''` is run. This only became apparent when a completely
cleaned-out checkout of `rabbitmq/rabbitmq-server` was attempted to be
built with that command.
This adds `NON_DETERMINISTIC` which will be used like this:
```
make NON_DETERMINISTIC=true
```
...and `+deterministic` won't be passed to `erlc` or other commands.
(cherry picked from commit 2eb8ce5e0a)
* Reduce ETS copy overhead when delivering to target queues
## What?
This commit avoids copying the full amqqueue record from ETS per incoming message
and target queue.
The amqqueue record contains 21 elements and for some queue types,
especially streams, some elements are themselves nested terms.
## How?
In Khepri, use a new `rabbit_khepri_queue_target` projection which
contains a subset of the full amqqueue record.
This way all relevant information to deliver to a target queue can be
looked up in a single ets:lookup_element call.
Alternative approaches are described in https://github.com/erlang/otp/issues/10211
## Benchmark
Fanout to 3 streams
Start broker:
```
make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \
FULL=1 \
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \
RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \
PLUGINS="rabbitmq_management"
```
`high-credit.config` contains:
```
[
{rabbit, [
%% Maximum incoming-window of AMQP 1.0 session.
%% Default: 400
{max_incoming_window, 5000},
%% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender.
%% Default: 128
{max_link_credit, 2000},
%% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue.
%% Default: 256
{max_queue_credit, 5000},
{loopback_users, []}
]},
{rabbitmq_management_agent, [
{disable_metrics_collector, true}
]}
].
```
Create the 3 streams and bindings to the fanout exchange:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \
deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3
```
Start the client:
```
quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4
```
`main` branch:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 16.3 seconds
Message rate ......................................... 61,237 messages/s
```
with this PR:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 14.2 seconds
Message rate ......................................... 70,309 messages/s
```
Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%.
* Avoid creating 5 elems tuple
* Simplify rabbit_queue_type callbacks
deliver should only take targets and init should only take the full record
* Fix flaky test
* Fix specs
(cherry picked from commit 2e75bc6eb5)
Co-authored-by: David Ansari <david.ansari@gmx.de>
A queue client that send a message during a network partition
that later caused a distribution disconnection would in some cases
never resend the lost message, even if kept in the pending buffer.
Subsequent sends would be accepted by the state machine but would
never be enqueued as there would be a missing sequence.
In the case of publishers that use pre-settled sends the pending
messages would have also been incorrectly removed from the
pending map.
To fix we removed timer resend aapproach and instead have the leader
send leader_change messages on node up to prompt any queue clients
to resend their pending buffer.
(cherry picked from commit b66dc4017e)
This uses Cowboy's new direct data delivery mechanism,
which provides more performance.
Cowboy is now pinned to 2.14.0 and Cowlib to 2.16.0.
HTTP/2 Websocket is enabled by default. It can be disabled
as needed by setting `#{enable_connect_protocol => false}`
in the plugin's `cowboy_opts`.
Web-STOMP did not have HTTP/2 enabled before, now it does.
Web-MQTT already had HTTP/2 enabled.
(cherry picked from commit 725c2d560f)
AMQP10 shovels don't need the amqp10 message format, the binary
can be translated directly into a message container and also
the other way around. The new amqp10_raw_msg just stores the payload
and information required to create the transfer frame, skipping
a few unnecessary encoding/decoding operations of the AMQP10 sections.
(cherry picked from commit 897260ce3d)
And `eof` crashes.
The problem is that we may end up trying to read more data
from the file when scanning, despite being at the end of the
file. This results in the current Acc to be returned instead
of the remaining data being parsed.
This results in some messages at the end of the file being
truncated off despite still being in memory (and still pointing
to the end of the original file, well past the truncation point).
(cherry picked from commit f36385408a)
The whole point of introducing rabbitmq-diagnostics,
rabbitmq-queues, rabbitmq-streams is to stop
growing the ctl command list, which has been massive
even five years ago.
(cherry picked from commit 42e99ddf2a)
This command displays cluster-wide message size
statistics. It's less detailed than what can be
retrieved from the Prometheus endpoint, but
it'll be available to all users, regardless of their
monitoring setup, or lack thereof.
(cherry picked from commit bf3c378473)
These changes add the minimum required changes and artifacts to enable
cuttlefish schema tests, which can be run with this command:
```
make -C deps/rabbitmq_aws ct-config_schema
```
(cherry picked from commit 1001e2b038)
* AWS peer discovery: ensure consistent hostname path ordering
AWS EC2 API returns networkInterfaceSet and privateIpAddressesSet in
arbitrary order, causing non-deterministic hostname resolution during
peer discovery. This leads to inconsistent cluster formation.
Changes:
- Sort network interfaces by deviceIndex (0 first for primary ENI)
- Sort private IP addresses by primary flag (primary=true first)
- Add debug logging to show hostname path selection and sorting results
- Add comprehensive unit tests for sorting behavior
The sorting ensures deviceIndex=0 and primary=true IPs are consistently
selected first, making peer discovery deterministic across deployments.
* AWS peer discovery: ensure consistent hostname path ordering (address feedback on debug logs and sorting helper functions)
(cherry picked from commit 4a324706a4)
Osiris can read ahead data in case of small chunks. This saves system
calls and increases consumption rate dramatically for some streams.
This is transparent for the stream protocol, but requires a small tweak
for the stream queue type implementation (passing in the previous
iterator when creating a new one).
The read ahead is on by default but can be deactivated with to the new
stream.read_ahead configuration entry (true / false).
Co-authored-by: Karl Nilsson <kjnilsson@gmail.com>
References rabbitmq/osiris#192
(cherry picked from commit 9f162dfd01)
This saves a system call by sending the frame header and the chunk
header at the same time.
References rabbitmq/osiris#192
(cherry picked from commit 885e89ebbb)
* Add test case for binding args Khepri regression
This commit adds a test case for a regression/bug that occurs in Khepri.
```
make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=mnesia
```
succeeds, but
```
make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=khepri
```
fails.
The problem is that ETS table `rabbit_khepri_index_route` cannot
differentiate between two bindings with different binding arguments, and
therefore deletes entries too early, leading to wrong routing decisions.
The solution to this bug is to include the binding arguments in the
`rabbit_khepri_index_route` projection, similar to how the binding args
are also included in the `rabbit_index_route` Mnesia table.
This bug/regression is an edge case and exists if the source exchange
type is `direct` or `fanout` and if different bindings arguments are
used by client apps. Note that such binding arguments are entirely
ignored when RabbitMQ performs routing decisions for the `direct` or
`fanout` exchange. However, there might be client apps that use binding
arguments to add some metadata to the binding, for example `app-id` or
`user` or `purpose` and might use this metadata as a form of reference
counting in deciding when to delete `auto-delete` exchanges or just for
informational/operational purposes.
* Fix regression with Khepri binding args
Fix#14533
* Speed up fanout exchange
Resolves#14531
## What?
Increase end-to-end message throughput for messages routed via the fanout exchange by ~42% (see benchmark below).
In addition to the fanout exchange, a similar speed up is achieved for the following exchange types:
* modulus hash
* random
* recent history
This applies only if Khepri is enabled.
## How?
Use an additional routing table (projection) whose table key is the source exchange.
Looking up the destinations happens then by an ETS table key.
Prior to this commit, CPUs were busy compiling the same match spec for every incoming message.
## Benchmark
1. Start RabbitMQ:
```
make run-broker RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \
RABBITMQ_CONFIG_FILE="advanced.config" PLUGINS="rabbitmq_management"
```
where `advanced.config` contains:
```
[
{rabbitmq_management_agent, [
{disable_metrics_collector, true}
]}
].
```
2. Create a queue and binding:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q1 && \
deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q1
```
3. Create the load
```
java -jar target/perf-test.jar -p -e amq.fanout -u q1 -s 5 --autoack -z 60
```
Before this commit:
```
sending rate avg: 97394 msg/s
receiving rate avg: 97394 msg/s
```
After this commit:
```
sending rate avg: 138677 msg/s
receiving rate avg: 138677 msg/s
```
The CPU flamegraph shows that `rabbit_exchange:route/3` consumes the following CPU amounts:
* 13.5% before this commit
* 3.4% after this commit
## Downsides
Additional ETS memory usage for the new projection table.
However, the new table does not store any binding entries for the following
source exchange types:
* direct
* headers
* topic
* x-local-random
* Add exchange binding tests
Test that exchange bindings work correctly with the new projection
tables `rabbit_khepri_route_by_source` and
`rabbit_khepri_route_by_source_key`.
* Always register all projections
Khepri won’t modify a projection that is already registered (based on its name).
* Protect ets:lookup_element/4 in try catch
See https://github.com/rabbitmq/rabbitmq-server/pull/11667#issue-2401399413
for rationale.
(cherry picked from commit a66c716d9c)
Co-authored-by: David Ansari <david.ansari@gmx.de>
The previous implementation did not take the unique `passthrough` record
into account when formatting state.
To test the priority and non-priority scenarios, add `exit(kaboom)` in
`rabbit_amqqueue_process` `handle_call({basic_consume...` then run
PerfTest with these arguments:
```
--producers 1 --consumers 1 --pmessages 100 --queue-args x-max-priority=10
--producers 1 --consumers 1 --pmessages 100
```
(cherry picked from commit 9c90e85e7c)
* Call `Mod:format_state/1` if exported to possibly truncate huge states
* Add more information about truncated ram_pending_ack and disk_pending_ack
* Add `log.error_logger_format_depth` cuttlefish schema value
* Add `format_state/1` to `rabbit_channel`
* Add `log.summarize_process_state`, default is `false`, to enable summarizing process state for crash logs.
* Added `format_state` to `rabbit_classic_queue_index_v2` and `rabbit_classic_queue_store_v2`
* Ensure `rabbit_channel:format_state/1` uses `summarize_process_state_when_logged`
* Do not set `summarize_process_state_when_logged` value by default.
* Type specs
(cherry picked from commit db1291d49b)
This commit makes read operations for the following Khepri projections much cheaper:
* rabbit_khepri_queue
* rabbit_khepri_exchange
* rabbit_khepri_index_route
* rabbit_khepri_topic_trie
Entries in these ETS tables are read for every message entering RabbitMQ.
Some messages entering RabbitMQ will cause even multiple reads from
these ETS tables, e.g. multiple reads from `rabbit_khepri_queue` if a message
is routed to more than one queue or multiple reads from `rabbit_khepri_index_route`
if a message has multiple routing keys.
On a busy RabbitMQ node, these tables are read concurrently (on multiple physical processors)
hundreds of thousands of times per second.
(cherry picked from commit d29cac3cd5)
There can be at most one consumer per volatile queue instance.
This consumer must also have attached on the same channel/session as the
creator of the queue.
Prior to this commit, it was possible for clients on other
connections or sessions to attach a receiving link to an existing volatile
queue name, even though no messages would be delivered.
It's better for RabbitMQ to directly refuse the link at attach time.
(cherry picked from commit 0843704dbb)
# What?
* Support Direct Reply-To for AMQP 1.0
* Compared to AMQP 0.9.1, this PR allows for multiple volatile queues on a single
AMQP 1.0 session. Use case: JMS clients can create multiple temporary queues on
the same JMS/AMQP session:
* https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue()
* https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue()
* Fix missing metrics in for Direct Reply-To in AMQP 0.9.1, e.g.
`messages_delivered_total`
* Fix missing metrics (even without using Direct Reply-To ) in AMQP 0.9.1:
If stats level is not `fine`, global metrics `rabbitmq_global_messages_delivered_*` should still be incremented.
# Why?
* Allow for scalable at-most-once RPC reply delivery
Example use case: thousands of requesters connect, send a single
request, wait for a single reply, and disconnect.
This PR won't create any queue and won't write to the metadata store.
Therefore, there's less pressure on the metadata store, less pressure
on the Management API when listing all queues, less pressure on the
metrics subsystem, etc.
* Feature parity with AMQP 0.9.1
# How?
This PR extracts the previously channel specific Direct Reply-To code
into a new queue type: `rabbit_volatile_queue`.
"Volatile" describes the semantics, not a use-case. It signals non-durable,
zero-buffer, at-most-once, may-drop, and "not stored in Khepri."
This new queue type is then used for AMQP 1.0 and AMQP 0.9.1.
Sending to the volatile queue is stateless like previously with Direct Reply-To in AMQP 0.9.1 and like done
for the MQTT QoS 0 queue.
This allows for use cases where a single responder replies to e.g. 100k different requesters.
RabbitMQ will automatically auto grant new link-credit to the responder because the new queue type confirms immediately.
The key gets implicitly checked by the channel/session:
If the queue name (including the key) doesn’t exist, the `handle_event` callback for this queue isn’t invoked and therefore
no delivery will be sent to the responder.
This commit supports Direct Reply-To across AMQP 1.0 and 0.9.1. In other
words, the requester can be an AMQP 1.0 client while the responder is an
AMQP 0.9.1 client or vice versa.
RabbitMQ will internally convert between AMQP 0.9.1 `reply_to` and AMQP
1.0 `/queues/<queue>` address. The AMQP 0.9.1 `reply_to` property is
expected to contain a queue name. That's in line with the AMQP 0.9.1
spec:
> One of the standard message properties is Reply-To, which is designed
specifically for carrying the name of reply queues.
Compared to AMQP 0.9.1 where the requester sets the `reply_to` property
to `amq.rabbitmq.reply-to` and RabbitMQ modifies this field when
forwarding the message to the request queue, in AMQP 1.0 the requester
learns about the queue name from the broker at link attachment time.
The requester has to set the reply-to property to the server generated
queue name. That's because the server isn't allowed to modify the bare
message.
During link attachment time, the client has to set certain fields.
These fields are expected to be set by the RabbitMQ client libraries.
Here is an Erlang example:
```erl
Source = #{address => undefined,
durable => none,
expiry_policy => <<"link-detach">>,
dynamic => true,
capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver">>,
role => {receiver, Source, self()},
snd_settle_mode => settled,
rcv_settle_mode => first},
{ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs),
AddressReplyQ = receive {amqp10_event, {link, Receiver, {attached, Attach}}} ->
#'v1_0.attach'{source = #'v1_0.source'{address = {utf8, Addr}}} = Attach,
Addr
end,
```
The client then sends the message by setting the reply-to address as
follows:
```erl
amqp10_client:send_msg(
SenderRequester,
amqp10_msg:set_properties(
#{message_id => <<"my ID">>,
reply_to => AddressReplyQ},
amqp10_msg:new(<<"tag">>, <<"request">>))),
```
If the responder attaches to the queue target in the reply-to field,
RabbitMQ will check if the requester link is still attached. If the
requester detached, the link will be refused.
The responder can also attach to the anonymous null target and set the
`to` field to the `reply-to` address.
If RabbitMQ cannot deliver a reply, instead of buffering the reply,
RabbitMQ will be drop the reply and increment the following Prometheus metric:
```
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"} 0.0
```
That's in line with the MQTT QoS 0 queue type.
A reply message could be dropped for a variety of reasons:
1. The requester ran out of link-credit. It's therefore the requester's
responsibility to grant sufficient link-credit on its receiving link.
2. RabbitMQ isn't allowed to deliver any message to due session flow
control. It's the requster's responsibility to keep the session window
large enough.
3. The requester doesn't consume messages fast enough causing TCP
backpressure being applied or the RabbitMQ AMQP writer proc isn't
scheduled quickly enough. The latter can happen for example if
RabbitMQ runs with a single scheduler (is assigned a single CPU
core). In either case, RabbitMQ internal flow control causes the
volatile queue to drop messages.
Therefore, if high throughput is required while message loss is undesirable, a classic queue should be used
instead of a volatile queue since the former buffers messages while the
latter doesn't.
The main difference between the volatile queue and the MQTT QoS 0 queue
is that the former isn't written to the metadata store.
# Breaking Change
Prior to this PR the following [documented caveat](https://www.rabbitmq.com/docs/4.0/direct-reply-to#limitations) applied:
> If the RPC server publishes with the mandatory flag set then `amq.rabbitmq.reply-to.*`
is treated as **not** a queue; i.e. if the server only publishes to this name then the message
will be considered "not routed"; a `basic.return` will be sent if the mandatory flag was set.
This PR removes this caveat.
This PR introduces the following new behaviour:
> If the RPC server publishes with the mandatory flag set, then `amq.rabbitmq.reply-to.*`
is treated as a queue (assuming this queue name is encoded correctly). However,
whether the requester is still there to consume the reply is not checked at routing time.
In other words, if the RPC server only publishes to this name, then the message will be
considered "routed" and RabbitMQ will therefore not send a `basic.return`.
(cherry picked from commit 72cd7a35c2)
A list of binaries was incorrectly converted by
`unicode:characters_to_binary` into one big binary. Add a function head
to match this case.
Also, add tests for the values that were not correctly formatted prior
to #14101 and #14381
(cherry picked from commit 3280bed8ac)
Speed up Direct Reply-To in AMQP 0.9.1 because prior to this PR sending each RPC reply
was bottlenecked by the slowest network link between the RabbitMQ node the responder
connected to and all other RabbitMQ nodes. This regression was introduced in RabbitMQ 3.12.0
via d65637190a
(cherry picked from commit 9f2ff6c150)
The random binary contains characters that can make an authentication
test fail. Encoding it in base64 fixes the problem.
(cherry picked from commit 67ac485e74)