diff --git a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl index 6f12bbeed0..956bd899f2 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl @@ -12,7 +12,7 @@ %% %% The Initial Developer of the Original Code is Pivotal Software, Inc. %% Copyright (c) 2025 Broadcom. All Rights Reserved. -%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(rabbit_stream_partitions_SUITE). @@ -107,6 +107,8 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) -> %% another node will be isolated ?assertEqual(L#node.name, coordinator_leader(Config)), + log("Stream leader and coordinator leader are on ~p", [L#node.name]), + {ok, So0, C0_00} = stream_test_utils:connect(Config, 0), {ok, So1, C1_00} = stream_test_utils:connect(Config, 1), {ok, So2, C2_00} = stream_test_utils:connect(Config, 2), @@ -135,18 +137,24 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) -> end, Consumers1), #consumer{subscription_id = DiscSubId} = DisconnectedConsumer, + log("Isolating node ~p", [Isolated]), + rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N), wait_for_disconnected_consumer(Config, LN, S), wait_for_presumed_down_consumer(Config, LN, S), + log("Node ~p rejoins cluster", [Isolated]), + rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N), wait_for_all_consumers_connected(Config, LN, S), Consumers2 = query_consumers(Config, LN, S), + log("Consumers after partition resolution: ~p", [Consumers2]), + log("Disconnected consumer: ~p", [DisconnectedConsumer]), %% the disconnected, then presumed down consumer is cancelled, %% because the stream member on its node has been restarted assertSize(2, Consumers2), @@ -157,21 +165,28 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) -> %% assert the cancelled consumer received a metadata update frame SubIdToState1 = maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId -> + log("Expecting metadata update for disconnected consumer"), C1 = receive_metadata_update(S0, C0), + log("Received metadata update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) -> Acc#{K => {S0, C0}} end, #{}, SubIdToState0), + log("Deleting stream"), delete_stream(stream_port(Config, 0), S), %% online consumers should receive a metadata update frame (stream deleted) %% we unqueue the this frame before closing the connection %% directly closing the connection of the cancelled consumer maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId -> - {_, C1} = receive_commands(S0, C0), + log("Expecting frame in consumer ~p", [K]), + {Cmd1, C1} = receive_commands(S0, C0), + log("Received ~p", [Cmd1]), + log("Closing"), {ok, _} = stream_test_utils:close(S0, C1); - (_, {S0, C0}) -> + (K, {S0, C0}) -> + log("Closing ~p", [K]), {ok, _} = stream_test_utils:close(S0, C0) end, SubIdToState1), @@ -190,6 +205,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co %% the coordinator leader node will be isolated ?assertNotEqual(L#node.name, CL), + log("Stream leader and coordinator leader are on ~p", [L#node.name]), + {ok, So0, C0_00} = stream_test_utils:connect(Config, CL), {ok, So1, C1_00} = stream_test_utils:connect(Config, CF1), {ok, So2, C2_00} = stream_test_utils:connect(Config, CF2), @@ -216,12 +233,16 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co end, Consumers1), #consumer{subscription_id = DiscSubId} = DisconnectedConsumer, + log("Isolating node ~p", [Isolated]), + rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF1), rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF2), wait_for_disconnected_consumer(Config, NotIsolated, S), wait_for_presumed_down_consumer(Config, NotIsolated, S), + log("Node ~p rejoins cluster", [Isolated]), + rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF1), rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF2), @@ -231,6 +252,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co Consumers2 = query_consumers(Config, NotIsolated, S), + log("Consumers after partition resolution ~p", [Consumers2]), + log("Disconnected consumer: ~p", [DisconnectedConsumer]), %% the disconnected, then presumed down consumer is cancelled, %% because the stream member on its node has been restarted assertSize(2, Consumers2), @@ -246,26 +269,35 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co SubIdToState1 = maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId -> + log("Expecting metadata update for disconnected consumer"), %% cancelled consumer received a metadata update C1 = receive_metadata_update(S0, C0), + log("Received metadata update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) when K == ActiveSubId -> + log("Expecting consumer update for promoted consumer"), %% promoted consumer should have received consumer update C1 = receive_consumer_update_and_respond(S0, C0), + log("Received consumer update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) -> Acc#{K => {S0, C0}} end, #{}, SubIdToState0), + log("Deleting stream"), delete_stream(L#node.stream_port, S), %% online consumers should receive a metadata update frame (stream deleted) %% we unqueue this frame before closing the connection %% directly closing the connection of the cancelled consumer maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId -> - {_, C1} = receive_commands(S0, C0), + log("Expecting frame in consumer ~p", [K]), + {Cmd1, C1} = receive_commands(S0, C0), + log("Received ~p", [Cmd1]), + log("Closing"), {ok, _} = stream_test_utils:close(S0, C1); - (_, {S0, C0}) -> + (K, {S0, C0}) -> + log("Closing ~p", [K]), {ok, _} = stream_test_utils:close(S0, C0) end, SubIdToState1), @@ -286,6 +318,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) - %% another node will be isolated ?assertEqual(L#node.name, CL), + log("Stream leader and coordinator leader are on ~p", [L#node.name]), + {ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port), {ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port), {ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port), @@ -315,12 +349,16 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) - end, Consumers1), #consumer{subscription_id = DiscSubId} = DisconnectedConsumer, + log("Isolating node ~p", [Isolated]), + rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N), wait_for_disconnected_consumer(Config, NotIsolated, Partition), wait_for_presumed_down_consumer(Config, NotIsolated, Partition), + log("Node ~p rejoins cluster", [Isolated]), + rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N), @@ -329,6 +367,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) - wait_for_all_consumers_connected(Config, NotIsolated, Partition), Consumers2 = query_consumers(Config, NotIsolated, Partition), + log("Consumers after partition resolution: ~p", [Consumers2]), + log("Disconnected consumer: ~p", [DisconnectedConsumer]), %% the disconnected, then presumed down consumer is cancelled, %% because the stream member on its node has been restarted @@ -340,22 +380,29 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) - SubIdToState1 = maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId -> + log("Expecting metadata update for disconnected consumer"), %% cancelled consumer received a metadata update C1 = receive_metadata_update(S0, C0), + log("Received metadata update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) -> Acc#{K => {S0, C0}} end, #{}, SubIdToState0), + log("Deleting super stream"), delete_super_stream(L#node.stream_port, Ss), %% online consumers should receive a metadata update frame (stream deleted) %% we unqueue this frame before closing the connection %% directly closing the connection of the cancelled consumer maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId -> - {_, C1} = receive_commands(S0, C0), + log("Expecting frame in consumer ~p", [K]), + {Cmd1, C1} = receive_commands(S0, C0), + log("Received ~p", [Cmd1]), + log("Closing"), {ok, _} = stream_test_utils:close(S0, C1); - (_, {S0, C0}) -> + (K, {S0, C0}) -> + log("Closing ~p", [K]), {ok, _} = stream_test_utils:close(S0, C0) end, SubIdToState1), ok. @@ -374,6 +421,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit %% the coordinator leader node will be isolated ?assertNotEqual(L#node.name, CL), + log("Stream leader and coordinator leader are on ~p", [L#node.name]), + {ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port), {ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port), {ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port), @@ -410,12 +459,16 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit end, Consumers1), #consumer{subscription_id = DiscSubId} = DisconnectedConsumer, + log("Isolating node ~p", [Isolated]), + rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N), wait_for_disconnected_consumer(Config, NotIsolated, Partition), wait_for_presumed_down_consumer(Config, NotIsolated, Partition), + log("Node ~p rejoins cluster", [Isolated]), + rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N), @@ -424,6 +477,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit wait_for_all_consumers_connected(Config, NotIsolated, Partition), Consumers2 = query_consumers(Config, NotIsolated, Partition), + log("Consumers after partition resolution: ~p", [Consumers2]), + log("Disconnected consumer: ~p", [DisconnectedConsumer]), %% the disconnected, then presumed down consumer is cancelled, %% because the stream member on its node has been restarted @@ -440,27 +495,35 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit SubIdToState1 = maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId -> + log("Expecting metadata update for disconnected consumer"), %% cancelled consumer received a metadata update C1 = receive_metadata_update(S0, C0), + log("Received metadata update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) when K == ActiveSubId -> + log("Expecting consumer update for promoted consumer"), %% promoted consumer should have received consumer update C1 = receive_consumer_update_and_respond(S0, C0), + log("Received consumer update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) -> Acc#{K => {S0, C0}} end, #{}, SubIdToState0), + log("Deleting super stream"), delete_super_stream(L#node.stream_port, Ss), %% online consumers should receive a metadata update frame (stream deleted) %% we unqueue this frame before closing the connection %% directly closing the connection of the cancelled consumer maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId -> + log("Expecting frame in consumer ~p", [K]), {Cmd1, C1} = receive_commands(S0, C0), - ct:pal("Received command: ~p", [Cmd1]), + log("Received ~p", [Cmd1]), + log("Closing"), {ok, _} = stream_test_utils:close(S0, C1); - (_, {S0, C0}) -> + (K, {S0, C0}) -> + log("Closing ~p", [K]), {ok, _} = stream_test_utils:close(S0, C0) end, SubIdToState1), ok. @@ -727,6 +790,7 @@ wait_for_disconnected_consumer(Config, Node, Stream) -> rabbit_ct_helpers:await_condition( fun() -> Cs = query_consumers(Config, Node, Stream), + log("Expecting a disconnected consumer: ~p", [Cs]), lists:any(fun(#consumer{status = {disconnected, _}}) -> true; (_) -> @@ -738,6 +802,7 @@ wait_for_presumed_down_consumer(Config, Node, Stream) -> rabbit_ct_helpers:await_condition( fun() -> Cs = query_consumers(Config, Node, Stream), + log("Expecting a presumed-down consumer: ~p", [Cs]), lists:any(fun(#consumer{status = {presumed_down, _}}) -> true; (_) -> @@ -749,6 +814,7 @@ wait_for_all_consumers_connected(Config, Node, Stream) -> rabbit_ct_helpers:await_condition( fun() -> Cs = query_consumers(Config, Node, Stream), + log("Expecting connected consumers: ~p", [Cs]), lists:all(fun(#consumer{status = {connected, _}}) -> true; (_) -> @@ -761,6 +827,7 @@ wait_for_coordinator_ready(Config) -> rabbit_ct_helpers:await_condition( fun() -> Status = coordinator_status(Config), + log("Coordinator status: ~p", [Status]), lists:all(fun(St) -> RS = proplists:get_value(<<"Raft State">>, St, undefined), @@ -785,3 +852,9 @@ assertSize(Expected, List) when is_list(List) -> assertEmpty(Data) -> assertSize(0, Data). + +log(Format) -> + ct:pal(Format). + +log(Format, Args) -> + ct:pal(Format, Args).