Commit Graph

1430 Commits

Author SHA1 Message Date
David Ansari 19670b625b Copy remove consumer API from qq-v4 2024-06-27 15:09:39 +02:00
Loïc Hoguin 9f15e978b1
make: Remove xrefr
It is no longer used by Erlang.mk.
2024-06-25 13:08:08 +02:00
Karl Nilsson 077c027d52
MQTT: speed up shared_SUITE:many_qos1_messages (#11477)
* MQTT: speed up shared_SUITE:many_qos1_messages

* speed up block_only_publisher

* MQTT: reorganise tests groups

To avoid starting a new broker for each protocol group (v3,v4,v5).

Instead we run all protocol groups under a single cluster configuration
group.

* MQTT: speed up publish_to_all_queue_types_qos* tests.

* Remove separate mnesia_store group

to speed up the test suite

* Fix wrong_shard_count

* Remove unused field

* Run subset of v3 tests

The code being tested under v3 and v4 is almost identical.
To save time in CI, we therefore run only a very small subset of tests in v3.

This cuts the total time reported by CT for the shared_SUITE from 898
seconds to 614 seconds.

Also, the java_SUITE tests in v3.

* Fix wrong_shard_count

* Fix mixed version failure

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
2024-06-19 20:02:33 +02:00
Michal Kuratczyk 27f735f49e
Use emqx/emqtt instead of a fork (#11479)
* Use emqx/emqtt instead of a fork
* Specify SNI in test connections (otherwise OTP26 secure TLS defaults make some tests fail)
2024-06-19 14:03:30 +02:00
Rin Kuryloski 5debebfaf3 Use rules_elixir to build the cli without mix
Certain elixir-native deps are still build with mix, but this can be
corrected later
2024-06-18 14:50:34 +02:00
Karl Nilsson 1abf7a3886
Upgrade maven for MQTT java tests. (#11465) 2024-06-17 19:06:40 +02:00
dependabot[bot] 9bf8e13159
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.5 to 3.3.0.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.5...surefire-3.3.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-06-14 18:11:00 +00:00
Karl Nilsson ebbff46c96
Merge pull request #11307 from rabbitmq/amqp-flow-control-poc-2
Introduce outbound RabbitMQ internal AMQP flow control
2024-06-05 15:01:05 +01:00
David Ansari d70e529d9a Introduce outbound RabbitMQ internal AMQP flow control
## What?

Introduce RabbitMQ internal flow control for messages sent to AMQP
clients.

Prior this PR, when an AMQP client granted a large amount of link
credit (e.g. 100k) to the sending queue, the sending queue sent
that amount of messages to the session process no matter what.
This becomes problematic for memory usage when the session process
cannot send out messages fast enough to the AMQP client, especially if
1. The writer proc cannot send fast enough. This can happen when
the AMQP client does not receive fast enough and causes TCP
back-pressure to the server. Or
2. The server session proc is limited by remote-incoming-window.

Both scenarios are now added as test cases.
Tests
* tcp_back_pressure_rabbitmq_internal_flow_quorum_queue
* tcp_back_pressure_rabbitmq_internal_flow_classic_queue
cover scenario 1.

Tests
* incoming_window_closed_rabbitmq_internal_flow_quorum_queue
* incoming_window_closed_rabbitmq_internal_flow_classic_queue
cover scenario 2.

This PR sends messages from queues to AMQP clients in a more controlled
manner.

To illustrate:
```
make run-broker PLUGINS="rabbitmq_management" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 4"
observer_cli:start()
mq
```
where `mq` sorts by message queue length.
Create a stream:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=s1 queue_type=stream durable=true
```
Next, send and receive from the Stream via AMQP.
Grant a large number of link credit to the sending stream:
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
bash-5.1# quiver //host.docker.internal//queue/s1 --durable -d 30s --credit 100000
```

**Before** to this PR:
```
RESULTS

Count ............................................... 100,696 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 120,422 messages/s
Receiver rate ......................................... 3,363 messages/s
End-to-end rate ....................................... 3,359 messages/s
```
We observe that all 100k link credit worth of messages are buffered in the
writer proc's mailbox:
```
|No | Pid        | MsgQueue  |Name or Initial Call                 |      Memory | Reductions          |Current Function                  |
|1  |<0.845.0>   |100001     |rabbit_amqp_writer:init/1            |  126.0734 MB| 466633491           |prim_inet:send/5                  |
```

**After** to this PR:
```
RESULTS

Count ............................................. 2,973,440 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 123,322 messages/s
Receiver rate ........................................ 99,250 messages/s
End-to-end rate ...................................... 99,148 messages/s
```
We observe that the message queue lengths of both writer and session
procs are low.

 ## How?

Our goal is to have queues send out messages in a controlled manner
without overloading RabbitMQ itself.
We want RabbitMQ internal flow control between:
```
AMQP writer proc <--- session proc <--- queue proc
```
A similar concept exists for classic queues sending via AMQP 0.9.1.
We want an approach that applies to AMQP and works generic for all queue
types.

For the interaction between AMQP writer proc and session proc we use a
simple credit based approach reusing module `credit_flow`.

For the interaction between session proc and queue proc, the following options
exist:

 ### Option 1
The session process provides expliclity feedback to the queue after it
has sent N messages.
This approach is implemented in
https://github.com/ansd/rabbitmq-server/tree/amqp-flow-control-poc-1
and works well.
A new `rabbit_queue_type:sent/4` API was added which lets the queue proc know
that it can send further messages to the session proc.

Pros:
* Will work equally well for AMQP 0.9.1, e.g. when quorum queues send messages
  in auto ack mode to AMQP 0.9.1 clients.
* Simple for the session proc

Cons:
* Sligthly added complexity in every queue type implementation
* Multiple Ra commands (settle, credit, sent) to decide when a quorum
  queue sends more messages.

 ### Option 2
A dual link approach where two AMQP links exists between
```
AMQP client <---link--> session proc <---link---> queue proc
```
When the client grants a large amount of credits, the session proc will
top up credits to the queue proc periodically in smaller batches.

Pros:
* No queue type modifications required.
* Re-uses AMQP link flow control

Cons:
* Significant added complexity in the session proc. A client can
  dynamically decrease or increase credits and dynamically change the drain
  mode while the session tops up credit to the queue.

 ### Option 3
Credit is a 32 bit unsigned integer.
The spec mandates that the receiver independently chooses a credit.
Nothing in the spec prevents the receiver to choose a credit of 1 billion.
However the credit value is merely a **maximum**:
> The link-credit variable defines the current maximum legal amount that the delivery-count can be increased by.

Therefore, the server is not required to send all available messages to this
receiver.

For delivery-count:
> Only the sender MAY independently modify this field.

"independently" could be interpreted as the sender could add to the delivery-count
irrespective of what the client chose for drain and link-credit.

Option 3: The queue proc could at credit time already consume credit
and advance the delivery-count if credit is too large before checking out any messages.
For example if credit is 100k, but the queue only wants to send 1k, the queue could
consume 99k of credits and advance the delivery-count, and subsequently send maximum 1k messages.
If the queue advanced the delivery-count, RabbitMQ must send a FLOW to the receiver,
otherwise the receiver wouldn’t know that it ran out of link-credit.

Pros:
* Very simple

Cons:
* Possibly unexpected behaviour for receiving AMQP clients
* Possibly poor end-to-end throughput in auto-ack mode because the queue
  would send a batch of messages followed by a FLOW containing the advanced
  delivery-count. Only therafter the client will learn that it ran out of
  credits and top-up again. This feels like synchronously pulling a batch
  of messages. In contrast, option 2 sends out more messages as soon as
  the previous messages left RabbitMQ without requiring again a credit top
  up from the receiver.
* drain mode with large credits requires the queue to send all available
  messages and only thereafter advance the delivery-count. Therefore,
  drain mode breaks option 3 somewhat.

 ### Option 4
Session proc drops message payload when its outgoing-pending queue gets
too large and re-reads payloads from the queue once the message can be
sent (see `get_checked_out` Ra command for quorum queues).

Cons:
* Would need to be implemented for every queue type, especially classic queues
* Doesn't limit the amount of message metadata in the session proc's
  outgoing-pending queue

 ### Decision: Option 2
This commit implements option 2 to avoid any queue type modification.
At most one credit request is in-flight between session process and
queue process for a given queue consumer.
If the AMQP client sends another FLOW in between, the session proc
stashes the FLOW until it processes the previous credit reply.

A delivery is only sent from the outgoing-pending queue if the
session proc is not blocked by
1. writer proc, or
2. remote-incoming-window

The credit reply is placed into the outgoing-pending queue.
This ensures that the session proc will only top up the next batch of
credits if sufficient messages were sent out to the writer proc.

A future commit could additionally have each queue limit the number of
unacked messages for a given AMQP consumer, or alternatively make use
of session outgoing-window.
2024-06-04 13:11:55 +02:00
Diana Parra Corbacho 3bbda5bdba Remove classic mirror queues 2024-06-04 13:00:31 +02:00
David Ansari bd847b8cac Put credit flow config into persistent term
Put configuration credit_flow_default_credit into persistent term such
that the tuple doesn't have to be copied on the hot path.

Also, change persistent term keys from `{rabbit, AtomKey}` to `AtomKey`
so that hashing becomes cheaper.
2024-05-31 16:20:51 +02:00
dependabot[bot] f58f084bef
Bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.3 to 3.26.0.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.3...assertj-build-3.26.0)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-05-27 18:55:47 +00:00
Simon Unge 19a751890c Remove checks to vhost-limit as that is now handled by rabbit_queue_type:declare
Add new error return tuple when queue limit is exceed
2024-05-27 09:53:54 +02:00
Michal Kuratczyk cfa3de4b2b
Remove unused imports (thanks elp!) 2024-05-23 16:36:08 +02:00
Loïc Hoguin ecf46002e0
Remove availability of CQv1
We reject CQv1 in rabbit.schema as well.

Most of the v1 code is still around as it is needed
for conversion to v2. It will be removed at a later
time when conversion is no longer supported.

We don't shard the CQ property suite anymore:
there's only 1 case remaining.
2024-05-13 13:06:07 +02:00
Karl Nilsson 49a4c66fce Move feature flag check outside of mc
the `mc` module is ideally meant to be kept pure and portable
 and feature flags have external infrastructure dependencies
as well as impure semantics.

Moving the check of this feature flag into the amqp session
simplifies the code (as no message containers with the new
format will enter the system before the feature flag is enabled).
2024-05-13 10:05:40 +02:00
Luke Bakken 620fff22f1
Log errors from `ranch:handshake`
Fixes #11171

An MQTT user encountered TLS handshake timeouts with their IoT device,
and the actual error from `ssl:handshake` / `ranch:handshake` was not
caught and logged.

At this time, `ranch` uses `exit(normal)` in the case of timeouts, but
that should change in the future
(https://github.com/ninenines/ranch/issues/336)
2024-05-06 08:24:38 -07:00
David Ansari 1ecaaadb32
Merge pull request #10964 from rabbitmq/amqp-parser
4.x: change AMQP on disk message format & speed up the AMQP parser
2024-05-03 14:48:23 +02:00
Rin Kuryloski ea4e301510 Avoid leaking nodes in rabbitmq_mqtt shared_SUITE
Test nodes were not torn down in the inner group mnesia_store
2024-05-02 12:26:27 +02:00
David Ansari 9f42e40346 Fix crash when old node receives new AMQP message
Similar to how we convert from mc_amqp to mc_amqpl before
sending to a classic queue or quorum queue process if
feature flag message_containers_store_amqp_v1 is disabled,
we also need to do the same conversion before sending to an MQTT QoS 0
queue on the old node.
2024-05-02 07:56:00 +00:00
David Ansari 6225dc9928 Do not parse entire AMQP body
Prior to this commit the entire amqp-value or amqp-sequence sections
were parsed when converting a message from mc_amqp.
Parsing the entire amqp-value or amqp-sequence section can generate a
huge amount of garbage depending on how large these sections are.

Given that other protocol cannot make use of amqp-value and
amqp-sequence sections anyway, leave them AMQP encoded when converting
from mc_amqp.

In fact prior to this commit, the entire body section was parsed
generating huge amounts of garbage just to subsequently encode it again
in mc_amqpl or mc_mqtt.

The new conversion interface from mc_amqp to other mc_* modules will
either output amqp-data sections or the encoded amqp-value /
amqp-sequence sections.
2024-05-02 07:56:00 +00:00
David Ansari eac469a8a2 Change AMQP header durable to true by default
AMQP 3.2.1 defines durable=false to be the default.

However, the same section also mentions:
> If the header section is omitted the receiver MUST assume the appropriate
> default values (or the meaning implied by no value being set) for the
> fields within the header unless other target or node specific defaults
> have otherwise been set.

We want RabbitMQ to be secure by default, hence in RabbitMQ we set
durable=true to be the default.
2024-05-02 07:56:00 +00:00
David Ansari 81709d9745 Fix MQTT QoS
This commit fixes test
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed -t- \
    --test_sharding_strategy=disabled --test_env \
    FOCUS="-group [mqtt,v3,cluster_size_3] -case pubsub"
```

Fix some mixed version tests

Assume the AMQP body, especially amqp-value section won't be parsed.
Hence, omit smart conversions from AMQP to MQTT involving the
Payload-Format-Indicator bit.

Fix test

Fix
```
bazel test //deps/amqp10_client:system_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [rabbitmq]
```
2024-05-02 07:56:00 +00:00
David Ansari fc7f458f7c Fix tests 2024-05-02 07:56:00 +00:00
David Ansari 312d2af806 Fix AMQP -> MQTT durable conversion 2024-05-02 07:55:59 +00:00
Rin Kuryloski 9ed6155c0d Add elixir to PLT_APPS for some plugins 2024-04-29 15:23:09 +02:00
Rin Kuryloski 6a9d668def Set PLT_APPS in a number of plugins where it was missing 2024-04-29 14:54:28 +02:00
David Ansari e576dd766a Set durable annotation for MQTT messages
This is a follow up to https://github.com/rabbitmq/rabbitmq-server/pull/11012

 ## What?
For incoming MQTT messages, always set the `durable` message container
annotation.

 ## Why?
Even though defaulting to `durable=true` when no durable annotation is
set, as prior to this commit, is good enough, explicitly setting the
durable annotation makes the code a bit more future proof and
maintainable going forward in 4.0 where we will rely more on the durable
annotation because AMQP 1.0 message headers will be omitted in classic
and quorum queues (see https://github.com/rabbitmq/rabbitmq-server/pull/10964)

For MQTT messages, it's important to know whether the message was
published with QoS 0 or QoS 1 because it affects the QoS for the MQTT
message that will delivered to the MQTT subscriber.

The performance impact of always setting the durable annotation is
negligible.
2024-04-22 17:33:10 +02:00
David Ansari bb106ff65c Skip access check on absent will queue
Resolves https://github.com/rabbitmq/rabbitmq-server/discussions/11021

Prior to this commit, an MQTT client that connects to RabbitMQ needed
configure access to its will queue even if the will queue has never
existed. This breaks client apps connecting with either v3 or v4 or with
v5 without making use of the Will-Delay-Interval.

Specifically, in 3.13.0 and 3.13.1 an MQTT client that connects to
RabbitMQ needs unnecessarily configure access to queue
`mqtt-will-<MQTT client ID>`.

This commit only check for configure access, if the queue actually gets
deleted, i.e. if it existed.
2024-04-17 12:50:41 +02:00
David Ansari e96125bfd3 Store MQTT messages as non-durable if QoS 0
By default, when the 'durable' message container (mc) annotation is unset,
messages are interpreted to be durable.

Prior to this commit, MQTT messages that were sent with QoS 0 were
stored durably in classic queues.
This commit takes the same approach for mc_mqtt as for mc_amqpl and mc_amqp:
If the message is durable, the durable mc annotation will not be set.
If the message is non-durable, the durable mc annotation will be set to false.
2024-04-16 11:43:58 +02:00
David Ansari 71d1b3b455 Respect message_interceptors.incoming.set_header_timestamp
When feature flag message_containers is enabled, setting
```
message_interceptors.incoming.set_header_timestamp
```
wasn't respected anymore when a message is published via MQTT to a
stream and subsequently consumed via AMQP 0.9.1.

This commit ensures that AMQP 0.9.1 header timestamp_in_ms will be
set.

Note that we must not modify the AMQP 1.0 properties section when messages
are received via AMQP 1.0 and consumed via AMQP 1.0.
Also, message annoation keys not starting with "x-" are reserved.
2024-04-10 11:40:49 +02:00
dependabot[bot] 23f3ebf381
Bump com.rabbitmq:amqp-client
Bumps [com.rabbitmq:amqp-client](https://github.com/rabbitmq/rabbitmq-java-client) from 5.20.0 to 5.21.0.
- [Release notes](https://github.com/rabbitmq/rabbitmq-java-client/releases)
- [Commits](https://github.com/rabbitmq/rabbitmq-java-client/compare/v5.20.0...v5.21.0)

---
updated-dependencies:
- dependency-name: com.rabbitmq:amqp-client
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-04-08 19:00:59 +00:00
David Ansari 1b75baddd9 Add AMQP 1.0 -> MQTT 5.0 -> AMQP 1.0 test 2024-04-05 15:09:21 +02:00
David Ansari 390d5715a0 Introduce new AMQP 1.0 address format
## What?
Introduce a new address format (let's call it v2) for AMQP 1.0 source and target addresses.

The old format (let's call it v1) is described in
https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing

The only v2 source address format is:
```
/queue/:queue
```

The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```

 ## Why?

The AMQP address v1 format comes with the following flaws:

1. Obscure address format:

Without reading the documentation, the differences for example between source addresses
```
/amq/queue/:queue
/queue/:queue
:queue
```
are unknown to users. Hence, the address format is obscure.

2. Implicit creation of topologies

Some address formats implicitly create queues (and bindings), such as source address
```
/exchange/:exchange/:binding-key
```
or target address
```
/queue/:queue
```
These queues and bindings are never deleted (by the AMQP 1.0 plugin.)
Implicit creation of such topologies is also obscure.

3. Redundant address formats

```
/queue/:queue
:queue
```
have the same meaning and are therefore redundant.

4. Properties section must be parsed to determine whether a routing key is present

Target address
```
/exchange/:exchange
```
requires RabbitMQ to parse the properties section in order to check whether the message `subject` is set.
If `subject` is not set, the routing key will default to the empty string.

5. Using `subject` as routing key misuses the purpose of this field.

According to the AMQP spec, the message `subject` field's purpose is:
> A common field for summary information about the message content and purpose.

6. Exchange names, queue names and routing keys must not contain the "/" (slash) character.

The current 3.13 implemenation splits by "/" disallowing these
characters in exchange, and queue names, and routing keys which is
unnecessary prohibitive.

7. Clients must create a separate link per target exchange

While this is reasonable working assumption, there might be rare use
cases where it could make sense to create many exchanges (e.g. 1
exchange per queue, see
https://github.com/rabbitmq/rabbitmq-server/discussions/10708) and have
a single application publish to all these exchanges.
With the v1 address format, for an application to send to 500 different
exchanges, it needs to create 500 links.

Due to these disadvantages and thanks to #10559 which allows clients to explicitly create topologies,
we can create a simpler, clearer, and better v2 address format.

 ## How?

 ### Design goals

Following the 7 cons from v1, the design goals for v2 are:
1. The address format should be simple so that users have a chance to
   understand the meaning of the address without necessarily consulting the docs.
2. The address format should not implicitly create queues, bindings, or exchanges.
   Instead, topologies should be created either explicitly via the new management node
   prior to link attachment (see #10559), or in future, we might support the `dynamic`
   source or target properties so that RabbitMQ creates queues dynamically.
3. No redundant address formats.
4. The target address format should explicitly state whether the routing key is present, empty,
   or will be provided dynamically in each message.
5. `Subject` should not be used as routing key. Instead, a better
   fitting field should be used.
6. Exchange names, queue names, and routing keys should allow to contain
   valid UTF-8 encoded data including the "/" character.
7. Allow both target exchange and routing key to by dynamically provided within each message.

Furthermore
8. v2 must co-exist with v1 for at least some time. Applications should be able to upgrade to
   RabbitMQ 4.0 while continuing to use v1. Examples include AMQP 1.0 shovels and plugins communicating
   between a 4.0 and a 3.13 cluster. Starting with 4.1, we should change the AMQP 1.0 shovel and plugin clients
   to use only the new v2 address format. This will allow AMQP 1.0 and plugins to communicate between a 4.1 and 4.2 cluster.
   We will deprecate v1 in 4.0 and remove support for v1 in a later 4.x version.

 ### Additional Context

The address is usually a String, but can be of any type.

The [AMQP Addressing extension](https://docs.oasis-open.org/amqp/addressing/v1.0/addressing-v1.0.html)
suggests that addresses are URIs and are therefore hierarchical and could even contain query parameters:
> An AMQP address is a URI reference as defined by RFC3986.

> the path expression is a sequence of identifier segments that reflects a path through an
> implementation specific relationship graph of AMQP nodes and their termini.
> The path expression MUST resolve to a node’s terminus in an AMQP container.

The [Using the AMQP Anonymous Terminus for Message Routing Version 1.0](https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html)
extension allows for the target being `null` and the `To` property to contain the node address.
This corresponds to AMQP 0.9.1 where clients can send each message on the same channel to a different `{exchange, routing-key}` destination.

The following v2 address formats will be used.

 ### v2 addresses

A new deprecated feature flag `amqp_address_v1` will be introduced in 4.0 which is permitted by default.
Starting with 4.1, we should change the AMQP 1.0 shovel and plugin AMQP 1.0 clients to use only the new v2 address format.
However, 4.1 server code must still understand the 4.0 AMQP 1.0 shovel and plugin AMQP 1.0 clients’ v1 address format.
The new deprecated feature flag will therefore be denied by default in 4.2.
This allows AMQP 1.0 shovels and plugins to work between
* 4.0 and 3.13 clusters using v1
* 4.1 and 4.0 clusters using v2 from 4.1 to v4.0 and v1 from 4.0 to 4.1
* 4.2 and 4.1 clusters using v2

without having to support both v1 and v2 at the same time in the AMQP 1.0 shovel and plugin clients.
While supporting both v1 and v2 in these clients is feasible, it's simpler to switch the client code directly from v1 to v2.

 ### v2 source addresses

The source address format is
```
/queue/:queue
```
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.

 ### v2 target addresses

v1 requires attaching a new link for each destination exchange.
v2 will allow dynamic `{exchange, routing-key}` combinations for a given link.
v2 therefore allows for the rare use cases where a single AMQP 1.0 publisher app needs to send to many different exchanges.
Setting up a link per destination exchange could be cumbersome.
Hence, v2 will support the dynamic `{exchange, routing-key}` combinations of AMQP 0.9.1.
To achieve this, we make use of the "Anonymous Terminus for Message Routing" extension:
The target address will contain the AMQP value null.
The `To` field in each message must be set and contain either address format
```
/exchange/:exchange/key/:routing-key
```
or
```
/exchange/:exchange
```
when using the empty routing key.

The `to` field requires an address type and is better suited than the `subject field.

Note that each message will contain this `To` value for the anonymous terminus.
Hence, we should save some bytes being sent across the network and stored on disk.
Using a format
```
/e/:exchange/k/:routing-key
```
saves more bytes, but is too obscure.
However, we use only `/key/` instead of `/routing-key/` so save a few bytes.
This also simplifies the format because users don’t have to remember whether to use spell `routing-key` or `routing_key` or `routingkey`.

The other allowed target address formats are:
```
/exchange/:exchange/key/:routing-key
```
where exchange and routing key are static on the given link.

```
/exchange/:exchange
```
where exchange and routing key are static on the given link, and routing key will be the empty string (useful for example for the fanout exchange).

```
/queue/:queue
```
This provides RabbitMQ beginners the illusion of sending a message directly
to a queue without having to understand what exchanges and routing keys are.
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.
Besides the additional queue existence check, this queue target is different from
```
/exchange//key/:queue
```
in that queue specific optimisations might be done (in future) by RabbitMQ
(for example different receiving queue types could grant different amounts of link credits to the sending clients).
A write permission check to the amq.default exchange will be performed nevertheless.

v2 will prohibit the v1 static link & dynamic routing-key combination
where the routing key is sent in the message `subject` as that’s also obscure.
For this use case, v2’s new anonymous terminus can be used where both exchange and routing key are defined in the message’s `To` field.

(The bare message must not be modified because it could be signed.)

The alias format
```
/topic/:topic
```
will also be removed.
Sending to topic exchanges is arguably an advanced feature.
Users can directly use the format
```
/exchange/amq.topic/key/:topic
```
which reduces the number of redundant address formats.

 ### v2 address format reference

To sump up (and as stated at the top of this commit message):

The only v2 source address format is:
```
/queue/:queue
```

The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```

Hence, all 8 listed design goals are reached.
2024-04-05 12:22:02 +02:00
David Ansari dda1c500da Require feature flag message_containers
as it is required for Native AMQP 1.0 in 4.0.

Remove compatibility code.
2024-04-04 15:11:31 +02:00
David Ansari 4b9574571b Move macro to the correct Erlang app
The Web MQTT link is not used in the rabbitmq_mqtt Erlang app.
This link is only used in the rabbitmq_web_mqtt Erlang app.
Hence, move the link to the correct Erlang app.
2024-03-21 16:29:11 +01:00
David Ansari 8233db0703 Fix wrong test assumptions
PR #10761 added a new CLI command to list Web MQTT connections.
That new CLI command relies on feature flag delete_ra_cluster_mqtt_node
being enabled.

This commit ensures exactly this condition.
2024-03-21 16:24:57 +01:00
Jean-Sébastien Pédron affcb6aba5
Revert "Merge pull request #10772 from rabbitmq/dependabot/maven/deps/rabbitmq_mqtt/test/java_SUITE_data/main/org.apache.maven.plugins-maven-compiler-plugin-3.13.0"
This reverts commit d8505d6f43, reversing
changes made to d96b127a3b.
2024-03-19 16:05:53 +01:00
Michael Klishin f7697c3d19
Merge pull request #10761 from rabbitmq/cloudamqp-fix/9302-list-webmqtt-connections
A new command for Web MQTT connection listing #10693 #9302
2024-03-18 20:20:14 -04:00
dependabot[bot] ff4edf8d9f
Bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.12.1 to 3.13.0.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.12.1...maven-compiler-plugin-3.13.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-03-18 18:12:22 +00:00
Rin Kuryloski 5271c26908 Mixed version test fix 2024-03-18 10:34:34 +01:00
Lois Soto Lopez f7de9ba0b4 Remove unnecessary newline
Discussion #9302
2024-03-16 13:08:30 -04:00
LoisSotoLopez 481cac6430 Fix/9302 test list Web MQTT connections command
Discussion #9302
2024-03-16 13:08:30 -04:00
Lois Soto Lopez befb3b3015 Prevent not_found on list web mqtt connections
Fixes #9302
2024-03-16 13:08:30 -04:00
Michael Klishin eb261acd30
CLI: update guide URLs to use the new path structure
the original paths, e.g. /streams.html, do have redirects
in place but it turned out to be a surprisingly fragile
Cloudflare feature when there are hundreds of them,
so we better switch now.
2024-03-07 15:53:14 -05:00
David Ansari 8cb313d5a1 Support AMQP 1.0 natively
## What

Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.

  ## Why

Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
   scalability, and resource usage for AMQP 1.0.
   See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
   See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
   this commit allows implementing more AMQP 1.0 features in the future.
   Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.

Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
   New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
   consumers as we currently recommended for AMQP 0.9.1. which potentially
   halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
   slow target queue won't block the entire connection.
   Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
   In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
   possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol

This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.

This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.

 ## Implementation details

1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.

2. Remove rabbit_queue_collector

rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.

3. 7 processes per connection + 1 process per session in this commit instead of
   7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.

4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
  a connection write heavily at the same time.

This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
  can accumulate across all sessions bytes before flushing the socket.

In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.

5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.

How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.

6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.

7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
  settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
  and unsettled TRANSFERs.

8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.

How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
  new credit_reply queue event:
  * `available` after the queue sends any deliveries
  * `link-credit` after the queue sends any deliveries
  * `drain` which allows us to combine the old queue events
    send_credit_reply and send_drained into a single new queue event
    credit_reply.
* Include the consumer tag into the credit_reply queue event such that
  the AMQP 1.0 session process can process any credit replies
  asynchronously.

Link flow control state `delivery-count` also moves to the queue processes.

The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.

9. Use serial number arithmetic in quorum queues and session process.

10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.

11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.

12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).

13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.

14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2

15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.

16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.

17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.

18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.

19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.

20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.

21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.

22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.

If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.

23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.

24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.

25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.

26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.

In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.

27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries

28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').

29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client

30. Fix AMQP client to do serial number arithmetic.

31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.

32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.

The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597

The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.

Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.

33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.

34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.

Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.

Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).

Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.

This approach is safe and simple.

The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:

i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.

ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?

35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.

36. Remove credit extension from AMQP 0.9.1 client

37. Support maintenance mode closing AMQP 1.0 connections.

38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.

39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
    The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
    tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```

40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```

 ## Benchmarks

 ### Throughput & Latency

Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1

Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```

Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.

Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```

1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s

Latencies by percentile:

          0% ........ 0 ms       90.00% ........ 9 ms
         25% ........ 2 ms       99.00% ....... 14 ms
         50% ........ 4 ms       99.90% ....... 17 ms
        100% ....... 26 ms       99.99% ....... 24 ms
```

RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        6     73.6       2.1          3,217       1,607        0      8.0       511
     4.1        163,580      16,367        2     74.1       4.1          3,217           0        0      8.0         0
     6.1        229,114      32,767        3     74.1       6.1          3,217           0        0      8.0         0
     8.1        261,880      16,367        2     74.1       8.1         67,874      32,296        8      8.2     7,662
    10.1        294,646      16,367        2     74.1      10.1         67,874           0        0      8.2         0
    12.1        360,180      32,734        3     74.1      12.1         67,874           0        0      8.2         0
    14.1        392,946      16,367        3     74.1      14.1         68,604         365        0      8.2    12,147
    16.1        458,480      32,734        3     74.1      16.1         68,604           0        0      8.2         0
    18.1        491,246      16,367        2     74.1      18.1         68,604           0        0      8.2         0
    20.1        556,780      32,767        4     74.1      20.1         68,604           0        0      8.2         0
    22.1        589,546      16,375        2     74.1      22.1         68,604           0        0      8.2         0
receiver timed out
    24.1        622,312      16,367        2     74.1      24.1         68,604           0        0      8.2         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```

2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s

Latencies by percentile:

          0% ....... 11 ms       90.00% ....... 23 ms
         25% ....... 15 ms       99.00% ....... 28 ms
         50% ....... 18 ms       99.90% ....... 33 ms
        100% ....... 49 ms       99.99% ....... 47 ms
```

RabbitMQ 3.x:
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        9     69.9       2.1         18,430       9,206        5      7.6     1,221
     4.1        163,580      16,375        5     70.2       4.1         18,867         218        0      7.6     2,168
     6.1        229,114      32,767        6     70.2       6.1         18,867           0        0      7.6         0
     8.1        294,648      32,734        7     70.2       8.1         18,867           0        0      7.6         0
    10.1        360,182      32,734        6     70.2      10.1         18,867           0        0      7.6         0
    12.1        425,716      32,767        6     70.2      12.1         18,867           0        0      7.6         0
receiver timed out
    14.1        458,482      16,367        5     70.2      14.1         18,867           0        0      7.6         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```

3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```

RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```

 ### Memory usage

Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```

```
/bin/cat rabbitmq.conf

tcp_listen_options.sndbuf  = 2048
tcp_listen_options.recbuf  = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```

Create 50k connections with 2 sessions per connection, i.e. 100k session in total:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 50000; i++ {
		conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("dialing AMQP server:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}
```

This commit:
```
erlang:memory().
[{total,4586376480},
 {processes,4025898504},
 {processes_used,4025871040},
 {system,560477976},
 {atom,1048841},
 {atom_used,1042841},
 {binary,233228608},
 {code,21449982},
 {ets,108560464}]

erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs

RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
 {processes,14044779256},
 {processes_used,14044755120},
 {system,1123453448},
 {atom,1057033},
 {atom_used,1052587},
 {binary,236381264},
 {code,21790238},
 {ets,391423744}]

erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs

50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB

 ## Future work

1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2024-02-28 14:15:20 +01:00
David Ansari 8b151f43f7 Parse 2 bytes encoded AMQP boolean to Erlang boolean
An AMQP boolean can by encoded using 1 byte or 2 bytes:
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-boolean

Prior to this commit, our Erlang parser returned:
* Erlang terms `true` or `false` for the 1 byte AMQP encoding
* Erlang terms `{boolean, true}` or `{boolean, false}` for the 2 byte AMQP enconding

Having a serializer and parser that perform the opposite actions such
that
```
Term = parse(serialize(Term))
```
is desirable as it provides a symmetric property useful not only for
property based testing, but also for avoiding altering message hashes
when serializing and parsing the same term.

However, dealing wth `{boolean, boolean()}` tuples instead of `boolean()` is very unhandy since
all Erlang code must take care of both forms leading to subtle bugs as
occurred in:
* 4cbeab8974/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl (L155-L158)
* b8173c9d3b/deps/rabbitmq_mqtt/src/mc_mqtt.erl (L83-L88)
* b8173c9d3b/deps/rabbit/src/mc_amqpl.erl (L123-L127)

Therefore, this commits decides to take the safe approach and always
parse to an Erlang `boolean()` independent of whether the AMQP boolean
was encoded with 1 or 2 bytes.
2024-02-16 17:57:28 +01:00
dependabot[bot] 8b4862972e
build(deps-dev): bump org.junit.jupiter:junit-jupiter
Bumps [org.junit.jupiter:junit-jupiter](https://github.com/junit-team/junit5) from 5.10.1 to 5.10.2.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.10.1...r5.10.2)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-02-05 18:29:48 +00:00
dependabot[bot] 9f24f9348c
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.2 to 3.25.3.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.2...assertj-build-3.25.3)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-02-05 18:28:08 +00:00
Michael Klishin 9c79ad8d55 More missed license header updates #9969 2024-02-05 12:26:25 -05:00
Michael Klishin f414c2d512
More missed license header updates #9969 2024-02-05 11:53:50 -05:00
David Ansari bedcae18c2 Fix MQTT test flake
Prior to this commit test block_connack_timeout
flaked when 2 new ports got created instead of only 1
in line
```
[NewPort] = Ports -- Ports0,
```

This commit filters for tcp_inet ports.
This will always return the port of the new MQTT connection.
2024-01-27 17:43:38 +01:00
dependabot[bot] 2b83b000f7
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.1 to 3.25.2.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.1...assertj-build-3.25.2)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-24 19:01:02 +00:00
Arnaud Cogoluègnes 1f89ede396
Remove rabbit_authz_backend:state_can_expire/0
Use expiry_timestamp/1 instead, which returns 'never'
if the credentials do not expire.

Fixes #10382
2024-01-24 09:58:59 +01:00
dependabot[bot] 4b5ef6474f
build(deps): bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.42.0 to 2.43.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.42.0...lib/2.43.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-23 18:56:15 +00:00
Michael Klishin 7b151a7651 More missed (c) header updates 2024-01-22 23:44:47 -05:00
Michael Klishin c1d37e3e02
Merge pull request #10364 from rabbitmq/flaky-mc-flake-flake
Reduce flakiness of certain Common Test suites
2024-01-22 16:22:24 -05:00
Karl Nilsson c10b4dc0f0 protocol_interop_SUITE - try a durable queue for amqp part 2024-01-22 15:27:30 +00:00
Michael Klishin fb775afcc8
More (c) source header updates #9969 2024-01-19 19:53:28 -05:00
Karl Nilsson 7e2f148dd4 Try a little sleep in an mqtt test
It could help, we'll see.
2024-01-18 16:06:36 +00:00
David Ansari 6a3ba6210a
Reduce per message disk overhead (#10339)
* Reduce per message disk overhead

Message container annotation keys are stored on disk.
By shortening them we save 95 - 58 = 37 bytes per message.
```
1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})).
95
2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})).
58
```
This should somewhat reduce disk I/O and disk space.

* Ensure durable is a boolean

Prevent key 'durable' with value 'undefined' being added to the
mc annotations, for example when the durable field was not set, but
another AMQP 1.0 header field was set.

* Apply feedback
2024-01-18 11:53:02 +01:00
dependabot[bot] e84d2d9cf6
build(deps): bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.41.1 to 2.42.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/maven/2.41.1...lib/2.42.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-15 18:17:40 +00:00
dependabot[bot] 12b6225387
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.3 to 3.2.5.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.3...surefire-3.2.5)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-10 18:59:58 +00:00
dependabot[bot] 0e22221efc
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.0 to 3.25.1.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.0...assertj-build-3.25.1)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-03 18:37:55 +00:00
Michael Klishin 01092ff31f
(c) year bumps 2024-01-01 22:02:20 -05:00
dependabot[bot] 506dccb172
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.24.2 to 3.25.0.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.24.2...assertj-build-3.25.0)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-01 18:59:45 +00:00
David Ansari 78b4fcc899 Allow MQTT QoS 0 subscribers to reconnect
The solution in #10203 has the following issues:
1. Bindings can be left ofter in Mnesia table rabbit_durable_queue.
One solution to 1. would be to first delete the old queue via
`rabbit_amqqueue:internal_delete(Q, User, missing_owner)`
and subsequently declare the new queue via
`rabbit_amqqueue:internal_declare(Q, false)`
However, even then, it suffers from:
2. Race conditions between `rabbit_amqqueue:on_node_down/1`
and `rabbit_mqtt_qos0_queue:declare/2`:
`rabbit_amqqueue:on_node_down/1` could first read the queue records that
need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could
re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1`
could subsequently delete the re-created queue.

Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete
transient queues in one isolated transaction. Instead it first reads
queues and subsequenlty deletes queues in batches making it prone to
race conditions.

Ideally, this commit deletes all rabbit_mqtt_qos0_queue queues of the
node that has crashed including their bindings.
However, doing so in one transaction is risky as there may be millions
of such queues and the current code path applies the same logic on all
live nodes resulting in conflicting transactions and therefore a long
database operation.

Hence, this commit uses the simplest approach which should still be
safe:
Do not remove rabbit_mqtt_qos0_queue queues if a node crashes.
Other live nodes will continue to route to these dead queues.
That should be okay, given that the rabbit_mqtt_qos0_queue clients auto
confirm.
Continuing routing however has the effect of counting as routing result
for AMQP 0.9.1 `mandatory` property.
If an MQTT client re-connects to a live node with the same client ID,
the new node will delete and then re-create the queue.
Once the crashed node comes back online, it will clean up its leftover
queues and bindings.
2023-12-27 20:47:06 -05:00
dependabot[bot] 67b8d30281
build(deps): bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.12.0 to 3.12.1.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.12.0...maven-compiler-plugin-3.12.1)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-25 18:09:25 +00:00
David Ansari 9487189dc6 Overwrite rabbit_mqtt_qos0_queue record from crashed node
When a node is shut down cleanly, the rabbit_mqtt_qos0_queue record is
removed from Mnesia.
When a node crashes and subsequently reboots the new node incarnation
removes the old rabbit_mqtt_qos0_queue record from Mnesia (via
rabbit_mqtt_qos0_queue:recover/2)

However, when a node crashes, the rabbit_mqtt_qos0_queue will be removed
from Mnesia table rabbit_queue, but will still be present in table
rabbit_durable_queue on the other live nodes.
Prior to this commit, when the same MQTT client (i.e. same MQTT client
ID) re-connects from the crashed node to another live node and
re-subscribes, the following error occurred:
```
[info] <0.43155.0> Accepted MQTT connection 10.105.0.18:60508 -> 10.105.0.10:1883 for client ID nodered_24e214feb018a232
[debug] <0.43155.0> Received a SUBSCRIBE for topic(s) [{mqtt_topic,
[debug] <0.43155.0>                                        <<"as923/gateway/+/command/#">>,0}]
[error] <0.43155.0> Failed to declare queue 'mqtt-subscription-nodered_24e214feb018a232qos0' in vhost '/': {absent,
[error] <0.43155.0>                                                                                         {amqqueue,
[error] <0.43155.0>                                                                                          {resource,
[error] <0.43155.0>                                                                                           <<"/">>,
[error] <0.43155.0>                                                                                           queue,
[error] <0.43155.0>                                                                                           <<"mqtt-subscription-nodered_24e214feb018a232qos0">>},
[error] <0.43155.0>                                                                                          true,
[error] <0.43155.0>                                                                                          false,
[error] <0.43155.0>                                                                                          <15486.32690.0>,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          <15486.32690.0>,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [{vhost,
[error] <0.43155.0>                                                                                            <<"/">>},
[error] <0.43155.0>                                                                                           {name,
[error] <0.43155.0>                                                                                            <<"ha-all-mqtt">>},
[error] <0.43155.0>                                                                                           {pattern,
[error] <0.43155.0>                                                                                            <<"^mqtt-">>},
[error] <0.43155.0>                                                                                           {'apply-to',
[error] <0.43155.0>                                                                                            <<"all">>},
[error] <0.43155.0>                                                                                           {definition,
[error] <0.43155.0>                                                                                            [{<<"ha-mode">>,
[error] <0.43155.0>                                                                                              <<"all">>}]},
[error] <0.43155.0>                                                                                           {priority,
[error] <0.43155.0>                                                                                            0}],
[error] <0.43155.0>                                                                                          undefined,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          undefined,
[error] <0.43155.0>                                                                                          live,
[error] <0.43155.0>                                                                                          0,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          <<"/">>,
[error] <0.43155.0>                                                                                          #{user =>
[error] <0.43155.0>                                                                                             <<"iottester">>},
[error] <0.43155.0>                                                                                          rabbit_mqtt_qos0_queue,
[error] <0.43155.0>                                                                                          #{}},
[error] <0.43155.0>                                                                                         nodedown}
[error] <0.43155.0> MQTT protocol error on connection 10.105.0.18:60508 -> 10.105.0.10:1883: subscribe_error
```

This commit fixes this error allowing an MQTT client that connects with CleanSession=true and
subscribes with QoS 0 to re-connect and re-subscribe to another live
node if the original Rabbit node crashes.

Reported in https://groups.google.com/g/rabbitmq-users/c/pxgy0QiwilM/m/LkJQ-3DyBgAJ
2023-12-21 17:30:15 +01:00
dependabot[bot] 0f5c2d2325
build(deps): bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.11.0 to 3.12.0.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.11.0...maven-compiler-plugin-3.12.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-19 18:09:22 +00:00
David Ansari abcbe5e647 Fix test expectation
as `main` and `v3.12.x` branches are currently red due to
https://github.com/rabbitmq/rabbitmq-server/pull/10139
2023-12-15 09:35:54 +01:00
David Ansari f44c851293 Fix crash when closing connection
Avoid the following crash
```
** Reason for termination ==
** {mqtt_unexpected_cast,{shutdown,"Closed via management plugin"}}

  crasher:
    initial call: rabbit_mqtt_reader:init/1
    pid: <0.1096.0>
    registered_name: []
    exception exit: {mqtt_unexpected_cast,
                        {shutdown,"Closed via management plugin"}}
      in function  gen_server:handle_common_reply/8 (gen_server.erl, line 1208)
```
when closing MQTT or Stream connections via HTTP API endpoint
```
/connections/username/:username
```
2023-12-14 12:35:51 +01:00
dependabot[bot] b3f33aa42c
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.2 to 3.2.3.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.2...surefire-3.2.3)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-13 18:51:16 +00:00
dependabot[bot] aa0095aae9
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.41.0 to 2.41.1.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.41.0...maven/2.41.1)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-04 18:13:42 +00:00
dependabot[bot] fb90276710
Bump ch.qos.logback:logback-classic
Bumps [ch.qos.logback:logback-classic](https://github.com/qos-ch/logback) from 1.2.12 to 1.2.13.
- [Commits](https://github.com/qos-ch/logback/compare/v_1.2.12...v_1.2.13)

---
updated-dependencies:
- dependency-name: ch.qos.logback:logback-classic
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-01 18:30:15 +00:00
dependabot[bot] c69de4a083
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.40.0 to 2.41.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.40.0...lib/2.41.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-27 18:19:07 +00:00
Karl Nilsson f67058ac6c
Merge pull request #9830 from rabbitmq/mc-refinements
Message container conversion improvements
2023-11-24 10:22:34 +00:00
Karl Nilsson 61f13d0bb7 Add UUID conversion to mc_mqtt 2023-11-23 17:36:34 +00:00
Michael Klishin 1b642353ca
Update (c) according to [1]
1. https://investors.broadcom.com/news-releases/news-release-details/broadcom-and-vmware-intend-close-transaction-november-22-2023
2023-11-21 23:18:22 -05:00
David Ansari 95c5f2ec9e Some small fixes 2023-11-16 12:33:17 +01:00
Karl Nilsson c4fd947aad MC: various changes and improvements
To refine conversion behaviour add additional tests
and ensure it matches the documentation.

mc: optionally capture source environment

And pass target environment to mc:convert

This allows environmental data and configuration to be captured and
used to modify and complete conversion logic whilst allowing conversion
code to remain pure and portable.
2023-11-15 11:04:49 +00:00
Michael Klishin e52772057c
Merge pull request #9874 from rabbitmq/opt-mgmt-queue-listings
Optimise HTTP API /queues endpoint
2023-11-07 14:30:42 -05:00
dependabot[bot] 37acb08feb
Bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.1 to 3.2.2.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.1...surefire-3.2.2)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-07 18:25:28 +00:00
David Ansari 5b18e9d982 Avoid warning of unhandled message
Prior to this commit:
1. Start RabbitMQ with MQTT plugin enabled.
2.
```
rabbitmq-diagnostics consume_event_stream
^C
```
3. The logs will print the following warning:
```
[warning] <0.570.0> ** Undefined handle_info in rabbit_mqtt_internal_event_handler
[warning] <0.570.0> ** Unhandled message: {'DOWN',#Ref<0.2410135134.1846280193.145044>,process,
[warning] <0.570.0>                               <52723.100.0>,noconnection}
[warning] <0.570.0>
```

This is because rabbit_event_consumer:init/1 monitors the CLI process.

Any rabbit_event handler should therefore implement handle_info/2.

It's similar to what's described in the gen_event docs about
add_sup_handler/3:
> Any event handler attached to an event manager which in turn has a
> supervised handler should expect callbacks of the shape
> Module:handle_info({'EXIT', Pid, Reason}, State).
2023-11-07 12:33:02 +01:00
dependabot[bot] 0664b7ec7b
Bump org.junit.jupiter:junit-jupiter
Bumps [org.junit.jupiter:junit-jupiter](https://github.com/junit-team/junit5) from 5.10.0 to 5.10.1.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.10.0...r5.10.1)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-06 18:46:05 +00:00
Karl Nilsson c2cd60b18d Optimise mgmt HTTP API /queues endpoint
Listing queues with the HTTP API when there are many (1000s) of
quorum queues could be excessively slow compared to the same scenario
with classic queues.

This optimises various aspects of HTTP API queue listings.
For QQs it removes the expensive cluster wide rpcs used to get the
"online" status of each quorum queue. This was previously done _before_
paging and thus would perform a cluster-wide query for _each_ quorum queue in
the vhost/system. This accounted for most of the slowness compared to
classic queues.

Secondly the query to separate the running from the down queues
consisted of two separate queries that later were combined when a single
query would have sufficed.

This commit also includes a variety of other improvements and minor
fixes discovered during testing and optimisation.

MINOR BREAKING CHANGE: quorum queues would previously only display one
of two states: running or down. Now there is a new state called minority
which is emitted when the queue has at least one member running but
cannot commit entries due to lack of quorum.

Also the quorum queue may transiently enter the down state when a node
goes down and before its elected a new leader.
2023-11-06 15:34:26 +00:00
David Ansari d1940c997e Fix Khepri flake
Previously, test pubsub was flaky
```
{shared_SUITE,pubsub,766}
{test_case_failed,missing m1}
```
because the binding wasn't present yet on node 0 when publishing to node
0.
2023-11-03 09:52:53 +01:00
David Ansari 75e777b833 Provide more debug output for flaky test
In
https://github.com/rabbitmq/rabbitmq-server/actions/runs/6735035912/job/18308551274?pr=9865
test
```
bazel test //deps/rabbitmq_mqtt:v5_SUITE -t- --test_sharding_strategy=disabled \
    --test_env FOCUS="-group [mqtt,cluster_size_1] -case will_delay_equals_session_expiry" \
    --test_env RABBITMQ_METADATA_STORE=mnesia --config=rbe-26 --runs_per_test=100
```
seems to flake.
2023-11-02 18:40:38 +01:00
Iliia Khaprov c577e04b73 Remove POODLE check, we are in the future 2023-11-01 10:53:27 +01:00
David Ansari a5f4317c3f Fix test flake session_takeover_v3_v5
Prior to this commit the follwing test was flaky:
```
bazel test //deps/rabbitmq_mqtt:v5_SUITE -t- --test_sharding_strategy=disabled \
    --test_env FOCUS="-group [mqtt,cluster_size_3] -case session_takeover_v3_v5" \
    --test_env RABBITMQ_METADATA_STORE=khepri --config=rbe-26 --runs_per_test=20
```
because rabbit_misc:maps_any/2 filtered out a destination queue after routing if
that destination queue wasn't associated with any matched binding key.

This commit makes the test green.

However, the root cause of this issue isn't solved:
MQTT 5.0 requires the topic exchange to return matched binding keys for
destination queues such that feature NoLocal, and Subscription
Identifiers work correctly.

The current MQTT plugin relies on session state to be stored
consistently in the database. When a new client connects, the session
state is retrieved from the database and appropriate actions are taken:
e.g. consume from a queue, modify binding arguments, etc.

With Mnesia this consistency was guaranteed thanks to sync transactions
when updating queues and bindings.

Khepri has only eventual consistency semantics. This is problematic for the
MQTT plugin in the session_takeover_v3_v5 test scenario:

1. Client subscribes on node 1 (with v3). Node 1 returns subscription
   success to client.
2. **Thereafter**, another client with the same MQTT client ID connects
   to node 0 (with v5). "Proper" session takeover should take place.
   However due to eventual consistency, the subscription / binding isn't
   present yet on node 0. Therefore the session upgrade from v3 to v5
   does not take place and leads to binding keys being absent when
   messages are routed to the session's queue.
2023-10-30 09:39:49 +01:00
David Ansari 43dfa4d6ac Fix MQTT test flakes
Tests session_reconnect and session_takeover were flaky, specifically
when run under Khepri.

The issue was in the test itself that the connect properties didn't
apply. Therefore, prior to this commit an exclusive queue got created.
2023-10-27 17:30:41 +02:00
David Ansari 7cd91a8fd2 Do not skip test
On `main` branch and v3.12.6 feature flag delete_ra_cluster_mqtt_node is
supported. Instead of skipping the entire test if that feature flag is
not enabled, enable the feature flag and run the test.

More generally:
"Instead of verifying if a feature flag is enabled or not, it's best to enable
it and react from the return value (success or failure).
Mixed version testing always turn off all feature flags by default.
So in the future, even though all nodes supports the mentionned
feature flag, the testcase will still be skipped." [JSP]
2023-10-27 16:49:13 +02:00
David Ansari c8b90488e7 Skip some assertions in mixed version tests
See commit message 00c77e0a1a for details.

In a multi node mixed version cluster where the lower version is
compiled with a different OTP version, anonymous Ra leader queries will
fail with a badfun error if initiated on the higher version and executed
on the leader on the lower version node.
2023-10-27 15:12:51 +02:00
David Ansari cad067f5fa Remove erlang:port_command/2 hack
as selective receives are efficient in OTP 26:
```
OTP-18431
Application(s):
compiler, stdlib
Related Id(s):
PR-6739
Improved the selective receive optimization, which can now be enabled for references returned from other functions.
This greatly improves the performance of gen_server:send_request/3, gen_server:wait_response/2, and similar functions.
```
2023-10-27 10:37:56 +02:00
Michael Klishin 673548f343
Merge pull request #9805 from rabbitmq/skip-maintenance
Skip test maintenance in mixed version mode
2023-10-27 03:31:27 -04:00
Michael Klishin aaed0e676b
Merge pull request #9807 from rabbitmq/unskip-duplicate-client-id
Run test duplicate_client_id with Khepri
2023-10-27 03:31:13 -04:00
David Ansari c914e43097 Do not enable OTP feature at runtime
Since Erlang/OTP 26:
```
OTP-18445
Application(s):
erts, stdlib
It is no longer necessary to enable a feature in the runtime system in order to load modules that are using it.
It is sufficient to enable the feature in the compiler when compiling it.
That means that to use feature maybe_expr in Erlang/OTP 26, it is sufficient to enable it during compilation.
In Erlang/OTP 27, feature maybe_expr will be enabled by default, but it will be possible to disable it.
```
2023-10-27 09:26:31 +02:00
David Ansari 5cbeedffd9 Run test duplicate_client_id with Khepri
The only reason to skip tests with Khepri is if they use
classic mirror queues or use Mnesia directly.
2023-10-27 09:15:14 +02:00
David Ansari 00c77e0a1a Skip test maintenance in mixed version mode
This test fails when MQTT client ID tracking is performed in Ra, and the
higher version node gets compiled with a different OTP version (26) than
the lower version node (25).

The reason is described in 83eede7ef2
```
An interesting side note learned here is that the compiled file
rabbit_mqtt_collector must not be changed. This commit only modifies
function specs. However as soon as the compiled code is changed, this
module becomes a new version. The new version causes the anonymous ra query
function to fail in mixed clusters: When the old node does a
ra:leader_query where the leader is on the new node, the query function
fails on the new node with `badfun` because the new node does not have
the same module version. For more context, read:
https://web.archive.org/web/20181017104411/http://www.javalimit.com/2010/05/passing-funs-to-other-erlang-nodes.html
```

We shouldn’t use an anonymous function for ra:leader_query or ra:consistent_query.
Instead we should use the {M,F,A} form.
9e5d437a0a/src/ra.erl (L102-L103)

In MQTT the anonymous function is used in bcb95c949d/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl (L50)
This causes the query to return a bad fun error (silently ignored in bcb95c949d/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl (L70-L71) )
when executed on a different node and either:
1.) Any code in file rabbit_mqtt_collector.erl changed, or
2.) The code gets compiled with a different OTP version.

2.) is the reason for a failing mixed version test in https://github.com/rabbitmq/rabbitmq-server/pull/8553 because both higher and lower versions run OTP 26,
but the higher version node got compiled with 26 while the lower version node got compiled with 25.

The same file
compiled with OTP 26.0.1
```
1> rabbit_mqtt_collector:module_info(attributes).
[{vsn,[30045739264236496640687548892374951597]}]
```

compiled with OTP 25.3.2
```
1> rabbit_mqtt_collector:module_info(attributes).
[{vsn,[168144385419873449889532520247510637232]}]
```

Due to the very low impact that maintenance mode will not close all MQTT
client connections with feature flag delete_ra_cluster_mqtt_node being
disabled, we skip this test.
2023-10-27 08:51:00 +02:00
dependabot[bot] 1d9524ea98
Bump com.rabbitmq:amqp-client
Bumps [com.rabbitmq:amqp-client](https://github.com/rabbitmq/rabbitmq-java-client) from 5.19.0 to 5.20.0.
- [Release notes](https://github.com/rabbitmq/rabbitmq-java-client/releases)
- [Commits](https://github.com/rabbitmq/rabbitmq-java-client/compare/v5.19.0...v5.20.0)

---
updated-dependencies:
- dependency-name: com.rabbitmq:amqp-client
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-25 18:55:23 +00:00
dependabot[bot] 7f45bb1949
Bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.1.2 to 3.2.1.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.1.2...surefire-3.2.1)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-24 18:27:56 +00:00
David Ansari 3501a00632 Fix MQTT Topic Alias type spec 2023-10-05 18:11:26 +02:00
dependabot[bot] 83aa17cd7f
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.39.0 to 2.40.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.39.0...lib/2.40.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-09-29 18:58:01 +00:00
Diana Parra Corbacho 5f0981c5a3
Allow to use Khepri database to store metadata instead of Mnesia
[Why]

Mnesia is a very powerful and convenient tool for Erlang applications:
it is a persistent disc-based database, it handles replication accross
multiple Erlang nodes and it is available out-of-the-box from the
Erlang/OTP distribution. RabbitMQ relies on Mnesia to manage all its
metadata:

* virtual hosts' properties
* intenal users
* queue, exchange and binding declarations (not queues data)
* runtime parameters and policies
* ...

Unfortunately Mnesia makes it difficult to handle network partition and,
as a consequence, the merge conflicts between Erlang nodes once the
network partition is resolved. RabbitMQ provides several partition
handling strategies but they are not bullet-proof. Users still hit
situations where it is a pain to repair a cluster following a network
partition.

[How]

@kjnilsson created Ra [1], a Raft consensus library that RabbitMQ
already uses successfully to implement quorum queues and streams for
instance. Those queues do not suffer from network partitions.

We created Khepri [2], a new persistent and replicated database engine
based on Ra and we want to use it in place of Mnesia in RabbitMQ to
solve the problems with network partitions.

This patch integrates Khepri as an experimental feature. When enabled,
RabbitMQ will store all its metadata in Khepri instead of Mnesia.

This change comes with behavior changes. While Khepri remains disabled,
you should see no changes to the behavior of RabbitMQ. If there are
changes, it is a bug. After Khepri is enabled, there are significant
changes of behavior that you should be aware of.

Because it is based on the Raft consensus algorithm, when there is a
network partition, only the cluster members that are in the partition
with at least `(Number of nodes in the cluster ÷ 2) + 1` number of nodes
can "make progress". In other words, only those nodes may write to the
Khepri database and read from the database and expect a consistent
result.

For instance in a cluster of 5 RabbitMQ nodes:
* If there are two partitions, one with 3 nodes, one with 2 nodes, only
  the group of 3 nodes will be able to write to the database.
* If there are three partitions, two with 2 nodes, one with 1 node, none
  of the group can write to the database.

Because the Khepri database will be used for all kind of metadata, it
means that RabbitMQ nodes that can't write to the database will be
unable to perform some operations. A list of operations and what to
expect is documented in the associated pull request and the RabbitMQ
website.

This requirement from Raft also affects the startup of RabbitMQ nodes in
a cluster. Indeed, at least a quorum number of nodes must be started at
once to allow nodes to become ready.

To enable Khepri, you need to enable the `khepri_db` feature flag:

    rabbitmqctl enable_feature_flag khepri_db

When the `khepri_db` feature flag is enabled, the migration code
performs the following two tasks:
1. It synchronizes the Khepri cluster membership from the Mnesia
   cluster. It uses `mnesia_to_khepri:sync_cluster_membership/1` from
   the `khepri_mnesia_migration` application [3].
2. It copies data from relevant Mnesia tables to Khepri, doing some
   conversion if necessary on the way. Again, it uses
   `mnesia_to_khepri:copy_tables/4` from `khepri_mnesia_migration` to do
   it.

This can be performed on a running standalone RabbitMQ node or cluster.
Data will be migrated from Mnesia to Khepri without any service
interruption. Note that during the migration, the performance may
decrease and the memory footprint may go up.

Because this feature flag is considered experimental, it is not enabled
by default even on a brand new RabbitMQ deployment.

More about the implementation details below:

In the past months, all accesses to Mnesia were isolated in a collection
of `rabbit_db*` modules. This is where the integration of Khepri mostly
takes place: we use a function called `rabbit_khepri:handle_fallback/1`
which selects the database and perform the query or the transaction.
Here is an example from `rabbit_db_vhost`:

* Up until RabbitMQ 3.12.x:

        get(VHostName) when is_binary(VHostName) ->
            get_in_mnesia(VHostName).

* Starting with RabbitMQ 3.13.0:

        get(VHostName) when is_binary(VHostName) ->
            rabbit_khepri:handle_fallback(
              #{mnesia => fun() -> get_in_mnesia(VHostName) end,
                khepri => fun() -> get_in_khepri(VHostName) end}).

This `rabbit_khepri:handle_fallback/1` function relies on two things:
1. the fact that the `khepri_db` feature flag is enabled, in which case
   it always executes the Khepri-based variant.
4. the ability or not to read and write to Mnesia tables otherwise.

Before the feature flag is enabled, or during the migration, the
function will try to execute the Mnesia-based variant. If it succeeds,
then it returns the result. If it fails because one or more Mnesia
tables can't be used, it restarts from scratch: it means the feature
flag is being enabled and depending on the outcome, either the
Mnesia-based variant will succeed (the feature flag couldn't be enabled)
or the feature flag will be marked as enabled and it will call the
Khepri-based variant. The meat of this function really lives in the
`khepri_mnesia_migration` application [3] and
`rabbit_khepri:handle_fallback/1` is a wrapper on top of it that knows
about the feature flag.

However, some calls to the database do not depend on the existence of
Mnesia tables, such as functions where we need to learn about the
members of a cluster. For those, we can't rely on exceptions from
Mnesia. Therefore, we just look at the state of the feature flag to
determine which database to use. There are two situations though:

* Sometimes, we need the feature flag state query to block because the
  function interested in it can't return a valid answer during the
  migration. Here is an example:

        case rabbit_khepri:is_enabled(RemoteNode) of
            true  -> can_join_using_khepri(RemoteNode);
            false -> can_join_using_mnesia(RemoteNode)
        end

* Sometimes, we need the feature flag state query to NOT block (for
  instance because it would cause a deadlock). Here is an example:

        case rabbit_khepri:get_feature_state() of
            enabled -> members_using_khepri();
            _       -> members_using_mnesia()
        end

Direct accesses to Mnesia still exists. They are limited to code that is
specific to Mnesia such as classic queue mirroring or network partitions
handling strategies.

Now, to discover the Mnesia tables to migrate and how to migrate them,
we use an Erlang module attribute called
`rabbit_mnesia_tables_to_khepri_db` which indicates a list of Mnesia
tables and an associated converter module. Here is an example in the
`rabbitmq_recent_history_exchange` plugin:

    -rabbit_mnesia_tables_to_khepri_db(
       [{?RH_TABLE, rabbit_db_rh_exchange_m2k_converter}]).

The converter module  — `rabbit_db_rh_exchange_m2k_converter` in this
example  — is is fact a "sub" converter module called but
`rabbit_db_m2k_converter`. See the documentation of a `mnesia_to_khepri`
converter module to learn more about these modules.

[1] https://github.com/rabbitmq/ra
[2] https://github.com/rabbitmq/khepri
[3] https://github.com/rabbitmq/khepri_mnesia_migration

See #7206.

Co-authored-by: Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>
Co-authored-by: Diana Parra Corbacho <dparracorbac@vmware.com>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
2023-09-29 16:00:11 +02:00
dependabot[bot] 88bb95c3ea
Bump com.rabbitmq:amqp-client
Bumps [com.rabbitmq:amqp-client](https://github.com/rabbitmq/rabbitmq-java-client) from 5.18.0 to 5.19.0.
- [Release notes](https://github.com/rabbitmq/rabbitmq-java-client/releases)
- [Commits](https://github.com/rabbitmq/rabbitmq-java-client/compare/v5.18.0...v5.19.0)

---
updated-dependencies:
- dependency-name: com.rabbitmq:amqp-client
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-09-27 18:09:42 +00:00
David Ansari d8ecc66a8e Do not confirm MQTT messages if in partition
Since MQTT publishers might publish to classic mirrored queues,
add the same check as in rabbit_channel:send_confirms_and_nacks/1:

```
If we are in a minority and pause_minority mode then a) we are
going to shut down imminently and b) we should not confirm anything
until then, since anything we confirm is likely to be lost.
```
2023-09-22 17:16:43 +02:00
Arnaud Cogoluègnes e22fcd70fe
Make MC conversion function return ok or error 2023-09-18 18:32:59 +02:00
Rin Kuryloski 214c5dd2d3 Add missing dep in rabbitmq_mqtt/Makefile 2023-09-13 10:44:37 +02:00
David Ansari 1eeb9b3a4f Fix MQTT test flake 2023-09-12 10:22:30 +02:00
David Ansari 62710f576e Add integration test MQTT 5.0 -> Stream
Add an integration test that sends via MQTT 5.0,
converts the MQTT message to an AMQP 1.0 message via mc_mqtt,
and consumes via the Stream protocol.
2023-09-05 11:04:44 +02:00
David Ansari 8b4a26be12 Fix test flake
Fix the followin flake:
```
[ERROR] com.rabbitmq.mqtt.test.MqttTest.sessionRedelivery(TestInfo) -- Time elapsed: 0.959 s <<< FAILURE!
org.opentest4j.AssertionFailedError:
Expecting value to be false but was true
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at com.rabbitmq.mqtt.test.MqttTest.sessionRedelivery(MqttTest.java:523)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
[ERROR] Failures:
[ERROR]   MqttTest.sessionRedelivery:523
Expecting value to be false but was true
```
2023-09-04 10:20:30 +02:00
David Ansari 81cbc0a7f0 Close MQTT connection if sending fails
Why:
https://github.com/rabbitmq/rabbitmq-server/discussions/9196 reports
that failures in sending can cause huge log amounts and eventually even
memory alarms.

How:
Similar to rabbit_writer and rabbit_amqp1_0_writer, when sending on a
network connection fails, the connection process will be terminated
immediately.
2023-09-04 09:51:57 +02:00
Karl Nilsson 119f034406
Message Containers (#5077)
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.

Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.

This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.

The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.

This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.


COMMIT HISTORY:

* Refactor away from using the delivery{} record

In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.

Add mc module and move direct replies outside of exchange

Lots of changes incl classic queues

Implement stream support incl amqp conversions

simplify mc state record

move mc.erl

mc dlx stuff

recent history exchange

Make tracking work

But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.

Tracing as a whole may want a bit of a re-vamp at some point.

tidy

make quorum queue peek work by legacy conversion

dead lettering fixes

dead lettering fixes

CMQ fixes

rabbit_trace type fixes

fixes

fix

Fix classic queue props

test assertion fix

feature flag and backwards compat

Enable message_container feature flag in some SUITEs

Dialyzer fixes

fixes

fix

test fixes

Various

Manually update a gazelle generated file

until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185

Add message_containers_SUITE to bazel

and regen bazel files with gazelle from rules_erlang@main

Simplify essential proprty access

Such as durable, ttl and priority by extracting them into annotations
at message container init time.

Move type

to remove dependenc on amqp10 stuff in mc.erl

mostly because I don't know how to make bazel do the right thing

add more stuff

Refine routing header stuff

wip

Cosmetics

Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.

* Dedup death queue names

* Fix function clause crashes

Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)

Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.

* Fix is_utf8_no_null crash

Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```

* Implement mqtt mc behaviour

For now via amqp translation.

This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```

* Shorten mc file names

Module name length matters because for each persistent message the #mc{}
record is persisted to disk.

```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```

This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```

* mc: make deaths an annotation + fixes

* Fix mc_mqtt protocol_state callback

* Fix test will_delay_node_restart

```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```

* Bazel run gazelle

* mix format rabbitmqctl.ex

* Ensure ttl annotation is refelected in amqp legacy protocol state

* Fix id access in message store

* Fix rabbit_message_interceptor_SUITE

* dializer fixes

* Fix rabbit:rabbit_message_interceptor_SUITE-mixed

set_annotation/3 should not result in duplicate keys

* Fix MQTT shared_SUITE-mixed

Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.

The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.

* Field content of 'v1_0.data' can be binary

Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
    --test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
    -t- --test_sharding_strategy=disabled
```

* Remove route/2 and implement route/3 for all exchange types.

This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.

stream filtering fixes

* Translate directly from MQTT to AMQP 0.9.1

* handle undecoded properties in mc_compat

amqpl: put clause in right order

recover death deatails from amqp data

* Replace callback init_amqp with convert_from

* Fix return value of lists:keyfind/3

* Translate directly from AMQP 0.9.1 to MQTT

* Fix MQTT payload size

MQTT payload can be a list when converted from AMQP 0.9.1 for example

First conversions tests

Plus some other conversion related fixes.

bazel

bazel

translate amqp 1.0 null to undefined

mc: property/2 and correlation_id/message_id return type tagged values.

To ensure we can support a variety of types better.

The type type tags are AMQP 1.0 flavoured.

fix death recovery

mc_mqtt: impl new api

Add callbacks to allow protocols to compact data before storage

And make readable if needing to query things repeatedly.

bazel fix

* more decoding

* tracking mixed versions compat

* mc: flip default of `durable` annotation to save some data.

Assuming most messages are durable and that in memory messages suffer less
from persistence overhead it makes sense for a non existent `durable`
annotation to mean durable=true.

* mc conversion tests and tidy up

* mc make x_header unstrict again

* amqpl: death record fixes

* bazel

* amqp -> amqpl conversion test

* Fix crash in mc_amqp:size/1

Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.

* Fix crash in lists:flatten/1

Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.

* Fix crash in rabbit_writer

Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
  initial call: rabbit_writer:enter_mainloop/2
  pid: <0.711.0>
  registered_name: []
  exception error: bad argument
    in function  size/1
       called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
       *** argument 1: not tuple or binary
    in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
    in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
    in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
    in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
    in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
    in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
    in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.

This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.

* Add accidentally deleted line back

* mc: optimise mc_amqp internal format

By removint the outer records for message and delivery annotations
as well as application properties and footers.

* mc: optimis mc_amqp map_add by using upsert

* mc: refactoring and bug fixes

* mc_SUITE routingheader assertions

* mc remove serialize/1 callback as only used by amqp

* mc_amqp: avoid returning a nested list from protocol_state

* test and bug fix

* move infer_type to mc_util

* mc fixes and additiona assertions

* Support headers exchange routing for MQTT messages

When a headers exchange is bound to the MQTT topic exchange, routing
will be performend based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).

This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.

When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.

* Fix crash when sending from stream to amqpl

When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
  initial call: rabbit_channel:init/1
  pid: <0.818.0>
  registered_name: []
  exception exit: {{badmatch,undefined},
                   [{rabbit_channel,handle_deliver0,4,
                                    [{file,"rabbit_channel.erl"},
                                     {line,2728}]},
                    {lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
                    {rabbit_channel,handle_cast,2,
                                    [{file,"rabbit_channel.erl"},
                                     {line,728}]},
                    {gen_server2,handle_msg,2,
                                 [{file,"gen_server2.erl"},{line,1056}]},
                    {proc_lib,wake_up,3,
                              [{file,"proc_lib.erl"},{line,251}]}]}
```

This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.

* Support consistent hash exchange routing for MQTT 5.0

When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.

* Convert MQTT 5.0 User Property

* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations

* Make use of Annotations in mc_mqtt:protocol_state/2

mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.

* Enforce AMQP 0.9.1 field name length limit

The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.

* Fix type specs

Apply feedback from Michael Davis

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* Add mc_mqtt unit test suite

Implement mc_mqtt:x_header/2

* Translate indicator that payload is UTF-8 encoded

when converting between MQTT 5.0 and AMQP 1.0

* Translate single amqp-value section from AMQP 1.0 to MQTT

Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.

If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indiate the encoding via Content-Type
message/vnd.rabbitmq.amqp.

This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.

* Fix payload conversion

* Translate Response Topic between MQTT and AMQP

Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.

The Response Topic must be a UTF-8 encoded string.

This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/"     RK        Publish to amq.topic with routing key RK
"/exchange/"  X "/" RK  Publish to exchange X with routing key RK
```

By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.

When an operator modifies the mqtt.exchange, the 2nd target address is
used.

* Apply PR feedback

and fix formatting

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* tidy up

* Add MQTT message_containers test

* consistent hash exchange: avoid amqp legacy conversion

When hashing on a header value.

* Avoid converting to amqp legacy when using exchange federation

* Fix test flake

* test and dialyzer fixes

* dialyzer fix

* Add MQTT protocol interoperability tests

Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams

* Regenerate portions of deps/rabbit/app.bzl with gazelle

I'm not exactly sure how this happened, but gazell seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.

* mc: refactoring

* mc_amqpl: handle delivery annotations

Just in case they are included.

Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
2023-08-31 11:27:13 +01:00
dependabot[bot] 9014f251a4
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.38.0 to 2.39.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.38.0...lib/2.39.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-08-29 18:18:29 +00:00
David Ansari fb0df64373 Disallow empty password in MQTT
Setting a username with an empty password is allowed by the MQTT spec:
> If the Password Flag is set to 1, this is the next field in the payload. The Password field contains 0 to 65535 bytes of binary data

RabbitMQ's default auth backend `rabbit_auth_backend_internal` prohibits
empty passwords:
64d69f76ff/deps/rabbit/src/rabbit_auth_backend_internal.erl (L77-L82)

Since some RabbitMQ operators were confused about other auth
backends such as `rabbit_auth_backend_http`, let us prohibit empty MQTT
passwords for all auth backends.
2023-08-16 18:14:15 +02:00
David Ansari 64d69f76ff Bump test timeout
We see on GitHub Actions sporadically:
```
rabbit_ct_broker_helpers:wait_for_rabbitmq_nodes failed on line 378
Reason: {timetrap_timeout,60000}
```
2023-08-16 10:01:48 +02:00
David Ansari 939540f6dd Init dead letter counter once
follow up from https://github.com/rabbitmq/rabbitmq-server/pull/9080

Call rabbit_global_counters:init/1 just once.
2023-08-16 10:01:48 +02:00
David Ansari 484c1071f2 Fix flake in at_most_once_dead_letter_detect_cycle
The test case was flaky:
```
*** CT Error Notification 2023-08-15 14:25:51.016 ***🔗
v5_SUITE:at_most_once_dead_letter_detect_cycle failed on line 871
Reason: {test_case_failed,Received unexpected message: {publish,#{client_pid => <0.227.0>,dup => false,
                                        packet_id => 1,
                                        payload =>
                                            <<"at_most_once_dead_letter_detect_cycle">>,
                                        properties =>
                                            #{'Subscription-Identifier' => 10},
                                        qos => 1,retain => false,
                                        topic => <<"a/b">>,
                                        via => #Port<0.76>}}}
```
2023-08-15 16:43:05 +02:00
David Ansari 0f5fe8fadd Add Prometheus metric messages dropped by MQTT QoS 0 queue type
Why:
A RabbitMQ operator should be able to see whether RabbitMQ drops MQTT
QoS 0 messages due to overload protection. It's an indication that an
MQTT subscriber does not consume fast enough.

How:
Use Prometheus global counters.

There are 2 valid solutions:
1. Introduce a new metric called messages_dropped specifically for the
   rabbitmq_mqtt_qos0_queue type. This would work in a similar fashion
   how streams extends the per protocol global counters, but requires
   extending the per protocol & queue type global counters for the MQTT
   QoS queue type. The emitted metrics would look as follows:
```
rabbitmq_global_messages_dropped_total{protocol="mqtt310",queue_type="rabbit_mqtt_qos0_queue"} 0
rabbitmq_global_messages_dropped_total{protocol="mqtt311",queue_type="rabbit_mqtt_qos0_queue"} 0
rabbitmq_global_messages_dropped_total{protocol="mqtt50",queue_type="rabbit_mqtt_qos0_queue"} 0
```
2. Reuse the existing metric rabbitmq_global_messages_dead_lettered_maxlen_total

This commit decides to go for the 2nd approach because:
a) there is no need to add a new metric. Even though dead lettering is not supported
for the MQTT QoS 0 queue type, this metric maps nicely to
what happens: The queue drop messages since itx max length
(mqtt.mailbox_soft_limit) is exceeded with overflow behaviour
drop-head. Furtheremore the label `dead_letter_strategy="disabled"` tells
that dead lettering is not taking place from this queue type.

b) this metric allows to support dead lettering for the MQTT QoS 0 queue
type in the future.

The new dead lettering metrics look as follows:
```
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_mqtt_qos0_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
```
2023-08-15 16:06:15 +02:00
David Ansari 1ab4f79147 Fix MQTT vhost mapping test case
Multiple things were wrong in this test:
1. The test name ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping was wrong.
As written in our docs, the opposite is true
> The certificate-to-vhost mapping with the mqtt_default_vhosts global parameter is considered more specific than the port-to-vhost mapping with the mqtt_port_to_vhost_mapping global parameter and so takes precedence over it.

2. The test case didn't test what it was supposed to test:
The test relies on revoking the permissions that were set for the
port_to_vhost mapping. However revoking these permission did not happen
because:
2.a) The wrong Config was used, and
2.b) The wrong key was used.

2. could be observed by the following CT warning being logged:
```
Could not find element mqtt_port_to_vhost_mapping in Config.
```
2023-08-14 19:01:38 +02:00
David Ansari 8daaa48da8 User short variable instead of long atom name 2023-08-14 19:01:34 +02:00
Michael Klishin 17c9acf8ac
Merge pull request #9034 from rabbitmq/mqtt-nack
Nack rejected messages to MQTT 5.0 client
2023-08-09 19:53:19 +04:00
David Ansari b8f43735a9 Fix crash in MQTT reader
Fix the following crash:
```
crasher:
  initial call: rabbit_mqtt_reader:init/1
  pid: <0.2009.0>
  registered_name: []
  exception exit: {bad_return_value,
                   {stop,
                    {shutdown,consuming_queue_down,
                     {state,#Port<0.163>,undefined,true,undefined,4,
                      {state,
```
2023-08-09 15:37:51 +02:00
David Ansari 2a4301e12d Nack rejected messages to MQTT 5.0 client
since MQTT 5.0 supports negative acknowledgements thanks to reason codes
in the PUBACK packet.

We could either choose reason code 128 or 131. The description code for
131 applies for rejected messages, hence this commit uses 131:
> The PUBLISH is valid but the receiver is not willing to accept it.
2023-08-09 15:31:14 +02:00
Diana Parra Corbacho 7d06f806c2
Mqtt: filter test events 2023-08-07 17:21:04 +02:00
Diana Parra Corbacho 57aa7c0343 mqtt shared_SUITE: another case of a time dependent test
Longer wait for expiry. On slow machines, as CI, we can't guarantee
that the expiration will happen at the exact time
2023-08-03 10:40:23 +02:00
dependabot[bot] efcd91db31
Bump org.junit.jupiter:junit-jupiter
Bumps [org.junit.jupiter:junit-jupiter](https://github.com/junit-team/junit5) from 5.9.3 to 5.10.0.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.9.3...r5.10.0)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-07-24 18:33:05 +00:00
David Ansari 5208d729b3 Remove amqp_client dep from rabbitmq_mqtt
Since Native MQTT in 3.12 the MQTT plugin does not depend on AMQP 0.9.1
client anymore. Such a dependency is only needed for the MQTT tests.
2023-07-20 08:09:46 +00:00
dependabot[bot] 7ec082473c
Bump spotless-maven-plugin in /deps/rabbitmq_mqtt/test/java_SUITE_data
Bumps [spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.37.0 to 2.38.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.37.0...lib/2.38.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-07-18 18:22:48 +00:00
David Ansari 52262bb6e9 Rename to x-opt-reply-to-topic
due to AMQP 1.0 spec:
> The annotations type is a map where the keys are restricted to be of type symbol or of type ulong.
All ulong keys, and all symbolic keys except those beginning with "x-" are reserved.
Keys beginning with "x-opt-" MUST be ignored if not understood.
On receiving an annotation key which is not understood, and which does not begin with "x-opt",
the receiving AMQP container MUST detach the link with a not-implemented error.

So, for new headers being introduced it makes sense to comply with that
AMQP 1.0 requirement. We don't want the receiver to force to understand
this header.
2023-07-14 17:25:28 +02:00
David Ansari 5fa35d4890 Fix MQTT test flake
```
//deps/rabbitmq_mqtt:shared_SUITE-mixed
--test_env FOCUS="-group [web_mqtt,v4,cluster_size_1]
-case non_clean_sess_reconnect_qos0_and_qos1"
```

was flaky.

The DISCONNECT packet is sent aysnc from client to server.

See
https://github.com/rabbitmq/rabbitmq-server/actions/runs/5553886147/attempts/1?pr=5077
for an instance of that flake.
2023-07-14 16:13:13 +02:00
David Ansari ed31a818c3 Make mqtt.subscription_ttl unsupported
Starting with RabbitMQ 3.13 mqtt.max_session_expiry_interval_seconds
(set in seconds) will replace the previous setting
mqtt.subscription_ttl.

MQTT 5.0 introduces the Session Expiry Interval
feature which does not only apply to subscribers, but also to
publishers.

The new config name mqtt.max_session_expiry_interval_seconds makes it clear
that it also applies to publishers.

Prior to this commit, when mqtt.subscription_ttl was set, a warning got
logged and the value was ignored. This is dangerous if an operator does
not see the warning but relies for example on `mqtt.subscription =
infinity` to not expire non clean session.

It's safer to make the boot fail if that unsupported config name is
still set. A clear error message will be logged:
```
[error] <0.142.0> Error preparing configuration in phase apply_translations:
[error] <0.142.0>   - Translation for 'rabbitmq_mqtt.subscription_ttl' found invalid configuration:
        Since 3.13 mqtt.subscription_ttl (in milliseconds) is unsupported.
        Use mqtt.max_session_expiry_interval_seconds (in seconds) instead.
```

Alternatively, RabbitMQ could translate mqtt.subscription_ttl to
mqtt.max_session_expiry_interval_seconds.

However, forcing the new config option sounds the better way to go.

Once we write MQTT 5.0 docs, this change must go into the 3.13 release notes.

This commit also renames max_session_expiry_interval_secs to max_session_expiry_interval_seconds.
The latter is clearer to users.
2023-07-13 14:47:33 +00:00
David Ansari d705f9d03d Fix parsing of MQTT PUBACK packet
The MQTT v5 spec is a bit contradictory and imprecise when it comes to the
Property Length.

It first mandates that if there are no properties, this MUST be
indidated by including a Property Length of 0:
```
2.2.2.1 Property Length
The Property Length is encoded as a Variable Byte Integer. The Property Length
does not include the bytes used to encode itself, but includes the length of
the Properties. If there are no properties, this MUST be indicated by including
a Property Length of zero [MQTT-2.2.2-1].
```

All MQTT packet's Property Length sections that only mention:
```
The length of the Properties in the PUBLISH packet Variable Header encoded as a Variable Byte Integer.
```
So, they follow above requirement.

However, MQTT packets PUBACK, PUBREC, PUBREL, PUBCOMP, and DISCONNECT seem to be exceptions to this rule:
```
3.4.2.2 PUBACK Properties
3.4.2.2.1 Property Length
The length of the Properties in the PUBACK packet Variable Header encoded as a Variable Byte Integer.
If the Remaining Length is less than 4 there is no Property Length and the value of 0 is used.
```

```
3.14.2.2 DISCONNECT Properties
3.14.2.2.1 Property Length
The length of Properties in the DISCONNECT packet Variable Header encoded as a Variable Byte Integer.
If the Remaining Length is less than 2, a value of 0 is used.
```

Since this special case has already been implemented for DISCONNECT, and
RabbitMQ does not support QoS 2, this commit implements this special
case for the PUBACK packet.

Some MQTT clients (e.g. mqttjs) indeed set a Remaining Length of 3 in the PUBACK
packet.

See https://github.com/rabbitmq/rabbitmq-server/issues/2554#issuecomment-1620083550
2023-07-04 16:33:44 +02:00
Michael Klishin e2ae3a4680
Merge pull request #8708 from rabbitmq/dependabot/maven/deps/rabbitmq_mqtt/test/java_SUITE_data/main/org.codehaus.mojo-keytool-maven-plugin-1.7
Bump keytool-maven-plugin from 1.5 to 1.7 in /deps/rabbitmq_mqtt/test/java_SUITE_data
2023-06-30 18:08:04 +04:00
dependabot[bot] dcd85dcb5d
Bump logback-classic in /deps/rabbitmq_mqtt/test/java_SUITE_data
Bumps [logback-classic](https://github.com/qos-ch/logback) from 1.2.11 to 1.2.12.
- [Commits](https://github.com/qos-ch/logback/compare/v_1.2.11...v_1.2.12)

---
updated-dependencies:
- dependency-name: ch.qos.logback:logback-classic
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-30 14:02:18 +00:00
dependabot[bot] f924a7525b
Bump keytool-maven-plugin in /deps/rabbitmq_mqtt/test/java_SUITE_data
Bumps [keytool-maven-plugin](https://github.com/mojohaus/keytool) from 1.5 to 1.7.
- [Release notes](https://github.com/mojohaus/keytool/releases)
- [Commits](https://github.com/mojohaus/keytool/compare/keytool-1.5...keytool-1.7)

---
updated-dependencies:
- dependency-name: org.codehaus.mojo:keytool-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-30 14:02:11 +00:00
Arnaud Cogoluègnes b0dc49150f
Add logback configuration file to MQTT Java test project 2023-06-30 10:03:02 +02:00
Arnaud Cogoluègnes 87d8460751
Add logback dependency to MQTT Java test project
To avoid no-binding warning from SLF4J.
2023-06-30 09:30:10 +02:00
Arnaud Cogoluègnes fbe79ff47b Small cleaning up in MQTT Java tests 2023-06-29 09:44:06 +02:00
Arnaud Cogoluègnes f2206c809c Format MQTT Java tests
Using spotless plugin with Google Java format.

Command to run to format: ./mvnw spotless:apply
2023-06-29 09:44:06 +02:00
Arnaud Cogoluègnes 3a4381229a Refactor MQTT Java tests
Use AssertJ instead of JUnit assertions (more readable API).
Bump dependencies and clean up pom.xml.
2023-06-29 09:44:06 +02:00
David Ansari c7e4984d59 Display MQTT 5 CONNECT User Property in Management UI
and CLI as requested in
https://github.com/rabbitmq/rabbitmq-server/issues/2554#issuecomment-1604205277

"User Properties on the CONNECT packet can be used to send connection related properties from the Client to the Server.
The meaning of these properties is not defined by this specification."
[v5 3.1.2.11.8]

It makes sense to display the User Property of the CONNECT packet in the
Management UI in the connection's Client Properties.
2023-06-26 14:08:05 +02:00
David Ansari b3795f55e6 Fix flaky test
Test
```
//deps/rabbitmq_mqtt:shared_SUITE-mixed --test_env FOCUS="-group [web_mqtt,v3,cluster_size_1] -case block_only_publisher"
```
was flaky:
```
=== Ended at 2023-06-26 07:09:57
=== Location: [{shared_SUITE,block_only_publisher,1323},
              {test_server,ts_tc,1782},
              {test_server,run_test_case_eval1,1291},
              {test_server,run_test_case_eval,1223}]
=== === Reason: no match of right hand side value {error,ack_timeout}
  in function  shared_SUITE:block_only_publisher/1 (shared_SUITE.erl, line 1323)
  in call from test_server:ts_tc/3 (test_server.erl, line 1782)
  in call from test_server:run_test_case_eval1/6 (test_server.erl, line 1291)
  in call from test_server:run_test_case_eval/9 (test_server.erl, line 1223)
```
It seems that the ack_timeout of 1 second was too low for a
subscription.
2023-06-26 13:00:01 +02:00
David Ansari a715eb7756 Attempt to fix flake
Attempt to fix the following flake:
```
=== Ended at 2023-06-26 07:13:34
=== Location: [{shared_SUITE,events,570},
              {test_server,ts_tc,1782},
              {test_server,run_test_case_eval1,1291},
              {test_server,run_test_case_eval,1223}]
=== === Reason: no match of right hand side value []
  in function  shared_SUITE:events/1 (shared_SUITE.erl, line 570)
  in call from test_server:ts_tc/3 (test_server.erl, line 1782)
  in call from test_server:run_test_case_eval1/6 (test_server.erl, line 1291)
  in call from test_server:run_test_case_eval/9 (test_server.erl, line 1223)
```

of test
```
-group [web_mqtt,v3,cluster_size_1] -case events
```

The logs showed that deletion of the exclusive queue took place in the
same millisecond as the server received the DISCONNECT:
```
2023-06-26 07:13:32.838282+00:00 [debug] <0.2494.0> Received a CONNECT, client ID: events, username: undefined, clean start: true, protocol version: 3, keepalive: 60, property names: []
2023-06-26 07:13:32.838436+00:00 [debug] <0.2494.0> MQTT connection 127.0.0.1:38808 -> 127.0.0.1:21007 picked vhost using plugin_configuration_or_default_vhost
2023-06-26 07:13:32.838523+00:00 [debug] <0.2494.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2023-06-26 07:13:32.838672+00:00 [info] <0.2494.0> Accepted Web MQTT connection 127.0.0.1:38808 -> 127.0.0.1:21007 for client ID events
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0> Received a SUBSCRIBE with subscription(s) [{mqtt_subscription,<<"my/topic">>,
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0>                                             {mqtt_subscription_opts,0,false,
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0>                                              false,0,undefined}}]
2023-06-26 07:13:33.457541+00:00 [debug] <0.2494.0> Received an UNSUBSCRIBE for topic filter(s) [<<"my/topic">>]
2023-06-26 07:13:33.762171+00:00 [debug] <0.2494.0> Received a DISCONNECT with reason code 0 and properties #{}
2023-06-26 07:13:33.762350+00:00 [info] <0.2494.0> Web MQTT closing connection 127.0.0.1:38808 -> 127.0.0.1:21007
2023-06-26 07:13:33.762780+00:00 [debug] <0.2504.0> Deleting exclusive queue 'mqtt-subscription-eventsqos0' in vhost '/' because its declaring connection <0.2494.0> was closed
```
However, there could be some delay between disconnecting on the client
WebMQTT side and the processor processing the DISCONNECT packet.
2023-06-26 13:00:01 +02:00
David Ansari 8601ca7557 Use same AMQP 0.9.1 header for correlation ID
For correlation IDs greater than 255 bytes, AMQP 1.0 to AMQP 0.9.1
conversion puts the correlation ID into an AMQP 0.9.1 header called
x-correlation-id (see
a17b28ec61/deps/rabbit/src/rabbit_msg_record.erl (L380)
)
Let's use the same key for the MQTT 5.0 to AMQP 0.9.1 conversion.
2023-06-26 13:00:01 +02:00
David Ansari ff30bb0bef Clear retained messages synchronously
due to the following flake:
```
v5_SUITE:subscription_identifier failed on line 783
Reason: {test_case_failed,Received unexpected message: {publish,#{client_pid => <0.495.0>,dup => false,
                                        packet_id => undefined,
                                        payload => <<"m3">>,properties => #{},
                                        qos => 0,retain => true,
                                        topic => <<"t/3">>,
                                        via => #Port<0.164>}}}
```

Also, log if unexpected message received due to flake in
```
=== Ended at 2023-06-22 14:30:07
=== Location: [{v5_SUITE,will_delay_message_expiry_publish_properties,1597},
              {test_server,ts_tc,1782},
              {test_server,run_test_case_eval1,1291},
              {test_server,run_test_case_eval,1223}]
=== === Reason: {test_case_failed,"did not receive Will Message"}
```
2023-06-23 14:03:15 +02:00
Loïc Hoguin 610af302c6
Add support for LOCAL proxy header
This is what the proxy uses for health checks. In those cases
we use the socket's IP/ports for the connection name as we
have nothing else we can use.
2023-06-23 12:12:58 +02:00
Chunyi Lyu 468985f7ad Remove unused test helpers
- testQueuePropertiesWithCleanSessionSet() was not used
- testQueuePropertiesWithCleanSessionUnset() and
  testQueuePropertiesWithCleanSession() was called only once so removing
the extra layer here to simply structure
2023-06-22 14:36:53 +01:00
Chunyi Lyu 27e0cfce35 Clean up with clean start/session set to true 2023-06-22 12:12:04 +01:00
Chunyi Lyu c301a873aa Remove redundent connect options in test cases
- no need to set username and password unless they are different from
  guest/guest
- clean start and clean session default to true; no need to set
explicitly
2023-06-22 11:35:17 +01:00
GitHub d28fb8cee4 bazel run gazelle 2023-06-22 04:02:27 +00:00
David Ansari 36855b500f Do not run_teardown_steps twice 2023-06-21 17:14:08 +01:00
Chunyi Lyu 5fe691abe6 Run mqtt java suite with 2 shards
- one RMQ cluster per test group
2023-06-21 17:14:08 +01:00
Chunyi Lyu 17ad067259 Duplicate java SSL test for mqtt v5 2023-06-21 17:14:08 +01:00
Chunyi Lyu 147e2d6676 Run Java v5 tests in separate RMQ cluster
- to avoid test poluting since both v3 and v5 java test cases use
the same resource names
2023-06-21 17:14:08 +01:00
David Ansari 0a98c1e986 Fix assertion arguments order
The correct order is
```
assertArrayEquals(Expecteds, Actuals)
```

Do not mark the tests as flaky.
Instead, we want to understand and fix the flakes.
2023-06-21 17:14:08 +01:00
Chunyi Lyu 7aba94468f Add V5 test to java SUITE
- same tests as v3 test suite, with pathos mqtt V5 client.
- two test cases are removed:
1. sessionRedelivery callback behaviors in v3 and v5 client seems
to be different that throwing exception does not shutdown a v5 client
immediately. This test case tests that unacked qos1 msgs are redelivered
by RMQ broker, which is covered in other test suite
2. lastWillDowngradesQoS2 is removed because for mqtt 5, will msgs with
unsupported Qos is an error and not accepted
2023-06-21 17:14:08 +01:00
David Ansari 92addc067e Fix MQTT crash
This commit fixes the following crash:
```
2388-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>     exception error: bad argument
2389-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>       in function  lists:keymember/3
2390-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>          called as lists:keymember(<<"x-mqtt-publish-qos">>,1,undefined)
2391-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>          *** argument 3: not a list
2392-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>       in call from rabbit_mqtt_processor:amqp_props_to_mqtt_props/2 (rabbit_mqtt_processor.erl, line 2219)
```

This crash occurs when a message without AMQP 0.9.1 #P_basic.headers
is sent to the MQTT v4 connection process, but consumed by an MQTT v5
connection process.
This is the case when an AMQP 0.9.1 client sends a message to a v4 MQTT
client, and this same client subsequently upgrades to v5 consuming the
message.

When sending from AMQP 0.9.1 client directly to a v5 MQTT connection,
the AMQP 0.9.1 header <<"x-binding-keys">> is set.

When sending from AMQP 0.9.1 client directly to a v4 MQTT connection,
no header is set, but this code branch was not evaluated.
2023-06-21 17:14:08 +01:00
David Ansari 2fec1a22d9 Allow route/3 to return additional infos
Allow rabbitmq_exchange:route_return() to return infos in addition to
the matched binding keys.
For example, some exchange types read the full #amqqueue{} record from the database
and can in future return it from the route/3 function to avoid reading the full
record again in the channel.
2023-06-21 17:14:08 +01:00
David Ansari 3dc799afb5 Fix test expectation
Matching against #{} does not validate that the map is empty.
2023-06-21 17:14:08 +01:00
Chunyi Lyu 08fd9d00e3 Set topic alias when publish retained message
- remove topic alias from message props when storing retained msgs
- set topic alias for outbound before sending retained msgs
2023-06-21 17:14:08 +01:00
David Ansari 0e00e3479e CONNACK with Bad Authentication Method
RabbitMQ MQTT already supports authenticating clients via
username + password, OAuth tokens, and certificates.
We could make use of RabbitMQ SASL mechanisms in the future,
if needed. For now, if the client wants to perform extended
authentication, we return Bad Authentication Method in the CONNACK
packet.
2023-06-21 17:14:08 +01:00
David Ansari 3b65c97184 Add type alias for topic and client ID 2023-06-21 17:14:08 +01:00
Chunyi Lyu 12cdc69572 Test MQTT 5 with proxy protocol suite 2023-06-21 17:14:08 +01:00
Chunyi Lyu 1eac345f1c Run cluster suite against v5 enabled cluster 2023-06-21 17:14:08 +01:00
David Ansari 14d81b430f Add Topic Aliases from server to client
Once the server's Topic Alias cache for messages from server to client
is full, this commit does not replace any existing aliases.
So, the first topics "win" and stay in the cache forever.
This matches the behaviour of VerneMQ and EMQX.
For now that's good enough.
In the future, we can easily change that behaviour to some smarter strategy,
for example
1. Hash the TopicName to an Topic Alias and replace the old
   alias, or
2. For the Topic Alias Cache from server to client, keep 2 Maps:
   #{TopicName => TopicAlias} and #{TopicAlias => TopicName} and a
   counter that wraps to 1 once the Topic Alias Maximum is reached and
   just replace an existing Alias if the TopicName is not cached.

Also, refactor Topic Alias Maximum:
* Remove code duplication
* Allow an operator to prohibit Topic Aliases by allowing value 0 to be
  configured
* Change config name to topic_alias_maximum to that it matches exactly
  the MQTT feture name
* Fix wrong code formatting
* Add the invalid or unkown Topic Alias to log message for easier
  troubleshooting
2023-06-21 17:14:08 +01:00
Chunyi Lyu fd52caa211 Support topic alias from client to broker 2023-06-21 17:14:08 +01:00
Chunyi Lyu 60f6784d30 Make Topic Alias Maximum configurable
- default to 20, configurable through cuttlefish config
- add test to v5 suite for invalid topic alias in publish
2023-06-21 17:14:08 +01:00
David Ansari bb20618b13 Return matched binding keys faster
For MQTT 5.0 destination queues, the topic exchange does not only have
to return the destination queue names, but also the matched binding
keys.
This is needed to implement MQTT 5.0 subscription options No Local,
Retain As Published and Subscription Identifiers.

Prior to this commit, as the trie was walked down, we remembered the
edges being walked and assembled the final binding key with
list_to_binary/1.

list_to_binary/1 is very expensive with long lists (long topic names),
even in OTP 26.
The CPU flame graph showed ~3% of CPU usage was spent only in
list_to_binary/1.

Unfortunately and unnecessarily, the current topic exchange
implementation stores topic levels as lists.

It would be better to store topic levels as binaries:
split_topic_key/1 should ideally use binary:split/3 similar as follows:
```
1> P = binary:compile_pattern(<<".">>).
{bm,#Ref<0.1273071188.1488322568.63736>}
2> Bin = <<"aaa.bbb..ccc">>.
<<"aaa.bbb..ccc">>
3> binary:split(Bin, P, [global]).
[<<"aaa">>,<<"bbb">>,<<>>,<<"ccc">>]
```
The compiled pattern could be placed into persistent term.

This commit decided to avoid migrating Mnesia tables to use binaries
instead of lists. Mnesia migrations are non-trivial, especially with the
current feature flag subsystem.
Furthermore the Mnesia topic tables are already getting migrated to
their Khepri counterparts in 3.13.
Adding additional migration only for Mnesia does not make sense.

So, instead of assembling the binding key as we walk down the trie and
then calling list_to_binary/1 in the leaf, it
would be better to just fetch the binding key from the database in the leaf.

As we reach the leaf of the trie, we know both source and destination.
Unfortunately, we cannot fetch the binding key efficiently with the
current rabbit_route (sorted by source exchange) and
rabbit_reverse_route (sorted by destination) tables as the key is in
the middle between source and destination.
If there are a huge number of bindings for a given sourc exchange (very
realistic in MQTT use cases) or a large number of bindings for a given
destination (also realistic), it would require scanning these large
number of bindings.

Therefore this commit takes the simplest possible solution:
The solution leverages the fact that binding arguments are already part of
table rabbit_topic_trie_binding.
So, if we simply include the binding key into the binding arguments, we
can fetch and return it efficiently in the topic exchange
implementation.

The following patch omitting fetching the empty list binding argument
(the default) makes routing slower because function
`analyze_pattern.constprop.0` requires significantly more (~2.5%) CPU time
```
@@ -273,7 +273,11 @@ trie_bindings(X, Node) ->
                                    node_id       = Node,
                                    destination   = '$1',
                                    arguments     = '$2'}},
-    mnesia:select(?MNESIA_BINDING_TABLE, [{MatchHead, [], [{{'$1', '$2'}}]}]).
+    mnesia:select(
+      ?MNESIA_BINDING_TABLE,
+      [{MatchHead, [{'andalso', {'is_list', '$2'}, {'=/=', '$2', []}}], [{{'$1', '$2'}}]},
+       {MatchHead, [], ['$1']}
+      ]).
```
Hence, this commit always fetches the binding arguments.

All MQTT 5.0 destination queues will create a binding that
contains the binding key in the binding arguments.

Not only does this solution avoid expensive list_to_binay/1 calls, but
it also means that Erlang app rabbit (specifically the topic exchange
implementation) does not need to be aware of MQTT anymore:
It just returns the binding key when the binding args tell to do so.

In future, once the Khepri migration completed, we should be able to
relatively simply remove the binding key from the binding arguments
again to free up some storage space.

Note that one of the advantages of a trie data structue is its space
efficiency that you don't have to store the same prefixes multiple
times.
However, for RabbitMQ the binding key is already stored at least N times
in various routing tables, so storing it a few times more via the
binding arguments should be acceptable.
The speed improvements are favoured over a few more MBs ETS usage.
2023-06-21 17:14:08 +01:00
David Ansari f425f87192 Make retained message stores compatible with pre 3.13
The format of #mqtt_msg{} changes from 3.12 to 3.13.
In 3.13 the record contains 2 additional fields:
* props
* timestamp

The old #mqtt_msg{} might still be stored by the retained message store
in ets or dets.

This commit converts such an old message format when read from the
database.

The alternative would have been to run a migration function over the
whole table which is slightly more complex to implement.

Instead of giving the new message format a different record name,
e.g. #mqtt_msg_v2{}, this commit decides to re-use the same name such
that the new code only handles the record name #mqtt_msg{}.
2023-06-21 17:14:08 +01:00
David Ansari d7882b00dc PUBACK with reason code "No matching subscribers"
Support reason code "No matching subscribers" in PUBACK.

This somewhat corresponds to the `mandatory` message property
in AMQP 0.9.1.
2023-06-21 17:14:08 +01:00
David Ansari 23837c5270 DISCONNECT v5 clients with Server Shutting Down
When RabbitMQ enters maintenance mode / is being drained, all client
connections are closed.

This commit sends a DISCONNECT packet to (Web) MQTT 5.0 clients with
Reason Code "Server shutting down" before the connection is closed.
2023-06-21 17:14:08 +01:00
David Ansari 8c0b0e9338 Support MQTT 5.0 Properties
The following PUBLISH and Will properties are forwarded unaltered by the
server:
* Payload Format Indicator
* Content Type
* Response Topic
* Correlation Data
* User Property

Not only must these properties be forwarded unaltered from an MQTT
publishing client to an MQTT receiving client, but it would also be nice
to allow for protocol interoperability:
Think about RPC request-response style patterns where the requester is
an MQTT client and the responder is an AMQP 0.9.1 or STOMP client.

We reuse the P_basic fields where possible:
* content_type (if <= 255 bytes)
* correlation_id (if <= 255 bytes)

Otherwise, we add custom AMQP 0.9.1 headers.

The headers follow the naming pattern "x-mqtt-<property>" where
<property> is the MQTT v5 property if that property makes only
(mainly) sense in the MQTT world:
* x-mqtt-user-property
* x-mqtt-payload-format-indicator

If the MQTT v5 property makes also sense outside of the MQTT world, we
name it more generic:
* x-correlation (if > 255 bytes)
* x-reply-to-topic (since P_basic.reply_to assumes a queue name)

In the future, we can think about adding a header x-reply-to-exchange
and have the MQTT plugin set its value to the configured mqtt.exchange
such that clients don't have to assume the default topic exchange amq.topic.
2023-06-21 17:14:08 +01:00
David Ansari fb7af48df6 Support Will Delay Interval
Previously, the Will Message could be kept in memory in the MQTT
connection process state. Upon termination, the Will Message is sent.

The new MQTT 5.0 feature Will Delay Interval requires storing the Will
Message outside of the MQTT connection process state.

The Will Message should not be stored node local because the client
could reconnect to a different node.

Storing the Will Message in Mnesia is not an option because we want to
get rid of Mnesia. Storing the Will Message in a Ra cluster or in Khepri
is only an option if the Will Payload is small as there is currently no
way in Ra to **efficiently** snapshot large binary data (Note that these
Will Messages are not consumed in a FIFO style workload like messages in
quorum queues. A Will Message needs to be stored for as long as the
Session lasts - up to 1 day by default, but could also be much longer if
RabbitMQ is configured with a higher maximum session expiry interval.)
Usually Will Payloads are small: They are just a notification that its
MQTT session ended abnormally. However, we don't know how users leverage
the Will Message feature. The MQTT protocol allows for large Will Payloads.

Therefore, the solution implemented in this commit - which should work
good enough - is storing the Will Message in a queue.
Each MQTT session which has a Session Expiry Interval and Will Delay
Interval of > 0 seconds will create a queue if the current Network
Connection ends where it stores its Will Message. The Will Message has a
message TTL set (corresponds to the Will Delay Interval) and the queue
has a queue TTL set (corresponds to the Session Expiry Interval).
If the client does not reconnect within the Will Delay Interval, the
message is dead lettered to the configured MQTT topic exchange
(amq.topic by default).

The Will Delay Interval can be set by both publishers and subscribers.
Therefore, the Will Message is the 1st session state that RabbitMQ keeps
for publish-only MQTT clients.

One current limitation of this commit is that a Will Message that is
delayed (i.e. Will Delay Interval is set) and retained (i.e. Will Retain
flag set) will not be retained.
One solution to retain delayed Will Messages is that the retainer
process consumes from a queue and the queue binds to the topic exchange
with a topic starting with `$`, for example `$retain/#`.
The AMQP 0.9.1 Will Message that is dead lettered could then be added a
CC header such that it won't not only be published with the Will Topic,
but also with `$retain` topic. For example, if the Will Topic is `a/b`,
it will publish with routing key `a/b` and CC header `$retain/a/b`.

The reason this is not implemented in this commit is that to keep the
currently broken retained message store behaviour, we would require
creating at least one queue per node and publishing only to that local
queue. In future, once we have a replicated retained message store based
on a Stream for example, we could just publish all retained messages to
the `$retain` topic and thefore into the Stream.
So, for now, we list "retained and delayed Will Messages" as a limitation
that they actually won't be retained.
2023-06-21 17:14:08 +01:00
David Ansari becb92ca6f Do not set queue pid in MQTT connection process
Every queue type sets the queue pid when it creates the queue.

Prior to this commit, the queue pid set within the MQTT connection
process was a bit confusing as the queue pid will be different for
classic queues and quorum queues.
2023-06-21 17:14:08 +01:00
David Ansari 60a6af0054 Rename will_msg to will_payload
when only the payload is meant.
See [v5 3.1.3.4]
2023-06-21 17:14:08 +01:00
David Ansari 605e033f43 Fix test decode_basic_properties
This commit fixes 2 separate issues:
1. No quorum queue got created in v5 because Session Expiry Interval was 0.
2. Fix a function_clause error. Pass the decoded properties further to other
functions looking up headers.
2023-06-21 17:14:08 +01:00
David Ansari 48a442b23e Change routing options from list to map
as small maps with atom keys are optimized in OTP 26.
Rename v2 to return_binding_keys to make the routing option clearer.
2023-06-21 17:14:08 +01:00
David Ansari 0183909453 Fix failing test
due to rebasing onto main.

mqtt5 branch adds a new header
```
{<<"x-mqtt-retain">>, bool, false}
```
which caused the incoming_message_interceptors test case to fail.
2023-06-21 17:14:08 +01:00
David Ansari ce573c35fa Support MQTT 5.0 Subscription Option Retain Handling
The MQTT v5 spec is a bit vague on Retain Handling 1:
"If Retain Handling is set to 1 then if the subscription did not
already exist, the Server MUST send all retained message matching the
Topic Filter of the subscription to the Client, and if the subscription
did exist the Server MUST NOT send the retained messages.
[MQTT-3.3.1-10]." [v5 3.3.1.3]

Does a subscription with the same topic filter but different
subscription options mean that "the subscription did exist"?

This commit interprets "subscription exists" as both topic filter and
subscription options must be the same.

Therefore, if a client creates a subscription with a topic filter that
is identical to a previous subscription and subscription options that
are different and Retain Handling 1, the server sends the retained
message.
2023-06-21 17:14:08 +01:00
David Ansari e2b545f270 Support MQTT 5.0 features No Local, RAP, Subscription IDs
Support subscription options "No Local" and "Retain As Published"
as well as Subscription Identifiers.

All three MQTT 5.0 features can be set on a per subscription basis.
Due to wildcards in topic filters, multiple subscriptions
can match a given topic. Therefore, to implement Retain As Published and
Subscription Identifiers, the destination MQTT connection process needs
to know what subscription(s) caused it to receive the message.

There are a few ways how this could be implemented:

1. The destination MQTT connection process is aware of all its
   subscriptions. Whenever, it receives a message, it can match the
   message's routing key / topic against all its known topic filters.
   However, to iteratively match the routing key against all topic
   filters for every received message can become very expensive in the
   worst case when the MQTT client creates many subscriptions containing
   wildcards. This could be the case for an MQTT client that acts as a
   bridge or proxy or dispatcher: It could subscribe via a wildcard for
   each of its own clients.

2. Instead of interatively matching the topic of the received message
   against all topic filters that contain wildcards, a better approach
   would be for every MQTT subscriber connection process to maintain a
   local trie datastructure (similar to how topic exchanges are
   implemented) and perform matching therefore more efficiently.
   However, this does not sound optimal either because routing is
   effectively performed twice: in the topic exchange and again against
   a much smaller trie in each destination connection process.

3. Given that the topic exchange already perform routing, a much more
   sensible way would be to send the matched binding key(s) to the
   destination MQTT connection process. A subscription (topic filter)
   maps to a binding key in AMQP 0.9.1 routing. Therefore, for the first
   time in RabbitMQ, the routing function should not only output a list
   of unique destination queues, but also the binding keys (subscriptions)
   that caused the message to be routed to the destination queue.

This commit therefore implements the 3rd approach.
The downside of the 3rd approach is that it requires API changes to the
routing function and topic exchange.

Specifically, this commit adds a new function rabbit_exchange:route/3
that accepts a list of routing options. If that list contains version 2,
the caller of the routing function knows how to handle the return value
that could also contain binding keys.

This commits allows an MQTT connection process, the channel process, and
at-most-once dead lettering to handle binding keys. Binding keys are
included as AMQP 0.9.1 headers into the basic message.
Therefore, whenever a message is sent from an MQTT client or AMQP 0.9.1
client or AMQP 1.0 client or STOMP client, the MQTT receiver will know
the subscription identifier that caused the message to be received.

Note that due to the low number of allowed wildcard characters (# and
+), the cardinality of matched binding keys shouldn't be high even if
the topic contains for example 3 levels and the message is sent to for
example 5 million destination queues. In other words, sending multiple
distinct basic messages to the destination shouldn't hurt the delegate
optimisation too much. The delegate optimisation implemented for classic
queues and rabbit_mqtt_qos0_queue(s) still takes place for all basic
messages that contain the same set of matched binding keys.

The topic exchange returns all matched binding keys by remembering the
edges walked down to the leaves. As an optimisation, only for MQTT
queues are binding keys being returned. This does add a small dependency
from app rabbit to app rabbitmq_mqtt which is not optimal. However, this
dependency should be simple to remove when omitting this optimisation.

Another important feature of this commit is persisting subscription
options and subscription identifiers because they are part of the
MQTT 5.0 session state.

In MQTT v3 and v4, the only subscription information that were part of
the session state was the topic filter and the QoS level.
Both information were implicitly stored in the form of bindings:
The topic filter as the binding key and the QoS level as the destination
queue name of the binding.

For MQTT v5 we need to persist more subscription information.
From a domain perspective, it makes sense to store subscription options
as part of subscriptions, i.e. bindings, even though they are currently
not used in routing.
Therefore, this commits stores subscription options as binding arguments.

Storing subscription options as binding arguments comes in turn with
new challenges: How to handle mixed version clusters and upgrading an
MQTT session from v3 or v4 to v5?
Imagine an MQTT client connects via v5 with Session Expiry Interval > 0
to a new node in a mixed version cluster, creates a subscription,
disconnects, and subsequently connects via v3 to an old node. The
client should continue to receive messages.

To simplify such edge cases, this commit introduces a new feature flag
called mqtt_v5. If mqtt_v5 is disabled, clients cannot connect to
RabbitMQ via MQTT 5.0.

This still doesn't entirely solve the problem of MQTT session upgrades
(v4 to v5 client) or session downgrades (v5 to v4 client).

Ideally, once mqtt_v5 is enabled, all MQTT bindings contain non-empty binding
arguments. However, this will require a feature flag migration function
to modify all MQTT bindings. To be more precise, all MQTT bindings need
to be deleted and added because the binding argument is part of the
Mnesia table key.

Since feature flag migration functions are non-trivial to implement in
RabbitMQ (they can run on every node multiple times and concurrently),
this commit takes a simpler approach:
All v3 / v4 sessions keep the empty binding argument [].
All v5 sessions use the new binding argument [#mqtt_subscription_opts{}].

This requires only handling a session upgrade / downgrade by
creating a binding (with the new binding arg) and deleting the old
binding (with the old binding arg) when processing the CONNECT packet.

Note that such session upgrades or downgrades should be rather rare in
practice. Therefore these binding transactions shouldn't hurt peformance.

The No Local option is implemented within the MQTT publishing connection
process: The message is not sent to the MQTT destination if the
destination queue name matches the current MQTT client ID and the
message was routed due to a subscription that has the No Local flag set.
This avoids unnecessary traffic on the MQTT queue.
The alternative would have been that the "receiving side" (same process)
filters the message out - which would have been more consistent in how
Retain As Published and Subscription Identifiers are implemented, but
would have caused unnecessary load on the MQTT queue.
2023-06-21 17:14:08 +01:00
David Ansari 51d659fd07 Fix failing property in packet_prop_SUITE
1. Shrinking times out if there is an error, therefore remove the 60
   seconds Bazel timeout by using a medium size bazel test suite.
2. The MQTT 5.0 spec mandates for binary data types and UTF 8 string
   data types to have values of maximum 65,535 bytes.
   Therefore, ensure this test suite does not generate data greater than
   that limit.
2023-06-21 17:14:08 +01:00
Chunyi Lyu d601c6432e Send disconnect packet from server
- when clients connect with a duplicate client id;
disconnect with reason code session taken over 142
- when keep alive has timed out;
disconnect with reason code keep alive timeout 141
2023-06-21 17:14:08 +01:00
David Ansari e273b4c87b Fix two small bugs 2023-06-21 17:14:08 +01:00
David Ansari 2270a30af0 Point emqtt to rabbitmq/emqtt:master
emqtt repos:
emqx/emqtt PR #196 is based on rabbitmq:otp-26-compatibility
emqx/emqtt PR #198 is based on ansd:master
rabbitmq/master contains both of these 2 PRs cherry-picked.

rabbitmq-server repos:
main branch points emqtt to rabbitmq:otp-26-compatibility
mqtt5 branch points emqtt to rabbitmq:master

Therefore, the current mqtt5 branch is OTP 26 compatible and can support
multiple subscription identifiers.
2023-06-21 17:14:08 +01:00
David Ansari ad5152bdd6 bazel run gazelle 2023-06-21 17:14:08 +01:00
David Ansari 80d972e308 Add property test for MQTT encoder / decoder
All MQTT packets that can be sent in both directions (from client to
server and server to client) are tested in packet_prop_SUITE.

The symmetric property is very concise because encoding and then decoding an
MQTT packet should yield the original MQTT packet.

The input data variety of the previous example based tests was very
small.
2023-06-21 17:14:08 +01:00
David Ansari 3b3ccd4d42 Simplify UNSUBACK reply
Whether a payload is sent to the client is decided by the serialiser.
2023-06-21 17:14:08 +01:00
David Ansari cd7f396bea Simplify code and remove code duplication 2023-06-21 17:14:08 +01:00
David Ansari 6f4f9506a4 Add a test case for large Receive Maximum value 2023-06-21 17:14:08 +01:00
Chunyi Lyu 818a1f410b Max 10 unack msgs from server to client
- This is a revision on the commit that implemented
receive maximum 670d6b2 which sends as many unack msgs
as the client has set to be their receive max.
This commit adds back the unack msgs limit from the server (10)
which is a much safer limit than a user provided max.
2023-06-21 17:14:08 +01:00
Chunyi Lyu bb9fed85f5 Add return codes in unsuback packet 2023-06-21 17:14:08 +01:00
Chunyi Lyu d1b173de8c Return v5 failure reason codes for suback
- mqtt v5 has more descriptive return values for suback
- added two possible failure reason codes for suback packet
one for permission error, another for quota exceeded error
- modified auth suite to assert on reason codes for v5
- no new test case since failures were already covered
2023-06-21 17:14:08 +01:00
Chunyi Lyu 471540dbdc Implement client ReceiveMaximum
- rename processor state prefetch to receive_maximum
to better match property name for mqtt 5
- defaults to 10 (as previously) when not set
not saved in session state, configuration is per
connection
2023-06-21 17:14:08 +01:00
David Ansari acd249cb0f Add a test for Session Expiry Interval 2023-06-21 17:14:08 +01:00
Chunyi Lyu 68d59bcaf3 Update sess exp interval when client reconnect
- when client reconnecting with clean start false,
server respects the new session expiry interval provided
by the client
2023-06-21 17:14:08 +01:00
David Ansari df64e3a41c Rename #mqtt_topic{} to #mqtt_subscription{}
because that's what the record represents and is
more in line with the terminology used in the MQTT
specification.
2023-06-21 17:14:08 +01:00
David Ansari 2efd9c06b8 Support Session Expiry Interval
Allow Session Expiry Interval to be changed when client DISCONNECTs.

Deprecate config subscription_ttl in favour of max_session_expiry_interval_secs
because the Session Expiry Interval will also apply to publishers that
connect with a will message and will delay interval.
"The additional session state of an MQTT v5 server includes:
* The Will Message and the Will Delay Interval
* If the Session is currently not connected, the time at which the Session
  will end and Session State will be discarded."

The Session Expiry Interval picked by the server and sent to the client
in the CONNACK is the minimum of max_session_expiry_interval_secs and
the requested Session Expiry Interval by the client in CONNECT.

This commit favours dynamically changing the queue argument x-expires
over creating millions of different policies since that many policies
will come with new scalability issues.

Dynamically changing queue arguments is not allowed by AMQP 0.9.1
clients. However, it should be perfectly okay for the MQTT plugin to do
so for the queues it manages. MQTT clients are not aware that these
queues exist.
2023-06-21 17:14:08 +01:00
David Ansari 6e9aa952ea Remove code duplication 2023-06-21 17:14:08 +01:00
Chunyi Lyu 8ce0813bda Close conn when will msg qos is 2 [MQTT-3.2.2-12]
If a Server receives a CONNECT packet containing a Will QoS that
exceeds its capabilities, it MUST reject the connection. It SHOULD
use a CONNACK packet with Reason Code 0x9B (QoS not supported) as
described in section 4.13 Handling errors, and MUST close the Network Connection
2023-06-21 17:14:08 +01:00