Prior to this commit, the test
```
make -C deps/rabbitmq_mqtt ct-mqtt_shared t=[mqtt,cluster_size_1,v4]:non_clean_sess_reconnect_qos0_and_qos1
```
flaked in CI with error:
```
{mqtt_shared_SUITE,non_clean_sess_reconnect_qos0_and_qos1,972}
{badmatch,{publish_not_received,<<"msg-0">>}}
```
The problem was the following race condition:
* The MQTT v4 client sends an async DISCONNECT
* The global MQTT consumer metric gets decremented. However, the classic
queue still has the old MQTT connection proc registered as a consumer.
* The test case sends a message
* The classic queue checks out the message to the old connection instead
of checking out the message to the new connection.
The solution in this commit is to check the consumer count of the
classic queue before proceeding to send the message after disconnection.
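A minimal sketch of such a wait, assuming a hypothetical `consumer_count/1` test helper that queries the classic queue (for example via an RPC into the broker); the real test may use existing rabbit_ct helpers instead:
```erlang
%% Sketch only; consumer_count/1 is a hypothetical helper that returns the
%% number of consumers currently registered on the classic queue.
wait_for_consumer_count(_QName, _Expected, 0) ->
    ct:fail(consumer_count_timeout);
wait_for_consumer_count(QName, Expected, Retries) ->
    case consumer_count(QName) of
        Expected ->
            ok;
        _ ->
            timer:sleep(100),
            wait_for_consumer_count(QName, Expected, Retries - 1)
    end.
```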
Expose the same metrics for AMQP 1.0 connections as for AMQP 0.9.1 connections.
Display the following AMQP 1.0 metrics on the Management UI:
* Network bytes per second from/to client on connections page
* Number of sessions/channels on connections page
* Network bytes per second from/to client graph on connection page
* Reductions graph on connection page
* Garbage collection info on connection page
Expose the following AMQP 1.0 per-object Prometheus metrics:
* rabbitmq_connection_incoming_bytes_total
* rabbitmq_connection_outgoing_bytes_total
* rabbitmq_connection_process_reductions_total
* rabbitmq_connection_incoming_packets_total
* rabbitmq_connection_outgoing_packets_total
* rabbitmq_connection_pending_packets
* rabbitmq_connection_channels
The rabbit_amqp_writer proc:
* notifies the rabbit_amqp_reader proc if it sent frames
* hibernates eventually if it doesn't send any frames
The rabbit_amqp_reader proc:
* does not emit stats (i.e. does not update ETS tables) if no frames were received
or sent, to save resources when there are many idle connections.
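A simplified illustration of the skip-if-idle idea (not the actual rabbit_amqp_reader code): compare the current totals against the totals from the previous emission and only update the ETS tables when something changed.
```erlang
%% Simplified illustration, not the actual rabbit_amqp_reader implementation.
%% Totals could be e.g. {FramesReceived, FramesSent}; LastEmitted holds the
%% totals at the previous stats emission.
maybe_emit_stats(Totals, LastEmitted, _EmitFun) when Totals =:= LastEmitted ->
    %% Nothing was received or sent since the last emission: skip the ETS update.
    LastEmitted;
maybe_emit_stats(Totals, _LastEmitted, EmitFun) ->
    EmitFun(Totals),
    Totals.
```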
[Why]
Before this patch, required feature flags were basically checked during
boot: they must have been enabled when they were mere stable feature
flags. If they were not, the node refused to boot.
This was easy for the developer because making a feature flag required
allowed us to remove the entire compatibility code. Very satisfying.
Unfortunately, this was a pain point for end users, especially those who
did not pay attention to RabbitMQ and the release notes and were just
asking their package manager to update everything. They could end up
with a node that refuses to boot. The only solution was to downgrade,
enable the disabled stable feature flags, then upgrade again.
[How]
This patch introduces two levels of requirement to required feature
flags:
* `hard`: this corresponds to the existing behavior where a node will
refuse to boot if a hard required feature flag is not enabled before
the upgrade.
* `soft`: such a required feature flag will be automatically enabled
during the upgrade to a version where it is marked as required.
The level of requirement is set in the feature flag definition:
```
-rabbit_feature_flag(
  {my_feature_flag,
   #{stability => required,
     require_level => hard
    }}).
```
The default requirement level is `soft`. All existing required feature
flags now have a requirement level of `hard`.
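For comparison, a flag that relies on the default level simply omits `require_level`:
```erlang
-rabbit_feature_flag(
  {my_other_feature_flag,
   #{stability => required
     %% require_level defaults to soft: the flag is enabled automatically
     %% during the upgrade if it is not enabled yet.
    }}).
```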
The handling of soft required feature flags is done when the cluster
feature flags states are verified and synchronized. If a required
feature flag is not enabled yet, it is enabled at that time.
This means that as developers, we will have to keep compatibility code
forever for every soft required feature flag, like the feature flag
definition itself.
Support x-cc message annotation
Support an `x-cc` message annotation in AMQP 1.0
similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1.
The value of the `x-cc` message annotation must be a list of strings.
A message annotation is used since application properties allow only simple types.
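For illustration only, sending a message with an `x-cc` annotation from the Erlang `amqp10_client` might look roughly like this; `Sender` is assumed to be an attached sender link, and the exact wrapping of the list value accepted by `amqp10_msg` is an assumption:
```erlang
%% Illustration only; the exact AMQP type wrapping accepted by amqp10_msg
%% for list values is an assumption.
Msg0 = amqp10_msg:new(<<"dtag-1">>, <<"hello">>, true),
Msg  = amqp10_msg:set_message_annotations(
         #{<<"x-cc">> => {list, [{utf8, <<"queue-a">>}, {utf8, <<"queue-b">>}]}},
         Msg0),
ok = amqp10_client:send_msg(Sender, Msg).
```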
This commit attempts to eliminate the test flake described in
https://github.com/rabbitmq/rabbitmq-server/issues/12413#issuecomment-2385449940
```
rabbitmq_mqtt > parallel-ct-set-1 > mqtt_shared_SUITE > cluster_size_3 > v4 rabbit_mqtt_qos0_queue_kill_node
=== Ended at 2024-10-01 09:59:52
=== Location: [{mqtt_shared_SUITE,rabbit_mqtt_qos0_queue_kill_node,1165},
{test_server,ts_tc,1793},
{test_server,run_test_case_eval1,1302},
{test_server,run_test_case_eval,1234}]
=== === Reason: no match of right hand side value {publish_not_received,
<<"m1">>}
in function mqtt_shared_SUITE:rabbit_mqtt_qos0_queue_kill_node/1 (mqtt_shared_SUITE.erl, line 1165)
in call from test_server:ts_tc/3 (test_server.erl, line 1793)
in call from test_server:run_test_case_eval1/6 (test_server.erl, line 1302)
in call from test_server:run_test_case_eval/9 (test_server.erl, line 1234)
```
This flake could not be reproduced locally.
This commit also assumes that this flake occurred under Khepri but not
under Mnesia.
The hypothesis is the following:
* Node 0 is down
* MQTT client creates binding on node 1
* Khepri commits since the binding is replicated and persisted on node 1
and node 2. However the binding isn't reflected yet in node 2's
routing projection table.
* Publishing a message to node 2 routes to nowhere.
This check now happens in the CI workflow instead of every time we run
Make for these applications.
This means that during development we are free to modify
these values or create new test suites without having to
worry about the check. If we forget to add the test
suites to PARALLEL_CT, the workflow will tell us.
The problem comes from `ct_master` which doesn't tell us
in the return value whether the tests succeeded. In order
to get that information a CT hook was created. But then
we run into another problem: despite its documentation
claiming otherwise, `ct_master` does not handle `ct_hooks`
instructions in the test spec.
So for the time being we fork `ct_master` into a new
`ct_master_fork` module and insert our hook directly
in the code. Later on we will submit patches to OTP.
All CT logs will now be under <toplevel>/logs. An improved
test workflow would be to always keep the logs/all_runs.html
page open in the browser and refresh it whenever tests are
run in any of the rabbit applications.
The shared test suite was renamed only for clarity, but the
Web-MQTT test suites were renamed out of necessity: since
we are now adding the MQTT test directory to the code path
we need test suites to have different names to avoid
conflicts. We can't (easily) add the path only for this test suite
either, since CT hooks don't call functions in a predictable
enough manner; it would always be hacky.
This is a proof of concept that mostly works but is missing
some tests, such as rabbitmq_mqtt or rabbitmq_cli. It also
doesn't apply to mixed version testing yet.
* Add global histogram metrics for received message sizes per-protocol
fixup: add new files to bazel
fixup: expose message_size_bytes as prometheus classic histogram type
`rabbit_msg_size_metrics` does not use `seshat` any more, but
`counters` directly (a sketch of this approach follows the numbered notes below).
fixup: add msg_size_metrics unit test
* Improve message size histogram
1.
Avoid unnecessary time series emitted for stream protocol
The stream protocol cannot observe message sizes.
This commit ensures that the following time series are omitted:
```
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="64"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="256"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1024"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4096"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16384"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="65536"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="262144"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1048576"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4194304"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16777216"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="67108864"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="268435456"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="+Inf"} 0
rabbitmq_global_message_size_bytes_count{protocol="stream"} 0
rabbitmq_global_message_size_bytes_sum{protocol="stream"} 0
```
This reduces the number of time series by 15.
2.
Further reduce the number of time series by reducing the number of
buckets. Instead of 13 buckets, emit only 9 buckets. Buckets are not
free, each is an extra time series stored.
Prior to this commit:
```
curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l
92
```
After this commit:
```
curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l
57
```
3.
The emitted metric should be called
`rabbitmq_message_size_bytes_bucket` instead of `rabbitmq_global_message_size_bytes_bucket`.
The latter is poor naming. There is no need to use `global` in
the metric name given that this metric doesn't exist in the old flawed
aggregated metrics.
4.
This commit simplifies module `rabbit_global_counters`.
5.
Avoid garbage collecting the 10-element list of buckets for each
message being received.
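A minimal, self-contained sketch of the counters-based histogram mentioned above (the bucket bounds are placeholders, and the real `rabbit_msg_size_metrics` avoids building a bucket list on the hot path):
```erlang
%% Illustrative sketch of a counters-backed histogram; bucket bounds are
%% placeholders and the real rabbit_msg_size_metrics module differs.
-module(msg_size_histogram_sketch).
-export([new/0, observe/2]).

-define(BOUNDS, [100, 1000, 10000, 100000, 1000000,
                 10000000, 50000000, 100000000, infinity]).

new() ->
    %% One counter slot per bucket plus one slot for the running sum.
    counters:new(length(?BOUNDS) + 1, [write_concurrency]).

observe(Ref, Size) ->
    counters:add(Ref, bucket_index(Size, ?BOUNDS, 1), 1),
    counters:add(Ref, length(?BOUNDS) + 1, Size).

bucket_index(_Size, [infinity | _], Ix) -> Ix;
bucket_index(Size, [Bound | _], Ix) when Size =< Bound -> Ix;
bucket_index(Size, [_ | Rest], Ix) -> bucket_index(Size, Rest, Ix + 1).
```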
---------
Co-authored-by: Péter Gömöri <peter@84codes.com>
Given that the default max_message_size got decreased from 128 MiB to 16
MiB in RabbitMQ 4.0 in https://github.com/rabbitmq/rabbitmq-server/pull/11455,
it makes sense to also decrease the default MQTT Maximum Packet Size from 256 MiB to 16 MiB.
Since this change was missed in RabbitMQ 4.0, it is scheduled for RabbitMQ 4.1.
This return value was already possible since a classic queue will return
it during termination if `rabbit_amqqueue:internal_delete/2` fails with
that value.
`rabbit_amqqueue:delete/4` already handles this value and converts it
into a protocol error and channel exit. The other caller (MQTT
processor) will be updated in a child commit.
This commit also replaces eager conversions to protocol errors in
rabbit_classic_queue, rabbit_quorum_queue and rabbit_stream_coordinator:
we should return `{error, timeout}` consistently and not hide it in
protocol errors.
This reverts commit 620fff22f1.
It introduced a regression in another area: a TCP health check,
such as the default readinessProbe (with the cluster-operator),
on a TLS-enabled instance would log a `rabbit_reader` crash
every few seconds:
```
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> crasher:
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> initial call: rabbit_reader:init/3
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> pid: <0.999.0>
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> registered_name: []
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> exception error: no match of right hand side value {error, handshake_failed}
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0> in function rabbit_reader:init/3 (rabbit_reader.erl, line 171)
```
The default of 0.4 was very conservative even when it was
set years ago. Since then:
- we moved to CQv2, which has much more predictable memory usage than (non-lazy) CQv1 used to
- we removed CQ mirroring, which caused large sudden memory spikes in some situations
- we removed the option to store message payload in memory in quorum queues
For the past two years or so, we've been running all our internal tests and benchmarks
using the value of 0.8 with no OOMkills at all (note: we do this on
Kubernetes where the Cluster Operator overrides the available memory,
leaving some additional headroom, but effectively we are still using more than
0.6 of memory).
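For operators who want a different threshold, the watermark can still be set explicitly; a classic-config sketch using the standard `vm_memory_high_watermark` application environment setting:
```erlang
%% advanced.config sketch: 0.4 restores the old default, while 0.8 is the
%% value we have been running in internal tests.
[
 {rabbit, [
     {vm_memory_high_watermark, 0.8}
 ]}
].
```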
1.
Prior to this commit, closing a stream connection via:
```
./sbin/rabbitmqctl close_all_user_connections guest enough
```
crashed the stream process as follows:
```
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> crasher:
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> initial call: rabbit_stream_reader:init/1
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> pid: <0.1098.0>
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> registered_name: []
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> exception error: no function clause matching
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> rabbit_stream_reader:open({call,
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> {<0.1233.0>,
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> #Ref<0.519694519.1387790337.15898>}},
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> {shutdown,<<"enough">>},
```
This commit fixes this crash.
2.
Both CLI commands and the management plugin use the same mechanism
to close MQTT, Web MQTT, and Stream connections: they all send a message
via `Pid ! {shutdown, Reason}` to the connection.
3.
This commit avoids making the `rabbit` core app know about
'Web MQTT'.
4.
This commit simplifies rabbit_mqtt_reader by avoiding another
handle_call clause.
1. Only run the CLI tests on a single node cluster. The shared_SUITE is
already very big. Testing the same CLI commands against node-0 on a
3-node cluster brings no benefit.
2. Move the two new CLI test cases in front of
management_plugin_connection because they are similar in that all
three tests close the MQTT connection.
3. There is no need to query the HTTP API for the two new CLI test
cases.
4. There is no need to set keepalive in the two new CLI test cases.
`{shutdown, Reason}` must be handled in handle_call and not handle_info:
`rabbitmqctl close_all_user_connections` calls rabbit_reader, which does
a call into the process, the same as rabbitmq_management does.
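Schematically, the handling could look like the following gen_server-style clause (a sketch, not the actual module code):
```erlang
%% Sketch only; the real connection reader modules differ in detail.
handle_call({shutdown, Reason}, _From, State) ->
    %% Reply so that rabbitmqctl / the management plugin gets an answer to
    %% its call, then stop the connection process.
    {stop, {shutdown, Reason}, ok, State}.
```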
This commit is a breaking change in RabbitMQ 4.0.
## What?
Remove mqtt.default_user and mqtt.default_pass
Instead, rabbit.anonymous_login_user and rabbit.anonymous_login_pass
should be used.
## Why?
RabbitMQ 4.0 simplifies anonymous logins.
There should be a single configuration place
```
rabbit.anonymous_login_user
rabbit.anonymous_login_pass
```
that is used for anonymous logins for any protocol.
Anonymous login is orthogonal to the protocol the client uses.
Hence, there should be a single configuration place which can then be
used for MQTT, AMQP 1.0, AMQP 0.9.1, and RabbitMQ Stream protocol.
This will also simplify switching to SASL for MQTT 5.0 in the future.
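For illustration, the classic-config form could look like this (key names as above; the value types shown are assumptions):
```erlang
%% Sketch only; value types are assumptions.
[
 {rabbit, [
     {anonymous_login_user, <<"guest">>},
     {anonymous_login_pass, <<"guest">>}
 ]}
].
```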
This fixes a case-clause crash in the logs in `cluster_minority_SUITE`.
When the database is not available `rabbit_amqqueue:declare/6,7` should
return a `protocol_error` record with an error message rather than a
hard crash. Also included in this change are the necessary changes to
typespecs: `rabbit_db_queue:create_or_get/1` is the first function to
return a possible `{error,timeout}`. That bubbles up through
`rabbit_amqqueue:internal_declare/3` and must be handled in each
`rabbit_queue_type:declare/2` callback.
This commit contains the following new quorum queue features:
* Fair share high/low priorities
* SAC consumers honour consumer priorities
* Credited consumer refactoring to meet AMQP requirements.
* Use checkpoints feature to reduce memory use for queues with long backlogs
* Consumer cancel option that immediately removes consumer and returns all pending messages.
* More compact versions of the most common commands, such as enqueue, settle and credit
* Correctly track the delivery-count to be compatible with the AMQP spec
* Support the "modified" AMQP 1.0 outcome better.
Commits:
* Quorum queues v4 scaffolding.
Create the new version without including any changes yet.
QQ: force delete followers after leader has terminated.
Also try a longer sleep for mqtt_shared_SUITE so that the
delete operation stands a chance to time out and move on
to the forced deletion stage.
In some mixed machine version scenarios some followers will never
apply the poison pill command so we may as well force delete them
just in case.
QQ: skip test in amqp_client that cannot pass with mixed machine versions
QQ: remove dead code
Code relating to prior machine versions and state conversions.
rabbit_fifo_prop_SUITE fixes
* QQ: add v4 ff and new more compact enqueue command.
Also update rabbit_fifo_* suites to test more relevant code versions
where applicable.
QQ: always use the updated credit mode format
QQv4: use more compact consumer reference in settle, credit, return
This introduces a new type: consumer_key(), which is either the consumer_id
or the raft index the checkout was processed at. If the consumer is
using one of the updated credit spec formats rabbit_fifo will use the
raft index as the primary key for the consumer such that the rabbit
fifo client can then use the more space efficient integer index
instead of the full consumer id in subsequent commands.
There is compatibility code to still accept the consumer id in
settle, return, discard and credit commands but this is slightly
slower and of course less space efficient.
The old form will be used in cases where the fifo client may have
already removed the local consumer state (as happens after a cancel).
Lots of test refactorings of the rabbit_fifo_SUITE to begin to use
the new forms.
* More test refactoring and new API fixes
rabbit_fifo_prop_SUITE refactoring and other fixes.
* First pass SAC consumer priority implementation.
Single active consumers will be activated if they have a higher priority
than the currently active consumer. If the currently active consumer
has pending messages, no further messages will be assigned to the
consumer and the activation of the new consumer will happen once
all pending messages are settled. This is to ensure processing order.
Consumers with the same priority will internally be ordered to
favour those with credit then those that attached first.
QQ: add SAC consumer priority integration tests
QQ: add check for ff in tests
* QQ: add new consumer cancel option: 'remove'
This option immediately removes and returns all messages for a
consumer instead of the softer 'cancel' option which keeps the
consumer around until all pending messages have been either
settled or returned.
This involves a change to the rabbit_queue_type:cancel/5 API
to rabbit_queue_type:cancel/3.
* QQ: capture checked out time for each consumer message.
This will form the basis for queue initiated consumer timeouts.
* QQ: Refactor to use the new ra_machine:handle_aux/5 API
Instead of the old ra_machine:handle_aux/6 callback.
* QQ hi/lo priority queue
* QQ: Avoid using mc:size/1 inside rabbit_fifo
As we don't want to depend on external functions for things that may
change the state of the queue.
* QQ bug fix: Maintain order when returning multiple
Prior to this commit, quorum queues requeued messages in an undefined
order, which is wrong.
This commit fixes this bug and requeues messages always in the order as
nacked / rejected / released by the client.
We ensure that order of requeues is deterministic from the client's
point of view and doesn't depend on whether the quorum queue soft limit
was exceeded temporarily.
So, even when rabbit_fifo_client batches requeues, the order as nacked
by the client is still maintained.
* Simplify
* Add rabbit_quorum_queue:file_handle* functions back.
For backwards compat.
* dialyzer fix
* dynamic_qq_SUITE: avoid mixed versions failure.
* QQ: track number of requeues for message.
To be able to calculate the correct value for the AMQP delivery_count
header we need to be able to distinguish between messages that were
"released" or returned in QQ speak and those that were returned
due to errors such as channel termination.
This commit implements such tracking as well as the calculation
of a new mc annotation `delivery_count` that AMQP makes use
of to set the header value accordingly.
* Use QQ consumer removal when AMQP client detaches
This enables us to unskip some AMQP tests.
* Use AMQP address v2 in fsharp-tests
* rabbit_fifo: Use Ra checkpoints
* quorum queues: Use a custom interval for checkpoints
* rabbit_fifo_SUITE: List actual effects in ?ASSERT_EFF failure
* QQ: Checkpoints modifications
* fixes
* QQ: emit release cursors on tick for followers and leaders,
otherwise followers could end up holding on to segments a bit longer
after traffic stops.
* Support draining a QQ SAC waiting consumer
By issuing drain=true, the client says "either send a transfer or a flow frame".
Since there are no messages to send to an inactive consumer, the sending
queue should advance the delivery-count consuming all link-credit and send
a credit_reply with drain=true to the session proc which causes the session
proc to send a flow frame to the client.
* Extract applying #credit{} cmd into 2 functions
This commit is only refactoring and doesn't change any behaviour.
* Fix default priority level
Prior to this commit, when a message didn't have a priority level set,
it got enqueued as high prio.
This is wrong because the default priority is 4 and
"for example, if 2 distinct priorities are implemented,
then levels 0 to 4 are equivalent, and levels 5 to 9 are equivalent
and levels 4 and 5 are distinct."
Hence, by default a message without a priority set must be enqueued as
low prio.
* bazel run gazelle
* Avoid deprecated time unit
* Fix aux_test
* Delete dead code
* Fix rabbit_fifo_q:get_lowest_index/1
* Delete unused normalize functions
* Generate less garbage
* Add integration test for QQ SAC with consumer priority
* Improve readability
* Change modified outcome behaviour
With the new quorum queue v4 improvements where a requeue counter was
added in addition to the quorum queue delivery counter, the following
sentence from https://github.com/rabbitmq/rabbitmq-server/pull/6292#issue-1431275848
doesn't apply anymore:
> Also the case where delivery_failed=false|undefined requires the release of the
> message without incrementing the delivery_count. Again this is not something
> that our queues are able to do so again we have to reject without requeue.
Therefore, we simplify the modified outcome behaviour:
RabbitMQ will from now on only discard the message if the modified outcome's
undeliverable-here field is true.
* Introduce single feature flag rabbitmq_4.0.0
## What?
Merge all feature flags introduced in RabbitMQ 4.0.0 into a single
feature flag called rabbitmq_4.0.0.
## Why?
1. This fixes the crash in
https://github.com/rabbitmq/rabbitmq-server/pull/10637#discussion_r1681002352
2. It's better user experience.
* QQ: expose priority metrics in UI
* Enable skipped test after rebasing onto main
* QQ: add new command "modify" to better handle AMQP modified outcomes.
This new command can be used to annotate returned or rejected messages.
This commit also retains the delivery-count across dead letter boundaries
such that the AMQP header delivery-count field can now include _all_ failed
delivery attempts since the message was originally received.
Internally the quorum queue has moved its delivery_count header to
only track the AMQP protocol delivery attempts and now introduces
a new acquired_count to track all message acquisitions by consumers.
* Type tweaks and naming
* Add test for modified outcome with classic queue
* Add test routing on message-annotations in modified outcome
* Skip tests in mixed version tests
Skip tests in mixed version tests because feature flag
rabbitmq_4.0.0 is needed for the new #modify{} Ra command
being sent to quorum queues.
---------
Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Test case rabbit_mqtt_qos0_queue_kill_node flaked because after an
MQTT client subscribes on node 0, RabbitMQ returns success
and replicates the new binding to node 0 and node 1, but not
yet to node 2. Another MQTT client then publishes on node 2
without the binding being present yet on node 2, and the
message therefore isn't routed.
This commit attempts to eliminate this flake.
It adds a function to rabbit_ct_broker_helpers which waits until a given
node has caught up with the leader node.
We can reuse that function in future to eliminate more test flakes.
For consistency with other protocols (to protect from potential DoS attacks).
Wrong credentials and virtual host access errors trigger the delay.
References #11831
We keep the delay low when running tests. Otherwise,
```
make -C deps/rabbitmq_mqtt ct-auth
```
would run 3 minutes longer (with a SILENT_CLOSE_DELAY of 3 seconds).
## What?
Prior to this commit connecting 40k AMQP clients with 5 sessions each,
i.e. 200k sessions in total, took 7m55s.
After this commit, the same scenario takes 1m37s.
Additionally, prior to this commit, disconnecting all connections and sessions
at once caused the pg process to become overloaded taking ~14 minutes to
process its mailbox.
After this commit, these same deregistrations take less than 5 seconds.
To repro:
```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 40_000; i++ {
		if i%1000 == 0 {
			log.Printf("opened %d connections", i)
		}
		conn, err := amqp.Dial(
			context.TODO(),
			"amqp://localhost",
			&amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("open connection:", err)
		}
		for j := 0; j < 5; j++ {
			_, err = conn.NewSession(context.TODO(), nil)
			if err != nil {
				log.Fatal("begin session:", err)
			}
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}
```
## How?
This commit uses separate pg scopes (that is, separate processes and ETS tables) to register
AMQP connections and AMQP sessions. Since each Pid is now its own group,
registration and deregistration is fast.
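A simplified sketch of the per-scope registration pattern using OTP's `pg` module; the scope names and the use of the pid as its own group are illustrative, not the exact identifiers used by RabbitMQ:
```erlang
%% Simplified sketch; scope and group names are illustrative only.
start_scopes() ->
    {ok, _} = pg:start_link(amqp_connection_scope),
    {ok, _} = pg:start_link(amqp_session_scope),
    ok.

register_connection(ConnPid) ->
    %% Each pid is its own group, so joins and leaves don't contend on a
    %% single large group.
    ok = pg:join(amqp_connection_scope, ConnPid, ConnPid).

register_session(SessionPid) ->
    ok = pg:join(amqp_session_scope, SessionPid, SessionPid).

unregister_connection(ConnPid) ->
    pg:leave(amqp_connection_scope, ConnPid, ConnPid).
```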
Configuring the mock authentication backend blocks
and generates an error in the test process when the
broker goes down. The error report makes the test fail
in some environments.
The process where the setup takes place must stay up
otherwise the ETS table used will go away.
This commit makes sure the broker-side authentication backend
setup returns at the end of the test. This way the calling
process terminates in a normal way.
Require all MQTT feature flags and remove their compatibility code:
* delete_ra_cluster_mqtt_node
* rabbit_mqtt_qos0_queue
* mqtt_v5
These feature flags were introduced in or before 3.13.0.
This commit is a follow up of https://github.com/rabbitmq/rabbitmq-server/pull/11604
This commit changes the AMQP address format v2 from
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to
```
/exchanges/:exchange/:routing-key
/exchanges/:exchange
/queues/:queue
```
Advantages:
1. more user friendly
2. matches nicely with the plural forms of HTTP API v1 and HTTP API v2
This plural form is still non-overlapping with AMQP address format v1.
Although it might feel unusual at first to send a message to `/queues/q1`,
if you think about `queues` just being a namespace or entity type, this
address format makes sense.
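Assuming the Erlang `amqp10_client`, attaching links with the v2 addresses could look roughly like this (a sketch, not taken from the test suites):
```erlang
%% Sketch; Session is assumed to be an established amqp10_client session.
{ok, Sender}   = amqp10_client:attach_sender_link(
                   Session, <<"sender-1">>,
                   <<"/exchanges/amq.direct/my-routing-key">>),
{ok, Receiver} = amqp10_client:attach_receiver_link(
                   Session, <<"receiver-1">>, <<"/queues/q1">>).
```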
This change distinguishes between v1 and v2 address formats.
Previously, v1 and v2 address formats overlapped and behaved differently,
for example for:
```
/queue/:queue
/exchange/:exchange
```
This PR changes the v2 format to:
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to distinguish between v1 and v2 addresses.
This allows calling `rabbit_deprecated_features:is_permitted(amqp_address_v1)`
only when we know that the user requests address format v1.
Note that `rabbit_deprecated_features:is_permitted/1` should only
be called when the old feature is actually used.
Use percent encoding / decoding for address URI format v2.
This allows using any UTF-8 encoded characters, including slashes (`/`),
in routing keys, exchange names, and queue names, and is more
future-proof.
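For example, a routing key that itself contains a slash can be percent-encoded before being embedded in a v2 address; the sketch below uses `uri_string:quote/1` (OTP 25+) as the encoder, which is an assumption rather than necessarily what RabbitMQ's tooling uses:
```erlang
%% Sketch; uri_string:quote/1 (OTP 25+) is used here as an example encoder.
RoutingKey = <<"region/eu-west-1">>,
Address = iolist_to_binary(
            ["/exchanges/amq.topic/", uri_string:quote(RoutingKey)]).
%% Address is now e.g. <<"/exchanges/amq.topic/region%2Feu-west-1">>
```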
We don't need to duplicate so many patterns in so many
files since we have a monorepo (and want to keep it).
If I managed to miss something or remove something that
should stay, please put it back. Note that monorepo-wide
patterns should go in the top-level .gitignore file.
Other .gitignore files are for application- or folder-specific patterns.
* MQTT: speed up shared_SUITE:many_qos1_messages
* speed up block_only_publisher
* MQTT: reorganise tests groups
To avoid starting a new broker for each protocol group (v3,v4,v5).
Instead we run all protocol groups under a single cluster configuration
group.
* MQTT: speed up publish_to_all_queue_types_qos* tests.
* Remove separate mnesia_store group
to speed up the test suite
* Fix wrong_shard_count
* Remove unused field
* Run subset of v3 tests
The code being tested under v3 and v4 is almost identical.
To save time in CI, we therefore run only a very small subset of tests in v3.
This cuts the total time reported by CT for the shared_SUITE from 898
seconds to 614 seconds.
Also, the java_SUITE runs its tests in v3.
* Fix wrong_shard_count
* Fix mixed version failure
---------
Co-authored-by: David Ansari <david.ansari@gmx.de>
## What?
Introduce RabbitMQ internal flow control for messages sent to AMQP
clients.
Prior this PR, when an AMQP client granted a large amount of link
credit (e.g. 100k) to the sending queue, the sending queue sent
that amount of messages to the session process no matter what.
This becomes problematic for memory usage when the session process
cannot send out messages fast enough to the AMQP client, especially if
1. The writer proc cannot send fast enough. This can happen when
the AMQP client does not receive fast enough and causes TCP
back-pressure to the server. Or
2. The server session proc is limited by remote-incoming-window.
Both scenarios are now added as test cases.
Tests
* tcp_back_pressure_rabbitmq_internal_flow_quorum_queue
* tcp_back_pressure_rabbitmq_internal_flow_classic_queue
cover scenario 1.
Tests
* incoming_window_closed_rabbitmq_internal_flow_quorum_queue
* incoming_window_closed_rabbitmq_internal_flow_classic_queue
cover scenario 2.
This PR sends messages from queues to AMQP clients in a more controlled
manner.
To illustrate:
```
make run-broker PLUGINS="rabbitmq_management" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 4"
observer_cli:start()
mq
```
where `mq` sorts by message queue length.
Create a stream:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=s1 queue_type=stream durable=true
```
Next, send and receive from the Stream via AMQP.
Grant a large number of link credit to the sending stream:
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
bash-5.1# quiver //host.docker.internal//queue/s1 --durable -d 30s --credit 100000
```
**Before** this PR:
```
RESULTS
Count ............................................... 100,696 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 120,422 messages/s
Receiver rate ......................................... 3,363 messages/s
End-to-end rate ....................................... 3,359 messages/s
```
We observe that all 100k link credit worth of messages are buffered in the
writer proc's mailbox:
```
|No | Pid | MsgQueue |Name or Initial Call | Memory | Reductions |Current Function |
|1 |<0.845.0> |100001 |rabbit_amqp_writer:init/1 | 126.0734 MB| 466633491 |prim_inet:send/5 |
```
**After** this PR:
```
RESULTS
Count ............................................. 2,973,440 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 123,322 messages/s
Receiver rate ........................................ 99,250 messages/s
End-to-end rate ...................................... 99,148 messages/s
```
We observe that the message queue lengths of both writer and session
procs are low.
## How?
Our goal is to have queues send out messages in a controlled manner
without overloading RabbitMQ itself.
We want RabbitMQ internal flow control between:
```
AMQP writer proc <--- session proc <--- queue proc
```
A similar concept exists for classic queues sending via AMQP 0.9.1.
We want an approach that applies to AMQP and works generically for all queue
types.
For the interaction between AMQP writer proc and session proc we use a
simple credit based approach reusing module `credit_flow`; a simplified
illustration of this credit pattern appears at the end of this section.
For the interaction between session proc and queue proc, the following options
exist:
### Option 1
The session process provides explicit feedback to the queue after it
has sent N messages.
This approach is implemented in
https://github.com/ansd/rabbitmq-server/tree/amqp-flow-control-poc-1
and works well.
A new `rabbit_queue_type:sent/4` API was added which lets the queue proc know
that it can send further messages to the session proc.
Pros:
* Will work equally well for AMQP 0.9.1, e.g. when quorum queues send messages
in auto ack mode to AMQP 0.9.1 clients.
* Simple for the session proc
Cons:
* Slightly added complexity in every queue type implementation
* Multiple Ra commands (settle, credit, sent) to decide when a quorum
queue sends more messages.
### Option 2
A dual link approach where two AMQP links exists between
```
AMQP client <---link--> session proc <---link---> queue proc
```
When the client grants a large amount of credits, the session proc will
top up credits to the queue proc periodically in smaller batches.
Pros:
* No queue type modifications required.
* Re-uses AMQP link flow control
Cons:
* Significant added complexity in the session proc. A client can
dynamically decrease or increase credits and dynamically change the drain
mode while the session tops up credit to the queue.
### Option 3
Credit is a 32 bit unsigned integer.
The spec mandates that the receiver independently chooses a credit.
Nothing in the spec prevents the receiver to choose a credit of 1 billion.
However the credit value is merely a **maximum**:
> The link-credit variable defines the current maximum legal amount that the delivery-count can be increased by.
Therefore, the server is not required to send all available messages to this
receiver.
For delivery-count:
> Only the sender MAY independently modify this field.
"independently" could be interpreted as the sender could add to the delivery-count
irrespective of what the client chose for drain and link-credit.
Option 3: The queue proc could at credit time already consume credit
and advance the delivery-count if credit is too large before checking out any messages.
For example if credit is 100k, but the queue only wants to send 1k, the queue could
consume 99k of credits and advance the delivery-count, and subsequently send maximum 1k messages.
If the queue advanced the delivery-count, RabbitMQ must send a FLOW to the receiver,
otherwise the receiver wouldn’t know that it ran out of link-credit.
Pros:
* Very simple
Cons:
* Possibly unexpected behaviour for receiving AMQP clients
* Possibly poor end-to-end throughput in auto-ack mode because the queue
would send a batch of messages followed by a FLOW containing the advanced
delivery-count. Only thereafter will the client learn that it ran out of
credits and top-up again. This feels like synchronously pulling a batch
of messages. In contrast, option 2 sends out more messages as soon as
the previous messages left RabbitMQ without requiring again a credit top
up from the receiver.
* drain mode with large credits requires the queue to send all available
messages and only thereafter advance the delivery-count. Therefore,
drain mode breaks option 3 somewhat.
### Option 4
Session proc drops message payload when its outgoing-pending queue gets
too large and re-reads payloads from the queue once the message can be
sent (see `get_checked_out` Ra command for quorum queues).
Cons:
* Would need to be implemented for every queue type, especially classic queues
* Doesn't limit the amount of message metadata in the session proc's
outgoing-pending queue
### Decision: Option 2
This commit implements option 2 to avoid any queue type modification.
At most one credit request is in-flight between session process and
queue process for a given queue consumer.
If the AMQP client sends another FLOW in between, the session proc
stashes the FLOW until it processes the previous credit reply.
A delivery is only sent from the outgoing-pending queue if the
session proc is not blocked by
1. writer proc, or
2. remote-incoming-window
The credit reply is placed into the outgoing-pending queue.
This ensures that the session proc will only top up the next batch of
credits if sufficient messages were sent out to the writer proc.
A future commit could additionally have each queue limit the number of
unacked messages for a given AMQP consumer, or alternatively make use
of session outgoing-window.
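For readers unfamiliar with the pattern, the following is a deliberately simplified, self-contained illustration of credit-based flow control between a producer of deliveries and the process writing them out; it is not RabbitMQ's `credit_flow` module or the session code itself:
```erlang
%% Deliberately simplified illustration of credit-based flow control;
%% this is not RabbitMQ's credit_flow module.
-module(simple_credit_flow).
-export([producer/2, consumer/0]).

-define(INITIAL_CREDIT, 200).
-define(TOP_UP, 100).

producer(ConsumerPid, Items) ->
    producer(ConsumerPid, Items, ?INITIAL_CREDIT).

producer(_ConsumerPid, [], _Credit) ->
    done;
producer(ConsumerPid, Items, 0) ->
    %% Out of credit: block until the consumer grants more.
    receive {credit, N} -> producer(ConsumerPid, Items, N) end;
producer(ConsumerPid, [Item | Rest], Credit) ->
    ConsumerPid ! {item, self(), Item},
    producer(ConsumerPid, Rest, Credit - 1).

consumer() ->
    consumer(0).

consumer(Processed) when Processed > 0, Processed rem ?TOP_UP =:= 0 ->
    receive
        {item, From, _Item} ->
            %% Periodically grant credit back to the producer.
            From ! {credit, ?TOP_UP},
            consumer(Processed + 1)
    end;
consumer(Processed) ->
    receive
        {item, _From, _Item} ->
            consumer(Processed + 1)
    end.
```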
Put configuration credit_flow_default_credit into persistent term such
that the tuple doesn't have to be copied on the hot path.
Also, change persistent term keys from `{rabbit, AtomKey}` to `AtomKey`
so that hashing becomes cheaper.
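A sketch of the pattern; the tuple value shown is an assumption, not necessarily RabbitMQ's default:
```erlang
%% Sketch of the persistent_term pattern; the credit values are assumptions.
ok = persistent_term:put(credit_flow_default_credit, {400, 200}),
%% On the hot path, persistent_term:get/1 returns the term without copying
%% it onto the calling process's heap:
{InitialCredit, MoreCreditAfter} = persistent_term:get(credit_flow_default_credit).
```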
We reject CQv1 in rabbit.schema as well.
Most of the v1 code is still around as it is needed
for conversion to v2. It will be removed at a later
time when conversion is no longer supported.
We don't shard the CQ property suite anymore:
there's only 1 case remaining.
The `mc` module is ideally meant to be kept pure and portable,
while feature flags have external infrastructure dependencies
as well as impure semantics.
Moving the check of this feature flag into the AMQP session
simplifies the code (as no message containers with the new
format will enter the system before the feature flag is enabled).
Fixes #11171
An MQTT user encountered TLS handshake timeouts with their IoT device,
and the actual error from `ssl:handshake` / `ranch:handshake` was not
caught and logged.
At this time, `ranch` uses `exit(normal)` in the case of timeouts, but
that should change in the future
(https://github.com/ninenines/ranch/issues/336)
Similar to how we convert from mc_amqp to mc_amqpl before
sending to a classic queue or quorum queue process if
feature flag message_containers_store_amqp_v1 is disabled,
we also need to do the same conversion before sending to an MQTT QoS 0
queue on the old node.
Prior to this commit the entire amqp-value or amqp-sequence sections
were parsed when converting a message from mc_amqp.
Parsing the entire amqp-value or amqp-sequence section can generate a
huge amount of garbage depending on how large these sections are.
Given that other protocols cannot make use of amqp-value and
amqp-sequence sections anyway, leave them AMQP encoded when converting
from mc_amqp.
In fact prior to this commit, the entire body section was parsed
generating huge amounts of garbage just to subsequently encode it again
in mc_amqpl or mc_mqtt.
The new conversion interface from mc_amqp to other mc_* modules will
either output amqp-data sections or the encoded amqp-value /
amqp-sequence sections.
Section 3.2.1 of the AMQP 1.0 spec defines durable=false as the default.
However, the same section also mentions:
> If the header section is omitted the receiver MUST assume the appropriate
> default values (or the meaning implied by no value being set) for the
> fields within the header unless other target or node specific defaults
> have otherwise been set.
We want RabbitMQ to be secure by default, hence in RabbitMQ we set
durable=true to be the default.
This commit fixes test
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed -t- \
--test_sharding_strategy=disabled --test_env \
FOCUS="-group [mqtt,v3,cluster_size_3] -case pubsub"
```
Fix some mixed version tests
Assume the AMQP body, especially the amqp-value section, won't be parsed.
Hence, omit smart conversions from AMQP to MQTT involving the
Payload-Format-Indicator bit.
Fix test
Fix
```
bazel test //deps/amqp10_client:system_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [rabbitmq]
```