Remove checks to vhost-limit as that is now handled by rabbit_queue_type:declare

Add new error return tuple when queue limit is exceed
This commit is contained in:
Simon Unge 2024-05-22 18:24:18 +00:00 committed by David Ansari
parent 698cdc5275
commit 19a751890c
8 changed files with 54 additions and 72 deletions

View File

@ -142,7 +142,6 @@ handle_http_req(HttpMethod = <<"PUT">>,
{ok, Result} ->
Result;
{error, not_found} ->
ok = check_vhost_queue_limit(QName),
PermCache2 = check_dead_letter_exchange(QName, QArgs, User, PermCache1),
case rabbit_amqqueue:declare(
QName, Durable, AutoDelete, QArgs, Owner, Username) of
@ -160,6 +159,10 @@ handle_http_req(HttpMethod = <<"PUT">>,
%% Must have been created in the meantime. Loop around again.
handle_http_req(HttpMethod, PathSegments, Query, ReqPayload,
Vhost, User, ConnPid, {PermCache2, TopicPermCache});
{error, queue_limit_exceeded, Reason, ReasonArgs} ->
throw(<<"403">>,
Reason,
ReasonArgs);
{protocol_error, _ErrorType, Reason, ReasonArgs} ->
throw(<<"400">>, Reason, ReasonArgs)
end;
@ -676,17 +679,6 @@ prohibit_reserved_amq(Res = #resource{name = <<"amq.", _/binary>>}) ->
prohibit_reserved_amq(#resource{}) ->
ok.
check_vhost_queue_limit(QName = #resource{virtual_host = Vhost}) ->
case rabbit_vhost_limit:is_over_queue_limit(Vhost) of
false ->
ok;
{true, Limit} ->
throw(<<"403">>,
"refused to declare ~ts because vhost queue limit ~b is reached",
[rabbit_misc:rs(QName), Limit])
end.
check_dead_letter_exchange(QName = #resource{virtual_host = Vhost}, QArgs, User, PermCache0) ->
case rabbit_misc:r_arg(Vhost, exchange, QArgs, ?DEAD_LETTER_EXCHANGE_KEY) of
undefined ->

View File

@ -71,7 +71,7 @@
-export([init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_cast/2,
handle_info/2,
format_status/1]).
@ -2612,7 +2612,6 @@ declare_queue(QNameBin,
PermCache0) ->
QName = rabbit_misc:r(Vhost, queue, QNameBin),
PermCache = check_resource_access(QName, configure, User, PermCache0),
check_vhost_queue_limit(Vhost, QName),
rabbit_core_metrics:queue_declared(QName),
Q0 = amqqueue:new(QName,
_Pid = none,
@ -2628,6 +2627,11 @@ declare_queue(QNameBin,
rabbit_core_metrics:queue_created(QName);
{existing, _Q} ->
ok;
{error, queue_limit_exceeded, Reason, ReasonArgs} ->
protocol_error(
?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED,
Reason,
ReasonArgs);
Other ->
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
"Failed to declare ~s: ~p",
@ -2867,17 +2871,6 @@ check_topic_authorisation(#exchange{type = topic,
check_topic_authorisation(_, _, _, _, Cache) ->
Cache.
check_vhost_queue_limit(Vhost, QName) ->
case rabbit_vhost_limit:is_over_queue_limit(Vhost) of
false ->
ok;
{true, Limit} ->
protocol_error(
?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED,
"cannot declare ~ts: vhost queue limit (~p) is reached",
[rabbit_misc:rs(QName), Limit])
end.
check_user_id(Mc, User) ->
case rabbit_access_control:check_user_id(Mc, User) of
ok ->

View File

@ -201,6 +201,7 @@ find_recoverable_queues() ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
{'absent', amqqueue:amqqueue(), absent_reason()} |
{'error', Type :: atom(), Reason :: string(), Args :: term()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, node()).
@ -219,6 +220,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
node() | {'ignore_location', node()}) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), absent_reason()} |
{'error', Type :: atom(), Reason :: string(), Args :: term()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
Owner, ActingUser, Node) ->

View File

@ -2503,6 +2503,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% connection has died. Pretend the queue exists though,
%% just so nothing fails.
{ok, QueueName, 0, 0};
{error, queue_limit_exceeded, Reason, ReasonArgs} ->
rabbit_misc:precondition_failed(Reason, ReasonArgs);
{protocol_error, ErrorType, Reason, ReasonArgs} ->
rabbit_misc:protocol_error(ErrorType, Reason, ReasonArgs)
end;

View File

@ -304,6 +304,7 @@ is_compatible(Type, Durable, Exclusive, AutoDelete) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()} |
{'error', Type :: atom(), Reason :: string(), Args :: term()} |
{'error', Err :: term() }.
declare(Q0, Node) ->
Q = rabbit_queue_decorator:set(rabbit_policy:set(Q0)),
@ -774,7 +775,7 @@ known_queue_type_names() ->
-spec check_queue_limits(amqqueue:amqqueue()) ->
ok |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
{error, queue_limit_exceeded, Reason :: string(), Args :: term()}.
check_queue_limits(Q) ->
maybe
ok ?= check_vhost_queue_limit(Q),
@ -788,10 +789,9 @@ check_vhost_queue_limit(Q) ->
false ->
ok;
{true, Limit} ->
{protocol_error, precondition_failed,
"cannot declare queue '~ts': "
"queue limit in vhost '~ts' (~tp) is reached",
[QueueName, VHost, Limit]}
queue_limit_error("cannot declare queue '~ts': "
"queue limit in vhost '~ts' (~tp) is reached",
[QueueName, VHost, Limit])
end.
check_cluster_queue_limit(Q) ->
@ -802,11 +802,13 @@ check_cluster_queue_limit(Q) ->
Limit ->
case rabbit_db_queue:count() >= Limit of
true ->
{protocol_error, precondition_failed,
"cannot declare queue '~ts': "
"queue limit in cluster (~tp) is reached",
[QueueName, Limit]};
queue_limit_error("cannot declare queue '~ts': "
"queue limit in cluster (~tp) is reached",
[QueueName, Limit]);
false ->
ok
end
end.
queue_limit_error(Reason, ReasonArgs) ->
{error, queue_limit_exceeded, Reason, ReasonArgs}.

View File

@ -727,7 +727,7 @@ v1_vhost_queue_limit(Config) ->
Session1, <<"test-sender-1">>, TargetAddress),
ExpectedErr = amqp_error(
?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED,
<<"cannot declare queue 'q1' in vhost 'test vhost': vhost queue limit (0) is reached">>),
<<"cannot declare queue 'q1': queue limit in vhost 'test vhost' (0) is reached">>),
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
after 5000 -> flush(missing_ended),
ct:fail("did not receive expected error")
@ -831,8 +831,7 @@ declare_queue_vhost_queue_limit(Config) ->
?assertMatch(#{subject := <<"403">>}, amqp10_msg:properties(Resp)),
?assertEqual(
#'v1_0.amqp_value'{
content = {utf8, <<"refused to declare queue '", QName/binary, "' in vhost 'test vhost' ",
"because vhost queue limit 0 is reached">>}},
content = {utf8, <<"cannot declare queue '", QName/binary, "': queue limit in vhost 'test vhost' (0) is reached">>}},
amqp10_msg:body(Resp)),
ok = cleanup_pair(Init),

View File

@ -1354,32 +1354,27 @@ create_queue(QNamePart, QOwner, QArgs, QType,
Err0 -> Err0
end
end,
case rabbit_vhost_limit:is_over_queue_limit(VHost) of
false ->
rabbit_core_metrics:queue_declared(QName),
Q0 = amqqueue:new(QName,
none,
_Durable = true,
_AutoDelete = false,
QOwner,
QArgs,
VHost,
#{user => Username},
QType),
case rabbit_queue_type:declare(Q0, node()) of
{new, Q} when ?is_amqqueue(Q) ->
rabbit_core_metrics:queue_created(QName),
{ok, Q};
Other ->
?LOG_ERROR("Failed to declare ~s: ~p",
[rabbit_misc:rs(QName), Other]),
{error, queue_declare}
end;
{true, Limit} ->
?LOG_ERROR("cannot declare ~s because "
"queue limit ~p in vhost '~s' is reached",
[rabbit_misc:rs(QName), Limit, VHost]),
{error, queue_limit_exceeded}
rabbit_core_metrics:queue_declared(QName),
Q0 = amqqueue:new(QName,
none,
_Durable = true,
_AutoDelete = false,
QOwner,
QArgs,
VHost,
#{user => Username},
QType),
case rabbit_queue_type:declare(Q0, node()) of
{new, Q} when ?is_amqqueue(Q) ->
rabbit_core_metrics:queue_created(QName),
{ok, Q};
{error, queue_limit_exceeded, Reason, ReasonArgs} ->
?LOG_ERROR(Reason, ReasonArgs),
{error, queue_limit_exceeded};
Other ->
?LOG_ERROR("Failed to declare ~s: ~p",
[rabbit_misc:rs(QName), Other]),
{error, queue_declare}
end
else
{error, access_refused} = Err ->

View File

@ -524,15 +524,6 @@ handle_info(Info, State) ->
create_stream(VirtualHost, Reference, Arguments, Username) ->
StreamQueueArguments = stream_queue_arguments(Arguments),
maybe
ok ?= case rabbit_vhost_limit:is_over_queue_limit(VirtualHost) of
false ->
ok;
{true, Limit} ->
rabbit_log:warning("Cannot declare stream ~tp because "
"queue limit ~tp in vhost '~tp' is reached",
[Reference, Limit, VirtualHost]),
{error, validation_failed}
end,
ok ?= validate_stream_queue_arguments(StreamQueueArguments),
do_create_stream(VirtualHost, Reference, StreamQueueArguments, Username)
else
@ -581,6 +572,12 @@ do_create_stream(VirtualHost, Reference, StreamQueueArguments, Username) ->
rabbit_log:warning("Error while creating ~tp stream, ~tp",
[Reference, Err]),
{error, internal_error};
{error,
queue_limit_exceeded, Reason, ReasonArg} ->
rabbit_log:warning("Cannot declare stream ~tp because, "
++ Reason,
[Reference] ++ ReasonArg),
{error, validation_failed};
{protocol_error,
precondition_failed,
Msg,