Introduce a configurable limit to message size.

Add `max_message_size` configuration to configure limit
in bytes.
If message is bigger - channel exception will be thrown.

Default limit is 128MB.
There is still a hard limit of 521MB.

[#161983593]
This commit is contained in:
Daniil Fedotov 2018-12-27 19:26:37 +04:00
parent 794ef8de24
commit 03fc22ddd9
4 changed files with 99 additions and 2 deletions

View File

@ -130,7 +130,9 @@ define PROJECT_ENV
{vhost_restart_strategy, continue},
%% {global, prefetch count}
{default_consumer_prefetch, {false, 0}},
{channel_queue_cleanup_interval, 60000}
{channel_queue_cleanup_interval, 60000},
%% Default max message size is 128 MB
{max_message_size, 134217728}
]
endef

View File

@ -554,6 +554,9 @@ end}.
}.
{mapping, "msx_message_size", "rabbit.max_message_size",
[{datatype, integer}, {validators, ["less_then_512MB"]}]}.
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
@ -1361,6 +1364,11 @@ fun(Size) when is_integer(Size) ->
Size > 0 andalso Size < 2147483648
end}.
{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0",
fun(Size) when is_integer(Size) ->
Size > 0 andalso Size < 536870912
end}.
{validator, "less_than_1", "Flooat is not beetween 0 and 1",
fun(Float) when is_float(Float) ->
Float > 0 andalso Float < 1

View File

@ -990,7 +990,15 @@ check_msg_size(Content) ->
case Size > ?MAX_MSG_SIZE of
true -> precondition_failed("message size ~B larger than max size ~B",
[Size, ?MAX_MSG_SIZE]);
false -> ok
false ->
case application:get_env(rabbit, max_message_size) of
{ok, MaxSize} when is_integer(MaxSize) andalso Size > MaxSize ->
precondition_failed("message size ~B larger than"
" configured max size ~B",
[Size, MaxSize]);
_ -> ok
end
end.
check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->

View File

@ -58,6 +58,7 @@ groups() ->
set_disk_free_limit_command,
set_vm_memory_high_watermark_command,
topic_matching,
max_message_size,
{queue_max_length, [], [
{max_length_simple, [], MaxLengthTests},
{max_length_mirrored, [], MaxLengthTests}]}
@ -1299,6 +1300,84 @@ sync_mirrors(QName, Config) ->
_ -> ok
end.
gen_binary_mb(N) ->
B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>,
<< B1M || _ <- lists:seq(1, N) >>.
assert_channel_alive(Ch) ->
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
#amqp_msg{payload = <<"HI">>}).
assert_channel_fail_max_size(Ch, Monitor, ExpectedException) ->
receive
{'DOWN', Monitor, process, Ch,
{shutdown,
{server_initiated_close, 406, Exception}}} ->
?assertMatch(Exception, ExpectedException)
after 100000 ->
error({channel_exception_expected, max_message_size})
end.
max_message_size(Config) ->
Binary128M = gen_binary_mb(128),
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
%% Default message size is 128MB
Size128Mb = 1024 * 1024 * 128,
Size128Mb = rabbit_ct_broker_helpers:rpc(Config, 0,
application, get_env, [rabbit, max_message_size, undefined]),
Size128Mb = byte_size(Binary128M),
%% Binary is whithin the max size limit
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}),
%% Channel process is alive
assert_channel_alive(Ch),
Monitor = monitor(process, Ch),
%% This publish should cause a channel exception
BinaryBiggerThan128M = <<"_", Binary128M/binary>>,
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}),
ct:pal("Assert channel error 128"),
ExpectedException = <<"PRECONDITION_FAILED - message size ",
(integer_to_binary(byte_size(BinaryBiggerThan128M)))/binary,
" larger than configured max size ",
(integer_to_binary(Size128Mb))/binary>>,
assert_channel_fail_max_size(Ch, Monitor, ExpectedException),
{_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
%% Set a bigger message size
rabbit_ct_broker_helpers:rpc(Config, 0,
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 256]),
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}),
assert_channel_alive(Ch1),
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}),
assert_channel_alive(Ch1),
%% Set message size above 512MB.
%% The actual limit will be 512MB
rabbit_ct_broker_helpers:rpc(Config, 0,
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 515]),
Binary512M = << Binary128M/binary, Binary128M/binary,
Binary128M/binary, Binary128M/binary>>,
BinaryBiggerThan512M = <<"_", Binary512M/binary>>,
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}),
assert_channel_alive(Ch1),
Monitor1 = monitor(process, Ch1),
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}),
ct:pal("Assert channel error 512"),
ExpectedException1 = <<"PRECONDITION_FAILED - message size ",
(integer_to_binary(byte_size(BinaryBiggerThan512M)))/binary,
" larger than max size ",
(integer_to_binary(byte_size(Binary512M)))/binary>>,
assert_channel_fail_max_size(Ch1, Monitor1, ExpectedException1).
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------