New quorum queue members join as temporary non-voters

Because both `add_member` and `grow` default to Membership status `promotable`,
new members will have to catch up before they are considered cluster members.
This can be overridden with either the `voter` or the (permanent) `non_voter` status.
The latter is useless without additional tooling, so it is kept undocumented.

- non-voters do not affect quorum size for election purposes
- `observer_cli` reports promotable members with a lowercase 'f' and permanent non-voters with '-'
- `rabbitmq-queues check_if_node_is_quorum_critical` takes voter status into
account
This commit is contained in:
Alex Valiushko 2023-07-19 10:00:41 -07:00
parent c8d4c2334a
commit 4df3080fa1
12 changed files with 254 additions and 69 deletions

View File

@ -941,6 +941,11 @@ rabbitmq_integration_suite(
size = "medium",
)
rabbitmq_suite(
name = "unit_quorum_queue_SUITE",
size = "medium",
)
rabbitmq_integration_suite(
name = "unit_app_management_SUITE",
size = "medium",

9
deps/rabbit/app.bzl vendored
View File

@ -1692,6 +1692,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "unit_quorum_queue_SUITE_beam_files",
testonly = True,
srcs = ["test/unit_quorum_queue_SUITE.erl"],
outs = ["test/unit_quorum_queue_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "unit_app_management_SUITE_beam_files",
testonly = True,

View File

@ -137,6 +137,8 @@ sheet_body(PrevState) ->
case maps:get(InternalName, RaStates, undefined) of
leader -> "L";
follower -> "F";
promotable -> "f"; %% temporary non-voter
non_voter -> "-"; %% permanent non-voter
_ -> "?"
end,
format_int(proplists:get_value(memory, ProcInfo)),

View File

@ -39,21 +39,24 @@
-export([open_files/1]).
-export([peek/2, peek/3]).
-export([add_member/2,
add_member/4]).
add_member/3,
add_member/4,
add_member/5]).
-export([delete_member/3, delete_member/2]).
-export([requeue/3]).
-export([policy_changed/1]).
-export([format_ra_event/3]).
-export([cleanup_data_dir/0]).
-export([shrink_all/1,
grow/4]).
grow/4,
grow/5]).
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
-export([file_handle_leader_reservation/1,
file_handle_other_reservation/0]).
-export([file_handle_release_reservation/0]).
-export([list_with_minimum_quorum/0,
filter_quorum_critical/1,
filter_quorum_critical/2,
filter_quorum_critical/3,
all_replica_states/0]).
-export([capabilities/0]).
-export([repair_amqqueue_nodes/1,
@ -84,6 +87,7 @@
-type msg_id() :: non_neg_integer().
-type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(),
mc:state()}.
-type membership() :: voter | non_voter | promotable. %% see ra_membership() in Ra.
-define(RA_SYSTEM, quorum_queues).
-define(RA_WAL_NAME, ra_log_wal).
@ -384,13 +388,15 @@ become_leader(QName, Name) ->
all_replica_states() ->
Rows0 = ets:tab2list(ra_state),
Rows = lists:map(fun
%% TODO: support other membership types
({K, follower, promotable}) ->
{K, promotable};
({K, follower, non_voter}) ->
{K, non_voter};
({K, S, voter}) ->
{K, S};
(T) ->
T
end, Rows0),
{node(), maps:from_list(Rows)}.
-spec list_with_minimum_quorum() -> [amqqueue:amqqueue()].
@ -419,20 +425,22 @@ filter_quorum_critical(Queues) ->
ReplicaStates = maps:from_list(
rabbit_misc:append_rpc_all_nodes(rabbit_nodes:list_running(),
?MODULE, all_replica_states, [])),
filter_quorum_critical(Queues, ReplicaStates).
filter_quorum_critical(Queues, ReplicaStates, node()).
-spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}) -> [amqqueue:amqqueue()].
-spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}, node()) ->
[amqqueue:amqqueue()].
filter_quorum_critical(Queues, ReplicaStates) ->
filter_quorum_critical(Queues, ReplicaStates, Self) ->
lists:filter(fun (Q) ->
MemberNodes = rabbit_amqqueue:get_quorum_nodes(Q),
{Name, _Node} = amqqueue:get_pid(Q),
AllUp = lists:filter(fun (N) ->
{Name, _} = amqqueue:get_pid(Q),
case maps:get(N, ReplicaStates, undefined) of
#{Name := State}
when State =:= follower orelse
State =:= leader ->
State =:= leader orelse
(State =:= promotable andalso N =:= Self) orelse
(State =:= non_voter andalso N =:= Self) ->
true;
_ -> false
end
@ -1143,7 +1151,7 @@ get_sys_status(Proc) ->
end.
add_member(VHost, Name, Node, Timeout) ->
add_member(VHost, Name, Node, Membership, Timeout) when is_binary(VHost) ->
QName = #resource{virtual_host = VHost, name = Name, kind = queue},
rabbit_log:debug("Asked to add a replica for queue ~ts on node ~ts", [rabbit_misc:rs(QName), Node]),
case rabbit_amqqueue:lookup(QName) of
@ -1161,7 +1169,7 @@ add_member(VHost, Name, Node, Timeout) ->
rabbit_log:debug("Quorum ~ts already has a replica on node ~ts", [rabbit_misc:rs(QName), Node]),
ok;
false ->
add_member(Q, Node, Timeout)
add_member(Q, Node, Membership, Timeout)
end
end;
{ok, _Q} ->
@ -1171,9 +1179,15 @@ add_member(VHost, Name, Node, Timeout) ->
end.
add_member(Q, Node) ->
add_member(Q, Node, ?ADD_MEMBER_TIMEOUT).
add_member(Q, Node, promotable).
add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
add_member(Q, Node, Membership) ->
add_member(Q, Node, Membership, ?ADD_MEMBER_TIMEOUT).
add_member(VHost, Name, Node, Timeout) when is_binary(VHost) ->
%% NOTE needed to pass mixed cluster tests.
add_member(VHost, Name, Node, promotable, Timeout);
add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) ->
{RaName, _} = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
@ -1183,7 +1197,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
?TICK_TIMEOUT),
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
?SNAPSHOT_INTERVAL),
Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, voter),
Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, Membership),
case ra:start_server(?RA_SYSTEM, Conf) of
ok ->
ServerIdSpec = maps:with([id, uid, membership], Conf),
@ -1295,17 +1309,21 @@ shrink_all(Node) ->
amqqueue:get_type(Q) == ?MODULE,
lists:member(Node, get_nodes(Q))].
-spec grow(node(), binary(), binary(), all | even) ->
grow(Node, VhostSpec, QueueSpec, Strategy) ->
grow(Node, VhostSpec, QueueSpec, Strategy, promotable).
-spec grow(node(), binary(), binary(), all | even, membership()) ->
[{rabbit_amqqueue:name(),
{ok, pos_integer()} | {error, pos_integer(), term()}}].
grow(Node, VhostSpec, QueueSpec, Strategy) ->
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
Running = rabbit_nodes:list_running(),
[begin
Size = length(get_nodes(Q)),
QName = amqqueue:get_name(Q),
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of
case add_member(Q, Node, Membership) of
ok ->
{QName, {ok, Size + 1}};
{error, Err} ->

View File

@ -1792,7 +1792,7 @@ add_member_not_running(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({error, node_not_running},
rpc:call(Server, rabbit_quorum_queue, add_member,
[<<"/">>, QQ, 'rabbit@burrow', 5000])).
[<<"/">>, QQ, 'rabbit@burrow', voter, 5000])).
add_member_classic(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -1801,7 +1801,7 @@ add_member_classic(Config) ->
?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
?assertEqual({error, classic_queue_not_supported},
rpc:call(Server, rabbit_quorum_queue, add_member,
[<<"/">>, CQ, Server, 5000])).
[<<"/">>, CQ, Server, voter, 5000])).
add_member_wrong_type(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -1811,7 +1811,7 @@ add_member_wrong_type(Config) ->
declare(Ch, SQ, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
?assertEqual({error, not_quorum_queue},
rpc:call(Server, rabbit_quorum_queue, add_member,
[<<"/">>, SQ, Server, 5000])).
[<<"/">>, SQ, Server, voter, 5000])).
add_member_already_a_member(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -1822,14 +1822,14 @@ add_member_already_a_member(Config) ->
%% idempotent by design
?assertEqual(ok,
rpc:call(Server, rabbit_quorum_queue, add_member,
[<<"/">>, QQ, Server, 5000])).
[<<"/">>, QQ, Server, voter, 5000])).
add_member_not_found(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
QQ = ?config(queue_name, Config),
?assertEqual({error, not_found},
rpc:call(Server, rabbit_quorum_queue, add_member,
[<<"/">>, QQ, Server, 5000])).
[<<"/">>, QQ, Server, voter, 5000])).
add_member(Config) ->
[Server0, Server1] = Servers0 =
@ -1840,12 +1840,12 @@ add_member(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({error, node_not_running},
rpc:call(Server0, rabbit_quorum_queue, add_member,
[<<"/">>, QQ, Server1, 5000])),
[<<"/">>, QQ, Server1, voter, 5000])),
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
?assertEqual(ok, rpc:call(Server1, rabbit_quorum_queue, add_member,
[<<"/">>, QQ, Server1, 5000])),
[<<"/">>, QQ, Server1, voter, 5000])),
Info = rpc:call(Server0, rabbit_quorum_queue, infos,
[rabbit_misc:r(<<"/">>, queue, QQ)]),
Servers = lists:sort(Servers0),

View File

@ -0,0 +1,49 @@
-module(unit_quorum_queue_SUITE).
-compile(export_all).
%% Common Test callback: lists the test cases this suite runs.
all() ->
    [all_replica_states_includes_nonvoters,
     filter_quorum_critical_accounts_nonvoters].
%% Checks that filter_quorum_critical/3 treats a promotable (non-voting)
%% replica as "up" only when it lives on the node passed as the third
%% argument.
%%
%% Two three-member queues are built; each has exactly one promotable
%% replica, on a different follower node. From the leader's point of view both
%% queues are reported; from each follower only the queue whose promotable
%% replica sits on the *other* follower is reported.
filter_quorum_critical_accounts_nonvoters(_Config) ->
    Members = [test@leader, test@follower1, test@follower2],
    %% Builds a queue record whose Ra server id is {RaName, test@leader} and
    %% whose type state lists all three member nodes.
    NewQueue =
        fun (RaName, BinName) ->
                Q = amqqueue:new(rabbit_misc:r(<<"/">>, queue, BinName),
                                 {RaName, test@leader},
                                 false, false, none, [], undefined, #{}),
                amqqueue:set_type_state(Q, #{nodes => Members})
        end,
    Q1 = NewQueue(q1, <<"q1">>),
    Q2 = NewQueue(q2, <<"q2">>),
    Both = [Q1, Q2],
    %% Per-node replica states, in the shape returned by all_replica_states/0.
    States = #{test@leader    => #{q1 => leader,     q2 => leader},
               test@follower1 => #{q1 => promotable, q2 => follower},
               test@follower2 => #{q1 => follower,   q2 => promotable}},
    Both = rabbit_quorum_queue:filter_quorum_critical(Both, States, test@leader),
    [Q2] = rabbit_quorum_queue:filter_quorum_critical(Both, States, test@follower1),
    [Q1] = rabbit_quorum_queue:filter_quorum_critical(Both, States, test@follower2),
    ok.
%% Checks that rabbit_quorum_queue:all_replica_states/0 reports the membership
%% of non-voting followers (`promotable` and `non_voter`) instead of reporting
%% them as plain followers.
%%
%% The test creates its own `ra_state` table and fills it with both row
%% shapes: `{Name, RaState, Membership}` and the two-element `{Name, RaState}`
%% rows marked below as pre ra-2.7.0, which are expected to come back
%% unchanged.
all_replica_states_includes_nonvoters(_Config) ->
    %% A named table returns its name; match on it so a clash fails loudly.
    ra_state = ets:new(ra_state, [named_table, public, {write_concurrency, true}]),
    ets:insert(ra_state, [
                          {q1, leader, voter},
                          {q2, follower, voter},
                          %% temporary non-voter
                          {q3, follower, promotable},
                          %% permanent non-voter
                          {q4, follower, non_voter},
                          %% pre ra-2.7.0
                          {q5, leader},
                          {q6, follower}
                         ]),
    {_, #{
          q1 := leader,
          q2 := follower,
          q3 := promotable,
          q4 := non_voter,
          q5 := leader,
          q6 := follower
         }} = rabbit_quorum_queue:all_replica_states(),
    true = ets:delete(ra_state),
    ok.

View File

@ -10,21 +10,51 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do
@behaviour RabbitMQ.CLI.CommandBehaviour
@default_timeout 5_000
defp default_opts, do: %{vhost: "/", membership: "promotable", timeout: 5_000}
def merge_defaults(args, opts) do
timeout =
case opts[:timeout] do
nil -> @default_timeout
:infinity -> @default_timeout
default = default_opts()
opts =
Map.update(
opts,
:timeout,
:infinity,
&case &1 do
:infinity -> default.timeout
other -> other
end
)
{args, Map.merge(%{vhost: "/", timeout: timeout}, opts)}
{args, Map.merge(default, opts)}
end
use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout
use RabbitMQ.CLI.Core.AcceptsTwoPositionalArguments
def switches(),
do: [
timeout: :integer,
membership: :string
]
def aliases(), do: [t: :timeout]
def validate(args, _) when length(args) < 2 do
{:validation_failure, :not_enough_args}
end
def validate(args, _) when length(args) > 2 do
{:validation_failure, :too_many_args}
end
def validate(_, %{membership: m})
when not (m == "promotable" or
m == "non_voter" or
m == "voter") do
{:validation_failure, "voter status '#{m}' is not recognised."}
end
def validate(_, _) do
:ok
end
def validate_execution_environment(args, opts) do
Validators.chain(
@ -39,13 +69,19 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do
)
end
def run([name, node] = _args, %{vhost: vhost, node: node_name, timeout: timeout}) do
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :add_member, [
vhost,
name,
to_atom(node),
timeout
]) do
def run(
[name, node] = _args,
%{vhost: vhost, node: node_name, timeout: timeout, membership: membership}
) do
args = [vhost, name, to_atom(node)]
args =
case to_atom(membership) do
:promotable -> args ++ [timeout]
other -> args ++ [other, timeout]
end
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :add_member, args) do
{:error, :classic_queue_not_supported} ->
{:error, "Cannot add members to a classic queue"}
@ -59,12 +95,13 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do
use RabbitMQ.CLI.DefaultOutput
def usage, do: "add_member [--vhost <vhost>] <queue> <node>"
def usage, do: "add_member [--vhost <vhost>] <queue> <node> [--membership <promotable|voter>]"
def usage_additional do
[
["<queue>", "quorum queue name"],
["<node>", "node to add a new replica on"]
["<node>", "node to add a new replica on"],
["--membership <promotable|voter>", "add a promotable non-voter (default) or full voter"]
]
end

View File

@ -10,12 +10,14 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
@behaviour RabbitMQ.CLI.CommandBehaviour
defp default_opts, do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}
defp default_opts,
do: %{vhost_pattern: ".*", queue_pattern: ".*", membership: "promotable", errors_only: false}
def switches(),
do: [
vhost_pattern: :string,
queue_pattern: :string,
membership: :string,
errors_only: :boolean
]
@ -31,17 +33,21 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
{:validation_failure, :too_many_args}
end
def validate([_, s], _) do
case s do
"all" ->
:ok
"even" ->
:ok
_ ->
def validate([_, s], _)
when not (s == "all" or
s == "even") do
{:validation_failure, "strategy '#{s}' is not recognised."}
end
def validate(_, %{membership: m})
when not (m == "promotable" or
m == "non_voter" or
m == "voter") do
{:validation_failure, "voter status '#{m}' is not recognised."}
end
def validate(_, _) do
:ok
end
def validate_execution_environment(args, opts) do
@ -58,14 +64,18 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
node: node_name,
vhost_pattern: vhost_pat,
queue_pattern: queue_pat,
membership: membership,
errors_only: errors_only
}) do
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, [
to_atom(node),
vhost_pat,
queue_pat,
to_atom(strategy)
]) do
args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)]
args =
case to_atom(membership) do
:promotable -> args
other -> args ++ [other]
end
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, args) do
{:error, _} = error ->
error
@ -97,7 +107,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
def formatter(), do: RabbitMQ.CLI.Formatters.Table
def usage,
do: "grow <node> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>]"
do:
"grow <node> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]"
def usage_additional do
[
@ -108,6 +119,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
],
["--queue-pattern <pattern>", "regular expression to match queue names"],
["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
["--membership <promotable|voter>", "add a promotable non-voter (default) or full voter"],
["--errors-only", "only list queues which reported an error"]
]
end

View File

@ -20,6 +20,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommandTest do
{:ok,
opts: %{
node: get_rabbit_hostname(),
membership: "voter",
vhost: "/",
timeout: context[:test_timeout] || 30000
}}
end
@ -42,17 +44,36 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommandTest do
) == {:validation_failure, :too_many_args}
end
test "validate: when membership promotable is provided, returns a success" do
assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "promotable"}) ==
:ok
end
test "validate: when membership voter is provided, returns a success" do
assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "voter"}) == :ok
end
test "validate: when membership non_voter is provided, returns a success" do
assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "non_voter"}) ==
:ok
end
test "validate: when wrong membership is provided, returns failure" do
assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "banana"}) ==
{:validation_failure, "voter status 'banana' is not recognised."}
end
test "validate: treats two positional arguments and default switches as a success" do
assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{}) == :ok
end
@tag test_timeout: 3000
test "run: targeting an unreachable node throws a badrpc" do
test "run: targeting an unreachable node throws a badrpc", context do
assert match?(
{:badrpc, _},
@command.run(
["quorum-queue-a", "rabbit@new-node"],
%{node: :jake@thedog, vhost: "/", timeout: 200}
Map.merge(context[:opts], %{node: :jake@thedog})
)
)
end

View File

@ -23,13 +23,20 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do
timeout: context[:test_timeout] || 30000,
vhost_pattern: ".*",
queue_pattern: ".*",
membership: "promotable",
errors_only: false
}}
end
test "merge_defaults: defaults to reporting complete results" do
assert @command.merge_defaults([], %{}) ==
{[], %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}}
{[],
%{
vhost_pattern: ".*",
queue_pattern: ".*",
errors_only: false,
membership: "promotable"
}}
end
test "validate: when no arguments are provided, returns a failure" do
@ -58,13 +65,30 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do
{:validation_failure, :too_many_args}
end
test "validate: when membership promotable is provided, returns a success" do
assert @command.validate(["quorum-queue-a", "all"], %{membership: "promotable"}) == :ok
end
test "validate: when membership voter is provided, returns a success" do
assert @command.validate(["quorum-queue-a", "all"], %{membership: "voter"}) == :ok
end
test "validate: when membership non_voter is provided, returns a success" do
assert @command.validate(["quorum-queue-a", "all"], %{membership: "non_voter"}) == :ok
end
test "validate: when wrong membership is provided, returns failure" do
assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) ==
{:validation_failure, "voter status 'banana' is not recognised."}
end
@tag test_timeout: 3000
test "run: targeting an unreachable node throws a badrpc", context do
assert match?(
{:badrpc, _},
@command.run(
["quorum-queue-a", "all"],
Map.merge(context[:opts], %{node: :jake@thedog, timeout: 200})
Map.merge(context[:opts], %{node: :jake@thedog})
)
)
end

View File

@ -38,11 +38,17 @@ accept_content(ReqData, Context) ->
QName = rabbit_mgmt_util:id(queue, ReqData),
Res = rabbit_mgmt_util:with_decode(
[node], ReqData, Context,
fun([NewReplicaNode], _Body, _ReqData) ->
fun([NewReplicaNode], Body, _ReqData) ->
Membership = maps:get(<<"membership">>, Body, promotable),
rabbit_amqqueue:with(
rabbit_misc:r(VHost, queue, QName),
fun(_Q) ->
rabbit_quorum_queue:add_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode), ?TIMEOUT)
rabbit_quorum_queue:add_member(
VHost,
QName,
rabbit_data_coercion:to_atom(NewReplicaNode),
rabbit_data_coercion:to_atom(Membership),
?TIMEOUT)
end)
end),
case Res of

View File

@ -39,12 +39,14 @@ accept_content(ReqData, Context) ->
NewReplicaNode = rabbit_mgmt_util:id(node, ReqData),
rabbit_mgmt_util:with_decode(
[vhost_pattern, queue_pattern, strategy], ReqData, Context,
fun([VHPattern, QPattern, Strategy], _Body, _ReqData) ->
fun([VHPattern, QPattern, Strategy], Body, _ReqData) ->
Membership = maps:get(<<"membership">>, Body, promotable),
rabbit_quorum_queue:grow(
rabbit_data_coercion:to_atom(NewReplicaNode),
VHPattern,
QPattern,
rabbit_data_coercion:to_atom(Strategy))
rabbit_data_coercion:to_atom(Strategy),
rabbit_data_coercion:to_atom(Membership))
end),
{true, ReqData, Context}.