Fix partition index conflict in stream SAC coordinator
Consumers with a same name, consuming from the same stream should have the same partition index. This commit adds a check to enforce this rule and make the subscription fail if it does not comply. Fixes #13835
This commit is contained in:
parent
99a9237abb
commit
cad8b70ee8
|
@ -198,13 +198,12 @@ apply(#command_register_consumer{vhost = VirtualHost,
|
||||||
owner = Owner,
|
owner = Owner,
|
||||||
subscription_id = SubscriptionId},
|
subscription_id = SubscriptionId},
|
||||||
#?MODULE{groups = StreamGroups0} = State) ->
|
#?MODULE{groups = StreamGroups0} = State) ->
|
||||||
StreamGroups1 =
|
case maybe_create_group(VirtualHost,
|
||||||
maybe_create_group(VirtualHost,
|
|
||||||
Stream,
|
Stream,
|
||||||
PartitionIndex,
|
PartitionIndex,
|
||||||
ConsumerName,
|
ConsumerName,
|
||||||
StreamGroups0),
|
StreamGroups0) of
|
||||||
|
{ok, StreamGroups1} ->
|
||||||
do_register_consumer(VirtualHost,
|
do_register_consumer(VirtualHost,
|
||||||
Stream,
|
Stream,
|
||||||
PartitionIndex,
|
PartitionIndex,
|
||||||
|
@ -213,6 +212,9 @@ apply(#command_register_consumer{vhost = VirtualHost,
|
||||||
Owner,
|
Owner,
|
||||||
SubscriptionId,
|
SubscriptionId,
|
||||||
State#?MODULE{groups = StreamGroups1});
|
State#?MODULE{groups = StreamGroups1});
|
||||||
|
{error, Error} ->
|
||||||
|
{State, {error, Error}, []}
|
||||||
|
end;
|
||||||
apply(#command_unregister_consumer{vhost = VirtualHost,
|
apply(#command_unregister_consumer{vhost = VirtualHost,
|
||||||
stream = Stream,
|
stream = Stream,
|
||||||
consumer_name = ConsumerName,
|
consumer_name = ConsumerName,
|
||||||
|
@ -644,12 +646,15 @@ maybe_create_group(VirtualHost,
|
||||||
ConsumerName,
|
ConsumerName,
|
||||||
StreamGroups) ->
|
StreamGroups) ->
|
||||||
case StreamGroups of
|
case StreamGroups of
|
||||||
#{{VirtualHost, Stream, ConsumerName} := _Group} ->
|
#{{VirtualHost, Stream, ConsumerName} := #group{partition_index = PI}}
|
||||||
StreamGroups;
|
when PI =/= PartitionIndex ->
|
||||||
|
{error, partition_index_conflict};
|
||||||
|
#{{VirtualHost, Stream, ConsumerName} := _} ->
|
||||||
|
{ok, StreamGroups};
|
||||||
SGS ->
|
SGS ->
|
||||||
maps:put({VirtualHost, Stream, ConsumerName},
|
{ok, maps:put({VirtualHost, Stream, ConsumerName},
|
||||||
#group{consumers = [], partition_index = PartitionIndex},
|
#group{consumers = [], partition_index = PartitionIndex},
|
||||||
SGS)
|
SGS)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
|
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
|
||||||
|
|
|
@ -503,6 +503,20 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
|
||||||
Groups),
|
Groups),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
register_consumer_with_different_partition_index_should_return_error_test(_) ->
|
||||||
|
Stream = <<"stream">>,
|
||||||
|
ConsumerName = <<"app">>,
|
||||||
|
ConnectionPid = self(),
|
||||||
|
Command0 =
|
||||||
|
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 0),
|
||||||
|
State0 = state(),
|
||||||
|
{State1, {ok, true}, _} =
|
||||||
|
rabbit_stream_sac_coordinator:apply(Command0, State0),
|
||||||
|
Command1 =
|
||||||
|
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
|
||||||
|
{_, {error, partition_index_conflict}, []} =
|
||||||
|
rabbit_stream_sac_coordinator:apply(Command1, State1).
|
||||||
|
|
||||||
assertSize(Expected, []) ->
|
assertSize(Expected, []) ->
|
||||||
?assertEqual(Expected, 0);
|
?assertEqual(Expected, 0);
|
||||||
assertSize(Expected, Map) when is_map(Map) ->
|
assertSize(Expected, Map) when is_map(Map) ->
|
||||||
|
|
|
@ -1927,21 +1927,17 @@ handle_frame_post_auth(Transport, {ok, #stream_connection{user = User} = C}, Sta
|
||||||
{C, State};
|
{C, State};
|
||||||
handle_frame_post_auth(Transport,
|
handle_frame_post_auth(Transport,
|
||||||
{ok, #stream_connection{
|
{ok, #stream_connection{
|
||||||
name = ConnName,
|
|
||||||
socket = Socket,
|
|
||||||
stream_subscriptions = StreamSubscriptions,
|
stream_subscriptions = StreamSubscriptions,
|
||||||
virtual_host = VirtualHost,
|
virtual_host = VirtualHost,
|
||||||
user = User,
|
user = User} = Connection},
|
||||||
send_file_oct = SendFileOct,
|
State,
|
||||||
transport = ConnTransport} = Connection},
|
|
||||||
#stream_connection_state{consumers = Consumers} = State,
|
|
||||||
{request, CorrelationId,
|
{request, CorrelationId,
|
||||||
{subscribe,
|
{subscribe,
|
||||||
SubscriptionId,
|
SubscriptionId,
|
||||||
Stream,
|
Stream,
|
||||||
OffsetSpec,
|
OffsetSpec,
|
||||||
Credit,
|
_Credit,
|
||||||
Properties}}) ->
|
Properties}} = Request) ->
|
||||||
QueueResource =
|
QueueResource =
|
||||||
#resource{name = Stream,
|
#resource{name = Stream,
|
||||||
kind = queue,
|
kind = queue,
|
||||||
|
@ -2004,89 +2000,9 @@ handle_frame_post_auth(Transport,
|
||||||
increase_protocol_counter(?PRECONDITION_FAILED),
|
increase_protocol_counter(?PRECONDITION_FAILED),
|
||||||
{Connection, State};
|
{Connection, State};
|
||||||
_ ->
|
_ ->
|
||||||
Log = case Sac of
|
handle_subscription(Transport, Connection,
|
||||||
true ->
|
State, Request,
|
||||||
undefined;
|
LocalMemberPid)
|
||||||
false ->
|
|
||||||
init_reader(ConnTransport,
|
|
||||||
LocalMemberPid,
|
|
||||||
QueueResource,
|
|
||||||
SubscriptionId,
|
|
||||||
Properties,
|
|
||||||
OffsetSpec)
|
|
||||||
end,
|
|
||||||
|
|
||||||
ConsumerCounters =
|
|
||||||
atomics:new(2, [{signed, false}]),
|
|
||||||
|
|
||||||
response_ok(Transport,
|
|
||||||
Connection,
|
|
||||||
subscribe,
|
|
||||||
CorrelationId),
|
|
||||||
|
|
||||||
Active =
|
|
||||||
maybe_register_consumer(VirtualHost,
|
|
||||||
Stream,
|
|
||||||
ConsumerName,
|
|
||||||
ConnName,
|
|
||||||
SubscriptionId,
|
|
||||||
Properties,
|
|
||||||
Sac),
|
|
||||||
|
|
||||||
ConsumerConfiguration =
|
|
||||||
#consumer_configuration{member_pid =
|
|
||||||
LocalMemberPid,
|
|
||||||
subscription_id
|
|
||||||
=
|
|
||||||
SubscriptionId,
|
|
||||||
socket = Socket,
|
|
||||||
stream = Stream,
|
|
||||||
offset =
|
|
||||||
OffsetSpec,
|
|
||||||
counters =
|
|
||||||
ConsumerCounters,
|
|
||||||
properties =
|
|
||||||
Properties,
|
|
||||||
active =
|
|
||||||
Active},
|
|
||||||
SendLimit = Credit div 2,
|
|
||||||
ConsumerState =
|
|
||||||
#consumer{configuration =
|
|
||||||
ConsumerConfiguration,
|
|
||||||
log = Log,
|
|
||||||
send_limit = SendLimit,
|
|
||||||
credit = Credit},
|
|
||||||
|
|
||||||
Connection1 =
|
|
||||||
maybe_monitor_stream(LocalMemberPid,
|
|
||||||
Stream,
|
|
||||||
Connection),
|
|
||||||
|
|
||||||
State1 =
|
|
||||||
maybe_dispatch_on_subscription(Transport,
|
|
||||||
State,
|
|
||||||
ConsumerState,
|
|
||||||
Connection1,
|
|
||||||
Consumers,
|
|
||||||
Stream,
|
|
||||||
SubscriptionId,
|
|
||||||
Properties,
|
|
||||||
SendFileOct,
|
|
||||||
Sac),
|
|
||||||
StreamSubscriptions1 =
|
|
||||||
case StreamSubscriptions of
|
|
||||||
#{Stream := SubscriptionIds} ->
|
|
||||||
StreamSubscriptions#{Stream =>
|
|
||||||
[SubscriptionId]
|
|
||||||
++ SubscriptionIds};
|
|
||||||
_ ->
|
|
||||||
StreamSubscriptions#{Stream =>
|
|
||||||
[SubscriptionId]}
|
|
||||||
end,
|
|
||||||
{Connection1#stream_connection{stream_subscriptions
|
|
||||||
=
|
|
||||||
StreamSubscriptions1},
|
|
||||||
State1}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
@ -2995,8 +2911,106 @@ maybe_dispatch_on_subscription(_Transport,
|
||||||
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
|
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
|
||||||
State#stream_connection_state{consumers = Consumers1}.
|
State#stream_connection_state{consumers = Consumers1}.
|
||||||
|
|
||||||
|
handle_subscription(Transport,#stream_connection{
|
||||||
|
name = ConnName,
|
||||||
|
socket = Socket,
|
||||||
|
stream_subscriptions = StreamSubscriptions,
|
||||||
|
virtual_host = VirtualHost,
|
||||||
|
send_file_oct = SendFileOct,
|
||||||
|
transport = ConnTransport} = Connection,
|
||||||
|
#stream_connection_state{consumers = Consumers} = State,
|
||||||
|
{request, CorrelationId, {subscribe,
|
||||||
|
SubscriptionId,
|
||||||
|
Stream,
|
||||||
|
OffsetSpec,
|
||||||
|
Credit,
|
||||||
|
Properties}},
|
||||||
|
LocalMemberPid) ->
|
||||||
|
Sac = single_active_consumer(Properties),
|
||||||
|
ConsumerName = consumer_name(Properties),
|
||||||
|
QueueResource = #resource{name = Stream,
|
||||||
|
kind = queue,
|
||||||
|
virtual_host = VirtualHost},
|
||||||
|
case maybe_register_consumer(VirtualHost, Stream, ConsumerName, ConnName,
|
||||||
|
SubscriptionId, Properties, Sac) of
|
||||||
|
{ok, Active} ->
|
||||||
|
Log = case Sac of
|
||||||
|
true ->
|
||||||
|
undefined;
|
||||||
|
false ->
|
||||||
|
init_reader(ConnTransport,
|
||||||
|
LocalMemberPid,
|
||||||
|
QueueResource,
|
||||||
|
SubscriptionId,
|
||||||
|
Properties,
|
||||||
|
OffsetSpec)
|
||||||
|
end,
|
||||||
|
|
||||||
|
ConsumerCounters = atomics:new(2, [{signed, false}]),
|
||||||
|
|
||||||
|
response_ok(Transport,
|
||||||
|
Connection,
|
||||||
|
subscribe,
|
||||||
|
CorrelationId),
|
||||||
|
|
||||||
|
ConsumerConfiguration = #consumer_configuration{
|
||||||
|
member_pid = LocalMemberPid,
|
||||||
|
subscription_id = SubscriptionId,
|
||||||
|
socket = Socket,
|
||||||
|
stream = Stream,
|
||||||
|
offset = OffsetSpec,
|
||||||
|
counters = ConsumerCounters,
|
||||||
|
properties = Properties,
|
||||||
|
active = Active},
|
||||||
|
SendLimit = Credit div 2,
|
||||||
|
ConsumerState =
|
||||||
|
#consumer{configuration = ConsumerConfiguration,
|
||||||
|
log = Log,
|
||||||
|
send_limit = SendLimit,
|
||||||
|
credit = Credit},
|
||||||
|
|
||||||
|
Connection1 = maybe_monitor_stream(LocalMemberPid,
|
||||||
|
Stream,
|
||||||
|
Connection),
|
||||||
|
|
||||||
|
State1 = maybe_dispatch_on_subscription(Transport,
|
||||||
|
State,
|
||||||
|
ConsumerState,
|
||||||
|
Connection1,
|
||||||
|
Consumers,
|
||||||
|
Stream,
|
||||||
|
SubscriptionId,
|
||||||
|
Properties,
|
||||||
|
SendFileOct,
|
||||||
|
Sac),
|
||||||
|
StreamSubscriptions1 =
|
||||||
|
case StreamSubscriptions of
|
||||||
|
#{Stream := SubscriptionIds} ->
|
||||||
|
StreamSubscriptions#{Stream =>
|
||||||
|
[SubscriptionId]
|
||||||
|
++ SubscriptionIds};
|
||||||
|
_ ->
|
||||||
|
StreamSubscriptions#{Stream =>
|
||||||
|
[SubscriptionId]}
|
||||||
|
end,
|
||||||
|
{Connection1#stream_connection{stream_subscriptions
|
||||||
|
=
|
||||||
|
StreamSubscriptions1},
|
||||||
|
State1};
|
||||||
|
{error, Reason} ->
|
||||||
|
rabbit_log:warning("Cannot create SAC subcription ~tp: ~tp",
|
||||||
|
[SubscriptionId, Reason]),
|
||||||
|
response(Transport,
|
||||||
|
Connection,
|
||||||
|
subscribe,
|
||||||
|
CorrelationId,
|
||||||
|
?RESPONSE_CODE_PRECONDITION_FAILED),
|
||||||
|
increase_protocol_counter(?PRECONDITION_FAILED),
|
||||||
|
{Connection, State}
|
||||||
|
end.
|
||||||
|
|
||||||
maybe_register_consumer(_, _, _, _, _, _, false = _Sac) ->
|
maybe_register_consumer(_, _, _, _, _, _, false = _Sac) ->
|
||||||
true;
|
{ok, true};
|
||||||
maybe_register_consumer(VirtualHost,
|
maybe_register_consumer(VirtualHost,
|
||||||
Stream,
|
Stream,
|
||||||
ConsumerName,
|
ConsumerName,
|
||||||
|
@ -3005,15 +3019,13 @@ maybe_register_consumer(VirtualHost,
|
||||||
Properties,
|
Properties,
|
||||||
true) ->
|
true) ->
|
||||||
PartitionIndex = partition_index(VirtualHost, Stream, Properties),
|
PartitionIndex = partition_index(VirtualHost, Stream, Properties),
|
||||||
{ok, Active} =
|
|
||||||
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
|
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
|
||||||
Stream,
|
Stream,
|
||||||
PartitionIndex,
|
PartitionIndex,
|
||||||
ConsumerName,
|
ConsumerName,
|
||||||
self(),
|
self(),
|
||||||
ConnectionName,
|
ConnectionName,
|
||||||
SubscriptionId),
|
SubscriptionId).
|
||||||
Active.
|
|
||||||
|
|
||||||
maybe_send_consumer_update(Transport,
|
maybe_send_consumer_update(Transport,
|
||||||
Connection = #stream_connection{
|
Connection = #stream_connection{
|
||||||
|
|
|
@ -68,7 +68,8 @@ groups() ->
|
||||||
test_publisher_with_too_long_reference_errors,
|
test_publisher_with_too_long_reference_errors,
|
||||||
test_consumer_with_too_long_reference_errors,
|
test_consumer_with_too_long_reference_errors,
|
||||||
subscribe_unsubscribe_should_create_events,
|
subscribe_unsubscribe_should_create_events,
|
||||||
test_stream_test_utils
|
test_stream_test_utils,
|
||||||
|
sac_subscription_with_partition_index_conflict_should_return_error
|
||||||
]},
|
]},
|
||||||
%% Run `test_global_counters` on its own so the global metrics are
|
%% Run `test_global_counters` on its own so the global metrics are
|
||||||
%% initialised to 0 for each testcase
|
%% initialised to 0 for each testcase
|
||||||
|
@ -1069,6 +1070,52 @@ test_stream_test_utils(Config) ->
|
||||||
{ok, _} = stream_test_utils:close(S, C5),
|
{ok, _} = stream_test_utils:close(S, C5),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
sac_subscription_with_partition_index_conflict_should_return_error(Config) ->
|
||||||
|
T = gen_tcp,
|
||||||
|
App = <<"app-1">>,
|
||||||
|
{ok, S, C0} = stream_test_utils:connect(Config, 0),
|
||||||
|
Ss = atom_to_binary(?FUNCTION_NAME, utf8),
|
||||||
|
Partition = unicode:characters_to_binary([Ss, <<"-0">>]),
|
||||||
|
SsCreationFrame = request({create_super_stream, Ss, [Partition], [<<"0">>], #{}}),
|
||||||
|
ok = T:send(S, SsCreationFrame),
|
||||||
|
{Cmd1, C1} = receive_commands(T, S, C0),
|
||||||
|
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}},
|
||||||
|
Cmd1),
|
||||||
|
|
||||||
|
SacSubscribeFrame = request({subscribe, 0, Partition,
|
||||||
|
first, 1,
|
||||||
|
#{<<"single-active-consumer">> => <<"true">>,
|
||||||
|
<<"name">> => App}}),
|
||||||
|
ok = T:send(S, SacSubscribeFrame),
|
||||||
|
{Cmd2, C2} = receive_commands(T, S, C1),
|
||||||
|
?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_OK}},
|
||||||
|
Cmd2),
|
||||||
|
{Cmd3, C3} = receive_commands(T, S, C2),
|
||||||
|
?assertMatch({request,0,{consumer_update,0,true}},
|
||||||
|
Cmd3),
|
||||||
|
|
||||||
|
SsSubscribeFrame = request({subscribe, 1, Partition,
|
||||||
|
first, 1,
|
||||||
|
#{<<"super-stream">> => Ss,
|
||||||
|
<<"single-active-consumer">> => <<"true">>,
|
||||||
|
<<"name">> => App}}),
|
||||||
|
ok = T:send(S, SsSubscribeFrame),
|
||||||
|
{Cmd4, C4} = receive_commands(T, S, C3),
|
||||||
|
?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_PRECONDITION_FAILED}},
|
||||||
|
Cmd4),
|
||||||
|
|
||||||
|
{ok, C5} = stream_test_utils:unsubscribe(S, C4, 0),
|
||||||
|
|
||||||
|
SsDeletionFrame = request({delete_super_stream, Ss}),
|
||||||
|
ok = T:send(S, SsDeletionFrame),
|
||||||
|
{Cmd5, C5} = receive_commands(T, S, C5),
|
||||||
|
?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}},
|
||||||
|
Cmd5),
|
||||||
|
|
||||||
|
{ok, _} = stream_test_utils:close(S, C5),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
|
||||||
filtered_events(Config, EventType) ->
|
filtered_events(Config, EventType) ->
|
||||||
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
|
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||||
gen_event,
|
gen_event,
|
||||||
|
|
Loading…
Reference in New Issue