2020-07-14 00:39:36 +08:00
|
|
|
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
%%
|
2024-01-02 11:02:20 +08:00
|
|
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
2020-07-14 00:39:36 +08:00
|
|
|
%%
|
2016-12-06 18:03:02 +08:00
|
|
|
-module(retainer_SUITE).
|
2022-09-26 16:48:44 +08:00
|
|
|
-compile([export_all, nowarn_export_all]).
|
2016-12-06 18:03:02 +08:00
|
|
|
|
|
|
|
-include_lib("common_test/include/ct.hrl").
|
2023-03-04 00:09:36 +08:00
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-import(util, [connect/2, connect/3,
|
|
|
|
expect_publishes/3,
|
|
|
|
assert_message_expiry_interval/2
|
|
|
|
]).
|
2016-12-06 18:03:02 +08:00
|
|
|
|
|
|
|
all() ->
|
|
|
|
[
|
2023-02-22 02:24:15 +08:00
|
|
|
{group, v4},
|
|
|
|
{group, v5}
|
2016-12-06 18:03:02 +08:00
|
|
|
].
|
|
|
|
|
|
|
|
groups() ->
|
|
|
|
[
|
2023-02-22 02:24:15 +08:00
|
|
|
{v4, [], sub_groups()},
|
|
|
|
{v5, [], sub_groups()}
|
|
|
|
].
|
|
|
|
|
|
|
|
sub_groups() ->
|
|
|
|
[
|
|
|
|
{dets, [shuffle], tests()},
|
|
|
|
{ets, [shuffle], tests()},
|
|
|
|
{noop, [shuffle], [does_not_retain]}
|
2022-11-14 18:37:41 +08:00
|
|
|
].
|
|
|
|
|
|
|
|
tests() ->
|
|
|
|
[
|
|
|
|
coerce_configuration_data,
|
|
|
|
should_translate_amqp2mqtt_on_publish,
|
|
|
|
should_translate_amqp2mqtt_on_retention,
|
2023-03-01 00:27:27 +08:00
|
|
|
should_translate_amqp2mqtt_on_retention_search,
|
2023-03-04 00:09:36 +08:00
|
|
|
recover,
|
|
|
|
recover_with_message_expiry_interval
|
2016-12-06 18:03:02 +08:00
|
|
|
].
|
|
|
|
|
|
|
|
suite() ->
|
Support MQTT 5.0 features No Local, RAP, Subscription IDs
Support subscription options "No Local" and "Retain As Published"
as well as Subscription Identifiers.
All three MQTT 5.0 features can be set on a per subscription basis.
Due to wildcards in topic filters, multiple subscriptions
can match a given topic. Therefore, to implement Retain As Published and
Subscription Identifiers, the destination MQTT connection process needs
to know what subscription(s) caused it to receive the message.
There are a few ways how this could be implemented:
1. The destination MQTT connection process is aware of all its
subscriptions. Whenever, it receives a message, it can match the
message's routing key / topic against all its known topic filters.
However, to iteratively match the routing key against all topic
filters for every received message can become very expensive in the
worst case when the MQTT client creates many subscriptions containing
wildcards. This could be the case for an MQTT client that acts as a
bridge or proxy or dispatcher: It could subscribe via a wildcard for
each of its own clients.
2. Instead of interatively matching the topic of the received message
against all topic filters that contain wildcards, a better approach
would be for every MQTT subscriber connection process to maintain a
local trie datastructure (similar to how topic exchanges are
implemented) and perform matching therefore more efficiently.
However, this does not sound optimal either because routing is
effectively performed twice: in the topic exchange and again against
a much smaller trie in each destination connection process.
3. Given that the topic exchange already perform routing, a much more
sensible way would be to send the matched binding key(s) to the
destination MQTT connection process. A subscription (topic filter)
maps to a binding key in AMQP 0.9.1 routing. Therefore, for the first
time in RabbitMQ, the routing function should not only output a list
of unique destination queues, but also the binding keys (subscriptions)
that caused the message to be routed to the destination queue.
This commit therefore implements the 3rd approach.
The downside of the 3rd approach is that it requires API changes to the
routing function and topic exchange.
Specifically, this commit adds a new function rabbit_exchange:route/3
that accepts a list of routing options. If that list contains version 2,
the caller of the routing function knows how to handle the return value
that could also contain binding keys.
This commits allows an MQTT connection process, the channel process, and
at-most-once dead lettering to handle binding keys. Binding keys are
included as AMQP 0.9.1 headers into the basic message.
Therefore, whenever a message is sent from an MQTT client or AMQP 0.9.1
client or AMQP 1.0 client or STOMP client, the MQTT receiver will know
the subscription identifier that caused the message to be received.
Note that due to the low number of allowed wildcard characters (# and
+), the cardinality of matched binding keys shouldn't be high even if
the topic contains for example 3 levels and the message is sent to for
example 5 million destination queues. In other words, sending multiple
distinct basic messages to the destination shouldn't hurt the delegate
optimisation too much. The delegate optimisation implemented for classic
queues and rabbit_mqtt_qos0_queue(s) still takes place for all basic
messages that contain the same set of matched binding keys.
The topic exchange returns all matched binding keys by remembering the
edges walked down to the leaves. As an optimisation, only for MQTT
queues are binding keys being returned. This does add a small dependency
from app rabbit to app rabbitmq_mqtt which is not optimal. However, this
dependency should be simple to remove when omitting this optimisation.
Another important feature of this commit is persisting subscription
options and subscription identifiers because they are part of the
MQTT 5.0 session state.
In MQTT v3 and v4, the only subscription information that were part of
the session state was the topic filter and the QoS level.
Both information were implicitly stored in the form of bindings:
The topic filter as the binding key and the QoS level as the destination
queue name of the binding.
For MQTT v5 we need to persist more subscription information.
From a domain perspective, it makes sense to store subscription options
as part of subscriptions, i.e. bindings, even though they are currently
not used in routing.
Therefore, this commits stores subscription options as binding arguments.
Storing subscription options as binding arguments comes in turn with
new challenges: How to handle mixed version clusters and upgrading an
MQTT session from v3 or v4 to v5?
Imagine an MQTT client connects via v5 with Session Expiry Interval > 0
to a new node in a mixed version cluster, creates a subscription,
disconnects, and subsequently connects via v3 to an old node. The
client should continue to receive messages.
To simplify such edge cases, this commit introduces a new feature flag
called mqtt_v5. If mqtt_v5 is disabled, clients cannot connect to
RabbitMQ via MQTT 5.0.
This still doesn't entirely solve the problem of MQTT session upgrades
(v4 to v5 client) or session downgrades (v5 to v4 client).
Ideally, once mqtt_v5 is enabled, all MQTT bindings contain non-empty binding
arguments. However, this will require a feature flag migration function
to modify all MQTT bindings. To be more precise, all MQTT bindings need
to be deleted and added because the binding argument is part of the
Mnesia table key.
Since feature flag migration functions are non-trivial to implement in
RabbitMQ (they can run on every node multiple times and concurrently),
this commit takes a simpler approach:
All v3 / v4 sessions keep the empty binding argument [].
All v5 sessions use the new binding argument [#mqtt_subscription_opts{}].
This requires only handling a session upgrade / downgrade by
creating a binding (with the new binding arg) and deleting the old
binding (with the old binding arg) when processing the CONNECT packet.
Note that such session upgrades or downgrades should be rather rare in
practice. Therefore these binding transactions shouldn't hurt peformance.
The No Local option is implemented within the MQTT publishing connection
process: The message is not sent to the MQTT destination if the
destination queue name matches the current MQTT client ID and the
message was routed due to a subscription that has the No Local flag set.
This avoids unnecessary traffic on the MQTT queue.
The alternative would have been that the "receiving side" (same process)
filters the message out - which would have been more consistent in how
Retain As Published and Subscription Identifiers are implemented, but
would have caused unnecessary load on the MQTT queue.
2023-04-19 21:32:34 +08:00
|
|
|
[{timetrap, {minutes, 2}}].
|
2016-12-06 18:03:02 +08:00
|
|
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% Testsuite setup/teardown.
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
|
rabbit_ct_helpers:log_environment(),
|
2022-11-14 18:37:41 +08:00
|
|
|
Config.
|
2016-12-06 18:03:02 +08:00
|
|
|
|
|
|
|
end_per_suite(Config) ->
|
|
|
|
Config.
|
|
|
|
|
2023-02-22 02:24:15 +08:00
|
|
|
init_per_group(G, Config)
|
|
|
|
when G =:= v4;
|
|
|
|
G =:= v5 ->
|
|
|
|
rabbit_ct_helpers:set_config(Config, {mqtt_version, G});
|
2022-11-14 18:37:41 +08:00
|
|
|
init_per_group(Group, Config0) ->
|
2023-02-22 02:24:15 +08:00
|
|
|
Suffix = rabbit_ct_helpers:testcase_absname(Config0, "", "-"),
|
2024-07-09 18:30:47 +08:00
|
|
|
Config = rabbit_ct_helpers:set_config(
|
2025-06-05 21:02:58 +08:00
|
|
|
Config0, [{rmq_nodename_suffix, Suffix},
|
|
|
|
{start_rmq_with_plugins_disabled, true}
|
|
|
|
]),
|
2022-11-14 18:37:41 +08:00
|
|
|
Mod = list_to_atom("rabbit_mqtt_retained_msg_store_" ++ atom_to_list(Group)),
|
|
|
|
Env = [{rabbitmq_mqtt, [{retained_message_store, Mod}]},
|
|
|
|
{rabbit, [
|
|
|
|
{default_user, "guest"},
|
|
|
|
{default_pass, "guest"},
|
|
|
|
{default_vhost, "/"},
|
|
|
|
{default_permissions, [".*", ".*", ".*"]}
|
|
|
|
]}],
|
2025-06-05 21:02:58 +08:00
|
|
|
Config1 = rabbit_ct_helpers:run_setup_steps(
|
|
|
|
Config,
|
|
|
|
[fun(Conf) -> rabbit_ct_helpers:merge_app_env(Conf, Env) end] ++
|
|
|
|
rabbit_ct_broker_helpers:setup_steps() ++
|
|
|
|
rabbit_ct_client_helpers:setup_steps()),
|
|
|
|
util:enable_plugin(Config1, rabbitmq_mqtt),
|
|
|
|
Config1.
|
2022-11-14 18:37:41 +08:00
|
|
|
|
2023-02-22 02:24:15 +08:00
|
|
|
end_per_group(G, Config)
|
|
|
|
when G =:= v4;
|
|
|
|
G =:= v5 ->
|
|
|
|
Config;
|
2016-12-06 18:03:02 +08:00
|
|
|
end_per_group(_, Config) ->
|
2022-11-14 18:37:41 +08:00
|
|
|
rabbit_ct_helpers:run_teardown_steps(
|
|
|
|
Config,
|
|
|
|
rabbit_ct_client_helpers:teardown_steps() ++
|
|
|
|
rabbit_ct_broker_helpers:teardown_steps()).
|
2016-12-06 18:03:02 +08:00
|
|
|
|
2023-03-04 00:09:36 +08:00
|
|
|
init_per_testcase(recover_with_message_expiry_interval = T, Config) ->
|
|
|
|
case ?config(mqtt_version, Config) of
|
|
|
|
v4 ->
|
|
|
|
{skip, "Message Expiry Interval not supported in MQTT v4"};
|
|
|
|
v5 ->
|
|
|
|
rabbit_ct_helpers:testcase_started(Config, T)
|
|
|
|
end;
|
2016-12-06 18:03:02 +08:00
|
|
|
init_per_testcase(Testcase, Config) ->
|
|
|
|
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
|
|
|
|
|
|
|
end_per_testcase(Testcase, Config) ->
|
|
|
|
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
|
|
|
|
2023-01-28 02:25:57 +08:00
|
|
|
|
2016-12-06 18:03:02 +08:00
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% Testsuite cases
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
|
|
|
coerce_configuration_data(Config) ->
|
2022-11-29 19:41:50 +08:00
|
|
|
C = connect(<<"simpleClientRetainer">>, Config, [{ack_timeout, 1}]),
|
2016-12-06 18:03:02 +08:00
|
|
|
|
2022-08-30 16:24:40 +08:00
|
|
|
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0),
|
|
|
|
ok = emqtt:publish(C, <<"TopicA">>, <<"Payload">>),
|
2023-01-03 01:02:43 +08:00
|
|
|
ok = expect_publishes(C, <<"TopicA">>, [<<"Payload">>]),
|
2016-12-06 18:03:02 +08:00
|
|
|
|
2022-08-30 16:24:40 +08:00
|
|
|
ok = emqtt:disconnect(C).
|
2016-12-06 18:03:02 +08:00
|
|
|
|
2019-02-01 13:21:10 +08:00
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% When a client is subscribed to TopicA/Device.Field and another
|
|
|
|
%% client publishes to TopicA/Device.Field the client should be
|
|
|
|
%% sent messages for the translated topic (TopicA/Device/Field)
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
should_translate_amqp2mqtt_on_publish(Config) ->
|
2022-11-29 19:41:50 +08:00
|
|
|
C = connect(<<"simpleClientRetainer">>, Config, [{ack_timeout, 1}]),
|
2019-02-04 00:43:44 +08:00
|
|
|
%% there's an active consumer
|
2022-08-30 16:24:40 +08:00
|
|
|
{ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device.Field">>, qos1),
|
|
|
|
ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]),
|
2023-01-03 01:02:43 +08:00
|
|
|
ok = expect_publishes(C, <<"TopicA/Device/Field">>, [<<"Payload">>]),
|
2022-08-30 16:24:40 +08:00
|
|
|
ok = emqtt:disconnect(C).
|
2019-02-01 13:21:10 +08:00
|
|
|
|
|
|
|
%% -------------------------------------------------------------------
|
2022-08-30 16:24:40 +08:00
|
|
|
%% If a client publishes a retained message to TopicA/Device.Field and another
|
2019-02-01 13:21:10 +08:00
|
|
|
%% client subscribes to TopicA/Device.Field the client should be
|
|
|
|
%% sent the retained message for the translated topic (TopicA/Device/Field)
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
should_translate_amqp2mqtt_on_retention(Config) ->
|
2022-11-29 19:41:50 +08:00
|
|
|
C = connect(<<"simpleClientRetainer">>, Config, [{ack_timeout, 1}]),
|
2019-02-04 00:43:44 +08:00
|
|
|
%% publish with retain = true before a consumer comes around
|
2022-08-30 16:24:40 +08:00
|
|
|
ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]),
|
|
|
|
{ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device.Field">>, qos1),
|
2023-01-03 01:02:43 +08:00
|
|
|
ok = expect_publishes(C, <<"TopicA/Device/Field">>, [<<"Payload">>]),
|
2022-08-30 16:24:40 +08:00
|
|
|
ok = emqtt:disconnect(C).
|
2019-02-01 13:21:10 +08:00
|
|
|
|
|
|
|
%% -------------------------------------------------------------------
|
2022-08-30 16:24:40 +08:00
|
|
|
%% If a client publishes a retained message to TopicA/Device.Field and another
|
2019-02-04 00:43:44 +08:00
|
|
|
%% client subscribes to TopicA/Device/Field the client should be
|
2019-02-01 13:21:10 +08:00
|
|
|
%% sent retained message for the translated topic (TopicA/Device/Field)
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
should_translate_amqp2mqtt_on_retention_search(Config) ->
|
2022-11-29 19:41:50 +08:00
|
|
|
C = connect(<<"simpleClientRetainer">>, Config, [{ack_timeout, 1}]),
|
2022-08-30 16:24:40 +08:00
|
|
|
ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]),
|
|
|
|
{ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device/Field">>, qos1),
|
2023-01-03 01:02:43 +08:00
|
|
|
ok = expect_publishes(C, <<"TopicA/Device/Field">>, [<<"Payload">>]),
|
2022-08-30 16:24:40 +08:00
|
|
|
ok = emqtt:disconnect(C).
|
|
|
|
|
2022-11-14 18:37:41 +08:00
|
|
|
does_not_retain(Config) ->
|
2022-11-29 19:41:50 +08:00
|
|
|
C = connect(<<"simpleClientRetainer">>, Config, [{ack_timeout, 1}]),
|
2022-11-14 18:37:41 +08:00
|
|
|
ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]),
|
|
|
|
{ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device.Field">>, qos1),
|
|
|
|
receive
|
|
|
|
Unexpected ->
|
|
|
|
ct:fail("Unexpected message: ~p", [Unexpected])
|
|
|
|
after 1000 ->
|
|
|
|
ok
|
|
|
|
end,
|
2023-01-03 01:02:43 +08:00
|
|
|
ok = emqtt:disconnect(C).
|
2023-03-01 00:27:27 +08:00
|
|
|
|
|
|
|
recover(Config) ->
|
|
|
|
Topic = Payload = ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
C1 = connect(ClientId, Config),
|
|
|
|
{ok, _} = emqtt:publish(C1, Topic, Payload, [{retain, true},
|
|
|
|
{qos, 1}]),
|
|
|
|
ok = emqtt:disconnect(C1),
|
|
|
|
ok = rabbit_ct_broker_helpers:restart_node(Config, 0),
|
2025-06-05 21:02:58 +08:00
|
|
|
rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_mqtt),
|
2023-03-01 00:27:27 +08:00
|
|
|
C2 = connect(ClientId, Config),
|
|
|
|
{ok, _, _} = emqtt:subscribe(C2, Topic, qos1),
|
|
|
|
ok = expect_publishes(C2, Topic, [Payload]),
|
|
|
|
ok = emqtt:disconnect(C2).
|
2023-03-04 00:09:36 +08:00
|
|
|
|
|
|
|
recover_with_message_expiry_interval(Config) ->
|
|
|
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
C1 = connect(ClientId, Config),
|
|
|
|
Start = os:system_time(second),
|
|
|
|
{ok, _} = emqtt:publish(C1, <<"topic/1">>,
|
|
|
|
<<"m1">>, [{retain, true}, {qos, 1}]),
|
|
|
|
{ok, _} = emqtt:publish(C1, <<"topic/2">>, #{'Message-Expiry-Interval' => 100},
|
|
|
|
<<"m2">>, [{retain, true}, {qos, 1}]),
|
|
|
|
{ok, _} = emqtt:publish(C1, <<"topic/3">>, #{'Message-Expiry-Interval' => 3},
|
|
|
|
<<"m3">>, [{retain, true}, {qos, 1}]),
|
|
|
|
{ok, _} = emqtt:publish(C1, <<"topic/4">>, #{'Message-Expiry-Interval' => 15},
|
|
|
|
<<"m4">>, [{retain, true}, {qos, 1}]),
|
|
|
|
ok = emqtt:disconnect(C1),
|
|
|
|
%% Takes around 9 seconds on Linux.
|
|
|
|
ok = rabbit_ct_broker_helpers:restart_node(Config, 0),
|
2025-06-05 21:02:58 +08:00
|
|
|
rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_mqtt),
|
2023-03-04 00:09:36 +08:00
|
|
|
C2 = connect(ClientId, Config),
|
|
|
|
|
|
|
|
%% Retained message for topic/3 should have expired during node restart.
|
|
|
|
%% Wait for retained message for topic/4 to expire.
|
|
|
|
ElapsedSeconds1 = os:system_time(second) - Start,
|
|
|
|
SleepMs = max(0, timer:seconds(15 - ElapsedSeconds1 + 1)),
|
|
|
|
ct:pal("Sleeping for ~b ms", [SleepMs]),
|
|
|
|
timer:sleep(SleepMs),
|
|
|
|
|
|
|
|
ElapsedSeconds2 = os:system_time(second) - Start,
|
|
|
|
{ok, _, [1,1,1,1]} = emqtt:subscribe(C2, [{<<"topic/1">>, qos1},
|
|
|
|
{<<"topic/2">>, qos1},
|
|
|
|
{<<"topic/3">>, qos1},
|
|
|
|
{<<"topic/4">>, qos1}]),
|
|
|
|
receive {publish, #{client_pid := C2,
|
|
|
|
retain := true,
|
|
|
|
topic := <<"topic/1">>,
|
|
|
|
payload := <<"m1">>,
|
|
|
|
properties := Props}}
|
|
|
|
when map_size(Props) =:= 0 -> ok
|
2024-12-10 23:19:34 +08:00
|
|
|
after 30_000 -> ct:fail("did not topic/1")
|
2023-03-04 00:09:36 +08:00
|
|
|
end,
|
|
|
|
|
|
|
|
receive {publish, #{client_pid := C2,
|
|
|
|
retain := true,
|
|
|
|
topic := <<"topic/2">>,
|
|
|
|
payload := <<"m2">>,
|
|
|
|
properties := #{'Message-Expiry-Interval' := MEI}}} ->
|
|
|
|
assert_message_expiry_interval(100 - ElapsedSeconds2, MEI)
|
2024-12-10 23:19:34 +08:00
|
|
|
after 30_000 -> ct:fail("did not topic/2")
|
2023-03-04 00:09:36 +08:00
|
|
|
end,
|
|
|
|
|
|
|
|
receive Unexpected -> ct:fail("Received unexpectedly: ~p", [Unexpected])
|
|
|
|
after 0 -> ok
|
|
|
|
end,
|
|
|
|
|
|
|
|
ok = emqtt:disconnect(C2).
|