Commit Graph

1421 Commits

Author SHA1 Message Date
David Ansari b1eb354385 Strictly validate annotations 2024-09-18 12:42:27 +02:00
Michael Klishin 4805e31e37
Merge pull request #12317 from rabbitmq/md/khepri/mqtt-fixes
Handle database timeouts in MQTT queue deletion
2024-09-16 19:00:09 -04:00
Michael Davis a9c48ef951
rabbit_mqtt_processor: Handle failures to delete a queue 2024-09-16 14:43:27 -04:00
Michael Davis 9627903716
rabbit_queue_type: Add `{error,timeout}` to delete/4 callback spec
This return value was already possible since a classic queue will return
it during termination if `rabbit_amqqueue:internal_delete/2` fails with
that value.

`rabbit_amqqueue:delete/4` already handles this value and converts it
into a protocol error and channel exit. The other caller (MQTT
processor) will be updated in a child commit.

This commit also replaces eager conversions to protocol errors in
rabbit_classic_queue, rabbit_quorum_queue and rabbit_stream_coordinator:
we should return `{error, timeout}` consistently and not hide it in
protocol errors.
2024-09-16 14:43:24 -04:00
dependabot[bot] 0d6916e3c5
build(deps-dev): bump com.rabbitmq:amqp-client
Bumps [com.rabbitmq:amqp-client](https://github.com/rabbitmq/rabbitmq-java-client) from 5.21.0 to 5.22.0.
- [Release notes](https://github.com/rabbitmq/rabbitmq-java-client/releases)
- [Commits](https://github.com/rabbitmq/rabbitmq-java-client/compare/v5.21.0...v5.22.0)

---
updated-dependencies:
- dependency-name: com.rabbitmq:amqp-client
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-09-16 18:42:53 +00:00
Michal Kuratczyk f0f7500f6a
Revert "Log errors from `ranch:handshake`" (#12304)
This reverts commit 620fff22f1.

It intoduced a regression in another area - a TCP health check,
such as the default (with cluster-operator) readinessProbe,
on a TLS-enabled instance would log a `rabbit_reader` crash
every few seconds:
```
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>   crasher:
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     initial call: rabbit_reader:init/3
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     pid: <0.999.0>
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     registered_name: []
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     exception error: no match of right hand side value {error, handshake_failed}
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>       in function  rabbit_reader:init/3 (rabbit_reader.erl, line 171)
```
2024-09-13 17:07:57 +02:00
Michael Davis c37b192beb
Handle Khepri timeouts when deleting MQTT QOS0 queues 2024-09-11 12:18:13 -04:00
Michael Klishin 606a65169f Style, wording 2024-09-03 01:31:44 -04:00
Marcial Rosales 1abc4ed02f Extract client_id from client cert 2024-08-30 11:39:48 +01:00
Michal Kuratczyk 8a03975ba7
Set the default vm_memory_high_watermark to 0.6 (#12161)
The default of 0.4 was very conservative even when it was
set years ago. Since then:
- we moved to CQv2, which have much more predictable memory usage than (non-lazy) CQv1 used to
- we removed CQ mirroring which caused large sudden memory spikes in some situations
- we removed the option to store message payload in memory in quorum queues

For the past two years or so, we've been running all our internal tests and benchmarks
using the value of 0.8 with no OOMkills at all (note: we do this on
Kubernetes where the Cluster Operators overrides the available memory
levaing some additional headroom, but effectively we are still using  more than
0.6 of memory).
2024-08-29 12:10:49 +02:00
D Corbacho afa28cbdb3
Merge pull request #12118 from rabbitmq/issue-11985
MQTT and Streams: handle connection shutdown via CLI command gracefully
2024-08-28 14:27:19 +02:00
David Ansari 8c905b9009 Avoid crash in stream connection
1.
Prior to this commit, closing a stream connection via:
```
./sbin/rabbitmqctl close_all_user_connections guest enough
```
crashed the stream process as follows:
```
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0>   crasher:
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0>     initial call: rabbit_stream_reader:init/1
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0>     pid: <0.1098.0>
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0>     registered_name: []
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0>     exception error: no function clause matching
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0>                      rabbit_stream_reader:open({call,
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0>                                                 {<0.1233.0>,
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0>                                                  #Ref<0.519694519.1387790337.15898>}},
2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0>                                                {shutdown,<<"enough">>},
```

This commit fixes this crash.

2.
Both CLI commands and management plugin use the same way
to close MQTT, Web MQTT, and Stream connections: They all send a message
via `Pid ! {shutdown, Reason}` to the connection.

3.
This commit avoids making `rabbit` core app to know about
'Web MQTT'.

4
This commit simplifies rabbit_mqtt_reader by avoiding another
handle_call clause
2024-08-28 13:19:10 +02:00
David Ansari 69d407e6b6 Simplify test cases
1. Only run the CLI tests on a single node cluster. The shared_SUITE is
   already very big. Testing the same CLI commands against node-0 on a
   3-node cluster brings no benefit.
2. Move the two new CLI test cases in front of
   management_plugin_connection because they are similar in that all
   three tests close the MQTT connection.
3. There is no need to query the HTTP API for the two new CLI test
   cases.
4. There is no need to set keepalive in the two new CLI test cases.
2024-08-28 13:18:45 +02:00
dependabot[bot] e3caab47af
Bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.4.0 to 3.5.0.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.4.0...surefire-3.5.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-08-27 18:09:54 +00:00
Diana Parra Corbacho ea6ef17cc0 Mqtt: test close connection 2024-08-27 16:44:18 +02:00
Diana Parra Corbacho 494c1b8209 mqtt: handle connection shutdown
`{shutdown, Reason}` must be handled into handle_call and not handle_info
`rabbitmqctl close_all_user_connections` calls rabbit_reader which does
a call into the process, the same as rabbitmq_management
2024-08-26 09:44:39 +02:00
dependabot[bot] 8147006c6d
Bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.3.1 to 3.4.0.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.3.1...surefire-3.4.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-08-19 19:02:14 +00:00
Michael Davis 140abd871a
Merge pull request #11980 from rabbitmq/md/khepri-minority-errors/queue-declaration 2024-08-15 14:26:08 -05:00
David Ansari ba14b158af Remove mqtt.default_user and mqtt.default_pass
This commit is a breaking change in RabbitMQ 4.0.

 ## What?
Remove mqtt.default_user and mqtt.default_pass
Instead, rabbit.anonymous_login_user and rabbit.anonymous_login_pass
should be used.

 ## Why?
RabbitMQ 4.0 simplifies anonymous logins.
There should be a single configuration place
```
rabbit.anonymous_login_user
rabbit.anonymous_login_pass
```
that is used for anonymous logins for any protocol.

Anonymous login is orthogonal to the protocol the client uses.
Hence, there should be a single configuration place which can then be
used for MQTT, AMQP 1.0, AMQP 0.9.1, and RabbitMQ Stream protocol.

This will also simplify switching to SASL for MQTT 5.0 in the future.
2024-08-15 10:58:48 +00:00
dependabot[bot] 93c6a28d06
Bump org.junit.jupiter:junit-jupiter
Bumps [org.junit.jupiter:junit-jupiter](https://github.com/junit-team/junit5) from 5.10.3 to 5.11.0.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.10.3...r5.11.0)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-08-14 18:36:10 +00:00
Michael Davis 8889d40a92
Handle database timeouts when declaring queues
This fixes a case-clause crash in the logs in `cluster_minority_SUITE`.
When the database is not available `rabbit_amqqueue:declare/6,7` should
return a `protocol_error` record with an error message rather than a
hard crash. Also included in this change is the necessary changes to
typespecs: `rabbit_db_queue:create_or_get/1` is the first function to
return a possible `{error,timeout}`. That bubbles up through
`rabbit_amqqueue:internal_declare/3` and must be handled in each
`rabbit_queue_type:declare/2` callback.
2024-08-12 16:16:57 -04:00
Karl Nilsson 194d4ba2f5
Quorum queues v4 (#10637)
This commit contains the following new quorum queue features:

* Fair share high/low priorities
* SAC consumers honour consumer priorities
* Credited consumer refactoring to meet AMQP requirements.
* Use checkpoints feature to reduce memory use for queues with long backlogs
 * Consumer cancel option that immediately removes consumer and returns all pending messages.
 * More compact commands of the most common commands such as enqueue, settle and credit
 * Correctly track the delivery-count to be compatible with the AMQP spec
 * Support the "modified" AMQP 1.0 outcome better.

Commits:

* Quorum queues v4 scaffolding.

Create the new version but not including any changes yet.

QQ: force delete followers after leader has terminated.

Also try a longer sleep for mqtt_shared_SUITE so that the
delete operation stands a chance to time out and move on
to the forced deletion stage.

In some mixed machine version scenarios some followers will never
apply the poison pill command so we may as well force delete them
just in case.

QQ: skip test in amqp_client that cannot pass with mixed machine versions

QQ: remove dead code

Code relating to prior machine versions and state conversions.

rabbit_fifo_prop_SUITE fixes

* QQ: add v4 ff and new more compact enqueue command.

Also update rabbit_fifo_* suites to test more relevant code versions
where applicable.

QQ: always use the updated credit mode format

QQv4: use more compact consumer reference in settle, credit, return

This introudces a new type: consumer_key() which is either the consumer_id
or the raft index the checkout was processed at. If the consumer is
using one of the updated credit spec formats rabbit_fifo will use the
raft index as the primary key for the consumer such that the rabbit
fifo client can then use the more space efficient integer index
instead of the full consumer id in subsequent commands.

There is compatibility code to still accept the consumer id in
settle, return, discard and credit commands but this is slighlyt
slower and of course less space efficient.

The old form will be used in cases where the fifo client may have
already remove the local consumer state (as happens after a cancel).

Lots of test refactorings of the rabbit_fifo_SUITE to begin to use
the new forms.

* More test refactoring and new API fixes

rabbit_fifo_prop_SUITE refactoring and other fixes.


* First pass SAC consumer priority implementation.

Single active consumers will be activated if they have a higher priority
than the currently active consumer. if the currently active consumer
has pending messages, no further messages will be assigned to the
consumer and the activation of the new consumer will happen once
all pending messages are settled. This is to ensure processing order.

Consumers with the same priority will internally be ordered to
favour those with credit then those that attached first.

QQ: add SAC consumer priority integration tests

QQ: add check for ff in tests

* QQ: add new consumer cancel option: 'remove'

This option immediately removes and returns all messages for a
consumer instead of the softer 'cancel' option which keeps the
consumer around until all pending messages have been either
settled or returned.

This involves a change to the rabbit_queue_type:cancel/5 API
to rabbit_queue_type:cancel/3.

* QQ: capture checked out time for each consumer message.

This will form the basis for queue initiated consumer timeouts.

* QQ: Refactor to use the new ra_machine:handle_aux/5 API

Instead of the old ra_machine:handle_aux/6 callback.

* QQ hi/lo priority queue

* QQ: Avoid using mc:size/1 inside rabbit_fifo

As we dont want to depend on external functions for things that may
change the state of the queue.

* QQ bug fix: Maintain order when returning multiple

Prior to this commit, quorum queues requeued messages in an undefined
order, which is wrong.

This commit fixes this bug and requeues messages always in the order as
nacked / rejected / released by the client.

We ensure that order of requeues is deterministic from the client's
point of view and doesn't depend on whether the quorum queue soft limit
was exceeded temporarily.
So, even when rabbit_fifo_client batches requeues, the order as nacked
by the client is still maintained.

* Simplify

* Add rabbit_quorum_queue:file_handle* functions back.

For backwards compat.

* dialyzer fix

* dynamic_qq_SUITE: avoid mixed versions failure.

* QQ: track number of requeues for message.

To be able to calculate the correct value for the AMQP delivery_count
header we need to be able to distinguish between messages that were
"released" or returned in QQ speak and those that were returned
due to errors such as channel termination.

This commit implement such tracking as well as the calculation
of a new mc annotations `delivery_count` that AMQP makes use
of to set the header value accordingly.

* Use QQ consumer removal when AMQP client detaches

This enables us to unskip some AMQP tests.

* Use AMQP address v2 in fsharp-tests

* QQ: track number of requeues for message.

To be able to calculate the correct value for the AMQP delivery_count
header we need to be able to distinguish between messages that were
"released" or returned in QQ speak and those that were returned
due to errors such as channel termination.

This commit implement such tracking as well as the calculation
of a new mc annotations `delivery_count` that AMQP makes use
of to set the header value accordingly.

* rabbit_fifo: Use Ra checkpoints

* quorum queues: Use a custom interval for checkpoints

* rabbit_fifo_SUITE: List actual effects in ?ASSERT_EFF failure

* QQ: Checkpoints modifications

* fixes

* QQ: emit release cursors on tick for followers and leaders

else followers could end up holding on to segments a bit longer
after traffic stops.

* Support draining a QQ SAC waiting consumer

By issuing drain=true, the client says "either send a transfer or a flow frame".
Since there are no messages to send to an inactive consumer, the sending
queue should advance the delivery-count consuming all link-credit and send
a credit_reply with drain=true to the session proc which causes the session
proc to send a flow frame to the client.

* Extract applying #credit{} cmd into 2 functions

This commit is only refactoring and doesn't change any behaviour.

* Fix default priority level

Prior to this commit, when a message didn't have a priority level set,
it got enqueued as high prio.

This is wrong because the default priority is 4 and
"for example, if 2 distinct priorities are implemented,
then levels 0 to 4 are equivalent, and levels 5 to 9 are equivalent
and levels 4 and 5 are distinct."
Hence, by default a message without priority set, must be enqueued as
low prio.

* bazel run gazelle

* Avoid deprecated time unit

* Fix aux_test

* Delete dead code

* Fix rabbit_fifo_q:get_lowest_index/1

* Delete unused normalize functions

* Generate less garbage

* Add integration test for QQ SAC with consumer priority

* Improve readability

* Change modified outcome behaviour

With the new quorum queue v4 improvements where a requeue counter was
added in addition to the quorum queue delivery counter, the following
sentence from https://github.com/rabbitmq/rabbitmq-server/pull/6292#issue-1431275848
doesn't apply anymore:

> Also the case where delivery_failed=false|undefined requires the release of the
> message without incrementing the delivery_count. Again this is not something
> that our queues are able to do so again we have to reject without requeue.

Therefore, we simplify the modified outcome behaviour:
RabbitMQ will from now on only discard the message if the modified's
undeliverable-here field is true.

* Introduce single feature flag rabbitmq_4.0.0

 ## What?

Merge all feature flags introduced in RabbitMQ 4.0.0 into a single
feature flag called rabbitmq_4.0.0.

 ## Why?

1. This fixes the crash in
https://github.com/rabbitmq/rabbitmq-server/pull/10637#discussion_r1681002352
2. It's better user experience.

* QQ: expose priority metrics in UI

* Enable skipped test after rebasing onto main

* QQ: add new command "modify" to better handle AMQP modified outcomes.

This new command can be used to annotate returned or rejected messages.

This commit also retains the delivery-count across dead letter boundaries
such that the AMQP header delivery-count field can now include _all_ failed
deliver attempts since the message was originally received.

Internally the quorum queue has moved it's delivery_count header to
only track the AMQP protocol delivery attempts and now introduces
a new acquired_count to track all message acquisitions by consumers.

* Type tweaks and naming

* Add test for modified outcome with classic queue

* Add test routing on message-annotations in modified outcome

* Skip tests in mixed version tests

Skip tests in mixed version tests because feature flag
rabbitmq_4.0.0 is needed for the new #modify{} Ra command
being sent to quorum queues.

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
2024-08-08 08:48:27 +01:00
David Ansari 8ba36492ad Reduce Khepri test flakes
Test case rabbit_mqtt_qos0_queue_kill_node flaked because after an
MQTT client subscribes on node 0, RabbitMQ returns success
and replicated the new binding to node 0 and node 1, but not
yet to node 2. Another MQTT client then publishes on node 2
without the binding being present yet on node 2, and the
message therefore isn't routed.

This commit attempts to eliminate this flake.
It adds a function to rabbit_ct_broker_helpers which waits until a given
node has caught up with the leader node.
We can reuse that function in future to eliminate more test flakes.
2024-08-06 22:01:17 +02:00
David Ansari 80ff6d0224 Close MQTT connection with delay when authentication fails
For consistency with other protocols (to protect from potential DoS attacks).
Wrong credentials and virtual host access errors trigger the delay.

References #11831

We keep the delay low when running tests. Otherwise,
```
make -C deps/rabbitmq_mqtt ct-auth
```
would run 3 minutes longer (with a SILENT_CLOSE_DELAY of 3 seconds).
2024-08-05 15:49:42 +00:00
David Ansari 93d1ac9bb8 Speed up AMQP connection and session (de)registration
## What?

Prior to this commit connecting 40k AMQP clients with 5 sessions each,
i.e. 200k sessions in total, took 7m55s.

After to this commit the same scenario takes 1m37s.

Additionally, prior to this commit, disconnecting all connections and sessions
at once caused the pg process to become overloaded taking ~14 minutes to
process its mailbox.

After this commit, these same deregistrations take less than 5 seconds.

To repro:
```go

package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 40_000; i++ {
		if i%1000 == 0 {
			log.Printf("opened %d connections", i)
		}
		conn, err := amqp.Dial(
			context.TODO(),
			"amqp://localhost",
			&amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("open connection:", err)
		}
		for j := 0; j < 5; j++ {
			_, err = conn.NewSession(context.TODO(), nil)
			if err != nil {
				log.Fatal("begin session:", err)
			}
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}
```

 ## How?

This commit uses separate pg scopes (that is processes and ETS tables) to register
AMQP connections and AMQP sessions. Since each Pid is now its own group,
registration and deregistration is fast.
2024-08-02 13:46:30 +02:00
David Ansari 7fb78338c6 Disconnect MQTT client when its credential expires
Fixes https://github.com/rabbitmq/rabbitmq-server/discussions/11854
Fixes https://github.com/rabbitmq/rabbitmq-server/issues/11862

This commit uses the same approach as implemented for AMQP 1.0 and
Streams: When a token expires, RabbitMQ will close the connection.
2024-07-30 19:55:46 +02:00
David Ansari 4e3ff2c8ef Delete leftover code
This code should have been deleted as part of
https://github.com/rabbitmq/rabbitmq-server/pull/11642
2024-07-30 18:47:09 +02:00
Arnaud Cogoluègnes 58d835ba5d
MQTT auth_SUITE: terminate setup process
Configuring the mock authentication backend blocks
and generates an error in the test process when the
broker goes down. The error report makes the test fail
in some environments.

The process where the setup takes place must stay up
otherwise the ETS table used will go away.

This commit makes sure the broker-side authentication backend
setup returns at the end of the test. This way the calling
process terminates in a normal way.
2024-07-17 15:36:20 +02:00
David Ansari d4ea90d777 Simplify
Protect only `rabbit_mqtt_processor:handle_queue_event/2` since only
that call might throw a `{send_failed, Reaso}`.
2024-07-12 14:40:35 +02:00
Michael Klishin 0af892396a
Merge pull request #11680 from rabbitmq/dependabot/maven/deps/rabbitmq_mqtt/test/java_SUITE_data/main/org.apache.maven.plugins-maven-surefire-plugin-3.3.1
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin from 3.3.0 to 3.3.1 in /deps/rabbitmq_mqtt/test/java_SUITE_data
2024-07-11 16:11:35 -04:00
dependabot[bot] eaf24e6b1f
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.3.0 to 3.3.1.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.3.0...surefire-3.3.1)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-07-11 18:40:32 +00:00
Luke Bakken 994008aa7f Catch abrupt TCP closure when processing `queue_event`
Reported here:
https://groups.google.com/g/rabbitmq-users/c/4AOwZrQyekI
2024-07-11 09:36:44 -07:00
David Ansari 50116f0927 Require MQTT feature flags in 4.0
Require all MQTT feature flags and remove their compatibility code:
* delete_ra_cluster_mqtt_node
* rabbit_mqtt_qos0_queue
* mqtt_v5

These feature flags were introduced in or before 3.13.0.
2024-07-10 10:27:59 +02:00
dependabot[bot] b0b857b0a4
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.26.0 to 3.26.3.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.26.0...assertj-build-3.26.3)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-07-09 18:52:14 +00:00
David Ansari 19523876cd Make AMQP address v2 format user friendly
This commit is a follow up of https://github.com/rabbitmq/rabbitmq-server/pull/11604

This commit changes the AMQP address format v2 from
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to
```
/exchanges/:exchange/:routing-key
/exchanges/:exchange
/queues/:queue
```

Advantages:
1. more user friendly
2. matches nicely with the plural forms of HTTP API v1 and HTTP API v2

This plural form is still non-overlapping with AMQP address format v1.

Although it might feel unusual at first to send a message to `/queues/q1`,
if you think about `queues` just being a namespace or entity type, this
address format makes sense.
2024-07-04 14:33:05 +02:00
David Ansari 0de9591050 Use different AMQP address format for v1 and v2
to distinguish between v1 and v2 address formats.

Previously, v1 and v2 address formats overlapped and behaved differently
for example for:
```
/queue/:queue
/exchange/:exchange
```

This PR changes the v2 format to:
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to distinguish between v1 and v2 addresses.

This allows to call `rabbit_deprecated_features:is_permitted(amqp_address_v1)`
only if we know that the user requests address format v1.

Note that `rabbit_deprecated_features:is_permitted/1` should only
be called when the old feature is actually used.

Use percent encoding / decoding for address URI format v2.
This allows to use any UTF-8 encoded characters including slashes (`/`)
in routing keys, exchange names, and queue names and is more future
safe.
2024-07-03 16:36:03 +02:00
Loïc Hoguin a561b45dcf
Merge pull request #11573 from rabbitmq/loic-more-make
Another make PR
2024-07-01 14:27:04 +02:00
David Ansari 1bc1e4ae34 Remove HA failover for classic mirrored queue
This is a follow-up commit of what has been missed in
* https://github.com/rabbitmq/rabbitmq-server/pull/9815, and
* https://github.com/rabbitmq/rabbitmq-server/pull/11583
2024-07-01 11:18:01 +02:00
David Ansari 25bd2f9bf7
Merge pull request #11574 from rabbitmq/remove-consumer-api
Classic queue removes consumer
2024-06-28 17:24:12 +02:00
Loïc Hoguin bbfa066d79
Cleanup .gitignore files for the monorepo
We don't need to duplicate so many patterns in so many
files since we have a monorepo (and want to keep it).

If I managed to miss something or remove something that
should stay, please put it back. Note that monorepo-wide
patterns should go in the top-level .gitignore file.
Other .gitignore files are for application or folder-
specific patterns.
2024-06-28 12:00:52 +02:00
dependabot[bot] af2a944b86
build(deps-dev): bump org.junit.jupiter:junit-jupiter
Bumps [org.junit.jupiter:junit-jupiter](https://github.com/junit-team/junit5) from 5.10.2 to 5.10.3.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.10.2...r5.10.3)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-06-27 18:46:15 +00:00
David Ansari 19670b625b Copy remove consumer API from qq-v4 2024-06-27 15:09:39 +02:00
Loïc Hoguin 9f15e978b1
make: Remove xrefr
It is no longer used by Erlang.mk.
2024-06-25 13:08:08 +02:00
Karl Nilsson 077c027d52
MQTT: speed up shared_SUITE:many_qos1_messages (#11477)
* MQTT: speed up shared_SUITE:many_qos1_messages

* speed up block_only_publisher

* MQTT: reorganise tests groups

To avoid starting a new broker for each protocol group (v3,v4,v5).

Instead we run all protocol groups under a single cluster configuration
group.

* MQTT: speed up publish_to_all_queue_types_qos* tests.

* Remove separate mnesia_store group

to speed up the test suite

* Fix wrong_shard_count

* Remove unused field

* Run subset of v3 tests

The code being tested under v3 and v4 is almost identical.
To save time in CI, we therefore run only a very small subset of tests in v3.

This cuts the total time reported by CT for the shared_SUITE from 898
seconds to 614 seconds.

Also, the java_SUITE tests in v3.

* Fix wrong_shard_count

* Fix mixed version failure

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
2024-06-19 20:02:33 +02:00
Michal Kuratczyk 27f735f49e
Use emqx/emqtt instead of a fork (#11479)
* Use emqx/emqtt instead of a fork
* Specify SNI in test connections (otherwise OTP26 secure TLS defaults make some tests fail)
2024-06-19 14:03:30 +02:00
Rin Kuryloski 5debebfaf3 Use rules_elixir to build the cli without mix
Certain elixir-native deps are still build with mix, but this can be
corrected later
2024-06-18 14:50:34 +02:00
Karl Nilsson 1abf7a3886
Upgrade maven for MQTT java tests. (#11465) 2024-06-17 19:06:40 +02:00
dependabot[bot] 9bf8e13159
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.5 to 3.3.0.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.5...surefire-3.3.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-06-14 18:11:00 +00:00
Karl Nilsson ebbff46c96
Merge pull request #11307 from rabbitmq/amqp-flow-control-poc-2
Introduce outbound RabbitMQ internal AMQP flow control
2024-06-05 15:01:05 +01:00
David Ansari d70e529d9a Introduce outbound RabbitMQ internal AMQP flow control
## What?

Introduce RabbitMQ internal flow control for messages sent to AMQP
clients.

Prior this PR, when an AMQP client granted a large amount of link
credit (e.g. 100k) to the sending queue, the sending queue sent
that amount of messages to the session process no matter what.
This becomes problematic for memory usage when the session process
cannot send out messages fast enough to the AMQP client, especially if
1. The writer proc cannot send fast enough. This can happen when
the AMQP client does not receive fast enough and causes TCP
back-pressure to the server. Or
2. The server session proc is limited by remote-incoming-window.

Both scenarios are now added as test cases.
Tests
* tcp_back_pressure_rabbitmq_internal_flow_quorum_queue
* tcp_back_pressure_rabbitmq_internal_flow_classic_queue
cover scenario 1.

Tests
* incoming_window_closed_rabbitmq_internal_flow_quorum_queue
* incoming_window_closed_rabbitmq_internal_flow_classic_queue
cover scenario 2.

This PR sends messages from queues to AMQP clients in a more controlled
manner.

To illustrate:
```
make run-broker PLUGINS="rabbitmq_management" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 4"
observer_cli:start()
mq
```
where `mq` sorts by message queue length.
Create a stream:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=s1 queue_type=stream durable=true
```
Next, send and receive from the Stream via AMQP.
Grant a large number of link credit to the sending stream:
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
bash-5.1# quiver //host.docker.internal//queue/s1 --durable -d 30s --credit 100000
```

**Before** to this PR:
```
RESULTS

Count ............................................... 100,696 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 120,422 messages/s
Receiver rate ......................................... 3,363 messages/s
End-to-end rate ....................................... 3,359 messages/s
```
We observe that all 100k link credit worth of messages are buffered in the
writer proc's mailbox:
```
|No | Pid        | MsgQueue  |Name or Initial Call                 |      Memory | Reductions          |Current Function                  |
|1  |<0.845.0>   |100001     |rabbit_amqp_writer:init/1            |  126.0734 MB| 466633491           |prim_inet:send/5                  |
```

**After** to this PR:
```
RESULTS

Count ............................................. 2,973,440 messages
Duration ............................................... 30.0 seconds
Sender rate ......................................... 123,322 messages/s
Receiver rate ........................................ 99,250 messages/s
End-to-end rate ...................................... 99,148 messages/s
```
We observe that the message queue lengths of both writer and session
procs are low.

 ## How?

Our goal is to have queues send out messages in a controlled manner
without overloading RabbitMQ itself.
We want RabbitMQ internal flow control between:
```
AMQP writer proc <--- session proc <--- queue proc
```
A similar concept exists for classic queues sending via AMQP 0.9.1.
We want an approach that applies to AMQP and works generic for all queue
types.

For the interaction between AMQP writer proc and session proc we use a
simple credit based approach reusing module `credit_flow`.

For the interaction between session proc and queue proc, the following options
exist:

 ### Option 1
The session process provides expliclity feedback to the queue after it
has sent N messages.
This approach is implemented in
https://github.com/ansd/rabbitmq-server/tree/amqp-flow-control-poc-1
and works well.
A new `rabbit_queue_type:sent/4` API was added which lets the queue proc know
that it can send further messages to the session proc.

Pros:
* Will work equally well for AMQP 0.9.1, e.g. when quorum queues send messages
  in auto ack mode to AMQP 0.9.1 clients.
* Simple for the session proc

Cons:
* Sligthly added complexity in every queue type implementation
* Multiple Ra commands (settle, credit, sent) to decide when a quorum
  queue sends more messages.

 ### Option 2
A dual link approach where two AMQP links exists between
```
AMQP client <---link--> session proc <---link---> queue proc
```
When the client grants a large amount of credits, the session proc will
top up credits to the queue proc periodically in smaller batches.

Pros:
* No queue type modifications required.
* Re-uses AMQP link flow control

Cons:
* Significant added complexity in the session proc. A client can
  dynamically decrease or increase credits and dynamically change the drain
  mode while the session tops up credit to the queue.

 ### Option 3
Credit is a 32 bit unsigned integer.
The spec mandates that the receiver independently chooses a credit.
Nothing in the spec prevents the receiver to choose a credit of 1 billion.
However the credit value is merely a **maximum**:
> The link-credit variable defines the current maximum legal amount that the delivery-count can be increased by.

Therefore, the server is not required to send all available messages to this
receiver.

For delivery-count:
> Only the sender MAY independently modify this field.

"independently" could be interpreted as the sender could add to the delivery-count
irrespective of what the client chose for drain and link-credit.

Option 3: The queue proc could at credit time already consume credit
and advance the delivery-count if credit is too large before checking out any messages.
For example if credit is 100k, but the queue only wants to send 1k, the queue could
consume 99k of credits and advance the delivery-count, and subsequently send maximum 1k messages.
If the queue advanced the delivery-count, RabbitMQ must send a FLOW to the receiver,
otherwise the receiver wouldn’t know that it ran out of link-credit.

Pros:
* Very simple

Cons:
* Possibly unexpected behaviour for receiving AMQP clients
* Possibly poor end-to-end throughput in auto-ack mode because the queue
  would send a batch of messages followed by a FLOW containing the advanced
  delivery-count. Only therafter the client will learn that it ran out of
  credits and top-up again. This feels like synchronously pulling a batch
  of messages. In contrast, option 2 sends out more messages as soon as
  the previous messages left RabbitMQ without requiring again a credit top
  up from the receiver.
* drain mode with large credits requires the queue to send all available
  messages and only thereafter advance the delivery-count. Therefore,
  drain mode breaks option 3 somewhat.

 ### Option 4
Session proc drops message payload when its outgoing-pending queue gets
too large and re-reads payloads from the queue once the message can be
sent (see `get_checked_out` Ra command for quorum queues).

Cons:
* Would need to be implemented for every queue type, especially classic queues
* Doesn't limit the amount of message metadata in the session proc's
  outgoing-pending queue

 ### Decision: Option 2
This commit implements option 2 to avoid any queue type modification.
At most one credit request is in-flight between session process and
queue process for a given queue consumer.
If the AMQP client sends another FLOW in between, the session proc
stashes the FLOW until it processes the previous credit reply.

A delivery is only sent from the outgoing-pending queue if the
session proc is not blocked by
1. writer proc, or
2. remote-incoming-window

The credit reply is placed into the outgoing-pending queue.
This ensures that the session proc will only top up the next batch of
credits if sufficient messages were sent out to the writer proc.

A future commit could additionally have each queue limit the number of
unacked messages for a given AMQP consumer, or alternatively make use
of session outgoing-window.
2024-06-04 13:11:55 +02:00
Diana Parra Corbacho 3bbda5bdba Remove classic mirror queues 2024-06-04 13:00:31 +02:00
David Ansari bd847b8cac Put credit flow config into persistent term
Put configuration credit_flow_default_credit into persistent term such
that the tuple doesn't have to be copied on the hot path.

Also, change persistent term keys from `{rabbit, AtomKey}` to `AtomKey`
so that hashing becomes cheaper.
2024-05-31 16:20:51 +02:00
dependabot[bot] f58f084bef
Bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.3 to 3.26.0.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.3...assertj-build-3.26.0)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-05-27 18:55:47 +00:00
Simon Unge 19a751890c Remove checks to vhost-limit as that is now handled by rabbit_queue_type:declare
Add new error return tuple when queue limit is exceed
2024-05-27 09:53:54 +02:00
Michal Kuratczyk cfa3de4b2b
Remove unused imports (thanks elp!) 2024-05-23 16:36:08 +02:00
Loïc Hoguin ecf46002e0
Remove availability of CQv1
We reject CQv1 in rabbit.schema as well.

Most of the v1 code is still around as it is needed
for conversion to v2. It will be removed at a later
time when conversion is no longer supported.

We don't shard the CQ property suite anymore:
there's only 1 case remaining.
2024-05-13 13:06:07 +02:00
Karl Nilsson 49a4c66fce Move feature flag check outside of mc
the `mc` module is ideally meant to be kept pure and portable
 and feature flags have external infrastructure dependencies
as well as impure semantics.

Moving the check of this feature flag into the amqp session
simplifies the code (as no message containers with the new
format will enter the system before the feature flag is enabled).
2024-05-13 10:05:40 +02:00
Luke Bakken 620fff22f1
Log errors from `ranch:handshake`
Fixes #11171

An MQTT user encountered TLS handshake timeouts with their IoT device,
and the actual error from `ssl:handshake` / `ranch:handshake` was not
caught and logged.

At this time, `ranch` uses `exit(normal)` in the case of timeouts, but
that should change in the future
(https://github.com/ninenines/ranch/issues/336)
2024-05-06 08:24:38 -07:00
David Ansari 1ecaaadb32
Merge pull request #10964 from rabbitmq/amqp-parser
4.x: change AMQP on disk message format & speed up the AMQP parser
2024-05-03 14:48:23 +02:00
Rin Kuryloski ea4e301510 Avoid leaking nodes in rabbitmq_mqtt shared_SUITE
Test nodes were not torn down in the inner group mnesia_store
2024-05-02 12:26:27 +02:00
David Ansari 9f42e40346 Fix crash when old node receives new AMQP message
Similar to how we convert from mc_amqp to mc_amqpl before
sending to a classic queue or quorum queue process if
feature flag message_containers_store_amqp_v1 is disabled,
we also need to do the same conversion before sending to an MQTT QoS 0
queue on the old node.
2024-05-02 07:56:00 +00:00
David Ansari 6225dc9928 Do not parse entire AMQP body
Prior to this commit the entire amqp-value or amqp-sequence sections
were parsed when converting a message from mc_amqp.
Parsing the entire amqp-value or amqp-sequence section can generate a
huge amount of garbage depending on how large these sections are.

Given that other protocol cannot make use of amqp-value and
amqp-sequence sections anyway, leave them AMQP encoded when converting
from mc_amqp.

In fact prior to this commit, the entire body section was parsed
generating huge amounts of garbage just to subsequently encode it again
in mc_amqpl or mc_mqtt.

The new conversion interface from mc_amqp to other mc_* modules will
either output amqp-data sections or the encoded amqp-value /
amqp-sequence sections.
2024-05-02 07:56:00 +00:00
David Ansari eac469a8a2 Change AMQP header durable to true by default
AMQP 3.2.1 defines durable=false to be the default.

However, the same section also mentions:
> If the header section is omitted the receiver MUST assume the appropriate
> default values (or the meaning implied by no value being set) for the
> fields within the header unless other target or node specific defaults
> have otherwise been set.

We want RabbitMQ to be secure by default, hence in RabbitMQ we set
durable=true to be the default.
2024-05-02 07:56:00 +00:00
David Ansari 81709d9745 Fix MQTT QoS
This commit fixes test
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed -t- \
    --test_sharding_strategy=disabled --test_env \
    FOCUS="-group [mqtt,v3,cluster_size_3] -case pubsub"
```

Fix some mixed version tests

Assume the AMQP body, especially amqp-value section won't be parsed.
Hence, omit smart conversions from AMQP to MQTT involving the
Payload-Format-Indicator bit.

Fix test

Fix
```
bazel test //deps/amqp10_client:system_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [rabbitmq]
```
2024-05-02 07:56:00 +00:00
David Ansari fc7f458f7c Fix tests 2024-05-02 07:56:00 +00:00
David Ansari 312d2af806 Fix AMQP -> MQTT durable conversion 2024-05-02 07:55:59 +00:00
Rin Kuryloski 9ed6155c0d Add elixir to PLT_APPS for some plugins 2024-04-29 15:23:09 +02:00
Rin Kuryloski 6a9d668def Set PLT_APPS in a number of plugins where it was missing 2024-04-29 14:54:28 +02:00
David Ansari e576dd766a Set durable annotation for MQTT messages
This is a follow up to https://github.com/rabbitmq/rabbitmq-server/pull/11012

 ## What?
For incoming MQTT messages, always set the `durable` message container
annotation.

 ## Why?
Even though defaulting to `durable=true` when no durable annotation is
set, as prior to this commit, is good enough, explicitly setting the
durable annotation makes the code a bit more future proof and
maintainable going forward in 4.0 where we will rely more on the durable
annotation because AMQP 1.0 message headers will be omitted in classic
and quorum queues (see https://github.com/rabbitmq/rabbitmq-server/pull/10964)

For MQTT messages, it's important to know whether the message was
published with QoS 0 or QoS 1 because it affects the QoS for the MQTT
message that will delivered to the MQTT subscriber.

The performance impact of always setting the durable annotation is
negligible.
2024-04-22 17:33:10 +02:00
David Ansari bb106ff65c Skip access check on absent will queue
Resolves https://github.com/rabbitmq/rabbitmq-server/discussions/11021

Prior to this commit, an MQTT client that connects to RabbitMQ needed
configure access to its will queue even if the will queue has never
existed. This breaks client apps connecting with either v3 or v4 or with
v5 without making use of the Will-Delay-Interval.

Specifically, in 3.13.0 and 3.13.1 an MQTT client that connects to
RabbitMQ needs unnecessarily configure access to queue
`mqtt-will-<MQTT client ID>`.

This commit only check for configure access, if the queue actually gets
deleted, i.e. if it existed.
2024-04-17 12:50:41 +02:00
David Ansari e96125bfd3 Store MQTT messages as non-durable if QoS 0
By default, when the 'durable' message container (mc) annotation is unset,
messages are interpreted to be durable.

Prior to this commit, MQTT messages that were sent with QoS 0 were
stored durably in classic queues.
This commit takes the same approach for mc_mqtt as for mc_amqpl and mc_amqp:
If the message is durable, the durable mc annotation will not be set.
If the message is non-durable, the durable mc annotation will be set to false.
2024-04-16 11:43:58 +02:00
David Ansari 71d1b3b455 Respect message_interceptors.incoming.set_header_timestamp
When feature flag message_containers is enabled, setting
```
message_interceptors.incoming.set_header_timestamp
```
wasn't respected anymore when a message is published via MQTT to a
stream and subsequently consumed via AMQP 0.9.1.

This commit ensures that AMQP 0.9.1 header timestamp_in_ms will be
set.

Note that we must not modify the AMQP 1.0 properties section when messages
are received via AMQP 1.0 and consumed via AMQP 1.0.
Also, message annoation keys not starting with "x-" are reserved.
2024-04-10 11:40:49 +02:00
dependabot[bot] 23f3ebf381
Bump com.rabbitmq:amqp-client
Bumps [com.rabbitmq:amqp-client](https://github.com/rabbitmq/rabbitmq-java-client) from 5.20.0 to 5.21.0.
- [Release notes](https://github.com/rabbitmq/rabbitmq-java-client/releases)
- [Commits](https://github.com/rabbitmq/rabbitmq-java-client/compare/v5.20.0...v5.21.0)

---
updated-dependencies:
- dependency-name: com.rabbitmq:amqp-client
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-04-08 19:00:59 +00:00
David Ansari 1b75baddd9 Add AMQP 1.0 -> MQTT 5.0 -> AMQP 1.0 test 2024-04-05 15:09:21 +02:00
David Ansari 390d5715a0 Introduce new AMQP 1.0 address format
## What?
Introduce a new address format (let's call it v2) for AMQP 1.0 source and target addresses.

The old format (let's call it v1) is described in
https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing

The only v2 source address format is:
```
/queue/:queue
```

The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```

 ## Why?

The AMQP address v1 format comes with the following flaws:

1. Obscure address format:

Without reading the documentation, the differences for example between source addresses
```
/amq/queue/:queue
/queue/:queue
:queue
```
are unknown to users. Hence, the address format is obscure.

2. Implicit creation of topologies

Some address formats implicitly create queues (and bindings), such as source address
```
/exchange/:exchange/:binding-key
```
or target address
```
/queue/:queue
```
These queues and bindings are never deleted (by the AMQP 1.0 plugin.)
Implicit creation of such topologies is also obscure.

3. Redundant address formats

```
/queue/:queue
:queue
```
have the same meaning and are therefore redundant.

4. Properties section must be parsed to determine whether a routing key is present

Target address
```
/exchange/:exchange
```
requires RabbitMQ to parse the properties section in order to check whether the message `subject` is set.
If `subject` is not set, the routing key will default to the empty string.

5. Using `subject` as routing key misuses the purpose of this field.

According to the AMQP spec, the message `subject` field's purpose is:
> A common field for summary information about the message content and purpose.

6. Exchange names, queue names and routing keys must not contain the "/" (slash) character.

The current 3.13 implemenation splits by "/" disallowing these
characters in exchange, and queue names, and routing keys which is
unnecessary prohibitive.

7. Clients must create a separate link per target exchange

While this is reasonable working assumption, there might be rare use
cases where it could make sense to create many exchanges (e.g. 1
exchange per queue, see
https://github.com/rabbitmq/rabbitmq-server/discussions/10708) and have
a single application publish to all these exchanges.
With the v1 address format, for an application to send to 500 different
exchanges, it needs to create 500 links.

Due to these disadvantages and thanks to #10559 which allows clients to explicitly create topologies,
we can create a simpler, clearer, and better v2 address format.

 ## How?

 ### Design goals

Following the 7 cons from v1, the design goals for v2 are:
1. The address format should be simple so that users have a chance to
   understand the meaning of the address without necessarily consulting the docs.
2. The address format should not implicitly create queues, bindings, or exchanges.
   Instead, topologies should be created either explicitly via the new management node
   prior to link attachment (see #10559), or in future, we might support the `dynamic`
   source or target properties so that RabbitMQ creates queues dynamically.
3. No redundant address formats.
4. The target address format should explicitly state whether the routing key is present, empty,
   or will be provided dynamically in each message.
5. `Subject` should not be used as routing key. Instead, a better
   fitting field should be used.
6. Exchange names, queue names, and routing keys should allow to contain
   valid UTF-8 encoded data including the "/" character.
7. Allow both target exchange and routing key to by dynamically provided within each message.

Furthermore
8. v2 must co-exist with v1 for at least some time. Applications should be able to upgrade to
   RabbitMQ 4.0 while continuing to use v1. Examples include AMQP 1.0 shovels and plugins communicating
   between a 4.0 and a 3.13 cluster. Starting with 4.1, we should change the AMQP 1.0 shovel and plugin clients
   to use only the new v2 address format. This will allow AMQP 1.0 and plugins to communicate between a 4.1 and 4.2 cluster.
   We will deprecate v1 in 4.0 and remove support for v1 in a later 4.x version.

 ### Additional Context

The address is usually a String, but can be of any type.

The [AMQP Addressing extension](https://docs.oasis-open.org/amqp/addressing/v1.0/addressing-v1.0.html)
suggests that addresses are URIs and are therefore hierarchical and could even contain query parameters:
> An AMQP address is a URI reference as defined by RFC3986.

> the path expression is a sequence of identifier segments that reflects a path through an
> implementation specific relationship graph of AMQP nodes and their termini.
> The path expression MUST resolve to a node’s terminus in an AMQP container.

The [Using the AMQP Anonymous Terminus for Message Routing Version 1.0](https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html)
extension allows for the target being `null` and the `To` property to contain the node address.
This corresponds to AMQP 0.9.1 where clients can send each message on the same channel to a different `{exchange, routing-key}` destination.

The following v2 address formats will be used.

 ### v2 addresses

A new deprecated feature flag `amqp_address_v1` will be introduced in 4.0 which is permitted by default.
Starting with 4.1, we should change the AMQP 1.0 shovel and plugin AMQP 1.0 clients to use only the new v2 address format.
However, 4.1 server code must still understand the 4.0 AMQP 1.0 shovel and plugin AMQP 1.0 clients’ v1 address format.
The new deprecated feature flag will therefore be denied by default in 4.2.
This allows AMQP 1.0 shovels and plugins to work between
* 4.0 and 3.13 clusters using v1
* 4.1 and 4.0 clusters using v2 from 4.1 to v4.0 and v1 from 4.0 to 4.1
* 4.2 and 4.1 clusters using v2

without having to support both v1 and v2 at the same time in the AMQP 1.0 shovel and plugin clients.
While supporting both v1 and v2 in these clients is feasible, it's simpler to switch the client code directly from v1 to v2.

 ### v2 source addresses

The source address format is
```
/queue/:queue
```
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.

 ### v2 target addresses

v1 requires attaching a new link for each destination exchange.
v2 will allow dynamic `{exchange, routing-key}` combinations for a given link.
v2 therefore allows for the rare use cases where a single AMQP 1.0 publisher app needs to send to many different exchanges.
Setting up a link per destination exchange could be cumbersome.
Hence, v2 will support the dynamic `{exchange, routing-key}` combinations of AMQP 0.9.1.
To achieve this, we make use of the "Anonymous Terminus for Message Routing" extension:
The target address will contain the AMQP value null.
The `To` field in each message must be set and contain either address format
```
/exchange/:exchange/key/:routing-key
```
or
```
/exchange/:exchange
```
when using the empty routing key.

The `to` field requires an address type and is better suited than the `subject field.

Note that each message will contain this `To` value for the anonymous terminus.
Hence, we should save some bytes being sent across the network and stored on disk.
Using a format
```
/e/:exchange/k/:routing-key
```
saves more bytes, but is too obscure.
However, we use only `/key/` instead of `/routing-key/` so save a few bytes.
This also simplifies the format because users don’t have to remember whether to use spell `routing-key` or `routing_key` or `routingkey`.

The other allowed target address formats are:
```
/exchange/:exchange/key/:routing-key
```
where exchange and routing key are static on the given link.

```
/exchange/:exchange
```
where exchange and routing key are static on the given link, and routing key will be the empty string (useful for example for the fanout exchange).

```
/queue/:queue
```
This provides RabbitMQ beginners the illusion of sending a message directly
to a queue without having to understand what exchanges and routing keys are.
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.
Besides the additional queue existence check, this queue target is different from
```
/exchange//key/:queue
```
in that queue specific optimisations might be done (in future) by RabbitMQ
(for example different receiving queue types could grant different amounts of link credits to the sending clients).
A write permission check to the amq.default exchange will be performed nevertheless.

v2 will prohibit the v1 static link & dynamic routing-key combination
where the routing key is sent in the message `subject` as that’s also obscure.
For this use case, v2’s new anonymous terminus can be used where both exchange and routing key are defined in the message’s `To` field.

(The bare message must not be modified because it could be signed.)

The alias format
```
/topic/:topic
```
will also be removed.
Sending to topic exchanges is arguably an advanced feature.
Users can directly use the format
```
/exchange/amq.topic/key/:topic
```
which reduces the number of redundant address formats.

 ### v2 address format reference

To sump up (and as stated at the top of this commit message):

The only v2 source address format is:
```
/queue/:queue
```

The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```

Hence, all 8 listed design goals are reached.
2024-04-05 12:22:02 +02:00
David Ansari dda1c500da Require feature flag message_containers
as it is required for Native AMQP 1.0 in 4.0.

Remove compatibility code.
2024-04-04 15:11:31 +02:00
David Ansari 4b9574571b Move macro to the correct Erlang app
The Web MQTT link is not used in the rabbitmq_mqtt Erlang app.
This link is only used in the rabbitmq_web_mqtt Erlang app.
Hence, move the link to the correct Erlang app.
2024-03-21 16:29:11 +01:00
David Ansari 8233db0703 Fix wrong test assumptions
PR #10761 added a new CLI command to list Web MQTT connections.
That new CLI command relies on feature flag delete_ra_cluster_mqtt_node
being enabled.

This commit ensures exactly this condition.
2024-03-21 16:24:57 +01:00
Jean-Sébastien Pédron affcb6aba5
Revert "Merge pull request #10772 from rabbitmq/dependabot/maven/deps/rabbitmq_mqtt/test/java_SUITE_data/main/org.apache.maven.plugins-maven-compiler-plugin-3.13.0"
This reverts commit d8505d6f43, reversing
changes made to d96b127a3b.
2024-03-19 16:05:53 +01:00
Michael Klishin f7697c3d19
Merge pull request #10761 from rabbitmq/cloudamqp-fix/9302-list-webmqtt-connections
A new command for Web MQTT connection listing #10693 #9302
2024-03-18 20:20:14 -04:00
dependabot[bot] ff4edf8d9f
Bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.12.1 to 3.13.0.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.12.1...maven-compiler-plugin-3.13.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-03-18 18:12:22 +00:00
Rin Kuryloski 5271c26908 Mixed version test fix 2024-03-18 10:34:34 +01:00
Lois Soto Lopez f7de9ba0b4 Remove unnecessary newline
Discussion #9302
2024-03-16 13:08:30 -04:00
LoisSotoLopez 481cac6430 Fix/9302 test list Web MQTT connections command
Discussion #9302
2024-03-16 13:08:30 -04:00
Lois Soto Lopez befb3b3015 Prevent not_found on list web mqtt connections
Fixes #9302
2024-03-16 13:08:30 -04:00
Michael Klishin eb261acd30
CLI: update guide URLs to use the new path structure
the original paths, e.g. /streams.html, do have redirects
in place but it turned out to be a surprisingly fragile
Cloudflare feature when there are hundreds of them,
so we better switch now.
2024-03-07 15:53:14 -05:00
David Ansari 8cb313d5a1 Support AMQP 1.0 natively
## What

Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.

  ## Why

Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
   scalability, and resource usage for AMQP 1.0.
   See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
   See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
   this commit allows implementing more AMQP 1.0 features in the future.
   Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.

Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
   New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
   consumers as we currently recommended for AMQP 0.9.1. which potentially
   halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
   slow target queue won't block the entire connection.
   Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
   In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
   possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol

This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.

This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.

 ## Implementation details

1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.

2. Remove rabbit_queue_collector

rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.

3. 7 processes per connection + 1 process per session in this commit instead of
   7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.

4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
  a connection write heavily at the same time.

This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
  can accumulate across all sessions bytes before flushing the socket.

In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.

5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.

How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.

6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.

7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
  settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
  and unsettled TRANSFERs.

8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.

How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
  new credit_reply queue event:
  * `available` after the queue sends any deliveries
  * `link-credit` after the queue sends any deliveries
  * `drain` which allows us to combine the old queue events
    send_credit_reply and send_drained into a single new queue event
    credit_reply.
* Include the consumer tag into the credit_reply queue event such that
  the AMQP 1.0 session process can process any credit replies
  asynchronously.

Link flow control state `delivery-count` also moves to the queue processes.

The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.

9. Use serial number arithmetic in quorum queues and session process.

10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.

11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.

12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).

13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.

14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2

15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.

16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.

17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.

18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.

19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.

20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.

21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.

22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.

If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.

23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.

24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.

25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.

26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.

In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.

27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries

28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').

29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client

30. Fix AMQP client to do serial number arithmetic.

31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.

32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.

The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597

The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.

Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.

33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.

34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.

Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.

Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).

Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.

This approach is safe and simple.

The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:

i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.

ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?

35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.

36. Remove credit extension from AMQP 0.9.1 client

37. Support maintenance mode closing AMQP 1.0 connections.

38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.

39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
    The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
    tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```

40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```

 ## Benchmarks

 ### Throughput & Latency

Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1

Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```

Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.

Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```

1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s

Latencies by percentile:

          0% ........ 0 ms       90.00% ........ 9 ms
         25% ........ 2 ms       99.00% ....... 14 ms
         50% ........ 4 ms       99.90% ....... 17 ms
        100% ....... 26 ms       99.99% ....... 24 ms
```

RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        6     73.6       2.1          3,217       1,607        0      8.0       511
     4.1        163,580      16,367        2     74.1       4.1          3,217           0        0      8.0         0
     6.1        229,114      32,767        3     74.1       6.1          3,217           0        0      8.0         0
     8.1        261,880      16,367        2     74.1       8.1         67,874      32,296        8      8.2     7,662
    10.1        294,646      16,367        2     74.1      10.1         67,874           0        0      8.2         0
    12.1        360,180      32,734        3     74.1      12.1         67,874           0        0      8.2         0
    14.1        392,946      16,367        3     74.1      14.1         68,604         365        0      8.2    12,147
    16.1        458,480      32,734        3     74.1      16.1         68,604           0        0      8.2         0
    18.1        491,246      16,367        2     74.1      18.1         68,604           0        0      8.2         0
    20.1        556,780      32,767        4     74.1      20.1         68,604           0        0      8.2         0
    22.1        589,546      16,375        2     74.1      22.1         68,604           0        0      8.2         0
receiver timed out
    24.1        622,312      16,367        2     74.1      24.1         68,604           0        0      8.2         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```

2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s

Latencies by percentile:

          0% ....... 11 ms       90.00% ....... 23 ms
         25% ....... 15 ms       99.00% ....... 28 ms
         50% ....... 18 ms       99.90% ....... 33 ms
        100% ....... 49 ms       99.99% ....... 47 ms
```

RabbitMQ 3.x:
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        9     69.9       2.1         18,430       9,206        5      7.6     1,221
     4.1        163,580      16,375        5     70.2       4.1         18,867         218        0      7.6     2,168
     6.1        229,114      32,767        6     70.2       6.1         18,867           0        0      7.6         0
     8.1        294,648      32,734        7     70.2       8.1         18,867           0        0      7.6         0
    10.1        360,182      32,734        6     70.2      10.1         18,867           0        0      7.6         0
    12.1        425,716      32,767        6     70.2      12.1         18,867           0        0      7.6         0
receiver timed out
    14.1        458,482      16,367        5     70.2      14.1         18,867           0        0      7.6         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```

3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```

RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```

 ### Memory usage

Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```

```
/bin/cat rabbitmq.conf

tcp_listen_options.sndbuf  = 2048
tcp_listen_options.recbuf  = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```

Create 50k connections with 2 sessions per connection, i.e. 100k session in total:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 50000; i++ {
		conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("dialing AMQP server:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}
```

This commit:
```
erlang:memory().
[{total,4586376480},
 {processes,4025898504},
 {processes_used,4025871040},
 {system,560477976},
 {atom,1048841},
 {atom_used,1042841},
 {binary,233228608},
 {code,21449982},
 {ets,108560464}]

erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs

RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
 {processes,14044779256},
 {processes_used,14044755120},
 {system,1123453448},
 {atom,1057033},
 {atom_used,1052587},
 {binary,236381264},
 {code,21790238},
 {ets,391423744}]

erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs

50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB

 ## Future work

1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2024-02-28 14:15:20 +01:00
David Ansari 8b151f43f7 Parse 2 bytes encoded AMQP boolean to Erlang boolean
An AMQP boolean can by encoded using 1 byte or 2 bytes:
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-boolean

Prior to this commit, our Erlang parser returned:
* Erlang terms `true` or `false` for the 1 byte AMQP encoding
* Erlang terms `{boolean, true}` or `{boolean, false}` for the 2 byte AMQP enconding

Having a serializer and parser that perform the opposite actions such
that
```
Term = parse(serialize(Term))
```
is desirable as it provides a symmetric property useful not only for
property based testing, but also for avoiding altering message hashes
when serializing and parsing the same term.

However, dealing wth `{boolean, boolean()}` tuples instead of `boolean()` is very unhandy since
all Erlang code must take care of both forms leading to subtle bugs as
occurred in:
* 4cbeab8974/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl (L155-L158)
* b8173c9d3b/deps/rabbitmq_mqtt/src/mc_mqtt.erl (L83-L88)
* b8173c9d3b/deps/rabbit/src/mc_amqpl.erl (L123-L127)

Therefore, this commits decides to take the safe approach and always
parse to an Erlang `boolean()` independent of whether the AMQP boolean
was encoded with 1 or 2 bytes.
2024-02-16 17:57:28 +01:00
dependabot[bot] 8b4862972e
build(deps-dev): bump org.junit.jupiter:junit-jupiter
Bumps [org.junit.jupiter:junit-jupiter](https://github.com/junit-team/junit5) from 5.10.1 to 5.10.2.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.10.1...r5.10.2)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-02-05 18:29:48 +00:00
dependabot[bot] 9f24f9348c
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.2 to 3.25.3.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.2...assertj-build-3.25.3)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-02-05 18:28:08 +00:00
Michael Klishin 9c79ad8d55 More missed license header updates #9969 2024-02-05 12:26:25 -05:00
Michael Klishin f414c2d512
More missed license header updates #9969 2024-02-05 11:53:50 -05:00
David Ansari bedcae18c2 Fix MQTT test flake
Prior to this commit test block_connack_timeout
flaked when 2 new ports got created instead of only 1
in line
```
[NewPort] = Ports -- Ports0,
```

This commit filters for tcp_inet ports.
This will always return the port of the new MQTT connection.
2024-01-27 17:43:38 +01:00
dependabot[bot] 2b83b000f7
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.1 to 3.25.2.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.1...assertj-build-3.25.2)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-24 19:01:02 +00:00
Arnaud Cogoluègnes 1f89ede396
Remove rabbit_authz_backend:state_can_expire/0
Use expiry_timestamp/1 instead, which returns 'never'
if the credentials do not expire.

Fixes #10382
2024-01-24 09:58:59 +01:00
dependabot[bot] 4b5ef6474f
build(deps): bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.42.0 to 2.43.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.42.0...lib/2.43.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-23 18:56:15 +00:00
Michael Klishin 7b151a7651 More missed (c) header updates 2024-01-22 23:44:47 -05:00
Michael Klishin c1d37e3e02
Merge pull request #10364 from rabbitmq/flaky-mc-flake-flake
Reduce flakiness of certain Common Test suites
2024-01-22 16:22:24 -05:00
Karl Nilsson c10b4dc0f0 protocol_interop_SUITE - try a durable queue for amqp part 2024-01-22 15:27:30 +00:00
Michael Klishin fb775afcc8
More (c) source header updates #9969 2024-01-19 19:53:28 -05:00
Karl Nilsson 7e2f148dd4 Try a little sleep in an mqtt test
It could help, we'll see.
2024-01-18 16:06:36 +00:00
David Ansari 6a3ba6210a
Reduce per message disk overhead (#10339)
* Reduce per message disk overhead

Message container annotation keys are stored on disk.
By shortening them we save 95 - 58 = 37 bytes per message.
```
1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})).
95
2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})).
58
```
This should somewhat reduce disk I/O and disk space.

* Ensure durable is a boolean

Prevent key 'durable' with value 'undefined' being added to the
mc annotations, for example when the durable field was not set, but
another AMQP 1.0 header field was set.

* Apply feedback
2024-01-18 11:53:02 +01:00
dependabot[bot] e84d2d9cf6
build(deps): bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.41.1 to 2.42.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/maven/2.41.1...lib/2.42.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-15 18:17:40 +00:00
dependabot[bot] 12b6225387
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.3 to 3.2.5.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.3...surefire-3.2.5)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-10 18:59:58 +00:00
dependabot[bot] 0e22221efc
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.0 to 3.25.1.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.0...assertj-build-3.25.1)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-03 18:37:55 +00:00
Michael Klishin 01092ff31f
(c) year bumps 2024-01-01 22:02:20 -05:00
dependabot[bot] 506dccb172
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.24.2 to 3.25.0.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.24.2...assertj-build-3.25.0)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-01 18:59:45 +00:00
David Ansari 78b4fcc899 Allow MQTT QoS 0 subscribers to reconnect
The solution in #10203 has the following issues:
1. Bindings can be left ofter in Mnesia table rabbit_durable_queue.
One solution to 1. would be to first delete the old queue via
`rabbit_amqqueue:internal_delete(Q, User, missing_owner)`
and subsequently declare the new queue via
`rabbit_amqqueue:internal_declare(Q, false)`
However, even then, it suffers from:
2. Race conditions between `rabbit_amqqueue:on_node_down/1`
and `rabbit_mqtt_qos0_queue:declare/2`:
`rabbit_amqqueue:on_node_down/1` could first read the queue records that
need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could
re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1`
could subsequently delete the re-created queue.

Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete
transient queues in one isolated transaction. Instead it first reads
queues and subsequenlty deletes queues in batches making it prone to
race conditions.

Ideally, this commit deletes all rabbit_mqtt_qos0_queue queues of the
node that has crashed including their bindings.
However, doing so in one transaction is risky as there may be millions
of such queues and the current code path applies the same logic on all
live nodes resulting in conflicting transactions and therefore a long
database operation.

Hence, this commit uses the simplest approach which should still be
safe:
Do not remove rabbit_mqtt_qos0_queue queues if a node crashes.
Other live nodes will continue to route to these dead queues.
That should be okay, given that the rabbit_mqtt_qos0_queue clients auto
confirm.
Continuing routing however has the effect of counting as routing result
for AMQP 0.9.1 `mandatory` property.
If an MQTT client re-connects to a live node with the same client ID,
the new node will delete and then re-create the queue.
Once the crashed node comes back online, it will clean up its leftover
queues and bindings.
2023-12-27 20:47:06 -05:00
dependabot[bot] 67b8d30281
build(deps): bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.12.0 to 3.12.1.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.12.0...maven-compiler-plugin-3.12.1)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-25 18:09:25 +00:00
David Ansari 9487189dc6 Overwrite rabbit_mqtt_qos0_queue record from crashed node
When a node is shut down cleanly, the rabbit_mqtt_qos0_queue record is
removed from Mnesia.
When a node crashes and subsequently reboots the new node incarnation
removes the old rabbit_mqtt_qos0_queue record from Mnesia (via
rabbit_mqtt_qos0_queue:recover/2)

However, when a node crashes, the rabbit_mqtt_qos0_queue will be removed
from Mnesia table rabbit_queue, but will still be present in table
rabbit_durable_queue on the other live nodes.
Prior to this commit, when the same MQTT client (i.e. same MQTT client
ID) re-connects from the crashed node to another live node and
re-subscribes, the following error occurred:
```
[info] <0.43155.0> Accepted MQTT connection 10.105.0.18:60508 -> 10.105.0.10:1883 for client ID nodered_24e214feb018a232
[debug] <0.43155.0> Received a SUBSCRIBE for topic(s) [{mqtt_topic,
[debug] <0.43155.0>                                        <<"as923/gateway/+/command/#">>,0}]
[error] <0.43155.0> Failed to declare queue 'mqtt-subscription-nodered_24e214feb018a232qos0' in vhost '/': {absent,
[error] <0.43155.0>                                                                                         {amqqueue,
[error] <0.43155.0>                                                                                          {resource,
[error] <0.43155.0>                                                                                           <<"/">>,
[error] <0.43155.0>                                                                                           queue,
[error] <0.43155.0>                                                                                           <<"mqtt-subscription-nodered_24e214feb018a232qos0">>},
[error] <0.43155.0>                                                                                          true,
[error] <0.43155.0>                                                                                          false,
[error] <0.43155.0>                                                                                          <15486.32690.0>,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          <15486.32690.0>,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          [{vhost,
[error] <0.43155.0>                                                                                            <<"/">>},
[error] <0.43155.0>                                                                                           {name,
[error] <0.43155.0>                                                                                            <<"ha-all-mqtt">>},
[error] <0.43155.0>                                                                                           {pattern,
[error] <0.43155.0>                                                                                            <<"^mqtt-">>},
[error] <0.43155.0>                                                                                           {'apply-to',
[error] <0.43155.0>                                                                                            <<"all">>},
[error] <0.43155.0>                                                                                           {definition,
[error] <0.43155.0>                                                                                            [{<<"ha-mode">>,
[error] <0.43155.0>                                                                                              <<"all">>}]},
[error] <0.43155.0>                                                                                           {priority,
[error] <0.43155.0>                                                                                            0}],
[error] <0.43155.0>                                                                                          undefined,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          undefined,
[error] <0.43155.0>                                                                                          live,
[error] <0.43155.0>                                                                                          0,
[error] <0.43155.0>                                                                                          [],
[error] <0.43155.0>                                                                                          <<"/">>,
[error] <0.43155.0>                                                                                          #{user =>
[error] <0.43155.0>                                                                                             <<"iottester">>},
[error] <0.43155.0>                                                                                          rabbit_mqtt_qos0_queue,
[error] <0.43155.0>                                                                                          #{}},
[error] <0.43155.0>                                                                                         nodedown}
[error] <0.43155.0> MQTT protocol error on connection 10.105.0.18:60508 -> 10.105.0.10:1883: subscribe_error
```

This commit fixes this error allowing an MQTT client that connects with CleanSession=true and
subscribes with QoS 0 to re-connect and re-subscribe to another live
node if the original Rabbit node crashes.

Reported in https://groups.google.com/g/rabbitmq-users/c/pxgy0QiwilM/m/LkJQ-3DyBgAJ
2023-12-21 17:30:15 +01:00
dependabot[bot] 0f5c2d2325
build(deps): bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.11.0 to 3.12.0.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.11.0...maven-compiler-plugin-3.12.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-19 18:09:22 +00:00
David Ansari abcbe5e647 Fix test expectation
as `main` and `v3.12.x` branches are currently red due to
https://github.com/rabbitmq/rabbitmq-server/pull/10139
2023-12-15 09:35:54 +01:00
David Ansari f44c851293 Fix crash when closing connection
Avoid the following crash
```
** Reason for termination ==
** {mqtt_unexpected_cast,{shutdown,"Closed via management plugin"}}

  crasher:
    initial call: rabbit_mqtt_reader:init/1
    pid: <0.1096.0>
    registered_name: []
    exception exit: {mqtt_unexpected_cast,
                        {shutdown,"Closed via management plugin"}}
      in function  gen_server:handle_common_reply/8 (gen_server.erl, line 1208)
```
when closing MQTT or Stream connections via HTTP API endpoint
```
/connections/username/:username
```
2023-12-14 12:35:51 +01:00
dependabot[bot] b3f33aa42c
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.2 to 3.2.3.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.2...surefire-3.2.3)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-13 18:51:16 +00:00
dependabot[bot] aa0095aae9
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.41.0 to 2.41.1.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.41.0...maven/2.41.1)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-04 18:13:42 +00:00
dependabot[bot] fb90276710
Bump ch.qos.logback:logback-classic
Bumps [ch.qos.logback:logback-classic](https://github.com/qos-ch/logback) from 1.2.12 to 1.2.13.
- [Commits](https://github.com/qos-ch/logback/compare/v_1.2.12...v_1.2.13)

---
updated-dependencies:
- dependency-name: ch.qos.logback:logback-classic
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-01 18:30:15 +00:00
dependabot[bot] c69de4a083
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.40.0 to 2.41.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.40.0...lib/2.41.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-27 18:19:07 +00:00
Karl Nilsson f67058ac6c
Merge pull request #9830 from rabbitmq/mc-refinements
Message container conversion improvements
2023-11-24 10:22:34 +00:00
Karl Nilsson 61f13d0bb7 Add UUID conversion to mc_mqtt 2023-11-23 17:36:34 +00:00
Michael Klishin 1b642353ca
Update (c) according to [1]
1. https://investors.broadcom.com/news-releases/news-release-details/broadcom-and-vmware-intend-close-transaction-november-22-2023
2023-11-21 23:18:22 -05:00
David Ansari 95c5f2ec9e Some small fixes 2023-11-16 12:33:17 +01:00
Karl Nilsson c4fd947aad MC: various changes and improvements
To refine conversion behaviour add additional tests
and ensure it matches the documentation.

mc: optionally capture source environment

And pass target environment to mc:convert

This allows environmental data and configuration to be captured and
used to modify and complete conversion logic whilst allowing conversion
code to remain pure and portable.
2023-11-15 11:04:49 +00:00
Michael Klishin e52772057c
Merge pull request #9874 from rabbitmq/opt-mgmt-queue-listings
Optimise HTTP API /queues endpoint
2023-11-07 14:30:42 -05:00
dependabot[bot] 37acb08feb
Bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.1 to 3.2.2.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.1...surefire-3.2.2)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-07 18:25:28 +00:00
David Ansari 5b18e9d982 Avoid warning of unhandled message
Prior to this commit:
1. Start RabbitMQ with MQTT plugin enabled.
2.
```
rabbitmq-diagnostics consume_event_stream
^C
```
3. The logs will print the following warning:
```
[warning] <0.570.0> ** Undefined handle_info in rabbit_mqtt_internal_event_handler
[warning] <0.570.0> ** Unhandled message: {'DOWN',#Ref<0.2410135134.1846280193.145044>,process,
[warning] <0.570.0>                               <52723.100.0>,noconnection}
[warning] <0.570.0>
```

This is because rabbit_event_consumer:init/1 monitors the CLI process.

Any rabbit_event handler should therefore implement handle_info/2.

It's similar to what's described in the gen_event docs about
add_sup_handler/3:
> Any event handler attached to an event manager which in turn has a
> supervised handler should expect callbacks of the shape
> Module:handle_info({'EXIT', Pid, Reason}, State).
2023-11-07 12:33:02 +01:00
dependabot[bot] 0664b7ec7b
Bump org.junit.jupiter:junit-jupiter
Bumps [org.junit.jupiter:junit-jupiter](https://github.com/junit-team/junit5) from 5.10.0 to 5.10.1.
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.10.0...r5.10.1)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-06 18:46:05 +00:00
Karl Nilsson c2cd60b18d Optimise mgmt HTTP API /queues endpoint
Listing queues with the HTTP API when there are many (1000s) of
quorum queues could be excessively slow compared to the same scenario
with classic queues.

This optimises various aspects of HTTP API queue listings.
For QQs it removes the expensive cluster wide rpcs used to get the
"online" status of each quorum queue. This was previously done _before_
paging and thus would perform a cluster-wide query for _each_ quorum queue in
the vhost/system. This accounted for most of the slowness compared to
classic queues.

Secondly the query to separate the running from the down queues
consisted of two separate queries that later were combined when a single
query would have sufficed.

This commit also includes a variety of other improvements and minor
fixes discovered during testing and optimisation.

MINOR BREAKING CHANGE: quorum queues would previously only display one
of two states: running or down. Now there is a new state called minority
which is emitted when the queue has at least one member running but
cannot commit entries due to lack of quorum.

Also the quorum queue may transiently enter the down state when a node
goes down and before its elected a new leader.
2023-11-06 15:34:26 +00:00
David Ansari d1940c997e Fix Khepri flake
Previously, test pubsub was flaky
```
{shared_SUITE,pubsub,766}
{test_case_failed,missing m1}
```
because the binding wasn't present yet on node 0 when publishing to node
0.
2023-11-03 09:52:53 +01:00
David Ansari 75e777b833 Provide more debug output for flaky test
In
https://github.com/rabbitmq/rabbitmq-server/actions/runs/6735035912/job/18308551274?pr=9865
test
```
bazel test //deps/rabbitmq_mqtt:v5_SUITE -t- --test_sharding_strategy=disabled \
    --test_env FOCUS="-group [mqtt,cluster_size_1] -case will_delay_equals_session_expiry" \
    --test_env RABBITMQ_METADATA_STORE=mnesia --config=rbe-26 --runs_per_test=100
```
seems to flake.
2023-11-02 18:40:38 +01:00
Iliia Khaprov c577e04b73 Remove POODLE check, we are in the future 2023-11-01 10:53:27 +01:00
David Ansari a5f4317c3f Fix test flake session_takeover_v3_v5
Prior to this commit the follwing test was flaky:
```
bazel test //deps/rabbitmq_mqtt:v5_SUITE -t- --test_sharding_strategy=disabled \
    --test_env FOCUS="-group [mqtt,cluster_size_3] -case session_takeover_v3_v5" \
    --test_env RABBITMQ_METADATA_STORE=khepri --config=rbe-26 --runs_per_test=20
```
because rabbit_misc:maps_any/2 filtered out a destination queue after routing if
that destination queue wasn't associated with any matched binding key.

This commit makes the test green.

However, the root cause of this issue isn't solved:
MQTT 5.0 requires the topic exchange to return matched binding keys for
destination queues such that feature NoLocal, and Subscription
Identifiers work correctly.

The current MQTT plugin relies on session state to be stored
consistently in the database. When a new client connects, the session
state is retrieved from the database and appropriate actions are taken:
e.g. consume from a queue, modify binding arguments, etc.

With Mnesia this consistency was guaranteed thanks to sync transactions
when updating queues and bindings.

Khepri has only eventual consistency semantics. This is problematic for the
MQTT plugin in the session_takeover_v3_v5 test scenario:

1. Client subscribes on node 1 (with v3). Node 1 returns subscription
   success to client.
2. **Thereafter**, another client with the same MQTT client ID connects
   to node 0 (with v5). "Proper" session takeover should take place.
   However due to eventual consistency, the subscription / binding isn't
   present yet on node 0. Therefore the session upgrade from v3 to v5
   does not take place and leads to binding keys being absent when
   messages are routed to the session's queue.
2023-10-30 09:39:49 +01:00
David Ansari 43dfa4d6ac Fix MQTT test flakes
Tests session_reconnect and session_takeover were flaky, specifically
when run under Khepri.

The issue was in the test itself that the connect properties didn't
apply. Therefore, prior to this commit an exclusive queue got created.
2023-10-27 17:30:41 +02:00
David Ansari 7cd91a8fd2 Do not skip test
On `main` branch and v3.12.6 feature flag delete_ra_cluster_mqtt_node is
supported. Instead of skipping the entire test if that feature flag is
not enabled, enable the feature flag and run the test.

More generally:
"Instead of verifying if a feature flag is enabled or not, it's best to enable
it and react from the return value (success or failure).
Mixed version testing always turn off all feature flags by default.
So in the future, even though all nodes supports the mentionned
feature flag, the testcase will still be skipped." [JSP]
2023-10-27 16:49:13 +02:00
David Ansari c8b90488e7 Skip some assertions in mixed version tests
See commit message 00c77e0a1a for details.

In a multi node mixed version cluster where the lower version is
compiled with a different OTP version, anonymous Ra leader queries will
fail with a badfun error if initiated on the higher version and executed
on the leader on the lower version node.
2023-10-27 15:12:51 +02:00
David Ansari cad067f5fa Remove erlang:port_command/2 hack
as selective receives are efficient in OTP 26:
```
OTP-18431
Application(s):
compiler, stdlib
Related Id(s):
PR-6739
Improved the selective receive optimization, which can now be enabled for references returned from other functions.
This greatly improves the performance of gen_server:send_request/3, gen_server:wait_response/2, and similar functions.
```
2023-10-27 10:37:56 +02:00
Michael Klishin 673548f343
Merge pull request #9805 from rabbitmq/skip-maintenance
Skip test maintenance in mixed version mode
2023-10-27 03:31:27 -04:00
Michael Klishin aaed0e676b
Merge pull request #9807 from rabbitmq/unskip-duplicate-client-id
Run test duplicate_client_id with Khepri
2023-10-27 03:31:13 -04:00
David Ansari c914e43097 Do not enable OTP feature at runtime
Since Erlang/OTP 26:
```
OTP-18445
Application(s):
erts, stdlib
It is no longer necessary to enable a feature in the runtime system in order to load modules that are using it.
It is sufficient to enable the feature in the compiler when compiling it.
That means that to use feature maybe_expr in Erlang/OTP 26, it is sufficient to enable it during compilation.
In Erlang/OTP 27, feature maybe_expr will be enabled by default, but it will be possible to disable it.
```
2023-10-27 09:26:31 +02:00
David Ansari 5cbeedffd9 Run test duplicate_client_id with Khepri
The only reason to skip tests with Khepri is if they use
classic mirror queues or use Mnesia directly.
2023-10-27 09:15:14 +02:00
David Ansari 00c77e0a1a Skip test maintenance in mixed version mode
This test fails when MQTT client ID tracking is performed in Ra, and the
higher version node gets compiled with a different OTP version (26) than
the lower version node (25).

The reason is described in 83eede7ef2
```
An interesting side note learned here is that the compiled file
rabbit_mqtt_collector must not be changed. This commit only modifies
function specs. However as soon as the compiled code is changed, this
module becomes a new version. The new version causes the anonymous ra query
function to fail in mixed clusters: When the old node does a
ra:leader_query where the leader is on the new node, the query function
fails on the new node with `badfun` because the new node does not have
the same module version. For more context, read:
https://web.archive.org/web/20181017104411/http://www.javalimit.com/2010/05/passing-funs-to-other-erlang-nodes.html
```

We shouldn’t use an anonymous function for ra:leader_query or ra:consistent_query.
Instead we should use the {M,F,A} form.
9e5d437a0a/src/ra.erl (L102-L103)

In MQTT the anonymous function is used in bcb95c949d/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl (L50)
This causes the query to return a bad fun error (silently ignored in bcb95c949d/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl (L70-L71) )
when executed on a different node and either:
1.) Any code in file rabbit_mqtt_collector.erl changed, or
2.) The code gets compiled with a different OTP version.

2.) is the reason for a failing mixed version test in https://github.com/rabbitmq/rabbitmq-server/pull/8553 because both higher and lower versions run OTP 26,
but the higher version node got compiled with 26 while the lower version node got compiled with 25.

The same file
compiled with OTP 26.0.1
```
1> rabbit_mqtt_collector:module_info(attributes).
[{vsn,[30045739264236496640687548892374951597]}]
```

compiled with OTP 25.3.2
```
1> rabbit_mqtt_collector:module_info(attributes).
[{vsn,[168144385419873449889532520247510637232]}]
```

Due to the very low impact that maintenance mode will not close all MQTT
client connections with feature flag delete_ra_cluster_mqtt_node being
disabled, we skip this test.
2023-10-27 08:51:00 +02:00
dependabot[bot] 1d9524ea98
Bump com.rabbitmq:amqp-client
Bumps [com.rabbitmq:amqp-client](https://github.com/rabbitmq/rabbitmq-java-client) from 5.19.0 to 5.20.0.
- [Release notes](https://github.com/rabbitmq/rabbitmq-java-client/releases)
- [Commits](https://github.com/rabbitmq/rabbitmq-java-client/compare/v5.19.0...v5.20.0)

---
updated-dependencies:
- dependency-name: com.rabbitmq:amqp-client
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-25 18:55:23 +00:00
dependabot[bot] 7f45bb1949
Bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.1.2 to 3.2.1.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.1.2...surefire-3.2.1)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-24 18:27:56 +00:00
David Ansari 3501a00632 Fix MQTT Topic Alias type spec 2023-10-05 18:11:26 +02:00
dependabot[bot] 83aa17cd7f
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.39.0 to 2.40.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.39.0...lib/2.40.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-09-29 18:58:01 +00:00
Diana Parra Corbacho 5f0981c5a3
Allow to use Khepri database to store metadata instead of Mnesia
[Why]

Mnesia is a very powerful and convenient tool for Erlang applications:
it is a persistent disc-based database, it handles replication accross
multiple Erlang nodes and it is available out-of-the-box from the
Erlang/OTP distribution. RabbitMQ relies on Mnesia to manage all its
metadata:

* virtual hosts' properties
* intenal users
* queue, exchange and binding declarations (not queues data)
* runtime parameters and policies
* ...

Unfortunately Mnesia makes it difficult to handle network partition and,
as a consequence, the merge conflicts between Erlang nodes once the
network partition is resolved. RabbitMQ provides several partition
handling strategies but they are not bullet-proof. Users still hit
situations where it is a pain to repair a cluster following a network
partition.

[How]

@kjnilsson created Ra [1], a Raft consensus library that RabbitMQ
already uses successfully to implement quorum queues and streams for
instance. Those queues do not suffer from network partitions.

We created Khepri [2], a new persistent and replicated database engine
based on Ra and we want to use it in place of Mnesia in RabbitMQ to
solve the problems with network partitions.

This patch integrates Khepri as an experimental feature. When enabled,
RabbitMQ will store all its metadata in Khepri instead of Mnesia.

This change comes with behavior changes. While Khepri remains disabled,
you should see no changes to the behavior of RabbitMQ. If there are
changes, it is a bug. After Khepri is enabled, there are significant
changes of behavior that you should be aware of.

Because it is based on the Raft consensus algorithm, when there is a
network partition, only the cluster members that are in the partition
with at least `(Number of nodes in the cluster ÷ 2) + 1` number of nodes
can "make progress". In other words, only those nodes may write to the
Khepri database and read from the database and expect a consistent
result.

For instance in a cluster of 5 RabbitMQ nodes:
* If there are two partitions, one with 3 nodes, one with 2 nodes, only
  the group of 3 nodes will be able to write to the database.
* If there are three partitions, two with 2 nodes, one with 1 node, none
  of the group can write to the database.

Because the Khepri database will be used for all kind of metadata, it
means that RabbitMQ nodes that can't write to the database will be
unable to perform some operations. A list of operations and what to
expect is documented in the associated pull request and the RabbitMQ
website.

This requirement from Raft also affects the startup of RabbitMQ nodes in
a cluster. Indeed, at least a quorum number of nodes must be started at
once to allow nodes to become ready.

To enable Khepri, you need to enable the `khepri_db` feature flag:

    rabbitmqctl enable_feature_flag khepri_db

When the `khepri_db` feature flag is enabled, the migration code
performs the following two tasks:
1. It synchronizes the Khepri cluster membership from the Mnesia
   cluster. It uses `mnesia_to_khepri:sync_cluster_membership/1` from
   the `khepri_mnesia_migration` application [3].
2. It copies data from relevant Mnesia tables to Khepri, doing some
   conversion if necessary on the way. Again, it uses
   `mnesia_to_khepri:copy_tables/4` from `khepri_mnesia_migration` to do
   it.

This can be performed on a running standalone RabbitMQ node or cluster.
Data will be migrated from Mnesia to Khepri without any service
interruption. Note that during the migration, the performance may
decrease and the memory footprint may go up.

Because this feature flag is considered experimental, it is not enabled
by default even on a brand new RabbitMQ deployment.

More about the implementation details below:

In the past months, all accesses to Mnesia were isolated in a collection
of `rabbit_db*` modules. This is where the integration of Khepri mostly
takes place: we use a function called `rabbit_khepri:handle_fallback/1`
which selects the database and perform the query or the transaction.
Here is an example from `rabbit_db_vhost`:

* Up until RabbitMQ 3.12.x:

        get(VHostName) when is_binary(VHostName) ->
            get_in_mnesia(VHostName).

* Starting with RabbitMQ 3.13.0:

        get(VHostName) when is_binary(VHostName) ->
            rabbit_khepri:handle_fallback(
              #{mnesia => fun() -> get_in_mnesia(VHostName) end,
                khepri => fun() -> get_in_khepri(VHostName) end}).

This `rabbit_khepri:handle_fallback/1` function relies on two things:
1. the fact that the `khepri_db` feature flag is enabled, in which case
   it always executes the Khepri-based variant.
4. the ability or not to read and write to Mnesia tables otherwise.

Before the feature flag is enabled, or during the migration, the
function will try to execute the Mnesia-based variant. If it succeeds,
then it returns the result. If it fails because one or more Mnesia
tables can't be used, it restarts from scratch: it means the feature
flag is being enabled and depending on the outcome, either the
Mnesia-based variant will succeed (the feature flag couldn't be enabled)
or the feature flag will be marked as enabled and it will call the
Khepri-based variant. The meat of this function really lives in the
`khepri_mnesia_migration` application [3] and
`rabbit_khepri:handle_fallback/1` is a wrapper on top of it that knows
about the feature flag.

However, some calls to the database do not depend on the existence of
Mnesia tables, such as functions where we need to learn about the
members of a cluster. For those, we can't rely on exceptions from
Mnesia. Therefore, we just look at the state of the feature flag to
determine which database to use. There are two situations though:

* Sometimes, we need the feature flag state query to block because the
  function interested in it can't return a valid answer during the
  migration. Here is an example:

        case rabbit_khepri:is_enabled(RemoteNode) of
            true  -> can_join_using_khepri(RemoteNode);
            false -> can_join_using_mnesia(RemoteNode)
        end

* Sometimes, we need the feature flag state query to NOT block (for
  instance because it would cause a deadlock). Here is an example:

        case rabbit_khepri:get_feature_state() of
            enabled -> members_using_khepri();
            _       -> members_using_mnesia()
        end

Direct accesses to Mnesia still exists. They are limited to code that is
specific to Mnesia such as classic queue mirroring or network partitions
handling strategies.

Now, to discover the Mnesia tables to migrate and how to migrate them,
we use an Erlang module attribute called
`rabbit_mnesia_tables_to_khepri_db` which indicates a list of Mnesia
tables and an associated converter module. Here is an example in the
`rabbitmq_recent_history_exchange` plugin:

    -rabbit_mnesia_tables_to_khepri_db(
       [{?RH_TABLE, rabbit_db_rh_exchange_m2k_converter}]).

The converter module  — `rabbit_db_rh_exchange_m2k_converter` in this
example  — is is fact a "sub" converter module called but
`rabbit_db_m2k_converter`. See the documentation of a `mnesia_to_khepri`
converter module to learn more about these modules.

[1] https://github.com/rabbitmq/ra
[2] https://github.com/rabbitmq/khepri
[3] https://github.com/rabbitmq/khepri_mnesia_migration

See #7206.

Co-authored-by: Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>
Co-authored-by: Diana Parra Corbacho <dparracorbac@vmware.com>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
2023-09-29 16:00:11 +02:00
dependabot[bot] 88bb95c3ea
Bump com.rabbitmq:amqp-client
Bumps [com.rabbitmq:amqp-client](https://github.com/rabbitmq/rabbitmq-java-client) from 5.18.0 to 5.19.0.
- [Release notes](https://github.com/rabbitmq/rabbitmq-java-client/releases)
- [Commits](https://github.com/rabbitmq/rabbitmq-java-client/compare/v5.18.0...v5.19.0)

---
updated-dependencies:
- dependency-name: com.rabbitmq:amqp-client
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-09-27 18:09:42 +00:00
David Ansari d8ecc66a8e Do not confirm MQTT messages if in partition
Since MQTT publishers might publish to classic mirrored queues,
add the same check as in rabbit_channel:send_confirms_and_nacks/1:

```
If we are in a minority and pause_minority mode then a) we are
going to shut down imminently and b) we should not confirm anything
until then, since anything we confirm is likely to be lost.
```
2023-09-22 17:16:43 +02:00
Arnaud Cogoluègnes e22fcd70fe
Make MC conversion function return ok or error 2023-09-18 18:32:59 +02:00
Rin Kuryloski 214c5dd2d3 Add missing dep in rabbitmq_mqtt/Makefile 2023-09-13 10:44:37 +02:00
David Ansari 1eeb9b3a4f Fix MQTT test flake 2023-09-12 10:22:30 +02:00