Require all MQTT feature flags and remove their compatibility code:
* delete_ra_cluster_mqtt_node
* rabbit_mqtt_qos0_queue
* mqtt_v5
These feature flags were introduced in or before 3.13.0.
The Web MQTT link is not used in the rabbitmq_mqtt Erlang app.
This link is only used in the rabbitmq_web_mqtt Erlang app.
Hence, move the link to the correct Erlang app.
the original paths, e.g. /streams.html, do have redirects
in place but it turned out to be a surprisingly fragile
Cloudflare feature when there are hundreds of them,
so we better switch now.
To refine conversion behaviour add additional tests
and ensure it matches the documentation.
mc: optionally capture source environment
And pass target environment to mc:convert
This allows environmental data and configuration to be captured and
used to modify and complete conversion logic whilst allowing conversion
code to remain pure and portable.
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.
Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.
This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.
The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.
This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.
COMMIT HISTORY:
* Refactor away from using the delivery{} record
In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.
Add mc module and move direct replies outside of exchange
Lots of changes incl classic queues
Implement stream support incl amqp conversions
simplify mc state record
move mc.erl
mc dlx stuff
recent history exchange
Make tracking work
But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.
Tracing as a whole may want a bit of a re-vamp at some point.
tidy
make quorum queue peek work by legacy conversion
dead lettering fixes
dead lettering fixes
CMQ fixes
rabbit_trace type fixes
fixes
fix
Fix classic queue props
test assertion fix
feature flag and backwards compat
Enable message_container feature flag in some SUITEs
Dialyzer fixes
fixes
fix
test fixes
Various
Manually update a gazelle generated file
until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185
Add message_containers_SUITE to bazel
and regen bazel files with gazelle from rules_erlang@main
Simplify essential proprty access
Such as durable, ttl and priority by extracting them into annotations
at message container init time.
Move type
to remove dependenc on amqp10 stuff in mc.erl
mostly because I don't know how to make bazel do the right thing
add more stuff
Refine routing header stuff
wip
Cosmetics
Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.
* Dedup death queue names
* Fix function clause crashes
Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)
Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.
* Fix is_utf8_no_null crash
Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```
* Implement mqtt mc behaviour
For now via amqp translation.
This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```
* Shorten mc file names
Module name length matters because for each persistent message the #mc{}
record is persisted to disk.
```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```
This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```
* mc: make deaths an annotation + fixes
* Fix mc_mqtt protocol_state callback
* Fix test will_delay_node_restart
```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```
* Bazel run gazelle
* mix format rabbitmqctl.ex
* Ensure ttl annotation is refelected in amqp legacy protocol state
* Fix id access in message store
* Fix rabbit_message_interceptor_SUITE
* dializer fixes
* Fix rabbit:rabbit_message_interceptor_SUITE-mixed
set_annotation/3 should not result in duplicate keys
* Fix MQTT shared_SUITE-mixed
Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.
The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.
* Field content of 'v1_0.data' can be binary
Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
--test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
-t- --test_sharding_strategy=disabled
```
* Remove route/2 and implement route/3 for all exchange types.
This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.
stream filtering fixes
* Translate directly from MQTT to AMQP 0.9.1
* handle undecoded properties in mc_compat
amqpl: put clause in right order
recover death deatails from amqp data
* Replace callback init_amqp with convert_from
* Fix return value of lists:keyfind/3
* Translate directly from AMQP 0.9.1 to MQTT
* Fix MQTT payload size
MQTT payload can be a list when converted from AMQP 0.9.1 for example
First conversions tests
Plus some other conversion related fixes.
bazel
bazel
translate amqp 1.0 null to undefined
mc: property/2 and correlation_id/message_id return type tagged values.
To ensure we can support a variety of types better.
The type type tags are AMQP 1.0 flavoured.
fix death recovery
mc_mqtt: impl new api
Add callbacks to allow protocols to compact data before storage
And make readable if needing to query things repeatedly.
bazel fix
* more decoding
* tracking mixed versions compat
* mc: flip default of `durable` annotation to save some data.
Assuming most messages are durable and that in memory messages suffer less
from persistence overhead it makes sense for a non existent `durable`
annotation to mean durable=true.
* mc conversion tests and tidy up
* mc make x_header unstrict again
* amqpl: death record fixes
* bazel
* amqp -> amqpl conversion test
* Fix crash in mc_amqp:size/1
Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.
* Fix crash in lists:flatten/1
Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.
* Fix crash in rabbit_writer
Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
initial call: rabbit_writer:enter_mainloop/2
pid: <0.711.0>
registered_name: []
exception error: bad argument
in function size/1
called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
*** argument 1: not tuple or binary
in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.
This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.
* Add accidentally deleted line back
* mc: optimise mc_amqp internal format
By removint the outer records for message and delivery annotations
as well as application properties and footers.
* mc: optimis mc_amqp map_add by using upsert
* mc: refactoring and bug fixes
* mc_SUITE routingheader assertions
* mc remove serialize/1 callback as only used by amqp
* mc_amqp: avoid returning a nested list from protocol_state
* test and bug fix
* move infer_type to mc_util
* mc fixes and additiona assertions
* Support headers exchange routing for MQTT messages
When a headers exchange is bound to the MQTT topic exchange, routing
will be performend based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).
This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.
When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.
* Fix crash when sending from stream to amqpl
When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
initial call: rabbit_channel:init/1
pid: <0.818.0>
registered_name: []
exception exit: {{badmatch,undefined},
[{rabbit_channel,handle_deliver0,4,
[{file,"rabbit_channel.erl"},
{line,2728}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
{rabbit_channel,handle_cast,2,
[{file,"rabbit_channel.erl"},
{line,728}]},
{gen_server2,handle_msg,2,
[{file,"gen_server2.erl"},{line,1056}]},
{proc_lib,wake_up,3,
[{file,"proc_lib.erl"},{line,251}]}]}
```
This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.
* Support consistent hash exchange routing for MQTT 5.0
When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.
* Convert MQTT 5.0 User Property
* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations
* Make use of Annotations in mc_mqtt:protocol_state/2
mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.
* Enforce AMQP 0.9.1 field name length limit
The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.
* Fix type specs
Apply feedback from Michael Davis
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* Add mc_mqtt unit test suite
Implement mc_mqtt:x_header/2
* Translate indicator that payload is UTF-8 encoded
when converting between MQTT 5.0 and AMQP 1.0
* Translate single amqp-value section from AMQP 1.0 to MQTT
Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.
If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indiate the encoding via Content-Type
message/vnd.rabbitmq.amqp.
This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.
* Fix payload conversion
* Translate Response Topic between MQTT and AMQP
Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.
The Response Topic must be a UTF-8 encoded string.
This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/" RK Publish to amq.topic with routing key RK
"/exchange/" X "/" RK Publish to exchange X with routing key RK
```
By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.
When an operator modifies the mqtt.exchange, the 2nd target address is
used.
* Apply PR feedback
and fix formatting
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* tidy up
* Add MQTT message_containers test
* consistent hash exchange: avoid amqp legacy conversion
When hashing on a header value.
* Avoid converting to amqp legacy when using exchange federation
* Fix test flake
* test and dialyzer fixes
* dialyzer fix
* Add MQTT protocol interoperability tests
Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams
* Regenerate portions of deps/rabbit/app.bzl with gazelle
I'm not exactly sure how this happened, but gazell seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.
* mc: refactoring
* mc_amqpl: handle delivery annotations
Just in case they are included.
Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.
---------
Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
and CLI as requested in
https://github.com/rabbitmq/rabbitmq-server/issues/2554#issuecomment-1604205277
"User Properties on the CONNECT packet can be used to send connection related properties from the Client to the Server.
The meaning of these properties is not defined by this specification."
[v5 3.1.2.11.8]
It makes sense to display the User Property of the CONNECT packet in the
Management UI in the connection's Client Properties.
Once the server's Topic Alias cache for messages from server to client
is full, this commit does not replace any existing aliases.
So, the first topics "win" and stay in the cache forever.
This matches the behaviour of VerneMQ and EMQX.
For now that's good enough.
In the future, we can easily change that behaviour to some smarter strategy,
for example
1. Hash the TopicName to an Topic Alias and replace the old
alias, or
2. For the Topic Alias Cache from server to client, keep 2 Maps:
#{TopicName => TopicAlias} and #{TopicAlias => TopicName} and a
counter that wraps to 1 once the Topic Alias Maximum is reached and
just replace an existing Alias if the TopicName is not cached.
Also, refactor Topic Alias Maximum:
* Remove code duplication
* Allow an operator to prohibit Topic Aliases by allowing value 0 to be
configured
* Change config name to topic_alias_maximum to that it matches exactly
the MQTT feture name
* Fix wrong code formatting
* Add the invalid or unkown Topic Alias to log message for easier
troubleshooting
The format of #mqtt_msg{} changes from 3.12 to 3.13.
In 3.13 the record contains 2 additional fields:
* props
* timestamp
The old #mqtt_msg{} might still be stored by the retained message store
in ets or dets.
This commit converts such an old message format when read from the
database.
The alternative would have been to run a migration function over the
whole table which is slightly more complex to implement.
Instead of giving the new message format a different record name,
e.g. #mqtt_msg_v2{}, this commit decides to re-use the same name such
that the new code only handles the record name #mqtt_msg{}.
The MQTT v5 spec is a bit vague on Retain Handling 1:
"If Retain Handling is set to 1 then if the subscription did not
already exist, the Server MUST send all retained message matching the
Topic Filter of the subscription to the Client, and if the subscription
did exist the Server MUST NOT send the retained messages.
[MQTT-3.3.1-10]." [v5 3.3.1.3]
Does a subscription with the same topic filter but different
subscription options mean that "the subscription did exist"?
This commit interprets "subscription exists" as both topic filter and
subscription options must be the same.
Therefore, if a client creates a subscription with a topic filter that
is identical to a previous subscription and subscription options that
are different and Retain Handling 1, the server sends the retained
message.
Support subscription options "No Local" and "Retain As Published"
as well as Subscription Identifiers.
All three MQTT 5.0 features can be set on a per subscription basis.
Due to wildcards in topic filters, multiple subscriptions
can match a given topic. Therefore, to implement Retain As Published and
Subscription Identifiers, the destination MQTT connection process needs
to know what subscription(s) caused it to receive the message.
There are a few ways how this could be implemented:
1. The destination MQTT connection process is aware of all its
subscriptions. Whenever, it receives a message, it can match the
message's routing key / topic against all its known topic filters.
However, to iteratively match the routing key against all topic
filters for every received message can become very expensive in the
worst case when the MQTT client creates many subscriptions containing
wildcards. This could be the case for an MQTT client that acts as a
bridge or proxy or dispatcher: It could subscribe via a wildcard for
each of its own clients.
2. Instead of interatively matching the topic of the received message
against all topic filters that contain wildcards, a better approach
would be for every MQTT subscriber connection process to maintain a
local trie datastructure (similar to how topic exchanges are
implemented) and perform matching therefore more efficiently.
However, this does not sound optimal either because routing is
effectively performed twice: in the topic exchange and again against
a much smaller trie in each destination connection process.
3. Given that the topic exchange already perform routing, a much more
sensible way would be to send the matched binding key(s) to the
destination MQTT connection process. A subscription (topic filter)
maps to a binding key in AMQP 0.9.1 routing. Therefore, for the first
time in RabbitMQ, the routing function should not only output a list
of unique destination queues, but also the binding keys (subscriptions)
that caused the message to be routed to the destination queue.
This commit therefore implements the 3rd approach.
The downside of the 3rd approach is that it requires API changes to the
routing function and topic exchange.
Specifically, this commit adds a new function rabbit_exchange:route/3
that accepts a list of routing options. If that list contains version 2,
the caller of the routing function knows how to handle the return value
that could also contain binding keys.
This commits allows an MQTT connection process, the channel process, and
at-most-once dead lettering to handle binding keys. Binding keys are
included as AMQP 0.9.1 headers into the basic message.
Therefore, whenever a message is sent from an MQTT client or AMQP 0.9.1
client or AMQP 1.0 client or STOMP client, the MQTT receiver will know
the subscription identifier that caused the message to be received.
Note that due to the low number of allowed wildcard characters (# and
+), the cardinality of matched binding keys shouldn't be high even if
the topic contains for example 3 levels and the message is sent to for
example 5 million destination queues. In other words, sending multiple
distinct basic messages to the destination shouldn't hurt the delegate
optimisation too much. The delegate optimisation implemented for classic
queues and rabbit_mqtt_qos0_queue(s) still takes place for all basic
messages that contain the same set of matched binding keys.
The topic exchange returns all matched binding keys by remembering the
edges walked down to the leaves. As an optimisation, only for MQTT
queues are binding keys being returned. This does add a small dependency
from app rabbit to app rabbitmq_mqtt which is not optimal. However, this
dependency should be simple to remove when omitting this optimisation.
Another important feature of this commit is persisting subscription
options and subscription identifiers because they are part of the
MQTT 5.0 session state.
In MQTT v3 and v4, the only subscription information that were part of
the session state was the topic filter and the QoS level.
Both information were implicitly stored in the form of bindings:
The topic filter as the binding key and the QoS level as the destination
queue name of the binding.
For MQTT v5 we need to persist more subscription information.
From a domain perspective, it makes sense to store subscription options
as part of subscriptions, i.e. bindings, even though they are currently
not used in routing.
Therefore, this commits stores subscription options as binding arguments.
Storing subscription options as binding arguments comes in turn with
new challenges: How to handle mixed version clusters and upgrading an
MQTT session from v3 or v4 to v5?
Imagine an MQTT client connects via v5 with Session Expiry Interval > 0
to a new node in a mixed version cluster, creates a subscription,
disconnects, and subsequently connects via v3 to an old node. The
client should continue to receive messages.
To simplify such edge cases, this commit introduces a new feature flag
called mqtt_v5. If mqtt_v5 is disabled, clients cannot connect to
RabbitMQ via MQTT 5.0.
This still doesn't entirely solve the problem of MQTT session upgrades
(v4 to v5 client) or session downgrades (v5 to v4 client).
Ideally, once mqtt_v5 is enabled, all MQTT bindings contain non-empty binding
arguments. However, this will require a feature flag migration function
to modify all MQTT bindings. To be more precise, all MQTT bindings need
to be deleted and added because the binding argument is part of the
Mnesia table key.
Since feature flag migration functions are non-trivial to implement in
RabbitMQ (they can run on every node multiple times and concurrently),
this commit takes a simpler approach:
All v3 / v4 sessions keep the empty binding argument [].
All v5 sessions use the new binding argument [#mqtt_subscription_opts{}].
This requires only handling a session upgrade / downgrade by
creating a binding (with the new binding arg) and deleting the old
binding (with the old binding arg) when processing the CONNECT packet.
Note that such session upgrades or downgrades should be rather rare in
practice. Therefore these binding transactions shouldn't hurt peformance.
The No Local option is implemented within the MQTT publishing connection
process: The message is not sent to the MQTT destination if the
destination queue name matches the current MQTT client ID and the
message was routed due to a subscription that has the No Local flag set.
This avoids unnecessary traffic on the MQTT queue.
The alternative would have been that the "receiving side" (same process)
filters the message out - which would have been more consistent in how
Retain As Published and Subscription Identifiers are implemented, but
would have caused unnecessary load on the MQTT queue.
Allow Session Expiry Interval to be changed when client DISCONNECTs.
Deprecate config subscription_ttl in favour of max_session_expiry_interval_secs
because the Session Expiry Interval will also apply to publishers that
connect with a will message and will delay interval.
"The additional session state of an MQTT v5 server includes:
* The Will Message and the Will Delay Interval
* If the Session is currently not connected, the time at which the Session
will end and Session State will be discarded."
The Session Expiry Interval picked by the server and sent to the client
in the CONNACK is the minimum of max_session_expiry_interval_secs and
the requested Session Expiry Interval by the client in CONNECT.
This commit favours dynamically changing the queue argument x-expires
over creating millions of different policies since that many policies
will come with new scalability issues.
Dynamically changing queue arguments is not allowed by AMQP 0.9.1
clients. However, it should be perfectly okay for the MQTT plugin to do
so for the queues it manages. MQTT clients are not aware that these
queues exist.
MQTT v5 spec:
"If the current retained message for a Topic expires, it is discarded
and there will be no retained message for that topic."
This commit also supports Message Expiry Interval for retained messages
when a node is restarted.
Therefore, the insertion timestamp needs to be stored on disk.
Upon recovery, the Erlang timers are re-created.
This commit does not yet implement Message Expiry Interval of
* retained messages: "If the current retained message for a Topic
expires, it is discarded and there will be no retained message for
that topic."
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."
This commit implements the part where the Server specifies the maximum
packet size.
"In the case of an error in a CONNECT packet it MAY send a CONNACK
packet containing the Reason Code, before closing the Network
Connection. In the case of an error in any other packet it SHOULD send a
DISCONNECT packet containing the Reason Code before closing the Network
Connection."
This commit implements only the "SHOULD" (second) part, not the "MAY"
(first) part.
There are now 2 different global wide MQTT settings on the server:
1. max_packet_size_unauthenticated which applies to the CONNECT packet
(and maybe AUTH packet in the future)
2. max_packet_size_authenticated which applies to all other MQTT
packets (that is, after the client successfully authenticated).
These two settings will apply to all MQTT versions.
In MQTT v5, if a non-CONNECT packet is too large, the server will send a
DISCONNECT packet to the client with Reason Code "Packet Too Large"
before closing the network connection.
"Allow the Client and Server to independently specify the maximum
packet size they support. It is an error for the session partner
to send a larger packet."
This commit implements the part where the Client specifies the maximum
packet size.
As per protocol spec, instead of sending, the server drops the MQTT packet
if it's too large.
A debug message is logged for "infrequent" packet types.
For PUBLISH packets, the messages is rejected to the queue such that it
will be dead lettered, if dead lettering is configured.
At the very least, Prometheus metrics for dead lettered messages will
be increased, even if dead lettering is not configured.
As requested in https://github.com/rabbitmq/rabbitmq-server/discussions/6331#discussioncomment-5796154
include all infos that were emitted in the MQTT connection created event also
in the MQTT connection closed event.
This ensures infos such as MQTT client ID are part of the connection
closed event.
Therefore, it's easy for the user to correlate between the two event
types.
Note that the MQTT plugin emits connection created and connection closed events only if
the CONNECT packet was successfully processed, i.e.authentication was successful.
Remove the disconnected_at property because it was never used.
rabbit_event already adds a timestamp to any event.
Up to 3.11.x an MQTT client ID is tracked in Ra
as a list of bytes as returned by binary_to_list/1 in
48467d6e12/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl (L137)
This has two downsides:
1. Lists consume more memory than binaries (when tracking many clients).
2. It violates the MQTT spec which states
"The ClientId MUST be a UTF-8 encoded string as defined in Section 1.5.3 [MQTT-3.1.3-4]." [v4 3.1.3.1]
Therefore, the original idea was to always store MQTT client IDs as
binaries starting with Native MQTT in 3.12.
However, this leads to client ID tracking misbehaving in mixed version
clusters since new nodes would register client IDs as binaries and old
nodes would register client IDs as lists. This means that a client
registering on a new node with the same client ID as a connection to the
old node did not terminate the connection on the old node.
Therefore, for backwards compatibility, we leave the client ID as a list of bytes
in the Ra machine state because the feature flag delete_ra_cluster_mqtt_node
introduced in v3.12 will delete the Ra cluster anyway and
the new client ID tracking via pg local will store client IDs as
binaries.
An interesting side note learned here is that the compiled file
rabbit_mqtt_collector must not be changed. This commit only modifies
function specs. However as soon as the compiled code is changed, this
module becomes a new version. The new version causes the anonymous ra query
function to fail in mixed clusters: When the old node does a
ra:leader_query where the leader is on the new node, the query function
fails on the new node with `badfun` because the new node does not have
the same module version. For more context, read:
https://web.archive.org/web/20181017104411/http://www.javalimit.com/2010/05/passing-funs-to-other-erlang-nodes.html
Instead of using atom `undefined`, use an integer as correlation term
when sending the will message to destination queues.
Classic queue clients for example expect a non negative integer.
Quorum queues expect any term.
AMQP 0.9.1 header x-mqtt-dup was determined by the incoming MQTT PUBLISH
packet's DUP flag. Its only use was to determine the outgoing MQTT
PUBLISH packet's DUP flag. However, that's wrong behaviour because
the MQTT 3.1.1 protocol spec mandates:
"The value of the DUP flag from an incoming PUBLISH packet is not
propagated when the PUBLISH Packet is sent to subscribers by the Server.
The DUP flag in the outgoing PUBLISH packet is set independently to the
incoming PUBLISH packet, its value MUST be determined solely by whether
the outgoing PUBLISH packet is a retransmission."
[MQTT-3.3.1-3]
Native MQTT fixes this wrong behaviour. Therefore, we can delete this
AMQP 0.9.1 header.
This commit is pure refactoring making the code base more maintainable.
Replace rabbit_misc:pipeline/3 with the new OTP 25 experimental maybe
expression because
"Frequent ways in which people work with sequences of failable
operations include folds over lists of functions, and abusing list
comprehensions. Both patterns have heavy weaknesses that makes them less
than ideal."
https://www.erlang.org/eeps/eep-0049#obsoleting-messy-patterns
Additionally, this commit is more restrictive in the type spec of
rabbit_mqtt_processor state fields.
Specifically, many fields were defined to be `undefined | T` where
`undefined` was only temporarily until the first CONNECT packet was
processed by the processor.
It's better to initialise the MQTT processor upon first CONNECT packet
because there is no point in having a processor without having received
any packet.
This allows many type specs in the processor to change from `undefined |
T` to just `T`.
Additionally, memory is saved by removing the `received_connect_packet`
field from the `rabbit_mqtt_reader` and `rabbit_web_mqtt_handler`.
Topic, username, and password are parsed as binaries.
Storing topics as lists or converting between
lists and binaries back and forth several times is
unnecessary and expensive.
1. Allow to inspect an (web) MQTT connection.
2. Show MQTT client ID on connection page as part of client_properties.
3. Handle force_event_refresh (when management_plugin gets enabled
after (web) MQTT connections got created).
4. Reduce code duplication between protocol readers.
5. Display '?' instead of 'NaN' in UI for absent queue metrics.
6. Allow an (web) MQTT connection to be closed via management_plugin.
For 6. this commit takes the same approach as already done for the stream
plugin:
The stream plugin registers neither with {type, network} nor {type,
direct}.
We cannot use gen_server:call/3 anymore to close the connection
because the web MQTT connection cannot handle gen_server calls (only
casts).
Strictly speaking, this commit requires a feature flag to allow to force
closing stream connections from the management plugin during a rolling
update. However, given that this is rather an edge case, and there is a
workaround (connect to the node directly hosting the stream connection),
this commit will not introduce a new feature flag.
- it also returns {stop, disconnect, state()} when receiving
a disconnect packet
- remove match for a {timeout, _} return when calling register_client.
register_client only returns {ok, _} and {error, _} according to its
function spec
Traditionally, queue types implement flow control by keeping state in
both sending and receiving Erlang processes (for example credit based flow
control keeps the number of credits within the process dictionary).
The rabbit_mqtt_qos0_queue cannot keep such state in sending or receiving
Erlang process because such state would consume a large amount of memory
in case of large fan-ins or large fan-outs.
The whole idea of the rabbit_mqtt_qos_queue type is to not keep any
state in the rabbit_queue_type client. This makes this new queue
type scale so well.
Therefore the new queue type cannot (easily) implement flow control
throttling individual senders.
In this commit, we take a different approach:
Instead of implementing flow control throttling individual senders,
the receiving MQTT connection process drops QoS 0 messages from the
rabbit_mqtt_qos_queue if it is overflowed with messages AND its MQTT
client is not capable of receiving messages fast enough.
This is a simple and sufficient solution because it's better to drop QoS
0 (at most once) messages instead of causing cluster-wide memory alarms.
The user can opt out of dropping messages by setting the new env setting
mailbox_soft_limit to 0.
Additionally, we reduce the send_timeout from 30 seconds default in
Ranch to 15 seconds default in MQTT. This will detect hanging MQTT
clients faster causing the MQTT connection to be closed.
When the MQTT connection receives an AMQP 0.9.1 message, it will contain
a list of payload fragments.
This commit avoids the expensive operation of turning that list into a binary.
All I/O methods accept iodata():
* erlang:port_command/2
* ssl:send/2
* In Web MQTT, cowboy websockets accept iodata():
0d04cfffa3/src/cow_ws.erl (L58)
The MQTT protocol specs define the term "MQTT Control Packet".
The MQTT specs never talk about "frame".
Let's reflect this naming in the source code since things get confusing
otherwise:
Packets belong to MQTT.
Frames belong to AMQP 0.9.1 or web sockets.
MQTT spec only talks about "Packet Identifier",
but never about "Message Identitier".
RabbitMQ has message identifiers (for example the classic queue store
uses message identifiers to uniquely identify internal messages).
So, let's not confuse these two terms and be specific.
Record #state{} is purely local to rabbit_mqtt_reader.
Record #proc_state{} is purely local to rabbit_mqtt_processor.
Therefore, move these record definitions to the defining module.
This avoids unnecessarily exposing internal information.
Now, that #proc_state{} is defined in rabbit_mqtt_processor,
rename #proc_state to #state{}.
1. The mqtt_qos0 queue type uses now QName in the delivery.
This makes the code simpler although it might be a bit less efficient
because the tuple containing binaries is sent around and a hash is
computed within rabbit_queue_type:module/2
2. Do not construct a new binary on every PUBACK. This is expensive with
many PUBACKs per second. Instead, we store the QoS1 queue name in the
process state (but only if the connection also consumes from that
queue).
3. To make the code more readable, and less specialised, always handle
queue actions when we call rabbit_queue_type:settle/5.
This method only returns an action (delivery) when settling to the stream
queue, which the MQTT plugin never does because an MQTT connection
does not consume from a stream. It's not expensive at all to handle
an empty list of queue actions.
- subscriptions information can be retrieved directly from mnesia
- when unsubscribe, we check if there is binding between topic name
and queue (check for both qos0 queue name and qos1 queue name) to
unbind
- added a boolean value has_subs in proc state which will indicate
if connection has any active subscriptions. Used for setting consumer
global counter
- protocols are set to mqtt301 or mqtt311 depends
on protocal version set by client connection
- added boolean isPublisher in proc state to track
if connection was ever used to publish. This is used to
set publisher_create and publisher_delete global counters.
- tests added in integration_SUITE