From 4df3080fa1792b2a5db06130ac9bd5e7e7223f1b Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Wed, 19 Jul 2023 10:00:41 -0700 Subject: [PATCH] New quorum queue members join as temporary non-voters Because both `add_member` and `grow` default to Membership status `promotable`, new members will have to catch up before they are considered cluster members. This can be overridden with either `voter` or (permanent `non_voter` statuses. The latter one is useless without additional tooling so kept undocumented. - non-voters do not affect quorum size for election purposes - `observer_cli` reports their status with lowercase 'f' - `rabbitmq-queues check_if_node_is_quorum_critical` takes voter status into account --- deps/rabbit/BUILD.bazel | 5 ++ deps/rabbit/app.bzl | 9 +++ .../src/rabbit_observer_cli_quorum_queues.erl | 2 + deps/rabbit/src/rabbit_quorum_queue.erl | 54 ++++++++----- deps/rabbit/test/quorum_queue_SUITE.erl | 14 ++-- deps/rabbit/test/unit_quorum_queue_SUITE.erl | 49 ++++++++++++ .../cli/queues/commands/add_member_command.ex | 75 ++++++++++++++----- .../cli/queues/commands/grow_command.ex | 46 +++++++----- .../test/queues/add_member_command_test.exs | 25 ++++++- .../test/queues/grow_command_test.exs | 28 ++++++- ...mt_wm_quorum_queue_replicas_add_member.erl | 10 ++- ...bit_mgmt_wm_quorum_queue_replicas_grow.erl | 6 +- 12 files changed, 254 insertions(+), 69 deletions(-) create mode 100644 deps/rabbit/test/unit_quorum_queue_SUITE.erl diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index aae8c1465f..f4d0fb7f3e 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -941,6 +941,11 @@ rabbitmq_integration_suite( size = "medium", ) +rabbitmq_suite( + name = "unit_quorum_queue_SUITE", + size = "medium", +) + rabbitmq_integration_suite( name = "unit_app_management_SUITE", size = "medium", diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 0653bc596e..0dd2dec9bf 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -1692,6 +1692,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/amqp_client:erlang_app"], ) + erlang_bytecode( + name = "unit_quorum_queue_SUITE_beam_files", + testonly = True, + srcs = ["test/unit_quorum_queue_SUITE.erl"], + outs = ["test/unit_quorum_queue_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app"], + ) erlang_bytecode( name = "unit_app_management_SUITE_beam_files", testonly = True, diff --git a/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl b/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl index 56379ea893..c213ffbd39 100644 --- a/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl +++ b/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl @@ -137,6 +137,8 @@ sheet_body(PrevState) -> case maps:get(InternalName, RaStates, undefined) of leader -> "L"; follower -> "F"; + promotable -> "f"; %% temporary non-voter + non_voter -> "-"; %% permanent non-voter _ -> "?" end, format_int(proplists:get_value(memory, ProcInfo)), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index b5de8644cf..ab60cc032b 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -39,21 +39,24 @@ -export([open_files/1]). -export([peek/2, peek/3]). -export([add_member/2, - add_member/4]). + add_member/3, + add_member/4, + add_member/5]). -export([delete_member/3, delete_member/2]). -export([requeue/3]). -export([policy_changed/1]). -export([format_ra_event/3]). -export([cleanup_data_dir/0]). -export([shrink_all/1, - grow/4]). + grow/4, + grow/5]). -export([transfer_leadership/2, get_replicas/1, queue_length/1]). -export([file_handle_leader_reservation/1, file_handle_other_reservation/0]). -export([file_handle_release_reservation/0]). -export([list_with_minimum_quorum/0, filter_quorum_critical/1, - filter_quorum_critical/2, + filter_quorum_critical/3, all_replica_states/0]). -export([capabilities/0]). -export([repair_amqqueue_nodes/1, @@ -84,6 +87,7 @@ -type msg_id() :: non_neg_integer(). -type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), mc:state()}. +-type membership() :: voter | non_voter | promotable. %% see ra_membership() in Ra. -define(RA_SYSTEM, quorum_queues). -define(RA_WAL_NAME, ra_log_wal). @@ -384,13 +388,15 @@ become_leader(QName, Name) -> all_replica_states() -> Rows0 = ets:tab2list(ra_state), Rows = lists:map(fun - %% TODO: support other membership types + ({K, follower, promotable}) -> + {K, promotable}; + ({K, follower, non_voter}) -> + {K, non_voter}; ({K, S, voter}) -> {K, S}; (T) -> T end, Rows0), - {node(), maps:from_list(Rows)}. -spec list_with_minimum_quorum() -> [amqqueue:amqqueue()]. @@ -419,20 +425,22 @@ filter_quorum_critical(Queues) -> ReplicaStates = maps:from_list( rabbit_misc:append_rpc_all_nodes(rabbit_nodes:list_running(), ?MODULE, all_replica_states, [])), - filter_quorum_critical(Queues, ReplicaStates). + filter_quorum_critical(Queues, ReplicaStates, node()). --spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}) -> [amqqueue:amqqueue()]. +-spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}, node()) -> + [amqqueue:amqqueue()]. -filter_quorum_critical(Queues, ReplicaStates) -> +filter_quorum_critical(Queues, ReplicaStates, Self) -> lists:filter(fun (Q) -> MemberNodes = rabbit_amqqueue:get_quorum_nodes(Q), {Name, _Node} = amqqueue:get_pid(Q), AllUp = lists:filter(fun (N) -> - {Name, _} = amqqueue:get_pid(Q), case maps:get(N, ReplicaStates, undefined) of #{Name := State} when State =:= follower orelse - State =:= leader -> + State =:= leader orelse + (State =:= promotable andalso N =:= Self) orelse + (State =:= non_voter andalso N =:= Self) -> true; _ -> false end @@ -1143,7 +1151,7 @@ get_sys_status(Proc) -> end. -add_member(VHost, Name, Node, Timeout) -> +add_member(VHost, Name, Node, Membership, Timeout) when is_binary(VHost) -> QName = #resource{virtual_host = VHost, name = Name, kind = queue}, rabbit_log:debug("Asked to add a replica for queue ~ts on node ~ts", [rabbit_misc:rs(QName), Node]), case rabbit_amqqueue:lookup(QName) of @@ -1161,7 +1169,7 @@ add_member(VHost, Name, Node, Timeout) -> rabbit_log:debug("Quorum ~ts already has a replica on node ~ts", [rabbit_misc:rs(QName), Node]), ok; false -> - add_member(Q, Node, Timeout) + add_member(Q, Node, Membership, Timeout) end end; {ok, _Q} -> @@ -1171,9 +1179,15 @@ add_member(VHost, Name, Node, Timeout) -> end. add_member(Q, Node) -> - add_member(Q, Node, ?ADD_MEMBER_TIMEOUT). + add_member(Q, Node, promotable). -add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> +add_member(Q, Node, Membership) -> + add_member(Q, Node, Membership, ?ADD_MEMBER_TIMEOUT). + +add_member(VHost, Name, Node, Timeout) when is_binary(VHost) -> + %% NOTE needed to pass mixed cluster tests. + add_member(VHost, Name, Node, promotable, Timeout); +add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) -> {RaName, _} = amqqueue:get_pid(Q), QName = amqqueue:get_name(Q), %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes @@ -1183,7 +1197,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> ?TICK_TIMEOUT), SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval, ?SNAPSHOT_INTERVAL), - Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, voter), + Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, Membership), case ra:start_server(?RA_SYSTEM, Conf) of ok -> ServerIdSpec = maps:with([id, uid, membership], Conf), @@ -1295,17 +1309,21 @@ shrink_all(Node) -> amqqueue:get_type(Q) == ?MODULE, lists:member(Node, get_nodes(Q))]. --spec grow(node(), binary(), binary(), all | even) -> + + grow(Node, VhostSpec, QueueSpec, Strategy) -> + grow(Node, VhostSpec, QueueSpec, Strategy, promotable). + +-spec grow(node(), binary(), binary(), all | even, membership()) -> [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. - grow(Node, VhostSpec, QueueSpec, Strategy) -> + grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> Running = rabbit_nodes:list_running(), [begin Size = length(get_nodes(Q)), QName = amqqueue:get_name(Q), rabbit_log:info("~ts: adding a new member (replica) on node ~w", [rabbit_misc:rs(QName), Node]), - case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of + case add_member(Q, Node, Membership) of ok -> {QName, {ok, Size + 1}}; {error, Err} -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index ed1fcde693..9c53098150 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1792,7 +1792,7 @@ add_member_not_running(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), ?assertEqual({error, node_not_running}, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, 'rabbit@burrow', 5000])). + [<<"/">>, QQ, 'rabbit@burrow', voter, 5000])). add_member_classic(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1801,7 +1801,7 @@ add_member_classic(Config) -> ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), ?assertEqual({error, classic_queue_not_supported}, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, CQ, Server, 5000])). + [<<"/">>, CQ, Server, voter, 5000])). add_member_wrong_type(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1811,7 +1811,7 @@ add_member_wrong_type(Config) -> declare(Ch, SQ, [{<<"x-queue-type">>, longstr, <<"stream">>}])), ?assertEqual({error, not_quorum_queue}, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, SQ, Server, 5000])). + [<<"/">>, SQ, Server, voter, 5000])). add_member_already_a_member(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1822,14 +1822,14 @@ add_member_already_a_member(Config) -> %% idempotent by design ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server, 5000])). + [<<"/">>, QQ, Server, voter, 5000])). add_member_not_found(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), QQ = ?config(queue_name, Config), ?assertEqual({error, not_found}, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server, 5000])). + [<<"/">>, QQ, Server, voter, 5000])). add_member(Config) -> [Server0, Server1] = Servers0 = @@ -1840,12 +1840,12 @@ add_member(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), ?assertEqual({error, node_not_running}, rpc:call(Server0, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server1, 5000])), + [<<"/">>, QQ, Server1, voter, 5000])), ok = rabbit_control_helper:command(stop_app, Server1), ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), rabbit_control_helper:command(start_app, Server1), ?assertEqual(ok, rpc:call(Server1, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server1, 5000])), + [<<"/">>, QQ, Server1, voter, 5000])), Info = rpc:call(Server0, rabbit_quorum_queue, infos, [rabbit_misc:r(<<"/">>, queue, QQ)]), Servers = lists:sort(Servers0), diff --git a/deps/rabbit/test/unit_quorum_queue_SUITE.erl b/deps/rabbit/test/unit_quorum_queue_SUITE.erl new file mode 100644 index 0000000000..101946cfaf --- /dev/null +++ b/deps/rabbit/test/unit_quorum_queue_SUITE.erl @@ -0,0 +1,49 @@ +-module(unit_quorum_queue_SUITE). + +-compile(export_all). + +all() -> + [ + all_replica_states_includes_nonvoters, + filter_quorum_critical_accounts_nonvoters + ]. + +filter_quorum_critical_accounts_nonvoters(_Config) -> + Nodes = [test@leader, test@follower1, test@follower2], + Qs0 = [amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q1">>), + {q1, test@leader}, + false, false, none, [], undefined, #{}), + amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q2">>), + {q2, test@leader}, + false, false, none, [], undefined, #{})], + Qs = [Q1, Q2] = lists:map(fun (Q) -> + amqqueue:set_type_state(Q, #{nodes => Nodes}) + end, Qs0), + Ss = #{test@leader => #{q1 => leader, q2 => leader}, + test@follower1 => #{q1 => promotable, q2 => follower}, + test@follower2 => #{q1 => follower, q2 => promotable}}, + Qs = rabbit_quorum_queue:filter_quorum_critical(Qs, Ss, test@leader), + [Q2] = rabbit_quorum_queue:filter_quorum_critical(Qs, Ss, test@follower1), + [Q1] = rabbit_quorum_queue:filter_quorum_critical(Qs, Ss, test@follower2), + ok. + +all_replica_states_includes_nonvoters(_Config) -> + ets:new(ra_state, [named_table, public, {write_concurrency, true}]), + ets:insert(ra_state, [ + {q1, leader, voter}, + {q2, follower, voter}, + {q3, follower, promotable}, + %% pre ra-2.7.0 + {q4, leader}, + {q5, follower} + ]), + {_, #{ + q1 := leader, + q2 := follower, + q3 := promotable, + q4 := leader, + q5 := follower + }} = rabbit_quorum_queue:all_replica_states(), + + true = ets:delete(ra_state), + ok. diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex index e4d96715bc..9e409abae8 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex @@ -10,21 +10,51 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do @behaviour RabbitMQ.CLI.CommandBehaviour - @default_timeout 5_000 + defp default_opts, do: %{vhost: "/", membership: "promotable", timeout: 5_000} def merge_defaults(args, opts) do - timeout = - case opts[:timeout] do - nil -> @default_timeout - :infinity -> @default_timeout - other -> other - end + default = default_opts() - {args, Map.merge(%{vhost: "/", timeout: timeout}, opts)} + opts = + Map.update( + opts, + :timeout, + :infinity, + &case &1 do + :infinity -> default.timeout + other -> other + end + ) + + {args, Map.merge(default, opts)} end - use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout - use RabbitMQ.CLI.Core.AcceptsTwoPositionalArguments + def switches(), + do: [ + timeout: :integer, + membership: :string + ] + + def aliases(), do: [t: :timeout] + + def validate(args, _) when length(args) < 2 do + {:validation_failure, :not_enough_args} + end + + def validate(args, _) when length(args) > 2 do + {:validation_failure, :too_many_args} + end + + def validate(_, %{membership: m}) + when not (m == "promotable" or + m == "non_voter" or + m == "voter") do + {:validation_failure, "voter status '#{m}' is not recognised."} + end + + def validate(_, _) do + :ok + end def validate_execution_environment(args, opts) do Validators.chain( @@ -39,13 +69,19 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do ) end - def run([name, node] = _args, %{vhost: vhost, node: node_name, timeout: timeout}) do - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :add_member, [ - vhost, - name, - to_atom(node), - timeout - ]) do + def run( + [name, node] = _args, + %{vhost: vhost, node: node_name, timeout: timeout, membership: membership} + ) do + args = [vhost, name, to_atom(node)] + + args = + case to_atom(membership) do + :promotable -> args ++ [timeout] + other -> args ++ [other, timeout] + end + + case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :add_member, args) do {:error, :classic_queue_not_supported} -> {:error, "Cannot add members to a classic queue"} @@ -59,12 +95,13 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do use RabbitMQ.CLI.DefaultOutput - def usage, do: "add_member [--vhost ] " + def usage, do: "add_member [--vhost ] [--membership ]" def usage_additional do [ ["", "quorum queue name"], - ["", "node to add a new replica on"] + ["", "node to add a new replica on"], + ["--membership ", "add a promotable non-voter (default) or full voter"] ] end diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex index f6ab616a62..28d9dc21c2 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex @@ -10,12 +10,14 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do @behaviour RabbitMQ.CLI.CommandBehaviour - defp default_opts, do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false} + defp default_opts, + do: %{vhost_pattern: ".*", queue_pattern: ".*", membership: "promotable", errors_only: false} def switches(), do: [ vhost_pattern: :string, queue_pattern: :string, + membership: :string, errors_only: :boolean ] @@ -31,17 +33,21 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do {:validation_failure, :too_many_args} end - def validate([_, s], _) do - case s do - "all" -> - :ok + def validate([_, s], _) + when not (s == "all" or + s == "even") do + {:validation_failure, "strategy '#{s}' is not recognised."} + end - "even" -> - :ok + def validate(_, %{membership: m}) + when not (m == "promotable" or + m == "non_voter" or + m == "voter") do + {:validation_failure, "voter status '#{m}' is not recognised."} + end - _ -> - {:validation_failure, "strategy '#{s}' is not recognised."} - end + def validate(_, _) do + :ok end def validate_execution_environment(args, opts) do @@ -58,14 +64,18 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do node: node_name, vhost_pattern: vhost_pat, queue_pattern: queue_pat, + membership: membership, errors_only: errors_only }) do - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, [ - to_atom(node), - vhost_pat, - queue_pat, - to_atom(strategy) - ]) do + args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)] + + args = + case to_atom(membership) do + :promotable -> args + other -> args ++ [other] + end + + case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, args) do {:error, _} = error -> error @@ -97,7 +107,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do def formatter(), do: RabbitMQ.CLI.Formatters.Table def usage, - do: "grow [--vhost-pattern ] [--queue-pattern ]" + do: + "grow [--vhost-pattern ] [--queue-pattern ] [--membership ]" def usage_additional do [ @@ -108,6 +119,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do ], ["--queue-pattern ", "regular expression to match queue names"], ["--vhost-pattern ", "regular expression to match virtual host names"], + ["--membership ", "add a promotable non-voter (default) or full voter"], ["--errors-only", "only list queues which reported an error"] ] end diff --git a/deps/rabbitmq_cli/test/queues/add_member_command_test.exs b/deps/rabbitmq_cli/test/queues/add_member_command_test.exs index 90a9d0b8a7..f967f8c814 100644 --- a/deps/rabbitmq_cli/test/queues/add_member_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/add_member_command_test.exs @@ -20,6 +20,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommandTest do {:ok, opts: %{ node: get_rabbit_hostname(), + membership: "voter", + vhost: "/", timeout: context[:test_timeout] || 30000 }} end @@ -42,17 +44,36 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommandTest do ) == {:validation_failure, :too_many_args} end + test "validate: when membership promotable is provided, returns a success" do + assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "promotable"}) == + :ok + end + + test "validate: when membership voter is provided, returns a success" do + assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "voter"}) == :ok + end + + test "validate: when membership non_voter is provided, returns a success" do + assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "non_voter"}) == + :ok + end + + test "validate: when wrong membership is provided, returns failure" do + assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "banana"}) == + {:validation_failure, "voter status 'banana' is not recognised."} + end + test "validate: treats two positional arguments and default switches as a success" do assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{}) == :ok end @tag test_timeout: 3000 - test "run: targeting an unreachable node throws a badrpc" do + test "run: targeting an unreachable node throws a badrpc", context do assert match?( {:badrpc, _}, @command.run( ["quorum-queue-a", "rabbit@new-node"], - %{node: :jake@thedog, vhost: "/", timeout: 200} + Map.merge(context[:opts], %{node: :jake@thedog}) ) ) end diff --git a/deps/rabbitmq_cli/test/queues/grow_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_command_test.exs index fbf35d536b..8cb40ae710 100644 --- a/deps/rabbitmq_cli/test/queues/grow_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/grow_command_test.exs @@ -23,13 +23,20 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do timeout: context[:test_timeout] || 30000, vhost_pattern: ".*", queue_pattern: ".*", + membership: "promotable", errors_only: false }} end test "merge_defaults: defaults to reporting complete results" do assert @command.merge_defaults([], %{}) == - {[], %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}} + {[], + %{ + vhost_pattern: ".*", + queue_pattern: ".*", + errors_only: false, + membership: "promotable" + }} end test "validate: when no arguments are provided, returns a failure" do @@ -58,13 +65,30 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do {:validation_failure, :too_many_args} end + test "validate: when membership promotable is provided, returns a success" do + assert @command.validate(["quorum-queue-a", "all"], %{membership: "promotable"}) == :ok + end + + test "validate: when membership voter is provided, returns a success" do + assert @command.validate(["quorum-queue-a", "all"], %{membership: "voter"}) == :ok + end + + test "validate: when membership non_voter is provided, returns a success" do + assert @command.validate(["quorum-queue-a", "all"], %{membership: "non_voter"}) == :ok + end + + test "validate: when wrong membership is provided, returns failure" do + assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) == + {:validation_failure, "voter status 'banana' is not recognised."} + end + @tag test_timeout: 3000 test "run: targeting an unreachable node throws a badrpc", context do assert match?( {:badrpc, _}, @command.run( ["quorum-queue-a", "all"], - Map.merge(context[:opts], %{node: :jake@thedog, timeout: 200}) + Map.merge(context[:opts], %{node: :jake@thedog}) ) ) end diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl index 2e6376189f..9d91a5f90b 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl @@ -38,11 +38,17 @@ accept_content(ReqData, Context) -> QName = rabbit_mgmt_util:id(queue, ReqData), Res = rabbit_mgmt_util:with_decode( [node], ReqData, Context, - fun([NewReplicaNode], _Body, _ReqData) -> + fun([NewReplicaNode], Body, _ReqData) -> + Membership = maps:get(<<"membership">>, Body, promotable), rabbit_amqqueue:with( rabbit_misc:r(VHost, queue, QName), fun(_Q) -> - rabbit_quorum_queue:add_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode), ?TIMEOUT) + rabbit_quorum_queue:add_member( + VHost, + QName, + rabbit_data_coercion:to_atom(NewReplicaNode), + rabbit_data_coercion:to_atom(Membership), + ?TIMEOUT) end) end), case Res of diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl index 7b87604fab..e768cbdf44 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl @@ -39,12 +39,14 @@ accept_content(ReqData, Context) -> NewReplicaNode = rabbit_mgmt_util:id(node, ReqData), rabbit_mgmt_util:with_decode( [vhost_pattern, queue_pattern, strategy], ReqData, Context, - fun([VHPattern, QPattern, Strategy], _Body, _ReqData) -> + fun([VHPattern, QPattern, Strategy], Body, _ReqData) -> + Membership = maps:get(<<"membership">>, Body, promotable), rabbit_quorum_queue:grow( rabbit_data_coercion:to_atom(NewReplicaNode), VHPattern, QPattern, - rabbit_data_coercion:to_atom(Strategy)) + rabbit_data_coercion:to_atom(Strategy), + rabbit_data_coercion:to_atom(Membership)) end), {true, ReqData, Context}.