rabbitmq-server/deps/rabbitmq_mqtt/test/command_SUITE.erl

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module(command_SUITE).
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_mqtt.hrl").
-import(util, [connect/3, connect/4]).
-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand').

all() ->
    [
     {group, unit},
     {group, v4},
     {group, v5}
    ].

groups() ->
    [
     {unit, [], [merge_defaults]},
     {v4, [], [run]},
     {v5, [], [run,
               user_property]}
    ].

suite() ->
    [
     {timetrap, {minutes, 10}}
    ].

init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    Config1 = rabbit_ct_helpers:set_config(
                Config,
                [{rmq_nodename_suffix, ?MODULE},
                 {rmq_extra_tcp_ports, [tcp_port_mqtt_extra,
                                        tcp_port_mqtt_tls_extra]},
                 {rmq_nodes_clustered, true},
                 {rmq_nodes_count, 3},
                 {start_rmq_with_plugins_disabled, true}
                ]),
    Config2 = rabbit_ct_helpers:run_setup_steps(
                Config1,
                rabbit_ct_broker_helpers:setup_steps() ++
                rabbit_ct_client_helpers:setup_steps()),
    util:enable_plugin(Config2, rabbitmq_mqtt),
    Config2.

end_per_suite(Config) ->
    rabbit_ct_helpers:run_teardown_steps(
      Config,
      rabbit_ct_client_helpers:teardown_steps() ++
      rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(unit, Config) ->
    Config;
init_per_group(Group, Config) ->
    rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}).

end_per_group(_, Config) ->
    Config.

init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_finished(Config, Testcase).

merge_defaults(_Config) ->
    {[<<"client_id">>, <<"conn_name">>], #{verbose := false}} =
        ?COMMAND:merge_defaults([], #{}),
    {[<<"other_key">>], #{verbose := true}} =
        ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}),
    {[<<"other_key">>], #{verbose := false}} =
        ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}).
2016-12-06 22:24:33 +08:00
run(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts = #{node => Node, timeout => 10_000, verbose => false},
2016-12-06 22:24:33 +08:00
2016-12-06 23:06:44 +08:00
%% No connections
2016-12-06 22:24:33 +08:00
[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
%% Open a connection
C1 = connect(<<"simpleClient">>, Config, [{ack_timeout, 1}]),
timer:sleep(100),
2016-12-06 22:24:33 +08:00
[[{client_id, <<"simpleClient">>}]] =
'Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts)),
2016-12-06 22:24:33 +08:00
C2 = connect(<<"simpleClient1">>, Config, [{ack_timeout, 1}]),
timer:sleep(200),
2016-12-06 22:24:33 +08:00
[[{client_id, <<"simpleClient">>}, {user, <<"guest">>}],
[{client_id, <<"simpleClient1">>}, {user, <<"guest">>}]] =
lists:sort(
'Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>, <<"user">>],
Opts))),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
start_amqp_connection(network, Node, Port),
%% There are still just two MQTT connections
2016-12-06 22:24:33 +08:00
[[{client_id, <<"simpleClient">>}],
[{client_id, <<"simpleClient1">>}]] =
lists:sort('Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts))),
start_amqp_connection(direct, Node, Port),
timer:sleep(200),
2016-12-06 22:24:33 +08:00
%% Still two MQTT connections
?assertEqual(
[[{client_id, <<"simpleClient">>}],
[{client_id, <<"simpleClient1">>}]],
lists:sort('Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts)))),
2016-12-06 22:24:33 +08:00
%% Verbose returns all keys
AllKeys = lists:map(fun(I) -> atom_to_binary(I) end, ?INFO_ITEMS),
[AllInfos1Con1, _AllInfos1Con2] = 'Elixir.Enum':to_list(?COMMAND:run(AllKeys, Opts)),
[AllInfos2Con1, _AllInfos2Con2] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})),
%% Keys are INFO_ITEMS
InfoItemsSorted = lists:sort(?INFO_ITEMS),
?assertEqual(InfoItemsSorted, lists:sort(proplists:get_keys(AllInfos1Con1))),
?assertEqual(InfoItemsSorted, lists:sort(proplists:get_keys(AllInfos2Con1))),
%% CLI command should list MQTT connections from all nodes.
C3 = connect(<<"simpleClient2">>, Config, 1, [{ack_timeout, 1}]),
rabbit_ct_helpers:eventually(
?_assertEqual(
[[{client_id, <<"simpleClient">>}],
[{client_id, <<"simpleClient1">>}],
[{client_id, <<"simpleClient2">>}]],
lists:sort('Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts))))),
ok = emqtt:disconnect(C1),
ok = emqtt:disconnect(C2),
ok = emqtt:disconnect(C3).
user_property(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts = #{node => Node, timeout => 10_000, verbose => false},
ClientId = <<"my-client">>,
UserProp = [{<<"name 1">>, <<"value 1">>},
{<<"name 2">>, <<"value 2">>},
%% "The same name is allowed to appear more than once." [v5 3.1.2.11.8]
{<<"name 2">>, <<"value 3">>}],
C = connect(ClientId, Config, 1, [{properties, #{'User-Property' => UserProp}}]),
rabbit_ct_helpers:eventually(
?_assertEqual(
[[{client_id, ClientId},
{user_property, UserProp}]],
'Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>, <<"user_property">>], Opts)))),
ok = emqtt:disconnect(C).
start_amqp_connection(Type, Node, Port) ->
amqp_connection:start(amqp_params(Type, Node, Port)).
amqp_params(network, _, Port) ->
#amqp_params_network{port = Port};
amqp_params(direct, Node, _) ->
#amqp_params_direct{node = Node}.