Merge pull request #12722 from rabbitmq/fix-flakes

Fix flakes
This commit is contained in:
Michael Klishin 2024-11-14 13:36:17 -05:00 committed by GitHub
commit c888689cca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 234 additions and 185 deletions

File diff suppressed because it is too large Load Diff

View File

@ -79,7 +79,8 @@ requeue_one_channel_quorum_queue(Config) ->
requeue_one_channel(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ctag = <<"my consumer tag">>,
Ch = rabbit_ct_client_helpers:open_channel(Config),
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
{ok, Ch} = amqp_connection:open_channel(Conn),
#'queue.declare_ok'{} = amqp_channel:call(
Ch,

View File

@ -178,7 +178,7 @@ open_connections_to_limit(Config, Limit) ->
Connections.
close_all_connections(Connections) ->
[rabbit_ct_client_helpers:close_connection(C) || C <- Connections].
[catch rabbit_ct_client_helpers:close_connection(C) || C <- Connections].
set_node_limit(Config, Type, Limit) ->
rabbit_ct_broker_helpers:rpc(Config, 0,

View File

@ -304,10 +304,15 @@ queue_consumer_channel_closed(Config) ->
amqp_channel:close(Chan),
force_stats(Config),
Res = http_get(Config, "/queues/%2F/some-queue"),
% assert there are no consumer details
[] = maps:get(consumer_details, Res),
<<"some-queue">> = maps:get(name, Res),
?awaitMatch([],
%% assert there are no consumer details
maps:get(consumer_details,
http_get(Config, "/queues/%2F/some-queue")),
30000),
?awaitMatch(<<"some-queue">>,
maps:get(name,
http_get(Config, "/queues/%2F/some-queue")),
30000),
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
ok.
@ -325,10 +330,12 @@ queue(Config) ->
basic_get(Chan2, <<"some-queue">>),
force_stats(Config),
Res = http_get(Config, "/queues/%2F/some-queue"),
% assert single queue is returned
[#{} | _] = maps:get(deliveries, Res),
?awaitMatch([#{} | _],
maps:get(deliveries,
http_get(Config, "/queues/%2F/some-queue")),
30000),
amqp_channel:close(Chan),
amqp_channel:close(Chan2),
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
@ -390,9 +397,10 @@ channels_multiple_on_different_nodes(Config) ->
force_stats(Config),
Res = http_get(Config, "/channels"),
% assert two channels are present
[_,_] = Res,
?awaitMatch([_,_],
http_get(Config, "/channels"),
30000),
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
@ -416,9 +424,12 @@ channel_closed(Config) ->
force_stats(Config),
Res = http_get(Config, "/channels"),
% assert one channel is present
[_] = Res,
rabbit_ct_helpers:await_condition(
fun() ->
%% assert one channel is present
length(http_get(Config, "/channels")) == 1
end,
60000),
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),

View File

@ -113,10 +113,11 @@ prop_connection_channel_counts(Config) ->
Cons = lists:foldl(fun (Op, Agg) ->
execute_op(Config, Op, Agg)
end, [], Ops),
force_stats(Config),
%% TODO retry a few times
Res = retry_for(
fun() -> validate_counts(Config, Cons) end,
fun() ->
force_stats(Config),
validate_counts(Config, Cons) end,
60),
cleanup(Cons),
rabbit_ct_helpers:await_condition(

View File

@ -1204,26 +1204,50 @@ management_plugin_connection(Config) ->
Node = atom_to_binary(get_node_config(Config, 0, nodename)),
C1 = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]),
eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10),
FilterFun =
fun(#{client_properties := #{client_id := CId}})
when CId == ClientId -> true;
(_) -> false
end,
%% Sometimes connections remain open from other testcases,
%% let's match the one we're looking for
eventually(
?_assertMatch(
[_],
lists:filter(FilterFun, http_get(Config, "/connections"))),
1000, 10),
[#{client_properties := #{client_id := ClientId},
timeout := KeepaliveSecs,
node := Node,
name := ConnectionName}] = http_get(Config, "/connections"),
name := ConnectionName}] =
lists:filter(FilterFun, http_get(Config, "/connections")),
process_flag(trap_exit, true),
http_delete(Config,
"/connections/" ++ binary_to_list(uri_string:quote(ConnectionName)),
?NO_CONTENT),
await_exit(C1),
eventually(?_assertEqual([], http_get(Config, "/connections"))),
eventually(
?_assertMatch(
[],
lists:filter(FilterFun, http_get(Config, "/connections"))),
1000, 10),
eventually(?_assertEqual([], all_connection_pids(Config)), 500, 3),
C2 = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]),
eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10),
eventually(
?_assertMatch(
[_],
lists:filter(FilterFun, http_get(Config, "/connections"))),
1000, 10),
http_delete(Config,
"/connections/username/guest",
?NO_CONTENT),
await_exit(C2),
eventually(?_assertEqual([], http_get(Config, "/connections"))),
eventually(
?_assertMatch(
[],
lists:filter(FilterFun, http_get(Config, "/connections"))),
1000, 10),
eventually(?_assertEqual([], all_connection_pids(Config)), 500, 3).
management_plugin_enable(Config) ->
@ -1233,10 +1257,22 @@ management_plugin_enable(Config) ->
%% If the (web) MQTT connection is established **before** the management plugin is enabled,
%% the management plugin should still list the (web) MQTT connection.
C = connect(?FUNCTION_NAME, Config),
ClientId = atom_to_binary(?FUNCTION_NAME),
C = connect(ClientId, Config),
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management_agent),
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_management),
eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10),
FilterFun =
fun(#{client_properties := #{client_id := CId}})
when ClientId == CId -> true;
(_) -> false
end,
%% Sometimes connections remain open from other testcases,
%% let's match the one we're looking for
eventually(
?_assertMatch(
[_],
lists:filter(FilterFun, http_get(Config, "/connections"))),
1000, 10),
ok = emqtt:disconnect(C).