Share tests between MQTT and Web MQTT

New test suite deps/rabbitmq_mqtt/test/shared_SUITE contains tests that
are executed against both MQTT and Web MQTT.

This has two major advantages:
1. Eliminates test code duplication between rabbitmq_mqtt and
rabbitmq_web_mqtt making the tests easier to maintain and to understand.
2. Increases test coverage of Web MQTT.

It's acceptable to add a **test** dependency from rabbitmq_mqtt to
rabbitmq_web_mqtt. Obviously, there should be no such dependency
for non-test code.
This commit is contained in:
David Ansari 2023-01-02 17:02:43 +00:00
parent 7c1aa49361
commit d651f87ea7
16 changed files with 662 additions and 857 deletions

View File

@ -38,7 +38,7 @@
add_code_path_to_node/2,
add_code_path_to_all_nodes/2,
rpc/5, rpc/6,
rpc/4, rpc/5, rpc/6,
rpc_all/4, rpc_all/5,
start_node/2,
@ -1558,6 +1558,9 @@ add_code_path_to_all_nodes(Config, Module) ->
|| Nodename <- Nodenames],
ok.
rpc(Config, Module, Function, Args) ->
rpc(Config, 0, Module, Function, Args).
rpc(Config, Node, Module, Function, Args)
when is_atom(Node) andalso Node =/= undefined ->
rpc(Config, Node, Module, Function, Args, infinity);

View File

@ -86,6 +86,7 @@ dialyze(
broker_for_integration_suites(
extra_plugins = [
"//deps/rabbitmq_management:erlang_app",
"//deps/rabbitmq_web_mqtt:erlang_app",
],
)
@ -193,6 +194,7 @@ rabbitmq_integration_suite(
"@emqtt//:erlang_app",
],
additional_beam = [
":event_recorder",
":util",
],
)
@ -226,16 +228,18 @@ rabbitmq_integration_suite(
)
rabbitmq_integration_suite(
name = "integration_SUITE",
size = "large",
runtime_deps = [
"@emqtt//:erlang_app",
"//deps/rabbitmq_management_agent:erlang_app",
],
additional_beam = [
":event_recorder",
":util",
],
name = "shared_SUITE",
size = "large",
timeout = "eternal",
runtime_deps = [
"@emqtt//:erlang_app",
"//deps/rabbitmq_management_agent:erlang_app",
"@gun//:erlang_app",
],
additional_beam = [
":event_recorder",
":util",
],
)
assert_suites()

View File

@ -40,7 +40,7 @@ BUILD_WITHOUT_QUIC=1
export BUILD_WITHOUT_QUIC
DEPS = ranch rabbit_common rabbit amqp_client ra
TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management
TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_web_mqtt
dep_ct_helper = git https://github.com/extend/ct_helper.git master
dep_emqtt = git https://github.com/emqx/emqtt.git 1.7.0-rc.2

View File

@ -76,7 +76,7 @@
%% quorum queues and streams whose soft limit has been exceeded
soft_limit_exceeded = sets:new([{version, 2}]) :: sets:set(),
qos0_messages_dropped = 0 :: non_neg_integer()
}).
}).
-opaque state() :: #state{}.

View File

@ -412,14 +412,14 @@ run_socket(State = #state{ socket = Sock }) ->
rabbit_net:setopts(Sock, [{active, once}]),
State#state{ await_recv = true }.
control_throttle(State = #state{connection_state = Flow,
control_throttle(State = #state{connection_state = ConnState,
conserve = Conserve,
keepalive = KState,
proc_state = PState}) ->
Throttle = Conserve orelse
rabbit_mqtt_processor:soft_limit_exceeded(PState) orelse
credit_flow:blocked(),
case {Flow, Throttle} of
case {ConnState, Throttle} of
{running, true} ->
State#state{connection_state = blocked,
keepalive = rabbit_mqtt_keepalive:cancel_timer(KState)};

View File

@ -9,54 +9,52 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(util, [expect_publishes/2,
-import(util, [expect_publishes/3,
connect/3,
connect/4]).
connect/4,
await_exit/1]).
-import(rabbit_ct_broker_helpers,
[setup_steps/0,
teardown_steps/0,
get_node_config/3,
rabbitmqctl/3,
rpc/5,
stop_node/2,
drain_node/2,
revive_node/2]).
rpc/4,
stop_node/2
]).
-define(OPTS, [{connect_timeout, 1},
{ack_timeout, 1}]).
all() ->
[
{group, cluster_size_3},
{group, cluster_size_5}
].
groups() ->
[
{cluster_size_3, [], [
maintenance
]},
{cluster_size_5, [], [
connection_id_tracking,
connection_id_tracking_on_nodedown,
connection_id_tracking_with_decommissioned_node
]}
{cluster_size_5, [],
[
connection_id_tracking,
connection_id_tracking_on_nodedown,
connection_id_tracking_with_decommissioned_node
]}
].
suite() ->
[{timetrap, {minutes, 5}}].
[{timetrap, {minutes, 3}}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
merge_app_env(Config) ->
rabbit_ct_helpers:merge_app_env(Config,
{rabbit, [
{collect_statistics, basic},
{collect_statistics_interval, 100}
]}).
rabbit_ct_helpers:merge_app_env(
Config,
{rabbit, [
{collect_statistics, basic},
{collect_statistics_interval, 100}
]}).
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
@ -65,20 +63,9 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(cluster_size_3, Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "maintenance mode wrongly closes cluster-wide MQTT connections "
" in RMQ < 3.11.2 and < 3.10.10"};
false ->
set_cluster_size(3, Config)
end;
init_per_group(cluster_size_5, Config) ->
set_cluster_size(5, Config).
set_cluster_size(NodesCount, Config) ->
rabbit_ct_helpers:set_config(
Config, [{rmq_nodes_count, NodesCount}]).
Config, [{rmq_nodes_count, 5}]).
end_per_group(_, Config) ->
Config.
@ -107,28 +94,6 @@ end_per_testcase(Testcase, Config) ->
%% Test cases
%% -------------------------------------------------------------------
maintenance(Config) ->
C0 = connect(<<"client-0">>, Config, 0, ?OPTS),
C1a = connect(<<"client-1a">>, Config, 1, ?OPTS),
C1b = connect(<<"client-1b">>, Config, 1, ?OPTS),
timer:sleep(500),
ok = drain_node(Config, 2),
ok = revive_node(Config, 2),
timer:sleep(500),
[?assert(erlang:is_process_alive(C)) || C <- [C0, C1a, C1b]],
process_flag(trap_exit, true),
ok = drain_node(Config, 1),
[await_disconnection(Pid) || Pid <- [C1a, C1b]],
ok = revive_node(Config, 1),
?assert(erlang:is_process_alive(C0)),
ok = drain_node(Config, 0),
await_disconnection(C0),
ok = revive_node(Config, 0).
%% Note about running this testsuite in a mixed-versions cluster:
%% All even-numbered nodes will use the same code base when using a
%% secondary Umbrella. Odd-numbered nodes might use an incompatible code
@ -147,7 +112,7 @@ connection_id_tracking(Config) ->
C1 = connect(Id, Config, 0, ?OPTS),
{ok, _, _} = emqtt:subscribe(C1, <<"TopicA">>, qos0),
ok = emqtt:publish(C1, <<"TopicA">>, <<"Payload">>),
ok = expect_publishes(<<"TopicA">>, [<<"Payload">>]),
ok = expect_publishes(C1, <<"TopicA">>, [<<"Payload">>]),
%% there's one connection
assert_connection_count(Config, 4, 2, 1),
@ -155,12 +120,12 @@ connection_id_tracking(Config) ->
%% connect to the same node (A or 0)
process_flag(trap_exit, true),
C2 = connect(Id, Config, 0, ?OPTS),
await_disconnection(C1),
await_exit(C1),
assert_connection_count(Config, 4, 2, 1),
%% connect to a different node (C or 2)
C3 = connect(Id, Config, 2, ?OPTS),
await_disconnection(C2),
await_exit(C2),
assert_connection_count(Config, 4, 2, 1),
ok = emqtt:disconnect(C3).
@ -169,16 +134,16 @@ connection_id_tracking_on_nodedown(Config) ->
C = connect(<<"simpleClient">>, Config, ?OPTS),
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0),
ok = emqtt:publish(C, <<"TopicA">>, <<"Payload">>),
ok = expect_publishes(<<"TopicA">>, [<<"Payload">>]),
ok = expect_publishes(C, <<"TopicA">>, [<<"Payload">>]),
assert_connection_count(Config, 4, 2, 1),
process_flag(trap_exit, true),
ok = stop_node(Config, Server),
await_disconnection(C),
await_exit(C),
assert_connection_count(Config, 4, 2, 0),
ok.
connection_id_tracking_with_decommissioned_node(Config) ->
case rpc(Config, 0, rabbit_mqtt_ff, track_client_id_in_ra, []) of
case rpc(Config, rabbit_mqtt_ff, track_client_id_in_ra, []) of
false ->
{skip, "This test requires client ID tracking in Ra"};
true ->
@ -186,12 +151,12 @@ connection_id_tracking_with_decommissioned_node(Config) ->
C = connect(<<"simpleClient">>, Config, ?OPTS),
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0),
ok = emqtt:publish(C, <<"TopicA">>, <<"Payload">>),
ok = expect_publishes(<<"TopicA">>, [<<"Payload">>]),
ok = expect_publishes(C, <<"TopicA">>, [<<"Payload">>]),
assert_connection_count(Config, 4, 2, 1),
process_flag(trap_exit, true),
{ok, _} = rabbitmqctl(Config, 0, ["decommission_mqtt_node", Server]),
await_disconnection(C),
await_exit(C),
assert_connection_count(Config, 4, 2, 0),
ok
end.
@ -211,10 +176,3 @@ assert_connection_count(Config, Retries, NodeId, NumElements) ->
timer:sleep(500),
assert_connection_count(Config, Retries-1, NodeId, NumElements)
end.
await_disconnection(Client) ->
receive
{'EXIT', Client, _} -> ok
after
20_000 -> ct:fail({missing_exit, Client})
end.

View File

@ -15,9 +15,10 @@
init(_) ->
{ok, ?INIT_STATE}.
handle_event(#event{type = node_stats}, State) ->
{ok, State};
handle_event(#event{type = node_node_stats}, State) ->
handle_event(#event{type = T}, State)
when T =:= node_stats orelse
T =:= node_node_stats orelse
T =:= node_node_deleted ->
{ok, State};
handle_event(Event, State) ->
{ok, [Event | State]}.

View File

@ -13,7 +13,7 @@
-import(rabbit_ct_broker_helpers, [rpc/5]).
-import(rabbit_ct_helpers, [eventually/1]).
-import(util, [expect_publishes/2,
-import(util, [expect_publishes/3,
get_global_counters/4,
connect/2,
connect/4]).
@ -98,7 +98,7 @@ rabbit_mqtt_qos0_queue(Config) ->
C1 = connect(ClientId, Config),
{ok, _, [0]} = emqtt:subscribe(C1, Topic, qos0),
ok = emqtt:publish(C1, Topic, Msg, qos0),
ok = expect_publishes(Topic, [Msg]),
ok = expect_publishes(C1, Topic, [Msg]),
?assertEqual(1,
length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]))),
@ -109,7 +109,7 @@ rabbit_mqtt_qos0_queue(Config) ->
?assertEqual(1,
length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]))),
ok = emqtt:publish(C1, Topic, Msg, qos0),
ok = expect_publishes(Topic, [Msg]),
ok = expect_publishes(C1, Topic, [Msg]),
?assertMatch(#{messages_delivered_total := 2,
messages_delivered_consume_auto_ack_total := 2},
get_global_counters(Config, ?PROTO_VER, 0, [{queue_type, rabbit_classic_queue}])),
@ -125,7 +125,7 @@ rabbit_mqtt_qos0_queue(Config) ->
?assertEqual(1,
length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [FeatureFlag]))),
ok = emqtt:publish(C2, Topic, Msg, qos0),
ok = expect_publishes(Topic, [Msg]),
ok = expect_publishes(C2, Topic, [Msg]),
?assertMatch(#{messages_delivered_total := 1,
messages_delivered_consume_auto_ack_total := 1},
get_global_counters(Config, ?PROTO_VER, 0, [{queue_type, FeatureFlag}])),

View File

@ -11,38 +11,35 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(rabbit_ct_broker_helpers, [rpc/5]).
-import(rabbit_ct_helpers, [consistently/1,
eventually/3]).
-import(rabbit_ct_broker_helpers, [rpc/4]).
-import(rabbit_ct_helpers, [eventually/3]).
-import(util, [all_connection_pids/1,
publish_qos1_timeout/4,
expect_publishes/2,
expect_publishes/3,
connect/2,
connect/3]).
connect/3,
await_exit/1]).
all() ->
[
{group, non_parallel_tests}
{group, tests}
].
groups() ->
[
{non_parallel_tests, [],
{tests, [],
[
block,
block_connack_timeout,
handle_invalid_packets,
login_timeout,
keepalive,
keepalive_turned_off,
stats,
will,
clean_session_disconnect_client,
clean_session_kill_node,
quorum_clean_session_false,
quorum_clean_session_true,
classic_clean_session_true,
classic_clean_session_false
classic_clean_session_false,
non_clean_sess_empty_client_id,
event_authentication_failure,
rabbit_mqtt_qos0_queue_overflow
]}
].
@ -94,33 +91,6 @@ end_per_testcase(Testcase, Config) ->
%% Testsuite cases
%% -------------------------------------------------------------------
block(Config) ->
C = connect(?FUNCTION_NAME, Config),
%% Only here to ensure the connection is really up
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>),
ok = emqtt:publish(C, <<"TopicA">>, <<"Payload">>),
ok = expect_publishes(<<"TopicA">>, [<<"Payload">>]),
{ok, _, _} = emqtt:unsubscribe(C, <<"TopicA">>),
{ok, _, _} = emqtt:subscribe(C, <<"Topic1">>),
{ok, _} = emqtt:publish(C, <<"Topic1">>, <<"Not blocked yet">>, [{qos, 1}]),
ok = rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.00000001]),
% %% Let it block
timer:sleep(100),
%% Blocked, but still will publish when unblocked
puback_timeout = publish_qos1_timeout(C, <<"Topic1">>, <<"Now blocked">>, 1000),
puback_timeout = publish_qos1_timeout(C, <<"Topic1">>, <<"Still blocked">>, 1000),
%% Unblock
rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]),
ok = expect_publishes(<<"Topic1">>, [<<"Not blocked yet">>,
<<"Now blocked">>,
<<"Still blocked">>]),
ok = emqtt:disconnect(C).
block_connack_timeout(Config) ->
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
Ports0 = rpc(Config, erlang, ports, []),
@ -189,74 +159,6 @@ login_timeout(Config) ->
rpc(Config, application, unset_env, [rabbitmq_mqtt, login_timeout])
end.
keepalive(Config) ->
KeepaliveSecs = 1,
KeepaliveMs = timer:seconds(KeepaliveSecs),
ProtoVer = v4,
WillTopic = <<"will/topic">>,
WillPayload = <<"will-payload">>,
C1 = connect(?FUNCTION_NAME, Config, [{keepalive, KeepaliveSecs},
{proto_ver, ProtoVer},
{will_topic, WillTopic},
{will_payload, WillPayload},
{will_retain, true},
{will_qos, 0}]),
ok = emqtt:publish(C1, <<"ignored">>, <<"msg">>),
%% Connection should stay up when client sends PING requests.
timer:sleep(KeepaliveMs),
?assertMatch(#{publishers := 1},
util:get_global_counters(Config, ProtoVer)),
%% Mock the server socket to not have received any bytes.
rabbit_ct_broker_helpers:setup_meck(Config),
Mod = rabbit_net,
ok = rpc(Config, 0, meck, new, [Mod, [no_link, passthrough]]),
ok = rpc(Config, 0, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]} ]),
process_flag(trap_exit, true),
%% We expect the server to respect the keepalive closing the connection.
eventually(?_assertMatch(#{publishers := 0},
util:get_global_counters(Config, ProtoVer)),
KeepaliveMs, 3 * KeepaliveSecs),
receive {'EXIT', C1, _} -> ok
after 1000 -> ct:fail("missing client exit")
end,
true = rpc(Config, 0, meck, validate, [Mod]),
ok = rpc(Config, 0, meck, unload, [Mod]),
C2 = connect(<<"client2">>, Config),
{ok, _, [0]} = emqtt:subscribe(C2, WillTopic),
receive {publish, #{client_pid := C2,
dup := false,
qos := 0,
retain := true,
topic := WillTopic,
payload := WillPayload}} -> ok
after 3000 -> ct:fail("missing will")
end,
ok = emqtt:disconnect(C2).
keepalive_turned_off(Config) ->
%% "A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism."
KeepaliveSecs = 0,
C = connect(?FUNCTION_NAME, Config, [{keepalive, KeepaliveSecs}]),
ok = emqtt:publish(C, <<"TopicB">>, <<"Payload">>),
%% Mock the server socket to not have received any bytes.
rabbit_ct_broker_helpers:setup_meck(Config),
Mod = rabbit_net,
ok = rpc(Config, 0, meck, new, [Mod, [no_link, passthrough]]),
ok = rpc(Config, 0, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]} ]),
consistently(?_assert(erlang:is_process_alive(C))),
true = rpc(Config, 0, meck, validate, [Mod]),
ok = rpc(Config, 0, meck, unload, [Mod]),
ok = emqtt:disconnect(C).
stats(Config) ->
C = connect(?FUNCTION_NAME, Config),
%% Wait for stats being emitted (every 100ms)
@ -288,7 +190,7 @@ validate_durable_queue_type(Config, ClientName, CleanSession, ExpectedQueueType)
C = connect(ClientName, Config, [{clean_start, CleanSession}]),
{ok, _, _} = emqtt:subscribe(C, <<"TopicB">>, qos1),
ok = emqtt:publish(C, <<"TopicB">>, <<"Payload">>),
ok = expect_publishes(<<"TopicB">>, [<<"Payload">>]),
ok = expect_publishes(C, <<"TopicB">>, [<<"Payload">>]),
{ok, _, _} = emqtt:unsubscribe(C, <<"TopicB">>),
Prefix = <<"mqtt-subscription-">>,
Suffix = <<"qos1">>,
@ -296,51 +198,6 @@ validate_durable_queue_type(Config, ClientName, CleanSession, ExpectedQueueType)
?assertEqual(ExpectedQueueType, get_durable_queue_type(Server, QNameBin)),
ok = emqtt:disconnect(C).
clean_session_disconnect_client(Config) ->
C = connect(?FUNCTION_NAME, Config),
{ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0),
{ok, _, _} = emqtt:subscribe(C, <<"topic1">>, qos1),
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
QsClassic = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]),
case rabbit_ct_helpers:is_mixed_versions(Config) of
false ->
?assertEqual(1, length(QsQos0)),
?assertEqual(1, length(QsClassic));
true ->
?assertEqual(0, length(QsQos0)),
?assertEqual(2, length(QsClassic))
end,
ok = emqtt:disconnect(C),
%% After terminating a clean session, we expect any session state to be cleaned up on the server.
timer:sleep(200), %% Give some time to clean up exclusive classic queue.
L = rpc(Config, rabbit_amqqueue, list, []),
?assertEqual(0, length(L)).
clean_session_kill_node(Config) ->
C = connect(?FUNCTION_NAME, Config),
{ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0),
{ok, _, _} = emqtt:subscribe(C, <<"topic1">>, qos1),
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
QsClassic = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]),
case rabbit_ct_helpers:is_mixed_versions(Config) of
false ->
?assertEqual(1, length(QsQos0)),
?assertEqual(1, length(QsClassic));
true ->
?assertEqual(0, length(QsQos0)),
?assertEqual(2, length(QsClassic))
end,
?assertEqual(2, rpc(Config, ets, info, [rabbit_durable_queue, size])),
process_flag(trap_exit, true),
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
%% After terminating a clean session by a node crash, we expect any session
%% state to be cleaned up on the server once the server comes back up.
?assertEqual(0, rpc(Config, ets, info, [rabbit_durable_queue, size])).
quorum_clean_session_false(Config) ->
Default = rpc(Config, reader_SUITE, get_env, []),
rpc(Config, reader_SUITE, set_env, [quorum]),
@ -361,24 +218,109 @@ classic_clean_session_true(Config) ->
classic_clean_session_false(Config) ->
validate_durable_queue_type(Config, <<"classicCleanSessionFalse">>, false, rabbit_classic_queue).
will(Config) ->
Topic = <<"will/topic">>,
Msg = <<"will msg">>,
Publisher = connect(<<"will-publisher">>, Config, [{will_topic, Topic},
{will_payload, Msg},
{will_qos, 0},
{will_retain, false}]),
timer:sleep(100),
[ServerPublisherPid] = all_connection_pids(Config),
%% "If the Client supplies a zero-byte ClientId with CleanSession set to 0,
%% the Server MUST respond to the CONNECT Packet with a CONNACK return code 0x02
%% (Identifier rejected) and then close the Network Connection" [MQTT-3.1.3-8].
non_clean_sess_empty_client_id(Config) ->
{ok, C} = emqtt:start_link(
[{clientid, <<>>},
{clean_start, false},
{proto_ver, v4},
{host, "localhost"},
{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt)}
]),
process_flag(trap_exit, true),
?assertMatch({error, {client_identifier_not_valid, _}},
emqtt:connect(C)),
ok = await_exit(C).
Subscriber = connect(<<"will-subscriber">>, Config),
{ok, _, _} = emqtt:subscribe(Subscriber, Topic, qos0),
event_authentication_failure(Config) ->
{ok, C} = emqtt:start_link(
[{username, <<"Trudy">>},
{password, <<"fake-password">>},
{host, "localhost"},
{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt)},
{clientid, atom_to_binary(?FUNCTION_NAME)},
{proto_ver, v4}]),
true = unlink(C),
true = unlink(Publisher),
erlang:exit(ServerPublisherPid, test_will),
ok = expect_publishes(Topic, [Msg]),
ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, event_recorder),
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
ok = gen_event:add_handler({rabbit_event, Server}, event_recorder, []),
ok = emqtt:disconnect(Subscriber).
?assertMatch({error, _}, emqtt:connect(C)),
rpc(Config, M, F, A) ->
rpc(Config, 0, M, F, A).
[E, _ConnectionClosedEvent] = util:get_events(Server),
util:assert_event_type(user_authentication_failure, E),
util:assert_event_prop([{name, <<"Trudy">>},
{connection_type, network}],
E),
ok = gen_event:delete_handler({rabbit_event, Server}, event_recorder, []).
%% Test that queue type rabbit_mqtt_qos0_queue drops QoS 0 messages when its
%% max length is reached.
rabbit_mqtt_qos0_queue_overflow(Config) ->
Topic = atom_to_binary(?FUNCTION_NAME),
Msg = binary:copy(<<"x">>, 1000),
NumMsgs = 10_000,
%% Provoke TCP back-pressure from client to server by using very small buffers.
Opts = [{tcp_opts, [{recbuf, 512},
{buffer, 512}]}],
Sub = connect(<<"subscriber">>, Config, Opts),
{ok, _, [0]} = emqtt:subscribe(Sub, Topic, qos0),
[ServerConnectionPid] = all_connection_pids(Config),
%% Suspend the receiving client such that it stops reading from its socket
%% causing TCP back-pressure to the server being applied.
true = erlang:suspend_process(Sub),
%% Let's overflow the receiving server MQTT connection process
%% (i.e. the rabbit_mqtt_qos0_queue) by sending many large messages.
Pub = connect(<<"publisher">>, Config),
lists:foreach(fun(_) ->
ok = emqtt:publish(Pub, Topic, Msg, qos0)
end, lists:seq(1, NumMsgs)),
%% Give the server some time to process (either send or drop) the messages.
timer:sleep(2000),
%% Let's resume the receiving client to receive any remaining messages that did
%% not get dropped.
true = erlang:resume_process(Sub),
NumReceived = num_received(Topic, Msg, 0),
{status, _, _, [_, _, _, _, Misc]} = sys:get_status(ServerConnectionPid),
[State] = [S || {data, [{"State", S}]} <- Misc],
#{proc_state := #{qos0_messages_dropped := NumDropped}} = State,
ct:pal("NumReceived=~b~nNumDropped=~b", [NumReceived, NumDropped]),
%% We expect that
%% 1. all sent messages were either received or dropped
?assertEqual(NumMsgs, NumReceived + NumDropped),
case rabbit_ct_helpers:is_mixed_versions(Config) of
false ->
%% 2. at least one message was dropped (otherwise our whole test case did not
%% test what it was supposed to test: that messages are dropped due to the
%% server being overflowed with messages while the client receives too slowly)
?assert(NumDropped >= 1);
true ->
%% Feature flag rabbit_mqtt_qos0_queue is disabled.
?assertEqual(0, NumDropped)
end,
%% 3. we received at least 1000 messages because everything below the default
%% of mailbox_soft_limit=1000 should not be dropped
?assert(NumReceived >= 1000),
ok = emqtt:disconnect(Sub),
ok = emqtt:disconnect(Pub).
num_received(Topic, Payload, N) ->
receive
{publish, #{topic := Topic,
payload := Payload}} ->
num_received(Topic, Payload, N + 1)
after 1000 ->
N
end.

View File

@ -8,7 +8,7 @@
-compile([export_all, nowarn_export_all]).
-include_lib("common_test/include/ct.hrl").
-import(util, [expect_publishes/2,
-import(util, [expect_publishes/3,
connect/3]).
all() ->
@ -91,7 +91,7 @@ coerce_configuration_data(Config) ->
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0),
ok = emqtt:publish(C, <<"TopicA">>, <<"Payload">>),
ok = expect_publishes(<<"TopicA">>, [<<"Payload">>]),
ok = expect_publishes(C, <<"TopicA">>, [<<"Payload">>]),
ok = emqtt:disconnect(C).
@ -105,7 +105,7 @@ should_translate_amqp2mqtt_on_publish(Config) ->
%% there's an active consumer
{ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device.Field">>, qos1),
ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]),
ok = expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
ok = expect_publishes(C, <<"TopicA/Device/Field">>, [<<"Payload">>]),
ok = emqtt:disconnect(C).
%% -------------------------------------------------------------------
@ -118,7 +118,7 @@ should_translate_amqp2mqtt_on_retention(Config) ->
%% publish with retain = true before a consumer comes around
ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]),
{ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device.Field">>, qos1),
ok = expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
ok = expect_publishes(C, <<"TopicA/Device/Field">>, [<<"Payload">>]),
ok = emqtt:disconnect(C).
%% -------------------------------------------------------------------
@ -130,7 +130,7 @@ should_translate_amqp2mqtt_on_retention_search(Config) ->
C = connect(<<"simpleClientRetainer">>, Config, [{ack_timeout, 1}]),
ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]),
{ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device/Field">>, qos1),
ok = expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
ok = expect_publishes(C, <<"TopicA/Device/Field">>, [<<"Payload">>]),
ok = emqtt:disconnect(C).
does_not_retain(Config) ->
@ -143,4 +143,4 @@ does_not_retain(Config) ->
after 1000 ->
ok
end,
ok = emqtt:disconnect(C).
ok = emqtt:disconnect(C).

View File

@ -3,11 +3,13 @@
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(integration_SUITE).
%% Test suite shared between rabbitmq_mqtt and rabbitmq_web_mqtt.
-module(shared_SUITE).
-compile([export_all,
nowarn_export_all]).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
@ -15,63 +17,82 @@
-import(rabbit_ct_broker_helpers,
[rabbitmqctl_list/3,
rpc/4,
rpc/5,
rpc_all/4,
get_node_config/3]).
get_node_config/3,
drain_node/2,
revive_node/2
]).
-import(rabbit_ct_helpers,
[eventually/3,
eventually/1]).
-import(util,
[all_connection_pids/1,
get_global_counters/2,
get_global_counters/3,
get_global_counters/4,
expect_publishes/2,
connect/2,
connect/3]).
get_global_counters/2, get_global_counters/3, get_global_counters/4,
expect_publishes/3,
connect/2, connect/3, connect/4,
get_events/1, assert_event_type/2, assert_event_prop/2,
await_exit/1, await_exit/2,
publish_qos1_timeout/4]).
-import(rabbit_mgmt_test_util,
[http_get/2,
http_delete/3]).
-define(MANAGEMENT_PLUGIN_TESTS,
[management_plugin_connection,
management_plugin_enable]).
all() ->
[
{group, cluster_size_1},
{group, cluster_size_3}
{group, mqtt}
,{group, web_mqtt}
].
groups() ->
[
{mqtt, [], subgroups()}
,{web_mqtt, [], subgroups()}
].
subgroups() ->
[
{cluster_size_1, [],
[
%% separate node so global counters start from 0
{global_counters, [], [global_counters_v3, global_counters_v4]},
{tests, [], tests()}
{global_counters, [],
[
global_counters_v3,
global_counters_v4
]},
{tests, [],
[
many_qos1_messages
] ++ tests()}
]},
{cluster_size_3, [],
[queue_down_qos1,
[
queue_down_qos1,
consuming_classic_mirrored_queue_down,
consuming_classic_queue_down,
flow_classic_mirrored_queue,
flow_quorum_queue,
flow_stream,
rabbit_mqtt_qos0_queue,
cli_list_queues
] ++ tests()
}
cli_list_queues,
maintenance
] ++ tests()}
].
tests() ->
?MANAGEMENT_PLUGIN_TESTS ++
[delete_create_queue
[
management_plugin_connection
,management_plugin_enable
,disconnect
,pubsub_shared_connection
,pubsub_separate_connections
,will_with_disconnect
,will_without_disconnect
,delete_create_queue
,quorum_queue_rejects
,publish_to_all_queue_types_qos0
,publish_to_all_queue_types_qos1
,events
,event_authentication_failure
,internal_event_handler
,non_clean_sess_disconnect
,subscribe_same_topic_same_qos
@ -79,9 +100,12 @@ tests() ->
,subscribe_multiple
,large_message_mqtt_to_mqtt
,large_message_amqp_to_mqtt
,many_qos1_messages
,rabbit_mqtt_qos0_queue_overflow
,non_clean_sess_empty_client_id
,keepalive
,keepalive_turned_off
,duplicate_client_id
,block
,clean_session_disconnect_client
,clean_session_kill_node
].
suite() ->
@ -98,6 +122,11 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(mqtt, Config) ->
rabbit_ct_helpers:set_config(Config, {websocket, false});
init_per_group(web_mqtt, Config) ->
rabbit_ct_helpers:set_config(Config, {websocket, true});
init_per_group(cluster_size_1, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 1}]);
init_per_group(cluster_size_3 = Group, Config) ->
@ -109,20 +138,24 @@ init_per_group(Group, Config)
init_per_group0(Group, Config).
init_per_group0(Group, Config0) ->
Suffix = lists:flatten(io_lib:format("~s_websocket_~w", [Group, ?config(websocket, Config0)])),
Config1 = rabbit_ct_helpers:set_config(
Config0,
[{rmq_nodename_suffix, Group},
[{rmq_nodename_suffix, Suffix},
{rmq_extra_tcp_ports, [tcp_port_mqtt_extra,
tcp_port_mqtt_tls_extra]}]),
Config = rabbit_ct_helpers:run_steps(
Config = rabbit_ct_helpers:merge_app_env(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
Result = rpc_all(Config, application, set_env, [rabbit, classic_queue_default_version, 2]),
?assert(lists:all(fun(R) -> R =:= ok end, Result)),
Config.
{rabbit, [{classic_queue_default_version, 2}]}),
rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_group(cluster_size_1, Config) ->
end_per_group(G, Config)
when G =:= mqtt;
G =:= web_mqtt;
G =:= cluster_size_1 ->
Config;
end_per_group(_, Config) ->
rabbit_ct_helpers:run_teardown_steps(
@ -130,35 +163,120 @@ end_per_group(_, Config) ->
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase = maintenance, Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "maintenance mode wrongly closes cluster-wide MQTT connections "
"in RMQ < 3.11.2 and < 3.10.10"};
false ->
init_per_testcase0(Testcase, Config)
end;
init_per_testcase(T, Config)
when T =:= management_plugin_connection;
T =:= management_plugin_enable ->
ok = inets:start(),
init_per_testcase0(T, Config);
init_per_testcase(Testcase, Config) ->
maybe_start_inets(Testcase),
init_per_testcase0(Testcase, Config).
init_per_testcase0(Testcase, Config) ->
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[ok = rabbit_ct_broker_helpers:enable_plugin(Config, N, rabbitmq_web_mqtt) || N <- Nodes],
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(T, Config)
when T =:= management_plugin_connection;
T =:= management_plugin_enable ->
ok = inets:stop(),
end_per_testcase0(T, Config);
end_per_testcase(Testcase, Config) ->
maybe_stop_inets(Testcase),
end_per_testcase0(Testcase, Config).
end_per_testcase0(Testcase, Config) ->
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
maybe_start_inets(Testcase) ->
case lists:member(Testcase, ?MANAGEMENT_PLUGIN_TESTS) of
true ->
ok = inets:start();
false ->
ok
end.
maybe_stop_inets(Testcase) ->
case lists:member(Testcase, ?MANAGEMENT_PLUGIN_TESTS) of
true ->
ok = inets:stop();
false ->
ok
end.
%% -------------------------------------------------------------------
%% Testsuite cases
%% -------------------------------------------------------------------
disconnect(Config) ->
C = connect(?FUNCTION_NAME, Config),
eventually(?_assertEqual(1, length(all_connection_pids(Config)))),
process_flag(trap_exit, true),
ok = emqtt:disconnect(C),
await_exit(C, normal),
eventually(?_assertEqual([], all_connection_pids(Config))),
ok.
pubsub_shared_connection(Config) ->
C = connect(?FUNCTION_NAME, Config),
Topic = <<"/topic/test-mqtt">>,
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
Payload = <<"a\x00a">>,
?assertMatch({ok, #{packet_id := _,
reason_code := 0,
reason_code_name := success
}},
emqtt:publish(C, Topic, Payload, [{qos, 1}])),
ok = expect_publishes(C, Topic, [Payload]),
ok = emqtt:disconnect(C).
pubsub_separate_connections(Config) ->
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config),
Sub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_subscriber">>, Config),
Topic = <<"/topic/test-mqtt">>,
{ok, _, [1]} = emqtt:subscribe(Sub, Topic, qos1),
Payload = <<"a\x00a">>,
?assertMatch({ok, #{packet_id := _,
reason_code := 0,
reason_code_name := success
}},
emqtt:publish(Pub, Topic, Payload, [{qos, 1}])),
ok = expect_publishes(Sub, Topic, [Payload]),
ok = emqtt:disconnect(Pub),
ok = emqtt:disconnect(Sub).
will_with_disconnect(Config) ->
LastWillTopic = <<"/topic/last-will">>,
LastWillMsg = <<"last will message">>,
PubOpts = [{will_topic, LastWillTopic},
{will_payload, LastWillMsg},
{will_qos, 1}],
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config, PubOpts),
Sub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_subscriber">>, Config),
{ok, _, [1]} = emqtt:subscribe(Sub, LastWillTopic, qos1),
%% Client sends DISCONNECT packet. Therefore, will message should not be sent.
ok = emqtt:disconnect(Pub),
?assertEqual({publish_not_received, LastWillMsg},
expect_publishes(Sub, LastWillTopic, [LastWillMsg])),
ok = emqtt:disconnect(Sub).
will_without_disconnect(Config) ->
LastWillTopic = <<"/topic/last-will">>,
LastWillMsg = <<"last will message">>,
PubOpts = [{will_topic, LastWillTopic},
{will_payload, LastWillMsg},
{will_qos, 1}],
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config, PubOpts),
timer:sleep(100),
[ServerPublisherPid] = all_connection_pids(Config),
Sub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_subscriber">>, Config),
{ok, _, [1]} = emqtt:subscribe(Sub, LastWillTopic, qos1),
%% Client does not send DISCONNECT packet. Therefore, will message should be sent.
unlink(Pub),
erlang:exit(ServerPublisherPid, test_will),
?assertEqual(ok, expect_publishes(Sub, LastWillTopic, [LastWillMsg])),
ok = emqtt:disconnect(Sub).
quorum_queue_rejects(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Name = atom_to_binary(?FUNCTION_NAME),
@ -267,7 +385,7 @@ flow_stream(Config) ->
flow(Config, {App, Par, Val}, QueueType)
when is_binary(QueueType) ->
{ok, DefaultVal} = rpc(Config, 0, application, get_env, [App, Par]),
{ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]),
Result = rpc_all(Config, application, set_env, [App, Par, Val]),
?assert(lists:all(fun(R) -> R =:= ok end, Result)),
@ -278,7 +396,7 @@ flow(Config, {App, Par, Val}, QueueType)
NumMsgs = 1000,
C = connect(?FUNCTION_NAME, Config, [{retry_interval, 600},
{max_inflight, NumMsgs}]),
{max_inflight, NumMsgs}]),
TestPid = self(),
lists:foreach(
fun(N) ->
@ -316,7 +434,11 @@ events(Config) ->
E0),
assert_event_type(connection_created, E1),
[ConnectionPid] = all_connection_pids(Config),
ExpectedConnectionProps = [{protocol, {'MQTT', {3,1,1}}},
Proto = case ?config(websocket, Config) of
true -> 'Web MQTT';
false -> 'MQTT'
end,
ExpectedConnectionProps = [{protocol, {Proto, {3,1,1}}},
{node, Server},
{vhost, <<"/">>},
{user, <<"guest">>},
@ -384,31 +506,6 @@ events(Config) ->
ok = gen_event:delete_handler({rabbit_event, Server}, event_recorder, []).
event_authentication_failure(Config) ->
P = get_node_config(Config, 0, tcp_port_mqtt),
ClientId = atom_to_binary(?FUNCTION_NAME),
{ok, C} = emqtt:start_link([{username, <<"Trudy">>},
{password, <<"fake-password">>},
{host, "localhost"},
{port, P},
{clientid, ClientId},
{proto_ver, v4}]),
true = unlink(C),
ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, event_recorder),
Server = get_node_config(Config, 0, nodename),
ok = gen_event:add_handler({rabbit_event, Server}, event_recorder, []),
?assertMatch({error, _}, emqtt:connect(C)),
[E, _ConnectionClosedEvent] = get_events(Server),
assert_event_type(user_authentication_failure, E),
assert_event_prop([{name, <<"Trudy">>},
{connection_type, network}],
E),
ok = gen_event:delete_handler({rabbit_event, Server}, event_recorder, []).
internal_event_handler(Config) ->
Server = get_node_config(Config, 0, nodename),
ok = gen_event:call({rabbit_event, Server}, rabbit_mqtt_internal_event_handler, ignored_request, 1000).
@ -435,9 +532,9 @@ global_counters(Config, ProtoVer) ->
ok = emqtt:publish(C, <<"no/queue/bound">>, <<"msg-dropped">>, qos0),
{ok, _} = emqtt:publish(C, <<"no/queue/bound">>, <<"msg-returned">>, qos1),
ok = expect_publishes(Topic0, [<<"testm0">>]),
ok = expect_publishes(Topic1, [<<"testm1">>]),
ok = expect_publishes(Topic2, [<<"testm2">>]),
ok = expect_publishes(C, Topic0, [<<"testm0">>]),
ok = expect_publishes(C, Topic1, [<<"testm1">>]),
ok = expect_publishes(C, Topic2, [<<"testm2">>]),
?assertEqual(#{publishers => 1,
consumers => 1,
@ -524,7 +621,7 @@ queue_down_qos1(Config) ->
%% and failover consumption when the classic mirrored queue leader fails.
consuming_classic_mirrored_queue_down(Config) ->
[Server1, Server2, _Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Topic = PolicyName = atom_to_binary(?FUNCTION_NAME),
ClientId = Topic = PolicyName = atom_to_binary(?FUNCTION_NAME),
ok = rabbit_ct_broker_helpers:set_policy(
Config, Server1, PolicyName, <<".*">>, <<"queues">>,
@ -532,38 +629,30 @@ consuming_classic_mirrored_queue_down(Config) ->
{<<"queue-master-locator">>, <<"client-local">>}]),
%% Declare queue leader on Server1.
C1 = connect(?FUNCTION_NAME, Config, [{clean_start, false}]),
C1 = connect(ClientId, Config, Server1, [{clean_start, false}]),
{ok, _, _} = emqtt:subscribe(C1, Topic, qos1),
ok = emqtt:disconnect(C1),
%% Consume from Server2.
Options = [{host, "localhost"},
{port, get_node_config(Config, Server2, tcp_port_mqtt)},
{clientid, atom_to_binary(?FUNCTION_NAME)},
{proto_ver, v4}],
{ok, C2} = emqtt:start_link([{clean_start, false} | Options]),
{ok, _} = emqtt:connect(C2),
C2 = connect(ClientId, Config, Server2, [{clean_start, false}]),
%% Sanity check that consumption works.
{ok, _} = emqtt:publish(C2, Topic, <<"m1">>, qos1),
ok = expect_publishes(Topic, [<<"m1">>]),
ok = expect_publishes(C2, Topic, [<<"m1">>]),
%% Let's stop the queue leader node.
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
%% Consumption should continue to work.
{ok, _} = emqtt:publish(C2, Topic, <<"m2">>, qos1),
ok = expect_publishes(Topic, [<<"m2">>]),
ok = expect_publishes(C2, Topic, [<<"m2">>]),
%% Cleanup
ok = emqtt:disconnect(C2),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
?assertMatch([_Q],
rpc(Config, Server1, rabbit_amqqueue, list, [])),
%% "When a Client has determined that it has no further use for the session it should do a
%% final connect with CleanSession set to 1 and then disconnect."
{ok, C3} = emqtt:start_link([{clean_start, true} | Options]),
{ok, _} = emqtt:connect(C3),
C3 = connect(ClientId, Config, Server2, [{clean_start, true}]),
ok = emqtt:disconnect(C3),
?assertEqual([],
rpc(Config, Server1, rabbit_amqqueue, list, [])),
@ -575,19 +664,14 @@ consuming_classic_queue_down(Config) ->
ClientId = Topic = atom_to_binary(?FUNCTION_NAME),
%% Declare classic queue on Server1.
C1 = connect(?FUNCTION_NAME, Config, [{clean_start, false}]),
C1 = connect(ClientId, Config, [{clean_start, false}]),
{ok, _, _} = emqtt:subscribe(C1, Topic, qos1),
ok = emqtt:disconnect(C1),
ProtoVer = v4,
%% Consume from Server3.
Options = [{host, "localhost"},
{port, get_node_config(Config, Server3, tcp_port_mqtt)},
{clientid, ClientId},
{proto_ver, ProtoVer}],
{ok, C2} = emqtt:start_link([{clean_start, false} | Options]),
{ok, _} = emqtt:connect(C2),
C2 = connect(ClientId, Config, Server3, [{clean_start, false}]),
ProtoVer = v4,
?assertMatch(#{consumers := 1},
get_global_counters(Config, ProtoVer, Server3)),
@ -600,17 +684,11 @@ consuming_classic_queue_down(Config) ->
eventually(?_assertMatch(#{consumers := 0},
get_global_counters(Config, ProtoVer, Server3)),
1000, 5),
receive
{'EXIT', C2, _} ->
ok
after 3000 ->
ct:fail("MQTT connection should have been closed")
end,
await_exit(C2),
%% Cleanup
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
{ok, C3} = emqtt:start_link([{clean_start, true} | Options]),
{ok, _} = emqtt:connect(C3),
C3 = connect(ClientId, Config, Server3, [{clean_start, true}]),
ok = emqtt:disconnect(C3),
?assertEqual([],
rpc(Config, Server1, rabbit_amqqueue, list, [])),
@ -699,7 +777,7 @@ non_clean_sess_disconnect(Config) ->
get_global_counters(Config, ProtoVer)),
Msg = <<"msg">>,
{ok, _} = emqtt:publish(C2, Topic, Msg, qos1),
{publish_not_received, Msg} = expect_publishes(Topic, [Msg]),
{publish_not_received, Msg} = expect_publishes(C2, Topic, [Msg]),
ok = emqtt:disconnect(C2),
%% connect with clean sess true to clean up
@ -720,9 +798,9 @@ subscribe_same_topic_same_qos(Config) ->
{ok, _} = emqtt:publish(C, Topic, <<"msg2">>, qos1),
%% "Any existing retained messages matching the Topic Filter MUST be re-sent" [MQTT-3.8.4-3]
ok = expect_publishes(Topic, [<<"retained">>, <<"msg1">>,
<<"retained">>, <<"msg2">>
]),
ok = expect_publishes(C, Topic, [<<"retained">>, <<"msg1">>,
<<"retained">>, <<"msg2">>
]),
ok = emqtt:disconnect(C).
subscribe_same_topic_different_qos(Config) ->
@ -742,12 +820,12 @@ subscribe_same_topic_different_qos(Config) ->
{ok, _} = emqtt:publish(C, Topic, <<"msg3">>, qos1),
%% "Any existing retained messages matching the Topic Filter MUST be re-sent" [MQTT-3.8.4-3]
ok = expect_publishes(Topic, [<<"retained">>, <<"msg1">>,
<<"retained">>, <<"msg2">>,
<<"retained">>, <<"msg3">>]),
ok = expect_publishes(C, Topic, [<<"retained">>, <<"msg1">>,
<<"retained">>, <<"msg2">>,
<<"retained">>, <<"msg3">>]),
%% There should be exactly one consumer for each queue: qos0 and qos1
Consumers = rpc(Config, 0, rabbit_amqqueue, consumers_all, [<<"/">>]),
Consumers = rpc(Config, rabbit_amqqueue, consumers_all, [<<"/">>]),
?assertEqual(2, length(Consumers)),
ok = emqtt:disconnect(C),
@ -770,7 +848,7 @@ large_message_mqtt_to_mqtt(Config) ->
Payload0 = binary:copy(<<"x">>, 8_000_000),
Payload = <<Payload0/binary, "y">>,
{ok, _} = emqtt:publish(C, Topic, Payload, qos1),
ok = expect_publishes(Topic, [Payload]),
ok = expect_publishes(C, Topic, [Payload]),
ok = emqtt:disconnect(C).
large_message_amqp_to_mqtt(Config) ->
@ -785,21 +863,21 @@ large_message_amqp_to_mqtt(Config) ->
#'basic.publish'{exchange = <<"amq.topic">>,
routing_key = Topic},
#amqp_msg{payload = Payload}),
ok = expect_publishes(Topic, [Payload]),
ok = expect_publishes(C, Topic, [Payload]),
ok = emqtt:disconnect(C).
%% Packet identifier is a non zero two byte integer.
%% Test that the server wraps around the packet identifier.
many_qos1_messages(Config) ->
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
C = connect(ClientId, Config),
C = connect(ClientId, Config, 0, [{retry_interval, 600}]),
{ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),
NumMsgs = 16#ffff + 100,
Payload = <<>>,
lists:foreach(fun(_) ->
{ok, _} = emqtt:publish(C, Topic, Payload, qos1)
end, lists:seq(1, NumMsgs)),
?assertEqual(NumMsgs, num_received(Topic, Payload, 0)),
Payloads = lists:map(fun integer_to_binary/1, lists:seq(1, NumMsgs)),
lists:foreach(fun(P) ->
{ok, _} = emqtt:publish(C, Topic, P, qos1)
end, Payloads),
expect_publishes(C, Topic, Payloads),
ok = emqtt:disconnect(C).
%% This test is mostly interesting in mixed version mode where feature flag
@ -812,74 +890,11 @@ rabbit_mqtt_qos0_queue(Config) ->
{ok, _, [0]} = emqtt:subscribe(Sub, Topic, qos0),
%% Place MQTT publisher process on old node in mixed version.
{ok, Pub} = emqtt:start_link(
[{port, get_node_config(Config, 1, tcp_port_mqtt)},
{clientid, <<"publisher">>},
{proto_ver, v4}
]),
{ok, _Properties} = emqtt:connect(Pub),
Pub = connect(<<"publisher">>, Config, 1, []),
Msg = <<"msg">>,
ok = emqtt:publish(Pub, Topic, Msg, qos0),
ok = expect_publishes(Topic, [Msg]),
ok = emqtt:disconnect(Sub),
ok = emqtt:disconnect(Pub).
%% Test that queue type rabbit_mqtt_qos0_queue drops QoS 0 messages when its
%% max length is reached.
rabbit_mqtt_qos0_queue_overflow(Config) ->
Topic = atom_to_binary(?FUNCTION_NAME),
Msg = binary:copy(<<"x">>, 1000),
NumMsgs = 10_000,
%% Provoke TCP back-pressure from client to server by using very small buffers.
Opts = [{tcp_opts, [{recbuf, 512},
{buffer, 512}]}],
Sub = connect(<<"subscriber">>, Config, Opts),
{ok, _, [0]} = emqtt:subscribe(Sub, Topic, qos0),
[ServerConnectionPid] = all_connection_pids(Config),
%% Suspend the receiving client such that it stops reading from its socket
%% causing TCP back-pressure to the server being applied.
true = erlang:suspend_process(Sub),
%% Let's overflow the receiving server MQTT connection process
%% (i.e. the rabbit_mqtt_qos0_queue) by sending many large messages.
Pub = connect(<<"publisher">>, Config),
lists:foreach(fun(_) ->
ok = emqtt:publish(Pub, Topic, Msg, qos0)
end, lists:seq(1, NumMsgs)),
%% Give the server some time to process (either send or drop) the messages.
timer:sleep(2000),
%% Let's resume the receiving client to receive any remaining messages that did
%% not get dropped.
true = erlang:resume_process(Sub),
NumReceived = num_received(Topic, Msg, 0),
{status, _, _, [_, _, _, _, Misc]} = sys:get_status(ServerConnectionPid),
[State] = [S || {data, [{"State", S}]} <- Misc],
#{proc_state := #{qos0_messages_dropped := NumDropped}} = State,
ct:pal("NumReceived=~b~nNumDropped=~b", [NumReceived, NumDropped]),
%% We expect that
%% 1. all sent messages were either received or dropped
?assertEqual(NumMsgs, NumReceived + NumDropped),
case rabbit_ct_helpers:is_mixed_versions(Config) of
false ->
%% 2. at least one message was dropped (otherwise our whole test case did not
%% test what it was supposed to test: that messages are dropped due to the
%% server being overflowed with messages while the client receives too slowly)
?assert(NumDropped >= 1);
true ->
%% Feature flag rabbit_mqtt_qos0_queue is disabled.
?assertEqual(0, NumDropped)
end,
%% 3. we received at least 1000 messages because everything below the default
%% of mailbox_soft_limit=1000 should not be dropped
?assert(NumReceived >= 1000),
ok = expect_publishes(Sub, Topic, [Msg]),
ok = emqtt:disconnect(Sub),
ok = emqtt:disconnect(Pub).
@ -901,12 +916,7 @@ management_plugin_connection(Config) ->
http_delete(Config,
"/connections/" ++ binary_to_list(uri_string:quote((ConnectionName))),
?NO_CONTENT),
receive
{'EXIT', C, _} ->
ok
after 5000 ->
ct:fail("server did not close connection")
end,
await_exit(C),
?assertEqual([], http_get(Config, "/connections")),
eventually(?_assertEqual([], all_connection_pids(Config)), 500, 3).
@ -915,8 +925,8 @@ management_plugin_enable(Config) ->
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, rabbitmq_management),
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, rabbitmq_management_agent),
%% If the MQTT connection is established **before** the management plugin is enabled,
%% the management plugin should still list the MQTT connection.
%% If the (web) MQTT connection is established **before** the management plugin is enabled,
%% the management plugin should still list the (web) MQTT connection.
C = connect(?FUNCTION_NAME, Config),
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management_agent),
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management),
@ -952,36 +962,177 @@ cli_list_queues(Config) ->
ok = emqtt:disconnect(C).
%% "If the Client supplies a zero-byte ClientId with CleanSession set to 0,
%% the Server MUST respond to the CONNECT Packet with a CONNACK return code 0x02
%% (Identifier rejected) and then close the Network Connection" [MQTT-3.1.3-8].
non_clean_sess_empty_client_id(Config) ->
{ok, C} = emqtt:start_link([{clientid, <<>>},
{clean_start, false},
{proto_ver, v4},
{host, "localhost"},
{port, get_node_config(Config, 0, tcp_port_mqtt)}
]),
maintenance(Config) ->
C0 = connect(<<"client-0">>, Config, 0, []),
C1a = connect(<<"client-1a">>, Config, 1, []),
C1b = connect(<<"client-1b">>, Config, 1, []),
timer:sleep(500),
ok = drain_node(Config, 2),
ok = revive_node(Config, 2),
timer:sleep(500),
[?assert(erlang:is_process_alive(C)) || C <- [C0, C1a, C1b]],
process_flag(trap_exit, true),
?assertMatch({error, {client_identifier_not_valid, _}},
emqtt:connect(C)),
receive {'EXIT', C, _} -> ok
after 500 -> ct:fail("server did not close connection")
end.
ok = drain_node(Config, 1),
[await_exit(Pid) || Pid <- [C1a, C1b]],
ok = revive_node(Config, 1),
?assert(erlang:is_process_alive(C0)),
ok = drain_node(Config, 0),
await_exit(C0),
ok = revive_node(Config, 0).
keepalive(Config) ->
KeepaliveSecs = 1,
KeepaliveMs = timer:seconds(KeepaliveSecs),
ProtoVer = v4,
WillTopic = <<"will/topic">>,
WillPayload = <<"will-payload">>,
C1 = connect(?FUNCTION_NAME, Config, [{keepalive, KeepaliveSecs},
{proto_ver, ProtoVer},
{will_topic, WillTopic},
{will_payload, WillPayload},
{will_retain, true},
{will_qos, 0}]),
ok = emqtt:publish(C1, <<"ignored">>, <<"msg">>),
%% Connection should stay up when client sends PING requests.
timer:sleep(KeepaliveMs),
?assertMatch(#{publishers := 1},
util:get_global_counters(Config, ProtoVer)),
%% Mock the server socket to not have received any bytes.
rabbit_ct_broker_helpers:setup_meck(Config),
Mod = rabbit_net,
ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]),
ok = rpc(Config, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]} ]),
process_flag(trap_exit, true),
%% We expect the server to respect the keepalive closing the connection.
eventually(?_assertMatch(#{publishers := 0},
util:get_global_counters(Config, ProtoVer)),
KeepaliveMs, 3 * KeepaliveSecs),
await_exit(C1),
true = rpc(Config, meck, validate, [Mod]),
ok = rpc(Config, meck, unload, [Mod]),
C2 = connect(<<"client2">>, Config),
{ok, _, [0]} = emqtt:subscribe(C2, WillTopic),
receive {publish, #{client_pid := C2,
dup := false,
qos := 0,
retain := true,
topic := WillTopic,
payload := WillPayload}} -> ok
after 3000 -> ct:fail("missing will")
end,
ok = emqtt:disconnect(C2).
keepalive_turned_off(Config) ->
%% "A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism."
KeepaliveSecs = 0,
C = connect(?FUNCTION_NAME, Config, [{keepalive, KeepaliveSecs}]),
ok = emqtt:publish(C, <<"TopicB">>, <<"Payload">>),
%% Mock the server socket to not have received any bytes.
rabbit_ct_broker_helpers:setup_meck(Config),
Mod = rabbit_net,
ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]),
ok = rpc(Config, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]} ]),
rabbit_ct_helpers:consistently(?_assert(erlang:is_process_alive(C))),
true = rpc(Config, meck, validate, [Mod]),
ok = rpc(Config, meck, unload, [Mod]),
ok = emqtt:disconnect(C).
duplicate_client_id(Config) ->
DuplicateClientId = ?FUNCTION_NAME,
C1 = connect(DuplicateClientId, Config),
eventually(?_assertEqual(1, length(all_connection_pids(Config)))),
process_flag(trap_exit, true),
C2 = connect(DuplicateClientId, Config),
await_exit(C1),
timer:sleep(200),
?assertEqual(1, length(all_connection_pids(Config))),
ok = emqtt:disconnect(C2).
block(Config) ->
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
C = connect(ClientId, Config),
{ok, _, _} = emqtt:subscribe(C, Topic),
{ok, _} = emqtt:publish(C, Topic, <<"Not blocked yet">>, [{qos, 1}]),
ok = rpc(Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [0.00000001]),
%% Let it block
timer:sleep(100),
%% Blocked, but still will publish when unblocked
puback_timeout = publish_qos1_timeout(C, Topic, <<"Now blocked">>, 1000),
puback_timeout = publish_qos1_timeout(C, Topic, <<"Still blocked">>, 1000),
%% Unblock
rpc(Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]),
ok = expect_publishes(C, Topic, [<<"Not blocked yet">>,
<<"Now blocked">>,
<<"Still blocked">>]),
ok = emqtt:disconnect(C).
clean_session_disconnect_client(Config) ->
C = connect(?FUNCTION_NAME, Config),
{ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0),
{ok, _, _} = emqtt:subscribe(C, <<"topic1">>, qos1),
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
QsClassic = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]),
case rabbit_ct_helpers:is_mixed_versions(Config) of
false ->
?assertEqual(1, length(QsQos0)),
?assertEqual(1, length(QsClassic));
true ->
?assertEqual(0, length(QsQos0)),
?assertEqual(2, length(QsClassic))
end,
ok = emqtt:disconnect(C),
%% After terminating a clean session, we expect any session state to be cleaned up on the server.
timer:sleep(200), %% Give some time to clean up exclusive classic queue.
L = rpc(Config, rabbit_amqqueue, list, []),
?assertEqual(0, length(L)).
clean_session_kill_node(Config) ->
C = connect(?FUNCTION_NAME, Config),
{ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0),
{ok, _, _} = emqtt:subscribe(C, <<"topic1">>, qos1),
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
QsClassic = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]),
case rabbit_ct_helpers:is_mixed_versions(Config) of
false ->
?assertEqual(1, length(QsQos0)),
?assertEqual(1, length(QsClassic));
true ->
?assertEqual(0, length(QsQos0)),
?assertEqual(2, length(QsClassic))
end,
?assertEqual(2, rpc(Config, ets, info, [rabbit_durable_queue, size])),
unlink(C),
ok = rabbit_ct_broker_helpers:kill_node(Config, 0),
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
%% After terminating a clean session by a node crash, we expect any session
%% state to be cleaned up on the server once the server comes back up.
?assertEqual(0, rpc(Config, ets, info, [rabbit_durable_queue, size])).
%% -------------------------------------------------------------------
%% Internal helpers
%% -------------------------------------------------------------------
num_received(Topic, Payload, N) ->
receive
{publish, #{topic := Topic,
payload := Payload}} ->
num_received(Topic, Payload, N + 1)
after 1000 ->
N
end.
await_confirms_ordered(_, To, To) ->
ok;
await_confirms_ordered(From, N, To) ->
@ -1033,20 +1184,3 @@ bind(Ch, QueueName, Topic)
Ch, #'queue.bind'{queue = QueueName,
exchange = <<"amq.topic">>,
routing_key = Topic}).
get_events(Node) ->
timer:sleep(300), %% events are sent and processed asynchronously
Result = gen_event:call({rabbit_event, Node}, event_recorder, take_state),
?assert(is_list(Result)),
Result.
assert_event_type(ExpectedType, #event{type = ActualType}) ->
?assertEqual(ExpectedType, ActualType).
assert_event_prop(ExpectedProp = {Key, _Value}, #event{props = Props}) ->
?assertEqual(ExpectedProp, lists:keyfind(Key, 1, Props));
assert_event_prop(ExpectedProps, Event)
when is_list(ExpectedProps) ->
lists:foreach(fun(P) ->
assert_event_prop(P, Event)
end, ExpectedProps).

View File

@ -1,6 +1,8 @@
-module(util).
-include("rabbit_mqtt.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("eunit/include/eunit.hrl").
-export([all_connection_pids/1,
publish_qos1_timeout/4,
@ -8,10 +10,15 @@
get_global_counters/2,
get_global_counters/3,
get_global_counters/4,
expect_publishes/2,
expect_publishes/3,
connect/2,
connect/3,
connect/4
connect/4,
get_events/1,
assert_event_type/2,
assert_event_prop/2,
await_exit/1,
await_exit/2
]).
all_connection_pids(Config) ->
@ -39,20 +46,27 @@ publish_qos1_timeout(Client, Topic, Payload, Timeout) ->
puback_timeout
end.
expect_publishes(_Topic, []) ->
ok;
expect_publishes(Topic, [Payload|Rest]) ->
receive
{publish, #{topic := Topic,
payload := Payload}} ->
expect_publishes(Topic, Rest)
after 5000 ->
{publish_not_received, Payload}
end.
sync_publish_result(Caller, Mref, Result) ->
erlang:send(Caller, {Mref, Result}).
expect_publishes(_, _, []) ->
ok;
expect_publishes(Client, Topic, [Payload|Rest])
when is_pid(Client) ->
receive
{publish, #{client_pid := Client,
topic := Topic,
payload := Payload}} ->
expect_publishes(Client, Topic, Rest);
{publish, #{client_pid := Client,
topic := Topic,
payload := Other}} ->
ct:fail("Received unexpected PUBLISH payload. Expected: ~p Got: ~p",
[Payload, Other])
after 3000 ->
{publish_not_received, Payload}
end.
get_global_counters(Config, ProtoVer) ->
get_global_counters(Config, ProtoVer, 0, []).
@ -67,6 +81,37 @@ get_global_counters(Config, Proto, Node, QType) ->
maps:get([{protocol, Proto}] ++ QType,
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_global_counters, overview, [])).
get_events(Node) ->
timer:sleep(300), %% events are sent and processed asynchronously
Result = gen_event:call({rabbit_event, Node}, event_recorder, take_state),
?assert(is_list(Result)),
Result.
assert_event_type(ExpectedType, #event{type = ActualType}) ->
?assertEqual(ExpectedType, ActualType).
assert_event_prop(ExpectedProp = {Key, _Value}, #event{props = Props}) ->
?assertEqual(ExpectedProp, lists:keyfind(Key, 1, Props));
assert_event_prop(ExpectedProps, Event)
when is_list(ExpectedProps) ->
lists:foreach(fun(P) ->
assert_event_prop(P, Event)
end, ExpectedProps).
await_exit(Pid) ->
receive
{'EXIT', Pid, _} -> ok
after
20_000 -> ct:fail({missing_exit, Pid})
end.
await_exit(Pid, Reason) ->
receive
{'EXIT', Pid, Reason} -> ok
after
20_000 -> ct:fail({missing_exit, Pid})
end.
connect(ClientId, Config) ->
connect(ClientId, Config, []).
@ -74,12 +119,22 @@ connect(ClientId, Config, AdditionalOpts) ->
connect(ClientId, Config, 0, AdditionalOpts).
connect(ClientId, Config, Node, AdditionalOpts) ->
P = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_mqtt),
{Port, WsOpts, Connect} =
case rabbit_ct_helpers:get_config(Config, websocket, false) of
false ->
{rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_mqtt),
[],
fun emqtt:connect/1};
true ->
{rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_web_mqtt),
[{ws_path, "/ws"}],
fun emqtt:ws_connect/1}
end,
Options = [{host, "localhost"},
{port, P},
{clientid, rabbit_data_coercion:to_binary(ClientId)},
{proto_ver, v4}
] ++ AdditionalOpts,
{port, Port},
{proto_ver, v4},
{clientid, rabbit_data_coercion:to_binary(ClientId)}
] ++ WsOpts ++ AdditionalOpts,
{ok, C} = emqtt:start_link(Options),
{ok, _Properties} = emqtt:connect(C),
{ok, _Properties} = Connect(C),
C.

View File

@ -12,8 +12,7 @@
start/2,
prep_stop/1,
stop/1,
list_connections/0,
close_all_client_connections/1
list_connections/0
]).
%% Dummy supervisor - see Ulf Wiger's comment at
@ -51,15 +50,8 @@ init([]) -> {ok, {{one_for_one, 1, 5}, []}}.
list_connections() ->
PlainPids = connection_pids_of_protocol(?TCP_PROTOCOL),
TLSPids = connection_pids_of_protocol(?TLS_PROTOCOL),
PlainPids ++ TLSPids.
-spec close_all_client_connections(string()) -> {'ok', non_neg_integer()}.
close_all_client_connections(Reason) ->
Connections = list_connections(),
[rabbit_web_mqtt_handler:close_connection(Pid, Reason) || Pid <- Connections],
{ok, length(Connections)}.
%%
%% Implementation
%%

View File

@ -19,8 +19,7 @@
terminate/3
]).
-export([conserve_resources/3,
close_connection/2]).
-export([conserve_resources/3]).
%% cowboy_sub_protocol
-export([upgrade/4,
@ -114,13 +113,6 @@ websocket_init({State0 = #state{socket = Sock}, PeerAddr}) ->
{[{shutdown_reason, Reason}], State0}
end.
-spec close_connection(pid(), string()) -> 'ok'.
close_connection(Pid, Reason) ->
rabbit_log_connection:info("Web MQTT: will terminate connection process ~tp, reason: ~ts",
[Pid, Reason]),
sys:terminate(Pid, Reason),
ok.
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
@ -321,14 +313,14 @@ handle_credits(State0) ->
end,
{[{active, Active}], State, hibernate}.
control_throttle(State = #state{connection_state = CS,
control_throttle(State = #state{connection_state = ConnState,
conserve = Conserve,
keepalive = KState,
proc_state = PState}) ->
Throttle = Conserve orelse
rabbit_mqtt_processor:soft_limit_exceeded(PState) orelse
credit_flow:blocked(),
case {CS, Throttle} of
case {ConnState, Throttle} of
{running, true} ->
State#state{connection_state = blocked,
keepalive = rabbit_mqtt_keepalive:cancel_timer(KState)};

View File

@ -7,6 +7,8 @@
-module(rabbit_ws_test_util).
-include_lib("common_test/include/ct.hrl").
-export([update_app_env/3, get_web_mqtt_port_str/1,
mqtt_3_1_1_connect_packet/0]).
@ -22,12 +24,12 @@ update_app_env(Config, Key, Val) ->
[rabbitmq_web_mqtt]).
get_web_mqtt_port_str(Config) ->
Port = case rabbit_ct_helpers:get_config(Config, protocol) of
"ws" ->
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt);
"wss" ->
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt_tls)
end,
Port = case ?config(protocol, Config) of
"ws" ->
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt);
"wss" ->
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt_tls)
end,
integer_to_list(Port).
mqtt_3_1_1_connect_packet() ->

View File

@ -8,51 +8,27 @@
-module(system_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
-compile([export_all, nowarn_export_all]).
-import(rabbit_ct_broker_helpers,
[rpc/5]).
-import(rabbit_ct_helpers,
[eventually/1,
eventually/3]).
-import(rabbit_mgmt_test_util,
[http_get/2,
http_delete/3]).
-define(MANAGEMENT_PLUGIN_TESTS,
[management_plugin_connection,
management_plugin_enable]).
-import(rabbit_ct_helpers, [eventually/1]).
all() ->
[
{group, tests}
].
[{group, tests}].
groups() ->
[
{tests, [],
[block
, pubsub_shared_connection
, pubsub_separate_connections
, last_will_enabled_disconnect
, last_will_enabled_no_disconnect
, disconnect
, keepalive
, maintenance
, client_no_supported_protocol
, client_not_support_mqtt
, unacceptable_data_type
, duplicate_id
, handle_invalid_packets
, duplicate_connect
] ++ ?MANAGEMENT_PLUGIN_TESTS
}
[no_websocket_subprotocol
,unsupported_websocket_subprotocol
,unacceptable_data_type
,handle_invalid_packets
,duplicate_connect
]}
].
suite() ->
[{timetrap, {minutes, 5}}].
[{timetrap, {minutes, 2}}].
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
@ -76,185 +52,32 @@ end_per_group(_, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
maybe_start_inets(Testcase),
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
maybe_stop_inets(Testcase),
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
maybe_start_inets(Testcase) ->
case lists:member(Testcase, ?MANAGEMENT_PLUGIN_TESTS) of
true ->
ok = inets:start();
false ->
ok
end.
maybe_stop_inets(Testcase) ->
case lists:member(Testcase, ?MANAGEMENT_PLUGIN_TESTS) of
true ->
ok = inets:stop();
false ->
ok
end.
%% -------------------------------------------------------------------
%% Testsuite cases
%% -------------------------------------------------------------------
block(Config) ->
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
C = ws_connect(ClientId, Config),
no_websocket_subprotocol(Config) ->
websocket_subprotocol(Config, []).
{ok, _, _} = emqtt:subscribe(C, Topic),
{ok, _} = emqtt:publish(C, Topic, <<"Not blocked yet">>, [{qos, 1}]),
unsupported_websocket_subprotocol(Config) ->
websocket_subprotocol(Config, ["not-mqtt-protocol"]).
ok = rpc(Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [0.00000001]),
%% Let it block
timer:sleep(100),
%% Blocked, but still will publish when unblocked
puback_timeout = publish_qos1_timeout(C, Topic, <<"Now blocked">>, 1000),
puback_timeout = publish_qos1_timeout(C, Topic, <<"Still blocked">>, 1000),
%% Unblock
rpc(Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]),
ok = expect_publishes(C, Topic, [<<"Not blocked yet">>,
<<"Now blocked">>,
<<"Still blocked">>]),
ok = emqtt:disconnect(C).
pubsub_shared_connection(Config) ->
C = ws_connect(?FUNCTION_NAME, Config),
Topic = <<"/topic/test-web-mqtt">>,
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
Payload = <<"a\x00a">>,
?assertMatch({ok, #{packet_id := _,
reason_code := 0,
reason_code_name := success
}},
emqtt:publish(C, Topic, Payload, [{qos, 1}])),
ok = expect_publishes(C, Topic, [Payload]),
ok = emqtt:disconnect(C).
pubsub_separate_connections(Config) ->
Publisher = ws_connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config),
Consumer = ws_connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_consumer">>, Config),
Topic = <<"/topic/test-web-mqtt">>,
{ok, _, [1]} = emqtt:subscribe(Consumer, Topic, qos1),
Payload = <<"a\x00a">>,
?assertMatch({ok, #{packet_id := _,
reason_code := 0,
reason_code_name := success
}},
emqtt:publish(Publisher, Topic, Payload, [{qos, 1}])),
ok = expect_publishes(Consumer, Topic, [Payload]),
ok = emqtt:disconnect(Publisher),
ok = emqtt:disconnect(Consumer).
last_will_enabled_disconnect(Config) ->
LastWillTopic = <<"/topic/web-mqtt-tests-ws1-last-will">>,
LastWillMsg = <<"a last will and testament message">>,
PubOpts = [{will_topic, LastWillTopic},
{will_payload, LastWillMsg},
{will_qos, 1}],
Publisher = ws_connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config, PubOpts),
Consumer = ws_connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_consumer">>, Config),
{ok, _, [1]} = emqtt:subscribe(Consumer, LastWillTopic, qos1),
%% Client sends DISCONNECT packet. Therefore, will message should not be sent.
ok = emqtt:disconnect(Publisher),
?assertEqual({publish_not_received, LastWillMsg},
expect_publishes(Consumer, LastWillTopic, [LastWillMsg])),
ok = emqtt:disconnect(Consumer).
last_will_enabled_no_disconnect(Config) ->
LastWillTopic = <<"/topic/web-mqtt-tests-ws1-last-will">>,
LastWillMsg = <<"a last will and testament message">>,
PubOpts = [{will_topic, LastWillTopic},
{will_payload, LastWillMsg},
{will_qos, 1}],
_Publisher = ws_connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config, PubOpts),
timer:sleep(100),
[ServerPublisherPid] = rpc(Config, 0, rabbit_mqtt, local_connection_pids, []),
Consumer = ws_connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_consumer">>, Config),
{ok, _, [1]} = emqtt:subscribe(Consumer, LastWillTopic, qos1),
%% Client does not send DISCONNECT packet. Therefore, will message should be sent.
erlang:exit(ServerPublisherPid, test_will),
?assertEqual(ok, expect_publishes(Consumer, LastWillTopic, [LastWillMsg])),
ok = emqtt:disconnect(Consumer).
disconnect(Config) ->
C = ws_connect(?FUNCTION_NAME, Config),
process_flag(trap_exit, true),
eventually(?_assertEqual(1, num_mqtt_connections(Config, 0))),
ok = emqtt:disconnect(C),
receive
{'EXIT', C, normal} ->
ok
after 5000 ->
ct:fail("disconnect didn't terminate client")
end,
eventually(?_assertEqual(0, num_mqtt_connections(Config, 0))),
ok.
keepalive(Config) ->
KeepaliveSecs = 1,
KeepaliveMs = timer:seconds(KeepaliveSecs),
C = ws_connect(?FUNCTION_NAME, Config, [{keepalive, KeepaliveSecs}]),
%% Connection should stay up when client sends PING requests.
timer:sleep(KeepaliveMs),
%% Mock the server socket to not have received any bytes.
rabbit_ct_broker_helpers:setup_meck(Config),
Mod = rabbit_net,
ok = rpc(Config, 0, meck, new, [Mod, [no_link, passthrough]]),
ok = rpc(Config, 0, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]} ]),
process_flag(trap_exit, true),
receive
{'EXIT', C, _Reason} ->
ok
after
ceil(3 * 0.75 * KeepaliveMs) ->
ct:fail("server did not respect keepalive")
end,
true = rpc(Config, 0, meck, validate, [Mod]),
ok = rpc(Config, 0, meck, unload, [Mod]).
maintenance(Config) ->
C = ws_connect(?FUNCTION_NAME, Config),
true = unlink(C),
eventually(?_assertEqual(1, num_mqtt_connections(Config, 0))),
ok = rabbit_ct_broker_helpers:drain_node(Config, 0),
eventually(?_assertEqual(0, num_mqtt_connections(Config, 0))),
ok = rabbit_ct_broker_helpers:revive_node(Config, 0).
client_no_supported_protocol(Config) ->
client_protocol_test(Config, []).
client_not_support_mqtt(Config) ->
client_protocol_test(Config, ["not-mqtt-protocol"]).
client_protocol_test(Config, Protocol) ->
%% "The client MUST include “mqtt” in the list of WebSocket Sub Protocols it offers" [MQTT-6.0.0-3].
websocket_subprotocol(Config, SubProtocol) ->
PortStr = rabbit_ws_test_util:get_web_mqtt_port_str(Config),
WS = rfc6455_client:new("ws://localhost:" ++ PortStr ++ "/ws", self(), undefined, Protocol),
WS = rfc6455_client:new("ws://localhost:" ++ PortStr ++ "/ws", self(), undefined, SubProtocol),
{_, [{http_response, Res}]} = rfc6455_client:open(WS),
{'HTTP/1.1', 400, <<"Bad Request">>, _} = cow_http:parse_status_line(rabbit_data_coercion:to_binary(Res)),
rfc6455_client:send_binary(WS, rabbit_ws_test_util:mqtt_3_1_1_connect_packet()),
{close, _} = rfc6455_client:recv(WS, timer:seconds(1)).
%% "MQTT Control Packets MUST be sent in WebSocket binary data frames. If any other type
%% of data frame is received the recipient MUST close the Network Connection" [MQTT-6.0.0-1].
unacceptable_data_type(Config) ->
PortStr = rabbit_ws_test_util:get_web_mqtt_port_str(Config),
WS = rfc6455_client:new("ws://localhost:" ++ PortStr ++ "/ws", self(), undefined, ["mqtt"]),
@ -262,20 +85,6 @@ unacceptable_data_type(Config) ->
rfc6455_client:send(WS, "not-binary-data"),
{close, {1003, _}} = rfc6455_client:recv(WS, timer:seconds(1)).
duplicate_id(Config) ->
C1 = ws_connect(?FUNCTION_NAME, Config),
eventually(?_assertEqual(1, num_mqtt_connections(Config, 0))),
process_flag(trap_exit, true),
C2 = ws_connect(?FUNCTION_NAME, Config),
receive
{'EXIT', C1, _Reason} ->
ok
after 5000 ->
ct:fail("server did not disconnect a client with duplicate ID")
end,
eventually(?_assertEqual(1, num_mqtt_connections(Config, 0))),
ok = emqtt:disconnect(C2).
handle_invalid_packets(Config) ->
PortStr = rabbit_ws_test_util:get_web_mqtt_port_str(Config),
WS = rfc6455_client:new("ws://localhost:" ++ PortStr ++ "/ws", self(), undefined, ["mqtt"]),
@ -284,46 +93,6 @@ handle_invalid_packets(Config) ->
rfc6455_client:send_binary(WS, Bin),
{close, {1002, _}} = rfc6455_client:recv(WS, timer:seconds(1)).
%% Test that Web MQTT connection can be listed and closed via the rabbitmq_management plugin.
management_plugin_connection(Config) ->
KeepaliveSecs = 99,
ClientId = atom_to_binary(?FUNCTION_NAME),
Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
C = ws_connect(ClientId, Config, [{keepalive, KeepaliveSecs}]),
eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10),
[#{client_properties := #{client_id := ClientId},
timeout := KeepaliveSecs,
node := Node,
name := ConnectionName}] = http_get(Config, "/connections"),
process_flag(trap_exit, true),
http_delete(Config,
"/connections/" ++ binary_to_list(uri_string:quote((ConnectionName))),
?NO_CONTENT),
receive
{'EXIT', C, _} ->
ok
after 5000 ->
ct:fail("server did not close connection")
end,
?assertEqual([], http_get(Config, "/connections")),
?assertEqual(0, num_mqtt_connections(Config, 0)).
management_plugin_enable(Config) ->
?assertEqual(0, length(http_get(Config, "/connections"))),
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, rabbitmq_management),
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, rabbitmq_management_agent),
%% If the Web MQTT connection is established **before** the management plugin is enabled,
%% the management plugin should still list the Web MQTT connection.
C = ws_connect(?FUNCTION_NAME, Config),
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management_agent),
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management),
eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10),
ok = emqtt:disconnect(C).
%% "A Client can only send the CONNECT Packet once over a Network Connection.
%% The Server MUST process a second CONNECT Packet sent from a Client as a protocol
%% violation and disconnect the Client [MQTT-3.1.0-2].
@ -351,51 +120,4 @@ duplicate_connect(Config) ->
%% Web mqtt connections are tracked together with mqtt connections
num_mqtt_connections(Config, Node) ->
length(rpc(Config, Node, rabbit_mqtt, local_connection_pids, [])).
ws_connect(ClientId, Config) ->
ws_connect(ClientId, Config, []).
ws_connect(ClientId, Config, AdditionalOpts) ->
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
Options = [{host, "localhost"},
{username, "guest"},
{password, "guest"},
{ws_path, "/ws"},
{port, P},
{clientid, rabbit_data_coercion:to_binary(ClientId)},
{proto_ver, v4}
] ++ AdditionalOpts,
{ok, C} = emqtt:start_link(Options),
{ok, _Properties} = emqtt:ws_connect(C),
C.
expect_publishes(_ClientPid, _Topic, []) ->
ok;
expect_publishes(ClientPid, Topic, [Payload|Rest]) ->
receive
{publish, #{client_pid := ClientPid,
topic := Topic,
payload := Payload}} ->
expect_publishes(ClientPid, Topic, Rest)
after 1000 ->
{publish_not_received, Payload}
end.
publish_qos1_timeout(Client, Topic, Payload, Timeout) ->
Mref = erlang:monitor(process, Client),
ok = emqtt:publish_async(Client, Topic, #{}, Payload, [{qos, 1}], infinity,
{fun ?MODULE:sync_publish_result/3, [self(), Mref]}),
receive
{Mref, Reply} ->
erlang:demonitor(Mref, [flush]),
Reply;
{'DOWN', Mref, process, Client, Reason} ->
ct:fail("client is down: ~tp", [Reason])
after
Timeout ->
erlang:demonitor(Mref, [flush]),
puback_timeout
end.
sync_publish_result(Caller, Mref, Result) ->
erlang:send(Caller, {Mref, Result}).
length(rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_mqtt, local_connection_pids, [])).