Commit Graph

82 Commits

Author SHA1 Message Date
Marcial Rosales e7cb2420a7 Verify non-zero DNS and email SAN 2024-10-29 16:41:20 +01:00
David Ansari fa5b738cbc Fix shard counts
Half of these groups moved to the rabbitmq_web_mqtt plugin.
2024-10-01 18:02:00 +02:00
Loïc Hoguin a0ee6ddb69
Bazel fixes following renaming of test suites 2024-09-30 12:35:44 +02:00
David Ansari bddc54613f Decrease default MQTT Maximum Packet Size
Given that the default max_message_size got decreased from 128 MiB to 16
MiB in RabbitMQ 4.0 in https://github.com/rabbitmq/rabbitmq-server/pull/11455,
it makes sense to also decrease the default MQTT Maximum Packet Size from 256 MiB to 16 MiB.
Since this change was missed in RabbitMQ 4.0, it is scheduled for RabbitMQ 4.1.
2024-09-20 16:11:21 +02:00
Marcial Rosales 1abc4ed02f Extract client_id from client cert 2024-08-30 11:39:48 +01:00
David Ansari ba14b158af Remove mqtt.default_user and mqtt.default_pass
This commit is a breaking change in RabbitMQ 4.0.

 ## What?
Remove mqtt.default_user and mqtt.default_pass
Instead, rabbit.anonymous_login_user and rabbit.anonymous_login_pass
should be used.

 ## Why?
RabbitMQ 4.0 simplifies anonymous logins.
There should be a single configuration place
```
rabbit.anonymous_login_user
rabbit.anonymous_login_pass
```
that is used for anonymous logins for any protocol.

Anonymous login is orthogonal to the protocol the client uses.
Hence, there should be a single configuration place which can then be
used for MQTT, AMQP 1.0, AMQP 0.9.1, and RabbitMQ Stream protocol.

This will also simplify switching to SASL for MQTT 5.0 in the future.
2024-08-15 10:58:48 +00:00
David Ansari 50116f0927 Require MQTT feature flags in 4.0
Require all MQTT feature flags and remove their compatibility code:
* delete_ra_cluster_mqtt_node
* rabbit_mqtt_qos0_queue
* mqtt_v5

These feature flags were introduced in or before 3.13.0.
2024-07-10 10:27:59 +02:00
Karl Nilsson 077c027d52
MQTT: speed up shared_SUITE:many_qos1_messages (#11477)
* MQTT: speed up shared_SUITE:many_qos1_messages

* speed up block_only_publisher

* MQTT: reorganise tests groups

To avoid starting a new broker for each protocol group (v3,v4,v5).

Instead we run all protocol groups under a single cluster configuration
group.

* MQTT: speed up publish_to_all_queue_types_qos* tests.

* Remove separate mnesia_store group

to speed up the test suite

* Fix wrong_shard_count

* Remove unused field

* Run subset of v3 tests

The code being tested under v3 and v4 is almost identical.
To save time in CI, we therefore run only a very small subset of tests in v3.

This cuts the total time reported by CT for the shared_SUITE from 898
seconds to 614 seconds.

Also, the java_SUITE tests in v3.

* Fix wrong_shard_count

* Fix mixed version failure

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
2024-06-19 20:02:33 +02:00
Rin Kuryloski 5debebfaf3 Use rules_elixir to build the cli without mix
Certain elixir-native deps are still build with mix, but this can be
corrected later
2024-06-18 14:50:34 +02:00
David Ansari 9f42e40346 Fix crash when old node receives new AMQP message
Similar to how we convert from mc_amqp to mc_amqpl before
sending to a classic queue or quorum queue process if
feature flag message_containers_store_amqp_v1 is disabled,
we also need to do the same conversion before sending to an MQTT QoS 0
queue on the old node.
2024-05-02 07:56:00 +00:00
David Ansari 390d5715a0 Introduce new AMQP 1.0 address format
## What?
Introduce a new address format (let's call it v2) for AMQP 1.0 source and target addresses.

The old format (let's call it v1) is described in
https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing

The only v2 source address format is:
```
/queue/:queue
```

The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```

 ## Why?

The AMQP address v1 format comes with the following flaws:

1. Obscure address format:

Without reading the documentation, the differences for example between source addresses
```
/amq/queue/:queue
/queue/:queue
:queue
```
are unknown to users. Hence, the address format is obscure.

2. Implicit creation of topologies

Some address formats implicitly create queues (and bindings), such as source address
```
/exchange/:exchange/:binding-key
```
or target address
```
/queue/:queue
```
These queues and bindings are never deleted (by the AMQP 1.0 plugin.)
Implicit creation of such topologies is also obscure.

3. Redundant address formats

```
/queue/:queue
:queue
```
have the same meaning and are therefore redundant.

4. Properties section must be parsed to determine whether a routing key is present

Target address
```
/exchange/:exchange
```
requires RabbitMQ to parse the properties section in order to check whether the message `subject` is set.
If `subject` is not set, the routing key will default to the empty string.

5. Using `subject` as routing key misuses the purpose of this field.

According to the AMQP spec, the message `subject` field's purpose is:
> A common field for summary information about the message content and purpose.

6. Exchange names, queue names and routing keys must not contain the "/" (slash) character.

The current 3.13 implemenation splits by "/" disallowing these
characters in exchange, and queue names, and routing keys which is
unnecessary prohibitive.

7. Clients must create a separate link per target exchange

While this is reasonable working assumption, there might be rare use
cases where it could make sense to create many exchanges (e.g. 1
exchange per queue, see
https://github.com/rabbitmq/rabbitmq-server/discussions/10708) and have
a single application publish to all these exchanges.
With the v1 address format, for an application to send to 500 different
exchanges, it needs to create 500 links.

Due to these disadvantages and thanks to #10559 which allows clients to explicitly create topologies,
we can create a simpler, clearer, and better v2 address format.

 ## How?

 ### Design goals

Following the 7 cons from v1, the design goals for v2 are:
1. The address format should be simple so that users have a chance to
   understand the meaning of the address without necessarily consulting the docs.
2. The address format should not implicitly create queues, bindings, or exchanges.
   Instead, topologies should be created either explicitly via the new management node
   prior to link attachment (see #10559), or in future, we might support the `dynamic`
   source or target properties so that RabbitMQ creates queues dynamically.
3. No redundant address formats.
4. The target address format should explicitly state whether the routing key is present, empty,
   or will be provided dynamically in each message.
5. `Subject` should not be used as routing key. Instead, a better
   fitting field should be used.
6. Exchange names, queue names, and routing keys should allow to contain
   valid UTF-8 encoded data including the "/" character.
7. Allow both target exchange and routing key to by dynamically provided within each message.

Furthermore
8. v2 must co-exist with v1 for at least some time. Applications should be able to upgrade to
   RabbitMQ 4.0 while continuing to use v1. Examples include AMQP 1.0 shovels and plugins communicating
   between a 4.0 and a 3.13 cluster. Starting with 4.1, we should change the AMQP 1.0 shovel and plugin clients
   to use only the new v2 address format. This will allow AMQP 1.0 and plugins to communicate between a 4.1 and 4.2 cluster.
   We will deprecate v1 in 4.0 and remove support for v1 in a later 4.x version.

 ### Additional Context

The address is usually a String, but can be of any type.

The [AMQP Addressing extension](https://docs.oasis-open.org/amqp/addressing/v1.0/addressing-v1.0.html)
suggests that addresses are URIs and are therefore hierarchical and could even contain query parameters:
> An AMQP address is a URI reference as defined by RFC3986.

> the path expression is a sequence of identifier segments that reflects a path through an
> implementation specific relationship graph of AMQP nodes and their termini.
> The path expression MUST resolve to a node’s terminus in an AMQP container.

The [Using the AMQP Anonymous Terminus for Message Routing Version 1.0](https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html)
extension allows for the target being `null` and the `To` property to contain the node address.
This corresponds to AMQP 0.9.1 where clients can send each message on the same channel to a different `{exchange, routing-key}` destination.

The following v2 address formats will be used.

 ### v2 addresses

A new deprecated feature flag `amqp_address_v1` will be introduced in 4.0 which is permitted by default.
Starting with 4.1, we should change the AMQP 1.0 shovel and plugin AMQP 1.0 clients to use only the new v2 address format.
However, 4.1 server code must still understand the 4.0 AMQP 1.0 shovel and plugin AMQP 1.0 clients’ v1 address format.
The new deprecated feature flag will therefore be denied by default in 4.2.
This allows AMQP 1.0 shovels and plugins to work between
* 4.0 and 3.13 clusters using v1
* 4.1 and 4.0 clusters using v2 from 4.1 to v4.0 and v1 from 4.0 to 4.1
* 4.2 and 4.1 clusters using v2

without having to support both v1 and v2 at the same time in the AMQP 1.0 shovel and plugin clients.
While supporting both v1 and v2 in these clients is feasible, it's simpler to switch the client code directly from v1 to v2.

 ### v2 source addresses

The source address format is
```
/queue/:queue
```
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.

 ### v2 target addresses

v1 requires attaching a new link for each destination exchange.
v2 will allow dynamic `{exchange, routing-key}` combinations for a given link.
v2 therefore allows for the rare use cases where a single AMQP 1.0 publisher app needs to send to many different exchanges.
Setting up a link per destination exchange could be cumbersome.
Hence, v2 will support the dynamic `{exchange, routing-key}` combinations of AMQP 0.9.1.
To achieve this, we make use of the "Anonymous Terminus for Message Routing" extension:
The target address will contain the AMQP value null.
The `To` field in each message must be set and contain either address format
```
/exchange/:exchange/key/:routing-key
```
or
```
/exchange/:exchange
```
when using the empty routing key.

The `to` field requires an address type and is better suited than the `subject field.

Note that each message will contain this `To` value for the anonymous terminus.
Hence, we should save some bytes being sent across the network and stored on disk.
Using a format
```
/e/:exchange/k/:routing-key
```
saves more bytes, but is too obscure.
However, we use only `/key/` instead of `/routing-key/` so save a few bytes.
This also simplifies the format because users don’t have to remember whether to use spell `routing-key` or `routing_key` or `routingkey`.

The other allowed target address formats are:
```
/exchange/:exchange/key/:routing-key
```
where exchange and routing key are static on the given link.

```
/exchange/:exchange
```
where exchange and routing key are static on the given link, and routing key will be the empty string (useful for example for the fanout exchange).

```
/queue/:queue
```
This provides RabbitMQ beginners the illusion of sending a message directly
to a queue without having to understand what exchanges and routing keys are.
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.
Besides the additional queue existence check, this queue target is different from
```
/exchange//key/:queue
```
in that queue specific optimisations might be done (in future) by RabbitMQ
(for example different receiving queue types could grant different amounts of link credits to the sending clients).
A write permission check to the amq.default exchange will be performed nevertheless.

v2 will prohibit the v1 static link & dynamic routing-key combination
where the routing key is sent in the message `subject` as that’s also obscure.
For this use case, v2’s new anonymous terminus can be used where both exchange and routing key are defined in the message’s `To` field.

(The bare message must not be modified because it could be signed.)

The alias format
```
/topic/:topic
```
will also be removed.
Sending to topic exchanges is arguably an advanced feature.
Users can directly use the format
```
/exchange/amq.topic/key/:topic
```
which reduces the number of redundant address formats.

 ### v2 address format reference

To sump up (and as stated at the top of this commit message):

The only v2 source address format is:
```
/queue/:queue
```

The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```

Hence, all 8 listed design goals are reached.
2024-04-05 12:22:02 +02:00
David Ansari 8cb313d5a1 Support AMQP 1.0 natively
## What

Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.

  ## Why

Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
   scalability, and resource usage for AMQP 1.0.
   See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
   See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
   this commit allows implementing more AMQP 1.0 features in the future.
   Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.

Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
   New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
   consumers as we currently recommended for AMQP 0.9.1. which potentially
   halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
   slow target queue won't block the entire connection.
   Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
   In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
   possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol

This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.

This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.

 ## Implementation details

1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.

2. Remove rabbit_queue_collector

rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.

3. 7 processes per connection + 1 process per session in this commit instead of
   7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.

4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
  a connection write heavily at the same time.

This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
  can accumulate across all sessions bytes before flushing the socket.

In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.

5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.

How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.

6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.

7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
  settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
  and unsettled TRANSFERs.

8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.

How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
  new credit_reply queue event:
  * `available` after the queue sends any deliveries
  * `link-credit` after the queue sends any deliveries
  * `drain` which allows us to combine the old queue events
    send_credit_reply and send_drained into a single new queue event
    credit_reply.
* Include the consumer tag into the credit_reply queue event such that
  the AMQP 1.0 session process can process any credit replies
  asynchronously.

Link flow control state `delivery-count` also moves to the queue processes.

The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.

9. Use serial number arithmetic in quorum queues and session process.

10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.

11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.

12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).

13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.

14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2

15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.

16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.

17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.

18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.

19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.

20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.

21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.

22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.

If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.

23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.

24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.

25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.

26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.

In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.

27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries

28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').

29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client

30. Fix AMQP client to do serial number arithmetic.

31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.

32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.

The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597

The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.

Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.

33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.

34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.

Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.

Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).

Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.

This approach is safe and simple.

The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:

i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.

ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?

35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.

36. Remove credit extension from AMQP 0.9.1 client

37. Support maintenance mode closing AMQP 1.0 connections.

38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.

39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
    The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
    tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```

40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```

 ## Benchmarks

 ### Throughput & Latency

Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1

Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```

Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.

Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```

1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s

Latencies by percentile:

          0% ........ 0 ms       90.00% ........ 9 ms
         25% ........ 2 ms       99.00% ....... 14 ms
         50% ........ 4 ms       99.90% ....... 17 ms
        100% ....... 26 ms       99.99% ....... 24 ms
```

RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        6     73.6       2.1          3,217       1,607        0      8.0       511
     4.1        163,580      16,367        2     74.1       4.1          3,217           0        0      8.0         0
     6.1        229,114      32,767        3     74.1       6.1          3,217           0        0      8.0         0
     8.1        261,880      16,367        2     74.1       8.1         67,874      32,296        8      8.2     7,662
    10.1        294,646      16,367        2     74.1      10.1         67,874           0        0      8.2         0
    12.1        360,180      32,734        3     74.1      12.1         67,874           0        0      8.2         0
    14.1        392,946      16,367        3     74.1      14.1         68,604         365        0      8.2    12,147
    16.1        458,480      32,734        3     74.1      16.1         68,604           0        0      8.2         0
    18.1        491,246      16,367        2     74.1      18.1         68,604           0        0      8.2         0
    20.1        556,780      32,767        4     74.1      20.1         68,604           0        0      8.2         0
    22.1        589,546      16,375        2     74.1      22.1         68,604           0        0      8.2         0
receiver timed out
    24.1        622,312      16,367        2     74.1      24.1         68,604           0        0      8.2         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```

2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s

Latencies by percentile:

          0% ....... 11 ms       90.00% ....... 23 ms
         25% ....... 15 ms       99.00% ....... 28 ms
         50% ....... 18 ms       99.90% ....... 33 ms
        100% ....... 49 ms       99.99% ....... 47 ms
```

RabbitMQ 3.x:
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        9     69.9       2.1         18,430       9,206        5      7.6     1,221
     4.1        163,580      16,375        5     70.2       4.1         18,867         218        0      7.6     2,168
     6.1        229,114      32,767        6     70.2       6.1         18,867           0        0      7.6         0
     8.1        294,648      32,734        7     70.2       8.1         18,867           0        0      7.6         0
    10.1        360,182      32,734        6     70.2      10.1         18,867           0        0      7.6         0
    12.1        425,716      32,767        6     70.2      12.1         18,867           0        0      7.6         0
receiver timed out
    14.1        458,482      16,367        5     70.2      14.1         18,867           0        0      7.6         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```

3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```

RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```

 ### Memory usage

Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```

```
/bin/cat rabbitmq.conf

tcp_listen_options.sndbuf  = 2048
tcp_listen_options.recbuf  = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```

Create 50k connections with 2 sessions per connection, i.e. 100k session in total:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 50000; i++ {
		conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("dialing AMQP server:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}
```

This commit:
```
erlang:memory().
[{total,4586376480},
 {processes,4025898504},
 {processes_used,4025871040},
 {system,560477976},
 {atom,1048841},
 {atom_used,1042841},
 {binary,233228608},
 {code,21449982},
 {ets,108560464}]

erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs

RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
 {processes,14044779256},
 {processes_used,14044755120},
 {system,1123453448},
 {atom,1057033},
 {atom_used,1052587},
 {binary,236381264},
 {code,21790238},
 {ets,391423744}]

erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs

50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB

 ## Future work

1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2024-02-28 14:15:20 +01:00
David Ansari 6a3ba6210a
Reduce per message disk overhead (#10339)
* Reduce per message disk overhead

Message container annotation keys are stored on disk.
By shortening them we save 95 - 58 = 37 bytes per message.
```
1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})).
95
2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})).
58
```
This should somewhat reduce disk I/O and disk space.

* Ensure durable is a boolean

Prevent key 'durable' with value 'undefined' being added to the
mc annotations, for example when the durable field was not set, but
another AMQP 1.0 header field was set.

* Apply feedback
2024-01-18 11:53:02 +01:00
David Ansari c8b90488e7 Skip some assertions in mixed version tests
See commit message 00c77e0a1a for details.

In a multi node mixed version cluster where the lower version is
compiled with a different OTP version, anonymous Ra leader queries will
fail with a badfun error if initiated on the higher version and executed
on the leader on the lower version node.
2023-10-27 15:12:51 +02:00
David Ansari c914e43097 Do not enable OTP feature at runtime
Since Erlang/OTP 26:
```
OTP-18445
Application(s):
erts, stdlib
It is no longer necessary to enable a feature in the runtime system in order to load modules that are using it.
It is sufficient to enable the feature in the compiler when compiling it.
That means that to use feature maybe_expr in Erlang/OTP 26, it is sufficient to enable it during compilation.
In Erlang/OTP 27, feature maybe_expr will be enabled by default, but it will be possible to disable it.
```
2023-10-27 09:26:31 +02:00
Diana Parra Corbacho 5f0981c5a3
Allow to use Khepri database to store metadata instead of Mnesia
[Why]

Mnesia is a very powerful and convenient tool for Erlang applications:
it is a persistent disc-based database, it handles replication accross
multiple Erlang nodes and it is available out-of-the-box from the
Erlang/OTP distribution. RabbitMQ relies on Mnesia to manage all its
metadata:

* virtual hosts' properties
* intenal users
* queue, exchange and binding declarations (not queues data)
* runtime parameters and policies
* ...

Unfortunately Mnesia makes it difficult to handle network partition and,
as a consequence, the merge conflicts between Erlang nodes once the
network partition is resolved. RabbitMQ provides several partition
handling strategies but they are not bullet-proof. Users still hit
situations where it is a pain to repair a cluster following a network
partition.

[How]

@kjnilsson created Ra [1], a Raft consensus library that RabbitMQ
already uses successfully to implement quorum queues and streams for
instance. Those queues do not suffer from network partitions.

We created Khepri [2], a new persistent and replicated database engine
based on Ra and we want to use it in place of Mnesia in RabbitMQ to
solve the problems with network partitions.

This patch integrates Khepri as an experimental feature. When enabled,
RabbitMQ will store all its metadata in Khepri instead of Mnesia.

This change comes with behavior changes. While Khepri remains disabled,
you should see no changes to the behavior of RabbitMQ. If there are
changes, it is a bug. After Khepri is enabled, there are significant
changes of behavior that you should be aware of.

Because it is based on the Raft consensus algorithm, when there is a
network partition, only the cluster members that are in the partition
with at least `(Number of nodes in the cluster ÷ 2) + 1` number of nodes
can "make progress". In other words, only those nodes may write to the
Khepri database and read from the database and expect a consistent
result.

For instance in a cluster of 5 RabbitMQ nodes:
* If there are two partitions, one with 3 nodes, one with 2 nodes, only
  the group of 3 nodes will be able to write to the database.
* If there are three partitions, two with 2 nodes, one with 1 node, none
  of the group can write to the database.

Because the Khepri database will be used for all kind of metadata, it
means that RabbitMQ nodes that can't write to the database will be
unable to perform some operations. A list of operations and what to
expect is documented in the associated pull request and the RabbitMQ
website.

This requirement from Raft also affects the startup of RabbitMQ nodes in
a cluster. Indeed, at least a quorum number of nodes must be started at
once to allow nodes to become ready.

To enable Khepri, you need to enable the `khepri_db` feature flag:

    rabbitmqctl enable_feature_flag khepri_db

When the `khepri_db` feature flag is enabled, the migration code
performs the following two tasks:
1. It synchronizes the Khepri cluster membership from the Mnesia
   cluster. It uses `mnesia_to_khepri:sync_cluster_membership/1` from
   the `khepri_mnesia_migration` application [3].
2. It copies data from relevant Mnesia tables to Khepri, doing some
   conversion if necessary on the way. Again, it uses
   `mnesia_to_khepri:copy_tables/4` from `khepri_mnesia_migration` to do
   it.

This can be performed on a running standalone RabbitMQ node or cluster.
Data will be migrated from Mnesia to Khepri without any service
interruption. Note that during the migration, the performance may
decrease and the memory footprint may go up.

Because this feature flag is considered experimental, it is not enabled
by default even on a brand new RabbitMQ deployment.

More about the implementation details below:

In the past months, all accesses to Mnesia were isolated in a collection
of `rabbit_db*` modules. This is where the integration of Khepri mostly
takes place: we use a function called `rabbit_khepri:handle_fallback/1`
which selects the database and perform the query or the transaction.
Here is an example from `rabbit_db_vhost`:

* Up until RabbitMQ 3.12.x:

        get(VHostName) when is_binary(VHostName) ->
            get_in_mnesia(VHostName).

* Starting with RabbitMQ 3.13.0:

        get(VHostName) when is_binary(VHostName) ->
            rabbit_khepri:handle_fallback(
              #{mnesia => fun() -> get_in_mnesia(VHostName) end,
                khepri => fun() -> get_in_khepri(VHostName) end}).

This `rabbit_khepri:handle_fallback/1` function relies on two things:
1. the fact that the `khepri_db` feature flag is enabled, in which case
   it always executes the Khepri-based variant.
4. the ability or not to read and write to Mnesia tables otherwise.

Before the feature flag is enabled, or during the migration, the
function will try to execute the Mnesia-based variant. If it succeeds,
then it returns the result. If it fails because one or more Mnesia
tables can't be used, it restarts from scratch: it means the feature
flag is being enabled and depending on the outcome, either the
Mnesia-based variant will succeed (the feature flag couldn't be enabled)
or the feature flag will be marked as enabled and it will call the
Khepri-based variant. The meat of this function really lives in the
`khepri_mnesia_migration` application [3] and
`rabbit_khepri:handle_fallback/1` is a wrapper on top of it that knows
about the feature flag.

However, some calls to the database do not depend on the existence of
Mnesia tables, such as functions where we need to learn about the
members of a cluster. For those, we can't rely on exceptions from
Mnesia. Therefore, we just look at the state of the feature flag to
determine which database to use. There are two situations though:

* Sometimes, we need the feature flag state query to block because the
  function interested in it can't return a valid answer during the
  migration. Here is an example:

        case rabbit_khepri:is_enabled(RemoteNode) of
            true  -> can_join_using_khepri(RemoteNode);
            false -> can_join_using_mnesia(RemoteNode)
        end

* Sometimes, we need the feature flag state query to NOT block (for
  instance because it would cause a deadlock). Here is an example:

        case rabbit_khepri:get_feature_state() of
            enabled -> members_using_khepri();
            _       -> members_using_mnesia()
        end

Direct accesses to Mnesia still exists. They are limited to code that is
specific to Mnesia such as classic queue mirroring or network partitions
handling strategies.

Now, to discover the Mnesia tables to migrate and how to migrate them,
we use an Erlang module attribute called
`rabbit_mnesia_tables_to_khepri_db` which indicates a list of Mnesia
tables and an associated converter module. Here is an example in the
`rabbitmq_recent_history_exchange` plugin:

    -rabbit_mnesia_tables_to_khepri_db(
       [{?RH_TABLE, rabbit_db_rh_exchange_m2k_converter}]).

The converter module  — `rabbit_db_rh_exchange_m2k_converter` in this
example  — is is fact a "sub" converter module called but
`rabbit_db_m2k_converter`. See the documentation of a `mnesia_to_khepri`
converter module to learn more about these modules.

[1] https://github.com/rabbitmq/ra
[2] https://github.com/rabbitmq/khepri
[3] https://github.com/rabbitmq/khepri_mnesia_migration

See #7206.

Co-authored-by: Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>
Co-authored-by: Diana Parra Corbacho <dparracorbac@vmware.com>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
2023-09-29 16:00:11 +02:00
David Ansari 62710f576e Add integration test MQTT 5.0 -> Stream
Add an integration test that sends via MQTT 5.0,
converts the MQTT message to an AMQP 1.0 message via mc_mqtt,
and consumes via the Stream protocol.
2023-09-05 11:04:44 +02:00
Karl Nilsson 119f034406
Message Containers (#5077)
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.

Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.

This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.

The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.

This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.


COMMIT HISTORY:

* Refactor away from using the delivery{} record

In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.

Add mc module and move direct replies outside of exchange

Lots of changes incl classic queues

Implement stream support incl amqp conversions

simplify mc state record

move mc.erl

mc dlx stuff

recent history exchange

Make tracking work

But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.

Tracing as a whole may want a bit of a re-vamp at some point.

tidy

make quorum queue peek work by legacy conversion

dead lettering fixes

dead lettering fixes

CMQ fixes

rabbit_trace type fixes

fixes

fix

Fix classic queue props

test assertion fix

feature flag and backwards compat

Enable message_container feature flag in some SUITEs

Dialyzer fixes

fixes

fix

test fixes

Various

Manually update a gazelle generated file

until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185

Add message_containers_SUITE to bazel

and regen bazel files with gazelle from rules_erlang@main

Simplify essential proprty access

Such as durable, ttl and priority by extracting them into annotations
at message container init time.

Move type

to remove dependenc on amqp10 stuff in mc.erl

mostly because I don't know how to make bazel do the right thing

add more stuff

Refine routing header stuff

wip

Cosmetics

Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.

* Dedup death queue names

* Fix function clause crashes

Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)

Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.

* Fix is_utf8_no_null crash

Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```

* Implement mqtt mc behaviour

For now via amqp translation.

This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```

* Shorten mc file names

Module name length matters because for each persistent message the #mc{}
record is persisted to disk.

```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```

This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```

* mc: make deaths an annotation + fixes

* Fix mc_mqtt protocol_state callback

* Fix test will_delay_node_restart

```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```

* Bazel run gazelle

* mix format rabbitmqctl.ex

* Ensure ttl annotation is refelected in amqp legacy protocol state

* Fix id access in message store

* Fix rabbit_message_interceptor_SUITE

* dializer fixes

* Fix rabbit:rabbit_message_interceptor_SUITE-mixed

set_annotation/3 should not result in duplicate keys

* Fix MQTT shared_SUITE-mixed

Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.

The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.

* Field content of 'v1_0.data' can be binary

Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
    --test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
    -t- --test_sharding_strategy=disabled
```

* Remove route/2 and implement route/3 for all exchange types.

This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.

stream filtering fixes

* Translate directly from MQTT to AMQP 0.9.1

* handle undecoded properties in mc_compat

amqpl: put clause in right order

recover death deatails from amqp data

* Replace callback init_amqp with convert_from

* Fix return value of lists:keyfind/3

* Translate directly from AMQP 0.9.1 to MQTT

* Fix MQTT payload size

MQTT payload can be a list when converted from AMQP 0.9.1 for example

First conversions tests

Plus some other conversion related fixes.

bazel

bazel

translate amqp 1.0 null to undefined

mc: property/2 and correlation_id/message_id return type tagged values.

To ensure we can support a variety of types better.

The type type tags are AMQP 1.0 flavoured.

fix death recovery

mc_mqtt: impl new api

Add callbacks to allow protocols to compact data before storage

And make readable if needing to query things repeatedly.

bazel fix

* more decoding

* tracking mixed versions compat

* mc: flip default of `durable` annotation to save some data.

Assuming most messages are durable and that in memory messages suffer less
from persistence overhead it makes sense for a non existent `durable`
annotation to mean durable=true.

* mc conversion tests and tidy up

* mc make x_header unstrict again

* amqpl: death record fixes

* bazel

* amqp -> amqpl conversion test

* Fix crash in mc_amqp:size/1

Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.

* Fix crash in lists:flatten/1

Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.

* Fix crash in rabbit_writer

Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
  initial call: rabbit_writer:enter_mainloop/2
  pid: <0.711.0>
  registered_name: []
  exception error: bad argument
    in function  size/1
       called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
       *** argument 1: not tuple or binary
    in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
    in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
    in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
    in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
    in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
    in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
    in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.

This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.

* Add accidentally deleted line back

* mc: optimise mc_amqp internal format

By removint the outer records for message and delivery annotations
as well as application properties and footers.

* mc: optimis mc_amqp map_add by using upsert

* mc: refactoring and bug fixes

* mc_SUITE routingheader assertions

* mc remove serialize/1 callback as only used by amqp

* mc_amqp: avoid returning a nested list from protocol_state

* test and bug fix

* move infer_type to mc_util

* mc fixes and additiona assertions

* Support headers exchange routing for MQTT messages

When a headers exchange is bound to the MQTT topic exchange, routing
will be performend based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).

This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.

When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.

* Fix crash when sending from stream to amqpl

When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
  initial call: rabbit_channel:init/1
  pid: <0.818.0>
  registered_name: []
  exception exit: {{badmatch,undefined},
                   [{rabbit_channel,handle_deliver0,4,
                                    [{file,"rabbit_channel.erl"},
                                     {line,2728}]},
                    {lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
                    {rabbit_channel,handle_cast,2,
                                    [{file,"rabbit_channel.erl"},
                                     {line,728}]},
                    {gen_server2,handle_msg,2,
                                 [{file,"gen_server2.erl"},{line,1056}]},
                    {proc_lib,wake_up,3,
                              [{file,"proc_lib.erl"},{line,251}]}]}
```

This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.

* Support consistent hash exchange routing for MQTT 5.0

When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.

* Convert MQTT 5.0 User Property

* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations

* Make use of Annotations in mc_mqtt:protocol_state/2

mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.

* Enforce AMQP 0.9.1 field name length limit

The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.

* Fix type specs

Apply feedback from Michael Davis

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* Add mc_mqtt unit test suite

Implement mc_mqtt:x_header/2

* Translate indicator that payload is UTF-8 encoded

when converting between MQTT 5.0 and AMQP 1.0

* Translate single amqp-value section from AMQP 1.0 to MQTT

Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.

If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indiate the encoding via Content-Type
message/vnd.rabbitmq.amqp.

This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.

* Fix payload conversion

* Translate Response Topic between MQTT and AMQP

Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.

The Response Topic must be a UTF-8 encoded string.

This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/"     RK        Publish to amq.topic with routing key RK
"/exchange/"  X "/" RK  Publish to exchange X with routing key RK
```

By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.

When an operator modifies the mqtt.exchange, the 2nd target address is
used.

* Apply PR feedback

and fix formatting

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* tidy up

* Add MQTT message_containers test

* consistent hash exchange: avoid amqp legacy conversion

When hashing on a header value.

* Avoid converting to amqp legacy when using exchange federation

* Fix test flake

* test and dialyzer fixes

* dialyzer fix

* Add MQTT protocol interoperability tests

Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams

* Regenerate portions of deps/rabbit/app.bzl with gazelle

I'm not exactly sure how this happened, but gazell seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.

* mc: refactoring

* mc_amqpl: handle delivery annotations

Just in case they are included.

Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
2023-08-31 11:27:13 +01:00
David Ansari 2a4301e12d Nack rejected messages to MQTT 5.0 client
since MQTT 5.0 supports negative acknowledgements thanks to reason codes
in the PUBACK packet.

We could either choose reason code 128 or 131. The description code for
131 applies for rejected messages, hence this commit uses 131:
> The PUBLISH is valid but the receiver is not willing to accept it.
2023-08-09 15:31:14 +02:00
David Ansari 5208d729b3 Remove amqp_client dep from rabbitmq_mqtt
Since Native MQTT in 3.12 the MQTT plugin does not depend on AMQP 0.9.1
client anymore. Such a dependency is only needed for the MQTT tests.
2023-07-20 08:09:46 +00:00
David Ansari ed31a818c3 Make mqtt.subscription_ttl unsupported
Starting with RabbitMQ 3.13 mqtt.max_session_expiry_interval_seconds
(set in seconds) will replace the previous setting
mqtt.subscription_ttl.

MQTT 5.0 introduces the Session Expiry Interval
feature which does not only apply to subscribers, but also to
publishers.

The new config name mqtt.max_session_expiry_interval_seconds makes it clear
that it also applies to publishers.

Prior to this commit, when mqtt.subscription_ttl was set, a warning got
logged and the value was ignored. This is dangerous if an operator does
not see the warning but relies for example on `mqtt.subscription =
infinity` to not expire non clean session.

It's safer to make the boot fail if that unsupported config name is
still set. A clear error message will be logged:
```
[error] <0.142.0> Error preparing configuration in phase apply_translations:
[error] <0.142.0>   - Translation for 'rabbitmq_mqtt.subscription_ttl' found invalid configuration:
        Since 3.13 mqtt.subscription_ttl (in milliseconds) is unsupported.
        Use mqtt.max_session_expiry_interval_seconds (in seconds) instead.
```

Alternatively, RabbitMQ could translate mqtt.subscription_ttl to
mqtt.max_session_expiry_interval_seconds.

However, forcing the new config option sounds the better way to go.

Once we write MQTT 5.0 docs, this change must go into the 3.13 release notes.

This commit also renames max_session_expiry_interval_secs to max_session_expiry_interval_seconds.
The latter is clearer to users.
2023-07-13 14:47:33 +00:00
GitHub d28fb8cee4 bazel run gazelle 2023-06-22 04:02:27 +00:00
Chunyi Lyu 5fe691abe6 Run mqtt java suite with 2 shards
- one RMQ cluster per test group
2023-06-21 17:14:08 +01:00
Chunyi Lyu 147e2d6676 Run Java v5 tests in separate RMQ cluster
- to avoid test poluting since both v3 and v5 java test cases use
the same resource names
2023-06-21 17:14:08 +01:00
David Ansari 0a98c1e986 Fix assertion arguments order
The correct order is
```
assertArrayEquals(Expecteds, Actuals)
```

Do not mark the tests as flaky.
Instead, we want to understand and fix the flakes.
2023-06-21 17:14:08 +01:00
Chunyi Lyu 12cdc69572 Test MQTT 5 with proxy protocol suite 2023-06-21 17:14:08 +01:00
Chunyi Lyu 1eac345f1c Run cluster suite against v5 enabled cluster 2023-06-21 17:14:08 +01:00
David Ansari 14d81b430f Add Topic Aliases from server to client
Once the server's Topic Alias cache for messages from server to client
is full, this commit does not replace any existing aliases.
So, the first topics "win" and stay in the cache forever.
This matches the behaviour of VerneMQ and EMQX.
For now that's good enough.
In the future, we can easily change that behaviour to some smarter strategy,
for example
1. Hash the TopicName to an Topic Alias and replace the old
   alias, or
2. For the Topic Alias Cache from server to client, keep 2 Maps:
   #{TopicName => TopicAlias} and #{TopicAlias => TopicName} and a
   counter that wraps to 1 once the Topic Alias Maximum is reached and
   just replace an existing Alias if the TopicName is not cached.

Also, refactor Topic Alias Maximum:
* Remove code duplication
* Allow an operator to prohibit Topic Aliases by allowing value 0 to be
  configured
* Change config name to topic_alias_maximum to that it matches exactly
  the MQTT feture name
* Fix wrong code formatting
* Add the invalid or unkown Topic Alias to log message for easier
  troubleshooting
2023-06-21 17:14:08 +01:00
Chunyi Lyu 60f6784d30 Make Topic Alias Maximum configurable
- default to 20, configurable through cuttlefish config
- add test to v5 suite for invalid topic alias in publish
2023-06-21 17:14:08 +01:00
David Ansari e2b545f270 Support MQTT 5.0 features No Local, RAP, Subscription IDs
Support subscription options "No Local" and "Retain As Published"
as well as Subscription Identifiers.

All three MQTT 5.0 features can be set on a per subscription basis.
Due to wildcards in topic filters, multiple subscriptions
can match a given topic. Therefore, to implement Retain As Published and
Subscription Identifiers, the destination MQTT connection process needs
to know what subscription(s) caused it to receive the message.

There are a few ways how this could be implemented:

1. The destination MQTT connection process is aware of all its
   subscriptions. Whenever, it receives a message, it can match the
   message's routing key / topic against all its known topic filters.
   However, to iteratively match the routing key against all topic
   filters for every received message can become very expensive in the
   worst case when the MQTT client creates many subscriptions containing
   wildcards. This could be the case for an MQTT client that acts as a
   bridge or proxy or dispatcher: It could subscribe via a wildcard for
   each of its own clients.

2. Instead of interatively matching the topic of the received message
   against all topic filters that contain wildcards, a better approach
   would be for every MQTT subscriber connection process to maintain a
   local trie datastructure (similar to how topic exchanges are
   implemented) and perform matching therefore more efficiently.
   However, this does not sound optimal either because routing is
   effectively performed twice: in the topic exchange and again against
   a much smaller trie in each destination connection process.

3. Given that the topic exchange already perform routing, a much more
   sensible way would be to send the matched binding key(s) to the
   destination MQTT connection process. A subscription (topic filter)
   maps to a binding key in AMQP 0.9.1 routing. Therefore, for the first
   time in RabbitMQ, the routing function should not only output a list
   of unique destination queues, but also the binding keys (subscriptions)
   that caused the message to be routed to the destination queue.

This commit therefore implements the 3rd approach.
The downside of the 3rd approach is that it requires API changes to the
routing function and topic exchange.

Specifically, this commit adds a new function rabbit_exchange:route/3
that accepts a list of routing options. If that list contains version 2,
the caller of the routing function knows how to handle the return value
that could also contain binding keys.

This commits allows an MQTT connection process, the channel process, and
at-most-once dead lettering to handle binding keys. Binding keys are
included as AMQP 0.9.1 headers into the basic message.
Therefore, whenever a message is sent from an MQTT client or AMQP 0.9.1
client or AMQP 1.0 client or STOMP client, the MQTT receiver will know
the subscription identifier that caused the message to be received.

Note that due to the low number of allowed wildcard characters (# and
+), the cardinality of matched binding keys shouldn't be high even if
the topic contains for example 3 levels and the message is sent to for
example 5 million destination queues. In other words, sending multiple
distinct basic messages to the destination shouldn't hurt the delegate
optimisation too much. The delegate optimisation implemented for classic
queues and rabbit_mqtt_qos0_queue(s) still takes place for all basic
messages that contain the same set of matched binding keys.

The topic exchange returns all matched binding keys by remembering the
edges walked down to the leaves. As an optimisation, only for MQTT
queues are binding keys being returned. This does add a small dependency
from app rabbit to app rabbitmq_mqtt which is not optimal. However, this
dependency should be simple to remove when omitting this optimisation.

Another important feature of this commit is persisting subscription
options and subscription identifiers because they are part of the
MQTT 5.0 session state.

In MQTT v3 and v4, the only subscription information that were part of
the session state was the topic filter and the QoS level.
Both information were implicitly stored in the form of bindings:
The topic filter as the binding key and the QoS level as the destination
queue name of the binding.

For MQTT v5 we need to persist more subscription information.
From a domain perspective, it makes sense to store subscription options
as part of subscriptions, i.e. bindings, even though they are currently
not used in routing.
Therefore, this commits stores subscription options as binding arguments.

Storing subscription options as binding arguments comes in turn with
new challenges: How to handle mixed version clusters and upgrading an
MQTT session from v3 or v4 to v5?
Imagine an MQTT client connects via v5 with Session Expiry Interval > 0
to a new node in a mixed version cluster, creates a subscription,
disconnects, and subsequently connects via v3 to an old node. The
client should continue to receive messages.

To simplify such edge cases, this commit introduces a new feature flag
called mqtt_v5. If mqtt_v5 is disabled, clients cannot connect to
RabbitMQ via MQTT 5.0.

This still doesn't entirely solve the problem of MQTT session upgrades
(v4 to v5 client) or session downgrades (v5 to v4 client).

Ideally, once mqtt_v5 is enabled, all MQTT bindings contain non-empty binding
arguments. However, this will require a feature flag migration function
to modify all MQTT bindings. To be more precise, all MQTT bindings need
to be deleted and added because the binding argument is part of the
Mnesia table key.

Since feature flag migration functions are non-trivial to implement in
RabbitMQ (they can run on every node multiple times and concurrently),
this commit takes a simpler approach:
All v3 / v4 sessions keep the empty binding argument [].
All v5 sessions use the new binding argument [#mqtt_subscription_opts{}].

This requires only handling a session upgrade / downgrade by
creating a binding (with the new binding arg) and deleting the old
binding (with the old binding arg) when processing the CONNECT packet.

Note that such session upgrades or downgrades should be rather rare in
practice. Therefore these binding transactions shouldn't hurt peformance.

The No Local option is implemented within the MQTT publishing connection
process: The message is not sent to the MQTT destination if the
destination queue name matches the current MQTT client ID and the
message was routed due to a subscription that has the No Local flag set.
This avoids unnecessary traffic on the MQTT queue.
The alternative would have been that the "receiving side" (same process)
filters the message out - which would have been more consistent in how
Retain As Published and Subscription Identifiers are implemented, but
would have caused unnecessary load on the MQTT queue.
2023-06-21 17:14:08 +01:00
David Ansari 51d659fd07 Fix failing property in packet_prop_SUITE
1. Shrinking times out if there is an error, therefore remove the 60
   seconds Bazel timeout by using a medium size bazel test suite.
2. The MQTT 5.0 spec mandates for binary data types and UTF 8 string
   data types to have values of maximum 65,535 bytes.
   Therefore, ensure this test suite does not generate data greater than
   that limit.
2023-06-21 17:14:08 +01:00
David Ansari ad5152bdd6 bazel run gazelle 2023-06-21 17:14:08 +01:00
David Ansari 80d972e308 Add property test for MQTT encoder / decoder
All MQTT packets that can be sent in both directions (from client to
server and server to client) are tested in packet_prop_SUITE.

The symmetric property is very concise because encoding and then decoding an
MQTT packet should yield the original MQTT packet.

The input data variety of the previous example based tests was very
small.
2023-06-21 17:14:08 +01:00
David Ansari 2efd9c06b8 Support Session Expiry Interval
Allow Session Expiry Interval to be changed when client DISCONNECTs.

Deprecate config subscription_ttl in favour of max_session_expiry_interval_secs
because the Session Expiry Interval will also apply to publishers that
connect with a will message and will delay interval.
"The additional session state of an MQTT v5 server includes:
* The Will Message and the Will Delay Interval
* If the Session is currently not connected, the time at which the Session
  will end and Session State will be discarded."

The Session Expiry Interval picked by the server and sent to the client
in the CONNACK is the minimum of max_session_expiry_interval_secs and
the requested Session Expiry Interval by the client in CONNECT.

This commit favours dynamically changing the queue argument x-expires
over creating millions of different policies since that many policies
will come with new scalability issues.

Dynamically changing queue arguments is not allowed by AMQP 0.9.1
clients. However, it should be perfectly okay for the MQTT plugin to do
so for the queues it manages. MQTT clients are not aware that these
queues exist.
2023-06-21 17:14:08 +01:00
David Ansari f1f8167ec4 Add MQTT v5 feature Maximum Packet Size set by server
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."

This commit implements the part where the Server specifies the maximum
packet size.

"In the case of an error in a CONNECT packet it MAY send a CONNACK
packet containing the Reason Code, before closing the Network
Connection. In the case of an error in any other packet it SHOULD send a
DISCONNECT packet containing the Reason Code before closing the Network
Connection."

This commit implements only the "SHOULD" (second) part, not the "MAY"
(first) part.

There are now 2 different global wide MQTT settings on the server:
1. max_packet_size_unauthenticated which applies to the CONNECT packet
   (and maybe AUTH packet in the future)
2. max_packet_size_authenticated which applies to all other MQTT
   packets (that is, after the client successfully authenticated).

These two settings will apply to all MQTT versions.
In MQTT v5, if a non-CONNECT packet is too large, the server will send a
DISCONNECT packet to the client with Reason Code "Packet Too Large"
before closing the network connection.
2023-06-21 17:14:08 +01:00
David Ansari 49f1071591 Add MQTT v5 feature Maximum Packet Size set by client
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."

This commit implements the part where the Client specifies the maximum
packet size.

As per protocol spec, instead of sending, the server drops the MQTT packet
if it's too large.
A debug message is logged for "infrequent" packet types.

For PUBLISH packets, the messages is rejected to the queue such that it
will be dead lettered, if dead lettering is configured.
At the very least, Prometheus metrics for dead lettered messages will
be increased, even if dead lettering is not configured.
2023-06-21 17:14:08 +01:00
David Ansari c44b546f73 Test MQTT v5 in existing MQTT suites 2023-06-21 17:14:08 +01:00
David Ansari be6ff92692 Serialise and parse MQTT 5.0 packets 2023-06-21 17:14:08 +01:00
Rin Kuryloski 8de8f59d47 Use gazelle generated bazel files
Bazel build files are now maintained primarily with `bazel run
gazelle`. This will analyze and merge changes into the build files as
necessitated by certain code changes (e.g. the introduction of new
modules).

In some cases there hints to gazelle in the build files, such as `#
gazelle:erlang...` or `# keep` comments. xref checks on plugins that
depend on the cli are a good example.
2023-04-17 18:13:18 +02:00
Rin Kuryloski 8a7eee6a86 Ignore warnings when building plt files for dependencies
As we don't generally care if a dependency has warnings, only the
target
2023-04-17 10:09:24 +02:00
David Ansari 942c4ab542 Add missing BEAM file
to fix failing Bazel test
2023-03-02 10:25:07 +01:00
Alexey Lebedeff 949b53543d Fix all dependencies for the dialyzer
This is the latest commit in the series, it fixes (almost) all the
problems with missing and circular dependencies for typing.

The only 2 unsolved problems are:

- `lg` dependency for `rabbit` - the problem is that it's the only
  dependency that contains NIF. And there is no way to make dialyzer
  ignore it - looks like unknown check is not suppressable by dialyzer
  directives. In the future making `lg` a proper dependency can be a
  good thing anyway.

- some missing elixir function in `rabbitmq_cli` (CSV, JSON and
  logging related).

- `eetcd` dependency for `rabbitmq_peer_discovery_etcd` - this one
  uses sub-directories in `src/`, which confuses dialyzer (or our bazel
  machinery is not able to properly handle it). I've tried the latest
  rules_erlang which flattens directory for .beam files, but it wasn't
  enough for dialyzer - it wasn't able to find core erlang files. This
  is a niche plugin and an unusual dependency, so probably not worth
  investigating further.
2023-02-13 17:37:44 +01:00
David Ansari bd50a41d67 Decrease MQTT mailbox_soft_limit
Let's decrease the mailbox_soft_limit from 1000 to 200.
Obviously, both values are a bit arbitrary.
However, MQTT workloads usually do not have high throughput patterns for
a single MQTT connection. The only valid scenario where an MQTT
connections' process mailbox could have many messages is in large fan-in
scenarios where many MQTT devices sending messages at once to a single MQTT
device - which is rather unusual.

It makes more sense to protect against cluster wide memory alarms by
decreasing the mailbox_soft_limit.
2023-02-09 19:18:34 +01:00
David Ansari 3432dcb161 Shard shared_SUITE
such that MQTT and WebMQTT tests of the shared_SUITE can run in parallel.
Before this commit, the shared_SUITE runs 14 minutes, after this commit
the shared_SUITE runs 4 minutes in GitHub actions.
2023-02-07 16:36:08 +01:00
David Ansari 9db8626abf Re-enable dialyzer option Wunmatched_returns 2023-01-24 17:32:59 +00:00
David Ansari a4db85de0d Make pipeline fail when there are dialyzer warnings
We want the build to fail if there are any dialyzer warnings in
rabbitmq_mqtt or rabbitmq_web_mqtt. Otherwise we rely on people manually
executing and checking the results of dialyzer.

Also, we want any test to fail that is flaky.
Flaky tests can indicate subtle errors in either test or program execution.
Instead of marking them as flaky, we should understand and - if possible -
fix the underlying root cause.

Fix OTP 25.0 dialyzer warning

Type gen_server:format_status() is known in OTP 25.2, but not in 25.0
2023-01-24 17:32:59 +00:00
David Ansari d651f87ea7 Share tests between MQTT and Web MQTT
New test suite deps/rabbitmq_mqtt/test/shared_SUITE contains tests that
are executed against both MQTT and Web MQTT.

This has two major advantages:
1. Eliminates test code duplication between rabbitmq_mqtt and
rabbitmq_web_mqtt making the tests easier to maintain and to understand.
2. Increases test coverage of Web MQTT.

It's acceptable to add a **test** dependency from rabbitmq_mqtt to
rabbitmq_web_mqtt. Obviously, there should be no such dependency
for non-test code.
2023-01-24 17:32:59 +00:00
David Ansari 56e97a9142 Fix MQTT in management plugin
1. Allow to inspect an (web) MQTT connection.
2. Show MQTT client ID on connection page as part of client_properties.
3. Handle force_event_refresh (when management_plugin gets enabled
   after (web) MQTT connections got created).
4. Reduce code duplication between protocol readers.
5. Display '?' instead of 'NaN' in UI for absent queue metrics.
6. Allow an (web) MQTT connection to be closed via management_plugin.

For 6. this commit takes the same approach as already done for the stream
plugin:
The stream plugin registers neither with {type, network} nor {type,
direct}.
We cannot use gen_server:call/3 anymore to close the connection
because the web MQTT connection cannot handle gen_server calls (only
casts).
Strictly speaking, this commit requires a feature flag to allow to force
closing stream connections from the management plugin during a rolling
update. However, given that this is rather an edge case, and there is a
workaround (connect to the node directly hosting the stream connection),
this commit will not introduce a new feature flag.
2023-01-24 17:30:10 +00:00
David Ansari 97fefff0fe Add overflow drop-head to rabbit_mqtt_qos_queue type
Traditionally, queue types implement flow control by keeping state in
both sending and receiving Erlang processes (for example credit based flow
control keeps the number of credits within the process dictionary).

The rabbit_mqtt_qos0_queue cannot keep such state in sending or receiving
Erlang process because such state would consume a large amount of memory
in case of large fan-ins or large fan-outs.
The whole idea of the rabbit_mqtt_qos_queue type is to not keep any
state in the rabbit_queue_type client. This makes this new queue
type scale so well.

Therefore the new queue type cannot (easily) implement flow control
throttling individual senders.

In this commit, we take a different approach:
Instead of implementing flow control throttling individual senders,
the receiving MQTT connection process drops QoS 0 messages from the
rabbit_mqtt_qos_queue if it is overflowed with messages AND its MQTT
client is not capable of receiving messages fast enough.

This is a simple and sufficient solution because it's better to drop QoS
0 (at most once) messages instead of causing cluster-wide memory alarms.

The user can opt out of dropping messages by setting the new env setting
mailbox_soft_limit to 0.

Additionally, we reduce the send_timeout from 30 seconds default in
Ranch to 15 seconds default in MQTT. This will detect hanging MQTT
clients faster causing the MQTT connection to be closed.
2023-01-24 17:30:10 +00:00
David Ansari 3980c28596 Allow higher load on Mnesia by default
Prior to this commit, when connecting or disconnecting many thousands of
MQTT subscribers, RabbitMQ printed many times:
```
[warning] <0.241.0> Mnesia('rabbit@mqtt-rabbit-1-server-0.mqtt-rabbit-1-nodes.default'): ** WARNING ** Mnesia is overloaded: {dump_log,write_threshold}
```

Each MQTT subscription causes queues and bindings to be written into Mnesia.

In order to allow for higher Mnesia load, the user can configure
```
[
 {mnesia,[
  {dump_log_write_threshold, 10000}
 ]}
].
```
in advanced.config

or set this value via
```
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-mnesia dump_log_write_threshold 10000"
```

The Mnesia default for dump_log_write_threshold is 1,000.
The Mnesia default for dump_log_time_threshold is 180,000 ms.

It is reasonable to increase the default for dump_log_write_threshold from
1,000 to 5,000 and in return decrease the default dump_log_time_threshold
from 3 minutes to 1.5 minutes.
This way, users can achieve higher MQTT scalability by default.

This setting cannot be changed at Mnesia runtime, it needs to be set
before Mnesia gets started.
Since the rabbitmq_mqtt plugin can be enabled dynamically after Mnesia
started, this setting must therefore apply globally to RabbitMQ.

Users can continue to set their own defaults via advanced.config or
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS. They continue to be respected
as shown by the new test suite included in this commit.
2023-01-24 17:30:10 +00:00