Commit Graph

31 Commits

Author SHA1 Message Date
David Ansari 42ede4a258 Speed up tests
Multiple test cases were recently slowed down by up to 30 seconds.
This commit reverts these changes.
2024-12-30 16:56:18 +00:00
Diana Parra Corbacho 43cfc3c937 Tests: Increase receive-after timeout in all mqtt test suites 2024-12-16 11:58:05 +01:00
David Ansari 50116f0927 Require MQTT feature flags in 4.0
Require all MQTT feature flags and remove their compatibility code:
* delete_ra_cluster_mqtt_node
* rabbit_mqtt_qos0_queue
* mqtt_v5

These feature flags were introduced in or before 3.13.0.
2024-07-10 10:27:59 +02:00
Diana Parra Corbacho 5f0981c5a3
Allow using the Khepri database to store metadata instead of Mnesia
[Why]

Mnesia is a very powerful and convenient tool for Erlang applications:
it is a persistent disc-based database, it handles replication across
multiple Erlang nodes and it is available out-of-the-box from the
Erlang/OTP distribution. RabbitMQ relies on Mnesia to manage all its
metadata:

* virtual hosts' properties
* internal users
* queue, exchange and binding declarations (not queue data)
* runtime parameters and policies
* ...

Unfortunately, Mnesia makes it difficult to handle network partitions
and, as a consequence, the merge conflicts between Erlang nodes once a
network partition is resolved. RabbitMQ provides several partition
handling strategies, but they are not bullet-proof. Users still hit
situations where it is a pain to repair a cluster following a network
partition.

[How]

@kjnilsson created Ra [1], a Raft consensus library that RabbitMQ
already uses successfully to implement, for instance, quorum queues and
streams. Those queues do not suffer from network partitions.

We created Khepri [2], a new persistent and replicated database engine
based on Ra and we want to use it in place of Mnesia in RabbitMQ to
solve the problems with network partitions.

This patch integrates Khepri as an experimental feature. When enabled,
RabbitMQ will store all its metadata in Khepri instead of Mnesia.

This change comes with behavior changes. While Khepri remains disabled,
you should see no changes to the behavior of RabbitMQ. If there are
changes, it is a bug. After Khepri is enabled, there are significant
changes of behavior that you should be aware of.

Because it is based on the Raft consensus algorithm, when there is a
network partition, only the cluster members that are in the partition
with at least `(Number of nodes in the cluster ÷ 2) + 1` number of nodes
can "make progress". In other words, only those nodes may write to the
Khepri database and read from the database and expect a consistent
result.

For instance in a cluster of 5 RabbitMQ nodes:
* If there are two partitions, one with 3 nodes, one with 2 nodes, only
  the group of 3 nodes will be able to write to the database.
* If there are three partitions, two with 2 nodes, one with 1 node, none
  of the groups can write to the database.
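
As a quick sanity check, the quorum size follows from integer division
(a minimal sketch, not code from this patch):

    %% Quorum size for a cluster of N nodes (illustrative only):
    quorum(N) -> N div 2 + 1.

    %% quorum(5) =:= 3: the partition with 3 of 5 nodes can make
    %% progress, while groups of 2 or 1 nodes cannot.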

Because the Khepri database will be used for all kinds of metadata,
RabbitMQ nodes that can't write to the database will be unable to
perform some operations. A list of affected operations and what to
expect is documented in the associated pull request and on the RabbitMQ
website.

This requirement from Raft also affects the startup of RabbitMQ nodes in
a cluster. Indeed, at least a quorum number of nodes must be started at
once to allow nodes to become ready.

To enable Khepri, you need to enable the `khepri_db` feature flag:

    rabbitmqctl enable_feature_flag khepri_db

When the `khepri_db` feature flag is enabled, the migration code
performs the following two tasks:
1. It synchronizes the Khepri cluster membership from the Mnesia
   cluster. It uses `mnesia_to_khepri:sync_cluster_membership/1` from
   the `khepri_mnesia_migration` application [3].
2. It copies data from relevant Mnesia tables to Khepri, doing some
   conversion if necessary on the way. Again, it uses
   `mnesia_to_khepri:copy_tables/4` from `khepri_mnesia_migration` to do
   it.

This can be performed on a running standalone RabbitMQ node or cluster.
Data will be migrated from Mnesia to Khepri without any service
interruption. Note that during the migration, the performance may
decrease and the memory footprint may go up.
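
For illustration, the two steps could be invoked along these lines (the
function arities come from the text above, but the argument names and
return values are assumptions; see the `khepri_mnesia_migration`
documentation [3] for the actual API):

    %% Sketch only; argument names and return values are assumed.
    migrate(StoreId, MigrationId, Tables, ConverterMod) ->
        ok = mnesia_to_khepri:sync_cluster_membership(StoreId),
        ok = mnesia_to_khepri:copy_tables(StoreId, MigrationId, Tables,
                                          ConverterMod).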

Because this feature flag is considered experimental, it is not enabled
by default even on a brand new RabbitMQ deployment.

More about the implementation details below:

In the past months, all accesses to Mnesia were isolated in a collection
of `rabbit_db*` modules. This is where the integration of Khepri mostly
takes place: we use a function called `rabbit_khepri:handle_fallback/1`
which selects the database and performs the query or the transaction.
Here is an example from `rabbit_db_vhost`:

* Up until RabbitMQ 3.12.x:

        get(VHostName) when is_binary(VHostName) ->
            get_in_mnesia(VHostName).

* Starting with RabbitMQ 3.13.0:

        get(VHostName) when is_binary(VHostName) ->
            rabbit_khepri:handle_fallback(
              #{mnesia => fun() -> get_in_mnesia(VHostName) end,
                khepri => fun() -> get_in_khepri(VHostName) end}).

This `rabbit_khepri:handle_fallback/1` function relies on two things:
1. the fact that the `khepri_db` feature flag is enabled, in which case
   it always executes the Khepri-based variant.
2. otherwise, whether the Mnesia tables can still be read from and
   written to.

Before the feature flag is enabled, or during the migration, the
function will try to execute the Mnesia-based variant. If it succeeds,
it returns the result. If it fails because one or more Mnesia tables
can't be used, that means the feature flag is being enabled, so the
function restarts from scratch: depending on the outcome, either the
Mnesia-based variant will succeed (the feature flag couldn't be enabled)
or the feature flag will be marked as enabled and the Khepri-based
variant will be called. The meat of this function really lives in the
`khepri_mnesia_migration` application [3] and
`rabbit_khepri:handle_fallback/1` is a wrapper on top of it that knows
about the feature flag.
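
A simplified sketch of that fallback logic (illustrative only;
`khepri_db_enabled/0` is an assumed helper and the real retry handling
is more involved):

    handle_fallback(#{mnesia := MnesiaFun, khepri := KhepriFun} = Funs) ->
        case khepri_db_enabled() of
            true ->
                KhepriFun();
            false ->
                try
                    MnesiaFun()
                catch
                    exit:{aborted, {no_exists, _}} ->
                        %% The Mnesia tables are gone: the feature flag
                        %% is being enabled concurrently; start over.
                        handle_fallback(Funs)
                end
        end.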

However, some calls to the database do not depend on the existence of
Mnesia tables, such as functions where we need to learn about the
members of a cluster. For those, we can't rely on exceptions from
Mnesia. Therefore, we just look at the state of the feature flag to
determine which database to use. There are two situations though:

* Sometimes, we need the feature flag state query to block because the
  function interested in it can't return a valid answer during the
  migration. Here is an example:

        case rabbit_khepri:is_enabled(RemoteNode) of
            true  -> can_join_using_khepri(RemoteNode);
            false -> can_join_using_mnesia(RemoteNode)
        end

* Sometimes, we need the feature flag state query to NOT block (for
  instance because it would cause a deadlock). Here is an example:

        case rabbit_khepri:get_feature_state() of
            enabled -> members_using_khepri();
            _       -> members_using_mnesia()
        end

Direct accesses to Mnesia still exist. They are limited to code that is
specific to Mnesia, such as classic queue mirroring or the network
partition handling strategies.

Now, to discover the Mnesia tables to migrate and how to migrate them,
we use an Erlang module attribute called
`rabbit_mnesia_tables_to_khepri_db` which indicates a list of Mnesia
tables and an associated converter module. Here is an example in the
`rabbitmq_recent_history_exchange` plugin:

    -rabbit_mnesia_tables_to_khepri_db(
       [{?RH_TABLE, rabbit_db_rh_exchange_m2k_converter}]).

The converter module (`rabbit_db_rh_exchange_m2k_converter` in this
example) is in fact a "sub" converter module called by
`rabbit_db_m2k_converter`. See the documentation of a `mnesia_to_khepri`
converter module to learn more about these modules.
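
A hypothetical converter skeleton might look as follows; the callback
names and arities are assumptions based on the `mnesia_to_khepri`
converter behaviour, not code from this patch:

    -module(my_m2k_converter).
    -behaviour(mnesia_to_khepri_converter).

    -export([init_copy_to_khepri/3,
             copy_to_khepri/3,
             delete_from_khepri/3]).

    init_copy_to_khepri(StoreId, _MigrationId, Tables) ->
        {ok, #{store_id => StoreId, tables => Tables}}.

    copy_to_khepri(_Table, _Record, State) ->
        %% Convert the Mnesia record and write it to Khepri here.
        {ok, State}.

    delete_from_khepri(_Table, _Key, State) ->
        %% Remove the corresponding Khepri tree node here.
        {ok, State}.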

[1] https://github.com/rabbitmq/ra
[2] https://github.com/rabbitmq/khepri
[3] https://github.com/rabbitmq/khepri_mnesia_migration

See #7206.

Co-authored-by: Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>
Co-authored-by: Diana Parra Corbacho <dparracorbac@vmware.com>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
2023-09-29 16:00:11 +02:00
Diana Parra Corbacho 7d06f806c2
Mqtt: filter test events 2023-08-07 17:21:04 +02:00
David Ansari a715eb7756 Attempt to fix flake
Attempt to fix the following flake:
```
=== Ended at 2023-06-26 07:13:34
=== Location: [{shared_SUITE,events,570},
              {test_server,ts_tc,1782},
              {test_server,run_test_case_eval1,1291},
              {test_server,run_test_case_eval,1223}]
=== === Reason: no match of right hand side value []
  in function  shared_SUITE:events/1 (shared_SUITE.erl, line 570)
  in call from test_server:ts_tc/3 (test_server.erl, line 1782)
  in call from test_server:run_test_case_eval1/6 (test_server.erl, line 1291)
  in call from test_server:run_test_case_eval/9 (test_server.erl, line 1223)
```

of test
```
-group [web_mqtt,v3,cluster_size_1] -case events
```

The logs showed that deletion of the exclusive queue took place in the
same millisecond as the server received the DISCONNECT:
```
2023-06-26 07:13:32.838282+00:00 [debug] <0.2494.0> Received a CONNECT, client ID: events, username: undefined, clean start: true, protocol version: 3, keepalive: 60, property names: []
2023-06-26 07:13:32.838436+00:00 [debug] <0.2494.0> MQTT connection 127.0.0.1:38808 -> 127.0.0.1:21007 picked vhost using plugin_configuration_or_default_vhost
2023-06-26 07:13:32.838523+00:00 [debug] <0.2494.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2023-06-26 07:13:32.838672+00:00 [info] <0.2494.0> Accepted Web MQTT connection 127.0.0.1:38808 -> 127.0.0.1:21007 for client ID events
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0> Received a SUBSCRIBE with subscription(s) [{mqtt_subscription,<<"my/topic">>,
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0>                                             {mqtt_subscription_opts,0,false,
2023-06-26 07:13:33.147196+00:00 [debug] <0.2494.0>                                              false,0,undefined}}]
2023-06-26 07:13:33.457541+00:00 [debug] <0.2494.0> Received an UNSUBSCRIBE for topic filter(s) [<<"my/topic">>]
2023-06-26 07:13:33.762171+00:00 [debug] <0.2494.0> Received a DISCONNECT with reason code 0 and properties #{}
2023-06-26 07:13:33.762350+00:00 [info] <0.2494.0> Web MQTT closing connection 127.0.0.1:38808 -> 127.0.0.1:21007
2023-06-26 07:13:33.762780+00:00 [debug] <0.2504.0> Deleting exclusive queue 'mqtt-subscription-eventsqos0' in vhost '/' because its declaring connection <0.2494.0> was closed
```
However, there could be some delay between the Web MQTT client
disconnecting and the processor handling the DISCONNECT packet.
2023-06-26 13:00:01 +02:00
David Ansari fb7af48df6 Support Will Delay Interval
Previously, the Will Message could be kept in memory in the MQTT
connection process state. Upon termination, the Will Message is sent.

The new MQTT 5.0 feature Will Delay Interval requires storing the Will
Message outside of the MQTT connection process state.

The Will Message should not be stored node local because the client
could reconnect to a different node.

Storing the Will Message in Mnesia is not an option because we want to
get rid of Mnesia. Storing the Will Message in a Ra cluster or in Khepri
is only an option if the Will Payload is small as there is currently no
way in Ra to **efficiently** snapshot large binary data (Note that these
Will Messages are not consumed in a FIFO style workload like messages in
quorum queues. A Will Message needs to be stored for as long as the
Session lasts - up to 1 day by default, but could also be much longer if
RabbitMQ is configured with a higher maximum session expiry interval.)
Usually, Will Payloads are small: they are just a notification that a
client's MQTT session ended abnormally. However, we don't know how users
leverage the Will Message feature, and the MQTT protocol allows for
large Will Payloads.

Therefore, the solution implemented in this commit, which should work
well enough, is storing the Will Message in a queue.
Each MQTT session with a Session Expiry Interval and a Will Delay
Interval of > 0 seconds creates a queue when the current Network
Connection ends, and stores its Will Message in that queue. The Will
Message has a message TTL set (corresponding to the Will Delay Interval)
and the queue has a queue TTL set (corresponding to the Session Expiry
Interval).
If the client does not reconnect within the Will Delay Interval, the
message is dead lettered to the configured MQTT topic exchange
(amq.topic by default).
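
In AMQP 0.9.1 terms, the pieces involved might look as follows (a
sketch; the x-* keys are standard RabbitMQ queue arguments, while the
function and variable names are illustrative):

    %% Arguments for the per-session Will Message queue:
    will_queue_args(SessionExpiryMs, WillRoutingKey) ->
        [{<<"x-expires">>, long, SessionExpiryMs},
         {<<"x-dead-letter-exchange">>, longstr, <<"amq.topic">>},
         {<<"x-dead-letter-routing-key">>, longstr, WillRoutingKey}].

    %% The Will Message itself carries a per-message TTL equal to the
    %% Will Delay Interval, e.g. via the AMQP 0.9.1 expiration property.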

The Will Delay Interval can be set by both publishers and subscribers.
Therefore, the Will Message is the 1st session state that RabbitMQ keeps
for publish-only MQTT clients.

One current limitation of this commit is that a Will Message that is
delayed (i.e. Will Delay Interval is set) and retained (i.e. Will Retain
flag set) will not be retained.
One solution to retain delayed Will Messages is that the retainer
process consumes from a queue and the queue binds to the topic exchange
with a topic starting with `$`, for example `$retain/#`.
The AMQP 0.9.1 Will Message that is dead lettered could then have a CC
header added such that it would be published not only to the Will Topic,
but also to the `$retain` topic. For example, if the Will Topic is `a/b`,
it would be published with routing key `a/b` and CC header `$retain/a/b`.

The reason this is not implemented in this commit is that to keep the
currently broken retained message store behaviour, we would require
creating at least one queue per node and publishing only to that local
queue. In future, once we have a replicated retained message store based
on a Stream for example, we could just publish all retained messages to
the `$retain` topic and therefore into the Stream.
So, for now, we list retained and delayed Will Messages as a limitation:
they won't actually be retained.
2023-06-21 17:14:08 +01:00
David Ansari e2b545f270 Support MQTT 5.0 features No Local, RAP, Subscription IDs
Support subscription options "No Local" and "Retain As Published"
as well as Subscription Identifiers.

All three MQTT 5.0 features can be set on a per subscription basis.
Due to wildcards in topic filters, multiple subscriptions
can match a given topic. Therefore, to implement Retain As Published and
Subscription Identifiers, the destination MQTT connection process needs
to know what subscription(s) caused it to receive the message.

There are a few ways this could be implemented:

1. The destination MQTT connection process is aware of all its
   subscriptions. Whenever it receives a message, it can match the
   message's routing key / topic against all its known topic filters.
   However, to iteratively match the routing key against all topic
   filters for every received message can become very expensive in the
   worst case when the MQTT client creates many subscriptions containing
   wildcards. This could be the case for an MQTT client that acts as a
   bridge or proxy or dispatcher: It could subscribe via a wildcard for
   each of its own clients.

2. Instead of iteratively matching the topic of the received message
   against all topic filters that contain wildcards, a better approach
   would be for every MQTT subscriber connection process to maintain a
   local trie data structure (similar to how topic exchanges are
   implemented) and therefore perform matching more efficiently.
   However, this does not sound optimal either because routing is
   effectively performed twice: in the topic exchange and again against
   a much smaller trie in each destination connection process.

3. Given that the topic exchange already performs routing, a much more
   sensible way would be to send the matched binding key(s) to the
   destination MQTT connection process. A subscription (topic filter)
   maps to a binding key in AMQP 0.9.1 routing. Therefore, for the first
   time in RabbitMQ, the routing function should not only output a list
   of unique destination queues, but also the binding keys (subscriptions)
   that caused the message to be routed to the destination queue.

This commit therefore implements the 3rd approach.
The downside of the 3rd approach is that it requires API changes to the
routing function and topic exchange.

Specifically, this commit adds a new function rabbit_exchange:route/3
that accepts a list of routing options. If that list contains version 2,
the caller of the routing function knows how to handle the return value
that could also contain binding keys.
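
The call shape might look roughly like this (a sketch; the exact option
and return formats are assumptions based on the description above):

    %% With the version 2 option, the result may pair each destination
    %% queue with the binding key(s) that caused the message to be
    %% routed to it.
    route_v2(Exchange, Delivery) ->
        rabbit_exchange:route(Exchange, Delivery, [v2]).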

This commit allows an MQTT connection process, the channel process, and
at-most-once dead lettering to handle binding keys. Binding keys are
included as AMQP 0.9.1 headers into the basic message.
Therefore, whenever a message is sent from an MQTT client or AMQP 0.9.1
client or AMQP 1.0 client or STOMP client, the MQTT receiver will know
the subscription identifier that caused the message to be received.

Note that due to the low number of allowed wildcard characters (# and
+), the cardinality of matched binding keys shouldn't be high, even if
the topic contains, say, 3 levels and the message is sent to 5 million
destination queues. In other words, sending multiple
distinct basic messages to the destination shouldn't hurt the delegate
optimisation too much. The delegate optimisation implemented for classic
queues and rabbit_mqtt_qos0_queue(s) still takes place for all basic
messages that contain the same set of matched binding keys.

The topic exchange returns all matched binding keys by remembering the
edges walked down to the leaves. As an optimisation, binding keys are
returned only for MQTT queues. This does add a small dependency from app
rabbit to app rabbitmq_mqtt, which is not optimal. However, this
dependency should be simple to remove when omitting this optimisation.

Another important feature of this commit is persisting subscription
options and subscription identifiers because they are part of the
MQTT 5.0 session state.

In MQTT v3 and v4, the only subscription information that was part of
the session state was the topic filter and the QoS level.
Both were implicitly stored in the form of bindings:
The topic filter as the binding key and the QoS level as the destination
queue name of the binding.

For MQTT v5 we need to persist more subscription information.
From a domain perspective, it makes sense to store subscription options
as part of subscriptions, i.e. bindings, even though they are currently
not used in routing.
Therefore, this commit stores subscription options as binding arguments.

Storing subscription options as binding arguments in turn brings new
challenges: how to handle mixed version clusters and how to upgrade an
MQTT session from v3 or v4 to v5?
Imagine an MQTT client connects via v5 with Session Expiry Interval > 0
to a new node in a mixed version cluster, creates a subscription,
disconnects, and subsequently connects via v3 to an old node. The
client should continue to receive messages.

To simplify such edge cases, this commit introduces a new feature flag
called mqtt_v5. If mqtt_v5 is disabled, clients cannot connect to
RabbitMQ via MQTT 5.0.

This still doesn't entirely solve the problem of MQTT session upgrades
(v4 to v5 client) or session downgrades (v5 to v4 client).

Ideally, once mqtt_v5 is enabled, all MQTT bindings contain non-empty binding
arguments. However, this will require a feature flag migration function
to modify all MQTT bindings. To be more precise, all MQTT bindings need
to be deleted and added because the binding argument is part of the
Mnesia table key.

Since feature flag migration functions are non-trivial to implement in
RabbitMQ (they can run on every node multiple times and concurrently),
this commit takes a simpler approach:
All v3 / v4 sessions keep the empty binding argument [].
All v5 sessions use the new binding argument [#mqtt_subscription_opts{}].

This requires only handling a session upgrade / downgrade by
creating a binding (with the new binding arg) and deleting the old
binding (with the old binding arg) when processing the CONNECT packet.
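
As a sketch (where `add_binding/3` and `delete_binding/3` are assumed
helpers, not functions from this commit), a v3/v4 to v5 session upgrade
amounts to:

    upgrade_session(QName, TopicFilter, SubscriptionOpts) ->
        %% New binding argument: [#mqtt_subscription_opts{}]
        ok = add_binding(QName, TopicFilter, [SubscriptionOpts]),
        %% Old binding argument: []
        ok = delete_binding(QName, TopicFilter, []).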

Note that such session upgrades or downgrades should be rather rare in
practice. Therefore these binding transactions shouldn't hurt performance.

The No Local option is implemented within the MQTT publishing connection
process: The message is not sent to the MQTT destination if the
destination queue name matches the current MQTT client ID and the
message was routed due to a subscription that has the No Local flag set.
This avoids unnecessary traffic on the MQTT queue.
The alternative would have been for the "receiving side" (the same
process) to filter the message out, which would have been more
consistent with how Retain As Published and Subscription Identifiers are
implemented, but would have caused unnecessary load on the MQTT queue.
2023-06-21 17:14:08 +01:00
David Ansari 2efd9c06b8 Support Session Expiry Interval
Allow Session Expiry Interval to be changed when client DISCONNECTs.

Deprecate config subscription_ttl in favour of max_session_expiry_interval_secs
because the Session Expiry Interval will also apply to publishers that
connect with a will message and will delay interval.
"The additional session state of an MQTT v5 server includes:
* The Will Message and the Will Delay Interval
* If the Session is currently not connected, the time at which the Session
  will end and Session State will be discarded."

The Session Expiry Interval picked by the server and sent to the client
in the CONNACK is the minimum of max_session_expiry_interval_secs and
the requested Session Expiry Interval by the client in CONNECT.
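
In other words (a one-line sketch with illustrative names):

    granted_session_expiry(MaxSessionExpiryIntervalSecs, RequestedSecs) ->
        min(MaxSessionExpiryIntervalSecs, RequestedSecs).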

This commit favours dynamically changing the queue argument x-expires
over creating millions of different policies since that many policies
will come with new scalability issues.

Dynamically changing queue arguments is not allowed by AMQP 0.9.1
clients. However, it should be perfectly okay for the MQTT plugin to do
so for the queues it manages. MQTT clients are not aware that these
queues exist.
2023-06-21 17:14:08 +01:00
David Ansari 66fe9630b5 Add Message Expiry Interval for retained messages
MQTT v5 spec:
"If the current retained message for a Topic expires, it is discarded
and there will be no retained message for that topic."

This commit also supports Message Expiry Interval for retained messages
when a node is restarted.
Therefore, the insertion timestamp needs to be stored on disk.
Upon recovery, the Erlang timers are re-created.
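
A sketch of that recovery step (names are illustrative, not from this
commit):

    %% Re-create the expiry timer for a retained message using the
    %% insertion timestamp recovered from disk:
    recover_expiry_timer(Topic, InsertedAtMs, MessageExpirySecs) ->
        ElapsedMs = erlang:system_time(millisecond) - InsertedAtMs,
        RemainingMs = max(0, MessageExpirySecs * 1000 - ElapsedMs),
        erlang:start_timer(RemainingMs, self(), {expire_retained, Topic}).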
2023-06-21 17:14:08 +01:00
David Ansari 49f1071591 Add MQTT v5 feature Maximum Packet Size set by client
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."

This commit implements the part where the Client specifies the maximum
packet size.

As per the protocol spec, instead of sending an MQTT packet that is too
large, the server drops it.
A debug message is logged for "infrequent" packet types.

For PUBLISH packets, the message is rejected to the queue such that it
will be dead lettered, if dead lettering is configured.
At the very least, Prometheus metrics for dead lettered messages will
be increased, even if dead lettering is not configured.
2023-06-21 17:14:08 +01:00
David Ansari c44b546f73 Test MQTT v5 in existing MQTT suites 2023-06-21 17:14:08 +01:00
David Ansari 83eede7ef2 Keep storing MQTT client IDs as lists in Ra
Up to 3.11.x an MQTT client ID is tracked in Ra
as a list of bytes as returned by binary_to_list/1 in
48467d6e12/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl (L137)

This has two downsides:
1. Lists consume more memory than binaries (when tracking many clients).
2. It violates the MQTT spec which states
   "The ClientId MUST be a UTF-8 encoded string as defined in Section 1.5.3 [MQTT-3.1.3-4]." [v4 3.1.3.1]

Therefore, the original idea was to always store MQTT client IDs as
binaries starting with Native MQTT in 3.12.
However, this leads to client ID tracking misbehaving in mixed version
clusters since new nodes would register client IDs as binaries and old
nodes would register client IDs as lists. This means that a client
registering on a new node with the same client ID as a connection to the
old node did not terminate the connection on the old node.

Therefore, for backwards compatibility, we leave the client ID as a list of bytes
in the Ra machine state because the feature flag delete_ra_cluster_mqtt_node
introduced in v3.12 will delete the Ra cluster anyway and
the new client ID tracking via pg local will store client IDs as
binaries.

An interesting side note learned here is that the compiled module
rabbit_mqtt_collector must not be changed. This commit only modifies
function specs. However, as soon as the compiled code is changed, this
module becomes a new version. The new version causes the anonymous Ra query
function to fail in mixed clusters: When the old node does a
ra:leader_query where the leader is on the new node, the query function
fails on the new node with `badfun` because the new node does not have
the same module version. For more context, read:
https://web.archive.org/web/20181017104411/http://www.javalimit.com/2010/05/passing-funs-to-other-erlang-nodes.html
2023-04-28 07:57:23 +00:00
David Ansari 02cf072ae4 Restrict MQTT CONNECT packet size
In MQTT 3.1.1, the CONNECT packet consists of
1. 10 bytes variable header
2. ClientId (up to 23 bytes must be supported)
3. Will Topic
4. Will Message (maximum length 2^16 bytes)
5. User Name
6. Password

Restricting the CONNECT packet size to 2^16 = 65,536 bytes
seems to be a reasonable default.

The value is configurable via the MQTT app parameter
`max_packet_size_unauthenticated`.

The name `max_packet_size_unauthenticated` (instead of
`max_packet_size_connect`) is generic
because MQTT 5 introduces an AUTH packet type.
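
As a sketch, the parameter could be overridden via the rabbitmq_mqtt
application environment in advanced.config (the value below is the
default described above):

    [{rabbitmq_mqtt, [{max_packet_size_unauthenticated, 65536}]}].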
2023-01-29 15:00:19 +00:00
Chunyi Lyu 209f23fa2f
Revert "Format MQTT code with `erlfmt`" 2023-01-27 18:25:57 +00:00
Chunyi Lyu 1de9fcf582 Format mqtt files with erlfmt 2023-01-27 11:06:41 +00:00
David Ansari d651f87ea7 Share tests between MQTT and Web MQTT
New test suite deps/rabbitmq_mqtt/test/shared_SUITE contains tests that
are executed against both MQTT and Web MQTT.

This has two major advantages:
1. Eliminates test code duplication between rabbitmq_mqtt and
rabbitmq_web_mqtt making the tests easier to maintain and to understand.
2. Increases test coverage of Web MQTT.

It's acceptable to add a **test** dependency from rabbitmq_mqtt to
rabbitmq_web_mqtt. Obviously, there should be no such dependency
for non-test code.
2023-01-24 17:32:59 +00:00
David Ansari bd0acb33e4 Remove test helper util:connect_to_node/3
because this method is superfluous given that util:connect
already exists.
2023-01-24 17:30:10 +00:00
David Ansari 97fefff0fe Add overflow drop-head to rabbit_mqtt_qos0_queue type
Traditionally, queue types implement flow control by keeping state in
both sending and receiving Erlang processes (for example credit based flow
control keeps the number of credits within the process dictionary).

The rabbit_mqtt_qos0_queue cannot keep such state in sending or receiving
Erlang process because such state would consume a large amount of memory
in case of large fan-ins or large fan-outs.
The whole idea of the rabbit_mqtt_qos0_queue type is to not keep any
state in the rabbit_queue_type client. This is what makes the new queue
type scale so well.

Therefore the new queue type cannot (easily) implement flow control
throttling individual senders.

In this commit, we take a different approach:
Instead of implementing flow control throttling individual senders,
the receiving MQTT connection process drops QoS 0 messages from the
rabbit_mqtt_qos0_queue if it is overflowed with messages AND its MQTT
client is not capable of receiving messages fast enough.

This is a simple and sufficient solution because it's better to drop QoS
0 (at most once) messages instead of causing cluster-wide memory alarms.

The user can opt out of dropping messages by setting the new env setting
mailbox_soft_limit to 0.

Additionally, we reduce the send_timeout from the 30-second default in
Ranch to a 15-second default in MQTT. This will detect hanging MQTT
clients faster, causing the MQTT connection to be closed.
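
A sketch of the overflow check (helper names are assumed, and the real
check also considers whether the MQTT client can keep up):

    maybe_deliver_qos0(Msg, SoftLimit, State) ->
        {message_queue_len, Len} =
            process_info(self(), message_queue_len),
        case SoftLimit > 0 andalso Len > SoftLimit of
            true  -> State;                %% drop: QoS 0 is at most once
            false -> deliver(Msg, State)   %% assumed helper
        end.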
2023-01-24 17:30:10 +00:00
Chunyi Lyu de28560d8f Extract connect to node helper in rmq mqtt tests 2023-01-24 17:30:10 +00:00
Chunyi Lyu aea7ff8f8d Use helper to connect to node in mqtt cluster suite 2023-01-24 17:30:10 +00:00
Chunyi Lyu 30a9ea521e Use connect helper func in more mqtt tests
- reduce code duplication
- connect helper does not unlink the connection process by default
2023-01-24 17:30:10 +00:00
David Ansari 7bc8208a1b Remove local record definitions from header files
Record #state{} is purely local to rabbit_mqtt_reader.
Record #proc_state{} is purely local to rabbit_mqtt_processor.

Therefore, move these record definitions to the defining module.
This avoids unnecessarily exposing internal information.

Now, that #proc_state{} is defined in rabbit_mqtt_processor,
rename #proc_state to #state{}.
2023-01-24 17:30:10 +00:00
Chunyi Lyu c3779d9996 Implement message consuming counters in mqtt 2023-01-24 17:30:10 +00:00
Chunyi Lyu 0b43f002f5 Remove subscriptions map from proc state in mqtt
- subscription information can be retrieved directly from Mnesia
- when unsubscribing, we check whether there is a binding between the
topic name and a queue (checking both the QoS 0 and the QoS 1 queue
name) to unbind
- added a boolean value has_subs to the proc state which indicates
whether the connection has any active subscriptions. Used for setting
the consumer global counter
2023-01-24 17:29:07 +00:00
David Ansari aad7e1cdf6 Add test for consuming MQTT classic queue going down 2023-01-24 17:29:07 +00:00
David Ansari bda52dbf64 Support consuming classic mirrored queue failover
Some users use classic mirrored queues for MQTT queues by
applying a policy.

Given that classic mirrored queues are deprecated, but still supported
in RabbitMQ 3.x, native MQTT must support classic mirrored queues.
2023-01-24 17:29:07 +00:00
David Ansari b97006c4b9 Output username in connection closed event 2023-01-24 17:29:07 +00:00
Chunyi Lyu 96854a8c4c Use emqtt:publish in mqtt tests
- rename publish_qos1 to publish_qos1_timeout
since it's only used for handling publisher timeouts
more gracefully in tests
2023-01-24 17:29:07 +00:00
David Ansari 33bf2150a5 Add test for publishing via MQTT to different queue types 2023-01-24 17:29:07 +00:00
David Ansari 199238d76e Use pg to track MQTT client IDs
Instead of tracking {Vhost, ClientId} to ConnectionPid mappings in our
custom process registry, i.e. custom local ETS table with a custom
gen_server process managing that ETS table, this commit uses the pg module
because pg is better tested.

To save memory with millions of MQTT client connections, we want to store
the mappings only locally on the node where the connection resides and
therefore not replicate them across all nodes.

According to Maxim Fedorov:
"The easy way to provide per-node unique pg scope is to start it like
pg:start_link(node()). At least that's what we've been doing to have
node-local scopes. It will still try to discover scopes on nodeup from
nodes joining the cluster, but since you cannot have nodes with the
same name in one cluster, using node() for local-only scopes worked
well for us."

So that's what we're doing in this commit.
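
In pg terms (a sketch; the group key layout is an assumption):

    %% The node-local scope is started as pg:start_link(node()).
    register_client_id(VHost, ClientId) ->
        ok = pg:join(node(), {VHost, ClientId}, self()).

    lookup_client_id(VHost, ClientId) ->
        %% Returns connection processes registered on this node only.
        pg:get_members(node(), {VHost, ClientId}).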
2023-01-24 17:29:07 +00:00