* Reduce per message disk overhead
Message container annotation keys are stored on disk.
By shortening them we save 95 - 58 = 37 bytes per message.
```
1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})).
95
2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})).
58
```
This should somewhat reduce disk I/O and disk space.
* Ensure durable is a boolean
Prevent key 'durable' with value 'undefined' from being added to the
mc annotations, for example when the durable field was not set, but
another AMQP 1.0 header field was set.
* Apply feedback
The solution in #10203 has the following issues:
1. Bindings can be left over in Mnesia table rabbit_durable_queue.
One solution to 1. would be to first delete the old queue via
`rabbit_amqqueue:internal_delete(Q, User, missing_owner)`
and subsequently declare the new queue via
`rabbit_amqqueue:internal_declare(Q, false)`
However, even then, it suffers from:
2. Race conditions between `rabbit_amqqueue:on_node_down/1`
and `rabbit_mqtt_qos0_queue:declare/2`:
`rabbit_amqqueue:on_node_down/1` could first read the queue records that
need to be deleted, thereafter `rabbit_mqtt_qos0_queue:declare/2` could
re-create the queue owned by the new connection PID, and `rabbit_amqqueue:on_node_down/1`
could subsequently delete the re-created queue.
Unfortunately, `rabbit_amqqueue:on_node_down/1` does not delete
transient queues in one isolated transaction. Instead it first reads
queues and subsequently deletes queues in batches, making it prone to
race conditions.
Ideally, this commit would delete all rabbit_mqtt_qos0_queue queues of the
node that has crashed, including their bindings.
However, doing so in one transaction is risky as there may be millions
of such queues and the current code path applies the same logic on all
live nodes resulting in conflicting transactions and therefore a long
database operation.
Hence, this commit uses the simplest approach which should still be
safe:
Do not remove rabbit_mqtt_qos0_queue queues if a node crashes.
Other live nodes will continue to route to these dead queues.
That should be okay, given that the rabbit_mqtt_qos0_queue clients auto
confirm.
Continuing to route to them does, however, count as a routing result
for the AMQP 0.9.1 `mandatory` property.
If an MQTT client re-connects to a live node with the same client ID,
the new node will delete and then re-create the queue.
Once the crashed node comes back online, it will clean up its leftover
queues and bindings.
When a node is shut down cleanly, the rabbit_mqtt_qos0_queue record is
removed from Mnesia.
When a node crashes and subsequently reboots, the new node incarnation
removes the old rabbit_mqtt_qos0_queue record from Mnesia (via
rabbit_mqtt_qos0_queue:recover/2).
However, when a node crashes, the rabbit_mqtt_qos0_queue will be removed
from Mnesia table rabbit_queue, but will still be present in table
rabbit_durable_queue on the other live nodes.
Prior to this commit, when the same MQTT client (i.e. same MQTT client
ID) re-connects from the crashed node to another live node and
re-subscribes, the following error occurred:
```
[info] <0.43155.0> Accepted MQTT connection 10.105.0.18:60508 -> 10.105.0.10:1883 for client ID nodered_24e214feb018a232
[debug] <0.43155.0> Received a SUBSCRIBE for topic(s) [{mqtt_topic,
[debug] <0.43155.0> <<"as923/gateway/+/command/#">>,0}]
[error] <0.43155.0> Failed to declare queue 'mqtt-subscription-nodered_24e214feb018a232qos0' in vhost '/': {absent,
[error] <0.43155.0> {amqqueue,
[error] <0.43155.0> {resource,
[error] <0.43155.0> <<"/">>,
[error] <0.43155.0> queue,
[error] <0.43155.0> <<"mqtt-subscription-nodered_24e214feb018a232qos0">>},
[error] <0.43155.0> true,
[error] <0.43155.0> false,
[error] <0.43155.0> <15486.32690.0>,
[error] <0.43155.0> [],
[error] <0.43155.0> <15486.32690.0>,
[error] <0.43155.0> [],
[error] <0.43155.0> [],
[error] <0.43155.0> [],
[error] <0.43155.0> [{vhost,
[error] <0.43155.0> <<"/">>},
[error] <0.43155.0> {name,
[error] <0.43155.0> <<"ha-all-mqtt">>},
[error] <0.43155.0> {pattern,
[error] <0.43155.0> <<"^mqtt-">>},
[error] <0.43155.0> {'apply-to',
[error] <0.43155.0> <<"all">>},
[error] <0.43155.0> {definition,
[error] <0.43155.0> [{<<"ha-mode">>,
[error] <0.43155.0> <<"all">>}]},
[error] <0.43155.0> {priority,
[error] <0.43155.0> 0}],
[error] <0.43155.0> undefined,
[error] <0.43155.0> [],
[error] <0.43155.0> undefined,
[error] <0.43155.0> live,
[error] <0.43155.0> 0,
[error] <0.43155.0> [],
[error] <0.43155.0> <<"/">>,
[error] <0.43155.0> #{user =>
[error] <0.43155.0> <<"iottester">>},
[error] <0.43155.0> rabbit_mqtt_qos0_queue,
[error] <0.43155.0> #{}},
[error] <0.43155.0> nodedown}
[error] <0.43155.0> MQTT protocol error on connection 10.105.0.18:60508 -> 10.105.0.10:1883: subscribe_error
```
This commit fixes this error, allowing an MQTT client that connects with CleanSession=true and
subscribes with QoS 0 to re-connect and re-subscribe to another live
node if the original Rabbit node crashes.
Reported in https://groups.google.com/g/rabbitmq-users/c/pxgy0QiwilM/m/LkJQ-3DyBgAJ
Avoid the following crash
```
** Reason for termination ==
** {mqtt_unexpected_cast,{shutdown,"Closed via management plugin"}}
crasher:
initial call: rabbit_mqtt_reader:init/1
pid: <0.1096.0>
registered_name: []
exception exit: {mqtt_unexpected_cast,
{shutdown,"Closed via management plugin"}}
in function gen_server:handle_common_reply/8 (gen_server.erl, line 1208)
```
when closing MQTT or Stream connections via HTTP API endpoint
```
/connections/username/:username
```
To refine conversion behaviour, add additional tests
and ensure it matches the documentation.
mc: optionally capture source environment
And pass target environment to mc:convert
This allows environmental data and configuration to be captured and
used to modify and complete conversion logic whilst allowing conversion
code to remain pure and portable.
Prior to this commit:
1. Start RabbitMQ with MQTT plugin enabled.
2.
```
rabbitmq-diagnostics consume_event_stream
^C
```
3. The logs will print the following warning:
```
[warning] <0.570.0> ** Undefined handle_info in rabbit_mqtt_internal_event_handler
[warning] <0.570.0> ** Unhandled message: {'DOWN',#Ref<0.2410135134.1846280193.145044>,process,
[warning] <0.570.0> <52723.100.0>,noconnection}
[warning] <0.570.0>
```
This is because rabbit_event_consumer:init/1 monitors the CLI process.
Any rabbit_event handler should therefore implement handle_info/2.
It's similar to what's described in the gen_event docs about
add_sup_handler/3:
> Any event handler attached to an event manager which in turn has a
> supervised handler should expect callbacks of the shape
> Module:handle_info({'EXIT', Pid, Reason}, State).
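For illustration, a minimal gen_event handler with such a catch-all
handle_info/2 clause could look as follows (the module name is made up;
this is a sketch, not the actual plugin handler):
```
-module(my_internal_event_handler).
-behaviour(gen_event).

-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2]).

init(Args) ->
    {ok, Args}.

handle_event(_Event, State) ->
    {ok, State}.

handle_call(_Request, State) ->
    {ok, ok, State}.

%% Catch-all clause: ignore stray messages such as
%% {'DOWN', MonitorRef, process, Pid, Reason} sent because another
%% process monitors the event manager, avoiding the
%% "Undefined handle_info" warning shown above.
handle_info(_Info, State) ->
    {ok, State}.

terminate(_Arg, _State) ->
    ok.
```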
Listing queues with the HTTP API when there are many (1000s) of
quorum queues could be excessively slow compared to the same scenario
with classic queues.
This optimises various aspects of HTTP API queue listings.
For QQs it removes the expensive cluster wide rpcs used to get the
"online" status of each quorum queue. This was previously done _before_
paging and thus would perform a cluster-wide query for _each_ quorum queue in
the vhost/system. This accounted for most of the slowness compared to
classic queues.
Secondly the query to separate the running from the down queues
consisted of two separate queries that later were combined when a single
query would have sufficed.
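For illustration, a single lists:partition/2 pass can split a queue list
into running and down queues in one traversal (the map-based queue
representation below is just a stand-in):
```
1> Queues = [#{name => q1, state => running}, #{name => q2, state => down}].
[#{name => q1,state => running},#{name => q2,state => down}]
2> lists:partition(fun(Q) -> maps:get(state, Q) =:= running end, Queues).
{[#{name => q1,state => running}],[#{name => q2,state => down}]}
```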
This commit also includes a variety of other improvements and minor
fixes discovered during testing and optimisation.
MINOR BREAKING CHANGE: quorum queues would previously only display one
of two states: running or down. Now there is a new state called minority
which is emitted when the queue has at least one member running but
cannot commit entries due to lack of quorum.
Also, a quorum queue may transiently enter the down state when a node
goes down and before it has elected a new leader.
Previously, test pubsub was flaky
```
{shared_SUITE,pubsub,766}
{test_case_failed,missing m1}
```
because the binding wasn't present yet on node 0 when publishing to node
0.
Prior to this commit, the following test was flaky:
```
bazel test //deps/rabbitmq_mqtt:v5_SUITE -t- --test_sharding_strategy=disabled \
--test_env FOCUS="-group [mqtt,cluster_size_3] -case session_takeover_v3_v5" \
--test_env RABBITMQ_METADATA_STORE=khepri --config=rbe-26 --runs_per_test=20
```
because rabbit_misc:maps_any/2 filtered out a destination queue after routing if
that destination queue wasn't associated with any matched binding key.
This commit makes the test green.
However, the root cause of this issue isn't solved:
MQTT 5.0 requires the topic exchange to return matched binding keys for
destination queues such that feature NoLocal, and Subscription
Identifiers work correctly.
The current MQTT plugin relies on session state to be stored
consistently in the database. When a new client connects, the session
state is retrieved from the database and appropriate actions are taken:
e.g. consume from a queue, modify binding arguments, etc.
With Mnesia this consistency was guaranteed thanks to sync transactions
when updating queues and bindings.
Khepri has only eventual consistency semantics. This is problematic for the
MQTT plugin in the session_takeover_v3_v5 test scenario:
1. Client subscribes on node 1 (with v3). Node 1 returns subscription
success to client.
2. **Thereafter**, another client with the same MQTT client ID connects
to node 0 (with v5). "Proper" session takeover should take place.
However due to eventual consistency, the subscription / binding isn't
present yet on node 0. Therefore the session upgrade from v3 to v5
does not take place and leads to binding keys being absent when
messages are routed to the session's queue.
Tests session_reconnect and session_takeover were flaky, specifically
when run under Khepri.
The issue was in the test itself: the connect properties weren't
applied. Therefore, prior to this commit, an exclusive queue got created.
On `main` branch and v3.12.6 feature flag delete_ra_cluster_mqtt_node is
supported. Instead of skipping the entire test if that feature flag is
not enabled, enable the feature flag and run the test.
More generally:
"Instead of verifying if a feature flag is enabled or not, it's best to enable
it and react from the return value (success or failure).
Mixed version testing always turn off all feature flags by default.
So in the future, even though all nodes supports the mentionned
feature flag, the testcase will still be skipped." [JSP]
See commit message 00c77e0a1a for details.
In a multi node mixed version cluster where the lower version is
compiled with a different OTP version, anonymous Ra leader queries will
fail with a badfun error if initiated on the higher version and executed
on the leader on the lower version node.
as selective receives are efficient in OTP 26:
```
OTP-18431
Application(s):
compiler, stdlib
Related Id(s):
PR-6739
Improved the selective receive optimization, which can now be enabled for references returned from other functions.
This greatly improves the performance of gen_server:send_request/3, gen_server:wait_response/2, and similar functions.
```
Since Erlang/OTP 26:
```
OTP-18445
Application(s):
erts, stdlib
It is no longer necessary to enable a feature in the runtime system in order to load modules that are using it.
It is sufficient to enable the feature in the compiler when compiling it.
That means that to use feature maybe_expr in Erlang/OTP 26, it is sufficient to enable it during compilation.
In Erlang/OTP 27, feature maybe_expr will be enabled by default, but it will be possible to disable it.
```
This test fails when MQTT client ID tracking is performed in Ra, and the
higher version node gets compiled with a different OTP version (26) than
the lower version node (25).
The reason is described in 83eede7ef2
```
An interesting side note learned here is that the compiled file
rabbit_mqtt_collector must not be changed. This commit only modifies
function specs. However as soon as the compiled code is changed, this
module becomes a new version. The new version causes the anonymous ra query
function to fail in mixed clusters: When the old node does a
ra:leader_query where the leader is on the new node, the query function
fails on the new node with `badfun` because the new node does not have
the same module version. For more context, read:
https://web.archive.org/web/20181017104411/http://www.javalimit.com/2010/05/passing-funs-to-other-erlang-nodes.html
```
We shouldn’t use an anonymous function for ra:leader_query or ra:consistent_query.
Instead we should use the {M,F,A} form.
9e5d437a0a/src/ra.erl (L102-L103)
In MQTT the anonymous function is used in bcb95c949d/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl (L50)
This causes the query to return a bad fun error (silently ignored in bcb95c949d/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl (L70-L71) )
when executed on a different node and either:
1.) Any code in file rabbit_mqtt_collector.erl changed, or
2.) The code gets compiled with a different OTP version.
2.) is the reason for a failing mixed version test in https://github.com/rabbitmq/rabbitmq-server/pull/8553 because both higher and lower versions run OTP 26,
but the higher version node got compiled with 26 while the lower version node got compiled with 25.
The same file
compiled with OTP 26.0.1
```
1> rabbit_mqtt_collector:module_info(attributes).
[{vsn,[30045739264236496640687548892374951597]}]
```
compiled with OTP 25.3.2
```
1> rabbit_mqtt_collector:module_info(attributes).
[{vsn,[168144385419873449889532520247510637232]}]
```
Since the impact is very low (maintenance mode will not close all MQTT
client connections when feature flag delete_ra_cluster_mqtt_node is
disabled), we skip this test.
[Why]
Mnesia is a very powerful and convenient tool for Erlang applications:
it is a persistent disc-based database, it handles replication across
multiple Erlang nodes and it is available out-of-the-box from the
Erlang/OTP distribution. RabbitMQ relies on Mnesia to manage all its
metadata:
* virtual hosts' properties
* internal users
* queue, exchange and binding declarations (not queues data)
* runtime parameters and policies
* ...
Unfortunately Mnesia makes it difficult to handle network partitions and,
as a consequence, the merge conflicts between Erlang nodes once the
network partition is resolved. RabbitMQ provides several partition
handling strategies but they are not bullet-proof. Users still hit
situations where it is a pain to repair a cluster following a network
partition.
[How]
@kjnilsson created Ra [1], a Raft consensus library that RabbitMQ
already uses successfully to implement quorum queues and streams for
instance. Those queues do not suffer from network partitions.
We created Khepri [2], a new persistent and replicated database engine
based on Ra and we want to use it in place of Mnesia in RabbitMQ to
solve the problems with network partitions.
This patch integrates Khepri as an experimental feature. When enabled,
RabbitMQ will store all its metadata in Khepri instead of Mnesia.
This change comes with behavior changes. While Khepri remains disabled,
you should see no changes to the behavior of RabbitMQ. If there are
changes, it is a bug. After Khepri is enabled, there are significant
changes of behavior that you should be aware of.
Because it is based on the Raft consensus algorithm, when there is a
network partition, only the cluster members that are in the partition
with at least `(Number of nodes in the cluster ÷ 2) + 1` nodes
can "make progress". In other words, only those nodes may write to the
Khepri database and read from the database and expect a consistent
result.
For instance in a cluster of 5 RabbitMQ nodes:
* If there are two partitions, one with 3 nodes, one with 2 nodes, only
the group of 3 nodes will be able to write to the database.
* If there are three partitions, two with 2 nodes, one with 1 node, none
of the group can write to the database.
Because the Khepri database will be used for all kinds of metadata, it
means that RabbitMQ nodes that can't write to the database will be
unable to perform some operations. A list of operations and what to
expect is documented in the associated pull request and the RabbitMQ
website.
This requirement from Raft also affects the startup of RabbitMQ nodes in
a cluster. Indeed, at least a quorum number of nodes must be started at
once to allow nodes to become ready.
To enable Khepri, you need to enable the `khepri_db` feature flag:
rabbitmqctl enable_feature_flag khepri_db
When the `khepri_db` feature flag is enabled, the migration code
performs the following two tasks:
1. It synchronizes the Khepri cluster membership from the Mnesia
cluster. It uses `mnesia_to_khepri:sync_cluster_membership/1` from
the `khepri_mnesia_migration` application [3].
2. It copies data from relevant Mnesia tables to Khepri, doing some
conversion if necessary on the way. Again, it uses
`mnesia_to_khepri:copy_tables/4` from `khepri_mnesia_migration` to do
it.
This can be performed on a running standalone RabbitMQ node or cluster.
Data will be migrated from Mnesia to Khepri without any service
interruption. Note that during the migration, the performance may
decrease and the memory footprint may go up.
Because this feature flag is considered experimental, it is not enabled
by default even on a brand new RabbitMQ deployment.
More about the implementation details below:
In the past months, all accesses to Mnesia were isolated in a collection
of `rabbit_db*` modules. This is where the integration of Khepri mostly
takes place: we use a function called `rabbit_khepri:handle_fallback/1`
which selects the database and performs the query or the transaction.
Here is an example from `rabbit_db_vhost`:
* Up until RabbitMQ 3.12.x:
get(VHostName) when is_binary(VHostName) ->
get_in_mnesia(VHostName).
* Starting with RabbitMQ 3.13.0:
get(VHostName) when is_binary(VHostName) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> get_in_mnesia(VHostName) end,
khepri => fun() -> get_in_khepri(VHostName) end}).
This `rabbit_khepri:handle_fallback/1` function relies on two things:
1. the fact that the `khepri_db` feature flag is enabled, in which case
it always executes the Khepri-based variant.
2. the ability or not to read and write to Mnesia tables otherwise.
Before the feature flag is enabled, or during the migration, the
function will try to execute the Mnesia-based variant. If it succeeds,
then it returns the result. If it fails because one or more Mnesia
tables can't be used, it restarts from scratch: it means the feature
flag is being enabled and depending on the outcome, either the
Mnesia-based variant will succeed (the feature flag couldn't be enabled)
or the feature flag will be marked as enabled and it will call the
Khepri-based variant. The meat of this function really lives in the
`khepri_mnesia_migration` application [3] and
`rabbit_khepri:handle_fallback/1` is a wrapper on top of it that knows
about the feature flag.
However, some calls to the database do not depend on the existence of
Mnesia tables, such as functions where we need to learn about the
members of a cluster. For those, we can't rely on exceptions from
Mnesia. Therefore, we just look at the state of the feature flag to
determine which database to use. There are two situations though:
* Sometimes, we need the feature flag state query to block because the
function interested in it can't return a valid answer during the
migration. Here is an example:
case rabbit_khepri:is_enabled(RemoteNode) of
true -> can_join_using_khepri(RemoteNode);
false -> can_join_using_mnesia(RemoteNode)
end
* Sometimes, we need the feature flag state query to NOT block (for
instance because it would cause a deadlock). Here is an example:
case rabbit_khepri:get_feature_state() of
enabled -> members_using_khepri();
_ -> members_using_mnesia()
end
Direct accesses to Mnesia still exist. They are limited to code that is
specific to Mnesia such as classic queue mirroring or network partitions
handling strategies.
Now, to discover the Mnesia tables to migrate and how to migrate them,
we use an Erlang module attribute called
`rabbit_mnesia_tables_to_khepri_db` which indicates a list of Mnesia
tables and an associated converter module. Here is an example in the
`rabbitmq_recent_history_exchange` plugin:
-rabbit_mnesia_tables_to_khepri_db(
[{?RH_TABLE, rabbit_db_rh_exchange_m2k_converter}]).
The converter module, `rabbit_db_rh_exchange_m2k_converter` in this
example, is in fact a "sub" converter module called by
`rabbit_db_m2k_converter`. See the documentation of a `mnesia_to_khepri`
converter module to learn more about these modules.
[1] https://github.com/rabbitmq/ra
[2] https://github.com/rabbitmq/khepri
[3] https://github.com/rabbitmq/khepri_mnesia_migration
See #7206.
Co-authored-by: Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>
Co-authored-by: Diana Parra Corbacho <dparracorbac@vmware.com>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Since MQTT publishers might publish to classic mirrored queues,
add the same check as in rabbit_channel:send_confirms_and_nacks/1:
```
If we are in a minority and pause_minority mode then a) we are
going to shut down imminently and b) we should not confirm anything
until then, since anything we confirm is likely to be lost.
```
Fix the following flake:
```
[ERROR] com.rabbitmq.mqtt.test.MqttTest.sessionRedelivery(TestInfo) -- Time elapsed: 0.959 s <<< FAILURE!
org.opentest4j.AssertionFailedError:
Expecting value to be false but was true
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at com.rabbitmq.mqtt.test.MqttTest.sessionRedelivery(MqttTest.java:523)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
[ERROR] Failures:
[ERROR] MqttTest.sessionRedelivery:523
Expecting value to be false but was true
```
Why:
https://github.com/rabbitmq/rabbitmq-server/discussions/9196 reports
that failures in sending can cause huge log amounts and eventually even
memory alarms.
How:
Similar to rabbit_writer and rabbit_amqp1_0_writer, when sending on a
network connection fails, the connection process will be terminated
immediately.
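A minimal sketch of this approach, assuming a plain gen_tcp socket and a
map-based state (illustrative, not the actual writer code):
```
-module(writer_sketch).
-export([handle_outgoing/2]).

%% Stop the connection process as soon as a socket write fails instead
%% of logging each failure and carrying on, which is what produced the
%% huge log volume described above.
handle_outgoing(Data, #{socket := Socket} = State) ->
    case gen_tcp:send(Socket, Data) of
        ok ->
            {noreply, State};
        {error, Reason} ->
            {stop, {shutdown, {send_failed, Reason}}, State}
    end.
```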
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.
Currently all non-AMQP 0.9.1 originating messages are converted into an AMQP 0.9.1 flavoured basic_message record before being sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1, but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.
This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating from one protocol is consumed by another protocol we convert it to the target protocol at that point.
The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.
This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.
COMMIT HISTORY:
* Refactor away from using the delivery{} record
In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.
Add mc module and move direct replies outside of exchange
Lots of changes incl classic queues
Implement stream support incl amqp conversions
simplify mc state record
move mc.erl
mc dlx stuff
recent history exchange
Make tracking work
But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.
Tracing as a whole may want a bit of a re-vamp at some point.
tidy
make quorum queue peek work by legacy conversion
dead lettering fixes
dead lettering fixes
CMQ fixes
rabbit_trace type fixes
fixes
fix
Fix classic queue props
test assertion fix
feature flag and backwards compat
Enable message_container feature flag in some SUITEs
Dialyzer fixes
fixes
fix
test fixes
Various
Manually update a gazelle generated file
until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185
Add message_containers_SUITE to bazel
and regen bazel files with gazelle from rules_erlang@main
Simplify essential property access
Such as durable, ttl and priority by extracting them into annotations
at message container init time.
Move type
to remove dependency on amqp10 stuff in mc.erl
mostly because I don't know how to make bazel do the right thing
add more stuff
Refine routing header stuff
wip
Cosmetics
Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.
* Dedup death queue names
* Fix function clause crashes
Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)
Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.
* Fix is_utf8_no_null crash
Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```
* Implement mqtt mc behaviour
For now via amqp translation.
This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```
* Shorten mc file names
Module name length matters because for each persistent message the #mc{}
record is persisted to disk.
```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```
This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```
* mc: make deaths an annotation + fixes
* Fix mc_mqtt protocol_state callback
* Fix test will_delay_node_restart
```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```
* Bazel run gazelle
* mix format rabbitmqctl.ex
* Ensure ttl annotation is reflected in amqp legacy protocol state
* Fix id access in message store
* Fix rabbit_message_interceptor_SUITE
* dialyzer fixes
* Fix rabbit:rabbit_message_interceptor_SUITE-mixed
set_annotation/3 should not result in duplicate keys
* Fix MQTT shared_SUITE-mixed
Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.
The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.
* Field content of 'v1_0.data' can be binary
Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
--test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
-t- --test_sharding_strategy=disabled
```
* Remove route/2 and implement route/3 for all exchange types.
This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.
stream filtering fixes
* Translate directly from MQTT to AMQP 0.9.1
* handle undecoded properties in mc_compat
amqpl: put clause in right order
recover death details from amqp data
* Replace callback init_amqp with convert_from
* Fix return value of lists:keyfind/3
* Translate directly from AMQP 0.9.1 to MQTT
* Fix MQTT payload size
MQTT payload can be a list when converted from AMQP 0.9.1 for example
First conversions tests
Plus some other conversion related fixes.
bazel
bazel
translate amqp 1.0 null to undefined
mc: property/2 and correlation_id/message_id return type tagged values.
To ensure we can support a variety of types better.
The type tags are AMQP 1.0 flavoured.
fix death recovery
mc_mqtt: impl new api
Add callbacks to allow protocols to compact data before storage
And make readable if needing to query things repeatedly.
bazel fix
* more decoding
* tracking mixed versions compat
* mc: flip default of `durable` annotation to save some data.
Assuming most messages are durable and that in-memory messages suffer less
from persistence overhead, it makes sense for a non-existent `durable`
annotation to mean durable=true.
* mc conversion tests and tidy up
* mc make x_header unstrict again
* amqpl: death record fixes
* bazel
* amqp -> amqpl conversion test
* Fix crash in mc_amqp:size/1
Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.
* Fix crash in lists:flatten/1
Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.
* Fix crash in rabbit_writer
Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
initial call: rabbit_writer:enter_mainloop/2
pid: <0.711.0>
registered_name: []
exception error: bad argument
in function size/1
called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
*** argument 1: not tuple or binary
in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.
This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.
* Add accidentally deleted line back
* mc: optimise mc_amqp internal format
By removing the outer records for message and delivery annotations
as well as application properties and footers.
* mc: optimise mc_amqp map_add by using upsert
* mc: refactoring and bug fixes
* mc_SUITE routingheader assertions
* mc remove serialize/1 callback as only used by amqp
* mc_amqp: avoid returning a nested list from protocol_state
* test and bug fix
* move infer_type to mc_util
* mc fixes and additional assertions
* Support headers exchange routing for MQTT messages
When a headers exchange is bound to the MQTT topic exchange, routing
will be performed based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).
This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.
When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.
* Fix crash when sending from stream to amqpl
When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
initial call: rabbit_channel:init/1
pid: <0.818.0>
registered_name: []
exception exit: {{badmatch,undefined},
[{rabbit_channel,handle_deliver0,4,
[{file,"rabbit_channel.erl"},
{line,2728}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
{rabbit_channel,handle_cast,2,
[{file,"rabbit_channel.erl"},
{line,728}]},
{gen_server2,handle_msg,2,
[{file,"gen_server2.erl"},{line,1056}]},
{proc_lib,wake_up,3,
[{file,"proc_lib.erl"},{line,251}]}]}
```
This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.
* Support consistent hash exchange routing for MQTT 5.0
When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.
* Convert MQTT 5.0 User Property
* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations
* Make use of Annotations in mc_mqtt:protocol_state/2
mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.
* Enforce AMQP 0.9.1 field name length limit
The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.
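A minimal sketch of such a filter over AMQP 0.9.1 header table entries
(the function name is illustrative):
```
%% Headers are {Name, Type, Value} table entries; drop any entry whose
%% name exceeds the 128-character shortstr limit.
drop_long_header_names(Headers) ->
    [H || {Name, _Type, _Value} = H <- Headers, byte_size(Name) =< 128].
```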
* Fix type specs
Apply feedback from Michael Davis
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* Add mc_mqtt unit test suite
Implement mc_mqtt:x_header/2
* Translate indicator that payload is UTF-8 encoded
when converting between MQTT 5.0 and AMQP 1.0
* Translate single amqp-value section from AMQP 1.0 to MQTT
Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.
If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indicate the encoding via Content-Type
message/vnd.rabbitmq.amqp.
This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.
* Fix payload conversion
* Translate Response Topic between MQTT and AMQP
Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.
The Response Topic must be a UTF-8 encoded string.
This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/" RK Publish to amq.topic with routing key RK
"/exchange/" X "/" RK Publish to exchange X with routing key RK
```
By default, the MQTT topic exchange is configured to be amq.topic using
the 1st target address.
When an operator modifies the mqtt.exchange, the 2nd target address is
used.
* Apply PR feedback
and fix formatting
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* tidy up
* Add MQTT message_containers test
* consistent hash exchange: avoid amqp legacy conversion
When hashing on a header value.
* Avoid converting to amqp legacy when using exchange federation
* Fix test flake
* test and dialyzer fixes
* dialyzer fix
* Add MQTT protocol interoperability tests
Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams
* Regenerate portions of deps/rabbit/app.bzl with gazelle
I'm not exactly sure how this happened, but gazelle seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.
* mc: refactoring
* mc_amqpl: handle delivery annotations
Just in case they are included.
Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.
---------
Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
Setting a username with an empty password is allowed by the MQTT spec:
> If the Password Flag is set to 1, this is the next field in the payload. The Password field contains 0 to 65535 bytes of binary data
RabbitMQ's default auth backend `rabbit_auth_backend_internal` prohibits
empty passwords:
64d69f76ff/deps/rabbit/src/rabbit_auth_backend_internal.erl (L77-L82)
Since some RabbitMQ operators were confused about other auth
backends such as `rabbit_auth_backend_http`, let us prohibit empty MQTT
passwords for all auth backends.
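A minimal sketch of such a check with illustrative names, applied before
any auth backend is consulted:
```
%% Reject a CONNECT that carries a username together with a zero-length
%% password, regardless of the configured auth backend.
check_credentials(_Username, <<>>) ->
    {error, empty_password_not_allowed};
check_credentials(_Username, Password) when is_binary(Password) ->
    ok.
```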
Why:
A RabbitMQ operator should be able to see whether RabbitMQ drops MQTT
QoS 0 messages due to overload protection. It's an indication that an
MQTT subscriber does not consume fast enough.
How:
Use Prometheus global counters.
There are 2 valid solutions:
1. Introduce a new metric called messages_dropped specifically for the
rabbitmq_mqtt_qos0_queue type. This would work in a similar fashion
to how streams extend the per protocol global counters, but requires
extending the per protocol & queue type global counters for the MQTT
QoS queue type. The emitted metrics would look as follows:
```
rabbitmq_global_messages_dropped_total{protocol="mqtt310",queue_type="rabbit_mqtt_qos0_queue"} 0
rabbitmq_global_messages_dropped_total{protocol="mqtt311",queue_type="rabbit_mqtt_qos0_queue"} 0
rabbitmq_global_messages_dropped_total{protocol="mqtt50",queue_type="rabbit_mqtt_qos0_queue"} 0
```
2. Reuse the existing metric rabbitmq_global_messages_dead_lettered_maxlen_total
This commit decides to go for the 2nd approach because:
a) there is no need to add a new metric. Even though dead lettering is not supported
for the MQTT QoS 0 queue type, this metric maps nicely to
what happens: The queue drops messages since its max length
(mqtt.mailbox_soft_limit) is exceeded with overflow behaviour
drop-head. Furthermore the label `dead_letter_strategy="disabled"` tells
that dead lettering is not taking place from this queue type.
b) this metric allows supporting dead lettering for the MQTT QoS 0 queue
type in the future.
The new dead lettering metrics look as follows:
```
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_mqtt_qos0_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_rejected_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_delivery_limit_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
rabbitmq_global_messages_dead_lettered_confirmed_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
```
Multiple things were wrong in this test:
1. The test name ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping was wrong.
As written in our docs, the opposite is true
> The certificate-to-vhost mapping with the mqtt_default_vhosts global parameter is considered more specific than the port-to-vhost mapping with the mqtt_port_to_vhost_mapping global parameter and so takes precedence over it.
2. The test case didn't test what it was supposed to test:
The test relies on revoking the permissions that were set for the
port_to_vhost mapping. However revoking these permissions did not happen
because:
2.a) The wrong Config was used, and
2.b) The wrong key was used.
Issue 2. could be observed by the following CT warning being logged:
```
Could not find element mqtt_port_to_vhost_mapping in Config.
```
since MQTT 5.0 supports negative acknowledgements thanks to reason codes
in the PUBACK packet.
We could choose either reason code 128 or 131. The description of
131 applies to rejected messages, hence this commit uses 131:
> The PUBLISH is valid but the receiver is not willing to accept it.
due to AMQP 1.0 spec:
> The annotations type is a map where the keys are restricted to be of type symbol or of type ulong.
All ulong keys, and all symbolic keys except those beginning with "x-" are reserved.
Keys beginning with "x-opt-" MUST be ignored if not understood.
On receiving an annotation key which is not understood, and which does not begin with "x-opt",
the receiving AMQP container MUST detach the link with a not-implemented error.
So, for new headers being introduced, it makes sense to comply with that
AMQP 1.0 requirement. We don't want to force the receiver to understand
this header.
Starting with RabbitMQ 3.13 mqtt.max_session_expiry_interval_seconds
(set in seconds) will replace the previous setting
mqtt.subscription_ttl.
MQTT 5.0 introduces the Session Expiry Interval
feature which does not only apply to subscribers, but also to
publishers.
The new config name mqtt.max_session_expiry_interval_seconds makes it clear
that it also applies to publishers.
Prior to this commit, when mqtt.subscription_ttl was set, a warning got
logged and the value was ignored. This is dangerous if an operator does
not see the warning but relies, for example, on `mqtt.subscription_ttl =
infinity` to not expire non-clean sessions.
It's safer to make the boot fail if that unsupported config name is
still set. A clear error message will be logged:
```
[error] <0.142.0> Error preparing configuration in phase apply_translations:
[error] <0.142.0> - Translation for 'rabbitmq_mqtt.subscription_ttl' found invalid configuration:
Since 3.13 mqtt.subscription_ttl (in milliseconds) is unsupported.
Use mqtt.max_session_expiry_interval_seconds (in seconds) instead.
```
Alternatively, RabbitMQ could translate mqtt.subscription_ttl to
mqtt.max_session_expiry_interval_seconds.
However, forcing the new config option sounds like the better way to go.
Once we write MQTT 5.0 docs, this change must go into the 3.13 release notes.
This commit also renames max_session_expiry_interval_secs to max_session_expiry_interval_seconds.
The latter is clearer to users.
The MQTT v5 spec is a bit contradictory and imprecise when it comes to the
Property Length.
It first mandates that if there are no properties, this MUST be
indicated by including a Property Length of 0:
```
2.2.2.1 Property Length
The Property Length is encoded as a Variable Byte Integer. The Property Length
does not include the bytes used to encode itself, but includes the length of
the Properties. If there are no properties, this MUST be indicated by including
a Property Length of zero [MQTT-2.2.2-1].
```
Most MQTT packets' Property Length sections only mention:
```
The length of the Properties in the PUBLISH packet Variable Header encoded as a Variable Byte Integer.
```
So, they follow the above requirement.
However, MQTT packets PUBACK, PUBREC, PUBREL, PUBCOMP, and DISCONNECT seem to be exceptions to this rule:
```
3.4.2.2 PUBACK Properties
3.4.2.2.1 Property Length
The length of the Properties in the PUBACK packet Variable Header encoded as a Variable Byte Integer.
If the Remaining Length is less than 4 there is no Property Length and the value of 0 is used.
```
```
3.14.2.2 DISCONNECT Properties
3.14.2.2.1 Property Length
The length of Properties in the DISCONNECT packet Variable Header encoded as a Variable Byte Integer.
If the Remaining Length is less than 2, a value of 0 is used.
```
Since this special case has already been implemented for DISCONNECT, and
RabbitMQ does not support QoS 2, this commit implements this special
case for the PUBACK packet.
Some MQTT clients (e.g. mqttjs) indeed set a Remaining Length of 3 in the PUBACK
packet.
See https://github.com/rabbitmq/rabbitmq-server/issues/2554#issuecomment-1620083550
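A sketch of this special case, assuming the argument is the PUBACK
variable header (whose size equals the Remaining Length) and that the
Property Length fits into a single Variable Byte Integer byte; this is
illustrative, not the plugin's actual parser:
```
parse_puback(<<PacketId:16>>) ->
    %% Remaining Length = 2: no Reason Code on the wire, success assumed
    {PacketId, 0, <<>>};
parse_puback(<<PacketId:16, ReasonCode:8>>) ->
    %% Remaining Length = 3: no Property Length, a value of 0 is used
    {PacketId, ReasonCode, <<>>};
parse_puback(<<PacketId:16, ReasonCode:8, PropLen:8, Props:PropLen/binary>>) ->
    %% Remaining Length >= 4: Property Length present
    {PacketId, ReasonCode, Props}.
```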
and CLI as requested in
https://github.com/rabbitmq/rabbitmq-server/issues/2554#issuecomment-1604205277
"User Properties on the CONNECT packet can be used to send connection related properties from the Client to the Server.
The meaning of these properties is not defined by this specification."
[v5 3.1.2.11.8]
It makes sense to display the User Property of the CONNECT packet in the
Management UI in the connection's Client Properties.
Test
```
//deps/rabbitmq_mqtt:shared_SUITE-mixed --test_env FOCUS="-group [web_mqtt,v3,cluster_size_1] -case block_only_publisher"
```
was flaky:
```
=== Ended at 2023-06-26 07:09:57
=== Location: [{shared_SUITE,block_only_publisher,1323},
{test_server,ts_tc,1782},
{test_server,run_test_case_eval1,1291},
{test_server,run_test_case_eval,1223}]
=== === Reason: no match of right hand side value {error,ack_timeout}
in function shared_SUITE:block_only_publisher/1 (shared_SUITE.erl, line 1323)
in call from test_server:ts_tc/3 (test_server.erl, line 1782)
in call from test_server:run_test_case_eval1/6 (test_server.erl, line 1291)
in call from test_server:run_test_case_eval/9 (test_server.erl, line 1223)
```
It seems that the ack_timeout of 1 second was too low for a
subscription.
Attempt to fix the following flake:
```
=== Ended at 2023-06-26 07:13:34
=== Location: [{shared_SUITE,events,570},
{test_server,ts_tc,1782},
{test_server,run_test_case_eval1,1291},
{test_server,run_test_case_eval,1223}]
=== === Reason: no match of right hand side value []
in function shared_SUITE:events/1 (shared_SUITE.erl, line 570)
in call from test_server:ts_tc/3 (test_server.erl, line 1782)
in call from test_server:run_test_case_eval1/6 (test_server.erl, line 1291)
in call from test_server:run_test_case_eval/9 (test_server.erl, line 1223)
```
of test
```
-group [web_mqtt,v3,cluster_size_1] -case events
```
The logs showed that deletion of the exclusive queue took place in the
same millisecond as the server received the DISCONNECT:
```
2023-06-26 07:13:32.838282+00:00 [debug] <0.2494.0> Received a CONNECT, client ID: events, username: undefined, clean start: true, protocol version: 3, keepalive: 60, property names: []
2023-06-26 07:13:32.838436+00:00 [debug] <0.2494.0> MQTT connection 127.0.0.1:38808 -> 127.0.0.1:21007 picked vhost using plugin_configuration_or_default_vhost
2023-06-26 07:13:32.838523+00:00 [debug] <0.2494.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2023-06-26 07:13:32.838672+00:00 [info] <0.2494.0> Accepted Web MQTT connection 127.0.0.1:38808 -> 127.0.0.1:21007 for client ID events
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0> Received a SUBSCRIBE with subscription(s) [{mqtt_subscription,<<"my/topic">>,
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0> {mqtt_subscription_opts,0,false,
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0> false,0,undefined}}]
2023-06-26 07:13:33.457541+00:00 [debug] <0.2494.0> Received an UNSUBSCRIBE for topic filter(s) [<<"my/topic">>]
2023-06-26 07:13:33.762171+00:00 [debug] <0.2494.0> Received a DISCONNECT with reason code 0 and properties #{}
2023-06-26 07:13:33.762350+00:00 [info] <0.2494.0> Web MQTT closing connection 127.0.0.1:38808 -> 127.0.0.1:21007
2023-06-26 07:13:33.762780+00:00 [debug] <0.2504.0> Deleting exclusive queue 'mqtt-subscription-eventsqos0' in vhost '/' because its declaring connection <0.2494.0> was closed
```
However, there could be some delay between disconnecting on the client
WebMQTT side and the processor processing the DISCONNECT packet.
For correlation IDs greater than 255 bytes, AMQP 1.0 to AMQP 0.9.1
conversion puts the correlation ID into an AMQP 0.9.1 header called
x-correlation-id (see
a17b28ec61/deps/rabbit/src/rabbit_msg_record.erl (L380)
)
Let's use the same key for the MQTT 5.0 to AMQP 0.9.1 conversion.
due to the following flake:
```
v5_SUITE:subscription_identifier failed on line 783
Reason: {test_case_failed,Received unexpected message: {publish,#{client_pid => <0.495.0>,dup => false,
packet_id => undefined,
payload => <<"m3">>,properties => #{},
qos => 0,retain => true,
topic => <<"t/3">>,
via => #Port<0.164>}}}
```
Also, log if an unexpected message is received, due to a flake in
```
=== Ended at 2023-06-22 14:30:07
=== Location: [{v5_SUITE,will_delay_message_expiry_publish_properties,1597},
{test_server,ts_tc,1782},
{test_server,run_test_case_eval1,1291},
{test_server,run_test_case_eval,1223}]
=== === Reason: {test_case_failed,"did not receive Will Message"}
```
- testQueuePropertiesWithCleanSessionSet() was not used
- testQueuePropertiesWithCleanSessionUnset() and
testQueuePropertiesWithCleanSession() was called only once, so removing
the extra layer here to simplify the structure
- no need to set username and password unless they are different from
guest/guest
- clean start and clean session default to true; no need to set
explicitly
- same tests as the v3 test suite, with the Paho MQTT v5 client.
- two test cases are removed:
1. sessionRedelivery: callback behaviours of the v3 and v5 clients seem
to differ in that throwing an exception does not shut down a v5 client
immediately. This test case tests that unacked QoS 1 msgs are redelivered
by the RMQ broker, which is covered in another test suite
2. lastWillDowngradesQoS2 is removed because for MQTT 5, will msgs with
an unsupported QoS are an error and not accepted
This commit fixes the following crash:
```
2388-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0> exception error: bad argument
2389-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0> in function lists:keymember/3
2390-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0> called as lists:keymember(<<"x-mqtt-publish-qos">>,1,undefined)
2391-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0> *** argument 3: not a list
2392-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0> in call from rabbit_mqtt_processor:amqp_props_to_mqtt_props/2 (rabbit_mqtt_processor.erl, line 2219)
```
This crash occurs when a message without AMQP 0.9.1 #P_basic.headers
is sent to the MQTT v4 connection process, but consumed by an MQTT v5
connection process.
This is the case when an AMQP 0.9.1 client sends a message to a v4 MQTT
client, and this same client subsequently upgrades to v5 consuming the
message.
When sending from AMQP 0.9.1 client directly to a v5 MQTT connection,
the AMQP 0.9.1 header <<"x-binding-keys">> is set.
When sending from AMQP 0.9.1 client directly to a v4 MQTT connection,
no header is set, but this code branch was not evaluated.
Allow rabbitmq_exchange:route_return() to return infos in addition to
the matched binding keys.
For example, some exchange types read the full #amqqueue{} record from the database
and can in future return it from the route/3 function to avoid reading the full
record again in the channel.
RabbitMQ MQTT already supports authenticating clients via
username + password, OAuth tokens, and certificates.
We could make use of RabbitMQ SASL mechanisms in the future,
if needed. For now, if the client wants to perform extended
authentication, we return Bad Authentication Method in the CONNACK
packet.
Once the server's Topic Alias cache for messages from server to client
is full, this commit does not replace any existing aliases.
So, the first topics "win" and stay in the cache forever.
This matches the behaviour of VerneMQ and EMQX.
For now that's good enough.
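A sketch of this "first topics win" policy with an illustrative cache
shape (not the plugin's actual state record):
```
%% Hand out a new alias only while the cache has room; once full, keep
%% sending the full topic name and leave existing aliases untouched.
ensure_alias(Topic, #{aliases := Aliases, max := Max} = Cache) ->
    case Aliases of
        #{Topic := Alias} ->
            {Alias, Cache};
        _ when map_size(Aliases) < Max ->
            Alias = map_size(Aliases) + 1,
            {Alias, Cache#{aliases := Aliases#{Topic => Alias}}};
        _ ->
            {undefined, Cache}
    end.
```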
In the future, we can easily change that behaviour to some smarter strategy,
for example
1. Hash the TopicName to a Topic Alias and replace the old
alias, or
2. For the Topic Alias Cache from server to client, keep 2 Maps:
#{TopicName => TopicAlias} and #{TopicAlias => TopicName} and a
counter that wraps to 1 once the Topic Alias Maximum is reached and
just replace an existing Alias if the TopicName is not cached.
Also, refactor Topic Alias Maximum:
* Remove code duplication
* Allow an operator to prohibit Topic Aliases by allowing value 0 to be
configured
* Change config name to topic_alias_maximum so that it matches exactly
the MQTT feature name
* Fix wrong code formatting
* Add the invalid or unknown Topic Alias to the log message for easier
troubleshooting
For MQTT 5.0 destination queues, the topic exchange does not only have
to return the destination queue names, but also the matched binding
keys.
This is needed to implement MQTT 5.0 subscription options No Local,
Retain As Published and Subscription Identifiers.
Prior to this commit, as the trie was walked down, we remembered the
edges being walked and assembled the final binding key with
list_to_binary/1.
list_to_binary/1 is very expensive with long lists (long topic names),
even in OTP 26.
The CPU flame graph showed ~3% of CPU usage was spent only in
list_to_binary/1.
Unfortunately and unnecessarily, the current topic exchange
implementation stores topic levels as lists.
It would be better to store topic levels as binaries:
split_topic_key/1 should ideally use binary:split/3, similar to the following:
```
1> P = binary:compile_pattern(<<".">>).
{bm,#Ref<0.1273071188.1488322568.63736>}
2> Bin = <<"aaa.bbb..ccc">>.
<<"aaa.bbb..ccc">>
3> binary:split(Bin, P, [global]).
[<<"aaa">>,<<"bbb">>,<<>>,<<"ccc">>]
```
The compiled pattern could be placed into persistent term.
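For example, continuing the shell style above (the persistent_term key
name is made up):
```
1> ok = persistent_term:put(mqtt_topic_split_pattern, binary:compile_pattern(<<".">>)).
ok
2> binary:split(<<"aaa.bbb..ccc">>, persistent_term:get(mqtt_topic_split_pattern), [global]).
[<<"aaa">>,<<"bbb">>,<<>>,<<"ccc">>]
```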
This commit decided to avoid migrating Mnesia tables to use binaries
instead of lists. Mnesia migrations are non-trivial, especially with the
current feature flag subsystem.
Furthermore the Mnesia topic tables are already getting migrated to
their Khepri counterparts in 3.13.
Adding an additional migration only for Mnesia does not make sense.
So, instead of assembling the binding key as we walk down the trie and
then calling list_to_binary/1 in the leaf, it
would be better to just fetch the binding key from the database in the leaf.
As we reach the leaf of the trie, we know both source and destination.
Unfortunately, we cannot fetch the binding key efficiently with the
current rabbit_route (sorted by source exchange) and
rabbit_reverse_route (sorted by destination) tables as the key is in
the middle between source and destination.
If there are a huge number of bindings for a given source exchange (very
realistic in MQTT use cases) or a large number of bindings for a given
destination (also realistic), it would require scanning these large
number of bindings.
Therefore this commit takes the simplest possible solution:
The solution leverages the fact that binding arguments are already part of
table rabbit_topic_trie_binding.
So, if we simply include the binding key into the binding arguments, we
can fetch and return it efficiently in the topic exchange
implementation.
The following patch, which omits fetching the empty list binding argument
(the default), makes routing slower because function
`analyze_pattern.constprop.0` requires significantly more (~2.5%) CPU time:
```
@@ -273,7 +273,11 @@ trie_bindings(X, Node) ->
node_id = Node,
destination = '$1',
arguments = '$2'}},
- mnesia:select(?MNESIA_BINDING_TABLE, [{MatchHead, [], [{{'$1', '$2'}}]}]).
+ mnesia:select(
+ ?MNESIA_BINDING_TABLE,
+ [{MatchHead, [{'andalso', {'is_list', '$2'}, {'=/=', '$2', []}}], [{{'$1', '$2'}}]},
+ {MatchHead, [], ['$1']}
+ ]).
```
Hence, this commit always fetches the binding arguments.
All MQTT 5.0 destination queues will create a binding that
contains the binding key in the binding arguments.
Not only does this solution avoid expensive list_to_binary/1 calls, but
it also means that Erlang app rabbit (specifically the topic exchange
implementation) does not need to be aware of MQTT anymore:
It just returns the binding key when the binding arguments tell it to do so.
In the future, once the Khepri migration is completed, we should be able to
relatively simply remove the binding key from the binding arguments
again to free up some storage space.
Note that one of the advantages of a trie data structure is its space
efficiency: the same prefixes don't have to be stored multiple times.
However, for RabbitMQ the binding key is already stored at least N times
in various routing tables, so storing it a few times more via the
binding arguments should be acceptable.
The speed improvements are favoured over a few more MBs ETS usage.
The format of #mqtt_msg{} changes from 3.12 to 3.13.
In 3.13 the record contains 2 additional fields:
* props
* timestamp
The old #mqtt_msg{} might still be stored by the retained message store
in ets or dets.
This commit converts such an old message format when read from the
database.
The alternative would have been to run a migration function over the
whole table which is slightly more complex to implement.
Instead of giving the new message format a different record name,
e.g. #mqtt_msg_v2{}, this commit decides to re-use the same name such
that the new code only handles the record name #mqtt_msg{}.
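A hedged sketch of such an on-read conversion; the concrete arities and the default values appended below are illustrative assumptions, only the fact that the new format adds the two fields props and timestamp is taken from the description above:
```
%% NewSize would typically come from record_info(size, mqtt_msg) in the real module.
maybe_convert(Msg, NewSize) when is_tuple(Msg),
                                 element(1, Msg) =:= mqtt_msg,
                                 tuple_size(Msg) =:= NewSize ->
    %% already the 3.13 format
    Msg;
maybe_convert(Msg, NewSize) when is_tuple(Msg),
                                 element(1, Msg) =:= mqtt_msg,
                                 tuple_size(Msg) =:= NewSize - 2 ->
    %% old 3.12 format: append defaults for the two new fields props and timestamp
    erlang:append_element(erlang:append_element(Msg, #{}), undefined).
```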
When RabbitMQ enters maintenance mode / is being drained, all client
connections are closed.
This commit sends a DISCONNECT packet to (Web) MQTT 5.0 clients with
Reason Code "Server shutting down" before the connection is closed.
The following PUBLISH and Will properties are forwarded unaltered by the
server:
* Payload Format Indicator
* Content Type
* Response Topic
* Correlation Data
* User Property
Not only must these properties be forwarded unaltered from an MQTT
publishing client to an MQTT receiving client, but it would also be nice
to allow for protocol interoperability:
Think about RPC request-response style patterns where the requester is
an MQTT client and the responder is an AMQP 0.9.1 or STOMP client.
We reuse the P_basic fields where possible:
* content_type (if <= 255 bytes)
* correlation_id (if <= 255 bytes)
Otherwise, we add custom AMQP 0.9.1 headers.
The headers follow the naming pattern "x-mqtt-<property>" where
<property> is the MQTT v5 property, if that property makes sense only
(or mainly) in the MQTT world:
* x-mqtt-user-property
* x-mqtt-payload-format-indicator
If the MQTT v5 property also makes sense outside of the MQTT world, we
name it more generically:
* x-correlation (if > 255 bytes)
* x-reply-to-topic (since P_basic.reply_to assumes a queue name)
In the future, we can think about adding a header x-reply-to-exchange
and have the MQTT plugin set its value to the configured mqtt.exchange
such that clients don't have to assume the default topic exchange amq.topic.
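A hedged sketch of the mapping rules above; the MQTT-side property keys and helper names are illustrative assumptions:
```
%% Correlation Data: reuse P_basic.correlation_id when it fits, otherwise
%% fall back to the custom x-correlation header.
correlation_to_amqp091(Props) ->
    case maps:get(correlation_data, Props, undefined) of
        undefined ->
            {undefined, []};
        Corr when byte_size(Corr) =< 255 ->
            {Corr, []};
        Corr ->
            {undefined, [{<<"x-correlation">>, longstr, Corr}]}
    end.

%% Response Topic: always a custom header, since P_basic.reply_to assumes a queue name.
reply_to_header(Props) ->
    case maps:get(response_topic, Props, undefined) of
        undefined -> [];
        Topic     -> [{<<"x-reply-to-topic">>, longstr, Topic}]
    end.
```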
Previously, the Will Message could be kept in memory in the MQTT
connection process state. Upon termination, the Will Message is sent.
The new MQTT 5.0 feature Will Delay Interval requires storing the Will
Message outside of the MQTT connection process state.
The Will Message should not be stored node local because the client
could reconnect to a different node.
Storing the Will Message in Mnesia is not an option because we want to
get rid of Mnesia. Storing the Will Message in a Ra cluster or in Khepri
is only an option if the Will Payload is small as there is currently no
way in Ra to **efficiently** snapshot large binary data (Note that these
Will Messages are not consumed in a FIFO style workload like messages in
quorum queues. A Will Message needs to be stored for as long as the
Session lasts - up to 1 day by default, but could also be much longer if
RabbitMQ is configured with a higher maximum session expiry interval.)
Usually, Will Payloads are small: they are just a notification that an
MQTT session ended abnormally. However, we don't know how users leverage
the Will Message feature. The MQTT protocol allows for large Will Payloads.
Therefore, the solution implemented in this commit - which should work
well enough - is storing the Will Message in a queue.
Each MQTT session with a Session Expiry Interval and a Will Delay
Interval of > 0 seconds creates a queue in which it stores its Will
Message when the current Network Connection ends. The Will Message has a
message TTL set (corresponding to the Will Delay Interval) and the queue
has a queue TTL set (corresponding to the Session Expiry Interval).
If the client does not reconnect within the Will Delay Interval, the
message is dead lettered to the configured MQTT topic exchange
(amq.topic by default).
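A minimal sketch of how the two TTLs map onto AMQP 0.9.1 queue arguments and a per-message TTL; function names are illustrative, not the plugin's actual code:
```
%% Queue TTL = Session Expiry Interval; dead letter into the configured
%% MQTT topic exchange (amq.topic by default). Without an explicit
%% x-dead-letter-routing-key, the message keeps its own routing key
%% (derived from the Will Topic).
will_queue_args(SessionExpiryIntervalMs, MqttTopicExchange) ->
    [{<<"x-expires">>, long, SessionExpiryIntervalMs},
     {<<"x-dead-letter-exchange">>, longstr, MqttTopicExchange}].

%% Message TTL = Will Delay Interval, set as the per-message 'expiration'
%% property (a string of milliseconds).
will_message_expiration(WillDelayIntervalMs) ->
    integer_to_binary(WillDelayIntervalMs).
```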
The Will Delay Interval can be set by both publishers and subscribers.
Therefore, the Will Message is the 1st session state that RabbitMQ keeps
for publish-only MQTT clients.
One current limitation of this commit is that a Will Message that is
delayed (i.e. Will Delay Interval is set) and retained (i.e. Will Retain
flag set) will not be retained.
One solution to retain delayed Will Messages is that the retainer
process consumes from a queue and the queue binds to the topic exchange
with a topic starting with `$`, for example `$retain/#`.
A CC header could then be added to the dead lettered AMQP 0.9.1 Will
Message such that it is published not only with the Will Topic, but also
with the `$retain` topic. For example, if the Will Topic is `a/b`, it
will be published with routing key `a/b` and CC header `$retain/a/b`.
The reason this is not implemented in this commit is that to keep the
currently broken retained message store behaviour, we would require
creating at least one queue per node and publishing only to that local
queue. In future, once we have a replicated retained message store based
on a Stream for example, we could just publish all retained messages to
the `$retain` topic and therefore into the Stream.
So, for now, we document as a limitation that Will Messages which are
both retained and delayed will not actually be retained.
Every queue type sets the queue pid when it creates the queue.
Prior to this commit, the queue pid set within the MQTT connection
process was a bit confusing as the queue pid will be different for
classic queues and quorum queues.
This commit fixes 2 separate issues:
1. No quorum queue got created in v5 because Session Expiry Interval was 0.
2. Fix a function_clause error. Pass the decoded properties further to other
functions looking up headers.
due to rebasing onto main.
mqtt5 branch adds a new header
```
{<<"x-mqtt-retain">>, bool, false}
```
which caused the incoming_message_interceptors test case to fail.
The MQTT v5 spec is a bit vague on Retain Handling 1:
"If Retain Handling is set to 1 then if the subscription did not
already exist, the Server MUST send all retained message matching the
Topic Filter of the subscription to the Client, and if the subscription
did exist the Server MUST NOT send the retained messages.
[MQTT-3.3.1-10]." [v5 3.3.1.3]
Does a subscription with the same topic filter but different
subscription options mean that "the subscription did exist"?
This commit interprets "subscription exists" as both topic filter and
subscription options must be the same.
Therefore, if a client creates a subscription with a topic filter that
is identical to a previous subscription and subscription options that
are different and Retain Handling 1, the server sends the retained
message.
Support subscription options "No Local" and "Retain As Published"
as well as Subscription Identifiers.
All three MQTT 5.0 features can be set on a per subscription basis.
Due to wildcards in topic filters, multiple subscriptions
can match a given topic. Therefore, to implement Retain As Published and
Subscription Identifiers, the destination MQTT connection process needs
to know what subscription(s) caused it to receive the message.
There are a few ways how this could be implemented:
1. The destination MQTT connection process is aware of all its
subscriptions. Whenever it receives a message, it can match the
message's routing key / topic against all its known topic filters.
However, to iteratively match the routing key against all topic
filters for every received message can become very expensive in the
worst case when the MQTT client creates many subscriptions containing
wildcards. This could be the case for an MQTT client that acts as a
bridge or proxy or dispatcher: It could subscribe via a wildcard for
each of its own clients.
2. Instead of iteratively matching the topic of the received message
against all topic filters that contain wildcards, a better approach
would be for every MQTT subscriber connection process to maintain a
local trie data structure (similar to how topic exchanges are
implemented) and therefore perform matching more efficiently.
However, this does not sound optimal either because routing is
effectively performed twice: in the topic exchange and again against
a much smaller trie in each destination connection process.
3. Given that the topic exchange already performs routing, a much more
sensible way would be to send the matched binding key(s) to the
destination MQTT connection process. A subscription (topic filter)
maps to a binding key in AMQP 0.9.1 routing. Therefore, for the first
time in RabbitMQ, the routing function should not only output a list
of unique destination queues, but also the binding keys (subscriptions)
that caused the message to be routed to the destination queue.
This commit therefore implements the 3rd approach.
The downside of the 3rd approach is that it requires API changes to the
routing function and topic exchange.
Specifically, this commit adds a new function rabbit_exchange:route/3
that accepts a list of routing options. If that list contains version 2,
the caller of the routing function knows how to handle the return value
that could also contain binding keys.
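A hedged sketch of the caller side; the literal option and the exact shape of the richer return value are assumptions based on the description above:
```
%% With routing-option "version 2", an element of the result may carry the
%% matched binding key(s), e.g. {QueueName, [BindingKey]}, instead of just
%% the queue name.
route_v2(TopicExchange, Message) ->
    rabbit_exchange:route(TopicExchange, Message, [v2]).
```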
This commit allows an MQTT connection process, the channel process, and
at-most-once dead lettering to handle binding keys. Binding keys are
included as AMQP 0.9.1 headers into the basic message.
Therefore, whenever a message is sent from an MQTT client or AMQP 0.9.1
client or AMQP 1.0 client or STOMP client, the MQTT receiver will know
the subscription identifier that caused the message to be received.
Note that due to the low number of allowed wildcard characters (# and
+), the cardinality of matched binding keys shouldn't be high even if
the topic contains, for example, 3 levels and the message is sent to,
for example, 5 million destination queues. In other words, sending multiple
distinct basic messages to the destination shouldn't hurt the delegate
optimisation too much. The delegate optimisation implemented for classic
queues and rabbit_mqtt_qos0_queue(s) still takes place for all basic
messages that contain the same set of matched binding keys.
The topic exchange returns all matched binding keys by remembering the
edges walked down to the leaves. As an optimisation, only for MQTT
queues are binding keys being returned. This does add a small dependency
from app rabbit to app rabbitmq_mqtt which is not optimal. However, this
dependency should be simple to remove when omitting this optimisation.
Another important feature of this commit is persisting subscription
options and subscription identifiers because they are part of the
MQTT 5.0 session state.
In MQTT v3 and v4, the only subscription information that was part of
the session state was the topic filter and the QoS level.
Both were implicitly stored in the form of bindings:
The topic filter as the binding key and the QoS level as the destination
queue name of the binding.
For MQTT v5 we need to persist more subscription information.
From a domain perspective, it makes sense to store subscription options
as part of subscriptions, i.e. bindings, even though they are currently
not used in routing.
Therefore, this commit stores subscription options as binding arguments.
Storing subscription options as binding arguments in turn comes with
new challenges: How to handle mixed version clusters and upgrading an
MQTT session from v3 or v4 to v5?
Imagine an MQTT client connects via v5 with Session Expiry Interval > 0
to a new node in a mixed version cluster, creates a subscription,
disconnects, and subsequently connects via v3 to an old node. The
client should continue to receive messages.
To simplify such edge cases, this commit introduces a new feature flag
called mqtt_v5. If mqtt_v5 is disabled, clients cannot connect to
RabbitMQ via MQTT 5.0.
This still doesn't entirely solve the problem of MQTT session upgrades
(v4 to v5 client) or session downgrades (v5 to v4 client).
Ideally, once mqtt_v5 is enabled, all MQTT bindings contain non-empty binding
arguments. However, this will require a feature flag migration function
to modify all MQTT bindings. To be more precise, all MQTT bindings need
to be deleted and added because the binding argument is part of the
Mnesia table key.
Since feature flag migration functions are non-trivial to implement in
RabbitMQ (they can run on every node multiple times and concurrently),
this commit takes a simpler approach:
All v3 / v4 sessions keep the empty binding argument [].
All v5 sessions use the new binding argument [#mqtt_subscription_opts{}].
This requires only handling a session upgrade / downgrade by
creating a binding (with the new binding arg) and deleting the old
binding (with the old binding arg) when processing the CONNECT packet.
Note that such session upgrades or downgrades should be rather rare in
practice. Therefore these binding transactions shouldn't hurt performance.
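A hedged illustration of the two binding-argument formats; only the record name #mqtt_subscription_opts{} is taken from the description above, its fields are assumptions:
```
-record(mqtt_subscription_opts, {qos,
                                 no_local,
                                 retain_as_published,
                                 retain_handling,
                                 subscription_id}).

%% v3 / v4 sessions keep the empty binding argument; v5 sessions store
%% their subscription options as the single binding argument.
binding_args(V, _Opts) when V =:= v3; V =:= v4 ->
    [];
binding_args(v5, Opts = #mqtt_subscription_opts{}) ->
    [Opts].
```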
The No Local option is implemented within the MQTT publishing connection
process: The message is not sent to the MQTT destination if the
destination queue name matches the current MQTT client ID and the
message was routed due to a subscription that has the No Local flag set.
This avoids unnecessary traffic on the MQTT queue.
The alternative would have been that the "receiving side" (same process)
filters the message out - which would have been more consistent in how
Retain As Published and Subscription Identifiers are implemented, but
would have caused unnecessary load on the MQTT queue.
1. Shrinking times out if there is an error; therefore remove the
60-second Bazel timeout by using a medium size Bazel test suite.
2. The MQTT 5.0 spec mandates for binary data types and UTF 8 string
data types to have values of maximum 65,535 bytes.
Therefore, ensure this test suite does not generate data greater than
that limit.
- when clients connect with a duplicate client id;
disconnect with reason code session taken over 142
- when keep alive has timed out;
disconnect with reason code keep alive timeout 141
emqtt repos:
emqx/emqtt PR #196 is based on rabbitmq:otp-26-compatibility
emqx/emqtt PR #198 is based on ansd:master
rabbitmq/master contains both of these 2 PRs cherry-picked.
rabbitmq-server repos:
main branch points emqtt to rabbitmq:otp-26-compatibility
mqtt5 branch points emqtt to rabbitmq:master
Therefore, the current mqtt5 branch is OTP 26 compatible and can support
multiple subscription identifiers.
All MQTT packets that can be sent in both directions (from client to
server and server to client) are tested in packet_prop_SUITE.
The symmetric property is very concise because encoding and then decoding an
MQTT packet should yield the original MQTT packet.
The input data variety of the previous example-based tests was very
small.
- This is a revision of commit 670d6b2, which implemented
receive maximum and sent as many unacknowledged messages
as the client set as its receive maximum.
This commit adds back the server-side limit of 10 unacknowledged messages,
which is a much safer limit than a user-provided maximum.
- mqtt v5 has more descriptive return values for suback
- added two possible failure reason codes for suback packet
one for permission error, another for quota exceeded error
- modified auth suite to assert on reason codes for v5
- no new test case since failures were already covered
- rename processor state prefetch to receive_maximum
to better match property name for mqtt 5
- defaults to 10 (as previously) when not set
not saved in session state, configuration is per
connection
Allow Session Expiry Interval to be changed when client DISCONNECTs.
Deprecate config subscription_ttl in favour of max_session_expiry_interval_secs
because the Session Expiry Interval will also apply to publishers that
connect with a will message and will delay interval.
"The additional session state of an MQTT v5 server includes:
* The Will Message and the Will Delay Interval
* If the Session is currently not connected, the time at which the Session
will end and Session State will be discarded."
The Session Expiry Interval picked by the server and sent to the client
in the CONNACK is the minimum of max_session_expiry_interval_secs and
the requested Session Expiry Interval by the client in CONNECT.
This commit favours dynamically changing the queue argument x-expires
over creating millions of different policies since that many policies
will come with new scalability issues.
Dynamically changing queue arguments is not allowed by AMQP 0.9.1
clients. However, it should be perfectly okay for the MQTT plugin to do
so for the queues it manages. MQTT clients are not aware that these
queues exist.
If a Server receives a CONNECT packet containing a Will QoS that
exceeds its capabilities, it MUST reject the connection. It SHOULD
use a CONNACK packet with Reason Code 0x9B (QoS not supported) as
described in section 4.13 Handling errors, and MUST close the Network Connection
MQTT v5 allows client and server to negatively ack a message by setting
a reason code of 128 or greater indicating failure.
"If PUBACK or PUBREC is received containing a Reason Code of 0x80 or greater
the corresponding PUBLISH packet is treated as acknowledged, and MUST NOT be
retransmitted [MQTT-4.4.0-2]."
Even though the spec prohibits resending such messages, if a client does
not accept a message, RabbitMQ can still dead letter the message.
MQTT v5 spec:
"If the current retained message for a Topic expires, it is discarded
and there will be no retained message for that topic."
This commit also supports Message Expiry Interval for retained messages
when a node is restarted.
Therefore, the insertion timestamp needs to be stored on disk.
Upon recovery, the Erlang timers are re-created.
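A hedged sketch of re-creating such a timer from the persisted insertion timestamp (names are illustrative):
```
restart_expiry_timer(Topic, MessageExpiryIntervalMs, InsertedAtMs) ->
    %% remaining TTL = expiry interval minus time already spent in the store
    ElapsedMs   = erlang:system_time(millisecond) - InsertedAtMs,
    RemainingMs = max(0, MessageExpiryIntervalMs - ElapsedMs),
    erlang:start_timer(RemainingMs, self(), {expire_retained_msg, Topic}).
```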
- "If the Server included a Maximum QoS in its CONNACK response
to a Client and it receives a PUBLISH packet with a QoS greater than this
then it uses DISCONNECT with Reason Code 0x9B (QoS not supported)"
- only affects mqtt v5, server max qos is 1
This commit does not yet implement Message Expiry Interval of
* retained messages: "If the current retained message for a Topic
expires, it is discarded and there will be no retained message for
that topic."
"Retained messages do not form part of the Session State in the Server,
they are not deleted as a result of a Session ending."
Both retained message stores ETS and DETS implement recovery.
This commit adds a test that recovery works as intended.
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."
This commit implements the part where the Server specifies the maximum
packet size.
"In the case of an error in a CONNECT packet it MAY send a CONNACK
packet containing the Reason Code, before closing the Network
Connection. In the case of an error in any other packet it SHOULD send a
DISCONNECT packet containing the Reason Code before closing the Network
Connection."
This commit implements only the "SHOULD" (second) part, not the "MAY"
(first) part.
There are now 2 different global wide MQTT settings on the server:
1. max_packet_size_unauthenticated which applies to the CONNECT packet
(and maybe AUTH packet in the future)
2. max_packet_size_authenticated which applies to all other MQTT
packets (that is, after the client successfully authenticated).
These two settings will apply to all MQTT versions.
In MQTT v5, if a non-CONNECT packet is too large, the server will send a
DISCONNECT packet to the client with Reason Code "Packet Too Large"
before closing the network connection.
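For illustration, the two limits could be configured along these lines (the exact rabbitmq.conf keys are assumed to follow the plugin's `mqtt.` prefix convention):
```
mqtt.max_packet_size_unauthenticated = 65536
mqtt.max_packet_size_authenticated = 1048576
```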
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."
This commit implements the part where the Client specifies the maximum
packet size.
As per protocol spec, instead of sending, the server drops the MQTT packet
if it's too large.
A debug message is logged for "infrequent" packet types.
For PUBLISH packets, the message is rejected back to the queue such that
it will be dead lettered if dead lettering is configured.
At the very least, Prometheus metrics for dead lettered messages will
be increased, even if dead lettering is not configured.
As reported in https://groups.google.com/g/rabbitmq-users/c/x8ACs4dBlkI/
plugins that implement rabbit_channel_interceptor break with
Native MQTT in 3.12 because Native MQTT does not use rabbit_channel anymore.
Specifically, these plugins don't work anymore in 3.12 when sending a message
from an MQTT publisher to an AMQP 0.9.1 consumer.
Two of these plugins are
https://github.com/rabbitmq/rabbitmq-message-timestamp
and
https://github.com/rabbitmq/rabbitmq-routing-node-stamp
This commit moves both plugins into rabbitmq-server.
Therefore, these plugins are deprecated starting in 3.12.
Instead of using these plugins, the user gets the same behaviour by
configuring rabbitmq.conf as follows:
```
incoming_message_interceptors.set_header_timestamp.overwrite = false
incoming_message_interceptors.set_header_routing_node.overwrite = false
```
While both plugins could not be used together, this commit
allows setting both headers.
We name the top level configuration key `incoming_message_interceptors`
because only incoming messages are intercepted.
Currently, only `set_header_timestamp` and `set_header_routing_node` are
supported. (We might support more in the future.)
Both can set `overwrite` to `false` or `true`.
The meaning of `overwrite` is the same as documented in
https://github.com/rabbitmq/rabbitmq-message-timestamp#always-overwrite-timestamps
i.e. whether headers should be overwritten if they are already present
in the message.
Both `set_header_timestamp` and `set_header_routing_node` behave exactly
like plugins `rabbitmq-message-timestamp` and `rabbitmq-routing-node-stamp` did,
respectively.
Upon node boot, the configuration is put into persistent_term to not
cause any performance penalty in the default case where these settings
are disabled.
The channel and MQTT connection process will intercept incoming messages
and - if configured - add the desired AMQP 0.9.1 headers.
For now, this allows using Native MQTT in 3.12 with the old plugins'
behaviour.
In the future, once "message containers" are implemented,
we can think about more generic message interceptors where plugins can be
written to modify arbitrary headers or message contents for various protocols.
Likewise, in the future, once MQTT 5.0 is implemented, we can think
about an MQTT connection interceptor which could function similarly to a
`rabbit_channel_interceptor`, allowing modification of any MQTT packet.
As requested in https://github.com/rabbitmq/rabbitmq-server/discussions/6331#discussioncomment-5796154
include all infos that were emitted in the MQTT connection created event also
in the MQTT connection closed event.
This ensures infos such as MQTT client ID are part of the connection
closed event.
Therefore, it's easy for the user to correlate between the two event
types.
Note that the MQTT plugin emits connection created and connection closed events only if
the CONNECT packet was successfully processed, i.e. authentication was successful.
Remove the disconnected_at property because it was never used.
rabbit_event already adds a timestamp to any event.
Up to 3.11.x an MQTT client ID is tracked in Ra
as a list of bytes as returned by binary_to_list/1 in
48467d6e12/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl (L137)
This has two downsides:
1. Lists consume more memory than binaries (when tracking many clients).
2. It violates the MQTT spec which states
"The ClientId MUST be a UTF-8 encoded string as defined in Section 1.5.3 [MQTT-3.1.3-4]." [v4 3.1.3.1]
Therefore, the original idea was to always store MQTT client IDs as
binaries starting with Native MQTT in 3.12.
However, this leads to client ID tracking misbehaving in mixed version
clusters since new nodes would register client IDs as binaries and old
nodes would register client IDs as lists. This means that a client
registering on a new node with the same client ID as a connection to the
old node did not terminate the connection on the old node.
Therefore, for backwards compatibility, we leave the client ID as a list of bytes
in the Ra machine state because the feature flag delete_ra_cluster_mqtt_node
introduced in v3.12 will delete the Ra cluster anyway and
the new client ID tracking via pg local will store client IDs as
binaries.
An interesting side note learned here is that the compiled file
rabbit_mqtt_collector must not be changed. This commit only modifies
function specs. However as soon as the compiled code is changed, this
module becomes a new version. The new version causes the anonymous ra query
function to fail in mixed clusters: When the old node does a
ra:leader_query where the leader is on the new node, the query function
fails on the new node with `badfun` because the new node does not have
the same module version. For more context, read:
https://web.archive.org/web/20181017104411/http://www.javalimit.com/2010/05/passing-funs-to-other-erlang-nodes.html
This change should be reverted once emqx/emqtt is OTP26 compatible.
Our fork/branch isn't either at this point, but at least partially
works. Let's use this branch for now to uncover server-side OTP26
incompatibilities (and continue working on OTP26 support for emqtt of
course).
in MQTT tests.
For example, in the ff_SUITE we see sporadic failures in Buildbuddy
where the cluster cannot be created within 2 minutes:
```
*** CT Error Notification 2023-04-24 10:58:55.628 ***
rabbit_ct_helpers:port_receive_loop failed on line 945
Reason: {timetrap_timeout,120000}
...
=== Ended at 2023-04-24 10:58:55
=== Location: [{rabbit_ct_helpers,port_receive_loop,945},
{rabbit_ct_helpers,exec,920},
{rabbit_ct_broker_helpers,cluster_nodes1,858},
{rabbit_ct_broker_helpers,cluster_nodes1,840},
{rabbit_ct_helpers,run_steps,141},
{ff_SUITE,init_per_group,last_expr},
{test_server,ts_tc,1782},
{test_server,run_test_case_eval1,1379},
{test_server,run_test_case_eval,1223}]
=== Reason: timetrap timeout
===
*** init_per_group failed.
Skipping all cases.
```
The default time limit for a test case is 30 minutes.
Bazel build files are now maintained primarily with `bazel run
gazelle`. This will analyze and merge changes into the build files as
necessitated by certain code changes (e.g. the introduction of new
modules).
In some cases there are hints to gazelle in the build files, such as `#
gazelle:erlang...` or `# keep` comments. xref checks on plugins that
depend on the CLI are a good example.
Similarly to https://github.com/rabbitmq/rabbitmq-server/pull/7663,
increase the message size and decrease the client buffer sizes.
This change is needed because we switched from erlang:port_command/2 to
gen_tcp:send/2. The former is a bit more asynchronous than the latter
because the latter waits for the inet_reply from the port.
What:
Delete bindings of exclusive durable queues after an
unclean shutdown.
Why:
Native MQTT in 3.12 uses only durable queues to ease transition to
Khepri. Since auto-delete queues are not deleted after an unclean
shutdown, Native MQTT uses exclusive (instead of auto-delete) queues
for clean sessions.
While this bug is not specific to Native MQTT, it is most relevant
for the upcoming 3.12 release since exclusive durable queues are rarely used
otherwise.
How:
During queue recovery, not all bindings are recovered yet.
Therefore, if, during queue recovery, an exclusive, durable queue needs to
be deleted, only durable bindings should be queried.
Queue types need to make sure that their exclusive, durable queues including their
bindings are deleted before starting with binding recovery.
Otherwise binding deletion and binding recovery get interleaved leading to topic
bindings being created and left behind.
Therefore, a classic queue process replies to the recovery process after it
deleted its queue record and associated bindings from the database.
The test always succeeds on `main` branch.
The test also always succeeds on `mc` branch when running remotely:
```
bazel test //deps/rabbitmq_mqtt:reader_SUITE --test_env FOCUS="-group tests -case rabbit_mqtt_qos0_queue_overflow" --config=rbe-25 -t- --runs_per_test=50
```
However, the test flakes when running on the `mc` branch locally on a Mac:
```
make -C deps/rabbitmq_mqtt ct-reader t=tests:rabbit_mqtt_qos0_queue_overflow FULL=1
```
with the following local changes:
```
diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl
index fb71eae375..21377a2e73 100644
--- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl
@@ -27,7 +27,7 @@ all() ->
groups() ->
[
- {tests, [],
+ {tests, [{repeat_until_any_fail, 30}],
[
block_connack_timeout,
handle_invalid_packets,
@@ -43,7 +43,7 @@ groups() ->
].
suite() ->
- [{timetrap, {seconds, 60}}].
+ [{timetrap, {minutes, 60}}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
```
The test fails prior to this commit after the 2nd run and does not fail
after this commit.
This is useful for understanding if a deleted queue was matching any
policies given the more selective policies introduced in #7601.
Does not apply to bulk deletion of transient queues on node down.
Deploying a 5 node RabbitMQ cluster with rabbitmq_mqtt plugin enabled
using the cluster-operator with rabbitmq image
rabbitmq:3.12.0-beta.1-management often fails with the following
error:
```
Feature flags: failed to enable `delete_ra_cluster_mqtt_node`:
{error,
{no_more_servers_to_try,
[{error,
noproc},
{error,
noproc},
{timeout,
{mqtt_node,
'rabbit@mqtt-rabbit-3-12-server-2.mqtt-rabbit-3-12-nodes.default'}}]}}
```
During rabbitmq_mqtt plugin start, the plugin decides whether it should
create a Ra cluster:
If feature flag delete_ra_cluster_mqtt_node is enabled, no Ra cluster
gets created.
If this feature flag is disabled, a Ra cluster is created.
Even though all feature flags are enabled by default for a fresh
cluster, the feature flag subsystem cannot provide any promise when feature
flag migration functions run during the boot process:
The migration functions can run before plugins are started or many
seconds after plugins are started.
There is also no API that tells when feature flag initialisation on the
local node is completed.
Therefore, during a fresh 3.12 cluster start with rabbitmq_mqtt enabled,
some nodes decide to start a Ra cluster (because not all feature flags
are enabled yet when the rabbitmq_mqtt plugin is started).
Prior to this commit, when the feature flag delete_ra_cluster_mqtt_node
got enabled, Ra cluster deletion timed out because the Ra cluster never got
initialised successfully: Members never proceeded past the pre-vote
phase because their Ra peers already had feature flag delete_ra_cluster_mqtt_node
enabled and therefore don't participate.
One possible fix is to remove the feature flag delete_ra_cluster_mqtt_node
and have the newly upgraded 3.12 node delete its Ra membership during a
rolling update. However, this approach is too risky because during a
rolling update of a 3 node cluster A, B, and C the following issue
arises:
1. C upgrades successfully and deletes its Ra membership
2. B shuts down. At this point A and B are members of the Ra cluster
while only A is online (i.e. not a majority). MQTT clients which try
to re-connect to A fail because A times out registering the client in
ad5cc7e250/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L174-L179)
Therefore this commit fixes 3.12 cluster creation as follows:
The Ra cluster deletion timeout is reduced from 60 seconds to 15 seconds,
and the Ra server is force deleted if Ra cluster deletion fails.
Force deleting the server will wipe the Ra cluster data on disk.
Instead of using atom `undefined`, use an integer as correlation term
when sending the will message to destination queues.
Classic queue clients, for example, expect a non-negative integer.
Quorum queues expect any term.
"If the Client supplies a zero-byte ClientId with CleanSession set to 0,
the Server MUST respond to the CONNECT Packet with a CONNACK return code 0x02
(Identifier rejected) and then close the Network Connection" [MQTT-3.1.3-8].
In Web MQTT, the CONNACK was not sent to the client because the Web MQTT
connection process terminated before sending the CONNACK to the
client.
Every ~30 runs, test case `sessionRedelivery` was failing with error:
```
[ERROR] sessionRedelivery{TestInfo} Time elapsed: 1.298 s <<< ERROR!
org.eclipse.paho.client.mqttv3.MqttException: Client is currently disconnecting
at com.rabbitmq.mqtt.test.MqttTest.sessionRedelivery(MqttTest.java:535)
```
The problem was that the Java client was still in connection state
`DISCONNECTING` which throws a Java exception when `connect()`ing.
So, the problem was client side.
We already check for `isConnected()` to be `false` which internally
checks for
```
conState == CONNECTED
```
However, there is no public client API to check for other connection
states. Therefore just waiting for a few milliseconds fixes the flake.
We see sporadic test failures where a test case hangs in the
receive until the Bazel suite timeout is reached.
There is no point in a test case to wait forever for an AMQP 0.9.1
connection to establish. Let's time out after 1 minute.
This will make the test case fail faster.
This is the latest commit in the series, it fixes (almost) all the
problems with missing and circular dependencies for typing.
The only 2 unsolved problems are:
- `lg` dependency for `rabbit` - the problem is that it's the only
dependency that contains NIF. And there is no way to make dialyzer
ignore it - looks like unknown check is not suppressable by dialyzer
directives. In the future making `lg` a proper dependency can be a
good thing anyway.
- some missing elixir function in `rabbitmq_cli` (CSV, JSON and
logging related).
- `eetcd` dependency for `rabbitmq_peer_discovery_etcd` - this one
uses sub-directories in `src/`, which confuses dialyzer (or our bazel
machinery is not able to properly handle it). I've tried the latest
rules_erlang which flattens directory for .beam files, but it wasn't
enough for dialyzer - it wasn't able to find core erlang files. This
is a niche plugin and an unusual dependency, so probably not worth
investigating further.
So far, we had the following functions to list nodes in a RabbitMQ
cluster:
* `rabbit_mnesia:cluster_nodes/1` to get members of the Mnesia cluster;
the argument was used to select members (all members or only those
running Mnesia and participating in the cluster)
* `rabbit_nodes:all/0` to get all members of the Mnesia cluster
* `rabbit_nodes:all_running/0` to get all members who currently run
Mnesia
Basically:
* `rabbit_nodes:all/0` calls `rabbit_mnesia:cluster_nodes(all)`
* `rabbit_nodes:all_running/0` calls `rabbit_mnesia:cluster_nodes(running)`
We also have:
* `rabbit_node_monitor:alive_nodes/1` which filters the given list of
nodes to only select those currently running Mnesia
* `rabbit_node_monitor:alive_rabbit_nodes/1` which filters the given
list of nodes to only select those currently running RabbitMQ
Most of the code uses `rabbit_mnesia:cluster_nodes/1` or the
`rabbit_nodes:all*/0` functions. `rabbit_mnesia:cluster_nodes(running)`
or `rabbit_nodes:all_running/0` is often used as a close approximation
of "all cluster members running RabbitMQ". This list might be incorrect
at times when a node is joining the cluster or is being worked on
(i.e. Mnesia is running but not RabbitMQ).
With Khepri, there won't be the same possible approximation because we
will try to keep Khepri/Ra running even if RabbitMQ is stopped to
expand/shrink the cluster.
So in order to clarify what we want when we query a list of nodes, this
patch introduces the following functions:
* `rabbit_nodes:list_members/0` to get all cluster members, regardless
of their state
* `rabbit_nodes:list_reachable/0` to get all cluster members we can
reach using Erlang distribution, regardless of the state of RabbitMQ
* `rabbit_nodes:list_running/0` to get all cluster members who run
RabbitMQ, regardless of the maintenance state
* `rabbit_nodes:list_serving/0` to get all cluster members who run
RabbitMQ and are accepting clients
In addition to the list functions, there are the corresponding
`rabbit_nodes:is_*(Node)` checks and `rabbit_nodes:filter_*(Nodes)`
filtering functions.
The code is modified to use these new functions. One possible
significant change is that the new list functions will perform RPC calls
to query the nodes' state, unlike `rabbit_mnesia:cluster_nodes(running)`.
The queue type being created for MQTT connections is solely determined
by the rabbitmq_mqtt plugin, not by per vhost defaults.
If the per vhost default queue type is configured to be a quorum queue,
we still want to create classic queues for MQTT connections.
Let's decrease the mailbox_soft_limit from 1000 to 200.
Obviously, both values are a bit arbitrary.
However, MQTT workloads usually do not have high throughput patterns for
a single MQTT connection. The only valid scenario where an MQTT
connection's process mailbox could have many messages is in large fan-in
scenarios where many MQTT devices send messages at once to a single MQTT
device - which is rather unusual.
It makes more sense to protect against cluster wide memory alarms by
decreasing the mailbox_soft_limit.
Always enable feature flag rabbit_mqtt_qos0_queue
in test case rabbit_mqtt_qos0_queue_overflow because this test case does
not make sense without the mqtt_qos0 queue type.
Note that enabling the feature flag should always succeed because this
test case runs on a single node, and therefore on a new version in mixed
version tests.
In the MQTT test assertions, instead of checking whether the test runs
in mixed version mode where all non-required feature flags are disabled
by default, check whether the given feature flag is enabled.
Prior to this commit, once feature flag rabbit_mqtt_qos0_queue becomes
required, the test cases would have failed.
Nowadays, the old RabbitMQ nodes in mixed version cluster
tests on `main` branch run in version 3.11.7.
Since maintenance mode was wrongly closing cluster-wide MQTT connections
only in RabbitMQ <3.11.2 (and <3.10.10), we can re-enable this mixed
version test.
such that MQTT and WebMQTT tests of the shared_SUITE can run in parallel.
Before this commit, the shared_SUITE runs 14 minutes, after this commit
the shared_SUITE runs 4 minutes in GitHub actions.
AMQP 0.9.1 header x-mqtt-dup was determined by the incoming MQTT PUBLISH
packet's DUP flag. Its only use was to determine the outgoing MQTT
PUBLISH packet's DUP flag. However, that's wrong behaviour because
the MQTT 3.1.1 protocol spec mandates:
"The value of the DUP flag from an incoming PUBLISH packet is not
propagated when the PUBLISH Packet is sent to subscribers by the Server.
The DUP flag in the outgoing PUBLISH packet is set independently to the
incoming PUBLISH packet, its value MUST be determined solely by whether
the outgoing PUBLISH packet is a retransmission."
[MQTT-3.3.1-3]
Native MQTT fixes this wrong behaviour. Therefore, we can delete this
AMQP 0.9.1 header.
Native MQTT introduced a regression where the "{username}" and "{vhost}"
variables were not expanded in permission patterns.
This regression was unnoticed because the java_SUITE's
topicAuthorisationVariableExpansion test was wrongfully passing because
its topic started with "test-topic" which matched another allow listed
topic (namely "test-topic") instead of the pattern
"{username}.{client_id}.a".
This other java_SUITE regression got introduced by commit
26a17e8530
This commit fixes both the buggy Java test and the actual regression
introduced in Native MQTT.
This commit is pure refactoring making the code base more maintainable.
Replace rabbit_misc:pipeline/3 with the new OTP 25 experimental maybe
expression because
"Frequent ways in which people work with sequences of failable
operations include folds over lists of functions, and abusing list
comprehensions. Both patterns have heavy weaknesses that makes them less
than ideal."
https://www.erlang.org/eeps/eep-0049#obsoleting-messy-patterns
Additionally, this commit is more restrictive in the type spec of
rabbit_mqtt_processor state fields.
Specifically, many fields were defined to be `undefined | T` where
`undefined` was only temporarily until the first CONNECT packet was
processed by the processor.
It's better to initialise the MQTT processor upon first CONNECT packet
because there is no point in having a processor without having received
any packet.
This allows many type specs in the processor to change from `undefined |
T` to just `T`.
Additionally, memory is saved by removing the `received_connect_packet`
field from the `rabbit_mqtt_reader` and `rabbit_web_mqtt_handler`.
Include API functions in the rabbit_mqtt_retained_msg_store
behaviour module.
"There is a best practice to have the behaviour module include
the API also as it helps other parts of the code to be correct
and a bit more dialyzable."
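A generic illustration of that practice; the module, callback, and function names below are hypothetical, not the actual retained message store API:
```
-module(my_retained_store).

%% The behaviour defines the callback...
-callback insert(Topic :: binary(), Msg :: term(), State :: term()) -> term().

%% ...and also exposes the API, so callers always go through this module
%% instead of calling the backend module directly.
-export([insert/4]).

insert(Mod, Topic, Msg, State) ->
    Mod:insert(Topic, Msg, State).
```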
This commit also fixes a bug where the retainer process had only
60 milliseconds shutdown time before being unconditionally killed.
60 milliseconds can be too short to dump a large ETS table containing
many retained messages to disk.
Prior to this commit test `deps.rabbitmq_mqtt.cluster_SUITE`
`connection_id_tracking_with_decommissioned_node` was flaky and sometimes
failed with
```
{cluster_SUITE,connection_id_tracking_with_decommissioned_node,160}
{test_case_failed,failed to match connection count 0}
```
as it seems to always match peer_host.
Commit 7e09b85426 adds peer address
provided by WebMQTT plugin.
However, this seems unnecessary since function rabbit_net:peername/1 on
the unwrapped socket provides the same address.
The peer address was the address of the proxy if the proxy protocol is
enabled.
This commit simplifies code and reduces memory consumption.
In MQTT 3.1.1, the CONNECT packet consists of
1. 10 bytes variable header
2. ClientId (up to 23 bytes must be supported)
3. Will Topic
4. Will Message (maximum length 2^16 bytes)
5. User Name
6. Password
Restricting the CONNECT packet size to 2^16 = 65,536 bytes
seems to be a reasonable default.
The value is configurable via the MQTT app parameter
`max_packet_size_unauthenticated`.
Instead of being called `max_packet_size_connect`, the
name `max_packet_size_unauthenticated` is more generic
because MQTT 5 introduces an AUTH packet type.
When a single field in a record is updated, all remaining
fields' pointers are copied. Hence, if the record is large,
a lot will be copied.
Therefore, put static or rarely changing fields into their own record.
The same was done for the state in rabbit_channel or rabbit_fifo
for example.
Also, merge #info{} record into the new #cfg{} record.
as it was unnecessary to introduce it in the first place.
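A generic illustration of the pattern (field names below are hypothetical): static fields live in a nested #cfg{} record so that updating a hot field copies only the small outer record:
```
-record(cfg,   {socket, client_id, vhost, max_packet_size}).   %% static / rarely updated
-record(state, {cfg :: #cfg{}, unacked = #{}, queue_states}).  %% updated on the hot path

%% Only the few fields of #state{} are copied here; the #cfg{} tuple is shared.
record_publish(PacketId, Msg, State = #state{unacked = Unacked}) ->
    State#state{unacked = Unacked#{PacketId => Msg}}.
```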
Remove the queue name from all queue type clients and pass the queue
name to the queue type callbacks that need it.
We have to leave feature flag classic_queue_type_delivery_support
required because we removed the monitor registry
1fd4a6d353/deps/rabbit/src/rabbit_queue_type.erl (L322-L325)
Implements review from Karl:
"rather than changing the message format we could amend the queue type
callbacks involved with the stateful operation to also take the queue
name record as an argument. This way we don't need to maintain the extra
queue name (which uses memory for known but obscurely technical reasons
with how maps work) in the queue type state (as it is used in the queue
type state map as the key)"
Instead of having optional rabbit_queue_type callbacks, add stub
implementations to rabbit_mqtt_qos0_queue throwing an exception.
The exception uses erlang:error/2 including stack trace and arguments
of the unsupported functions to ease debugging in case these functions
were ever to be called.
Dialyzer suppressions are added for these functions such that dialyzer
won't complain about:
```
rabbit_mqtt_qos0_queue.erl:244:1: Function init/1 only terminates with explicit exception
```
For example when at-most-once dead lettering does a fan out to many
target classic queues this commit will reduce inter-node data traffic by
using delegate.
Use delegate.
For large fan-outs with medium to large message size,
this commit will reduce inter-node data traffic by
multiple orders of magnitude preventing busy distribution
ports.
We want the build to fail if there are any dialyzer warnings in
rabbitmq_mqtt or rabbitmq_web_mqtt. Otherwise we rely on people manually
executing and checking the results of dialyzer.
Also, we want any test to fail that is flaky.
Flaky tests can indicate subtle errors in either test or program execution.
Instead of marking them as flaky, we should understand and - if possible -
fix the underlying root cause.
Fix OTP 25.0 dialyzer warning
Type gen_server:format_status() is known in OTP 25.2, but not in 25.0
Run test either on a RabbitMQ cluster of size 1 or size 3.
Running a test on both cluster sizes does not result in higher test
coverage.
This puts less pressure on Buildbuddy and reduces overall test
execution time.
Although the first element (destination queue) of the compound key
in rabbit_reverse_route is provided, scaling tests with millions of
subscribers have shown via timer:tc/1 that the mnesia:match_object/3
query often takes > 100 microseconds, sometimes even a few milliseconds.
So, querying bindings is not super expensive, but moderately expensive
when done many times concurrently.
Especially when mass disconnecting millions of clients, `msacc` showed
that all schedulers were 80% busy in `ets`.
To put less pressure on the CPU, in this commit we decide to slightly
increase memory usage instead.
When first connecting a client, we only query bindings and cache them in
the process state if a prior session for the same client is present.
Thereafter, bindings are not queried again.
Up to RabbitMQ 3.11, the following bug existed.
The MQTT 3.1.1 protocol spec mandates:
```
The Session state in the Server consists of:
* The Client’s subscriptions.
* ...
```
However, because QoS 0 queues were auto-delete up to 3.11 (or exclusive
prior to this commit), QoS 0 queues and therefore their bindings were
deleted when a non-clean session terminated.
When the same client re-connected, its QoS 0 subscription was lost.
Note that storing **messages** for QoS 0 subscriptions is optional while the
client is disconnected. However, storing the subscription itself (i.e.
bindings in RabbitMQ terms) is not optional: The client must receive new
messages for its QoS 0 subscriptions after it reconnects without having
to send a SUBSCRIBE packet again.
"After the disconnection of a Session that had CleanSession set to 0,
the Server MUST store further QoS 1 and QoS 2 messages that match any
subscriptions that the client had at the time of disconnection as part
of the Session state [MQTT-3.1.2-5]. It MAY also store QoS 0 messages
that meet the same criteria."
This commit additionally implements the last sentence.
Prior to this commit:
```
rabbitmqctl status
...
Totals
Connection count: 0
Queue count: 64308
Virtual host count: 1
...
```
only counted AMQP connections, but did not include MQTT or stream
connections.
Let's include the count of all connections in the output of
`rabbitmqctl status`.
Prior to this commit, there was a CPU bottleneck (not present in 3.11.x)
when creating, deleting or disconnecting many MQTT subscribers.
Example:
Add 120 MQTT connections per second each creating a subscription.
Starting at around 300k MQTT subscribers, all 45 CPUs on the server were
maxed out spending time in `ets` according to msacc.
When running a similar workload with only 30k MQTT subscribers on a
local server with only 5 CPUs, all 5 CPUs were maxed out and the CPU
flame graph showed that 86% of the CPU time is spent in function
rabbit_mqtt_processor:topic_names/2.
This commit uses the rabbit_reverse_route table to query MQTT
subscriptions for a given client ID. CPU usage is now drastically lower.
The configured source topic exchange is always the same in the MQTT
plugin. There is however a high cardinality in the destination queues
(MQTT client IDs) and routing keys (topics).
When a cluster wide memory or disk alarm is fired, in AMQP 0.9.1 only
connections that are publishing messages get blocked.
Connections that only consume can continue to empty the queues.
Prior to this commit, all MQTT connections got blocked during a memory
or disk alarm. This has two downsides:
1. MQTT connections that only consume, but never publish, cannot empty
queues anymore.
2. If the memory or disk alarm takes long, the MQTT client does not
receive a PINGRESP from the server when it sends a PINGREQ potentially
leading to mass client disconnection (depending on the MQTT client
implementation).
This commit makes sure that MQTT connections that never sent a single
PUBLISH packet (e.g. "pure" MQTT subscribers) are not blocked during
memory or disk alarms.
In contrast to AMQP 0.9.1, new connections are still blocked from being
accepted because accepting (many) new MQTT connections also leads to
increased resource usage.
The implementation in this commit is simpler, but also more naive
than the logic in rabbit_reader: rabbit_reader blocks connections more
dynamically whereas rabbit_mqtt_reader and rabbit_web_mqtt_handler
block a connection if the connection ever sent a single PUBLISH packet
during its lifetime.
Convert from the old rabbit_log* API to the new Logger macros for MQTT
and Web MQTT connections.
Advantages:
* metadata mfa, file, line, pid, gl, time is auto-inserted by Logger.
* Log lines output by the shared module rabbit_mqtt_processor now
include via the domain whether it's an MQTT or Web MQTT connection.
Instead of using domain [rabbitmq, connection], this commit now uses
the smaller and more specialized domains [rabbitmq, connection, mqtt] and
[rabbitmq, connection, web_mqtt] for MQTT and Web MQTT processes
respectively, resulting in the following example output:
"msg":"Received a CONNECT,", "domain":"rabbitmq.connection.mqtt"
or
"msg":"Received a CONNECT,", "domain":"rabbitmq.connection.web_mqtt"
New test suite deps/rabbitmq_mqtt/test/shared_SUITE contains tests that
are executed against both MQTT and Web MQTT.
This has two major advantages:
1. Eliminates test code duplication between rabbitmq_mqtt and
rabbitmq_web_mqtt making the tests easier to maintain and to understand.
2. Increases test coverage of Web MQTT.
It's acceptable to add a **test** dependency from rabbitmq_mqtt to
rabbitmq_web_mqtt. Obviously, there should be no such dependency
for non-test code.
Topic, username, and password are parsed as binaries.
Storing topics as lists or converting between
lists and binaries back and forth several times is
unnecessary and expensive.
Previously (until RabbitMQ v3.11.x), a memory or disk alarm did
not block the Web MQTT connection because this feature was only
implemented half way through: The part that registers the Web MQTT
connection with rabbit_alarm was missing.
1. Allow inspecting a (Web) MQTT connection.
2. Show MQTT client ID on connection page as part of client_properties.
3. Handle force_event_refresh (when management_plugin gets enabled
after (web) MQTT connections got created).
4. Reduce code duplication between protocol readers.
5. Display '?' instead of 'NaN' in UI for absent queue metrics.
6. Allow a (Web) MQTT connection to be closed via the management_plugin.
For 6. this commit takes the same approach as already done for the stream
plugin:
The stream plugin registers neither with {type, network} nor {type,
direct}.
We cannot use gen_server:call/3 anymore to close the connection
because the web MQTT connection cannot handle gen_server calls (only
casts).
Strictly speaking, this commit requires a feature flag to allow to force
closing stream connections from the management plugin during a rolling
update. However, given that this is rather an edge case, and there is a
workaround (connect to the node directly hosting the stream connection),
this commit will not introduce a new feature flag.
- mqtt processor publish_to_queue/2 is called in
process_request(?PUBLISH,_, _) and maybe_send_will/3. In
both places #mqtt_msg{} is initialized with a value, so it will
never be 'undefined'.
- all possible values are already matched in mqtt_processor
human_readable_vhost_lookup_strategy/1; deleted the unneeded
catch-all function clause.
- Removed an unnecessary case match in mqtt_reader init/1.
Return values for 'rabbit_net:connection_string' are {ok, _} or
{error, _}. {'network_error', Reason} will never match.
- Fix function spec for mqtt_util gen_client_id/1. Return type of
rabbit_misc:base64url is string(), not binary().