* Add test case for binding args Khepri regression
This commit adds a test case for a regression/bug that occurs in Khepri.
```
make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=mnesia
```
succeeds, but
```
make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=khepri
```
fails.
The problem is that the ETS table `rabbit_khepri_index_route` cannot
differentiate between two bindings that differ only in their binding
arguments, and therefore deletes entries too early, leading to wrong
routing decisions.
The solution to this bug is to include the binding arguments in the
`rabbit_khepri_index_route` projection, similar to how the binding args
are also included in the `rabbit_index_route` Mnesia table.
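A minimal sketch of the idea using a plain ETS bag (the key shape and the
values are illustrative; the real projection record differs): with the
binding arguments in the key, deleting the `Args1` binding no longer
removes the entry that the `Args2` binding still needs.
```erl
Src = <<"amq.direct">>, RKey = <<"k">>, Dst = <<"q1">>,
Args1 = [{<<"app-id">>, longstr, <<"a">>}],
Args2 = [{<<"app-id">>, longstr, <<"b">>}],
T = ets:new(index_route_sketch, [bag]),
true = ets:insert(T, [{{Src, RKey, Args1}, Dst}, {{Src, RKey, Args2}, Dst}]),
true = ets:delete(T, {Src, RKey, Args1}),
%% the Args2 entry survives, so messages are still routed to Dst
[{_, Dst}] = ets:lookup(T, {Src, RKey, Args2}).
```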
This bug/regression is an edge case: it occurs only if the source
exchange type is `direct` or `fanout` and if client apps use different
binding arguments. Note that such binding arguments are entirely
ignored when RabbitMQ makes routing decisions for the `direct` or
`fanout` exchange. However, client apps might use binding arguments to
attach metadata to a binding, for example `app-id`, `user`, or
`purpose`, and might use this metadata as a form of reference counting
when deciding whether to delete `auto-delete` exchanges, or simply for
informational/operational purposes.
* Fix regression with Khepri binding args
Fixes #14533
* Speed up fanout exchange
Resolves #14531
## What?
Increase end-to-end message throughput for messages routed via the fanout exchange by ~42% (see benchmark below).
In addition to the fanout exchange, a similar speed up is achieved for the following exchange types:
* modulus hash
* random
* recent history
This applies only if Khepri is enabled.
## How?
Use an additional routing table (projection) whose table key is the source exchange.
The destinations are then looked up via a plain ETS key lookup.
Prior to this commit, CPUs were busy compiling the same match spec for every incoming message.
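A hedged sketch of the hot path (the stored tuple layout is an assumption):
```erl
%% Destinations keyed directly by source exchange name: routing is a plain
%% ets:lookup/2, so no match spec is compiled per incoming message.
route_by_source(SrcExchangeName) ->
    [Dst || {_Src, Dst} <- ets:lookup(rabbit_khepri_route_by_source,
                                      SrcExchangeName)].
```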
## Benchmark
1. Start RabbitMQ:
```
make run-broker RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \
RABBITMQ_CONFIG_FILE="advanced.config" PLUGINS="rabbitmq_management"
```
where `advanced.config` contains:
```
[
{rabbitmq_management_agent, [
{disable_metrics_collector, true}
]}
].
```
2. Create a queue and binding:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q1 && \
deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q1
```
3. Create the load:
```
java -jar target/perf-test.jar -p -e amq.fanout -u q1 -s 5 --autoack -z 60
```
Before this commit:
```
sending rate avg: 97394 msg/s
receiving rate avg: 97394 msg/s
```
After this commit:
```
sending rate avg: 138677 msg/s
receiving rate avg: 138677 msg/s
```
The CPU flamegraph shows that `rabbit_exchange:route/3` consumes the following CPU amounts:
* 13.5% before this commit
* 3.4% after this commit
## Downsides
Additional ETS memory usage for the new projection table.
However, the new table does not store any binding entries for the following
source exchange types:
* direct
* headers
* topic
* x-local-random
* Add exchange binding tests
Test that exchange bindings work correctly with the new projection
tables `rabbit_khepri_route_by_source` and
`rabbit_khepri_route_by_source_key`.
* Always register all projections
Khepri won’t modify a projection that is already registered (based on its name).
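A hedged sketch of unconditional registration at boot (the path pattern and
the projection fun are illustrative, not this commit's actual code):
```erl
%% Safe to call on every boot: if a projection with this name is already
%% registered, Khepri leaves the existing one untouched.
Projection = khepri_projection:new(
               rabbit_khepri_route_by_source,
               fun(_Path, Binding) -> Binding end),
ok = khepri:register_projection(StoreId, PathPattern, Projection).
```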
* Protect ets:lookup_element/4 in try catch
See https://github.com/rabbitmq/rabbitmq-server/pull/11667#issue-2401399413
for rationale.
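A hedged sketch of the guarded lookup (table and key are examples):
```erl
%% ets:lookup_element/4 returns the supplied default when the key is absent,
%% but still exits with badarg when the table itself doesn't exist yet
%% (e.g. during node boot), hence the try/catch.
try
    ets:lookup_element(rabbit_khepri_route_by_source, SrcExchange, 2, [])
catch
    error:badarg -> []
end.
```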
* Call `Mod:format_state/1` if exported to possibly truncate huge states (a sketch follows this list)
* Add more information about truncated ram_pending_ack and disk_pending_ack
* Add `log.error_logger_format_depth` cuttlefish schema value
* Add `format_state/1` to `rabbit_channel`
* Add `log.summarize_process_state`, default is `false`, to enable summarizing process state for crash logs.
* Add `format_state` to `rabbit_classic_queue_index_v2` and `rabbit_classic_queue_store_v2`
* Ensure `rabbit_channel:format_state/1` uses `summarize_process_state_when_logged`
* Do not set `summarize_process_state_when_logged` value by default.
* Type specs
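A hedged sketch of the optional-callback dispatch from the first item above
(the logging call is illustrative):
```erl
%% Only call format_state/1 when the crashing module exports it; fall back
%% to logging the raw state otherwise.
StateForLog = case erlang:function_exported(Mod, format_state, 1) of
                  true  -> Mod:format_state(State);
                  false -> State
              end,
logger:error("process crashed; state: ~tp", [StateForLog]).
```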
# What?
* Support Direct Reply-To for AMQP 1.0
* Compared to AMQP 0.9.1, this PR allows for multiple volatile queues on a single
AMQP 1.0 session. Use case: JMS clients can create multiple temporary queues on
the same JMS/AMQP session:
* https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue()
* https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue()
* Fix missing metrics for Direct Reply-To in AMQP 0.9.1, e.g.
`messages_delivered_total`
* Fix missing metrics (even without using Direct Reply-To) in AMQP 0.9.1:
If stats level is not `fine`, global metrics `rabbitmq_global_messages_delivered_*` should still be incremented.
# Why?
* Allow for scalable at-most-once RPC reply delivery
Example use case: thousands of requesters connect, send a single
request, wait for a single reply, and disconnect.
This PR won't create any queue and won't write to the metadata store.
Therefore, there's less pressure on the metadata store, less pressure
on the Management API when listing all queues, less pressure on the
metrics subsystem, etc.
* Feature parity with AMQP 0.9.1
# How?
This PR extracts the previously channel specific Direct Reply-To code
into a new queue type: `rabbit_volatile_queue`.
"Volatile" describes the semantics, not a use-case. It signals non-durable,
zero-buffer, at-most-once, may-drop, and "not stored in Khepri."
This new queue type is then used for AMQP 1.0 and AMQP 0.9.1.
Sending to the volatile queue is stateless, as it previously was with Direct
Reply-To in AMQP 0.9.1 and as it is for the MQTT QoS 0 queue.
This allows for use cases where a single responder replies to e.g. 100k different requesters.
RabbitMQ automatically grants new link-credit to the responder because the new queue type confirms immediately.
The key gets implicitly checked by the channel/session:
If the queue name (including the key) doesn’t exist, the `handle_event` callback for this queue isn’t invoked and therefore
no delivery will be sent to the responder.
This commit supports Direct Reply-To across AMQP 1.0 and 0.9.1. In other
words, the requester can be an AMQP 1.0 client while the responder is an
AMQP 0.9.1 client or vice versa.
RabbitMQ will internally convert between AMQP 0.9.1 `reply_to` and AMQP
1.0 `/queues/<queue>` address. The AMQP 0.9.1 `reply_to` property is
expected to contain a queue name. That's in line with the AMQP 0.9.1
spec:
> One of the standard message properties is Reply-To, which is designed
specifically for carrying the name of reply queues.
Compared to AMQP 0.9.1 where the requester sets the `reply_to` property
to `amq.rabbitmq.reply-to` and RabbitMQ modifies this field when
forwarding the message to the request queue, in AMQP 1.0 the requester
learns about the queue name from the broker at link attachment time.
The requester has to set the reply-to property to the server generated
queue name. That's because the server isn't allowed to modify the bare
message.
During link attachment time, the client has to set certain fields.
These fields are expected to be set by the RabbitMQ client libraries.
Here is an Erlang example:
```erl
Source = #{address => undefined,
           durable => none,
           expiry_policy => <<"link-detach">>,
           dynamic => true,
           capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver">>,
               role => {receiver, Source, self()},
               snd_settle_mode => settled,
               rcv_settle_mode => first},
{ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs),
AddressReplyQ = receive
                    {amqp10_event, {link, Receiver, {attached, Attach}}} ->
                        #'v1_0.attach'{source = #'v1_0.source'{address = {utf8, Addr}}} = Attach,
                        Addr
                end,
```
The client then sends the message by setting the reply-to address as
follows:
```erl
amqp10_client:send_msg(
  SenderRequester,
  amqp10_msg:set_properties(
    #{message_id => <<"my ID">>,
      reply_to => AddressReplyQ},
    amqp10_msg:new(<<"tag">>, <<"request">>))),
```
If the responder attaches to the queue target in the reply-to field,
RabbitMQ will check if the requester link is still attached. If the
requester detached, the link will be refused.
The responder can also attach to the anonymous null target and set the
`to` field to the `reply-to` address.
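A hedged sketch of the responder side attaching to the queue target named by
the request's reply-to address (`RequestMsg`, the link name, and the tag are
illustrative):
```erl
%% Extract the requester's volatile queue address from the request.
#{reply_to := ReplyTo} = amqp10_msg:properties(RequestMsg),
%% Attaching fails if the requester has already detached.
{ok, Responder} = amqp10_client:attach_sender_link(Session, <<"responder">>, ReplyTo),
ok = amqp10_client:send_msg(
       Responder,
       amqp10_msg:set_properties(
         #{correlation_id => <<"my ID">>},
         amqp10_msg:new(<<"tag2">>, <<"reply">>))).
```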
If RabbitMQ cannot deliver a reply, then instead of buffering the reply,
RabbitMQ will drop it and increment the following Prometheus metric:
```
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"} 0.0
```
That's in line with the MQTT QoS 0 queue type.
A reply message could be dropped for a variety of reasons:
1. The requester ran out of link-credit. It's therefore the requester's
responsibility to grant sufficient link-credit on its receiving link.
2. RabbitMQ isn't allowed to deliver any message due to session flow
control. It's the requester's responsibility to keep the session window
large enough.
3. The requester doesn't consume messages fast enough, causing TCP
backpressure to be applied, or the RabbitMQ AMQP writer proc isn't
scheduled quickly enough. The latter can happen, for example, if
RabbitMQ runs with a single scheduler (is assigned a single CPU
core). In either case, RabbitMQ internal flow control causes the
volatile queue to drop messages.
Therefore, if high throughput is required while message loss is undesirable, a classic queue should be used
instead of a volatile queue since the former buffers messages while the
latter doesn't.
The main difference between the volatile queue and the MQTT QoS 0 queue
is that the former isn't written to the metadata store.
# Breaking Change
Prior to this PR the following [documented caveat](https://www.rabbitmq.com/docs/4.0/direct-reply-to#limitations) applied:
> If the RPC server publishes with the mandatory flag set then `amq.rabbitmq.reply-to.*`
> is treated as **not** a queue; i.e. if the server only publishes to this name then the message
> will be considered "not routed"; a `basic.return` will be sent if the mandatory flag was set.
This PR removes this caveat.
This PR introduces the following new behaviour:
> If the RPC server publishes with the mandatory flag set, then `amq.rabbitmq.reply-to.*`
> is treated as a queue (assuming this queue name is encoded correctly). However,
> whether the requester is still there to consume the reply is not checked at routing time.
> In other words, if the RPC server only publishes to this name, then the message will be
> considered "routed" and RabbitMQ will therefore not send a `basic.return`.
When validation fails for a policy parameter, the resulting popup can't
be read due to one extra binary encoding as well as code that escapes
HTML entities. Since the EJS template uses `<%= %>` for the popup, it
will display the text as-is and not render any HTML.
[Why]
They make it more difficult to compile RabbitMQ on Windows. They were
probably useful at the time of the switch to a monorepository, but I
don't see a need for them anymore.
As a follow-up to my GChat thread about removing the default logger handler to clean CT stdout, I was looking at
injecting a logger config with an undefined default handler into ct_run. It is possible, but it breaks cth_styledout: no
nice green things whatsoever. Then I found rabbit_ct_hook, which calls redirect_logger_to_ct_logs, which in turn
calls logger:remove_handler(default), apparently with zero effect! To cut a long story short, it turned out rabbit_ct_hook
must run before cth_styledout for the remove_handler line to have any effect.
https://github.com/erlang/otp/issues/9739
In OTP 28+, splitting an empty string returns an empty list, not an empty
string (the input).
Additionally, the `street-address` macro was removed in OTP 28; replace it
with the value it used to have.
Lastly, rabbitmq_auth_backend_oauth2 has an MQTT test, so add
rabbitmq_mqtt to TEST_DEPS.
Building from source using this command:
```
make RMQ_ERLC_OPTS= FULL=1
```
... then starting RabbitMQ via `make run-broker` allows re-compilation
from the erl shell:
```
1> c(rabbit).
Recompiling /home/lbakken/development/rabbitmq/rabbitmq-server/deps/rabbit/src/rabbit.erl
{ok,rabbit}
```
When `+deterministic` is passed to `erlc`, the `compile` data in each
module's information is missing the source path for the module.
Follow-up to #3442
[Why]
They were moved from `rabbit` to `rabbit_common` several years ago to
solve a dependency issue because `amqp_client` depended on the file
handle cache. This is not the case anymore.
[How]
The modules are moved back to `rabbit`.
`rabbit_common` doesn't need to depend on `os_mon` anymore. `rabbit`
already depends on it, so no changes needed here.
`include/rabbit_memory.hrl` and some test cases are moved as well to
follow the `vm_memory_monitor` module.
This avoids using Mix while compiling, which simplifies
a number of things and lets us make further build improvements
later on.
Elixir is only enabled from within rabbitmq_cli currently.
Eunit is disabled since there are only Elixir tests.
Dialyzer will force-enable Elixir in order to process
Elixir-compiled beam files.
This commit also includes a few changes that are
related:
* The Erlang distribution will now be started for parallel-ct
* Many unnecessary PROJECT_MOD lines have been removed
* `eunit_formatters` has been removed; it provided little value
* The new `maybe_flock` Erlang.mk function is used where possible
* Build test deps when testing rabbitmq_cli (Mix won't do it anymore)
* rabbitmq_ct_helpers now use the early plugins to have Dialyzer
properly set up
Before the client authenticates, the standard
frame_max is not used. Instead, the limit is
a special constant.
This is fine for password or x.509 certificate-based
authentication, but not for some JWT tokens,
which can vary in size and take multiple
kilobytes.
8 kB specifically is the default HTTP header
length limit used by Nginx.
That value appears to have been good enough
for a lot of Bearer headers with JWT tokens.
Closes #13541.
[Why]
The CLI sometimes crashes early because it fails to configure the Erlang
distribution.
Because we use two CLI commands to watch the start of RabbitMQ, if one
of them fails, the Make recipe will exit with an error, leaving the
RabbitMQ node running.
[How]
We use a shell trap to stop the node if the shell is about to exit with
an error.
While here, we retry the `await_startup` CLI command several times
because this is the one failing the most. This is until the crash is
understood and a proper fix is committed.
* Redesigned k8s peer discovery
Rather than querying the Kubernetes API, just check the local node name
and try to connect to the pod with `-0` suffix (or configured
`ordinal_start` value). Only the pod with the lowest ordinal can form
a new cluster - all other pods will wait forever.
This should prevent any race conditions and incorrectly formed clusters.
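A hypothetical sketch of the ordinal check; the node-name shape
(`...-<ordinal>.<suffix>`) and the helper functions are assumptions, not the
actual implementation:
```erl
%% Derive this pod's ordinal from the node name and compute the seed node.
{match, [Stem, Ord, Suffix]} =
    re:run(atom_to_list(node()), "^([^.]*-)([0-9]+)(.*)$",
           [{capture, all_but_first, list}]),
SeedNode = list_to_atom(Stem ++ integer_to_list(OrdinalStart) ++ Suffix),
case list_to_integer(Ord) =:= OrdinalStart of
    true  -> form_new_cluster();         %% hypothetical helper
    false -> wait_for_seed(SeedNode)     %% hypothetical helper: wait forever
end.
```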
`rabbit_binary_generator:map_exception/3` will crash when there are
unicode characters in the `explanation` field of the `Reason#amqp_error`
parameter. The explanation string (list) is assumed to be ASCII, with
each character/member in the range of a byte. Any unicode characters
in the string will trigger a `badarg` crash of `list_to_binary/1` in
`rabbit_binary_generator:amqp_exception_explanation/2`.
An AMQP 0.9.1 shovel crash caused by this has been reported:
https://github.com/rabbitmq/rabbitmq-server/discussions/12874
When a queue used as a shovel source/destination does not exist and its
name contains non-ASCII characters, the explanation of the amqp_error
will be something like `no queue non_ascii_name_😍 in vhost /`. It will
subsequently crash and can even affect the management console.
To fix this, `unicode:characters_to_binary/1` is used instead of
`list_to_binary/1`, and unicode-safe truncation of long explanations
with the `io_lib:format/3` chars_limit option replaces direct byte
truncation.
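A hedged sketch of the conversion (the chars_limit value is an assumption):
```erl
%% Format with ~ts so unicode code points survive, truncate by characters
%% (not bytes) via chars_limit, then build a UTF-8 binary.
Truncated = io_lib:format("~ts", [Explanation], [{chars_limit, 256}]),
ExplanationBin = unicode:characters_to_binary(Truncated).
```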
The `:io.format/2` call was originally passed a single-quote string
(i.e. a charlist in Elixir terminology) which emits a warning in more
recent Elixir versions:
warning: single-quoted strings represent charlists. Use ~c"" if you indeed want a charlist or use "" instead
└─ nofile:1:12
This warning would pop up a few times when using `make dialyze` within
a deps directory. To resolve it we can switch the quoting so that the
eval string is wrapped in single quotes (equivalent for shell since this
line doesn't use variables) and the format argument is wrapped in double
quotes. This uses a binary in Elixir instead, but that's ok because
the format parameter of `io:format/3` (type `io:format()`) may be an
atom, a string, or a binary.
This trick was copied from Makefile:49 which uses the same quoting.
Expose the same metrics for AMQP 1.0 connections as for AMQP 0.9.1 connections.
Display the following AMQP 1.0 metrics on the Management UI:
* Network bytes per second from/to client on connections page
* Number of sessions/channels on connections page
* Network bytes per second from/to client graph on connection page
* Reductions graph on connection page
* Garbage collection info on connection page
Expose the following AMQP 1.0 per-object Prometheus metrics:
* rabbitmq_connection_incoming_bytes_total
* rabbitmq_connection_outgoing_bytes_total
* rabbitmq_connection_process_reductions_total
* rabbitmq_connection_incoming_packets_total
* rabbitmq_connection_outgoing_packets_total
* rabbitmq_connection_pending_packets
* rabbitmq_connection_channels
The rabbit_amqp_writer proc:
* notifies the rabbit_amqp_reader proc if it sent frames
* hibernates eventually if it doesn't send any frames
The rabbit_amqp_reader proc:
* does not emit stats (update ETS tables) if no frames are received
or sent, to save resources when there are many idle connections.
For example, if the first restarted node doesn't start,
don't try to restart the other nodes. This mimics what
orchestrators such as Kubernetes or BOSH would do
(although they perform this check differently).
Provides a dedicated function to fix client SSL options, i.e. apply all
fixes that were applied to TLS listeners and clients in previous
versions, but also set the `cacerts` option to the CA certificates
obtained via `public_key:cacerts_get/0`, only when neither `cacertfile`
nor `cacerts` is provided.
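A hedged sketch of that fix, assuming `Opts` is a proplist of client TLS
options:
```erl
%% Add the OS trust store only when the caller configured neither a
%% cacertfile nor explicit cacerts.
maybe_add_cacerts(Opts) ->
    case proplists:is_defined(cacertfile, Opts) orelse
         proplists:is_defined(cacerts, Opts) of
        true  -> Opts;
        false -> [{cacerts, public_key:cacerts_get()} | Opts]
    end.
```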
OTP 27 reset all assumptions on how the VM reacts to processes that
buffer and process a lot of large binaries.
Substantially increasing the vheap sizes for such process restores
most of the same performance by allowing processes to hold more binary
data before major garbage collections are triggered.
This introduces a new module to capture process flag configurations.
The new vheap sizes are only applied when running on OTP 27 or
above.
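A hypothetical sketch of the tuning (the size is illustrative; the values
chosen by this commit may differ):
```erl
%% min_bin_vheap_size is measured in words; raising it lets binary-heavy
%% processes accumulate more binary data before a major GC is triggered.
case list_to_integer(erlang:system_info(otp_release)) >= 27 of
    true  -> erlang:process_flag(min_bin_vheap_size, 1024 * 1024);
    false -> ok
end.
```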
[Why]
Before this patch, the $RABBITMQ_FEATURE_FLAGS environment variable took
an exhaustive list of feature flags to enable. This list overrode the
default of enabling all stable feature flags.
It made it inconvenient when a user wanted to enable an experimental
feature flag like `khepri_db` while still leaving the default behavior.
[How]
$RABBITMQ_FEATURE_FLAGS now accepts the following syntax:
```
RABBITMQ_FEATURE_FLAGS=+feature1,-feature2
```
This will start RabbitMQ with all stable feature flags, plus `feature1`,
but without `feature2`.
For users setting `forced_feature_flags_on_init` in the config, the
corresponding syntax is:
```
{forced_feature_flags_on_init, {rel, [feature1], [feature2]}}
```
All CT logs will now be under <toplevel>/logs. An improved
test workflow would be to always keep the logs/all_runs.html
page open in the browser and refresh it whenever tests are
run in any of the rabbit applications.