From de75f3bf79d34c819e6e3776f6b5dd36f08a4b89 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 13 Jun 2023 01:36:54 +0400 Subject: [PATCH 1/4] HTTP API: introduce two endpoints for QQ replica management to match what is offered by the CLI (`rabbitmq-queues`). Part of #8532. --- deps/rabbit/src/rabbit_quorum_queue.erl | 6 +- deps/rabbitmq_management/app.bzl | 6 ++ .../src/rabbit_mgmt_dispatcher.erl | 2 + .../src/rabbit_mgmt_util.erl | 5 +- ...mt_wm_quorum_queue_replicas_add_member.erl | 59 +++++++++++++++++++ ...wm_quorum_queue_replicas_delete_member.erl | 57 ++++++++++++++++++ moduleindex.yaml | 2 + 7 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl create mode 100644 deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 389d767143..b38a2a7ec3 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1064,6 +1064,7 @@ get_sys_status(Proc) -> add_member(VHost, Name, Node, Timeout) -> QName = #resource{virtual_host = VHost, name = Name, kind = queue}, + rabbit_log:debug("Asked to add a replica for queue ~ts on node ~ts", [rabbit_misc:rs(QName), Node]), case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; @@ -1076,6 +1077,7 @@ add_member(VHost, Name, Node, Timeout) -> case lists:member(Node, QNodes) of true -> %% idempotent by design + rabbit_log:debug("Quorum ~ts already has a replica on node ~ts", [rabbit_misc:rs(QName), Node]), ok; false -> add_member(Q, Node, Timeout) @@ -1110,6 +1112,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> amqqueue:set_pid(Q2, Leader) end, _ = rabbit_amqqueue:update(QName, Fun), + rabbit_log:info("Added a replica of quorum ~ts on node ~ts", [rabbit_misc:rs(QName), Node]), ok; {timeout, _} -> _ = ra:force_delete_server(?RA_SYSTEM, ServerId), @@ -1120,7 +1123,8 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> E end; E -> - E + rabbit_log:warning("Could not add a replica of quorum ~ts on node ~ts: ~p", [rabbit_misc:rs(QName), Node, E]), + E end. delete_member(VHost, Name, Node) -> diff --git a/deps/rabbitmq_management/app.bzl b/deps/rabbitmq_management/app.bzl index 6e39bb52e1..69d14a98fd 100644 --- a/deps/rabbitmq_management/app.bzl +++ b/deps/rabbitmq_management/app.bzl @@ -93,6 +93,8 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_mgmt_wm_queue_get.erl", "src/rabbit_mgmt_wm_queue_purge.erl", "src/rabbit_mgmt_wm_queues.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl", "src/rabbit_mgmt_wm_rebalance_queues.erl", "src/rabbit_mgmt_wm_redirect.erl", "src/rabbit_mgmt_wm_reset.erl", @@ -219,6 +221,8 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_mgmt_wm_queue_get.erl", "src/rabbit_mgmt_wm_queue_purge.erl", "src/rabbit_mgmt_wm_queues.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl", "src/rabbit_mgmt_wm_rebalance_queues.erl", "src/rabbit_mgmt_wm_redirect.erl", "src/rabbit_mgmt_wm_reset.erl", @@ -434,6 +438,8 @@ def all_srcs(name = "all_srcs"): "src/rabbit_mgmt_wm_queue_get.erl", "src/rabbit_mgmt_wm_queue_purge.erl", "src/rabbit_mgmt_wm_queues.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl", "src/rabbit_mgmt_wm_rebalance_queues.erl", "src/rabbit_mgmt_wm_redirect.erl", "src/rabbit_mgmt_wm_reset.erl", diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl b/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl index 7ac2012107..c6fc416c51 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl @@ -140,6 +140,8 @@ dispatcher() -> {"/queues/:vhost/:queue/contents", rabbit_mgmt_wm_queue_purge, []}, {"/queues/:vhost/:queue/get", rabbit_mgmt_wm_queue_get, []}, {"/queues/:vhost/:queue/actions", rabbit_mgmt_wm_queue_actions, []}, + {"/queues/quorum/:vhost/:queue/replicas/add", rabbit_mgmt_wm_quorum_queue_replicas_add_member, []}, + {"/queues/quorum/:vhost/:queue/replicas/delete", rabbit_mgmt_wm_quorum_queue_replicas_delete_member, []}, {"/bindings", rabbit_mgmt_wm_bindings, [all]}, {"/bindings/:vhost", rabbit_mgmt_wm_bindings, [all]}, {"/bindings/:vhost/e/:source/:dtype/:destination", rabbit_mgmt_wm_bindings, [source_destination]}, diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_util.erl b/deps/rabbitmq_management/src/rabbit_mgmt_util.erl index ba34f09184..aa2dc0a76f 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_util.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_util.erl @@ -18,7 +18,7 @@ is_authorized_vhost_visible_for_monitoring/2, is_authorized_global_parameters/2]). -export([user/1]). --export([bad_request/3, bad_request_exception/4, internal_server_error/4, +-export([bad_request/3, service_unavailable/3, bad_request_exception/4, internal_server_error/4, id/2, parse_bool/1, parse_int/1, redirect_to_home/3]). -export([with_decode/4, not_found/3]). -export([with_channel/4, with_channel/5]). @@ -768,6 +768,9 @@ a2b(B) -> B. bad_request(Reason, ReqData, Context) -> halt_response(400, bad_request, Reason, ReqData, Context). +service_unavailable(Reason, ReqData, Context) -> + halt_response(503, service_unavailable, Reason, ReqData, Context). + not_authenticated(Reason, ReqData, Context) -> case is_oauth2_enabled() of false -> diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl new file mode 100644 index 0000000000..6befd137c4 --- /dev/null +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl @@ -0,0 +1,59 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_mgmt_wm_quorum_queue_replicas_add_member). + +-export([init/2, resource_exists/2, is_authorized/2, allowed_methods/2, + content_types_accepted/2, accept_content/2]). +-export([variances/2]). + +-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +-define(TIMEOUT, 30_000). + +init(Req, _State) -> + {cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}. + +variances(Req, Context) -> + {[<<"accept-encoding">>, <<"origin">>], Req, Context}. + +allowed_methods(ReqData, Context) -> + {[<<"POST">>, <<"OPTIONS">>], ReqData, Context}. + +resource_exists(ReqData, Context) -> + {case rabbit_mgmt_wm_queue:queue(ReqData) of + not_found -> false; + _ -> true + end, ReqData, Context}. + +content_types_accepted(ReqData, Context) -> + {[{'*', accept_content}], ReqData, Context}. + +accept_content(ReqData, Context) -> + VHost = rabbit_mgmt_util:vhost(ReqData), + QName = rabbit_mgmt_util:id(queue, ReqData), + Res = rabbit_mgmt_util:with_decode( + [node], ReqData, Context, + fun([NewReplicaNode], _Body, _ReqData) -> + rabbit_amqqueue:with( + rabbit_misc:r(VHost, queue, QName), + fun(_Q) -> + rabbit_quorum_queue:add_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode), ?TIMEOUT) + end) + end), + case Res of + ok -> + {true, ReqData, Context}; + {ok, _} -> + {true, ReqData, Context}; + {error, Reason} -> + rabbit_mgmt_util:service_unavailable(Reason, ReqData, Context) + end. + + +is_authorized(ReqData, Context) -> + rabbit_mgmt_util:is_authorized_admin(ReqData, Context). \ No newline at end of file diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl new file mode 100644 index 0000000000..be216edd7e --- /dev/null +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl @@ -0,0 +1,57 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_mgmt_wm_quorum_queue_replicas_delete_member). + +-export([init/2, resource_exists/2, is_authorized/2, allowed_methods/2, + content_types_accepted/2, delete_resource/2]). +-export([variances/2]). + +-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +init(Req, _State) -> + {cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}. + +variances(Req, Context) -> + {[<<"accept-encoding">>, <<"origin">>], Req, Context}. + +allowed_methods(ReqData, Context) -> + {[<<"DELETE">>, <<"OPTIONS">>], ReqData, Context}. + +resource_exists(ReqData, Context) -> + {case rabbit_mgmt_wm_queue:queue(ReqData) of + not_found -> false; + _ -> true + end, ReqData, Context}. + +content_types_accepted(ReqData, Context) -> + {[{'*', accept_content}], ReqData, Context}. + +delete_resource(ReqData, Context) -> + VHost = rabbit_mgmt_util:vhost(ReqData), + QName = rabbit_mgmt_util:id(queue, ReqData), + Res = rabbit_mgmt_util:with_decode( + [node], ReqData, Context, + fun([NewReplicaNode], _Body, _ReqData) -> + rabbit_amqqueue:with( + rabbit_misc:r(VHost, queue, QName), + fun(_Q) -> + rabbit_quorum_queue:delete_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode)) + end) + end), + case Res of + ok -> + {true, ReqData, Context}; + {ok, _} -> + {true, ReqData, Context}; + {error, Reason} -> + rabbit_mgmt_util:service_unavailable(Reason, ReqData, Context) + end. + + +is_authorized(ReqData, Context) -> + rabbit_mgmt_util:is_authorized_admin(ReqData, Context). \ No newline at end of file diff --git a/moduleindex.yaml b/moduleindex.yaml index c1a8c8aff6..2d33f4ad36 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -913,6 +913,8 @@ rabbitmq_management: - rabbit_mgmt_wm_queue_get - rabbit_mgmt_wm_queue_purge - rabbit_mgmt_wm_queues +- rabbit_mgmt_wm_quorum_queue_replicas_add_member +- rabbit_mgmt_wm_quorum_queue_replicas_delete_member - rabbit_mgmt_wm_rebalance_queues - rabbit_mgmt_wm_redirect - rabbit_mgmt_wm_reset From fc895b72127241ac13234b06421ca0c148f323d4 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 14 Jun 2023 00:07:06 +0400 Subject: [PATCH 2/4] Return 202 for QQ replica removal --- .../rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl index be216edd7e..6d8fc7518a 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl @@ -7,7 +7,7 @@ -module(rabbit_mgmt_wm_quorum_queue_replicas_delete_member). -export([init/2, resource_exists/2, is_authorized/2, allowed_methods/2, - content_types_accepted/2, delete_resource/2]). + content_types_accepted/2, delete_resource/2, delete_completed/2]). -export([variances/2]). -include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). @@ -52,6 +52,9 @@ delete_resource(ReqData, Context) -> rabbit_mgmt_util:service_unavailable(Reason, ReqData, Context) end. +delete_completed(ReqData, Context) -> + %% return 202 Accepted since this is an inherently asynchronous operation + {false, ReqData, Context}. is_authorized(ReqData, Context) -> rabbit_mgmt_util:is_authorized_admin(ReqData, Context). \ No newline at end of file From e5759e2b600443a4d6b1089b1ec38d5285e054d8 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 14 Jun 2023 02:01:31 +0400 Subject: [PATCH 3/4] Implement {POST, DELETE} /api/queues/quorum/replicas/on/:node/{grow,shrink} Part of #8532. --- deps/rabbit/src/rabbit_quorum_queue.erl | 3 +- deps/rabbitmq_management/app.bzl | 6 +++ .../src/rabbit_mgmt_dispatcher.erl | 2 + ...bit_mgmt_wm_quorum_queue_replicas_grow.erl | 53 +++++++++++++++++++ ...t_mgmt_wm_quorum_queue_replicas_shrink.erl | 38 +++++++++++++ moduleindex.yaml | 2 + 6 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl create mode 100644 deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index b38a2a7ec3..742fe05eba 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1192,6 +1192,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. shrink_all(Node) -> + rabbit_log:info("Asked to remove all quorum queue replicas from node ~ts", [Node]), [begin QName = amqqueue:get_name(Q), rabbit_log:info("~ts: removing member (replica) on node ~w", @@ -1212,7 +1213,7 @@ shrink_all(Node) -> -spec grow(node(), binary(), binary(), all | even) -> [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. -grow(Node, VhostSpec, QueueSpec, Strategy) -> + grow(Node, VhostSpec, QueueSpec, Strategy) -> Running = rabbit_nodes:list_running(), [begin Size = length(get_nodes(Q)), diff --git a/deps/rabbitmq_management/app.bzl b/deps/rabbitmq_management/app.bzl index 69d14a98fd..c3d0372518 100644 --- a/deps/rabbitmq_management/app.bzl +++ b/deps/rabbitmq_management/app.bzl @@ -95,6 +95,8 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_mgmt_wm_queues.erl", "src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl", "src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl", "src/rabbit_mgmt_wm_rebalance_queues.erl", "src/rabbit_mgmt_wm_redirect.erl", "src/rabbit_mgmt_wm_reset.erl", @@ -223,6 +225,8 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_mgmt_wm_queues.erl", "src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl", "src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl", "src/rabbit_mgmt_wm_rebalance_queues.erl", "src/rabbit_mgmt_wm_redirect.erl", "src/rabbit_mgmt_wm_reset.erl", @@ -440,6 +444,8 @@ def all_srcs(name = "all_srcs"): "src/rabbit_mgmt_wm_queues.erl", "src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl", "src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl", + "src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl", "src/rabbit_mgmt_wm_rebalance_queues.erl", "src/rabbit_mgmt_wm_redirect.erl", "src/rabbit_mgmt_wm_reset.erl", diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl b/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl index c6fc416c51..270a81aee3 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl @@ -142,6 +142,8 @@ dispatcher() -> {"/queues/:vhost/:queue/actions", rabbit_mgmt_wm_queue_actions, []}, {"/queues/quorum/:vhost/:queue/replicas/add", rabbit_mgmt_wm_quorum_queue_replicas_add_member, []}, {"/queues/quorum/:vhost/:queue/replicas/delete", rabbit_mgmt_wm_quorum_queue_replicas_delete_member, []}, + {"/queues/quorum/replicas/on/:node/grow", rabbit_mgmt_wm_quorum_queue_replicas_grow, []}, + {"/queues/quorum/replicas/on/:node/shrink", rabbit_mgmt_wm_quorum_queue_replicas_shrink, []}, {"/bindings", rabbit_mgmt_wm_bindings, [all]}, {"/bindings/:vhost", rabbit_mgmt_wm_bindings, [all]}, {"/bindings/:vhost/e/:source/:dtype/:destination", rabbit_mgmt_wm_bindings, [source_destination]}, diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl new file mode 100644 index 0000000000..5c878c6125 --- /dev/null +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl @@ -0,0 +1,53 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_mgmt_wm_quorum_queue_replicas_grow). + +-export([init/2, is_authorized/2, allowed_methods/2, + content_types_accepted/2, resource_exists/2, accept_content/2]). +-export([variances/2]). + +-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +-define(TIMEOUT, 30_000). + +init(Req, _State) -> + {cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}. + +variances(Req, Context) -> + {[<<"accept-encoding">>, <<"origin">>], Req, Context}. + +allowed_methods(ReqData, Context) -> + {[<<"POST">>, <<"OPTIONS">>], ReqData, Context}. + +content_types_accepted(ReqData, Context) -> + {[{'*', accept_content}], ReqData, Context}. + +resource_exists(ReqData, Context) -> + case rabbit_mgmt_util:id(node, ReqData) of + none -> {false, ReqData, Context}; + Node -> + NodeExists = lists:member(rabbit_data_coercion:to_atom(Node), rabbit_nodes:list_running()), + {NodeExists, ReqData, Context} + end. + +accept_content(ReqData, Context) -> + NewReplicaNode = rabbit_mgmt_util:id(node, ReqData), + rabbit_mgmt_util:with_decode( + [vhost_pattern, queue_pattern, strategy], ReqData, Context, + fun([VHPattern, QPattern, Strategy], _Body, _ReqData) -> + rabbit_quorum_queue:grow( + rabbit_data_coercion:to_atom(NewReplicaNode), + VHPattern, + QPattern, + rabbit_data_coercion:to_atom(Strategy)) + end), + {true, ReqData, Context}. + + +is_authorized(ReqData, Context) -> + rabbit_mgmt_util:is_authorized_admin(ReqData, Context). \ No newline at end of file diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl new file mode 100644 index 0000000000..87d47d2ba2 --- /dev/null +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl @@ -0,0 +1,38 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_mgmt_wm_quorum_queue_replicas_shrink). + +-export([init/2, is_authorized/2, allowed_methods/2, + content_types_accepted/2, delete_resource/2, delete_completed/2]). +-export([variances/2]). + +-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +init(Req, _State) -> + {cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}. + +variances(Req, Context) -> + {[<<"accept-encoding">>, <<"origin">>], Req, Context}. + +allowed_methods(ReqData, Context) -> + {[<<"DELETE">>, <<"OPTIONS">>], ReqData, Context}. + +content_types_accepted(ReqData, Context) -> + {[{'*', accept_content}], ReqData, Context}. + +delete_resource(ReqData, Context) -> + NodeToRemove = rabbit_mgmt_util:id(node, ReqData), + rabbit_quorum_queue:shrink_all(rabbit_data_coercion:to_atom(NodeToRemove)), + {true, ReqData, Context}. + +delete_completed(ReqData, Context) -> + %% return 202 Accepted since this is an inherently asynchronous operation + {false, ReqData, Context}. + +is_authorized(ReqData, Context) -> + rabbit_mgmt_util:is_authorized_admin(ReqData, Context). \ No newline at end of file diff --git a/moduleindex.yaml b/moduleindex.yaml index 2d33f4ad36..650f895847 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -915,6 +915,8 @@ rabbitmq_management: - rabbit_mgmt_wm_queues - rabbit_mgmt_wm_quorum_queue_replicas_add_member - rabbit_mgmt_wm_quorum_queue_replicas_delete_member +- rabbit_mgmt_wm_quorum_queue_replicas_grow +- rabbit_mgmt_wm_quorum_queue_replicas_shrink - rabbit_mgmt_wm_rebalance_queues - rabbit_mgmt_wm_redirect - rabbit_mgmt_wm_reset From 99968792fa1df484ab094674d07650359f9ad73c Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 14 Jun 2023 03:14:36 +0400 Subject: [PATCH 4/4] Basic tests for the endpoints introduced in #8532 --- .../include/rabbit_mgmt_test.hrl | 1 + .../src/rabbit_mgmt_test_util.erl | 2 +- ...t_mgmt_wm_quorum_queue_replicas_shrink.erl | 2 +- .../test/clustering_SUITE.erl | 105 +++++++++++++++++- 4 files changed, 106 insertions(+), 4 deletions(-) diff --git a/deps/rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl b/deps/rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl index 83281ad936..857cc89467 100644 --- a/deps/rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl +++ b/deps/rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl @@ -2,6 +2,7 @@ -define(OK, 200). -define(CREATED, 201). +-define(ACCEPTED, 202). -define(NO_CONTENT, 204). -define(SEE_OTHER, 303). -define(BAD_REQUEST, 400). diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl b/deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl index 45e630442f..0e1f327ad5 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl @@ -165,7 +165,7 @@ http_delete(Config, Path, CodeExp, Body) -> http_delete(Config, Path, User, Pass, CodeExp, Body) -> {ok, {{_HTTP, CodeAct, _}, Headers, ResBody}} = - req(Config, 0, delete, Path, [auth_header(User, Pass)], Body), + req(Config, 0, delete, Path, [auth_header(User, Pass)], format_for_upload(Body)), assert_code(CodeExp, CodeAct, "DELETE", Path, ResBody), decode(CodeExp, Headers, ResBody). diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl index 87d47d2ba2..745c77ae9b 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl @@ -27,7 +27,7 @@ content_types_accepted(ReqData, Context) -> delete_resource(ReqData, Context) -> NodeToRemove = rabbit_mgmt_util:id(node, ReqData), - rabbit_quorum_queue:shrink_all(rabbit_data_coercion:to_atom(NodeToRemove)), + _ = rabbit_quorum_queue:shrink_all(rabbit_data_coercion:to_atom(NodeToRemove)), {true, ReqData, Context}. delete_completed(ReqData, Context) -> diff --git a/deps/rabbitmq_management/test/clustering_SUITE.erl b/deps/rabbitmq_management/test/clustering_SUITE.erl index 30e93a5020..92a1e34565 100644 --- a/deps/rabbitmq_management/test/clustering_SUITE.erl +++ b/deps/rabbitmq_management/test/clustering_SUITE.erl @@ -15,7 +15,7 @@ -include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl"). -import(rabbit_ct_broker_helpers, [get_node_config/3, restart_node/2]). --import(rabbit_mgmt_test_util, [http_get/2, http_put/4, http_delete/3]). +-import(rabbit_mgmt_test_util, [http_get/2, http_put/4, http_post/4, http_delete/3, http_delete/4]). -import(rabbit_misc, [pget/2]). -compile(nowarn_export_all). @@ -53,7 +53,11 @@ groups() -> vhosts, nodes, overview, - disable_plugin + disable_plugin, + qq_replicas_add, + qq_replicas_delete, + qq_replicas_grow, + qq_replicas_shrink ]} ]. @@ -225,6 +229,91 @@ ha_queue_with_multiple_consumers(Config) -> ok. +qq_replicas_add(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0), + {ok, Chan} = amqp_connection:open_channel(Conn), + _ = queue_declare_quorum(Chan, <<"qq.22">>), + _ = wait_for_queue(Config, "/queues/%2F/qq.22"), + + Nodename1 = rabbit_data_coercion:to_binary(get_node_config(Config, 1, nodename)), + Body = [{node, Nodename1}], + http_post(Config, "/queues/quorum/%2F/qq.22/replicas/add", Body, ?NO_CONTENT), + + http_delete(Config, "/queues/%2F/qq.22", ?NO_CONTENT), + + amqp_channel:close(Chan), + rabbit_ct_client_helpers:close_connection(Conn), + + ok. + +qq_replicas_delete(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0), + {ok, Chan} = amqp_connection:open_channel(Conn), + _ = queue_declare_quorum(Chan, <<"qq.23">>), + _ = wait_for_queue(Config, "/queues/%2F/qq.23"), + + Nodename1 = rabbit_data_coercion:to_binary(get_node_config(Config, 1, nodename)), + Body = [{node, Nodename1}], + http_post(Config, "/queues/quorum/%2F/qq.23/replicas/add", Body, ?NO_CONTENT), + timer:sleep(1100), + + http_delete(Config, "/queues/quorum/%2F/qq.23/replicas/delete", ?ACCEPTED, Body), + timer:sleep(1100), + + http_delete(Config, "/queues/%2F/qq.23", ?NO_CONTENT), + + amqp_channel:close(Chan), + rabbit_ct_client_helpers:close_connection(Conn), + + ok. + +qq_replicas_grow(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0), + {ok, Chan} = amqp_connection:open_channel(Conn), + _ = queue_declare_quorum(Chan, <<"qq.24">>), + _ = wait_for_queue(Config, "/queues/%2F/qq.24"), + + Nodename1 = rabbit_data_coercion:to_list(get_node_config(Config, 1, nodename)), + Body = [ + {strategy, <<"all">>}, + {queue_pattern, <<"qq.24">>}, + {vhost_pattern, <<".*">>} + ], + http_post(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/grow", Body, ?NO_CONTENT), + timer:sleep(1100), + + http_delete(Config, "/queues/%2F/qq.24", ?NO_CONTENT), + + amqp_channel:close(Chan), + rabbit_ct_client_helpers:close_connection(Conn), + + ok. + +qq_replicas_shrink(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0), + {ok, Chan} = amqp_connection:open_channel(Conn), + _ = queue_declare_quorum(Chan, <<"qq.24">>), + _ = wait_for_queue(Config, "/queues/%2F/qq.24"), + + Nodename1 = rabbit_data_coercion:to_list(get_node_config(Config, 1, nodename)), + Body = [ + {strategy, <<"all">>}, + {queue_pattern, <<"qq.24">>}, + {vhost_pattern, <<".*">>} + ], + http_post(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/grow", Body, ?NO_CONTENT), + timer:sleep(1100), + + http_delete(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/shrink", ?ACCEPTED), + timer:sleep(1100), + + http_delete(Config, "/queues/%2F/qq.24", ?NO_CONTENT), + + amqp_channel:close(Chan), + rabbit_ct_client_helpers:close_connection(Conn), + + ok. + queue_on_other_node(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1), {ok, Chan} = amqp_connection:open_channel(Conn), @@ -787,6 +876,18 @@ queue_declare_durable(Chan, Name) -> #'queue.declare_ok'{queue = Q} = amqp_channel:call(Chan, Declare), Q. +queue_declare_quorum(Chan, Name) -> + Declare = #'queue.declare'{ + queue = Name, + durable = true, + arguments = [ + {<<"x-queue-type">>, longstr, <<"quorum">>} + ] + }, + #'queue.declare_ok'{queue = Q} = amqp_channel:call(Chan, Declare), + Q. + + queue_bind(Chan, Ex, Q, Key) -> Binding = #'queue.bind'{queue = Q, exchange = Ex,