2077 lines
87 KiB
Erlang
2077 lines
87 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
%%
|
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
|
|
%% This test suite contains test cases that are shared between (i.e. executed across):
|
|
%% 1. plugins rabbitmq_mqtt and rabbitmq_web_mqtt
|
|
%% 2. MQTT versions v3, v4 and v5
|
|
%%
|
|
%% In other words, this test suite should not contain any test case that is executed
|
|
%% only with a particular plugin or particular MQTT version.
|
|
%%
|
|
%% When adding a test case here the same function must be defined in web_mqtt_shared_SUITE.
|
|
-module(mqtt_shared_SUITE).
|
|
-compile([export_all,
|
|
nowarn_export_all]).
|
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
|
|
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
|
|
|
|
-import(rabbit_ct_broker_helpers,
|
|
[rabbitmqctl_list/3,
|
|
rabbitmqctl/3,
|
|
rpc/4,
|
|
rpc/5,
|
|
rpc_all/4,
|
|
get_node_config/3,
|
|
drain_node/2,
|
|
revive_node/2,
|
|
await_metadata_store_consistent/2
|
|
]).
|
|
-import(rabbit_ct_helpers,
|
|
[eventually/3,
|
|
eventually/1]).
|
|
-import(util,
|
|
[all_connection_pids/1,
|
|
get_global_counters/1, get_global_counters/2,
|
|
get_global_counters/3, get_global_counters/4,
|
|
expect_publishes/3,
|
|
connect/2, connect/3, connect/4,
|
|
get_events/1, assert_event_type/2, assert_event_prop/2,
|
|
await_exit/1, await_exit/2,
|
|
publish_qos1_timeout/4,
|
|
non_clean_sess_opts/0]).
|
|
-import(rabbit_mgmt_test_util,
|
|
[http_get/2,
|
|
http_delete/3]).
|
|
|
|
%% defined in MQTT v5 (not in v4 or v3)
|
|
-define(RC_SERVER_SHUTTING_DOWN, 16#8B).
|
|
-define(RC_KEEP_ALIVE_TIMEOUT, 16#8D).
|
|
-define(RC_SESSION_TAKEN_OVER, 16#8E).
|
|
-define(TIMEOUT, 30_000).
|
|
|
|
all() ->
|
|
[{group, mqtt}].
|
|
|
|
%% The code being tested under v3 and v4 is almost identical.
|
|
%% To save time in CI, we therefore run only a very small subset of tests in v3.
|
|
groups() ->
|
|
[
|
|
{mqtt, [],
|
|
[{cluster_size_1, [],
|
|
[{v3, [], cluster_size_1_tests_v3()},
|
|
{v4, [], cluster_size_1_tests()},
|
|
{v5, [], cluster_size_1_tests()}]},
|
|
{cluster_size_3, [],
|
|
[{v4, [], cluster_size_3_tests()},
|
|
{v5, [], cluster_size_3_tests()}]}
|
|
]}
|
|
].
|
|
|
|
cluster_size_1_tests_v3() ->
|
|
[global_counters,
|
|
events
|
|
].
|
|
|
|
cluster_size_1_tests() ->
|
|
[
|
|
global_counters %% must be the 1st test case
|
|
,message_size_metrics
|
|
,block_only_publisher
|
|
,many_qos1_messages
|
|
,session_expiry
|
|
,cli_close_all_connections
|
|
,cli_close_all_user_connections
|
|
,management_plugin_connection
|
|
,management_plugin_enable
|
|
,disconnect
|
|
,pubsub_shared_connection
|
|
,pubsub_separate_connections
|
|
,will_with_disconnect
|
|
,will_without_disconnect
|
|
,decode_basic_properties
|
|
,quorum_queue_rejects
|
|
,events
|
|
,internal_event_handler
|
|
,non_clean_sess_reconnect_qos1
|
|
,non_clean_sess_reconnect_qos0
|
|
,non_clean_sess_reconnect_qos0_and_qos1
|
|
,non_clean_sess_empty_client_id
|
|
,subscribe_same_topic_same_qos
|
|
,subscribe_same_topic_different_qos
|
|
,subscribe_multiple
|
|
,large_message_mqtt_to_mqtt
|
|
,large_message_amqp_to_mqtt
|
|
,keepalive
|
|
,keepalive_turned_off
|
|
,block
|
|
,amqp_to_mqtt_qos0
|
|
,clean_session_disconnect_client
|
|
,clean_session_node_restart
|
|
,clean_session_node_kill
|
|
,rabbit_status_connection_count
|
|
,trace
|
|
,trace_large_message
|
|
,max_packet_size_unauthenticated
|
|
,max_packet_size_authenticated
|
|
,default_queue_type
|
|
,message_interceptors
|
|
,utf8
|
|
,retained_message_conversion
|
|
,bind_exchange_to_exchange
|
|
,bind_exchange_to_exchange_single_message
|
|
,notify_consumer_classic_queue_deleted
|
|
,notify_consumer_quorum_queue_deleted
|
|
,notify_consumer_qos0_queue_deleted
|
|
].
|
|
|
|
cluster_size_3_tests() ->
|
|
[
|
|
pubsub,
|
|
queue_down_qos1,
|
|
consuming_classic_queue_down,
|
|
flow_classic_queue,
|
|
flow_quorum_queue,
|
|
flow_stream,
|
|
rabbit_mqtt_qos0_queue,
|
|
rabbit_mqtt_qos0_queue_kill_node,
|
|
cli_list_queues,
|
|
delete_create_queue,
|
|
session_reconnect,
|
|
session_takeover,
|
|
duplicate_client_id,
|
|
publish_to_all_queue_types_qos0,
|
|
publish_to_all_queue_types_qos1,
|
|
maintenance
|
|
].
|
|
|
|
suite() ->
|
|
[{timetrap, {minutes, 10}}].
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testsuite setup/teardown.
|
|
%% -------------------------------------------------------------------
|
|
|
|
init_per_suite(Config) ->
|
|
rabbit_ct_helpers:log_environment(),
|
|
Config1 = rabbit_ct_helpers:merge_app_env(
|
|
Config, {rabbit, [
|
|
{quorum_tick_interval, 1000},
|
|
{stream_tick_interval, 1000},
|
|
{forced_feature_flags_on_init, []},
|
|
{start_rmq_with_plugins_disabled, true}
|
|
]}),
|
|
rabbit_ct_helpers:run_setup_steps(Config1).
|
|
|
|
end_per_suite(Config) ->
|
|
rabbit_ct_helpers:run_teardown_steps(Config).
|
|
|
|
init_per_group(mqtt, Config0) ->
|
|
rabbit_ct_helpers:set_config(Config0, {websocket, false});
|
|
init_per_group(Group, Config)
|
|
when Group =:= v3;
|
|
Group =:= v4;
|
|
Group =:= v5 ->
|
|
rabbit_ct_helpers:set_config(Config, {mqtt_version, Group});
|
|
|
|
init_per_group(Group, Config0) ->
|
|
Nodes = case Group of
|
|
cluster_size_1 -> 1;
|
|
cluster_size_3 -> 3
|
|
end,
|
|
Suffix = rabbit_ct_helpers:testcase_absname(Config0, "", "-"),
|
|
Config = rabbit_ct_helpers:set_config(
|
|
Config0,
|
|
[{rmq_nodes_count, Nodes},
|
|
{rmq_nodename_suffix, Suffix}]),
|
|
Config1 = rabbit_ct_helpers:run_steps(
|
|
Config,
|
|
rabbit_ct_broker_helpers:setup_steps() ++
|
|
rabbit_ct_client_helpers:setup_steps()),
|
|
util:enable_plugin(Config1, rabbitmq_mqtt),
|
|
Config1.
|
|
|
|
end_per_group(G, Config)
|
|
when G =:= cluster_size_1;
|
|
G =:= cluster_size_3 ->
|
|
rabbit_ct_helpers:run_steps(
|
|
Config,
|
|
rabbit_ct_client_helpers:teardown_steps() ++
|
|
rabbit_ct_broker_helpers:teardown_steps());
|
|
end_per_group(_, Config) ->
|
|
Config.
|
|
|
|
init_per_testcase(T, Config)
|
|
when T =:= management_plugin_connection;
|
|
T =:= management_plugin_enable ->
|
|
inets:start(),
|
|
init_per_testcase0(T, Config);
|
|
init_per_testcase(T, Config)
|
|
when T =:= clean_session_disconnect_client;
|
|
T =:= zero_session_expiry_interval_disconnect_client;
|
|
T =:= clean_session_node_restart;
|
|
T =:= clean_session_node_kill;
|
|
T =:= notify_consumer_qos0_queue_deleted ->
|
|
ok = rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]),
|
|
init_per_testcase0(T, Config);
|
|
init_per_testcase(Testcase, Config) ->
|
|
init_per_testcase0(Testcase, Config).
|
|
|
|
init_per_testcase0(Testcase, Config) ->
|
|
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
|
|
|
end_per_testcase(T, Config)
|
|
when T =:= management_plugin_connection;
|
|
T =:= management_plugin_enable ->
|
|
ok = inets:stop(),
|
|
end_per_testcase0(T, Config);
|
|
end_per_testcase(T, Config)
|
|
when T =:= clean_session_disconnect_client;
|
|
T =:= zero_session_expiry_interval_disconnect_client;
|
|
T =:= clean_session_node_restart;
|
|
T =:= clean_session_node_kill;
|
|
T =:= notify_consumer_qos0_queue_deleted ->
|
|
ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]),
|
|
end_per_testcase0(T, Config);
|
|
end_per_testcase(Testcase, Config) ->
|
|
end_per_testcase0(Testcase, Config).
|
|
|
|
end_per_testcase0(Testcase, Config) ->
|
|
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
|
|
%% Assert that every testcase cleaned up their MQTT sessions.
|
|
_ = rpc(Config, ?MODULE, delete_queues, []),
|
|
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
|
|
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
|
|
|
delete_queues() ->
|
|
[catch rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|
|
|| Q <- rabbit_amqqueue:list()].
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testsuite cases
|
|
%% -------------------------------------------------------------------
|
|
|
|
disconnect(Config) ->
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
eventually(?_assertEqual(1, length(all_connection_pids(Config)))),
|
|
process_flag(trap_exit, true),
|
|
ok = emqtt:disconnect(C),
|
|
await_exit(C, normal),
|
|
eventually(?_assertEqual([], all_connection_pids(Config))),
|
|
ok.
|
|
|
|
pubsub_shared_connection(Config) ->
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
|
|
Topic = <<"/topic/test-mqtt">>,
|
|
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
|
|
|
|
Payload = <<"a\x00a">>,
|
|
?assertMatch({ok, #{packet_id := _,
|
|
reason_code := 0,
|
|
reason_code_name := success
|
|
}},
|
|
emqtt:publish(C, Topic, Payload, [{qos, 1}])),
|
|
ok = expect_publishes(C, Topic, [Payload]),
|
|
ok = emqtt:disconnect(C).
|
|
|
|
pubsub_separate_connections(Config) ->
|
|
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config),
|
|
Sub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_subscriber">>, Config),
|
|
|
|
Topic = <<"/topic/test-mqtt">>,
|
|
{ok, _, [1]} = emqtt:subscribe(Sub, Topic, qos1),
|
|
|
|
Payload = <<"a\x00a">>,
|
|
?assertMatch({ok, #{packet_id := _,
|
|
reason_code := 0,
|
|
reason_code_name := success
|
|
}},
|
|
emqtt:publish(Pub, Topic, Payload, [{qos, 1}])),
|
|
ok = expect_publishes(Sub, Topic, [Payload]),
|
|
ok = emqtt:disconnect(Pub),
|
|
ok = emqtt:disconnect(Sub).
|
|
|
|
will_with_disconnect(Config) ->
|
|
LastWillTopic = <<"/topic/last-will">>,
|
|
LastWillMsg = <<"last will message">>,
|
|
Opts = [{will_topic, LastWillTopic},
|
|
{will_payload, LastWillMsg},
|
|
{will_qos, 1}],
|
|
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config, Opts),
|
|
Sub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_subscriber">>, Config),
|
|
{ok, _, [1]} = emqtt:subscribe(Sub, LastWillTopic, qos1),
|
|
|
|
%% Client sends DISCONNECT packet. Therefore, will message should not be sent.
|
|
ok = emqtt:disconnect(Pub),
|
|
?assertEqual({publish_not_received, LastWillMsg},
|
|
expect_publishes(Sub, LastWillTopic, [LastWillMsg])),
|
|
|
|
ok = emqtt:disconnect(Sub).
|
|
|
|
will_without_disconnect(Config) ->
|
|
LastWillTopic = <<"/topic/last-will">>,
|
|
LastWillMsg = <<"last will message">>,
|
|
Opts = [{will_topic, LastWillTopic},
|
|
{will_payload, LastWillMsg},
|
|
{will_qos, 1}],
|
|
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config, Opts),
|
|
Sub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_subscriber">>, Config),
|
|
{ok, _, [1]} = emqtt:subscribe(Sub, LastWillTopic, qos1),
|
|
|
|
%% Client does not send DISCONNECT packet. Therefore, will message should be sent.
|
|
unlink(Pub),
|
|
erlang:exit(Pub, test_will),
|
|
?assertEqual(ok, expect_publishes(Sub, LastWillTopic, [LastWillMsg])),
|
|
|
|
ok = emqtt:disconnect(Sub).
|
|
|
|
%% Test that an MQTT connection decodes the AMQP 0.9.1 'P_basic' properties.
|
|
%% see https://github.com/rabbitmq/rabbitmq-server/discussions/8252
|
|
decode_basic_properties(Config) ->
|
|
set_durable_queue_type(Config),
|
|
ClientId = Topic = Payload = atom_to_binary(?FUNCTION_NAME),
|
|
C1 = connect(ClientId, Config, non_clean_sess_opts()),
|
|
{ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1),
|
|
QuorumQueues = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_quorum_queue]),
|
|
?assertEqual(1, length(QuorumQueues)),
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>,
|
|
routing_key = Topic},
|
|
#amqp_msg{payload = Payload}),
|
|
ok = expect_publishes(C1, Topic, [Payload]),
|
|
ok = emqtt:disconnect(C1),
|
|
C2 = connect(ClientId, Config, [{clean_start, true}]),
|
|
ok = emqtt:disconnect(C2),
|
|
unset_durable_queue_type(Config),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
quorum_queue_rejects(Config) ->
|
|
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
Name = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
ok = rabbit_ct_broker_helpers:set_policy(
|
|
Config, 0, <<"qq-policy">>, Name, <<"queues">>, [{<<"max-length">>, 1},
|
|
{<<"overflow">>, <<"reject-publish">>}]),
|
|
declare_queue(Ch, Name, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
|
|
bind(Ch, Name, Name),
|
|
|
|
C = connect(Name, Config, [{retry_interval, 1}]),
|
|
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m1">>, qos1),
|
|
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m2">>, qos1),
|
|
|
|
%% The queue will reject m3.
|
|
V = ?config(mqtt_version, Config),
|
|
if V =:= v3 orelse V =:= v4 ->
|
|
%% v3 and v4 do not support NACKs. Therefore, the server should drop the message.
|
|
?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Name, <<"m3">>, 700));
|
|
V =:= v5 ->
|
|
%% v5 supports NACKs. Therefore, the server should send us a NACK.
|
|
?assertMatch({ok, #{reason_code_name := implementation_specific_error}},
|
|
emqtt:publish(C, Name, <<"m3">>, qos1))
|
|
end,
|
|
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Name})),
|
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Name})),
|
|
if V =:= v3 orelse V =:= v4 ->
|
|
%% m3 is re-sent by emqtt since we didn't receive a PUBACK.
|
|
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m3">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Name}),
|
|
2000, 200);
|
|
V =:= v5 ->
|
|
%% m3 should not be re-sent by emqtt since we received a PUBACK.
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Name}))
|
|
end,
|
|
|
|
ok = emqtt:disconnect(C),
|
|
delete_queue(Ch, Name),
|
|
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"qq-policy">>).
|
|
|
|
publish_to_all_queue_types_qos0(Config) ->
|
|
publish_to_all_queue_types(Config, qos0).
|
|
|
|
publish_to_all_queue_types_qos1(Config) ->
|
|
publish_to_all_queue_types(Config, qos1).
|
|
|
|
publish_to_all_queue_types(Config, QoS) ->
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
|
|
CQ = <<"classic-queue">>,
|
|
QQ = <<"quorum-queue">>,
|
|
SQ = <<"stream-queue">>,
|
|
Topic = <<"mytopic">>,
|
|
|
|
declare_queue(Ch, CQ, []),
|
|
bind(Ch, CQ, Topic),
|
|
|
|
declare_queue(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
|
|
bind(Ch, QQ, Topic),
|
|
|
|
declare_queue(Ch, SQ, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
|
|
bind(Ch, SQ, Topic),
|
|
|
|
NumMsgs = 1000,
|
|
C = connect(?FUNCTION_NAME, Config, [{max_inflight, 200},
|
|
{retry_interval, 2}]),
|
|
Self = self(),
|
|
lists:foreach(
|
|
fun(N) ->
|
|
%% Publish async all messages at once to trigger flow control
|
|
ok = emqtt:publish_async(C, Topic, integer_to_binary(N), QoS,
|
|
{fun(N0, {ok, #{reason_code_name := success}}) ->
|
|
Self ! {self(), N0};
|
|
(N0, ok) ->
|
|
Self ! {self(), N0}
|
|
end, [N]})
|
|
end, lists:seq(1, NumMsgs)),
|
|
ok = await_confirms_ordered(C, 1, NumMsgs),
|
|
eventually(?_assert(
|
|
begin
|
|
L = rabbitmqctl_list(Config, 0, ["list_queues", "messages", "--no-table-headers"]),
|
|
length(L) =:= 3 andalso
|
|
lists:all(fun([Bin]) ->
|
|
N = binary_to_integer(Bin),
|
|
case QoS of
|
|
qos0 ->
|
|
N =:= NumMsgs;
|
|
qos1 ->
|
|
%% Allow for some duplicates when client resends
|
|
%% a message that gets acked at roughly the same time.
|
|
N >= NumMsgs andalso
|
|
N < NumMsgs * 2
|
|
end
|
|
end, L)
|
|
end), 1000, 20),
|
|
|
|
delete_queue(Ch, [CQ, QQ, SQ]),
|
|
ok = emqtt:disconnect(C),
|
|
?awaitMatch([],
|
|
all_connection_pids(Config), 10_000, 1000),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
publish_to_all_non_deprecated_queue_types_qos0(Config) ->
|
|
publish_to_all_non_deprecated_queue_types(Config, qos0).
|
|
|
|
publish_to_all_non_deprecated_queue_types_qos1(Config) ->
|
|
publish_to_all_non_deprecated_queue_types(Config, qos1).
|
|
|
|
publish_to_all_non_deprecated_queue_types(Config, QoS) ->
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
|
|
CQ = <<"classic-queue">>,
|
|
QQ = <<"quorum-queue">>,
|
|
SQ = <<"stream-queue">>,
|
|
Topic = <<"mytopic">>,
|
|
|
|
declare_queue(Ch, CQ, []),
|
|
bind(Ch, CQ, Topic),
|
|
|
|
declare_queue(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
|
|
bind(Ch, QQ, Topic),
|
|
|
|
declare_queue(Ch, SQ, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
|
|
bind(Ch, SQ, Topic),
|
|
|
|
NumMsgs = 2000,
|
|
C = connect(?FUNCTION_NAME, Config, [{retry_interval, 2}]),
|
|
lists:foreach(fun(N) ->
|
|
case emqtt:publish(C, Topic, integer_to_binary(N), QoS) of
|
|
ok ->
|
|
ok;
|
|
{ok, _} ->
|
|
ok;
|
|
Other ->
|
|
ct:fail("Failed to publish: ~p", [Other])
|
|
end
|
|
end, lists:seq(1, NumMsgs)),
|
|
|
|
eventually(?_assert(
|
|
begin
|
|
L = rabbitmqctl_list(Config, 0, ["list_queues", "messages", "--no-table-headers"]),
|
|
length(L) =:= 3 andalso
|
|
lists:all(fun([Bin]) ->
|
|
N = binary_to_integer(Bin),
|
|
case QoS of
|
|
qos0 ->
|
|
N =:= NumMsgs;
|
|
qos1 ->
|
|
%% Allow for some duplicates when client resends
|
|
%% a message that gets acked at roughly the same time.
|
|
N >= NumMsgs andalso
|
|
N < NumMsgs * 2
|
|
end
|
|
end, L)
|
|
end), 2000, 10),
|
|
|
|
delete_queue(Ch, [CQ, QQ, SQ]),
|
|
ok = emqtt:disconnect(C),
|
|
?awaitMatch([],
|
|
all_connection_pids(Config), 10_000, 1000),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
%% This test case does not require multiple nodes
|
|
%% but it is grouped together with flow test cases for other queue types
|
|
%% (and historically used to use a mirrored classic queue on multiple nodes)
|
|
flow_classic_queue(Config) ->
|
|
%% New nodes lookup via persistent_term:get/1 (since 4.0.0)
|
|
%% Old nodes lookup via application:get_env/2. (that is taken care of by flow/3)
|
|
%% Therefore, we set both persistent_term and application.
|
|
Key = credit_flow_default_credit,
|
|
Val = {2, 1},
|
|
DefaultVal = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
|
|
Result = rpc_all(Config, persistent_term, put, [Key, Val]),
|
|
?assert(lists:all(fun(R) -> R =:= ok end, Result)),
|
|
|
|
flow(Config, {rabbit, Key, Val}, <<"classic">>),
|
|
|
|
?assertEqual(Result, rpc_all(Config, persistent_term, put, [Key, DefaultVal])),
|
|
ok.
|
|
|
|
flow_quorum_queue(Config) ->
|
|
flow(Config, {rabbit, quorum_commands_soft_limit, 1}, <<"quorum">>).
|
|
|
|
flow_stream(Config) ->
|
|
flow(Config, {rabbit, stream_messages_soft_limit, 1}, <<"stream">>).
|
|
|
|
flow(Config, {App, Par, Val}, QueueType)
|
|
when is_binary(QueueType) ->
|
|
{ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]),
|
|
Result = rpc_all(Config, application, set_env, [App, Par, Val]),
|
|
?assert(lists:all(fun(R) -> R =:= ok end, Result)),
|
|
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
QueueName = Topic = atom_to_binary(?FUNCTION_NAME),
|
|
declare_queue(Ch, QueueName, [{<<"x-queue-type">>, longstr, QueueType}]),
|
|
bind(Ch, QueueName, Topic),
|
|
|
|
NumMsgs = 1000,
|
|
C = connect(?FUNCTION_NAME, Config, [{retry_interval, 600},
|
|
{max_inflight, NumMsgs}]),
|
|
TestPid = self(),
|
|
lists:foreach(
|
|
fun(N) ->
|
|
%% Publish async all messages at once to trigger flow control
|
|
ok = emqtt:publish_async(C, Topic, integer_to_binary(N), qos1,
|
|
{fun(N0, {ok, #{reason_code_name := success}}) ->
|
|
TestPid ! {self(), N0}
|
|
end, [N]})
|
|
end, lists:seq(1, NumMsgs)),
|
|
ok = await_confirms_ordered(C, 1, NumMsgs),
|
|
eventually(?_assertEqual(
|
|
[[integer_to_binary(NumMsgs)]],
|
|
rabbitmqctl_list(Config, 0, ["list_queues", "messages", "--no-table-headers"])
|
|
), 1000, 10),
|
|
|
|
delete_queue(Ch, QueueName),
|
|
ok = emqtt:disconnect(C),
|
|
?awaitMatch([],
|
|
all_connection_pids(Config), 10_000, 1000),
|
|
?assertEqual(Result,
|
|
rpc_all(Config, application, set_env, [App, Par, DefaultVal])),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
events(Config) ->
|
|
ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, event_recorder),
|
|
Server = get_node_config(Config, 0, nodename),
|
|
ok = gen_event:add_handler({rabbit_event, Server}, event_recorder, []),
|
|
|
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
C = connect(ClientId, Config),
|
|
|
|
[E0, E1] = get_events(Server),
|
|
assert_event_type(user_authentication_success, E0),
|
|
assert_event_prop([{name, <<"guest">>},
|
|
{connection_type, network}],
|
|
E0),
|
|
assert_event_type(connection_created, E1),
|
|
[ConnectionPid] = all_connection_pids(Config),
|
|
ProtoName = case ?config(websocket, Config) of
|
|
true -> 'Web MQTT';
|
|
false -> 'MQTT'
|
|
end,
|
|
ProtoVer = case ?config(mqtt_version, Config) of
|
|
v3 -> {3,1,0};
|
|
v4 -> {3,1,1};
|
|
v5 -> {5,0}
|
|
end,
|
|
ExpectedConnectionProps = [{protocol, {ProtoName, ProtoVer}},
|
|
{node, Server},
|
|
{vhost, <<"/">>},
|
|
{user, <<"guest">>},
|
|
{client_properties, [{client_id, longstr, ClientId}]},
|
|
{pid, ConnectionPid}],
|
|
assert_event_prop(ExpectedConnectionProps, E1),
|
|
|
|
Qos = 0,
|
|
MqttTopic = <<"my/topic">>,
|
|
AmqpTopic = <<"my.topic">>,
|
|
{ok, _, [Qos]} = emqtt:subscribe(C, MqttTopic, Qos),
|
|
|
|
QueueNameBin = <<"mqtt-subscription-", ClientId/binary, "qos0">>,
|
|
QueueName = {resource, <<"/">>, queue, QueueNameBin},
|
|
[E2, E3] = get_events(Server),
|
|
assert_event_type(queue_created, E2),
|
|
assert_event_prop([{name, QueueName},
|
|
{durable, true},
|
|
{auto_delete, false},
|
|
{exclusive, true},
|
|
{type, rabbit_mqtt_qos0_queue},
|
|
{arguments, []}],
|
|
E2),
|
|
assert_event_type(binding_created, E3),
|
|
ExpectedBindingArgs = case ?config(mqtt_version, Config) of
|
|
v5 -> [{mqtt_subscription_opts, Qos, false, false, 0, undefined},
|
|
{<<"x-binding-key">>, longstr, AmqpTopic}];
|
|
_ -> []
|
|
end,
|
|
assert_event_prop([{source_name, <<"amq.topic">>},
|
|
{source_kind, exchange},
|
|
{destination_name, QueueNameBin},
|
|
{destination_kind, queue},
|
|
{routing_key, AmqpTopic},
|
|
{arguments, ExpectedBindingArgs}],
|
|
E3),
|
|
|
|
{ok, _, _} = emqtt:unsubscribe(C, MqttTopic),
|
|
|
|
[E4] = get_events(Server),
|
|
assert_event_type(binding_deleted, E4),
|
|
|
|
ok = emqtt:disconnect(C),
|
|
|
|
[E5, E6] = get_events(Server),
|
|
assert_event_type(connection_closed, E5),
|
|
?assertEqual(E1#event.props, E5#event.props,
|
|
"connection_closed event props should match connection_created event props. "
|
|
"See https://github.com/rabbitmq/rabbitmq-server/discussions/6331"),
|
|
assert_event_type(queue_deleted, E6),
|
|
assert_event_prop({name, QueueName}, E6),
|
|
|
|
ok = gen_event:delete_handler({rabbit_event, Server}, event_recorder, []).
|
|
|
|
internal_event_handler(Config) ->
|
|
Server = get_node_config(Config, 0, nodename),
|
|
ok = gen_event:call({rabbit_event, Server}, rabbit_mqtt_internal_event_handler, ignored_request, 1000).
|
|
|
|
global_counters(Config) ->
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
|
|
Topic0 = <<"test-topic0">>,
|
|
Topic1 = <<"test-topic1">>,
|
|
Topic2 = <<"test-topic2">>,
|
|
{ok, _, [0]} = emqtt:subscribe(C, Topic0, qos0),
|
|
{ok, _, [1]} = emqtt:subscribe(C, Topic1, qos1),
|
|
{ok, _, [1]} = emqtt:subscribe(C, Topic2, qos1),
|
|
|
|
ok = emqtt:publish(C, Topic0, <<"testm0">>, qos0),
|
|
ok = emqtt:publish(C, Topic1, <<"testm1">>, qos0),
|
|
{ok, _} = emqtt:publish(C, Topic2, <<"testm2">>, qos1),
|
|
ok = emqtt:publish(C, <<"no/queue/bound">>, <<"msg-dropped">>, qos0),
|
|
{ok, Pub} = emqtt:publish(C, <<"no/queue/bound">>, <<"msg-returned">>, qos1),
|
|
case ?config(mqtt_version, Config) of
|
|
v3 -> ok;
|
|
v4 -> ok;
|
|
v5 -> ?assertMatch(#{reason_code_name := no_matching_subscribers}, Pub)
|
|
end,
|
|
|
|
ok = expect_publishes(C, Topic0, [<<"testm0">>]),
|
|
ok = expect_publishes(C, Topic1, [<<"testm1">>]),
|
|
ok = expect_publishes(C, Topic2, [<<"testm2">>]),
|
|
|
|
ProtoVer = ?config(mqtt_version, Config),
|
|
?assertEqual(#{publishers => 1,
|
|
consumers => 1,
|
|
messages_confirmed_total => 2,
|
|
messages_received_confirm_total => 2,
|
|
messages_received_total => 5,
|
|
messages_routed_total => 3,
|
|
messages_unroutable_dropped_total => 1,
|
|
messages_unroutable_returned_total => 1},
|
|
get_global_counters(Config, ProtoVer)),
|
|
?assertEqual(#{messages_delivered_total => 2,
|
|
messages_acknowledged_total => 1,
|
|
messages_delivered_consume_auto_ack_total => 1,
|
|
messages_delivered_consume_manual_ack_total => 1,
|
|
messages_delivered_get_auto_ack_total => 0,
|
|
messages_delivered_get_manual_ack_total => 0,
|
|
messages_get_empty_total => 0,
|
|
messages_redelivered_total => 0},
|
|
get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_classic_queue}])),
|
|
?assertEqual(#{messages_delivered_total => 1,
|
|
messages_acknowledged_total => 0,
|
|
messages_delivered_consume_auto_ack_total => 1,
|
|
messages_delivered_consume_manual_ack_total => 0,
|
|
messages_delivered_get_auto_ack_total => 0,
|
|
messages_delivered_get_manual_ack_total => 0,
|
|
messages_get_empty_total => 0,
|
|
messages_redelivered_total => 0},
|
|
get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_mqtt_qos0_queue}])),
|
|
|
|
{ok, _, _} = emqtt:unsubscribe(C, Topic1),
|
|
?assertEqual(1, maps:get(consumers, get_global_counters(Config, ProtoVer))),
|
|
|
|
ok = emqtt:disconnect(C),
|
|
eventually(?_assertEqual(#{publishers => 0,
|
|
consumers => 0,
|
|
messages_confirmed_total => 2,
|
|
messages_received_confirm_total => 2,
|
|
messages_received_total => 5,
|
|
messages_routed_total => 3,
|
|
messages_unroutable_dropped_total => 1,
|
|
messages_unroutable_returned_total => 1},
|
|
get_global_counters(Config, ProtoVer))).
|
|
|
|
message_size_metrics(Config) ->
|
|
Protocol = case ?config(mqtt_version, Config) of
|
|
v4 -> mqtt311;
|
|
v5 -> mqtt50
|
|
end,
|
|
BucketsBefore = rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]),
|
|
|
|
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
C = connect(ClientId, Config),
|
|
{ok, _, [0]} = emqtt:subscribe(C, Topic, qos0),
|
|
Payload1B = <<255>>,
|
|
Payload500B = binary:copy(Payload1B, 500),
|
|
Payload5KB = binary:copy(Payload1B, 5_000),
|
|
Payload2MB = binary:copy(Payload1B, 2_000_000),
|
|
Payloads = [Payload2MB, Payload5KB, Payload500B, Payload1B, Payload500B],
|
|
[ok = emqtt:publish(C, Topic, P, qos0) || P <- Payloads],
|
|
ok = expect_publishes(C, Topic, Payloads),
|
|
|
|
BucketsAfter = rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]),
|
|
?assertEqual(
|
|
[{100, 1},
|
|
{1000, 2},
|
|
{10_000, 1},
|
|
{10_000_000, 1}],
|
|
rabbit_msg_size_metrics:diff_raw_buckets(BucketsAfter, BucketsBefore)),
|
|
|
|
ok = emqtt:disconnect(C).
|
|
|
|
pubsub(Config) ->
|
|
Topic0 = <<"t/0">>,
|
|
Topic1 = <<"t/1">>,
|
|
C1 = connect(<<"c1">>, Config, 1, []),
|
|
{ok, _, [1]} = emqtt:subscribe(C1, Topic1, qos1),
|
|
C0 = connect(<<"c0">>, Config, 0, []),
|
|
{ok, _, [1]} = emqtt:subscribe(C0, Topic0, qos1),
|
|
|
|
{ok, _} = emqtt:publish(C0, Topic1, <<"m1">>, qos1),
|
|
receive {publish, #{client_pid := C1,
|
|
qos := 1,
|
|
payload := <<"m1">>}} -> ok
|
|
after ?TIMEOUT -> ct:fail("missing m1")
|
|
end,
|
|
|
|
ok = emqtt:publish(C0, Topic1, <<"m2">>, qos0),
|
|
receive {publish, #{client_pid := C1,
|
|
qos := 0,
|
|
payload := <<"m2">>}} -> ok
|
|
after ?TIMEOUT -> ct:fail("missing m2")
|
|
end,
|
|
|
|
{ok, _} = emqtt:publish(C1, Topic0, <<"m3">>, qos1),
|
|
receive {publish, #{client_pid := C0,
|
|
qos := 1,
|
|
payload := <<"m3">>}} -> ok
|
|
after ?TIMEOUT -> ct:fail("missing m3")
|
|
end,
|
|
|
|
ok = emqtt:publish(C1, Topic0, <<"m4">>, qos0),
|
|
receive {publish, #{client_pid := C0,
|
|
qos := 0,
|
|
payload := <<"m4">>}} -> ok
|
|
after ?TIMEOUT -> ct:fail("missing m4")
|
|
end,
|
|
|
|
ok = emqtt:disconnect(C0),
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
queue_down_qos1(Config) ->
|
|
{Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1),
|
|
CQ = Topic = atom_to_binary(?FUNCTION_NAME),
|
|
declare_queue(Ch1, CQ, []),
|
|
bind(Ch1, CQ, Topic),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Ch1),
|
|
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
|
|
C = connect(?FUNCTION_NAME, Config, [{retry_interval, 2}]),
|
|
|
|
%% classic queue is down, therefore message is rejected
|
|
V = ?config(mqtt_version, Config),
|
|
if V =:= v3 orelse V =:= v4 ->
|
|
?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Topic, <<"msg">>, 500)),
|
|
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
|
|
%% Classic queue is up. Therefore, message should arrive.
|
|
eventually(?_assertEqual([[<<"1">>]],
|
|
rabbitmqctl_list(Config, 1, ["list_queues", "messages", "--no-table-headers"])),
|
|
500, 20);
|
|
V =:= v5 ->
|
|
?assertMatch({ok, #{reason_code_name := implementation_specific_error}},
|
|
emqtt:publish(C, Topic, <<"msg">>, qos1)),
|
|
ok = rabbit_ct_broker_helpers:start_node(Config, 1)
|
|
end,
|
|
|
|
{Conn, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
|
delete_queue(Ch0, CQ),
|
|
ok = emqtt:disconnect(C),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch0).
|
|
|
|
%% Consuming classic queue on a different node goes down.
|
|
consuming_classic_queue_down(Config) ->
|
|
[Server1, _Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
|
ClientId = Topic = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
%% Declare classic queue on Server1.
|
|
C1 = connect(ClientId, Config, non_clean_sess_opts()),
|
|
{ok, _, _} = emqtt:subscribe(C1, Topic, qos1),
|
|
ok = emqtt:disconnect(C1),
|
|
|
|
%% Consume from Server3.
|
|
C2 = connect(ClientId, Config, Server3, non_clean_sess_opts()),
|
|
|
|
ProtoVer = ?config(mqtt_version, Config),
|
|
?assertMatch(#{consumers := 1},
|
|
get_global_counters(Config, ProtoVer, Server3)),
|
|
|
|
%% Let's stop the queue leader node.
|
|
process_flag(trap_exit, true),
|
|
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
|
|
|
|
%% When the dedicated MQTT connection queue goes down, it is reasonable
|
|
%% that the server closes the MQTT connection because the MQTT client cannot consume anymore.
|
|
eventually(?_assertMatch(#{consumers := 0},
|
|
get_global_counters(Config, ProtoVer, Server3)),
|
|
1000, 5),
|
|
await_exit(C2),
|
|
|
|
%% Cleanup
|
|
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
|
|
C3 = connect(ClientId, Config, Server3, [{clean_start, true}]),
|
|
ok = emqtt:disconnect(C3),
|
|
?assertEqual([],
|
|
rpc(Config, Server1, rabbit_amqqueue, list, [])),
|
|
ok.
|
|
|
|
delete_create_queue(Config) ->
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
CQ1 = <<"classic-queue-1-delete-create">>,
|
|
CQ2 = <<"classic-queue-2-delete-create">>,
|
|
QQ = <<"quorum-queue-delete-create">>,
|
|
Topic = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
DeclareQueues = fun() ->
|
|
declare_queue(Ch, CQ1, []),
|
|
bind(Ch, CQ1, Topic),
|
|
declare_queue(Ch, CQ2, []),
|
|
bind(Ch, CQ2, Topic),
|
|
declare_queue(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
|
|
bind(Ch, QQ, Topic)
|
|
end,
|
|
DeclareQueues(),
|
|
|
|
%% some large retry_interval to avoid re-sending
|
|
C = connect(?FUNCTION_NAME, Config, [{retry_interval, 300}]),
|
|
NumMsgs = 50,
|
|
TestPid = self(),
|
|
spawn(
|
|
fun() ->
|
|
lists:foreach(
|
|
fun(N) ->
|
|
ok = emqtt:publish_async(C, Topic, integer_to_binary(N), qos1,
|
|
{fun(N0, {ok, #{reason_code_name := success}}) ->
|
|
TestPid ! {self(), N0}
|
|
end, [N]})
|
|
end, lists:seq(1, NumMsgs))
|
|
end),
|
|
|
|
%% Delete queues while sending to them.
|
|
%% We want to test the path where a queue is deleted while confirms are outstanding.
|
|
timer:sleep(2),
|
|
delete_queue(Ch, [CQ1, QQ]),
|
|
%% Give queues some time to be fully deleted
|
|
%% TODO: wait longer for quorum queues in mixed mode as it can take longer
|
|
%% for deletion to complete, delete timeout is 5s so we need to exceed that
|
|
timer:sleep(6000),
|
|
|
|
%% We expect confirms for all messages.
|
|
%% Confirm here does not mean that messages made it ever to the deleted queues.
|
|
%% It is valid for confirms to sporadically arrive out of order: This happens when the classic
|
|
%% queue is being deleted while the remaining messages are routed and confirmed to the 2nd and 3rd queues
|
|
%% before the monitor to the classic queue fires.
|
|
ok = await_confirms_unordered(C, NumMsgs),
|
|
|
|
%% Recreate the same queues.
|
|
DeclareQueues(),
|
|
|
|
%% Sending a message to each of them should work.
|
|
{ok, _} = emqtt:publish(C, Topic, <<"m">>, qos1),
|
|
eventually(?_assertEqual(lists:sort([[CQ1, <<"1">>],
|
|
%% This queue should have all messages because we did not delete it.
|
|
[CQ2, integer_to_binary(NumMsgs + 1)],
|
|
[QQ, <<"1">>]]),
|
|
lists:sort(rabbitmqctl_list(Config, 0, ["list_queues", "name", "messages", "--no-table-headers"]))),
|
|
1000, 10),
|
|
|
|
delete_queue(Ch, [CQ1, CQ2, QQ]),
|
|
ok = emqtt:disconnect(C),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
session_expiry(Config) ->
|
|
App = rabbitmq_mqtt,
|
|
Par = max_session_expiry_interval_seconds,
|
|
Seconds = 1,
|
|
{ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]),
|
|
ok = rpc(Config, application, set_env, [App, Par, Seconds]),
|
|
|
|
C = connect(?FUNCTION_NAME, Config, non_clean_sess_opts()),
|
|
{ok, _, [0, 1]} = emqtt:subscribe(C, [{<<"topic0">>, qos0},
|
|
{<<"topic1">>, qos1}]),
|
|
ok = emqtt:disconnect(C),
|
|
|
|
?assertEqual(2, rpc(Config, rabbit_amqqueue, count, [])),
|
|
timer:sleep(timer:seconds(Seconds) + 100),
|
|
%% On a slow machine, this test might fail. Let's consider
|
|
%% the expiry on a longer time window
|
|
?awaitMatch(0, rpc(Config, rabbit_amqqueue, count, []), 15_000, 1000),
|
|
|
|
ok = rpc(Config, application, set_env, [App, Par, DefaultVal]).
|
|
|
|
non_clean_sess_reconnect_qos1(Config) ->
|
|
non_clean_sess_reconnect(Config, 1).
|
|
|
|
non_clean_sess_reconnect_qos0(Config) ->
|
|
non_clean_sess_reconnect(Config, 0).
|
|
|
|
non_clean_sess_reconnect(Config, SubscriptionQoS) ->
|
|
Pub = connect(<<"publisher">>, Config),
|
|
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
C1 = connect(ClientId, Config, non_clean_sess_opts()),
|
|
{ok, _, [SubscriptionQoS]} = emqtt:subscribe(C1, Topic, SubscriptionQoS),
|
|
ok = await_consumer_count(1, ClientId, SubscriptionQoS, Config),
|
|
|
|
ok = emqtt:disconnect(C1),
|
|
ok = await_consumer_count(0, ClientId, SubscriptionQoS, Config),
|
|
|
|
ok = emqtt:publish(Pub, Topic, <<"msg-3-qos0">>, qos0),
|
|
{ok, _} = emqtt:publish(Pub, Topic, <<"msg-4-qos1">>, qos1),
|
|
|
|
C2 = connect(ClientId, Config, non_clean_sess_opts()),
|
|
%% Server should reply in CONNACK that it has session state.
|
|
?assertEqual({session_present, 1},
|
|
proplists:lookup(session_present, emqtt:info(C2))),
|
|
ok = await_consumer_count(1, ClientId, SubscriptionQoS, Config),
|
|
|
|
ok = emqtt:publish(Pub, Topic, <<"msg-5-qos0">>, qos0),
|
|
{ok, _} = emqtt:publish(Pub, Topic, <<"msg-6-qos1">>, qos1),
|
|
|
|
%% shouldn't receive message after unsubscribe
|
|
{ok, _, _} = emqtt:unsubscribe(C2, Topic),
|
|
?assertMatch(#{consumers := 0},
|
|
get_global_counters(Config)),
|
|
{ok, _} = emqtt:publish(Pub, Topic, <<"msg-7-qos0">>, qos1),
|
|
|
|
%% "After the disconnection of a Session that had CleanSession set to 0, the Server MUST store
|
|
%% further QoS 1 and QoS 2 messages that match any subscriptions that the client had at the
|
|
%% time of disconnection as part of the Session state [MQTT-3.1.2-5].
|
|
%% It MAY also store QoS 0 messages that meet the same criteria."
|
|
%% Starting with RabbitMQ v3.12 we store QoS 0 messages as well.
|
|
ok = expect_publishes(C2, Topic, [<<"msg-3-qos0">>, <<"msg-4-qos1">>,
|
|
<<"msg-5-qos0">>, <<"msg-6-qos1">>]),
|
|
{publish_not_received, <<"msg-7-qos0">>} = expect_publishes(C2, Topic, [<<"msg-7-qos0">>]),
|
|
|
|
ok = emqtt:disconnect(Pub),
|
|
ok = emqtt:disconnect(C2),
|
|
%% connect with clean sess true to clean up
|
|
C3 = connect(ClientId, Config, [{clean_start, true}]),
|
|
ok = emqtt:disconnect(C3).
|
|
|
|
non_clean_sess_reconnect_qos0_and_qos1(Config) ->
|
|
Pub = connect(<<"publisher">>, Config),
|
|
Topic0 = <<"t/0">>,
|
|
Topic1 = <<"t/1">>,
|
|
ClientId = ?FUNCTION_NAME,
|
|
|
|
C1 = connect(ClientId, Config, non_clean_sess_opts()),
|
|
{ok, _, [1, 0]} = emqtt:subscribe(C1, [{Topic1, qos1},
|
|
{Topic0, qos0}]),
|
|
ok = await_consumer_count(1, ClientId, 0, Config),
|
|
ok = await_consumer_count(1, ClientId, 1, Config),
|
|
|
|
ok = emqtt:disconnect(C1),
|
|
ok = await_consumer_count(0, ClientId, 0, Config),
|
|
ok = await_consumer_count(0, ClientId, 1, Config),
|
|
{ok, _} = emqtt:publish(Pub, Topic0, <<"msg-0">>, qos1),
|
|
{ok, _} = emqtt:publish(Pub, Topic1, <<"msg-1">>, qos1),
|
|
|
|
C2 = connect(ClientId, Config, non_clean_sess_opts()),
|
|
ok = await_consumer_count(1, ClientId, 0, Config),
|
|
ok = await_consumer_count(1, ClientId, 1, Config),
|
|
ok = expect_publishes(C2, Topic0, [<<"msg-0">>]),
|
|
ok = expect_publishes(C2, Topic1, [<<"msg-1">>]),
|
|
|
|
ok = emqtt:disconnect(Pub),
|
|
ok = emqtt:disconnect(C2),
|
|
C3 = connect(ClientId, Config, [{clean_start, true}]),
|
|
ok = emqtt:disconnect(C3).
|
|
|
|
non_clean_sess_empty_client_id(Config) ->
|
|
{C, Connect} = util:start_client(<<>>, Config, 0, non_clean_sess_opts()),
|
|
case ?config(mqtt_version, Config) of
|
|
V when V =:= v3;
|
|
V =:= v4 ->
|
|
%% "If the Client supplies a zero-byte ClientId with CleanSession set to 0,
|
|
%% the Server MUST respond to the CONNECT Packet with a CONNACK return code 0x02
|
|
%% (Identifier rejected) and then close the Network Connection" [MQTT-3.1.3-8].
|
|
process_flag(trap_exit, true),
|
|
?assertMatch({error, {client_identifier_not_valid, _}}, Connect(C)),
|
|
ok = await_exit(C);
|
|
v5 ->
|
|
%% "If the Client connects using a zero length Client Identifier, the Server MUST respond with
|
|
%% a CONNACK containing an Assigned Client Identifier. The Assigned Client Identifier MUST be
|
|
%% a new Client Identifier not used by any other Session currently in the Server [MQTT-3.2.2-16]."
|
|
{ok, #{'Assigned-Client-Identifier' := ClientId}} = Connect(C),
|
|
{C2, Connect2} = util:start_client(<<>>, Config, 0, [{clean_start, true}]),
|
|
{ok, #{'Assigned-Client-Identifier' := ClientId2}} = Connect2(C2),
|
|
?assertNotEqual(ClientId, ClientId2),
|
|
ok = emqtt:disconnect(C),
|
|
ok = emqtt:disconnect(C2)
|
|
end.
|
|
|
|
subscribe_same_topic_same_qos(Config) ->
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
Topic = <<"a/b">>,
|
|
|
|
{ok, _} = emqtt:publish(C, Topic, <<"retained">>, [{retain, true},
|
|
{qos, 1}]),
|
|
%% Subscribe with QoS 0
|
|
{ok, _, [0]} = emqtt:subscribe(C, Topic, qos0),
|
|
{ok, _} = emqtt:publish(C, Topic, <<"msg1">>, qos1),
|
|
%% Subscribe to same topic with same QoS
|
|
{ok, _, [0]} = emqtt:subscribe(C, Topic, qos0),
|
|
{ok, _} = emqtt:publish(C, Topic, <<"msg2">>, qos1),
|
|
|
|
%% "Any existing retained messages matching the Topic Filter MUST be re-sent" [MQTT-3.8.4-3]
|
|
ok = expect_publishes(C, Topic, [<<"retained">>, <<"msg1">>,
|
|
<<"retained">>, <<"msg2">>
|
|
]),
|
|
ok = emqtt:disconnect(C).
|
|
|
|
subscribe_same_topic_different_qos(Config) ->
|
|
C = connect(?FUNCTION_NAME, Config, non_clean_sess_opts()),
|
|
Topic = <<"b/c">>,
|
|
|
|
{ok, _} = emqtt:publish(C, Topic, <<"retained">>, [{retain, true},
|
|
{qos, 1}]),
|
|
%% Subscribe with QoS 0
|
|
{ok, _, [0]} = emqtt:subscribe(C, Topic, qos0),
|
|
{ok, _} = emqtt:publish(C, Topic, <<"msg1">>, qos1),
|
|
%% Subscribe to same topic with QoS 1
|
|
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
|
|
{ok, _} = emqtt:publish(C, Topic, <<"msg2">>, qos1),
|
|
%% Subscribe to same topic with QoS 0 again
|
|
{ok, _, [0]} = emqtt:subscribe(C, Topic, qos0),
|
|
{ok, _} = emqtt:publish(C, Topic, <<"msg3">>, qos1),
|
|
|
|
%% "Any existing retained messages matching the Topic Filter MUST be re-sent" [MQTT-3.8.4-3]
|
|
ok = expect_publishes(C, Topic, [<<"retained">>, <<"msg1">>,
|
|
<<"retained">>, <<"msg2">>,
|
|
<<"retained">>, <<"msg3">>]),
|
|
|
|
%% There should be exactly one consumer for each queue: qos0 and qos1
|
|
Consumers = rpc(Config, rabbit_amqqueue, consumers_all, [<<"/">>]),
|
|
?assertEqual(2, length(Consumers)),
|
|
|
|
ok = emqtt:disconnect(C),
|
|
C1 = connect(?FUNCTION_NAME, Config, [{clean_start, true}]),
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
subscribe_multiple(Config) ->
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
%% Subscribe to multiple topics at once
|
|
?assertMatch({ok, _, [0, 1]},
|
|
emqtt:subscribe(C, [{<<"topic0">>, qos0},
|
|
{<<"topic1">>, qos1}])),
|
|
ok = emqtt:disconnect(C).
|
|
|
|
large_message_mqtt_to_mqtt(Config) ->
|
|
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
C = connect(ClientId, Config),
|
|
{ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),
|
|
|
|
Payload0 = binary:copy(<<"x">>, 8_000_000),
|
|
Payload = <<Payload0/binary, "y">>,
|
|
{ok, _} = emqtt:publish(C, Topic, Payload, qos1),
|
|
ok = expect_publishes(C, Topic, [Payload]),
|
|
ok = emqtt:disconnect(C).
|
|
|
|
large_message_amqp_to_mqtt(Config) ->
|
|
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
C = connect(ClientId, Config),
|
|
{ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),
|
|
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
Payload0 = binary:copy(<<"x">>, 8_000_000),
|
|
Payload = <<Payload0/binary, "y">>,
|
|
amqp_channel:call(Ch,
|
|
#'basic.publish'{exchange = <<"amq.topic">>,
|
|
routing_key = Topic},
|
|
#amqp_msg{payload = Payload}),
|
|
ok = expect_publishes(C, Topic, [Payload]),
|
|
ok = emqtt:disconnect(C),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
amqp_to_mqtt_qos0(Config) ->
|
|
Topic = ClientId = Payload = atom_to_binary(?FUNCTION_NAME),
|
|
C = connect(ClientId, Config),
|
|
{ok, _, [0]} = emqtt:subscribe(C, {Topic, qos0}),
|
|
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
amqp_channel:call(Ch,
|
|
#'basic.publish'{exchange = <<"amq.topic">>,
|
|
routing_key = Topic},
|
|
#amqp_msg{payload = Payload}),
|
|
ok = expect_publishes(C, Topic, [Payload]),
|
|
ok = emqtt:disconnect(C),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
%% Packet identifier is a non zero two byte integer.
|
|
%% Test that the server wraps around the packet identifier.
|
|
many_qos1_messages(Config) ->
|
|
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
NumMsgs = 16#ffff + 100,
|
|
C = connect(ClientId, Config, 0, [{retry_interval, 600},
|
|
{max_inflight, NumMsgs div 8}]),
|
|
{ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),
|
|
Payloads = lists:map(fun integer_to_binary/1, lists:seq(1, NumMsgs)),
|
|
Self = self(),
|
|
Target = lists:last(Payloads),
|
|
lists:foreach(fun(P) ->
|
|
Cb = {fun(T, _) when T == Target ->
|
|
Self ! proceed;
|
|
(_, _) ->
|
|
ok
|
|
end, [P]},
|
|
ok = emqtt:publish_async(C, Topic, P, qos1, Cb)
|
|
end, Payloads),
|
|
receive
|
|
proceed -> ok
|
|
after 300_000 ->
|
|
ct:fail("message to proceed never received")
|
|
end,
|
|
ok = expect_publishes(C, Topic, Payloads),
|
|
ok = emqtt:disconnect(C).
|
|
|
|
%% This test is mostly interesting in mixed version mode where feature flag
|
|
%% rabbit_mqtt_qos0_queue is disabled and therefore a classic queue gets created.
|
|
rabbit_mqtt_qos0_queue(Config) ->
|
|
Topic = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
%% Place MQTT subscriber process on new node in mixed version.
|
|
Sub = connect(<<"subscriber">>, Config),
|
|
{ok, _, [0]} = emqtt:subscribe(Sub, Topic, qos0),
|
|
|
|
%% Place MQTT publisher process on old node in mixed version.
|
|
Pub = connect(<<"publisher">>, Config, 1, []),
|
|
|
|
Msg = <<"msg">>,
|
|
ok = emqtt:publish(Pub, Topic, Msg, qos0),
|
|
ok = expect_publishes(Sub, Topic, [Msg]),
|
|
|
|
ok = emqtt:disconnect(Sub),
|
|
ok = emqtt:disconnect(Pub).
|
|
|
|
rabbit_mqtt_qos0_queue_kill_node(Config) ->
|
|
Topic1 = <<"t/1">>,
|
|
Topic2 = <<"t/2">>,
|
|
Pub = connect(<<"publisher">>, Config, 2, []),
|
|
|
|
SubscriberId = <<"subscriber">>,
|
|
Sub0 = connect(SubscriberId, Config, 0, []),
|
|
{ok, _, [0]} = emqtt:subscribe(Sub0, Topic1, qos0),
|
|
ok = await_metadata_store_consistent(Config, 2),
|
|
ok = emqtt:publish(Pub, Topic1, <<"m0">>, qos0),
|
|
ok = expect_publishes(Sub0, Topic1, [<<"m0">>]),
|
|
|
|
process_flag(trap_exit, true),
|
|
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
|
|
ok = await_exit(Sub0),
|
|
%% Wait to run rabbit_amqqueue:on_node_down/1 on both live nodes.
|
|
timer:sleep(500),
|
|
%% Re-connect to a live node with same MQTT client ID.
|
|
Sub1 = connect(SubscriberId, Config, 1, []),
|
|
{ok, _, [0]} = emqtt:subscribe(Sub1, Topic2, qos0),
|
|
ok = await_metadata_store_consistent(Config, 2),
|
|
ok = emqtt:publish(Pub, Topic2, <<"m1">>, qos0),
|
|
ok = expect_publishes(Sub1, Topic2, [<<"m1">>]),
|
|
%% Since we started a new clean session, previous subscription should have been deleted.
|
|
ok = emqtt:publish(Pub, Topic1, <<"m2">>, qos0),
|
|
receive {publish, _} = Publish -> ct:fail({unexpected, Publish})
|
|
after 300 -> ok
|
|
end,
|
|
|
|
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
|
|
ok = rabbit_ct_broker_helpers:kill_node(Config, 1),
|
|
%% This time, do not wait.
|
|
%% rabbit_amqqueue:on_node_down/1 may or may not have run.
|
|
Sub2 = connect(SubscriberId, Config, 2, []),
|
|
{ok, _, [0]} = emqtt:subscribe(Sub2, Topic2, qos0),
|
|
ok = emqtt:publish(Pub, Topic2, <<"m3">>, qos0),
|
|
ok = expect_publishes(Sub2, Topic2, [<<"m3">>]),
|
|
|
|
ok = emqtt:disconnect(Sub2),
|
|
ok = emqtt:disconnect(Pub),
|
|
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
|
|
?assertEqual([], rpc(Config, rabbit_db_binding, get_all, [])).
|
|
|
|
cli_close_all_connections(Config) ->
|
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
C = connect(ClientId, Config),
|
|
process_flag(trap_exit, true),
|
|
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(
|
|
Config, 0, ["close_all_connections", "bye"]),
|
|
?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])),
|
|
ok = await_exit(C).
|
|
|
|
cli_close_all_user_connections(Config) ->
|
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
C = connect(ClientId, Config),
|
|
process_flag(trap_exit, true),
|
|
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(
|
|
Config, 0, ["close_all_user_connections","guest", "bye"]),
|
|
?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])),
|
|
ok = await_exit(C).
|
|
|
|
%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
|
|
management_plugin_connection(Config) ->
|
|
KeepaliveSecs = 99,
|
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
Node = atom_to_binary(get_node_config(Config, 0, nodename)),
|
|
|
|
C1 = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]),
|
|
FilterFun =
|
|
fun(#{client_properties := #{client_id := CId}})
|
|
when CId == ClientId -> true;
|
|
(_) -> false
|
|
end,
|
|
%% Sometimes connections remain open from other testcases,
|
|
%% let's match the one we're looking for
|
|
eventually(
|
|
?_assertMatch(
|
|
[_],
|
|
lists:filter(FilterFun, http_get(Config, "/connections"))),
|
|
1000, 10),
|
|
[#{client_properties := #{client_id := ClientId},
|
|
timeout := KeepaliveSecs,
|
|
node := Node,
|
|
name := ConnectionName}] =
|
|
lists:filter(FilterFun, http_get(Config, "/connections")),
|
|
process_flag(trap_exit, true),
|
|
http_delete(Config,
|
|
"/connections/" ++ binary_to_list(uri_string:quote(ConnectionName)),
|
|
?NO_CONTENT),
|
|
await_exit(C1),
|
|
eventually(
|
|
?_assertMatch(
|
|
[],
|
|
lists:filter(FilterFun, http_get(Config, "/connections"))),
|
|
1000, 10),
|
|
eventually(?_assertEqual([], all_connection_pids(Config)), 500, 3),
|
|
|
|
C2 = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]),
|
|
eventually(
|
|
?_assertMatch(
|
|
[_],
|
|
lists:filter(FilterFun, http_get(Config, "/connections"))),
|
|
1000, 10),
|
|
http_delete(Config,
|
|
"/connections/username/guest",
|
|
?NO_CONTENT),
|
|
await_exit(C2),
|
|
eventually(
|
|
?_assertMatch(
|
|
[],
|
|
lists:filter(FilterFun, http_get(Config, "/connections"))),
|
|
1000, 10),
|
|
eventually(?_assertEqual([], all_connection_pids(Config)), 500, 3).
|
|
|
|
management_plugin_enable(Config) ->
|
|
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, rabbitmq_management),
|
|
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, rabbitmq_management_agent),
|
|
|
|
%% If the (web) MQTT connection is established **before** the management plugin is enabled,
|
|
%% the management plugin should still list the (web) MQTT connection.
|
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
C = connect(ClientId, Config),
|
|
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management_agent),
|
|
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management),
|
|
FilterFun =
|
|
fun(#{client_properties := #{client_id := CId}})
|
|
when ClientId == CId -> true;
|
|
(_) -> false
|
|
end,
|
|
%% Sometimes connections remain open from other testcases,
|
|
%% let's match the one we're looking for
|
|
eventually(
|
|
?_assertMatch(
|
|
[_],
|
|
lists:filter(FilterFun, http_get(Config, "/connections"))),
|
|
1000, 10),
|
|
|
|
ok = emqtt:disconnect(C).
|
|
|
|
%% Test that queues of type rabbit_mqtt_qos0_queue can be listed via rabbitmqctl.
|
|
cli_list_queues(Config) ->
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
{ok, _, _} = emqtt:subscribe(C, <<"a/b/c">>, qos0),
|
|
|
|
Qs = rabbit_ct_broker_helpers:rabbitmqctl_list(
|
|
Config, 1,
|
|
["list_queues", "--no-table-headers",
|
|
"type", "name", "state", "durable", "auto_delete",
|
|
"arguments", "pid", "owner_pid", "messages", "exclusive_consumer_tag"
|
|
]),
|
|
?assertMatch([[<<"MQTT QoS 0">>, <<"mqtt-subscription-cli_list_queuesqos0">>,
|
|
<<"running">>, <<"true">>, <<"false">>, <<"[]">>, _, _, <<"0">>, <<"">>]],
|
|
Qs),
|
|
|
|
?assertEqual([],
|
|
rabbit_ct_broker_helpers:rabbitmqctl_list(
|
|
Config, 1, ["list_queues", "--local", "--no-table-headers"])
|
|
),
|
|
|
|
ok = emqtt:disconnect(C).
|
|
|
|
maintenance(Config) ->
|
|
C0 = connect(<<"client-0">>, Config, 0, []),
|
|
C1a = connect(<<"client-1a">>, Config, 1, []),
|
|
C1b = connect(<<"client-1b">>, Config, 1, []),
|
|
ClientsNode1 = [C1a, C1b],
|
|
|
|
timer:sleep(500),
|
|
|
|
ok = drain_node(Config, 2),
|
|
ok = revive_node(Config, 2),
|
|
timer:sleep(500),
|
|
[?assert(erlang:is_process_alive(C)) || C <- [C0, C1a, C1b]],
|
|
|
|
process_flag(trap_exit, true),
|
|
ok = drain_node(Config, 1),
|
|
[await_exit(Pid) || Pid <- ClientsNode1],
|
|
[assert_v5_disconnect_reason_code(Config, ?RC_SERVER_SHUTTING_DOWN) || _ <- ClientsNode1],
|
|
ok = revive_node(Config, 1),
|
|
?assert(erlang:is_process_alive(C0)),
|
|
|
|
ok = drain_node(Config, 0),
|
|
await_exit(C0),
|
|
assert_v5_disconnect_reason_code(Config, ?RC_SERVER_SHUTTING_DOWN),
|
|
ok = revive_node(Config, 0).
|
|
|
|
keepalive(Config) ->
|
|
KeepaliveSecs = 1,
|
|
KeepaliveMs = timer:seconds(KeepaliveSecs),
|
|
WillTopic = <<"will/topic">>,
|
|
WillPayload = <<"will-payload">>,
|
|
C1 = connect(?FUNCTION_NAME, Config, [{keepalive, KeepaliveSecs},
|
|
{will_topic, WillTopic},
|
|
{will_payload, WillPayload},
|
|
{will_retain, true},
|
|
{will_qos, 0}]),
|
|
ok = emqtt:publish(C1, <<"ignored">>, <<"msg">>),
|
|
|
|
%% Connection should stay up when client sends PING requests.
|
|
timer:sleep(KeepaliveMs),
|
|
?assertMatch(#{publishers := 1},
|
|
util:get_global_counters(Config)),
|
|
|
|
%% Mock the server socket to not have received any bytes.
|
|
rabbit_ct_broker_helpers:setup_meck(Config),
|
|
Mod = rabbit_net,
|
|
ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]),
|
|
ok = rpc(Config, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]} ]),
|
|
process_flag(trap_exit, true),
|
|
|
|
%% We expect the server to respect the keepalive closing the connection.
|
|
eventually(?_assertMatch(#{publishers := 0},
|
|
util:get_global_counters(Config)),
|
|
KeepaliveMs, 4 * KeepaliveSecs),
|
|
|
|
await_exit(C1),
|
|
assert_v5_disconnect_reason_code(Config, ?RC_KEEP_ALIVE_TIMEOUT),
|
|
?assert(rpc(Config, meck, validate, [Mod])),
|
|
ok = rpc(Config, meck, unload, [Mod]),
|
|
|
|
C2 = connect(<<"client2">>, Config),
|
|
{ok, _, [0]} = emqtt:subscribe(C2, WillTopic),
|
|
receive {publish, #{client_pid := C2,
|
|
dup := false,
|
|
qos := 0,
|
|
retain := true,
|
|
topic := WillTopic,
|
|
payload := WillPayload}} -> ok
|
|
after ?TIMEOUT -> ct:fail("missing will")
|
|
end,
|
|
ok = emqtt:disconnect(C2).
|
|
|
|
keepalive_turned_off(Config) ->
|
|
%% "A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism."
|
|
KeepaliveSecs = 0,
|
|
C = connect(?FUNCTION_NAME, Config, [{keepalive, KeepaliveSecs}]),
|
|
ok = emqtt:publish(C, <<"TopicB">>, <<"Payload">>),
|
|
|
|
%% Mock the server socket to not have received any bytes.
|
|
rabbit_ct_broker_helpers:setup_meck(Config),
|
|
Mod = rabbit_net,
|
|
ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]),
|
|
ok = rpc(Config, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]} ]),
|
|
|
|
rabbit_ct_helpers:consistently(?_assert(erlang:is_process_alive(C))),
|
|
|
|
?assert(rpc(Config, meck, validate, [Mod])),
|
|
ok = rpc(Config, meck, unload, [Mod]),
|
|
ok = emqtt:disconnect(C).
|
|
|
|
duplicate_client_id(Config) ->
|
|
[Server1, Server2, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
|
%% Test session takeover by both new and old node in mixed version clusters.
|
|
ClientId1 = <<"c1">>,
|
|
ClientId2 = <<"c2">>,
|
|
C1a = connect(ClientId1, Config, Server2, []),
|
|
C2a = connect(ClientId2, Config, Server1, []),
|
|
eventually(?_assertEqual(2, length(all_connection_pids(Config)))),
|
|
process_flag(trap_exit, true),
|
|
C1b = connect(ClientId1, Config, Server1, []),
|
|
C2b = connect(ClientId2, Config, Server2, []),
|
|
assert_v5_disconnect_reason_code(Config, ?RC_SESSION_TAKEN_OVER),
|
|
assert_v5_disconnect_reason_code(Config, ?RC_SESSION_TAKEN_OVER),
|
|
await_exit(C1a),
|
|
await_exit(C2a),
|
|
timer:sleep(200),
|
|
?assertEqual(2, length(all_connection_pids(Config))),
|
|
ok = emqtt:disconnect(C1b),
|
|
ok = emqtt:disconnect(C2b),
|
|
eventually(?_assertEqual(0, length(all_connection_pids(Config)))).
|
|
|
|
session_reconnect(Config) ->
|
|
session_switch(Config, true).
|
|
|
|
session_takeover(Config) ->
|
|
session_switch(Config, false).
|
|
|
|
session_switch(Config, Disconnect) ->
|
|
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
%% Connect to old node in mixed version cluster.
|
|
C1 = connect(ClientId, Config, 1, non_clean_sess_opts()),
|
|
{ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1),
|
|
case Disconnect of
|
|
true -> ok = emqtt:disconnect(C1);
|
|
false -> unlink(C1)
|
|
end,
|
|
%% Connect to new node in mixed version cluster.
|
|
C2 = connect(ClientId, Config, 0, non_clean_sess_opts()),
|
|
case Disconnect of
|
|
true -> ok;
|
|
false -> assert_v5_disconnect_reason_code(Config, ?RC_SESSION_TAKEN_OVER)
|
|
end,
|
|
%% New connection should be able to modify subscription.
|
|
{ok, _, [0]} = emqtt:subscribe(C2, Topic, qos0),
|
|
{ok, _} = emqtt:publish(C2, Topic, <<"m1">>, qos1),
|
|
receive {publish, #{client_pid := C2,
|
|
payload := <<"m1">>,
|
|
qos := 0}} -> ok
|
|
after ?TIMEOUT -> ct:fail("did not receive m1 with QoS 0")
|
|
end,
|
|
%% New connection should be able to unsubscribe.
|
|
?assertMatch({ok, _, _}, emqtt:unsubscribe(C2, Topic)),
|
|
{ok, _} = emqtt:publish(C2, Topic, <<"m2">>, qos1),
|
|
receive Unexpected -> ct:fail({unexpected, Unexpected})
|
|
after 300 -> ok
|
|
end,
|
|
|
|
ok = emqtt:disconnect(C2),
|
|
C3 = connect(ClientId, Config, 0, [{clean_start, true}]),
|
|
ok = emqtt:disconnect(C3),
|
|
eventually(?_assertEqual([], all_connection_pids(Config))).
|
|
|
|
block(Config) ->
|
|
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
C = connect(ClientId, Config),
|
|
|
|
{ok, _, _} = emqtt:subscribe(C, Topic),
|
|
{ok, _} = emqtt:publish(C, Topic, <<"Not blocked yet">>, [{qos, 1}]),
|
|
|
|
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
|
|
%% Let it block
|
|
timer:sleep(100),
|
|
|
|
%% Blocked, but still will publish when unblocked
|
|
puback_timeout = publish_qos1_timeout(C, Topic, <<"Now blocked">>, 1000),
|
|
puback_timeout = publish_qos1_timeout(C, Topic, <<"Still blocked">>, 1000),
|
|
|
|
%% Unblock
|
|
rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.6]),
|
|
ok = expect_publishes(C, Topic, [<<"Not blocked yet">>,
|
|
<<"Now blocked">>,
|
|
<<"Still blocked">>]),
|
|
ok = emqtt:disconnect(C).
|
|
|
|
block_only_publisher(Config) ->
|
|
Topic = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
Opts = [{ack_timeout, 1}],
|
|
Con = connect(<<"background-connection">>, Config, Opts),
|
|
Sub = connect(<<"subscriber-connection">>, Config, Opts),
|
|
Pub = connect(<<"publisher-connection">>, Config, Opts),
|
|
PubSub = connect(<<"publisher-and-subscriber-connection">>, Config, Opts),
|
|
|
|
{ok, _, [1]} = emqtt:subscribe(Sub, Topic, qos1),
|
|
{ok, _, [1]} = emqtt:subscribe(PubSub, Topic, qos1),
|
|
{ok, _} = emqtt:publish(Pub, Topic, <<"from Pub">>, [{qos, 1}]),
|
|
{ok, _} = emqtt:publish(PubSub, Topic, <<"from PubSub">>, [{qos, 1}]),
|
|
ok = expect_publishes(Sub, Topic, [<<"from Pub">>, <<"from PubSub">>]),
|
|
ok = expect_publishes(PubSub, Topic, [<<"from Pub">>, <<"from PubSub">>]),
|
|
|
|
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
|
|
%% Let it block
|
|
timer:sleep(100),
|
|
|
|
%% We expect that the publishing connections are blocked.
|
|
[?assertEqual({error, ack_timeout}, emqtt:ping(Pid)) || Pid <- [Pub, PubSub]],
|
|
%% We expect that the non-publishing connections are not blocked.
|
|
[?assertEqual(pong, emqtt:ping(Pid)) || Pid <- [Con, Sub]],
|
|
|
|
%% While the memory alarm is on, let's turn a non-publishing connection
|
|
%% into a publishing connection.
|
|
{ok, _} = emqtt:publish(Con, Topic, <<"from Con 1">>, [{qos, 1}]),
|
|
%% The very first message still goes through.
|
|
ok = expect_publishes(Sub, Topic, [<<"from Con 1">>]),
|
|
%% But now the new publisher should be blocked as well.
|
|
?assertEqual({error, ack_timeout}, emqtt:ping(Con)),
|
|
?assertEqual(puback_timeout, publish_qos1_timeout(Con, Topic, <<"from Con 2">>, 500)),
|
|
?assertEqual(pong, emqtt:ping(Sub)),
|
|
|
|
rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.6]),
|
|
%% Let it unblock
|
|
timer:sleep(100),
|
|
|
|
%% All connections are unblocked.
|
|
[?assertEqual(pong, emqtt:ping(Pid)) || Pid <- [Con, Sub, Pub, PubSub]],
|
|
%% The publishing connections should be able to publish again.
|
|
{ok, _} = emqtt:publish(Con, Topic, <<"from Con 3">>, [{qos, 1}]),
|
|
ok = expect_publishes(Sub, Topic, [<<"from Con 2">>, <<"from Con 3">>]),
|
|
ok = expect_publishes(PubSub, Topic, [<<"from Con 1">>, <<"from Con 2">>, <<"from Con 3">>]),
|
|
|
|
[ok = emqtt:disconnect(Pid) || Pid <- [Con, Sub, Pub, PubSub]].
|
|
|
|
clean_session_disconnect_client(Config) ->
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
{ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0),
|
|
{ok, _, _} = emqtt:subscribe(C, <<"topic1">>, qos1),
|
|
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
|
|
QsClassic = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]),
|
|
?assertEqual(1, length(QsQos0)),
|
|
?assertEqual(1, length(QsClassic)),
|
|
|
|
ok = emqtt:disconnect(C),
|
|
%% After terminating a clean session, we expect any session state to be cleaned up on the server.
|
|
timer:sleep(200), %% Give some time to clean up exclusive classic queue.
|
|
L = rpc(Config, rabbit_amqqueue, list, []),
|
|
?assertEqual(0, length(L)).
|
|
|
|
clean_session_node_restart(Config) ->
|
|
clean_session_node_down(stop_node, Config).
|
|
|
|
clean_session_node_kill(Config) ->
|
|
clean_session_node_down(kill_node, Config).
|
|
|
|
clean_session_node_down(NodeDown, Config) ->
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
{ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0),
|
|
{ok, _, _} = emqtt:subscribe(C, <<"topic1">>, qos1),
|
|
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
|
|
QsClassic = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]),
|
|
?assertEqual(1, length(QsQos0)),
|
|
?assertEqual(1, length(QsClassic)),
|
|
?assertEqual(2, rpc(Config, rabbit_amqqueue, count, [])),
|
|
|
|
unlink(C),
|
|
ok = rabbit_ct_broker_helpers:NodeDown(Config, 0),
|
|
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
|
|
|
|
%% After terminating a clean session by a node crash, we expect any session
|
|
%% state to be cleaned up on the server once the server comes back up.
|
|
?assertEqual(0, rpc(Config, rabbit_amqqueue, count, [])).
|
|
|
|
rabbit_status_connection_count(Config) ->
|
|
_Pid = rabbit_ct_client_helpers:open_connection(Config, 0),
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
|
|
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["status"]),
|
|
?assertNotEqual(nomatch, string:find(String, "Connection count: 2")),
|
|
|
|
ok = emqtt:disconnect(C).
|
|
|
|
trace(Config) ->
|
|
Server = atom_to_binary(get_node_config(Config, 0, nodename)),
|
|
Topic = Payload = TraceQ = atom_to_binary(?FUNCTION_NAME),
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
declare_queue(Ch, TraceQ, []),
|
|
#'queue.bind_ok'{} = amqp_channel:call(
|
|
Ch, #'queue.bind'{queue = TraceQ,
|
|
exchange = <<"amq.rabbitmq.trace">>,
|
|
routing_key = <<"#">>}),
|
|
|
|
%% We expect traced messages for connections created before and connections
|
|
%% created after tracing is enabled.
|
|
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config),
|
|
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]),
|
|
Sub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_subscriber">>, Config),
|
|
|
|
{ok, _, [0]} = emqtt:subscribe(Sub, Topic, qos0),
|
|
{ok, _} = emqtt:publish(Pub, Topic, Payload, qos1),
|
|
ok = expect_publishes(Sub, Topic, [Payload]),
|
|
timer:sleep(10),
|
|
|
|
{#'basic.get_ok'{routing_key = <<"publish.amq.topic">>},
|
|
#amqp_msg{props = #'P_basic'{headers = PublishHeaders},
|
|
payload = Payload}} =
|
|
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}),
|
|
?assertMatch(#{<<"exchange_name">> := <<"amq.topic">>,
|
|
<<"routing_keys">> := [Topic],
|
|
<<"connection">> := <<"127.0.0.1:", _/binary>>,
|
|
<<"node">> := Server,
|
|
<<"vhost">> := <<"/">>,
|
|
<<"channel">> := 0,
|
|
<<"user">> := <<"guest">>,
|
|
<<"properties">> := #{<<"delivery_mode">> := 2},
|
|
<<"routed_queues">> := [<<"mqtt-subscription-trace_subscriberqos0">>]},
|
|
rabbit_misc:amqp_table(PublishHeaders)),
|
|
|
|
{#'basic.get_ok'{routing_key = <<"deliver.mqtt-subscription-trace_subscriberqos0">>},
|
|
#amqp_msg{props = #'P_basic'{headers = DeliverHeaders},
|
|
payload = Payload}} =
|
|
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}),
|
|
?assertMatch(#{<<"exchange_name">> := <<"amq.topic">>,
|
|
<<"routing_keys">> := [Topic],
|
|
<<"connection">> := <<"127.0.0.1:", _/binary>>,
|
|
<<"node">> := Server,
|
|
<<"vhost">> := <<"/">>,
|
|
<<"channel">> := 0,
|
|
<<"user">> := <<"guest">>,
|
|
<<"properties">> := #{<<"delivery_mode">> := 2},
|
|
<<"redelivered">> := 0},
|
|
rabbit_misc:amqp_table(DeliverHeaders)),
|
|
|
|
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]),
|
|
{ok, _} = emqtt:publish(Pub, Topic, Payload, qos1),
|
|
ok = expect_publishes(Sub, Topic, [Payload]),
|
|
?assertMatch(#'basic.get_empty'{},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})),
|
|
|
|
delete_queue(Ch, TraceQ),
|
|
[ok = emqtt:disconnect(C) || C <- [Pub, Sub]],
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
trace_large_message(Config) ->
|
|
TraceQ = <<"trace-queue">>,
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
declare_queue(Ch, TraceQ, []),
|
|
#'queue.bind_ok'{} = amqp_channel:call(
|
|
Ch, #'queue.bind'{queue = TraceQ,
|
|
exchange = <<"amq.rabbitmq.trace">>,
|
|
routing_key = <<"deliver.*">>}),
|
|
C = connect(<<"my-client">>, Config),
|
|
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]),
|
|
{ok, _, [0]} = emqtt:subscribe(C, <<"/my/topic">>),
|
|
Payload0 = binary:copy(<<"x">>, 1_000_000),
|
|
Payload = <<Payload0/binary, "y">>,
|
|
amqp_channel:call(Ch,
|
|
#'basic.publish'{exchange = <<"amq.topic">>,
|
|
routing_key = <<".my.topic">>},
|
|
#amqp_msg{payload = Payload}),
|
|
ok = expect_publishes(C, <<"/my/topic">>, [Payload]),
|
|
timer:sleep(10),
|
|
?assertMatch(
|
|
{#'basic.get_ok'{routing_key = <<"deliver.mqtt-subscription-my-clientqos0">>},
|
|
#amqp_msg{payload = Payload}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})
|
|
),
|
|
|
|
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]),
|
|
delete_queue(Ch, TraceQ),
|
|
ok = emqtt:disconnect(C),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
max_packet_size_unauthenticated(Config) ->
|
|
ClientId = ?FUNCTION_NAME,
|
|
Opts = [{will_topic, <<"will/topic">>}],
|
|
|
|
{C1, Connect} = util:start_client(
|
|
ClientId, Config, 0,
|
|
[{will_payload, binary:copy(<<"a">>, 64_000)} | Opts]),
|
|
?assertMatch({ok, _}, Connect(C1)),
|
|
ok = emqtt:disconnect(C1),
|
|
|
|
Key = mqtt_max_packet_size_unauthenticated,
|
|
OldMaxSize = rpc(Config, persistent_term, get, [Key]),
|
|
MaxSize = 500,
|
|
ok = rpc(Config, persistent_term, put, [Key, MaxSize]),
|
|
|
|
{C2, Connect} = util:start_client(
|
|
ClientId, Config, 0,
|
|
[{will_payload, binary:copy(<<"b">>, MaxSize + 1)} | Opts]),
|
|
true = unlink(C2),
|
|
?assertMatch({error, _}, Connect(C2)),
|
|
|
|
{C3, Connect} = util:start_client(
|
|
ClientId, Config, 0,
|
|
[{will_payload, binary:copy(<<"c">>, round(MaxSize / 2))} | Opts]),
|
|
?assertMatch({ok, _}, Connect(C3)),
|
|
ok = emqtt:disconnect(C3),
|
|
|
|
ok = rpc(Config, persistent_term, put, [Key, OldMaxSize]).
|
|
|
|
max_packet_size_authenticated(Config) ->
|
|
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
Key = mqtt_max_packet_size_authenticated,
|
|
OldMaxSize = rpc(Config, persistent_term, get, [Key]),
|
|
MaxSize = 500,
|
|
ok = rpc(Config, persistent_term, put, [Key, MaxSize]),
|
|
|
|
{C, Connect} = util:start_client(ClientId, Config, 0, []),
|
|
{ok, ConnAckProps} = Connect(C),
|
|
process_flag(trap_exit, true),
|
|
ok = emqtt:publish(C, Topic, binary:copy(<<"x">>, MaxSize + 1), qos0),
|
|
await_exit(C),
|
|
case ?config(mqtt_version, Config) of
|
|
v3 -> ok;
|
|
v4 -> ok;
|
|
v5 -> ?assertMatch(#{'Maximum-Packet-Size' := MaxSize}, ConnAckProps),
|
|
receive {disconnected, _ReasonCodePacketTooLarge = 149, _Props} -> ok
|
|
after ?TIMEOUT -> ct:fail("missing DISCONNECT packet from server")
|
|
end
|
|
end,
|
|
ok = rpc(Config, persistent_term, put, [Key, OldMaxSize]).
|
|
|
|
%% Test that the per vhost default queue type introduced in
|
|
%% https://github.com/rabbitmq/rabbitmq-server/pull/5305
|
|
%% does not apply to queues created for MQTT connections
|
|
%% because having millions of quorum queues is too expensive.
|
|
default_queue_type(Config) ->
|
|
Server = get_node_config(Config, 0, nodename),
|
|
QName = Vhost = ClientId = Topic = atom_to_binary(?FUNCTION_NAME),
|
|
ok = erpc:call(Server, rabbit_vhost, add, [Vhost,
|
|
#{default_queue_type => <<"quorum">>},
|
|
<<"acting-user">>]),
|
|
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, Vhost),
|
|
|
|
?assertEqual([], rpc(Config, rabbit_amqqueue, list, [])),
|
|
%% Sanity check that the configured default queue type works with AMQP 0.9.1.
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Server, Vhost),
|
|
{ok, Ch} = amqp_connection:open_channel(Conn),
|
|
declare_queue(Ch, QName, []),
|
|
QuorumQueues = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_quorum_queue]),
|
|
?assertEqual(1, length(QuorumQueues)),
|
|
delete_queue(Ch, QName),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
|
|
|
|
%% Test that the configured default queue type does not apply to MQTT.
|
|
Creds = [{username, <<Vhost/binary, ":guest">>},
|
|
{password, <<"guest">>}],
|
|
C1 = connect(ClientId, Config, Creds ++ non_clean_sess_opts()),
|
|
{ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1),
|
|
ClassicQueues = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]),
|
|
?assertEqual(1, length(ClassicQueues)),
|
|
|
|
ok = emqtt:disconnect(C1),
|
|
C2 = connect(ClientId, Config, [{clean_start, true} | Creds]),
|
|
ok = emqtt:disconnect(C2),
|
|
ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost).
|
|
|
|
message_interceptors(Config) ->
|
|
ok = rpc(Config, persistent_term, put,
|
|
[message_interceptors,
|
|
[
|
|
{rabbit_mqtt_msg_interceptor_client_id, #{}},
|
|
{rabbit_msg_interceptor_timestamp, #{overwrite => false,
|
|
incoming => true,
|
|
outgoing => true}}
|
|
]]),
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
Payload = Topic = atom_to_binary(?FUNCTION_NAME),
|
|
ClientId = <<"🆔"/utf8>>,
|
|
CQName = <<"my classic queue">>,
|
|
Stream = <<"my stream">>,
|
|
declare_queue(Ch, CQName, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
|
|
declare_queue(Ch, Stream, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
|
|
bind(Ch, CQName, Topic),
|
|
bind(Ch, Stream, Topic),
|
|
C = connect(ClientId, Config),
|
|
|
|
NowSecs = os:system_time(second),
|
|
NowMillis = os:system_time(millisecond),
|
|
{ok, _} = emqtt:publish(C, Topic, Payload, qos1),
|
|
|
|
{#'basic.get_ok'{},
|
|
#amqp_msg{payload = Payload,
|
|
props = #'P_basic'{
|
|
timestamp = Secs,
|
|
headers = Headers
|
|
}}
|
|
} = amqp_channel:call(Ch, #'basic.get'{queue = CQName}),
|
|
|
|
{_, long, ReceivedTs} = lists:keyfind(<<"timestamp_in_ms">>, 1, Headers),
|
|
?assert(Secs < NowSecs + 9),
|
|
?assert(Secs > NowSecs - 9),
|
|
?assert(ReceivedTs < NowMillis + 9000),
|
|
?assert(ReceivedTs > NowMillis - 9000),
|
|
{_, long, SentTs} = lists:keyfind(<<"x-opt-rabbitmq-sent-time">>, 1, Headers),
|
|
?assert(SentTs < NowMillis + 9000),
|
|
?assert(SentTs > NowMillis - 9000),
|
|
|
|
?assertEqual({<<"x-opt-mqtt-client-id">>, longstr, ClientId},
|
|
lists:keyfind(<<"x-opt-mqtt-client-id">>, 1, Headers)),
|
|
|
|
#'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{prefetch_count = 1}),
|
|
CTag = <<"my ctag">>,
|
|
#'basic.consume_ok'{} = amqp_channel:subscribe(
|
|
Ch,
|
|
#'basic.consume'{
|
|
queue = Stream,
|
|
consumer_tag = CTag,
|
|
arguments = [{<<"x-stream-offset">>, longstr, <<"first">>}]},
|
|
self()),
|
|
receive {#'basic.deliver'{consumer_tag = CTag},
|
|
#amqp_msg{payload = Payload,
|
|
props = #'P_basic'{
|
|
headers = [{<<"timestamp_in_ms">>, long, ReceivedTs} | XHeaders]
|
|
}}} ->
|
|
?assertEqual({<<"x-opt-mqtt-client-id">>, longstr, ClientId},
|
|
lists:keyfind(<<"x-opt-mqtt-client-id">>, 1, XHeaders)),
|
|
|
|
{_, long, SentTs1} = lists:keyfind(<<"x-opt-rabbitmq-sent-time">>, 1, XHeaders),
|
|
?assert(SentTs1 < NowMillis + 9000),
|
|
?assert(SentTs1 > NowMillis - 9000)
|
|
after ?TIMEOUT -> ct:fail(missing_deliver)
|
|
end,
|
|
|
|
delete_queue(Ch, Stream),
|
|
delete_queue(Ch, CQName),
|
|
ok = rpc(Config, persistent_term, put, [message_interceptors, []]),
|
|
ok = emqtt:disconnect(C),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
%% This test makes sure that a retained message that got written in 3.12 or earlier
|
|
%% can be consumed in 3.13 or later.
|
|
retained_message_conversion(Config) ->
|
|
Topic = <<"a/b">>,
|
|
Payload = <<"my retained msg">>,
|
|
OldMqttMsgFormat = {mqtt_msg, _Retain = true, _QoS = 1, Topic, _Dup = false, _PktId = 1, Payload},
|
|
RetainerPid = rpc(Config, rabbit_mqtt_retainer_sup, start_child_for_vhost, [<<"/">>]),
|
|
{rabbit_mqtt_retainer, StoreState, _} = sys:get_state(RetainerPid),
|
|
ok = rpc(Config, rabbit_mqtt_retained_msg_store, insert, [Topic, OldMqttMsgFormat, StoreState]),
|
|
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
|
|
receive {publish, #{client_pid := C,
|
|
dup := false,
|
|
qos := 1,
|
|
retain := true,
|
|
topic := Topic,
|
|
payload := Payload}} -> ok
|
|
after ?TIMEOUT -> ct:fail("missing retained message")
|
|
end,
|
|
ok = emqtt:publish(C, Topic, <<>>, [{retain, true}]),
|
|
ok = emqtt:disconnect(C).
|
|
|
|
%% Test that the server can handle UTF-8 encoded strings.
|
|
utf8(Config) ->
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
% "The Topic Name MUST be present as the first field in the PUBLISH Packet Variable header.
|
|
% It MUST be a UTF-8 encoded string [MQTT-3.3.2-1] as defined in section 1.5.3."
|
|
Topic = <<"うさぎ"/utf8>>, %% Rabbit in Japanese
|
|
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
|
|
{ok, _} = emqtt:publish(C, Topic, <<"msg">>, qos1),
|
|
ok = expect_publishes(C, Topic, [<<"msg">>]),
|
|
ok = emqtt:disconnect(C).
|
|
|
|
bind_exchange_to_exchange(Config) ->
|
|
SourceX = <<"amq.topic">>,
|
|
DestinationX = <<"destination">>,
|
|
Q = <<"q">>,
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DestinationX,
|
|
durable = true,
|
|
auto_delete = true}),
|
|
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q,
|
|
durable = true}),
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = DestinationX,
|
|
queue = Q,
|
|
routing_key = <<"a.b">>}),
|
|
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = DestinationX,
|
|
source = SourceX,
|
|
routing_key = <<"*.b">>}),
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
%% Message should be routed as follows: SourceX -> DestinationX -> Q
|
|
{ok, _} = emqtt:publish(C, <<"a/b">>, <<"msg">>, qos1),
|
|
eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q}))),
|
|
#'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}),
|
|
ok = emqtt:disconnect(C),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
bind_exchange_to_exchange_single_message(Config) ->
|
|
SourceX = <<"amq.topic">>,
|
|
DestinationX = <<"destination">>,
|
|
Q = <<"q">>,
|
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DestinationX,
|
|
durable = true,
|
|
auto_delete = true}),
|
|
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q,
|
|
durable = true}),
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q,
|
|
exchange = DestinationX,
|
|
routing_key = <<"a.b">>}),
|
|
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = DestinationX,
|
|
source = SourceX,
|
|
routing_key = <<"*.b">>}),
|
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q,
|
|
exchange = SourceX,
|
|
routing_key = <<"a.b">>}),
|
|
C = connect(?FUNCTION_NAME, Config),
|
|
%% Message should be routed as follows:
|
|
%% SourceX -> DestinationX -> Q and
|
|
%% SourceX -> Q
|
|
{ok, _} = emqtt:publish(C, <<"a/b">>, <<"msg">>, qos1),
|
|
%% However, since we publish only one time a single message and have a single destination queue,
|
|
%% we expect only one copy of the message to end up in the destination queue.
|
|
eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}},
|
|
amqp_channel:call(Ch, #'basic.get'{queue = Q}))),
|
|
timer:sleep(10),
|
|
?assertEqual(#'queue.delete_ok'{message_count = 0},
|
|
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
|
|
ok = emqtt:disconnect(C),
|
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
|
|
|
notify_consumer_qos0_queue_deleted(Config) ->
|
|
Topic = atom_to_binary(?FUNCTION_NAME),
|
|
notify_consumer_queue_deleted(Config, Topic, <<"MQTT QoS 0">>, [{retry_interval, 1}], qos0).
|
|
|
|
notify_consumer_classic_queue_deleted(Config) ->
|
|
Topic = atom_to_binary(?FUNCTION_NAME),
|
|
notify_consumer_queue_deleted(Config, Topic, <<"classic">>, non_clean_sess_opts(), qos0).
|
|
|
|
notify_consumer_quorum_queue_deleted(Config) ->
|
|
set_durable_queue_type(Config),
|
|
Topic = atom_to_binary(?FUNCTION_NAME),
|
|
notify_consumer_queue_deleted(Config, Topic, <<"quorum">>, non_clean_sess_opts(), qos1),
|
|
unset_durable_queue_type(Config).
|
|
|
|
notify_consumer_queue_deleted(Config, Name = Topic, ExpectedType, ConnOpts, Qos) ->
|
|
C = connect(Name, Config, ConnOpts),
|
|
{ok, _, _} = emqtt:subscribe(C, Topic, Qos),
|
|
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m1">>, qos1),
|
|
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m2">>, qos1),
|
|
ok = expect_publishes(C, Topic, [<<"m1">>, <<"m2">>]),
|
|
|
|
[[QName, Type]] = rabbitmqctl_list(Config, 0, ["list_queues", "name", "type", "--no-table-headers"]),
|
|
?assertMatch(ExpectedType, Type),
|
|
|
|
process_flag(trap_exit, true),
|
|
{ok, _} = rabbitmqctl(Config, 0, ["delete_queue", QName]),
|
|
|
|
await_exit(C).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Internal helpers
|
|
%% -------------------------------------------------------------------
|
|
|
|
await_confirms_ordered(_, To, To) ->
|
|
ok;
|
|
await_confirms_ordered(From, N, To) ->
|
|
Expected = {From, N},
|
|
receive
|
|
Expected ->
|
|
await_confirms_ordered(From, N + 1, To);
|
|
Got ->
|
|
ct:fail("Received unexpected message. Expected: ~p Got: ~p", [Expected, Got])
|
|
after ?TIMEOUT ->
|
|
ct:fail("Did not receive expected message: ~p", [Expected])
|
|
end.
|
|
|
|
await_confirms_unordered(_, 0) ->
|
|
ok;
|
|
await_confirms_unordered(From, Left) ->
|
|
receive
|
|
{From, _N} ->
|
|
await_confirms_unordered(From, Left - 1);
|
|
Other ->
|
|
ct:fail("Received unexpected message: ~p", [Other])
|
|
after ?TIMEOUT ->
|
|
ct:fail("~b confirms are missing", [Left])
|
|
end.
|
|
|
|
await_consumer_count(ConsumerCount, ClientId, QoS, Config) ->
|
|
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
|
QueueName = rabbit_mqtt_util:queue_name_bin(
|
|
rabbit_data_coercion:to_binary(ClientId), QoS),
|
|
eventually(
|
|
?_assertMatch(
|
|
#'queue.declare_ok'{consumer_count = ConsumerCount},
|
|
amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
|
|
passive = true})), 500, 10),
|
|
ok = rabbit_ct_client_helpers:close_channel(Ch).
|
|
|
|
declare_queue(Ch, QueueName, Args)
|
|
when is_pid(Ch), is_binary(QueueName), is_list(Args) ->
|
|
#'queue.declare_ok'{} = amqp_channel:call(
|
|
Ch, #'queue.declare'{
|
|
queue = QueueName,
|
|
durable = true,
|
|
arguments = Args}).
|
|
|
|
delete_queue(Ch, QueueNames)
|
|
when is_pid(Ch), is_list(QueueNames) ->
|
|
lists:foreach(
|
|
fun(Q) ->
|
|
delete_queue(Ch, Q)
|
|
end, QueueNames);
|
|
delete_queue(Ch, QueueName)
|
|
when is_pid(Ch), is_binary(QueueName) ->
|
|
#'queue.delete_ok'{} = amqp_channel:call(
|
|
Ch, #'queue.delete'{
|
|
queue = QueueName}).
|
|
|
|
bind(Ch, QueueName, Topic)
|
|
when is_pid(Ch), is_binary(QueueName), is_binary(Topic) ->
|
|
#'queue.bind_ok'{} = amqp_channel:call(
|
|
Ch, #'queue.bind'{queue = QueueName,
|
|
exchange = <<"amq.topic">>,
|
|
routing_key = Topic}).
|
|
|
|
assert_v5_disconnect_reason_code(Config, ReasonCode) ->
|
|
case ?config(mqtt_version, Config) of
|
|
v3 -> ok;
|
|
v4 -> ok;
|
|
v5 -> receive {disconnected, ReasonCode, _Props} -> ok
|
|
after ?TIMEOUT -> ct:fail("missing DISCONNECT packet from server")
|
|
end
|
|
end.
|
|
|
|
set_durable_queue_type(Config) ->
|
|
ok = rpc(Config, application, set_env, [rabbitmq_mqtt, durable_queue_type, quorum]).
|
|
|
|
unset_durable_queue_type(Config) ->
|
|
ok = rpc(Config, application, unset_env, [rabbitmq_mqtt, durable_queue_type]).
|