diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 473b6bcf3b..cd7106bea1 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -377,7 +377,6 @@ rabbitmq_integration_suite( additional_beam = [ ":test_quorum_queue_utils_beam", ], - flaky = True, shard_count = 7, ) @@ -565,7 +564,6 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "per_user_connection_channel_limit_partitions_SUITE", size = "large", - flaky = True, ) rabbitmq_integration_suite( @@ -1045,7 +1043,6 @@ rabbitmq_integration_suite( additional_beam = [ "test/test_rabbit_event_handler.beam", ], - flaky = True, ) rabbitmq_suite( diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index ca20be44f5..0f3bd332fa 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -149,7 +149,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop PLT_APPS += mnesia dep_syslog = git https://github.com/schlagert/syslog 4.0.0 -dep_osiris = git https://github.com/rabbitmq/osiris v1.6.0 +dep_osiris = git https://github.com/rabbitmq/osiris v1.6.1 dep_systemd = hex 0.6.1 dep_seshat = hex 0.4.0 diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index e841629d34..46a6fda43d 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -880,7 +880,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/definition_import_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/rabbit_common:erlang_app"], + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) erlang_bytecode( name = "deprecated_features_SUITE_beam_files", @@ -932,7 +932,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/dynamic_qq_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) erlang_bytecode( name = "eager_sync_SUITE_beam_files", @@ -1111,7 +1111,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/per_user_connection_tracking_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) erlang_bytecode( name = "per_vhost_connection_limit_SUITE_beam_files", @@ -1147,7 +1147,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/per_vhost_queue_limit_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) erlang_bytecode( name = "policy_SUITE_beam_files", @@ -1218,7 +1218,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/queue_master_location_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) erlang_bytecode( name = "queue_parallel_SUITE_beam_files", @@ -1726,7 +1726,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/unit_log_management_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) erlang_bytecode( name = "unit_operator_policy_SUITE_beam_files", @@ -1828,7 +1828,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/vhost_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) erlang_bytecode( name = "rabbit_cuttlefish_SUITE_beam_files", diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 25905a23be..2c87a290cf 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -188,14 +188,15 @@ init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config1, Testcase). end_per_testcase(Testcase, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), - amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_dlx, Config)}), - amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_dlx_2, Config)}), amqp_channel:call(Ch, #'exchange.delete'{exchange = ?config(dlx_exchange, Config)}), _ = rabbit_ct_broker_helpers:clear_policy(Config, 0, ?config(policy, Config)), rabbit_ct_helpers:testcase_finished(Config, Testcase). +delete_queues() -> + [rabbit_amqqueue:delete(Q, false, false, <<"tests">>) || Q <- rabbit_amqqueue:list()]. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Dead letter exchanges %% @@ -850,7 +851,15 @@ dead_letter_policy(Config) -> <<"queues">>, [{<<"dead-letter-exchange">>, DLXExchange}, {<<"dead-letter-routing-key">>, DLXQName}]), - timer:sleep(1000), + ?awaitMatch([_ | _], + begin + {ok, Q0} = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_amqqueue, lookup, + [rabbit_misc:r(<<"/">>, queue, QName)], infinity), + amqqueue:get_policy(Q0) + end, + 30000), %% Nack the second message amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, multiple = false, @@ -1319,7 +1328,12 @@ dead_letter_headers_first_death_route(Config) -> %% Send and reject the 3rd message. P3 = <<"msg3">>, publish(Ch, QName2, [P3]), - timer:sleep(1000), + case group_name(Config) of + at_most_once -> + wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]); + at_least_once -> + wait_for_messages(Config, [[QName2, <<"2">>, <<"1">>, <<"0">>]]) + end, [DTag] = consume(Ch, QName2, [P3]), amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, requeue = false}), diff --git a/deps/rabbit/test/definition_import_SUITE.erl b/deps/rabbit/test/definition_import_SUITE.erl index f1c7ca6709..5276c6a571 100644 --- a/deps/rabbit/test/definition_import_SUITE.erl +++ b/deps/rabbit/test/definition_import_SUITE.erl @@ -7,6 +7,7 @@ -module(definition_import_SUITE). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -245,8 +246,7 @@ import_case13a(Config) -> %% We expect that importing an existing queue (i.e. same vhost and name) %% but with different arguments and different properties is a no-op. import_file_case(Config, "case13a"), - timer:sleep(1000), - ?assertMatch({ok, Q}, queue_lookup(Config, VHost, QueueName)). + ?awaitMatch({ok, Q}, queue_lookup(Config, VHost, QueueName), 30000). import_case14(Config) -> import_file_case(Config, "case14"). %% contains a user with tags as a list diff --git a/deps/rabbit/test/dynamic_qq_SUITE.erl b/deps/rabbit/test/dynamic_qq_SUITE.erl index 8a33aea340..f60898d878 100644 --- a/deps/rabbit/test/dynamic_qq_SUITE.erl +++ b/deps/rabbit/test/dynamic_qq_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -import(quorum_queue_utils, [wait_for_messages_ready/3, ra_name/1]). @@ -179,11 +180,14 @@ quorum_unaffected_after_vhost_failure(Config) -> arguments = Args, durable = true }), - timer:sleep(300), - - Info0 = rpc:call(A, rabbit_quorum_queue, infos, - [rabbit_misc:r(<<"/">>, queue, QName)]), - ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info0, []))), + ?awaitMatch( + Servers, + begin + Info0 = rpc:call(A, rabbit_quorum_queue, infos, + [rabbit_misc:r(<<"/">>, queue, QName)]), + lists:sort(proplists:get_value(online, Info0, [])) + end, + 60000), %% Crash vhost on both nodes {ok, SupA} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]), @@ -191,9 +195,14 @@ quorum_unaffected_after_vhost_failure(Config) -> {ok, SupB} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]), exit(SupB, foo), - Info = rpc:call(A, rabbit_quorum_queue, infos, - [rabbit_misc:r(<<"/">>, queue, QName)]), - ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))). + ?awaitMatch( + Servers, + begin + Info = rpc:call(A, rabbit_quorum_queue, infos, + [rabbit_misc:r(<<"/">>, queue, QName)]), + lists:sort(proplists:get_value(online, Info, [])) + end, + 60000). recover_follower_after_standalone_restart(Config) -> case rabbit_ct_helpers:is_mixed_versions() of diff --git a/deps/rabbit/test/per_user_connection_channel_limit_partitions_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_limit_partitions_SUITE.erl index e46dc3f166..f542f0cc50 100644 --- a/deps/rabbit/test/per_user_connection_channel_limit_partitions_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_channel_limit_partitions_SUITE.erl @@ -105,7 +105,10 @@ cluster_full_partition_with_autoheal(Config) -> %% B drops off the network, non-reachable by either A or C rabbit_ct_broker_helpers:block_traffic_between(A, B), rabbit_ct_broker_helpers:block_traffic_between(B, C), - timer:sleep(?DELAY), + LargeCluster = lists:sort([A, C]), + ?awaitMatch(LargeCluster, list_running(Config, A), 60000, 3000), + ?awaitMatch([B], list_running(Config, B), 60000, 3000), + ?awaitMatch(LargeCluster, list_running(Config, C), 60000, 3000), %% A and C are still connected, so 4 connections are tracked %% All connections to B are dropped @@ -116,7 +119,10 @@ cluster_full_partition_with_autoheal(Config) -> rabbit_ct_broker_helpers:allow_traffic_between(A, B), rabbit_ct_broker_helpers:allow_traffic_between(B, C), - timer:sleep(?DELAY), + All = lists:sort([A, B, C]), + ?awaitMatch(All, list_running(Config, A), 60000, 3000), + ?awaitMatch(All, list_running(Config, B), 60000, 3000), + ?awaitMatch(All, list_running(Config, C), 60000, 3000), %% during autoheal B's connections were dropped ?awaitMatch({4, 10}, @@ -167,3 +173,18 @@ tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) -> rabbit_ct_broker_helpers:rpc(Config, NodeIndex, TrackingMod, list_of_user, [Username]). + +list_running(Config, NodeIndex) -> + Ret = (catch rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_nodes, list_running, [])), + Running = case Ret of + {'EXIT', {{exception, undef, _}, _}} -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_mnesia, cluster_nodes, [running]); + _ -> + Ret + end, + case Running of + List when is_list(List) -> + lists:sort(List); + Any -> + Any + end. diff --git a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl index 5d2051b2a8..4c4b1f7af7 100644 --- a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl @@ -14,6 +14,8 @@ -compile(export_all). +-define(A_TOUT, 20000). + all() -> [ {group, cluster_size_1_network}, @@ -92,6 +94,11 @@ init_per_testcase(Testcase, Config) -> Config. end_per_testcase(Testcase, Config) -> + Vhost = proplists:get_value(rmq_vhost, Config), + Username = proplists:get_value(rmq_username, Config), + rabbit_ct_broker_helpers:add_vhost(Config, Vhost), + rabbit_ct_broker_helpers:add_user(Config, Username), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username, Vhost), rabbit_ct_helpers:testcase_finished(Config, Testcase). %% ------------------------------------------------------------------- @@ -106,73 +113,78 @@ single_node_user_connection_channel_tracking(Config) -> rabbit_ct_broker_helpers:add_user(Config, Username2), rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), [Conn1] = open_connections(Config, [0]), [Chan1] = open_channels(Conn1, 1), + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?awaitMatch(1, count_channels_in(Config, Username), ?A_TOUT), [#tracked_channel{username = Username}] = channels_in(Config, Username), - ?assertEqual(true, is_process_alive(Conn1)), - ?assertEqual(true, is_process_alive(Chan1)), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Chan1), ?A_TOUT), close_channels([Chan1]), - ?awaitMatch(0, count_channels_in(Config, Username), 20000), - ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000), - ?awaitMatch(false, is_process_alive(Chan1), 20000), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Chan1), ?A_TOUT), close_connections([Conn1]), - ?awaitMatch(0, length(connections_in(Config, Username)), 20000), - ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000), - ?awaitMatch(false, is_process_alive(Conn1), 20000), + ?awaitMatch(0, length(connections_in(Config, Username)), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn1), ?A_TOUT), [Conn2] = open_connections(Config, [{0, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), - timer:sleep(100), + ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), [#tracked_connection{username = Username2}] = connections_in(Config, Username2), - ?assertEqual(5, count_channels_in(Config, Username2)), - ?assertEqual(1, tracked_user_connection_count(Config, Username2)), - ?assertEqual(5, tracked_user_channel_count(Config, Username2)), - ?assertEqual(true, is_process_alive(Conn2)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], [Conn3] = open_connections(Config, [0]), Chans3 = [_|_] = open_channels(Conn3, 5), + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), [#tracked_connection{username = Username}] = connections_in(Config, Username), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn3)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans3], + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn3), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans3], [Conn4] = open_connections(Config, [0]), Chans4 = [_|_] = open_channels(Conn4, 5), - ?assertEqual(2, tracked_user_connection_count(Config, Username)), - ?assertEqual(10, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn4)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans4], + ?awaitMatch(2, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(10, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn4), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans4], kill_connections([Conn4]), + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), [#tracked_connection{username = Username}] = connections_in(Config, Username), - ?awaitMatch(5, count_channels_in(Config, Username), 20000), - ?awaitMatch(1, tracked_user_connection_count(Config, Username), 20000), - ?awaitMatch(5, tracked_user_channel_count(Config, Username), 20000), - ?assertEqual(false, is_process_alive(Conn4)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans4], + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn4), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans4], [Conn5] = open_connections(Config, [0]), Chans5 = [_|_] = open_channels(Conn5, 7), + ?awaitMatch(2, count_connections_in(Config, Username), ?A_TOUT), [Username, Username] = lists:map(fun (#tracked_connection{username = U}) -> U end, connections_in(Config, Username)), - ?assertEqual(12, count_channels_in(Config, Username)), - ?assertEqual(12, tracked_user_channel_count(Config, Username)), - ?assertEqual(2, tracked_user_connection_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn5)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans5], + ?awaitMatch(12, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(12, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(2, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn5), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans5], close_channels(Chans2 ++ Chans3 ++ Chans5), ?awaitMatch(0, length(all_channels(Config)), 20000), @@ -196,57 +208,56 @@ single_node_user_deletion(Config) -> rabbit_ct_broker_helpers:add_user(Config, Username2), rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), - ?assertEqual(100, get_tracking_execution_timeout(Config)), + ?awaitMatch(100, get_tracking_execution_timeout(Config), ?A_TOUT), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), [Conn1] = open_connections(Config, [0]), Chans1 = [_|_] = open_channels(Conn1, 5), - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], [Conn2] = open_connections(Config, [{0, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), - ?assertEqual(1, count_connections_in(Config, Username2)), - ?assertEqual(5, count_channels_in(Config, Username2)), - ?assertEqual(1, tracked_user_connection_count(Config, Username2)), - ?assertEqual(5, tracked_user_channel_count(Config, Username2)), - ?assertEqual(true, is_process_alive(Conn2)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], - ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, Username2)), - ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, Username2)), + ?awaitMatch(true, exists_in_tracked_connection_per_user_table(Config, Username2), ?A_TOUT), + ?awaitMatch(true, exists_in_tracked_channel_per_user_table(Config, Username2), ?A_TOUT), rabbit_ct_broker_helpers:delete_user(Config, Username2), - timer:sleep(100), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), - ?assertEqual(false, is_process_alive(Conn2)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], %% ensure vhost entry is cleared after 'tracking_execution_timeout' ?awaitMatch(false, exists_in_tracked_connection_per_user_table(Config, Username2), 20000), ?awaitMatch(false, exists_in_tracked_channel_per_user_table(Config, Username2), 20000), - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], close_channels(Chans1), ?awaitMatch(0, count_channels_in(Config, Username), 20000), @@ -267,55 +278,54 @@ single_node_vhost_deletion(Config) -> rabbit_ct_broker_helpers:add_user(Config, Username2), rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), - ?assertEqual(100, get_tracking_execution_timeout(Config)), + ?awaitMatch(100, get_tracking_execution_timeout(Config), ?A_TOUT), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), [Conn1] = open_connections(Config, [0]), Chans1 = [_|_] = open_channels(Conn1, 5), - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], [Conn2] = open_connections(Config, [{0, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), - ?assertEqual(1, count_connections_in(Config, Username2)), - ?assertEqual(5, count_channels_in(Config, Username2)), - ?assertEqual(1, tracked_user_connection_count(Config, Username2)), - ?assertEqual(5, tracked_user_channel_count(Config, Username2)), - ?assertEqual(true, is_process_alive(Conn2)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], - ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, Vhost)), + ?awaitMatch(true, exists_in_tracked_connection_per_vhost_table(Config, Vhost), ?A_TOUT), rabbit_ct_broker_helpers:delete_vhost(Config, Vhost), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), - ?assertEqual(false, is_process_alive(Conn2)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(false, is_process_alive(Conn1)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], %% ensure vhost entry is cleared after 'tracking_execution_timeout' - ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, Vhost)), + ?awaitMatch(false, exists_in_tracked_connection_per_vhost_table(Config, Vhost), 20000), rabbit_ct_broker_helpers:add_vhost(Config, Vhost). @@ -328,49 +338,48 @@ single_node_vhost_down_mimic(Config) -> rabbit_ct_broker_helpers:add_user(Config, Username2), rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), [Conn1] = open_connections(Config, [0]), Chans1 = [_|_] = open_channels(Conn1, 5), - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], [Conn2] = open_connections(Config, [{0, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), - ?assertEqual(1, count_connections_in(Config, Username2)), - ?assertEqual(5, count_channels_in(Config, Username2)), - ?assertEqual(1, tracked_user_connection_count(Config, Username2)), - ?assertEqual(5, tracked_user_channel_count(Config, Username2)), - ?assertEqual(true, is_process_alive(Conn2)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], %% mimic vhost down event, while connections exist mimic_vhost_down(Config, 0, Vhost), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), - ?assertEqual(false, is_process_alive(Conn2)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(false, is_process_alive(Conn1)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1]. + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1]. cluster_user_deletion(Config) -> set_tracking_execution_timeout(Config, 0, 100), @@ -383,59 +392,58 @@ cluster_user_deletion(Config) -> rabbit_ct_broker_helpers:add_user(Config, Username2), rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), - ?assertEqual(100, get_tracking_execution_timeout(Config, 0)), - ?assertEqual(100, get_tracking_execution_timeout(Config, 1)), + ?awaitMatch(100, get_tracking_execution_timeout(Config, 0), ?A_TOUT), + ?awaitMatch(100, get_tracking_execution_timeout(Config, 1), ?A_TOUT), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), [Conn1] = open_connections(Config, [0]), Chans1 = [_|_] = open_channels(Conn1, 5), - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], [Conn2] = open_connections(Config, [{1, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), - ?assertEqual(1, count_connections_in(Config, Username2)), - ?assertEqual(5, count_channels_in(Config, Username2)), - ?assertEqual(1, tracked_user_connection_count(Config, Username2)), - ?assertEqual(5, tracked_user_channel_count(Config, Username2)), - ?assertEqual(true, is_process_alive(Conn2)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], - ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, 1, Username2)), - ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, 1, Username2)), + ?awaitMatch(true, exists_in_tracked_connection_per_user_table(Config, 1, Username2), ?A_TOUT), + ?awaitMatch(true, exists_in_tracked_channel_per_user_table(Config, 1, Username2), ?A_TOUT), rabbit_ct_broker_helpers:delete_user(Config, Username2), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), - ?assertEqual(false, is_process_alive(Conn2)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], %% ensure user entry is cleared after 'tracking_execution_timeout' - ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, 1, Username2)), - ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2)), + ?awaitMatch(false, exists_in_tracked_connection_per_user_table(Config, 1, Username2), ?A_TOUT), + ?awaitMatch(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2), ?A_TOUT), close_channels(Chans1), - ?awaitMatch(0, count_channels_in(Config, Username), 20000), - ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), close_connections([Conn1]), - ?awaitMatch(0, count_connections_in(Config, Username), 20000), - ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000). + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT). cluster_vhost_deletion(Config) -> set_tracking_execution_timeout(Config, 0, 100), @@ -448,63 +456,58 @@ cluster_vhost_deletion(Config) -> rabbit_ct_broker_helpers:add_user(Config, Username2), rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), - ?assertEqual(100, get_tracking_execution_timeout(Config, 0)), - ?assertEqual(100, get_tracking_execution_timeout(Config, 1)), + ?awaitMatch(100, get_tracking_execution_timeout(Config, 0), ?A_TOUT), + ?awaitMatch(100, get_tracking_execution_timeout(Config, 1), ?A_TOUT), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), [Conn1] = open_connections(Config, [{0, Username}]), Chans1 = [_|_] = open_channels(Conn1, 5), - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], [Conn2] = open_connections(Config, [{1, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), - ?assertEqual(1, count_connections_in(Config, Username2)), - ?assertEqual(5, count_channels_in(Config, Username2)), - ?assertEqual(1, tracked_user_connection_count(Config, Username2)), - ?assertEqual(5, tracked_user_channel_count(Config, Username2)), - ?assertEqual(true, is_process_alive(Conn2)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], - ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)), - ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)), + ?awaitMatch(true, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost), ?A_TOUT), + ?awaitMatch(true, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost), ?A_TOUT), rabbit_ct_broker_helpers:delete_vhost(Config, Vhost), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), - ?assertEqual(false, is_process_alive(Conn2)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(false, is_process_alive(Conn1)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], %% ensure vhost entry is cleared after 'tracking_execution_timeout' - ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)), - ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)), - - rabbit_ct_broker_helpers:add_vhost(Config, Vhost), - rabbit_ct_broker_helpers:add_user(Config, Username), - rabbit_ct_broker_helpers:set_full_permissions(Config, Username, Vhost). + ?awaitMatch(false, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost), ?A_TOUT), + ?awaitMatch(false, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost), ?A_TOUT). cluster_vhost_down_mimic(Config) -> Username = proplists:get_value(rmq_username, Config), @@ -515,58 +518,56 @@ cluster_vhost_down_mimic(Config) -> rabbit_ct_broker_helpers:add_user(Config, Username2), rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), [Conn1] = open_connections(Config, [{0, Username}]), Chans1 = [_|_] = open_channels(Conn1, 5), - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], [Conn2] = open_connections(Config, [{1, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), - ?assertEqual(1, count_connections_in(Config, Username2)), - ?assertEqual(5, count_channels_in(Config, Username2)), - ?assertEqual(1, tracked_user_connection_count(Config, Username2)), - ?assertEqual(5, tracked_user_channel_count(Config, Username2)), - ?assertEqual(true, is_process_alive(Conn2)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], mimic_vhost_down(Config, 1, Vhost), - timer:sleep(100), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), - ?assertEqual(false, is_process_alive(Conn2)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], %% gen_event notifies local handlers. remote connections still active - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], mimic_vhost_down(Config, 0, Vhost), - timer:sleep(100), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(false, is_process_alive(Conn1)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1]. + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(false, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1]. cluster_node_removed(Config) -> Username = proplists:get_value(rmq_username, Config), @@ -577,54 +578,52 @@ cluster_node_removed(Config) -> rabbit_ct_broker_helpers:add_user(Config, Username2), rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), - ?assertEqual(0, count_connections_in(Config, Username)), - ?assertEqual(0, count_connections_in(Config, Username2)), - ?assertEqual(0, count_channels_in(Config, Username)), - ?assertEqual(0, count_channels_in(Config, Username2)), - ?assertEqual(0, tracked_user_connection_count(Config, Username)), - ?assertEqual(0, tracked_user_connection_count(Config, Username2)), - ?assertEqual(0, tracked_user_channel_count(Config, Username)), - ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT), [Conn1] = open_connections(Config, [{0, Username}]), Chans1 = [_|_] = open_channels(Conn1, 5), - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], [Conn2] = open_connections(Config, [{1, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), - ?assertEqual(1, count_connections_in(Config, Username2)), - ?assertEqual(5, count_channels_in(Config, Username2)), - ?assertEqual(1, tracked_user_connection_count(Config, Username2)), - ?assertEqual(5, tracked_user_channel_count(Config, Username2)), - ?assertEqual(true, is_process_alive(Conn2)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], rabbit_ct_broker_helpers:stop_broker(Config, 1), - timer:sleep(200), - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1), - timer:sleep(200), - ?assertEqual(false, is_process_alive(Conn2)), - [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + ?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT), + [?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2], - ?assertEqual(1, count_connections_in(Config, Username)), - ?assertEqual(5, count_channels_in(Config, Username)), - ?assertEqual(1, tracked_user_connection_count(Config, Username)), - ?assertEqual(5, tracked_user_channel_count(Config, Username)), - ?assertEqual(true, is_process_alive(Conn1)), - [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), + ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), + ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), + ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), + ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), + [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1], close_channels(Chans1), ?awaitMatch(0, count_channels_in(Config, Username), 20000), @@ -651,22 +650,19 @@ open_connections(Config, NodesAndUsers) -> (Node) -> rabbit_ct_client_helpers:OpenConnectionFun(Config, Node) end, NodesAndUsers), - timer:sleep(500), Conns. close_connections(Conns) -> lists:foreach(fun (Conn) -> rabbit_ct_client_helpers:close_connection(Conn) - end, Conns), - timer:sleep(500). + end, Conns). kill_connections(Conns) -> lists:foreach(fun (Conn) -> (catch exit(Conn, please_terminate)) - end, Conns), - timer:sleep(500). + end, Conns). open_channels(Conn, N) -> [begin @@ -792,9 +788,6 @@ get_tracking_execution_timeout(Config, NodeIndex) -> [rabbit, tracking_execution_timeout]), Timeout. -await_running_node_refresh(_Config, _NodeIndex) -> - timer:sleep(250). - expect_that_client_connection_is_rejected(Config) -> expect_that_client_connection_is_rejected(Config, 0). diff --git a/deps/rabbit/test/per_user_connection_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_tracking_SUITE.erl index 153b45cec6..e62ce95b83 100644 --- a/deps/rabbit/test/per_user_connection_tracking_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_tracking_SUITE.erl @@ -7,12 +7,15 @@ -module(per_user_connection_tracking_SUITE). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). -compile(export_all). +-define(AWAIT_TIMEOUT, 30000). + all() -> [ {group, cluster_size_1_network}, @@ -100,32 +103,37 @@ single_node_list_of_user(Config) -> rabbit_ct_broker_helpers:add_user(Config, Username2), rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), - ?assertEqual(0, length(connections_in(Config, Username))), - ?assertEqual(0, length(connections_in(Config, Username2))), + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), [Conn1] = open_connections(Config, [0]), + ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [#tracked_connection{username = Username}] = connections_in(Config, Username), close_connections([Conn1]), - ?assertEqual(0, length(connections_in(Config, Username))), + ?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [Conn2] = open_connections(Config, [{0, Username2}]), + ?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), [#tracked_connection{username = Username2}] = connections_in(Config, Username2), [Conn3] = open_connections(Config, [0]), + ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [#tracked_connection{username = Username}] = connections_in(Config, Username), [Conn4] = open_connections(Config, [0]), kill_connections([Conn4]), + ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [#tracked_connection{username = Username}] = connections_in(Config, Username), [Conn5] = open_connections(Config, [0]), + ?awaitMatch(2, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [Username, Username] = lists:map(fun (#tracked_connection{username = U}) -> U end, connections_in(Config, Username)), close_connections([Conn2, Conn3, Conn5]), rabbit_ct_broker_helpers:delete_user(Config, Username2), - ?assertEqual(0, length(all_connections(Config))). + ?awaitMatch(0, length(all_connections(Config)), ?AWAIT_TIMEOUT). single_node_user_deletion_forces_connection_closure(Config) -> Username = proplists:get_value(rmq_username, Config), @@ -140,17 +148,16 @@ single_node_user_deletion_forces_connection_closure(Config) -> ?assertEqual(0, count_connections_in(Config, Username2)), [Conn1] = open_connections(Config, [0]), - ?assertEqual(1, count_connections_in(Config, Username)), + ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [_Conn2] = open_connections(Config, [{0, Username2}]), - ?assertEqual(1, count_connections_in(Config, Username2)), + ?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:delete_user(Config, Username2), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, Username2)), + ?awaitMatch(0, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, Username)). + ?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT). cluster_user_deletion_forces_connection_closure(Config) -> Username = proplists:get_value(rmq_username, Config), @@ -165,17 +172,16 @@ cluster_user_deletion_forces_connection_closure(Config) -> ?assertEqual(0, count_connections_in(Config, Username2)), [Conn1] = open_connections(Config, [{0, Username}]), - ?assertEqual(1, count_connections_in(Config, Username)), + ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [_Conn2] = open_connections(Config, [{1, Username2}]), - ?assertEqual(1, count_connections_in(Config, Username2)), + ?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:delete_user(Config, Username2), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, Username2)), + ?awaitMatch(0, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, Username)). + ?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT). %% ------------------------------------------------------------------- %% Helpers @@ -194,22 +200,19 @@ open_connections(Config, NodesAndUsers) -> (Node) -> rabbit_ct_client_helpers:OpenConnectionFun(Config, Node) end, NodesAndUsers), - timer:sleep(500), Conns. close_connections(Conns) -> lists:foreach(fun (Conn) -> rabbit_ct_client_helpers:close_connection(Conn) - end, Conns), - timer:sleep(500). + end, Conns). kill_connections(Conns) -> lists:foreach(fun (Conn) -> (catch exit(Conn, please_terminate)) - end, Conns), - timer:sleep(500). + end, Conns). count_connections_in(Config, Username) -> @@ -244,17 +247,3 @@ set_vhost_connection_limit(Config, NodeIndex, VHost, Count) -> set_vhost_limits, Node, ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"], [{"-p", binary_to_list(VHost)}]). - -await_running_node_refresh(_Config, _NodeIndex) -> - timer:sleep(250). - -expect_that_client_connection_is_rejected(Config) -> - expect_that_client_connection_is_rejected(Config, 0). - -expect_that_client_connection_is_rejected(Config, NodeIndex) -> - {error, not_allowed} = - rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex). - -expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) -> - {error, not_allowed} = - rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost). diff --git a/deps/rabbit/test/per_vhost_connection_limit_SUITE.erl b/deps/rabbit/test/per_vhost_connection_limit_SUITE.erl index d0fa02a4ce..5415a0727c 100644 --- a/deps/rabbit/test/per_vhost_connection_limit_SUITE.erl +++ b/deps/rabbit/test/per_vhost_connection_limit_SUITE.erl @@ -407,8 +407,8 @@ cluster_node_list_on_node(Config) -> rabbit_ct_broker_helpers:stop_broker(Config, 1), - ?awaitMatch(2, length(all_connections(Config)), 1000), - ?assertEqual(0, length(connections_on_node(Config, 0, B))), + ?awaitMatch(2, length(all_connections(Config)), ?AWAIT, ?INTERVAL), + ?awaitMatch(0, length(connections_on_node(Config, 0, B)), ?AWAIT, ?INTERVAL), close_connections([Conn3, Conn5]), ?awaitMatch(0, length(all_connections(Config, 0)), ?AWAIT, ?INTERVAL), @@ -714,7 +714,6 @@ kill_connections(Conns) -> count_connections_in(Config, VHost) -> count_connections_in(Config, VHost, 0). count_connections_in(Config, VHost, NodeIndex) -> - timer:sleep(200), rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_connection_tracking, count_tracked_items_in, [{vhost, VHost}]). diff --git a/deps/rabbit/test/per_vhost_queue_limit_SUITE.erl b/deps/rabbit/test/per_vhost_queue_limit_SUITE.erl index db8310d956..13c51a220b 100644 --- a/deps/rabbit/test/per_vhost_queue_limit_SUITE.erl +++ b/deps/rabbit/test/per_vhost_queue_limit_SUITE.erl @@ -7,6 +7,7 @@ -module(per_vhost_queue_limit_SUITE). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -16,6 +17,8 @@ -import(rabbit_ct_client_helpers, [open_unmanaged_connection/3, close_connection_and_channel/2]). +-define(AWAIT_TIMEOUT, 30000). + all() -> [ {group, cluster_size_1} @@ -119,29 +122,29 @@ end_per_testcase(Testcase, Config) -> most_basic_single_node_queue_count(Config) -> VHost = <<"queue-limits">>, set_up_vhost(Config, VHost), - ?assertEqual(0, count_queues_in(Config, VHost)), + ?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), Conn = open_unmanaged_connection(Config, 0, VHost), {ok, Ch} = amqp_connection:open_channel(Conn), declare_exclusive_queues(Ch, 10), - ?assertEqual(10, count_queues_in(Config, VHost)), + ?awaitMatch(10, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), close_connection_and_channel(Conn, Ch), - ?assertEqual(0, count_queues_in(Config, VHost)), + ?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:delete_vhost(Config, VHost). single_node_single_vhost_queue_count(Config) -> VHost = <<"queue-limits">>, set_up_vhost(Config, VHost), - ?assertEqual(0, count_queues_in(Config, VHost)), + ?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), Conn = open_unmanaged_connection(Config, 0, VHost), {ok, Ch} = amqp_connection:open_channel(Conn), declare_exclusive_queues(Ch, 10), - ?assertEqual(10, count_queues_in(Config, VHost)), + ?awaitMatch(10, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), declare_durable_queues(Ch, 10), - ?assertEqual(20, count_queues_in(Config, VHost)), + ?awaitMatch(20, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), delete_durable_queues(Ch, 10), - ?assertEqual(10, count_queues_in(Config, VHost)), + ?awaitMatch(10, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), close_connection_and_channel(Conn, Ch), - ?assertEqual(0, count_queues_in(Config, VHost)), + ?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:delete_vhost(Config, VHost). single_node_multiple_vhosts_queue_count(Config) -> @@ -150,8 +153,8 @@ single_node_multiple_vhosts_queue_count(Config) -> set_up_vhost(Config, VHost1), set_up_vhost(Config, VHost2), - ?assertEqual(0, count_queues_in(Config, VHost1)), - ?assertEqual(0, count_queues_in(Config, VHost2)), + ?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT), Conn1 = open_unmanaged_connection(Config, 0, VHost1), {ok, Ch1} = amqp_connection:open_channel(Conn1), @@ -159,17 +162,17 @@ single_node_multiple_vhosts_queue_count(Config) -> {ok, Ch2} = amqp_connection:open_channel(Conn2), declare_exclusive_queues(Ch1, 10), - ?assertEqual(10, count_queues_in(Config, VHost1)), + ?awaitMatch(10, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT), declare_durable_queues(Ch1, 10), - ?assertEqual(20, count_queues_in(Config, VHost1)), + ?awaitMatch(20, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT), delete_durable_queues(Ch1, 10), - ?assertEqual(10, count_queues_in(Config, VHost1)), + ?awaitMatch(10, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT), declare_exclusive_queues(Ch2, 30), - ?assertEqual(30, count_queues_in(Config, VHost2)), + ?awaitMatch(30, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT), close_connection_and_channel(Conn1, Ch1), - ?assertEqual(0, count_queues_in(Config, VHost1)), + ?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT), close_connection_and_channel(Conn2, Ch2), - ?assertEqual(0, count_queues_in(Config, VHost2)), + ?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), rabbit_ct_broker_helpers:delete_vhost(Config, VHost2). @@ -183,7 +186,7 @@ single_node_single_vhost_zero_limit(Config) -> single_node_single_vhost_limit_with_durable_named_queue(Config) -> VHost = <<"queue-limits">>, set_up_vhost(Config, VHost), - ?assertEqual(0, count_queues_in(Config, VHost)), + ?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), set_vhost_queue_limit(Config, VHost, 3), Conn = open_unmanaged_connection(Config, 0, VHost), @@ -219,7 +222,7 @@ single_node_single_vhost_zero_limit_with_durable_named_queue(Config) -> single_node_single_vhost_limit_with(Config, WatermarkLimit) -> VHost = <<"queue-limits">>, set_up_vhost(Config, VHost), - ?assertEqual(0, count_queues_in(Config, VHost)), + ?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), set_vhost_queue_limit(Config, VHost, 3), Conn = open_unmanaged_connection(Config, 0, VHost), @@ -243,7 +246,7 @@ single_node_single_vhost_limit_with(Config, WatermarkLimit) -> single_node_single_vhost_zero_limit_with(Config, QueueDeclare) -> VHost = <<"queue-limits">>, set_up_vhost(Config, VHost), - ?assertEqual(0, count_queues_in(Config, VHost)), + ?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), Conn1 = open_unmanaged_connection(Config, 0, VHost), {ok, Ch1} = amqp_connection:open_channel(Conn1), @@ -272,7 +275,7 @@ single_node_single_vhost_zero_limit_with(Config, QueueDeclare) -> single_node_single_vhost_limit_with_queue_ttl(Config) -> VHost = <<"queue-limits">>, set_up_vhost(Config, VHost), - ?assertEqual(0, count_queues_in(Config, VHost)), + ?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), Conn1 = open_unmanaged_connection(Config, 0, VHost), {ok, Ch1} = amqp_connection:open_channel(Conn1), @@ -297,7 +300,10 @@ single_node_single_vhost_limit_with_queue_ttl(Config) -> {ok, Ch2} = amqp_connection:open_channel(Conn2), %% wait for the queues to expire - timer:sleep(3000), + ?awaitMatch( + [], + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []), + ?AWAIT_TIMEOUT), #'queue.declare_ok'{queue = _} = amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>, @@ -309,7 +315,7 @@ single_node_single_vhost_limit_with_queue_ttl(Config) -> single_node_single_vhost_limit_with_redeclaration(Config) -> VHost = <<"queue-limits">>, set_up_vhost(Config, VHost), - ?assertEqual(0, count_queues_in(Config, VHost)), + ?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT), set_vhost_queue_limit(Config, VHost, 3), Conn1 = open_unmanaged_connection(Config, 0, VHost), @@ -367,55 +373,55 @@ single_node_single_vhost_limit_with_redeclaration(Config) -> most_basic_cluster_queue_count(Config) -> VHost = <<"queue-limits">>, set_up_vhost(Config, VHost), - ?assertEqual(0, count_queues_in(Config, VHost, 0)), - ?assertEqual(0, count_queues_in(Config, VHost, 1)), + ?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT), Conn1 = open_unmanaged_connection(Config, 0, VHost), {ok, Ch1} = amqp_connection:open_channel(Conn1), declare_exclusive_queues(Ch1, 10), - ?assertEqual(10, count_queues_in(Config, VHost, 0)), - ?assertEqual(10, count_queues_in(Config, VHost, 1)), + ?awaitMatch(10, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(10, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT), Conn2 = open_unmanaged_connection(Config, 0, VHost), {ok, Ch2} = amqp_connection:open_channel(Conn2), declare_exclusive_queues(Ch2, 15), - ?assertEqual(25, count_queues_in(Config, VHost, 0)), - ?assertEqual(25, count_queues_in(Config, VHost, 1)), + ?awaitMatch(25, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(25, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT), close_connection_and_channel(Conn1, Ch1), close_connection_and_channel(Conn2, Ch2), - ?assertEqual(0, count_queues_in(Config, VHost, 0)), - ?assertEqual(0, count_queues_in(Config, VHost, 1)), + ?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:delete_vhost(Config, VHost). cluster_node_restart_queue_count(Config) -> VHost = <<"queue-limits">>, set_up_vhost(Config, VHost), - ?assertEqual(0, count_queues_in(Config, VHost, 0)), - ?assertEqual(0, count_queues_in(Config, VHost, 1)), + ?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT), Conn1 = open_unmanaged_connection(Config, 0, VHost), {ok, Ch1} = amqp_connection:open_channel(Conn1), declare_exclusive_queues(Ch1, 10), - ?assertEqual(10, count_queues_in(Config, VHost, 0)), - ?assertEqual(10, count_queues_in(Config, VHost, 1)), + ?awaitMatch(10, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(10, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:restart_broker(Config, 0), - ?assertEqual(0, count_queues_in(Config, VHost, 0)), + ?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT), Conn2 = open_unmanaged_connection(Config, 1, VHost), {ok, Ch2} = amqp_connection:open_channel(Conn2), declare_exclusive_queues(Ch2, 15), - ?assertEqual(15, count_queues_in(Config, VHost, 0)), - ?assertEqual(15, count_queues_in(Config, VHost, 1)), + ?awaitMatch(15, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(15, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT), declare_durable_queues(Ch2, 10), - ?assertEqual(25, count_queues_in(Config, VHost, 0)), - ?assertEqual(25, count_queues_in(Config, VHost, 1)), + ?awaitMatch(25, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(25, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:restart_broker(Config, 1), - ?assertEqual(10, count_queues_in(Config, VHost, 0)), - ?assertEqual(10, count_queues_in(Config, VHost, 1)), + ?awaitMatch(10, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(10, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:delete_vhost(Config, VHost). @@ -425,28 +431,28 @@ cluster_multiple_vhosts_queue_count(Config) -> set_up_vhost(Config, VHost1), set_up_vhost(Config, VHost2), - ?assertEqual(0, count_queues_in(Config, VHost1)), - ?assertEqual(0, count_queues_in(Config, VHost2)), + ?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT), Conn1 = open_unmanaged_connection(Config, 0, VHost1), {ok, Ch1} = amqp_connection:open_channel(Conn1), declare_exclusive_queues(Ch1, 10), - ?assertEqual(10, count_queues_in(Config, VHost1, 0)), - ?assertEqual(10, count_queues_in(Config, VHost1, 1)), - ?assertEqual(0, count_queues_in(Config, VHost2, 0)), - ?assertEqual(0, count_queues_in(Config, VHost2, 1)), + ?awaitMatch(10, count_queues_in(Config, VHost1, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(10, count_queues_in(Config, VHost1, 1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost2, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost2, 1), ?AWAIT_TIMEOUT), Conn2 = open_unmanaged_connection(Config, 0, VHost2), {ok, Ch2} = amqp_connection:open_channel(Conn2), declare_exclusive_queues(Ch2, 15), - ?assertEqual(15, count_queues_in(Config, VHost2, 0)), - ?assertEqual(15, count_queues_in(Config, VHost2, 1)), + ?awaitMatch(15, count_queues_in(Config, VHost2, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(15, count_queues_in(Config, VHost2, 1), ?AWAIT_TIMEOUT), close_connection_and_channel(Conn1, Ch1), close_connection_and_channel(Conn2, Ch2), - ?assertEqual(0, count_queues_in(Config, VHost1, 0)), - ?assertEqual(0, count_queues_in(Config, VHost1, 1)), - ?assertEqual(0, count_queues_in(Config, VHost2, 0)), - ?assertEqual(0, count_queues_in(Config, VHost2, 1)), + ?awaitMatch(0, count_queues_in(Config, VHost1, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost1, 1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost2, 0), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost2, 1), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), rabbit_ct_broker_helpers:delete_vhost(Config, VHost2). @@ -460,8 +466,8 @@ cluster_multiple_vhosts_limit_with(Config, WatermarkLimit) -> VHost2 = <<"queue-limits2">>, set_up_vhost(Config, VHost1), set_up_vhost(Config, VHost2), - ?assertEqual(0, count_queues_in(Config, VHost1)), - ?assertEqual(0, count_queues_in(Config, VHost2)), + ?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT), set_vhost_queue_limit(Config, VHost1, 3), set_vhost_queue_limit(Config, VHost2, 3), @@ -510,8 +516,8 @@ cluster_multiple_vhosts_limit_with_durable_named_queue(Config) -> VHost2 = <<"queue-limits2">>, set_up_vhost(Config, VHost1), set_up_vhost(Config, VHost2), - ?assertEqual(0, count_queues_in(Config, VHost1)), - ?assertEqual(0, count_queues_in(Config, VHost2)), + ?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT), set_vhost_queue_limit(Config, VHost1, 3), set_vhost_queue_limit(Config, VHost2, 3), @@ -574,8 +580,8 @@ cluster_multiple_vhosts_zero_limit_with(Config, QueueDeclare) -> VHost2 = <<"queue-limits2">>, set_up_vhost(Config, VHost1), set_up_vhost(Config, VHost2), - ?assertEqual(0, count_queues_in(Config, VHost1)), - ?assertEqual(0, count_queues_in(Config, VHost2)), + ?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT), Conn1 = open_unmanaged_connection(Config, 0, VHost1), {ok, Ch1} = amqp_connection:open_channel(Conn1), @@ -640,7 +646,6 @@ set_vhost_queue_limit(Config, NodeIndex, VHost, Count) -> count_queues_in(Config, VHost) -> count_queues_in(Config, VHost, 0). count_queues_in(Config, VHost, NodeIndex) -> - timer:sleep(200), rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_amqqueue, count, [VHost]). diff --git a/deps/rabbit/test/queue_master_location_SUITE.erl b/deps/rabbit/test/queue_master_location_SUITE.erl index 888c4c580c..f80de76bac 100644 --- a/deps/rabbit/test/queue_master_location_SUITE.erl +++ b/deps/rabbit/test/queue_master_location_SUITE.erl @@ -26,6 +26,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). @@ -203,7 +204,7 @@ declare_policy_exactly(Config) -> Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), rabbit_ct_broker_helpers:control_action(sync_queue, Node0, [binary_to_list(Q)], [{"-p", "/"}]), - wait_for_sync(Config, Node0, QueueRes, 1), + ?awaitMatch(true, synced(Config, Node0, QueueRes, 1), 60000), {ok, Queue} = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, lookup, [QueueRes]), @@ -459,18 +460,6 @@ set_location_policy(Config, Name, Strategy) -> ok = rabbit_ct_broker_helpers:set_policy(Config, 0, Name, <<".*">>, <<"queues">>, [{<<"queue-master-locator">>, Strategy}]). -wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen) -> - wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, 600). - -wait_for_sync(_, _, _, _, 0) -> - throw(sync_timeout); -wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N) -> - case synced(Config, Nodename, Q, ExpectedSSPidLen) of - true -> ok; - false -> timer:sleep(100), - wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N-1) - end. - synced(Config, Nodename, Q, ExpectedSSPidLen) -> Args = [<<"/">>, [name, synchronised_slave_pids]], Info = rabbit_ct_broker_helpers:rpc(Config, Nodename, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 83c3d2b9cf..76a4198506 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1401,7 +1401,7 @@ publishing_to_unavailable_queue(Config) -> exit(confirm_timeout) end, ok = rabbit_ct_broker_helpers:start_node(Config, Server1), - timer:sleep(2000), + ?awaitMatch(2, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT), publish_many(Ch, QQ, 1), %% this should now be acked ok = receive @@ -1841,23 +1841,25 @@ delete_member_queue_not_found(Config) -> [<<"/">>, QQ, Server])). delete_member(Config) -> - [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + NServers = length(Servers), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - timer:sleep(100), + ?awaitMatch(NServers, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT), ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). delete_member_not_a_member(Config) -> - [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + NServers = length(Servers), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - timer:sleep(100), + ?awaitMatch(NServers, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT), ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])), @@ -1868,14 +1870,14 @@ delete_member_not_a_member(Config) -> delete_member_during_node_down(Config) -> [Server, DownServer, Remove] = rabbit_ct_broker_helpers:get_node_configs( - Config, nodename), + Config, nodename), stop_node(Config, DownServer), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - timer:sleep(200), + ?awaitMatch(2, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT), ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Remove])), @@ -1889,11 +1891,12 @@ delete_member_during_node_down(Config) -> node_removal_is_quorum_critical(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + NServers = length(Servers), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - timer:sleep(100), + ?awaitMatch(NServers, count_online_nodes(Server, <<"/">>, QName), ?DEFAULT_AWAIT), [begin Qs = rpc:call(S, rabbit_quorum_queue, list_with_minimum_quorum, []), ?assertEqual([QName], queue_names(Qs)) @@ -1905,7 +1908,7 @@ node_removal_is_not_quorum_critical(Config) -> QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - timer:sleep(100), + ?awaitMatch(3, count_online_nodes(Server, <<"/">>, QName), ?DEFAULT_AWAIT), Qs = rpc:call(Server, rabbit_quorum_queue, list_with_minimum_quorum, []), ?assertEqual([], Qs). @@ -1975,7 +1978,7 @@ cleanup_data_dir(Config) -> QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - timer:sleep(100), + ?awaitMatch(2, count_online_nodes(Server1, <<"/">>, QQ), ?DEFAULT_AWAIT), UId1 = proplists:get_value(ra_name(QQ), rpc:call(Server1, ra_directory, list_registered, [quorum_queues])), UId2 = proplists:get_value(ra_name(QQ), rpc:call(Server2, ra_directory, list_registered, [quorum_queues])), @@ -2406,13 +2409,11 @@ consume_redelivery_count(Config) -> multiple = false, requeue = true}), %% wait for requeuing - timer:sleep(500), - {#'basic.get_ok'{delivery_tag = DeliveryTag1, redelivered = true}, #amqp_msg{props = #'P_basic'{headers = H1}}} = - amqp_channel:call(Ch, #'basic.get'{queue = QQ, - no_ack = false}), + basic_get(Ch, QQ, false, 300), + ?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, multiple = false, @@ -2497,9 +2498,16 @@ memory_alarm_rolls_wal(Config) -> rabbit_ct_helpers:await_condition( fun() -> rabbit_ct_broker_helpers:get_alarms(Config, Server) =/= [] end ), - timer:sleep(1000), - [Wal1] = filelib:wildcard(WalDataDir ++ "/*.wal"), - ?assert(Wal0 =/= Wal1), + rabbit_ct_helpers:await_condition( + fun() -> + List = filelib:wildcard(WalDataDir ++ "/*.wal"), + %% There is a small time window where there could be no + %% file, but we need to wait for it to ensure it is not + %% roll over again later on + [Wal0] =/= List andalso [] =/= List + end, 30000), + Wal1 = lists:last(lists:sort(filelib:wildcard(WalDataDir ++ "/*.wal"))), + %% roll over shouldn't happen if we trigger a new alarm in less than %% min_wal_roll_over_interval rabbit_ct_broker_helpers:set_alarm(Config, Server, memory), @@ -2507,7 +2515,7 @@ memory_alarm_rolls_wal(Config) -> fun() -> rabbit_ct_broker_helpers:get_alarms(Config, Server) =/= [] end ), timer:sleep(1000), - [Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"), + Wal2 = lists:last(lists:sort(filelib:wildcard(WalDataDir ++ "/*.wal"))), ?assert(Wal1 == Wal2), lists:foreach(fun (Node) -> ok = rabbit_ct_broker_helpers:clear_alarm(Config, Node, memory) @@ -2676,16 +2684,42 @@ message_ttl(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-message-ttl">>, long, 2000}])), + %% Checking for messages that are short-lived could cause intermitent + %% failures, as these could have expired before the check takes + %% place. Thus, we're goig to split this testcase in two: + %% 1. Check that messages published with ttl reach the queue + %% This can be easily achieved by having a consumer already + %% subscribed to the queue. + %% 2. Check that messages published eventually disappear from the + %% queue. + Msg1 = <<"msg1">>, Msg2 = <<"msg11">>, + %% 1. Subscribe, publish and consume two messages + subscribe(Ch, QQ, false), + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + wait_for_messages(Config, [[QQ, <<"2">>, <<"0">>, <<"2">>]]), + receive_and_ack(Ch), + receive_and_ack(Ch), + cancel(Ch), + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), + + %% 2. Publish two messages and wait until queue is empty publish(Ch, QQ, Msg1), publish(Ch, QQ, Msg2), - wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), - timer:sleep(2000), wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), ok. +receive_and_ack(Ch) -> + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + end. + message_ttl_policy(Config) -> %% Using ttl is very difficul to guarantee 100% test rate success, unless %% using really high ttl values. Previously, this test used 1s and 3s ttl, @@ -2824,12 +2858,17 @@ consumer_metrics(Config) -> RaName = ra_name(QQ), {ok, _, {_, Leader}} = ra:members({RaName, Server}), - timer:sleep(5000), QNameRes = rabbit_misc:r(<<"/">>, queue, QQ), - [{_, PropList, _}] = rpc:call(Leader, ets, lookup, [queue_metrics, QNameRes]), - ?assertMatch([{consumers, 1}], lists:filter(fun({Key, _}) -> - Key == consumers - end, PropList)). + ?awaitMatch(1, + begin + case rpc:call(Leader, ets, lookup, [queue_metrics, QNameRes]) of + [{_, PropList, _}] -> + proplists:get_value(consumers, PropList, undefined); + _ -> + undefined + end + end, + 30000). delete_if_empty(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -3271,6 +3310,11 @@ get_queue_type(Server, VHost, Q0) -> {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), amqqueue:get_type(Q1). +count_online_nodes(Server, VHost, Q0) -> + QNameRes = rabbit_misc:r(VHost, queue, Q0), + Info = rpc:call(Server, rabbit_quorum_queue, infos, [QNameRes]), + length(proplists:get_value(online, Info, [])). + publish_many(Ch, Queue, Count) -> [publish(Ch, Queue) || _ <- lists:seq(1, Count)]. diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 7591262091..927c305915 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -416,6 +416,7 @@ add_replicas(Config) -> [<<"/">>, Q, Server1])), timer:sleep(1000), + check_leader_and_replicas(Config, [Server0, Server1]), %% it is almost impossible to reliably catch this situation. %% increasing number of messages published and the data size could help @@ -424,6 +425,8 @@ add_replicas(Config) -> rpc:call(Server0, rabbit_stream_queue, add_replica, [<<"/">>, Q, Server2])), timer:sleep(1000), + check_leader_and_replicas(Config, [Server0, Server1, Server2]), + %% validate we can read the last entry qos(Ch, 10, false), amqp_channel:subscribe( @@ -998,22 +1001,23 @@ consume_timestamp_last_offset(Config) -> %% Subscribe from now/future Offset = erlang:system_time(second) + 60, + CTag = <<"consume_timestamp_last_offset">>, amqp_channel:subscribe( Ch1, #'basic.consume'{queue = Q, no_ack = false, - consumer_tag = <<"ctag">>, + consumer_tag = CTag, arguments = [{<<"x-stream-offset">>, timestamp, Offset}]}, self()), receive - #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + #'basic.consume_ok'{consumer_tag = CTag} -> ok after 5000 -> exit(missing_consume_ok) end, receive - {_, + {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}} when S < 100 -> exit({unexpected_offset, S}) @@ -1770,12 +1774,10 @@ max_age(Config) -> [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], amqp_channel:wait_for_confirms(Ch, 5), - timer:sleep(5000), + %% Let's give it some margin if some messages fall between segments + quorum_queue_utils:wait_for_min_messages(Config, Q, 100), + quorum_queue_utils:wait_for_max_messages(Config, Q, 150), - Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), - qos(Ch1, 200, false), - subscribe(Ch1, Q, false, 0), - ?assertEqual(100, length(receive_batch())), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). replica_recovery(Config) -> @@ -1834,7 +1836,6 @@ leader_failover(Config) -> publish_confirm(Ch1, Q, [<<"msg">> || _ <- lists:seq(1, 100)]), ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), - timer:sleep(30000), rabbit_ct_helpers:await_condition( fun () -> @@ -1842,7 +1843,7 @@ leader_failover(Config) -> NewLeader = proplists:get_value(leader, Info), NewLeader =/= Server1 - end), + end, 45000), ok = rabbit_ct_broker_helpers:start_node(Config, Server1), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -2082,7 +2083,8 @@ leader_locator_balanced_maintenance(Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). select_nodes_with_least_replicas(Config) -> - [Server1 | _ ] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server1 | _ ] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Servers = lists:usort(Servers0), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), Q = ?config(queue_name, Config), Bin = rabbit_data_coercion:to_binary(?FUNCTION_NAME), @@ -2102,8 +2104,9 @@ select_nodes_with_least_replicas(Config) -> end, Qs), %% We expect that the second stream chose nodes where the first stream does not have replicas. - ?assertEqual(lists:usort(Servers), - lists:usort(Q1Members ++ QMembers)), + ?awaitMatch(Servers, + lists:usort(Q1Members ++ QMembers), + 30000), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [Qs]). leader_locator_policy(Config) -> @@ -2209,13 +2212,15 @@ max_age_policy(Config) -> Config, 0, PolicyName, <<"max_age_policy.*">>, <<"queues">>, [{<<"max-age">>, <<"1Y">>}]), - Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition]), - - ?assertEqual(PolicyName, proplists:get_value(policy, Info)), - ?assertEqual('', proplists:get_value(operator_policy, Info)), - ?assertEqual([{<<"max-age">>, <<"1Y">>}], - proplists:get_value(effective_policy_definition, Info)), - + %% Policies are asynchronous, must wait until it has been applied everywhere + ensure_retention_applied(Config, Server), + ?awaitMatch( + {PolicyName, '', [{<<"max-age">>, <<"1Y">>}]}, + begin + Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition]), + {proplists:get_value(policy, Info), proplists:get_value(operator_policy, Info), proplists:get_value(effective_policy_definition, Info)} + end, + 30000), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). diff --git a/deps/rabbit/test/unit_log_management_SUITE.erl b/deps/rabbit/test/unit_log_management_SUITE.erl index 95fc9222b8..3921f8ab6f 100644 --- a/deps/rabbit/test/unit_log_management_SUITE.erl +++ b/deps/rabbit/test/unit_log_management_SUITE.erl @@ -12,6 +12,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("kernel/include/file.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). @@ -71,7 +72,7 @@ app_management(Config) -> ?MODULE, app_management1, [Config]). app_management1(_Config) -> - wait_for_application(rabbit), + ?awaitMatch(true, lists:keymember(rabbit, 1, application:which_applications()), 30000), %% Starting, stopping and diagnostics. Note that we don't try %% 'report' when the rabbit app is stopped and that we enable %% tracing for the duration of this function. @@ -92,22 +93,6 @@ no_exceptions(Mod, Fun, Args) -> catch Type:Ex -> {Type, Ex} end. -wait_for_application(Application) -> - wait_for_application(Application, 5000). - -wait_for_application(_, Time) when Time =< 0 -> - {error, timeout}; -wait_for_application(Application, Time) -> - Interval = 100, - case lists:keyfind(Application, 1, application:which_applications()) of - false -> - timer:sleep(Interval), - wait_for_application(Application, Time - Interval); - _ -> ok - end. - - - %% ------------------------------------------------------------------- %% Log management. %% ------------------------------------------------------------------- @@ -245,8 +230,9 @@ non_empty_files(Files) -> test_logs_working(LogFiles) -> ok = rabbit_log:error("Log a test message"), %% give the error loggers some time to catch up - timer:sleep(1000), - lists:all(fun(LogFile) -> [true] =:= non_empty_files([LogFile]) end, LogFiles), + ?awaitMatch(true, + lists:all(fun(LogFile) -> [true] =:= non_empty_files([LogFile]) end, LogFiles), + 30000), ok. set_permissions(Path, Mode) -> diff --git a/deps/rabbit/test/vhost_SUITE.erl b/deps/rabbit/test/vhost_SUITE.erl index 44362ce9fe..b32bb83b21 100644 --- a/deps/rabbit/test/vhost_SUITE.erl +++ b/deps/rabbit/test/vhost_SUITE.erl @@ -7,12 +7,15 @@ -module(vhost_SUITE). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). -compile(export_all). +-define(AWAIT_TIMEOUT, 30000). + all() -> [ {group, cluster_size_1_network}, @@ -129,21 +132,20 @@ single_node_vhost_deletion_forces_connection_closure(Config) -> set_up_vhost(Config, VHost1), set_up_vhost(Config, VHost2), - ?assertEqual(0, count_connections_in(Config, VHost1)), - ?assertEqual(0, count_connections_in(Config, VHost2)), + ?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), [Conn1] = open_connections(Config, [{0, VHost1}]), - ?assertEqual(1, count_connections_in(Config, VHost1)), + ?awaitMatch(1, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT), [_Conn2] = open_connections(Config, [{0, VHost2}]), - ?assertEqual(1, count_connections_in(Config, VHost2)), + ?awaitMatch(1, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, VHost2)), + ?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)). + ?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT). vhost_failure_forces_connection_closure(Config) -> VHost1 = <<"vhost1">>, @@ -152,21 +154,20 @@ vhost_failure_forces_connection_closure(Config) -> set_up_vhost(Config, VHost1), set_up_vhost(Config, VHost2), - ?assertEqual(0, count_connections_in(Config, VHost1)), - ?assertEqual(0, count_connections_in(Config, VHost2)), + ?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), [Conn1] = open_connections(Config, [{0, VHost1}]), - ?assertEqual(1, count_connections_in(Config, VHost1)), + ?awaitMatch(1, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT), [_Conn2] = open_connections(Config, [{0, VHost2}]), - ?assertEqual(1, count_connections_in(Config, VHost2)), + ?awaitMatch(1, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, VHost2)), + ?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)). + ?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT). vhost_failure_forces_connection_closure_on_failure_node(Config) -> @@ -176,25 +177,24 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) -> set_up_vhost(Config, VHost1), set_up_vhost(Config, VHost2), - ?assertEqual(0, count_connections_in(Config, VHost1)), - ?assertEqual(0, count_connections_in(Config, VHost2)), + ?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), [Conn1] = open_connections(Config, [{0, VHost1}]), - ?assertEqual(1, count_connections_in(Config, VHost1)), + ?awaitMatch(1, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT), [_Conn20] = open_connections(Config, [{0, VHost2}]), [_Conn21] = open_connections(Config, [{1, VHost2}]), - ?assertEqual(2, count_connections_in(Config, VHost2)), + ?awaitMatch(2, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2), - timer:sleep(200), %% Vhost2 connection on node 1 is still alive - ?assertEqual(1, count_connections_in(Config, VHost2)), + ?awaitMatch(1, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), %% Vhost1 connection on node 0 is still alive - ?assertEqual(1, count_connections_in(Config, VHost1)), + ?awaitMatch(1, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)). + ?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT). cluster_vhost_deletion_forces_connection_closure(Config) -> @@ -204,21 +204,20 @@ cluster_vhost_deletion_forces_connection_closure(Config) -> set_up_vhost(Config, VHost1), set_up_vhost(Config, VHost2), - ?assertEqual(0, count_connections_in(Config, VHost1)), - ?assertEqual(0, count_connections_in(Config, VHost2)), + ?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT), + ?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), [Conn1] = open_connections(Config, [{0, VHost1}]), - ?assertEqual(1, count_connections_in(Config, VHost1)), + ?awaitMatch(1, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT), [_Conn2] = open_connections(Config, [{1, VHost2}]), - ?assertEqual(1, count_connections_in(Config, VHost2)), + ?awaitMatch(1, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, VHost2)), + ?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)). + ?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT). node_starts_with_dead_vhosts(Config) -> VHost1 = <<"vhost1">>, @@ -244,10 +243,11 @@ node_starts_with_dead_vhosts(Config) -> %% The node should start without a vhost ok = rabbit_ct_broker_helpers:start_node(Config, 1), - timer:sleep(3000), - - ?assertEqual(true, rabbit_ct_broker_helpers:rpc(Config, 1, - rabbit_vhost_sup_sup, is_vhost_alive, [VHost2])). + ?awaitMatch( + true, + rabbit_ct_broker_helpers:rpc(Config, 1, + rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]), + ?AWAIT_TIMEOUT). node_starts_with_dead_vhosts_with_mirrors(Config) -> VHost1 = <<"vhost1">>, @@ -274,7 +274,15 @@ node_starts_with_dead_vhosts_with_mirrors(Config) -> 0, <<"queues">>, <<"acting-user">>]), %% Wait for the queue to start a mirror - timer:sleep(500), + ?awaitMatch([_], + begin + {ok, Q0} = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_amqqueue, lookup, + [rabbit_misc:r(VHost1, queue, QName)], infinity), + amqqueue:get_sync_slave_pids(Q0) + end, + ?AWAIT_TIMEOUT), rabbit_ct_client_helpers:publish(Chan, QName, 10), @@ -299,7 +307,9 @@ node_starts_with_dead_vhosts_with_mirrors(Config) -> %% The node should start without a vhost ok = rabbit_ct_broker_helpers:start_node(Config, 1), - timer:sleep(3000), + ?awaitMatch(true, + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit, is_running, []), + ?AWAIT_TIMEOUT), ?assertEqual(true, rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_vhost_sup_sup, is_vhost_alive, [VHost2])). @@ -435,27 +445,23 @@ open_connections(Config, NodesAndVHosts) -> network -> open_unmanaged_connection; direct -> open_unmanaged_connection_direct end, - Conns = lists:map(fun + lists:map(fun ({Node, VHost}) -> rabbit_ct_client_helpers:OpenConnectionFun(Config, Node, VHost); (Node) -> rabbit_ct_client_helpers:OpenConnectionFun(Config, Node) - end, NodesAndVHosts), - timer:sleep(500), - Conns. + end, NodesAndVHosts). close_connections(Conns) -> lists:foreach(fun (Conn) -> rabbit_ct_client_helpers:close_connection(Conn) - end, Conns), - timer:sleep(500). + end, Conns). count_connections_in(Config, VHost) -> count_connections_in(Config, VHost, 0). count_connections_in(Config, VHost, NodeIndex) -> - timer:sleep(200), rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_connection_tracking, count_tracked_items_in, [{vhost, VHost}]).