Return error if stream publisher reference is longer than 255 characters

Fixes #12499
This commit is contained in:
Arnaud Cogoluègnes 2024-10-10 15:12:52 +02:00
parent d9ff6a00d8
commit 4e8fb46bbf
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
2 changed files with 58 additions and 3 deletions

View File

@ -81,6 +81,8 @@
-define(UNKNOWN_FIELD, unknown_field).
-define(SILENT_CLOSE_DELAY, 3_000).
-import(rabbit_stream_utils, [check_write_permitted/2]).
%% client API
-export([start_link/4,
info/2,
@ -1655,6 +1657,26 @@ handle_frame_post_auth(Transport,
{C1#stream_connection{connection_step = failure}, S1}
end,
{Connection1, State1};
handle_frame_post_auth(Transport,
#stream_connection{user = User,
resource_alarm = false} = C,
State,
{request, CorrelationId,
{declare_publisher, _PublisherId, WriterRef, S}})
when is_binary(WriterRef), byte_size(WriterRef) > 255 ->
{Code, Counter} = case check_write_permitted(stream_r(S, C), User) of
ok ->
{?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED};
error ->
{?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED}
end,
response(Transport,
C,
declare_publisher,
CorrelationId,
Code),
rabbit_global_counters:increase_protocol_counter(stream, Counter, 1),
{C, State};
handle_frame_post_auth(Transport,
#stream_connection{user = User,
publishers = Publishers0,
@ -1664,7 +1686,7 @@ handle_frame_post_auth(Transport,
State,
{request, CorrelationId,
{declare_publisher, PublisherId, WriterRef, Stream}}) ->
case rabbit_stream_utils:check_write_permitted(stream_r(Stream,
case check_write_permitted(stream_r(Stream,
Connection0),
User)
of
@ -3102,7 +3124,7 @@ evaluate_state_after_secret_update(Transport,
{_, Conn1} = ensure_token_expiry_timer(User, Conn0),
PublisherStreams =
lists:foldl(fun(#publisher{stream = Str}, Acc) ->
case rabbit_stream_utils:check_write_permitted(stream_r(Str, Conn0), User) of
case check_write_permitted(stream_r(Str, Conn0), User) of
ok ->
Acc;
_ ->

View File

@ -64,7 +64,8 @@ groups() ->
test_super_stream_duplicate_partitions,
authentication_error_should_close_with_delay,
unauthorized_vhost_access_should_close_with_delay,
sasl_anonymous
sasl_anonymous,
test_publisher_with_too_long_reference_errors
]},
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
@ -945,6 +946,38 @@ unauthorized_vhost_access_should_close_with_delay(Config) ->
closed = wait_for_socket_close(T, S, 10),
ok.
test_publisher_with_too_long_reference_errors(Config) ->
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
T = gen_tcp,
Port = get_port(T, Config),
Opts = get_opts(T),
{ok, S} = T:connect("localhost", Port, Opts),
C = rabbit_stream_core:init(0),
ConnectionName = FunctionName,
test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
test_authenticate(T, S, C),
Stream = FunctionName,
test_create_stream(T, S, Stream, C),
MaxSize = 255,
ReferenceOK = iolist_to_binary(lists:duplicate(MaxSize, <<"a">>)),
ReferenceKO = iolist_to_binary(lists:duplicate(MaxSize + 1, <<"a">>)),
Tests = [{1, ReferenceOK, ?RESPONSE_CODE_OK},
{2, ReferenceKO, ?RESPONSE_CODE_PRECONDITION_FAILED}],
[begin
F = request({declare_publisher, PubId, Ref, Stream}),
ok = T:send(S, F),
{Cmd, C} = receive_commands(T, S, C),
?assertMatch({response, 1, {declare_publisher, ExpectedResponseCode}}, Cmd)
end || {PubId, Ref, ExpectedResponseCode} <- Tests],
test_delete_stream(T, S, Stream, C),
test_close(T, S, C),
ok.
consumer_offset_info(Config, ConnectionName) ->
[[{offset, Offset},
{offset_lag, Lag}]] = rpc(Config, 0, ?MODULE,