Merge pull request #14122 from rabbitmq/mergify/bp/v4.1.x/pr-14115

Add log in test (backport #14115)
Michael Klishin 2025-06-25 14:29:17 +04:00 committed by GitHub
commit 5ca894138d
1 changed file with 82 additions and 9 deletions
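In short, this backport adds two thin ct:pal wrappers, log/1 and log/2, at the bottom of the suite and threads log calls through the network-partition test cases, so the Common Test output records each step: isolating a node, waiting for disconnected and presumed-down consumers, receiving frames, and closing connections. A condensed sketch of the pattern, using only names that appear in the diff below:

log(Format) ->
    ct:pal(Format).

log(Format, Args) ->
    ct:pal(Format, Args).

%% used throughout the test cases, e.g.:
%%   log("Isolating node ~p", [Isolated]),
%%   rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),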


@@ -12,7 +12,7 @@
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
%% The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_stream_partitions_SUITE).
@@ -107,6 +107,8 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
%% another node will be isolated
?assertEqual(L#node.name, coordinator_leader(Config)),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
{ok, So0, C0_00} = stream_test_utils:connect(Config, 0),
{ok, So1, C1_00} = stream_test_utils:connect(Config, 1),
{ok, So2, C2_00} = stream_test_utils:connect(Config, 2),
@@ -135,18 +137,24 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, LN, S),
wait_for_presumed_down_consumer(Config, LN, S),
log("Node ~p rejoins cluster", [Isolated]),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
wait_for_all_consumers_connected(Config, LN, S),
Consumers2 = query_consumers(Config, LN, S),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
@@ -157,21 +165,28 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
%% assert the cancelled consumer received a metadata update frame
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting stream"),
delete_stream(stream_port(Config, 0), S),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
@@ -190,6 +205,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
%% the coordinator leader node will be isolated
?assertNotEqual(L#node.name, CL),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
{ok, So0, C0_00} = stream_test_utils:connect(Config, CL),
{ok, So1, C1_00} = stream_test_utils:connect(Config, CF1),
{ok, So2, C2_00} = stream_test_utils:connect(Config, CF2),
@@ -216,12 +233,16 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF1),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF2),
wait_for_disconnected_consumer(Config, NotIsolated, S),
wait_for_presumed_down_consumer(Config, NotIsolated, S),
log("Node ~p rejoins cluster", [Isolated]),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF1),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF2),
@@ -231,6 +252,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
Consumers2 = query_consumers(Config, NotIsolated, S),
log("Consumers after partition resolution ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
@@ -246,26 +269,35 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) when K == ActiveSubId ->
log("Expecting consumer update for promoted consumer"),
%% promoted consumer should have received consumer update
C1 = receive_consumer_update_and_respond(S0, C0),
log("Received consumer update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting stream"),
delete_stream(L#node.stream_port, S),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
@@ -286,6 +318,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
%% another node will be isolated
?assertEqual(L#node.name, CL),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
@@ -315,12 +349,16 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, NotIsolated, Partition),
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),
log("Node ~p rejoins cluster", [Isolated]),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
@@ -329,6 +367,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
wait_for_all_consumers_connected(Config, NotIsolated, Partition),
Consumers2 = query_consumers(Config, NotIsolated, Partition),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
@@ -340,22 +380,29 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting super stream"),
delete_super_stream(L#node.stream_port, Ss),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
@@ -374,6 +421,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
%% the coordinator leader node will be isolated
?assertNotEqual(L#node.name, CL),
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
@@ -410,12 +459,16 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
log("Isolating node ~p", [Isolated]),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
wait_for_disconnected_consumer(Config, NotIsolated, Partition),
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),
log("Node ~p rejoins cluster", [Isolated]),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
@@ -424,6 +477,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
wait_for_all_consumers_connected(Config, NotIsolated, Partition),
Consumers2 = query_consumers(Config, NotIsolated, Partition),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
@@ -440,27 +495,35 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) when K == ActiveSubId ->
log("Expecting consumer update for promoted consumer"),
%% promoted consumer should have received consumer update
C1 = receive_consumer_update_and_respond(S0, C0),
log("Received consumer update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),
log("Deleting super stream"),
delete_super_stream(L#node.stream_port, Ss),
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
ct:pal("Received command: ~p", [Cmd1]),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
@@ -727,6 +790,7 @@ wait_for_disconnected_consumer(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting a disconnected consumer: ~p", [Cs]),
lists:any(fun(#consumer{status = {disconnected, _}}) ->
true;
(_) ->
@@ -738,6 +802,7 @@ wait_for_presumed_down_consumer(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting a presumed-down consumer: ~p", [Cs]),
lists:any(fun(#consumer{status = {presumed_down, _}}) ->
true;
(_) ->
@@ -749,6 +814,7 @@ wait_for_all_consumers_connected(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting connected consumers: ~p", [Cs]),
lists:all(fun(#consumer{status = {connected, _}}) ->
true;
(_) ->
@@ -761,6 +827,7 @@ wait_for_coordinator_ready(Config) ->
rabbit_ct_helpers:await_condition(
fun() ->
Status = coordinator_status(Config),
log("Coordinator status: ~p", [Status]),
lists:all(fun(St) ->
RS = proplists:get_value(<<"Raft State">>, St,
undefined),
@@ -785,3 +852,9 @@ assertSize(Expected, List) when is_list(List) ->
assertEmpty(Data) ->
assertSize(0, Data).
log(Format) ->
ct:pal(Format).
log(Format, Args) ->
ct:pal(Format, Args).
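For context, the wait_for_* helpers above follow the usual rabbit_ct_helpers:await_condition/1 polling pattern, now with a log call on every probe. The hunks above cut the helpers off mid-function; here is a self-contained sketch of one of them, with the closing lines reconstructed from the analogous helpers in this suite (it assumes the suite's #consumer{} record, query_consumers/3, and the log/2 wrapper added in this commit):

wait_for_disconnected_consumer(Config, Node, Stream) ->
    rabbit_ct_helpers:await_condition(
      fun() ->
              Cs = query_consumers(Config, Node, Stream),
              log("Expecting a disconnected consumer: ~p", [Cs]),
              lists:any(fun(#consumer{status = {disconnected, _}}) -> true;
                           (_) -> false
                        end, Cs)
      end).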