Check virtual host queue limit on stream creation

Fixes #9910
This commit is contained in:
Arnaud Cogoluègnes 2023-11-17 08:45:42 +01:00
parent f3819883fb
commit 0e5d15592a
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
3 changed files with 210 additions and 94 deletions

View File

@ -16,6 +16,8 @@
-module(rabbit_stream_manager).
-feature(maybe_expr, enable).
-behaviour(gen_server).
-include_lib("rabbit_common/include/rabbit_framing.hrl").
@ -518,85 +520,99 @@ handle_info(Info, State) ->
{noreply, State}.
create_stream(VirtualHost, Reference, Arguments, Username) ->
Name =
#resource{virtual_host = VirtualHost,
kind = queue,
name = Reference},
StreamQueueArguments = stream_queue_arguments(Arguments),
case validate_stream_queue_arguments(StreamQueueArguments) of
ok ->
Q0 = amqqueue:new(Name,
none,
true,
false,
none,
StreamQueueArguments,
VirtualHost,
#{user => Username},
rabbit_stream_queue),
try
QueueLookup =
rabbit_amqqueue:with(Name,
fun(Q) ->
ok =
rabbit_amqqueue:assert_equivalence(Q,
true,
false,
StreamQueueArguments,
none)
end),
maybe
ok ?= case rabbit_vhost_limit:is_over_queue_limit(VirtualHost) of
false ->
ok;
{true, Limit} ->
rabbit_log:warning("Cannot declare stream ~tp because "
"queue limit ~tp in vhost '~tp' is reached",
[Reference, Limit, VirtualHost]),
{error, validation_failed}
end,
ok ?= validate_stream_queue_arguments(StreamQueueArguments),
do_create_stream(VirtualHost, Reference, StreamQueueArguments, Username)
else
error ->
{error, validation_failed};
{error, _} = Err ->
Err
end.
case QueueLookup of
ok ->
{error, reference_already_exists};
{error, not_found} ->
try
case rabbit_queue_type:declare(Q0, node()) of
{new, Q} ->
{ok, amqqueue:get_type_state(Q)};
{existing, _} ->
{error, reference_already_exists};
{error, Err} ->
rabbit_log:warning("Error while creating ~tp stream, ~tp",
[Reference, Err]),
{error, internal_error};
{protocol_error,
precondition_failed,
Msg,
Args} ->
rabbit_log:warning("Error while creating ~tp stream, "
++ Msg,
[Reference] ++ Args),
{error, validation_failed}
end
catch
exit:Error ->
rabbit_log:error("Error while creating ~tp stream, ~tp",
[Reference, Error]),
{error, internal_error}
end;
{error, {absent, _, Reason}} ->
rabbit_log:error("Error while creating ~tp stream, ~tp",
[Reference, Reason]),
{error, internal_error}
end
catch
exit:ExitError ->
case ExitError of
% likely a problem of inequivalent args on an existing stream
{amqp_error, precondition_failed, M, _} ->
rabbit_log:info("Error while creating ~tp stream, "
++ M,
[Reference]),
{error, validation_failed};
E ->
do_create_stream(VirtualHost, Reference, StreamQueueArguments, Username) ->
Name = #resource{virtual_host = VirtualHost,
kind = queue,
name = Reference},
Q0 = amqqueue:new(Name,
none,
true,
false,
none,
StreamQueueArguments,
VirtualHost,
#{user => Username},
rabbit_stream_queue),
try
QueueLookup =
rabbit_amqqueue:with(Name,
fun(Q) ->
ok =
rabbit_amqqueue:assert_equivalence(Q,
true,
false,
StreamQueueArguments,
none)
end),
case QueueLookup of
ok ->
{error, reference_already_exists};
{error, not_found} ->
try
case rabbit_queue_type:declare(Q0, node()) of
{new, Q} ->
{ok, amqqueue:get_type_state(Q)};
{existing, _} ->
{error, reference_already_exists};
{error, Err} ->
rabbit_log:warning("Error while creating ~tp stream, ~tp",
[Reference, E]),
[Reference, Err]),
{error, internal_error};
{protocol_error,
precondition_failed,
Msg,
Args} ->
rabbit_log:warning("Error while creating ~tp stream, "
++ Msg,
[Reference] ++ Args),
{error, validation_failed}
end
end;
error ->
{error, validation_failed}
catch
exit:Error ->
rabbit_log:error("Error while creating ~tp stream, ~tp",
[Reference, Error]),
{error, internal_error}
end;
{error, {absent, _, Reason}} ->
rabbit_log:error("Error while creating ~tp stream, ~tp",
[Reference, Reason]),
{error, internal_error}
end
catch
exit:ExitError ->
case ExitError of
% likely a problem of inequivalent args on an existing stream
{amqp_error, precondition_failed, M, _} ->
rabbit_log:info("Error while creating ~tp stream, "
++ M,
[Reference]),
{error, validation_failed};
E ->
rabbit_log:warning("Error while creating ~tp stream, ~tp",
[Reference, E]),
{error, validation_failed}
end
end.
delete_stream(VirtualHost, Reference, Username) ->
@ -659,24 +675,31 @@ validate_super_stream_creation(_VirtualHost, _Name, Partitions, BindingKeys)
when length(Partitions) =/= length(BindingKeys) ->
{error, {validation_failed, "There must be the same number of partitions and binding keys"}};
validate_super_stream_creation(VirtualHost, Name, Partitions, _BindingKeys) ->
case exchange_exists(VirtualHost, Name) of
{error, validation_failed} ->
{error,
{validation_failed,
rabbit_misc:format("~ts is not a correct name for a super stream",
[Name])}};
{ok, true} ->
{error,
{reference_already_exists,
rabbit_misc:format("there is already an exchange named ~ts",
[Name])}};
{ok, false} ->
case check_already_existing_queue(VirtualHost, Partitions) of
{error, Reason} ->
{error, Reason};
ok ->
ok
end
maybe
ok ?= case rabbit_vhost_limit:would_exceed_queue_limit(length(Partitions), VirtualHost) of
false ->
ok;
{true, Limit, _} ->
{error, {validation_failed,
rabbit_misc:format("Cannot declare super stream ~tp with ~tp partition(s) "
"because queue limit ~tp in vhost '~tp' is reached",
[Name, length(Partitions), Limit, VirtualHost])}}
end,
ok ?= case exchange_exists(VirtualHost, Name) of
{error, validation_failed} ->
{error,
{validation_failed,
rabbit_misc:format("~ts is not a correct name for a super stream",
[Name])}};
{ok, true} ->
{error,
{reference_already_exists,
rabbit_misc:format("there is already an exchange named ~ts",
[Name])}};
{ok, false} ->
ok
end,
ok ?= check_already_existing_queue(VirtualHost, Partitions)
end.
exchange_exists(VirtualHost, Name) ->

View File

@ -54,7 +54,8 @@ groups() ->
timeout_close_sent,
max_segment_size_bytes_validation,
close_connection_on_consumer_update_timeout,
set_filter_size
set_filter_size,
vhost_queue_limit
]},
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
@ -153,6 +154,14 @@ init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config
set_env,
[rabbitmq_stream, request_timeout, 2000]),
rabbit_ct_helpers:testcase_started(Config, TestCase);
init_per_testcase(vhost_queue_limit = TestCase, Config) ->
QueueCount = rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_amqqueue,
count,
[<<"/">>]),
ok = rabbit_ct_broker_helpers:set_vhost_limit(Config, 0, <<"/">>, max_queues, QueueCount + 5),
rabbit_ct_helpers:testcase_started(Config, TestCase);
init_per_testcase(TestCase, Config) ->
rabbit_ct_helpers:testcase_started(Config, TestCase).
@ -178,6 +187,13 @@ end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config)
set_env,
[rabbitmq_stream, request_timeout, 60000]),
rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(vhost_queue_limit = TestCase, Config) ->
_ = rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_vhost_limit,
clear,
[<<"/">>, <<"guest">>]),
rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(TestCase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, TestCase).
@ -627,6 +643,66 @@ set_filter_size(Config) ->
closed = wait_for_socket_close(Transport, S, 10),
ok.
vhost_queue_limit(Config) ->
T = gen_tcp,
Port = get_port(T, Config),
Opts = get_opts(T),
{ok, S} = T:connect("localhost", Port, Opts),
C = rabbit_stream_core:init(0),
test_peer_properties(T, S, C),
test_authenticate(T, S, C),
QueueCount = rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_amqqueue,
count,
[<<"/">>]),
{ok, QueueLimit} = rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_vhost_limit,
queue_limit,
[<<"/">>]),
PartitionCount = QueueLimit - 1 - QueueCount,
Name = atom_to_binary(?FUNCTION_NAME, utf8),
Partitions = [unicode:characters_to_binary([Name, <<"-">>, integer_to_binary(N)]) || N <- lists:seq(0, PartitionCount)],
Bks = [integer_to_binary(N) || N <- lists:seq(0, PartitionCount)],
SsCreationFrame = frame({request, 1, {create_super_stream, Name, Partitions, Bks, #{}}}),
ok = T:send(S, SsCreationFrame),
{Cmd1, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}},
Cmd1),
SsCreationFrameKo = frame({request, 1, {create_super_stream,
<<"exceed-queue-limit">>,
[<<"s1">>, <<"s2">>, <<"s3">>],
[<<"1">>, <<"2">>, <<"3">>], #{}}}),
ok = T:send(S, SsCreationFrameKo),
{Cmd2, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}},
Cmd2),
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, <<"exceed-queue-limit">>, #{}}}),
ok = T:send(S, CreateStreamFrame),
{Cmd3, C} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}}, Cmd3),
SsDeletionFrame = frame({request, 1, {delete_super_stream, Name}}),
ok = T:send(S, SsDeletionFrame),
{Cmd4, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}},
Cmd4),
ok = T:send(S, rabbit_stream_core:frame({request, 1, {create_stream, Name, #{}}})),
{Cmd5, C} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_stream, ?RESPONSE_CODE_OK}}, Cmd5),
ok = T:send(S, rabbit_stream_core:frame({request, 1, {delete_stream, Name}})),
{Cmd6, C} = receive_commands(T, S, C),
?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, Cmd6),
ok.
consumer_count(Config) ->
ets_count(Config, ?TABLE_CONSUMER).

View File

@ -1,5 +1,7 @@
-module(rabbit_stream_utils_SUITE).
-feature(maybe_expr, enable).
-compile(nowarn_export_all).
-compile(export_all).
@ -17,7 +19,7 @@ suite() ->
[{timetrap, {seconds, 30}}].
groups() ->
[{tests, [], [sort_partitions, filter_spec, filter_defined]}].
[{tests, [], [sort_partitions, filter_spec, filter_defined, test_maybe]}].
init_per_suite(Config) ->
Config.
@ -100,3 +102,18 @@ binding(Destination, Order) ->
binding(Destination) ->
#binding{destination = #resource{name = Destination}, args = []}.
test_maybe(_) ->
R = maybe
ok ?= callme(),
S = hello("world"),
"bye bye " ++ S
end,
io:format("Result = ~p", [R]),
ok.
callme() ->
ok.
hello(S) ->
"hello " ++ S.