Merge pull request #10761 from rabbitmq/cloudamqp-fix/9302-list-webmqtt-connections

A new command for Web MQTT connection listing #10693 #9302
This commit is contained in:
Michael Klishin 2024-03-18 20:20:14 -04:00 committed by GitHub
commit f7697c3d19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 434 additions and 28 deletions

View File

@ -34,7 +34,8 @@
force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1,
handshake/2, tcp_host/1,
ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
listener_of_protocol/1, stop_ranch_listener_of_protocol/1]).
listener_of_protocol/1, stop_ranch_listener_of_protocol/1,
list_local_connections_of_protocol/1]).
%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1,
@ -252,6 +253,13 @@ stop_ranch_listener_of_protocol(Protocol) ->
ranch:stop_listener(Ref)
end.
-spec list_local_connections_of_protocol(atom()) -> [pid()].
list_local_connections_of_protocol(Protocol) ->
case ranch_ref_of_protocol(Protocol) of
undefined -> [];
AcceptorRef -> ranch:procs(AcceptorRef, connections)
end.
-spec start_tcp_listener(
listener_config(), integer()) -> 'ok' | {'error', term()}.

View File

@ -12,7 +12,10 @@
-define(PERSISTENT_TERM_EXCHANGE, mqtt_exchange).
-define(DEFAULT_MQTT_EXCHANGE, <<"amq.topic">>).
-define(MQTT_GUIDE_URL, <<"https://rabbitmq.com/docs/mqtt/">>).
-define(WEB_MQTT_GUIDE_URL, <<"https://rabbitmq.com/docs/web-mqtt/">>).
-define(MQTT_TCP_PROTOCOL, 'mqtt').
-define(MQTT_TLS_PROTOCOL, 'mqtt/ssl').
-define(MQTT_PROTO_V3, mqtt310).
-define(MQTT_PROTO_V4, mqtt311).
-define(MQTT_PROTO_V5, mqtt50).

View File

@ -62,7 +62,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
-spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
LocalPids = local_connection_pids(),
LocalPids = list_local_mqtt_connections(),
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).
emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
@ -93,6 +93,13 @@ local_connection_pids() ->
end, pg:which_groups(PgScope))
end.
%% This function excludes Web MQTT connections
list_local_mqtt_connections() ->
PlainPids = rabbit_networking:list_local_connections_of_protocol(?MQTT_TCP_PROTOCOL),
TLSPids = rabbit_networking:list_local_connections_of_protocol(?MQTT_TLS_PROTOCOL),
PlainPids ++ TLSPids.
init_global_counters() ->
lists:foreach(fun init_global_counters/1, [?MQTT_PROTO_V3,
?MQTT_PROTO_V4,

View File

@ -13,9 +13,6 @@
-export([start_link/2, init/1, stop_listeners/0]).
-define(TCP_PROTOCOL, 'mqtt').
-define(TLS_PROTOCOL, 'mqtt/ssl').
start_link(Listeners, []) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Listeners]).
@ -66,8 +63,8 @@ init([{Listeners, SslListeners0}]) ->
-spec stop_listeners() -> ok.
stop_listeners() ->
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?MQTT_TCP_PROTOCOL),
_ = rabbit_networking:stop_ranch_listener_of_protocol(?MQTT_TLS_PROTOCOL),
ok.
%%
@ -86,7 +83,7 @@ tcp_listener_spec([Address, SocketOpts, NumAcceptors, ConcurrentConnsSups]) ->
rabbit_mqtt_listener_sup,
Address,
SocketOpts,
transport(?TCP_PROTOCOL),
transport(?MQTT_TCP_PROTOCOL),
rabbit_mqtt_reader,
[],
mqtt,
@ -101,7 +98,7 @@ ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSu
rabbit_mqtt_listener_sup,
Address,
SocketOpts ++ SslOpts,
transport(?TLS_PROTOCOL),
transport(?MQTT_TLS_PROTOCOL),
rabbit_mqtt_reader,
[],
'mqtt/ssl',
@ -111,7 +108,7 @@ ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSu
"MQTT TLS listener"
).
transport(?TCP_PROTOCOL) ->
transport(?MQTT_TCP_PROTOCOL) ->
ranch_tcp;
transport(?TLS_PROTOCOL) ->
transport(?MQTT_TLS_PROTOCOL) ->
ranch_ssl.

View File

@ -57,7 +57,23 @@ init_per_group(unit, Config) ->
Config;
init_per_group(Group, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}),
util:maybe_skip_v5(Config1).
case Group of
v4 ->
AllApps = rabbit_ct_broker_helpers:rpc_all(Config1, application, loaded_applications, []),
AllAppNames = lists:map(fun (AppList) ->
lists:map(fun ({Name, _, _}) -> Name end, AppList)
end, AllApps),
case lists:all(fun (NodeApps) ->
lists:member(rabbit_web_mqtt_app, NodeApps)
end, AllAppNames) of
true ->
Config1;
false ->
{skip, "rabbit_web_mqtt_app not available on all nodes"}
end;
v5 ->
util:maybe_skip_v5(Config1)
end.
end_per_group(_, Config) ->
Config.
@ -86,6 +102,13 @@ run(Config) ->
%% No connections
[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
%% Open a WebMQTT connection, command won't list it
WebMqttConfig = [{websocket, true} | Config],
_C0 = connect(<<"simpleWebMqttClient">>, WebMqttConfig, [{ack_timeout, 1}]),
[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
%% Open a connection
C1 = connect(<<"simpleClient">>, Config, [{ack_timeout, 1}]),
timer:sleep(100),

View File

@ -68,6 +68,9 @@ rabbitmq_app(
xref(
name = "xref",
additional_libs = [
"//deps/rabbitmq_cli:erlang_app", # keep
],
target = ":erlang_app",
)
@ -77,6 +80,7 @@ plt(
ignore_warnings = True,
libs = ["//deps/rabbitmq_cli:elixir"], # keep
plt = "//:base_plt",
deps = ["//deps/rabbitmq_cli:erlang_app"], # keep
)
dialyze(
@ -91,6 +95,7 @@ eunit(
compiled_suites = [
":test_src_rabbit_ws_test_util_beam",
":test_src_rfc6455_client_beam",
":test_rabbit_web_mqtt_test_util_beam",
],
target = ":test_erlang_app",
)
@ -101,6 +106,16 @@ rabbitmq_integration_suite(
name = "config_schema_SUITE",
)
rabbitmq_integration_suite(
name = "command_SUITE",
additional_beam = [
"test/rabbit_web_mqtt_test_util.beam",
],
runtime_deps = [
"@emqtt//:erlang_app",
],
)
rabbitmq_integration_suite(
name = "proxy_protocol_SUITE",
additional_beam = [

View File

@ -9,6 +9,7 @@ def all_beam_files(name = "all_beam_files"):
erlang_bytecode(
name = "other_beam",
srcs = [
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand.erl",
"src/rabbit_web_mqtt_app.erl",
"src/rabbit_web_mqtt_handler.erl",
"src/rabbit_web_mqtt_stream_handler.erl",
@ -19,6 +20,7 @@ def all_beam_files(name = "all_beam_files"):
erlc_opts = "//:erlc_opts",
deps = [
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_cli:erlang_app",
"//deps/rabbitmq_mqtt:erlang_app",
"@cowboy//:erlang_app",
],
@ -34,6 +36,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
name = "test_other_beam",
testonly = True,
srcs = [
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand.erl",
"src/rabbit_web_mqtt_app.erl",
"src/rabbit_web_mqtt_handler.erl",
"src/rabbit_web_mqtt_stream_handler.erl",
@ -44,6 +47,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = [
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_cli:erlang_app",
"//deps/rabbitmq_mqtt:erlang_app",
"@cowboy//:erlang_app",
],
@ -70,6 +74,7 @@ def all_srcs(name = "all_srcs"):
filegroup(
name = "srcs",
srcs = [
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand.erl",
"src/rabbit_web_mqtt_app.erl",
"src/rabbit_web_mqtt_handler.erl",
"src/rabbit_web_mqtt_stream_handler.erl",
@ -128,3 +133,20 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbitmq_web_mqtt",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "command_SUITE_beam_files",
testonly = True,
srcs = ["test/command_SUITE.erl"],
outs = ["test/command_SUITE.beam"],
app_name = "rabbitmq_web_mqtt",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_mqtt:erlang_app"],
)
erlang_bytecode(
name = "test_rabbit_web_mqtt_test_util_beam",
testonly = True,
srcs = ["test/rabbit_web_mqtt_test_util.erl"],
outs = ["test/rabbit_web_mqtt_test_util.beam"],
app_name = "rabbitmq_web_mqtt",
erlc_opts = "//:test_erlc_opts",
)

View File

@ -0,0 +1,86 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand').
-include_lib("rabbitmq_mqtt/include/rabbit_mqtt.hrl").
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
-export([formatter/0,
scopes/0,
switches/0,
aliases/0,
usage/0,
usage_additional/0,
usage_doc_guides/0,
banner/2,
validate/2,
merge_defaults/2,
run/2,
output/2,
description/0,
help_section/0]).
formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'.
scopes() -> [ctl, diagnostics].
switches() -> [{verbose, boolean}].
aliases() -> [{'V', verbose}].
description() -> <<"Lists all Web MQTT connections">>.
help_section() ->
{plugin, web_mqtt}.
validate(Args, _) ->
InfoItems = lists:map(fun atom_to_list/1, ?INFO_ITEMS),
case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
InfoItems) of
{ok, _} -> ok;
Error -> Error
end.
merge_defaults([], Opts) ->
merge_defaults([<<"client_id">>, <<"conn_name">>], Opts);
merge_defaults(Args, Opts) ->
{Args, maps:merge(#{verbose => false}, Opts)}.
usage() ->
<<"list_web_mqtt_connections [<column> ...]">>.
usage_additional() ->
Prefix = <<" must be one of ">>,
InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS), <<", ">>),
[
{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}
].
usage_doc_guides() ->
[?WEB_MQTT_GUIDE_URL].
run(Args, #{node := NodeName,
timeout := Timeout,
verbose := Verbose}) ->
InfoKeys = case Verbose of
true -> ?INFO_ITEMS;
false -> 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
end,
Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(
NodeName,
rabbit_web_mqtt_app,
emit_connection_info_all,
[Nodes, InfoKeys],
Timeout,
InfoKeys,
length(Nodes)).
banner(_, _) -> <<"Listing Web MQTT connections ...">>.
output(Result, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).

View File

@ -12,7 +12,9 @@
start/2,
prep_stop/1,
stop/1,
list_connections/0
list_connections/0,
emit_connection_info_all/4,
emit_connection_info_local/3
]).
%% Dummy supervisor - see Ulf Wiger's comment at
@ -48,27 +50,33 @@ init([]) -> {ok, {{one_for_one, 1, 5}, []}}.
-spec list_connections() -> [pid()].
list_connections() ->
PlainPids = connection_pids_of_protocol(?TCP_PROTOCOL),
TLSPids = connection_pids_of_protocol(?TLS_PROTOCOL),
PlainPids = rabbit_networking:list_local_connections_of_protocol(?TCP_PROTOCOL),
TLSPids = rabbit_networking:list_local_connections_of_protocol(?TLS_PROTOCOL),
PlainPids ++ TLSPids.
-spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term().
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids = [spawn_link(Node, ?MODULE, emit_connection_info_local,
[Items, Ref, AggregatorPid])
|| Node <- Nodes],
rabbit_control_misc:await_emitters_termination(Pids).
-spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
LocalPids = list_connections(),
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).
emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref,
fun(Pid) ->
rabbit_web_mqtt_handler:info(Pid, Items)
end, Pids).
%%
%% Implementation
%%
connection_pids_of_protocol(Protocol) ->
case rabbit_networking:ranch_ref_of_protocol(Protocol) of
undefined -> [];
AcceptorRef ->
lists:map(fun cowboy_ws_connection_pid/1, ranch:procs(AcceptorRef, connections))
end.
-spec cowboy_ws_connection_pid(pid()) -> pid().
cowboy_ws_connection_pid(RanchConnPid) ->
Children = supervisor:which_children(RanchConnPid),
{cowboy_clear, Pid, _, _} = lists:keyfind(cowboy_clear, 1, Children),
Pid.
mqtt_init() ->
CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])),
CowboyWsOpts = maps:from_list(get_env(cowboy_ws_opts, [])),

View File

@ -23,6 +23,7 @@
]).
-export([conserve_resources/3]).
-export([info/2]).
%% cowboy_sub_protocol
-export([upgrade/4,
@ -94,6 +95,19 @@ init(Req, Opts) ->
end
end.
%% We cannot use a gen_server call, because the handler process is a
%% special cowboy_websocket process (not a gen_server) which assumes
%% all gen_server calls are supervisor calls, and does not pass on the
%% request to this callback module. (see cowboy_websocket:loop/3 and
%% cowboy_children:handle_supervisor_call/4) However using a generic
%% gen:call with a special label ?MODULE works fine.
-spec info(pid(), rabbit_types:info_keys()) ->
rabbit_types:infos().
info(Pid, all) ->
info(Pid, ?INFO_ITEMS);
info(Pid, Items) ->
{ok, Res} = gen:call(Pid, ?MODULE, {info, Items}),
Res.
-spec websocket_init(state()) ->
{cowboy_websocket:commands(), state()} |
{cowboy_websocket:commands(), state(), hibernate}.
@ -244,6 +258,10 @@ websocket_info(connection_created, State) ->
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
{[], State, hibernate};
websocket_info({?MODULE, From, {info, Items}}, State) ->
Infos = infos(Items, State),
gen:reply(From, Infos),
{[], State, hibernate};
websocket_info(Msg, State) ->
?LOG_WARNING("Web MQTT: unexpected message ~tp", [Msg]),
{[], State, hibernate}.

View File

@ -0,0 +1,179 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module(command_SUITE).
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_mqtt/include/rabbit_mqtt.hrl").
-import(rabbit_web_mqtt_test_util, [connect/3, connect/4]).
-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand').
all() ->
[
{group, unit},
{group, v5}
].
groups() ->
[
{unit, [], [merge_defaults]},
{v5, [], [run,
user_property]}
].
suite() ->
[
{timetrap, {minutes, 10}}
].
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE},
{rmq_extra_tcp_ports, [tcp_port_mqtt_extra,
tcp_port_mqtt_tls_extra]},
{rmq_nodes_clustered, true},
{rmq_nodes_count, 3}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(unit, Config) ->
Config;
init_per_group(Group, Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "mixed version clusters are not supported"};
_ ->
rabbit_ct_helpers:set_config(Config, {mqtt_version, Group})
end.
end_per_group(_, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
merge_defaults(_Config) ->
{[<<"client_id">>, <<"conn_name">>], #{verbose := false}} =
?COMMAND:merge_defaults([], #{}),
{[<<"other_key">>], #{verbose := true}} =
?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}),
{[<<"other_key">>], #{verbose := false}} =
?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}).
run(BaseConfig) ->
Node = rabbit_ct_broker_helpers:get_node_config(BaseConfig, 0, nodename),
Config = [{websocket, true} | BaseConfig],
Opts = #{node => Node, timeout => 10_000, verbose => false},
%% No connections
[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
%% Open an MQTT connection
C1 = connect(<<"simpleMqttClient">>, BaseConfig, [{ack_timeout, 1}]),
timer:sleep(200),
%% No connections for MQTT-over-WebSockets, C1 is an MQTT connection
[] = 'Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts)),
%% Open a WebMQTT connection
C2 = connect(<<"simpleWebMqttClient">>, Config, [{ack_timeout, 1}]),
timer:sleep(200),
[[{client_id, <<"simpleWebMqttClient">>}]] =
'Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts)),
C3 = connect(<<"simpleWebMqttClient1">>, Config, [{ack_timeout, 1}]),
timer:sleep(200),
[[{client_id, <<"simpleWebMqttClient">>}, {user, <<"guest">>}],
[{client_id, <<"simpleWebMqttClient1">>}, {user, <<"guest">>}]] =
lists:sort(
'Elixir.Enum':to_list(
?COMMAND:run([<<"client_id">>, <<"user">>], Opts))),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
start_amqp_connection(network, Node, Port),
%% There are still just two Web MQTT connections
[[{client_id, <<"simpleWebMqttClient">>}],
[{client_id, <<"simpleWebMqttClient1">>}]] =
lists:sort('Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts))),
start_amqp_connection(direct, Node, Port),
timer:sleep(200),
%% Still two Web MQTT connections
[[{client_id, <<"simpleWebMqttClient">>}],
[{client_id, <<"simpleWebMqttClient1">>}]] =
lists:sort('Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts))),
%% Verbose returns all keys
AllKeys = lists:map(fun(I) -> atom_to_binary(I) end, ?INFO_ITEMS),
[AllInfos1Con1, _AllInfos1Con2] =
'Elixir.Enum':to_list(?COMMAND:run(AllKeys, Opts)),
[AllInfos2Con1, _AllInfos2Con2] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})),
%% Keys are INFO_ITEMS
InfoItemsSorted = lists:sort(?INFO_ITEMS),
?assertEqual(InfoItemsSorted, lists:sort(proplists:get_keys(AllInfos1Con1))),
?assertEqual(InfoItemsSorted, lists:sort(proplists:get_keys(AllInfos2Con1))),
%% List Web MQTT connections from all nodes
C4 = connect(<<"simpleWebMqttClient2">>, Config, 1, [{ack_timeout, 1}]),
rabbit_ct_helpers:eventually(
?_assertEqual(
[[{client_id, <<"simpleWebMqttClient">>}],
[{client_id, <<"simpleWebMqttClient1">>}],
[{client_id, <<"simpleWebMqttClient2">>}]],
lists:sort('Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>], Opts))))),
ok = emqtt:disconnect(C1),
ok = emqtt:disconnect(C2),
ok = emqtt:disconnect(C3),
ok = emqtt:disconnect(C4).
user_property(BaseConfig) ->
Node = rabbit_ct_broker_helpers:get_node_config(BaseConfig, 0, nodename),
Config = [{websocket, true} | BaseConfig],
Opts = #{node => Node, timeout => 10_000, verbose => false},
ClientId = <<"my-client">>,
UserProp = [{<<"name 1">>, <<"value 1">>},
{<<"name 2">>, <<"value 2">>},
%% "The same name is allowed to appear more than once." [v5 3.1.2.11.8]
{<<"name 2">>, <<"value 3">>}],
C = connect(ClientId, Config, 1, [{properties, #{'User-Property' => UserProp}}]),
rabbit_ct_helpers:eventually(
?_assertEqual(
[[{client_id, ClientId},
{user_property, UserProp}]],
'Elixir.Enum':to_list(?COMMAND:run([<<"client_id">>, <<"user_property">>], Opts)))),
ok = emqtt:disconnect(C).
start_amqp_connection(Type, Node, Port) ->
amqp_connection:start(amqp_params(Type, Node, Port)).
amqp_params(network, _, Port) ->
#amqp_params_network{port = Port};
amqp_params(direct, Node, _) ->
#amqp_params_direct{node = Node}.

View File

@ -0,0 +1,39 @@
-module(rabbit_web_mqtt_test_util).
-include_lib("eunit/include/eunit.hrl").
-export([connect/3,
connect/4
]).
connect(ClientId, Config, AdditionalOpts) ->
connect(ClientId, Config, 0, AdditionalOpts).
connect(ClientId, Config, Node, AdditionalOpts) ->
{C, Connect} = start_client(ClientId, Config, Node, AdditionalOpts),
{ok, _Properties} = Connect(C),
C.
start_client(ClientId, Config, Node, AdditionalOpts) ->
{Port, WsOpts, Connect} =
case rabbit_ct_helpers:get_config(Config, websocket, false) of
false ->
{rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_mqtt),
[],
fun emqtt:connect/1};
true ->
{rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_web_mqtt),
[{ws_path, "/ws"}],
fun emqtt:ws_connect/1}
end,
ProtoVer = proplists:get_value(
proto_ver,
AdditionalOpts,
rabbit_ct_helpers:get_config(Config, mqtt_version, v4)),
Options = [{host, "localhost"},
{port, Port},
{proto_ver, ProtoVer},
{clientid, rabbit_data_coercion:to_binary(ClientId)}
] ++ WsOpts ++ AdditionalOpts,
{ok, C} = emqtt:start_link(Options),
{C, Connect}.

View File

@ -1232,6 +1232,7 @@ rabbitmq_web_dispatch:
- webmachine_log
- webmachine_log_handler
rabbitmq_web_mqtt:
- Elixir.RabbitMQ.CLI.Ctl.Commands.ListWebMqttConnectionsCommand
- rabbit_web_mqtt_app
- rabbit_web_mqtt_handler
- rabbit_web_mqtt_stream_handler