When RabbitMQ enters maintenance mode / is being drained, all client
connections are closed.
This commit sends a DISCONNECT packet to (Web) MQTT 5.0 clients with
Reason Code "Server shutting down" before the connection is closed.
Previously, the Will Message could be kept in memory in the MQTT
connection process state. Upon termination, the Will Message is sent.
The new MQTT 5.0 feature Will Delay Interval requires storing the Will
Message outside of the MQTT connection process state.
The Will Message should not be stored node local because the client
could reconnect to a different node.
Storing the Will Message in Mnesia is not an option because we want to
get rid of Mnesia. Storing the Will Message in a Ra cluster or in Khepri
is only an option if the Will Payload is small as there is currently no
way in Ra to **efficiently** snapshot large binary data. (Note that these
Will Messages are not consumed in a FIFO style workload like messages in
quorum queues. A Will Message needs to be stored for as long as the
Session lasts - up to 1 day by default, but could also be much longer if
RabbitMQ is configured with a higher maximum session expiry interval.)
Usually Will Payloads are small: they are just a notification that the
client's MQTT session ended abnormally. However, we don't know how users
leverage the Will Message feature. The MQTT protocol allows for large Will
Payloads. Therefore, the solution implemented in this commit - which should
work well enough - is storing the Will Message in a queue.
Each MQTT session which has a Session Expiry Interval and Will Delay
Interval of > 0 seconds will create a queue when the current Network
Connection ends and store its Will Message there. The Will Message has a
message TTL set (corresponds to the Will Delay Interval) and the queue
has a queue TTL set (corresponds to the Session Expiry Interval).
If the client does not reconnect within the Will Delay Interval, the
message is dead lettered to the configured MQTT topic exchange
(amq.topic by default).
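
The mapping from the two MQTT 5.0 intervals to queue and message settings
can be sketched as follows. This is only an illustration: the function name
`will_queue_settings` is hypothetical, and it assumes the standard RabbitMQ
AMQP 0.9.1 names (`x-expires`, `x-dead-letter-exchange`, and the per-message
`expiration` property, all in milliseconds) rather than showing what this
commit actually sets.

```python
def will_queue_settings(will_delay_s, session_expiry_s, topic_exchange="amq.topic"):
    """Translate MQTT intervals (seconds) into TTL settings (milliseconds).

    Returns None when no Will queue is needed, i.e. when either interval
    is not > 0 seconds.
    """
    if will_delay_s <= 0 or session_expiry_s <= 0:
        return None
    return {
        "queue_arguments": {
            # Queue TTL corresponds to the Session Expiry Interval.
            "x-expires": session_expiry_s * 1000,
            # Expired Will Messages are dead lettered to the topic exchange.
            "x-dead-letter-exchange": topic_exchange,
        },
        "message_properties": {
            # Message TTL corresponds to the Will Delay Interval.
            # AMQP 0.9.1 carries the per-message TTL as a string.
            "expiration": str(will_delay_s * 1000),
        },
    }


settings = will_queue_settings(will_delay_s=30, session_expiry_s=3600)
print(settings)
```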
The Will Delay Interval can be set by both publishers and subscribers.
Therefore, the Will Message is the 1st session state that RabbitMQ keeps
for publish-only MQTT clients.
One current limitation of this commit is that a Will Message that is
delayed (i.e. Will Delay Interval is set) and retained (i.e. Will Retain
flag set) will not be retained.
One solution to retain delayed Will Messages is that the retainer
process consumes from a queue and the queue binds to the topic exchange
with a topic starting with `$`, for example `$retain/#`.
The AMQP 0.9.1 Will Message that is dead lettered could then be given a
CC header such that it is published not only with the Will Topic, but
also with the `$retain` topic. For example, if the Will Topic is `a/b`,
it will publish with routing key `a/b` and CC header `$retain/a/b`.
The reason this is not implemented in this commit is that to keep the
currently broken retained message store behaviour, we would require
creating at least one queue per node and publishing only to that local
queue. In future, once we have a replicated retained message store based
on a Stream for example, we could just publish all retained messages to
the `$retain` topic and therefore into the Stream.
So, for now, we list as a limitation that "retained and delayed Will
Messages" won't actually be retained.
- when clients connect with a duplicate client id;
disconnect with reason code session taken over 142
- when keep alive has timed out;
disconnect with reason code keep alive timeout 141
emqtt repos:
emqx/emqtt PR #196 is based on rabbitmq:otp-26-compatibility
emqx/emqtt PR #198 is based on ansd:master
rabbitmq/master contains both of these 2 PRs cherry-picked.
rabbitmq-server repos:
main branch points emqtt to rabbitmq:otp-26-compatibility
mqtt5 branch points emqtt to rabbitmq:master
Therefore, the current mqtt5 branch is OTP 26 compatible and can support
multiple subscription identifiers.
- "If the Server included a Maximum QoS in its CONNACK response
to a Client and it receives a PUBLISH packet with a QoS greater than this
then it uses DISCONNECT with Reason Code 0x9B (QoS not supported)"
- only affects mqtt v5, server max qos is 1
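
A minimal sketch of that check, assuming a hypothetical helper
`check_publish_qos` (not the name used in the plugin). The reason code
value 0x9B is the one quoted above.

```python
QOS_NOT_SUPPORTED = 0x9B  # MQTT 5.0 reason code quoted in the spec text above
SERVER_MAX_QOS = 1        # maximum QoS the server advertises in CONNACK


def check_publish_qos(protocol_version, qos):
    """Return the DISCONNECT reason code to send, or None if this rule
    does not apply to the PUBLISH packet."""
    if protocol_version == 5 and qos > SERVER_MAX_QOS:
        return QOS_NOT_SUPPORTED
    # The rule only affects MQTT v5; other versions are out of scope here.
    return None


print(hex(check_publish_qos(5, 2)))
```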
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."
This commit implements the part where the Server specifies the maximum
packet size.
"In the case of an error in a CONNECT packet it MAY send a CONNACK
packet containing the Reason Code, before closing the Network
Connection. In the case of an error in any other packet it SHOULD send a
DISCONNECT packet containing the Reason Code before closing the Network
Connection."
This commit implements only the "SHOULD" (second) part, not the "MAY"
(first) part.
There are now 2 different global MQTT settings on the server:
1. max_packet_size_unauthenticated which applies to the CONNECT packet
(and maybe AUTH packet in the future)
2. max_packet_size_authenticated which applies to all other MQTT
packets (that is, after the client successfully authenticated).
These two settings will apply to all MQTT versions.
In MQTT v5, if a non-CONNECT packet is too large, the server will send a
DISCONNECT packet to the client with Reason Code "Packet Too Large"
before closing the network connection.
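
The decision described above can be sketched roughly as follows. The
function `check_packet_size` and its return values are hypothetical, and
the limits are passed in as plain arguments; only the reason code 0x95
("Packet too large" in MQTT 5.0) is taken from the specification.

```python
PACKET_TOO_LARGE = 0x95  # MQTT 5.0 reason code "Packet too large"


def check_packet_size(size, authenticated, protocol_version,
                      max_unauthenticated, max_authenticated):
    """Decide what to do with an incoming packet of `size` bytes.

    Returns ("ok", None), ("disconnect", reason_code) or ("close", None).
    """
    limit = max_authenticated if authenticated else max_unauthenticated
    if size <= limit:
        return ("ok", None)
    if authenticated and protocol_version == 5:
        # Non-CONNECT packet in MQTT v5: send DISCONNECT, then close.
        return ("disconnect", PACKET_TOO_LARGE)
    # Oversized CONNECT packet, or an older MQTT version: just close.
    return ("close", None)


print(check_packet_size(70_000, True, 5, 65_536, 65_536))
```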
As requested in https://github.com/rabbitmq/rabbitmq-server/discussions/6331#discussioncomment-5796154
include all infos that were emitted in the MQTT connection created event also
in the MQTT connection closed event.
This ensures infos such as MQTT client ID are part of the connection
closed event.
Therefore, it's easy for the user to correlate between the two event
types.
Note that the MQTT plugin emits connection created and connection closed events only if
the CONNECT packet was successfully processed, i.e. authentication was successful.
Remove the disconnected_at property because it was never used.
rabbit_event already adds a timestamp to any event.
This change should be reverted once emqx/emqtt is OTP26 compatible.
Our fork/branch isn't either at this point, but at least partially
works. Let's use this branch for now to uncover server-side OTP26
incompatibilities (and continue working on OTP26 support for emqtt of
course).
Bazel build files are now maintained primarily with `bazel run
gazelle`. This will analyze and merge changes into the build files as
necessitated by certain code changes (e.g. the introduction of new
modules).
In some cases there are hints to gazelle in the build files, such as `#
gazelle:erlang...` or `# keep` comments. xref checks on plugins that
depend on the cli are a good example.
"If the Client supplies a zero-byte ClientId with CleanSession set to 0,
the Server MUST respond to the CONNECT Packet with a CONNACK return code 0x02
(Identifier rejected) and then close the Network Connection" [MQTT-3.1.3-8].
In Web MQTT, the CONNACK was not sent to the client because the Web MQTT
connection process terminated before sending the CONNACK to the
client.
This is the latest commit in the series. It fixes (almost) all the
problems with missing and circular dependencies for typing.
The remaining unsolved problems are:
- `lg` dependency for `rabbit` - the problem is that it's the only
dependency that contains NIF. And there is no way to make dialyzer
ignore it - looks like unknown check is not suppressable by dialyzer
directives. In the future making `lg` a proper dependency can be a
good thing anyway.
- some missing elixir function in `rabbitmq_cli` (CSV, JSON and
logging related).
- `eetcd` dependency for `rabbitmq_peer_discovery_etcd` - this one
uses sub-directories in `src/`, which confuses dialyzer (or our bazel
machinery is not able to properly handle it). I've tried the latest
rules_erlang which flattens directory for .beam files, but it wasn't
enough for dialyzer - it wasn't able to find core erlang files. This
is a niche plugin and an unusual dependency, so probably not worth
investigating further.
This commit is pure refactoring making the code base more maintainable.
Replace rabbit_misc:pipeline/3 with the new OTP 25 experimental maybe
expression because
"Frequent ways in which people work with sequences of failable
operations include folds over lists of functions, and abusing list
comprehensions. Both patterns have heavy weaknesses that makes them less
than ideal."
https://www.erlang.org/eeps/eep-0049#obsoleting-messy-patterns
Additionally, this commit is more restrictive in the type spec of
rabbit_mqtt_processor state fields.
Specifically, many fields were defined to be `undefined | T` where
`undefined` was only a temporary value until the first CONNECT packet was
processed by the processor.
It's better to initialise the MQTT processor upon first CONNECT packet
because there is no point in having a processor without having received
any packet.
This allows many type specs in the processor to change from `undefined |
T` to just `T`.
Additionally, memory is saved by removing the `received_connect_packet`
field from the `rabbit_mqtt_reader` and `rabbit_web_mqtt_handler`.
as it seems to always match peer_host.
Commit 7e09b85426 adds peer address
provided by WebMQTT plugin.
However, this seems unnecessary since function rabbit_net:peername/1 on
the unwrapped socket provides the same address.
The peer address was the address of the proxy if the proxy protocol is
enabled.
This commit simplifies code and reduces memory consumption.
In MQTT 3.1.1, the CONNECT packet consists of
1. 10 bytes variable header
2. ClientId (up to 23 bytes must be supported)
3. Will Topic
4. Will Message (maximum length 2^16 bytes)
5. User Name
6. Password
Restricting the CONNECT packet size to 2^16 = 65,536 bytes
seems to be a reasonable default.
The value is configurable via the MQTT app parameter
`max_packet_size_unauthenticated`.
The name `max_packet_size_unauthenticated` (instead of
`max_packet_size_connect`) is deliberately generic
because MQTT 5 introduces an AUTH packet type.
We want the build to fail if there are any dialyzer warnings in
rabbitmq_mqtt or rabbitmq_web_mqtt. Otherwise we rely on people manually
executing and checking the results of dialyzer.
Also, we want any flaky test to fail.
Flaky tests can indicate subtle errors in either test or program execution.
Instead of marking them as flaky, we should understand and - if possible -
fix the underlying root cause.
Fix OTP 25.0 dialyzer warning
Type gen_server:format_status() is known in OTP 25.2, but not in 25.0
When a cluster wide memory or disk alarm is fired, in AMQP 0.9.1 only
connections that are publishing messages get blocked.
Connections that only consume can continue to empty the queues.
Prior to this commit, all MQTT connections got blocked during a memory
or disk alarm. This has two downsides:
1. MQTT connections that only consume, but never publish, cannot empty
queues anymore.
2. If the memory or disk alarm takes long, the MQTT client does not
receive a PINGRESP from the server when it sends a PINGREQ potentially
leading to mass client disconnection (depending on the MQTT client
implementation).
This commit makes sure that an MQTT connection that never sent a single
PUBLISH packet (e.g. a "pure" MQTT subscriber) is not blocked during
memory or disk alarms.
In contrast to AMQP 0.9.1, new connections are still blocked from being
accepted because accepting (many) new MQTT connections also leads to
increased resource usage.
The implementation as done in this commit is simpler, but also more naive
than the logic in rabbit_reader: rabbit_reader blocks connections more
dynamically whereas rabbit_mqtt_reader and rabbit_web_mqtt_handler
block a connection if the connection ever sent a single PUBLISH packet
during its lifetime.
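
The blocking rule described above can be modelled in a few lines. The
class and method names below are hypothetical and only illustrate the
"ever published" flag; they are not the plugin's actual state fields.

```python
class MqttConnection:
    """Toy model of the per-connection blocking decision."""

    def __init__(self):
        # Set on the first PUBLISH packet and never reset afterwards.
        self.has_published = False

    def on_publish(self):
        self.has_published = True

    def is_blocked(self, alarm_active):
        # Only connections that have ever published are blocked
        # while a memory or disk alarm is in effect.
        return alarm_active and self.has_published


subscriber = MqttConnection()
publisher = MqttConnection()
publisher.on_publish()
print(subscriber.is_blocked(alarm_active=True))
print(publisher.is_blocked(alarm_active=True))
```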
Convert from the old rabbit_log* API to the new Logger macros for MQTT
and Web MQTT connections.
Advantages:
* metadata mfa, file, line, pid, gl, time is auto-inserted by Logger.
* Log lines output by the shared module rabbit_mqtt_processor now
include via the domain whether it's an MQTT or Web MQTT connection.
Instead of using domain [rabbitmq, connection], this commit now uses
the smaller and more specialized domains [rabbitmq, connection, mqtt] and
[rabbitmq, connection, web_mqtt] for MQTT and Web MQTT processes
respectively, resulting in the following example output:
"msg":"Received a CONNECT,", "domain":"rabbitmq.connection.mqtt"
or
"msg":"Received a CONNECT,", "domain":"rabbitmq.connection.web_mqtt"
New test suite deps/rabbitmq_mqtt/test/shared_SUITE contains tests that
are executed against both MQTT and Web MQTT.
This has two major advantages:
1. Eliminates test code duplication between rabbitmq_mqtt and
rabbitmq_web_mqtt making the tests easier to maintain and to understand.
2. Increases test coverage of Web MQTT.
It's acceptable to add a **test** dependency from rabbitmq_mqtt to
rabbitmq_web_mqtt. Obviously, there should be no such dependency
for non-test code.
Previously (until RabbitMQ v3.11.x), a memory or disk alarm did
not block the Web MQTT connection because this feature was only
implemented half way through: The part that registers the Web MQTT
connection with rabbit_alarm was missing.
1. Allow inspecting a (web) MQTT connection.
2. Show MQTT client ID on connection page as part of client_properties.
3. Handle force_event_refresh (when management_plugin gets enabled
after (web) MQTT connections got created).
4. Reduce code duplication between protocol readers.
5. Display '?' instead of 'NaN' in UI for absent queue metrics.
6. Allow a (web) MQTT connection to be closed via management_plugin.
For 6. this commit takes the same approach as already done for the stream
plugin:
The stream plugin registers neither with {type, network} nor {type,
direct}.
We cannot use gen_server:call/3 anymore to close the connection
because the web MQTT connection cannot handle gen_server calls (only
casts).
Strictly speaking, this commit requires a feature flag to allow to force
closing stream connections from the management plugin during a rolling
update. However, given that this is rather an edge case, and there is a
workaround (connect to the node directly hosting the stream connection),
this commit will not introduce a new feature flag.
- add a wrapper for mqtt parse/2 to prevent crashing when
parse/2 fails to parse a packet
- add invalid packet test case for web mqtt; server will respond
status code 1007, same status code as framing error
The MQTT protocol specs define the term "MQTT Control Packet".
The MQTT specs never talk about "frame".
Let's reflect this naming in the source code since things get confusing
otherwise:
Packets belong to MQTT.
Frames belong to AMQP 0.9.1 or web sockets.
by removing the two fields referencing a function:
mqtt2amqp_fun and amqp2mqtt_fun
Each field required 1 word + 11 words for the function reference.
Therefore, for 1 million MQTT connections this commit saves:
(1+11) * 2 * 1,000,000 words
= 192 MB of memory
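
The conversion from words to megabytes can be checked with a short
calculation, assuming a 64-bit Erlang VM (8 bytes per word) and decimal
megabytes; the helper name `words_to_mb` is made up for this sketch.

```python
WORD_BYTES = 8  # assumption: 64-bit VM, so one word is 8 bytes


def words_to_mb(words):
    """Convert a number of machine words to decimal megabytes."""
    return words * WORD_BYTES / 1_000_000


# (1 word for the field + 11 words for the fun) * 2 fields * 1M connections
saved_words = (1 + 11) * 2 * 1_000_000
print(words_to_mb(saved_words))  # 192.0
```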
In addition, the code is now simpler to understand.
There is no change in behaviour except for the sparkplug environment
variable being read upon application start.
We put the compiled regexes into persistent term because they are the
same for all MQTT connections.
Reduce memory usage of (Web) MQTT connection process and STOMP reader
process by storing the connection name as binary instead of string.
Previously:
82 = erts_debug:size("192.168.2.104:52497 -> 192.168.2.175:1883").
The binary <<"192.168.2.104:52497 -> 192.168.2.175:1883">>
requires 8 words.
So, for 1 million MQTT connections, this commit should save
(82 - 8) words * 1,000,000
= 592 MB
of memory.
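
The two sizes above are consistent with a simple cost model, sketched here
under the assumption of a 64-bit VM: a string is a list costing 2 words
per character (one cons cell each), and a binary of at most 64 bytes is
stored on the process heap with 2 header words plus its data rounded up
to whole words. The function names are hypothetical.

```python
WORD_BYTES = 8  # assumption: 64-bit VM


def list_string_words(s):
    """Words used by an Erlang string (list of characters): 2 per cons cell."""
    return 2 * len(s)


def heap_binary_words(s):
    """Words used by a heap binary (<= 64 bytes): 2 header words + data."""
    n = len(s.encode("utf-8"))
    return 2 + (n + WORD_BYTES - 1) // WORD_BYTES


name = "192.168.2.104:52497 -> 192.168.2.175:1883"
saved_words = list_string_words(name) - heap_binary_words(name)
print(list_string_words(name), heap_binary_words(name))  # 82 8
print(saved_words * 1_000_000 * WORD_BYTES / 1_000_000)  # 592.0
```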
- if #state.stats_timer is undefined, rabbit_event:if_enabled crashes
- remove compression related TODO from web_mqtt. It's an intentional
default behavior set in: https://github.com/rabbitmq/rabbitmq-web-mqtt/pull/35
"The Will Message MUST be published when the Network Connection is subsequently
closed unless the Will Message has been deleted by the Server on receipt of a
DISCONNECT Packet [MQTT-3.1.2-8].
Situations in which the Will Message is published include, but are not limited to:
• An I/O error or network failure detected by the Server.
• The Client fails to communicate within the Keep Alive time.
• The Client closes the Network Connection without first sending a DISCONNECT Packet.
• The Server closes the Network Connection because of a protocol error.
"
Prior to this commit, the will message was not sent in all scenarios
where it should have been sent.
In this commit, the will message is always sent unless the client sent a
DISCONNECT packet to the server.
We achieve this by sending the will message in the terminate callback.
Note that the Reason passed into the terminate callback of
rabbit_web_mqtt_handler is the atom 'stop' (that is, we cannot pass a custom
reason here).
Therefore, in order to know within the terminate callback whether the client
sent a DISCONNECT packet, we have to modify the process state.
Rather than including a new field into the process state record which requires
1 additional word per MQTT connection (i.e. expensive with millions of
MQTT connection processes - we want to keep the process state small),
we instead modify the state just before stopping the process to
{SendWill, State}.
Some users use classic mirrored queues for MQTT queues by
applying a policy.
Given that classic mirrored queues are deprecated, but still supported
in RabbitMQ 3.x, native MQTT must support classic mirrored queues.
In mixed version tests all non-required feature flags are disabled.
Therefore, MQTT client ID tracking happens in Ra.
The Ra client sends commands asynchronously when registering and
deregistering the client ID.
Also, add more tests.
Instead of performing credit_flow within quorum queue and stream queue
clients, return new {block | unblock, QueueName} actions.
The queue client process can then decide what to do.
For example, the channel continues to use credit_flow such that the
channel gets blocked sending any more credits to rabbit_reader.
However, the MQTT connection process does not use credit_flow. It
instead blocks its reader directly.
Prior to this commit, 1 MQTT publisher publishing to 1 Million target
classic queues requires around 680 MB of process memory.
After this commit, it requires around 290 MB of process memory.
This commit requires feature flag classic_queue_type_delivery_support
and introduces a new one called no_queue_name_in_classic_queue_client.
Instead of storing the binary queue name 4 times, this commit now stores
it only 1 time.
The monitor_registry is removed since only classic queue clients monitor
their classic queue server processes.
The classic queue client does not store the queue name anymore. Instead
the queue name is included in messages handled by the classic queue
client.
Storing the queue name in the record ctx was unnecessary.
More potential future memory optimisations:
* When routing to destination queues, looking up the queue record,
delivering to queue: Use streaming / batching instead of fetching all
at once
* Only fetch ETS columns that are necessary instead of whole queue
records
* Do not hold the same vhost binary in memory many times. Instead,
maintain a mapping.
* Remove unnecessary tuple fields.
Share the same MQTT keepalive code between rabbit_mqtt_reader and
rabbit_web_mqtt_handler.
Add MQTT keepalive test in both plugins rabbitmq_mqtt and
rabbitmq_web_mqtt.
Reducing number of Erlang processes allows better scaling with millions
of clients connecting via MQTT over WebSockets.
In this commit, we partially revert
9c153b2d40
where support for heartbeats was introduced.
Prior to this commit there were 4 processes per WebMQTT connection
supervised by ranch_conns_sup in the ranch application:
1. rabbit_web_mqtt_connection_sup
2. rabbit_web_mqtt_connection_sup (id=rabbit_web_mqtt_keepalive_sup)
3. rabbit_heartbeat (receiver)
4. rabbit_web_mqtt_handler connection process
After this commit, there is only the 4th process supervised directly by
ranch_conns_sup.
Also rework elixir dependency handling, so we no longer rely on mix to
fetch the rabbitmq_cli deps
Also:
- Specify ra version with a commit rather than a branch
- Fixup compilation options for erlang 23
- Add missing ra reference in MODULE.bazel
- Add missing flag in oci.yaml
- Reduce bazel rbe jobs to try to save memory
- Use bazel built erlang for erlang git master tests
- Use the same cache for all the workflows but windows
- Avoid using `mix local.hex --force` in elixir rules
- Fetching seems blocked in CI, and this should reduce hex api usage in
all builds, which is always nice
- Remove xref and dialyze tags since rules_erlang 3 includes them in
the defaults
bazel-erlang has been renamed rules_erlang. v2 is a substantial
refactor that brings Windows support. While this alone isn't enough to
run all rabbitmq-server suites on windows, one can at least now start
the broker (bazel run broker) and run the tests that do not start a
background broker process