From bcf1a5b69c07a4fd3dfb5b059ca71ccabb6dbdbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 6 May 2025 12:17:25 +0200 Subject: [PATCH] Fix partition index conflict in stream SAC coordinator Consumers with a same name, consuming from the same stream should have the same partition index. This commit adds a check to enforce this rule and make the subscription fail if it does not comply. Fixes #13835 (cherry picked from commit cad8b70ee8b91420a1c546076dc0524f90ee978c) --- .../src/rabbit_stream_sac_coordinator.erl | 39 ++-- .../rabbit_stream_sac_coordinator_SUITE.erl | 14 ++ .../src/rabbit_stream_reader.erl | 214 +++++++++--------- .../test/rabbit_stream_SUITE.erl | 49 +++- 4 files changed, 197 insertions(+), 119 deletions(-) diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 9452f1408a..9975cebb48 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -198,21 +198,23 @@ apply(#command_register_consumer{vhost = VirtualHost, owner = Owner, subscription_id = SubscriptionId}, #?MODULE{groups = StreamGroups0} = State) -> - StreamGroups1 = - maybe_create_group(VirtualHost, + case maybe_create_group(VirtualHost, Stream, PartitionIndex, ConsumerName, - StreamGroups0), - - do_register_consumer(VirtualHost, - Stream, - PartitionIndex, - ConsumerName, - ConnectionPid, - Owner, - SubscriptionId, - State#?MODULE{groups = StreamGroups1}); + StreamGroups0) of + {ok, StreamGroups1} -> + do_register_consumer(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + ConnectionPid, + Owner, + SubscriptionId, + State#?MODULE{groups = StreamGroups1}); + {error, Error} -> + {State, {error, Error}, []} + end; apply(#command_unregister_consumer{vhost = VirtualHost, stream = Stream, consumer_name = ConsumerName, @@ -644,12 +646,15 @@ maybe_create_group(VirtualHost, ConsumerName, StreamGroups) -> case StreamGroups of - #{{VirtualHost, Stream, ConsumerName} := _Group} -> - StreamGroups; + #{{VirtualHost, Stream, ConsumerName} := #group{partition_index = PI}} + when PI =/= PartitionIndex -> + {error, partition_index_conflict}; + #{{VirtualHost, Stream, ConsumerName} := _} -> + {ok, StreamGroups}; SGS -> - maps:put({VirtualHost, Stream, ConsumerName}, - #group{consumers = [], partition_index = PartitionIndex}, - SGS) + {ok, maps:put({VirtualHost, Stream, ConsumerName}, + #group{consumers = [], partition_index = PartitionIndex}, + SGS)} end. lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) -> diff --git a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl index e5ef38d0fb..0a54ce4f05 100644 --- a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl @@ -503,6 +503,20 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) -> Groups), ok. +register_consumer_with_different_partition_index_should_return_error_test(_) -> + Stream = <<"stream">>, + ConsumerName = <<"app">>, + ConnectionPid = self(), + Command0 = + register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 0), + State0 = state(), + {State1, {ok, true}, _} = + rabbit_stream_sac_coordinator:apply(Command0, State0), + Command1 = + register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1), + {_, {error, partition_index_conflict}, []} = + rabbit_stream_sac_coordinator:apply(Command1, State1). + assertSize(Expected, []) -> ?assertEqual(Expected, 0); assertSize(Expected, Map) when is_map(Map) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index f069e25b04..e5931ce041 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1927,21 +1927,17 @@ handle_frame_post_auth(Transport, {ok, #stream_connection{user = User} = C}, Sta {C, State}; handle_frame_post_auth(Transport, {ok, #stream_connection{ - name = ConnName, - socket = Socket, stream_subscriptions = StreamSubscriptions, virtual_host = VirtualHost, - user = User, - send_file_oct = SendFileOct, - transport = ConnTransport} = Connection}, - #stream_connection_state{consumers = Consumers} = State, + user = User} = Connection}, + State, {request, CorrelationId, {subscribe, SubscriptionId, Stream, OffsetSpec, - Credit, - Properties}}) -> + _Credit, + Properties}} = Request) -> QueueResource = #resource{name = Stream, kind = queue, @@ -2004,89 +2000,9 @@ handle_frame_post_auth(Transport, increase_protocol_counter(?PRECONDITION_FAILED), {Connection, State}; _ -> - Log = case Sac of - true -> - undefined; - false -> - init_reader(ConnTransport, - LocalMemberPid, - QueueResource, - SubscriptionId, - Properties, - OffsetSpec) - end, - - ConsumerCounters = - atomics:new(2, [{signed, false}]), - - response_ok(Transport, - Connection, - subscribe, - CorrelationId), - - Active = - maybe_register_consumer(VirtualHost, - Stream, - ConsumerName, - ConnName, - SubscriptionId, - Properties, - Sac), - - ConsumerConfiguration = - #consumer_configuration{member_pid = - LocalMemberPid, - subscription_id - = - SubscriptionId, - socket = Socket, - stream = Stream, - offset = - OffsetSpec, - counters = - ConsumerCounters, - properties = - Properties, - active = - Active}, - SendLimit = Credit div 2, - ConsumerState = - #consumer{configuration = - ConsumerConfiguration, - log = Log, - send_limit = SendLimit, - credit = Credit}, - - Connection1 = - maybe_monitor_stream(LocalMemberPid, - Stream, - Connection), - - State1 = - maybe_dispatch_on_subscription(Transport, - State, - ConsumerState, - Connection1, - Consumers, - Stream, - SubscriptionId, - Properties, - SendFileOct, - Sac), - StreamSubscriptions1 = - case StreamSubscriptions of - #{Stream := SubscriptionIds} -> - StreamSubscriptions#{Stream => - [SubscriptionId] - ++ SubscriptionIds}; - _ -> - StreamSubscriptions#{Stream => - [SubscriptionId]} - end, - {Connection1#stream_connection{stream_subscriptions - = - StreamSubscriptions1}, - State1} + handle_subscription(Transport, Connection, + State, Request, + LocalMemberPid) end end end; @@ -2995,8 +2911,106 @@ maybe_dispatch_on_subscription(_Transport, Consumers1 = Consumers#{SubscriptionId => ConsumerState}, State#stream_connection_state{consumers = Consumers1}. +handle_subscription(Transport,#stream_connection{ + name = ConnName, + socket = Socket, + stream_subscriptions = StreamSubscriptions, + virtual_host = VirtualHost, + send_file_oct = SendFileOct, + transport = ConnTransport} = Connection, + #stream_connection_state{consumers = Consumers} = State, + {request, CorrelationId, {subscribe, + SubscriptionId, + Stream, + OffsetSpec, + Credit, + Properties}}, + LocalMemberPid) -> + Sac = single_active_consumer(Properties), + ConsumerName = consumer_name(Properties), + QueueResource = #resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + case maybe_register_consumer(VirtualHost, Stream, ConsumerName, ConnName, + SubscriptionId, Properties, Sac) of + {ok, Active} -> + Log = case Sac of + true -> + undefined; + false -> + init_reader(ConnTransport, + LocalMemberPid, + QueueResource, + SubscriptionId, + Properties, + OffsetSpec) + end, + + ConsumerCounters = atomics:new(2, [{signed, false}]), + + response_ok(Transport, + Connection, + subscribe, + CorrelationId), + + ConsumerConfiguration = #consumer_configuration{ + member_pid = LocalMemberPid, + subscription_id = SubscriptionId, + socket = Socket, + stream = Stream, + offset = OffsetSpec, + counters = ConsumerCounters, + properties = Properties, + active = Active}, + SendLimit = Credit div 2, + ConsumerState = + #consumer{configuration = ConsumerConfiguration, + log = Log, + send_limit = SendLimit, + credit = Credit}, + + Connection1 = maybe_monitor_stream(LocalMemberPid, + Stream, + Connection), + + State1 = maybe_dispatch_on_subscription(Transport, + State, + ConsumerState, + Connection1, + Consumers, + Stream, + SubscriptionId, + Properties, + SendFileOct, + Sac), + StreamSubscriptions1 = + case StreamSubscriptions of + #{Stream := SubscriptionIds} -> + StreamSubscriptions#{Stream => + [SubscriptionId] + ++ SubscriptionIds}; + _ -> + StreamSubscriptions#{Stream => + [SubscriptionId]} + end, + {Connection1#stream_connection{stream_subscriptions + = + StreamSubscriptions1}, + State1}; + {error, Reason} -> + rabbit_log:warning("Cannot create SAC subcription ~tp: ~tp", + [SubscriptionId, Reason]), + response(Transport, + Connection, + subscribe, + CorrelationId, + ?RESPONSE_CODE_PRECONDITION_FAILED), + increase_protocol_counter(?PRECONDITION_FAILED), + {Connection, State} + end. + maybe_register_consumer(_, _, _, _, _, _, false = _Sac) -> - true; + {ok, true}; maybe_register_consumer(VirtualHost, Stream, ConsumerName, @@ -3005,15 +3019,13 @@ maybe_register_consumer(VirtualHost, Properties, true) -> PartitionIndex = partition_index(VirtualHost, Stream, Properties), - {ok, Active} = - rabbit_stream_sac_coordinator:register_consumer(VirtualHost, - Stream, - PartitionIndex, - ConsumerName, - self(), - ConnectionName, - SubscriptionId), - Active. + rabbit_stream_sac_coordinator:register_consumer(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + self(), + ConnectionName, + SubscriptionId). maybe_send_consumer_update(Transport, Connection = #stream_connection{ diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index deade27bca..66a111cc3b 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -68,7 +68,8 @@ groups() -> test_publisher_with_too_long_reference_errors, test_consumer_with_too_long_reference_errors, subscribe_unsubscribe_should_create_events, - test_stream_test_utils + test_stream_test_utils, + sac_subscription_with_partition_index_conflict_should_return_error ]}, %% Run `test_global_counters` on its own so the global metrics are %% initialised to 0 for each testcase @@ -1069,6 +1070,52 @@ test_stream_test_utils(Config) -> {ok, _} = stream_test_utils:close(S, C5), ok. +sac_subscription_with_partition_index_conflict_should_return_error(Config) -> + T = gen_tcp, + App = <<"app-1">>, + {ok, S, C0} = stream_test_utils:connect(Config, 0), + Ss = atom_to_binary(?FUNCTION_NAME, utf8), + Partition = unicode:characters_to_binary([Ss, <<"-0">>]), + SsCreationFrame = request({create_super_stream, Ss, [Partition], [<<"0">>], #{}}), + ok = T:send(S, SsCreationFrame), + {Cmd1, C1} = receive_commands(T, S, C0), + ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}}, + Cmd1), + + SacSubscribeFrame = request({subscribe, 0, Partition, + first, 1, + #{<<"single-active-consumer">> => <<"true">>, + <<"name">> => App}}), + ok = T:send(S, SacSubscribeFrame), + {Cmd2, C2} = receive_commands(T, S, C1), + ?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_OK}}, + Cmd2), + {Cmd3, C3} = receive_commands(T, S, C2), + ?assertMatch({request,0,{consumer_update,0,true}}, + Cmd3), + + SsSubscribeFrame = request({subscribe, 1, Partition, + first, 1, + #{<<"super-stream">> => Ss, + <<"single-active-consumer">> => <<"true">>, + <<"name">> => App}}), + ok = T:send(S, SsSubscribeFrame), + {Cmd4, C4} = receive_commands(T, S, C3), + ?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_PRECONDITION_FAILED}}, + Cmd4), + + {ok, C5} = stream_test_utils:unsubscribe(S, C4, 0), + + SsDeletionFrame = request({delete_super_stream, Ss}), + ok = T:send(S, SsDeletionFrame), + {Cmd5, C5} = receive_commands(T, S, C5), + ?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}}, + Cmd5), + + {ok, _} = stream_test_utils:close(S, C5), + ok. + + filtered_events(Config, EventType) -> Events = rabbit_ct_broker_helpers:rpc(Config, 0, gen_event,