From 9de50e267c2629813d2f0d5e524ae9569c593508 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com>
Date: Tue, 15 Jul 2025 15:51:02 +0200
Subject: [PATCH] Fix flake in stream plugin test suite

The closing sequence must account for consumer update and metadata
update frames the broker sends when a consumer group changes and when
a stream is deleted.

(cherry picked from commit ff98f6fc1e60314f368a54315d5fb60accaad39d)
---
 .../test/rabbit_stream_partitions_SUITE.erl   | 50 +++++++++++++++++--
 1 file changed, 45 insertions(+), 5 deletions(-)

diff --git a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl
index 956bd899f2..e6c69bc17b 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl
@@ -177,14 +177,20 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
     delete_stream(stream_port(Config, 0), S),
 
     %% online consumers should receive a metadata update frame (stream deleted)
-    %% we unqueue the this frame before closing the connection
+    %% we unqueue this frame before closing the connection
     %% directly closing the connection of the cancelled consumer
+    %% Edge case:
+    %% the waiting consumer can get 2 frames: consumer_update then metadata_update.
+    %% This is because the active consumer is removed from the group and this triggers
+    %% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
+    %% stream is deleted, so the rebalancing does not take place.
+    %% We just tolerate an extra frame when closing their respective connections.
     maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
                          log("Expecting frame in consumer ~p", [K]),
                          {Cmd1, C1} = receive_commands(S0, C0),
                          log("Received ~p", [Cmd1]),
                          log("Closing"),
-                         {ok, _} = stream_test_utils:close(S0, C1);
+                         {ok, _} = close_connection(S0, C1);
                     (K, {S0, C0}) ->
                          log("Closing ~p", [K]),
                          {ok, _} = stream_test_utils:close(S0, C0)
@@ -290,12 +296,18 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
     %% online consumers should receive a metadata update frame (stream deleted)
     %% we unqueue this frame before closing the connection
     %% directly closing the connection of the cancelled consumer
+    %% Edge case:
+    %% the waiting consumer can get 2 frames: consumer_update then metadata_update.
+    %% This is because the active consumer is removed from the group and this triggers
+    %% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
+    %% stream is deleted, so the rebalancing does not take place.
+    %% We just tolerate an extra frame when closing their respective connections.
     maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
                          log("Expecting frame in consumer ~p", [K]),
                          {Cmd1, C1} = receive_commands(S0, C0),
                          log("Received ~p", [Cmd1]),
                          log("Closing"),
-                         {ok, _} = stream_test_utils:close(S0, C1);
+                         {ok, _} = close_connection(S0, C1);
                     (K, {S0, C0}) ->
                          log("Closing ~p", [K]),
                          {ok, _} = stream_test_utils:close(S0, C0)
@@ -395,12 +407,18 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
     %% online consumers should receive a metadata update frame (stream deleted)
     %% we unqueue this frame before closing the connection
     %% directly closing the connection of the cancelled consumer
+    %% Edge case:
+    %% the waiting consumer can get 2 frames: consumer_update then metadata_update.
+    %% This is because the active consumer is removed from the group and this triggers
+    %% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
+    %% stream is deleted, so the rebalancing does not take place.
+    %% We just tolerate an extra frame when closing their respective connections.
     maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
                          log("Expecting frame in consumer ~p", [K]),
                          {Cmd1, C1} = receive_commands(S0, C0),
                          log("Received ~p", [Cmd1]),
                          log("Closing"),
-                         {ok, _} = stream_test_utils:close(S0, C1);
+                         {ok, _} = close_connection(S0, C1);
                     (K, {S0, C0}) ->
                          log("Closing ~p", [K]),
                          {ok, _} = stream_test_utils:close(S0, C0)
@@ -516,12 +534,18 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
     %% online consumers should receive a metadata update frame (stream deleted)
     %% we unqueue this frame before closing the connection
     %% directly closing the connection of the cancelled consumer
+    %% Edge case:
+    %% the waiting consumer can get 2 frames: consumer_update then metadata_update.
+    %% This is because the active consumer is removed from the group and this triggers
+    %% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
+    %% stream is deleted, so the rebalancing does not take place.
+    %% We just tolerate an extra frame when closing their respective connections.
     maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
                          log("Expecting frame in consumer ~p", [K]),
                          {Cmd1, C1} = receive_commands(S0, C0),
                          log("Received ~p", [Cmd1]),
                          log("Closing"),
-                         {ok, _} = stream_test_utils:close(S0, C1);
+                         {ok, _} = close_connection(S0, C1);
                     (K, {S0, C0}) ->
                          log("Closing ~p", [K]),
                          {ok, _} = stream_test_utils:close(S0, C0)
@@ -858,3 +882,19 @@ log(Format) ->
 
 log(Format, Args) ->
     ct:pal(Format, Args).
+
+close_connection(Sock, C) ->
+    CloseReason = <<"OK">>,
+    CloseFrame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, CloseReason}}),
+    ok = gen_tcp:send(Sock, CloseFrame),
+    pump_until_close(Sock, C, 10).
+
+pump_until_close(_, _, 0) ->
+    ct:fail("did not get close response");
+pump_until_close(Sock, C0, N) ->
+    case stream_test_utils:receive_stream_commands(Sock, C0) of
+        {{response, 1, {close, ?RESPONSE_CODE_OK}}, C1} ->
+            {ok, C1};
+        {_Cmd, C1} ->
+            pump_until_close(Sock, C1, N - 1)
+    end.
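For context, a minimal sketch (not part of the patch) of the closing sequence
the hunks above converge on. receive_commands/2 and close_connection/2 are the
suite helpers referenced in the diff; drain_and_close/2 is a hypothetical name
used here only for illustration.

    %% Hypothetical sketch, not part of the patch: consume the first pending
    %% frame, then close while tolerating any extra frame (consumer_update
    %% and/or metadata_update) the broker may still have in flight.
    drain_and_close(Sock, C0) ->
        %% first pending frame: usually metadata_update (stream deleted),
        %% possibly consumer_update if the group rebalancing fired first
        {_Cmd, C1} = receive_commands(Sock, C0),
        %% close_connection/2 (added above) sends the close request and
        %% pumps remaining frames until the close response arrives, so an
        %% extra queued frame no longer fails the test
        {ok, C2} = close_connection(Sock, C1),
        C2.

This is why the tests no longer assert on an exact frame sequence at teardown:
the order and number of frames depends on whether the rebalancing happened
before the stream deletion was observed.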