Require MQTT feature flags in 4.0

Require all MQTT feature flags and remove their compatibility code:
* delete_ra_cluster_mqtt_node
* rabbit_mqtt_qos0_queue
* mqtt_v5

These feature flags were introduced in or before 3.13.0.
This commit is contained in:
David Ansari 2024-07-09 12:30:47 +02:00
parent 6b6cf58af8
commit 50116f0927
31 changed files with 143 additions and 1420 deletions

View File

@ -84,7 +84,6 @@ rabbitmq_app(
"//deps/amqp10_common:erlang_app",
"//deps/rabbit:erlang_app",
"//deps/rabbit_common:erlang_app",
"@ra//:erlang_app",
"@ranch//:erlang_app",
],
)
@ -178,16 +177,6 @@ rabbitmq_integration_suite(
name = "config_schema_SUITE",
)
rabbitmq_integration_suite(
name = "ff_SUITE",
additional_beam = [
":test_util_beam",
],
runtime_deps = [
"@emqtt//:erlang_app",
],
)
rabbitmq_integration_suite(
name = "java_SUITE",
additional_beam = [
@ -197,11 +186,6 @@ rabbitmq_integration_suite(
sharding_method = "group",
)
rabbitmq_suite(
name = "mqtt_machine_SUITE",
size = "small",
)
rabbitmq_suite(
name = "processor_SUITE",
size = "small",

View File

@ -44,7 +44,7 @@ BUILD_WITHOUT_QUIC=1
export BUILD_WITHOUT_QUIC
LOCAL_DEPS = ssl
DEPS = ranch rabbit_common rabbit ra amqp10_common
DEPS = ranch rabbit_common rabbit amqp10_common
TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_web_mqtt amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream
PLT_APPS += rabbitmqctl elixir

View File

@ -17,14 +17,9 @@ def all_beam_files(name = "all_beam_files"):
erlang_bytecode(
name = "other_beam",
srcs = [
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl",
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl",
"src/mc_mqtt.erl",
"src/mqtt_machine.erl",
"src/mqtt_machine_v0.erl",
"src/mqtt_node.erl",
"src/rabbit_mqtt.erl",
"src/rabbit_mqtt_collector.erl",
"src/rabbit_mqtt_confirms.erl",
"src/rabbit_mqtt_ff.erl",
"src/rabbit_mqtt_internal_event_handler.erl",
@ -46,7 +41,7 @@ def all_beam_files(name = "all_beam_files"):
beam = [":behaviours"],
dest = "ebin",
erlc_opts = "//:erlc_opts",
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", "//deps/rabbitmq_cli:erlang_app", "@ra//:erlang_app", "@ranch//:erlang_app"],
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", "//deps/rabbitmq_cli:erlang_app", "@ranch//:erlang_app"],
)
def all_test_beam_files(name = "all_test_beam_files"):
@ -68,14 +63,9 @@ def all_test_beam_files(name = "all_test_beam_files"):
name = "test_other_beam",
testonly = True,
srcs = [
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl",
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl",
"src/mc_mqtt.erl",
"src/mqtt_machine.erl",
"src/mqtt_machine_v0.erl",
"src/mqtt_node.erl",
"src/rabbit_mqtt.erl",
"src/rabbit_mqtt_collector.erl",
"src/rabbit_mqtt_confirms.erl",
"src/rabbit_mqtt_ff.erl",
"src/rabbit_mqtt_internal_event_handler.erl",
@ -102,7 +92,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
"//deps/rabbit:erlang_app",
"//deps/rabbit_common:erlang_app",
"//deps/rabbitmq_cli:erlang_app",
"@ra//:erlang_app",
"@ranch//:erlang_app",
],
)
@ -127,14 +116,9 @@ def all_srcs(name = "all_srcs"):
filegroup(
name = "srcs",
srcs = [
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl",
"src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl",
"src/mc_mqtt.erl",
"src/mqtt_machine.erl",
"src/mqtt_machine_v0.erl",
"src/mqtt_node.erl",
"src/rabbit_mqtt.erl",
"src/rabbit_mqtt_collector.erl",
"src/rabbit_mqtt_confirms.erl",
"src/rabbit_mqtt_ff.erl",
"src/rabbit_mqtt_internal_event_handler.erl",
@ -156,8 +140,6 @@ def all_srcs(name = "all_srcs"):
filegroup(
name = "public_hdrs",
srcs = [
"include/mqtt_machine.hrl",
"include/mqtt_machine_v0.hrl",
"include/rabbit_mqtt.hrl",
"include/rabbit_mqtt_packet.hrl",
],
@ -213,15 +195,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbitmq_mqtt",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "mqtt_machine_SUITE_beam_files",
testonly = True,
srcs = ["test/mqtt_machine_SUITE.erl"],
outs = ["test/mqtt_machine_SUITE.beam"],
hdrs = ["include/mqtt_machine.hrl"],
app_name = "rabbitmq_mqtt",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "processor_SUITE_beam_files",
testonly = True,
@ -280,14 +254,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbitmq_mqtt",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "ff_SUITE_beam_files",
testonly = True,
srcs = ["test/ff_SUITE.erl"],
outs = ["test/ff_SUITE.beam"],
app_name = "rabbitmq_mqtt",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "shared_SUITE_beam_files",
testonly = True,

View File

@ -1,25 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
%% A client ID that is tracked in Ra is a list of bytes
%% as returned by binary_to_list/1 in
%% https://github.com/rabbitmq/rabbitmq-server/blob/48467d6e1283b8d81e52cfd49c06ea4eaa31617d/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl#L137
%% prior to 3.12.0.
%% This has two downsides:
%% 1. Lists consume more memory than binaries (when tracking many clients).
%% 2. This violates the MQTT spec which states
%% "The ClientId MUST be a UTF-8 encoded string as defined in Section 1.5.3 [MQTT-3.1.3-4]." [v4 3.1.3.1]
%% However, for backwards compatibility, we leave the client ID as a list of bytes in the Ra machine state because
%% feature flag delete_ra_cluster_mqtt_node introduced in 3.12.0 will delete the Ra cluster anyway.
-type client_id_ra() :: [byte()].
-record(machine_state, {
client_ids = #{} :: #{client_id_ra() => Connection :: pid()},
pids = #{} :: #{Connection :: pid() => [client_id_ra(), ...]},
%% add acouple of fields for future extensibility
reserved_1,
reserved_2}).

View File

@ -1,8 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-record(machine_state, {client_ids = #{}}).

View File

@ -1,67 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand').
-include("rabbit_mqtt.hrl").
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
-export([scopes/0,
switches/0,
aliases/0,
usage/0,
usage_doc_guides/0,
banner/2,
validate/2,
merge_defaults/2,
run/2,
output/2,
description/0,
help_section/0]).
scopes() -> [ctl].
switches() -> [].
aliases() -> [].
description() -> <<"Removes cluster member and permanently deletes its cluster-wide MQTT state">>.
help_section() ->
{plugin, mqtt}.
validate([], _Opts) ->
{validation_failure, not_enough_args};
validate([_, _ | _], _Opts) ->
{validation_failure, too_many_args};
validate([_], _) ->
ok.
merge_defaults(Args, Opts) ->
{Args, Opts}.
usage() ->
<<"decommission_mqtt_node <node>">>.
usage_doc_guides() ->
[?MQTT_GUIDE_URL].
run([Node], #{node := NodeName,
timeout := Timeout}) ->
case rabbit_misc:rpc_call(NodeName, rabbit_mqtt_collector, leave, [Node], Timeout) of
{badrpc, _} = Error ->
Error;
nodedown ->
{ok, list_to_binary(io_lib:format("Node ~ts is down but has been successfully removed"
" from the cluster", [Node]))};
Result ->
%% 'ok' or 'timeout'
Result
end.
banner([Node], _) -> list_to_binary(io_lib:format("Removing node ~ts from the list of MQTT nodes...", [Node])).
output(Result, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).

View File

@ -1,201 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(mqtt_machine).
-behaviour(ra_machine).
-include("mqtt_machine.hrl").
-export([version/0,
which_module/1,
init/1,
apply/3,
state_enter/2,
notify_connection/2,
overview/1]).
-type state() :: #machine_state{}.
-type config() :: map().
-type reply() :: {ok, term()} | {error, term()}.
-type command() :: {register, client_id_ra(), pid()} |
{unregister, client_id_ra(), pid()} |
list.
version() -> 1.
which_module(1) -> ?MODULE;
which_module(0) -> mqtt_machine_v0.
-spec init(config()) -> state().
init(_Conf) ->
#machine_state{}.
-spec apply(map(), command(), state()) ->
{state(), reply(), ra_machine:effects()}.
apply(_Meta, {register, ClientId, Pid},
#machine_state{client_ids = Ids,
pids = Pids0} = State0) ->
{Effects, Ids1, Pids} =
case maps:find(ClientId, Ids) of
{ok, OldPid} when Pid =/= OldPid ->
Effects0 = [{demonitor, process, OldPid},
{monitor, process, Pid},
{mod_call, ?MODULE, notify_connection,
[OldPid, duplicate_id]}],
Pids2 = case maps:take(OldPid, Pids0) of
error ->
Pids0;
{[ClientId], Pids1} ->
Pids1;
{ClientIds, Pids1} ->
Pids1#{ClientId => lists:delete(ClientId, ClientIds)}
end,
Pids3 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end,
[ClientId], Pids2),
{Effects0, maps:remove(ClientId, Ids), Pids3};
{ok, Pid} ->
{[], Ids, Pids0};
error ->
Pids1 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end,
[ClientId], Pids0),
Effects0 = [{monitor, process, Pid}],
{Effects0, Ids, Pids1}
end,
State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1),
pids = Pids},
{State, ok, Effects};
apply(Meta, {unregister, ClientId, Pid}, #machine_state{client_ids = Ids,
pids = Pids0} = State0) ->
State = case maps:find(ClientId, Ids) of
{ok, Pid} ->
Pids = case maps:get(Pid, Pids0, undefined) of
undefined ->
Pids0;
[ClientId] ->
maps:remove(Pid, Pids0);
Cids ->
Pids0#{Pid => lists:delete(ClientId, Cids)}
end,
State0#machine_state{client_ids = maps:remove(ClientId, Ids),
pids = Pids};
%% don't delete client id that might belong to a newer connection
%% that kicked the one with Pid out
{ok, _AnotherPid} ->
State0;
error ->
State0
end,
Effects0 = [{demonitor, process, Pid}],
%% snapshot only when the map has changed
Effects = case State of
State0 -> Effects0;
_ -> Effects0 ++ snapshot_effects(Meta, State)
end,
{State, ok, Effects};
apply(_Meta, {down, DownPid, noconnection}, State) ->
%% Monitor the node the pid is on (see {nodeup, Node} below)
%% so that we can detect when the node is re-connected and discover the
%% actual fate of the connection processes on it
Effect = {monitor, node, node(DownPid)},
{State, ok, Effect};
apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids,
pids = Pids0} = State0) ->
case maps:get(DownPid, Pids0, undefined) of
undefined ->
{State0, ok, []};
ClientIds ->
Ids1 = maps:without(ClientIds, Ids),
State = State0#machine_state{client_ids = Ids1,
pids = maps:remove(DownPid, Pids0)},
Effects = lists:map(fun(Id) ->
[{mod_call, rabbit_log, debug,
["MQTT connection with client id '~ts' failed", [Id]]}]
end, ClientIds),
{State, ok, Effects ++ snapshot_effects(Meta, State)}
end;
apply(_Meta, {nodeup, Node}, State) ->
%% Work out if any pids that were disconnected are still
%% alive.
%% Re-request the monitor for the pids on the now-back node.
Effects = [{monitor, process, Pid} || Pid <- all_pids(State), node(Pid) == Node],
{State, ok, Effects};
apply(_Meta, {nodedown, _Node}, State) ->
{State, ok};
apply(Meta, {leave, Node}, #machine_state{client_ids = Ids,
pids = Pids0} = State0) ->
{Keep, Remove} = maps:fold(
fun (ClientId, Pid, {In, Out}) ->
case node(Pid) =/= Node of
true ->
{In#{ClientId => Pid}, Out};
false ->
{In, Out#{ClientId => Pid}}
end
end, {#{}, #{}}, Ids),
Effects = maps:fold(fun (ClientId, _Pid, Acc) ->
Pid = maps:get(ClientId, Ids),
[
{demonitor, process, Pid},
{mod_call, ?MODULE, notify_connection, [Pid, decommission_node]},
{mod_call, rabbit_log, debug,
["MQTT will remove client ID '~ts' from known "
"as its node has been decommissioned", [ClientId]]}
] ++ Acc
end, [], Remove),
State = State0#machine_state{client_ids = Keep,
pids = maps:without(maps:values(Remove), Pids0)},
{State, ok, Effects ++ snapshot_effects(Meta, State)};
apply(_Meta, {machine_version, 0, 1}, {machine_state, Ids}) ->
Pids = maps:fold(
fun(Id, Pid, Acc) ->
maps:update_with(Pid,
fun(CIds) -> [Id | CIds] end,
[Id], Acc)
end, #{}, Ids),
{#machine_state{client_ids = Ids,
pids = Pids}, ok, []};
apply(_Meta, Unknown, State) ->
logger:error("MQTT Raft state machine v1 received unknown command ~tp", [Unknown]),
{State, {error, {unknown_command, Unknown}}, []}.
-spec state_enter(ra_server:ra_state() | eol, state()) ->
ra_machine:effects().
state_enter(leader, State) ->
%% re-request monitors for all known pids, this would clean up
%% records for all connections are no longer around, e.g. right after node restart
[{monitor, process, Pid} || Pid <- all_pids(State)];
state_enter(_, _) ->
[].
-spec overview(state()) -> map().
overview(#machine_state{client_ids = ClientIds,
pids = Pids}) ->
#{num_client_ids => maps:size(ClientIds),
num_pids => maps:size(Pids)}.
%% ==========================
%% Avoids blocking the Raft leader.
-spec notify_connection(pid(), duplicate_id | decommission_node) -> pid().
notify_connection(Pid, Reason) ->
spawn(fun() -> gen_server2:cast(Pid, Reason) end).
-spec snapshot_effects(map(), state()) -> ra_machine:effects().
snapshot_effects(#{index := RaftIdx}, State) ->
[{release_cursor, RaftIdx, State}].
all_pids(#machine_state{client_ids = Ids}) ->
maps:values(Ids).

View File

@ -1,137 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(mqtt_machine_v0).
-behaviour(ra_machine).
-include("mqtt_machine_v0.hrl").
-export([init/1,
apply/3,
state_enter/2,
notify_connection/2]).
-type state() :: #machine_state{}.
-type config() :: map().
-type reply() :: {ok, term()} | {error, term()}.
-type client_id_ra() :: term().
-type command() :: {register, client_id_ra(), pid()} |
{unregister, client_id_ra(), pid()} |
list.
-spec init(config()) -> state().
init(_Conf) ->
#machine_state{}.
-spec apply(map(), command(), state()) ->
{state(), reply(), ra_machine:effects()}.
apply(_Meta, {register, ClientId, Pid}, #machine_state{client_ids = Ids} = State0) ->
{Effects, Ids1} =
case maps:find(ClientId, Ids) of
{ok, OldPid} when Pid =/= OldPid ->
Effects0 = [{demonitor, process, OldPid},
{monitor, process, Pid},
{mod_call, ?MODULE, notify_connection, [OldPid, duplicate_id]}],
{Effects0, maps:remove(ClientId, Ids)};
_ ->
Effects0 = [{monitor, process, Pid}],
{Effects0, Ids}
end,
State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1)},
{State, ok, Effects};
apply(Meta, {unregister, ClientId, Pid}, #machine_state{client_ids = Ids} = State0) ->
State = case maps:find(ClientId, Ids) of
{ok, Pid} -> State0#machine_state{client_ids = maps:remove(ClientId, Ids)};
%% don't delete client id that might belong to a newer connection
%% that kicked the one with Pid out
{ok, _AnotherPid} -> State0;
error -> State0
end,
Effects0 = [{demonitor, process, Pid}],
%% snapshot only when the map has changed
Effects = case State of
State0 -> Effects0;
_ -> Effects0 ++ snapshot_effects(Meta, State)
end,
{State, ok, Effects};
apply(_Meta, {down, DownPid, noconnection}, State) ->
%% Monitor the node the pid is on (see {nodeup, Node} below)
%% so that we can detect when the node is re-connected and discover the
%% actual fate of the connection processes on it
Effect = {monitor, node, node(DownPid)},
{State, ok, Effect};
apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids} = State0) ->
Ids1 = maps:filter(fun (_ClientId, Pid) when Pid =:= DownPid ->
false;
(_, _) ->
true
end, Ids),
State = State0#machine_state{client_ids = Ids1},
Delta = maps:keys(Ids) -- maps:keys(Ids1),
Effects = lists:map(fun(Id) ->
[{mod_call, rabbit_log, debug,
["MQTT connection with client id '~ts' failed", [Id]]}] end, Delta),
{State, ok, Effects ++ snapshot_effects(Meta, State)};
apply(_Meta, {nodeup, Node}, State) ->
%% Work out if any pids that were disconnected are still
%% alive.
%% Re-request the monitor for the pids on the now-back node.
Effects = [{monitor, process, Pid} || Pid <- all_pids(State), node(Pid) == Node],
{State, ok, Effects};
apply(_Meta, {nodedown, _Node}, State) ->
{State, ok};
apply(Meta, {leave, Node}, #machine_state{client_ids = Ids} = State0) ->
Ids1 = maps:filter(fun (_ClientId, Pid) -> node(Pid) =/= Node end, Ids),
Delta = maps:keys(Ids) -- maps:keys(Ids1),
Effects = lists:foldl(fun (ClientId, Acc) ->
Pid = maps:get(ClientId, Ids),
[
{demonitor, process, Pid},
{mod_call, ?MODULE, notify_connection, [Pid, decommission_node]},
{mod_call, rabbit_log, debug,
["MQTT will remove client ID '~ts' from known "
"as its node has been decommissioned", [ClientId]]}
] ++ Acc
end, [], Delta),
State = State0#machine_state{client_ids = Ids1},
{State, ok, Effects ++ snapshot_effects(Meta, State)};
apply(_Meta, Unknown, State) ->
logger:error("MQTT Raft state machine received an unknown command ~tp", [Unknown]),
{State, {error, {unknown_command, Unknown}}, []}.
-spec state_enter(ra_server:ra_state(), state()) ->
ra_machine:effects().
state_enter(leader, State) ->
%% re-request monitors for all known pids, this would clean up
%% records for all connections are no longer around, e.g. right after node restart
[{monitor, process, Pid} || Pid <- all_pids(State)];
state_enter(_, _) ->
[].
%% ==========================
%% Avoids blocking the Raft leader.
-spec notify_connection(pid(), duplicate_id | decommission_node) -> pid().
notify_connection(Pid, Reason) ->
spawn(fun() -> gen_server2:cast(Pid, Reason) end).
-spec snapshot_effects(map(), state()) -> ra_machine:effects().
snapshot_effects(#{index := RaftIdx}, State) ->
[{release_cursor, RaftIdx, State}].
all_pids(#machine_state{client_ids = Ids}) ->
maps:values(Ids).

View File

@ -1,174 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(mqtt_node).
-export([start/0, node_id/0, server_id/0, all_node_ids/0, leave/1, trigger_election/0,
delete/1]).
-define(ID_NAME, mqtt_node).
-define(START_TIMEOUT, 100_000).
-define(RETRY_INTERVAL, 5000).
-define(RA_OPERATION_TIMEOUT, 60_000).
-define(RA_SYSTEM, coordination).
node_id() ->
server_id(node()).
server_id() ->
server_id(node()).
server_id(Node) ->
{?ID_NAME, Node}.
all_node_ids() ->
[server_id(N) || N <- rabbit_nodes:list_members(),
can_participate_in_clientid_tracking(N)].
start() ->
%% 3s to 6s randomized
Repetitions = rand:uniform(10) + 10,
start(300, Repetitions).
start(_Delay, AttemptsLeft) when AttemptsLeft =< 0 ->
ok = start_server(),
trigger_election();
start(Delay, AttemptsLeft) ->
NodeId = server_id(),
Nodes = compatible_peer_servers(),
case ra_directory:uid_of(?RA_SYSTEM, ?ID_NAME) of
undefined ->
case Nodes of
[] ->
%% Since cluster members are not known ahead of time and initial boot can be happening in parallel,
%% we wait and check a few times (up to a few seconds) to see if we can discover any peers to
%% join before forming a cluster. This reduces the probability of N independent clusters being
%% formed in the common scenario of N nodes booting in parallel e.g. because they were started
%% at the same time by a deployment tool.
%%
%% This scenario does not guarantee single cluster formation but without knowing the list of members
%% ahead of time, this is a best effort workaround. Multi-node consensus is apparently hard
%% to achieve without having consensus around expected cluster members.
rabbit_log:info("MQTT: will wait for ~tp more ms for cluster members to join before triggering a Raft leader election", [Delay]),
timer:sleep(Delay),
start(Delay, AttemptsLeft - 1);
Peers ->
%% Trigger an election.
%% This is required when we start a node for the first time.
%% Using default timeout because it supposed to reply fast.
rabbit_log:info("MQTT: discovered cluster peers that support client ID tracking: ~p", [Peers]),
ok = start_server(),
_ = join_peers(NodeId, Peers),
ra:trigger_election(NodeId, ?RA_OPERATION_TIMEOUT)
end;
_ ->
_ = join_peers(NodeId, Nodes),
ok = ra:restart_server(?RA_SYSTEM, NodeId),
ra:trigger_election(NodeId, ?RA_OPERATION_TIMEOUT)
end.
compatible_peer_servers() ->
all_node_ids() -- [(node_id())].
start_server() ->
NodeId = node_id(),
Nodes = compatible_peer_servers(),
UId = ra:new_uid(ra_lib:to_binary(?ID_NAME)),
Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
Conf = #{cluster_name => ?ID_NAME,
id => NodeId,
uid => UId,
friendly_name => atom_to_list(?ID_NAME),
initial_members => Nodes,
log_init_args => #{uid => UId},
tick_timeout => Timeout,
machine => {module, mqtt_machine, #{}}
},
rabbit_log:info("MQTT: starting Ra server with initial members: ~p", [Nodes]),
ra:start_server(?RA_SYSTEM, Conf).
trigger_election() ->
ra:trigger_election(server_id(), ?RA_OPERATION_TIMEOUT).
join_peers(_NodeId, []) ->
ok;
join_peers(NodeId, Nodes) ->
join_peers(NodeId, Nodes, 100).
join_peers(_NodeId, _Nodes, RetriesLeft) when RetriesLeft =:= 0 ->
rabbit_log:error("MQTT: exhausted all attempts while trying to rejoin cluster peers");
join_peers(NodeId, Nodes, RetriesLeft) ->
case ra:members(Nodes, ?START_TIMEOUT) of
{ok, Members, _} ->
case lists:member(NodeId, Members) of
true -> ok;
false -> ra:add_member(Members, NodeId)
end;
{timeout, _} ->
rabbit_log:debug("MQTT: timed out contacting cluster peers, %s retries left", [RetriesLeft]),
timer:sleep(?RETRY_INTERVAL),
join_peers(NodeId, Nodes, RetriesLeft - 1);
Err ->
Err
end.
-spec leave(node()) -> 'ok' | 'timeout' | 'nodedown'.
leave(Node) ->
NodeId = server_id(),
ToLeave = server_id(Node),
try
ra:leave_and_delete_server(?RA_SYSTEM, NodeId, ToLeave)
catch
exit:{{nodedown, Node}, _} ->
nodedown
end.
can_participate_in_clientid_tracking(Node) ->
case rpc:call(Node, mqtt_machine, module_info, []) of
{badrpc, _} -> false;
_ -> true
end.
-spec delete(Args) -> Ret when
Args :: rabbit_feature_flags:enable_callback_args(),
Ret :: rabbit_feature_flags:enable_callback_ret().
delete(_) ->
RaNodes = all_node_ids(),
Nodes = lists:map(fun({_, N}) -> N end, RaNodes),
LockId = {?ID_NAME, node_id()},
rabbit_log:info("Trying to acquire lock ~p on nodes ~p ...", [LockId, Nodes]),
true = global:set_lock(LockId, Nodes),
rabbit_log:info("Acquired lock ~p", [LockId]),
try whereis(?ID_NAME) of
undefined ->
rabbit_log:info("Local Ra process ~s does not exist", [?ID_NAME]),
ok;
_ ->
rabbit_log:info("Deleting Ra cluster ~s ...", [?ID_NAME]),
try ra:delete_cluster(RaNodes, 15_000) of
{ok, _Leader} ->
rabbit_log:info("Successfully deleted Ra cluster ~s", [?ID_NAME]),
ok;
{error, Reason} ->
rabbit_log:info("Failed to delete Ra cluster ~s: ~p", [?ID_NAME, Reason]),
ServerId = server_id(),
case ra:force_delete_server(?RA_SYSTEM, ServerId) of
ok ->
rabbit_log:info("Successfully force deleted Ra server ~p", [ServerId]),
ok;
Error ->
rabbit_log:error("Failed to force delete Ra server ~p: ~p",
[ServerId, Error]),
{error, Error}
end
catch exit:{{shutdown, delete}, _StackTrace} ->
rabbit_log:info("Ra cluster ~s already being deleted", [?ID_NAME]),
ok
end
after
true = global:del_lock(LockId, Nodes),
rabbit_log:info("Released lock ~p", [LockId])
end.

View File

@ -26,12 +26,6 @@ start(normal, []) ->
persist_static_configuration(),
{ok, Listeners} = application:get_env(tcp_listeners),
{ok, SslListeners} = application:get_env(ssl_listeners),
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
ok = mqtt_node:start();
false ->
ok
end,
Result = rabbit_mqtt_sup:start_link({Listeners, SslListeners}, []),
EMPid = case rabbit_event:start_link() of
{ok, Pid} -> Pid;
@ -45,32 +39,19 @@ stop(_) ->
-spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term().
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
%% Ra tracks connections cluster-wide.
AllPids = rabbit_mqtt_collector:list_pids(),
emit_connection_info(Items, Ref, AggregatorPid, AllPids),
%% Our node already emitted infos for all connections. Therefore, for the
%% remaining nodes, we send back 'finished' so that the CLI does not time out.
[AggregatorPid ! {Ref, finished} || _ <- lists:seq(1, length(Nodes) - 1)];
false ->
Pids = [spawn_link(Node, ?MODULE, emit_connection_info_local,
[Items, Ref, AggregatorPid])
|| Node <- Nodes],
rabbit_control_misc:await_emitters_termination(Pids)
end.
Pids = [spawn_link(Node, ?MODULE, emit_connection_info_local,
[Items, Ref, AggregatorPid])
|| Node <- Nodes],
rabbit_control_misc:await_emitters_termination(Pids).
-spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
LocalPids = list_local_mqtt_connections(),
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).
emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref,
fun(Pid) ->
rabbit_mqtt_reader:info(Pid, Items)
end, Pids).
end, LocalPids).
-spec close_local_client_connections(atom()) -> {'ok', non_neg_integer()}.
close_local_client_connections(Reason) ->
@ -82,16 +63,10 @@ close_local_client_connections(Reason) ->
-spec local_connection_pids() -> [pid()].
local_connection_pids() ->
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
AllPids = rabbit_mqtt_collector:list_pids(),
lists:filter(fun(Pid) -> node(Pid) =:= node() end, AllPids);
false ->
PgScope = persistent_term:get(?PG_SCOPE),
lists:flatmap(fun(Group) ->
pg:get_local_members(PgScope, Group)
end, pg:which_groups(PgScope))
end.
PgScope = persistent_term:get(?PG_SCOPE),
lists:flatmap(fun(Group) ->
pg:get_local_members(PgScope, Group)
end, pg:which_groups(PgScope)).
%% This function excludes Web MQTT connections
list_local_mqtt_connections() ->

View File

@ -1,98 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_mqtt_collector).
-include("mqtt_machine.hrl").
-export([register/2, register/3, unregister/2,
list/0, list_pids/0, leave/1]).
%%----------------------------------------------------------------------------
-spec register(client_id_ra(), pid()) -> {ok, reference()} | {error, term()}.
register(ClientId, Pid) ->
{ClusterName, _} = NodeId = mqtt_node:server_id(),
case ra_leaderboard:lookup_leader(ClusterName) of
undefined ->
case ra:members(NodeId) of
{ok, _, Leader} ->
register(Leader, ClientId, Pid);
_ = Error ->
Error
end;
Leader ->
register(Leader, ClientId, Pid)
end.
-spec register(ra:server_id(), client_id_ra(), pid()) ->
{ok, reference()} | {error, term()}.
register(ServerId, ClientId, Pid) ->
Corr = make_ref(),
send_ra_command(ServerId, {register, ClientId, Pid}, Corr),
erlang:send_after(5000, self(), {ra_event, undefined, register_timeout}),
{ok, Corr}.
-spec unregister(client_id_ra(), pid()) -> ok.
unregister(ClientId, Pid) ->
{ClusterName, _} = mqtt_node:server_id(),
case ra_leaderboard:lookup_leader(ClusterName) of
undefined ->
ok;
Leader ->
send_ra_command(Leader, {unregister, ClientId, Pid}, no_correlation)
end.
-spec list_pids() -> [pid()].
list_pids() ->
list(fun(#machine_state{pids = Pids}) -> maps:keys(Pids) end).
-spec list() -> term().
list() ->
list(fun(#machine_state{client_ids = Ids}) -> maps:to_list(Ids) end).
list(QF) ->
{ClusterName, _} = mqtt_node:server_id(),
case ra_leaderboard:lookup_leader(ClusterName) of
undefined ->
NodeIds = mqtt_node:all_node_ids(),
case ra:leader_query(NodeIds, QF) of
{ok, {_, Result}, _} -> Result;
{timeout, _} ->
rabbit_log:debug("~ts:list/1 leader query timed out",
[?MODULE]),
[]
end;
Leader ->
case ra:leader_query(Leader, QF) of
{ok, {_, Result}, _} -> Result;
{error, _} ->
[];
{timeout, _} ->
rabbit_log:debug("~ts:list/1 leader query timed out",
[?MODULE]),
[]
end
end.
-spec leave(binary()) -> ok | timeout | nodedown.
leave(NodeBin) ->
Node = binary_to_atom(NodeBin, utf8),
ServerId = mqtt_node:server_id(),
run_ra_command(ServerId, {leave, Node}),
mqtt_node:leave(Node).
%%----------------------------------------------------------------------------
-spec run_ra_command(term(), term()) -> term() | {error, term()}.
run_ra_command(ServerId, RaCommand) ->
case ra:process_command(ServerId, RaCommand) of
{ok, Result, _} -> Result;
_ = Error -> Error
end.
-spec send_ra_command(term(), term(), term()) -> ok.
send_ra_command(ServerId, RaCommand, Correlation) ->
ok = ra:pipeline_command(ServerId, RaCommand, Correlation, normal).

View File

@ -9,8 +9,6 @@
-include("rabbit_mqtt.hrl").
-export([track_client_id_in_ra/0]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Feature flags introduced in 3.12.0 %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -18,14 +16,13 @@
-rabbit_feature_flag(
{?QUEUE_TYPE_QOS_0,
#{desc => "Support pseudo queue type for MQTT QoS 0 subscribers omitting a queue process",
stability => stable
stability => required
}}).
-rabbit_feature_flag(
{delete_ra_cluster_mqtt_node,
#{desc => "Delete Ra cluster 'mqtt_node' since MQTT client IDs are tracked locally",
stability => stable,
callbacks => #{enable => {mqtt_node, delete}}
stability => required
}}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -40,14 +37,10 @@
-rabbit_feature_flag(
{mqtt_v5,
#{desc => "Support MQTT 5.0",
stability => stable,
stability => required,
depends_on => [
%% MQTT 5.0 feature Will Delay Interval depends on client ID tracking in pg local.
delete_ra_cluster_mqtt_node,
message_containers
]
}}).
-spec track_client_id_in_ra() -> boolean().
track_client_id_in_ra() ->
rabbit_feature_flags:is_disabled(delete_ra_cluster_mqtt_node).

View File

@ -10,7 +10,7 @@
-export([info/2, init/4, process_packet/2,
terminate/3, handle_pre_hibernate/0,
handle_ra_event/2, handle_down/2, handle_queue_event/2,
handle_down/2, handle_queue_event/2,
proto_version_tuple/1, throttle/2, format_status/1,
remove_duplicate_client_id_connections/2,
remove_duplicate_client_id_connections/3,
@ -100,7 +100,6 @@
%% [v5 4.8.1]
subscriptions = #{} :: subscriptions(),
auth_state = #auth_state{},
ra_register_state :: option(registered | {pending, reference()}),
%% quorum queues and streams whose soft limit has been exceeded
queues_soft_limit_exceeded = sets:new([{version, 2}]) :: sets:set(),
qos0_messages_dropped = 0 :: non_neg_integer(),
@ -176,7 +175,6 @@ process_connect(
end,
Result0 =
maybe
ok ?= check_protocol_version(ProtoVer),
ok ?= check_extended_auth(ConnectProps),
{ok, ClientId} ?= ensure_client_id(ClientId0, CleanStart, ProtoVer),
{ok, {Username1, Password}} ?= check_credentials(Username0, Password0, SslLoginName, PeerIp),
@ -192,7 +190,7 @@ process_connect(
{ok, AuthzCtx} ?= check_vhost_access(VHost, User, ClientId, PeerIp),
ok ?= check_user_loopback(Username, PeerIp),
rabbit_core_metrics:auth_attempt_succeeded(PeerIp, Username, mqtt),
{ok, RaRegisterState} ?= register_client_id(VHost, ClientId, CleanStart, WillProps),
ok = register_client_id(VHost, ClientId, CleanStart, WillProps),
{ok, WillMsg} ?= make_will_msg(Packet),
{TraceState, ConnName} = init_trace(VHost, ConnName0),
ok = rabbit_mqtt_keepalive:start(KeepaliveSecs, Socket),
@ -221,8 +219,7 @@ process_connect(
topic_alias_maximum_outbound = TopicAliasMaxOutbound},
auth_state = #auth_state{
user = User,
authz_ctx = AuthzCtx},
ra_register_state = RaRegisterState},
authz_ctx = AuthzCtx}},
ok ?= clear_will_msg(S),
{ok, S}
end,
@ -317,7 +314,6 @@ process_connect(State0) ->
{ok, SessPresent, State}
else
{error, _} = Error ->
unregister_client(State0),
Error
end.
@ -613,18 +609,6 @@ update_session_expiry_interval(QName, Expiry) ->
ok = rabbit_queue_type:policy_changed(Q) % respects queue args
end.
check_protocol_version(V)
when V =:= 3 orelse V =:= 4 ->
ok;
check_protocol_version(5) ->
case rabbit_feature_flags:is_enabled(mqtt_v5) of
true ->
ok;
false ->
?LOG_ERROR("Rejecting MQTT 5.0 connection because feature flag mqtt_v5 is disabled"),
{error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
end.
check_extended_auth(#{'Authentication-Method' := Method}) ->
%% In future, we could support SASL via rabbit_auth_mechanism
%% as done by rabbit_reader and rabbit_stream_reader.
@ -665,47 +649,29 @@ ensure_client_id(ClientId, _, _)
when is_binary(ClientId) ->
{ok, ClientId}.
-spec register_client_id(rabbit_types:vhost(), client_id(), boolean(), properties()) ->
{ok, RaRegisterState :: undefined | {pending, reference()}} |
{error, ConnectErrorCode :: pos_integer()}.
-spec register_client_id(rabbit_types:vhost(), client_id(), boolean(), properties()) -> ok.
register_client_id(VHost, ClientId, CleanStart, WillProps)
when is_binary(VHost), is_binary(ClientId) ->
%% Always register client ID in pg.
PgGroup = {VHost, ClientId},
ok = pg:join(persistent_term:get(?PG_SCOPE), PgGroup, self()),
%% "If a Network Connection uses a Client Identifier of an existing Network Connection to
%% the Server, the Will Message for the exiting connection is sent unless the new
%% connection specifies Clean Start of 0 and the Will Delay is greater than zero."
%% [v5 3.1.3.2.2]
SendWill = case {CleanStart, WillProps} of
{false, #{'Will-Delay-Interval' := I}} when I > 0 ->
false;
_ ->
true
end,
ok = erpc:multicast([node() | nodes()],
?MODULE,
remove_duplicate_client_id_connections,
[PgGroup, self(), SendWill]).
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
case collector_register(ClientId) of
{ok, Corr} ->
%% Ra node takes care of removing duplicate client ID connections.
{ok, {pending, Corr}};
{error, _} = Err ->
%% e.g. this node was removed from the MQTT cluster members
?LOG_ERROR("MQTT connection failed to register client ID ~s in vhost ~s in Ra: ~p",
[ClientId, VHost, Err]),
{error, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}
end;
false ->
%% "If a Network Connection uses a Client Identifier of an existing Network Connection to
%% the Server, the Will Message for the exiting connection is sent unless the new
%% connection specifies Clean Start of 0 and the Will Delay is greater than zero."
%% [v5 3.1.3.2.2]
Args = case {CleanStart, WillProps} of
{false, #{'Will-Delay-Interval' := I}} when I > 0 ->
[PgGroup, self(), false];
_ ->
[PgGroup, self()]
end,
ok = erpc:multicast([node() | nodes()],
?MODULE,
remove_duplicate_client_id_connections,
Args),
{ok, undefined}
end.
%% Once feature flag mqtt_v5 becomes required, the caller should always pass SendWill to this
%% function (remove_duplicate_client_id_connections/2) so that we can delete this function.
%% remove_duplicate_client_id_connections/2 is only called from 3.13 nodes.
%% Hence, this function can be deleted when mixed version clusters between
%% this version and 3.13 are disallowed.
-spec remove_duplicate_client_id_connections(
{rabbit_types:vhost(), client_id()}, pid()) -> ok.
remove_duplicate_client_id_connections(PgGroup, PidToKeep) ->
@ -1403,13 +1369,8 @@ queue_ttl_args(SessionExpirySecs)
when is_integer(SessionExpirySecs) andalso SessionExpirySecs > 0 ->
[{?QUEUE_TTL_KEY, long, timer:seconds(SessionExpirySecs)}].
queue_type(?QOS_0, 0, QArgs) ->
case rabbit_queue_type:is_enabled(?QUEUE_TYPE_QOS_0) of
true ->
?QUEUE_TYPE_QOS_0;
false ->
rabbit_amqqueue:get_queue_type(QArgs)
end;
queue_type(?QOS_0, 0, _QArgs) ->
?QUEUE_TYPE_QOS_0;
queue_type(_, _, QArgs) ->
rabbit_amqqueue:get_queue_type(QArgs).
@ -1703,7 +1664,6 @@ terminate(SendWill, Infos, State = #state{queue_states = QStates}) ->
rabbit_core_metrics:connection_closed(self()),
rabbit_event:notify(connection_closed, Infos),
rabbit_networking:unregister_non_amqp_connection(self()),
unregister_client(State),
maybe_decrement_consumer(State),
maybe_decrement_publisher(State),
_ = maybe_delete_mqtt_qos0_queue(State),
@ -1799,15 +1759,6 @@ log_delayed_will_failure(Topic, ClientId, Reason) ->
?LOG_DEBUG("failed to schedule delayed Will Message to topic ~s for MQTT client ID ~s: ~p",
[Topic, ClientId, Reason]).
unregister_client(#state{cfg = #cfg{client_id = ClientIdBin}}) ->
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
ClientId = rabbit_data_coercion:to_list(ClientIdBin),
rabbit_mqtt_collector:unregister(ClientId, self());
false ->
ok
end.
maybe_delete_mqtt_qos0_queue(
State = #state{cfg = #cfg{clean_start = true},
auth_state = #auth_state{user = #user{username = Username}}}) ->
@ -1867,41 +1818,6 @@ handle_pre_hibernate() ->
erase(topic_permission_cache),
ok.
-spec handle_ra_event(register_timeout
| {applied, [{reference(), ok}]}
| {not_leader, term(), reference()}, state()) -> state().
handle_ra_event({applied, [{Corr, ok}]},
State = #state{ra_register_state = {pending, Corr}}) ->
%% success case - command was applied transition into registered state
State#state{ra_register_state = registered};
handle_ra_event({not_leader, Leader, Corr},
State = #state{ra_register_state = {pending, Corr},
cfg = #cfg{client_id = ClientIdBin}}) ->
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
ClientId = rabbit_data_coercion:to_list(ClientIdBin),
%% retry command against actual leader
{ok, NewCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()),
State#state{ra_register_state = {pending, NewCorr}};
false ->
State
end;
handle_ra_event(register_timeout,
State = #state{ra_register_state = {pending, _Corr},
cfg = #cfg{client_id = ClientId}}) ->
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
{ok, NewCorr} = collector_register(ClientId),
State#state{ra_register_state = {pending, NewCorr}};
false ->
State
end;
handle_ra_event(register_timeout, State) ->
State;
handle_ra_event(Evt, State) ->
?LOG_DEBUG("unhandled ra_event: ~w ", [Evt]),
State.
-spec handle_down(term(), state()) ->
{ok, state()} | {error, Reason :: any()}.
handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
@ -2441,10 +2357,6 @@ message_redelivered(true, ProtoVer, QType) ->
message_redelivered(_, _, _) ->
ok.
collector_register(ClientIdBin) ->
ClientId = rabbit_data_coercion:to_list(ClientIdBin),
rabbit_mqtt_collector:register(ClientId, self()).
%% "Reason Codes less than 0x80 indicate successful completion of an operation.
%% Reason Code values of 0x80 or greater indicate failure."
-spec is_success(reason_code()) -> boolean().
@ -2459,7 +2371,6 @@ format_status(
packet_id = PackID,
subscriptions = Subscriptions,
auth_state = AuthState,
ra_register_state = RaRegisterState,
queues_soft_limit_exceeded = QSLE,
qos0_messages_dropped = Qos0MsgsDropped,
cfg = #cfg{
@ -2510,7 +2421,6 @@ format_status(
packet_id => PackID,
subscriptions => Subscriptions,
auth_state => AuthState,
ra_register_state => RaRegisterState,
queues_soft_limit_exceeded => QSLE,
qos0_messages_dropped => Qos0MsgsDropped}.

View File

@ -145,7 +145,7 @@ deliver(Qs, Msg, Options) ->
{[], Actions}.
-spec is_enabled() -> boolean().
is_enabled() -> rabbit_feature_flags:is_enabled(?MODULE).
is_enabled() -> true.
-spec is_compatible(boolean(), boolean(), boolean()) ->
boolean().

View File

@ -112,11 +112,6 @@ handle_call({info, InfoItems}, _From, State) ->
handle_call(Msg, From, State) ->
{stop, {mqtt_unexpected_call, Msg, From}, State}.
%% Delete this backward compatibility clause when feature flag
%% delete_ra_cluster_mqtt_node becomes required.
handle_cast(duplicate_id, State) ->
handle_cast({duplicate_id, true}, State);
handle_cast({duplicate_id, SendWill},
State = #state{proc_state = PState,
conn_name = ConnName}) ->
@ -235,13 +230,6 @@ handle_info(login_timeout, State) ->
handle_info(emit_stats, State) ->
{noreply, emit_stats(State), ?HIBERNATE_AFTER};
handle_info({ra_event, _From, Evt},
#state{proc_state = PState0} = State) ->
%% handle applied event to ensure registration command actually got applied
%% handle not_leader notification in case we send the command to a non-leader
PState = rabbit_mqtt_processor:handle_ra_event(Evt, PState0),
{noreply, pstate(State, PState), ?HIBERNATE_AFTER};
handle_info({{'DOWN', _QName}, _MRef, process, _Pid, _Reason} = Evt,
#state{proc_state = PState0} = State) ->
case rabbit_mqtt_processor:handle_down(Evt, PState0) of

View File

@ -129,18 +129,16 @@ init_per_group(authz, Config0) ->
,{vhost, VHost}
,{exchange, <<"amq.topic">>}
]},
Config1 = rabbit_ct_helpers:run_setup_steps(rabbit_ct_helpers:merge_app_env(Config0, MqttConfig),
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
rabbit_ct_broker_helpers:add_user(Config1, User, Password),
rabbit_ct_broker_helpers:add_vhost(Config1, VHost),
[Log|_] = rpc(Config1, 0, rabbit, log_locations, []),
Config2 = [{mqtt_user, User},
{mqtt_vhost, VHost},
{mqtt_password, Password},
{log_location, Log} | Config1],
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config2, mqtt_v5),
Config2;
Config = rabbit_ct_helpers:run_setup_steps(rabbit_ct_helpers:merge_app_env(Config0, MqttConfig),
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
rabbit_ct_broker_helpers:add_user(Config, User, Password),
rabbit_ct_broker_helpers:add_vhost(Config, VHost),
[Log|_] = rpc(Config, 0, rabbit, log_locations, []),
[{mqtt_user, User},
{mqtt_vhost, VHost},
{mqtt_password, Password},
{log_location, Log} | Config];
init_per_group(Group, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
@ -149,22 +147,20 @@ init_per_group(Group, Config) ->
]),
MqttConfig = mqtt_config(Group),
AuthConfig = auth_config(Group),
Config2 = rabbit_ct_helpers:run_setup_steps(
Config1,
[fun(Conf) -> case MqttConfig of
undefined -> Conf;
_ -> merge_app_env(MqttConfig, Conf)
end
end] ++
[fun(Conf) -> case AuthConfig of
undefined -> Conf;
_ -> merge_app_env(AuthConfig, Conf)
end
end] ++
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config2, mqtt_v5),
Config2.
rabbit_ct_helpers:run_setup_steps(
Config1,
[fun(Conf) -> case MqttConfig of
undefined -> Conf;
_ -> merge_app_env(MqttConfig, Conf)
end
end] ++
[fun(Conf) -> case AuthConfig of
undefined -> Conf;
_ -> merge_app_env(AuthConfig, Conf)
end
end] ++
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_group(G, Config)
when G =:= v4;

View File

@ -79,11 +79,11 @@ init_per_testcase(Testcase, Config) ->
{rmq_nodename_suffix, Testcase},
{rmq_nodes_clustered, true}
]),
Config2 = rabbit_ct_helpers:run_setup_steps(Config1,
[ fun merge_app_env/1 ] ++
rabbit_ct_helpers:run_setup_steps(
Config1,
[fun merge_app_env/1] ++
setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
util:maybe_skip_v5(Config2).
rabbit_ct_client_helpers:setup_steps()).
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:run_steps(Config,
@ -139,7 +139,7 @@ connection_id_tracking_on_nodedown(Config) ->
process_flag(trap_exit, true),
ok = stop_node(Config, 0),
await_exit(C),
ok = eventually(?_assertEqual([], util:all_connection_pids(1, Config)), 500, 4).
ok = eventually(?_assertEqual([], util:all_connection_pids(Config)), 500, 4).
%%
%% Helpers

View File

@ -56,9 +56,7 @@ end_per_suite(Config) ->
init_per_group(unit, Config) ->
Config;
init_per_group(Group, Config) ->
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, delete_ra_cluster_mqtt_node),
Config1 = rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}),
util:maybe_skip_v5(Config1).
rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}).
end_per_group(_, Config) ->
Config.

View File

@ -1,150 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module(ff_SUITE).
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
-import(rabbit_ct_broker_helpers, [rpc/5]).
-import(rabbit_ct_helpers, [eventually/1]).
-import(util, [expect_publishes/3,
get_global_counters/4,
connect/2,
connect/4]).
-define(PROTO_VER, v4).
all() ->
[
{group, cluster_size_3}
].
groups() ->
[
{cluster_size_3, [],
[rabbit_mqtt_qos0_queue,
%% delete_ra_cluster_mqtt_node must run before mqtt_v5
%% because the latter depends on (i.e. auto-enables) the former.
delete_ra_cluster_mqtt_node,
mqtt_v5]}
].
suite() ->
[
{timetrap, {minutes, 10}}
].
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config, []).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(Group = cluster_size_3, Config0) ->
Config1 = rabbit_ct_helpers:set_config(Config0, [{rmq_nodes_count, 3},
{rmq_nodename_suffix, Group}]),
Config = rabbit_ct_helpers:merge_app_env(
Config1, {rabbit, [{forced_feature_flags_on_init, []}]}),
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(TestCase, Config) ->
case rabbit_ct_broker_helpers:is_feature_flag_supported(Config, TestCase) of
true ->
?assertNot(rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, TestCase)),
Config;
false ->
{skip, io_lib:format("feature flag ~s is unsupported", [TestCase])}
end.
end_per_testcase(_TestCase, Config) ->
Config.
delete_ra_cluster_mqtt_node(Config) ->
FeatureFlag = ?FUNCTION_NAME,
C = connect(<<"my-client">>, Config, 1, []),
timer:sleep(500),
%% old client ID tracking works
?assertEqual(1, length(util:all_connection_pids(Config))),
%% Ra processes are alive
?assert(lists:all(fun erlang:is_pid/1,
rabbit_ct_broker_helpers:rpc_all(Config, erlang, whereis, [mqtt_node]))),
?assertEqual(ok,
rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag)),
%% Ra processes should be gone
eventually(
?_assert(lists:all(fun(Pid) -> Pid =:= undefined end,
rabbit_ct_broker_helpers:rpc_all(Config, erlang, whereis, [mqtt_node])))),
%% new client ID tracking works
?assertEqual(1, length(util:all_connection_pids(Config))),
ok = emqtt:disconnect(C),
eventually(?_assertEqual(0, length(util:all_connection_pids(Config)))).
rabbit_mqtt_qos0_queue(Config) ->
FeatureFlag = ?FUNCTION_NAME,
Msg = Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
C1 = connect(ClientId, Config),
{ok, _, [0]} = emqtt:subscribe(C1, Topic, qos0),
ok = emqtt:publish(C1, Topic, Msg, qos0),
ok = expect_publishes(C1, Topic, [Msg]),
?assertEqual(1,
length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]))),
?assertEqual(ok,
rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag)),
%% Queue type does not chanage for existing connection.
?assertEqual(1,
length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]))),
ok = emqtt:publish(C1, Topic, Msg, qos0),
ok = expect_publishes(C1, Topic, [Msg]),
?assertMatch(#{messages_delivered_total := 2,
messages_delivered_consume_auto_ack_total := 2},
get_global_counters(Config, ?PROTO_VER, 0, [{queue_type, rabbit_classic_queue}])),
%% Reconnecting with the same client ID will terminate the old connection.
true = unlink(C1),
C2 = connect(ClientId, Config),
{ok, _, [0]} = emqtt:subscribe(C2, Topic, qos0),
%% This time, we get the new queue type.
eventually(
?_assertEqual(0,
length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [rabbit_classic_queue])))),
?assertEqual(1,
length(rpc(Config, 0, rabbit_amqqueue, list_by_type, [FeatureFlag]))),
ok = emqtt:publish(C2, Topic, Msg, qos0),
ok = expect_publishes(C2, Topic, [Msg]),
?assertMatch(#{messages_delivered_total := 1,
messages_delivered_consume_auto_ack_total := 1},
get_global_counters(Config, ?PROTO_VER, 0, [{queue_type, FeatureFlag}])),
ok = emqtt:disconnect(C2).
mqtt_v5(Config) ->
FeatureFlag = ?FUNCTION_NAME,
%% MQTT 5.0 is not yet supported.
{C1, Connect} = util:start_client(?FUNCTION_NAME, Config, 0, [{proto_ver, v5}]),
unlink(C1),
?assertEqual({error, {unsupported_protocol_version, #{}}}, Connect(C1)),
?assertEqual(ok, rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag)),
%% MQTT 5.0 is now supported.
{C5, Connect} = util:start_client(?FUNCTION_NAME, Config, 0, [{proto_ver, v5}]),
?assertMatch({ok, _}, Connect(C5)),
ok = emqtt:disconnect(C5).

View File

@ -54,18 +54,17 @@ end_per_suite(Config) ->
init_per_group(Group, Config0) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config0, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config0, [
{rmq_nodename_suffix, Suffix},
{rmq_certspwd, "bunnychow"},
{rmq_nodes_clustered, true},
{rmq_nodes_count, 3},
{mqtt_version, Group}
]),
Config = rabbit_ct_helpers:run_setup_steps(Config1,
[ fun merge_app_env/1 ] ++
Config = rabbit_ct_helpers:set_config(
Config0, [{rmq_nodename_suffix, Suffix},
{rmq_certspwd, "bunnychow"},
{rmq_nodes_clustered, true},
{rmq_nodes_count, 3},
{mqtt_version, Group}]),
rabbit_ct_helpers:run_setup_steps(
Config,
[fun merge_app_env/1] ++
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
util:maybe_skip_v5(Config).
rabbit_ct_client_helpers:setup_steps()).
end_per_group(_, Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,

View File

@ -1,95 +0,0 @@
-module(mqtt_machine_SUITE).
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
-include("mqtt_machine.hrl").
%%%===================================================================
%%% Common Test callbacks
%%%===================================================================
all() ->
[
{group, tests}
].
all_tests() ->
[
basics,
machine_upgrade,
many_downs
].
groups() ->
[
{tests, [], all_tests()}
].
%%%===================================================================
%%% Test cases
%%%===================================================================
basics(_Config) ->
S0 = mqtt_machine:init(#{}),
ClientId = <<"id1">>,
OthPid = spawn(fun () -> ok end),
{S1, ok, _} = mqtt_machine:apply(meta(1), {register, ClientId, self()}, S0),
?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S1),
?assertMatch(#machine_state{pids = Pids} when map_size(Pids) == 1, S1),
{S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, OthPid}, S1),
?assertMatch(#machine_state{client_ids = #{ClientId := OthPid} = Ids}
when map_size(Ids) == 1, S2),
{S3, ok, _} = mqtt_machine:apply(meta(3), {down, OthPid, noproc}, S2),
?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S3),
{S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, OthPid}, S2),
?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S4),
ok.
machine_upgrade(_Config) ->
S0 = mqtt_machine_v0:init(#{}),
ClientId = <<"id1">>,
Self = self(),
{S1, ok, _} = mqtt_machine_v0:apply(meta(1), {register, ClientId, self()}, S0),
?assertMatch({machine_state, Ids} when map_size(Ids) == 1, S1),
{S2, ok, _} = mqtt_machine:apply(meta(2), {machine_version, 0, 1}, S1),
?assertMatch(#machine_state{client_ids = #{ClientId := Self},
pids = #{Self := [ClientId]} = Pids}
when map_size(Pids) == 1, S2),
{S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2),
?assertMatch(#machine_state{client_ids = Ids,
pids = Pids}
when map_size(Ids) == 0 andalso map_size(Pids) == 0, S3),
ok.
many_downs(_Config) ->
S0 = mqtt_machine:init(#{}),
Clients = [{list_to_binary(integer_to_list(I)), spawn(fun() -> ok end)}
|| I <- lists:seq(1, 10000)],
S1 = lists:foldl(
fun ({ClientId, Pid}, Acc0) ->
{Acc, ok, _} = mqtt_machine:apply(meta(1), {register, ClientId, Pid}, Acc0),
Acc
end, S0, Clients),
_ = lists:foldl(
fun ({_ClientId, Pid}, Acc0) ->
{Acc, ok, _} = mqtt_machine:apply(meta(1), {down, Pid, noproc}, Acc0),
Acc
end, S1, Clients),
_ = lists:foldl(
fun ({ClientId, Pid}, Acc0) ->
{Acc, ok, _} = mqtt_machine:apply(meta(1), {unregister, ClientId,
Pid}, Acc0),
Acc
end, S0, Clients),
ok.
%% Utility
meta(Idx) ->
#{index => Idx,
term => 1,
ts => erlang:system_time(millisecond)}.

View File

@ -71,7 +71,6 @@ init_per_group(Group, Config0) ->
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, mqtt_v5),
Plugins = [rabbitmq_stomp,
rabbitmq_stream],
@ -367,7 +366,6 @@ amqp_mqtt_amqp(Config) ->
%% consume via MQTT 5.0 with a QoS 0 subscription.
amqp_mqtt_qos0(Config) ->
%% We want to test that the old node can receive from an MQTT QoS 0 queue.
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, rabbit_mqtt_qos0_queue),
amqp_mqtt(0, Config).
%% Send messages with different AMQP body sections and

View File

@ -57,9 +57,10 @@ end_per_suite(Config) ->
rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(Group, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}),
util:maybe_skip_v5(Config1).
end_per_group(_, Config) -> Config.
rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}).
end_per_group(_Group, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

View File

@ -63,13 +63,11 @@ merge_app_env(Config) ->
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, {rmq_nodename_suffix, ?MODULE}),
Config2 = rabbit_ct_helpers:run_setup_steps(
Config1,
[fun merge_app_env/1] ++
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config2, mqtt_v5),
Config2.
rabbit_ct_helpers:run_setup_steps(
Config1,
[fun merge_app_env/1] ++
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
@ -265,8 +263,6 @@ rabbit_mqtt_qos0_queue_overflow(Config) ->
#{messages_dead_lettered_maxlen_total := NumDeadLettered}
} = rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []),
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, QType),
Topic = atom_to_binary(?FUNCTION_NAME),
Msg = binary:copy(<<"x">>, 4000),
NumMsgs = 10_000,

View File

@ -63,7 +63,7 @@ init_per_group(G, Config)
rabbit_ct_helpers:set_config(Config, {mqtt_version, G});
init_per_group(Group, Config0) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config0, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config = rabbit_ct_helpers:set_config(
Config0, {rmq_nodename_suffix, Suffix}),
Mod = list_to_atom("rabbit_mqtt_retained_msg_store_" ++ atom_to_list(Group)),
Env = [{rabbitmq_mqtt, [{retained_message_store, Mod}]},
@ -73,13 +73,11 @@ init_per_group(Group, Config0) ->
{default_vhost, "/"},
{default_permissions, [".*", ".*", ".*"]}
]}],
Config = rabbit_ct_helpers:run_setup_steps(
Config1,
[fun(Conf) -> rabbit_ct_helpers:merge_app_env(Conf, Env) end] ++
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, mqtt_v5),
Config.
rabbit_ct_helpers:run_setup_steps(
Config,
[fun(Conf) -> rabbit_ct_helpers:merge_app_env(Conf, Env) end] ++
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_group(G, Config)
when G =:= v4;

View File

@ -27,8 +27,7 @@
rpc_all/4,
get_node_config/3,
drain_node/2,
revive_node/2,
is_feature_flag_enabled/2
revive_node/2
]).
-import(rabbit_ct_helpers,
[eventually/3,
@ -174,12 +173,11 @@ init_per_group(mqtt, Config) ->
init_per_group(web_mqtt, Config) ->
rabbit_ct_helpers:set_config(Config, {websocket, true});
init_per_group(Group, Config0)
init_per_group(Group, Config)
when Group =:= v3;
Group =:= v4;
Group =:= v5 ->
Config = rabbit_ct_helpers:set_config(Config0, {mqtt_version, Group}),
util:maybe_skip_v5(Config);
rabbit_ct_helpers:set_config(Config, {mqtt_version, Group});
init_per_group(Group, Config0) ->
Nodes = case Group of
@ -209,7 +207,7 @@ end_per_group(_, Config) ->
init_per_testcase(T, Config)
when T =:= management_plugin_connection;
T =:= management_plugin_enable ->
ok = inets:start(),
inets:start(),
init_per_testcase0(T, Config);
init_per_testcase(Testcase, Config) ->
init_per_testcase0(Testcase, Config).
@ -578,27 +576,13 @@ events(Config) ->
QueueNameBin = <<"mqtt-subscription-", ClientId/binary, "qos0">>,
QueueName = {resource, <<"/">>, queue, QueueNameBin},
[E2, E3 | E4] = get_events(Server),
QueueType = case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of
true ->
?assertEqual([], E4),
rabbit_mqtt_qos0_queue;
false ->
[ConsumerCreated] = E4,
assert_event_type(consumer_created, ConsumerCreated),
assert_event_prop([{queue, QueueName},
{ack_required, false},
{exclusive, false},
{arguments, []}],
ConsumerCreated),
rabbit_classic_queue
end,
[E2, E3] = get_events(Server),
assert_event_type(queue_created, E2),
assert_event_prop([{name, QueueName},
{durable, true},
{auto_delete, false},
{exclusive, true},
{type, QueueType},
{type, rabbit_mqtt_qos0_queue},
{arguments, []}],
E2),
assert_event_type(binding_created, E3),
@ -617,28 +601,18 @@ events(Config) ->
{ok, _, _} = emqtt:unsubscribe(C, MqttTopic),
[E5] = get_events(Server),
assert_event_type(binding_deleted, E5),
[E4] = get_events(Server),
assert_event_type(binding_deleted, E4),
ok = emqtt:disconnect(C),
[E6, E7 | E8] = get_events(Server),
assert_event_type(connection_closed, E6),
?assertEqual(E1#event.props, E6#event.props,
[E5, E6] = get_events(Server),
assert_event_type(connection_closed, E5),
?assertEqual(E1#event.props, E5#event.props,
"connection_closed event props should match connection_created event props. "
"See https://github.com/rabbitmq/rabbitmq-server/discussions/6331"),
case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of
true ->
assert_event_type(queue_deleted, E7),
assert_event_prop({name, QueueName}, E7);
false ->
assert_event_type(consumer_deleted, E7),
assert_event_prop({queue, QueueName}, E7),
[QueueDeleted] = E8,
assert_event_type(queue_deleted, QueueDeleted),
assert_event_prop({name, QueueName}, QueueDeleted)
end,
assert_event_type(queue_deleted, E6),
assert_event_prop({name, QueueName}, E6),
ok = gen_event:delete_handler({rabbit_event, Server}, event_recorder, []).
@ -681,38 +655,24 @@ global_counters(Config) ->
messages_unroutable_dropped_total => 1,
messages_unroutable_returned_total => 1},
get_global_counters(Config, ProtoVer)),
case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of
true ->
?assertEqual(#{messages_delivered_total => 2,
messages_acknowledged_total => 1,
messages_delivered_consume_auto_ack_total => 1,
messages_delivered_consume_manual_ack_total => 1,
messages_delivered_get_auto_ack_total => 0,
messages_delivered_get_manual_ack_total => 0,
messages_get_empty_total => 0,
messages_redelivered_total => 0},
get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_classic_queue}])),
?assertEqual(#{messages_delivered_total => 1,
messages_acknowledged_total => 0,
messages_delivered_consume_auto_ack_total => 1,
messages_delivered_consume_manual_ack_total => 0,
messages_delivered_get_auto_ack_total => 0,
messages_delivered_get_manual_ack_total => 0,
messages_get_empty_total => 0,
messages_redelivered_total => 0},
get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_mqtt_qos0_queue}]));
false ->
?assertEqual(#{messages_delivered_total => 3,
messages_acknowledged_total => 1,
messages_delivered_consume_auto_ack_total => 2,
messages_delivered_consume_manual_ack_total => 1,
messages_delivered_get_auto_ack_total => 0,
messages_delivered_get_manual_ack_total => 0,
messages_get_empty_total => 0,
messages_redelivered_total => 0},
get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_classic_queue}]))
end,
?assertEqual(#{messages_delivered_total => 2,
messages_acknowledged_total => 1,
messages_delivered_consume_auto_ack_total => 1,
messages_delivered_consume_manual_ack_total => 1,
messages_delivered_get_auto_ack_total => 0,
messages_delivered_get_manual_ack_total => 0,
messages_get_empty_total => 0,
messages_redelivered_total => 0},
get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_classic_queue}])),
?assertEqual(#{messages_delivered_total => 1,
messages_acknowledged_total => 0,
messages_delivered_consume_auto_ack_total => 1,
messages_delivered_consume_manual_ack_total => 0,
messages_delivered_get_auto_ack_total => 0,
messages_delivered_get_manual_ack_total => 0,
messages_get_empty_total => 0,
messages_redelivered_total => 0},
get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_mqtt_qos0_queue}])),
{ok, _, _} = emqtt:unsubscribe(C, Topic1),
?assertEqual(1, maps:get(consumers, get_global_counters(Config, ProtoVer))),
@ -1255,13 +1215,7 @@ cli_list_queues(Config) ->
"type", "name", "state", "durable", "auto_delete",
"arguments", "pid", "owner_pid", "messages", "exclusive_consumer_tag"
]),
ExpectedQueueType = case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of
true ->
<<"MQTT QoS 0">>;
false ->
<<"classic">>
end,
?assertMatch([[ExpectedQueueType, <<"mqtt-subscription-cli_list_queuesqos0">>,
?assertMatch([[<<"MQTT QoS 0">>, <<"mqtt-subscription-cli_list_queuesqos0">>,
<<"running">>, <<"true">>, <<"false">>, <<"[]">>, _, _, <<"0">>, <<"">>]],
Qs),
@ -1273,11 +1227,6 @@ cli_list_queues(Config) ->
ok = emqtt:disconnect(C).
maintenance(Config) ->
%% When either file rabbit_mqtt_collector changes or different OTP versions
%% are used for compilation, the rabbit_mqtt_collector module version will
%% change and cause a bad fun error when executing ra:leader_query/2 remotely.
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, delete_ra_cluster_mqtt_node),
C0 = connect(<<"client-0">>, Config, 0, []),
C1a = connect(<<"client-1a">>, Config, 1, []),
C1b = connect(<<"client-1b">>, Config, 1, []),
@ -1367,11 +1316,6 @@ keepalive_turned_off(Config) ->
ok = emqtt:disconnect(C).
duplicate_client_id(Config) ->
%% When either file rabbit_mqtt_collector changes or different OTP versions
%% are used for compilation, the rabbit_mqtt_collector module version will
%% change and cause a bad fun error when executing ra:leader_query/2 remotely.
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, delete_ra_cluster_mqtt_node),
[Server1, Server2, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
%% Test session takeover by both new and old node in mixed version clusters.
ClientId1 = <<"c1">>,
@ -1509,14 +1453,8 @@ clean_session_disconnect_client(Config) ->
{ok, _, _} = emqtt:subscribe(C, <<"topic1">>, qos1),
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
QsClassic = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]),
case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of
true ->
?assertEqual(1, length(QsQos0)),
?assertEqual(1, length(QsClassic));
false ->
?assertEqual(0, length(QsQos0)),
?assertEqual(2, length(QsClassic))
end,
?assertEqual(1, length(QsQos0)),
?assertEqual(1, length(QsClassic)),
ok = emqtt:disconnect(C),
%% After terminating a clean session, we expect any session state to be cleaned up on the server.
@ -1536,14 +1474,8 @@ clean_session_node_down(NodeDown, Config) ->
{ok, _, _} = emqtt:subscribe(C, <<"topic1">>, qos1),
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
QsClassic = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]),
case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of
true ->
?assertEqual(1, length(QsQos0)),
?assertEqual(1, length(QsClassic));
false ->
?assertEqual(0, length(QsQos0)),
?assertEqual(2, length(QsClassic))
end,
?assertEqual(1, length(QsQos0)),
?assertEqual(1, length(QsClassic)),
?assertEqual(2, rpc(Config, rabbit_amqqueue, count, [])),
unlink(C),

View File

@ -6,7 +6,6 @@
-include_lib("eunit/include/eunit.hrl").
-export([all_connection_pids/1,
all_connection_pids/2,
publish_qos1_timeout/4,
sync_publish_result/3,
get_global_counters/1,
@ -25,23 +24,13 @@
assert_message_expiry_interval/2,
await_exit/1,
await_exit/2,
maybe_skip_v5/1,
non_clean_sess_opts/0
]).
all_connection_pids(Config) ->
all_connection_pids(0, Config).
all_connection_pids(Node, Config) ->
case rabbit_ct_broker_helpers:rpc(
Config, Node, rabbit_feature_flags, is_enabled, [delete_ra_cluster_mqtt_node]) of
true ->
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Result = erpc:multicall(Nodes, rabbit_mqtt, local_connection_pids, [], 5000),
lists:append([Pids || {ok, Pids} <- Result]);
false ->
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_mqtt_collector, list_pids, [])
end.
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Result = erpc:multicall(Nodes, rabbit_mqtt, local_connection_pids, [], 5000),
lists:append([Pids || {ok, Pids} <- Result]).
publish_qos1_timeout(Client, Topic, Payload, Timeout) ->
Mref = erlang:monitor(process, Client),
@ -141,21 +130,6 @@ await_exit(Pid, Reason) ->
20_000 -> ct:fail({missing_exit, Pid})
end.
maybe_skip_v5({skip, _Reason} = Skip) ->
%% Mixed-version can be skipped as `khepri_db`
%% is not supported
Skip;
maybe_skip_v5(Config) ->
case ?config(mqtt_version, Config) of
v5 ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config, mqtt_v5) of
ok -> Config;
{skip, _} = Skip -> Skip
end;
_ ->
Config
end.
%% "CleanStart=0 and SessionExpiry=0xFFFFFFFF (UINT_MAX) for
%% MQTT 5.0 would provide the same as CleanSession=0 for 3.1.1."
%% https://issues.oasis-open.org/projects/MQTT/issues/MQTT-538

View File

@ -167,27 +167,13 @@ init_per_group(Group, Config0) ->
[{mqtt_version, v5},
{rmq_nodes_count, Nodes},
{rmq_nodename_suffix, Suffix}]),
Config2 = rabbit_ct_helpers:merge_app_env(
Config1,
{rabbit, [{quorum_tick_interval, 200}]}),
Config = rabbit_ct_helpers:run_steps(
Config2,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
%% Mixed-version is skipped as `khepri_db`
%% is not supported
case Config of
{skip, _Reason} = Skip ->
Skip;
_ ->
case Group of
cluster_size_1 ->
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, mqtt_v5),
Config;
cluster_size_3 ->
util:maybe_skip_v5(Config)
end
end.
Config = rabbit_ct_helpers:merge_app_env(
Config1,
{rabbit, [{quorum_tick_interval, 200}]}),
rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_group(G, Config)
when G =:= cluster_size_1;

View File

@ -168,10 +168,6 @@ websocket_info({'$gen_cast', QueueEvent = {queue_event, _, _}},
[State#state.conn_name, Reason]),
stop(State#state{proc_state = PState})
end;
websocket_info({'$gen_cast', duplicate_id}, State) ->
%% Delete this backward compatibility clause when feature flag
%% delete_ra_cluster_mqtt_node becomes required.
websocket_info({'$gen_cast', {duplicate_id, true}}, State);
websocket_info({'$gen_cast', {duplicate_id, SendWill}},
State = #state{proc_state = ProcState,
conn_name = ConnName}) ->
@ -221,10 +217,6 @@ websocket_info({keepalive, Req}, State = #state{proc_state = ProcState,
end;
websocket_info(emit_stats, State) ->
{[], emit_stats(State), hibernate};
websocket_info({ra_event, _From, Evt},
#state{proc_state = PState0} = State) ->
PState = rabbit_mqtt_processor:handle_ra_event(Evt, PState0),
{[], State#state{proc_state = PState}, hibernate};
websocket_info({{'DOWN', _QName}, _MRef, process, _Pid, _Reason} = Evt,
State = #state{proc_state = PState0}) ->
case rabbit_mqtt_processor:handle_down(Evt, PState0) of

View File

@ -55,12 +55,8 @@ end_per_suite(Config) ->
init_per_group(unit, Config) ->
Config;
init_per_group(v5 = V5, Config0) ->
Config = rabbit_ct_helpers:set_config(Config0, {mqtt_version, V5}),
case rabbit_ct_broker_helpers:enable_feature_flag(Config, mqtt_v5) of
ok -> Config;
{skip, _} = Skip -> Skip
end.
init_per_group(v5 = V5, Config) ->
rabbit_ct_helpers:set_config(Config, {mqtt_version, V5}).
end_per_group(_, Config) ->
Config.

View File

@ -1018,14 +1018,9 @@ rabbitmq_management_agent:
- rabbit_mgmt_metrics_gc
- rabbit_mgmt_storage
rabbitmq_mqtt:
- Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand
- Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand
- mc_mqtt
- mqtt_machine
- mqtt_machine_v0
- mqtt_node
- rabbit_mqtt
- rabbit_mqtt_collector
- rabbit_mqtt_confirms
- rabbit_mqtt_ff
- rabbit_mqtt_internal_event_handler