Remove only stream subscriptions affected by down stream member

The clean-up of a stream connection state when a stream member goes down can
remove subscriptions not affected by the member. The subscription state is
removed from the connection, but the subscription is not removed from
the SAC state (if the subscription is a SAC), because the subscription member
PID does not match the down member PID.

When the actual member of the subscription goes down, the subscription is no
longer part of the state, so the clean-up does not find the subscription
and does not remove it from the SAC state. This leaves a ghost consumer in
the corresponding SAC group.

This commit makes sure only the affected subscriptions are removed from
the state when a stream member goes down.

Fixes #13961
This commit is contained in:
Arnaud Cogoluègnes 2025-06-02 09:18:24 +02:00
parent d1aab61566
commit a9cf049030
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
2 changed files with 138 additions and 77 deletions

View File

@ -106,7 +106,8 @@
close_sent/3]). close_sent/3]).
-ifdef(TEST). -ifdef(TEST).
-export([ensure_token_expiry_timer/2, -export([ensure_token_expiry_timer/2,
evaluate_state_after_secret_update/4]). evaluate_state_after_secret_update/4,
clean_subscriptions/4]).
-endif. -endif.
callback_mode() -> callback_mode() ->
@ -3280,89 +3281,19 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport
clean_state_after_stream_deletion_or_failure(MemberPid, Stream, clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
#stream_connection{ #stream_connection{
user = #user{username = Username}, stream_leaders = Leaders} = C0,
virtual_host = VirtualHost, S0) ->
stream_subscriptions = StreamSubscriptions,
publishers = Publishers,
publisher_to_ids = PublisherToIds,
stream_leaders = Leaders,
outstanding_requests = Requests0} = C0,
#stream_connection_state{consumers = Consumers} = S0) ->
{SubscriptionsCleaned, C1, S1} = {SubscriptionsCleaned, C1, S1} =
case stream_has_subscriptions(Stream, C0) of case stream_has_subscriptions(Stream, C0) of
true -> true ->
#{Stream := SubscriptionIds} = StreamSubscriptions, clean_subscriptions(MemberPid, Stream, C0, S0);
Requests1 = lists:foldl(
fun(SubId, Rqsts0) ->
#{SubId := Consumer} = Consumers,
case {MemberPid, Consumer} of
{undefined, _C} ->
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream,
C0),
SubId,
Username),
maybe_unregister_consumer(
VirtualHost, Consumer,
single_active_consumer(Consumer),
Rqsts0);
{MemberPid, #consumer{configuration =
#consumer_configuration{member_pid = MemberPid}}} ->
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream,
C0),
SubId,
Username),
maybe_unregister_consumer(
VirtualHost, Consumer,
single_active_consumer(Consumer),
Rqsts0);
_ ->
Rqsts0
end
end, Requests0, SubscriptionIds),
{true,
C0#stream_connection{stream_subscriptions =
maps:remove(Stream,
StreamSubscriptions),
outstanding_requests = Requests1},
S0#stream_connection_state{consumers =
maps:without(SubscriptionIds,
Consumers)}};
false -> false ->
{false, C0, S0} {false, C0, S0}
end, end,
{PublishersCleaned, C2, S2} = {PublishersCleaned, C2, S2} =
case stream_has_publishers(Stream, C1) of case stream_has_publishers(Stream, C1) of
true -> true ->
{PurgedPubs, PurgedPubToIds} = clean_publishers(MemberPid, Stream, C1, S1);
maps:fold(fun(PubId,
#publisher{stream = S, reference = Ref},
{Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined ->
rabbit_stream_metrics:publisher_deleted(self(),
stream_r(Stream,
C1),
PubId),
{maps:remove(PubId, Pubs),
maps:remove({Stream, Ref}, PubToIds)};
(PubId,
#publisher{stream = S, reference = Ref, leader = MPid},
{Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid ->
rabbit_stream_metrics:publisher_deleted(self(),
stream_r(Stream,
C1),
PubId),
{maps:remove(PubId, Pubs),
maps:remove({Stream, Ref}, PubToIds)};
(_PubId, _Publisher, {Pubs, PubToIds}) ->
{Pubs, PubToIds}
end,
{Publishers, PublisherToIds}, Publishers),
{true,
C1#stream_connection{publishers = PurgedPubs,
publisher_to_ids = PurgedPubToIds},
S1};
false -> false ->
{false, C1, S1} {false, C1, S1}
end, end,
@ -3384,6 +3315,98 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
{not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2} {not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2}
end. end.
%% Removes from the connection the subscriptions of Stream that are attached
%% to MemberPid. When MemberPid is 'undefined', every subscription of the
%% stream is treated as affected.
%%
%% For each affected subscription the consumer_cancelled metric is emitted
%% and maybe_unregister_consumer/4 is called; its result becomes the new
%% outstanding requests of the connection.
%%
%% Returns {Cleaned, Connection, State}. Cleaned is 'true' only when at least
%% one subscription was removed; otherwise the connection and the state are
%% returned untouched.
clean_subscriptions(MemberPid, Stream,
                    #stream_connection{user = #user{username = Username},
                                       virtual_host = VirtualHost,
                                       stream_subscriptions = StreamSubs,
                                       outstanding_requests = Requests0} = C0,
                    #stream_connection_state{consumers = Consumers} = S0) ->
    #{Stream := SubIds} = StreamSubs,
    %% A consumer is affected when no member is specified, or when it reads
    %% from the member that went down.
    IsAffected =
        fun(_AnyConsumer) when MemberPid =:= undefined ->
                true;
           (#consumer{configuration =
                          #consumer_configuration{member_pid = Pid}}) ->
                Pid =:= MemberPid;
           (_Other) ->
                false
        end,
    CancelIfAffected =
        fun(SubId, {Removed, Requests} = Acc) ->
                #{SubId := Consumer} = Consumers,
                case IsAffected(Consumer) of
                    true ->
                        rabbit_stream_metrics:consumer_cancelled(
                          self(), stream_r(Stream, C0), SubId, Username),
                        UpdatedRequests =
                            maybe_unregister_consumer(
                              VirtualHost, Consumer,
                              single_active_consumer(Consumer),
                              Requests),
                        {[SubId | Removed], UpdatedRequests};
                    false ->
                        Acc
                end
        end,
    {RemovedSubIds, Requests1} =
        lists:foldl(CancelIfAffected, {[], Requests0}, SubIds),
    case RemovedSubIds of
        [] ->
            {false, C0, S0};
        [_ | _] ->
            %% Keep the stream entry only if some of its subscriptions
            %% survive the clean-up.
            StreamSubs1 =
                case SubIds -- RemovedSubIds of
                    [] ->
                        maps:remove(Stream, StreamSubs);
                    RemainingSubIds ->
                        StreamSubs#{Stream => RemainingSubIds}
                end,
            C1 = C0#stream_connection{stream_subscriptions = StreamSubs1,
                                      outstanding_requests = Requests1},
            S1 = S0#stream_connection_state{
                   consumers = maps:without(RemovedSubIds, Consumers)},
            {true, C1, S1}
    end.
%% Removes from the connection the publishers of Stream whose leader is
%% MemberPid. When MemberPid is 'undefined', every publisher of the stream
%% is removed.
%%
%% The publisher_deleted metric is emitted for each removed publisher, and
%% its {Stream, Reference} entry is dropped from publisher_to_ids.
%%
%% Returns {Updated, Connection, State}. Updated is 'true' only when at least
%% one publisher was removed; the state is returned as is.
clean_publishers(MemberPid, Stream,
                 #stream_connection{publishers = Publishers,
                                    publisher_to_ids = PublisherToIds} = C0,
                 S0) ->
    ShouldPurge =
        fun(#publisher{stream = S, leader = Leader}) when S =:= Stream ->
                MemberPid =:= undefined orelse Leader =:= MemberPid;
           (_Other) ->
                false
        end,
    PurgeIfAffected =
        fun(PubId, Publisher, {_, Pubs, PubToIds} = Acc) ->
                case ShouldPurge(Publisher) of
                    true ->
                        #publisher{reference = Ref} = Publisher,
                        rabbit_stream_metrics:publisher_deleted(
                          self(), stream_r(Stream, C0), PubId),
                        {true,
                         maps:remove(PubId, Pubs),
                         maps:remove({Stream, Ref}, PubToIds)};
                    false ->
                        Acc
                end
        end,
    {Updated, PurgedPubs, PurgedPubToIds} =
        maps:fold(PurgeIfAffected,
                  {false, Publishers, PublisherToIds},
                  Publishers),
    C1 = C0#stream_connection{publishers = PurgedPubs,
                              publisher_to_ids = PurgedPubToIds},
    {Updated, C1, S0}.
store_offset(Reference, _, _, C) when ?IS_INVALID_REF(Reference) -> store_offset(Reference, _, _, C) when ?IS_INVALID_REF(Reference) ->
rabbit_log:warning("Reference is too long to store offset: ~p", [byte_size(Reference)]), rabbit_log:warning("Reference is too long to store offset: ~p", [byte_size(Reference)]),
C; C;
@ -3401,8 +3424,7 @@ store_offset(Reference, Stream, Offset, Connection0) ->
lookup_leader(Stream, lookup_leader(Stream,
#stream_connection{stream_leaders = StreamLeaders, #stream_connection{stream_leaders = StreamLeaders,
virtual_host = VirtualHost} = virtual_host = VirtualHost} = Connection) ->
Connection) ->
case maps:get(Stream, StreamLeaders, undefined) of case maps:get(Stream, StreamLeaders, undefined) of
undefined -> undefined ->
case lookup_leader_from_manager(VirtualHost, Stream) of case lookup_leader_from_manager(VirtualHost, Stream) of
@ -3411,6 +3433,7 @@ lookup_leader(Stream,
{ok, LeaderPid} -> {ok, LeaderPid} ->
Connection1 = Connection1 =
maybe_monitor_stream(LeaderPid, Stream, Connection), maybe_monitor_stream(LeaderPid, Stream, Connection),
{LeaderPid, {LeaderPid,
Connection1#stream_connection{stream_leaders = Connection1#stream_connection{stream_leaders =
StreamLeaders#{Stream => StreamLeaders#{Stream =>

View File

@ -184,6 +184,44 @@ evaluate_state_after_secret_update_test(_) ->
?assert(is_integer(Cancel2)), ?assert(is_integer(Cancel2)),
ok. ok.
%% Two subscriptions on the same stream are attached to two different
%% members. Cleaning for the first member must keep the second subscription;
%% cleaning for the second member must then leave nothing behind.
clean_subscriptions_should_remove_only_affected_subscriptions_test(_) ->
    Mod = rabbit_stream_reader,
    meck:new(Mod, [passthrough]),
    %% Metrics and SAC coordinator calls are stubbed out.
    meck:new(rabbit_stream_metrics, [stub_all]),
    meck:new(rabbit_stream_sac_coordinator, [stub_all]),
    Stream = <<"s1">>,
    MemberA = new_process(),
    MemberB = new_process(),
    ConsumerA = consumer(Stream, MemberA),
    ConsumerB = consumer(Stream, MemberB),
    Conn0 = #stream_connection{stream_subscriptions = #{Stream => [0, 1]},
                               user = #user{}},
    State0 = #stream_connection_state{consumers = #{0 => ConsumerA,
                                                    1 => ConsumerB}},

    %% First member goes down: only subscription 0 is removed.
    {Cleaned1, Conn1, State1} =
        Mod:clean_subscriptions(MemberA, Stream, Conn0, State0),
    ?assert(Cleaned1),
    ?assertEqual(#{Stream => [1]},
                 Conn1#stream_connection.stream_subscriptions),
    ?assertEqual(#{1 => ConsumerB},
                 State1#stream_connection_state.consumers),

    %% Second member goes down: the remaining subscription is removed too.
    {Cleaned2, Conn2, State2} =
        Mod:clean_subscriptions(MemberB, Stream, Conn1, State1),
    ?assert(Cleaned2),
    ?assertEqual(#{}, Conn2#stream_connection.stream_subscriptions),
    ?assertEqual(#{}, State2#stream_connection_state.consumers),
    ok.
%% Builds a consumer record for stream S attached to the member Pid; all
%% other fields keep their record defaults.
consumer(S, Pid) ->
    Configuration = #consumer_configuration{stream = S,
                                            member_pid = Pid},
    #consumer{configuration = Configuration}.
consumer(S) -> consumer(S) ->
#consumer{configuration = #consumer_configuration{stream = S}, #consumer{configuration = #consumer_configuration{stream = S},
log = osiris_log:init(#{})}. log = osiris_log:init(#{})}.
%% Spawns a process on the local node that returns immediately, and returns
%% its pid.
new_process() ->
    Noop = fun() -> ok end,
    spawn(node(), Noop).