RMQ-1263: a mechanism for marking queues as protected (e.g. from deletion) (#44)

* RMQ-1263: Check if queue protected from deleted inside rabbit_amqqueue:with_delete

Delayed exchange automatically manages associated Delayed Queue. We don't want users to delete it accidentally.

If queue is indeed protected its removal can be forced by calling  with
?INTERNAL_USER as ActingUser.

* RMQ-1263: Correct a type spec of amqqueue:internal_owner/1

* RMQ-1263: Add protected queues test

---------

Co-authored-by: Iliia Khaprov <iliia.khaprov@broadcom.net>
Co-authored-by: Michael Klishin <klishinm@vmware.com>
(cherry picked from commit 97f44adfad6d0d98feb1c3a47de76e72694c19e0)
This commit is contained in:
Diana Parra Corbacho 2025-03-13 23:59:47 +01:00 committed by Michael Klishin
parent 3eeb8f9c01
commit c69403e3e9
No known key found for this signature in database
GPG Key ID: FF4F6501646A9C9A
4 changed files with 189 additions and 11 deletions

View File

@ -276,7 +276,7 @@ PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_dis
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_message_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_dlx_integration rabbit_fifo_int
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue
PARALLEL_CT_SET_1 = $(sort $(PARALLEL_CT_SET_1_A) $(PARALLEL_CT_SET_1_B) $(PARALLEL_CT_SET_1_C) $(PARALLEL_CT_SET_1_D))
PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARALLEL_CT_SET_2_C) $(PARALLEL_CT_SET_2_D))

View File

@ -61,6 +61,10 @@
is_exclusive/1,
is_classic/1,
is_quorum/1,
is_internal/1,
internal_owner/1,
make_internal/1,
make_internal/2,
pattern_match_all/0,
pattern_match_on_name/1,
pattern_match_on_type/1,
@ -78,6 +82,8 @@
-define(is_backwards_compat_classic(T),
(T =:= classic orelse T =:= ?amqqueue_v1_type)).
-type amqqueue_options() :: map() | ets:match_pattern().
-record(amqqueue, {
%% immutable
name :: rabbit_amqqueue:name() | ets:match_pattern(),
@ -108,7 +114,7 @@
slave_pids_pending_shutdown = [], %% reserved
%% secondary index
vhost :: rabbit_types:vhost() | undefined | ets:match_pattern(),
options = #{} :: map() | ets:match_pattern(),
options = #{} :: amqqueue_options(),
type = ?amqqueue_v1_type :: module() | ets:match_pattern(),
type_state = #{} :: map() | ets:match_pattern()
}).
@ -351,6 +357,19 @@ get_arguments(#amqqueue{arguments = Args}) ->
set_arguments(#amqqueue{} = Queue, Args) ->
Queue#amqqueue{arguments = Args}.
% options
-spec get_options(amqqueue()) -> amqqueue_options().
get_options(#amqqueue{options = Options}) ->
Options.
-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().
set_options(#amqqueue{} = Queue, Options) ->
Queue#amqqueue{options = Options}.
% decorators
-spec get_decorators(amqqueue()) -> [atom()] | none | undefined.
@ -395,15 +414,6 @@ get_name(#amqqueue{name = Name}) -> Name.
set_name(#amqqueue{} = Queue, Name) ->
Queue#amqqueue{name = Name}.
-spec get_options(amqqueue()) -> map().
get_options(#amqqueue{options = Options}) -> Options.
-spec set_options(amqqueue(), map()) -> amqqueue().
set_options(#amqqueue{} = Queue, Options) ->
Queue#amqqueue{options = Options}.
% pid
-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
@ -497,6 +507,27 @@ is_classic(Queue) ->
is_quorum(Queue) ->
get_type(Queue) =:= rabbit_quorum_queue.
-spec is_internal(amqqueue()) -> boolean().
is_internal(#amqqueue{options = #{internal := true}}) -> true;
is_internal(#amqqueue{}) -> false.
-spec internal_owner(amqqueue()) -> rabbit_types:option(#resource{}).
internal_owner(#amqqueue{options = #{internal := true,
internal_owner := IOwner}}) ->
IOwner;
internal_owner(#amqqueue{}) ->
undefined.
make_internal(Q = #amqqueue{options = Options}) when is_map(Options) ->
Q#amqqueue{options = maps:merge(Options, #{internal => true,
internal_owner => undefined})}.
make_internal(Q = #amqqueue{options = Options}, Owner)
when is_map(Options) andalso is_record(Owner, resource) ->
Q#amqqueue{options = maps:merge(Options, #{internal => true,
interna_owner => Owner})}.
fields() ->
fields(?record_version).

View File

@ -811,6 +811,35 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) ->
"match that of the original declaration.",
[rabbit_misc:rs(QueueName)]).
-spec check_internal(amqqueue:amqqueue(), rabbit_types:username()) ->
'ok' | rabbit_types:channel_exit().
check_internal(Q, Username) ->
case amqqueue:is_internal(Q) of
true ->
case Username of
%% note cli delete command uses "cli_user"
?INTERNAL_USER ->
ok;
_ ->
QueueName = amqqueue:get_name(Q),
case amqqueue:internal_owner(Q) of
undefined ->
rabbit_misc:protocol_error(
resource_locked,
"Cannot delete protected ~ts.",
[rabbit_misc:rs(QueueName)]);
IOwner ->
rabbit_misc:protocol_error(
resource_locked,
"Cannot delete protected ~ts. It was "
"declared as an protected and can be deleted only by deleting the owner entity: ~ts",
[rabbit_misc:rs(QueueName), rabbit_misc:rs(IOwner)])
end
end;
false ->
ok
end.
-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
A | rabbit_types:channel_exit().
with_exclusive_access_or_die(Name, ReaderPid, F) ->
@ -1681,6 +1710,7 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe
case with(
QueueName,
fun (Q) ->
ok = check_internal(Q, Username),
if CheckExclusive ->
check_exclusive_access(Q, ConnPid);
true ->

View File

@ -0,0 +1,117 @@
-module(rabbit_amqqueue_SUITE).
-compile([export_all, nowarn_export_all]).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
%%%===================================================================
%%% Common Test callbacks
%%%===================================================================
all() ->
[
{group, rabbit_amqqueue_tests}
].
all_tests() ->
[
normal_queue_delete_with,
internal_queue_delete_with
].
groups() ->
[
{rabbit_amqqueue_tests, [], all_tests()}
].
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:setup_steps()).
end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_client_helpers:setup_steps()).
end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%%%===================================================================
%%% Test cases
%%%===================================================================
normal_queue_delete_with(Config) ->
QName = queue_name(Config, <<"normal">>),
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Queue = amqqueue:new(QName,
none, %% pid
true, %% durable
false, %% auto delete
none, %% owner,
[],
<<"/">>,
#{},
rabbit_classic_queue),
?assertMatch({new, _Q}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, declare, [Queue, Node])),
?assertMatch({ok, _}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete_with, [QName, false, false, <<"dummy">>])),
?assertMatch({error, not_found}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QName])),
ok.
internal_queue_delete_with(Config) ->
QName = queue_name(Config, <<"internal_protected">>),
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Queue = amqqueue:new(QName,
none, %% pid
true, %% durable
false, %% auto delete
none, %% owner,
[],
<<"/">>,
#{},
rabbit_classic_queue),
IQueue = amqqueue:make_internal(Queue, rabbit_misc:r(<<"/">>, exchange, <<"amq.default">>)),
?assertMatch({new, _Q}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, declare, [IQueue, Node])),
?assertException(exit, {exception,
{amqp_error, resource_locked,
"Cannot delete protected queue 'rabbit_amqqueue_tests/internal_protected' in vhost '/'.",
none}}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete_with, [QName, false, false, <<"dummy">>])),
?assertMatch({ok, _}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QName])),
?assertMatch({ok, _}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete_with, [QName, false, false, ?INTERNAL_USER])),
?assertMatch({error, not_found}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QName])),
ok.
%% Utility
queue_name(Config, Name) ->
Name1 = iolist_to_binary(rabbit_ct_helpers:config_to_testcase_name(Config, Name)),
queue_name(Name1).
queue_name(Name) ->
rabbit_misc:r(<<"/">>, queue, Name).