Merge pull request #8542 from rabbitmq/rabbitmq-server-8532

HTTP API: introduce a number of endpoints for QQ replica management
This commit is contained in:
Michael Klishin 2023-06-14 03:54:28 +04:00 committed by GitHub
commit 6aa66a9382
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 346 additions and 6 deletions

View File

@ -1064,6 +1064,7 @@ get_sys_status(Proc) ->
add_member(VHost, Name, Node, Timeout) ->
QName = #resource{virtual_host = VHost, name = Name, kind = queue},
rabbit_log:debug("Asked to add a replica for queue ~ts on node ~ts", [rabbit_misc:rs(QName), Node]),
case rabbit_amqqueue:lookup(QName) of
{ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
@ -1076,6 +1077,7 @@ add_member(VHost, Name, Node, Timeout) ->
case lists:member(Node, QNodes) of
true ->
%% idempotent by design
rabbit_log:debug("Quorum ~ts already has a replica on node ~ts", [rabbit_misc:rs(QName), Node]),
ok;
false ->
add_member(Q, Node, Timeout)
@ -1110,6 +1112,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
amqqueue:set_pid(Q2, Leader)
end,
_ = rabbit_amqqueue:update(QName, Fun),
rabbit_log:info("Added a replica of quorum ~ts on node ~ts", [rabbit_misc:rs(QName), Node]),
ok;
{timeout, _} ->
_ = ra:force_delete_server(?RA_SYSTEM, ServerId),
@ -1120,7 +1123,8 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
E
end;
E ->
E
rabbit_log:warning("Could not add a replica of quorum ~ts on node ~ts: ~p", [rabbit_misc:rs(QName), Node, E]),
E
end.
delete_member(VHost, Name, Node) ->
@ -1188,6 +1192,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
[{rabbit_amqqueue:name(),
{ok, pos_integer()} | {error, pos_integer(), term()}}].
shrink_all(Node) ->
rabbit_log:info("Asked to remove all quorum queue replicas from node ~ts", [Node]),
[begin
QName = amqqueue:get_name(Q),
rabbit_log:info("~ts: removing member (replica) on node ~w",
@ -1208,7 +1213,7 @@ shrink_all(Node) ->
-spec grow(node(), binary(), binary(), all | even) ->
[{rabbit_amqqueue:name(),
{ok, pos_integer()} | {error, pos_integer(), term()}}].
grow(Node, VhostSpec, QueueSpec, Strategy) ->
grow(Node, VhostSpec, QueueSpec, Strategy) ->
Running = rabbit_nodes:list_running(),
[begin
Size = length(get_nodes(Q)),

View File

@ -2,6 +2,7 @@
-define(OK, 200).
-define(CREATED, 201).
-define(ACCEPTED, 202).
-define(NO_CONTENT, 204).
-define(SEE_OTHER, 303).
-define(BAD_REQUEST, 400).

View File

@ -165,7 +165,7 @@ http_delete(Config, Path, CodeExp, Body) ->
http_delete(Config, Path, User, Pass, CodeExp, Body) ->
{ok, {{_HTTP, CodeAct, _}, Headers, ResBody}} =
req(Config, 0, delete, Path, [auth_header(User, Pass)], Body),
req(Config, 0, delete, Path, [auth_header(User, Pass)], format_for_upload(Body)),
assert_code(CodeExp, CodeAct, "DELETE", Path, ResBody),
decode(CodeExp, Headers, ResBody).

View File

@ -93,6 +93,10 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_mgmt_wm_queue_get.erl",
"src/rabbit_mgmt_wm_queue_purge.erl",
"src/rabbit_mgmt_wm_queues.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl",
"src/rabbit_mgmt_wm_rebalance_queues.erl",
"src/rabbit_mgmt_wm_redirect.erl",
"src/rabbit_mgmt_wm_reset.erl",
@ -219,6 +223,10 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_mgmt_wm_queue_get.erl",
"src/rabbit_mgmt_wm_queue_purge.erl",
"src/rabbit_mgmt_wm_queues.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl",
"src/rabbit_mgmt_wm_rebalance_queues.erl",
"src/rabbit_mgmt_wm_redirect.erl",
"src/rabbit_mgmt_wm_reset.erl",
@ -434,6 +442,10 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_mgmt_wm_queue_get.erl",
"src/rabbit_mgmt_wm_queue_purge.erl",
"src/rabbit_mgmt_wm_queues.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl",
"src/rabbit_mgmt_wm_quorum_queue_replicas_shrink.erl",
"src/rabbit_mgmt_wm_rebalance_queues.erl",
"src/rabbit_mgmt_wm_redirect.erl",
"src/rabbit_mgmt_wm_reset.erl",

View File

@ -140,6 +140,10 @@ dispatcher() ->
{"/queues/:vhost/:queue/contents", rabbit_mgmt_wm_queue_purge, []},
{"/queues/:vhost/:queue/get", rabbit_mgmt_wm_queue_get, []},
{"/queues/:vhost/:queue/actions", rabbit_mgmt_wm_queue_actions, []},
{"/queues/quorum/:vhost/:queue/replicas/add", rabbit_mgmt_wm_quorum_queue_replicas_add_member, []},
{"/queues/quorum/:vhost/:queue/replicas/delete", rabbit_mgmt_wm_quorum_queue_replicas_delete_member, []},
{"/queues/quorum/replicas/on/:node/grow", rabbit_mgmt_wm_quorum_queue_replicas_grow, []},
{"/queues/quorum/replicas/on/:node/shrink", rabbit_mgmt_wm_quorum_queue_replicas_shrink, []},
{"/bindings", rabbit_mgmt_wm_bindings, [all]},
{"/bindings/:vhost", rabbit_mgmt_wm_bindings, [all]},
{"/bindings/:vhost/e/:source/:dtype/:destination", rabbit_mgmt_wm_bindings, [source_destination]},

View File

@ -18,7 +18,7 @@
is_authorized_vhost_visible_for_monitoring/2,
is_authorized_global_parameters/2]).
-export([user/1]).
-export([bad_request/3, bad_request_exception/4, internal_server_error/4,
-export([bad_request/3, service_unavailable/3, bad_request_exception/4, internal_server_error/4,
id/2, parse_bool/1, parse_int/1, redirect_to_home/3]).
-export([with_decode/4, not_found/3]).
-export([with_channel/4, with_channel/5]).
@ -768,6 +768,9 @@ a2b(B) -> B.
bad_request(Reason, ReqData, Context) ->
halt_response(400, bad_request, Reason, ReqData, Context).
service_unavailable(Reason, ReqData, Context) ->
halt_response(503, service_unavailable, Reason, ReqData, Context).
not_authenticated(Reason, ReqData, Context) ->
case is_oauth2_enabled() of
false ->

View File

@ -0,0 +1,59 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_mgmt_wm_quorum_queue_replicas_add_member).
-export([init/2, resource_exists/2, is_authorized/2, allowed_methods/2,
content_types_accepted/2, accept_content/2]).
-export([variances/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-define(TIMEOUT, 30_000).
init(Req, _State) ->
{cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}.
variances(Req, Context) ->
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
allowed_methods(ReqData, Context) ->
{[<<"POST">>, <<"OPTIONS">>], ReqData, Context}.
resource_exists(ReqData, Context) ->
{case rabbit_mgmt_wm_queue:queue(ReqData) of
not_found -> false;
_ -> true
end, ReqData, Context}.
content_types_accepted(ReqData, Context) ->
{[{'*', accept_content}], ReqData, Context}.
accept_content(ReqData, Context) ->
VHost = rabbit_mgmt_util:vhost(ReqData),
QName = rabbit_mgmt_util:id(queue, ReqData),
Res = rabbit_mgmt_util:with_decode(
[node], ReqData, Context,
fun([NewReplicaNode], _Body, _ReqData) ->
rabbit_amqqueue:with(
rabbit_misc:r(VHost, queue, QName),
fun(_Q) ->
rabbit_quorum_queue:add_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode), ?TIMEOUT)
end)
end),
case Res of
ok ->
{true, ReqData, Context};
{ok, _} ->
{true, ReqData, Context};
{error, Reason} ->
rabbit_mgmt_util:service_unavailable(Reason, ReqData, Context)
end.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized_admin(ReqData, Context).

View File

@ -0,0 +1,60 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_mgmt_wm_quorum_queue_replicas_delete_member).
-export([init/2, resource_exists/2, is_authorized/2, allowed_methods/2,
content_types_accepted/2, delete_resource/2, delete_completed/2]).
-export([variances/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
init(Req, _State) ->
{cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}.
variances(Req, Context) ->
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
allowed_methods(ReqData, Context) ->
{[<<"DELETE">>, <<"OPTIONS">>], ReqData, Context}.
resource_exists(ReqData, Context) ->
{case rabbit_mgmt_wm_queue:queue(ReqData) of
not_found -> false;
_ -> true
end, ReqData, Context}.
content_types_accepted(ReqData, Context) ->
{[{'*', accept_content}], ReqData, Context}.
delete_resource(ReqData, Context) ->
VHost = rabbit_mgmt_util:vhost(ReqData),
QName = rabbit_mgmt_util:id(queue, ReqData),
Res = rabbit_mgmt_util:with_decode(
[node], ReqData, Context,
fun([NewReplicaNode], _Body, _ReqData) ->
rabbit_amqqueue:with(
rabbit_misc:r(VHost, queue, QName),
fun(_Q) ->
rabbit_quorum_queue:delete_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode))
end)
end),
case Res of
ok ->
{true, ReqData, Context};
{ok, _} ->
{true, ReqData, Context};
{error, Reason} ->
rabbit_mgmt_util:service_unavailable(Reason, ReqData, Context)
end.
delete_completed(ReqData, Context) ->
%% return 202 Accepted since this is an inherently asynchronous operation
{false, ReqData, Context}.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized_admin(ReqData, Context).

View File

@ -0,0 +1,53 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_mgmt_wm_quorum_queue_replicas_grow).
-export([init/2, is_authorized/2, allowed_methods/2,
content_types_accepted/2, resource_exists/2, accept_content/2]).
-export([variances/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-define(TIMEOUT, 30_000).
init(Req, _State) ->
{cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}.
variances(Req, Context) ->
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
allowed_methods(ReqData, Context) ->
{[<<"POST">>, <<"OPTIONS">>], ReqData, Context}.
content_types_accepted(ReqData, Context) ->
{[{'*', accept_content}], ReqData, Context}.
resource_exists(ReqData, Context) ->
case rabbit_mgmt_util:id(node, ReqData) of
none -> {false, ReqData, Context};
Node ->
NodeExists = lists:member(rabbit_data_coercion:to_atom(Node), rabbit_nodes:list_running()),
{NodeExists, ReqData, Context}
end.
accept_content(ReqData, Context) ->
NewReplicaNode = rabbit_mgmt_util:id(node, ReqData),
rabbit_mgmt_util:with_decode(
[vhost_pattern, queue_pattern, strategy], ReqData, Context,
fun([VHPattern, QPattern, Strategy], _Body, _ReqData) ->
rabbit_quorum_queue:grow(
rabbit_data_coercion:to_atom(NewReplicaNode),
VHPattern,
QPattern,
rabbit_data_coercion:to_atom(Strategy))
end),
{true, ReqData, Context}.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized_admin(ReqData, Context).

View File

@ -0,0 +1,38 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_mgmt_wm_quorum_queue_replicas_shrink).
-export([init/2, is_authorized/2, allowed_methods/2,
content_types_accepted/2, delete_resource/2, delete_completed/2]).
-export([variances/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
init(Req, _State) ->
{cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}.
variances(Req, Context) ->
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
allowed_methods(ReqData, Context) ->
{[<<"DELETE">>, <<"OPTIONS">>], ReqData, Context}.
content_types_accepted(ReqData, Context) ->
{[{'*', accept_content}], ReqData, Context}.
delete_resource(ReqData, Context) ->
NodeToRemove = rabbit_mgmt_util:id(node, ReqData),
_ = rabbit_quorum_queue:shrink_all(rabbit_data_coercion:to_atom(NodeToRemove)),
{true, ReqData, Context}.
delete_completed(ReqData, Context) ->
%% return 202 Accepted since this is an inherently asynchronous operation
{false, ReqData, Context}.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized_admin(ReqData, Context).

View File

@ -15,7 +15,7 @@
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
-import(rabbit_ct_broker_helpers, [get_node_config/3, restart_node/2]).
-import(rabbit_mgmt_test_util, [http_get/2, http_put/4, http_delete/3]).
-import(rabbit_mgmt_test_util, [http_get/2, http_put/4, http_post/4, http_delete/3, http_delete/4]).
-import(rabbit_misc, [pget/2]).
-compile(nowarn_export_all).
@ -53,7 +53,11 @@ groups() ->
vhosts,
nodes,
overview,
disable_plugin
disable_plugin,
qq_replicas_add,
qq_replicas_delete,
qq_replicas_grow,
qq_replicas_shrink
]}
].
@ -225,6 +229,91 @@ ha_queue_with_multiple_consumers(Config) ->
ok.
qq_replicas_add(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
{ok, Chan} = amqp_connection:open_channel(Conn),
_ = queue_declare_quorum(Chan, <<"qq.22">>),
_ = wait_for_queue(Config, "/queues/%2F/qq.22"),
Nodename1 = rabbit_data_coercion:to_binary(get_node_config(Config, 1, nodename)),
Body = [{node, Nodename1}],
http_post(Config, "/queues/quorum/%2F/qq.22/replicas/add", Body, ?NO_CONTENT),
http_delete(Config, "/queues/%2F/qq.22", ?NO_CONTENT),
amqp_channel:close(Chan),
rabbit_ct_client_helpers:close_connection(Conn),
ok.
qq_replicas_delete(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
{ok, Chan} = amqp_connection:open_channel(Conn),
_ = queue_declare_quorum(Chan, <<"qq.23">>),
_ = wait_for_queue(Config, "/queues/%2F/qq.23"),
Nodename1 = rabbit_data_coercion:to_binary(get_node_config(Config, 1, nodename)),
Body = [{node, Nodename1}],
http_post(Config, "/queues/quorum/%2F/qq.23/replicas/add", Body, ?NO_CONTENT),
timer:sleep(1100),
http_delete(Config, "/queues/quorum/%2F/qq.23/replicas/delete", ?ACCEPTED, Body),
timer:sleep(1100),
http_delete(Config, "/queues/%2F/qq.23", ?NO_CONTENT),
amqp_channel:close(Chan),
rabbit_ct_client_helpers:close_connection(Conn),
ok.
qq_replicas_grow(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
{ok, Chan} = amqp_connection:open_channel(Conn),
_ = queue_declare_quorum(Chan, <<"qq.24">>),
_ = wait_for_queue(Config, "/queues/%2F/qq.24"),
Nodename1 = rabbit_data_coercion:to_list(get_node_config(Config, 1, nodename)),
Body = [
{strategy, <<"all">>},
{queue_pattern, <<"qq.24">>},
{vhost_pattern, <<".*">>}
],
http_post(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/grow", Body, ?NO_CONTENT),
timer:sleep(1100),
http_delete(Config, "/queues/%2F/qq.24", ?NO_CONTENT),
amqp_channel:close(Chan),
rabbit_ct_client_helpers:close_connection(Conn),
ok.
qq_replicas_shrink(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
{ok, Chan} = amqp_connection:open_channel(Conn),
_ = queue_declare_quorum(Chan, <<"qq.24">>),
_ = wait_for_queue(Config, "/queues/%2F/qq.24"),
Nodename1 = rabbit_data_coercion:to_list(get_node_config(Config, 1, nodename)),
Body = [
{strategy, <<"all">>},
{queue_pattern, <<"qq.24">>},
{vhost_pattern, <<".*">>}
],
http_post(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/grow", Body, ?NO_CONTENT),
timer:sleep(1100),
http_delete(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/shrink", ?ACCEPTED),
timer:sleep(1100),
http_delete(Config, "/queues/%2F/qq.24", ?NO_CONTENT),
amqp_channel:close(Chan),
rabbit_ct_client_helpers:close_connection(Conn),
ok.
queue_on_other_node(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
{ok, Chan} = amqp_connection:open_channel(Conn),
@ -787,6 +876,18 @@ queue_declare_durable(Chan, Name) ->
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Chan, Declare),
Q.
queue_declare_quorum(Chan, Name) ->
Declare = #'queue.declare'{
queue = Name,
durable = true,
arguments = [
{<<"x-queue-type">>, longstr, <<"quorum">>}
]
},
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Chan, Declare),
Q.
queue_bind(Chan, Ex, Q, Key) ->
Binding = #'queue.bind'{queue = Q,
exchange = Ex,

View File

@ -913,6 +913,10 @@ rabbitmq_management:
- rabbit_mgmt_wm_queue_get
- rabbit_mgmt_wm_queue_purge
- rabbit_mgmt_wm_queues
- rabbit_mgmt_wm_quorum_queue_replicas_add_member
- rabbit_mgmt_wm_quorum_queue_replicas_delete_member
- rabbit_mgmt_wm_quorum_queue_replicas_grow
- rabbit_mgmt_wm_quorum_queue_replicas_shrink
- rabbit_mgmt_wm_rebalance_queues
- rabbit_mgmt_wm_redirect
- rabbit_mgmt_wm_reset