Add log statements stream network partitions

The test creates network partitions and checks how the stream SAC
coordinator deals with them. It can be flaky on CI, the log statements
should help diagnose the flakiness.

(cherry picked from commit 066145763f)
This commit is contained in:
Arnaud Cogoluègnes 2025-06-24 12:09:21 +02:00 committed by Mergify
parent af9b0d00ba
commit f0776c8b97
1 changed files with 82 additions and 9 deletions

View File

@ -12,7 +12,7 @@
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
%% The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_stream_partitions_SUITE).
@ -107,6 +107,8 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
%% another node will be isolated
?assertEqual(L#node.name, coordinator_leader(Config)),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
{ok, So0, C0_00} = stream_test_utils:connect(Config, 0),
{ok, So1, C1_00} = stream_test_utils:connect(Config, 1),
{ok, So2, C2_00} = stream_test_utils:connect(Config, 2),
@ -135,18 +137,24 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, LN, S),
wait_for_presumed_down_consumer(Config, LN, S),
log("Node ~p rejoins cluster", [Isolated]),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
wait_for_all_consumers_connected(Config, LN, S),
Consumers2 = query_consumers(Config, LN, S),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
@ -157,21 +165,28 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
%% assert the cancelled consumer received a metadata update frame
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting stream"),
delete_stream(stream_port(Config, 0), S),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue the this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
@ -190,6 +205,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
%% the coordinator leader node will be isolated
?assertNotEqual(L#node.name, CL),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
{ok, So0, C0_00} = stream_test_utils:connect(Config, CL),
{ok, So1, C1_00} = stream_test_utils:connect(Config, CF1),
{ok, So2, C2_00} = stream_test_utils:connect(Config, CF2),
@ -216,12 +233,16 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF1),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF2),
wait_for_disconnected_consumer(Config, NotIsolated, S),
wait_for_presumed_down_consumer(Config, NotIsolated, S),
log("Node ~p rejoins cluster", [Isolated]),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF1),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF2),
@ -231,6 +252,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
Consumers2 = query_consumers(Config, NotIsolated, S),
log("Consumers after partition resolution ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
@ -246,26 +269,35 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) when K == ActiveSubId ->
log("Expecting consumer update for promoted consumer"),
%% promoted consumer should have received consumer update
C1 = receive_consumer_update_and_respond(S0, C0),
log("Received consumer update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting stream"),
delete_stream(L#node.stream_port, S),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
@ -286,6 +318,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
%% another node will be isolated
?assertEqual(L#node.name, CL),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
@ -315,12 +349,16 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, NotIsolated, Partition),
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),
log("Node ~p rejoins cluster", [Isolated]),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
@ -329,6 +367,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
wait_for_all_consumers_connected(Config, NotIsolated, Partition),
Consumers2 = query_consumers(Config, NotIsolated, Partition),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
@ -340,22 +380,29 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting super stream"),
delete_super_stream(L#node.stream_port, Ss),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
@ -374,6 +421,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
%% the coordinator leader node will be isolated
?assertNotEqual(L#node.name, CL),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
@ -410,12 +459,16 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, NotIsolated, Partition),
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),
log("Node ~p rejoins cluster", [Isolated]),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
@ -424,6 +477,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
wait_for_all_consumers_connected(Config, NotIsolated, Partition),
Consumers2 = query_consumers(Config, NotIsolated, Partition),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
@ -440,27 +495,35 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) when K == ActiveSubId ->
log("Expecting consumer update for promoted consumer"),
%% promoted consumer should have received consumer update
C1 = receive_consumer_update_and_respond(S0, C0),
log("Received consumer update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting super stream"),
delete_super_stream(L#node.stream_port, Ss),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
ct:pal("Received command: ~p", [Cmd1]),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
@ -727,6 +790,7 @@ wait_for_disconnected_consumer(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting a disconnected consumer: ~p", [Cs]),
lists:any(fun(#consumer{status = {disconnected, _}}) ->
true;
(_) ->
@ -738,6 +802,7 @@ wait_for_presumed_down_consumer(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting a presumed-down consumer: ~p", [Cs]),
lists:any(fun(#consumer{status = {presumed_down, _}}) ->
true;
(_) ->
@ -749,6 +814,7 @@ wait_for_all_consumers_connected(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting connected consumers: ~p", [Cs]),
lists:all(fun(#consumer{status = {connected, _}}) ->
true;
(_) ->
@ -761,6 +827,7 @@ wait_for_coordinator_ready(Config) ->
rabbit_ct_helpers:await_condition(
fun() ->
Status = coordinator_status(Config),
log("Coordinator status: ~p", [Status]),
lists:all(fun(St) ->
RS = proplists:get_value(<<"Raft State">>, St,
undefined),
@ -785,3 +852,9 @@ assertSize(Expected, List) when is_list(List) ->
assertEmpty(Data) ->
assertSize(0, Data).
log(Format) ->
ct:pal(Format).
log(Format, Args) ->
ct:pal(Format, Args).