diff --git a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl
index 956bd899f2..e6c69bc17b 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl
@@ -177,14 +177,20 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
     delete_stream(stream_port(Config, 0), S),
 
     %% online consumers should receive a metadata update frame (stream deleted)
-    %% we unqueue the this frame before closing the connection
+    %% we unqueue this frame before closing the connection
     %% directly closing the connection of the cancelled consumer
+    %% Edge case:
+    %% the waiting consumer can get 2 frames: consumer_update, then metadata_update.
+    %% This is because the active consumer is removed from the group, which triggers
+    %% a rebalancing. Most of the time the 2 remaining consumers are cancelled when
+    %% the stream is deleted, so the rebalancing does not take place.
+    %% We just tolerate an extra frame when closing their respective connections.
     maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
                          log("Expecting frame in consumer ~p", [K]),
                          {Cmd1, C1} = receive_commands(S0, C0),
                          log("Received ~p", [Cmd1]),
                          log("Closing"),
-                         {ok, _} = stream_test_utils:close(S0, C1);
+                         {ok, _} = close_connection(S0, C1);
                     (K, {S0, C0}) ->
                          log("Closing ~p", [K]),
                          {ok, _} = stream_test_utils:close(S0, C0)
@@ -290,12 +296,18 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
     %% online consumers should receive a metadata update frame (stream deleted)
     %% we unqueue this frame before closing the connection
     %% directly closing the connection of the cancelled consumer
+    %% Edge case:
+    %% the waiting consumer can get 2 frames: consumer_update, then metadata_update.
+    %% This is because the active consumer is removed from the group, which triggers
+    %% a rebalancing. Most of the time the 2 remaining consumers are cancelled when
+    %% the stream is deleted, so the rebalancing does not take place.
+    %% We just tolerate an extra frame when closing their respective connections.
     maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
                          log("Expecting frame in consumer ~p", [K]),
                          {Cmd1, C1} = receive_commands(S0, C0),
                          log("Received ~p", [Cmd1]),
                          log("Closing"),
-                         {ok, _} = stream_test_utils:close(S0, C1);
+                         {ok, _} = close_connection(S0, C1);
                     (K, {S0, C0}) ->
                          log("Closing ~p", [K]),
                          {ok, _} = stream_test_utils:close(S0, C0)
@@ -395,12 +407,18 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
     %% online consumers should receive a metadata update frame (stream deleted)
     %% we unqueue this frame before closing the connection
     %% directly closing the connection of the cancelled consumer
+    %% Edge case:
+    %% the waiting consumer can get 2 frames: consumer_update, then metadata_update.
+    %% This is because the active consumer is removed from the group, which triggers
+    %% a rebalancing. Most of the time the 2 remaining consumers are cancelled when
+    %% the stream is deleted, so the rebalancing does not take place.
+    %% We just tolerate an extra frame when closing their respective connections.
     maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
                          log("Expecting frame in consumer ~p", [K]),
                          {Cmd1, C1} = receive_commands(S0, C0),
                          log("Received ~p", [Cmd1]),
                          log("Closing"),
-                         {ok, _} = stream_test_utils:close(S0, C1);
+                         {ok, _} = close_connection(S0, C1);
                     (K, {S0, C0}) ->
                          log("Closing ~p", [K]),
                          {ok, _} = stream_test_utils:close(S0, C0)
@@ -516,12 +534,18 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
     %% online consumers should receive a metadata update frame (stream deleted)
     %% we unqueue this frame before closing the connection
     %% directly closing the connection of the cancelled consumer
+    %% Edge case:
+    %% the waiting consumer can get 2 frames: consumer_update, then metadata_update.
+    %% This is because the active consumer is removed from the group, which triggers
+    %% a rebalancing. Most of the time the 2 remaining consumers are cancelled when
+    %% the stream is deleted, so the rebalancing does not take place.
+    %% We just tolerate an extra frame when closing their respective connections.
     maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
                          log("Expecting frame in consumer ~p", [K]),
                          {Cmd1, C1} = receive_commands(S0, C0),
                          log("Received ~p", [Cmd1]),
                          log("Closing"),
-                         {ok, _} = stream_test_utils:close(S0, C1);
+                         {ok, _} = close_connection(S0, C1);
                     (K, {S0, C0}) ->
                          log("Closing ~p", [K]),
                          {ok, _} = stream_test_utils:close(S0, C0)
@@ -858,3 +882,19 @@ log(Format) ->
 
 log(Format, Args) ->
     ct:pal(Format, Args).
+
+close_connection(Sock, C) ->
+    CloseReason = <<"OK">>,
+    CloseFrame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, CloseReason}}),
+    ok = gen_tcp:send(Sock, CloseFrame),
+    pump_until_close(Sock, C, 10).
+
+pump_until_close(_, _, 0) ->
+    ct:fail("did not get close response");
+pump_until_close(Sock, C0, N) ->
+    case stream_test_utils:receive_stream_commands(Sock, C0) of
+        {{response, 1, {close, ?RESPONSE_CODE_OK}}, C1} ->
+            {ok, C1};
+        {_Cmd, C1} ->
+            pump_until_close(Sock, C1, N - 1)
+    end.
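Usage note: the tolerant helper is only swapped in for the consumer clause that may still have a frame queued; the already-cancelled consumer keeps the plain stream_test_utils:close/2. A minimal sketch of the call pattern, assuming Sock and C are the socket and rabbit_stream_core state the suite already threads through its assertions:

    %% close_connection/2 sends the close request and then pumps up to 10
    %% incoming frames (e.g. a stray consumer_update queued before the
    %% stream deletion) until the matching
    %% {response, 1, {close, ?RESPONSE_CODE_OK}} frame arrives;
    %% pump_until_close/3 fails the test if it never does.
    {ok, _C1} = close_connection(Sock, C)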