343 lines
13 KiB
Erlang
343 lines
13 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
%%
|
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
%%
|
|
-module(reader_SUITE).
|
|
-compile([export_all,
|
|
nowarn_export_all]).
|
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-import(rabbit_ct_broker_helpers, [rpc/4]).
|
|
-import(rabbit_ct_helpers, [eventually/3]).
|
|
-import(util, [all_connection_pids/1,
|
|
publish_qos1_timeout/4,
|
|
expect_publishes/3,
|
|
connect/2, connect/3,
|
|
await_exit/1,
|
|
non_clean_sess_opts/0
|
|
]).
|
|
|
|
%% Common Test entry point: one test group per MQTT protocol version.
all() ->
    [{group, Version} || Version <- [v4, v5]].
|
|
|
|
%% Every protocol version group runs the full list returned by tests/0,
%% in shuffled order.
groups() ->
    [{Version, [shuffle], tests()} || Version <- [v4, v5]].
|
|
|
|
%% Names of all test cases in this suite. The list is shared by every group
%% returned from groups/0.
tests() ->
    ConnectionCases = [block_connack_timeout,
                       handle_invalid_packets,
                       login_timeout,
                       stats],
    QueueTypeCases = [quorum_clean_session_false,
                      quorum_clean_session_true,
                      classic_clean_session_true,
                      classic_clean_session_false],
    OtherCases = [event_authentication_failure,
                  rabbit_mqtt_qos0_queue_overflow],
    ConnectionCases ++ QueueTypeCases ++ OtherCases.
|
|
|
|
%% Suite-wide Common Test settings: a timetrap of three minutes.
suite() ->
    Timetrap = {timetrap, {minutes, 3}},
    [Timetrap].
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testsuite setup/teardown.
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% Setup step: merges statistics settings for the `rabbit' application into
%% Config (basic statistics, collected with an interval value of 100).
merge_app_env(Config) ->
    StatsEnv = [{collect_statistics, basic},
                {collect_statistics_interval, 100}],
    rabbit_ct_helpers:merge_app_env(Config, {rabbit, StatsEnv}).
|
|
|
|
%% Suite setup: records suite options in Config, runs the broker and client
%% setup steps (preceded by merge_app_env/1), then enables the rabbitmq_mqtt
%% plugin. Returns the Config produced by the setup steps.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    SuiteOpts = [{rmq_nodename_suffix, ?MODULE},
                 {start_rmq_with_plugins_disabled, true}],
    Config1 = rabbit_ct_helpers:set_config(Config, SuiteOpts),
    SetupSteps = [fun merge_app_env/1] ++
                 rabbit_ct_broker_helpers:setup_steps() ++
                 rabbit_ct_client_helpers:setup_steps(),
    Config2 = rabbit_ct_helpers:run_setup_steps(Config1, SetupSteps),
    util:enable_plugin(Config2, rabbitmq_mqtt),
    Config2.
|
|
|
|
%% Suite teardown: runs the client teardown steps followed by the broker
%% teardown steps.
end_per_suite(Config) ->
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(Config, TeardownSteps).
|
|
|
|
%% Stores the group name (v4 | v5) in Config under the key `mqtt_version' so
%% that test cases can read which protocol version to use.
init_per_group(Group, Config) ->
    VersionOpt = {mqtt_version, Group},
    rabbit_ct_helpers:set_config(Config, VersionOpt).
|
|
|
|
%% No per-group cleanup is needed; Config is returned unchanged.
end_per_group(_Group, Config) ->
    Config.
|
|
|
|
%% Delegates to rabbit_ct_helpers:testcase_started/2 and returns its result
%% as the test case Config.
init_per_testcase(Testcase, Config) ->
    StartedConfig = rabbit_ct_helpers:testcase_started(Config, Testcase),
    StartedConfig.
|
|
|
|
%% Delegates to rabbit_ct_helpers:testcase_finished/2 and returns its result.
end_per_testcase(Testcase, Config) ->
    FinishedConfig = rabbit_ct_helpers:testcase_finished(Config, Testcase),
    FinishedConfig.
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testsuite cases
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% Verifies that an MQTT client whose CONNECT is never answered (because the
%% broker is blocked by a memory high watermark of 0) fails with
%% connack_timeout, and that the server-side MQTT reader exits cleanly once the
%% broker is unblocked and finds that the client is already gone.
%%
%% The original watermark is restored in an `after' clause so that a failure
%% in the middle of this test case cannot leave the broker blocked for the
%% test cases that run afterwards.
block_connack_timeout(Config) ->
    P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
    Ports = rpc(Config, erlang, ports, []),

    DefaultWatermark = rpc(Config, vm_memory_monitor, get_vm_memory_high_watermark, []),
    ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0]),
    try
        %% Let connection block.
        timer:sleep(100),

        %% We can still connect via TCP, but CONNECT packet will not be processed on the server.
        {ok, Client} = emqtt:start_link([{host, "localhost"},
                                         {port, P},
                                         {clientid, atom_to_binary(?FUNCTION_NAME)},
                                         {proto_ver, ?config(mqtt_version, Config)},
                                         {connect_timeout, 1}]),
        %% The client is expected to exit; unlink so it does not take this
        %% test process down with it.
        unlink(Client),
        ClientMRef = monitor(process, Client),
        {error, connack_timeout} = emqtt:connect(Client),
        receive {'DOWN', ClientMRef, process, Client, connack_timeout} -> ok
        after 30_000 -> ct:fail("missing connack_timeout in client")
        end,

        %% The only TCP port created on the server since `Ports' was taken
        %% belongs to our MQTT connection.
        MqttReader = rpc(Config, ?MODULE, mqtt_connection_pid, [Ports]),
        MqttReaderMRef = monitor(process, MqttReader),

        %% Unblock connection. CONNECT packet will be processed on the server.
        rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [DefaultWatermark]),

        receive {'DOWN', MqttReaderMRef, process, MqttReader, {shutdown, {socket_ends, einval}}} ->
                    %% We expect that MQTT reader process exits (without crashing)
                    %% because our client already disconnected.
                    ok
        after 30_000 -> ct:fail("missing {shutdown, {socket_ends, einval}} exit from server")
        end,
        %% Ensure that our client is not registered.
        ?assertEqual([], all_connection_pids(Config)),
        ok
    after
        %% Restoring again after a successful run is harmless; on failure it
        %% is the only place the watermark gets reset. The result is not
        %% matched so that a failure here cannot mask the original error.
        rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [DefaultWatermark])
    end.
|
|
|
|
%% Returns the pid connected to the single "tcp_inet" port that exists on this
%% node but is not contained in ExistingPorts. Crashes with badmatch unless
%% there is exactly one such port.
mqtt_connection_pid(ExistingPorts) ->
    %% Server creates 1 new TCP port to handle our MQTT connection.
    [TcpPort] = [Port || Port <- erlang:ports() -- ExistingPorts,
                         erlang:port_info(Port, name) =:= {name, "tcp_inet"}],
    {connected, OwnerPid} = erlang:port_info(TcpPort, connected),
    OwnerPid.
|
|
|
|
%% Sends an HTTP request to the MQTT listener and checks that the size of the
%% connection_metrics table is the same afterwards as it was before.
handle_invalid_packets(Config) ->
    SizeBefore = rpc(Config, ets, info, [connection_metrics, size]),
    MqttPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
    {ok, Socket} = gen_tcp:connect("localhost", MqttPort, []),
    HttpRequest = <<"GET / HTTP/1.1\r\nHost: www.rabbitmq.com\r\nUser-Agent: curl/7.43.0\r\nAccept: */*">>,
    gen_tcp:send(Socket, HttpRequest),
    gen_tcp:close(Socket),
    %% Wait for stats being emitted (every 100ms)
    timer:sleep(300),
    %% No new stats entries should be inserted as connection never got to initialize
    SizeAfter = rpc(Config, ets, info, [connection_metrics, size]),
    ?assertEqual(SizeBefore, SizeAfter).
|
|
|
|
%% Sets the rabbitmq_mqtt `login_timeout' application parameter to 400 on the
%% broker, opens a TCP connection without sending anything, and expects the
%% server to close the socket before a 500 ms receive timeout fires.
%%
%% The socket is closed and the parameter is unset in `after' clauses so that
%% a failed assertion does not leak the short login timeout into the test
%% cases that run afterwards.
login_timeout(Config) ->
    App = rabbitmq_mqtt,
    %% The application parameter has the same name as this test case.
    Par = ?FUNCTION_NAME,
    Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
    ok = rpc(Config, application, set_env, [App, Par, 400]),
    try
        {ok, Socket} = gen_tcp:connect("localhost", Port, [{active, false}]),
        try
            ?assertEqual({error, closed}, gen_tcp:recv(Socket, 0, 500))
        after
            ok = gen_tcp:close(Socket)
        end
    after
        ok = rpc(Config, application, unset_env, [App, Par])
    end.
|
|
|
|
%% Connects one client and checks that both a connection_metrics entry
%% (containing garbage_collection) and a connection_coarse_metrics entry exist
%% for its server-side connection process.
stats(Config) ->
    Client = connect(?FUNCTION_NAME, Config),
    %% Wait for stats being emitted (every 100ms)
    timer:sleep(300),
    %% Retrieve the connection Pid
    [ConnPid] = all_connection_pids(Config),
    [{pid, ConnPid}] = rpc(Config, rabbit_mqtt_reader, info, [ConnPid, [pid]]),
    %% Verify the content of the metrics, garbage_collection must be present
    [{ConnPid, Metrics}] = rpc(Config, ets, lookup, [connection_metrics, ConnPid]),
    true = proplists:is_defined(garbage_collection, Metrics),
    %% If the coarse entry is present, stats were successfully emitted
    CoarseEntries = rpc(Config, ets, lookup, [connection_coarse_metrics, ConnPid]),
    [{ConnPid, _, _, _, _}] = CoarseEntries,
    ok = emqtt:disconnect(Client).
|
|
|
|
%% Looks up the queue named QNameBin in vhost <<"/">> on node Server and
%% returns its queue type. Crashes with badmatch if the lookup does not
%% return {ok, Queue}.
get_durable_queue_type(Server, QNameBin) ->
    Resource = rabbit_misc:r(<<"/">>, queue, QNameBin),
    {ok, Queue} = rpc:call(Server, rabbit_amqqueue, lookup, [Resource]),
    amqqueue:get_type(Queue).
|
|
|
|
%% Sets the rabbitmq_mqtt application parameter `durable_queue_type' on the
%% node where this function runs (test cases invoke it on the broker via rpc).
set_env(QueueType) ->
    App = rabbitmq_mqtt,
    Par = durable_queue_type,
    application:set_env(App, Par, QueueType).
|
|
|
|
%% Reads the current `durable_queue_type' setting via rabbit_mqtt_util:env/1
%% on the node where this function runs.
get_env() ->
    Par = durable_queue_type,
    rabbit_mqtt_util:env(Par).
|
|
|
|
%% Connects as ClientName with Opts, does a QoS 1 subscribe / publish /
%% receive / unsubscribe round trip on <<"TopicB">>, and asserts that the
%% queue named <<"mqtt-subscription-", ClientName, "qos1">> has the type
%% ExpectedQueueType. ClientName must be a binary.
validate_durable_queue_type(Config, ClientName, Opts, ExpectedQueueType) ->
    Topic = <<"TopicB">>,
    Payload = <<"Payload">>,
    Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Client = connect(ClientName, Config, Opts),
    {ok, _, _} = emqtt:subscribe(Client, Topic, qos1),
    ok = emqtt:publish(Client, Topic, Payload),
    ok = expect_publishes(Client, Topic, [Payload]),
    {ok, _, _} = emqtt:unsubscribe(Client, Topic),
    QNameBin = <<"mqtt-subscription-", ClientName/binary, "qos1">>,
    ActualQueueType = get_durable_queue_type(Server, QNameBin),
    ?assertEqual(ExpectedQueueType, ActualQueueType),
    ok = emqtt:disconnect(Client).
|
|
|
|
%% With durable_queue_type set to quorum and a non-clean session, the
%% subscription queue is expected to be a quorum queue.
%%
%% The previous durable_queue_type is restored in an `after' clause: the test
%% cases run in shuffled order, and leaving `quorum' set after a failure would
%% affect the classic_* cases that run later.
quorum_clean_session_false(Config) ->
    Default = rpc(Config, reader_SUITE, get_env, []),
    rpc(Config, reader_SUITE, set_env, [quorum]),
    try
        validate_durable_queue_type(
          Config, <<"quorumCleanSessionFalse">>, non_clean_sess_opts(), rabbit_quorum_queue)
    after
        rpc(Config, reader_SUITE, set_env, [Default])
    end.
|
|
|
|
%% With durable_queue_type set to quorum but a clean session, the subscription
%% queue is still expected to be a classic queue.
%%
%% The previous durable_queue_type is restored in an `after' clause: the test
%% cases run in shuffled order, and leaving `quorum' set after a failure would
%% affect the classic_* cases that run later.
quorum_clean_session_true(Config) ->
    Default = rpc(Config, reader_SUITE, get_env, []),
    rpc(Config, reader_SUITE, set_env, [quorum]),
    try
        %% Since we use a clean session and quorum queues cannot be auto-delete or exclusive,
        %% we expect a classic queue.
        validate_durable_queue_type(
          Config, <<"quorumCleanSessionTrue">>, [{clean_start, true}], rabbit_classic_queue)
    after
        rpc(Config, reader_SUITE, set_env, [Default])
    end.
|
|
|
|
%% With the durable_queue_type setting left untouched and a clean session,
%% the subscription queue is expected to be a classic queue.
classic_clean_session_true(Config) ->
    ClientName = <<"classicCleanSessionTrue">>,
    Opts = [{clean_start, true}],
    validate_durable_queue_type(Config, ClientName, Opts, rabbit_classic_queue).
|
|
|
|
%% With the durable_queue_type setting left untouched and a non-clean session,
%% the subscription queue is expected to be a classic queue.
classic_clean_session_false(Config) ->
    ClientName = <<"classicCleanSessionFalse">>,
    Opts = non_clean_sess_opts(),
    validate_durable_queue_type(Config, ClientName, Opts, rabbit_classic_queue).
|
|
|
|
%% Connects with a fake password and asserts that the broker emits a
%% user_authentication_failure event carrying the user name and
%% connection_type network.
%%
%% The event_recorder handler is removed in an `after' clause so that it does
%% not stay installed on rabbit_event when an assertion fails.
event_authentication_failure(Config) ->
    {ok, C} = emqtt:start_link(
                [{username, <<"Trudy">>},
                 {password, <<"fake-password">>},
                 {host, "localhost"},
                 {port, rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt)},
                 {clientid, atom_to_binary(?FUNCTION_NAME)},
                 {proto_ver, ?config(mqtt_version, Config)}]),
    %% The connect attempt is expected to fail; unlink so that the client
    %% cannot take this test process down with it.
    true = unlink(C),

    ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, event_recorder),
    Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    ok = gen_event:add_handler({rabbit_event, Server}, event_recorder, []),
    try
        ?assertMatch({error, _}, emqtt:connect(C)),

        [E | _] = util:get_events(Server, user_authentication_failure),
        util:assert_event_type(user_authentication_failure, E),
        util:assert_event_prop([{name, <<"Trudy">>},
                                {connection_type, network}],
                               E),
        ok
    after
        ok = gen_event:delete_handler({rabbit_event, Server}, event_recorder, [])
    end.
|
|
|
|
%% Test that queue type rabbit_mqtt_qos0_queue drops QoS 0 messages when its
%% max length is reached.
rabbit_mqtt_qos0_queue_overflow(Config) ->
    ProtoVer = case ?config(mqtt_version, Config) of
                   v4 -> mqtt311;
                   v5 -> mqtt50
               end,
    QType = rabbit_mqtt_qos0_queue,
    %% Label sets under which the global counters of interest are reported.
    DeliveredKey = [{protocol, ProtoVer}, {queue_type, QType}],
    DeadLetteredKey = [{queue_type, QType}, {dead_letter_strategy, disabled}],

    %% Before the test, nothing must have been delivered for this protocol
    %% version and queue type; remember the current dead-lettered count.
    #{DeliveredKey := #{messages_delivered_total := 0,
                        messages_delivered_consume_auto_ack_total := 0},
      DeadLetteredKey := #{messages_dead_lettered_maxlen_total := DeadLetteredBefore}
     } = rpc(Config, rabbit_global_counters, overview, []),

    Topic = atom_to_binary(?FUNCTION_NAME),
    Msg = binary:copy(<<"x">>, 4000),
    NumMsgs = 10_000,

    %% Provoke TCP back-pressure from client to server by using very small buffers.
    SubOpts = [{tcp_opts, [{recbuf, 256},
                           {buffer, 256}]}],
    Sub = connect(<<"subscriber">>, Config, SubOpts),
    {ok, _, [0]} = emqtt:subscribe(Sub, Topic, qos0),
    [ServerConnectionPid] = all_connection_pids(Config),

    %% Suspend the receiving client such that it stops reading from its socket
    %% causing TCP back-pressure to the server being applied.
    true = erlang:suspend_process(Sub),

    %% Let's overflow the receiving server MQTT connection process
    %% (i.e. the rabbit_mqtt_qos0_queue) by sending many large messages.
    Pub = connect(<<"publisher">>, Config),
    _ = [ok = emqtt:publish(Pub, Topic, Msg, qos0) || _ <- lists:seq(1, NumMsgs)],

    %% Give the server some time to process (either send or drop) the messages.
    timer:sleep(2500),

    %% Let's resume the receiving client to receive any remaining messages that did
    %% not get dropped.
    true = erlang:resume_process(Sub),
    NumReceived = num_received(Topic, Msg, 0),

    %% Read the number of dropped QoS 0 messages out of the state of the
    %% server-side connection process.
    {status, _, _, [_, _, _, _, Misc]} = sys:get_status(ServerConnectionPid),
    [#{proc_state := #{qos0_messages_dropped := NumDropped}}] =
        [S || {data, [{"State", S}]} <- Misc],

    ct:pal("NumReceived=~b NumDropped=~b", [NumReceived, NumDropped]),

    %% We expect that
    %% 1. all sent messages were either received or dropped
    ?assertEqual(NumMsgs, NumReceived + NumDropped),
    %% 2. at least one message was dropped (otherwise our whole test case did not
    %% test what it was supposed to test: that messages are dropped due to the
    %% server being overflowed with messages while the client receives too slowly)
    ?assert(NumDropped >= 1),
    %% 3. we received at least 200 messages because everything below the default
    %% of mailbox_soft_limit=200 should not be dropped
    ?assert(NumReceived >= 200),

    %% Assert that Prometheus metrics counted correctly.
    ExpectedNumDeadLettered = DeadLetteredBefore + NumDropped,
    ?assertMatch(
       #{DeliveredKey := #{messages_delivered_total := NumReceived,
                           messages_delivered_consume_auto_ack_total := NumReceived},
         DeadLetteredKey := #{messages_dead_lettered_maxlen_total := ExpectedNumDeadLettered}
        },
       rpc(Config, rabbit_global_counters, overview, [])),

    ok = emqtt:disconnect(Sub),
    ok = emqtt:disconnect(Pub).
|
|
|
|
%% Counts the {publish, #{topic := Topic, payload := Payload}} messages in the
%% calling process's mailbox, starting from N. Returns the count once no
%% further matching message arrives within 3000 ms.
num_received(Topic, Payload, N) ->
    num_received(Topic, Payload, N, 3000).

%% Same as num_received/3, but with a caller-chosen idle Timeout in
%% milliseconds. The timeout restarts after every matching message.
%% Messages that do not match are left in the mailbox.
num_received(Topic, Payload, N, Timeout) ->
    receive
        {publish, #{topic := Topic,
                    payload := Payload}} ->
            num_received(Topic, Payload, N + 1, Timeout)
    after Timeout ->
              N
    end.
|