Merge pull request #14079 from rabbitmq/ik-internal-shovels
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Waiting to run Details
Test (make) / Build and Xref (1.17, 26) (push) Waiting to run Details
Test (make) / Build and Xref (1.17, 27) (push) Waiting to run Details
Test (make) / Test (1.17, 27, khepri) (push) Waiting to run Details
Test (make) / Test (1.17, 27, mnesia) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Waiting to run Details
Test (make) / Type check (1.17, 27) (push) Waiting to run Details
Test Authentication/Authorization backends via mutiple messaging protocols / selenium (chrome, 1.17.3, 27.3) (push) Has been cancelled Details
Test Management UI with Selenium / selenium (chrome, 1.17.3, 27.3) (push) Has been cancelled Details
Test Authentication/Authorization backends via mutiple messaging protocols / summary-selenium (push) Has been cancelled Details

Internal/Protected shovels
This commit is contained in:
Michael Klishin 2025-06-16 14:31:47 +04:00 committed by GitHub
commit c6e2405366
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 229 additions and 58 deletions

View File

@ -1094,3 +1094,15 @@ function fmt_deprecation_phase(phase, deprecation_phases){
} }
} }
} }
function fmt_resource(res) {
return `${res.kind} '${res.name}' in vhost '${res.virtual_host}'`;
}
function fmt_resource_link(res) {
if (res.kind == "queue") {
return `${res.kind} '${link_queue(res.virtual_host, res.name, {})}' in vhost '${link_vhost(res.virtual_host)}'`;
} else if (res.kind == "exchange") {
return `${res.kind} '${link_exchange(res.virtual_host, res.name, {})}' in vhost '${link_vhost(res.virtual_host)}'`;
}
}

View File

@ -8,6 +8,7 @@
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand'). -module('Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand').
-include("rabbit_shovel.hrl"). -include("rabbit_shovel.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). -behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
@ -31,7 +32,7 @@
%% Callbacks %% Callbacks
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
usage() -> usage() ->
<<"delete_shovel [--vhost <vhost>] <name>">>. <<"delete_shovel [--vhost <vhost>] [--force] <name>">>.
usage_additional() -> usage_additional() ->
[ [
@ -49,20 +50,24 @@ help_section() ->
validate([], _Opts) -> validate([], _Opts) ->
{validation_failure, not_enough_args}; {validation_failure, not_enough_args};
validate([_, _ | _], _Opts) -> validate([_, _| _], _Opts) ->
{validation_failure, too_many_args}; {validation_failure, too_many_args};
validate([_], _Opts) -> validate([_], _Opts) ->
ok. ok.
merge_defaults(A, Opts) -> merge_defaults(A, Opts) ->
{A, maps:merge(#{vhost => <<"/">>}, Opts)}. {A, maps:merge(#{vhost => <<"/">>,
force => false}, Opts)}.
banner([Name], #{vhost := VHost}) -> banner([Name], #{vhost := VHost}) ->
erlang:list_to_binary(io_lib:format("Deleting shovel ~ts in vhost ~ts", erlang:list_to_binary(io_lib:format("Deleting shovel ~ts in vhost ~ts",
[Name, VHost])). [Name, VHost])).
run([Name], #{node := Node, vhost := VHost}) -> run([Name], #{node := Node, vhost := VHost, force := Force}) ->
ActingUser = 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user(), ActingUser = case Force of
true -> ?INTERNAL_USER;
false -> 'Elixir.RabbitMQ.CLI.Core.Helpers':cli_acting_user()
end,
case rabbit_misc:rpc_call(Node, rabbit_shovel_status, cluster_status_with_nodes, []) of case rabbit_misc:rpc_call(Node, rabbit_shovel_status, cluster_status_with_nodes, []) of
{badrpc, _} = Error -> {badrpc, _} = Error ->
@ -98,7 +103,7 @@ delete_shovel(ErrMsg, VHost, Name, ActingUser, Opts, Node) ->
end. end.
switches() -> switches() ->
[]. [{force, boolean}].
aliases() -> aliases() ->
[]. [].

View File

@ -26,6 +26,8 @@
src_decl_exchange/4, src_decl_queue/4, src_check_queue/4, src_decl_exchange/4, src_decl_queue/4, src_check_queue/4,
fields_fun/5, props_fun/9]). fields_fun/5, props_fun/9]).
-export([is_internal/1, internal_owner/1]).
-import(rabbit_misc, [pget/2, pget/3, pset/3]). -import(rabbit_misc, [pget/2, pget/3, pset/3]).
-rabbit_boot_step({?MODULE, -rabbit_boot_step({?MODULE,
@ -69,6 +71,17 @@ notify_clear(VHost, <<"shovel">>, Name, _Username) ->
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
is_internal(Def) ->
pget(<<"internal">>, Def, false).
internal_owner(Def) ->
case pget(<<"internal_owner">>, Def, undefined) of
undefined -> undefined;
Owner -> rabbit_misc:r(pget(<<"virtual_host">>, Owner),
binary_to_existing_atom(pget(<<"kind">>, Owner)),
pget(<<"name">>, Owner))
end.
validate_src(Def) -> validate_src(Def) ->
case protocols(Def) of case protocols(Def) of
{amqp091, _} -> validate_amqp091_src(Def); {amqp091, _} -> validate_amqp091_src(Def);
@ -112,7 +125,9 @@ validate_amqp091_dest(Def) ->
end]. end].
shovel_validation() -> shovel_validation() ->
[{<<"reconnect-delay">>, fun rabbit_parameter_validation:number/2,optional}, [{<<"internal">>, fun rabbit_parameter_validation:boolean/2, optional},
{<<"internal_owner">>, fun validate_internal_owner/2, optional},
{<<"reconnect-delay">>, fun rabbit_parameter_validation:number/2,optional},
{<<"ack-mode">>, rabbit_parameter_validation:enum( {<<"ack-mode">>, rabbit_parameter_validation:enum(
['no-ack', 'on-publish', 'on-confirm']), optional}, ['no-ack', 'on-publish', 'on-confirm']), optional},
{<<"src-protocol">>, {<<"src-protocol">>,
@ -233,6 +248,14 @@ validate_delete_after(Name, Term) ->
{error, "~ts should be a number greater than or equal to 0, \"never\" or \"queue-length\", actually was " {error, "~ts should be a number greater than or equal to 0, \"never\" or \"queue-length\", actually was "
"~tp", [Name, Term]}. "~tp", [Name, Term]}.
validate_internal_owner(Name, Term0) ->
Term = rabbit_data_coercion:to_proplist(Term0),
rabbit_parameter_validation:proplist(Name, [{<<"name">>, fun rabbit_parameter_validation:binary/2},
{<<"kind">>, rabbit_parameter_validation:enum(
['exchange', 'queue'])},
{<<"virtual_host">>, fun rabbit_parameter_validation:binary/2}], Term).
validate_queue_args(Name, Term0) -> validate_queue_args(Name, Term0) ->
Term = rabbit_data_coercion:to_proplist(Term0), Term = rabbit_data_coercion:to_proplist(Term0),

View File

@ -14,6 +14,7 @@
get_shovel_parameter/1]). get_shovel_parameter/1]).
-include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-define(ROUTING_HEADER, <<"x-shovelled">>). -define(ROUTING_HEADER, <<"x-shovelled">>).
-define(TIMESTAMP_HEADER, <<"x-shovelled-timestamp">>). -define(TIMESTAMP_HEADER, <<"x-shovelled-timestamp">>).
@ -45,8 +46,41 @@ delete_shovel(VHost, Name, ActingUser) ->
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser), ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser),
{error, not_found}; {error, not_found};
_Obj -> _Obj ->
rabbit_log:info("Will delete runtime parameters of shovel '~ts' in virtual host '~ts'", [Name, VHost]), ShovelParameters = rabbit_runtime_parameters:value(VHost, <<"shovel">>, Name),
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser) case needs_force_delete(ShovelParameters, ActingUser) of
false ->
rabbit_log:info("Will delete runtime parameters of shovel '~ts' in virtual host '~ts'", [Name, VHost]),
ok = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, Name, ActingUser);
true ->
report_that_protected_shovel_cannot_be_deleted(Name, VHost, ShovelParameters)
end
end.
-spec report_that_protected_shovel_cannot_be_deleted(binary(), binary(), map() | [tuple()]) -> no_return().
report_that_protected_shovel_cannot_be_deleted(Name, VHost, ShovelParameters) ->
case rabbit_shovel_parameters:internal_owner(ShovelParameters) of
undefined ->
rabbit_misc:protocol_error(
resource_locked,
"Cannot delete protected shovel '~ts' in virtual host '~ts'.",
[Name, VHost]);
IOwner ->
rabbit_misc:protocol_error(
resource_locked,
"Cannot delete protected shovel '~ts' in virtual host '~ts'. It was "
"declared as protected, delete it with --force or delete its owner entity instead: ~ts",
[Name, VHost, rabbit_misc:rs(IOwner)])
end.
needs_force_delete(Parameters,ActingUser) ->
case rabbit_shovel_parameters:is_internal(Parameters) of
false ->
false;
true ->
case ActingUser of
?INTERNAL_USER -> false;
_ -> true
end
end. end.
restart_shovel(VHost, Name) -> restart_shovel(VHost, Name) ->

View File

@ -24,7 +24,9 @@ groups() ->
[ [
{non_parallel_tests, [], [ {non_parallel_tests, [], [
delete_not_found, delete_not_found,
delete delete,
delete_internal,
delete_internal_owner
]}, ]},
{cluster_size_2, [], [ {cluster_size_2, [], [
clear_param_on_different_node clear_param_on_different_node
@ -73,7 +75,7 @@ end_per_testcase(Testcase, Config) ->
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
delete_not_found(Config) -> delete_not_found(Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Opts = #{node => A, vhost => <<"/">>}, Opts = #{node => A, vhost => <<"/">>, force => false},
{error, _} = ?CMD:run([<<"myshovel">>], Opts). {error, _} = ?CMD:run([<<"myshovel">>], Opts).
delete(Config) -> delete(Config) ->
@ -82,10 +84,55 @@ delete(Config) ->
<<"myshovel">>, [{<<"src-queue">>, <<"src">>}, <<"myshovel">>, [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>}]), {<<"dest-queue">>, <<"dest">>}]),
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Opts = #{node => A, vhost => <<"/">>}, Opts = #{node => A, vhost => <<"/">>, force => false},
ok = ?CMD:run([<<"myshovel">>], Opts), ok = ?CMD:run([<<"myshovel">>], Opts),
[] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, [] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
status, []). status, []).
delete_internal(Config) ->
shovel_test_utils:set_param(
Config,
<<"myshovel">>, [{<<"src-queue">>, <<"src">>},
{<<"internal">>, true},
{<<"dest-queue">>, <<"dest">>}]),
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Opts = #{node => A, vhost => <<"/">>, force => false},
{badrpc,
{'EXIT',
{amqp_error, resource_locked,
"Cannot delete protected shovel 'myshovel' in virtual host '/'.",
none}}} = ?CMD:run([<<"myshovel">>], Opts),
[_] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
status, []),
ForceOpts = #{node => A, vhost => <<"/">>, force => true},
ok = ?CMD:run([<<"myshovel">>], ForceOpts),
[] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
status, []).
delete_internal_owner(Config) ->
shovel_test_utils:set_param(
Config,
<<"myshovel">>, [{<<"src-queue">>, <<"src">>},
{<<"internal">>, true},
{<<"internal_owner">>, [{<<"name">>, <<"src">>},
{<<"kind">>, <<"queue">>},
{<<"virtual_host">>, <<"/">>}]},
{<<"dest-queue">>, <<"dest">>}]),
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Opts = #{node => A, vhost => <<"/">>, force => false},
?assertMatch(
{badrpc, {'EXIT', {amqp_error, resource_locked, _, none}}},
?CMD:run([<<"myshovel">>], Opts)
),
[_] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
status, []),
ForceOpts = #{node => A, vhost => <<"/">>, force => true},
ok = ?CMD:run([<<"myshovel">>], ForceOpts),
[] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status,
status, []).
clear_param_on_different_node(Config) -> clear_param_on_different_node(Config) ->
shovel_test_utils:set_param( shovel_test_utils:set_param(
Config, Config,

View File

@ -206,6 +206,27 @@ function fmt_shovel_endpoint(prefix, shovel) {
return txt; return txt;
} }
function is_internal_shovel(shovel) {
if (!shovel.hasOwnProperty('internal')) {
return false;
} else {
return shovel['internal'];
}
}
function shovel_has_internal_owner(shovel) {
if (!shovel.hasOwnProperty('internal_owner')) {
return false;
} else {
return true;
}
}
function shovel_internal_owner(shovel) {
return shovel.internal_owner;
}
function fallback_value(shovel, key1, key2) { function fallback_value(shovel, key1, key2) {
var v = shovel.value[key1]; var v = shovel.value[key1];
return (v !== undefined ? v : shovel.value[key2]); return (v !== undefined ? v : shovel.value[key2]);

View File

@ -44,14 +44,23 @@
</div> </div>
</div> </div>
<div class="section-hidden">
<div class="section-hidden">
<h2>Delete this shovel</h2> <h2>Delete this shovel</h2>
<div class="hider"> <div class="hider">
<form action="#/shovel-parameters" method="delete" class="confirm"> <% if (!is_internal_shovel(shovel.value)) { %>
<input type="hidden" name="component" value="shovel"/> <form action="#/shovel-parameters" method="delete" class="confirm">
<input type="hidden" name="vhost" value="<%= fmt_string(shovel.vhost) %>"/> <input type="hidden" name="component" value="shovel"/>
<input type="hidden" name="name" value="<%= fmt_string(shovel.name) %>"/> <input type="hidden" name="vhost" value="<%= fmt_string(shovel.vhost) %>"/>
<input type="submit" value="Delete this shovel"/> <input type="hidden" name="name" value="<%= fmt_string(shovel.name) %>"/>
</form> <input type="submit" value="Delete this shovel"/>
</form>
<% } else { %>
<% if (shovel_has_internal_owner(shovel.value)) { %>
<span>This shovel is internal and owned by <%= fmt_resource_link(shovel_internal_owner(shovel.value)) %>. Could be deleted only via CLI command with --force.</span>
<% } else { %>
<span>This shovel is internal. Could be deleted only via CLI command with '--force'.</span>
<% } %>
<% } %>
</div>
</div> </div>
</div>

View File

@ -79,45 +79,60 @@ is_authorized(ReqData, Context) ->
delete_resource(ReqData, #context{user = #user{username = Username}}=Context) -> delete_resource(ReqData, #context{user = #user{username = Username}}=Context) ->
VHost = rabbit_mgmt_util:id(vhost, ReqData), VHost = rabbit_mgmt_util:id(vhost, ReqData),
Reply = case rabbit_mgmt_util:id(name, ReqData) of case rabbit_mgmt_util:id(name, ReqData) of
none -> none ->
false; {false, ReqData, Context};
Name -> Name ->
case get_shovel_node(VHost, Name, ReqData, Context) of case get_shovel_node(VHost, Name, ReqData, Context) of
undefined -> rabbit_log:error("Could not find shovel data for shovel '~ts' in vhost: '~ts'", [Name, VHost]), undefined -> rabbit_log:error("Could not find shovel data for shovel '~ts' in vhost: '~ts'", [Name, VHost]),
case is_restart(ReqData) of case is_restart(ReqData) of
true -> true ->
false; {false, ReqData, Context};
%% this is a deletion attempt %% this is a deletion attempt
false -> false ->
%% if we do not know the node, use the local one %% if we do not know the node, use the local one
try_delete(node(), VHost, Name, Username), case try_delete(node(), VHost, Name, Username) of
true true -> {true, ReqData, Context};
%% NOTE: that how it was before, try_delete return was ignored and true returned ¯\_()_/¯
false -> {true, ReqData, Context};
locked -> Reply = cowboy_req:reply(405, #{<<"content-type">> => <<"text/plain">>},
"Protected", ReqData),
{halt, Reply, Context};
%% NOTE: that how it was before, try_delete return was ignored and true returned ¯\_()_/¯
error -> {true, ReqData, Context}
end
end;
Node ->
%% We must distinguish between a delete and a restart
case is_restart(ReqData) of
true ->
rabbit_log:info("Asked to restart shovel '~ts' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
try erpc:call(Node, rabbit_shovel_util, restart_shovel, [VHost, Name], ?SHOVEL_CALLS_TIMEOUT_MS) of
ok -> {true, ReqData, Context};
{error, not_found} ->
rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]),
{false, ReqData, Context}
catch _:Reason ->
rabbit_log:error("Failed to restart shovel '~s' on vhost '~s', reason: ~p",
[Name, VHost, Reason]),
{false, ReqData, Context}
end; end;
Node ->
%% We must distinguish between a delete and a restart
case is_restart(ReqData) of
true ->
rabbit_log:info("Asked to restart shovel '~ts' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
try erpc:call(Node, rabbit_shovel_util, restart_shovel, [VHost, Name], ?SHOVEL_CALLS_TIMEOUT_MS) of
ok -> true;
{error, not_found} ->
rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]),
false
catch _:Reason ->
rabbit_log:error("Failed to restart shovel '~s' on vhost '~s', reason: ~p",
[Name, VHost, Reason]),
false
end;
_ ->
try_delete(Node, VHost, Name, Username),
true
_ ->
case try_delete(Node, VHost, Name, Username) of
true -> {true, ReqData, Context};
%% NOTE: that how it was before, try_delete return was ignored and true returned ¯\_()_/¯
false -> {true, ReqData, Context};
locked -> Reply = cowboy_req:reply(405, #{<<"content-type">> => <<"text/plain">>},
"Protected", ReqData),
{halt, Reply, Context};
%% NOTE: that how it was before, try_delete return was ignored and true returned ¯\_()_/¯
error -> {true, ReqData, Context}
end end
end end
end, end
{Reply, ReqData, Context}. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -168,7 +183,7 @@ find_matching_shovel(VHost, Name, Shovels) ->
undefined undefined
end. end.
-spec try_delete(node(), vhost:name(), any(), rabbit_types:username()) -> boolean(). -spec try_delete(node(), vhost:name(), any(), rabbit_types:username()) -> true | false | locked | error.
try_delete(Node, VHost, Name, Username) -> try_delete(Node, VHost, Name, Username) ->
rabbit_log:info("Asked to delete shovel '~ts' in vhost '~ts' on node '~s'", [Name, VHost, Node]), rabbit_log:info("Asked to delete shovel '~ts' in vhost '~ts' on node '~s'", [Name, VHost, Node]),
%% this will clear the runtime parameter, the ultimate way of deleting a dynamic Shovel eventually. MK. %% this will clear the runtime parameter, the ultimate way of deleting a dynamic Shovel eventually. MK.
@ -177,8 +192,13 @@ try_delete(Node, VHost, Name, Username) ->
{error, not_found} -> {error, not_found} ->
rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]), rabbit_log:error("Could not find shovel data for shovel '~s' in vhost: '~s'", [Name, VHost]),
false false
catch _:Reason -> catch
_:{exception, {amqp_error, resource_locked, Reason, _}} ->
rabbit_log:error("Failed to delete shovel '~s' on vhost '~s', reason: ~p", rabbit_log:error("Failed to delete shovel '~s' on vhost '~s', reason: ~p",
[Name, VHost, Reason]), [Name, VHost, Reason]),
false locked;
_:Reason ->
rabbit_log:error("Failed to delete shovel '~s' on vhost '~s', reason: ~p",
[Name, VHost, Reason]),
error
end. end.