Fix flake in stream plugin test suite
The closing sequence must account for consumer update and metadata
update frames the broker sends when a consumer group changes and when a
stream is deleted.
(cherry picked from commit ff98f6fc1e
)
This commit is contained in:
parent
9bcbad197c
commit
9de50e267c
|
@ -177,14 +177,20 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
|
|||
delete_stream(stream_port(Config, 0), S),
|
||||
|
||||
%% online consumers should receive a metadata update frame (stream deleted)
|
||||
%% we unqueue the this frame before closing the connection
|
||||
%% we unqueue this frame before closing the connection
|
||||
%% directly closing the connection of the cancelled consumer
|
||||
%% Edge case:
|
||||
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
|
||||
%% This is because the active consumer is removed from the group and this triggers
|
||||
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
|
||||
%% stream is deleted, so the rebalancing does not take place.
|
||||
%% We just tolerate an extra frame when closing their respective connections.
|
||||
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
|
||||
log("Expecting frame in consumer ~p", [K]),
|
||||
{Cmd1, C1} = receive_commands(S0, C0),
|
||||
log("Received ~p", [Cmd1]),
|
||||
log("Closing"),
|
||||
{ok, _} = stream_test_utils:close(S0, C1);
|
||||
{ok, _} = close_connection(S0, C1);
|
||||
(K, {S0, C0}) ->
|
||||
log("Closing ~p", [K]),
|
||||
{ok, _} = stream_test_utils:close(S0, C0)
|
||||
|
@ -290,12 +296,18 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
|
|||
%% online consumers should receive a metadata update frame (stream deleted)
|
||||
%% we unqueue this frame before closing the connection
|
||||
%% directly closing the connection of the cancelled consumer
|
||||
%% Edge case:
|
||||
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
|
||||
%% This is because the active consumer is removed from the group and this triggers
|
||||
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
|
||||
%% stream is deleted, so the rebalancing does not take place.
|
||||
%% We just tolerate an extra frame when closing their respective connections.
|
||||
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
|
||||
log("Expecting frame in consumer ~p", [K]),
|
||||
{Cmd1, C1} = receive_commands(S0, C0),
|
||||
log("Received ~p", [Cmd1]),
|
||||
log("Closing"),
|
||||
{ok, _} = stream_test_utils:close(S0, C1);
|
||||
{ok, _} = close_connection(S0, C1);
|
||||
(K, {S0, C0}) ->
|
||||
log("Closing ~p", [K]),
|
||||
{ok, _} = stream_test_utils:close(S0, C0)
|
||||
|
@ -395,12 +407,18 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
|
|||
%% online consumers should receive a metadata update frame (stream deleted)
|
||||
%% we unqueue this frame before closing the connection
|
||||
%% directly closing the connection of the cancelled consumer
|
||||
%% Edge case:
|
||||
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
|
||||
%% This is because the active consumer is removed from the group and this triggers
|
||||
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
|
||||
%% stream is deleted, so the rebalancing does not take place.
|
||||
%% We just tolerate an extra frame when closing their respective connections.
|
||||
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
|
||||
log("Expecting frame in consumer ~p", [K]),
|
||||
{Cmd1, C1} = receive_commands(S0, C0),
|
||||
log("Received ~p", [Cmd1]),
|
||||
log("Closing"),
|
||||
{ok, _} = stream_test_utils:close(S0, C1);
|
||||
{ok, _} = close_connection(S0, C1);
|
||||
(K, {S0, C0}) ->
|
||||
log("Closing ~p", [K]),
|
||||
{ok, _} = stream_test_utils:close(S0, C0)
|
||||
|
@ -516,12 +534,18 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
|
|||
%% online consumers should receive a metadata update frame (stream deleted)
|
||||
%% we unqueue this frame before closing the connection
|
||||
%% directly closing the connection of the cancelled consumer
|
||||
%% Edge case:
|
||||
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
|
||||
%% This is because the active consumer is removed from the group and this triggers
|
||||
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
|
||||
%% stream is deleted, so the rebalancing does not take place.
|
||||
%% We just tolerate an extra frame when closing their respective connections.
|
||||
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
|
||||
log("Expecting frame in consumer ~p", [K]),
|
||||
{Cmd1, C1} = receive_commands(S0, C0),
|
||||
log("Received ~p", [Cmd1]),
|
||||
log("Closing"),
|
||||
{ok, _} = stream_test_utils:close(S0, C1);
|
||||
{ok, _} = close_connection(S0, C1);
|
||||
(K, {S0, C0}) ->
|
||||
log("Closing ~p", [K]),
|
||||
{ok, _} = stream_test_utils:close(S0, C0)
|
||||
|
@ -858,3 +882,19 @@ log(Format) ->
|
|||
|
||||
log(Format, Args) ->
|
||||
ct:pal(Format, Args).
|
||||
|
||||
close_connection(Sock, C) ->
|
||||
CloseReason = <<"OK">>,
|
||||
CloseFrame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, CloseReason}}),
|
||||
ok = gen_tcp:send(Sock, CloseFrame),
|
||||
pump_until_close(Sock, C, 10).
|
||||
|
||||
pump_until_close(_, _, 0) ->
|
||||
ct:fail("did not get close response");
|
||||
pump_until_close(Sock, C0, N) ->
|
||||
case stream_test_utils:receive_stream_commands(Sock, C0) of
|
||||
{{response, 1, {close, ?RESPONSE_CODE_OK}}, C1} ->
|
||||
{ok, C1};
|
||||
{_Cmd, C1} ->
|
||||
pump_until_close(Sock, C1, N - 1)
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue