diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 9452f1408a..99d76d785c 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -435,8 +435,13 @@ handle_group_after_connection_down(Pid, lists:foldl( fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _}) when P == Pid -> {L, S or ActiveFlag, true}; - (C, {L, ActiveFlag, AnyFlag}) -> - {L ++ [C], ActiveFlag, AnyFlag} + (#consumer{pid = P, active = S} = C, {L, ActiveFlag, AnyFlag}) -> + case is_alive(P) of + true -> + {L ++ [C], ActiveFlag, AnyFlag}; + false -> + {L, S or ActiveFlag, true} + end end, {[], false, false}, Consumers0), case AnyRemoved of @@ -454,6 +459,15 @@ handle_group_after_connection_down(Pid, end end. +is_alive(Pid) -> + LocalNode = node(), + case node(Pid) of + LocalNode -> + is_process_alive(Pid); + OtherNode -> + rpc:call(OtherNode, erlang, is_process_alive, [Pid]) + end. + do_register_consumer(VirtualHost, Stream, -1 = _PartitionIndex, diff --git a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl index e5ef38d0fb..8b5e5b3630 100644 --- a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl @@ -317,7 +317,7 @@ handle_connection_down_sac_should_get_activated_test(_) -> ConsumerName = <<"app">>, GroupId = {<<"/">>, Stream, ConsumerName}, Pid0 = self(), - Pid1 = spawn(fun() -> ok end), + Pid1 = start_process(), Group = cgroup([consumer(Pid0, 0, true), consumer(Pid1, 1, false), consumer(Pid0, 2, false)]), @@ -339,6 +339,7 @@ handle_connection_down_sac_should_get_activated_test(_) -> assertEmpty(Effects2), assertEmpty(Groups2), + finish_process(Pid1), ok. handle_connection_down_sac_active_does_not_change_test(_) -> @@ -346,7 +347,7 @@ handle_connection_down_sac_active_does_not_change_test(_) -> ConsumerName = <<"app">>, GroupId = {<<"/">>, Stream, ConsumerName}, Pid0 = self(), - Pid1 = spawn(fun() -> ok end), + Pid1 = start_process(), Group = cgroup([consumer(Pid1, 0, true), consumer(Pid0, 1, false), consumer(Pid0, 2, false)]), @@ -361,6 +362,7 @@ handle_connection_down_sac_active_does_not_change_test(_) -> assertSize(1, maps:get(Pid1, PidsGroups)), assertEmpty(Effects), assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true)]), Groups), + finish_process(Pid1), ok. handle_connection_down_sac_no_more_consumers_test(_) -> @@ -386,7 +388,7 @@ handle_connection_down_sac_no_consumers_in_down_connection_test(_) -> ConsumerName = <<"app">>, GroupId = {<<"/">>, Stream, ConsumerName}, Pid0 = self(), - Pid1 = spawn(fun() -> ok end), + Pid1 = start_process(), Group = cgroup([consumer(Pid1, 0, true), consumer(Pid1, 1, false)]), State = state(#{GroupId => Group}, @@ -402,6 +404,7 @@ handle_connection_down_sac_no_consumers_in_down_connection_test(_) -> assertEmpty(Effects), assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true), consumer(Pid1, 1, false)]), Groups), + finish_process(Pid1), ok. handle_connection_down_super_stream_active_stays_test(_) -> @@ -409,7 +412,7 @@ handle_connection_down_super_stream_active_stays_test(_) -> ConsumerName = <<"app">>, GroupId = {<<"/">>, Stream, ConsumerName}, Pid0 = self(), - Pid1 = spawn(fun() -> ok end), + Pid1 = start_process(), Group = cgroup(1, [consumer(Pid0, 0, false), consumer(Pid0, 1, true), consumer(Pid1, 2, false), @@ -426,6 +429,7 @@ handle_connection_down_super_stream_active_stays_test(_) -> assertEmpty(Effects), assertHasGroup(GroupId, cgroup(1, [consumer(Pid0, 0, false), consumer(Pid0, 1, true)]), Groups), + finish_process(Pid1), ok. handle_connection_down_super_stream_active_changes_test(_) -> @@ -433,7 +437,7 @@ handle_connection_down_super_stream_active_changes_test(_) -> ConsumerName = <<"app">>, GroupId = {<<"/">>, Stream, ConsumerName}, Pid0 = self(), - Pid1 = spawn(fun() -> ok end), + Pid1 = start_process(), Group = cgroup(1, [consumer(Pid0, 0, false), consumer(Pid1, 1, true), consumer(Pid0, 2, false), @@ -450,6 +454,7 @@ handle_connection_down_super_stream_active_changes_test(_) -> assertSendMessageSteppingDownEffect(Pid1, 1, Stream, ConsumerName, Effects), assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 1, false), consumer(Pid1, 3, false)]), Groups), + finish_process(Pid1), ok. handle_connection_down_super_stream_activate_in_remaining_connection_test(_) -> @@ -457,7 +462,7 @@ handle_connection_down_super_stream_activate_in_remaining_connection_test(_) -> ConsumerName = <<"app">>, GroupId = {<<"/">>, Stream, ConsumerName}, Pid0 = self(), - Pid1 = spawn(fun() -> ok end), + Pid1 = start_process(), Group = cgroup(1, [consumer(Pid0, 0, false), consumer(Pid0, 1, true), consumer(Pid1, 2, false), @@ -474,6 +479,7 @@ handle_connection_down_super_stream_activate_in_remaining_connection_test(_) -> assertSendMessageEffect(Pid1, 3, Stream, ConsumerName, true, Effects), assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, true)]), Groups), + finish_process(Pid1), ok. handle_connection_down_super_stream_no_active_removed_or_present_test(_) -> @@ -481,7 +487,7 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) -> ConsumerName = <<"app">>, GroupId = {<<"/">>, Stream, ConsumerName}, Pid0 = self(), - Pid1 = spawn(fun() -> ok end), + Pid1 = start_process(), %% this is a weird case that should not happen in the wild, %% we test the logic in the code nevertheless. %% No active consumer in the group @@ -501,8 +507,51 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) -> assertEmpty(Effects), assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, false)]), Groups), + finish_process(Pid1), ok. +handle_connection_down_consumers_from_dead_connection_should_be_filtered_out_test(_) -> + Stream = <<"stream">>, + ConsumerName = <<"app">>, + GroupId = {<<"/">>, Stream, ConsumerName}, + Pid0 = self(), + Pid1 = start_process(), + Pid2 = start_process(), + Group = cgroup(1, [consumer(Pid0, 0, false), + consumer(Pid1, 1, true), + consumer(Pid2, 2, false)]), + State = state(#{GroupId => Group}, + #{Pid0 => maps:from_list([{GroupId, true}]), + Pid1 => maps:from_list([{GroupId, true}]), + Pid2 => maps:from_list([{GroupId, true}])}), + + finish_process(Pid1), + finish_process(Pid2), + {#?STATE{pids_groups = PidsGroups, groups = Groups}, + Effects} = + rabbit_stream_sac_coordinator:handle_connection_down(Pid1, State), + %% Pid2 is still in the monitors and this is expected in this test + %% it should be cleaned up when the coordinator gets the DOWN message + assertSize(2, PidsGroups), + assertSize(1, maps:get(Pid0, PidsGroups)), + assertSize(1, maps:get(Pid2, PidsGroups)), + assertSendMessageEffect(Pid0, 0, Stream, ConsumerName, true, Effects), + assertHasGroup(GroupId, cgroup(1, [consumer(Pid0, 0, true)]), + Groups), + finish_process(Pid1), + ok. + +start_process() -> + spawn(fun() -> + receive + _ -> ok + end + end). + +finish_process(Pid) -> + %% finish the process + Pid ! finish. + assertSize(Expected, []) -> ?assertEqual(Expected, 0); assertSize(Expected, Map) when is_map(Map) ->