Introduce stream SAC status instead of active flag

Conflicts:
	deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl
This commit is contained in:
Arnaud Cogoluègnes 2025-05-06 17:22:19 +02:00
parent 8cc26c4acd
commit d3e5ae98e5
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
3 changed files with 139 additions and 87 deletions

View File

@ -231,7 +231,9 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
of
{value, Consumer} ->
G1 = remove_from_group(Consumer, Group0),
handle_consumer_removal(G1, Stream, ConsumerName, Consumer#consumer.active);
handle_consumer_removal(
G1, Stream, ConsumerName,
is_active(Consumer#consumer.active));
false ->
{Group0, []}
end,
@ -254,11 +256,12 @@ apply(#command_activate_consumer{vhost = VirtualHost,
"the group does not longer exist",
[{VirtualHost, Stream, ConsumerName}]),
{undefined, []};
Group ->
Group0 ->
Group1 = update_consumers(Group0, waiting),
#consumer{pid = Pid, subscription_id = SubId} =
evaluate_active_consumer(Group),
Group1 = update_consumer_state_in_group(Group, Pid, SubId, true),
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
evaluate_active_consumer(Group1),
Group2 = update_consumer_state_in_group(Group1, Pid, SubId, active),
{Group2, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
end,
StreamGroups1 =
update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0),
@ -324,12 +327,8 @@ group_consumers(VirtualHost,
[{connection_name,
Owner}
| RecAcc];
(state, RecAcc)
when Active ->
[{state, active}
| RecAcc];
(state, RecAcc) ->
[{state, inactive}
[{state, Active}
| RecAcc];
(Unknown, RecAcc) ->
[{Unknown,
@ -434,12 +433,13 @@ handle_group_after_connection_down(Pid,
%% remove the connection consumers from the group state
%% keep flags to know what happened
{Consumers1, ActiveRemoved, AnyRemoved} =
lists:foldl(
fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _}) when P == Pid ->
{L, S or ActiveFlag, true};
(C, {L, ActiveFlag, AnyFlag}) ->
{L ++ [C], ActiveFlag, AnyFlag}
end, {[], false, false}, Consumers0),
lists:foldl(
fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _})
when P == Pid ->
{L, is_active(S) or ActiveFlag, true};
(C, {L, ActiveFlag, AnyFlag}) ->
{L ++ [C], ActiveFlag, AnyFlag}
end, {[], false, false}, Consumers0),
case AnyRemoved of
true ->
@ -456,6 +456,11 @@ handle_group_after_connection_down(Pid,
end
end.
is_active(waiting) ->
false;
is_active(_) ->
true.
do_register_consumer(VirtualHost,
Stream,
-1 = _PartitionIndex,
@ -473,12 +478,12 @@ do_register_consumer(VirtualHost,
#consumer{pid = ConnectionPid,
owner = Owner,
subscription_id = SubscriptionId,
active = false};
active = waiting};
false ->
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId,
owner = Owner,
active = true}
active = active}
end,
Group1 = add_to_group(Consumer, Group0),
StreamGroups1 =
@ -491,14 +496,14 @@ do_register_consumer(VirtualHost,
#consumer{active = Active} = Consumer,
Effects =
case Active of
true ->
active ->
[notify_consumer_effect(ConnectionPid, SubscriptionId,
Stream, ConsumerName, Active)];
Stream, ConsumerName, is_active(Active))];
_ ->
[]
end,
{State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects};
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Active)}, Effects};
do_register_consumer(VirtualHost,
Stream,
_PartitionIndex,
@ -518,7 +523,7 @@ do_register_consumer(VirtualHost,
#consumer{pid = ConnectionPid,
owner = Owner,
subscription_id = SubscriptionId,
active = true},
active = active},
G1 = add_to_group(Consumer0, Group0),
{G1,
[notify_consumer_effect(ConnectionPid, SubscriptionId,
@ -529,7 +534,7 @@ do_register_consumer(VirtualHost,
#consumer{pid = ConnectionPid,
owner = Owner,
subscription_id = SubscriptionId,
active = false},
active = waiting},
G1 = add_to_group(Consumer0, Group0),
case lookup_active_consumer(G1) of
@ -545,7 +550,7 @@ do_register_consumer(VirtualHost,
{update_consumer_state_in_group(G1,
ActPid,
ActSubId,
false),
deactivating),
[notify_consumer_effect(ActPid,
ActSubId,
Stream,
@ -567,7 +572,7 @@ do_register_consumer(VirtualHost,
StreamGroups0),
{value, #consumer{active = Active}} =
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
{State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}.
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Active)}, Effects}.
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
{G, []};
@ -603,7 +608,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
{update_consumer_state_in_group(Group0,
ActPid,
ActSubId,
false),
deactivating),
[notify_consumer_effect(ActPid, ActSubId,
Stream, ConsumerName, false, true)]}
end;
@ -613,7 +618,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
%% the active one is going away, picking a new one
#consumer{pid = P, subscription_id = SID} =
evaluate_active_consumer(Group0),
{update_consumer_state_in_group(Group0, P, SID, true),
{update_consumer_state_in_group(Group0, P, SID, active),
[notify_consumer_effect(P, SID,
Stream, ConsumerName, true)]};
false ->
@ -683,13 +688,13 @@ compute_active_consumer(#group{consumers = Crs,
compute_active_consumer(#group{partition_index = -1,
consumers = [Consumer0]} =
Group0) ->
Consumer1 = Consumer0#consumer{active = true},
Consumer1 = Consumer0#consumer{active = active},
Group0#group{consumers = [Consumer1]};
compute_active_consumer(#group{partition_index = -1,
consumers = [Consumer0 | T]} =
Group0) ->
Consumer1 = Consumer0#consumer{active = true},
Consumers = lists:map(fun(C) -> C#consumer{active = false} end, T),
Consumer1 = Consumer0#consumer{active = active},
Consumers = lists:map(fun(C) -> C#consumer{active = waiting} end, T),
Group0#group{consumers = [Consumer1] ++ Consumers}.
evaluate_active_consumer(#group{partition_index = PartitionIndex,
@ -706,7 +711,7 @@ lookup_consumer(ConnectionPid, SubscriptionId,
Consumers).
lookup_active_consumer(#group{consumers = Consumers}) ->
lists:search(fun(#consumer{active = Active}) -> Active end,
lists:search(fun(#consumer{active = Active}) -> is_active(Active) end,
Consumers).
update_groups(_VirtualHost,
@ -743,6 +748,12 @@ update_consumer_state_in_group(#group{consumers = Consumers0} = G,
Consumers0),
G#group{consumers = CS1}.
update_consumers(#group{consumers = Consumers0} = G, NewState) ->
Consumers1 = lists:map(fun(C) ->
C#consumer{active = NewState}
end, Consumers0),
G#group{consumers = Consumers1}.
mod_call_effect(Pid, Msg) ->
{mod_call, rabbit_stream_sac_coordinator, send_message, [Pid, Msg]}.

View File

@ -27,7 +27,7 @@
{pid :: pid(),
subscription_id :: subscription_id(),
owner :: owner(), %% just a label
active :: boolean()}).
active :: active | waiting | deactivating}).
-record(group,
{consumers :: [#consumer{}], partition_index :: integer()}).
-record(rabbit_stream_sac_coordinator,

View File

@ -73,7 +73,7 @@ simple_sac_test(_) ->
{ok, Active1}, Effects1} =
rabbit_stream_sac_coordinator:apply(Command0, State0),
?assert(Active1),
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
?assertEqual([consumer(ConnectionPid, 0, active)], Consumers1),
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
Command1 =
@ -83,8 +83,8 @@ simple_sac_test(_) ->
{ok, Active2}, Effects2} =
rabbit_stream_sac_coordinator:apply(Command1, State1),
?assertNot(Active2),
?assertEqual([consumer(ConnectionPid, 0, true),
consumer(ConnectionPid, 1, false)],
?assertEqual([consumer(ConnectionPid, 0, active),
consumer(ConnectionPid, 1, waiting)],
Consumers2),
assertEmpty(Effects2),
@ -95,9 +95,9 @@ simple_sac_test(_) ->
{ok, Active3}, Effects3} =
rabbit_stream_sac_coordinator:apply(Command2, State2),
?assertNot(Active3),
?assertEqual([consumer(ConnectionPid, 0, true),
consumer(ConnectionPid, 1, false),
consumer(ConnectionPid, 2, false)],
?assertEqual([consumer(ConnectionPid, 0, active),
consumer(ConnectionPid, 1, waiting),
consumer(ConnectionPid, 2, waiting)],
Consumers3),
assertEmpty(Effects3),
@ -107,8 +107,8 @@ simple_sac_test(_) ->
State4,
ok, Effects4} =
rabbit_stream_sac_coordinator:apply(Command3, State3),
?assertEqual([consumer(ConnectionPid, 1, true),
consumer(ConnectionPid, 2, false)],
?assertEqual([consumer(ConnectionPid, 1, active),
consumer(ConnectionPid, 2, waiting)],
Consumers4),
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects4),
@ -118,7 +118,7 @@ simple_sac_test(_) ->
State5,
ok, Effects5} =
rabbit_stream_sac_coordinator:apply(Command4, State4),
?assertEqual([consumer(ConnectionPid, 2, true)], Consumers5),
?assertEqual([consumer(ConnectionPid, 2, active)], Consumers5),
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects5),
Command5 =
@ -143,7 +143,7 @@ super_stream_partition_sac_test(_) ->
{ok, Active1}, Effects1} =
rabbit_stream_sac_coordinator:apply(Command0, State0),
?assert(Active1),
?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
?assertEqual([consumer(ConnectionPid, 0, active)], Consumers1),
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
Command1 =
@ -155,8 +155,8 @@ super_stream_partition_sac_test(_) ->
%% never active on registration
?assertNot(Active2),
%% all consumers inactive, until the former active one steps down and activates the new consumer
?assertEqual([consumer(ConnectionPid, 0, false),
consumer(ConnectionPid, 1, false)],
?assertEqual([consumer(ConnectionPid, 0, deactivating),
consumer(ConnectionPid, 1, waiting)],
Consumers2),
assertSendMessageSteppingDownEffect(ConnectionPid, 0, Stream, ConsumerName, Effects2),
@ -167,8 +167,8 @@ super_stream_partition_sac_test(_) ->
rabbit_stream_sac_coordinator:apply(Command2, State2),
%% 1 (partition index) % 2 (consumer count) = 1 (active consumer index)
?assertEqual([consumer(ConnectionPid, 0, false),
consumer(ConnectionPid, 1, true)],
?assertEqual([consumer(ConnectionPid, 0, waiting),
consumer(ConnectionPid, 1, active)],
Consumers3),
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects3),
@ -182,9 +182,9 @@ super_stream_partition_sac_test(_) ->
?assertNot(Active4),
%% 1 (partition index) % 3 (consumer count) = 1 (active consumer index)
%% the active consumer stays the same
?assertEqual([consumer(ConnectionPid, 0, false),
consumer(ConnectionPid, 1, true),
consumer(ConnectionPid, 2, false)],
?assertEqual([consumer(ConnectionPid, 0, waiting),
consumer(ConnectionPid, 1, active),
consumer(ConnectionPid, 2, waiting)],
Consumers4),
assertEmpty(Effects4),
@ -196,8 +196,8 @@ super_stream_partition_sac_test(_) ->
rabbit_stream_sac_coordinator:apply(Command4, State4),
%% 1 (partition index) % 2 (consumer count) = 1 (active consumer index)
%% the active consumer will move from sub 1 to sub 2
?assertEqual([consumer(ConnectionPid, 1, false),
consumer(ConnectionPid, 2, false)],
?assertEqual([consumer(ConnectionPid, 1, deactivating),
consumer(ConnectionPid, 2, waiting)],
Consumers5),
assertSendMessageSteppingDownEffect(ConnectionPid, 1, Stream, ConsumerName, Effects5),
@ -208,8 +208,8 @@ super_stream_partition_sac_test(_) ->
ok, Effects6} =
rabbit_stream_sac_coordinator:apply(Command5, State5),
?assertEqual([consumer(ConnectionPid, 1, false),
consumer(ConnectionPid, 2, true)],
?assertEqual([consumer(ConnectionPid, 1, waiting),
consumer(ConnectionPid, 2, active)],
Consumers6),
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects6),
@ -219,7 +219,7 @@ super_stream_partition_sac_test(_) ->
State7,
ok, Effects7} =
rabbit_stream_sac_coordinator:apply(Command6, State6),
?assertEqual([consumer(ConnectionPid, 2, true)], Consumers7),
?assertEqual([consumer(ConnectionPid, 2, active)], Consumers7),
assertEmpty(Effects7),
Command7 =
@ -318,9 +318,9 @@ handle_connection_down_sac_should_get_activated_test(_) ->
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup([consumer(Pid0, 0, true),
consumer(Pid1, 1, false),
consumer(Pid0, 2, false)]),
Group = cgroup([consumer(Pid0, 0, active),
consumer(Pid1, 1, waiting),
consumer(Pid0, 2, waiting)]),
State0 = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
@ -331,7 +331,7 @@ handle_connection_down_sac_should_get_activated_test(_) ->
assertSize(1, PidsGroups1),
assertSize(1, maps:get(Pid1, PidsGroups1)),
assertSendMessageEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 1, true)]), Groups1),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 1, active)]), Groups1),
{#?STATE{pids_groups = PidsGroups2, groups = Groups2},
Effects2} =
rabbit_stream_sac_coordinator:handle_connection_down(Pid1, State1),
@ -347,9 +347,9 @@ handle_connection_down_sac_active_does_not_change_test(_) ->
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup([consumer(Pid1, 0, true),
consumer(Pid0, 1, false),
consumer(Pid0, 2, false)]),
Group = cgroup([consumer(Pid1, 0, active),
consumer(Pid0, 1, waiting),
consumer(Pid0, 2, waiting)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
@ -360,7 +360,7 @@ handle_connection_down_sac_active_does_not_change_test(_) ->
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true)]), Groups),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, active)]), Groups),
ok.
handle_connection_down_sac_no_more_consumers_test(_) ->
@ -368,8 +368,8 @@ handle_connection_down_sac_no_more_consumers_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Group = cgroup([consumer(Pid0, 0, true),
consumer(Pid0, 1, false)]),
Group = cgroup([consumer(Pid0, 0, active),
consumer(Pid0, 1, waiting)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}])}),
@ -387,8 +387,8 @@ handle_connection_down_sac_no_consumers_in_down_connection_test(_) ->
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup([consumer(Pid1, 0, true),
consumer(Pid1, 1, false)]),
Group = cgroup([consumer(Pid1, 0, active),
consumer(Pid1, 1, waiting)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]), %% should not be there
Pid1 => maps:from_list([{GroupId, true}])}),
@ -400,7 +400,8 @@ handle_connection_down_sac_no_consumers_in_down_connection_test(_) ->
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true), consumer(Pid1, 1, false)]),
assertHasGroup(GroupId,
cgroup([consumer(Pid1, 0, active), consumer(Pid1, 1, waiting)]),
Groups),
ok.
@ -410,10 +411,10 @@ handle_connection_down_super_stream_active_stays_test(_) ->
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid0, 1, true),
consumer(Pid1, 2, false),
consumer(Pid1, 3, false)]),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid0, 1, active),
consumer(Pid1, 2, waiting),
consumer(Pid1, 3, waiting)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
@ -424,7 +425,8 @@ handle_connection_down_super_stream_active_stays_test(_) ->
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid0, PidsGroups)),
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid0, 0, false), consumer(Pid0, 1, true)]),
assertHasGroup(GroupId,
cgroup(1, [consumer(Pid0, 0, waiting), consumer(Pid0, 1, active)]),
Groups),
ok.
@ -434,10 +436,10 @@ handle_connection_down_super_stream_active_changes_test(_) ->
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid1, 1, true),
consumer(Pid0, 2, false),
consumer(Pid1, 3, false)]),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid1, 1, active),
consumer(Pid0, 2, waiting),
consumer(Pid1, 3, waiting)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
@ -448,7 +450,8 @@ handle_connection_down_super_stream_active_changes_test(_) ->
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertSendMessageSteppingDownEffect(Pid1, 1, Stream, ConsumerName, Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 1, false), consumer(Pid1, 3, false)]),
assertHasGroup(GroupId,
cgroup(1, [consumer(Pid1, 1, deactivating), consumer(Pid1, 3, waiting)]),
Groups),
ok.
@ -458,10 +461,10 @@ handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid0, 1, true),
consumer(Pid1, 2, false),
consumer(Pid1, 3, false)]),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid0, 1, active),
consumer(Pid1, 2, waiting),
consumer(Pid1, 3, waiting)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
@ -472,7 +475,7 @@ handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertSendMessageEffect(Pid1, 3, Stream, ConsumerName, true, Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, true)]),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, waiting), consumer(Pid1, 3, active)]),
Groups),
ok.
@ -485,10 +488,10 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
%% this is a weird case that should not happen in the wild,
%% we test the logic in the code nevertheless.
%% No active consumer in the group
Group = cgroup(1, [consumer(Pid0, 0, false),
consumer(Pid0, 1, false),
consumer(Pid1, 2, false),
consumer(Pid1, 3, false)]),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid0, 1, waiting),
consumer(Pid1, 2, waiting),
consumer(Pid1, 3, waiting)]),
State = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}])}),
@ -499,7 +502,7 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertEmpty(Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, false)]),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, waiting), consumer(Pid1, 3, waiting)]),
Groups),
ok.
@ -517,6 +520,44 @@ register_consumer_with_different_partition_index_should_return_error_test(_) ->
{_, {error, partition_index_conflict}, []} =
rabbit_stream_sac_coordinator:apply(Command1, State1).
handle_connection_down_consumers_from_dead_connection_should_be_filtered_out_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid1, 1, active),
consumer(Pid2, 2, waiting)]),
State0 = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}]),
Pid2 => maps:from_list([{GroupId, true}])}),
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = State1,
Effects1} =
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
assertSize(2, PidsGroups1),
assertSize(1, maps:get(Pid1, PidsGroups1)),
assertSize(1, maps:get(Pid2, PidsGroups1)),
assertSendMessageSteppingDownEffect(Pid1, 1, Stream, ConsumerName, Effects1),
assertHasGroup(GroupId,
cgroup(1, [consumer(Pid1, 1, deactivating), consumer(Pid2, 2, waiting)]),
Groups1),
{#?STATE{pids_groups = PidsGroups2, groups = Groups2},
Effects2} =
rabbit_stream_sac_coordinator:handle_connection_down(Pid1, State1),
assertSize(1, PidsGroups2),
assertSize(1, maps:get(Pid2, PidsGroups2)),
assertSendMessageEffect(Pid2, 2, Stream, ConsumerName, true, Effects2),
assertHasGroup(GroupId,
cgroup(1, [consumer(Pid2, 2, active)]),
Groups2),
ok.
assertSize(Expected, []) ->
?assertEqual(Expected, 0);
assertSize(Expected, Map) when is_map(Map) ->