Commit Graph

593 Commits

Author SHA1 Message Date
David Ansari 6225dc9928 Do not parse entire AMQP body
Prior to this commit the entire amqp-value or amqp-sequence sections
were parsed when converting a message from mc_amqp.
Parsing the entire amqp-value or amqp-sequence section can generate a
huge amount of garbage depending on how large these sections are.

Given that other protocol cannot make use of amqp-value and
amqp-sequence sections anyway, leave them AMQP encoded when converting
from mc_amqp.

In fact prior to this commit, the entire body section was parsed
generating huge amounts of garbage just to subsequently encode it again
in mc_amqpl or mc_mqtt.

The new conversion interface from mc_amqp to other mc_* modules will
either output amqp-data sections or the encoded amqp-value /
amqp-sequence sections.
2024-05-02 07:56:00 +00:00
David Ansari 81709d9745 Fix MQTT QoS
This commit fixes test
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed -t- \
    --test_sharding_strategy=disabled --test_env \
    FOCUS="-group [mqtt,v3,cluster_size_3] -case pubsub"
```

Fix some mixed version tests

Assume the AMQP body, especially amqp-value section won't be parsed.
Hence, omit smart conversions from AMQP to MQTT involving the
Payload-Format-Indicator bit.

Fix test

Fix
```
bazel test //deps/amqp10_client:system_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [rabbitmq]
```
2024-05-02 07:56:00 +00:00
David Ansari fc7f458f7c Fix tests 2024-05-02 07:56:00 +00:00
David Ansari bb106ff65c Skip access check on absent will queue
Resolves https://github.com/rabbitmq/rabbitmq-server/discussions/11021

Prior to this commit, an MQTT client that connects to RabbitMQ needed
configure access to its will queue even if the will queue has never
existed. This breaks client apps connecting with either v3 or v4 or with
v5 without making use of the Will-Delay-Interval.

Specifically, in 3.13.0 and 3.13.1 an MQTT client that connects to
RabbitMQ needs unnecessarily configure access to queue
`mqtt-will-<MQTT client ID>`.

This commit only check for configure access, if the queue actually gets
deleted, i.e. if it existed.
2024-04-17 12:50:41 +02:00
David Ansari e96125bfd3 Store MQTT messages as non-durable if QoS 0
By default, when the 'durable' message container (mc) annotation is unset,
messages are interpreted to be durable.

Prior to this commit, MQTT messages that were sent with QoS 0 were
stored durably in classic queues.
This commit takes the same approach for mc_mqtt as for mc_amqpl and mc_amqp:
If the message is durable, the durable mc annotation will not be set.
If the message is non-durable, the durable mc annotation will be set to false.
2024-04-16 11:43:58 +02:00
David Ansari 71d1b3b455 Respect message_interceptors.incoming.set_header_timestamp
When feature flag message_containers is enabled, setting
```
message_interceptors.incoming.set_header_timestamp
```
wasn't respected anymore when a message is published via MQTT to a
stream and subsequently consumed via AMQP 0.9.1.

This commit ensures that AMQP 0.9.1 header timestamp_in_ms will be
set.

Note that we must not modify the AMQP 1.0 properties section when messages
are received via AMQP 1.0 and consumed via AMQP 1.0.
Also, message annoation keys not starting with "x-" are reserved.
2024-04-10 11:40:49 +02:00
dependabot[bot] 23f3ebf381
Bump com.rabbitmq:amqp-client
Bumps [com.rabbitmq:amqp-client](https://github.com/rabbitmq/rabbitmq-java-client) from 5.20.0 to 5.21.0.
- [Release notes](https://github.com/rabbitmq/rabbitmq-java-client/releases)
- [Commits](https://github.com/rabbitmq/rabbitmq-java-client/compare/v5.20.0...v5.21.0)

---
updated-dependencies:
- dependency-name: com.rabbitmq:amqp-client
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-04-08 19:00:59 +00:00
David Ansari 1b75baddd9 Add AMQP 1.0 -> MQTT 5.0 -> AMQP 1.0 test 2024-04-05 15:09:21 +02:00
David Ansari 390d5715a0 Introduce new AMQP 1.0 address format
## What?
Introduce a new address format (let's call it v2) for AMQP 1.0 source and target addresses.

The old format (let's call it v1) is described in
https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing

The only v2 source address format is:
```
/queue/:queue
```

The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```

 ## Why?

The AMQP address v1 format comes with the following flaws:

1. Obscure address format:

Without reading the documentation, the differences for example between source addresses
```
/amq/queue/:queue
/queue/:queue
:queue
```
are unknown to users. Hence, the address format is obscure.

2. Implicit creation of topologies

Some address formats implicitly create queues (and bindings), such as source address
```
/exchange/:exchange/:binding-key
```
or target address
```
/queue/:queue
```
These queues and bindings are never deleted (by the AMQP 1.0 plugin.)
Implicit creation of such topologies is also obscure.

3. Redundant address formats

```
/queue/:queue
:queue
```
have the same meaning and are therefore redundant.

4. Properties section must be parsed to determine whether a routing key is present

Target address
```
/exchange/:exchange
```
requires RabbitMQ to parse the properties section in order to check whether the message `subject` is set.
If `subject` is not set, the routing key will default to the empty string.

5. Using `subject` as routing key misuses the purpose of this field.

According to the AMQP spec, the message `subject` field's purpose is:
> A common field for summary information about the message content and purpose.

6. Exchange names, queue names and routing keys must not contain the "/" (slash) character.

The current 3.13 implemenation splits by "/" disallowing these
characters in exchange, and queue names, and routing keys which is
unnecessary prohibitive.

7. Clients must create a separate link per target exchange

While this is reasonable working assumption, there might be rare use
cases where it could make sense to create many exchanges (e.g. 1
exchange per queue, see
https://github.com/rabbitmq/rabbitmq-server/discussions/10708) and have
a single application publish to all these exchanges.
With the v1 address format, for an application to send to 500 different
exchanges, it needs to create 500 links.

Due to these disadvantages and thanks to #10559 which allows clients to explicitly create topologies,
we can create a simpler, clearer, and better v2 address format.

 ## How?

 ### Design goals

Following the 7 cons from v1, the design goals for v2 are:
1. The address format should be simple so that users have a chance to
   understand the meaning of the address without necessarily consulting the docs.
2. The address format should not implicitly create queues, bindings, or exchanges.
   Instead, topologies should be created either explicitly via the new management node
   prior to link attachment (see #10559), or in future, we might support the `dynamic`
   source or target properties so that RabbitMQ creates queues dynamically.
3. No redundant address formats.
4. The target address format should explicitly state whether the routing key is present, empty,
   or will be provided dynamically in each message.
5. `Subject` should not be used as routing key. Instead, a better
   fitting field should be used.
6. Exchange names, queue names, and routing keys should allow to contain
   valid UTF-8 encoded data including the "/" character.
7. Allow both target exchange and routing key to by dynamically provided within each message.

Furthermore
8. v2 must co-exist with v1 for at least some time. Applications should be able to upgrade to
   RabbitMQ 4.0 while continuing to use v1. Examples include AMQP 1.0 shovels and plugins communicating
   between a 4.0 and a 3.13 cluster. Starting with 4.1, we should change the AMQP 1.0 shovel and plugin clients
   to use only the new v2 address format. This will allow AMQP 1.0 and plugins to communicate between a 4.1 and 4.2 cluster.
   We will deprecate v1 in 4.0 and remove support for v1 in a later 4.x version.

 ### Additional Context

The address is usually a String, but can be of any type.

The [AMQP Addressing extension](https://docs.oasis-open.org/amqp/addressing/v1.0/addressing-v1.0.html)
suggests that addresses are URIs and are therefore hierarchical and could even contain query parameters:
> An AMQP address is a URI reference as defined by RFC3986.

> the path expression is a sequence of identifier segments that reflects a path through an
> implementation specific relationship graph of AMQP nodes and their termini.
> The path expression MUST resolve to a node’s terminus in an AMQP container.

The [Using the AMQP Anonymous Terminus for Message Routing Version 1.0](https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html)
extension allows for the target being `null` and the `To` property to contain the node address.
This corresponds to AMQP 0.9.1 where clients can send each message on the same channel to a different `{exchange, routing-key}` destination.

The following v2 address formats will be used.

 ### v2 addresses

A new deprecated feature flag `amqp_address_v1` will be introduced in 4.0 which is permitted by default.
Starting with 4.1, we should change the AMQP 1.0 shovel and plugin AMQP 1.0 clients to use only the new v2 address format.
However, 4.1 server code must still understand the 4.0 AMQP 1.0 shovel and plugin AMQP 1.0 clients’ v1 address format.
The new deprecated feature flag will therefore be denied by default in 4.2.
This allows AMQP 1.0 shovels and plugins to work between
* 4.0 and 3.13 clusters using v1
* 4.1 and 4.0 clusters using v2 from 4.1 to v4.0 and v1 from 4.0 to 4.1
* 4.2 and 4.1 clusters using v2

without having to support both v1 and v2 at the same time in the AMQP 1.0 shovel and plugin clients.
While supporting both v1 and v2 in these clients is feasible, it's simpler to switch the client code directly from v1 to v2.

 ### v2 source addresses

The source address format is
```
/queue/:queue
```
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.

 ### v2 target addresses

v1 requires attaching a new link for each destination exchange.
v2 will allow dynamic `{exchange, routing-key}` combinations for a given link.
v2 therefore allows for the rare use cases where a single AMQP 1.0 publisher app needs to send to many different exchanges.
Setting up a link per destination exchange could be cumbersome.
Hence, v2 will support the dynamic `{exchange, routing-key}` combinations of AMQP 0.9.1.
To achieve this, we make use of the "Anonymous Terminus for Message Routing" extension:
The target address will contain the AMQP value null.
The `To` field in each message must be set and contain either address format
```
/exchange/:exchange/key/:routing-key
```
or
```
/exchange/:exchange
```
when using the empty routing key.

The `to` field requires an address type and is better suited than the `subject field.

Note that each message will contain this `To` value for the anonymous terminus.
Hence, we should save some bytes being sent across the network and stored on disk.
Using a format
```
/e/:exchange/k/:routing-key
```
saves more bytes, but is too obscure.
However, we use only `/key/` instead of `/routing-key/` so save a few bytes.
This also simplifies the format because users don’t have to remember whether to use spell `routing-key` or `routing_key` or `routingkey`.

The other allowed target address formats are:
```
/exchange/:exchange/key/:routing-key
```
where exchange and routing key are static on the given link.

```
/exchange/:exchange
```
where exchange and routing key are static on the given link, and routing key will be the empty string (useful for example for the fanout exchange).

```
/queue/:queue
```
This provides RabbitMQ beginners the illusion of sending a message directly
to a queue without having to understand what exchanges and routing keys are.
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.
Besides the additional queue existence check, this queue target is different from
```
/exchange//key/:queue
```
in that queue specific optimisations might be done (in future) by RabbitMQ
(for example different receiving queue types could grant different amounts of link credits to the sending clients).
A write permission check to the amq.default exchange will be performed nevertheless.

v2 will prohibit the v1 static link & dynamic routing-key combination
where the routing key is sent in the message `subject` as that’s also obscure.
For this use case, v2’s new anonymous terminus can be used where both exchange and routing key are defined in the message’s `To` field.

(The bare message must not be modified because it could be signed.)

The alias format
```
/topic/:topic
```
will also be removed.
Sending to topic exchanges is arguably an advanced feature.
Users can directly use the format
```
/exchange/amq.topic/key/:topic
```
which reduces the number of redundant address formats.

 ### v2 address format reference

To sump up (and as stated at the top of this commit message):

The only v2 source address format is:
```
/queue/:queue
```

The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```

Hence, all 8 listed design goals are reached.
2024-04-05 12:22:02 +02:00
David Ansari dda1c500da Require feature flag message_containers
as it is required for Native AMQP 1.0 in 4.0.

Remove compatibility code.
2024-04-04 15:11:31 +02:00
David Ansari 8233db0703 Fix wrong test assumptions
PR #10761 added a new CLI command to list Web MQTT connections.
That new CLI command relies on feature flag delete_ra_cluster_mqtt_node
being enabled.

This commit ensures exactly this condition.
2024-03-21 16:24:57 +01:00
Jean-Sébastien Pédron affcb6aba5
Revert "Merge pull request #10772 from rabbitmq/dependabot/maven/deps/rabbitmq_mqtt/test/java_SUITE_data/main/org.apache.maven.plugins-maven-compiler-plugin-3.13.0"
This reverts commit d8505d6f43, reversing
changes made to d96b127a3b.
2024-03-19 16:05:53 +01:00
Michael Klishin f7697c3d19
Merge pull request #10761 from rabbitmq/cloudamqp-fix/9302-list-webmqtt-connections
A new command for Web MQTT connection listing #10693 #9302
2024-03-18 20:20:14 -04:00
dependabot[bot] ff4edf8d9f
Bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.12.1 to 3.13.0.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.12.1...maven-compiler-plugin-3.13.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-03-18 18:12:22 +00:00
Rin Kuryloski 5271c26908 Mixed version test fix 2024-03-18 10:34:34 +01:00
LoisSotoLopez 481cac6430 Fix/9302 test list Web MQTT connections command
Discussion #9302
2024-03-16 13:08:30 -04:00
David Ansari 8cb313d5a1 Support AMQP 1.0 natively
## What

Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.

  ## Why

Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
   scalability, and resource usage for AMQP 1.0.
   See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
   See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
   this commit allows implementing more AMQP 1.0 features in the future.
   Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.

Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
   New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
   consumers as we currently recommended for AMQP 0.9.1. which potentially
   halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
   slow target queue won't block the entire connection.
   Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
   In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
   possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol

This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.

This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.

 ## Implementation details

1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.

2. Remove rabbit_queue_collector

rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.

3. 7 processes per connection + 1 process per session in this commit instead of
   7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.

4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
  a connection write heavily at the same time.

This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
  can accumulate across all sessions bytes before flushing the socket.

In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.

5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.

How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.

6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.

7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
  settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
  and unsettled TRANSFERs.

8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.

How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
  new credit_reply queue event:
  * `available` after the queue sends any deliveries
  * `link-credit` after the queue sends any deliveries
  * `drain` which allows us to combine the old queue events
    send_credit_reply and send_drained into a single new queue event
    credit_reply.
* Include the consumer tag into the credit_reply queue event such that
  the AMQP 1.0 session process can process any credit replies
  asynchronously.

Link flow control state `delivery-count` also moves to the queue processes.

The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.

9. Use serial number arithmetic in quorum queues and session process.

10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.

11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.

12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).

13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.

14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2

15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.

16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.

17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.

18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.

19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.

20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.

21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.

22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.

If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.

23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.

24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.

25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.

26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.

In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.

27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries

28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').

29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client

30. Fix AMQP client to do serial number arithmetic.

31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.

32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.

The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597

The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.

Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.

33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.

34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.

Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.

Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).

Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.

This approach is safe and simple.

The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:

i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.

ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?

35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.

36. Remove credit extension from AMQP 0.9.1 client

37. Support maintenance mode closing AMQP 1.0 connections.

38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.

39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
    The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
    tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```

40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```

 ## Benchmarks

 ### Throughput & Latency

Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1

Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```

Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.

Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```

1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s

Latencies by percentile:

          0% ........ 0 ms       90.00% ........ 9 ms
         25% ........ 2 ms       99.00% ....... 14 ms
         50% ........ 4 ms       99.90% ....... 17 ms
        100% ....... 26 ms       99.99% ....... 24 ms
```

RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        6     73.6       2.1          3,217       1,607        0      8.0       511
     4.1        163,580      16,367        2     74.1       4.1          3,217           0        0      8.0         0
     6.1        229,114      32,767        3     74.1       6.1          3,217           0        0      8.0         0
     8.1        261,880      16,367        2     74.1       8.1         67,874      32,296        8      8.2     7,662
    10.1        294,646      16,367        2     74.1      10.1         67,874           0        0      8.2         0
    12.1        360,180      32,734        3     74.1      12.1         67,874           0        0      8.2         0
    14.1        392,946      16,367        3     74.1      14.1         68,604         365        0      8.2    12,147
    16.1        458,480      32,734        3     74.1      16.1         68,604           0        0      8.2         0
    18.1        491,246      16,367        2     74.1      18.1         68,604           0        0      8.2         0
    20.1        556,780      32,767        4     74.1      20.1         68,604           0        0      8.2         0
    22.1        589,546      16,375        2     74.1      22.1         68,604           0        0      8.2         0
receiver timed out
    24.1        622,312      16,367        2     74.1      24.1         68,604           0        0      8.2         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```

2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s

Latencies by percentile:

          0% ....... 11 ms       90.00% ....... 23 ms
         25% ....... 15 ms       99.00% ....... 28 ms
         50% ....... 18 ms       99.90% ....... 33 ms
        100% ....... 49 ms       99.99% ....... 47 ms
```

RabbitMQ 3.x:
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        9     69.9       2.1         18,430       9,206        5      7.6     1,221
     4.1        163,580      16,375        5     70.2       4.1         18,867         218        0      7.6     2,168
     6.1        229,114      32,767        6     70.2       6.1         18,867           0        0      7.6         0
     8.1        294,648      32,734        7     70.2       8.1         18,867           0        0      7.6         0
    10.1        360,182      32,734        6     70.2      10.1         18,867           0        0      7.6         0
    12.1        425,716      32,767        6     70.2      12.1         18,867           0        0      7.6         0
receiver timed out
    14.1        458,482      16,367        5     70.2      14.1         18,867           0        0      7.6         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```

3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```

RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```

 ### Memory usage

Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```

```
/bin/cat rabbitmq.conf

tcp_listen_options.sndbuf  = 2048
tcp_listen_options.recbuf  = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```

Create 50k connections with 2 sessions per connection, i.e. 100k session in total:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 50000; i++ {
		conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("dialing AMQP server:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}
```

This commit:
```
erlang:memory().
[{total,4586376480},
 {processes,4025898504},
 {processes_used,4025871040},
 {system,560477976},
 {atom,1048841},
 {atom_used,1042841},
 {binary,233228608},
 {code,21449982},
 {ets,108560464}]

erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs

RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
 {processes,14044779256},
 {processes_used,14044755120},
 {system,1123453448},
 {atom,1057033},
 {atom_used,1052587},
 {binary,236381264},
 {code,21790238},
 {ets,391423744}]

erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs

50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB

 ## Future work

1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2024-02-28 14:15:20 +01:00
David Ansari 8b151f43f7 Parse 2 bytes encoded AMQP boolean to Erlang boolean
An AMQP boolean can by encoded using 1 byte or 2 bytes:
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-boolean

Prior to this commit, our Erlang parser returned:
* Erlang terms `true` or `false` for the 1 byte AMQP encoding
* Erlang terms `{boolean, true}` or `{boolean, false}` for the 2 byte AMQP enconding

Having a serializer and parser that perform the opposite actions such
that
```
Term = parse(serialize(Term))
```
is desirable as it provides a symmetric property useful not only for
property based testing, but also for avoiding altering message hashes
when serializing and parsing the same term.

However, dealing wth `{boolean, boolean()}` tuples instead of `boolean()` is very unhandy since
all Erlang code must take care of both forms leading to subtle bugs as
occurred in:
* 4cbeab8974/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl (L155-L158)
* b8173c9d3b/deps/rabbitmq_mqtt/src/mc_mqtt.erl (L83-L88)
* b8173c9d3b/deps/rabbit/src/mc_amqpl.erl (L123-L127)

Therefore, this commits decides to take the safe approach and always
parse to an Erlang `boolean()` independent of whether the AMQP boolean
was encoded with 1 or 2 bytes.
2024-02-16 17:57:28 +01:00
dependabot[bot] 8b4862972e
build(deps-dev): bump org.junit.jupiter:junit-jupiter
Bumps [org.junit.jupiter:junit-jupiter](https://github.com/junit-team/junit5) from 5.10.1 to 5.10.2.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.10.1...r5.10.2)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-02-05 18:29:48 +00:00
dependabot[bot] 9f24f9348c
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.2 to 3.25.3.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.2...assertj-build-3.25.3)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-02-05 18:28:08 +00:00
Michael Klishin 9c79ad8d55 More missed license header updates #9969 2024-02-05 12:26:25 -05:00
Michael Klishin f414c2d512
More missed license header updates #9969 2024-02-05 11:53:50 -05:00
David Ansari bedcae18c2 Fix MQTT test flake
Prior to this commit test block_connack_timeout
flaked when 2 new ports got created instead of only 1
in line
```
[NewPort] = Ports -- Ports0,
```

This commit filters for tcp_inet ports.
This will always return the port of the new MQTT connection.
2024-01-27 17:43:38 +01:00
dependabot[bot] 2b83b000f7
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.1 to 3.25.2.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.1...assertj-build-3.25.2)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-24 19:01:02 +00:00
Arnaud Cogoluègnes 1f89ede396
Remove rabbit_authz_backend:state_can_expire/0
Use expiry_timestamp/1 instead, which returns 'never'
if the credentials do not expire.

Fixes #10382
2024-01-24 09:58:59 +01:00
dependabot[bot] 4b5ef6474f
build(deps): bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.42.0 to 2.43.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.42.0...lib/2.43.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-23 18:56:15 +00:00
Michael Klishin c1d37e3e02
Merge pull request #10364 from rabbitmq/flaky-mc-flake-flake
Reduce flakiness of certain Common Test suites
2024-01-22 16:22:24 -05:00
Karl Nilsson c10b4dc0f0 protocol_interop_SUITE - try a durable queue for amqp part 2024-01-22 15:27:30 +00:00
Karl Nilsson 7e2f148dd4 Try a little sleep in an mqtt test
It could help, we'll see.
2024-01-18 16:06:36 +00:00
David Ansari 6a3ba6210a
Reduce per message disk overhead (#10339)
* Reduce per message disk overhead

Message container annotation keys are stored on disk.
By shortening them we save 95 - 58 = 37 bytes per message.
```
1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})).
95
2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})).
58
```
This should somewhat reduce disk I/O and disk space.

* Ensure durable is a boolean

Prevent key 'durable' with value 'undefined' being added to the
mc annotations, for example when the durable field was not set, but
another AMQP 1.0 header field was set.

* Apply feedback
2024-01-18 11:53:02 +01:00
dependabot[bot] e84d2d9cf6
build(deps): bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.41.1 to 2.42.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/maven/2.41.1...lib/2.42.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-15 18:17:40 +00:00
dependabot[bot] 12b6225387
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.3 to 3.2.5.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.3...surefire-3.2.5)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-10 18:59:58 +00:00
dependabot[bot] 0e22221efc
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.0 to 3.25.1.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.0...assertj-build-3.25.1)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-03 18:37:55 +00:00
Michael Klishin 01092ff31f
(c) year bumps 2024-01-01 22:02:20 -05:00
dependabot[bot] 506dccb172
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.24.2 to 3.25.0.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.24.2...assertj-build-3.25.0)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-01 18:59:45 +00:00
David Ansari 78b4fcc899 Allow MQTT QoS 0 subscribers to reconnect
The solution in #10203 has the following issues:
1. Bindings can be left ofter in Mnesia table rabbit_durable_queue.
One solution to 1. would be to first delete the old queue via
`rabbit_amqqueue:internal_delete(Q, User, missing_owner)`
and subsequently declare the new queue via
`rabbit_amqqueue:internal_declare(Q, false)`
However, even then, it suffers from:
2. Race conditions between `rabbit_amqqueue:on_node_down/1`
and `rabbit_mqtt_qos0_queue:declare/2`:
`rabbit_amqqueue:on_node_down/1` could first read the queue records that
need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could
re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1`
could subsequently delete the re-created queue.

Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete
transient queues in one isolated transaction. Instead it first reads
queues and subsequenlty deletes queues in batches making it prone to
race conditions.

Ideally, this commit deletes all rabbit_mqtt_qos0_queue queues of the
node that has crashed including their bindings.
However, doing so in one transaction is risky as there may be millions
of such queues and the current code path applies the same logic on all
live nodes resulting in conflicting transactions and therefore a long
database operation.

Hence, this commit uses the simplest approach which should still be
safe:
Do not remove rabbit_mqtt_qos0_queue queues if a node crashes.
Other live nodes will continue to route to these dead queues.
That should be okay, given that the rabbit_mqtt_qos0_queue clients auto
confirm.
Continuing routing however has the effect of counting as routing result
for AMQP 0.9.1 `mandatory` property.
If an MQTT client re-connects to a live node with the same client ID,
the new node will delete and then re-create the queue.
Once the crashed node comes back online, it will clean up its leftover
queues and bindings.
2023-12-27 20:47:06 -05:00
dependabot[bot] 67b8d30281
build(deps): bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.12.0 to 3.12.1.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.12.0...maven-compiler-plugin-3.12.1)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-25 18:09:25 +00:00
David Ansari 9487189dc6 Overwrite rabbit_mqtt_qos0_queue record from crashed node
When a node is shut down cleanly, the rabbit_mqtt_qos0_queue record is
removed from Mnesia.
When a node crashes and subsequently reboots the new node incarnation
removes the old rabbit_mqtt_qos0_queue record from Mnesia (via
rabbit_mqtt_qos0_queue:recover/2)

However, when a node crashes, the rabbit_mqtt_qos0_queue will be removed
from Mnesia table rabbit_queue, but will still be present in table
rabbit_durable_queue on the other live nodes.
Prior to this commit, when the same MQTT client (i.e. same MQTT client
ID) re-connects from the crashed node to another live node and
re-subscribes, the following error occurred:
```
[info] <0.43155.0> Accepted MQTT connection 10.105.0.18:60508 -> 10.105.0.10:1883 for client ID nodered_24e214feb018a232
[debug] <0.43155.0> Received a SUBSCRIBE for topic(s) [{mqtt_topic,
[debug] <0.43155.0>                                        <<"as923/gateway/+/command/#">>,0}]
[error] <0.43155.0> Failed to declare queue 'mqtt-subscription-nodered_24e214feb018a232qos0' in vhost '/': {absent,
[error] <0.43155.0>                                                                                         {amqqueue,
[error] <0.43155.0>                                                                                          {resource,
[error] <0.43155.0>                                                                                           <<"/">>,
[error] <0.43155.0>                                                                                           queue,
[error] <0.43155.0>                                                                                           <<"mqtt-subscription-nodered_24e214feb018a232qos0">>},
[error] <0.43155.0>                                                                                          true,
[error] <0.43155.0>                                                                                          false,
[error] <0.43155.0>                                                                                          <15486.32690.0>,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          <15486.32690.0>,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [{vhost,
[error] <0.43155.0>                                                                                            <<"/">>},
[error] <0.43155.0>                                                                                           {name,
[error] <0.43155.0>                                                                                            <<"ha-all-mqtt">>},
[error] <0.43155.0>                                                                                           {pattern,
[error] <0.43155.0>                                                                                            <<"^mqtt-">>},
[error] <0.43155.0>                                                                                           {'apply-to',
[error] <0.43155.0>                                                                                            <<"all">>},
[error] <0.43155.0>                                                                                           {definition,
[error] <0.43155.0>                                                                                            [{<<"ha-mode">>,
[error] <0.43155.0>                                                                                              <<"all">>}]},
[error] <0.43155.0>                                                                                           {priority,
[error] <0.43155.0>                                                                                            0}],
[error] <0.43155.0>                                                                                          undefined,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          undefined,
[error] <0.43155.0>                                                                                          live,
[error] <0.43155.0>                                                                                          0,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          <<"/">>,
[error] <0.43155.0>                                                                                          #{user =>
[error] <0.43155.0>                                                                                             <<"iottester">>},
[error] <0.43155.0>                                                                                          rabbit_mqtt_qos0_queue,
[error] <0.43155.0>                                                                                          #{}},
[error] <0.43155.0>                                                                                         nodedown}
[error] <0.43155.0> MQTT protocol error on connection 10.105.0.18:60508 -> 10.105.0.10:1883: subscribe_error
```

This commit fixes this error allowing an MQTT client that connects with CleanSession=true and
subscribes with QoS 0 to re-connect and re-subscribe to another live
node if the original Rabbit node crashes.

Reported in https://groups.google.com/g/rabbitmq-users/c/pxgy0QiwilM/m/LkJQ-3DyBgAJ
2023-12-21 17:30:15 +01:00
dependabot[bot] 0f5c2d2325
build(deps): bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.11.0 to 3.12.0.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.11.0...maven-compiler-plugin-3.12.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-19 18:09:22 +00:00
David Ansari abcbe5e647 Fix test expectation
as `main` and `v3.12.x` branches are currently red due to
https://github.com/rabbitmq/rabbitmq-server/pull/10139
2023-12-15 09:35:54 +01:00
David Ansari f44c851293 Fix crash when closing connection
Avoid the following crash
```
** Reason for termination ==
** {mqtt_unexpected_cast,{shutdown,"Closed via management plugin"}}

  crasher:
    initial call: rabbit_mqtt_reader:init/1
    pid: <0.1096.0>
    registered_name: []
    exception exit: {mqtt_unexpected_cast,
                        {shutdown,"Closed via management plugin"}}
      in function  gen_server:handle_common_reply/8 (gen_server.erl, line 1208)
```
when closing MQTT or Stream connections via HTTP API endpoint
```
/connections/username/:username
```
2023-12-14 12:35:51 +01:00
dependabot[bot] b3f33aa42c
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.2 to 3.2.3.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.2...surefire-3.2.3)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-13 18:51:16 +00:00
dependabot[bot] aa0095aae9
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.41.0 to 2.41.1.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.41.0...maven/2.41.1)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-04 18:13:42 +00:00
dependabot[bot] fb90276710
Bump ch.qos.logback:logback-classic
Bumps [ch.qos.logback:logback-classic](https://github.com/qos-ch/logback) from 1.2.12 to 1.2.13.
- [Commits](https://github.com/qos-ch/logback/compare/v_1.2.12...v_1.2.13)

---
updated-dependencies:
- dependency-name: ch.qos.logback:logback-classic
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-01 18:30:15 +00:00
dependabot[bot] c69de4a083
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.40.0 to 2.41.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.40.0...lib/2.41.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-27 18:19:07 +00:00
Karl Nilsson f67058ac6c
Merge pull request #9830 from rabbitmq/mc-refinements
Message container conversion improvements
2023-11-24 10:22:34 +00:00
Karl Nilsson 61f13d0bb7 Add UUID conversion to mc_mqtt 2023-11-23 17:36:34 +00:00
Michael Klishin 1b642353ca
Update (c) according to [1]
1. https://investors.broadcom.com/news-releases/news-release-details/broadcom-and-vmware-intend-close-transaction-november-22-2023
2023-11-21 23:18:22 -05:00
David Ansari 95c5f2ec9e Some small fixes 2023-11-16 12:33:17 +01:00
Karl Nilsson c4fd947aad MC: various changes and improvements
To refine conversion behaviour add additional tests
and ensure it matches the documentation.

mc: optionally capture source environment

And pass target environment to mc:convert

This allows environmental data and configuration to be captured and
used to modify and complete conversion logic whilst allowing conversion
code to remain pure and portable.
2023-11-15 11:04:49 +00:00
dependabot[bot] 37acb08feb
Bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.1 to 3.2.2.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.1...surefire-3.2.2)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-07 18:25:28 +00:00
dependabot[bot] 0664b7ec7b
Bump org.junit.jupiter:junit-jupiter
Bumps [org.junit.jupiter:junit-jupiter](https://github.com/junit-team/junit5) from 5.10.0 to 5.10.1.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.10.0...r5.10.1)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-06 18:46:05 +00:00
David Ansari d1940c997e Fix Khepri flake
Previously, test pubsub was flaky
```
{shared_SUITE,pubsub,766}
{test_case_failed,missing m1}
```
because the binding wasn't present yet on node 0 when publishing to node
0.
2023-11-03 09:52:53 +01:00
David Ansari 75e777b833 Provide more debug output for flaky test
In
https://github.com/rabbitmq/rabbitmq-server/actions/runs/6735035912/job/18308551274?pr=9865
test
```
bazel test //deps/rabbitmq_mqtt:v5_SUITE -t- --test_sharding_strategy=disabled \
    --test_env FOCUS="-group [mqtt,cluster_size_1] -case will_delay_equals_session_expiry" \
    --test_env RABBITMQ_METADATA_STORE=mnesia --config=rbe-26 --runs_per_test=100
```
seems to flake.
2023-11-02 18:40:38 +01:00
David Ansari 43dfa4d6ac Fix MQTT test flakes
Tests session_reconnect and session_takeover were flaky, specifically
when run under Khepri.

The issue was in the test itself that the connect properties didn't
apply. Therefore, prior to this commit an exclusive queue got created.
2023-10-27 17:30:41 +02:00
David Ansari 7cd91a8fd2 Do not skip test
On `main` branch and v3.12.6 feature flag delete_ra_cluster_mqtt_node is
supported. Instead of skipping the entire test if that feature flag is
not enabled, enable the feature flag and run the test.

More generally:
"Instead of verifying if a feature flag is enabled or not, it's best to enable
it and react from the return value (success or failure).
Mixed version testing always turn off all feature flags by default.
So in the future, even though all nodes supports the mentionned
feature flag, the testcase will still be skipped." [JSP]
2023-10-27 16:49:13 +02:00
David Ansari c8b90488e7 Skip some assertions in mixed version tests
See commit message 00c77e0a1a for details.

In a multi node mixed version cluster where the lower version is
compiled with a different OTP version, anonymous Ra leader queries will
fail with a badfun error if initiated on the higher version and executed
on the leader on the lower version node.
2023-10-27 15:12:51 +02:00
Michael Klishin 673548f343
Merge pull request #9805 from rabbitmq/skip-maintenance
Skip test maintenance in mixed version mode
2023-10-27 03:31:27 -04:00
David Ansari 5cbeedffd9 Run test duplicate_client_id with Khepri
The only reason to skip tests with Khepri is if they use
classic mirror queues or use Mnesia directly.
2023-10-27 09:15:14 +02:00
David Ansari 00c77e0a1a Skip test maintenance in mixed version mode
This test fails when MQTT client ID tracking is performed in Ra, and the
higher version node gets compiled with a different OTP version (26) than
the lower version node (25).

The reason is described in 83eede7ef2
```
An interesting side note learned here is that the compiled file
rabbit_mqtt_collector must not be changed. This commit only modifies
function specs. However as soon as the compiled code is changed, this
module becomes a new version. The new version causes the anonymous ra query
function to fail in mixed clusters: When the old node does a
ra:leader_query where the leader is on the new node, the query function
fails on the new node with `badfun` because the new node does not have
the same module version. For more context, read:
https://web.archive.org/web/20181017104411/http://www.javalimit.com/2010/05/passing-funs-to-other-erlang-nodes.html
```

We shouldn’t use an anonymous function for ra:leader_query or ra:consistent_query.
Instead we should use the {M,F,A} form.
9e5d437a0a/src/ra.erl (L102-L103)

In MQTT the anonymous function is used in bcb95c949d/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl (L50)
This causes the query to return a bad fun error (silently ignored in bcb95c949d/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl (L70-L71) )
when executed on a different node and either:
1.) Any code in file rabbit_mqtt_collector.erl changed, or
2.) The code gets compiled with a different OTP version.

2.) is the reason for a failing mixed version test in https://github.com/rabbitmq/rabbitmq-server/pull/8553 because both higher and lower versions run OTP 26,
but the higher version node got compiled with 26 while the lower version node got compiled with 25.

The same file
compiled with OTP 26.0.1
```
1> rabbit_mqtt_collector:module_info(attributes).
[{vsn,[30045739264236496640687548892374951597]}]
```

compiled with OTP 25.3.2
```
1> rabbit_mqtt_collector:module_info(attributes).
[{vsn,[168144385419873449889532520247510637232]}]
```

Due to the very low impact that maintenance mode will not close all MQTT
client connections with feature flag delete_ra_cluster_mqtt_node being
disabled, we skip this test.
2023-10-27 08:51:00 +02:00
dependabot[bot] 1d9524ea98
Bump com.rabbitmq:amqp-client
Bumps [com.rabbitmq:amqp-client](https://github.com/rabbitmq/rabbitmq-java-client) from 5.19.0 to 5.20.0.
- [Release notes](https://github.com/rabbitmq/rabbitmq-java-client/releases)
- [Commits](https://github.com/rabbitmq/rabbitmq-java-client/compare/v5.19.0...v5.20.0)

---
updated-dependencies:
- dependency-name: com.rabbitmq:amqp-client
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-25 18:55:23 +00:00
dependabot[bot] 7f45bb1949
Bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.1.2 to 3.2.1.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.1.2...surefire-3.2.1)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-24 18:27:56 +00:00
dependabot[bot] 83aa17cd7f
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.39.0 to 2.40.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.39.0...lib/2.40.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-09-29 18:58:01 +00:00
Diana Parra Corbacho 5f0981c5a3
Allow to use Khepri database to store metadata instead of Mnesia
[Why]

Mnesia is a very powerful and convenient tool for Erlang applications:
it is a persistent disc-based database, it handles replication accross
multiple Erlang nodes and it is available out-of-the-box from the
Erlang/OTP distribution. RabbitMQ relies on Mnesia to manage all its
metadata:

* virtual hosts' properties
* intenal users
* queue, exchange and binding declarations (not queues data)
* runtime parameters and policies
* ...

Unfortunately Mnesia makes it difficult to handle network partition and,
as a consequence, the merge conflicts between Erlang nodes once the
network partition is resolved. RabbitMQ provides several partition
handling strategies but they are not bullet-proof. Users still hit
situations where it is a pain to repair a cluster following a network
partition.

[How]

@kjnilsson created Ra [1], a Raft consensus library that RabbitMQ
already uses successfully to implement quorum queues and streams for
instance. Those queues do not suffer from network partitions.

We created Khepri [2], a new persistent and replicated database engine
based on Ra and we want to use it in place of Mnesia in RabbitMQ to
solve the problems with network partitions.

This patch integrates Khepri as an experimental feature. When enabled,
RabbitMQ will store all its metadata in Khepri instead of Mnesia.

This change comes with behavior changes. While Khepri remains disabled,
you should see no changes to the behavior of RabbitMQ. If there are
changes, it is a bug. After Khepri is enabled, there are significant
changes of behavior that you should be aware of.

Because it is based on the Raft consensus algorithm, when there is a
network partition, only the cluster members that are in the partition
with at least `(Number of nodes in the cluster ÷ 2) + 1` number of nodes
can "make progress". In other words, only those nodes may write to the
Khepri database and read from the database and expect a consistent
result.

For instance in a cluster of 5 RabbitMQ nodes:
* If there are two partitions, one with 3 nodes, one with 2 nodes, only
  the group of 3 nodes will be able to write to the database.
* If there are three partitions, two with 2 nodes, one with 1 node, none
  of the group can write to the database.

Because the Khepri database will be used for all kind of metadata, it
means that RabbitMQ nodes that can't write to the database will be
unable to perform some operations. A list of operations and what to
expect is documented in the associated pull request and the RabbitMQ
website.

This requirement from Raft also affects the startup of RabbitMQ nodes in
a cluster. Indeed, at least a quorum number of nodes must be started at
once to allow nodes to become ready.

To enable Khepri, you need to enable the `khepri_db` feature flag:

    rabbitmqctl enable_feature_flag khepri_db

When the `khepri_db` feature flag is enabled, the migration code
performs the following two tasks:
1. It synchronizes the Khepri cluster membership from the Mnesia
   cluster. It uses `mnesia_to_khepri:sync_cluster_membership/1` from
   the `khepri_mnesia_migration` application [3].
2. It copies data from relevant Mnesia tables to Khepri, doing some
   conversion if necessary on the way. Again, it uses
   `mnesia_to_khepri:copy_tables/4` from `khepri_mnesia_migration` to do
   it.

This can be performed on a running standalone RabbitMQ node or cluster.
Data will be migrated from Mnesia to Khepri without any service
interruption. Note that during the migration, the performance may
decrease and the memory footprint may go up.

Because this feature flag is considered experimental, it is not enabled
by default even on a brand new RabbitMQ deployment.

More about the implementation details below:

In the past months, all accesses to Mnesia were isolated in a collection
of `rabbit_db*` modules. This is where the integration of Khepri mostly
takes place: we use a function called `rabbit_khepri:handle_fallback/1`
which selects the database and perform the query or the transaction.
Here is an example from `rabbit_db_vhost`:

* Up until RabbitMQ 3.12.x:

        get(VHostName) when is_binary(VHostName) ->
            get_in_mnesia(VHostName).

* Starting with RabbitMQ 3.13.0:

        get(VHostName) when is_binary(VHostName) ->
            rabbit_khepri:handle_fallback(
              #{mnesia => fun() -> get_in_mnesia(VHostName) end,
                khepri => fun() -> get_in_khepri(VHostName) end}).

This `rabbit_khepri:handle_fallback/1` function relies on two things:
1. the fact that the `khepri_db` feature flag is enabled, in which case
   it always executes the Khepri-based variant.
4. the ability or not to read and write to Mnesia tables otherwise.

Before the feature flag is enabled, or during the migration, the
function will try to execute the Mnesia-based variant. If it succeeds,
then it returns the result. If it fails because one or more Mnesia
tables can't be used, it restarts from scratch: it means the feature
flag is being enabled and depending on the outcome, either the
Mnesia-based variant will succeed (the feature flag couldn't be enabled)
or the feature flag will be marked as enabled and it will call the
Khepri-based variant. The meat of this function really lives in the
`khepri_mnesia_migration` application [3] and
`rabbit_khepri:handle_fallback/1` is a wrapper on top of it that knows
about the feature flag.

However, some calls to the database do not depend on the existence of
Mnesia tables, such as functions where we need to learn about the
members of a cluster. For those, we can't rely on exceptions from
Mnesia. Therefore, we just look at the state of the feature flag to
determine which database to use. There are two situations though:

* Sometimes, we need the feature flag state query to block because the
  function interested in it can't return a valid answer during the
  migration. Here is an example:

        case rabbit_khepri:is_enabled(RemoteNode) of
            true  -> can_join_using_khepri(RemoteNode);
            false -> can_join_using_mnesia(RemoteNode)
        end

* Sometimes, we need the feature flag state query to NOT block (for
  instance because it would cause a deadlock). Here is an example:

        case rabbit_khepri:get_feature_state() of
            enabled -> members_using_khepri();
            _       -> members_using_mnesia()
        end

Direct accesses to Mnesia still exists. They are limited to code that is
specific to Mnesia such as classic queue mirroring or network partitions
handling strategies.

Now, to discover the Mnesia tables to migrate and how to migrate them,
we use an Erlang module attribute called
`rabbit_mnesia_tables_to_khepri_db` which indicates a list of Mnesia
tables and an associated converter module. Here is an example in the
`rabbitmq_recent_history_exchange` plugin:

    -rabbit_mnesia_tables_to_khepri_db(
       [{?RH_TABLE, rabbit_db_rh_exchange_m2k_converter}]).

The converter module  — `rabbit_db_rh_exchange_m2k_converter` in this
example  — is is fact a "sub" converter module called but
`rabbit_db_m2k_converter`. See the documentation of a `mnesia_to_khepri`
converter module to learn more about these modules.

[1] https://github.com/rabbitmq/ra
[2] https://github.com/rabbitmq/khepri
[3] https://github.com/rabbitmq/khepri_mnesia_migration

See #7206.

Co-authored-by: Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>
Co-authored-by: Diana Parra Corbacho <dparracorbac@vmware.com>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
2023-09-29 16:00:11 +02:00
dependabot[bot] 88bb95c3ea
Bump com.rabbitmq:amqp-client
Bumps [com.rabbitmq:amqp-client](https://github.com/rabbitmq/rabbitmq-java-client) from 5.18.0 to 5.19.0.
- [Release notes](https://github.com/rabbitmq/rabbitmq-java-client/releases)
- [Commits](https://github.com/rabbitmq/rabbitmq-java-client/compare/v5.18.0...v5.19.0)

---
updated-dependencies:
- dependency-name: com.rabbitmq:amqp-client
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-09-27 18:09:42 +00:00
David Ansari 1eeb9b3a4f Fix MQTT test flake 2023-09-12 10:22:30 +02:00
David Ansari 62710f576e Add integration test MQTT 5.0 -> Stream
Add an integration test that sends via MQTT 5.0,
converts the MQTT message to an AMQP 1.0 message via mc_mqtt,
and consumes via the Stream protocol.
2023-09-05 11:04:44 +02:00
David Ansari 8b4a26be12 Fix test flake
Fix the followin flake:
```
[ERROR] com.rabbitmq.mqtt.test.MqttTest.sessionRedelivery(TestInfo) -- Time elapsed: 0.959 s <<< FAILURE!
org.opentest4j.AssertionFailedError:
Expecting value to be false but was true
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at com.rabbitmq.mqtt.test.MqttTest.sessionRedelivery(MqttTest.java:523)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
[ERROR] Failures:
[ERROR]   MqttTest.sessionRedelivery:523
Expecting value to be false but was true
```
2023-09-04 10:20:30 +02:00
Karl Nilsson 119f034406
Message Containers (#5077)
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.

Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.

This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.

The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.

This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.


COMMIT HISTORY:

* Refactor away from using the delivery{} record

In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.

Add mc module and move direct replies outside of exchange

Lots of changes incl classic queues

Implement stream support incl amqp conversions

simplify mc state record

move mc.erl

mc dlx stuff

recent history exchange

Make tracking work

But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.

Tracing as a whole may want a bit of a re-vamp at some point.

tidy

make quorum queue peek work by legacy conversion

dead lettering fixes

dead lettering fixes

CMQ fixes

rabbit_trace type fixes

fixes

fix

Fix classic queue props

test assertion fix

feature flag and backwards compat

Enable message_container feature flag in some SUITEs

Dialyzer fixes

fixes

fix

test fixes

Various

Manually update a gazelle generated file

until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185

Add message_containers_SUITE to bazel

and regen bazel files with gazelle from rules_erlang@main

Simplify essential proprty access

Such as durable, ttl and priority by extracting them into annotations
at message container init time.

Move type

to remove dependenc on amqp10 stuff in mc.erl

mostly because I don't know how to make bazel do the right thing

add more stuff

Refine routing header stuff

wip

Cosmetics

Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.

* Dedup death queue names

* Fix function clause crashes

Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)

Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.

* Fix is_utf8_no_null crash

Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```

* Implement mqtt mc behaviour

For now via amqp translation.

This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```

* Shorten mc file names

Module name length matters because for each persistent message the #mc{}
record is persisted to disk.

```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```

This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```

* mc: make deaths an annotation + fixes

* Fix mc_mqtt protocol_state callback

* Fix test will_delay_node_restart

```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```

* Bazel run gazelle

* mix format rabbitmqctl.ex

* Ensure ttl annotation is refelected in amqp legacy protocol state

* Fix id access in message store

* Fix rabbit_message_interceptor_SUITE

* dializer fixes

* Fix rabbit:rabbit_message_interceptor_SUITE-mixed

set_annotation/3 should not result in duplicate keys

* Fix MQTT shared_SUITE-mixed

Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.

The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.

* Field content of 'v1_0.data' can be binary

Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
    --test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
    -t- --test_sharding_strategy=disabled
```

* Remove route/2 and implement route/3 for all exchange types.

This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.

stream filtering fixes

* Translate directly from MQTT to AMQP 0.9.1

* handle undecoded properties in mc_compat

amqpl: put clause in right order

recover death deatails from amqp data

* Replace callback init_amqp with convert_from

* Fix return value of lists:keyfind/3

* Translate directly from AMQP 0.9.1 to MQTT

* Fix MQTT payload size

MQTT payload can be a list when converted from AMQP 0.9.1 for example

First conversions tests

Plus some other conversion related fixes.

bazel

bazel

translate amqp 1.0 null to undefined

mc: property/2 and correlation_id/message_id return type tagged values.

To ensure we can support a variety of types better.

The type type tags are AMQP 1.0 flavoured.

fix death recovery

mc_mqtt: impl new api

Add callbacks to allow protocols to compact data before storage

And make readable if needing to query things repeatedly.

bazel fix

* more decoding

* tracking mixed versions compat

* mc: flip default of `durable` annotation to save some data.

Assuming most messages are durable and that in memory messages suffer less
from persistence overhead it makes sense for a non existent `durable`
annotation to mean durable=true.

* mc conversion tests and tidy up

* mc make x_header unstrict again

* amqpl: death record fixes

* bazel

* amqp -> amqpl conversion test

* Fix crash in mc_amqp:size/1

Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.

* Fix crash in lists:flatten/1

Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.

* Fix crash in rabbit_writer

Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
  initial call: rabbit_writer:enter_mainloop/2
  pid: <0.711.0>
  registered_name: []
  exception error: bad argument
    in function  size/1
       called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
       *** argument 1: not tuple or binary
    in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
    in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
    in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
    in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
    in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
    in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
    in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.

This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.

* Add accidentally deleted line back

* mc: optimise mc_amqp internal format

By removint the outer records for message and delivery annotations
as well as application properties and footers.

* mc: optimis mc_amqp map_add by using upsert

* mc: refactoring and bug fixes

* mc_SUITE routingheader assertions

* mc remove serialize/1 callback as only used by amqp

* mc_amqp: avoid returning a nested list from protocol_state

* test and bug fix

* move infer_type to mc_util

* mc fixes and additiona assertions

* Support headers exchange routing for MQTT messages

When a headers exchange is bound to the MQTT topic exchange, routing
will be performend based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).

This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.

When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.

* Fix crash when sending from stream to amqpl

When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
  initial call: rabbit_channel:init/1
  pid: <0.818.0>
  registered_name: []
  exception exit: {{badmatch,undefined},
                   [{rabbit_channel,handle_deliver0,4,
                                    [{file,"rabbit_channel.erl"},
                                     {line,2728}]},
                    {lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
                    {rabbit_channel,handle_cast,2,
                                    [{file,"rabbit_channel.erl"},
                                     {line,728}]},
                    {gen_server2,handle_msg,2,
                                 [{file,"gen_server2.erl"},{line,1056}]},
                    {proc_lib,wake_up,3,
                              [{file,"proc_lib.erl"},{line,251}]}]}
```

This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.

* Support consistent hash exchange routing for MQTT 5.0

When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.

* Convert MQTT 5.0 User Property

* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations

* Make use of Annotations in mc_mqtt:protocol_state/2

mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.

* Enforce AMQP 0.9.1 field name length limit

The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.

* Fix type specs

Apply feedback from Michael Davis

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* Add mc_mqtt unit test suite

Implement mc_mqtt:x_header/2

* Translate indicator that payload is UTF-8 encoded

when converting between MQTT 5.0 and AMQP 1.0

* Translate single amqp-value section from AMQP 1.0 to MQTT

Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.

If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indiate the encoding via Content-Type
message/vnd.rabbitmq.amqp.

This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.

* Fix payload conversion

* Translate Response Topic between MQTT and AMQP

Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.

The Response Topic must be a UTF-8 encoded string.

This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/"     RK        Publish to amq.topic with routing key RK
"/exchange/"  X "/" RK  Publish to exchange X with routing key RK
```

By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.

When an operator modifies the mqtt.exchange, the 2nd target address is
used.

* Apply PR feedback

and fix formatting

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* tidy up

* Add MQTT message_containers test

* consistent hash exchange: avoid amqp legacy conversion

When hashing on a header value.

* Avoid converting to amqp legacy when using exchange federation

* Fix test flake

* test and dialyzer fixes

* dialyzer fix

* Add MQTT protocol interoperability tests

Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams

* Regenerate portions of deps/rabbit/app.bzl with gazelle

I'm not exactly sure how this happened, but gazell seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.

* mc: refactoring

* mc_amqpl: handle delivery annotations

Just in case they are included.

Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
2023-08-31 11:27:13 +01:00
dependabot[bot] 9014f251a4
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.38.0 to 2.39.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.38.0...lib/2.39.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-08-29 18:18:29 +00:00
David Ansari 64d69f76ff Bump test timeout
We see on GitHub Actions sporadically:
```
rabbit_ct_broker_helpers:wait_for_rabbitmq_nodes failed on line 378
Reason: {timetrap_timeout,60000}
```
2023-08-16 10:01:48 +02:00
David Ansari 484c1071f2 Fix flake in at_most_once_dead_letter_detect_cycle
The test case was flaky:
```
*** CT Error Notification 2023-08-15 14:25:51.016 ***🔗
v5_SUITE:at_most_once_dead_letter_detect_cycle failed on line 871
Reason: {test_case_failed,Received unexpected message: {publish,#{client_pid => <0.227.0>,dup => false,
                                        packet_id => 1,
                                        payload =>
                                            <<"at_most_once_dead_letter_detect_cycle">>,
                                        properties =>
                                            #{'Subscription-Identifier' => 10},
                                        qos => 1,retain => false,
                                        topic => <<"a/b">>,
                                        via => #Port<0.76>}}}
```
2023-08-15 16:43:05 +02:00
David Ansari 0f5fe8fadd Add Prometheus metric messages dropped by MQTT QoS 0 queue type
Why:
A RabbitMQ operator should be able to see whether RabbitMQ drops MQTT
QoS 0 messages due to overload protection. It's an indication that an
MQTT subscriber does not consume fast enough.

How:
Use Prometheus global counters.

There are 2 valid solutions:
1. Introduce a new metric called messages_dropped specifically for the
   rabbitmq_mqtt_qos0_queue type. This would work in a similar fashion
   how streams extends the per protocol global counters, but requires
   extending the per protocol & queue type global counters for the MQTT
   QoS queue type. The emitted metrics would look as follows:
```
rabbitmq_global_messages_dropped_total{protocol="mqtt310",queue_type="rabbit_mqtt_qos0_queue"} 0
rabbitmq_global_messages_dropped_total{protocol="mqtt311",queue_type="rabbit_mqtt_qos0_queue"} 0
rabbitmq_global_messages_dropped_total{protocol="mqtt50",queue_type="rabbit_mqtt_qos0_queue"} 0
```
2. Reuse the existing metric rabbitmq_global_messages_dead_lettered_maxlen_total

This commit decides to go for the 2nd approach because:
a) there is no need to add a new metric. Even though dead lettering is not supported
for the MQTT QoS 0 queue type, this metric maps nicely to
what happens: The queue drop messages since itx max length
(mqtt.mailbox_soft_limit) is exceeded with overflow behaviour
drop-head. Furtheremore the label `dead_letter_strategy="disabled"` tells
that dead lettering is not taking place from this queue type.

b) this metric allows to support dead lettering for the MQTT QoS 0 queue
type in the future.

The new dead lettering metrics look as follows:
```
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_mqtt_qos0_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
```
2023-08-15 16:06:15 +02:00
David Ansari 1ab4f79147 Fix MQTT vhost mapping test case
Multiple things were wrong in this test:
1. The test name ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping was wrong.
As written in our docs, the opposite is true
> The certificate-to-vhost mapping with the mqtt_default_vhosts global parameter is considered more specific than the port-to-vhost mapping with the mqtt_port_to_vhost_mapping global parameter and so takes precedence over it.

2. The test case didn't test what it was supposed to test:
The test relies on revoking the permissions that were set for the
port_to_vhost mapping. However revoking these permission did not happen
because:
2.a) The wrong Config was used, and
2.b) The wrong key was used.

2. could be observed by the following CT warning being logged:
```
Could not find element mqtt_port_to_vhost_mapping in Config.
```
2023-08-14 19:01:38 +02:00
David Ansari 8daaa48da8 User short variable instead of long atom name 2023-08-14 19:01:34 +02:00
David Ansari 2a4301e12d Nack rejected messages to MQTT 5.0 client
since MQTT 5.0 supports negative acknowledgements thanks to reason codes
in the PUBACK packet.

We could either choose reason code 128 or 131. The description code for
131 applies for rejected messages, hence this commit uses 131:
> The PUBLISH is valid but the receiver is not willing to accept it.
2023-08-09 15:31:14 +02:00
Diana Parra Corbacho 7d06f806c2
Mqtt: filter test events 2023-08-07 17:21:04 +02:00
Diana Parra Corbacho 57aa7c0343 mqtt shared_SUITE: another case of a time dependent test
Longer wait for expiry. On slow machines, as CI, we can't guarantee
that the expiration will happen at the exact time
2023-08-03 10:40:23 +02:00
dependabot[bot] efcd91db31
Bump org.junit.jupiter:junit-jupiter
Bumps [org.junit.jupiter:junit-jupiter](https://github.com/junit-team/junit5) from 5.9.3 to 5.10.0.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.9.3...r5.10.0)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-07-24 18:33:05 +00:00
dependabot[bot] 7ec082473c
Bump spotless-maven-plugin in /deps/rabbitmq_mqtt/test/java_SUITE_data
Bumps [spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.37.0 to 2.38.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.37.0...lib/2.38.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-07-18 18:22:48 +00:00
David Ansari 52262bb6e9 Rename to x-opt-reply-to-topic
due to AMQP 1.0 spec:
> The annotations type is a map where the keys are restricted to be of type symbol or of type ulong.
All ulong keys, and all symbolic keys except those beginning with "x-" are reserved.
Keys beginning with "x-opt-" MUST be ignored if not understood.
On receiving an annotation key which is not understood, and which does not begin with "x-opt",
the receiving AMQP container MUST detach the link with a not-implemented error.

So, for new headers being introduced it makes sense to comply with that
AMQP 1.0 requirement. We don't want the receiver to force to understand
this header.
2023-07-14 17:25:28 +02:00
David Ansari 5fa35d4890 Fix MQTT test flake
```
//deps/rabbitmq_mqtt:shared_SUITE-mixed
--test_env FOCUS="-group [web_mqtt,v4,cluster_size_1]
-case non_clean_sess_reconnect_qos0_and_qos1"
```

was flaky.

The DISCONNECT packet is sent aysnc from client to server.

See
https://github.com/rabbitmq/rabbitmq-server/actions/runs/5553886147/attempts/1?pr=5077
for an instance of that flake.
2023-07-14 16:13:13 +02:00
David Ansari ed31a818c3 Make mqtt.subscription_ttl unsupported
Starting with RabbitMQ 3.13 mqtt.max_session_expiry_interval_seconds
(set in seconds) will replace the previous setting
mqtt.subscription_ttl.

MQTT 5.0 introduces the Session Expiry Interval
feature which does not only apply to subscribers, but also to
publishers.

The new config name mqtt.max_session_expiry_interval_seconds makes it clear
that it also applies to publishers.

Prior to this commit, when mqtt.subscription_ttl was set, a warning got
logged and the value was ignored. This is dangerous if an operator does
not see the warning but relies for example on `mqtt.subscription =
infinity` to not expire non clean session.

It's safer to make the boot fail if that unsupported config name is
still set. A clear error message will be logged:
```
[error] <0.142.0> Error preparing configuration in phase apply_translations:
[error] <0.142.0>   - Translation for 'rabbitmq_mqtt.subscription_ttl' found invalid configuration:
        Since 3.13 mqtt.subscription_ttl (in milliseconds) is unsupported.
        Use mqtt.max_session_expiry_interval_seconds (in seconds) instead.
```

Alternatively, RabbitMQ could translate mqtt.subscription_ttl to
mqtt.max_session_expiry_interval_seconds.

However, forcing the new config option sounds the better way to go.

Once we write MQTT 5.0 docs, this change must go into the 3.13 release notes.

This commit also renames max_session_expiry_interval_secs to max_session_expiry_interval_seconds.
The latter is clearer to users.
2023-07-13 14:47:33 +00:00
Michael Klishin e2ae3a4680
Merge pull request #8708 from rabbitmq/dependabot/maven/deps/rabbitmq_mqtt/test/java_SUITE_data/main/org.codehaus.mojo-keytool-maven-plugin-1.7
Bump keytool-maven-plugin from 1.5 to 1.7 in /deps/rabbitmq_mqtt/test/java_SUITE_data
2023-06-30 18:08:04 +04:00
dependabot[bot] dcd85dcb5d
Bump logback-classic in /deps/rabbitmq_mqtt/test/java_SUITE_data
Bumps [logback-classic](https://github.com/qos-ch/logback) from 1.2.11 to 1.2.12.
- [Commits](https://github.com/qos-ch/logback/compare/v_1.2.11...v_1.2.12)

---
updated-dependencies:
- dependency-name: ch.qos.logback:logback-classic
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-30 14:02:18 +00:00
dependabot[bot] f924a7525b
Bump keytool-maven-plugin in /deps/rabbitmq_mqtt/test/java_SUITE_data
Bumps [keytool-maven-plugin](https://github.com/mojohaus/keytool) from 1.5 to 1.7.
- [Release notes](https://github.com/mojohaus/keytool/releases)
- [Commits](https://github.com/mojohaus/keytool/compare/keytool-1.5...keytool-1.7)

---
updated-dependencies:
- dependency-name: org.codehaus.mojo:keytool-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-30 14:02:11 +00:00
Arnaud Cogoluègnes b0dc49150f
Add logback configuration file to MQTT Java test project 2023-06-30 10:03:02 +02:00
Arnaud Cogoluègnes 87d8460751
Add logback dependency to MQTT Java test project
To avoid no-binding warning from SLF4J.
2023-06-30 09:30:10 +02:00
Arnaud Cogoluègnes fbe79ff47b Small cleaning up in MQTT Java tests 2023-06-29 09:44:06 +02:00
Arnaud Cogoluègnes f2206c809c Format MQTT Java tests
Using spotless plugin with Google Java format.

Command to run to format: ./mvnw spotless:apply
2023-06-29 09:44:06 +02:00
Arnaud Cogoluègnes 3a4381229a Refactor MQTT Java tests
Use AssertJ instead of JUnit assertions (more readable API).
Bump dependencies and clean up pom.xml.
2023-06-29 09:44:06 +02:00
David Ansari c7e4984d59 Display MQTT 5 CONNECT User Property in Management UI
and CLI as requested in
https://github.com/rabbitmq/rabbitmq-server/issues/2554#issuecomment-1604205277

"User Properties on the CONNECT packet can be used to send connection related properties from the Client to the Server.
The meaning of these properties is not defined by this specification."
[v5 3.1.2.11.8]

It makes sense to display the User Property of the CONNECT packet in the
Management UI in the connection's Client Properties.
2023-06-26 14:08:05 +02:00
David Ansari b3795f55e6 Fix flaky test
Test
```
//deps/rabbitmq_mqtt:shared_SUITE-mixed --test_env FOCUS="-group [web_mqtt,v3,cluster_size_1] -case block_only_publisher"
```
was flaky:
```
=== Ended at 2023-06-26 07:09:57
=== Location: [{shared_SUITE,block_only_publisher,1323},
              {test_server,ts_tc,1782},
              {test_server,run_test_case_eval1,1291},
              {test_server,run_test_case_eval,1223}]
=== === Reason: no match of right hand side value {error,ack_timeout}
  in function  shared_SUITE:block_only_publisher/1 (shared_SUITE.erl, line 1323)
  in call from test_server:ts_tc/3 (test_server.erl, line 1782)
  in call from test_server:run_test_case_eval1/6 (test_server.erl, line 1291)
  in call from test_server:run_test_case_eval/9 (test_server.erl, line 1223)
```
It seems that the ack_timeout of 1 second was too low for a
subscription.
2023-06-26 13:00:01 +02:00
David Ansari a715eb7756 Attempt to fix flake
Attempt to fix the following flake:
```
=== Ended at 2023-06-26 07:13:34
=== Location: [{shared_SUITE,events,570},
              {test_server,ts_tc,1782},
              {test_server,run_test_case_eval1,1291},
              {test_server,run_test_case_eval,1223}]
=== === Reason: no match of right hand side value []
  in function  shared_SUITE:events/1 (shared_SUITE.erl, line 570)
  in call from test_server:ts_tc/3 (test_server.erl, line 1782)
  in call from test_server:run_test_case_eval1/6 (test_server.erl, line 1291)
  in call from test_server:run_test_case_eval/9 (test_server.erl, line 1223)
```

of test
```
-group [web_mqtt,v3,cluster_size_1] -case events
```

The logs showed that deletion of the exclusive queue took place in the
same millisecond as the server received the DISCONNECT:
```
2023-06-26 07:13:32.838282+00:00 [debug] <0.2494.0> Received a CONNECT, client ID: events, username: undefined, clean start: true, protocol version: 3, keepalive: 60, property names: []
2023-06-26 07:13:32.838436+00:00 [debug] <0.2494.0> MQTT connection 127.0.0.1:38808 -> 127.0.0.1:21007 picked vhost using plugin_configuration_or_default_vhost
2023-06-26 07:13:32.838523+00:00 [debug] <0.2494.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2023-06-26 07:13:32.838672+00:00 [info] <0.2494.0> Accepted Web MQTT connection 127.0.0.1:38808 -> 127.0.0.1:21007 for client ID events
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0> Received a SUBSCRIBE with subscription(s) [{mqtt_subscription,<<"my/topic">>,
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0>                                             {mqtt_subscription_opts,0,false,
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0>                                              false,0,undefined}}]
2023-06-26 07:13:33.457541+00:00 [debug] <0.2494.0> Received an UNSUBSCRIBE for topic filter(s) [<<"my/topic">>]
2023-06-26 07:13:33.762171+00:00 [debug] <0.2494.0> Received a DISCONNECT with reason code 0 and properties #{}
2023-06-26 07:13:33.762350+00:00 [info] <0.2494.0> Web MQTT closing connection 127.0.0.1:38808 -> 127.0.0.1:21007
2023-06-26 07:13:33.762780+00:00 [debug] <0.2504.0> Deleting exclusive queue 'mqtt-subscription-eventsqos0' in vhost '/' because its declaring connection <0.2494.0> was closed
```
However, there could be some delay between disconnecting on the client
WebMQTT side and the processor processing the DISCONNECT packet.
2023-06-26 13:00:01 +02:00
David Ansari ff30bb0bef Clear retained messages synchronously
due to the following flake:
```
v5_SUITE:subscription_identifier failed on line 783
Reason: {test_case_failed,Received unexpected message: {publish,#{client_pid => <0.495.0>,dup => false,
                                        packet_id => undefined,
                                        payload => <<"m3">>,properties => #{},
                                        qos => 0,retain => true,
                                        topic => <<"t/3">>,
                                        via => #Port<0.164>}}}
```

Also, log if unexpected message received due to flake in
```
=== Ended at 2023-06-22 14:30:07
=== Location: [{v5_SUITE,will_delay_message_expiry_publish_properties,1597},
              {test_server,ts_tc,1782},
              {test_server,run_test_case_eval1,1291},
              {test_server,run_test_case_eval,1223}]
=== === Reason: {test_case_failed,"did not receive Will Message"}
```
2023-06-23 14:03:15 +02:00
Loïc Hoguin 610af302c6
Add support for LOCAL proxy header
This is what the proxy uses for health checks. In those cases
we use the socket's IP/ports for the connection name as we
have nothing else we can use.
2023-06-23 12:12:58 +02:00
Chunyi Lyu 468985f7ad Remove unused test helpers
- testQueuePropertiesWithCleanSessionSet() was not used
- testQueuePropertiesWithCleanSessionUnset() and
  testQueuePropertiesWithCleanSession() was called only once so removing
the extra layer here to simply structure
2023-06-22 14:36:53 +01:00
Chunyi Lyu 27e0cfce35 Clean up with clean start/session set to true 2023-06-22 12:12:04 +01:00
Chunyi Lyu c301a873aa Remove redundent connect options in test cases
- no need to set username and password unless they are different from
  guest/guest
- clean start and clean session default to true; no need to set
explicitly
2023-06-22 11:35:17 +01:00
David Ansari 36855b500f Do not run_teardown_steps twice 2023-06-21 17:14:08 +01:00
Chunyi Lyu 17ad067259 Duplicate java SSL test for mqtt v5 2023-06-21 17:14:08 +01:00
Chunyi Lyu 147e2d6676 Run Java v5 tests in separate RMQ cluster
- to avoid test poluting since both v3 and v5 java test cases use
the same resource names
2023-06-21 17:14:08 +01:00
David Ansari 0a98c1e986 Fix assertion arguments order
The correct order is
```
assertArrayEquals(Expecteds, Actuals)
```

Do not mark the tests as flaky.
Instead, we want to understand and fix the flakes.
2023-06-21 17:14:08 +01:00
Chunyi Lyu 7aba94468f Add V5 test to java SUITE
- same tests as v3 test suite, with pathos mqtt V5 client.
- two test cases are removed:
1. sessionRedelivery callback behaviors in v3 and v5 client seems
to be different that throwing exception does not shutdown a v5 client
immediately. This test case tests that unacked qos1 msgs are redelivered
by RMQ broker, which is covered in other test suite
2. lastWillDowngradesQoS2 is removed because for mqtt 5, will msgs with
unsupported Qos is an error and not accepted
2023-06-21 17:14:08 +01:00
David Ansari 92addc067e Fix MQTT crash
This commit fixes the following crash:
```
2388-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>     exception error: bad argument
2389-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>       in function  lists:keymember/3
2390-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>          called as lists:keymember(<<"x-mqtt-publish-qos">>,1,undefined)
2391-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>          *** argument 3: not a list
2392-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>       in call from rabbit_mqtt_processor:amqp_props_to_mqtt_props/2 (rabbit_mqtt_processor.erl, line 2219)
```

This crash occurs when a message without AMQP 0.9.1 #P_basic.headers
is sent to the MQTT v4 connection process, but consumed by an MQTT v5
connection process.
This is the case when an AMQP 0.9.1 client sends a message to a v4 MQTT
client, and this same client subsequently upgrades to v5 consuming the
message.

When sending from AMQP 0.9.1 client directly to a v5 MQTT connection,
the AMQP 0.9.1 header <<"x-binding-keys">> is set.

When sending from AMQP 0.9.1 client directly to a v4 MQTT connection,
no header is set, but this code branch was not evaluated.
2023-06-21 17:14:08 +01:00
David Ansari 3dc799afb5 Fix test expectation
Matching against #{} does not validate that the map is empty.
2023-06-21 17:14:08 +01:00
Chunyi Lyu 08fd9d00e3 Set topic alias when publish retained message
- remove topic alias from message props when storing retained msgs
- set topic alias for outbound before sending retained msgs
2023-06-21 17:14:08 +01:00
David Ansari 0e00e3479e CONNACK with Bad Authentication Method
RabbitMQ MQTT already supports authenticating clients via
username + password, OAuth tokens, and certificates.
We could make use of RabbitMQ SASL mechanisms in the future,
if needed. For now, if the client wants to perform extended
authentication, we return Bad Authentication Method in the CONNACK
packet.
2023-06-21 17:14:08 +01:00
Chunyi Lyu 12cdc69572 Test MQTT 5 with proxy protocol suite 2023-06-21 17:14:08 +01:00
Chunyi Lyu 1eac345f1c Run cluster suite against v5 enabled cluster 2023-06-21 17:14:08 +01:00
David Ansari 14d81b430f Add Topic Aliases from server to client
Once the server's Topic Alias cache for messages from server to client
is full, this commit does not replace any existing aliases.
So, the first topics "win" and stay in the cache forever.
This matches the behaviour of VerneMQ and EMQX.
For now that's good enough.
In the future, we can easily change that behaviour to some smarter strategy,
for example
1. Hash the TopicName to an Topic Alias and replace the old
   alias, or
2. For the Topic Alias Cache from server to client, keep 2 Maps:
   #{TopicName => TopicAlias} and #{TopicAlias => TopicName} and a
   counter that wraps to 1 once the Topic Alias Maximum is reached and
   just replace an existing Alias if the TopicName is not cached.

Also, refactor Topic Alias Maximum:
* Remove code duplication
* Allow an operator to prohibit Topic Aliases by allowing value 0 to be
  configured
* Change config name to topic_alias_maximum to that it matches exactly
  the MQTT feture name
* Fix wrong code formatting
* Add the invalid or unkown Topic Alias to log message for easier
  troubleshooting
2023-06-21 17:14:08 +01:00
Chunyi Lyu fd52caa211 Support topic alias from client to broker 2023-06-21 17:14:08 +01:00
Chunyi Lyu 60f6784d30 Make Topic Alias Maximum configurable
- default to 20, configurable through cuttlefish config
- add test to v5 suite for invalid topic alias in publish
2023-06-21 17:14:08 +01:00
David Ansari bb20618b13 Return matched binding keys faster
For MQTT 5.0 destination queues, the topic exchange does not only have
to return the destination queue names, but also the matched binding
keys.
This is needed to implement MQTT 5.0 subscription options No Local,
Retain As Published and Subscription Identifiers.

Prior to this commit, as the trie was walked down, we remembered the
edges being walked and assembled the final binding key with
list_to_binary/1.

list_to_binary/1 is very expensive with long lists (long topic names),
even in OTP 26.
The CPU flame graph showed ~3% of CPU usage was spent only in
list_to_binary/1.

Unfortunately and unnecessarily, the current topic exchange
implementation stores topic levels as lists.

It would be better to store topic levels as binaries:
split_topic_key/1 should ideally use binary:split/3 similar as follows:
```
1> P = binary:compile_pattern(<<".">>).
{bm,#Ref<0.1273071188.1488322568.63736>}
2> Bin = <<"aaa.bbb..ccc">>.
<<"aaa.bbb..ccc">>
3> binary:split(Bin, P, [global]).
[<<"aaa">>,<<"bbb">>,<<>>,<<"ccc">>]
```
The compiled pattern could be placed into persistent term.

This commit decided to avoid migrating Mnesia tables to use binaries
instead of lists. Mnesia migrations are non-trivial, especially with the
current feature flag subsystem.
Furthermore the Mnesia topic tables are already getting migrated to
their Khepri counterparts in 3.13.
Adding additional migration only for Mnesia does not make sense.

So, instead of assembling the binding key as we walk down the trie and
then calling list_to_binary/1 in the leaf, it
would be better to just fetch the binding key from the database in the leaf.

As we reach the leaf of the trie, we know both source and destination.
Unfortunately, we cannot fetch the binding key efficiently with the
current rabbit_route (sorted by source exchange) and
rabbit_reverse_route (sorted by destination) tables as the key is in
the middle between source and destination.
If there are a huge number of bindings for a given sourc exchange (very
realistic in MQTT use cases) or a large number of bindings for a given
destination (also realistic), it would require scanning these large
number of bindings.

Therefore this commit takes the simplest possible solution:
The solution leverages the fact that binding arguments are already part of
table rabbit_topic_trie_binding.
So, if we simply include the binding key into the binding arguments, we
can fetch and return it efficiently in the topic exchange
implementation.

The following patch omitting fetching the empty list binding argument
(the default) makes routing slower because function
`analyze_pattern.constprop.0` requires significantly more (~2.5%) CPU time
```
@@ -273,7 +273,11 @@ trie_bindings(X, Node) ->
                                    node_id       = Node,
                                    destination   = '$1',
                                    arguments     = '$2'}},
-    mnesia:select(?MNESIA_BINDING_TABLE, [{MatchHead, [], [{{'$1', '$2'}}]}]).
+    mnesia:select(
+      ?MNESIA_BINDING_TABLE,
+      [{MatchHead, [{'andalso', {'is_list', '$2'}, {'=/=', '$2', []}}], [{{'$1', '$2'}}]},
+       {MatchHead, [], ['$1']}
+      ]).
```
Hence, this commit always fetches the binding arguments.

All MQTT 5.0 destination queues will create a binding that
contains the binding key in the binding arguments.

Not only does this solution avoid expensive list_to_binay/1 calls, but
it also means that Erlang app rabbit (specifically the topic exchange
implementation) does not need to be aware of MQTT anymore:
It just returns the binding key when the binding args tell to do so.

In future, once the Khepri migration completed, we should be able to
relatively simply remove the binding key from the binding arguments
again to free up some storage space.

Note that one of the advantages of a trie data structue is its space
efficiency that you don't have to store the same prefixes multiple
times.
However, for RabbitMQ the binding key is already stored at least N times
in various routing tables, so storing it a few times more via the
binding arguments should be acceptable.
The speed improvements are favoured over a few more MBs ETS usage.
2023-06-21 17:14:08 +01:00
David Ansari f425f87192 Make retained message stores compatible with pre 3.13
The format of #mqtt_msg{} changes from 3.12 to 3.13.
In 3.13 the record contains 2 additional fields:
* props
* timestamp

The old #mqtt_msg{} might still be stored by the retained message store
in ets or dets.

This commit converts such an old message format when read from the
database.

The alternative would have been to run a migration function over the
whole table which is slightly more complex to implement.

Instead of giving the new message format a different record name,
e.g. #mqtt_msg_v2{}, this commit decides to re-use the same name such
that the new code only handles the record name #mqtt_msg{}.
2023-06-21 17:14:08 +01:00
David Ansari d7882b00dc PUBACK with reason code "No matching subscribers"
Support reason code "No matching subscribers" in PUBACK.

This somewhat corresponds to the `mandatory` message property
in AMQP 0.9.1.
2023-06-21 17:14:08 +01:00
David Ansari 23837c5270 DISCONNECT v5 clients with Server Shutting Down
When RabbitMQ enters maintenance mode / is being drained, all client
connections are closed.

This commit sends a DISCONNECT packet to (Web) MQTT 5.0 clients with
Reason Code "Server shutting down" before the connection is closed.
2023-06-21 17:14:08 +01:00
David Ansari 8c0b0e9338 Support MQTT 5.0 Properties
The following PUBLISH and Will properties are forwarded unaltered by the
server:
* Payload Format Indicator
* Content Type
* Response Topic
* Correlation Data
* User Property

Not only must these properties be forwarded unaltered from an MQTT
publishing client to an MQTT receiving client, but it would also be nice
to allow for protocol interoperability:
Think about RPC request-response style patterns where the requester is
an MQTT client and the responder is an AMQP 0.9.1 or STOMP client.

We reuse the P_basic fields where possible:
* content_type (if <= 255 bytes)
* correlation_id (if <= 255 bytes)

Otherwise, we add custom AMQP 0.9.1 headers.

The headers follow the naming pattern "x-mqtt-<property>" where
<property> is the MQTT v5 property if that property makes only
(mainly) sense in the MQTT world:
* x-mqtt-user-property
* x-mqtt-payload-format-indicator

If the MQTT v5 property makes also sense outside of the MQTT world, we
name it more generic:
* x-correlation (if > 255 bytes)
* x-reply-to-topic (since P_basic.reply_to assumes a queue name)

In the future, we can think about adding a header x-reply-to-exchange
and have the MQTT plugin set its value to the configured mqtt.exchange
such that clients don't have to assume the default topic exchange amq.topic.
2023-06-21 17:14:08 +01:00
David Ansari fb7af48df6 Support Will Delay Interval
Previously, the Will Message could be kept in memory in the MQTT
connection process state. Upon termination, the Will Message is sent.

The new MQTT 5.0 feature Will Delay Interval requires storing the Will
Message outside of the MQTT connection process state.

The Will Message should not be stored node local because the client
could reconnect to a different node.

Storing the Will Message in Mnesia is not an option because we want to
get rid of Mnesia. Storing the Will Message in a Ra cluster or in Khepri
is only an option if the Will Payload is small as there is currently no
way in Ra to **efficiently** snapshot large binary data (Note that these
Will Messages are not consumed in a FIFO style workload like messages in
quorum queues. A Will Message needs to be stored for as long as the
Session lasts - up to 1 day by default, but could also be much longer if
RabbitMQ is configured with a higher maximum session expiry interval.)
Usually Will Payloads are small: They are just a notification that its
MQTT session ended abnormally. However, we don't know how users leverage
the Will Message feature. The MQTT protocol allows for large Will Payloads.

Therefore, the solution implemented in this commit - which should work
good enough - is storing the Will Message in a queue.
Each MQTT session which has a Session Expiry Interval and Will Delay
Interval of > 0 seconds will create a queue if the current Network
Connection ends where it stores its Will Message. The Will Message has a
message TTL set (corresponds to the Will Delay Interval) and the queue
has a queue TTL set (corresponds to the Session Expiry Interval).
If the client does not reconnect within the Will Delay Interval, the
message is dead lettered to the configured MQTT topic exchange
(amq.topic by default).

The Will Delay Interval can be set by both publishers and subscribers.
Therefore, the Will Message is the 1st session state that RabbitMQ keeps
for publish-only MQTT clients.

One current limitation of this commit is that a Will Message that is
delayed (i.e. Will Delay Interval is set) and retained (i.e. Will Retain
flag set) will not be retained.
One solution to retain delayed Will Messages is that the retainer
process consumes from a queue and the queue binds to the topic exchange
with a topic starting with `$`, for example `$retain/#`.
The AMQP 0.9.1 Will Message that is dead lettered could then be added a
CC header such that it won't not only be published with the Will Topic,
but also with `$retain` topic. For example, if the Will Topic is `a/b`,
it will publish with routing key `a/b` and CC header `$retain/a/b`.

The reason this is not implemented in this commit is that to keep the
currently broken retained message store behaviour, we would require
creating at least one queue per node and publishing only to that local
queue. In future, once we have a replicated retained message store based
on a Stream for example, we could just publish all retained messages to
the `$retain` topic and thefore into the Stream.
So, for now, we list "retained and delayed Will Messages" as a limitation
that they actually won't be retained.
2023-06-21 17:14:08 +01:00
David Ansari 60a6af0054 Rename will_msg to will_payload
when only the payload is meant.
See [v5 3.1.3.4]
2023-06-21 17:14:08 +01:00
David Ansari 605e033f43 Fix test decode_basic_properties
This commit fixes 2 separate issues:
1. No quorum queue got created in v5 because Session Expiry Interval was 0.
2. Fix a function_clause error. Pass the decoded properties further to other
functions looking up headers.
2023-06-21 17:14:08 +01:00
David Ansari 0183909453 Fix failing test
due to rebasing onto main.

mqtt5 branch adds a new header
```
{<<"x-mqtt-retain">>, bool, false}
```
which caused the incoming_message_interceptors test case to fail.
2023-06-21 17:14:08 +01:00
David Ansari ce573c35fa Support MQTT 5.0 Subscription Option Retain Handling
The MQTT v5 spec is a bit vague on Retain Handling 1:
"If Retain Handling is set to 1 then if the subscription did not
already exist, the Server MUST send all retained message matching the
Topic Filter of the subscription to the Client, and if the subscription
did exist the Server MUST NOT send the retained messages.
[MQTT-3.3.1-10]." [v5 3.3.1.3]

Does a subscription with the same topic filter but different
subscription options mean that "the subscription did exist"?

This commit interprets "subscription exists" as both topic filter and
subscription options must be the same.

Therefore, if a client creates a subscription with a topic filter that
is identical to a previous subscription and subscription options that
are different and Retain Handling 1, the server sends the retained
message.
2023-06-21 17:14:08 +01:00
David Ansari e2b545f270 Support MQTT 5.0 features No Local, RAP, Subscription IDs
Support subscription options "No Local" and "Retain As Published"
as well as Subscription Identifiers.

All three MQTT 5.0 features can be set on a per subscription basis.
Due to wildcards in topic filters, multiple subscriptions
can match a given topic. Therefore, to implement Retain As Published and
Subscription Identifiers, the destination MQTT connection process needs
to know what subscription(s) caused it to receive the message.

There are a few ways how this could be implemented:

1. The destination MQTT connection process is aware of all its
   subscriptions. Whenever, it receives a message, it can match the
   message's routing key / topic against all its known topic filters.
   However, to iteratively match the routing key against all topic
   filters for every received message can become very expensive in the
   worst case when the MQTT client creates many subscriptions containing
   wildcards. This could be the case for an MQTT client that acts as a
   bridge or proxy or dispatcher: It could subscribe via a wildcard for
   each of its own clients.

2. Instead of interatively matching the topic of the received message
   against all topic filters that contain wildcards, a better approach
   would be for every MQTT subscriber connection process to maintain a
   local trie datastructure (similar to how topic exchanges are
   implemented) and perform matching therefore more efficiently.
   However, this does not sound optimal either because routing is
   effectively performed twice: in the topic exchange and again against
   a much smaller trie in each destination connection process.

3. Given that the topic exchange already perform routing, a much more
   sensible way would be to send the matched binding key(s) to the
   destination MQTT connection process. A subscription (topic filter)
   maps to a binding key in AMQP 0.9.1 routing. Therefore, for the first
   time in RabbitMQ, the routing function should not only output a list
   of unique destination queues, but also the binding keys (subscriptions)
   that caused the message to be routed to the destination queue.

This commit therefore implements the 3rd approach.
The downside of the 3rd approach is that it requires API changes to the
routing function and topic exchange.

Specifically, this commit adds a new function rabbit_exchange:route/3
that accepts a list of routing options. If that list contains version 2,
the caller of the routing function knows how to handle the return value
that could also contain binding keys.

This commits allows an MQTT connection process, the channel process, and
at-most-once dead lettering to handle binding keys. Binding keys are
included as AMQP 0.9.1 headers into the basic message.
Therefore, whenever a message is sent from an MQTT client or AMQP 0.9.1
client or AMQP 1.0 client or STOMP client, the MQTT receiver will know
the subscription identifier that caused the message to be received.

Note that due to the low number of allowed wildcard characters (# and
+), the cardinality of matched binding keys shouldn't be high even if
the topic contains for example 3 levels and the message is sent to for
example 5 million destination queues. In other words, sending multiple
distinct basic messages to the destination shouldn't hurt the delegate
optimisation too much. The delegate optimisation implemented for classic
queues and rabbit_mqtt_qos0_queue(s) still takes place for all basic
messages that contain the same set of matched binding keys.

The topic exchange returns all matched binding keys by remembering the
edges walked down to the leaves. As an optimisation, only for MQTT
queues are binding keys being returned. This does add a small dependency
from app rabbit to app rabbitmq_mqtt which is not optimal. However, this
dependency should be simple to remove when omitting this optimisation.

Another important feature of this commit is persisting subscription
options and subscription identifiers because they are part of the
MQTT 5.0 session state.

In MQTT v3 and v4, the only subscription information that were part of
the session state was the topic filter and the QoS level.
Both information were implicitly stored in the form of bindings:
The topic filter as the binding key and the QoS level as the destination
queue name of the binding.

For MQTT v5 we need to persist more subscription information.
From a domain perspective, it makes sense to store subscription options
as part of subscriptions, i.e. bindings, even though they are currently
not used in routing.
Therefore, this commits stores subscription options as binding arguments.

Storing subscription options as binding arguments comes in turn with
new challenges: How to handle mixed version clusters and upgrading an
MQTT session from v3 or v4 to v5?
Imagine an MQTT client connects via v5 with Session Expiry Interval > 0
to a new node in a mixed version cluster, creates a subscription,
disconnects, and subsequently connects via v3 to an old node. The
client should continue to receive messages.

To simplify such edge cases, this commit introduces a new feature flag
called mqtt_v5. If mqtt_v5 is disabled, clients cannot connect to
RabbitMQ via MQTT 5.0.

This still doesn't entirely solve the problem of MQTT session upgrades
(v4 to v5 client) or session downgrades (v5 to v4 client).

Ideally, once mqtt_v5 is enabled, all MQTT bindings contain non-empty binding
arguments. However, this will require a feature flag migration function
to modify all MQTT bindings. To be more precise, all MQTT bindings need
to be deleted and added because the binding argument is part of the
Mnesia table key.

Since feature flag migration functions are non-trivial to implement in
RabbitMQ (they can run on every node multiple times and concurrently),
this commit takes a simpler approach:
All v3 / v4 sessions keep the empty binding argument [].
All v5 sessions use the new binding argument [#mqtt_subscription_opts{}].

This requires only handling a session upgrade / downgrade by
creating a binding (with the new binding arg) and deleting the old
binding (with the old binding arg) when processing the CONNECT packet.

Note that such session upgrades or downgrades should be rather rare in
practice. Therefore these binding transactions shouldn't hurt peformance.

The No Local option is implemented within the MQTT publishing connection
process: The message is not sent to the MQTT destination if the
destination queue name matches the current MQTT client ID and the
message was routed due to a subscription that has the No Local flag set.
This avoids unnecessary traffic on the MQTT queue.
The alternative would have been that the "receiving side" (same process)
filters the message out - which would have been more consistent in how
Retain As Published and Subscription Identifiers are implemented, but
would have caused unnecessary load on the MQTT queue.
2023-06-21 17:14:08 +01:00
David Ansari 51d659fd07 Fix failing property in packet_prop_SUITE
1. Shrinking times out if there is an error, therefore remove the 60
   seconds Bazel timeout by using a medium size bazel test suite.
2. The MQTT 5.0 spec mandates for binary data types and UTF 8 string
   data types to have values of maximum 65,535 bytes.
   Therefore, ensure this test suite does not generate data greater than
   that limit.
2023-06-21 17:14:08 +01:00
Chunyi Lyu d601c6432e Send disconnect packet from server
- when clients connect with a duplicate client id;
disconnect with reason code session taken over 142
- when keep alive has timed out;
disconnect with reason code keep alive timeout 141
2023-06-21 17:14:08 +01:00
David Ansari e273b4c87b Fix two small bugs 2023-06-21 17:14:08 +01:00
David Ansari 80d972e308 Add property test for MQTT encoder / decoder
All MQTT packets that can be sent in both directions (from client to
server and server to client) are tested in packet_prop_SUITE.

The symmetric property is very concise because encoding and then decoding an
MQTT packet should yield the original MQTT packet.

The input data variety of the previous example based tests was very
small.
2023-06-21 17:14:08 +01:00
David Ansari 3b3ccd4d42 Simplify UNSUBACK reply
Whether a payload is sent to the client is decided by the serialiser.
2023-06-21 17:14:08 +01:00
David Ansari cd7f396bea Simplify code and remove code duplication 2023-06-21 17:14:08 +01:00
David Ansari 6f4f9506a4 Add a test case for large Receive Maximum value 2023-06-21 17:14:08 +01:00
Chunyi Lyu bb9fed85f5 Add return codes in unsuback packet 2023-06-21 17:14:08 +01:00
Chunyi Lyu d1b173de8c Return v5 failure reason codes for suback
- mqtt v5 has more descriptive return values for suback
- added two possible failure reason codes for suback packet
one for permission error, another for quota exceeded error
- modified auth suite to assert on reason codes for v5
- no new test case since failures were already covered
2023-06-21 17:14:08 +01:00
Chunyi Lyu 471540dbdc Implement client ReceiveMaximum
- rename processor state prefetch to receive_maximum
to better match property name for mqtt 5
- defaults to 10 (as previously) when not set
not saved in session state, configuration is per
connection
2023-06-21 17:14:08 +01:00
David Ansari acd249cb0f Add a test for Session Expiry Interval 2023-06-21 17:14:08 +01:00
Chunyi Lyu 68d59bcaf3 Update sess exp interval when client reconnect
- when client reconnecting with clean start false,
server respects the new session expiry interval provided
by the client
2023-06-21 17:14:08 +01:00
David Ansari 2efd9c06b8 Support Session Expiry Interval
Allow Session Expiry Interval to be changed when client DISCONNECTs.

Deprecate config subscription_ttl in favour of max_session_expiry_interval_secs
because the Session Expiry Interval will also apply to publishers that
connect with a will message and will delay interval.
"The additional session state of an MQTT v5 server includes:
* The Will Message and the Will Delay Interval
* If the Session is currently not connected, the time at which the Session
  will end and Session State will be discarded."

The Session Expiry Interval picked by the server and sent to the client
in the CONNACK is the minimum of max_session_expiry_interval_secs and
the requested Session Expiry Interval by the client in CONNECT.

This commit favours dynamically changing the queue argument x-expires
over creating millions of different policies since that many policies
will come with new scalability issues.

Dynamically changing queue arguments is not allowed by AMQP 0.9.1
clients. However, it should be perfectly okay for the MQTT plugin to do
so for the queues it manages. MQTT clients are not aware that these
queues exist.
2023-06-21 17:14:08 +01:00
David Ansari 6e9aa952ea Remove code duplication 2023-06-21 17:14:08 +01:00
Chunyi Lyu 8ce0813bda Close conn when will msg qos is 2 [MQTT-3.2.2-12]
If a Server receives a CONNECT packet containing a Will QoS that
exceeds its capabilities, it MUST reject the connection. It SHOULD
use a CONNACK packet with Reason Code 0x9B (QoS not supported) as
described in section 4.13 Handling errors, and MUST close the Network Connection
2023-06-21 17:14:08 +01:00
David Ansari c31ce01443 Dead letter negatively ACKed MQTT v5 messages
MQTT v5 allows client and server to negatively ack a message by setting
a reason code of 128 or greater indicating failure.

"If PUBACK or PUBREC is received containing a Reason Code of 0x80 or greater
the corresponding PUBLISH packet is treated as acknowledged, and MUST NOT be
retransmitted [MQTT-4.4.0-2]."

Even though the spec prohibits resending such messages, if a client does
not accept a message, RabbitMQ can still dead letter the message.
2023-06-21 17:14:08 +01:00
David Ansari 66fe9630b5 Add Message Expiry Interval for retained messages
MQTT v5 spec:
"If the current retained message for a Topic expires, it is discarded
and there will be no retained message for that topic."

This commit also supports Message Expiry Interval for retained messages
when a node is restarted.
Therefore, the insertion timestamp needs to be stored on disk.
Upon recovery, the Erlang timers are re-created.
2023-06-21 17:14:08 +01:00
Chunyi Lyu c39079f657 Disconnect at pub qos > server max qos
- "If the Server included a Maximum QoS in its CONNACK response
to a Client and it receives a PUBLISH packet with a QoS greater than this
then it uses DISCONNECT with Reason Code 0x9B (QoS not supported)"
- only affects mqtt v5, server max qos is 1
2023-06-21 17:14:08 +01:00
David Ansari 044ee02b36 Add MQTT v5 feature Message Expiry Interval
This commit does not yet implement Message Expiry Interval of
* retained messages: "If the current retained message for a Topic
  expires, it is discarded and there will be no retained message for
  that topic."
2023-06-21 17:14:08 +01:00
Chunyi Lyu d237a6b0c9 Allow setting max packet size by cuttlefish 2023-06-21 17:14:08 +01:00
David Ansari 2ef1f79fdd Test server restart with retained messages
"Retained messages do not form part of the Session State in the Server,
they are not deleted as a result of a Session ending."

Both retained message stores ETS and DETS implement recovery.
This commit adds a test that recovery works as intended.
2023-06-21 17:14:08 +01:00
David Ansari e50e994ef4 Return Assigned Client Identifier in CONNACK
"If the Client connects using a zero length Client Identifier, the Server
MUST respond with a CONNACK containing an Assigned Client Identifier."
2023-06-21 17:14:08 +01:00
David Ansari f1f8167ec4 Add MQTT v5 feature Maximum Packet Size set by server
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."

This commit implements the part where the Server specifies the maximum
packet size.

"In the case of an error in a CONNECT packet it MAY send a CONNACK
packet containing the Reason Code, before closing the Network
Connection. In the case of an error in any other packet it SHOULD send a
DISCONNECT packet containing the Reason Code before closing the Network
Connection."

This commit implements only the "SHOULD" (second) part, not the "MAY"
(first) part.

There are now 2 different global wide MQTT settings on the server:
1. max_packet_size_unauthenticated which applies to the CONNECT packet
   (and maybe AUTH packet in the future)
2. max_packet_size_authenticated which applies to all other MQTT
   packets (that is, after the client successfully authenticated).

These two settings will apply to all MQTT versions.
In MQTT v5, if a non-CONNECT packet is too large, the server will send a
DISCONNECT packet to the client with Reason Code "Packet Too Large"
before closing the network connection.
2023-06-21 17:14:08 +01:00
David Ansari 49f1071591 Add MQTT v5 feature Maximum Packet Size set by client
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."

This commit implements the part where the Client specifies the maximum
packet size.

As per protocol spec, instead of sending, the server drops the MQTT packet
if it's too large.
A debug message is logged for "infrequent" packet types.

For PUBLISH packets, the messages is rejected to the queue such that it
will be dead lettered, if dead lettering is configured.
At the very least, Prometheus metrics for dead lettered messages will
be increased, even if dead lettering is not configured.
2023-06-21 17:14:08 +01:00
David Ansari c44b546f73 Test MQTT v5 in existing MQTT suites 2023-06-21 17:14:08 +01:00
David Ansari be6ff92692 Serialise and parse MQTT 5.0 packets 2023-06-21 17:14:08 +01:00