The shared test suite was renamed only for clarity, but the
Web-MQTT test suites were renamed out of necessity: since
we are now adding the MQTT test directory to the code path
we need test suites to have different names to avoid
conflicts. We can't (easily) add the directory to the code path only
for this test suite either, since CT hooks don't call functions in a
predictable enough manner; any workaround would be hacky.
This is a proof of concept that mostly works but is missing
some tests, such as rabbitmq_mqtt or rabbitmq_cli. It also
doesn't apply to mixed version testing yet.
Otherwise some plugins can't build if we try to run tests
directly after checkout. This is because the plugins
depend on osiris as well as rabbit, but there is no
dep_osiris defined in the plugin itself.
Fixes https://github.com/rabbitmq/rabbitmq-server/issues/12398
To repro this crash:
1. Start RabbitMQ v3.13.7 with feature flag message_containers disabled:
```
make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" RABBITMQ_FEATURE_FLAGS=quorum_queue,implicit_default_bindings,virtual_host_metadata,maintenance_mode_status,user_limits,feature_flags_v2,stream_queue,classic_queue_type_delivery_support,classic_mirrored_queue_version,stream_single_active_consumer,direct_exchange_routing_v2,listener_records_in_ets,tracking_records_in_ets
```
In the Management UI:
2. Create a quorum queue with x-delivery-limit=10
3. Publish a message to this queue.
4. Requeue this message two times.
5. ./sbin/rabbitmqctl enable_feature_flag all
6. Stop the node
7. git checkout v4.0.2
8. make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test"
9. Again in the Management UI, Get Message with Automatic Ack leads to the following crash:
```
[error] <0.1185.0> ** Reason for termination ==
[error] <0.1185.0> ** {function_clause,
[error] <0.1185.0> [{mc_compat,set_annotation,
[error] <0.1185.0> [delivery_count,2,
[error] <0.1185.0> {basic_message,
[error] <0.1185.0> {resource,<<"/">>,exchange,<<>>},
[error] <0.1185.0> [<<"qq1">>],
[error] <0.1185.0> {content,60,
[error] <0.1185.0> {'P_basic',undefined,undefined,
[error] <0.1185.0> [{<<"x-delivery-count">>,long,2}],
[error] <0.1185.0> 2,undefined,undefined,undefined,undefined,undefined,
[error] <0.1185.0> undefined,undefined,undefined,undefined,undefined},
[error] <0.1185.0> none,none,
[error] <0.1185.0> [<<"m1">>]},
[error] <0.1185.0> <<230,146,94,58,177,125,64,163,30,18,177,132,53,207,69,103>>,
[error] <0.1185.0> true}],
[error] <0.1185.0> [{file,"mc_compat.erl"},{line,61}]},
[error] <0.1185.0> {rabbit_fifo_client,add_delivery_count_header,2,
[error] <0.1185.0> [{file,"rabbit_fifo_client.erl"},{line,228}]},
[error] <0.1185.0> {rabbit_fifo_client,dequeue,4,
[error] <0.1185.0> [{file,"rabbit_fifo_client.erl"},{line,211}]},
[error] <0.1185.0> {rabbit_queue_type,dequeue,5,
[error] <0.1185.0> [{file,"rabbit_queue_type.erl"},{line,755}]},
[error] <0.1185.0> {rabbit_misc,with_exit_handler,2,
[error] <0.1185.0> [{file,"rabbit_misc.erl"},{line,526}]},
[error] <0.1185.0> {rabbit_channel,handle_method,3,
[error] <0.1185.0> [{file,"rabbit_channel.erl"},{line,1257}]},
[error] <0.1185.0> {rabbit_channel,handle_cast,2,
[error] <0.1185.0> [{file,"rabbit_channel.erl"},{line,629}]},
[error] <0.1185.0> {gen_server2,handle_msg,2,[{file,"gen_server2.erl"},{line,1056}]}]}
```
The mc annotation `delivery_count` is new and is used specifically in
the header section of AMQP 1.0 messages:
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header
Hence, we can ignore this annotation for the old `#basic_message{}`.
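A minimal, hypothetical sketch of the kind of clause this implies (the real code lives in mc_compat.erl; the record below is only a placeholder so the sketch compiles on its own):
```
-module(mc_compat_sketch).
-export([set_annotation/3]).

%% Placeholder record so the sketch compiles; the real record is defined in
%% rabbit.hrl and has many more fields.
-record(basic_message, {content}).

%% AMQP 1.0-only annotations such as delivery_count have no equivalent in the
%% legacy message format, so they are ignored instead of falling through to a
%% function_clause error.
set_annotation(delivery_count, _Value, #basic_message{} = Msg) ->
    Msg;
set_annotation(Key, Value, Msg) ->
    %% all other annotations would be handled as before (elided in this sketch)
    erlang:error({not_implemented_in_sketch, Key, Value, Msg}).
```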
Messages, messages_ready and messages_unacknowledged are duplicated
during management stats collection, resulting in internal errors
when sorting queues in the management UI.
These should not be part of `rabbit_core_metrics:queue_stats/2`.
The shovel violated the AMQP 1.0 spec by sending transfers with settled=true
under sender settle mode unsettled (when the shovel's ack-mode is
on-publish).
For messages published to RabbitMQ, RabbitMQ honors the transfer `settled`
field, no matter what value the sender settle mode was set to in the attach
frame.
Therefore, prior to this commit, a client could send a transfer with
`settled=true` even though sender settle mode was set to `unsettled` in the
attach frame.
This commit enforces that the publisher only sets transfer `settled` values
that are consistent with the spec (a minimal validation sketch follows the list below).
If sender settle mode is:
* `unsettled`, the transfer `settled` flag must be `false`.
* `settled`, the transfer `settled` flag must be `true`.
* `mixed`, the transfer `settled` flag can be `true` or `false`.
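A minimal validation sketch of the rules above (hypothetical function name; the real enforcement lives in the AMQP 1.0 session code):
```
-module(ssm_check_sketch).
-export([validate_transfer_settled/2]).

-spec validate_transfer_settled(unsettled | settled | mixed, boolean()) ->
          ok | {error, term()}.
validate_transfer_settled(unsettled, false) -> ok;
validate_transfer_settled(settled, true) -> ok;
validate_transfer_settled(mixed, _Settled) -> ok;
validate_transfer_settled(SndSettleMode, Settled) ->
    {error, {illegal_transfer_settled_flag, SndSettleMode, Settled}}.
```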
* Add global histogram metrics for received message sizes per-protocol
fixup: add new files to bazel
fixup: expose message_size_bytes as prometheus classic histogram type
`rabbit_msg_size_metrics` does not use `seshat` any more, but
`counters` directly.
fixup: add msg_size_metrics unit test
* Improve message size histogram
1.
Avoid unnecessary time series emitted for stream protocol
The stream protocol cannot observe message sizes.
This commit ensures that the following time series are omitted:
```
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="64"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="256"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1024"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4096"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16384"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="65536"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="262144"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1048576"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4194304"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16777216"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="67108864"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="268435456"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="+Inf"} 0
rabbitmq_global_message_size_bytes_count{protocol="stream"} 0
rabbitmq_global_message_size_bytes_sum{protocol="stream"} 0
```
This reduces the number of time series by 15.
2.
Further reduce the number of time series by reducing the number of
buckets: emit only 9 buckets instead of 13. Buckets are not
free; each one is an extra time series to store.
Prior to this commit:
```
curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l
92
```
After this commit:
```
curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l
57
```
3.
The emitted metric should be called
`rabbitmq_message_size_bytes_bucket` instead of `rabbitmq_global_message_size_bytes_bucket`.
The latter is poor naming. There is no need to use `global` in
the metric name given that this metric doesn't exist in the old flawed
aggregated metrics.
4.
This commit simplifies module `rabbit_global_counters`.
5.
Avoid allocating and garbage collecting a 10-element bucket list for
every message received.
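A minimal sketch (hypothetical names and bucket bounds, not the actual `rabbit_msg_size_metrics` module) of keeping a fixed-bucket histogram directly in a `counters` reference, which also avoids building a per-message bucket list:
```
-module(msg_size_histogram_sketch).
-export([new/0, observe/2, bucket_counts/1]).

%% Bucket upper bounds in bytes; purely illustrative, not RabbitMQ's actual buckets.
-define(BUCKET_BOUNDS, [100, 1000, 10000, 100000, 1000000, 10000000, 50000000, 100000000]).

new() ->
    %% one slot per finite bucket, one for the +Inf bucket, one for the running sum
    counters:new(length(?BUCKET_BOUNDS) + 2, [write_concurrency]).

observe(Ref, SizeBytes) ->
    counters:add(Ref, bucket_index(SizeBytes, ?BUCKET_BOUNDS, 1), 1),
    counters:add(Ref, length(?BUCKET_BOUNDS) + 2, SizeBytes).

bucket_counts(Ref) ->
    [counters:get(Ref, I) || I <- lists:seq(1, length(?BUCKET_BOUNDS) + 2)].

%% Only a single counter slot is bumped per observation.
bucket_index(Size, [Bound | _], Idx) when Size =< Bound -> Idx;
bucket_index(Size, [_ | Rest], Idx) -> bucket_index(Size, Rest, Idx + 1);
bucket_index(_Size, [], Idx) -> Idx.
```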
---------
Co-authored-by: Péter Gömöri <peter@84codes.com>
Adds a specific clause to the
`prometheus_rabbitmq_core_metrics_collector:labels` function for when the
associated metric item is a Queue + Exchange combo (`{Queue, Exchange}`).
{release_cursor, Idx} effects promote checkpoints with an index
lower than or _equal_ to the release cursor index. rabbit_fifo was emitting
the smallest active raft index instead, which could cause the log to truncate
one index too many after a checkpoint promotion.
Using `rabbitmq.com` works and redirects to `www.rabbitmq.com`, but it
is preferable to use the canonical domain to have cleaner search
results.
This is important for manpages because we have an HTML copy on the
website.
Given that the default max_message_size got decreased from 128 MiB to 16
MiB in RabbitMQ 4.0 in https://github.com/rabbitmq/rabbitmq-server/pull/11455,
it makes sense to also decrease the default MQTT Maximum Packet Size from 256 MiB to 16 MiB.
Since this change was missed in RabbitMQ 4.0, it is scheduled for RabbitMQ 4.1.
build(deps): bump org.springframework.boot:spring-boot-starter-parent from 3.3.3 to 3.3.4 in /deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot_kotlin
```
<field name="message-annotations" type="fields"/>
```
Prior to this commit, integration tests succeeded because both the Erlang
client and the RabbitMQ server contained a bug.
This bug was noticed by a Java client test suite.
--experimental is no longer particularly fair to Khepri,
which is not enabled by default because of its enormous
scope, and because once enabled, it cannot be disabled.
--opt-in would be a better name but --experimental
remains for backwards compatibility.
When both are specified, we consider that the
user opts in if at least one of the flags is
set to true.
`delegate:invoke/2` catches errors but not exits of the delegate
process. Another process might query a classic queue's consumers
while the classic queue is being deleted or otherwise terminating;
previously that resulted in an exit of the calling process.
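A minimal sketch of the difference (hypothetical wrapper, not the actual delegate change): catching exits as well as errors when calling into a queue process that may be terminating:
```
-module(queue_call_sketch).
-export([safe_queue_call/2]).

%% gen_server2:call/3 exits if the target process dies or the call times out;
%% catching the exit (not just errors) keeps the calling process alive.
safe_queue_call(QPid, Request) ->
    try
        {ok, gen_server2:call(QPid, Request, infinity)}
    catch
        exit:Reason -> {error, {queue_down, Reason}}
    end.
```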
This return value was already possible since a classic queue will return
it during termination if `rabbit_amqqueue:internal_delete/2` fails with
that value.
`rabbit_amqqueue:delete/4` already handles this value and converts it
into a protocol error and channel exit. The other caller (MQTT
processor) will be updated in a child commit.
This commit also replaces eager conversions to protocol errors in
rabbit_classic_queue, rabbit_quorum_queue and rabbit_stream_coordinator:
we should return `{error, timeout}` consistently and not hide it in
protocol errors.
Before:
```
FORMATTER CRASH: {"Waiting for ~ts queues and streams to have quorum+1 replicas online.You can list them with `rabbitmq-diagnostics check_if_node_is_quorum_critical`","\t"}
```
After:
```
Waiting for 9 queues and streams to have quorum+1 replicas online. You can list them with `rabbitmq-diagnostics check_if_node_is_quorum_critical`
```
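A hypothetical sketch (not the actual CLI code) of the formatting rule behind the crash: the count is an integer, so it needs an integer control sequence such as ~b, whereas ~ts expects chardata and raises badarg for a bare integer.
```
-module(quorum_status_format_sketch).
-export([waiting_message/1]).

%% ~b formats the integer count; feeding it to ~ts would raise badarg, which
%% surfaces as a formatter crash in the CLI output path.
waiting_message(NumQueues) when is_integer(NumQueues) ->
    io_lib:format(
      "Waiting for ~b queues and streams to have quorum+1 replicas online. "
      "You can list them with `rabbitmq-diagnostics check_if_node_is_quorum_critical`",
      [NumQueues]).
```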
This reverts commit 620fff22f1.
It introduced a regression in another area - a TCP health check,
such as the default readinessProbe used with the cluster-operator,
on a TLS-enabled instance would log a `rabbit_reader` crash
every few seconds:
```
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> crasher:
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> initial call: rabbit_reader:init/3
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> pid: <0.999.0>
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> registered_name: []
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> exception error: no match of right hand side value {error, handshake_failed}
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> in function rabbit_reader:init/3 (rabbit_reader.erl, line 171)
```
This updates the Khepri feature flag description to be more correct
and to the point.
It also tweaks the management UI copy so
that it does not recommend against the use of
Khepri in production, as Khepri is much more mature
in 4.0.
Interference from other tests sometimes makes
it fail because there is more than one connection.
Compared to most other AMQP 1.0 tests, this one can be
dropped.
For cases where users want to live a bit more dangerously, this commit
maps a delivery limit of -1 (or any negative value) to "unlimited", which
disables the delivery limit and restores the 3.13.x behaviour.
`khepri:fence/0,1,2` queries the leader's Raft index and blocks the
caller for the given (or default) timeout until the local member has
caught up in log replication to that index. We want to do this during
Khepri init to ensure that the local Khepri store is reasonably up to
date before continuing in the boot process and starting listeners.
This is conceptually similar to the call to `mnesia:wait_for_tables/2`
during `rabbit_mnesia:init/0` and should have the same effect.
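A minimal sketch of how such a fence call might be wrapped during init, assuming `khepri:fence/2` returns `ok` on success and `{error, Reason}` otherwise (names here are hypothetical):
```
-module(khepri_boot_sketch).
-export([wait_for_local_store/2]).

%% Block the caller until the local Khepri member has caught up with the
%% leader's Raft index (assumed return shape: ok | {error, Reason}).
wait_for_local_store(StoreId, Timeout) ->
    case khepri:fence(StoreId, Timeout) of
        ok ->
            ok;
        {error, Reason} ->
            {error, {khepri_fence_failed, Reason}}
    end.
```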
This covers a specific case where we need to register projections not
covered by the enable callback of the `khepri_db` feature flag. The
feature flag may be enabled if a node has been part of a cluster which
enabled the flag, but the metadata store might be reset. Upon init the
feature flag will be enabled but the store will be empty and the
projections will not exist, so operations like inserting default data
will fail, for example when asserting that a vhost exists.
This fixes the `cluster_management_SUITE:forget_cluster_node_in_khepri/1`
case when running the suite with `RABBITMQ_METADATA_STORE=khepri`, which
fails as mentioned above.
We could always run projection registration when using Khepri, but the
command is idempotent once projections are registered, so there is no need
to, and the commands are somewhat large.
This is a cosmetic change. `?RA_CLUSTER_NAME` is equivalent but is used
for clustering commands. Commands sent via the `khepri`/`khepri_adv`
APIs consistently use the `?STORE_ID` macro instead.
Previously this function threw errors. With this minor refactor we
return them instead so that `register_projection/0` is easier for
callers to work with. (In the child commit we will add another caller.)
This commit is only refactoring.
To avoid confusion with reply and noreply gen_server return values, this
commit uses different return values for handle_frame/2.
## What?
1. Support `handle-max` field in the AMQP 1.0 `begin` frame
2. Add a new setting `link_max_per_session` which defaults to 256.
3. Rename `session_max` to `session_max_per_connection`
## Why?
1. Operators might want to limit the number of links per session. A
similar setting `consumer_max_per_channel` exists for AMQP 0.9.1.
2. We should use RabbitMQ 4.0 as an opportunity to set a sensible
default as to how many links can be active on a given session simultaneously.
The session code does iterate over every link in some scenarios (e.g.
when a queue was deleted). At some point, it's better to just open a 2nd
session instead of attaching hundreds or thousands of links to a single session.
A default `link_max_per_session` of 256 should be more than enough given
that `session_max_per_connection` is 64. So, the defaults allow
`256 * 64 = 16,384` links to be active on an AMQP 1.0 connection.
(Operators might want to lower both defaults.)
3. The name is clearer given that we might introduce
`session_max_per_node` in the future since
`channel_max_per_node` exists for AMQP 0.9.1.
### Additional Context
> Link handles MAY be reused once a link is closed for both send and receive.
> To make it easier to monitor AMQP link attach frames, it is RECOMMENDED that
> implementations always assign the lowest available handle to this field.
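A hypothetical advanced.config sketch using the two setting names introduced here, assuming both live under the `rabbit` application environment (values are the documented defaults):
```
%% 64 sessions per connection, 256 links per session
[
 {rabbit, [
   {session_max_per_connection, 64},
   {link_max_per_session, 256}
 ]}
].
```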
* Enforce AMQP 1.0 channel-max
Enforce AMQP 1.0 field `channel-max` in the `open` frame by introducing
a new, more user-friendly setting called `session_max`:
> The channel-max value is the highest channel number that can be used on the connection.
> This value plus one is the maximum number of sessions that can be simultaneously active on the connection.
We set the default value of `session_max` to 64 such that, by
default, RabbitMQ 4.0 allows at most 64 AMQP 1.0 sessions per AMQP 1.0 connection.
More than 64 AMQP 1.0 sessions per connection makes little sense.
See also https://www.rabbitmq.com/blog/2024/09/02/amqp-flow-control#session
Limiting the maximum number of sessions per connection can be useful to
protect against
* applications that accidentally open new sessions without ending old sessions
(session leaks)
* too many metrics being exposed, for example in the future via the
"/metrics/per-object" Prometheus endpoint with timeseries per session
being emitted.
This commit does not make use of the existing `channel_max` setting
because:
1. Given that `channel_max = 0` means "no limit", there is no way for an
operator to limit the number of sessions per connection to 1.
2. Operators might want to set different limits for maximum number of
AMQP 0.9.1 channels and maximum number of AMQP 1.0 sessions.
3. The default of `channel_max` is very high: It allows using more than
2,000 AMQP 0.9.1 channels per connection. Lowering this default might
break existing AMQP 0.9.1 applications.
This commit also fixes a bug in the AMQP 1.0 Erlang client which, prior
to this commit, used channel number 1 for the first session. That's wrong
if a broker allows at most 1 session by replying with `channel-max = 0`
in the `open` frame. Additionally, the spec recommends:
> To make it easier to monitor AMQP sessions, it is RECOMMENDED that implementations always assign the lowest available unused channel number.
Note that in AMQP 0.9.1, channel number 0 has a special meaning:
> The channel number is 0 for all frames which are global to the connection and 1-65535 for frames that
> refer to specific channels.
* Apply PR feedback
With the change in the parent commit we no longer set and clear a
runtime parameter when deleting an exchange as part of vhost deletion.
We need to adapt the `audit_vhost_internal_parameter` test case to test
that the parameter is set and cleared when the exchange is deleted
instead.
Currently we delete each exchange one-by-one which requires three
commands: the delete itself plus a put and delete for a runtime
parameter that acts as a lock to prevent a client from declaring an
exchange while it's being deleted. The lock is unnecessary during vhost
deletion because permissions are cleared for the vhost before any
resources are deleted.
We can use a transaction to delete all exchanges and bindings for a
vhost in a single command against the Khepri store. This minimizes the
number of commands we need to send against the store and therefore the
latency of the deletion.
In a quick test with a vhost containing only 10,000 exchanges (no
bindings, queues, users, etc.), this is an order of magnitude speedup:
the prior commit takes 22s to delete the vhost while with this commit
the vhost is deleted in 2s.
Currently we use a combination of `khepri_tx:get_many/1` and then either
`khepri_tx:delete/1` or `khepri_tx:delete_many/1`. This isn't a
functional change: switching to `khepri_tx_adv:delete_many/1` is
essentially equivalent but performs the deletion and lookup all in one
command and one traversal of the tree. This should improve performance
when deleting many bindings in an exchange.
[Why]
The previous layout followed the flat structure we have in Mnesia:
* In Mnesia, we have tables named after each purpose (exchanges, queues,
runtime parameters and so on).
* In Khepri, we had about the same but the table names were replaced by
a tree node in the tree. We ended up with one tree node per purpose
at the root of the tree.
Khepri implements a tree. We could benefit from this and organize data
to reflect their relationship in RabbitMQ.
[How]
Here is the new hierarchy implemented by this commit:
```
rabbitmq
|-- users
|   `-- $username
|-- vhosts
|   `-- $vhost
|       |-- user_permissions
|       |   `-- $username
|       |-- exchanges
|       |   `-- $exchange
|       |       |-- bindings
|       |       |   |-- queue
|       |       |   |   `-- $queue
|       |       |   `-- exchange
|       |       |       `-- $exchange
|       |       |-- consistent_hash_ring_state
|       |       |-- jms_topic
|       |       |-- recent_history
|       |       |-- serial
|       |       `-- user_permissions
|       |           `-- $username
|       |-- queues
|       |   `-- $queue
|       `-- runtime_params
|           `-- $param_name
|-- runtime_params
|   `-- $param_name
|-- mirrored_supervisors
|   `-- $group
|       `-- $id
`-- node_maintenance
    `-- $node
```
We first define a root path in `rabbit/include/khepri.hrl` as
`[rabbitmq]`. This could be anything, including an empty path.
All paths are constructed either from this root path definition (users
and vhosts paths do that), or from a parent resource's path (exchanges
and queues paths are based on a vhost path).
[Why]
Currently, `rabbit_db_*` modules use and export the following kind of
functions to return the path to the resources they manage:
```
khepri_db_thing:khepri_things_path(),
khepri_db_thing:khepri_thing_path(Identifier).
```
Internally, `khepri_db_thing:khepri_thing_path(Identifier)` appends
`Identifier` to the list returned by
`khepri_db_thing:khepri_things_path()`. This works for the organization
of the records we have today in Khepri:
```
|-- thing
|   |-- <<"identifier1">>
|   `-- <<"identifier2">>
`-- other_thing
    `-- <<"other_identifier1">>
```
However, with the upcoming organization that leverages the tree in
Khepri, identifiers may be in the middle of the path instead of a leaf
component. We may also put `other_thing` under `thing` in the tree.
That's why, we can't really expose a parent directory for `thing` and
`other_thing`. Therefore, `khepri_db_thing:khepri_things_path/0` needs
to go away. Only `khepri_db_thing:khepri_thing_path/1` should be
exported and used.
In addition to that, there are several places where paths are hard-coded
(i.e. their definition is duplicated).
[How]
The patch does exactly that. Uses of
`khepri_db_thing:khepri_things_path()` are generally replaced by
`rabbit_db_thing:khepri_thing_path(?KHEPRI_WILDCARD_STAR)`.
Places where the path definitions were duplicated are fixed too by
calling the path building functions.
In the future, for a resource that depends on another one, the
corresponding module will call the `rabbit_db_thing:khepri_thing_path/1`
for that other resource and build its path on top of that.
A while back, @mkuratczyk noted that we keep the timestamp of when a
connection is established in the connection state and related ETS table.
This PR uses the `connected_at` timestamp to calculate the duration of
the connection, to make it easier to identify short-running connections
via the log files.
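A minimal sketch of the duration calculation, assuming `connected_at` is a millisecond-precision system timestamp recorded when the connection was accepted (names here are hypothetical):
```
-module(connection_duration_sketch).
-export([duration_seconds/1]).

duration_seconds(ConnectedAtMillis) ->
    NowMillis = os:system_time(millisecond),
    erlang:convert_time_unit(NowMillis - ConnectedAtMillis, millisecond, second).
```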
By default Ra will use the cluster name as the metrics key. Currently
atom values are ignored by the prometheus plugin's tag rendering
functions, so if you have a QQ and Khepri running and request the
`/metrics/per-object` or `/metrics/detailed` endpoints you'll see values
that don't have labels set for the `ra_metrics` metrics:
```
# TYPE rabbitmq_raft_term_total counter
# HELP rabbitmq_raft_term_total Current Raft term number
rabbitmq_raft_term_total{vhost="/",queue="qq"} 9
rabbitmq_raft_term_total 10
```
With this change we map the name of the Ra cluster to a "raft_cluster"
tag, so instead an example metric might be:
```
# TYPE rabbitmq_raft_term_total counter
# HELP rabbitmq_raft_term_total Current Raft term number
rabbitmq_raft_term_total{vhost="/",queue="qq"} 9
rabbitmq_raft_term_total{raft_cluster="rabbitmq_metadata"} 10
```
This affects metrics for Khepri and the stream coordinator.
[Why]
If a node failed to join a cluster, `rabbit` was restarted, then the
feature flags were reset and the error was returned. I.e., the error
handling was in a single place at the end of the function.
We need to reset feature flags after a failure because the feature flags
states were copied from the remote node just before the join.
However, resetting them after restarting `rabbit` was incorrect because
feature flags were initialized in a way that didn't match the rest of
the state. This led to crashes during the start of `rabbit`.
[How]
The feature flags are now reset after the failure to join but before
starting `rabbit`.
A new testcase was added to test this scenario.
Because `ct_master` is yet another Erlang node, used
to run multiple CT nodes, it ends up in a cluster of CT
nodes, so the tests that change the net_ticktime could no longer
work properly. This is because net_ticktime must
be the same value across the cluster.
To solve this, the same value had to be set for all tests,
which is why it was changed to 5s across the board. The
lower net_ticktime was used in most places to speed up tests
that must deal with cluster failures, so that value is good
enough for these cases.
One test in amqp_client was using the net_ticktime to test
the behavior of the direct connection timeout with varying
net_ticktime configurations. The test now mocks the
`net_kernel:get_net_ticktime()` function to achieve the
same result.
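A sketch of the mocking approach, assuming the meck library is available as a test dependency (helper names are hypothetical):
```
-module(net_ticktime_mock_sketch).
-export([mock_net_ticktime/1, unmock_net_ticktime/0]).

%% net_kernel is a sticky kernel module, hence the unstick option; passthrough
%% keeps every other net_kernel function working as usual.
mock_net_ticktime(Ticktime) ->
    ok = meck:new(net_kernel, [unstick, passthrough]),
    ok = meck:expect(net_kernel, get_net_ticktime, fun() -> Ticktime end),
    ok.

unmock_net_ticktime() ->
    meck:unload(net_kernel).
```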
This has no real impact on performance[1] but should
make it clear which application can run the broker
and/or publish to Hex.pm. In particular, applications
that we can't run the broker from will now give up
early if we try to.
Note that while the broker can't normally run from the
amqp_client application's directory, it can run from
tests and some of the tests start the broker.
[1] on my machine
No real need to have two files, especially since it contains
only a few variable definitions. Plan is to only keep
separate files for larger features such as dist or run.
The default of 0.4 was very conservative even when it was
set years ago. Since then:
- we moved to CQv2, which has much more predictable memory usage than (non-lazy) CQv1 used to
- we removed CQ mirroring which caused large sudden memory spikes in some situations
- we removed the option to store message payload in memory in quorum queues
For the past two years or so, we've been running all our internal tests and benchmarks
using the value of 0.8 with no OOMkills at all (note: we do this on
Kubernetes where the Cluster Operator overrides the available memory,
leaving some additional headroom, but effectively we are still using more than
0.6 of memory).
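For reference, the watermark can still be overridden explicitly; a minimal advanced.config sketch (the rabbitmq.conf equivalent key is `vm_memory_high_watermark.relative`):
```
[
 {rabbit, [
   {vm_memory_high_watermark, 0.6}
 ]}
].
```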
It was observed that the `rabbit_core_metrics_gc` and
`rabbit_stream_metrics_gc` processes can grow to several MBs of
memory (probably because they fetch the list of all queues). As they
execute infrequently (every 2 minutes by default) it can save some
memory to hibernate them in between runs (as is done for other similar
processes).
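A minimal sketch of the hibernation pattern (hypothetical module, not the actual collectors):
```
-module(metrics_gc_sketch).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-define(INTERVAL_MS, 120000).  %% every 2 minutes, as in the description

start_link() ->
    gen_server:start_link(?MODULE, [], []).

init([]) ->
    erlang:send_after(?INTERVAL_MS, self(), collect),
    {ok, #{}, hibernate}.

handle_call(_Request, _From, State) ->
    {reply, ok, State, hibernate}.

handle_cast(_Msg, State) ->
    {noreply, State, hibernate}.

%% Do the periodic sweep, reschedule, then hibernate so the heap accumulated
%% while walking the queue list is released between runs.
handle_info(collect, State) ->
    %% placeholder for the real sweep over queue/stream metrics
    erlang:send_after(?INTERVAL_MS, self(), collect),
    {noreply, State, hibernate};
handle_info(_Other, State) ->
    {noreply, State, hibernate}.
```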
[Why]
So far, the code that selected the node to use as the "entry point" to
add the local node to a remote cluster assumed that all cluster members
were running and it picked the first node in the cluster members list.
If that node was stopped, the join would fail immediately, even if the
rest of the members were running fine.
[How]
Now the function filters out nodes that are unavailable or don't run the
expected Khepri store. Then it uses the resulting list as before.
The code returns an error if all nodes are stopped or unreachable.
Before this commit, formatting the AMQP body would crash and the log
message would not be published to the log exchange.
Before commit 34bcb911 it even crashed the whole exchange logging
handler, which caused the log exchange to be deleted.
1.
Prior to this commit, closing a stream connection via:
```
./sbin/rabbitmqctl close_all_user_connections guest enough
```
crashed the stream process as follows:
```
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> crasher:
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> initial call: rabbit_stream_reader:init/1
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> pid: <0.1098.0>
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> registered_name: []
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> exception error: no function clause matching
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> rabbit_stream_reader:open({call,
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> {<0.1233.0>,
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> #Ref<0.519694519.1387790337.15898>}},
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> {shutdown,<<"enough">>},
```
This commit fixes this crash.
2.
Both CLI commands and management plugin use the same way
to close MQTT, Web MQTT, and Stream connections: They all send a message
via `Pid ! {shutdown, Reason}` to the connection.
3.
This commit avoids making the `rabbit` core app know about
'Web MQTT'.
4.
This commit simplifies rabbit_mqtt_reader by avoiding another
handle_call clause.
1. Only run the CLI tests on a single node cluster. The shared_SUITE is
already very big. Testing the same CLI commands against node-0 on a
3-node cluster brings no benefit.
2. Move the two new CLI test cases in front of
management_plugin_connection because they are similar in that all
three tests close the MQTT connection.
3. There is no need to query the HTTP API for the two new CLI test
cases.
4. There is no need to set keepalive in the two new CLI test cases.
from 128 to 170. See comments for rationale.
On an Ubuntu box, run
```
quiver //host.docker.internal//queues/my-quorum-queue --durable --count 100k --duration 10m --body-size 12 --credit 10000
```
Before this commit:
```
RESULTS
Count ............................................... 100,000 messages
Duration ............................................... 11.0 seconds
Sender rate ........................................... 9,077 messages/s
Receiver rate ......................................... 9,097 messages/s
End-to-end rate ....................................... 9,066 messages/s
```
After this commit:
```
RESULTS
Count ............................................... 100,000 messages
Duration ................................................ 6.2 seconds
Sender rate .......................................... 16,215 messages/s
Receiver rate ........................................ 16,271 messages/s
End-to-end rate ...................................... 16,166 messages/s
```
That's because more `#enqueue{}` Ra commands can be batched before
fsyncing.
So, this commit brings the performance of the scenario "a single connection publishing to
a quorum queue with a large number (>200) of unconfirmed publishes" in AMQP 1.0
closer to AMQP 0.9.1.
As described in the 4.0 release notes:
> RabbitMQ Shovels will be able to connect to a RabbitMQ 4.0 node via AMQP 1.0 only when the Shovel runs on a RabbitMQ node >= 3.13.7.
`{shutdown, Reason}` must be handled in handle_call and not handle_info:
`rabbitmqctl close_all_user_connections` goes through rabbit_reader, which does
a call into the process, the same as rabbitmq_management.
Now the API endpoint can return Khepri as
a "queue" (or "stream") without the necessary
number of replicas online.
So don't expect the list to only have one element.
1. If khepri_db is enabled, rabbitmq_metadata is a critical component
2. When waiting for quorum+1, periodically log what doesn't have the
quorum+1
- for components: just list them
- for queues: list how many we are waiting for and how to display
them (because there could be a large number, logging that
could be impractical or even dangerous)
3. make the tests significantly faster by using a single group
This relaxes the assert_list/2 assertion so that it does
not require the size of an actually returned list element
to be exactly equal to the size of the expected one.
Sometimes it makes perfect sense to not assert on
every single key but only a subset, and with this
change, it now will be possible.
Individual tests may choose to assert on all
keys by listing them explicitly.
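A sketch of the relaxed matching idea with a hypothetical helper (the real assert_list/2 implementation differs): every expected key must be present with the expected value, while extra keys in the actual element are ignored.
```
-module(assert_subset_sketch).
-export([matches/2]).

matches(ExpectedProps, ActualProps) ->
    lists:all(fun({Key, Value}) ->
                      proplists:get_value(Key, ActualProps) =:= Value
              end, ExpectedProps).
```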
Don't let the `log` callback of the exchange_logging handler crash,
because in case of a crash the OTP logger removes the exchange_logging
handler, which in turn deletes the log exchange and its bindings.
It was seen several times in production that the log exchange suddenly
disappeared, and without debug logging there was no trace of why.
With this commit `erlang:display` will print the reason and stacktrace
to stderr without using the logging infrastructure.
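A minimal sketch of the guard described above (hypothetical module and helper, not the actual handler code):
```
-module(exchange_logging_guard_sketch).
-export([log/2]).

%% Never let the handler callback crash (the OTP logger would remove the
%% handler); report the failure on stderr via erlang:display/1 so no logging
%% infrastructure is involved.
log(LogEvent, Config) ->
    try
        do_log(LogEvent, Config)
    catch
        Class:Reason:Stacktrace ->
            erlang:display({?MODULE, handler_failed, Class, Reason, Stacktrace}),
            ok
    end.

%% Placeholder for the real publish-to-exchange logic.
do_log(_LogEvent, _Config) ->
    ok.
```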
Bump org.springframework.boot:spring-boot-starter-parent from 3.3.2 to 3.3.3 in /deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot
Bump org.springframework.boot:spring-boot-starter-parent from 3.3.2 to 3.3.3 in /deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot_kotlin
The design of `rabbit_amqqueue_process` makes this change challenging.
The old implementation of the handler of the `{delete,_,_,_}` command
simply stopped the process and any cleanup was done in `gen_server2`'s
`terminate` callback. This makes it impossible to pass any error back
to the caller if the record can't be deleted from the metadata store
before a timeout.
The strategy taken here slightly mirrors an existing
`{shutdown, missing_owner}` termination value which can be returned from
`init_it2/3`. We pass the `ReplyTo` for the call with the state. We then
optionally reply to this `ReplyTo` if it is set in `terminate_delete/4`
with the result of `rabbit_amqqueue:internal_delete/3`. So deletion of
a classic queue will terminate the process but may return an error to
the caller if the record can't be removed from the metadata store
before the timeout.
The consumer reader process is gone and there is no way to recover
it as the node does not have a member of the stream anymore,
so it should be cancelled/detached.
When the khepri_db feature flag is disabled, Khepri servers
are running but are not clustered. In this case `rabbit_khepri:status/0`
shows that all nodes are leaders, which is confusing and scary
(even though actually harmless). Instead, we now just print that mnesia
is in use.
This release contains fixes around certain recovery failures where
there are either orphaned segment files (that do not have a corresponding
index file) or index files that do not have a corresponding segment
file.
With the prior behavior it can be unclear whether the text was a warning
and the feature flag was enabled anyways. We can use a non-zero exit
code and the `{:error, code, text}` return value to make it clear that
the flag wasn't enabled.
[Why]
All other queries are based on projections, not direct queries to
Khepri. Using projections for exchange names should be faster and more
consistent with the rest of the module.
[How]
The Khepri query is replaced by an ETS query.
Rename the two quorum queue priority levels from "low" and "high" to "normal" and
"high". This improves user experience because the default priority level is low /
normal. Prior to this commit, users were confused why their messages showed
up as low priority. Furthermore, there is no need to consult the docs to
know whether the default priority level is low or high.
For RabbitMQ 4.0, this commit removes support for the deprecated `rabbitmq.conf` settings
```
cluster_formation.randomized_startup_delay_range.min
cluster_formation.randomized_startup_delay_range.max
```
The rabbitmq/cluster-operator already removed these settings in
b81e0f9bb8
This test called `rabbit_db_queue:update_decorators/1` which doesn't
exist - instead it can call `update_decorators/2` with an empty list.
This commit also adds the test to the `all_tests/0` list - it being
absent is why this wasn't caught before.
RabbitMQ should advertise the SASL mechanisms in the order as
configured in `rabbitmq.conf`.
Starting RabbitMQ with the following `rabbitmq.conf`:
```
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN
auth_mechanisms.3 = ANONYMOUS
```
translates prior to this commit to:
```
1> application:get_env(rabbit, auth_mechanisms).
{ok,['ANONYMOUS','AMQPLAIN','PLAIN']}
```
and after this commit to:
```
1> application:get_env(rabbit, auth_mechanisms).
{ok,['PLAIN','AMQPLAIN','ANONYMOUS']}
```
In our 4.0 docs we write:
> The server mechanisms are ordered in decreasing level of preference.
which complies with https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-security-v1.0-os.html#type-sasl-mechanisms
returned by Ra, e.g. when a replica cannot be
restarted because of a concurrent delete
or because a QQ was inserted into a schema data
store but not yet registered as a process on
the node.
References #12013.
This commit is a breaking change in RabbitMQ 4.0.
## What?
Remove mqtt.default_user and mqtt.default_pass
Instead, rabbit.anonymous_login_user and rabbit.anonymous_login_pass
should be used.
## Why?
RabbitMQ 4.0 simplifies anonymous logins.
There should be a single configuration place
```
rabbit.anonymous_login_user
rabbit.anonymous_login_pass
```
that is used for anonymous logins for any protocol.
Anonymous login is orthogonal to the protocol the client uses.
Hence, there should be a single configuration place which can then be
used for MQTT, AMQP 1.0, AMQP 0.9.1, and RabbitMQ Stream protocol.
This will also simplify switching to SASL for MQTT 5.0 in the future.
## 1. Introduce new SASL mechanism ANONYMOUS
### What?
Introduce a new `rabbit_auth_mechanism` implementation for SASL
mechanism ANONYMOUS called `rabbit_auth_mechanism_anonymous`.
### Why?
As described in AMQP section 5.3.3.1, ANONYMOUS should be used when the
client doesn't need to authenticate.
Introducing a new `rabbit_auth_mechanism` consolidates and simplifies how anonymous
logins work across all RabbitMQ protocols that support SASL. This commit
therefore allows AMQP 0.9.1, AMQP 1.0, and stream clients to connect out of
the box to RabbitMQ without providing any username or password.
Today's AMQP 0.9.1 and stream protocol client libs hard-code the RabbitMQ default credentials
`guest:guest`, as done for example in:
* 0215e85643/src/main/java/com/rabbitmq/client/ConnectionFactory.java (L58-L61)
* ddb7a2f068/uri.go (L31-L32)
Hard coding RabbitMQ specific default credentials in dozens of different
client libraries is an anti-pattern in my opinion.
Furthermore, there are various AMQP 1.0 and MQTT client libraries which
we do not control or maintain and which still should work out of the box
when a user is getting started with RabbitMQ (that is without
providing `guest:guest` credentials).
### How?
The old RabbitMQ 3.13 AMQP 1.0 plugin `default_user`
[configuration](146b4862d8/deps/rabbitmq_amqp1_0/Makefile (L6))
is replaced with the following two new `rabbit` configurations:
```
{anonymous_login_user, <<"guest">>},
{anonymous_login_pass, <<"guest">>},
```
We call it `anonymous_login_user` because this user will be used for
anonymous logins. The subsequent commit uses the same setting for
anonymous logins in MQTT. Hence, this user is orthogonal to the protocol
used when the client connects.
Setting `anonymous_login_pass` could have been left out.
This commit decides to include it because our documentation has so far
recommended:
> It is highly recommended to pre-configure a new user with a generated username and password or delete the guest user
> or at least change its password to reasonably secure generated value that won't be known to the public.
By having the new module `rabbit_auth_mechanism_anonymous` internally
authenticate with `anonymous_login_pass` instead of blindly allowing
access without any password, we protect operators that relied on the
sentence:
> or at least change its password to reasonably secure generated value that won't be known to the public
To ease the getting started experience, since RabbitMQ already deploys a
guest user with full access to the default virtual host `/`, this commit
also allows SASL mechanism ANONYMOUS in `rabbit` setting `auth_mechanisms`.
In production, operators should disable SASL mechanism ANONYMOUS by
setting `anonymous_login_user` to `none` (or by removing ANONYMOUS from
the `auth_mechanisms` setting). This will be documented separately.
Even if operators forget or don't read the docs, this new ANONYMOUS
mechanism won't do any harm because it relies on the default user name
`guest` and password `guest`, which is recommended against in
production, and who by default can only connect from the local host.
## 2. Require SASL security layer in AMQP 1.0
### What?
An AMQP 1.0 client must use the SASL security layer.
### Why?
This is in line with the mandatory usage of SASL in AMQP 0.9.1 and
RabbitMQ stream protocol.
Since (presumably) any AMQP 1.0 client knows how to authenticate with a
username and password using SASL mechanism PLAIN, any AMQP 1.0 client
also (presumably) implements the trivial SASL mechanism ANONYMOUS.
Skipping SASL is not recommended in production anyway.
By requiring SASL, configuration for operators becomes easier.
Following the principle of least surprise, when an operator
configures `auth_mechanisms` to exclude `ANONYMOUS`, anonymous logins
will be prohibited in SASL and also by disallowing skipping the SASL
layer.
### How?
This commit implements AMQP 1.0 figure 2.13.
A follow-up commit needs to be pushed to `v3.13.x` which will use SASL
mechanism `anon` instead of `none` in the Erlang AMQP 1.0 client
such that AMQP 1.0 shovels running on 3.13 can connect to 4.0 RabbitMQ nodes.
* Support SASL mechanism EXTERNAL in Erlang AMQP 1.0 client
* Move test to plugin rabbitmq_auth_mechanism_ssl
In theory, there can be other plugin that offer SASL mechanism EXTERNAL.
Therefore, instead of adding a test dependency from app rabbit to app
rabbitmq_auth_mechanism_ssl, it's better to test this plugin specific
functionality directly in the plugin itself.
'ctl encode' is unfortunately named and targets
advanced.config values.
This introduces a command that targets 'rabbitmq.conf'
values and has a more specific name.
Eventually 'ctl encode' will be aliased and deprecated,
although we still do not have an aliasing mechanism
and it won't be in scope for 4.0.
Transient queue deletion previously caused a crash if Khepri was enabled
and a node with a transient queue went down while its cluster was in a
minority. We need to handle the `{error,timeout}` return possible from
`rabbit_db_queue:delete_transient/1`. In the
`rabbit_amqqueue:on_node_down/1` callback we log a warning when we see
this return.
We then try this deletion again during that node's
`rabbit_khepri:init/0` which is called from a boot step after
`rabbit_khepri:setup/0`. At that point we can return an error and halt
the node's boot if the command times out. The cluster is very likely to
be in a majority at that point since `rabbit_khepri:setup/0` waits for
a leader to be elected (requiring a majority).
This fixes a crash report found in the `cluster_minority_SUITE`'s
`end_per_group`.
The prior code skirted transactions because the filter function might
cause Khepri to call itself. We want to use the same idea as the old
code - get all queues, filter them, then delete them - but we want to
perform the deletion in a transaction and fail the transaction if any
queues changed since we read them.
This fixes a bug - that the call to `delete_in_khepri/2` could return
an error tuple that would be improperly recognized as `Deletions` -
but should also make deleting transient queues atomic and fast.
Each call to `delete_in_khepri/2` needed to wait on Ra to replicate
because the deletion is an individual command sent from one process.
Performing all deletions at once means we only need to wait for one
command to be replicated across the cluster.
We also bubble up any errors to delete now rather than storing them as
deletions. This fixes a crash that occurs on node down when Khepri is
in a minority.
This makes it possible to specify an encrypted
value in rabbitmq.conf using a prefix.
For example, to specify a default user password
as an encrypted value:
``` ini
default_user = bunnies-444
default_pass = encrypted:F/bjQkteQENB4rMUXFKdgsJEpYMXYLzBY/AmcYG83Tg8AOUwYP7Oa0Q33ooNEpK9
```
``` erl
[
{rabbit, [
{config_entry_decoder, [
{passphrase, <<"bunnies">>}
]}
]}
].
```
This fixes a case-clause crash in the logs in `cluster_minority_SUITE`.
When the database is not available `rabbit_amqqueue:declare/6,7` should
return a `protocol_error` record with an error message rather than a
hard crash. Also included in this change is the necessary changes to
typespecs: `rabbit_db_queue:create_or_get/1` is the first function to
return a possible `{error,timeout}`. That bubbles up through
`rabbit_amqqueue:internal_declare/3` and must be handled in each
`rabbit_queue_type:declare/2` callback.
`rabbit_misc:rs/1` for a queue resource will print
`queue '<QName>' in vhost '<VHostName>'` so the "a queue" and
surrounding single quotes should be removed here.
* Log AMQP connection name and container-id
Fixes #11958
## What
Log container-id and connection name.
Example JSON log:
```
{"time":"2024-08-12 10:49:44.365724+02:00","level":"info","msg":"accepting AMQP connection [::1]:56754 -> [::1]:5672","pid":"<0.1164.0>","domain":"rabbitmq.connection"}
{"time":"2024-08-12 10:49:44.381244+02:00","level":"debug","msg":"User 'guest' authenticated successfully by backend rabbit_auth_backend_internal","pid":"<0.1164.0>","domain":"rabbitmq","connection":"[::1]:56754 -> [::1]:5672"}
{"time":"2024-08-12 10:49:44.381578+02:00","level":"info","msg":"AMQP 1.0 connection from container 'my container ID': user 'guest' authenticated and granted access to vhost '/'","pid":"<0.1164.0>","domain":"rabbitmq.connection","connection":"[::1]:56754 -> [::1]:5672","container_id":"my container ID"}
{"time":"2024-08-12 10:49:44.381654+02:00","level":"debug","msg":"AMQP 1.0 connection.open frame: hostname = localhost, extracted vhost = /, idle-time-out = {uint,\n 30000}","pid":"<0.1164.0>","domain":"rabbitmq","connection":"[::1]:56754 -> [::1]:5672","container_id":"my container ID"}
{"time":"2024-08-12 10:49:44.386412+02:00","level":"debug","msg":"AMQP 1.0 created session process <0.1170.0> for channel number 0","pid":"<0.1164.0>","domain":"rabbitmq","connection":"[::1]:56754 -> [::1]:5672","container_id":"my container ID"}
{"time":"2024-08-12 10:49:46.387957+02:00","level":"debug","msg":"AMQP 1.0 closed session process <0.1170.0> with channel number 0","pid":"<0.1164.0>","domain":"rabbitmq","connection":"[::1]:56754 -> [::1]:5672","container_id":"my container ID"}
{"time":"2024-08-12 10:49:46.388201+02:00","level":"info","msg":"closing AMQP connection ([::1]:56754 -> [::1]:5672)","pid":"<0.1164.0>","domain":"rabbitmq.connection","connection":"[::1]:56754 -> [::1]:5672","container_id":"my container ID"}
```
If JSON logging is not used, this commit still includes the container-ID
once at info level:
```
2024-08-12 10:48:57.451580+02:00 [info] <0.1164.0> accepting AMQP connection [::1]:56715 -> [::1]:5672
2024-08-12 10:48:57.465924+02:00 [debug] <0.1164.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2024-08-12 10:48:57.466289+02:00 [info] <0.1164.0> AMQP 1.0 connection from container 'my container ID': user 'guest' authenticated and granted access to vhost '/'
2024-08-12 10:48:57.466377+02:00 [debug] <0.1164.0> AMQP 1.0 connection.open frame: hostname = localhost, extracted vhost = /, idle-time-out = {uint,
2024-08-12 10:48:57.466377+02:00 [debug] <0.1164.0> 30000}
2024-08-12 10:48:57.470800+02:00 [debug] <0.1164.0> AMQP 1.0 created session process <0.1170.0> for channel number 0
2024-08-12 10:48:59.472928+02:00 [debug] <0.1164.0> AMQP 1.0 closed session process <0.1170.0> with channel number 0
2024-08-12 10:48:59.473332+02:00 [info] <0.1164.0> closing AMQP connection ([::1]:56715 -> [::1]:5672)
```
## Why?
See #11958 and https://www.rabbitmq.com/docs/connections#client-provided-names
To provide a similar feature to AMQP 0.9.1 this commit uses container-id as sent by the client in the open frame.
> Examples of containers are brokers and client applications.
The advantage is that the `container-id` is mandatory. Hence, in AMQP 1.0, we can enforce the desired behaviour that we document on our website for AMQP 0.9.1:
> The name is optional; however, developers are strongly encouraged to provide one as it would significantly simplify certain operational tasks.
* Clarify that container refers to AMQP 1.0
Rename container_id to amqp_container and change the log message such that
it's unambiguous that the word "container" refers to AMQP 1.0 containers
(to reduce confusion with the meaning of "container" in Docker / Kubernetes).
This suite uses the mixed version secondary umbrella as a starting
version for a cluster and then has a helper to upgrade the cluster to
the current code. This is meant to ensure that we can upgrade from the
previous minor.
This keeps functions pure and ensures that existing links do not break
if an operator were to dynamically change the server's max_message_size.
Each link now has a max_message_size:
* incoming links as determined by RabbitMQ config
* outgoing links as determined by the client
Put credit configuration into session state to make functions pure.
Although these credit configurations are not meant to be dynamically
changed at runtime, prior to this commit it could happen that
persistent_term:get/1 returned different results across invocations,
leading to bugs in how credit was granted and recorded.
Prior to this commit, test
```
ERL_AFLAGS="+S 2" make -C deps/rabbit ct-amqp_client t=cluster_size_3:detach_requeues_two_connections_quorum_queue
```
failed rarely locally, and more often in CI.
An instance of a failed test in CI is
https://github.com/rabbitmq/rabbitmq-server/actions/runs/10298099899/job/28502687451?pr=11945
The test failed with:
```
=== === Reason: {assertEqual,[{module,amqp_client_SUITE},
{line,2800},
{expression,"amqp10_msg : body ( Msg1 )"},
{expected,[<<"1">>]},
{value,[<<"2">>]}]}
in function amqp_client_SUITE:detach_requeues_two_connections/2 (amqp_client_SUITE.erl, line 2800)
```
because it could happen that Receiver1's credit top up to the quorum
queue is applied before Receiver0's credit top up such that Receiver1
gets enqueued to the ServiceQueue before Receiver0.
This commit contains the following new quorum queue features:
* Fair share high/low priorities
* SAC consumers honour consumer priorities
* Credited consumer refactoring to meet AMQP requirements.
* Use checkpoints feature to reduce memory use for queues with long backlogs
* Consumer cancel option that immediately removes consumer and returns all pending messages.
* More compact versions of the most common commands such as enqueue, settle and credit
* Correctly track the delivery-count to be compatible with the AMQP spec
* Support the "modified" AMQP 1.0 outcome better.
Commits:
* Quorum queues v4 scaffolding.
Create the new version but not including any changes yet.
QQ: force delete followers after leader has terminated.
Also try a longer sleep for mqtt_shared_SUITE so that the
delete operation stands a chance to time out and move on
to the forced deletion stage.
In some mixed machine version scenarios some followers will never
apply the poison pill command so we may as well force delete them
just in case.
QQ: skip test in amqp_client that cannot pass with mixed machine versions
QQ: remove dead code
Code relating to prior machine versions and state conversions.
rabbit_fifo_prop_SUITE fixes
* QQ: add v4 ff and new more compact enqueue command.
Also update rabbit_fifo_* suites to test more relevant code versions
where applicable.
QQ: always use the updated credit mode format
QQv4: use more compact consumer reference in settle, credit, return
This introduces a new type: consumer_key(), which is either the consumer_id
or the raft index the checkout was processed at. If the consumer is
using one of the updated credit spec formats, rabbit_fifo will use the
raft index as the primary key for the consumer such that the rabbit
fifo client can then use the more space-efficient integer index
instead of the full consumer id in subsequent commands.
There is compatibility code to still accept the consumer id in
settle, return, discard and credit commands but this is slightly
slower and of course less space efficient.
The old form will be used in cases where the fifo client may have
already removed the local consumer state (as happens after a cancel).
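A type-level sketch of the consumer key idea (definitions here are assumptions; the real types live in rabbit_fifo):
```
-module(consumer_key_sketch).
-export_type([consumer_key/0]).

%% The key is either the classic consumer id or the Raft index at which the
%% checkout was processed, whichever the consumer registered with.
-type ra_index()     :: non_neg_integer().
-type consumer_id()  :: {ConsumerTag :: binary(), ChannelPid :: pid()}.
-type consumer_key() :: consumer_id() | ra_index().
```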
Lots of test refactorings of the rabbit_fifo_SUITE to begin to use
the new forms.
* More test refactoring and new API fixes
rabbit_fifo_prop_SUITE refactoring and other fixes.
* First pass SAC consumer priority implementation.
Single active consumers will be activated if they have a higher priority
than the currently active consumer. if the currently active consumer
has pending messages, no further messages will be assigned to the
consumer and the activation of the new consumer will happen once
all pending messages are settled. This is to ensure processing order.
Consumers with the same priority will internally be ordered to
favour those with credit then those that attached first.
QQ: add SAC consumer priority integration tests
QQ: add check for ff in tests
* QQ: add new consumer cancel option: 'remove'
This option immediately removes and returns all messages for a
consumer instead of the softer 'cancel' option which keeps the
consumer around until all pending messages have been either
settled or returned.
This involves a change to the rabbit_queue_type:cancel/5 API
to rabbit_queue_type:cancel/3.
* QQ: capture checked out time for each consumer message.
This will form the basis for queue initiated consumer timeouts.
* QQ: Refactor to use the new ra_machine:handle_aux/5 API
Instead of the old ra_machine:handle_aux/6 callback.
* QQ hi/lo priority queue
* QQ: Avoid using mc:size/1 inside rabbit_fifo
As we don't want to depend on external functions for things that may
change the state of the queue.
* QQ bug fix: Maintain order when returning multiple
Prior to this commit, quorum queues requeued messages in an undefined
order, which is wrong.
This commit fixes this bug and requeues messages always in the order as
nacked / rejected / released by the client.
We ensure that order of requeues is deterministic from the client's
point of view and doesn't depend on whether the quorum queue soft limit
was exceeded temporarily.
So, even when rabbit_fifo_client batches requeues, the order as nacked
by the client is still maintained.
* Simplify
* Add rabbit_quorum_queue:file_handle* functions back.
For backwards compat.
* dialyzer fix
* dynamic_qq_SUITE: avoid mixed versions failure.
* QQ: track number of requeues for message.
To be able to calculate the correct value for the AMQP delivery_count
header we need to be able to distinguish between messages that were
"released" or returned in QQ speak and those that were returned
due to errors such as channel termination.
This commit implements such tracking as well as the calculation
of a new mc annotation `delivery_count` that AMQP makes use
of to set the header value accordingly.
* Use QQ consumer removal when AMQP client detaches
This enables us to unskip some AMQP tests.
* Use AMQP address v2 in fsharp-tests
* rabbit_fifo: Use Ra checkpoints
* quorum queues: Use a custom interval for checkpoints
* rabbit_fifo_SUITE: List actual effects in ?ASSERT_EFF failure
* QQ: Checkpoints modifications
* fixes
* QQ: emit release cursors on tick for followers and leaders
else followers could end up holding on to segments a bit longer
after traffic stops.
* Support draining a QQ SAC waiting consumer
By issuing drain=true, the client says "either send a transfer or a flow frame".
Since there are no messages to send to an inactive consumer, the sending
queue should advance the delivery-count consuming all link-credit and send
a credit_reply with drain=true to the session proc which causes the session
proc to send a flow frame to the client.
* Extract applying #credit{} cmd into 2 functions
This commit is only refactoring and doesn't change any behaviour.
* Fix default priority level
Prior to this commit, when a message didn't have a priority level set,
it got enqueued as high prio.
This is wrong because the default priority is 4 and
"for example, if 2 distinct priorities are implemented,
then levels 0 to 4 are equivalent, and levels 5 to 9 are equivalent
and levels 4 and 5 are distinct."
Hence, by default, a message without a priority set must be enqueued as
low prio (see the sketch below).
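A minimal sketch of that default mapping, assuming two internal levels (the
function name is illustrative):
```erlang
%% Two internal levels: AMQP priorities 0..4 (including the default of 4)
%% are treated as low, 5..9 as high. An unset priority defaults to low.
priority_level(undefined) -> lo;
priority_level(P) when is_integer(P), P =< 4 -> lo;
priority_level(_P) -> hi.
```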
* bazel run gazelle
* Avoid deprecated time unit
* Fix aux_test
* Delete dead code
* Fix rabbit_fifo_q:get_lowest_index/1
* Delete unused normalize functions
* Generate less garbage
* Add integration test for QQ SAC with consumer priority
* Improve readability
* Change modified outcome behaviour
With the new quorum queue v4 improvements where a requeue counter was
added in addition to the quorum queue delivery counter, the following
sentence from https://github.com/rabbitmq/rabbitmq-server/pull/6292#issue-1431275848
doesn't apply anymore:
> Also the case where delivery_failed=false|undefined requires the release of the
> message without incrementing the delivery_count. Again this is not something
> that our queues are able to do so again we have to reject without requeue.
Therefore, we simplify the modified outcome behaviour:
RabbitMQ will from now on only discard the message if the modified
outcome's undeliverable-here field is true.
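Expressed as a sketch (the function name is ours, not the queue
implementation's):
```erlang
%% Discard only when the modified outcome's undeliverable-here field is
%% true; otherwise requeue. Returns the action the queue should take.
modified_action(#{undeliverable_here := true}) -> discard;
modified_action(_Modified)                     -> requeue.
```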
* Introduce single feature flag rabbitmq_4.0.0
## What?
Merge all feature flags introduced in RabbitMQ 4.0.0 into a single
feature flag called rabbitmq_4.0.0.
## Why?
1. This fixes the crash in
https://github.com/rabbitmq/rabbitmq-server/pull/10637#discussion_r1681002352
2. It's better user experience.
* QQ: expose priority metrics in UI
* Enable skipped test after rebasing onto main
* QQ: add new command "modify" to better handle AMQP modified outcomes.
This new command can be used to annotate returned or rejected messages.
This commit also retains the delivery-count across dead letter boundaries
such that the AMQP header delivery-count field can now include _all_ failed
delivery attempts since the message was originally received.
Internally the quorum queue has moved its delivery_count header to
only track the AMQP protocol delivery attempts and now introduces
a new acquired_count to track all message acquisitions by consumers.
* Type tweaks and naming
* Add test for modified outcome with classic queue
* Add test routing on message-annotations in modified outcome
* Skip tests in mixed version tests
Skip tests in mixed version tests because feature flag
rabbitmq_4.0.0 is needed for the new #modify{} Ra command
being sent to quorum queues.
---------
Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
This could happen if a leader election occurred just before the
member removal was initiated. In particular this could
happen when stopping and forgetting an existing rabbit node.
The leader returned in rabbit_quorum_queue:info/2 only ever queried
the pid field from the queue record when more up to date info could
have been available in the ra_leaderboard table.
Test case rabbit_mqtt_qos0_queue_kill_node flaked because after an
MQTT client subscribes on node 0, RabbitMQ returns success
and replicates the new binding to node 0 and node 1, but not
yet to node 2. Another MQTT client then publishes on node 2
without the binding being present yet on node 2, and the
message therefore isn't routed.
This commit attempts to eliminate this flake.
It adds a function to rabbit_ct_broker_helpers which waits until a given
node has caught up with the leader node.
We can reuse that function in future to eliminate more test flakes.
## What?
`mc:init()` already sets mc annotation `rts` (received timestamp).
This commit reuses this timestamp in `rabbit_message_interceptor`.
## Why?
`os:system_time/1` can jump forward or backward between invocations.
Using two different timestamps for the same meaning, the time the message
was received by RabbitMQ, can be misleading.
For consistency with other protocols (to protect from potential DoS attacks).
Wrong credentials and virtual host access errors trigger the delay.
References #11831
We keep the delay low when running tests. Otherwise,
```
make -C deps/rabbitmq_mqtt ct-auth
```
would run 3 minutes longer (with a SILENT_CLOSE_DELAY of 3 seconds).
Log warnings when:
- Local node is not present. Even though we force it on the node
list, this will not work for other cluster nodes if they have
the same list.
- There are duplicated nodes
## What?
Prior to this commit connecting 40k AMQP clients with 5 sessions each,
i.e. 200k sessions in total, took 7m55s.
After this commit, the same scenario takes 1m37s.
Additionally, prior to this commit, disconnecting all connections and sessions
at once caused the pg process to become overloaded taking ~14 minutes to
process its mailbox.
After this commit, these same deregistrations take less than 5 seconds.
To repro:
```go
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
for i := 0; i < 40_000; i++ {
if i%1000 == 0 {
log.Printf("opened %d connections", i)
}
conn, err := amqp.Dial(
context.TODO(),
"amqp://localhost",
&amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
if err != nil {
log.Fatal("open connection:", err)
}
for j := 0; j < 5; j++ {
_, err = conn.NewSession(context.TODO(), nil)
if err != nil {
log.Fatal("begin session:", err)
}
}
}
log.Println("opened all connections")
time.Sleep(5 * time.Hour)
}
```
## How?
This commit uses separate pg scopes (that is processes and ETS tables) to register
AMQP connections and AMQP sessions. Since each Pid is now its own group,
registration and deregistration is fast.
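A minimal sketch of that pg pattern (the scope and function names are
illustrative, not the server's actual ones):
```erlang
-module(pg_per_pid_sketch).
-export([start/0, add/1, remove/1, list_all/0]).

%% Illustrative scope name only.
-define(SCOPE, amqp_session_registry).

start() ->
    pg:start_link(?SCOPE).

%% Each Pid forms its own group, so joining and leaving only touches a
%% single small group instead of one huge one.
add(Pid) ->
    pg:join(?SCOPE, Pid, Pid).

remove(Pid) ->
    pg:leave(?SCOPE, Pid, Pid).

%% Enumerating everything is still possible via the list of groups.
list_all() ->
    pg:which_groups(?SCOPE).
```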
Sometimes in CI under Khepri, the test case errored with:
```
receiver_attached flushed: {amqp10_event,
{session,<0.396.0>,
{ended,
{'v1_0.error',
{symbol,<<"amqp:internal-error">>},
{utf8,
<<"stream queue 'leader_transfer_stream_credit_single' in vhost '/' does not have a running replica on the local node">>},
undefined}}}}
```
Similar to other RabbitMQ internal credit flow configurations such as
`credit_flow_default_credit` and `msg_store_credit_disc_bound`, this
commit makes the `classic_queue_consumer_unsent_message_limit`
configurable via `advanced.config`.
See https://github.com/rabbitmq/rabbitmq-server/pull/11822 for the
original motivation to make this setting configurable.
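For example, an `advanced.config` entry could look like this (the value is
illustrative and the `rabbit` application name is an assumption):
```erlang
%% advanced.config
[
 {rabbit, [
   {classic_queue_consumer_unsent_message_limit, 512}
 ]}
].
```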
Fixes #11841
PR #11307 introduced the invariant that at most one credit request between
session proc and quorum queue proc can be in flight at any given time.
This is not the case when rabbit_fifo_client re-sends credit
requests on behalf of the session proc when the quorum queue leader changes.
This commit therefore removes assertions which assumed only a single credit
request to be in flight.
This commit also removes field queue_flow_ctl.desired_credit
since it is redundant to field client_flow_ctl.credit
This fixes a potential crash in `rabbit_amqp_management` where we tried
to format the exchange resource as a string (`~ts`). The other changes
are cosmetic.
`rabbit_amqp_management` returns HTTP status codes to the client. 503
means that a service is unavailable (which Khepri is while it is in a
minority) so it's a more appropriate code than the generic 500
internal server error.
Previously we used the `registered` approach where all Ra servers that
have a registered name would be recovered. This could have unintended
side effects for queues that were, for example, deleted while not all
members of the quorum queue were running. In this case the Ra system
would have recovered the members that were not deleted, which is not
ideal as a dangling member would just sit and loop in pre-vote state
and a future declaration of the queue may partially fail.
Instead we rely on the metadata store as the source of truth about which
members should be restarted after a Ra system restart.
Sometimes on Khepri the test failed with:
```
=== Ended at 2024-07-24 10:07:15
=== Location: [{gen_server,call,419},
{amqpl_direct_reply_to_SUITE,rpc,226},
{test_server,ts_tc,1793},
{test_server,run_test_case_eval1,1302},
{test_server,run_test_case_eval,1234}]
=== === Reason: {{shutdown,
{server_initiated_close,404,
<<"NOT_FOUND - no queue 'tests.amqpl_direct_reply_to.rpc.requests' in vhost '/'">>}},
{gen_server,call,
[<0.272.0>,
{call,
{'basic.get',0,
<<"tests.amqpl_direct_reply_to.rpc.requests">>,
false},
none,<0.246.0>},
infinity]}}
```
https://github.com/rabbitmq/rabbitmq-server/actions/runs/10074558971/job/27851173817?pr=11809
shows an instance of this flake.
Prior to this commit, a crash occurred when a consistent hash exchange
got declared with a `hash-header` argument, but the publishing client
didn't set that header on the message.
This bug is present in RabbitMQ 3.13.0 - 3.13.6.
Fixes https://github.com/rabbitmq/rabbitmq-server/discussions/11671
Collecting them on a large system (tens of thousands of processes
or more) can be time consuming as we iterate over all processes.
By putting them on a separate endpoint, we make that opt-in.
The spec of `rabbit_exchange:declare/7` needs to be updated to return
`{ok, Exchange} | {error, Reason}` instead of the old return value of
`rabbit_types:exchange()`. This is safe to do since `declare/7` is not
called by RPC - from the CLI or otherwise - outside of test suites, and
in test suites only through the CLI's `TestHelper.declare_exchange/7`.
Callers of this helper are updated in this commit.
Otherwise this commit updates callers to unwrap the `{ok, Exchange}`
and bubble up errors.
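As a hedged sketch of the caller-side unwrapping (the exact parameter list of
`declare/7` is assumed here, and throwing is just one way to bubble up the
error):
```erlang
%% Unwrap {ok, Exchange} and bubble up errors (argument names assumed).
declare_or_fail(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
    case rabbit_exchange:declare(XName, Type, Durable, AutoDelete,
                                 Internal, Args, Username) of
        {ok, Exchange}     -> Exchange;
        {error, _} = Error -> throw(Error)
    end.
```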
It's unlikely that these operations will time out since the serial
number is always updated after some other transaction, for example
adding or deleting an exchange.
In the future we could consider moving the serial updates into those
transactions. In the meantime we can remove the possibility of timeouts
by giving the serial update unlimited time to finish.
A common case for exchange deletion is that callers want the deletion
to be idempotent: they treat the `ok` and `{error, not_found}` returns
from `rabbit_exchange:delete/3` the same way. To simplify these
callsites we add a `rabbit_exchange:ensure_deleted/3` that wraps
`rabbit_exchange:delete/3` and returns `ok` when the exchange did not
exist. Part of this commit is to update callsites to use this helper.
The other part is to handle the `rabbit_khepri:timeout()` error possible
when Khepri is in a minority. For most callsites this is just a matter
of adding a branch to their `case` clauses and an appropriate error and
message.
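A sketch of the wrapper's shape (argument names assumed; the return values of
`rabbit_exchange:delete/3` come from the text above):
```erlang
%% Treat 'not found' as success so deletion is idempotent for callers.
ensure_deleted(XName, IfUnused, Username) ->
    case rabbit_exchange:delete(XName, IfUnused, Username) of
        ok                 -> ok;
        {error, not_found} -> ok;
        {error, _} = Err   -> Err
    end.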
We need to bubble up the error through the caller
`rabbit_vhost:delete/2`. The CLI calls `rabbit_vhost:delete/2` and
already handles the `{error, timeout}` but the management UI needs an
update so that an HTTP DELETE returns an error code when the deletion
times out.
This function throws if the database fails to apply the transaction.
This function is only called by the `rabbit_vhost_limit` runtime
parameter module in its `notify/5` and `notify_clear/4` callbacks. These
callers have no way of handling this error but it should be very
difficult for them to face this crash: setting the runtime parameter
would need to succeed first which needs Khepri to be in majority. Khepri
would need to enter a minority between inserting/updating/deleting the
runtime parameter and updating the vhost. It's possible but unlikely.
In the future we could consider refactoring vhost limits to update the
vhost as the runtime parameter is changed, transactionally. I figure
that to be a very large change though so we leave this to the future.
This error is already handled by the callers of
`rabbit_vhost:update_metadata/3` (the CLI) and `rabbit_vhost:put_vhost/6`
(see the parent commit) but was just missing from the spec.
`rabbit_definitions:concurrent_for_all/4` doesn't pay any attention to
the return value of the `Fun`, only counting an error when it catches
`{error, E}`. So we need to `throw/1` the error from
`rabbit_vhost:put_vhost/6`.
The other callers of `rabbit_vhost:put_vhost/6` - the management UI and
the CLI (indirectly through `rabbit_vhost:add/2,3`) already handle this
error return.
`create_or_get_in_khepri/2` throws errors like the
`rabbit_khepri:timeout_error()`. Callers of `create_or_get/3` like
`rabbit_vhost:do_add/3` and its callers handle the throw with a `try`/
`catch` block and return the error tuple, which is then handled by
their callers.
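The pattern looks roughly like this generic helper (a sketch, not the actual
rabbit_vhost code):
```erlang
%% Run Fun and convert a thrown {error, _} (e.g. a Khepri timeout error)
%% back into a return value that callers can pattern match on.
catch_db_error(Fun) when is_function(Fun, 0) ->
    try
        Fun()
    catch
        throw:{error, _} = Err -> Err
    end.
```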
This ensures that the call graph of `rabbit_db_binding:create/2` and
`rabbit_db_binding:delete/2` handle the `{error, timeout}` error
possible when Khepri is in a minority.
This is essentially a cosmetic change. Read-only transactions are done
with queries in Khepri rather than commands, like read-write
transactions. Local queries cannot timeout like commands so marking the
transaction as 'ro' means that we don't need to handle a potential
'{error, timeout}' return.
This case only targets Khepri. Instead of setting the `metadata_store`
config option we should skip the test when the configured metadata
store is mnesia.
This contains a fix in the ra_directory module to ensure
names can be deleted even when a Ra server has never been started
during the current node lifetime.
Also contains a small tweak to ensure ra_directory:unregister_name
is called before deleting a Ra data directory, which makes it less likely
to end up with a corrupt state that would stop a Ra system from starting.
Configuring the mock authentication backend blocks
and generates an error in the test process when the
broker goes down. The error report makes the test fail
in some environments.
The process where the setup takes place must stay up
otherwise the ETS table used will go away.
This commit makes sure the broker-side authentication backend
setup returns at the end of the test. This way the calling
process terminates in a normal way.
build(deps): bump org.springframework.boot:spring-boot-starter-parent from 3.3.1 to 3.3.2 in /deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot
The rabbitmqqueue:declare call is handled and, in case of known errors, the correct error code is sent back.
Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
Add copies of some per-object metrics that are labeled per-channel
aggregated to reduce cardinality. These metrics are valuable and
easier to process if exposed on per-exchange and per-queue basis.
Make `check_if_node_is_mirror_sync_critical` a no-op
with a deprecation warning. Since this command is commonly used
as part of the node shutdown process (e.g. by the Cluster Operator),
making it a no-op instead of removing it completely will make the
transition to 4.0 easier for users.
The AMQP 0.9.1 longstr type is problematic as it can contain arbitrary
binary data but is typically used for utf8 by users.
The current conversion into AMQP avoids scanning arbitrarily large
longstr to see if they only contain valid utf8 by treating all
longstr data longer than 255 bytes as binary. This is in hindsight
too strict and thus this commit increases the scanning limit to
4096 bytes - enough to cover the vast majority of AMQP 0.9.1 header
values.
This change also converts the AMQP binary types into longstr to
ensure that existing data (held in streams, for example) is converted
to the AMQP 0.9.1 type most likely intended by the user.
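A rough sketch of the resulting conversion rule (the 4096-byte limit comes from
the text above; the function name and type tags are illustrative):
```erlang
-define(UTF8_SCAN_LIMIT, 4096).

%% Treat a longstr as an AMQP string only if it is reasonably small and
%% is valid UTF-8; otherwise fall back to binary.
longstr_to_amqp_value(Bin) when byte_size(Bin) =< ?UTF8_SCAN_LIMIT ->
    case unicode:characters_to_binary(Bin, utf8, utf8) of
        Bin -> {utf8, Bin};
        _   -> {binary, Bin}
    end;
longstr_to_amqp_value(Bin) ->
    {binary, Bin}.
```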
Arguments
* `rabbitmq:stream-offset-spec`,
* `rabbitmq:stream-filter`,
* `rabbitmq:stream-match-unfiltered`
are set in the `filter` field of the `Source`.
This makes sense for these consumer arguments because:
> A filter acts as a function on a message which returns a boolean result
> indicating whether the message can pass through that filter or not.
Consumer priority is not really such a predicate.
Therefore, it makes more sense to set consumer priority in the
`properties` field of the `Attach` frame.
We call the key `rabbitmq:priority` which maps to consumer argument
`x-priority`.
While AMQP 0.9.1 consumers are allowed to set any integer data
type for the priority level, this commit decides to enforce an `int`
value (range -(2^31) to 2^31 - 1 inclusive).
Consumer priority levels outside of this range are not needed in
practice.
`ets:whereis/1` adds some overhead - it's two ETS calls rather than one
when `ets:whereis/1` returns a table identifier. It's also not atomic:
the table could disappear between the `ets:whereis/1` call and the call to
read data from a projection. We replace all `ets:whereis/1` calls on
projection tables with `try`/`catch` and return default values when we
catch the `badarg` `error` which ETS emits when passed a non-existing
table name.
One special case, though, is `ets:info/2`, which returns `undefined` when
passed a non-existing table name. That block is refactored to use a
`case` instead.
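The replacement pattern is roughly the following (a sketch; the helper name is
ours, not the server's):
```erlang
%% Read from a projection table by name; if the table does not exist yet
%% (e.g. projections not registered), ETS raises badarg and we return the
%% default instead.
ets_lookup_or_default(Table, Key, Default) ->
    try ets:lookup(Table, Key) of
        []      -> Default;
        Objects -> Objects
    catch
        error:badarg -> Default
    end.
```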
* Deprecate queue-master-locator
This should not be a breaking change - all validation should still pass
* CQs can now use `queue-leader-locator`
* `queue-leader-locator` takes precedence over `queue-master-locator` if both are used
* regardless of which name is used, effectively there are only two values: `client-local` (default) or `balanced`
* other values (`min-masters`, `random`, `least-leaders`) are mapped to `balanced`
* Management UI no longer shows `master-locator` fields when declaring a queue/policy, but such arguments can still be used manually (if still permitted)
* exclusive queues are always declared locally, as before
Since feature flag `message_containers` introduced in 3.13.0 is required in 4.0,
we can also require all other feature flags introduced in or before 3.13.0
and remove their compatibility code for 4.0:
* restart_streams
* stream_sac_coordinator_unblock_group
* stream_filtering
* stream_update_config_command
Projections might not be available in a mixed-version scenario where a
cluster has nodes which are all blank/uninitialized and the majority
of nodes run a version of Khepri with a new machine version while the
minority does not have the new machine version's code.
In this case, the cluster's effective machine version will be set to
the newer version as the majority of members have access to the new
code. The older version members will be unable to apply commands
including the `register_projection` commands that set up these ETS
tables. When these ETS tables don't exist, calls like `ets:tab2list/1`
or `ets:lookup/2` cause `badarg` errors.
We use default empty values when `ets:whereis/1` returns `undefined` for
a projection table name. Instead we could use local queries or leader
queries. Writing equivalent queries is a fair amount more work and the
code would be hard to test. `ets:whereis/1` should only return
`undefined` in the above scenario which should only be a problem in
our mixed-version testing - not in practice.
Without buildbuddy for parallelization and with the change in the parent
commit to reduce the number of cases, the suite now runs fast enough
that sharding is counterproductive.
This suite is only meant to run with Khepri as the metadata store.
Instead of setting this explicitly we can look at the configured
metadata store and conditionally skip the entire suite. This prevents
these tests from running twice in CI.
Khepri is not yet compatible with mixed-version testing and this suite
only tests clustering when Khepri is the metadata store in at least some
of the nodes.
This commit fixes https://github.com/rabbitmq/rabbitmq-server/discussions/11662
Prior to this commit the following crash occurred when an RPC reply message entered
RabbitMQ and tracing was enabled:
```
** Reason for termination ==
** {function_clause,
[{rabbit_trace,'-tap_in/6-fun-0-',
[{virtual_reply_queue,
<<"amq.rabbitmq.reply-to.g1h2AA5yZXBseUAyNzc5NjQyMAAAC1oAAAAAZo4bIw==.+Uvn1EmAp0ZA+oQx2yoQFA==">>}],
[{file,"rabbit_trace.erl"},{line,62}]},
{lists,map,2,[{file,"lists.erl"},{line,1559}]},
{rabbit_trace,tap_in,6,[{file,"rabbit_trace.erl"},{line,62}]},
{rabbit_channel,handle_method,3,
[{file,"rabbit_channel.erl"},{line,1284}]},
{rabbit_channel,handle_cast,2,
[{file,"rabbit_channel.erl"},{line,659}]},
{gen_server2,handle_msg,2,[{file,"gen_server2.erl"},{line,1056}]},
{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,241}]}]}
```
(Note that no trace message is emitted for messages that are delivered to
direct reply to requesting clients (neither in 3.12, nor in 3.13, nor
after this commit). This behaviour can be added in future when a direct
reply virtual queue becomes its own queue type.)
[How]
We must check the return value of `rabbit_ct_broker_helpers:run_steps/2`
because it could ask that the testsuite/testgroup/testcase should be
skipped.
Require all MQTT feature flags and remove their compatibility code:
* delete_ra_cluster_mqtt_node
* rabbit_mqtt_qos0_queue
* mqtt_v5
These feature flags were introduced in or before 3.13.0.
Fixes issue raised in discussion #11653
The user used the following env var:
```
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-setcookie FOOBARBAZ"
```
...and the cookie was not passed to the peer node for peer discovery.
Add tests which shovel messages via AMQP from a 3.13 cluster to a 4.0
cluster and vice versa.
This test ensures that a 3.13 AMQP 1.0 client can communicate with a 4.0
node, which isn't tested anywhere else since all other mixed version
tests use the new 4.0 AMQP 1.0 client.
This commit is a follow up of https://github.com/rabbitmq/rabbitmq-server/pull/11604
This commit changes the AMQP address format v2 from
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to
```
/exchanges/:exchange/:routing-key
/exchanges/:exchange
/queues/:queue
```
Advantages:
1. more user friendly
2. matches nicely with the plural forms of HTTP API v1 and HTTP API v2
This plural form is still non-overlapping with AMQP address format v1.
Although it might feel unusual at first to send a message to `/queues/q1`,
if you think about `queues` just being a namespace or entity type, this
address format makes sense.
`rabbit_runtime_parameters:value_global/2` was only used in
`rabbit_nodes:cluster_name/0` since near the beginning of the commit
history of the server and its usage was eliminated in 06932b9fcb
(#3085, released in v3.8.17+ and v3.9.0+).
`rabbit_runtime_parameters:value/4` doesn't appear to have been ever
used since it was introduced near the beginning of the commit history.
It may have been added just to mirror `value_global/2`'s interface.
Eliminating these dead functions allows us to also eliminate a somewhat
complicated function `rabbit_db_rtparams:get_or_set/2`.
Partially copy file
https://github.com/ninenines/cowlib/blob/optimise-urldecode/src/cow_uri.erl
We use this copy because:
1. uri_string:unquote/1 is lax: It doesn't validate that characters that are
required to be percent encoded are indeed percent encoded. In RabbitMQ,
we want to enforce that proper percent encoding is done by AMQP clients.
2. uri_string:unquote/1 and cow_uri:urldecode/1 in cowlib v2.13.0 are both
slow because they allocate a new binary for the common case where no
character was percent encoded.
When a new cowlib version is released, we should make the rabbit app depend
on that cowlib version, call cow_uri:urldecode/1, and delete this file (rabbit_uri.erl).
The v2 address format was changed to distinguish between the v1 and v2 address formats.
Previously, v1 and v2 address formats overlapped and behaved differently
for example for:
```
/queue/:queue
/exchange/:exchange
```
This PR changes the v2 format to:
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to distinguish between v1 and v2 addresses.
This allows calling `rabbit_deprecated_features:is_permitted(amqp_address_v1)`
only when we know that the user requests address format v1.
Note that `rabbit_deprecated_features:is_permitted/1` should only
be called when the old feature is actually used.
Use percent encoding / decoding for address URI format v2.
This allows using any UTF-8 encoded characters, including slashes (`/`),
in routing keys, exchange names, and queue names, and is more
future proof.
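For illustration, percent-encoding each segment lets a queue named `my/queue`
be addressed as `/queues/my%2Fqueue`. A sketch, assuming `uri_string:quote/1`
(OTP 25+); the helper functions are ours, not the server's:
```erlang
%% Build v2 addresses with percent-encoded segments so names may contain "/".
queue_address(QName) when is_list(QName) ->
    "/queues/" ++ uri_string:quote(QName).

exchange_address(XName, RoutingKey) when is_list(XName), is_list(RoutingKey) ->
    "/exchanges/" ++ uri_string:quote(XName) ++ "/" ++ uri_string:quote(RoutingKey).

%% Example: queue_address("my/queue") returns "/queues/my%2Fqueue".
```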
Test `confirm_nack` had a race condition that meant either ack or nack
were likely outcomes. By stopping the queue before the publish we
ensure that nack is the only valid outcome.
`rabbit_mgmt_util:internal_server_error/4` expects an atom or binary
and a string formattable term (`~ts`) as arguments but
`rabbit_mgmt_wm_vhost` passes charlists and any term. This can cause
a log formatter crash and an unexpected message in the management UI
when attempting to add a vhost while a cluster is in a minority with
Khepri enabled for example.
We can pass atoms for the `Error` parameter and binaries or strings for
the `Reason` parameter to fix both issues.
We don't need to duplicate so many patterns in so many
files since we have a monorepo (and want to keep it).
If I managed to miss something or remove something that
should stay, please put it back. Note that monorepo-wide
patterns should go in the top-level .gitignore file.
Other .gitignore files are for application or folder-
specific patterns.
build(deps-dev): bump org.junit.jupiter:junit-jupiter-params from 5.10.2 to 5.10.3 in /deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot
## What?
The rabbit_queue_type API has allowed cancelling a consumer.
Cancelling a consumer merely stops the queue sending more messages
to the consumer. However, messages checked out to the cancelled consumer
remain checked out until acked by the client. It's up to the client when
and whether it wants to ack the remaining checked out messages.
For AMQP 0.9.1, this behaviour is necessary because the client may
receive messages between the point in time it sends the basic.cancel
request and the point in time it receives the basic.cancel_ok response.
AMQP 1.0 is a better designed protocol because a receiver can stop a
link as shown in figure 2.46.
After a link is stopped, the client knows that the queue won't deliver
any more messages.
Once the link is stopped, the client can subsequently detach the link.
This commit extends the rabbit_queue_type API to allow a consumer to be
immediately removed:
1. Any checked out messages to that receiver will be requeued by the
queue. (As explained previously, a client that first stops a link
by sending a FLOW with `link_credit=0` and `echo=true` and waiting
for the FLOW reply from the server and settles any messages before
it sends the DETACH frame, won't have any checked out messages).
2. The queue entirely forgets this consumer and therefore stops
delivering messages to the receiver.
This new behaviour of consumer removal is similar to what happens when
an AMQP 0.9.1 channel is closed: All checked out messages to that
channel will be requeued.
## Why?
Removing the consumer immediately simplifies many aspects:
1. The server session process doesn't need to requeue any checked out
messages for the receiver when the receiver detaches the link.
Specifically, messages in the outgoing_unsettled_map and
outgoing_pending queue don't need to be requeued because the queue
takes care of requeueing any checked out messages.
2. It simplifies reasoning about clients first detaching and then
re-attaching in the same session with the same link handle (the handle
becomes available for re-use once a link is closed): This will result
in the same RabbitMQ queue consumer tag.
3. It simplifies queue implementations since otherwise state needs to be
held and special logic needs to be applied for consumers that are only
cancelled (AMQP 0.9.1 basic.cancel) but not removed.
4. It makes the single active consumer feature safer when it comes to
maintaining message order: If a client cancels consumption via AMQP
0.9.1 basic.cancel, but still has in-flight checked out messages,
the queue will activate the next consumer. If the AMQP 0.9.1 client
shortly after crashes, messages to the old consumer will be requeued
which results in messages being out of order. To maintain message order,
an AMQP 0.9.1 client must close the whole channel such that messages
are requeued before the next consumer is activated.
For AMQP 1.0, the client can either stop the link first (preferred)
or detach the link directly. Detaching the link will requeue all
messages before activating the next consumer, therefore maintaining
message order. Even if the session crashes, message order will be
maintained.
## How?
`rabbit_queue_type:cancel` accepts a spec as argument.
The new interaction between session proc and classic queue proc (let's
call it cancel API v2) must be hidden behind a feature flag.
This commit re-uses feature flag credit_api_v2 since it also gets
introduced in 4.0.
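Purely as a hypothetical illustration of the new API shape (the spec keys,
argument order, and helper name below are invented; only the cancel/3 arity
and the 'remove' vs 'cancel' distinction come from the text above):
```erlang
%% Hypothetical: remove the consumer outright so the queue requeues any
%% checked-out messages, rather than merely cancelling it.
remove_consumer(Queue, ConsumerTag, QState) ->
    Spec = #{consumer_tag => ConsumerTag,
             reason       => remove},   %% vs. 'cancel' for the softer option
    rabbit_queue_type:cancel(Queue, Spec, QState).
```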
Fix the following sporadic failure:
```
=== === Reason: {assertMatch,
[{module,amqp_client_SUITE},
{line,446},
{expression,
"rabbitmq_amqp_client : delete_queue ( LinkPair , QName )"},
{pattern,"{ ok , # { message_count := NumMsgs } }"},
{value,{ok,#{message_count => 29}}}]}
in function amqp_client_SUITE:sender_settle_mode_mixed/1 (amqp_client_SUITE.erl, line 446)
```
The last message (the 30th) is sent as settled.
It apparently happened that all messages up to 29 got stored.
The 29th message also got confirmed.
Subsequently the queue got deleted with only 29 ready messages.
Bumping NumMsgs to 31 ensures that the last message is sent unsettled.
Fix the following sporadic error in CI:
```
=== Location: [{amqp_client_SUITE,available_messages,3137},
{test_server,ts_tc,1793},
{test_server,run_test_case_eval1,1302},
{test_server,run_test_case_eval,1234}]
=== === Reason: {assertEqual,
[{module,amqp_client_SUITE},
{line,3137},
{expression,"get_available_messages ( Receiver )"},
{expected,5000},
{value,0}]}
```
The client decrements the available variable from 1 to 0 when it
receives the transfer and sends a credit_exhausted event to the CT test
case proc. The CT test case proc queries the client session proc for available
messages, which is still 0. The FLOW frame from RabbitMQ to the client with the
available=5000 set could arrive shortly after.
Avoid the following unexpected error in mixed version testing where
feature flag credit_api_v2 is disabled:
```
[error] <0.1319.0> Timed out waiting for credit reply from quorum queue 'stop' in vhost '/'. Hint: Enable feature flag credit_api_v2
[warning] <0.1319.0> Closing session for connection <0.1314.0>: {'v1_0.error',
[warning] <0.1319.0> {symbol,<<"amqp:internal-error">>},
[warning] <0.1319.0> {utf8,
[warning] <0.1319.0> <<"Timed out waiting for credit reply from quorum queue 'stop' in vhost '/'. Hint: Enable feature flag credit_api_v2">>},
[warning] <0.1319.0> undefined}
[error] <0.1319.0> ** Generic server <0.1319.0> terminating
[error] <0.1319.0> ** Last message in was {'$gen_cast',
[error] <0.1319.0> {frame_body,
[error] <0.1319.0> {'v1_0.flow',
[error] <0.1319.0> {uint,283},
[error] <0.1319.0> {uint,65535},
[error] <0.1319.0> {uint,298},
[error] <0.1319.0> {uint,4294967295},
[error] <0.1319.0> {uint,1},
[error] <0.1319.0> {uint,282},
[error] <0.1319.0> {uint,50},
[error] <0.1319.0> {uint,0},
[error] <0.1319.0> undefined,undefined,undefined}}}
```
Presumably, the server session proc timed out receiving a credit reply from
the quorum queue because the test case deleted the quorum queue
concurrently with the client (and therefore also the server session
process) topping up link credit.
This commit detaches the link first and ends the session synchronously
before deleting the quorum queue.
The DIST step used rsync for copying files; changing this
to using cp/rm provides a noticeable speed boost.
Before this commit the situation was as follows. With
FAST_RUN_BROKER=1 we are pretty fast but don't benefit
from parallel make:
make -C deps/rabbitmq_management run-broker FAST_RUN_BROKER=1
2,04s user 1,57s system 90% cpu 4,016 total
make -C deps/rabbitmq_management run-broker FAST_RUN_BROKER=1 -j8
2,08s user 1,55s system 89% cpu 4,069 total
With FAST_RUN_BROKER=0 we are slow; on the other hand
we greatly benefit from parallel make:
make -C deps/rabbitmq_management run-broker FAST_RUN_BROKER=0
3,29s user 1,93s system 81% cpu 6,425 total
make -C deps/rabbitmq_management run-broker FAST_RUN_BROKER=0 -j8
3,36s user 1,90s system 142% cpu 3,695 total
The reason this method achieves such a result is because
the DIST step that takes a lot of time can be run in
parallel. In addition, this method results in only
the necessary plugins being available in the path,
therefore it doesn't discover unrelated plugins
during node startup, saving time.
By changing rsync to cp/rm, we get great results even
without parallel make:
make -C deps/rabbitmq_management run-broker FAST_RUN_BROKER=0
3,28s user 1,64s system 105% cpu 4,684 total
make -C deps/rabbitmq_management run-broker FAST_RUN_BROKER=0 -j8
3,27s user 1,65s system 135% cpu 3,640 total
We are within 1s of FAST_RUN_BROKER=1 by default, and
faster than FAST_RUN_BROKER=1 with parallel make. On
top of that, we greatly benefit when rebuilding as the
DIST files do not need to be rebuilt every time:
make -C deps/rabbitmq_management run-broker FAST_RUN_BROKER=0
2,94s user 1,40s system 107% cpu 4,035 total
make -C deps/rabbitmq_management run-broker FAST_RUN_BROKER=0 -j8
2,85s user 1,51s system 138% cpu 3,140 total
Therefore it only makes sense to remove FAST_RUN_BROKER,
and instead use the old method, which is more correct
and has more potential for optimisation.
This commit attempts to remove the following flake:
```
{amqp_client_SUITE,server_closes_link,1113}
{badmatch,[<14696.3530.0>,<14696.3453.0>]}
```
by waiting after each test case until sessions were de-registered from
the 1st RabbitMQ node.
## What?
This commit fixes issues that were present only on `main`
branch and were introduced by #9022.
1. Classic queues (specifically `rabbit_queue_consumers:subtract_acks/3`)
expect message IDs to be (n)acked in the order in which they were delivered
to the channel / session proc.
Hence, the `lists:usort(MsgIds0)` in `rabbit_classic_queue:settle/5`
was wrong, causing not all messages to be acked and adding a regression
also to AMQP 0.9.1.
2. The order in which the session proc requeues or rejects multiple
message IDs at once is important. For example, if the client sends a
DISPOSITION with first=3 and last=5, the message IDs corresponding to
delivery IDs 3,4,5 must be requeued or rejected in exactly that
order.
For example, quorum queues use this order of message IDs in
34d3f94374/deps/rabbit/src/rabbit_fifo.erl (L226-L234)
to dead letter in that order.
## How?
The session proc will settle (internal) message IDs to queues in ascending
(AMQP) delivery ID order, i.e. in the order messages were sent to the
client and in the order messages were settled by the client.
This commit chooses to keep the session's outgoing_unsettled_map map
data structure.
An alternative would have been to use a queue or lqueue for the
outgoing_unsettled_map as done in
* 34d3f94374/deps/rabbit/src/rabbit_channel.erl (L135)
* 34d3f94374/deps/rabbit/src/rabbit_queue_consumers.erl (L43)
Whether a queue (as done by `rabbit_channel`) or a map (as done by
`rabbit_amqp_session`) performs better depends on the pattern how
clients ack messages.
A queue will likely perform good enough because usually the oldest
delivered messages will be acked first.
However, given that there can be many different consumers on an AMQP
0.9.1 channel or AMQP 1.0 session, this commit favours a map because
it will likely generate less garbage and is very efficient when, for
example, a single new message (or a few new messages) gets acked while
many (older) messages are still checked out by the session (but by
possibly different AMQP 1.0 receivers).
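A simplified sketch of settling in ascending delivery ID order (variable names
and the map shape are illustrative, not the actual session state):
```erlang
%% Given the DISPOSITION range First..Last and a map keyed by delivery ID,
%% return the corresponding queue message IDs in ascending delivery ID
%% order, skipping IDs that are no longer unsettled.
msg_ids_in_delivery_order(First, Last, UnsettledMap) ->
    [maps:get(DeliveryId, UnsettledMap)
     || DeliveryId <- lists:seq(First, Last),
        maps:is_key(DeliveryId, UnsettledMap)].
```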
It has largely been superseded by `perf`. It is no longer
generally useful. It can always be added to BUILD_DEPS for
the rare cases it is needed, or installed locally and
pointed to by adding its path to ERL_LIBS.
BEFORE: time gmake -C deps/rabbit ct-dynamic_qq 1.92s user 1.44s system 2% cpu 2:23.56 total
AFTER: time gmake -C deps/rabbit ct-dynamic_qq 1.66s user 1.22s system 2% cpu 1:56.44 total
Reduce the number of tests that are run for 2 nodes.
BEFORE: time gmake -C deps/rabbit ct-rabbit_stream_queue 7.22s user 5.72s system 2% cpu 8:28.18 total
AFTER: time gmake -C deps/rabbit ct-rabbit_stream_queue 27.04s user 8.43s system 10% cpu 5:38.63 total
in rabbitmq.conf.
Note that this does not include any tests because
the test would have to use a writeable directory,
and it is not obvious what kind of cross-platform
path they could use that is not computed
programmatically.
It's a trivial schema file that uses an existing
core validator => let's leave it as is.
The FD limits are still valuable.
The 'FD used' value will still show some information during the CQv1
to v2 upgrade, so it is kept for now. But in the future
it will have to be reworked to query the system, or be
removed.