Like internal/protected queues, but shovels
This commit is contained in:
parent
20f89b2e0b
commit
7a34bf8053
|
@ -8,6 +8,7 @@
|
||||||
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand').
|
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand').
|
||||||
|
|
||||||
-include("rabbit_shovel.hrl").
|
-include("rabbit_shovel.hrl").
|
||||||
|
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||||
|
|
||||||
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
|
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
|
||||||
|
|
||||||
|
@ -31,7 +32,7 @@
|
||||||
%% Callbacks
|
%% Callbacks
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
usage() ->
|
usage() ->
|
||||||
<<"delete_shovel [--vhost <vhost>] <name>">>.
|
<<"delete_shovel [--vhost <vhost>] [--force] <name>">>.
|
||||||
|
|
||||||
usage_additional() ->
|
usage_additional() ->
|
||||||
[
|
[
|
||||||
|
@ -49,20 +50,24 @@ help_section() ->
|
||||||
|
|
||||||
validate([], _Opts) ->
|
validate([], _Opts) ->
|
||||||
{validation_failure, not_enough_args};
|
{validation_failure, not_enough_args};
|
||||||
validate([_, _ | _], _Opts) ->
|
validate([_, _| _], _Opts) ->
|
||||||
{validation_failure, too_many_args};
|
{validation_failure, too_many_args};
|
||||||
validate([_], _Opts) ->
|
validate([_], _Opts) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
merge_defaults(A, Opts) ->
|
merge_defaults(A, Opts) ->
|
||||||
{A, maps:merge(#{vhost => <<"/">>}, Opts)}.
|
{A, maps:merge(#{vhost => <<"/">>,
|
||||||
|
force => false}, Opts)}.
|
||||||
|
|
||||||
banner([Name], #{vhost := VHost}) ->
|
banner([Name], #{vhost := VHost}) ->
|
||||||
erlang:list_to_binary(io_lib:format("Deleting shovel ~ts in vhost ~ts",
|
erlang:list_to_binary(io_lib:format("Deleting shovel ~ts in vhost ~ts",
|
||||||
[Name, VHost])).
|
[Name, VHost])).
|
||||||
|
|
||||||
run([Name], #{node := Node, vhost := VHost}) ->
|
run([Name], #{node := Node, vhost := VHost, force := Force}) ->
|
||||||
ActingUser = 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user(),
|
ActingUser = case Force of
|
||||||
|
true -> ?INTERNAL_USER;
|
||||||
|
false -> 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user()
|
||||||
|
end,
|
||||||
|
|
||||||
case rabbit_misc:rpc_call(Node, rabbit_shovel_status, cluster_status_with_nodes, []) of
|
case rabbit_misc:rpc_call(Node, rabbit_shovel_status, cluster_status_with_nodes, []) of
|
||||||
{badrpc, _} = Error ->
|
{badrpc, _} = Error ->
|
||||||
|
@ -98,7 +103,7 @@ delete_shovel(ErrMsg, VHost, Name, ActingUser, Opts, Node) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
switches() ->
|
switches() ->
|
||||||
[].
|
[{force, boolean}].
|
||||||
|
|
||||||
aliases() ->
|
aliases() ->
|
||||||
[].
|
[].
|
||||||
|
|
|
@ -26,6 +26,8 @@
|
||||||
src_decl_exchange/4, src_decl_queue/4, src_check_queue/4,
|
src_decl_exchange/4, src_decl_queue/4, src_check_queue/4,
|
||||||
fields_fun/5, props_fun/9]).
|
fields_fun/5, props_fun/9]).
|
||||||
|
|
||||||
|
-export([is_internal/1, internal_owner/1]).
|
||||||
|
|
||||||
-import(rabbit_misc, [pget/2, pget/3, pset/3]).
|
-import(rabbit_misc, [pget/2, pget/3, pset/3]).
|
||||||
|
|
||||||
-rabbit_boot_step({?MODULE,
|
-rabbit_boot_step({?MODULE,
|
||||||
|
@ -69,6 +71,17 @@ notify_clear(VHost, <<"shovel">>, Name, _Username) ->
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
is_internal(Def) ->
|
||||||
|
pget(<<"internal">>, Def, false).
|
||||||
|
|
||||||
|
internal_owner(Def) ->
|
||||||
|
case pget(<<"internal_owner">>, Def, undefined) of
|
||||||
|
undefined -> undefined;
|
||||||
|
Owner -> rabbit_misc:r(pget(<<"virtual_host">>, Owner),
|
||||||
|
binary_to_existing_atom(pget(<<"kind">>, Owner)),
|
||||||
|
pget(<<"name">>, Owner))
|
||||||
|
end.
|
||||||
|
|
||||||
validate_src(Def) ->
|
validate_src(Def) ->
|
||||||
case protocols(Def) of
|
case protocols(Def) of
|
||||||
{amqp091, _} -> validate_amqp091_src(Def);
|
{amqp091, _} -> validate_amqp091_src(Def);
|
||||||
|
@ -112,7 +125,9 @@ validate_amqp091_dest(Def) ->
|
||||||
end].
|
end].
|
||||||
|
|
||||||
shovel_validation() ->
|
shovel_validation() ->
|
||||||
[{<<"reconnect-delay">>, fun rabbit_parameter_validation:number/2,optional},
|
[{<<"internal">>, fun rabbit_parameter_validation:boolean/2, optional},
|
||||||
|
{<<"internal_owner">>, fun validate_internal_owner/2, optional},
|
||||||
|
{<<"reconnect-delay">>, fun rabbit_parameter_validation:number/2,optional},
|
||||||
{<<"ack-mode">>, rabbit_parameter_validation:enum(
|
{<<"ack-mode">>, rabbit_parameter_validation:enum(
|
||||||
['no-ack', 'on-publish', 'on-confirm']), optional},
|
['no-ack', 'on-publish', 'on-confirm']), optional},
|
||||||
{<<"src-protocol">>,
|
{<<"src-protocol">>,
|
||||||
|
@ -233,6 +248,14 @@ validate_delete_after(Name, Term) ->
|
||||||
{error, "~ts should be a number greater than or equal to 0, \"never\" or \"queue-length\", actually was "
|
{error, "~ts should be a number greater than or equal to 0, \"never\" or \"queue-length\", actually was "
|
||||||
"~tp", [Name, Term]}.
|
"~tp", [Name, Term]}.
|
||||||
|
|
||||||
|
validate_internal_owner(Name, Term0) ->
|
||||||
|
Term = rabbit_data_coercion:to_proplist(Term0),
|
||||||
|
|
||||||
|
rabbit_parameter_validation:proplist(Name, [{<<"name">>, fun rabbit_parameter_validation:binary/2},
|
||||||
|
{<<"kind">>, rabbit_parameter_validation:enum(
|
||||||
|
['exchange', 'queue'])},
|
||||||
|
{<<"virtual_host">>, fun rabbit_parameter_validation:binary/2}], Term).
|
||||||
|
|
||||||
validate_queue_args(Name, Term0) ->
|
validate_queue_args(Name, Term0) ->
|
||||||
Term = rabbit_data_coercion:to_proplist(Term0),
|
Term = rabbit_data_coercion:to_proplist(Term0),
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
get_shovel_parameter/1]).
|
get_shovel_parameter/1]).
|
||||||
|
|
||||||
-include_lib("rabbit_common/include/rabbit_framing.hrl").
|
-include_lib("rabbit_common/include/rabbit_framing.hrl").
|
||||||
|
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||||
|
|
||||||
-define(ROUTING_HEADER, <<"x-shovelled">>).
|
-define(ROUTING_HEADER, <<"x-shovelled">>).
|
||||||
-define(TIMESTAMP_HEADER, <<"x-shovelled-timestamp">>).
|
-define(TIMESTAMP_HEADER, <<"x-shovelled-timestamp">>).
|
||||||
|
@ -45,8 +46,41 @@ delete_shovel(VHost, Name, ActingUser) ->
|
||||||
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser),
|
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser),
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
_Obj ->
|
_Obj ->
|
||||||
rabbit_log:info("Will delete runtime parameters of shovel '~ts' in virtual host '~ts'", [Name, VHost]),
|
ShovelParameters = rabbit_runtime_parameters:value(VHost, <<"shovel">>, Name),
|
||||||
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser)
|
case needs_force_delete(ShovelParameters, ActingUser) of
|
||||||
|
false ->
|
||||||
|
rabbit_log:info("Will delete runtime parameters of shovel '~ts' in virtual host '~ts'", [Name, VHost]),
|
||||||
|
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser);
|
||||||
|
true ->
|
||||||
|
report_connot_delete_protected_shovel(Name, VHost, ShovelParameters)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec report_connot_delete_protected_shovel(binary(), binary(), map() | [tuple()]) -> no_return().
|
||||||
|
report_connot_delete_protected_shovel(Name, VHost, ShovelParameters) ->
|
||||||
|
case rabbit_shovel_parameters:internal_owner(ShovelParameters) of
|
||||||
|
undefined ->
|
||||||
|
rabbit_misc:protocol_error(
|
||||||
|
resource_locked,
|
||||||
|
"Cannot delete protected shovel '~ts' in virtual host '~ts'.",
|
||||||
|
[Name, VHost]);
|
||||||
|
IOwner ->
|
||||||
|
rabbit_misc:protocol_error(
|
||||||
|
resource_locked,
|
||||||
|
"Cannot delete protected shovel '~ts' in virtual host '~ts'. It was "
|
||||||
|
"declared as an protected and can be deleted only by deleting the owner entity: ~ts",
|
||||||
|
[Name, VHost, rabbit_misc:rs(IOwner)])
|
||||||
|
end.
|
||||||
|
|
||||||
|
needs_force_delete(Parameters,ActingUser) ->
|
||||||
|
case rabbit_shovel_parameters:is_internal(Parameters) of
|
||||||
|
false ->
|
||||||
|
false;
|
||||||
|
true ->
|
||||||
|
case ActingUser of
|
||||||
|
?INTERNAL_USER -> false;
|
||||||
|
_ -> true
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
restart_shovel(VHost, Name) ->
|
restart_shovel(VHost, Name) ->
|
||||||
|
|
|
@ -24,7 +24,9 @@ groups() ->
|
||||||
[
|
[
|
||||||
{non_parallel_tests, [], [
|
{non_parallel_tests, [], [
|
||||||
delete_not_found,
|
delete_not_found,
|
||||||
delete
|
delete,
|
||||||
|
delete_internal,
|
||||||
|
delete_internal_owner
|
||||||
]},
|
]},
|
||||||
{cluster_size_2, [], [
|
{cluster_size_2, [], [
|
||||||
clear_param_on_different_node
|
clear_param_on_different_node
|
||||||
|
@ -73,7 +75,7 @@ end_per_testcase(Testcase, Config) ->
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
delete_not_found(Config) ->
|
delete_not_found(Config) ->
|
||||||
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||||
Opts = #{node => A, vhost => <<"/">>},
|
Opts = #{node => A, vhost => <<"/">>, force => false},
|
||||||
{error, _} = ?CMD:run([<<"myshovel">>], Opts).
|
{error, _} = ?CMD:run([<<"myshovel">>], Opts).
|
||||||
|
|
||||||
delete(Config) ->
|
delete(Config) ->
|
||||||
|
@ -82,10 +84,56 @@ delete(Config) ->
|
||||||
<<"myshovel">>, [{<<"src-queue">>, <<"src">>},
|
<<"myshovel">>, [{<<"src-queue">>, <<"src">>},
|
||||||
{<<"dest-queue">>, <<"dest">>}]),
|
{<<"dest-queue">>, <<"dest">>}]),
|
||||||
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||||
Opts = #{node => A, vhost => <<"/">>},
|
Opts = #{node => A, vhost => <<"/">>, force => false},
|
||||||
ok = ?CMD:run([<<"myshovel">>], Opts),
|
ok = ?CMD:run([<<"myshovel">>], Opts),
|
||||||
[] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
|
[] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
|
||||||
status, []).
|
status, []).
|
||||||
|
|
||||||
|
delete_internal(Config) ->
|
||||||
|
shovel_test_utils:set_param(
|
||||||
|
Config,
|
||||||
|
<<"myshovel">>, [{<<"src-queue">>, <<"src">>},
|
||||||
|
{<<"internal">>, true},
|
||||||
|
{<<"dest-queue">>, <<"dest">>}]),
|
||||||
|
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||||
|
Opts = #{node => A, vhost => <<"/">>, force => false},
|
||||||
|
{badrpc,
|
||||||
|
{'EXIT',
|
||||||
|
{amqp_error, resource_locked,
|
||||||
|
"Cannot delete protected shovel 'myshovel' in virtual host '/'.",
|
||||||
|
none}}} = ?CMD:run([<<"myshovel">>], Opts),
|
||||||
|
[_] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
|
||||||
|
status, []),
|
||||||
|
|
||||||
|
ForceOpts = #{node => A, vhost => <<"/">>, force => true},
|
||||||
|
ok = ?CMD:run([<<"myshovel">>], ForceOpts),
|
||||||
|
[] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
|
||||||
|
status, []).
|
||||||
|
|
||||||
|
delete_internal_owner(Config) ->
|
||||||
|
shovel_test_utils:set_param(
|
||||||
|
Config,
|
||||||
|
<<"myshovel">>, [{<<"src-queue">>, <<"src">>},
|
||||||
|
{<<"internal">>, true},
|
||||||
|
{<<"internal_owner">>, [{<<"name">>, <<"src">>},
|
||||||
|
{<<"kind">>, <<"queue">>},
|
||||||
|
{<<"virtual_host">>, <<"/">>}]},
|
||||||
|
{<<"dest-queue">>, <<"dest">>}]),
|
||||||
|
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||||
|
Opts = #{node => A, vhost => <<"/">>, force => false},
|
||||||
|
{badrpc,
|
||||||
|
{'EXIT',
|
||||||
|
{amqp_error, resource_locked,
|
||||||
|
"Cannot delete protected shovel 'myshovel' in virtual host '/'. "
|
||||||
|
"It was declared as an protected and can be deleted only by deleting the owner entity: queue 'src' in vhost '/'",
|
||||||
|
none}}} = ?CMD:run([<<"myshovel">>], Opts),
|
||||||
|
[_] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
|
||||||
|
status, []),
|
||||||
|
|
||||||
|
ForceOpts = #{node => A, vhost => <<"/">>, force => true},
|
||||||
|
ok = ?CMD:run([<<"myshovel">>], ForceOpts),
|
||||||
|
[] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
|
||||||
|
status, []).
|
||||||
clear_param_on_different_node(Config) ->
|
clear_param_on_different_node(Config) ->
|
||||||
shovel_test_utils:set_param(
|
shovel_test_utils:set_param(
|
||||||
Config,
|
Config,
|
||||||
|
|
Loading…
Reference in New Issue