MQTT tests depend on a few plugins, which are just used in 1 or 2
suites each. These have caused issues in CI, triggering a bug in
rabbitmq_federation where the mirrored supervisor submits a transaction
while the cluster is being shut down. The transaction hangs and the
whole rabbitmq_mqtt job times out.
This bug has been addressed, however it is best to start just the required
plugins on each SUITE.
Require all MQTT feature flags and remove their compatibility code:
* delete_ra_cluster_mqtt_node
* rabbit_mqtt_qos0_queue
* mqtt_v5
These feature flags were introduced in or before 3.13.0.
PR #10761 added a new CLI command to list Web MQTT connections.
That new CLI command relies on feature flag delete_ra_cluster_mqtt_node
being enabled.
This commit ensures exactly this condition.
and CLI as requested in
https://github.com/rabbitmq/rabbitmq-server/issues/2554#issuecomment-1604205277
"User Properties on the CONNECT packet can be used to send connection related properties from the Client to the Server.
The meaning of these properties is not defined by this specification."
[v5 3.1.2.11.8]
It makes sense to display the User Property of the CONNECT packet in the
Management UI in the connection's Client Properties.
Support subscription options "No Local" and "Retain As Published"
as well as Subscription Identifiers.
All three MQTT 5.0 features can be set on a per subscription basis.
Due to wildcards in topic filters, multiple subscriptions
can match a given topic. Therefore, to implement Retain As Published and
Subscription Identifiers, the destination MQTT connection process needs
to know what subscription(s) caused it to receive the message.
There are a few ways how this could be implemented:
1. The destination MQTT connection process is aware of all its
subscriptions. Whenever, it receives a message, it can match the
message's routing key / topic against all its known topic filters.
However, to iteratively match the routing key against all topic
filters for every received message can become very expensive in the
worst case when the MQTT client creates many subscriptions containing
wildcards. This could be the case for an MQTT client that acts as a
bridge or proxy or dispatcher: It could subscribe via a wildcard for
each of its own clients.
2. Instead of interatively matching the topic of the received message
against all topic filters that contain wildcards, a better approach
would be for every MQTT subscriber connection process to maintain a
local trie datastructure (similar to how topic exchanges are
implemented) and perform matching therefore more efficiently.
However, this does not sound optimal either because routing is
effectively performed twice: in the topic exchange and again against
a much smaller trie in each destination connection process.
3. Given that the topic exchange already perform routing, a much more
sensible way would be to send the matched binding key(s) to the
destination MQTT connection process. A subscription (topic filter)
maps to a binding key in AMQP 0.9.1 routing. Therefore, for the first
time in RabbitMQ, the routing function should not only output a list
of unique destination queues, but also the binding keys (subscriptions)
that caused the message to be routed to the destination queue.
This commit therefore implements the 3rd approach.
The downside of the 3rd approach is that it requires API changes to the
routing function and topic exchange.
Specifically, this commit adds a new function rabbit_exchange:route/3
that accepts a list of routing options. If that list contains version 2,
the caller of the routing function knows how to handle the return value
that could also contain binding keys.
This commits allows an MQTT connection process, the channel process, and
at-most-once dead lettering to handle binding keys. Binding keys are
included as AMQP 0.9.1 headers into the basic message.
Therefore, whenever a message is sent from an MQTT client or AMQP 0.9.1
client or AMQP 1.0 client or STOMP client, the MQTT receiver will know
the subscription identifier that caused the message to be received.
Note that due to the low number of allowed wildcard characters (# and
+), the cardinality of matched binding keys shouldn't be high even if
the topic contains for example 3 levels and the message is sent to for
example 5 million destination queues. In other words, sending multiple
distinct basic messages to the destination shouldn't hurt the delegate
optimisation too much. The delegate optimisation implemented for classic
queues and rabbit_mqtt_qos0_queue(s) still takes place for all basic
messages that contain the same set of matched binding keys.
The topic exchange returns all matched binding keys by remembering the
edges walked down to the leaves. As an optimisation, only for MQTT
queues are binding keys being returned. This does add a small dependency
from app rabbit to app rabbitmq_mqtt which is not optimal. However, this
dependency should be simple to remove when omitting this optimisation.
Another important feature of this commit is persisting subscription
options and subscription identifiers because they are part of the
MQTT 5.0 session state.
In MQTT v3 and v4, the only subscription information that were part of
the session state was the topic filter and the QoS level.
Both information were implicitly stored in the form of bindings:
The topic filter as the binding key and the QoS level as the destination
queue name of the binding.
For MQTT v5 we need to persist more subscription information.
From a domain perspective, it makes sense to store subscription options
as part of subscriptions, i.e. bindings, even though they are currently
not used in routing.
Therefore, this commits stores subscription options as binding arguments.
Storing subscription options as binding arguments comes in turn with
new challenges: How to handle mixed version clusters and upgrading an
MQTT session from v3 or v4 to v5?
Imagine an MQTT client connects via v5 with Session Expiry Interval > 0
to a new node in a mixed version cluster, creates a subscription,
disconnects, and subsequently connects via v3 to an old node. The
client should continue to receive messages.
To simplify such edge cases, this commit introduces a new feature flag
called mqtt_v5. If mqtt_v5 is disabled, clients cannot connect to
RabbitMQ via MQTT 5.0.
This still doesn't entirely solve the problem of MQTT session upgrades
(v4 to v5 client) or session downgrades (v5 to v4 client).
Ideally, once mqtt_v5 is enabled, all MQTT bindings contain non-empty binding
arguments. However, this will require a feature flag migration function
to modify all MQTT bindings. To be more precise, all MQTT bindings need
to be deleted and added because the binding argument is part of the
Mnesia table key.
Since feature flag migration functions are non-trivial to implement in
RabbitMQ (they can run on every node multiple times and concurrently),
this commit takes a simpler approach:
All v3 / v4 sessions keep the empty binding argument [].
All v5 sessions use the new binding argument [#mqtt_subscription_opts{}].
This requires only handling a session upgrade / downgrade by
creating a binding (with the new binding arg) and deleting the old
binding (with the old binding arg) when processing the CONNECT packet.
Note that such session upgrades or downgrades should be rather rare in
practice. Therefore these binding transactions shouldn't hurt peformance.
The No Local option is implemented within the MQTT publishing connection
process: The message is not sent to the MQTT destination if the
destination queue name matches the current MQTT client ID and the
message was routed due to a subscription that has the No Local flag set.
This avoids unnecessary traffic on the MQTT queue.
The alternative would have been that the "receiving side" (same process)
filters the message out - which would have been more consistent in how
Retain As Published and Subscription Identifiers are implemented, but
would have caused unnecessary load on the MQTT queue.
in MQTT tests.
For example in the ff_SUITE wee see in Buildbuddy sporadic
failures that the cluster cannot be created within 2 minutes:
```
*** CT Error Notification 2023-04-24 10:58:55.628 ***🔗
rabbit_ct_helpers:port_receive_loop failed on line 945
Reason: {timetrap_timeout,120000}
...
=== Ended at 2023-04-24 10:58:55
=== Location: [{rabbit_ct_helpers,port_receive_loop,945},
{rabbit_ct_helpers,exec,920},
{rabbit_ct_broker_helpers,cluster_nodes1,858},
{rabbit_ct_broker_helpers,cluster_nodes1,840},
{rabbit_ct_helpers,run_steps,141},
{ff_SUITE,init_per_group,last_expr},
{test_server,ts_tc,1782},
{test_server,run_test_case_eval1,1379},
{test_server,run_test_case_eval,1223}]
=== Reason: timetrap timeout
===
*** init_per_group failed.
Skipping all cases.
```
The default time limit for a test case is 30 minutes.
1. Allow to inspect an (web) MQTT connection.
2. Show MQTT client ID on connection page as part of client_properties.
3. Handle force_event_refresh (when management_plugin gets enabled
after (web) MQTT connections got created).
4. Reduce code duplication between protocol readers.
5. Display '?' instead of 'NaN' in UI for absent queue metrics.
6. Allow an (web) MQTT connection to be closed via management_plugin.
For 6. this commit takes the same approach as already done for the stream
plugin:
The stream plugin registers neither with {type, network} nor {type,
direct}.
We cannot use gen_server:call/3 anymore to close the connection
because the web MQTT connection cannot handle gen_server calls (only
casts).
Strictly speaking, this commit requires a feature flag to allow to force
closing stream connections from the management plugin during a rolling
update. However, given that this is rather an edge case, and there is a
workaround (connect to the node directly hosting the stream connection),
this commit will not introduce a new feature flag.
When listing MQTT connections with the CLI, whether feature flag
delete_ra_cluster_mqtt_node is enabled or disabled, in both cases
return cluster wide MQTT connections.
If connection tracking is done in Ra, the CLI target node returns all
connection infos because Ra is aware of all MQTT connections.
If connection tracking is done in (local-only) pg, all nodes return
their local MQTT connection infos.
This is a follow-up commit of https://github.com/rabbitmq/rabbitmq-server/pull/5693
The allowed values of emqtt client library are:
```
{proto_ver, v3 | v4 | v5}
```
Therefore, `{proto_ver, 3}` did not have any effect and used the default
protocol version v4.
Let's fix the misleading version in our tests and be explicit that
we use v4.
The rabbitmq_mqtt tests used an outdated MQTT Erlang client.
It was a fork that has not been updated for > 4 years.
This commit upgrades the client to the latest version.
Therefore, we can delete our fork https://github.com/rabbitmq/emqttc.git
The way connections are listed already contains all the connections, so
there is no need to reach out to all cluster nodes and aggregate the
results, as it results in duplicate lines.
The command test now uses a cluster to make sure connections are listed
properly in a cluster.
[#167639960]
Fixes#202
* Handle duplicate keys (see 82d1cb23e9, for instance)
* Use maps since this test is in master only
* Use eunit assertions that would print arguments on failures