Limit stream max segment size to 3 GB

Values that are too large can overflow the stream position field
in the index (a 32-bit integer).
This commit is contained in:
Arnaud Cogoluègnes 2022-06-10 11:45:57 +02:00
parent 7e80db7b70
commit e44b65957d
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
7 changed files with 96 additions and 24 deletions

View File

@ -45,6 +45,8 @@
tracking_status/2,
get_overview/1]).
-export([check_max_segment_size_bytes/1]).
-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
@ -92,7 +94,8 @@ declare(Q0, _Node) when ?amqqueue_is_stream(Q0) ->
case rabbit_queue_type_util:run_checks(
[fun rabbit_queue_type_util:check_auto_delete/1,
fun rabbit_queue_type_util:check_exclusive/1,
fun rabbit_queue_type_util:check_non_durable/1],
fun rabbit_queue_type_util:check_non_durable/1,
fun rabbit_stream_queue:check_max_segment_size_bytes/1],
Q0) of
ok ->
create_stream(Q0);
@ -100,6 +103,18 @@ declare(Q0, _Node) when ?amqqueue_is_stream(Q0) ->
Err
end.
%% Validates the x-stream-max-segment-size-bytes queue argument, if set.
%% Returns ok when the argument is absent or does not exceed
%% ?MAX_STREAM_MAX_SEGMENT_SIZE; otherwise returns a precondition_failed
%% protocol error tuple for the caller to report.
check_max_segment_size_bytes(Q) ->
    QArgs = amqqueue:get_arguments(Q),
    Lookup = rabbit_misc:table_lookup(QArgs,
                                      <<"x-stream-max-segment-size-bytes">>),
    case Lookup of
        {_Type, Value} when Value > ?MAX_STREAM_MAX_SEGMENT_SIZE ->
            {protocol_error, precondition_failed,
             "Exceeded max value for x-stream-max-segment-size-bytes", []};
        %% absent argument or in-range value: nothing to reject
        _ ->
            ok
    end.
create_stream(Q0) ->
Arguments = amqqueue:get_arguments(Q0),
QName = amqqueue:get_name(Q0),

View File

@ -114,6 +114,7 @@ all_tests() ->
max_age,
invalid_policy,
max_age_policy,
max_segment_size_bytes_validation,
max_segment_size_bytes_policy,
purge,
update_retention_policy,
@ -1601,6 +1602,25 @@ max_length_bytes(Config) ->
?assert(length(receive_batch()) < 200),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
%% A stream declared with a segment size within the limit succeeds, while
%% one exceeding ?MAX_STREAM_MAX_SEGMENT_SIZE is rejected with a
%% channel-level precondition_failed (406) close.
max_segment_size_bytes_validation(Config) ->
    [Node | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Channel = rabbit_ct_client_helpers:open_channel(Config, Node),
    QName = ?config(queue_name, Config),
    ValidArgs = [{<<"x-queue-type">>, longstr, <<"stream">>},
                 {<<"x-stream-max-segment-size-bytes">>, long, 10_000_000}],
    ?assertEqual({'queue.declare_ok', QName, 0, 0},
                 declare(Channel, QName, ValidArgs)),
    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue,
                                 [QName]),
    TooLargeArgs = [{<<"x-queue-type">>, longstr, <<"stream">>},
                    {<<"x-stream-max-segment-size-bytes">>, long,
                     ?MAX_STREAM_MAX_SEGMENT_SIZE + 1_000}],
    ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
                declare(Channel, QName, TooLargeArgs)),
    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue,
                                 [QName]).
max_age(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

View File

@ -272,3 +272,7 @@
%% 3.6, 3.7, early 3.8
-define(LEGACY_INDEX_SEGMENT_ENTRY_COUNT, 16384).
%% Max value for stream max segment size
%% Capped at 3 GB; NOTE(review): per the commit message, larger values can
%% overflow the 32-bit stream position field in the index — confirm against
%% the osiris index format.
-define(MAX_STREAM_MAX_SEGMENT_SIZE, 3_000_000_000).

View File

@ -95,7 +95,8 @@ validate_stream_arguments(#{stream_max_segment_size_bytes := Value} =
validate_stream_arguments(#{leader_locator := <<"client-local">>} =
Opts) ->
validate_stream_arguments(maps:remove(leader_locator, Opts));
validate_stream_arguments(#{leader_locator := <<"balanced">>} = Opts) ->
validate_stream_arguments(#{leader_locator := <<"balanced">>} =
Opts) ->
validate_stream_arguments(maps:remove(leader_locator, Opts));
%% 'random' and 'least-leaders' are deprecated and get mapped to 'balanced'
validate_stream_arguments(#{leader_locator := <<"random">>} = Opts) ->

View File

@ -176,7 +176,9 @@ validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long,
validate_stream_queue_arguments([{<<"x-queue-leader-locator">>,
longstr, Locator}
| T]) ->
case lists:member(Locator, rabbit_queue_location:queue_leader_locators()) of
case lists:member(Locator,
rabbit_queue_location:queue_leader_locators())
of
true ->
validate_stream_queue_arguments(T);
false ->
@ -538,7 +540,15 @@ create_stream(VirtualHost, Reference, Arguments, Username) ->
{error, Err} ->
rabbit_log:warning("Error while creating ~p stream, ~p",
[Reference, Err]),
{error, internal_error}
{error, internal_error};
{protocol_error,
precondition_failed,
Msg,
Args} ->
rabbit_log:warning("Error while creating ~p stream, "
++ Msg,
[Reference] ++ Args),
{error, validation_failed}
end
catch
exit:Error ->

View File

@ -819,11 +819,11 @@ open(info,
Conn1 =
maybe_send_consumer_update(Transport,
Connection0,
SubId,
Active,
true,
Extra),
Connection0,
SubId,
Active,
true,
Extra),
{Conn1,
ConnState0#stream_connection_state{consumers =
Consumers0#{SubId
@ -1738,9 +1738,7 @@ handle_frame_post_auth(Transport,
User, #{})
of
ok ->
case rabbit_stream_manager:lookup_leader(VirtualHost,
Stream)
of
case rabbit_stream_manager:lookup_leader(VirtualHost, Stream) of
{error, not_found} ->
rabbit_global_counters:increase_protocol_counter(stream,
?STREAM_DOES_NOT_EXIST,
@ -1748,8 +1746,7 @@ handle_frame_post_auth(Transport,
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
{ok, LeaderPid} ->
{?RESPONSE_CODE_OK,
case osiris:fetch_writer_seq(LeaderPid, Reference)
of
case osiris:fetch_writer_seq(LeaderPid, Reference) of
undefined ->
0;
Offt ->
@ -2777,15 +2774,16 @@ maybe_register_consumer(VirtualHost,
maybe_send_consumer_update(_, Connection, _, _, false = _Sac, _) ->
Connection;
maybe_send_consumer_update(Transport,
#stream_connection{socket = S,
correlation_id_sequence = CorrIdSeq,
outstanding_requests =
OutstandingRequests0} =
Connection,
SubscriptionId,
Active,
true = _Sac,
Extra) ->
#stream_connection{socket = S,
correlation_id_sequence =
CorrIdSeq,
outstanding_requests =
OutstandingRequests0} =
Connection,
SubscriptionId,
Active,
true = _Sac,
Extra) ->
rabbit_log:debug("SAC subscription ~p, active = ~p",
[SubscriptionId, Active]),
Frame =

View File

@ -44,7 +44,8 @@ groups() ->
timeout_peer_properties_exchanged,
unauthenticated_client_rejected_authenticating,
timeout_authenticating,
timeout_close_sent]},
timeout_close_sent,
max_segment_size_bytes_validation]},
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
{single_node_1, [], [test_global_counters]},
@ -326,6 +327,29 @@ sac_ff(Config) ->
closed = wait_for_socket_close(gen_tcp, S, 10),
ok.
%% Creating a stream over the stream protocol with a segment size above the
%% 3 GB limit must be answered with RESPONSE_CODE_PRECONDITION_FAILED.
max_segment_size_bytes_validation(Config) ->
    Transport = gen_tcp,
    Port = get_stream_port(Config),
    {ok, Sock} = Transport:connect("localhost", Port,
                                   [{active, false}, {mode, binary}]),
    Conn0 = rabbit_stream_core:init(0),
    Conn1 = test_peer_properties(Transport, Sock, Conn0),
    Conn2 = test_authenticate(Transport, Sock, Conn1),
    Stream = <<"stream-max-segment-size">>,
    %% 3_000_000_001 bytes: one past the allowed maximum
    StreamArgs = #{<<"stream-max-segment-size-bytes">> => <<"3000000001">>},
    Request = {request, 1, {create_stream, Stream, StreamArgs}},
    ok = Transport:send(Sock, rabbit_stream_core:frame(Request)),
    {Response, Conn3} = receive_commands(Transport, Sock, Conn2),
    ?assertMatch({response, 1,
                  {create_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}},
                 Response),
    test_close(Transport, Sock, Conn3),
    ok.
consumer_count(Config) ->
ets_count(Config, ?TABLE_CONSUMER).