Filter out single active consumers from dead connection

[Why]
When re-evaluating single active consumer groups because of the DOWN
connection message, the stream SAC coordinator may register `mod_call`
RA effects to send messages to other connections. These messages aim at
activating or deactivating consumers.

This works fine if one connection at a time dies, but when a node goes
down, several connections may go down and the stream SAC coordinator may
send messages to these dead connections. SAC groups can then get stuck,
with only inactive consumers. This is because the coordinator considers
only one connection during a group evaluation.

[How]
While evaluating the consumers of a SAC group after a DOWN message,
the stream SAC coordinator not only removes the consumers of the "current"
dead connection, but also checks if the consumer connections in the group
are still alive and removes the consumers accordingly.

The consumers are preemptively removed from the group and so are not
considered during the evaluation of the new active consumer.
This commit is contained in:
Arnaud Cogoluègnes 2025-04-02 10:27:10 +02:00
parent 82480e42a7
commit 132d992958
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
2 changed files with 72 additions and 9 deletions

View File

@ -435,8 +435,13 @@ handle_group_after_connection_down(Pid,
lists:foldl(
fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _}) when P == Pid ->
{L, S or ActiveFlag, true};
(C, {L, ActiveFlag, AnyFlag}) ->
{L ++ [C], ActiveFlag, AnyFlag}
(#consumer{pid = P, active = S} = C, {L, ActiveFlag, AnyFlag}) ->
case is_alive(P) of
true ->
{L ++ [C], ActiveFlag, AnyFlag};
false ->
{L, S or ActiveFlag, true}
end
end, {[], false, false}, Consumers0),
case AnyRemoved of
@ -454,6 +459,15 @@ handle_group_after_connection_down(Pid,
end
end.
%% Tells whether the process identified by Pid is still running.
%%
%% A local pid is checked directly with is_process_alive/1. A pid that
%% belongs to another node is checked with an RPC to that node.
%%
%% Always returns a boolean: rpc:call/4 returns {badrpc, Reason} when the
%% remote node cannot be reached (e.g. the node went down), and such a
%% result is reported as 'false' so that callers matching on true/false
%% do not crash with a case_clause error.
is_alive(Pid) ->
    LocalNode = node(),
    case node(Pid) of
        LocalNode ->
            is_process_alive(Pid);
        OtherNode ->
            %% anything but 'true' ('false' or {badrpc, _}) means the
            %% process cannot be considered alive
            rpc:call(OtherNode, erlang, is_process_alive, [Pid]) =:= true
    end.
do_register_consumer(VirtualHost,
Stream,
-1 = _PartitionIndex,

View File

@ -317,7 +317,7 @@ handle_connection_down_sac_should_get_activated_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = start_process(),
Group = cgroup([consumer(Pid0, 0, true),
consumer(Pid1, 1, false),
consumer(Pid0, 2, false)]),
@ -339,6 +339,7 @@ handle_connection_down_sac_should_get_activated_test(_) ->
assertEmpty(Effects2),
assertEmpty(Groups2),
finish_process(Pid1),
ok.
handle_connection_down_sac_active_does_not_change_test(_) ->
@ -346,7 +347,7 @@ handle_connection_down_sac_active_does_not_change_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = start_process(),
Group = cgroup([consumer(Pid1, 0, true),
consumer(Pid0, 1, false),
consumer(Pid0, 2, false)]),
@ -361,6 +362,7 @@ handle_connection_down_sac_active_does_not_change_test(_) ->
assertSize(1, maps:get(Pid1, PidsGroups)),
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true)]), Groups),
finish_process(Pid1),
ok.
handle_connection_down_sac_no_more_consumers_test(_) ->
@ -386,7 +388,7 @@ handle_connection_down_sac_no_consumers_in_down_connection_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = start_process(),
Group = cgroup([consumer(Pid1, 0, true),
consumer(Pid1, 1, false)]),
State = state(#{GroupId => Group},
@ -402,6 +404,7 @@ handle_connection_down_sac_no_consumers_in_down_connection_test(_) ->
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true), consumer(Pid1, 1, false)]),
Groups),
finish_process(Pid1),
ok.
handle_connection_down_super_stream_active_stays_test(_) ->
@ -409,7 +412,7 @@ handle_connection_down_super_stream_active_stays_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = start_process(),
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid0, 1, true),
consumer(Pid1, 2, false),
@ -426,6 +429,7 @@ handle_connection_down_super_stream_active_stays_test(_) ->
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid0, 0, false), consumer(Pid0, 1, true)]),
Groups),
finish_process(Pid1),
ok.
handle_connection_down_super_stream_active_changes_test(_) ->
@ -433,7 +437,7 @@ handle_connection_down_super_stream_active_changes_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = start_process(),
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid1, 1, true),
consumer(Pid0, 2, false),
@ -450,6 +454,7 @@ handle_connection_down_super_stream_active_changes_test(_) ->
assertSendMessageSteppingDownEffect(Pid1, 1, Stream, ConsumerName, Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 1, false), consumer(Pid1, 3, false)]),
Groups),
finish_process(Pid1),
ok.
handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
@ -457,7 +462,7 @@ handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = start_process(),
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid0, 1, true),
consumer(Pid1, 2, false),
@ -474,6 +479,7 @@ handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
assertSendMessageEffect(Pid1, 3, Stream, ConsumerName, true, Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, true)]),
Groups),
finish_process(Pid1),
ok.
handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
@ -481,7 +487,7 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = start_process(),
%% this is a weird case that should not happen in the wild,
%% we test the logic in the code nevertheless.
%% No active consumer in the group
@ -501,8 +507,51 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, false)]),
Groups),
finish_process(Pid1),
ok.
%% Three connections (Pid0, Pid1, Pid2) each hold one consumer of the same
%% group; the consumer of Pid1 is the active one. Pid1 and Pid2 are both
%% asked to terminate, but only the DOWN of Pid1 is handled. The test expects
%% the consumer of Pid2 to be filtered out of the group as well and the
%% consumer of Pid0 (the only remaining one) to be activated.
handle_connection_down_consumers_from_dead_connection_should_be_filtered_out_test(_) ->
    Stream = <<"stream">>,
    ConsumerName = <<"app">>,
    GroupId = {<<"/">>, Stream, ConsumerName},
    Pid0 = self(),
    Pid1 = start_process(),
    Pid2 = start_process(),
    Group = cgroup(1, [consumer(Pid0, 0, false),
                       consumer(Pid1, 1, true),
                       consumer(Pid2, 2, false)]),
    State = state(#{GroupId => Group},
                  #{Pid0 => #{GroupId => true},
                    Pid1 => #{GroupId => true},
                    Pid2 => #{GroupId => true}}),

    %% ask both connections to terminate before handling the DOWN of Pid1
    finish_process(Pid1),
    finish_process(Pid2),

    {#?STATE{pids_groups = PidsGroups, groups = Groups},
     Effects} =
        rabbit_stream_sac_coordinator:handle_connection_down(Pid1, State),

    %% Pid2 is still in the monitors and this is expected in this test
    %% it should be cleaned up when the coordinator gets the DOWN message
    assertSize(2, PidsGroups),
    assertSize(1, maps:get(Pid0, PidsGroups)),
    assertSize(1, maps:get(Pid2, PidsGroups)),
    assertSendMessageEffect(Pid0, 0, Stream, ConsumerName, true, Effects),
    assertHasGroup(GroupId, cgroup(1, [consumer(Pid0, 0, true)]),
                   Groups),
    %% no trailing clean-up: both helper processes were already asked to
    %% finish above
    ok.
%% Spawns a helper process standing in for a connection in the tests.
%% The process blocks until it receives a message (any term) and then
%% terminates normally. Returns the pid of the new process.
start_process() ->
    WaitForAnyMessage = fun() ->
                                receive
                                    _Any -> ok
                                end
                        end,
    erlang:spawn(WaitForAnyMessage).
%% Asks the process Pid to finish and waits until it has terminated.
%%
%% Sending the message alone is asynchronous: the target may still be alive
%% when this function returns, which makes any following liveness check
%% racy. A monitor is therefore set before sending and the matching 'DOWN'
%% message is awaited. Calling this on an already-dead process is fine, the
%% monitor then fires immediately.
%%
%% Returns the atom 'finish' (the value of the send expression, as before).
%% Raises {process_not_finished, Pid} if the process is still running after
%% 5 seconds.
finish_process(Pid) ->
    MRef = erlang:monitor(process, Pid),
    %% finish the process
    Pid ! finish,
    receive
        {'DOWN', MRef, process, Pid, _Reason} ->
            finish
    after 5000 ->
        erlang:demonitor(MRef, [flush]),
        error({process_not_finished, Pid})
    end.
assertSize(Expected, []) ->
?assertEqual(Expected, 0);
assertSize(Expected, Map) when is_map(Map) ->