This commit enables client apps to automatically perform end-to-end
checksumming over the bare message (i.e. body + application defined
headers).
This commit allows an app to configure the AMQP client:
* for a sending link to automatically compute CRC-32 or Adler-32
checksums over each bare message, including the computed checksum
as a footer annotation, and
* for a receiving link to automatically look up the expected CRC-32
or Adler-32 checksum in the footer annotation and, if present, compare
it against the actually computed checksum.
The commit comes with the following advantages:
1. Transparent end-to-end checksumming. Although TCP and RabbitMQ
queues (when using the disk) already perform checksumming, end-to-end
checksumming sits a level higher up and can therefore detect bit flips
within RabbitMQ nodes or load balancers, as well as other bit flips
that would otherwise go unnoticed.
2. Not only is the body checksummed, but also the properties and
application-properties sections. This is an advantage over AMQP 0.9.1
because the AMQP 1.0 protocol disallows modification of the bare message.
3. This commit is currently used for testing the RabbitMQ AMQP
implementation, but it shows the feasibility of how apps could also
get integrity guarantees over the whole bare message using HMACs or
signatures.
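For illustration, a minimal sketch of both directions, where the footer annotation key (`x-opt-checksum`) and the helper names are made up for this example and are not the actual implementation:
```erlang
%% Sketch only: annotation key and helper names are illustrative.
%% The checksum is computed over the serialised bare message
%% (properties + application-properties + body).
sender_footer(BareMessageBin, crc32) ->
    #{<<"x-opt-checksum">> => erlang:crc32(BareMessageBin)};
sender_footer(BareMessageBin, adler32) ->
    #{<<"x-opt-checksum">> => erlang:adler32(BareMessageBin)}.

%% Receiver side: verify only if the expected checksum is present.
verify_footer(BareMessageBin, Footer, Algo) ->
    case Footer of
        #{<<"x-opt-checksum">> := Expected} ->
            Actual = case Algo of
                         crc32   -> erlang:crc32(BareMessageBin);
                         adler32 -> erlang:adler32(BareMessageBin)
                     end,
            Expected =:= Actual;
        _ ->
            true  %% no checksum present, nothing to verify
    end.
```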
Fix crashes when a message is originally sent via AMQP, stored
within a classic or quorum queue, and subsequently dead lettered,
and the dead letter exchange needs access to message annotations,
properties, or application-properties.
Crashes could happen because compaction would wrongly write
over valid messages, or truncate valid messages: when scanning
the files for messages it would encounter leftover data that
looked like a message, which prompted compaction to not look
for the real messages hidden within.
To avoid this we ensure that there can't be leftover data
as a result of compaction. We get this guarantee by blanking
data in the holes in the file before we start copying messages
closer to the start of the file. This requires a few more
writes, but it guarantees that the only data in the files at
any point is valid messages.
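A minimal sketch of the blanking step, assuming the holes are already known as `{Offset, Size}` pairs (the real compaction code computes these from the scanned message index):
```erlang
%% Sketch: zero out the gaps between valid messages before copying
%% messages towards the start of the file, so a later scan can never
%% mistake leftover bytes for a message.
blank_holes(Fd, Holes) ->
    lists:foreach(
      fun({Offset, Size}) ->
              ok = file:pwrite(Fd, Offset, binary:copy(<<0>>, Size))
      end,
      Holes).
```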
Note that it's possible that some of the messages in the files
are no longer referenced; that's OK. We filter them out after
scanning the file.
This was also a good time to merge two almost identical scan
functions, and be more explicit about what messages should be
dropped after scanning the file (the messages no longer in the
ets index and the fan-out messages that ended up re-written in
a more recent file).
```
make -C deps/rabbit ct-amqp_system t=dotnet:fragmentation
```
fails in the new make CI with:
```
amqp_system_SUITE > dotnet > fragmentation
#1. {error,{{badmatch,{error,134,
"Unhandled exception. Amqp.AmqpException: Invalid frame size:527, maximum frame size:512.\n at Amqp.Connection.ThrowIfClosed(String operation)\n at Amqp.Connection.AddSession(Session session)\n at Amqp.Session..ctor(Connection connection, Begin begin, OnBegin onBegin)\n at Amqp.Session..ctor(Connection connection)\n at Program.AmqpClient.connectWithOpen(String uri, Open opn) in /home/runner/work/rabbitmq-server/rabbitmq-server/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs:line 53\n at Program.Test.fragmentation(String uri) in /home/runner/work/rabbitmq-server/rabbitmq-server/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs:line 284\n at Program.main(String[] argv) in /home/runner/work/rabbitmq-server/rabbitmq-server/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs:line 533\n"}},
[{amqp_system_SUITE,run_dotnet_test,2,
[{file,"amqp_system_SUITE.erl"},
{line,228}]},
{test_server,ts_tc,3,[{file,"test_server.erl"},{line,1793}]},
{test_server,run_test_case_eval1,6,
[{file,"test_server.erl"},{line,1302}]},
{test_server,run_test_case_eval,9,
[{file,"test_server.erl"},{line,1234}]}]}}
```
RabbitMQ includes its node name and cluster name in the open frame it
sends to the client. Running this test locally shows an open frame size
of 467 bytes.
The suspicion is that the node name and cluster name in CI are longer,
causing the open frame from RabbitMQ to the client to exceed the frame
size of 512 bytes.
These changes to the defaults can affect other test cases. In particular,
they will affect the time to live and deletion events in the vhost
deletion idempotency case.
## What?
Introduce a new address format (let's call it v2) for AMQP 1.0 source and target addresses.
The old format (let's call it v1) is described in
https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing
The only v2 source address format is:
```
/queue/:queue
```
The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```
## Why?
The AMQP address v1 format comes with the following flaws:
1. Obscure address format:
Without reading the documentation, the differences for example between source addresses
```
/amq/queue/:queue
/queue/:queue
:queue
```
are unknown to users. Hence, the address format is obscure.
2. Implicit creation of topologies
Some address formats implicitly create queues (and bindings), such as source address
```
/exchange/:exchange/:binding-key
```
or target address
```
/queue/:queue
```
These queues and bindings are never deleted (by the AMQP 1.0 plugin).
Implicit creation of such topologies is also obscure.
3. Redundant address formats
```
/queue/:queue
:queue
```
have the same meaning and are therefore redundant.
4. Properties section must be parsed to determine whether a routing key is present
Target address
```
/exchange/:exchange
```
requires RabbitMQ to parse the properties section in order to check whether the message `subject` is set.
If `subject` is not set, the routing key will default to the empty string.
5. Using `subject` as routing key misuses the purpose of this field.
According to the AMQP spec, the message `subject` field's purpose is:
> A common field for summary information about the message content and purpose.
6. Exchange names, queue names and routing keys must not contain the "/" (slash) character.
The current 3.13 implementation splits on "/", disallowing this
character in exchange names, queue names, and routing keys, which is
unnecessarily prohibitive.
7. Clients must create a separate link per target exchange
While this is a reasonable working assumption, there might be rare use
cases where it could make sense to create many exchanges (e.g. 1
exchange per queue, see
https://github.com/rabbitmq/rabbitmq-server/discussions/10708) and have
a single application publish to all these exchanges.
With the v1 address format, for an application to send to 500 different
exchanges, it needs to create 500 links.
Due to these disadvantages and thanks to #10559 which allows clients to explicitly create topologies,
we can create a simpler, clearer, and better v2 address format.
## How?
### Design goals
Following the 7 cons from v1, the design goals for v2 are:
1. The address format should be simple so that users have a chance to
understand the meaning of the address without necessarily consulting the docs.
2. The address format should not implicitly create queues, bindings, or exchanges.
Instead, topologies should be created either explicitly via the new management node
prior to link attachment (see #10559), or in future, we might support the `dynamic`
source or target properties so that RabbitMQ creates queues dynamically.
3. No redundant address formats.
4. The target address format should explicitly state whether the routing key is present, empty,
or will be provided dynamically in each message.
5. `Subject` should not be used as routing key. Instead, a better
fitting field should be used.
6. Exchange names, queue names, and routing keys should be allowed to
contain any valid UTF-8 encoded data, including the "/" character.
7. Allow both target exchange and routing key to be dynamically provided within each message.
Furthermore
8. v2 must co-exist with v1 for at least some time. Applications should be able to upgrade to
RabbitMQ 4.0 while continuing to use v1. Examples include AMQP 1.0 shovels and plugins communicating
between a 4.0 and a 3.13 cluster. Starting with 4.1, we should change the AMQP 1.0 shovel and plugin clients
to use only the new v2 address format. This will allow AMQP 1.0 and plugins to communicate between a 4.1 and 4.2 cluster.
We will deprecate v1 in 4.0 and remove support for v1 in a later 4.x version.
### Additional Context
The address is usually a String, but can be of any type.
The [AMQP Addressing extension](https://docs.oasis-open.org/amqp/addressing/v1.0/addressing-v1.0.html)
suggests that addresses are URIs and are therefore hierarchical and could even contain query parameters:
> An AMQP address is a URI reference as defined by RFC3986.
> the path expression is a sequence of identifier segments that reflects a path through an
> implementation specific relationship graph of AMQP nodes and their termini.
> The path expression MUST resolve to a node’s terminus in an AMQP container.
The [Using the AMQP Anonymous Terminus for Message Routing Version 1.0](https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html)
extension allows for the target being `null` and the `To` property to contain the node address.
This corresponds to AMQP 0.9.1 where clients can send each message on the same channel to a different `{exchange, routing-key}` destination.
The following v2 address formats will be used.
### v2 addresses
A new deprecated feature flag `amqp_address_v1` will be introduced in 4.0 which is permitted by default.
Starting with 4.1, we should change the AMQP 1.0 shovel and plugin AMQP 1.0 clients to use only the new v2 address format.
However, 4.1 server code must still understand the 4.0 AMQP 1.0 shovel and plugin AMQP 1.0 clients’ v1 address format.
The new deprecated feature flag will therefore be denied by default in 4.2.
This allows AMQP 1.0 shovels and plugins to work between
* 4.0 and 3.13 clusters using v1
* 4.1 and 4.0 clusters using v2 from 4.1 to 4.0 and v1 from 4.0 to 4.1
* 4.2 and 4.1 clusters using v2
without having to support both v1 and v2 at the same time in the AMQP 1.0 shovel and plugin clients.
While supporting both v1 and v2 in these clients is feasible, it's simpler to switch the client code directly from v1 to v2.
### v2 source addresses
The source address format is
```
/queue/:queue
```
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.
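For example, attaching a receiver link to a pre-declared queue with the Erlang amqp10_client might look roughly like this (a sketch; nothing here is specific to v2 beyond the address string):
```erlang
%% Sketch: consume from the pre-declared queue "my-queue" using the
%% v2 source address format.
consume(Host) ->
    {ok, Connection} = amqp10_client:open_connection(#{address => Host,
                                                       port => 5672}),
    {ok, Session} = amqp10_client:begin_session(Connection),
    {ok, Receiver} = amqp10_client:attach_receiver_link(
                       Session, <<"my-receiver">>, <<"/queue/my-queue">>),
    ok = amqp10_client:flow_link_credit(Receiver, 10, never),
    {Connection, Session, Receiver}.
```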
### v2 target addresses
v1 requires attaching a new link for each destination exchange.
v2 will allow dynamic `{exchange, routing-key}` combinations for a given link.
v2 therefore allows for the rare use cases where a single AMQP 1.0 publisher app needs to send to many different exchanges.
Setting up a link per destination exchange could be cumbersome.
Hence, v2 will support the dynamic `{exchange, routing-key}` combinations of AMQP 0.9.1.
To achieve this, we make use of the "Anonymous Terminus for Message Routing" extension:
The target address will contain the AMQP value null.
The `To` field in each message must be set and contain either address format
```
/exchange/:exchange/key/:routing-key
```
or
```
/exchange/:exchange
```
when using the empty routing key.
The `to` field is of an address type and is therefore better suited than the `subject` field.
Note that each message will contain this `To` value for the anonymous terminus.
Hence, we should save some bytes being sent across the network and stored on disk.
Using a format
```
/e/:exchange/k/:routing-key
```
saves more bytes, but is too obscure.
However, we use `/key/` instead of `/routing-key/` to save a few bytes.
This also simplifies the format because users don’t have to remember whether to spell `routing-key`, `routing_key`, or `routingkey`.
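A hedged sketch of publishing via the anonymous terminus with the Erlang amqp10_client (how the client library expresses the null target address, shown here as the atom `undefined`, is an assumption):
```erlang
%% Sketch: one sender link with a null target; each message carries its
%% destination in the `to` property. The null-target representation
%% (the atom 'undefined') is an assumption about the client library.
publish_anonymous(Session) ->
    {ok, Sender} = amqp10_client:attach_sender_link(
                     Session, <<"anon-sender">>, undefined),
    Msg0 = amqp10_msg:new(<<"tag-1">>, <<"hello">>, true),
    Msg = amqp10_msg:set_properties(
            #{to => <<"/exchange/amq.direct/key/my-routing-key">>}, Msg0),
    ok = amqp10_client:send_msg(Sender, Msg).
```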
The other allowed target address formats are:
```
/exchange/:exchange/key/:routing-key
```
where exchange and routing key are static on the given link.
```
/exchange/:exchange
```
where exchange and routing key are static on the given link, and routing key will be the empty string (useful for example for the fanout exchange).
```
/queue/:queue
```
This provides RabbitMQ beginners the illusion of sending a message directly
to a queue without having to understand what exchanges and routing keys are.
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.
Besides the additional queue existence check, this queue target is different from
```
/exchange//key/:queue
```
in that queue specific optimisations might be done (in future) by RabbitMQ
(for example different receiving queue types could grant different amounts of link credits to the sending clients).
A write permission check to the amq.default exchange will be performed nevertheless.
v2 will prohibit the v1 static link & dynamic routing-key combination
where the routing key is sent in the message `subject` as that’s also obscure.
For this use case, v2’s new anonymous terminus can be used where both exchange and routing key are defined in the message’s `To` field.
(The bare message must not be modified because it could be signed.)
The alias format
```
/topic/:topic
```
will also be removed.
Sending to topic exchanges is arguably an advanced feature.
Users can directly use the format
```
/exchange/amq.topic/key/:topic
```
which reduces the number of redundant address formats.
### v2 address format reference
To sum up (and as stated at the top of this commit message):
The only v2 source address format is:
```
/queue/:queue
```
The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```
Hence, all 8 listed design goals are reached.
```
bazel test //deps/rabbit:amqp_client_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [cluster_size_3] -case async_notify_unsettled_classic_queue" --config=rbe-26 --runs_per_test=40
```
was failing 8 out of 40 times.
Skip this test as we know that link flow control with classic queues is
broken in 3.13:
https://github.com/rabbitmq/rabbitmq-server/issues/2597
Credit API v2 in RabbitMQ 4.0 fixes this bug.
Not only quorum queues, but also classic queues are wrongly implemented
when draining in 3.13.
Like quorum queues, classic queues reply with a send_drained event
before delivering the message(s).
Therefore, we have to skip the drain test in such mixed version
clusters where the leader runs on the old (3.13.1) node.
The new 4.0 implementation with credit API v2 fixes this bug.
## What?
* Allow AMQP 1.0 clients to dynamically create and delete RabbitMQ
topologies (exchanges, queues, bindings).
* Provide an Erlang AMQP 1.0 client that manages topologies.
## Why?
Today, RabbitMQ topologies can be created via:
* [Management HTTP API](https://www.rabbitmq.com/docs/management#http-api)
(including Management UI and
[messaging-topology-operator](https://github.com/rabbitmq/messaging-topology-operator))
* [Definition Import](https://www.rabbitmq.com/docs/definitions#import)
* AMQP 0.9.1 clients
Up to RabbitMQ 3.13 the RabbitMQ AMQP 1.0 plugin auto creates queues
and bindings depending on the terminus [address
format](https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing).
Such implicit creation of topologies is limiting and obscure.
For some address formats, queues will be created, but not deleted.
Some of RabbitMQ's success is due to its flexible routing topologies
that AMQP 0.9.1 clients can create and delete dynamically.
This commit allows dynamic management of topologies for AMQP 1.0 clients.
This commit builds on top of Native AMQP 1.0 (PR #9022) and will be
available in RabbitMQ 4.0.
## How?
This commits adds the following management operations for AMQP 1.0 clients:
* declare queue
* delete queue
* purge queue
* bind queue to exchange
* unbind queue from exchange
* declare exchange
* delete exchange
* bind exchange to exchange
* unbind exchange from exchange
Hence, at least the AMQP 0.9.1 management operations are supported for
AMQP 1.0 clients.
In addition the operation
* get queue
is provided which - similar to `declare queue` - returns queue
information including the current leader and replicas.
This allows clients to publish or consume locally on the node that hosts
the queue.
Compared to AMQP 0.9.1 whose commands and command fields are fixed, the
new AMQP Management API is extensible: New operations and new fields can
easily be added in the future.
There are different design options for how management operations could be
supported for AMQP 1.0 clients:
1. Use a special exchange type as done in https://github.com/rabbitmq/rabbitmq-management-exchange
This has the advantage that any protocol client (e.g. also STOMP clients) could
dynamically manage topologies. However, a special exchange type is the wrong abstraction.
2. Clients could send "special" messages with special headers that the broker interprets.
This commit opts for a variation of the 2nd option, using a more
standardized way by re-using a subset of the following latest AMQP 1.0 extension
specifications:
* [AMQP Request-Response Messaging with Link Pairing Version 1.0 - Committee Specification 01](https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html) (February 2021)
* [HTTP Semantics and Content over AMQP Version 1.0 - Working Draft 06](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=65571) (July 2019)
* [AMQP Management Version 1.0 - Working Draft 16](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=65575) (July 2019)
An important goal is to keep the interaction between AMQP 1.0 client and RabbitMQ
simple to increase usage, development and adoptability of future RabbitMQ AMQP 1.0
client library wrappers.
The AMQP 1.0 client has to create a link pair to the special `/management` node.
This allows the client to send and receive from the management node.
Similar to AMQP 0.9.1, there is no need for a reply queue since the reply
will be sent directly to the client.
Requests and responses are modelled via HTTP, but sent via AMQP using
the `HTTP Semantics and Content over AMQP` extension (henceforth `HTTP
over AMQP` extension).
This commit tries to follow the `HTTP over AMQP` extension as much as
possible but deviates where this draft spec doesn't make sense.
The projected mode §4.1 is used as opposed to tunneled mode §4.2.
A named relay `/management` is used (§6.3) where the message field `to` is the URL.
Deviations are
* §3.1 mandates that URIs are not encoded in an AMQP message.
However, we percent encode URIs in the AMQP message. Otherwise there
is for example no way to distinguish a `/` in a queue name from the
URI path separator `/`.
* §4.1.4 mandates a data section. This commit uses an amqp-value section
as it's a better fit given that the content is AMQP encoded data.
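As an illustration only (field values, paths, and body layout are assumptions, not the exact wire format), a queue declaration request sent over the `/management` link pair is shaped roughly as follows:
```erlang
%% Sketch: HTTP-over-AMQP request shape on the /management link pair.
%% Paths, reply-to, and body layout are illustrative assumptions.
declare_queue_request(ManagementSender) ->
    Req0 = amqp10_msg:new(<<"req-1">>, <<>>, true),
    Req = amqp10_msg:set_properties(
            #{message_id => <<"1">>,
              to         => <<"/queues/my-queue">>,  %% HTTP API v2 path
              subject    => <<"PUT">>,               %% HTTP method
              reply_to   => <<"$me">>},              %% reply on the paired link
            Req0),
    %% The real request carries the queue definition in an amqp-value
    %% body section rather than the empty binary used here.
    ok = amqp10_client:send_msg(ManagementSender, Req).
```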
Using an HTTP API allows for a common well understood interface and future extensibility.
Instead of re-using the current RabbitMQ HTTP API, this commit uses a
new HTTP API (let's call it v2) which could be used as a future API for
plain HTTP clients.
### HTTP API v1
The current HTTP API (let's call it v1) is **not** used since v1
comes with a couple of weaknesses:
1. Deep level of nesting becomes confusing and difficult to manage.
Examples of deep nesting in v1:
```
/api/bindings/vhost/e/source/e/destination/props
/api/bindings/vhost/e/exchange/q/queue/props
```
2. Redundant endpoints returning the same resources
v1 has 9 endpoints to list binding(s):
```
/api/exchanges/vhost/name/bindings/source
/api/exchanges/vhost/name/bindings/destination
/api/queues/vhost/name/bindings
/api/bindings
/api/bindings/vhost
/api/bindings/vhost/e/exchange/q/queue
/api/bindings/vhost/e/exchange/q/queue/props
/api/bindings/vhost/e/source/e/destination
/api/bindings/vhost/e/source/e/destination/props
```
3. Verbs in path names
Path names should be nouns instead.
v1 contains verbs:
```
/api/queues/vhost/name/get
/api/exchanges/vhost/name/publish
```
### AMQP Management extension
Only a few aspects of the AMQP Management extension are used.
The central idea of the AMQP management spec is **dynamic discovery** such that broker independent AMQP 1.0
clients can discover objects, types, operations, and HTTP endpoints of specific brokers.
In fact, clients are only conformant if:
> All request addresses are dynamically discovered starting from the discovery document.
> A requesting container MUST NOT use fixed assumptions about the addressing structure of the management API.
While this is a nice and powerful idea, no AMQP 1.0 client and no AMQP 1.0 server implement the
latest AMQP 1.0 management spec from 2019, partly presumably due to its complexity.
Therefore, the idea of such dynamic discovery has failed to be implemented in practice.
The AMQP management spec mandates that the management endpoint returns a discovery document containing
broker specific collections, types, configuration, and operations including their endpoints.
The API endpoints of the AMQP management spec are therefore all designed around dynamic discovery.
For example, to create either a queue or an exchange, the client has to
```
POST /$management/entities
```
which shows that the entities collection acts as a generic factory, see section 2.2.
The server will then create the resource and reply with a location header containing a URI pointing to the resource.
For RabbitMQ, we don’t need such a generic factory to create queues or exchanges.
To list bindings for a queue Q1, the spec suggests
```
GET /$management/Queues/Q1/$management/entities
```
which again shows the generic entities endpoint as well as a `$management` endpoint under Q1 to
allow a queue to return a discovery document.
For RabbitMQ, we don’t need such generic endpoints and discovery documents.
Given we aim for our own thin RabbitMQ AMQP 1.0 client wrapper libraries which expose
the RabbitMQ model to the developer, we can directly use fixed HTTP endpoint assumptions
in our RabbitMQ specific libraries.
This is by far simpler than using the dynamic endpoints of the management spec.
Simplicity leads to higher adoption and enables more developers to write RabbitMQ AMQP 1.0 client
library wrappers.
The AMQP Management extension also suffers from a deep level of nesting in paths.
Examples:
```
/$management/Queues/Q1/$management/entities
/$management/Queues/Q1/Bindings/Binding1
```
as well as verbs in path names: Section 7.1.4 suggests using verbs in path names,
for example “purge”, due to the dynamic operations discovery document.
### HTTP API v2
This commit introduces a new HTTP API v2 following best practices.
It could serve as a future API for plain HTTP clients.
This commit and RabbitMQ 4.0 will only implement a minimal set of
HTTP API v2 endpoints and only for HTTP over AMQP.
In other words, the existing HTTP API v1 Cowboy handlers will continue to be
used for all plain HTTP requests and will remain untouched in RabbitMQ 4.0.
Over time, after 4.0 has shipped, we could ship a pure HTTP implementation of HTTP API v2.
Hence, the new HTTP API v2 endpoints for HTTP over AMQP should be designed such that they
can be re-used in the future for a pure HTTP implementation.
The minimal set of endpoints for RabbitMQ 4.0 are:
```
GET / PUT / DELETE
/vhosts/:vhost/queues/:queue
```
read, create, delete a queue
```
DELETE
/vhosts/:vhost/queues/:queue/messages
```
purges a queue
```
GET / DELETE
/vhosts/:vhost/bindings/:binding
```
read, delete bindings
where `:binding` is a binding ID encoded as a path segment of the following form:
```
src=e1;dstq=q2;key=my-key;args=
```
The binding arguments component `args` is empty by default, i.e. there are no binding arguments.
If the binding includes binding arguments, `args` will be an Erlang portable term hash
provided by the server, similar to what’s provided in HTTP API v1 today.
Alternatively, we could use an arguments scheme of:
```
args=k1,utf8,v1&k2,uint,3
```
However, such a scheme leads to long URIs when there are many binding arguments.
Note that it’s perfectly fine for URI producing applications to include URI
reserved characters `=` / `;` / `,` / `$` in a path segment.
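A small sketch of how such a binding path segment could be decomposed (the helper is illustrative; only the segment layout is taken from the text above):
```erlang
%% Sketch: split "src=e1;dstq=q2;key=my-key;args=" into a map, e.g.
%% #{<<"src">> => <<"e1">>, <<"dstq">> => <<"q2">>,
%%   <<"key">> => <<"my-key">>, <<"args">> => <<>>}.
parse_binding_segment(Segment) ->
    maps:from_list(
      [case binary:split(Part, <<"=">>) of
           [K, V] -> {K, V};
           [K]    -> {K, <<>>}
       end
       || Part <- binary:split(Segment, <<";">>, [global])]).
```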
To create a binding, the client therefore needs to POST to a bindings factory URI:
```
POST
/vhosts/:vhost/bindings
```
To list all bindings between a source exchange e1 and destination exchange e2 with binding key k1:
```
GET
/vhosts/:vhost/bindings?src=e1&dste=e2&key=k1
```
This endpoint will be called by the RabbitMQ AMQP 1.0 client library to unbind a
binding with non-empty binding arguments to get the binding ID before invoking a
```
DELETE
/vhosts/:vhost/bindings/:binding
```
In future, after RabbitMQ 4.0 shipped, new API endpoints could be added.
The following is up for discussion and is only meant to show the clean and simple design of HTTP API v2.
Bindings endpoint can be queried as follows:
to list all bindings for a given source exchange e1:
```
GET
/vhosts/:vhost/bindings?src=e1
```
to list all bindings for a given destination queue q1:
```
GET
/vhosts/:vhost/bindings?dstq=q1
```
to list all bindings between a source exchange e1 and destination queue q1:
```
GET
/vhosts/:vhost/bindings?src=e1&dstq=q1
```
multiple bindings between source exchange e1 and destination queue q1 could be deleted at once as follows:
```
DELETE /vhosts/:vhost/bindings?src=e1&dstq=q1
```
GET could be supported globally across all vhosts:
```
/exchanges
/queues
/bindings
```
Publish a message:
```
POST
/vhosts/:vhost/queues/:queue/messages
```
Consume or peek a message (depending on query parameters):
```
GET
/vhosts/:vhost/queues/:queue/messages
```
Note that the AMQP 1.0 client omits the `/vhosts/:vhost` path prefix.
Since an AMQP connection belongs to a single vhost, there is no need to
additionally include the vhost in every HTTP request.
Pros of HTTP API v2:
1. Low level of nesting
Queues, exchanges, bindings are top level entities directly under vhosts.
Although the HTTP API doesn’t have to reflect how resources are stored in the database,
v2 does nicely reflect the Khepri tree structure.
2. Nouns instead of verbs
HTTP API v2 is very simple to read and understand as shown by
```
POST /vhosts/:vhost/queues/:queue/messages to post messages, i.e. publish to a queue.
GET /vhosts/:vhost/queues/:queue/messages to get messages, i.e. consume or peek from a queue.
DELETE /vhosts/:vhost/queues/:queue/messages to delete messages, i.e. purge a queue.
```
A separate new HTTP API v2 allows us to ship only handlers for HTTP over AMQP for RabbitMQ 4.0
and therefore move faster while still keeping the option on the table to re-use the new v2 API
for pure HTTP in the future.
In contrast, re-using the HTTP API v1 for HTTP over AMQP is possible, but dirty because separate handlers
(HTTP over AMQP and pure HTTP) replying differently will be needed for the same v1 endpoints.
[Why]
It looks like `exit(Spammer, normal)` doesn't terminate the process.
This leaves a dangling process around and seems to cause transient
failures in the `try_to_deadlock_in_registry_reload_1` testcase that
follows it.
[How]
We use `exit(Spammer, kill)` and a monitor to wait for the process to
actually terminate.
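A minimal sketch of the pattern (plain OTP primitives, not the exact test code):
```erlang
%% Kill the spammer and block until it has actually terminated.
kill_and_wait(Spammer) ->
    Ref = erlang:monitor(process, Spammer),
    exit(Spammer, kill),
    receive
        {'DOWN', Ref, process, Spammer, _Reason} -> ok
    after 30000 ->
        error(spammer_did_not_terminate)
    end.
```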
Without a feature flag it is possible to add a member on a newer node
with a Ra command format that the other nodes do not yet understand,
resulting in crashed nodes.
when message was published to a stream via the stream protocol.
Fixes the following test:
```
./mvnw test -Dtest=AmqpInteroperabilityTest#publishToStreamConsumeFromStreamQueue
```
for default and pre-declared exchanges to save copying
the #exchange{} record (i.e. save an ETS lookup call) on
every received message.
The default and pre-declared exchanges are protected from deletion and
modification. Exchange routing decorators are not used in tier 1 plugins,
nor in any open source tier 2 plugin.
What?
Protect receiving application from being overloaded with new messages
while still processing existing messages if the auto credit renewal
feature of the Erlang AMQP 1.0 client library is used.
This feature can therefore be thought of as a prefetch window equivalent
in AMQP 0.9.1 or MQTT 5.0 property Receive Maximum.
How?
The credit auto renewal feature in RabbitMQ 3.x was wrongly implemented.
This commit takes the same approach as done in the server:
The incoming_unsettled map is held in the link instead of in the session
to accurately and quickly determine the number of unsettled messages for
a receiving link.
The amqp10_client lib will grant more credits to the sender when the sum
of remaining link credits and number of unsettled deliveries falls below
the threshold RenewWhenBelow.
This avoids maintaining additional state like the `link_credit_unsettled`
or an alternative delivery_count_settled sequence number, which is more
complex to implement correctly.
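The renewal decision boils down to the following check (a sketch; `Credit` stands for the amount originally requested, and the function name is made up):
```erlang
%% Grant `Credit` new link credits when what the sender may still send
%% (remaining link credit) plus what the application has not yet settled
%% drops below the RenewWhenBelow threshold.
maybe_renew_credit(RemainingCredit, NumUnsettled, Credit, RenewWhenBelow)
  when RemainingCredit + NumUnsettled < RenewWhenBelow ->
    {grant, Credit};
maybe_renew_credit(_RemainingCredit, _NumUnsettled, _Credit, _RenewWhenBelow) ->
    no_change.
```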
This commit breaks the amqp10_client_session:disposition/6 API:
This commit forces the client application to only range settle for a
given link, i.e. not across multiple links on a given session at once.
The latter is allowed according to the AMQP spec.
What?
To not risk any regressions, keep the behaviour of RabbitMQ 3.x
where channel processes and connection helper processes such as
rabbit_queue_collector and rabbit_heartbeat are terminated after the
rabbit_reader process.
For example, when RabbitMQ terminates with SIGTERM, we want
exclusive queues to be deleted synchronously (as in 3.x).
Prior to this commit:
1. java -jar target/perf-test.jar -x 0 -y 1
2. ./sbin/rabbitmqctl stop_app
resulted in the following crash:
```
crasher:
initial call: rabbit_reader:init/2
pid: <0.2389.0>
registered_name: []
exception exit: {noproc,
{gen_server,call,[<0.2391.0>,delete_all,infinity]}}
in function gen_server:call/3 (gen_server.erl, line 419)
in call from rabbit_reader:close_connection/1 (rabbit_reader.erl, line 683)
in call from rabbit_reader:send_error_on_channel0_and_close/4 (rabbit_reader.erl, line 1668)
in call from rabbit_reader:handle_dependent_exit/3 (rabbit_reader.erl, line 710)
in call from rabbit_reader:mainloop/4 (rabbit_reader.erl, line 530)
in call from rabbit_reader:run/1 (rabbit_reader.erl, line 452)
in call from rabbit_reader:start_connection/4 (rabbit_reader.erl, line 351)
```
because rabbit_queue_collector was terminated before rabbit_reader.
This commit fixes this crash.
How?
Any Erlang supervisor, including the rabbit_connection_sup supervisor,
terminates its children in the reverse of their start order.
Since we want channel and queue collector processes - children of
rabbit_connection_helper_sup - to be terminated after the
reader process, we must start rabbit_connection_helper_sup before the
reader process.
Since rabbit_connection_sup - the ranch_protocol implementation - does
not know yet whether it will supervise an AMQP 0.9.1 or AMQP 1.0
connection, it creates a rabbit_connection_helper_sup for each AMQP protocol
version, removing the superfluous one as soon as the protocol version negotiation is
completed. Spawning and deleting this additional process has a negligible
effect on performance.
The underlying problem is that rabbit_connection_helper_sup is started with
different supervisor flags for AMQP 0.9.1 and AMQP 1.0,
because for Native AMQP 1.0 in 4.0 we remove the unnecessary
rabbit_amqp1_0_session_sup_sup supervisor level.
Therefore, we achieve our goal:
* in Native AMQP 1.0, 1 additional Erlang process is created per session
* in AMQP 1.0 in 3.x, 15 additional Erlang processes are created per session
## What
Similar to Native MQTT in #5895, this commit implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.
## Why
Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
scalability, and resource usage for AMQP 1.0.
See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
this commit allows implementing more AMQP 1.0 features in the future.
Some features are already implemented in this commit (see next section).
3. Simpler, easier to understand, and more maintainable code.
Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4., there is no longer a need for separate connections for publishers and
consumers, as we currently recommend for AMQP 0.9.1, which potentially
halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
slow target queue won't block the entire connection.
Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol
This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.
This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.
## Implementation details
1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.
2. Remove rabbit_queue_collector
rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.
3. 7 processes per connection + 1 process per session in this commit instead of
7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.
4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
a connection write heavily at the same time.
This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
can accumulate bytes across all sessions before flushing the socket.
In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.
5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.
How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.
6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.
7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However, it's suboptimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
and unsettled TRANSFERs.
8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.
How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
new credit_reply queue event:
* `available` after the queue sends any deliveries
* `link-credit` after the queue sends any deliveries
* `drain` which allows us to combine the old queue events
send_credit_reply and send_drained into a single new queue event
credit_reply.
* Include the consumer tag into the credit_reply queue event such that
the AMQP 1.0 session process can process any credit replies
asynchronously.
Link flow control state `delivery-count` also moves to the queue processes.
The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.
9. Use serial number arithmetic in quorum queues and session process.
10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.
11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.
12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).
13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.
14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2
15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.
16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.
17. Include the link handle in the correlation term when publishing messages to target queues
such that the session process can correlate confirms from target queues with
incoming links.
18. Only grant more credits to publishers if the publisher no longer has sufficient credits
and there are not too many unconfirmed messages on the link.
19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.
20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. Whatever the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.
21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.
22. If a cluster wide memory or disk alarm occurs:
Each session sends a FLOW with incoming-window set to 0 to the sending client.
If sending clients don’t obey, the client is forcibly disconnected.
If the cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to the initial incoming-window.
23. All operations apart from publishing TRANSFERs to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.
24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.
25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.
26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.
In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.
27. Requeue messages if link detached
Although the spec allows settling delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach, to prevent "in flight" deliveries from being requeued.
28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').
29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client
30. Fix AMQP client to do serial number arithmetic.
31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.
32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.
The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597
The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.
Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.
33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.
34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.
Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.
Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).
Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.
This approach is safe and simple.
The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:
i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.
ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be held simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?
35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.
36. Remove credit extension from AMQP 0.9.1 client
37. Support maintenance mode closing AMQP 1.0 connections.
38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.
39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
tools from failing when they execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```
40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```
## Benchmarks
### Throughput & Latency
Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1
Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```
Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.
Start client:
https://github.com/ssorj/quiver / https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```
1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s
Latencies by percentile:
0% ........ 0 ms 90.00% ........ 9 ms
25% ........ 2 ms 99.00% ....... 14 ms
50% ........ 4 ms 99.90% ....... 17 ms
100% ....... 26 ms 99.99% ....... 24 ms
```
RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 6 73.6 2.1 3,217 1,607 0 8.0 511
4.1 163,580 16,367 2 74.1 4.1 3,217 0 0 8.0 0
6.1 229,114 32,767 3 74.1 6.1 3,217 0 0 8.0 0
8.1 261,880 16,367 2 74.1 8.1 67,874 32,296 8 8.2 7,662
10.1 294,646 16,367 2 74.1 10.1 67,874 0 0 8.2 0
12.1 360,180 32,734 3 74.1 12.1 67,874 0 0 8.2 0
14.1 392,946 16,367 3 74.1 14.1 68,604 365 0 8.2 12,147
16.1 458,480 32,734 3 74.1 16.1 68,604 0 0 8.2 0
18.1 491,246 16,367 2 74.1 18.1 68,604 0 0 8.2 0
20.1 556,780 32,767 4 74.1 20.1 68,604 0 0 8.2 0
22.1 589,546 16,375 2 74.1 22.1 68,604 0 0 8.2 0
receiver timed out
24.1 622,312 16,367 2 74.1 24.1 68,604 0 0 8.2 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```
2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s
Latencies by percentile:
0% ....... 11 ms 90.00% ....... 23 ms
25% ....... 15 ms 99.00% ....... 28 ms
50% ....... 18 ms 99.90% ....... 33 ms
100% ....... 49 ms 99.99% ....... 47 ms
```
RabbitMQ 3.x:
```
---------------------- Sender ----------------------- --------------------- Receiver ---------------------- --------
Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Time [s] Count [m] Rate [m/s] CPU [%] RSS [M] Lat [ms]
----------------------------------------------------- ----------------------------------------------------- --------
2.1 130,814 65,342 9 69.9 2.1 18,430 9,206 5 7.6 1,221
4.1 163,580 16,375 5 70.2 4.1 18,867 218 0 7.6 2,168
6.1 229,114 32,767 6 70.2 6.1 18,867 0 0 7.6 0
8.1 294,648 32,734 7 70.2 8.1 18,867 0 0 7.6 0
10.1 360,182 32,734 6 70.2 10.1 18,867 0 0 7.6 0
12.1 425,716 32,767 6 70.2 12.1 18,867 0 0 7.6 0
receiver timed out
14.1 458,482 16,367 5 70.2 14.1 18,867 0 0 7.6 0
quiver: error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
_plano.wait(receiver, check=True)
File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```
3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```
RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```
### Memory usage
Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```
```
/bin/cat rabbitmq.conf
tcp_listen_options.sndbuf = 2048
tcp_listen_options.recbuf = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```
Create 50k connections with 2 sessions per connection, i.e. 100k session in total:
```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 50000; i++ {
		conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("dialing AMQP server:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}
```
This commit:
```
erlang:memory().
[{total,4586376480},
{processes,4025898504},
{processes_used,4025871040},
{system,560477976},
{atom,1048841},
{atom_used,1042841},
{binary,233228608},
{code,21449982},
{ets,108560464}]
erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs
RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
{processes,14044779256},
{processes_used,14044755120},
{system,1123453448},
{atom,1057033},
{atom_used,1052587},
{binary,236381264},
{code,21790238},
{ets,391423744}]
erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs
50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB
## Future work
1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
Fixes https://github.com/rabbitmq/rabbitmq-server/discussions/10620
Up to RabbitMQ 3.12:
* When an AMQP 0.9.1 publisher sends a message with P_basic.headers
unset, RabbitMQ will deliver an AMQP 0.9.1 message with
P_basic.headers unset.
* When an AMQP 0.9.1 publisher sends a message with P_basic.headers
being an empty list ([]), RabbitMQ will deliver an AMQP 0.9.1 message with
P_basic.headers being an empty list ([]).
In 3.13 including message containers, the 1st behaviour stayed the same
while the 2nd behaviour changed to:
* When an AMQP 0.9.1 publisher sends a message with P_basic.headers
being an empty list ([]), RabbitMQ will deliver an AMQP 0.9.1 message with
P_basic.headers unset.
This commit fixes this regression by using the same behaviour as in
3.12.
An AMQP boolean can be encoded using 1 byte or 2 bytes:
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-boolean
Prior to this commit, our Erlang parser returned:
* Erlang terms `true` or `false` for the 1 byte AMQP encoding
* Erlang terms `{boolean, true}` or `{boolean, false}` for the 2 byte AMQP encoding
Having a serializer and parser that perform the opposite actions such
that
```
Term = parse(serialize(Term))
```
is desirable as it provides a symmetric property useful not only for
property based testing, but also for avoiding altering message hashes
when serializing and parsing the same term.
However, dealing with `{boolean, boolean()}` tuples instead of `boolean()` is cumbersome since
all Erlang code must handle both forms, leading to subtle bugs such as those that
occurred in:
* 4cbeab8974/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl (L155-L158)
* b8173c9d3b/deps/rabbitmq_mqtt/src/mc_mqtt.erl (L83-L88)
* b8173c9d3b/deps/rabbit/src/mc_amqpl.erl (L123-L127)
Therefore, this commit takes the safe approach and always
parses to an Erlang `boolean()`, independent of whether the AMQP boolean
was encoded with 1 or 2 bytes.
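For illustration, a minimal sketch (not the actual parser code) of the unified behaviour, using the AMQP 1.0 type codes for both encodings:
```erlang
%% Sketch: both the 1-byte and the 2-byte AMQP boolean encodings parse to a
%% plain Erlang boolean(), keeping Term = parse(serialize(Term)) symmetric.
parse_boolean(<<16#41, Rest/binary>>) -> {true, Rest};          % 1-byte "true"
parse_boolean(<<16#42, Rest/binary>>) -> {false, Rest};         % 1-byte "false"
parse_boolean(<<16#56, 16#01, Rest/binary>>) -> {true, Rest};   % 2-byte boolean, value 0x01
parse_boolean(<<16#56, 16#00, Rest/binary>>) -> {false, Rest}.  % 2-byte boolean, value 0x00
```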
[Why]
The format was changed to be compatible with Khepri paths. However, this
ID is used in in-memory states here and there as well. So changing its
format makes upgrades complicated because the code has to handle both
the old and new formats possibly used by the mirrored supervisor already
running on other nodes.
[How]
Instead, this patch converts the ID (in its old format) to something
compatible with a Khepri path only when we need to build a Khepri path.
This relies on the fact that the `Group` is a module and we can call it
to let it convert the opaque ID to a Khepri path.
While here, improve the type specs to document that a group is always a
module name and to document what a child ID can be.
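A minimal sketch of that approach, assuming the `Group` module exposes a function (named `id_to_khepri_path/1` here purely for illustration) that performs the conversion on demand:
```erlang
%% Sketch: keep the old ID format everywhere and only derive a Khepri path
%% at the point where one is actually needed, by calling into the Group module.
khepri_path(Group, Id) when is_atom(Group) ->
    Group:id_to_khepri_path(Id).
```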
This configuration is not guaranteed to be safe to change after a stream has been
declared and thus we'll remove the ability to change it after the initial
declaration. Users should favour the x- queue arg for this config; it will still
be possible to configure it as a policy, but it will only be evaluated at
declaration time.
This means that if a policy is set for a stream that re-configures the
`stream-max-segment-size-bytes` key it will show in the UI as updated but
the pre-existing stream will not use the updated configuration.
The key has been removed from the UI but for backwards compatibility it is still
settable.
NB: this PR adds a new command `update_config` to the stream coordinator state
machine. Strictly speaking this should require a new machine version but we're
bypassing that by relying on the feature flag instead, which avoids this command
being committed before all nodes have the new code version. A new machine version
can lower the availability properties during a rolling cluster upgrade so in
this case it is preferable to avoid that given the simplicity of the change.
[Why]
Sometimes, `ra_leaderboard:lookup_leader/1` will return `undefined`
because it doesn't know the leader yet. This leads to a failure of the
testcase with a `badmatch` exception.
[How]
We wait for the function to return a valid leader ID, then try again and
return the result.
Various bug fixes to make stream coordinator membership changes
more reliable. Previously various errors could happen as well as
partially successful attempts where the membership change command
may fail but leave the new server running.
Also ensure that stream coordinator members are removed as part of
the forget_cluster_node command.
Add stream coordinator status command.
To show the raft status of the stream coordinator just like is done
for quorum queues.
The leader_locator_balanced_random_maintenance test effectively
uses a plain random approach, so we cannot assert that there
definitely would be leaders on both potential nodes, only that there
aren't any leaders on the node that is in maintenance mode.
Stream deletes aren't necessarily fully complete by the time the
queue.delete command returns as the stream coordinator will do this
work async. By using unique queue names we avoid the need to do
additional polling / waiting for the delete operation to be
fully completed.
This makes a command that renames cluster members
a no-op. This command is really complex under
the hood and is fundamentally incompatible
with a few key Raft-based features:
* Khepri
* Quorum queues
* Streams
Because Khepri first ships in RabbitMQ 3.13,
now is the time to effectively eliminate this
command.
It will be permanently removed together with
other deprecated CLI commands in 4.0.
Per discussion with the team.
Closes #10367.
Backends return 'never' or the timestamp of the expiry time
of the credentials. Only the OAuth2 backend returns a timestamp,
other RabbitMQ authz backends return 'never'.
Client code uses rabbit_access_control, so it now contains
a new expiry_timestamp/1 function that returns the earliest
expiry time of the underlying backends.
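A rough sketch of that aggregation (the argument shapes and backend call are assumptions, not the actual rabbit_access_control code):
```erlang
%% Sketch: ask every authz backend for its expiry timestamp and return the
%% earliest one; backends whose credentials never expire return 'never'.
expiry_timestamp(User, Backends) ->
    Timestamps = [Backend:expiry_timestamp(User) || Backend <- Backends],
    case [T || T <- Timestamps, T =/= never] of
        []    -> never;
        Times -> lists:min(Times)
    end.
```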
Fixes #10298
The per_message_ttl test would publish a message with a short TTL
and then assert on info counters. On a slow system it is possible
that the message expires before the test can observe the counter
change.
[Why]
Before `rabbitmq_prelaunch` was moved from `deps/rabbit/apps` to `deps`,
it would inherit compile flags from `deps/rabbit`. Therefore, when
`rabbit` was tested, `rabbit_logger_std_h` simply replaced the calls to
`io:put_chars/2` with an internal macro to also call `ct:log/2`.
This is not possible anymore after the move and the move broke the
console-based testcases.
[How]
`rabbit_logger_std_h` now uses an indirect internal call to a wrapper of
`io:put_chars/2`. This allows the `logging_SUITE` to mock that call and
add the additional call to `ct:log/2`.
We need to do an explicit `?MODULE:io_put_chars/2` even though a local
call would work, otherwise meck can't intercept the calls.
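A sketch of that indirection (the caller function `write/2` is illustrative; only `io_put_chars/2` and the `?MODULE` call are from this change):
```erlang
%% Sketch: the wrapper simply forwards to io:put_chars/2. Calling it as
%% ?MODULE:io_put_chars/2 forces a fully-qualified call, which meck can
%% intercept in logging_SUITE; a plain local call could not be mocked.
io_put_chars(Device, Chars) ->
    io:put_chars(Device, Chars).

write(Device, Chars) ->
    ?MODULE:io_put_chars(Device, Chars).
```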
* Reduce per message disk overhead
Message container annotation keys are stored on disk.
By shortening them we save 95 - 58 = 37 bytes per message.
```
1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})).
95
2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})).
58
```
This should somewhat reduce disk I/O and disk space.
* Ensure durable is a boolean
Prevent key 'durable' with value 'undefined' being added to the
mc annotations, for example when the durable field was not set, but
another AMQP 1.0 header field was set.
* Apply feedback
RAM nodes are a deprecated feature and the actual assertion is
quite a complicated one that isn't easy to reason about, as it
asserts on the cluster view of nodes that have their
rabbit app stopped.
This revisits the information system conversion,
that is, support for suffixes like GiB, GB.
When configuration values like disk_free_limit.absolute,
vm_memory_high_watermark.absolute are set, the value
can contain an information unit (IU) suffix.
We now support several new suffixes, and the meaning
of a few existing ones changes.
First, the changes:
* k, K now mean kilobytes and not kibibytes
* m, M now mean megabytes and not mebibytes
* g, G now mean gigabytes and not gibibytes
This is to match the system used by Kubernetes.
There is no consensus in the industry about how
"k", "m", "g", and similar single letter suffixes
should be treated. Previously it was a power of 2,
now a power of 10 to align with a very popular OSS
project that explicitly documents what suffixes it supports.
Now, the additions:
Finally, the node will now validate these suffixes
at boot time, so an unsupported value will cause
the node to stop with a rabbitmq.conf validation
error.
The message logged will look like this:
````
2024-01-15 22:11:17.829272-05:00 [error] <0.164.0> disk_free_limit.absolute invalid, supported formats: 500MB, 500MiB, 10GB, 10GiB, 2TB, 2TiB, 10000000000
2024-01-15 22:11:17.829376-05:00 [error] <0.164.0> Error preparing configuration in phase validation:
2024-01-15 22:11:17.829387-05:00 [error] <0.164.0> - disk_free_limit.absolute invalid, supported formats: 500MB, 500MiB, 10GB, 10GiB, 2TB, 2TiB, 10000000000
````
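A rough sketch of the resulting suffix interpretation (the mapping below is illustrative, not the actual configuration-schema code):
```erlang
%% Sketch: single-letter and "B" suffixes are decimal (powers of 10),
%% "iB" suffixes are binary (powers of 2).
multiplier("k")   -> 1000;
multiplier("K")   -> 1000;
multiplier("M")   -> 1000 * 1000;
multiplier("G")   -> 1000 * 1000 * 1000;
multiplier("MB")  -> 1000 * 1000;
multiplier("GB")  -> 1000 * 1000 * 1000;
multiplier("MiB") -> 1024 * 1024;
multiplier("GiB") -> 1024 * 1024 * 1024.
```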
Closes #10310
Scan queues, exchanges and bindings before attempting
to import anything on boot. If they miss the virtual
host field, fail early and log a sensible message.
[Why]
If a node joins the selected node but the selected node's DB layer is
not ready, it will fail and the whole peer discovery process will
restart (until the selected node is ready).
That's fine, but scary messages are logged for a situation that is not
really an actual error at this point.
[How]
While querying properties of all discovered nodes, we also check if the
DB layer is ready using `rabbit_db:is_init_finished/0`. We then use this
property to determine if we can try to join or if we should wait and
retry.
This avoids a join which we know will fail eventually, and thus error
messages.
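A hedged sketch of that check (the surrounding retry loop and the exact join call are simplified assumptions):
```erlang
%% Sketch: only attempt to join the selected node once its DB layer reports
%% that initialization has finished; otherwise signal the caller to retry.
maybe_join(SelectedNode) ->
    case rpc:call(SelectedNode, rabbit_db, is_init_finished, []) of
        true -> rabbit_db_cluster:join(SelectedNode, disc);
        _    -> {retry, db_layer_not_ready}
    end.
```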
[Why]
This work started as an effort to add peer discovery support to our
Khepri integration. Indeed, as part of the task to integrate Khepri, we
missed the fact that `rabbit_peer_discovery:maybe_create_cluster/1` was
called from the Mnesia-specific code only. Even though we knew about it
because we hit many issues caused by the fact that `join_cluster` and
peer discovery use different code paths to create a cluster.
To add support for Khepri, the first version of this patch was to move
the call to `rabbit_peer_discovery:maybe_create_cluster/1` from
`rabbit_db_cluster` instead of `rabbit_mnesia`. To achieve that, it made
sense to unify the code and simply call `rabbit_db_cluster:join/2`
instead of duplicating the work.
Unfortunately, doing so highlighted another issue: the way the node to
cluster with was selected. Indeed, it could cause situations where
multiple clusters are created instead of one, without resorting to
out-of-band counter-measures, like a 30-second delay added in the
Kubernetes operator (rabbitmq/cluster-operator#1156). This problem was
even more frequent when we tried to unify the code path and call
`join_cluster`.
After several iterations on the patch and even more discussions with the
team, we decided to rewrite the algorithm to make node selection more
robust and still use `rabbit_db_cluster:join/2` to create the cluster.
[How]
This commit is only about the rewrite of the algorithm. Calling peer
discovery from `rabbit_db_cluster` instead of `rabbit_mnesia` (and thus
making peer discovery work with Khepri) will be done in a follow-up
commit.
We wanted the new algorithm to fulfill the following properties:
1. `rabbit_peer_discovery` should provide the ability to re-trigger it
easily to re-evaluate the cluster. The new public API is
`rabbit_peer_discovery:sync_desired_cluster/0`.
2. The selection of the node to join should be designed in a way that
all nodes select the same one, regardless of the order in which they
become available. The adopted solution is to sort the list of
discovered nodes with the following criteria (in that order; see the
sketch after this list):
1. the size of the cluster a discovered node is part of; sorted from
bigger to smaller clusters
2. the start time of a discovered node; sorted from older to younger
nodes
3. the name of a discovered node; sorted alphabetically
The first node in that list will not join anyone and simply proceed
with its boot process. Other nodes will try to join the first node.
3. To reduce the chance of incorrectly having multiple standalone nodes
because the discovery backend returned only a single node, we want to
apply the following constraints to the list of nodes after it is
filtered and sorted (see property 2 above):
* The list must contain `node()` (i.e. the node running peer
discovery itself).
* If RabbitMQ's cluster size hint is greater than 1, the list
must have at least two nodes. The cluster size hint is the maximum
between the configured target cluster size hint and the number of
elements in the nodes list returned by the backend.
If one of the constraints is not met, the entire peer discovery
process is restarted after a delay.
4. The lock is acquired only to protect the actual join, not the
discovery step where the backend is queried to get the list of peers.
With the node selection described above, this will let the first node
to start without acquiring the lock.
5. The cluster membership views queried as part of the algorithm to sort
the list of nodes will be used to detect additional clusters or
standalone nodes that did not cluster correctly. These nodes will be
asked to re-evaluate peer discovery to increase the chance of forming
a single cluster.
6. After some delay, peer discovery will be re-evaluated to further
eliminate the chances of having multiple clusters instead of one.
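A minimal sketch of the sort described in property 2 above, assuming each discovered node is represented as `{NodeName, ClusterSize, StartTime}` (the tuple shape is an assumption):
```erlang
%% Sketch: larger clusters first, then older nodes, then alphabetical order,
%% so every node computes the same ordering regardless of start order.
sort_nodes(Nodes) ->
    lists:sort(
      fun({NameA, SizeA, StartA}, {NameB, SizeB, StartB}) ->
              {-SizeA, StartA, NameA} =< {-SizeB, StartB, NameB}
      end, Nodes).
```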
This commit covers properties from point 1 to point 4. Remaining
properties will be the scope of additional pull requests after this one
works.
If there is a failure at any point during discovery, filtering/sorting,
locking or joining, the entire process is restarted after a delay. This
is configured using the following parameters:
* cluster_formation.discovery_retry_limit
* cluster_formation.discovery_retry_interval
The default parameters were bumped to 30 retries with a delay of 1
second between each.
The locking retries/interval parameters are not used by the new
algorithm anymore.
There are extra minor changes that come with the rewrite:
* The configured backend is cached in a persistent term. The goal is to
make sure we use the same backend throughout the entire process and
when we call `maybe_unregister/0` even if the configuration changed
for whatever reason in between.
* `maybe_register/0` is called from `rabbit_db_cluster` instead of at
the end of a successful peer discovery process. `rabbit_db_cluster`
had to call `maybe_register/0` if the node was not virgin anyway. So
make it simpler and always call it in `rabbit_db_cluster` regardless
of the state of the node.
* `log_configured_backend/0` is gone. `maybe_init/0` can log the backend
directly. There is no need to explicitly call another function for
that.
* Messages are logged using `?LOG_*()` macros instead of the old
`rabbit_log` module.
[Why]
The testcase was broken as part of the work on Khepri (#7206): all nodes
were started, making it an equivalent of the `successful_discovery`
testcase.
[How]
We drop the first entry in the list of nodes given to
`rabbit_ct_broker_helpers`. This way, it won't be started at all while
still being listed in the classic config parameter.
If a quorum queue is declared whilst one or more selected nodes
are down, those nodes were not started with the correct config.
This change addresses that, as well as adding one more parameter to
the mutable config passed to `ra:restart_server/2`.
To refine conversion behaviour add additional tests
and ensure it matches the documentation.
mc: optionally capture source environment
And pass target environment to mc:convert
This allows environmental data and configuration to be captured and
used to modify and complete conversion logic whilst allowing conversion
code to remain pure and portable.
[Why]
Up until now, a user had to run the following three commands to expand a
cluster:
1. stop_app
2. join_cluster
3. start_app
Stopping and starting the `rabbit` application and taking care of the
underlying Mnesia application could be handled by `join_cluster`
directly.
[How]
After the call to `can_join/1` and before proceeding with the actual
join, the code remembers the state of `rabbit`, the Feature flags
controller and Mnesia.
After the join, it restarts whatever needs to be restarted. It does
so regardless of the success or failure of the join. One exception is
when the node switched from Mnesia to Khepri as part of that join. In
this case, Mnesia is left stopped.
[Why]
When a Khepri-based node joins a Mnesia-based cluster, it is reset and
switches back from Khepri to Mnesia. If there are Mnesia files left in
its data directory, Mnesia will restart with stale/incorrect data and
the operation will fail.
After a migration to Khepri, we need to make sure there are no stale
Mnesia files.
[How]
We use `rabbit_mnesia` to query the Mnesia files and delete them.
Currently these are not allowed for use with stream queues,
which is a bit too strict. Some client implementations will automatically
nack or reject messages that are pending when an application
requests to stop consuming. For streams, treating all message outcomes
the same makes as much sense as treating them differently.
Because both `add_member` and `grow` default to Membership status `promotable`,
new members will have to catch up before they are considered cluster members.
This can be overridden with either the `voter` or the (permanent) `non_voter` status.
The latter is useless without additional tooling, so it is kept undocumented.
- non-voters do not affect quorum size for election purposes
- `observer_cli` reports their status with lowercase 'f'
- `rabbitmq-queues check_if_node_is_quorum_critical` takes voter status into
account
[Why]
Mnesia is a very powerful and convenient tool for Erlang applications:
it is a persistent disc-based database, it handles replication across
multiple Erlang nodes and it is available out-of-the-box from the
Erlang/OTP distribution. RabbitMQ relies on Mnesia to manage all its
metadata:
* virtual hosts' properties
* internal users
* queue, exchange and binding declarations (not queues data)
* runtime parameters and policies
* ...
Unfortunately Mnesia makes it difficult to handle network partitions and,
as a consequence, the merge conflicts between Erlang nodes once the
network partition is resolved. RabbitMQ provides several partition
handling strategies but they are not bullet-proof. Users still hit
situations where it is a pain to repair a cluster following a network
partition.
[How]
@kjnilsson created Ra [1], a Raft consensus library that RabbitMQ
already uses successfully to implement quorum queues and streams for
instance. Those queues do not suffer from network partitions.
We created Khepri [2], a new persistent and replicated database engine
based on Ra and we want to use it in place of Mnesia in RabbitMQ to
solve the problems with network partitions.
This patch integrates Khepri as an experimental feature. When enabled,
RabbitMQ will store all its metadata in Khepri instead of Mnesia.
This change comes with behavior changes. While Khepri remains disabled,
you should see no changes to the behavior of RabbitMQ. If there are
changes, it is a bug. After Khepri is enabled, there are significant
changes of behavior that you should be aware of.
Because it is based on the Raft consensus algorithm, when there is a
network partition, only the cluster members that are in a partition
with a majority of nodes, i.e. at least `floor(Number of nodes in the cluster ÷ 2) + 1` nodes,
can "make progress". In other words, only those nodes may write to the
Khepri database and read from the database and expect a consistent
result.
For instance in a cluster of 5 RabbitMQ nodes:
* If there are two partitions, one with 3 nodes, one with 2 nodes, only
the group of 3 nodes will be able to write to the database.
* If there are three partitions, two with 2 nodes, one with 1 node, none
of the groups can write to the database.
Because the Khepri database will be used for all kinds of metadata, it
means that RabbitMQ nodes that can't write to the database will be
unable to perform some operations. A list of operations and what to
expect is documented in the associated pull request and the RabbitMQ
website.
This requirement from Raft also affects the startup of RabbitMQ nodes in
a cluster. Indeed, at least a quorum number of nodes must be started at
once to allow nodes to become ready.
To enable Khepri, you need to enable the `khepri_db` feature flag:
rabbitmqctl enable_feature_flag khepri_db
When the `khepri_db` feature flag is enabled, the migration code
performs the following two tasks:
1. It synchronizes the Khepri cluster membership from the Mnesia
cluster. It uses `mnesia_to_khepri:sync_cluster_membership/1` from
the `khepri_mnesia_migration` application [3].
2. It copies data from relevant Mnesia tables to Khepri, doing some
conversion if necessary on the way. Again, it uses
`mnesia_to_khepri:copy_tables/4` from `khepri_mnesia_migration` to do
it.
This can be performed on a running standalone RabbitMQ node or cluster.
Data will be migrated from Mnesia to Khepri without any service
interruption. Note that during the migration, the performance may
decrease and the memory footprint may go up.
Because this feature flag is considered experimental, it is not enabled
by default even on a brand new RabbitMQ deployment.
More about the implementation details below:
In the past months, all accesses to Mnesia were isolated in a collection
of `rabbit_db*` modules. This is where the integration of Khepri mostly
takes place: we use a function called `rabbit_khepri:handle_fallback/1`
which selects the database and performs the query or the transaction.
Here is an example from `rabbit_db_vhost`:
* Up until RabbitMQ 3.12.x:
get(VHostName) when is_binary(VHostName) ->
get_in_mnesia(VHostName).
* Starting with RabbitMQ 3.13.0:
get(VHostName) when is_binary(VHostName) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> get_in_mnesia(VHostName) end,
khepri => fun() -> get_in_khepri(VHostName) end}).
This `rabbit_khepri:handle_fallback/1` function relies on two things:
1. the fact that the `khepri_db` feature flag is enabled, in which case
it always executes the Khepri-based variant.
2. the ability (or not) to read and write to Mnesia tables otherwise.
Before the feature flag is enabled, or during the migration, the
function will try to execute the Mnesia-based variant. If it succeeds,
then it returns the result. If it fails because one or more Mnesia
tables can't be used, it restarts from scratch: it means the feature
flag is being enabled and depending on the outcome, either the
Mnesia-based variant will succeed (the feature flag couldn't be enabled)
or the feature flag will be marked as enabled and it will call the
Khepri-based variant. The meat of this function really lives in the
`khepri_mnesia_migration` application [3] and
`rabbit_khepri:handle_fallback/1` is a wrapper on top of it that knows
about the feature flag.
However, some calls to the database do not depend on the existence of
Mnesia tables, such as functions where we need to learn about the
members of a cluster. For those, we can't rely on exceptions from
Mnesia. Therefore, we just look at the state of the feature flag to
determine which database to use. There are two situations though:
* Sometimes, we need the feature flag state query to block because the
function interested in it can't return a valid answer during the
migration. Here is an example:
case rabbit_khepri:is_enabled(RemoteNode) of
true -> can_join_using_khepri(RemoteNode);
false -> can_join_using_mnesia(RemoteNode)
end
* Sometimes, we need the feature flag state query to NOT block (for
instance because it would cause a deadlock). Here is an example:
case rabbit_khepri:get_feature_state() of
enabled -> members_using_khepri();
_ -> members_using_mnesia()
end
Direct accesses to Mnesia still exists. They are limited to code that is
specific to Mnesia such as classic queue mirroring or network partitions
handling strategies.
Now, to discover the Mnesia tables to migrate and how to migrate them,
we use an Erlang module attribute called
`rabbit_mnesia_tables_to_khepri_db` which indicates a list of Mnesia
tables and an associated converter module. Here is an example in the
`rabbitmq_recent_history_exchange` plugin:
-rabbit_mnesia_tables_to_khepri_db(
[{?RH_TABLE, rabbit_db_rh_exchange_m2k_converter}]).
The converter module, `rabbit_db_rh_exchange_m2k_converter` in this
example, is in fact a "sub" converter module called by
`rabbit_db_m2k_converter`. See the documentation of a `mnesia_to_khepri`
converter module to learn more about these modules.
[1] https://github.com/rabbitmq/ra
[2] https://github.com/rabbitmq/khepri
[3] https://github.com/rabbitmq/khepri_mnesia_migration
See #7206.
Co-authored-by: Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>
Co-authored-by: Diana Parra Corbacho <dparracorbac@vmware.com>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
This includes a new ra:key_metrics/1 API that is more available
than parsing the output of sys:get_status/1.
The rabbit_quorum_queue:status/1 function has been ported to use
this API instead and now includes a few new fields.
It was named by copying and pasting an adjacent
one that indeed had to do with queue type-specific
policies, but "version-specific" policies are not
something RabbitMQ supports.
References #9547, #9541
Only valid policies are effectively applied on each queue type,
but they need to be added to 'unsupported-capabilities' to be
excluded from the queue info.
[Why]
The CLI is only compatible with the version of RabbitMQ it is shipped
with. It does not pretend to be backward- or forward-compatible with
other versions.
Unfortunately, `rabbit_control_helper` always uses the CLI's module from
the first RabbitMQ node and is executed against any node in a testcase.
This may break for the reason described above.
[How]
There is no reason to fix `rabbit_control_helper`, we just need to
switch to the initial way of using the CLI,
`rabbit_ct_broker_helper:rabbitmqctl()`. This one was already fixed to
use the appropriate copy of the CLI.
This patch only fixes `clustering_management_SUITE` and
`rabbitmq_4_0_deprecations_SUITE`. The former because it broke because
of this, the latter as low-hanging fruit.
Following up on failures detected by Java project test
suites after the merge of the message container PR.
These tests are ported to Erlang in the broker test suite.
Fixes #9371
Since each AMQP 1.0 connection opens several direct AMQP connections, we
must assign each direct connection a unique name to prevent multiple
entries in the `connection_created_stats` table.
Also, use `pg_local` to track AMQP 1.0 connections instead of walking
the supervisor trees.
Nuke authz_backends from connection created event 💥
Fix regex for connection name because UniqueId is part of it now (channel number)
* Translate AMQP 0.9.1 CC headers to AMQP 1.0 x-cc
Translate AMQP 0.9.1 CC headers to AMQP 1.0 x-cc message annotations.
We want CC headers to be kept an AMQP legacy feature and therefore
special case its conversion to AMQP 1.0.
* Translate x-cc from 1.0 message annotation to 091 CC header
for the case where you publish via 091 with CC to a stream and consume
via 091 from a stream in which case the 091 consuming client would like
to know the original CC headers.
* AMQP encoded bodies should be converted to amqp correctly
Fix for AMQP encoded amqpl payloads.
Also removing some headers added during amqpl->amqpl conversions that
duplicate information in the amqp header.
* we should not need to prepare for read to set annotations
* fix tagged_prop() type spec
* tagged_prop() -> tagged_value()
As the unit_access_control_SUITE topic test is the only testcase
that covers topic routing, it makes sense to extract it and run
it as a standalone test suite. It eases the development and testing
of topic routing features.
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.
Currently all non-AMQP 0.9.1 originating messages are converted into an AMQP 0.9.1 flavoured basic_message record before being sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1, but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.
This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating from one protocol is consumed by another protocol we convert it to the target protocol at that point.
The message container also contains annotations, dead letter records and other metadata we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.
This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.
COMMIT HISTORY:
* Refactor away from using the delivery{} record
In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.
Add mc module and move direct replies outside of exchange
Lots of changes incl classic queues
Implement stream support incl amqp conversions
simplify mc state record
move mc.erl
mc dlx stuff
recent history exchange
Make tracking work
But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.
Tracing as a whole may want a bit of a re-vamp at some point.
tidy
make quorum queue peek work by legacy conversion
dead lettering fixes
dead lettering fixes
CMQ fixes
rabbit_trace type fixes
fixes
fix
Fix classic queue props
test assertion fix
feature flag and backwards compat
Enable message_container feature flag in some SUITEs
Dialyzer fixes
fixes
fix
test fixes
Various
Manually update a gazelle generated file
until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185
Add message_containers_SUITE to bazel
and regen bazel files with gazelle from rules_erlang@main
Simplify essential property access
Such as durable, ttl and priority by extracting them into annotations
at message container init time.
Move type
to remove dependency on amqp10 stuff in mc.erl
mostly because I don't know how to make bazel do the right thing
add more stuff
Refine routing header stuff
wip
Cosmetics
Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.
* Dedup death queue names
* Fix function clause crashes
Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)
Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.
* Fix is_utf8_no_null crash
Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```
* Implement mqtt mc behaviour
For now via amqp translation.
This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```
* Shorten mc file names
Module name length matters because for each persistent message the #mc{}
record is persisted to disk.
```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```
This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```
* mc: make deaths an annotation + fixes
* Fix mc_mqtt protocol_state callback
* Fix test will_delay_node_restart
```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```
* Bazel run gazelle
* mix format rabbitmqctl.ex
* Ensure ttl annotation is reflected in amqp legacy protocol state
* Fix id access in message store
* Fix rabbit_message_interceptor_SUITE
* dialyzer fixes
* Fix rabbit:rabbit_message_interceptor_SUITE-mixed
set_annotation/3 should not result in duplicate keys
* Fix MQTT shared_SUITE-mixed
Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.
The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.
* Field content of 'v1_0.data' can be binary
Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
--test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
-t- --test_sharding_strategy=disabled
```
* Remove route/2 and implement route/3 for all exchange types.
This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.
stream filtering fixes
* Translate directly from MQTT to AMQP 0.9.1
* handle undecoded properties in mc_compat
amqpl: put clause in right order
recover death details from amqp data
* Replace callback init_amqp with convert_from
* Fix return value of lists:keyfind/3
* Translate directly from AMQP 0.9.1 to MQTT
* Fix MQTT payload size
MQTT payload can be a list when converted from AMQP 0.9.1 for example
First conversions tests
Plus some other conversion related fixes.
bazel
bazel
translate amqp 1.0 null to undefined
mc: property/2 and correlation_id/message_id return type tagged values.
To ensure we can support a variety of types better.
The type tags are AMQP 1.0 flavoured.
fix death recovery
mc_mqtt: impl new api
Add callbacks to allow protocols to compact data before storage
And make readable if needing to query things repeatedly.
bazel fix
* more decoding
* tracking mixed versions compat
* mc: flip default of `durable` annotation to save some data.
Assuming most messages are durable and that in-memory messages suffer less
from persistence overhead, it makes sense for a non-existent `durable`
annotation to mean durable=true.
* mc conversion tests and tidy up
* mc make x_header unstrict again
* amqpl: death record fixes
* bazel
* amqp -> amqpl conversion test
* Fix crash in mc_amqp:size/1
Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.
* Fix crash in lists:flatten/1
Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.
* Fix crash in rabbit_writer
Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
initial call: rabbit_writer:enter_mainloop/2
pid: <0.711.0>
registered_name: []
exception error: bad argument
in function size/1
called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
*** argument 1: not tuple or binary
in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.
This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.
* Add accidentally deleted line back
* mc: optimise mc_amqp internal format
By removing the outer records for message and delivery annotations
as well as application properties and footers.
* mc: optimise mc_amqp map_add by using upsert
* mc: refactoring and bug fixes
* mc_SUITE routingheader assertions
* mc remove serialize/1 callback as only used by amqp
* mc_amqp: avoid returning a nested list from protocol_state
* test and bug fix
* move infer_type to mc_util
* mc fixes and additional assertions
* Support headers exchange routing for MQTT messages
When a headers exchange is bound to the MQTT topic exchange, routing
will be performed based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).
This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.
When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.
* Fix crash when sending from stream to amqpl
When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
initial call: rabbit_channel:init/1
pid: <0.818.0>
registered_name: []
exception exit: {{badmatch,undefined},
[{rabbit_channel,handle_deliver0,4,
[{file,"rabbit_channel.erl"},
{line,2728}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
{rabbit_channel,handle_cast,2,
[{file,"rabbit_channel.erl"},
{line,728}]},
{gen_server2,handle_msg,2,
[{file,"gen_server2.erl"},{line,1056}]},
{proc_lib,wake_up,3,
[{file,"proc_lib.erl"},{line,251}]}]}
```
This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.
* Support consistent hash exchange routing for MQTT 5.0
When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.
* Convert MQTT 5.0 User Property
* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations
* Make use of Annotations in mc_mqtt:protocol_state/2
mc_mqtt:protocol_state/2 takes Annotations as a parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.
* Enforce AMQP 0.9.1 field name length limit
The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.
* Fix type specs
Apply feedback from Michael Davis
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* Add mc_mqtt unit test suite
Implement mc_mqtt:x_header/2
* Translate indicator that payload is UTF-8 encoded
when converting between MQTT 5.0 and AMQP 1.0
* Translate single amqp-value section from AMQP 1.0 to MQTT
Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.
If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indicate the encoding via Content-Type
message/vnd.rabbitmq.amqp.
This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.
* Fix payload conversion
* Translate Response Topic between MQTT and AMQP
Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.
The Response Topic must be a UTF-8 encoded string.
This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/" RK Publish to amq.topic with routing key RK
"/exchange/" X "/" RK Publish to exchange X with routing key RK
```
By default, the MQTT topic exchange is configured to be amq.topic using
the 1st target address.
When an operator modifies the mqtt.exchange, the 2nd target address is
used.
* Apply PR feedback
and fix formatting
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* tidy up
* Add MQTT message_containers test
* consistent hash exchange: avoid amqp legacy conversion
When hashing on a header value.
* Avoid converting to amqp legacy when using exchange federation
* Fix test flake
* test and dialyzer fixes
* dialyzer fix
* Add MQTT protocol interoperability tests
Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams
* Regenerate portions of deps/rabbit/app.bzl with gazelle
I'm not exactly sure how this happened, but gazelle seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.
* mc: refactoring
* mc_amqpl: handle delivery annotations
Just in case they are included.
Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.
---------
Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
[Why]
The `enable` callback is executed on each node of the cluster. It could
succeed on some of them and fail on other nodes. If it succeeds
everywhere, the controller could still fail to mark the feature flag as
enabled on some of the nodes.
When this happens, we correctly mark the feature flag back as disabled
everywhere. However, the controller never gave a chance to the feature
flag callbacks to roll back anything.
[How]
Now, the controller always runs the `post_enable` callback (if any)
after it ran the `enable` callback. It adds the following field to the
passed map of arguments to indicate if the feature flag was enabled or
not:
#{enabled => boolean()}
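A hypothetical `post_enable` callback reacting to that flag (only the `enabled` key comes from this change; the callback body is illustrative):
```erlang
%% Sketch: if the feature flag could not be marked as enabled everywhere,
%% the callback gets a chance to roll back whatever `enable` did.
post_enable(#{enabled := true}) ->
    ok;
post_enable(#{enabled := false}) ->
    rollback_partial_changes().

%% Illustrative placeholder for whatever the `enable` callback needs undone.
rollback_partial_changes() ->
    ok.
```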
While here, fix two things:
1. One call to `restore_feature_flag_state()` was passed an older
"version" of the inventory, instead of the latest modified one.
2. One log message had no domain set.
[Why]
The feature flags controller ensures all nodes in a cluster are running
before a feature flag can be enabled. It continues to do so whenever it
wants to record a state change because it requires that all nodes get
the new state, otherwise the task is aborted.
However, it's difficult to verify that throughout the entire process if
the feature flag has an `enable` callback. But again, if we lose a node
during the execution of the callback or between its execution and the
time we mark the feature flag as enabled on all nodes, that's ok because
the feature flag will be marked as disabled everywhere: the remaining
running nodes will go back from `state_changing` to `false` and the
stopped nodes will keep their initial state of `false`.
Nonetheless, we can increase the chance of letting an `enable` operation
finish if the controller waits for anything in-flight before it
actually exits.
[How]
The `terminate/3` function now tries to register globally, as if the
controller wanted to lock the cluster and run a task. If it succeeds in
registering, it means nothing is running in parallel and it can exit. If it
fails, it waits for the globally-registered controller to finish and
tries to register again.
We expose a new `wait_for_task_and_stop/0` function to explicitly stop
the feature flags controller and call it from the `rabbit` application
pre-stop phase. The reason is that when the supervisor asks the
controller to stop as part of the regular shutdown of a supervision
tree, it has a timeout and could kill the controller if an in-flight
operation takes too much time. To avoid this kill, we prefer to use
`wait_for_task_and_stop/0` which has no timeout.
When subscribing using a consumer tag that is already in the quorum
queue's state (but perhaps with a cancelled status) and that has
pending messages, the next_msg_id which is used to initialise the
queue type consumer state did not take the in-flight message ids into
account. This resulted in some messages occasionally not being delivered
to the client and thus appearing stuck, awaiting acknowledgement
by the consumer.
When a new checkout operation detects there are in-flight messages,
we set the last_msg_id to `undefined` and just accept the next message
that arrives, irrespective of its message id. This isn't 100% foolproof,
as there may be cases where messages are lost between queue and channel
where we'd fail to trigger the fallback query for missing messages.
It is however much better than what we have at the moment.
NB: really the ideal solution would be to make checkout operations
async so that any inflight messages are delivered before the checkout
result. That is a much bigger change for another day.
[Why]
They are deprecated. Currently, we simply get a warning in the logs, but
in a few minor versions, the testcase will start to fail because it
may not be able to declare a queue.
[Why]
We were running the check to make sure the exchange was declared, but we
didn't verify the result of that check. The testcase would still fail
later but if we verify its existence early, the testcase can fail early
too.
since MQTT 5.0 supports negative acknowledgements thanks to reason codes
in the PUBACK packet.
We could choose either reason code 128 or 131. The description for
131 applies to rejected messages, hence this commit uses 131:
> The PUBLISH is valid but the receiver is not willing to accept it.
[Why]
The testcase used to set the `cluster_formation` proplist twice. It is
very ambiguous what we should do: is only one of them relevant or should
they be merged?
[How]
We merge both proplists into a single one.
[Why]
The previous detection was based on a reuse of the channel to get the
error from an exit exception. The problem is that it is very dependent
on the timing: if the channel process exits before it is reused, the
test fails for two possible reasons:
1. The channel and connection processes exit before they are reused and
the channel manager opens a new pair. The problem is that the declare
succeeds but the test expected a failure.
2. The channel and connection processes exit during the reuse and
`rabbit_ct_client_helpers:open_channel` in
`retry_if_coordinator_unavailable()` waits for a response from the
channel manager forever (this is probably a weakness of the channel
manager in rabbitmq_ct_client_helpers). This indefinite wait causes
the testcase to timeout.
[How]
A simpler solution is to monitor the exit reason of the channel process
that triggers the error on the server side.
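A sketch of that approach, assuming the Erlang AMQP client's channel process exits with a `{shutdown, {server_initiated_close, Code, Text}}` reason (helper name and timeout are illustrative):
```erlang
%% Sketch: monitor the channel and match the expected error code in its exit
%% reason instead of reusing the channel and racing against its exit.
expect_channel_error(Ch, ExpectedCode) ->
    MRef = erlang:monitor(process, Ch),
    receive
        {'DOWN', MRef, process, Ch,
         {shutdown, {server_initiated_close, ExpectedCode, _Text}}} ->
            ok
    after 30000 ->
            erlang:demonitor(MRef, [flush]),
            {error, timeout}
    end.
```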
[Why]
We don't record the state of deprecated features because it is
controlled from configuration and they can be disabled (the deprecated
feature can be turned back on) if the deprecated feature allows it.
However, some feature flags may depend on deprecated features. If those
feature flags are enabled, we need to enable the deprecated features
(turn off the deprecated features) they depend on regardless of the
configuration.
[How]
During the (re)initialization of the registry, we go through all enabled
feature flags and deprecated features' `depends_on` declarations and
consider all their dependencies to be implicitly enabled.
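A simplified sketch of that dependency expansion (data shapes and function names are assumptions):
```erlang
%% Sketch: walk the depends_on declarations of all enabled feature flags and
%% deprecated features, collecting the transitive closure of dependencies so
%% they are considered implicitly enabled during registry initialization.
implicitly_enabled(Enabled, DependsOn) ->
    expand(Enabled, DependsOn, sets:from_list(Enabled)).

expand([], _DependsOn, Acc) ->
    sets:to_list(Acc);
expand([Name | Rest], DependsOn, Acc) ->
    Deps = maps:get(Name, DependsOn, []),
    New = [D || D <- Deps, not sets:is_element(D, Acc)],
    expand(New ++ Rest, DependsOn, sets:union(Acc, sets:from_list(New))).
```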
[Why]
A database reset removes the enabled feature flags file on disc. A reset
of the registry ensures that the next time the registry is reloaded, it
is also initialized from scratch.
[How]
We call `rabbit_feature_flags:reset_registry/0` after both a regular
reset and a forced reset.
The `reset_registry/0` is also exposed by the `rabbit_feature_flags`
module now. The actual implementation in `rabbit_ff_registry_factory`
should only be called by the Feature flags subsystem itself.
[Why]
Testcases fail with various system errors in CI, like the inability to
spawn system processes or to open a TCP port.
[How]
We check if the `$RABBITMQ_RUN` environment variable is set. It is only
set by Bazel and not make(1). Based on that, we compute the test group
options to include `parallel` or not.
[Why]
The CLI may be used against a remote node running a different version.
We took that into account in several uses of the `rabbit_db*` modules on
remote nodes, but not everywhere. Likewise in the
`clustering_management_SUITE` testsuite.
[How]
This patch falls back to previous `rabbit_mnesia`-based calls if the
initial call throws an `undef` exception.
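A sketch of that fallback, assuming the call goes over RPC and an older remote node reports the missing module as an `undef` error (the specific functions shown are illustrative):
```erlang
%% Sketch: prefer the newer rabbit_db_cluster API, fall back to the
%% rabbit_mnesia-based call when the remote node does not have it.
members(Node) ->
    case rpc:call(Node, rabbit_db_cluster, members, []) of
        {badrpc, {'EXIT', {undef, _}}} ->
            rpc:call(Node, rabbit_mnesia, cluster_nodes, [all]);
        Members ->
            Members
    end.
```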
If the target for at-least-once dead lettering included the source queue,
the dead letter outbound queue in the quorum queue would never be cleared.
This changes the queue -> dead letter worker message format to better distinguish
between those and queue events for "normal" queue type interactions.
[Why]
We want the code to depend less on Mnesia (and not at all in the
future). We also want to make room to introduce the use of Khepri.
[How]
For now, we simply store each list in a variable. This gives them a name
to better understand what each one is.
`rabbit_mnesia:cluster_nodes(all)` is also replaced by
`rabbit_db_cluster:members()`. The other two calls to `rabbit_mnesia`
are left alone as they are quite specific to Mnesia.
[Why]
Peer discovery is not Mnesia-specific and will be used once we introduce
Khepri.
[How]
The whole peer discovery driving code is moved from `rabbit_mnesia` to
`rabbit_peer_discovery`. When `rabbit_mnesia` calls that code, it simply
passes a callback for the Mnesia-specific cluster expansion code.
[Why]
Now that feature flags compatibility is tested first, before
Mnesia-specific checks, when a peer is not started yet, the feature
flags check lasts the entire timeout, so one minute. This retry
mechanism was added to feature flags in #8411.
Thus, instead of 20 seconds, the testcase takes 10 minutes now (10
retries of one minute each).
[Why]
Transient queues are queues that are removed upon node restart. An
application developer can't rely on such an arbitrary event to reason
about a queue's lifetime.
The only exception are exclusive transient queues which have a lifetime
linked to that of a client connection.
[How]
Non-exclusive transient queues are marked as deprecated in the code
using the Deprecated features subsystem (based on feature flags). See
pull request #7390 for a description of that subsystem.
To test RabbitMQ behavior as if the feature was removed, the following
configuration setting can be used:
deprecated_features.permit.transient_nonexcl_queues = false
Non-exclusive transient queues can be turned off anytime, there are no
conditions to do that.
Once non-exclusive transient queues are turned off, declaring a new
queue with those arguments will be rejected with a protocol error.
Note that given the marketing calendar, the deprecated feature will go
directly from "permitted by default" to "removed" in RabbitMQ 4.0. It
won't go through the gradual deprecation process.
[Why]
Classic queue mirroring will be removed in RabbitMQ 4.0. Quorum queues
provide a better safer alternative. Non-replicated classic queues remain
supported.
[How]
Classic queue mirroring is marked as deprecated in the code using the
Deprecated features subsystem (based on feature flags). See #7390 for a
description of that subsystem.
To test RabbitMQ behavior as if the feature was removed, the following
configuration setting can be used:
deprecated_features.permit.classic_queue_mirroring = false
To turn off classic queue mirroring, there must be no classic mirrored
queues declared and no HA policy defined. A node with classic mirrored
queues will refuse to start if classic queue mirroring is turned off.
Once classic queue mirroring is turned off, users will not be able to
declare HA policies. Trying to do that from the CLI or the management
API will be rejected with a warning in the logs. This impacts clustering
too: a node with classic queue mirroring turned off will only cluster
with another node which has no HA policy or has classic queue mirroring
turned off.
Note that given the marketing calendar, the deprecated feature will go
directly from "permitted by default" to "removed" in RabbitMQ 4.0. It
won't go through the gradual deprecation process.
V2: Renamed the deprecated feature from `classic_mirrored_queues` to
`classic_queue_mirroring` to better reflect the intention. Otherwise
it could be unclear if only the mirroring property is
deprecated/removed or classic queues entirely.
[Why]
RAM nodes provide no safety at all and they have lost their appeal with
recent fast storage solutions.
[How]
RAM nodes are marked as deprecated in the code using the Deprecated
features subsystem (based on feature flags). See pull request #7390 for
a description of that subsystem.
To test RabbitMQ behavior as if the feature was removed, the following
configuration setting can be used:
deprecated_features.permit.ram_node_type = false
RAM nodes can be turned off anytime, there are no conditions to do that.
Once RAM nodes are turned off, an existing node previously created as a
RAM node will change itself to a disc node during boot. If a new node is
added to the cluster using peer discovery or the CLI, it will be as a
disc node and a warning will be logged if the requested node type is
RAM. The `change_cluster_node_type` CLI command will reject a change to
a RAM node with an error.
Note that given the marketing calendar, the deprecated feature will go
directly from "permitted by default" to "removed" in RabbitMQ 4.0. It
won't go through the gradual deprecation process.
[Why]
Global QoS, where a single shared prefetch is used for an entire
channel, is not recommended practice. Per-consumer QoS (non-global)
should be set instead.
[How]
The global QoS setting is marked as deprecated in the code using the
Deprecated features subsystem (based on feature flags). See #7390 for a
description of that subsystem.
To test RabbitMQ behavior as if the feature was removed, the following
configuration setting can be used:
deprecated_features.permit.global_qos = false
Global QoS can be turned off anytime, there are no conditions to do
that.
Once global QoS is turned off, the prefetch setting will always be
considered as non-global (i.e. per-consumer). A warning message will be
logged if the default prefetch setting enables global QoS or anytime a
client requests a global QoS on the channel.
Note that given the marketing calendar, the deprecated feature will go
directly from "permitted by default" to "removed" in RabbitMQ 4.0. It
won't go through the gradual deprecation process.
Hashing the #resource{} record is expensive.
Routing to 40k queues via the topic exchange takes:
~150ms prior to this commit
~100ms after this commit
As rabbit_exchange already deduplicates destination queues and binding
keys, there's no need to use maps in rabbit_db_topic_exchange or
rabbit_exchange_type_topic.
For MQTT 5.0 destination queues, the topic exchange does not only have
to return the destination queue names, but also the matched binding
keys.
This is needed to implement MQTT 5.0 subscription options No Local,
Retain As Published and Subscription Identifiers.
Prior to this commit, as the trie was walked down, we remembered the
edges being walked and assembled the final binding key with
list_to_binary/1.
list_to_binary/1 is very expensive with long lists (long topic names),
even in OTP 26.
The CPU flame graph showed ~3% of CPU usage was spent only in
list_to_binary/1.
Unfortunately and unnecessarily, the current topic exchange
implementation stores topic levels as lists.
It would be better to store topic levels as binaries:
split_topic_key/1 should ideally use binary:split/3 similar as follows:
```
1> P = binary:compile_pattern(<<".">>).
{bm,#Ref<0.1273071188.1488322568.63736>}
2> Bin = <<"aaa.bbb..ccc">>.
<<"aaa.bbb..ccc">>
3> binary:split(Bin, P, [global]).
[<<"aaa">>,<<"bbb">>,<<>>,<<"ccc">>]
```
The compiled pattern could be placed into persistent term.
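A sketch of caching the compiled pattern in a persistent term (the key and function names are illustrative):
```erlang
%% Sketch: compile the "." pattern once, cache it in a persistent term, and
%% reuse it for every binary:split/3 call.
split_topic(Key) ->
    Pattern =
        case persistent_term:get(topic_split_pattern, undefined) of
            undefined ->
                P = binary:compile_pattern(<<".">>),
                persistent_term:put(topic_split_pattern, P),
                P;
            P0 ->
                P0
        end,
    binary:split(Key, Pattern, [global]).
```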
This commit decided to avoid migrating Mnesia tables to use binaries
instead of lists. Mnesia migrations are non-trivial, especially with the
current feature flag subsystem.
Furthermore the Mnesia topic tables are already getting migrated to
their Khepri counterparts in 3.13.
Adding additional migration only for Mnesia does not make sense.
So, instead of assembling the binding key as we walk down the trie and
then calling list_to_binary/1 in the leaf, it
would be better to just fetch the binding key from the database in the leaf.
As we reach the leaf of the trie, we know both source and destination.
Unfortunately, we cannot fetch the binding key efficiently with the
current rabbit_route (sorted by source exchange) and
rabbit_reverse_route (sorted by destination) tables as the key is in
the middle between source and destination.
If there is a huge number of bindings for a given source exchange (very
realistic in MQTT use cases) or a large number of bindings for a given
destination (also realistic), it would require scanning this large
number of bindings.
Therefore this commit takes the simplest possible solution:
The solution leverages the fact that binding arguments are already part of
table rabbit_topic_trie_binding.
So, if we simply include the binding key into the binding arguments, we
can fetch and return it efficiently in the topic exchange
implementation.
The following patch, which omits fetching the default empty-list binding
argument, makes routing slower because function
`analyze_pattern.constprop.0` requires significantly more (~2.5%) CPU time:
```
@@ -273,7 +273,11 @@ trie_bindings(X, Node) ->
node_id = Node,
destination = '$1',
arguments = '$2'}},
- mnesia:select(?MNESIA_BINDING_TABLE, [{MatchHead, [], [{{'$1', '$2'}}]}]).
+ mnesia:select(
+ ?MNESIA_BINDING_TABLE,
+ [{MatchHead, [{'andalso', {'is_list', '$2'}, {'=/=', '$2', []}}], [{{'$1', '$2'}}]},
+ {MatchHead, [], ['$1']}
+ ]).
```
Hence, this commit always fetches the binding arguments.
All MQTT 5.0 destination queues will create a binding that
contains the binding key in the binding arguments.
Not only does this solution avoid expensive list_to_binary/1 calls, but
it also means that Erlang app rabbit (specifically the topic exchange
implementation) does not need to be aware of MQTT anymore:
It just returns the binding key when the binding arguments tell it to do so.
In the future, once the Khepri migration is completed, we should be able to
relatively simply remove the binding key from the binding arguments
again to free up some storage space.
Note that one of the advantages of a trie data structure is its space
efficiency: the same prefixes do not have to be stored multiple
times.
However, for RabbitMQ the binding key is already stored at least N times
in various routing tables, so storing it a few times more via the
binding arguments should be acceptable.
The speed improvements are favoured over a few more MBs of ETS usage.
Support subscription options "No Local" and "Retain As Published"
as well as Subscription Identifiers.
All three MQTT 5.0 features can be set on a per subscription basis.
Due to wildcards in topic filters, multiple subscriptions
can match a given topic. Therefore, to implement Retain As Published and
Subscription Identifiers, the destination MQTT connection process needs
to know what subscription(s) caused it to receive the message.
There are a few ways this could be implemented:
1. The destination MQTT connection process is aware of all its
subscriptions. Whenever it receives a message, it can match the
message's routing key / topic against all its known topic filters.
However, iteratively matching the routing key against all topic
filters for every received message can become very expensive in the
worst case when the MQTT client creates many subscriptions containing
wildcards. This could be the case for an MQTT client that acts as a
bridge or proxy or dispatcher: It could subscribe via a wildcard for
each of its own clients.
2. Instead of iteratively matching the topic of the received message
against all topic filters that contain wildcards, a better approach
would be for every MQTT subscriber connection process to maintain a
local trie data structure (similar to how topic exchanges are
implemented) and thereby perform matching more efficiently.
However, this does not sound optimal either because routing is
effectively performed twice: in the topic exchange and again against
a much smaller trie in each destination connection process.
3. Given that the topic exchange already performs routing, a much more
sensible way would be to send the matched binding key(s) to the
destination MQTT connection process. A subscription (topic filter)
maps to a binding key in AMQP 0.9.1 routing. Therefore, for the first
time in RabbitMQ, the routing function should not only output a list
of unique destination queues, but also the binding keys (subscriptions)
that caused the message to be routed to the destination queue.
This commit therefore implements the 3rd approach.
The downside of the 3rd approach is that it requires API changes to the
routing function and topic exchange.
Specifically, this commit adds a new function rabbit_exchange:route/3
that accepts a list of routing options. If that list contains version 2,
the caller of the routing function knows how to handle the return value
that could also contain binding keys.
This commit allows an MQTT connection process, the channel process, and
at-most-once dead lettering to handle binding keys. Binding keys are
included as AMQP 0.9.1 headers into the basic message.
Therefore, whenever a message is sent from an MQTT client or AMQP 0.9.1
client or AMQP 1.0 client or STOMP client, the MQTT receiver will know
the subscription identifier that caused the message to be received.
Note that due to the low number of allowed wildcard characters (# and
+), the cardinality of matched binding keys shouldn't be high even if
the topic contains for example 3 levels and the message is sent to for
example 5 million destination queues. In other words, sending multiple
distinct basic messages to the destination shouldn't hurt the delegate
optimisation too much. The delegate optimisation implemented for classic
queues and rabbit_mqtt_qos0_queue(s) still takes place for all basic
messages that contain the same set of matched binding keys.
The topic exchange returns all matched binding keys by remembering the
edges walked down to the leaves. As an optimisation, only for MQTT
queues are binding keys being returned. This does add a small dependency
from app rabbit to app rabbitmq_mqtt which is not optimal. However, this
dependency should be simple to remove when omitting this optimisation.
Another important feature of this commit is persisting subscription
options and subscription identifiers because they are part of the
MQTT 5.0 session state.
In MQTT v3 and v4, the only subscription information that was part of
the session state was the topic filter and the QoS level.
Both were implicitly stored in the form of bindings:
The topic filter as the binding key and the QoS level as the destination
queue name of the binding.
For MQTT v5 we need to persist more subscription information.
From a domain perspective, it makes sense to store subscription options
as part of subscriptions, i.e. bindings, even though they are currently
not used in routing.
Therefore, this commit stores subscription options as binding arguments.
Storing subscription options as binding arguments in turn brings
new challenges: How to handle mixed version clusters and upgrading an
MQTT session from v3 or v4 to v5?
Imagine an MQTT client connects via v5 with Session Expiry Interval > 0
to a new node in a mixed version cluster, creates a subscription,
disconnects, and subsequently connects via v3 to an old node. The
client should continue to receive messages.
To simplify such edge cases, this commit introduces a new feature flag
called mqtt_v5. If mqtt_v5 is disabled, clients cannot connect to
RabbitMQ via MQTT 5.0.
This still doesn't entirely solve the problem of MQTT session upgrades
(v4 to v5 client) or session downgrades (v5 to v4 client).
Ideally, once mqtt_v5 is enabled, all MQTT bindings contain non-empty binding
arguments. However, this will require a feature flag migration function
to modify all MQTT bindings. To be more precise, all MQTT bindings need
to be deleted and added because the binding argument is part of the
Mnesia table key.
Since feature flag migration functions are non-trivial to implement in
RabbitMQ (they can run on every node multiple times and concurrently),
this commit takes a simpler approach:
All v3 / v4 sessions keep the empty binding argument [].
All v5 sessions use the new binding argument [#mqtt_subscription_opts{}].
This requires only handling a session upgrade / downgrade by
creating a binding (with the new binding arg) and deleting the old
binding (with the old binding arg) when processing the CONNECT packet.
Note that such session upgrades or downgrades should be rather rare in
practice. Therefore these binding transactions shouldn't hurt performance.
The No Local option is implemented within the MQTT publishing connection
process: The message is not sent to the MQTT destination if the
destination queue name matches the current MQTT client ID and the
message was routed due to a subscription that has the No Local flag set.
This avoids unnecessary traffic on the MQTT queue.
The alternative would have been that the "receiving side" (same process)
filters the message out - which would have been more consistent in how
Retain As Published and Subscription Identifiers are implemented, but
would have caused unnecessary load on the MQTT queue.
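A rough sketch of that check, assuming made-up names and data shapes: the message is only dropped when every matched binding key belongs to one of the publishing client's own subscriptions with No Local set.
```
%% Sketch only: SubscriptionOpts maps a binding key (topic filter) of the
%% publishing client's own subscriptions to its subscription options.
drop_due_to_no_local(DestQueue, OwnQueue, MatchedBindingKeys, SubscriptionOpts) ->
    DestQueue =:= OwnQueue andalso
        lists:all(fun(BKey) ->
                          Opts = maps:get(BKey, SubscriptionOpts, #{}),
                          maps:get(no_local, Opts, false)
                  end, MatchedBindingKeys).
```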
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."
This commit implements the part where the Client specifies the maximum
packet size.
As per protocol spec, instead of sending, the server drops the MQTT packet
if it's too large.
A debug message is logged for "infrequent" packet types.
For PUBLISH packets, the message is rejected to the queue such that it
will be dead lettered, if dead lettering is configured.
At the very least, Prometheus metrics for dead lettered messages will
be increased, even if dead lettering is not configured.
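A simplified sketch of that behaviour (function and variable names are illustrative):
```
%% Sketch only: drop the packet instead of sending it if it exceeds the
%% Maximum Packet Size the client announced in its CONNECT packet.
maybe_send_packet(Packet, MaxPacketSize, SendFun) ->
    case iolist_size(Packet) of
        Size when Size =< MaxPacketSize ->
            SendFun(Packet);
        Size ->
            {dropped, Size}
    end.
```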
* CQ: Don't use FHC for writes in shared store
* CQ: Send confirms when flushing to disk in shared store
Before they would only be sent periodically or when
rolling over to a new file.
* CQ: Fast-confirm when flushing data to disk
We know the messages are on disk or were acked so there is no
need to do sets intersections/subtracts in this scenario.
* Fix a Dialyzer warning
* Faster confirms for unwritten messages
Instead of having the message store send a message to the queue
with the confirms for messages ignored due to the flying
optimisation, we have the queue handle the confirms directly
when removing the messages.
This avoids sending potentially 1 Erlang message per 1 AMQP
message to the queue.
* Refactor rabbit_msg_file:pread into rabbit_msg_store
Also make use of the opened file for multi-reads instead
of opening/reading/closing each time.
* CQ: Make sure we keep the updated CState when using read_many
* CQ shared store: Run compaction on older file candidates
The way I initially did this, the maybe_gc would be triggered
based on candidates from 15s ago, but run against candidates
from just now. This is sub-optimal because when messages are
consumed rapidly, just-now candidates are likely to be in a
file about to be deleted, and we don't want to run compaction
on those.
Instead, when sending the maybe_gc we also send the candidates
we had at the time. Then 15s later we check if the file still
exists. If it's gone, great! No compaction to do.
* CQ: Add a few todos for later
Since CMQs are on their way out, we are only willing
to spend so much time on it.
The test covers a scenario where four nodes are stopped, then
one force booted and then immediately removed from the cluster.
In other words, a scenario that's quite unrealistic.
This introduces a way to declare deprecated features in the code, not
only in our communication. The new module allows disallowing the use of
a deprecated feature and/or warning the user when they rely on such a
feature.
[Why]
Currently, we only tell people about deprecated features through blog
posts and the mailing-list. This might be insufficient to make our users
aware that a feature they use will be removed in a future version:
* They may not read our blog or mailing-list
* They may not understand that they use such a deprecated feature
* They might wait for the big removal before they plan testing
* They might not take it seriously enough
The idea behind this patch is to increase the chance that users notice
that they are using something which is about to be dropped from
RabbitMQ. Another benefit is that they should be able to test how
RabbitMQ will behave in the future before the actual removal. This
should allow them to test and plan changes.
[How]
When a feature is deprecated in other large projects (such as FreeBSD,
from which I took the idea), it goes through a lifecycle:
1. The feature is still available, but users get a warning somehow when
they use it. They can disable it to test.
2. The feature is still available, but disabled out-of-the-box. Users
can re-enable it (and get a warning).
3. The feature is disconnected from the build. Therefore, the code
behind it is still there, but users have to recompile the thing to be
able to use it.
4. The feature is removed from the source code. Users have to adapt or
they can't upgrade anymore.
The solution in this patch offers the same lifecycle. A deprecated
feature will be in one of these deprecation phases:
1. `permitted_by_default`: The feature is available. Users get a warning
if they use it. They can disable it from the configuration.
2. `denied_by_default`: The feature is available but disabled by
default. Users get an error if they use it and RabbitMQ behaves like
the feature is removed. They can re-enable it from the configuration
and get a warning.
3. `disconnected`: The feature is present in the source code, but is
disabled and can't be re-enabled without recompiling RabbitMQ. Users
get the same behavior as if the code was removed.
4. `removed`: The feature's code is gone.
The whole thing is based on the feature flags subsystem, but it has the
following differences with other feature flags:
* The semantics are reversed: the feature flag behind a deprecated feature
is disabled when the deprecated feature is permitted, or enabled when
the deprecated feature is denied.
* The feature flag behind a deprecated feature is enabled out-of-the-box
(meaning the deprecated feature is denied):
* if the deprecation phase is `permitted_by_default` and the
configuration denies the deprecated feature
* if the deprecation phase is `denied_by_default` and the
configuration doesn't permit the deprecated feature
* if the deprecation phase is `disconnected` or `removed`
* Feature flags behind deprecated features don't appear in feature flags
listings.
Otherwise, deprecated features' feature flags are managed like other
feature flags, in particular inside clusters.
To declare a deprecated feature:
-rabbit_deprecated_feature(
    {my_deprecated_feature,
     #{deprecation_phase => permitted_by_default,
       msgs => #{when_permitted => "This feature will be removed in RabbitMQ X.0"}
      }}).
Then, to check the state of a deprecated feature in the code:
case rabbit_deprecated_features:is_permitted(my_deprecated_feature) of
true ->
%% The deprecated feature is still permitted.
ok;
false ->
%% The deprecated feature is gone or should be considered
%% unavailable.
error
end.
Warnings and errors are logged automatically. A message is generated
automatically, but it is possible to define a message in the deprecated
feature flag declaration like in the example above.
Here is an example of a logged warning that was generated automatically:
Feature `my_deprecated_feature` is deprecated.
By default, this feature can still be used for now.
Its use will not be permitted by default in a future minor RabbitMQ version and the feature will be removed from a future major RabbitMQ version; actual versions to be determined.
To continue using this feature when it is not permitted by default, set the following parameter in your configuration:
"deprecated_features.permit.my_deprecated_feature = true"
To test RabbitMQ as if the feature was removed, set this in your configuration:
"deprecated_features.permit.my_deprecated_feature = false"
To override the default state of `permitted_by_default` and
`denied_by_default` deprecation phases, users can set the following
configuration:
# In rabbitmq.conf:
deprecated_features.permit.my_deprecated_feature = true # or false
The actual behavior protected by a deprecated feature check is out of
scope for this subsystem. It is the responsibility of each deprecated
feature code to determine what to do when the deprecated feature is
denied.
V1: Deprecated feature states are initially computed during the
initialization of the registry, based on their deprecation phase and
possibly the configuration. They don't go through the `enable/1`
code at all.
V2: Manage deprecated feature states as any other non-required
feature flags. This allows executing an `is_feature_used()`
callback to determine if a deprecated feature can be denied. This
also allows preventing the RabbitMQ node from starting if it
continues to use a deprecated feature.
V3: Manage deprecated feature states from the registry initialization
again. This is required because we need to know very early if some
of them are denied, so that an upgrade to a version of RabbitMQ
where a deprecated feature is disconnected or removed can be
performed.
To still prevent the start of a RabbitMQ node when a denied
deprecated feature is actively used, we run the `is_feature_used()`
callback of all denied deprecated features as part of the
`sync_cluster()` task. This task is executed as part of a feature
flag refresh executed when RabbitMQ starts or when plugins are
enabled. So even though a deprecated feature is marked as denied in
the registry early in the boot process, we will still abort the
start of a RabbitMQ node if the feature is used.
V4: Support context-dependent warnings. It is now possible to set a
specific message for when a deprecated feature is permitted, when it is
denied and when it is removed. Generic per-context messages are
still generated.
V5: Improve default warning messages, thanks to @pstack2021.
V6: Rename the configuration variable from `permit_deprecated_features.*`
to `deprecated_features.permit.*`. As @michaelklishin said, we tend
to use shorter top-level names.
[Why]
There could be a transient network issue. Let's give a few more chances
to perform the requested RPC call.
[How]
We retry until the given timeout is reached, if any.
To honor that timeout, we measure the time taken by the RPC call itself.
We also sleep between retries. Before each retry, the timeout is reduced
by the total of the time taken by the RPC call and the sleep.
References #8346.
V2: Treat `infinity` timeout differently. In this case, we never retry
following a `noconnection` error. The reason is that this timeout is
used specifically for callbacks executed remotely. We don't know how
long they take (for instance if there is a lot of data to migrate).
We don't want an infinite retry loop either, so in this case, we
don't retry.
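A rough sketch of such a retry loop (made-up function; the exact error term depends on the RPC API used and the real implementation may differ), including the special case for an `infinity` timeout:
```
%% Sketch only: retry on a lost connection until the remaining timeout is
%% used up; the timeout is reduced by the time spent in the call and the sleep.
rpc_call_with_retry(Node, Mod, Fun, Args, infinity) ->
    %% Infinite timeout: never retry (see the note above).
    rpc:call(Node, Mod, Fun, Args, infinity);
rpc_call_with_retry(Node, Mod, Fun, Args, Timeout) when Timeout > 0 ->
    T0 = erlang:monotonic_time(millisecond),
    case rpc:call(Node, Mod, Fun, Args, Timeout) of
        {badrpc, nodedown} ->
            timer:sleep(1000),
            Elapsed = erlang:monotonic_time(millisecond) - T0,
            rpc_call_with_retry(Node, Mod, Fun, Args, max(Timeout - Elapsed, 0));
        Result ->
            Result
    end;
rpc_call_with_retry(_Node, _Mod, _Fun, _Args, _Timeout) ->
    {badrpc, timeout}.
```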
[Why]
During peer discovery, when the feature flags state is synchronized on a
starting node that joins a cluster thanks to peer discovery, the list of
nodes returned by `rabbit_nodes:list_running()` is incorrect because
Mnesia is not initialized yet.
Because of that, the synchronization works on the wrong inventory of
feature flags. In the end, the states of feature flags are incorrect
across the cluster.
[How]
`rabbit_mnesia` passes a list of nodes to
`rabbit_feature_flags:sync_feature_flags_with_cluster/2`. We can use
this list, as we did in feature flags V1. This makes sure that the
synchronization works with a valid list of cluster members, in case the
cluster state is not ready yet.
V2: Filter the given list of nodes to only keep those where `rabbit` is
running. This avoids trying to collect inventory from nodes which
are stopped.
Instead of doing a complicated +1/-1 we do an update_counter
of an integer value using 2^n values. We always know exactly
in which state we are when looking at the ets table. We also
can avoid some ets operations as a result although the
performance improvements are minimal.
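For illustration, a sketch of the idea with made-up flag names (the real flags differ), assuming each transition happens at most once per message so each bit is set exactly once:
```
%% Sketch only: bit flags combined into one ETS counter per message, so the
%% integer read from the table uniquely identifies the combined state.
-define(FLYING_WRITE,      1).   %% 2^0: write requested
-define(FLYING_WRITE_DONE, 2).   %% 2^1: write performed
-define(FLYING_IGNORE,     4).   %% 2^2: message no longer needed

flying_write(Table, MsgId) ->
    %% Creates the counter on first use, then adds the "write requested" bit.
    ets:update_counter(Table, MsgId, ?FLYING_WRITE, {MsgId, 0}).

flying_write_done(Table, MsgId) ->
    ets:update_counter(Table, MsgId, ?FLYING_WRITE_DONE).
```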
In mixed version cluster tests where the new node
uses CQv2, when mirror synchronisation happens,
v2 (source) overloads v1 (destination) leading to
a memory spike and a crash (in a memory-constrained
CI environment). Given that in 3.12 we switch to
a lazy-like mode for all classic queues, I think
we can use a lazy queue in the test.
[Why]
The background reason for this fix is about the same as the one
explained in the previous version of this fix; see commit
e0a2f10272.
This time, the order of events that led to a similar deadlock is the
following:
0. No `rabbit_ff_registry` is loaded yet.
1. Process A, B and C call `rabbit_ff_registry:something()` indirectly
which triggers two initializations in parallel.
* Process A did it from an explicit call to
`rabbit_ff_registry_factory:initialize_factory()` during RabbitMQ
boot.
* Process B and C indirectly called it because they checked if a
feature flag was enabled.
2. Process B acquires the lock first and finishes the initialization. A
new registry is loaded and the old `rabbit_ff_registry` module copy
is marked as "old". At this point, process A and C still reference
that old copy because `rabbit_ff_registry:something()` is up above in
its call stack.
3. Process A acquires the lock, prepares the new registry and tries to
soft-purge the old `rabbit_ff_registry` copy before loading the new
one.
This is where the deadlock happens: process A requests the Code server
to purge the old copy, but the Code server waits for process C to stop
using it.
The difference between the steps described in the first bug fix
attempt's commit and these ones is that the process which lingers on the
deleted `rabbit_ff_registry` (process C above) isn't the one who
acquired the lock; process A has it.
That's why the first bug fix isn't effective in this case: it relied on
the fact that the process which lingers on the deleted
`rabbit_ff_registry` is the process which attempts to purge the module.
[How]
In this commit, we go with a more drastic change. This time, we put a
wrapper in front of `rabbit_ff_registry` called
`rabbit_ff_registry_wrapper`. This wrapper is responsible for doing the
automatic initialization if the loaded registry is the stub module. The
`rabbit_ff_registry` stub now always returns `init_required` instead of
performing the initialization and calling itself recursively.
This way, processes linger on `rabbit_ff_registry_wrapper`, not on
`rabbit_ff_registry`. Thanks to this, the Code server can proceed with
the purge.
See #8112.
as it categorises more nicely in case there will be a future
"message_interceptors.outgoing.*" key.
We leave the advanced config file key because simple single value
settings should not require using the advanced config file.
As reported in https://groups.google.com/g/rabbitmq-users/c/x8ACs4dBlkI/
plugins that implement rabbit_channel_interceptor break with
Native MQTT in 3.12 because Native MQTT does not use rabbit_channel anymore.
Specifically, these plugins don't work anymore in 3.12 when sending a message
from an MQTT publisher to an AMQP 0.9.1 consumer.
Two of these plugins are
https://github.com/rabbitmq/rabbitmq-message-timestamp
and
https://github.com/rabbitmq/rabbitmq-routing-node-stamp
This commit moves both plugins into rabbitmq-server.
Therefore, these plugins are deprecated starting in 3.12.
Instead of using these plugins, the user gets the same behaviour by
configuring rabbitmq.conf as follows:
```
incoming_message_interceptors.set_header_timestamp.overwrite = false
incoming_message_interceptors.set_header_routing_node.overwrite = false
```
While the two plugins could not be used together, this commit
allows setting both headers.
We name the top level configuration key `incoming_message_interceptors`
because only incoming messages are intercepted.
Currently, only `set_header_timestamp` and `set_header_routing_node` are
supported. (We might support more in the future.)
Both can set `overwrite` to `false` or `true`.
The meaning of `overwrite` is the same as documented in
https://github.com/rabbitmq/rabbitmq-message-timestamp#always-overwrite-timestamps
i.e. whether headers should be overwritten if they are already present
in the message.
Both `set_header_timestamp` and `set_header_routing_node` behave exactly
like the plugins `rabbitmq-message-timestamp` and `rabbitmq-routing-node-stamp`,
respectively.
Upon node boot, the configuration is put into persistent_term to not
cause any performance penalty in the default case where these settings
are disabled.
The channel and MQTT connection process will intercept incoming messages
and - if configured - add the desired AMQP 0.9.1 headers.
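A simplified sketch of that lookup; the persistent_term key and the interceptor representation here are assumptions for illustration only:
```
%% Sketch only: the common case of "no interceptors configured" is a cheap
%% constant-time persistent_term lookup returning an empty list.
intercept_incoming(Msg) ->
    case persistent_term:get(incoming_message_interceptors, []) of
        [] ->
            Msg;
        Interceptors ->
            lists:foldl(fun apply_interceptor/2, Msg, Interceptors)
    end.

%% Placeholder for the per-interceptor logic that adds an AMQP 0.9.1 header.
apply_interceptor({_Name, _Overwrite}, Msg) ->
    Msg.
```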
For now, this allows using Native MQTT in 3.12 with the old plugins
behaviour.
In the future, once "message containers" are implemented,
we can think about more generic message interceptors where plugins can be
written to modify arbitrary headers or message contents for various protocols.
Likewise, in the future, once MQTT 5.0 is implemented, we can think
about an MQTT connection interceptor which could function similar to a
`rabbit_channel_interceptor`, allowing any MQTT packet to be modified.
[Why]
The Feature flags registry is implemented as a module called
`rabbit_ff_registry` recompiled and reloaded at runtime.
There is a copy on disk which is a stub responsible for triggering the
first initialization of the real registry and for pleasing Dialyzer. Once the
initialization is done, this stub calls `rabbit_ff_registry` again to
get an actual return value. This is kind of recursive: the on-disk
`rabbit_ff_registry` copy calls the `rabbit_ff_registry` copy generated
at runtime.
Early during RabbitMQ startup, there could be multiple processes
indirectly calling `rabbit_ff_registry` and possibly triggering that
first initialization concurrently. Unfortunately, there is a slight
chance of a race condition and deadlock:
0. No `rabbit_ff_registry` is loaded yet.
1. Both process A and B call `rabbit_ff_registry:something()` indirectly
which triggers two initializations in parallel.
2. Process A acquires the lock first and finishes the initialization. A
new registry is loaded and the old `rabbit_ff_registry` module copy
is marked as "old". At this point, process B still references that
old copy because `rabbit_ff_registry:something()` is up above in its
call stack.
3. Process B acquires the lock, prepares the new registry and tries to
soft-purge the old `rabbit_ff_registry` copy before loading the new
one.
This is where the deadlock happens: process B requests the Code server
to purge the old copy, but the Code server waits for process B to stop
using it.
[How]
With this commit, process B calls `erlang:check_process_code/2` before
asking for a soft purge. If it is using an old copy, it skips the purge
because it will deadlock anyway.
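A minimal sketch of that guard (function name is illustrative):
```
%% Sketch only: skip the soft purge if the calling process itself still runs
%% an old copy of the module, since the purge request would otherwise wait
%% on (and deadlock with) ourselves.
maybe_soft_purge(Module) ->
    case erlang:check_process_code(self(), Module) of
        true  -> ok;                          %% we linger on the old copy
        false -> _ = code:soft_purge(Module), ok
    end.
```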
Bazel build files are now maintained primarily with `bazel run
gazelle`. This will analyze and merge changes into the build files as
necessitated by certain code changes (e.g. the introduction of new
modules).
In some cases there are hints to gazelle in the build files, such as `#
gazelle:erlang...` or `# keep` comments. xref checks on plugins that
depend on the cli are a good example.
OTP-26 changed the default version for binary_to_term from 1 to 2.
There's one place where we explicitly ask for version 1 anyway
(in the STOMP plugin) and it seems we need to keep it like this.
Previously osiris did not support uncorrelated writes, which meant
we could not use a "stateless" queue type delivery; such deliveries were
silently dropped.
This had the impact that at-most-once dead letter was not possible
where the dead letter target is a stream.
This change bumps the osiris version that has the required API
to allow for uncorrelated writes (osiris:write/2).
Currently there is no feature flag to control this as osiris writer
processes just log and drop any messages they don't understand.
Returns reaching a Ra member that used to be leader but has since stepped
down would cause that follower to crash and restart.
This commit avoids this scenario as well as giving the return commands
a good chance of being resent to the new leader in a timely manner
(see the Ra release for this).
vhost_precondition_failed => vhost_limit_exceeded
vhost_limit_exceeded is the error type used by
definition import when a per-vhost limit is exceeded.
It feels appropriate for this case, too.
The x-delivery-count header only needs to be added when a message is
redelivered. Adding it on the first delivery attempt is unnecessary,
not recorded in the quorum queue documentation and causes additional work
deserialising the binary basic properties data to add this header.
This could be notable for messages with substantial property data incl.
heavy use of headers for example.
This is useful for understanding if a deleted queue was matching any
policies given the more selective policies introduced in #7601.
Does not apply to bulk deletion of transient queues on node down.
Rather than relying on queue name conventions, allow applying policies
based on the queue type. For example, this allows multiple policies that
apply to all queue names (".*") that specify different parameters for
different queue types.
This puts a limit on the amount of message data that is added
to the process heap at the same time to around 128KB.
Large prefetch values combined with large messages could cause
excessive garbage collection work.
Also simplify the intermediate delivery message format to avoid
allocations that aren't necessary.
This new module sits on top of `rabbit_mnesia` and provides an API with
all cluster-related functions.
`rabbit_mnesia` should be called directly only inside Mnesia-specific
code, for instance `rabbit_mnesia_rename` or classic mirrored queues.
Otherwise, `rabbit_db_cluster` must be used.
Several modules, in particular in `rabbitmq_cli`, continue to call
`rabbit_mnesia` as a fallback option if the `rabbit_db_cluster` module
is unavailable. This will be the case when the CLI interacts with an
older RabbitMQ version.
This will help with the introduction of a new database backend.
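A rough sketch of such a fallback, shown in Erlang for brevity (the CLI itself is Elixir, and `rabbit_db_cluster:members/0` is assumed here):
```
%% Sketch only: prefer the new rabbit_db_cluster API on the remote node and
%% fall back to the Mnesia-based API if the module is not available there.
cluster_members(Node) ->
    case rpc:call(Node, rabbit_db_cluster, members, []) of
        {badrpc, {'EXIT', {undef, _}}} ->
            rpc:call(Node, rabbit_nodes, all, []);
        Members ->
            Members
    end.
```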
[Why]
If a plugin was already enabled when RabbitMQ starts, its required
feature flags were correctly handled and thus enabled. However, this was
not the case for a plugin enabled at runtime.
Here is an example with the `drop_unroutable_metric` from the
rabbitmq_management_agent plugin:
Feature flags: `drop_unroutable_metric`: required feature flag not
enabled! It must be enabled before upgrading RabbitMQ.
Supporting required feature flags in plugins is trickier than in the
core broker. Indeed, with the broker, we know when this is the first
time the broker is started. Therefore we are sure that a required
feature flag can be enabled directly, there is no existing data/context
that could conflict with the code behind the required feature flag.
For plugins, this is different: a plugin can be enabled/disabled at
runtime and between broker restarts (and thus upgrades). So, when a
plugin is enabled and it has a required feature flag, we have no way to
make sure that there is no existing and conflicting data/context.
[How]
In this patch, if the required feature flag is provided by a plugin
(i.e. not `rabbit`), we always mark it as enabled.
The plugin is responsible for handling any existing data/context and
perform any cleanup/conversion.
Reported by: @ansd
So far, we had the following functions to list nodes in a RabbitMQ
cluster:
* `rabbit_mnesia:cluster_nodes/1` to get members of the Mnesia cluster;
the argument was used to select members (all members or only those
running Mnesia and participating in the cluster)
* `rabbit_nodes:all/0` to get all members of the Mnesia cluster
* `rabbit_nodes:all_running/0` to get all members who currently run
Mnesia
Basically:
* `rabbit_nodes:all/0` calls `rabbit_mnesia:cluster_nodes(all)`
* `rabbit_nodes:all_running/0` calls `rabbit_mnesia:cluster_nodes(running)`
We also have:
* `rabbit_node_monitor:alive_nodes/1` which filters the given list of
nodes to only select those currently running Mnesia
* `rabbit_node_monitor:alive_rabbit_nodes/1` which filters the given
list of nodes to only select those currently running RabbitMQ
Most of the code uses `rabbit_mnesia:cluster_nodes/1` or the
`rabbit_nodes:all*/0` functions. `rabbit_mnesia:cluster_nodes(running)`
or `rabbit_nodes:all_running/0` is often used as a close approximation
of "all cluster members running RabbitMQ". This list might be incorrect
at times when a node is joining the cluster or is being worked on
(i.e. Mnesia is running but not RabbitMQ).
With Khepri, the same approximation won't be possible because we
will try to keep Khepri/Ra running even if RabbitMQ is stopped to
expand/shrink the cluster.
So in order to clarify what we want when we query a list of nodes, this
patch introduces the following functions:
* `rabbit_nodes:list_members/0` to get all cluster members, regardless
of their state
* `rabbit_nodes:list_reachable/0` to get all cluster members we can
reach using Erlang distribution, regardless of the state of RabbitMQ
* `rabbit_nodes:list_running/0` to get all cluster members who run
RabbitMQ, regardless of the maintenance state
* `rabbit_nodes:list_serving/0` to get all cluster members who run
RabbitMQ and are accepting clients
In addition to the list functions, there are the corresponding
`rabbit_nodes:is_*(Node)` checks and `rabbit_nodes:filter_*(Nodes)`
filtering functions.
The code is modified to use these new functions. One possible
significant change is that the new list functions will perform RPC calls
to query the nodes' state, unlike `rabbit_mnesia:cluster_nodes(running)`.
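For example, a caller can now express its intent explicitly (sketch; the helper function is made up):
```
%% Sketch only: choose the node list that matches the caller's intent.
notification_targets(IncludeNodesUnderMaintenance) ->
    case IncludeNodesUnderMaintenance of
        true  -> rabbit_nodes:list_running();
        false -> rabbit_nodes:list_serving()
    end.
```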
RabbitMQ 3.12 requires feature flag `feature_flags_v2` which got
introduced in 3.11.0 (see
https://github.com/rabbitmq/rabbitmq-server/pull/6810).
Therefore, we can mark all feature flags that got introduced in 3.11.0
or before 3.11.0 as required because users will have to upgrade to
3.11.x first, before upgrading to 3.12.x.
The advantage of marking these feature flags as required is that we can
start deleting any compatibility code for these feature flags, similar
to what was done in https://github.com/rabbitmq/rabbitmq-server/issues/5215
This list shows when a given feature flag was first introduced:
```
classic_mirrored_queue_version 3.11.0
stream_single_active_consumer 3.11.0
direct_exchange_routing_v2 3.11.0
listener_records_in_ets 3.11.0
tracking_records_in_ets 3.11.0
empty_basic_get_metric 3.8.10
drop_unroutable_metric 3.8.10
```
In this commit, we also force all required feature flags in Erlang
application `rabbit` to be enabled in mixed version cluster testing
and delete any tests that were about a feature flag starting as disabled.
Furthermore, this commit already deletes the callback (migration) functions
given they do not run anymore in 3.12.x.
All other clean up (i.e. branching depending on whether a feature flag
is enabled) will be done in separate commits.
* Mark AMQP 1.0 properties chunk as binary
It was marked as a UTF8 string, which it is not, so
strict AMQP 1.0 codecs can fail.
* Re-use AMQP 1.0 binary chunks if available
Instead of converting from AMQP 091 back to AMQP 1.0.
This is for AMQP 1.0 properties, application properties,
and message annotations.
* Test AMQP 1.0 binary chunk reuse
* Support AMQP 1.0 multi-value body better
In the rabbit_msg_record module, mostly. Before this commit,
only one Data section was supported. Now multiple Data sections,
multiple Sequence sections, and an AMQP value section are supported.
* Add test for non-single-data-section AMQP 1.0 message
* Squash some Dialyzer warnings
* Silence dialyzer for a function for now
* Fix type declaration, use type, not atom
* Address review comments
as it was unnecessary to introduce it in the first place.
Remove the queue name from all queue type clients and pass the queue
name to the queue type callbacks that need it.
We have to leave feature flag classic_queue_type_delivery_support
required because we removed the monitor registry
1fd4a6d353/deps/rabbit/src/rabbit_queue_type.erl (L322-L325)
Implements review from Karl:
"rather than changing the message format we could amend the queue type
callbacks involved with the stateful operation to also take the queue
name record as an argument. This way we don't need to maintain the extra
queue name (which uses memory for known but obscurely technical reasons
with how maps work) in the queue type state (as it is used in the queue
type state map as the key)"
We want the build to fail if there are any dialyzer warnings in
rabbitmq_mqtt or rabbitmq_web_mqtt. Otherwise we rely on people manually
executing and checking the results of dialyzer.
Also, we want any test to fail that is flaky.
Flaky tests can indicate subtle errors in either test or program execution.
Instead of marking them as flaky, we should understand and - if possible -
fix the underlying root cause.
Fix OTP 25.0 dialyzer warning
Type gen_server:format_status() is known in OTP 25.2, but not in 25.0
Prior to this commit, when connecting or disconnecting many thousands of
MQTT subscribers, RabbitMQ printed many times:
```
[warning] <0.241.0> Mnesia('rabbit@mqtt-rabbit-1-server-0.mqtt-rabbit-1-nodes.default'): ** WARNING ** Mnesia is overloaded: {dump_log,write_threshold}
```
Each MQTT subscription causes queues and bindings to be written into Mnesia.
In order to allow for higher Mnesia load, the user can configure
```
[
{mnesia,[
{dump_log_write_threshold, 10000}
]}
].
```
in advanced.config
or set this value via
```
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-mnesia dump_log_write_threshold 10000"
```
The Mnesia default for dump_log_write_threshold is 1,000.
The Mnesia default for dump_log_time_threshold is 180,000 ms.
It is reasonable to increase the default for dump_log_write_threshold from
1,000 to 5,000 and in return decrease the default dump_log_time_threshold
from 3 minutes to 1.5 minutes.
This way, users can achieve higher MQTT scalability by default.
This setting cannot be changed at Mnesia runtime; it needs to be set
before Mnesia gets started.
Since the rabbitmq_mqtt plugin can be enabled dynamically after Mnesia
has started, this setting must therefore apply globally to RabbitMQ.
Users can continue to set their own defaults via advanced.config or
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS. They continue to be respected
as shown by the new test suite included in this commit.
Tests sporadically fail with:
```
=== Ended at 2022-11-17 20:27:09
=== Location: [{rabbit_fifo_dlx_integration_SUITE,assert_active_dlx_workers,938},
{test_server,ts_tc,1782},
{test_server,run_test_case_eval1,1291},
{test_server,run_test_case_eval,1223}]
=== === Reason: {assertMatch,
[{module,rabbit_fifo_dlx_integration_SUITE},
{line,938},
{expression,
"rabbit_ct_broker_helpers : rpc ( Config , Server , supervisor , count_children , [ rabbit_fifo_dlx_sup ] , 1000 )"},
{pattern,"[ _ , { active , N } , _ , _ ]"},
{value,
[{specs,1},
{active,2},
{supervisors,0},
{workers,2}]}]}
in function rabbit_fifo_dlx_integration_SUITE:assert_active_dlx_workers/3 (rabbit_fifo_dlx_integration_SUITE.erl, line 938)
in call from test_server:ts_tc/3 (test_server.erl, line 1782)
in call from test_server:run_test_case_eval1/6 (test_server.erl, line 1291)
in call from test_server:run_test_case_eval/9 (test_server.erl, line 1223)
```
This commit attempts to remove that failure by using
supervisor:which_children/1 because the docs for
supervisor:count_children/1 say:
"active - The count of all actively running child processes managed by this supervisor.
For a simple_one_for_one supervisors, no check is done to ensure that each child process
is still alive, although the result provided here is likely to be very accurate unless
the supervisor is heavily overloaded."
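A sketch of the check based on supervisor:which_children/1 (the helper name is made up):
```
%% Sketch only: count only children that are actually alive right now,
%% instead of trusting the supervisor's bookkeeping.
count_active_children(Sup) ->
    length([Pid || {_Id, Pid, _Type, _Mods} <- supervisor:which_children(Sup),
                   is_pid(Pid)]).
```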
Instead of performing credit_flow within quorum queue and stream queue
clients, return new {block | unblock, QueueName} actions.
The queue client process can then decide what to do.
For example, the channel continues to use credit_flow such that the
channel gets blocked sending any more credits to rabbit_reader.
However, the MQTT connection process does not use credit_flow. It
instead blocks its reader directly.
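A sketch of how a queue type client process might track these actions (names and state shape are illustrative; start with `sets:new()` as the initial accumulator):
```
%% Sketch only: keep the set of queues that asked us to stop sending; the
%% reader is blocked while the set is non-empty and resumed when it empties.
handle_queue_actions(Actions, Blocked0) ->
    lists:foldl(
      fun({block, QName}, Blocked)   -> sets:add_element(QName, Blocked);
         ({unblock, QName}, Blocked) -> sets:del_element(QName, Blocked);
         (_Other, Blocked)           -> Blocked
      end, Blocked0, Actions).
```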
Prior to this commit, 1 MQTT publisher publishing to 1 Million target
classic queues requires around 680 MB of process memory.
After this commit, it requires around 290 MB of process memory.
This commit requires feature flag classic_queue_type_delivery_support
and introduces a new one called no_queue_name_in_classic_queue_client.
Instead of storing the binary queue name 4 times, this commit now stores
it only 1 time.
The monitor_registry is removed since only classic queue clients monitor
their classic queue server processes.
The classic queue client does not store the queue name anymore. Instead
the queue name is included in messages handled by the classic queue
client.
Storing the queue name in the record ctx was unnecessary.
More potential future memory optimisations:
* When routing to destination queues, looking up the queue record,
delivering to queue: Use streaming / batching instead of fetching all
at once
* Only fetch ETS columns that are necessary instead of whole queue
records
* Do not hold the same vhost binary in memory many times. Instead,
maintain a mapping.
* Remove unnecessary tuple fields.
"Each Client connecting to the Server has a unique ClientId"
"If the ClientId represents a Client already connected to
the Server then the Server MUST disconnect the existing
Client [MQTT-3.1.4-2]."
Instead of tracking client IDs via Raft, we use local ETS tables in this
commit.
Previous tracking of client IDs via Raft:
(+) consistency (does the right thing)
(-) state of Ra process becomes large > 1GB with many (> 1 Million) MQTT clients
(-) Ra process becomes a bottleneck when many MQTT clients (e.g. 300k)
disconnect at the same time because monitor (DOWN) Ra commands get
written resulting in Ra machine timeout.
(-) if we need consistency, we ideally want a single source of truth,
e.g. only Mnesia, or only Khepri (but not Mnesia + MQTT ra process)
While above downsides could be fixed (e.g. avoiding DOWN commands by
instead doing periodic cleanups of client ID entries using session interval
in MQTT 5 or using subscription_ttl parameter in current RabbitMQ MQTT config),
in this case we do not necessarily need the consistency guarantees Raft provides.
In this commit, we try to comply with [MQTT-3.1.4-2] on a best-effort
basis: If there are no network failures and no messages get lost,
existing clients with duplicate client IDs get disconnected.
In the presence of network failures / lost messages, two clients with
the same client ID can end up publishing or receiving from the same
queue. Arguably, that's acceptable and less problematic than the scaling
issues we experience when we want stronger consistency.
Note that it is also the responsibility of the client to not connect
twice with the same client ID.
This commit also ensures that the client ID is a binary to save memory.
A new feature flag is introduced, which when enabled, deletes the Ra
cluster named 'mqtt_node'.
Independent of that feature flag, client IDs are tracked locally in ETS
tables.
If that feature flag is disabled, client IDs are additionally tracked in
Ra.
The feature flag is required such that clients can continue to connect
to all nodes except for the node being updated in a rolling update.
This commit also fixes a bug where previously all MQTT connections were
cluster-wide closed when one RabbitMQ node was put into maintenance
mode.
These functions sit on top of their equivalents in `rabbit_mnesia`. In
the future, they will take care of picking the right database layer,
whatever it is.
The start of `mnesia_sync` is now part of this initialization instead of
a separate boot step in `rabbit` because it is specific to our use of
Mnesia.
In addition, `rabbit_db` provides `is_virgin_node/1` to query the state
of a remote node. This is used by `rabbit_ff_controller` in the feature
flags subsystem.
At this point, the underlying equivalent functions in `rabbit_mnesia`
become private to this module (and other modules implementing the
interaction with Mnesia). Other parts of RabbitMQ, including plugins,
should now use `rabbit_db`, not `rabbit_mnesia`.
With `rpc:call/5`, the `throw(reason)` in a migration function would be
detected as an error by the feature flags subsystem, but the return
value of `sync_cluster/0` would be `reason` instead of `{error, reason}`
(which was expected by the caller).
This should make sure that the call to
`rabbit_feature_flags:sync_feature_flags_with_cluster/2` in
`rabbit_mnesia` gets the proper return value and aborts the node
startup.