Check if queue protected from deleted inside rabbit_amqqueue:with_delete

If queue is indeed protected its removal can be forced by calling  with
.
This commit is contained in:
Iliia Khaprov 2025-03-03 23:25:30 +01:00 committed by Iliia Khaprov
parent 3c938aa3d5
commit 72b53437f2
2 changed files with 71 additions and 10 deletions

View File

@ -61,6 +61,10 @@
is_exclusive/1, is_exclusive/1,
is_classic/1, is_classic/1,
is_quorum/1, is_quorum/1,
is_internal/1,
internal_owner/1,
make_internal/1,
make_internal/2,
pattern_match_all/0, pattern_match_all/0,
pattern_match_on_name/1, pattern_match_on_name/1,
pattern_match_on_type/1, pattern_match_on_type/1,
@ -76,6 +80,8 @@
-define(is_backwards_compat_classic(T), -define(is_backwards_compat_classic(T),
(T =:= classic orelse T =:= ?amqqueue_v1_type)). (T =:= classic orelse T =:= ?amqqueue_v1_type)).
-type amqqueue_options() :: map() | ets:match_pattern().
-record(amqqueue, { -record(amqqueue, {
%% immutable %% immutable
name :: rabbit_amqqueue:name() | ets:match_pattern(), name :: rabbit_amqqueue:name() | ets:match_pattern(),
@ -106,7 +112,7 @@
slave_pids_pending_shutdown = [], %% reserved slave_pids_pending_shutdown = [], %% reserved
%% secondary index %% secondary index
vhost :: rabbit_types:vhost() | undefined | ets:match_pattern(), vhost :: rabbit_types:vhost() | undefined | ets:match_pattern(),
options = #{} :: map() | ets:match_pattern(), options = #{} :: amqqueue_options(),
type = ?amqqueue_v1_type :: module() | ets:match_pattern(), type = ?amqqueue_v1_type :: module() | ets:match_pattern(),
type_state = #{} :: map() | ets:match_pattern() type_state = #{} :: map() | ets:match_pattern()
}). }).
@ -349,6 +355,19 @@ get_arguments(#amqqueue{arguments = Args}) ->
set_arguments(#amqqueue{} = Queue, Args) -> set_arguments(#amqqueue{} = Queue, Args) ->
Queue#amqqueue{arguments = Args}. Queue#amqqueue{arguments = Args}.
% options
-spec get_options(amqqueue()) -> amqqueue_options().
get_options(#amqqueue{options = Options}) ->
Options.
-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().
set_options(#amqqueue{} = Queue, Options) ->
Queue#amqqueue{options = Options}.
% decorators % decorators
-spec get_decorators(amqqueue()) -> [atom()] | none | undefined. -spec get_decorators(amqqueue()) -> [atom()] | none | undefined.
@ -394,15 +413,6 @@ get_name(#amqqueue{name = Name}) -> Name.
set_name(#amqqueue{} = Queue, Name) -> set_name(#amqqueue{} = Queue, Name) ->
Queue#amqqueue{name = Name}. Queue#amqqueue{name = Name}.
-spec get_options(amqqueue()) -> map().
get_options(#amqqueue{options = Options}) -> Options.
-spec set_options(amqqueue(), map()) -> amqqueue().
set_options(#amqqueue{} = Queue, Options) ->
Queue#amqqueue{options = Options}.
% pid % pid
-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none. -spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
@ -496,6 +506,27 @@ is_classic(Queue) ->
is_quorum(Queue) -> is_quorum(Queue) ->
get_type(Queue) =:= rabbit_quorum_queue. get_type(Queue) =:= rabbit_quorum_queue.
-spec is_internal(amqqueue()) -> boolean().
is_internal(#amqqueue{options = #{internal := true}}) -> true;
is_internal(#amqqueue{}) -> false.
-spec internal_owner(amqqueue()) -> #resource{}.
internal_owner(#amqqueue{options = #{internal := true,
internal_owner := IOwner}}) ->
IOwner;
internal_owner(#amqqueue{}) ->
undefined.
make_internal(Q = #amqqueue{options = Options}) when is_map(Options) ->
Q#amqqueue{options = maps:merge(Options, #{internal => true,
internal_owner => undefined})}.
make_internal(Q = #amqqueue{options = Options}, Owner)
when is_map(Options) andalso is_record(Owner, resource) ->
Q#amqqueue{options = maps:merge(Options, #{internal => true,
interna_owner => Owner})}.
fields() -> fields() ->
fields(?record_version). fields(?record_version).

View File

@ -820,6 +820,35 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) ->
"match that of the original declaration.", "match that of the original declaration.",
[rabbit_misc:rs(QueueName)]). [rabbit_misc:rs(QueueName)]).
-spec check_internal(amqqueue:amqqueue(), rabbit_types:username()) ->
'ok' | rabbit_types:channel_exit().
check_internal(Q, Username) ->
case amqqueue:is_internal(Q) of
true ->
case Username of
%% note cli delete command uses "cli_user"
?INTERNAL_USER ->
ok;
_ ->
QueueName = amqqueue:get_name(Q),
case amqqueue:internal_owner(Q) of
undefined ->
rabbit_misc:protocol_error(
resource_locked,
"Cannot delete protected ~ts.",
[rabbit_misc:rs(QueueName)]);
IOwner ->
rabbit_misc:protocol_error(
resource_locked,
"Cannot delete protected ~ts. It was "
"declared as an protected and can be deleted only by deleting the owner entity: ~ts",
[rabbit_misc:rs(QueueName), rabbit_misc:rs(IOwner)])
end
end;
false ->
ok
end.
-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) -> -spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
A | rabbit_types:channel_exit(). A | rabbit_types:channel_exit().
with_exclusive_access_or_die(Name, ReaderPid, F) -> with_exclusive_access_or_die(Name, ReaderPid, F) ->
@ -1689,6 +1718,7 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe
case with( case with(
QueueName, QueueName,
fun (Q) -> fun (Q) ->
ok = check_internal(Q, Username),
if CheckExclusive -> if CheckExclusive ->
check_exclusive_access(Q, ConnPid); check_exclusive_access(Q, ConnPid);
true -> true ->