Add unit tests to stream SAC coordinator

Node disconnection/reconnection.
This commit is contained in:
Arnaud Cogoluègnes 2025-04-17 16:10:00 +02:00
parent a435248b85
commit a96db75229
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
3 changed files with 410 additions and 174 deletions

View File

@ -720,10 +720,11 @@ apply(#{machine_version := MachineVersion} = Meta,
{Ss#{Id => S}, E}
end, {Streams0, Effects0}, Streams0),
{Sac1, Effects2} = case MachineVersion > 5 of
{Sac1, Effects2} = case MachineVersion > 4 of
true ->
SacMod = sac_module(Meta),
SacMod:handle_node_reconnected(Sac0, Effects1);
SacMod:handle_node_reconnected(Node,
Sac0, Effects1);
false ->
{Sac0, Effects1}
end,

View File

@ -42,7 +42,7 @@
ensure_monitors/4,
handle_connection_down/2,
handle_connection_node_disconnected/2,
handle_node_reconnected/2,
handle_node_reconnected/3,
forget_connection/2,
consumer_groups/3,
group_consumers/5,
@ -300,17 +300,7 @@ apply(#command_purge_nodes{nodes = Nodes}, State0) ->
{State1, ok, Eff}.
purge_node(Node, #?MODULE{groups = Groups0} = State0) ->
PidsGroups =
maps:fold(fun(K, #group{consumers = Consumers}, Acc) ->
lists:foldl(fun(#consumer{pid = Pid}, AccIn)
when node(Pid) =:= Node ->
PG0 = maps:get(Pid, AccIn, #{}),
PG1 = PG0#{K => true},
AccIn#{Pid => PG1};
(_, AccIn) ->
AccIn
end, Acc, Consumers)
end, #{}, Groups0),
PidsGroups = compute_node_pid_group_dependencies(Node, Groups0),
maps:fold(fun(Pid, Groups, {S0, Eff0}) ->
{S1, Eff1} = handle_connection_down0(Pid, S0, Groups),
{S1, Eff1 ++ Eff0}
@ -580,8 +570,15 @@ ensure_monitors(#command_connection_reconnected{pid = Pid},
{State#?MODULE{pids_groups = AllPidsGroups},
Monitors#{Pid => sac},
[{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]};
ensure_monitors(#command_purge_nodes{},
#?MODULE{groups = Groups} = State,
Monitors,
Effects) ->
AllPidsGroups = compute_pid_group_dependencies(Groups),
{State#?MODULE{pids_groups = AllPidsGroups},
Monitors,
Effects};
ensure_monitors(_, #?MODULE{} = State0, Monitors, Effects) ->
%% TODO sac: ensure the pid-group mapping after purge_nodes?
{State0, Monitors, Effects}.
-spec handle_connection_down(connection_pid(), state()) ->
@ -621,21 +618,21 @@ handle_connection_node_disconnected(ConnPid,
#{connection_pid => ConnPid}}, T}]}
end.
-spec handle_node_reconnected(state(), ra_machine:effects()) ->
-spec handle_node_reconnected(node(), state(), ra_machine:effects()) ->
{state(), ra_machine:effects()}.
handle_node_reconnected(#?MODULE{pids_groups = PidsGroups0,
handle_node_reconnected(Node,
#?MODULE{pids_groups = PidsGroups0,
groups = Groups0} = State0,
Effects0) ->
AllPidsGroups = compute_pid_group_dependencies(Groups0),
NotMonitored = maps:keys(AllPidsGroups) -- maps:keys(PidsGroups0),
NodePidsGroups = compute_node_pid_group_dependencies(Node, Groups0),
PidsGroups1 = maps:merge(PidsGroups0, NodePidsGroups),
Effects1 =
lists:foldr(fun(P, Acc) ->
[notify_connection_effect(P),
{monitor, process, P},
{monitor, node, node(P)} | Acc]
end, Effects0, NotMonitored),
{monitor, process, P} | Acc]
end, Effects0, maps:keys(NodePidsGroups)),
{State0#?MODULE{pids_groups = AllPidsGroups}, Effects1}.
{State0#?MODULE{pids_groups = PidsGroups1}, Effects1}.
-spec forget_connection(connection_pid(), state()) ->
{state(), ra_machine:effects()}.
@ -1122,3 +1119,17 @@ compute_pid_group_dependencies(Groups) ->
AccIn#{Pid => PG1}
end, Acc, Cs)
end, #{}, Groups).
-spec compute_node_pid_group_dependencies(node(), groups()) -> pids_groups().
compute_node_pid_group_dependencies(Node, Groups) ->
maps:fold(fun(K, #group{consumers = Consumers}, Acc) ->
lists:foldl(fun(#consumer{pid = Pid}, AccIn)
when node(Pid) =:= Node ->
PG0 = maps:get(Pid, AccIn, #{}),
PG1 = PG0#{K => true},
AccIn#{Pid => PG1};
(_, AccIn) ->
AccIn
end, Acc, Consumers)
end, #{}, Groups).

View File

@ -74,7 +74,7 @@ simple_sac_test(_) ->
{ok, Active1}, Effects1} = ?MOD:apply(Command0, State0),
?assert(Active1),
?assertEqual([consumer(ConnectionPid, 0, active)], Consumers1),
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
assertSendMessageActivateEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
Command1 =
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 1),
@ -107,7 +107,7 @@ simple_sac_test(_) ->
?assertEqual([consumer(ConnectionPid, 1, active),
consumer(ConnectionPid, 2, waiting)],
Consumers4),
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects4),
assertSendMessageActivateEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects4),
Command4 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
@ -115,7 +115,7 @@ simple_sac_test(_) ->
State5,
ok, Effects5} = ?MOD:apply(Command4, State4),
?assertEqual([consumer(ConnectionPid, 2, active)], Consumers5),
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects5),
assertSendMessageActivateEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects5),
Command5 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2),
@ -138,7 +138,7 @@ super_stream_partition_sac_test(_) ->
{ok, Active1}, Effects1} = ?MOD:apply(Command0, State0),
?assert(Active1),
?assertEqual([consumer(ConnectionPid, 0, active)], Consumers1),
assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
assertSendMessageActivateEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
Command1 =
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
@ -162,7 +162,7 @@ super_stream_partition_sac_test(_) ->
?assertEqual([consumer(ConnectionPid, 0, waiting),
consumer(ConnectionPid, 1, active)],
Consumers3),
assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects3),
assertSendMessageActivateEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects3),
Command3 =
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 2),
@ -200,7 +200,7 @@ super_stream_partition_sac_test(_) ->
?assertEqual([consumer(ConnectionPid, 1, waiting),
consumer(ConnectionPid, 2, active)],
Consumers6),
assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects6),
assertSendMessageActivateEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects6),
Command6 =
unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
@ -265,7 +265,7 @@ ensure_monitors_test(_) ->
%% trying with an unknown connection PID
%% the function should not change anything
UnknownConnectionPid = spawn(fun() -> ok end),
UnknownConnectionPid = new_process(),
PassthroughCommand = unregister_consumer_command(<<"stream">>,
<<"app">>,
UnknownConnectionPid,
@ -296,7 +296,7 @@ handle_connection_down_sac_should_get_activated_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = new_process(),
Group = cgroup([consumer(Pid0, 0, active),
consumer(Pid1, 1, waiting),
consumer(Pid0, 2, waiting)]),
@ -306,7 +306,7 @@ handle_connection_down_sac_should_get_activated_test(_) ->
Effects1} = ?MOD:handle_connection_down(Pid0, State0),
assertSize(1, PidsGroups1),
assertSize(1, maps:get(Pid1, PidsGroups1)),
assertSendMessageEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
assertSendMessageActivateEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
assertHasGroup(GroupId, cgroup([consumer(Pid1, 1, active)]), Groups1),
{#?STATE{pids_groups = PidsGroups2, groups = Groups2},
Effects2} = ?MOD:handle_connection_down(Pid1, State1),
@ -321,7 +321,7 @@ handle_connection_down_sac_active_does_not_change_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = new_process(),
Group = cgroup([consumer(Pid1, 0, active),
consumer(Pid0, 1, waiting),
consumer(Pid0, 2, waiting)]),
@ -356,7 +356,7 @@ handle_connection_down_sac_no_consumers_in_down_connection_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = new_process(),
Group = cgroup([consumer(Pid1, 0, active),
consumer(Pid1, 1, waiting)]),
State = state(#{GroupId => Group},
@ -379,7 +379,7 @@ handle_connection_down_super_stream_active_stays_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = new_process(),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid0, 1, active),
consumer(Pid1, 2, waiting),
@ -402,7 +402,7 @@ handle_connection_down_super_stream_active_changes_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = new_process(),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid1, 1, active),
consumer(Pid0, 2, waiting),
@ -426,7 +426,7 @@ handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = new_process(),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid0, 1, active),
consumer(Pid1, 2, waiting),
@ -437,7 +437,7 @@ handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
Effects} = ?MOD:handle_connection_down(Pid0, State),
assertSize(1, PidsGroups),
assertSize(1, maps:get(Pid1, PidsGroups)),
assertSendMessageEffect(Pid1, 3, Stream, ConsumerName, true, Effects),
assertSendMessageActivateEffect(Pid1, 3, Stream, ConsumerName, true, Effects),
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, waiting),
consumer(Pid1, 3, active)]),
Groups),
@ -448,7 +448,7 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid1 = new_process(),
%% this is a weird case that should not happen in the wild,
%% we test the logic in the code nevertheless.
%% No active consumer in the group
@ -487,8 +487,8 @@ handle_connection_down_consumers_from_dead_connection_should_be_filtered_out_tes
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid1 = new_process(),
Pid2 = new_process(),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid1, 1, active),
consumer(Pid2, 2, waiting)]),
@ -510,7 +510,7 @@ handle_connection_down_consumers_from_dead_connection_should_be_filtered_out_tes
Effects2} = ?MOD:handle_connection_down(Pid1, State1),
assertSize(1, PidsGroups2),
assertSize(1, maps:get(Pid2, PidsGroups2)),
assertSendMessageEffect(Pid2, 2, Stream, ConsumerName, true, Effects2),
assertSendMessageActivateEffect(Pid2, 2, Stream, ConsumerName, true, Effects2),
assertHasGroup(GroupId,
cgroup(1, [consumer(Pid2, 2, active)]),
Groups2),
@ -529,8 +529,8 @@ import_state_v4_test(_) ->
OldMod = rabbit_stream_sac_coordinator_v4,
OldState0 = OldMod:init_state(),
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid1 = new_process(),
Pid2 = new_process(),
S = <<"stream">>,
App0 = <<"app-0">>,
Cmd0 = register_consumer_command(S, -1, App0, Pid0, 0),
@ -576,8 +576,8 @@ handle_connection_node_disconnected_test(_) ->
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid1 = new_process(),
Pid2 = new_process(),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid1, 1, active),
consumer(Pid2, 2, waiting)]),
@ -600,58 +600,61 @@ handle_connection_node_disconnected_test(_) ->
ok.
handle_node_reconnected_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
CName = <<"app">>,
N0 = node(),
{N1Pid, N1} = start_node(?FUNCTION_NAME),
N0Pid0 = new_process(N0),
N0Pid1 = new_process(N0),
N1Pid0 = new_process(N1),
S0 = <<"s0">>,
GId0 = {<<"/">>, S0, CName},
Group0 = cgroup(0, [consumer(Pid0, 0, {connected, active}),
consumer(Pid1, 1, {disconnected, waiting}),
consumer(Pid2, 2, {connected, waiting})]),
S1 = <<"s1">>,
GId1 = {<<"/">>, S1, CName},
Group1 = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, active}),
consumer(Pid2, 2, {connected, waiting})]),
S2 = <<"s2">>,
GId2 = {<<"/">>, S2, CName},
Group2 = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, waiting}),
consumer(Pid2, 2, {connected, active})]),
GId0 = group_id(S0),
GId1 = group_id(S1),
GId2 = group_id(S2),
Group0 = cgroup(0, [consumer(N0Pid0, 0, {connected, active}),
consumer(N1Pid0, 1, {disconnected, waiting}),
consumer(N0Pid1, 2, {connected, waiting})]),
Group1 = cgroup(1, [consumer(N0Pid0, 0, {connected, waiting}),
consumer(N1Pid0, 1, {disconnected, active}),
consumer(N0Pid1, 2, {connected, waiting})]),
Group2 = cgroup(1, [consumer(N0Pid0, 0, {connected, waiting}),
consumer(N1Pid0, 1, {disconnected, waiting}),
consumer(N0Pid1, 2, {connected, active})]),
Groups0 = #{GId0 => Group0,
GId1 => Group1,
GId2 => Group2},
%% Pid2 is missing from PIDs to groups dependency mapping
State0 = state(Groups0,
#{Pid0 => #{GId0 => true, GId1 => true, GId2 => true},
Pid2 => #{GId0 => true, GId1 => true, GId2 => true}}),
#{N0Pid0 => #{GId0 => true, GId1 => true, GId2 => true},
N0Pid1 => #{GId0 => true, GId1 => true, GId2 => true}}),
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = _State1,
Effects1} =
?MOD:handle_node_reconnected(State0, []),
?MOD:handle_node_reconnected(N1, State0, []),
?assertEqual(Groups0, Groups1),
?assertEqual(#{Pid0 => #{GId0 => true, GId1 => true, GId2 => true},
Pid1 => #{GId0 => true, GId1 => true, GId2 => true},
Pid2 => #{GId0 => true, GId1 => true, GId2 => true}},
?assertEqual(#{N0Pid0 => #{GId0 => true, GId1 => true, GId2 => true},
N1Pid0 => #{GId0 => true, GId1 => true, GId2 => true},
N0Pid1 => #{GId0 => true, GId1 => true, GId2 => true}},
PidsGroups1),
?assertEqual([{mod_call,rabbit_stream_sac_coordinator,send_message,
[Pid1,{sac,check_connection,#{}}]},
{monitor, process, Pid1},
{monitor, node, node(Pid1)}],
Effects1),
assertSize(2, Effects1),
assertContainsCheckConnectionEffect(N1Pid0, Effects1),
assertContainsMonitorProcessEffect(N1Pid0, Effects1),
stop_node(N1Pid),
ok.
connection_reconnected_simple_disconnected_becomes_connected_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup([consumer(Pid0, 0, {disconnected, active}),
consumer(Pid1, 1, {connected, waiting}),
@ -671,9 +674,9 @@ connection_reconnected_simple_disconnected_becomes_connected_test(_) ->
ok.
connection_reconnected_simple_active_should_be_first_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
%% disconnected for a while, got first in consumer array
%% because consumers arrived and left
@ -695,9 +698,9 @@ connection_reconnected_simple_active_should_be_first_test(_) ->
ok.
connection_reconnected_super_disconnected_becomes_connected_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup(1, [consumer(Pid0, 0, {disconnected, waiting}),
consumer(Pid1, 1, {connected, waiting}),
@ -718,9 +721,9 @@ connection_reconnected_super_disconnected_becomes_connected_test(_) ->
ok.
forget_connection_simple_disconnected_becomes_forgotten_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup([consumer(Pid0, 0, {disconnected, active}),
consumer(Pid1, 1, {connected, waiting}),
@ -735,13 +738,13 @@ forget_connection_simple_disconnected_becomes_forgotten_test(_) ->
consumer(Pid1, 1, {connected, active}),
consumer(Pid2, 2, {connected, waiting})]),
Groups1),
assertSendMessageEffect(Pid1, 1, stream(), name(), true, Eff),
assertSendMessageActivateEffect(Pid1, 1, stream(), name(), true, Eff),
ok.
forget_connection_super_stream_disconnected_becomes_forgotten_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, active}),
@ -757,12 +760,12 @@ forget_connection_super_stream_disconnected_becomes_forgotten_test(_) ->
consumer(Pid2, 2, {connected, active})]),
Groups1),
assertSendMessageEffect(Pid2, 2, stream(), name(), true, Eff),
assertSendMessageActivateEffect(Pid2, 2, stream(), name(), true, Eff),
ok.
register_consumer_simple_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
GId = group_id(),
Group = cgroup([consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, active}),
@ -781,8 +784,8 @@ register_consumer_simple_disconn_active_block_rebalancing_test(_) ->
ok.
register_consumer_super_stream_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
GId = group_id(),
Group = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, active}),
@ -801,8 +804,8 @@ register_consumer_super_stream_disconn_active_block_rebalancing_test(_) ->
ok.
unregister_consumer_simple_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
GId = group_id(),
Group = cgroup([consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, active}),
@ -819,8 +822,8 @@ unregister_consumer_simple_disconn_active_block_rebalancing_test(_) ->
ok.
unregister_consumer_super_stream_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
GId = group_id(),
Group = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, active}),
@ -837,8 +840,8 @@ unregister_consumer_super_stream_disconn_active_block_rebalancing_test(_) ->
ok.
activate_consumer_simple_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
GId = group_id(),
Group = cgroup([consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, active}),
@ -856,8 +859,8 @@ activate_consumer_simple_disconn_active_block_rebalancing_test(_) ->
ok.
active_consumer_super_stream_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
GId = group_id(),
Group = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, active}),
@ -875,9 +878,9 @@ active_consumer_super_stream_disconn_active_block_rebalancing_test(_) ->
ok.
handle_connection_down_simple_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup([consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 0, {disconnected, active}),
@ -893,9 +896,9 @@ handle_connection_down_simple_disconn_active_block_rebalancing_test(_) ->
ok.
handle_connection_down_super_stream_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 0, {disconnected, active}),
@ -911,9 +914,9 @@ handle_connection_down_super_stream_disconn_active_block_rebalancing_test(_) ->
ok.
handle_connection_node_disconnected_simple_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup([consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 0, {disconnected, active}),
@ -931,9 +934,9 @@ handle_connection_node_disconnected_simple_disconn_active_block_rebalancing_test
ok.
handle_connection_node_disconnected_super_stream_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 0, {disconnected, active}),
@ -951,9 +954,9 @@ handle_connection_node_disconnected_super_stream_disconn_active_block_rebalancin
ok.
connection_reconnected_simple_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup([consumer(Pid0, 0, {disconnected, waiting}),
consumer(Pid1, 0, {disconnected, active}),
@ -972,9 +975,9 @@ connection_reconnected_simple_disconn_active_block_rebalancing_test(_) ->
ok.
connection_reconnected_super_stream_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup(1, [consumer(Pid0, 0, {disconnected, active}),
consumer(Pid1, 0, {disconnected, waiting}),
@ -993,9 +996,9 @@ connection_reconnected_super_stream_disconn_active_block_rebalancing_test(_) ->
ok.
forget_connection_simple_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup([consumer(Pid0, {disconnected, waiting}),
consumer(Pid1, {connected, waiting}),
@ -1014,9 +1017,9 @@ forget_connection_simple_disconn_active_block_rebalancing_test(_) ->
ok.
forget_connection_super_stream_disconn_active_block_rebalancing_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Pid0 = new_process(),
Pid1 = new_process(),
Pid2 = new_process(),
GId = group_id(),
Group = cgroup(1, [consumer(Pid0, {disconnected, waiting}),
consumer(Pid1, {connected, waiting}),
@ -1036,18 +1039,14 @@ forget_connection_super_stream_disconn_active_block_rebalancing_test(_) ->
purge_nodes_test(_) ->
N0 = node(),
{ok, N1Pid, N1} = peer:start(#{
name => ?FUNCTION_NAME,
connection => standard_io,
shutdown => close
}),
{N1Pid, N1} = start_node(?FUNCTION_NAME),
N0P0 = spawn(N0, fun() -> ok end),
N0P1 = spawn(N0, fun() -> ok end),
N0P2 = spawn(N0, fun() -> ok end),
N1P0 = spawn(N1, fun() -> ok end),
N1P1 = spawn(N1, fun() -> ok end),
N1P2 = spawn(N1, fun() -> ok end),
N0P0 = new_process(N0),
N0P1 = new_process(N0),
N0P2 = new_process(N0),
N1P0 = new_process(N1),
N1P1 = new_process(N1),
N1P2 = new_process(N1),
S0 = <<"s0">>,
S1 = <<"s1">>,
@ -1069,7 +1068,6 @@ purge_nodes_test(_) ->
consumer(N0P1, {connected, waiting}),
consumer(N0P2, {connected, waiting})]),
State0 = state(#{GId0 => Group0, GId1 => Group1, GId2 => Group2}),
Cmd = purge_nodes_command([N1]),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
@ -1089,9 +1087,230 @@ purge_nodes_test(_) ->
assertContainsSendMessageEffect(N0P1, S0, true, Eff),
assertContainsSendMessageEffect(N0P0, S1, true, Eff),
_ = peer:stop(N1Pid),
stop_node(N1Pid),
ok.
node_disconnected_and_reconnected_test(_) ->
N0 = node(),
{N1Pid, N1} = start_node(?FUNCTION_NAME),
N0P0 = new_process(N0),
N0P1 = new_process(N0),
N0P2 = new_process(N0),
N1P0 = new_process(N1),
N1P1 = new_process(N1),
N1P2 = new_process(N1),
N0Pids = [N0P0, N0P1, N0P2],
N1Pids = [N1P0, N1P1, N1P2],
S0 = <<"s0">>,
S1 = <<"s1">>,
S2 = <<"s2">>,
GId0 = group_id(S0),
GId1 = group_id(S1),
GId2 = group_id(S2),
GIds = [GId0, GId1, GId2],
G0 = cgroup([consumer(N0P0, {connected, active}),
consumer(N1P0, {connected, waiting}),
consumer(N0P1, {connected, waiting})]),
G1 = cgroup(1, [consumer(N1P1, {connected, waiting}),
consumer(N0P2, {connected, active}),
consumer(N1P2, {connected, waiting})]),
G2 = cgroup([consumer(N0P0, {connected, active}),
consumer(N1P1, {connected, waiting}),
consumer(N0P2, {connected, waiting})]),
State0 = state(#{GId0 => G0, GId1 => G1, GId2 => G2}),
{State1, Eff1} = ?MOD:handle_connection_node_disconnected(N1P0, State0),
{State2, Eff2} = ?MOD:handle_connection_node_disconnected(N1P1, State1),
{State3, Eff3} = ?MOD:handle_connection_node_disconnected(N1P2, State2),
assertNodeDisconnectedTimerEffect(N1P0, Eff1),
assertNodeDisconnectedTimerEffect(N1P1, Eff2),
assertNodeDisconnectedTimerEffect(N1P2, Eff3),
assertHasGroup(GId0,
cgroup([consumer(N0P0, {connected, active}),
consumer(N1P0, {disconnected, waiting}),
consumer(N0P1, {connected, waiting})]),
State3#?STATE.groups),
assertHasGroup(GId1,
cgroup(1, [consumer(N1P1, {disconnected, waiting}),
consumer(N0P2, {connected, active}),
consumer(N1P2, {disconnected, waiting})]),
State3#?STATE.groups),
assertHasGroup(GId2,
cgroup([consumer(N0P0, {connected, active}),
consumer(N1P1, {disconnected, waiting}),
consumer(N0P2, {connected, waiting})]),
State3#?STATE.groups),
PidsGroups3 = State3#?STATE.pids_groups,
assertSize(3, PidsGroups3),
[ ?assert(maps:is_key(Pid, PidsGroups3)) || Pid <- N0Pids],
[ ?assertNot(maps:is_key(Pid, PidsGroups3)) || Pid <- N1Pids],
{State4, Eff4} = ?MOD:handle_node_reconnected(N1, State3, []),
%% groups should not change
[?assertEqual(maps:get(GId, State3#?STATE.groups),
maps:get(GId, State4#?STATE.groups))
|| GId <- GIds],
%% all connections should be checked and monitored
[begin
assertContainsCheckConnectionEffect(Pid, Eff4),
assertContainsMonitorProcessEffect(Pid, Eff4)
end || Pid <- N1Pids],
Cmd4 = connection_reconnected_command(N1P0),
{#?STATE{groups = Groups5} = State5, ok, Eff5} = ?MOD:apply(Cmd4, State4),
assertHasGroup(GId0,
cgroup([consumer(N0P0, {connected, active}),
consumer(N1P0, {connected, waiting}),
consumer(N0P1, {connected, waiting})]),
Groups5),
assertHasGroup(GId1,
cgroup(1, [consumer(N1P1, {disconnected, waiting}),
consumer(N0P2, {connected, active}),
consumer(N1P2, {disconnected, waiting})]),
Groups5),
assertHasGroup(GId2,
cgroup([consumer(N0P0, {connected, active}),
consumer(N1P1, {disconnected, waiting}),
consumer(N0P2, {connected, waiting})]),
Groups5),
assertEmpty(Eff5),
Cmd5 = connection_reconnected_command(N1P1),
{#?STATE{groups = Groups6} = State6, ok, Eff6} = ?MOD:apply(Cmd5, State5),
assertHasGroup(GId0,
cgroup([consumer(N0P0, {connected, active}),
consumer(N1P0, {connected, waiting}),
consumer(N0P1, {connected, waiting})]),
Groups6),
assertHasGroup(GId1,
cgroup(1, [consumer(N1P1, {connected, waiting}),
consumer(N0P2, {connected, active}),
consumer(N1P2, {disconnected, waiting})]),
Groups6),
assertHasGroup(GId2,
cgroup([consumer(N0P0, {connected, active}),
consumer(N1P1, {connected, waiting}),
consumer(N0P2, {connected, waiting})]),
Groups6),
assertEmpty(Eff6),
%% last connection does not come back for some reason
{#?STATE{groups = Groups7}, Eff7} = ?MOD:forget_connection(N1P2, State6),
assertHasGroup(GId0,
cgroup([consumer(N0P0, {connected, active}),
consumer(N1P0, {connected, waiting}),
consumer(N0P1, {connected, waiting})]),
Groups7),
assertHasGroup(GId1,
cgroup(1, [consumer(N1P1, {connected, waiting}),
consumer(N0P2, {connected, active}),
consumer(N1P2, {forgotten, waiting})]),
Groups7),
assertHasGroup(GId2,
cgroup([consumer(N0P0, {connected, active}),
consumer(N1P1, {connected, waiting}),
consumer(N0P2, {connected, waiting})]),
Groups7),
assertEmpty(Eff7),
stop_node(N1Pid),
ok.
node_disconnected_reconnected_connection_down_test(_) ->
N0 = node(),
{N1Pid, N1} = start_node(list_to_atom(atom_to_list(?FUNCTION_NAME) ++ "1")),
{N2Pid, N2} = start_node(list_to_atom(atom_to_list(?FUNCTION_NAME) ++ "2")),
P0 = new_process(N0),
P1 = new_process(N1),
P2 = new_process(N2),
GId = group_id(),
G0 = cgroup(1, [consumer(P0, {connected, waiting}),
consumer(P1, {connected, active}),
consumer(P2, {connected, waiting})]),
S0 = state(#{GId => G0}),
{#?STATE{groups = G1} = S1, Eff1} =
?MOD:handle_connection_node_disconnected(P1, S0),
assertHasGroup(GId,
cgroup(1, [consumer(P0, {connected, waiting}),
consumer(P1, {disconnected, active}),
consumer(P2, {connected, waiting})]),
G1),
assertNodeDisconnectedTimerEffect(P1, Eff1),
{#?STATE{groups = G2} = S2, Eff2} =
?MOD:handle_node_reconnected(N1, S1, []),
assertHasGroup(GId,
cgroup(1, [consumer(P0, {connected, waiting}),
consumer(P1, {disconnected, active}),
consumer(P2, {connected, waiting})]),
G2),
assertContainsCheckConnectionEffect(P1, Eff2),
{#?STATE{groups = G3}, Eff3} = ?MOD:handle_connection_down(P1, S2),
assertHasGroup(GId,
cgroup(1, [consumer(P0, {connected, waiting}),
consumer(P2, {connected, active})]),
G3),
assertContainsSendMessageEffect(P2, stream(), true, Eff3),
stop_node(N1Pid),
stop_node(N2Pid),
ok.
start_node(Name) ->
{ok, NodePid, Node} = peer:start(#{
name => Name,
connection => standard_io,
shutdown => close
}),
{NodePid, Node}.
stop_node(NodePid) ->
_ = peer:stop(NodePid).
new_process() ->
new_process(node()).
new_process(Node) ->
spawn(Node, fun() -> ok end).
group_id() ->
group_id(stream()).
@ -1183,50 +1402,55 @@ connection_reconnected_command(Pid) ->
purge_nodes_command(Nodes) ->
#command_purge_nodes{nodes = Nodes}.
assertContainsCheckConnectionEffect(Pid, Effects) ->
assertContainsSendMessageEffect(Pid, {sac, check_connection, #{}}, Effects).
assertContainsSendMessageEffect(Pid, Stream, Active, Effects) ->
assertContainsSendMessageEffect(Pid, 0, Stream, name(), Active, Effects).
assertContainsSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active,
Effects) ->
Contains = lists:any(fun(Eff) ->
Eff =:= {mod_call,
rabbit_stream_sac_coordinator,
send_message,
[Pid,
{sac,
#{subscription_id => SubId,
stream => Stream,
consumer_name => ConsumerName,
active => Active}}]}
end, Effects),
?assert(Contains).
assertContainsSendMessageEffect(Pid, {sac,
#{subscription_id => SubId,
stream => Stream,
consumer_name => ConsumerName,
active => Active}},
Effects).
assertContainsSendMessageEffect(Pid, Msg, Effects) ->
assertContainsEffect({mod_call,
rabbit_stream_sac_coordinator,
send_message,
[Pid, Msg]}, Effects).
assertSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active, [Effect]) ->
assertContainsMonitorProcessEffect(Pid, Effects) ->
assertContainsEffect({monitor, process, Pid}, Effects).
assertContainsEffect(Effect, Effects) ->
Contains = lists:any(fun(Eff) -> Eff =:= Effect end, Effects),
?assert(Contains, "List does not contain the expected effect").
assertSendMessageActivateEffect(Pid, SubId, Stream, ConsumerName, Active, Effects) ->
assertSendMessageEffect(Pid, {sac,
#{subscription_id => SubId,
stream => Stream,
consumer_name => ConsumerName,
active => Active}
}, Effects).
assertSendMessageSteppingDownEffect(Pid, SubId, Stream, ConsumerName, Effects) ->
assertSendMessageEffect(Pid, {sac,
#{subscription_id => SubId,
stream => Stream,
consumer_name => ConsumerName,
active => false,
stepping_down => true}}, Effects).
assertSendMessageEffect(Pid, Msg, [Effect]) ->
?assertEqual({mod_call,
rabbit_stream_sac_coordinator,
send_message,
[Pid,
{sac,
#{subscription_id => SubId,
stream => Stream,
consumer_name => ConsumerName,
active => Active}
}]},
Effect).
assertSendMessageSteppingDownEffect(Pid, SubId, Stream, ConsumerName, [Effect]) ->
?assertEqual({mod_call,
rabbit_stream_sac_coordinator,
send_message,
[Pid,
{sac,
#{subscription_id => SubId,
stream => Stream,
consumer_name => ConsumerName,
active => false,
stepping_down => true}}]},
[Pid, Msg]},
Effect).
assertNodeDisconnectedTimerEffect(Pid, [Effect]) ->