Fix partition index conflict in stream SAC coordinator

Consumers with a same name, consuming from the same stream should have
the same partition index. This commit adds a check to enforce this rule
and make the subscription fail if it does not comply.

Fixes #13835

(cherry picked from commit cad8b70ee8)
This commit is contained in:
Arnaud Cogoluègnes 2025-05-06 12:17:25 +02:00 committed by Mergify
parent 0aeca40416
commit bcf1a5b69c
4 changed files with 197 additions and 119 deletions

View File

@ -198,21 +198,23 @@ apply(#command_register_consumer{vhost = VirtualHost,
owner = Owner,
subscription_id = SubscriptionId},
#?MODULE{groups = StreamGroups0} = State) ->
StreamGroups1 =
maybe_create_group(VirtualHost,
case maybe_create_group(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
StreamGroups0),
do_register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
Owner,
SubscriptionId,
State#?MODULE{groups = StreamGroups1});
StreamGroups0) of
{ok, StreamGroups1} ->
do_register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
Owner,
SubscriptionId,
State#?MODULE{groups = StreamGroups1});
{error, Error} ->
{State, {error, Error}, []}
end;
apply(#command_unregister_consumer{vhost = VirtualHost,
stream = Stream,
consumer_name = ConsumerName,
@ -644,12 +646,15 @@ maybe_create_group(VirtualHost,
ConsumerName,
StreamGroups) ->
case StreamGroups of
#{{VirtualHost, Stream, ConsumerName} := _Group} ->
StreamGroups;
#{{VirtualHost, Stream, ConsumerName} := #group{partition_index = PI}}
when PI =/= PartitionIndex ->
{error, partition_index_conflict};
#{{VirtualHost, Stream, ConsumerName} := _} ->
{ok, StreamGroups};
SGS ->
maps:put({VirtualHost, Stream, ConsumerName},
#group{consumers = [], partition_index = PartitionIndex},
SGS)
{ok, maps:put({VirtualHost, Stream, ConsumerName},
#group{consumers = [], partition_index = PartitionIndex},
SGS)}
end.
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->

View File

@ -503,6 +503,20 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
Groups),
ok.
register_consumer_with_different_partition_index_should_return_error_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
ConnectionPid = self(),
Command0 =
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 0),
State0 = state(),
{State1, {ok, true}, _} =
rabbit_stream_sac_coordinator:apply(Command0, State0),
Command1 =
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
{_, {error, partition_index_conflict}, []} =
rabbit_stream_sac_coordinator:apply(Command1, State1).
assertSize(Expected, []) ->
?assertEqual(Expected, 0);
assertSize(Expected, Map) when is_map(Map) ->

View File

@ -1927,21 +1927,17 @@ handle_frame_post_auth(Transport, {ok, #stream_connection{user = User} = C}, Sta
{C, State};
handle_frame_post_auth(Transport,
{ok, #stream_connection{
name = ConnName,
socket = Socket,
stream_subscriptions = StreamSubscriptions,
virtual_host = VirtualHost,
user = User,
send_file_oct = SendFileOct,
transport = ConnTransport} = Connection},
#stream_connection_state{consumers = Consumers} = State,
user = User} = Connection},
State,
{request, CorrelationId,
{subscribe,
SubscriptionId,
Stream,
OffsetSpec,
Credit,
Properties}}) ->
_Credit,
Properties}} = Request) ->
QueueResource =
#resource{name = Stream,
kind = queue,
@ -2004,89 +2000,9 @@ handle_frame_post_auth(Transport,
increase_protocol_counter(?PRECONDITION_FAILED),
{Connection, State};
_ ->
Log = case Sac of
true ->
undefined;
false ->
init_reader(ConnTransport,
LocalMemberPid,
QueueResource,
SubscriptionId,
Properties,
OffsetSpec)
end,
ConsumerCounters =
atomics:new(2, [{signed, false}]),
response_ok(Transport,
Connection,
subscribe,
CorrelationId),
Active =
maybe_register_consumer(VirtualHost,
Stream,
ConsumerName,
ConnName,
SubscriptionId,
Properties,
Sac),
ConsumerConfiguration =
#consumer_configuration{member_pid =
LocalMemberPid,
subscription_id
=
SubscriptionId,
socket = Socket,
stream = Stream,
offset =
OffsetSpec,
counters =
ConsumerCounters,
properties =
Properties,
active =
Active},
SendLimit = Credit div 2,
ConsumerState =
#consumer{configuration =
ConsumerConfiguration,
log = Log,
send_limit = SendLimit,
credit = Credit},
Connection1 =
maybe_monitor_stream(LocalMemberPid,
Stream,
Connection),
State1 =
maybe_dispatch_on_subscription(Transport,
State,
ConsumerState,
Connection1,
Consumers,
Stream,
SubscriptionId,
Properties,
SendFileOct,
Sac),
StreamSubscriptions1 =
case StreamSubscriptions of
#{Stream := SubscriptionIds} ->
StreamSubscriptions#{Stream =>
[SubscriptionId]
++ SubscriptionIds};
_ ->
StreamSubscriptions#{Stream =>
[SubscriptionId]}
end,
{Connection1#stream_connection{stream_subscriptions
=
StreamSubscriptions1},
State1}
handle_subscription(Transport, Connection,
State, Request,
LocalMemberPid)
end
end
end;
@ -2995,8 +2911,106 @@ maybe_dispatch_on_subscription(_Transport,
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
State#stream_connection_state{consumers = Consumers1}.
handle_subscription(Transport,#stream_connection{
name = ConnName,
socket = Socket,
stream_subscriptions = StreamSubscriptions,
virtual_host = VirtualHost,
send_file_oct = SendFileOct,
transport = ConnTransport} = Connection,
#stream_connection_state{consumers = Consumers} = State,
{request, CorrelationId, {subscribe,
SubscriptionId,
Stream,
OffsetSpec,
Credit,
Properties}},
LocalMemberPid) ->
Sac = single_active_consumer(Properties),
ConsumerName = consumer_name(Properties),
QueueResource = #resource{name = Stream,
kind = queue,
virtual_host = VirtualHost},
case maybe_register_consumer(VirtualHost, Stream, ConsumerName, ConnName,
SubscriptionId, Properties, Sac) of
{ok, Active} ->
Log = case Sac of
true ->
undefined;
false ->
init_reader(ConnTransport,
LocalMemberPid,
QueueResource,
SubscriptionId,
Properties,
OffsetSpec)
end,
ConsumerCounters = atomics:new(2, [{signed, false}]),
response_ok(Transport,
Connection,
subscribe,
CorrelationId),
ConsumerConfiguration = #consumer_configuration{
member_pid = LocalMemberPid,
subscription_id = SubscriptionId,
socket = Socket,
stream = Stream,
offset = OffsetSpec,
counters = ConsumerCounters,
properties = Properties,
active = Active},
SendLimit = Credit div 2,
ConsumerState =
#consumer{configuration = ConsumerConfiguration,
log = Log,
send_limit = SendLimit,
credit = Credit},
Connection1 = maybe_monitor_stream(LocalMemberPid,
Stream,
Connection),
State1 = maybe_dispatch_on_subscription(Transport,
State,
ConsumerState,
Connection1,
Consumers,
Stream,
SubscriptionId,
Properties,
SendFileOct,
Sac),
StreamSubscriptions1 =
case StreamSubscriptions of
#{Stream := SubscriptionIds} ->
StreamSubscriptions#{Stream =>
[SubscriptionId]
++ SubscriptionIds};
_ ->
StreamSubscriptions#{Stream =>
[SubscriptionId]}
end,
{Connection1#stream_connection{stream_subscriptions
=
StreamSubscriptions1},
State1};
{error, Reason} ->
rabbit_log:warning("Cannot create SAC subcription ~tp: ~tp",
[SubscriptionId, Reason]),
response(Transport,
Connection,
subscribe,
CorrelationId,
?RESPONSE_CODE_PRECONDITION_FAILED),
increase_protocol_counter(?PRECONDITION_FAILED),
{Connection, State}
end.
maybe_register_consumer(_, _, _, _, _, _, false = _Sac) ->
true;
{ok, true};
maybe_register_consumer(VirtualHost,
Stream,
ConsumerName,
@ -3005,15 +3019,13 @@ maybe_register_consumer(VirtualHost,
Properties,
true) ->
PartitionIndex = partition_index(VirtualHost, Stream, Properties),
{ok, Active} =
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
self(),
ConnectionName,
SubscriptionId),
Active.
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
self(),
ConnectionName,
SubscriptionId).
maybe_send_consumer_update(Transport,
Connection = #stream_connection{

View File

@ -68,7 +68,8 @@ groups() ->
test_publisher_with_too_long_reference_errors,
test_consumer_with_too_long_reference_errors,
subscribe_unsubscribe_should_create_events,
test_stream_test_utils
test_stream_test_utils,
sac_subscription_with_partition_index_conflict_should_return_error
]},
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
@ -1069,6 +1070,52 @@ test_stream_test_utils(Config) ->
{ok, _} = stream_test_utils:close(S, C5),
ok.
sac_subscription_with_partition_index_conflict_should_return_error(Config) ->
T = gen_tcp,
App = <<"app-1">>,
{ok, S, C0} = stream_test_utils:connect(Config, 0),
Ss = atom_to_binary(?FUNCTION_NAME, utf8),
Partition = unicode:characters_to_binary([Ss, <<"-0">>]),
SsCreationFrame = request({create_super_stream, Ss, [Partition], [<<"0">>], #{}}),
ok = T:send(S, SsCreationFrame),
{Cmd1, C1} = receive_commands(T, S, C0),
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}},
Cmd1),
SacSubscribeFrame = request({subscribe, 0, Partition,
first, 1,
#{<<"single-active-consumer">> => <<"true">>,
<<"name">> => App}}),
ok = T:send(S, SacSubscribeFrame),
{Cmd2, C2} = receive_commands(T, S, C1),
?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_OK}},
Cmd2),
{Cmd3, C3} = receive_commands(T, S, C2),
?assertMatch({request,0,{consumer_update,0,true}},
Cmd3),
SsSubscribeFrame = request({subscribe, 1, Partition,
first, 1,
#{<<"super-stream">> => Ss,
<<"single-active-consumer">> => <<"true">>,
<<"name">> => App}}),
ok = T:send(S, SsSubscribeFrame),
{Cmd4, C4} = receive_commands(T, S, C3),
?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_PRECONDITION_FAILED}},
Cmd4),
{ok, C5} = stream_test_utils:unsubscribe(S, C4, 0),
SsDeletionFrame = request({delete_super_stream, Ss}),
ok = T:send(S, SsDeletionFrame),
{Cmd5, C5} = receive_commands(T, S, C5),
?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}},
Cmd5),
{ok, _} = stream_test_utils:close(S, C5),
ok.
filtered_events(Config, EventType) ->
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,