Merge pull request #8694 from rabbitmq/test-resilience

Tests: more resilient time-dependent tests
This commit is contained in:
Jean-Sébastien Pédron 2023-07-07 16:42:25 +02:00 committed by GitHub
commit eaf1f0e56b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 610 additions and 553 deletions

View File

@ -377,7 +377,6 @@ rabbitmq_integration_suite(
additional_beam = [
":test_quorum_queue_utils_beam",
],
flaky = True,
shard_count = 7,
)
@ -565,7 +564,6 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "per_user_connection_channel_limit_partitions_SUITE",
size = "large",
flaky = True,
)
rabbitmq_integration_suite(
@ -1045,7 +1043,6 @@ rabbitmq_integration_suite(
additional_beam = [
"test/test_rabbit_event_handler.beam",
],
flaky = True,
)
rabbitmq_suite(

View File

@ -149,7 +149,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop
PLT_APPS += mnesia
dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.6.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.6.1
dep_systemd = hex 0.6.1
dep_seshat = hex 0.4.0

14
deps/rabbit/app.bzl vendored
View File

@ -880,7 +880,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/definition_import_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "deprecated_features_SUITE_beam_files",
@ -932,7 +932,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/dynamic_qq_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "eager_sync_SUITE_beam_files",
@ -1111,7 +1111,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/per_user_connection_tracking_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "per_vhost_connection_limit_SUITE_beam_files",
@ -1147,7 +1147,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/per_vhost_queue_limit_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "policy_SUITE_beam_files",
@ -1218,7 +1218,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/queue_master_location_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "queue_parallel_SUITE_beam_files",
@ -1726,7 +1726,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/unit_log_management_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "unit_operator_policy_SUITE_beam_files",
@ -1828,7 +1828,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/vhost_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "rabbit_cuttlefish_SUITE_beam_files",

View File

@ -188,14 +188,15 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config1, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_dlx, Config)}),
amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_dlx_2, Config)}),
amqp_channel:call(Ch, #'exchange.delete'{exchange = ?config(dlx_exchange, Config)}),
_ = rabbit_ct_broker_helpers:clear_policy(Config, 0, ?config(policy, Config)),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
delete_queues() ->
[rabbit_amqqueue:delete(Q, false, false, <<"tests">>) || Q <- rabbit_amqqueue:list()].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Dead letter exchanges
%%
@ -850,7 +851,15 @@ dead_letter_policy(Config) ->
<<"queues">>,
[{<<"dead-letter-exchange">>, DLXExchange},
{<<"dead-letter-routing-key">>, DLXQName}]),
timer:sleep(1000),
?awaitMatch([_ | _],
begin
{ok, Q0} = rabbit_ct_broker_helpers:rpc(
Config, 0,
rabbit_amqqueue, lookup,
[rabbit_misc:r(<<"/">>, queue, QName)], infinity),
amqqueue:get_policy(Q0)
end,
30000),
%% Nack the second message
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
multiple = false,
@ -1319,7 +1328,12 @@ dead_letter_headers_first_death_route(Config) ->
%% Send and reject the 3rd message.
P3 = <<"msg3">>,
publish(Ch, QName2, [P3]),
timer:sleep(1000),
case group_name(Config) of
at_most_once ->
wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]);
at_least_once ->
wait_for_messages(Config, [[QName2, <<"2">>, <<"1">>, <<"0">>]])
end,
[DTag] = consume(Ch, QName2, [P3]),
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag,
requeue = false}),

View File

@ -7,6 +7,7 @@
-module(definition_import_SUITE).
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("eunit/include/eunit.hrl").
@ -245,8 +246,7 @@ import_case13a(Config) ->
%% We expect that importing an existing queue (i.e. same vhost and name)
%% but with different arguments and different properties is a no-op.
import_file_case(Config, "case13a"),
timer:sleep(1000),
?assertMatch({ok, Q}, queue_lookup(Config, VHost, QueueName)).
?awaitMatch({ok, Q}, queue_lookup(Config, VHost, QueueName), 30000).
import_case14(Config) -> import_file_case(Config, "case14").
%% contains a user with tags as a list

View File

@ -9,6 +9,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-import(quorum_queue_utils, [wait_for_messages_ready/3,
ra_name/1]).
@ -179,11 +180,14 @@ quorum_unaffected_after_vhost_failure(Config) ->
arguments = Args,
durable = true
}),
timer:sleep(300),
?awaitMatch(
Servers,
begin
Info0 = rpc:call(A, rabbit_quorum_queue, infos,
[rabbit_misc:r(<<"/">>, queue, QName)]),
?assertEqual(Servers, lists:sort(proplists:get_value(online, Info0, []))),
lists:sort(proplists:get_value(online, Info0, []))
end,
60000),
%% Crash vhost on both nodes
{ok, SupA} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]),
@ -191,9 +195,14 @@ quorum_unaffected_after_vhost_failure(Config) ->
{ok, SupB} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]),
exit(SupB, foo),
?awaitMatch(
Servers,
begin
Info = rpc:call(A, rabbit_quorum_queue, infos,
[rabbit_misc:r(<<"/">>, queue, QName)]),
?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))).
lists:sort(proplists:get_value(online, Info, []))
end,
60000).
recover_follower_after_standalone_restart(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of

View File

@ -105,7 +105,10 @@ cluster_full_partition_with_autoheal(Config) ->
%% B drops off the network, non-reachable by either A or C
rabbit_ct_broker_helpers:block_traffic_between(A, B),
rabbit_ct_broker_helpers:block_traffic_between(B, C),
timer:sleep(?DELAY),
LargeCluster = lists:sort([A, C]),
?awaitMatch(LargeCluster, list_running(Config, A), 60000, 3000),
?awaitMatch([B], list_running(Config, B), 60000, 3000),
?awaitMatch(LargeCluster, list_running(Config, C), 60000, 3000),
%% A and C are still connected, so 4 connections are tracked
%% All connections to B are dropped
@ -116,7 +119,10 @@ cluster_full_partition_with_autoheal(Config) ->
rabbit_ct_broker_helpers:allow_traffic_between(A, B),
rabbit_ct_broker_helpers:allow_traffic_between(B, C),
timer:sleep(?DELAY),
All = lists:sort([A, B, C]),
?awaitMatch(All, list_running(Config, A), 60000, 3000),
?awaitMatch(All, list_running(Config, B), 60000, 3000),
?awaitMatch(All, list_running(Config, C), 60000, 3000),
%% during autoheal B's connections were dropped
?awaitMatch({4, 10},
@ -167,3 +173,18 @@ tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) ->
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
TrackingMod,
list_of_user, [Username]).
list_running(Config, NodeIndex) ->
Ret = (catch rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_nodes, list_running, [])),
Running = case Ret of
{'EXIT', {{exception, undef, _}, _}} ->
rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_mnesia, cluster_nodes, [running]);
_ ->
Ret
end,
case Running of
List when is_list(List) ->
lists:sort(List);
Any ->
Any
end.

View File

@ -14,6 +14,8 @@
-compile(export_all).
-define(A_TOUT, 20000).
all() ->
[
{group, cluster_size_1_network},
@ -92,6 +94,11 @@ init_per_testcase(Testcase, Config) ->
Config.
end_per_testcase(Testcase, Config) ->
Vhost = proplists:get_value(rmq_vhost, Config),
Username = proplists:get_value(rmq_username, Config),
rabbit_ct_broker_helpers:add_vhost(Config, Vhost),
rabbit_ct_broker_helpers:add_user(Config, Username),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username, Vhost),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% -------------------------------------------------------------------
@ -106,73 +113,78 @@ single_node_user_connection_channel_tracking(Config) ->
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
[Conn1] = open_connections(Config, [0]),
[Chan1] = open_channels(Conn1, 1),
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
[#tracked_connection{username = Username}] = connections_in(Config, Username),
?awaitMatch(1, count_channels_in(Config, Username), ?A_TOUT),
[#tracked_channel{username = Username}] = channels_in(Config, Username),
?assertEqual(true, is_process_alive(Conn1)),
?assertEqual(true, is_process_alive(Chan1)),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
?awaitMatch(true, is_process_alive(Chan1), ?A_TOUT),
close_channels([Chan1]),
?awaitMatch(0, count_channels_in(Config, Username), 20000),
?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
?awaitMatch(false, is_process_alive(Chan1), 20000),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(false, is_process_alive(Chan1), ?A_TOUT),
close_connections([Conn1]),
?awaitMatch(0, length(connections_in(Config, Username)), 20000),
?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000),
?awaitMatch(false, is_process_alive(Conn1), 20000),
?awaitMatch(0, length(connections_in(Config, Username)), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn1), ?A_TOUT),
[Conn2] = open_connections(Config, [{0, Username2}]),
Chans2 = [_|_] = open_channels(Conn2, 5),
timer:sleep(100),
?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT),
[#tracked_connection{username = Username2}] = connections_in(Config, Username2),
?assertEqual(5, count_channels_in(Config, Username2)),
?assertEqual(1, tracked_user_connection_count(Config, Username2)),
?assertEqual(5, tracked_user_channel_count(Config, Username2)),
?assertEqual(true, is_process_alive(Conn2)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
[Conn3] = open_connections(Config, [0]),
Chans3 = [_|_] = open_channels(Conn3, 5),
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
[#tracked_connection{username = Username}] = connections_in(Config, Username),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn3)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans3],
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn3), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans3],
[Conn4] = open_connections(Config, [0]),
Chans4 = [_|_] = open_channels(Conn4, 5),
?assertEqual(2, tracked_user_connection_count(Config, Username)),
?assertEqual(10, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn4)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans4],
?awaitMatch(2, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(10, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn4), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans4],
kill_connections([Conn4]),
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
[#tracked_connection{username = Username}] = connections_in(Config, Username),
?awaitMatch(5, count_channels_in(Config, Username), 20000),
?awaitMatch(1, tracked_user_connection_count(Config, Username), 20000),
?awaitMatch(5, tracked_user_channel_count(Config, Username), 20000),
?assertEqual(false, is_process_alive(Conn4)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans4],
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn4), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans4],
[Conn5] = open_connections(Config, [0]),
Chans5 = [_|_] = open_channels(Conn5, 7),
?awaitMatch(2, count_connections_in(Config, Username), ?A_TOUT),
[Username, Username] =
lists:map(fun (#tracked_connection{username = U}) -> U end,
connections_in(Config, Username)),
?assertEqual(12, count_channels_in(Config, Username)),
?assertEqual(12, tracked_user_channel_count(Config, Username)),
?assertEqual(2, tracked_user_connection_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn5)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans5],
?awaitMatch(12, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(12, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(2, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn5), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans5],
close_channels(Chans2 ++ Chans3 ++ Chans5),
?awaitMatch(0, length(all_channels(Config)), 20000),
@ -196,57 +208,56 @@ single_node_user_deletion(Config) ->
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
?assertEqual(100, get_tracking_execution_timeout(Config)),
?awaitMatch(100, get_tracking_execution_timeout(Config), ?A_TOUT),
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
[Conn1] = open_connections(Config, [0]),
Chans1 = [_|_] = open_channels(Conn1, 5),
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
[Conn2] = open_connections(Config, [{0, Username2}]),
Chans2 = [_|_] = open_channels(Conn2, 5),
?assertEqual(1, count_connections_in(Config, Username2)),
?assertEqual(5, count_channels_in(Config, Username2)),
?assertEqual(1, tracked_user_connection_count(Config, Username2)),
?assertEqual(5, tracked_user_channel_count(Config, Username2)),
?assertEqual(true, is_process_alive(Conn2)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, Username2)),
?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, Username2)),
?awaitMatch(true, exists_in_tracked_connection_per_user_table(Config, Username2), ?A_TOUT),
?awaitMatch(true, exists_in_tracked_channel_per_user_table(Config, Username2), ?A_TOUT),
rabbit_ct_broker_helpers:delete_user(Config, Username2),
timer:sleep(100),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?assertEqual(false, is_process_alive(Conn2)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
%% ensure vhost entry is cleared after 'tracking_execution_timeout'
?awaitMatch(false, exists_in_tracked_connection_per_user_table(Config, Username2), 20000),
?awaitMatch(false, exists_in_tracked_channel_per_user_table(Config, Username2), 20000),
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
close_channels(Chans1),
?awaitMatch(0, count_channels_in(Config, Username), 20000),
@ -267,55 +278,54 @@ single_node_vhost_deletion(Config) ->
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
?assertEqual(100, get_tracking_execution_timeout(Config)),
?awaitMatch(100, get_tracking_execution_timeout(Config), ?A_TOUT),
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
[Conn1] = open_connections(Config, [0]),
Chans1 = [_|_] = open_channels(Conn1, 5),
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
[Conn2] = open_connections(Config, [{0, Username2}]),
Chans2 = [_|_] = open_channels(Conn2, 5),
?assertEqual(1, count_connections_in(Config, Username2)),
?assertEqual(5, count_channels_in(Config, Username2)),
?assertEqual(1, tracked_user_connection_count(Config, Username2)),
?assertEqual(5, tracked_user_channel_count(Config, Username2)),
?assertEqual(true, is_process_alive(Conn2)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, Vhost)),
?awaitMatch(true, exists_in_tracked_connection_per_vhost_table(Config, Vhost), ?A_TOUT),
rabbit_ct_broker_helpers:delete_vhost(Config, Vhost),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?assertEqual(false, is_process_alive(Conn2)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(false, is_process_alive(Conn1)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
%% ensure vhost entry is cleared after 'tracking_execution_timeout'
?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, Vhost)),
?awaitMatch(false, exists_in_tracked_connection_per_vhost_table(Config, Vhost), 20000),
rabbit_ct_broker_helpers:add_vhost(Config, Vhost).
@ -328,49 +338,48 @@ single_node_vhost_down_mimic(Config) ->
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
[Conn1] = open_connections(Config, [0]),
Chans1 = [_|_] = open_channels(Conn1, 5),
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
[Conn2] = open_connections(Config, [{0, Username2}]),
Chans2 = [_|_] = open_channels(Conn2, 5),
?assertEqual(1, count_connections_in(Config, Username2)),
?assertEqual(5, count_channels_in(Config, Username2)),
?assertEqual(1, tracked_user_connection_count(Config, Username2)),
?assertEqual(5, tracked_user_channel_count(Config, Username2)),
?assertEqual(true, is_process_alive(Conn2)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
%% mimic vhost down event, while connections exist
mimic_vhost_down(Config, 0, Vhost),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?assertEqual(false, is_process_alive(Conn2)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(false, is_process_alive(Conn1)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1].
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1].
cluster_user_deletion(Config) ->
set_tracking_execution_timeout(Config, 0, 100),
@ -383,59 +392,58 @@ cluster_user_deletion(Config) ->
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
?assertEqual(100, get_tracking_execution_timeout(Config, 0)),
?assertEqual(100, get_tracking_execution_timeout(Config, 1)),
?awaitMatch(100, get_tracking_execution_timeout(Config, 0), ?A_TOUT),
?awaitMatch(100, get_tracking_execution_timeout(Config, 1), ?A_TOUT),
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
[Conn1] = open_connections(Config, [0]),
Chans1 = [_|_] = open_channels(Conn1, 5),
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
[Conn2] = open_connections(Config, [{1, Username2}]),
Chans2 = [_|_] = open_channels(Conn2, 5),
?assertEqual(1, count_connections_in(Config, Username2)),
?assertEqual(5, count_channels_in(Config, Username2)),
?assertEqual(1, tracked_user_connection_count(Config, Username2)),
?assertEqual(5, tracked_user_channel_count(Config, Username2)),
?assertEqual(true, is_process_alive(Conn2)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, 1, Username2)),
?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),
?awaitMatch(true, exists_in_tracked_connection_per_user_table(Config, 1, Username2), ?A_TOUT),
?awaitMatch(true, exists_in_tracked_channel_per_user_table(Config, 1, Username2), ?A_TOUT),
rabbit_ct_broker_helpers:delete_user(Config, Username2),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?assertEqual(false, is_process_alive(Conn2)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
%% ensure user entry is cleared after 'tracking_execution_timeout'
?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, 1, Username2)),
?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),
?awaitMatch(false, exists_in_tracked_connection_per_user_table(Config, 1, Username2), ?A_TOUT),
?awaitMatch(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2), ?A_TOUT),
close_channels(Chans1),
?awaitMatch(0, count_channels_in(Config, Username), 20000),
?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
close_connections([Conn1]),
?awaitMatch(0, count_connections_in(Config, Username), 20000),
?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000).
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT).
cluster_vhost_deletion(Config) ->
set_tracking_execution_timeout(Config, 0, 100),
@ -448,63 +456,58 @@ cluster_vhost_deletion(Config) ->
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
?assertEqual(100, get_tracking_execution_timeout(Config, 0)),
?assertEqual(100, get_tracking_execution_timeout(Config, 1)),
?awaitMatch(100, get_tracking_execution_timeout(Config, 0), ?A_TOUT),
?awaitMatch(100, get_tracking_execution_timeout(Config, 1), ?A_TOUT),
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
[Conn1] = open_connections(Config, [{0, Username}]),
Chans1 = [_|_] = open_channels(Conn1, 5),
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
[Conn2] = open_connections(Config, [{1, Username2}]),
Chans2 = [_|_] = open_channels(Conn2, 5),
?assertEqual(1, count_connections_in(Config, Username2)),
?assertEqual(5, count_channels_in(Config, Username2)),
?assertEqual(1, tracked_user_connection_count(Config, Username2)),
?assertEqual(5, tracked_user_channel_count(Config, Username2)),
?assertEqual(true, is_process_alive(Conn2)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)),
?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)),
?awaitMatch(true, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost), ?A_TOUT),
?awaitMatch(true, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost), ?A_TOUT),
rabbit_ct_broker_helpers:delete_vhost(Config, Vhost),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?assertEqual(false, is_process_alive(Conn2)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(false, is_process_alive(Conn1)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
%% ensure vhost entry is cleared after 'tracking_execution_timeout'
?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)),
?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)),
rabbit_ct_broker_helpers:add_vhost(Config, Vhost),
rabbit_ct_broker_helpers:add_user(Config, Username),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username, Vhost).
?awaitMatch(false, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost), ?A_TOUT),
?awaitMatch(false, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost), ?A_TOUT).
cluster_vhost_down_mimic(Config) ->
Username = proplists:get_value(rmq_username, Config),
@ -515,58 +518,56 @@ cluster_vhost_down_mimic(Config) ->
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
[Conn1] = open_connections(Config, [{0, Username}]),
Chans1 = [_|_] = open_channels(Conn1, 5),
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
[Conn2] = open_connections(Config, [{1, Username2}]),
Chans2 = [_|_] = open_channels(Conn2, 5),
?assertEqual(1, count_connections_in(Config, Username2)),
?assertEqual(5, count_channels_in(Config, Username2)),
?assertEqual(1, tracked_user_connection_count(Config, Username2)),
?assertEqual(5, tracked_user_channel_count(Config, Username2)),
?assertEqual(true, is_process_alive(Conn2)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
mimic_vhost_down(Config, 1, Vhost),
timer:sleep(100),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?assertEqual(false, is_process_alive(Conn2)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
%% gen_event notifies local handlers. remote connections still active
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
mimic_vhost_down(Config, 0, Vhost),
timer:sleep(100),
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(false, is_process_alive(Conn1)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1].
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(false, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1].
cluster_node_removed(Config) ->
Username = proplists:get_value(rmq_username, Config),
@ -577,54 +578,52 @@ cluster_node_removed(Config) ->
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username2)),
?assertEqual(0, count_channels_in(Config, Username)),
?assertEqual(0, count_channels_in(Config, Username2)),
?assertEqual(0, tracked_user_connection_count(Config, Username)),
?assertEqual(0, tracked_user_connection_count(Config, Username2)),
?assertEqual(0, tracked_user_channel_count(Config, Username)),
?assertEqual(0, tracked_user_channel_count(Config, Username2)),
?awaitMatch(0, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(0, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(0, tracked_user_channel_count(Config, Username2), ?A_TOUT),
[Conn1] = open_connections(Config, [{0, Username}]),
Chans1 = [_|_] = open_channels(Conn1, 5),
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
[Conn2] = open_connections(Config, [{1, Username2}]),
Chans2 = [_|_] = open_channels(Conn2, 5),
?assertEqual(1, count_connections_in(Config, Username2)),
?assertEqual(5, count_channels_in(Config, Username2)),
?assertEqual(1, tracked_user_connection_count(Config, Username2)),
?assertEqual(5, tracked_user_channel_count(Config, Username2)),
?assertEqual(true, is_process_alive(Conn2)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
rabbit_ct_broker_helpers:stop_broker(Config, 1),
timer:sleep(200),
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1),
timer:sleep(200),
?assertEqual(false, is_process_alive(Conn2)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
?awaitMatch(false, is_process_alive(Conn2), ?A_TOUT),
[?awaitMatch(false, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans2],
?assertEqual(1, count_connections_in(Config, Username)),
?assertEqual(5, count_channels_in(Config, Username)),
?assertEqual(1, tracked_user_connection_count(Config, Username)),
?assertEqual(5, tracked_user_channel_count(Config, Username)),
?assertEqual(true, is_process_alive(Conn1)),
[?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT),
?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT),
?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT),
?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT),
?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT),
[?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans1],
close_channels(Chans1),
?awaitMatch(0, count_channels_in(Config, Username), 20000),
@ -651,22 +650,19 @@ open_connections(Config, NodesAndUsers) ->
(Node) ->
rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
end, NodesAndUsers),
timer:sleep(500),
Conns.
close_connections(Conns) ->
lists:foreach(fun
(Conn) ->
rabbit_ct_client_helpers:close_connection(Conn)
end, Conns),
timer:sleep(500).
end, Conns).
kill_connections(Conns) ->
lists:foreach(fun
(Conn) ->
(catch exit(Conn, please_terminate))
end, Conns),
timer:sleep(500).
end, Conns).
open_channels(Conn, N) ->
[begin
@ -792,9 +788,6 @@ get_tracking_execution_timeout(Config, NodeIndex) ->
[rabbit, tracking_execution_timeout]),
Timeout.
await_running_node_refresh(_Config, _NodeIndex) ->
timer:sleep(250).
expect_that_client_connection_is_rejected(Config) ->
expect_that_client_connection_is_rejected(Config, 0).

View File

@ -7,12 +7,15 @@
-module(per_user_connection_tracking_SUITE).
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-define(AWAIT_TIMEOUT, 30000).
all() ->
[
{group, cluster_size_1_network},
@ -100,32 +103,37 @@ single_node_list_of_user(Config) ->
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
?assertEqual(0, length(connections_in(Config, Username))),
?assertEqual(0, length(connections_in(Config, Username2))),
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username2)),
[Conn1] = open_connections(Config, [0]),
?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username}] = connections_in(Config, Username),
close_connections([Conn1]),
?assertEqual(0, length(connections_in(Config, Username))),
?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[Conn2] = open_connections(Config, [{0, Username2}]),
?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username2}] = connections_in(Config, Username2),
[Conn3] = open_connections(Config, [0]),
?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username}] = connections_in(Config, Username),
[Conn4] = open_connections(Config, [0]),
kill_connections([Conn4]),
?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username}] = connections_in(Config, Username),
[Conn5] = open_connections(Config, [0]),
?awaitMatch(2, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[Username, Username] =
lists:map(fun (#tracked_connection{username = U}) -> U end,
connections_in(Config, Username)),
close_connections([Conn2, Conn3, Conn5]),
rabbit_ct_broker_helpers:delete_user(Config, Username2),
?assertEqual(0, length(all_connections(Config))).
?awaitMatch(0, length(all_connections(Config)), ?AWAIT_TIMEOUT).
single_node_user_deletion_forces_connection_closure(Config) ->
Username = proplists:get_value(rmq_username, Config),
@ -140,17 +148,16 @@ single_node_user_deletion_forces_connection_closure(Config) ->
?assertEqual(0, count_connections_in(Config, Username2)),
[Conn1] = open_connections(Config, [0]),
?assertEqual(1, count_connections_in(Config, Username)),
?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[_Conn2] = open_connections(Config, [{0, Username2}]),
?assertEqual(1, count_connections_in(Config, Username2)),
?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:delete_user(Config, Username2),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, Username2)),
?awaitMatch(0, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT),
close_connections([Conn1]),
?assertEqual(0, count_connections_in(Config, Username)).
?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT).
cluster_user_deletion_forces_connection_closure(Config) ->
Username = proplists:get_value(rmq_username, Config),
@ -165,17 +172,16 @@ cluster_user_deletion_forces_connection_closure(Config) ->
?assertEqual(0, count_connections_in(Config, Username2)),
[Conn1] = open_connections(Config, [{0, Username}]),
?assertEqual(1, count_connections_in(Config, Username)),
?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[_Conn2] = open_connections(Config, [{1, Username2}]),
?assertEqual(1, count_connections_in(Config, Username2)),
?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:delete_user(Config, Username2),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, Username2)),
?awaitMatch(0, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT),
close_connections([Conn1]),
?assertEqual(0, count_connections_in(Config, Username)).
?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT).
%% -------------------------------------------------------------------
%% Helpers
@ -194,22 +200,19 @@ open_connections(Config, NodesAndUsers) ->
(Node) ->
rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
end, NodesAndUsers),
timer:sleep(500),
Conns.
close_connections(Conns) ->
lists:foreach(fun
(Conn) ->
rabbit_ct_client_helpers:close_connection(Conn)
end, Conns),
timer:sleep(500).
end, Conns).
kill_connections(Conns) ->
lists:foreach(fun
(Conn) ->
(catch exit(Conn, please_terminate))
end, Conns),
timer:sleep(500).
end, Conns).
count_connections_in(Config, Username) ->
@ -244,17 +247,3 @@ set_vhost_connection_limit(Config, NodeIndex, VHost, Count) ->
set_vhost_limits, Node,
["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"],
[{"-p", binary_to_list(VHost)}]).
await_running_node_refresh(_Config, _NodeIndex) ->
timer:sleep(250).
expect_that_client_connection_is_rejected(Config) ->
expect_that_client_connection_is_rejected(Config, 0).
expect_that_client_connection_is_rejected(Config, NodeIndex) ->
{error, not_allowed} =
rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex).
expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) ->
{error, not_allowed} =
rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost).

View File

@ -407,8 +407,8 @@ cluster_node_list_on_node(Config) ->
rabbit_ct_broker_helpers:stop_broker(Config, 1),
?awaitMatch(2, length(all_connections(Config)), 1000),
?assertEqual(0, length(connections_on_node(Config, 0, B))),
?awaitMatch(2, length(all_connections(Config)), ?AWAIT, ?INTERVAL),
?awaitMatch(0, length(connections_on_node(Config, 0, B)), ?AWAIT, ?INTERVAL),
close_connections([Conn3, Conn5]),
?awaitMatch(0, length(all_connections(Config, 0)), ?AWAIT, ?INTERVAL),
@ -714,7 +714,6 @@ kill_connections(Conns) ->
count_connections_in(Config, VHost) ->
count_connections_in(Config, VHost, 0).
count_connections_in(Config, VHost, NodeIndex) ->
timer:sleep(200),
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
count_tracked_items_in, [{vhost, VHost}]).

View File

@ -7,6 +7,7 @@
-module(per_vhost_queue_limit_SUITE).
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
@ -16,6 +17,8 @@
-import(rabbit_ct_client_helpers, [open_unmanaged_connection/3,
close_connection_and_channel/2]).
-define(AWAIT_TIMEOUT, 30000).
all() ->
[
{group, cluster_size_1}
@ -119,29 +122,29 @@ end_per_testcase(Testcase, Config) ->
most_basic_single_node_queue_count(Config) ->
VHost = <<"queue-limits">>,
set_up_vhost(Config, VHost),
?assertEqual(0, count_queues_in(Config, VHost)),
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
Conn = open_unmanaged_connection(Config, 0, VHost),
{ok, Ch} = amqp_connection:open_channel(Conn),
declare_exclusive_queues(Ch, 10),
?assertEqual(10, count_queues_in(Config, VHost)),
?awaitMatch(10, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
close_connection_and_channel(Conn, Ch),
?assertEqual(0, count_queues_in(Config, VHost)),
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
single_node_single_vhost_queue_count(Config) ->
VHost = <<"queue-limits">>,
set_up_vhost(Config, VHost),
?assertEqual(0, count_queues_in(Config, VHost)),
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
Conn = open_unmanaged_connection(Config, 0, VHost),
{ok, Ch} = amqp_connection:open_channel(Conn),
declare_exclusive_queues(Ch, 10),
?assertEqual(10, count_queues_in(Config, VHost)),
?awaitMatch(10, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
declare_durable_queues(Ch, 10),
?assertEqual(20, count_queues_in(Config, VHost)),
?awaitMatch(20, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
delete_durable_queues(Ch, 10),
?assertEqual(10, count_queues_in(Config, VHost)),
?awaitMatch(10, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
close_connection_and_channel(Conn, Ch),
?assertEqual(0, count_queues_in(Config, VHost)),
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
single_node_multiple_vhosts_queue_count(Config) ->
@ -150,8 +153,8 @@ single_node_multiple_vhosts_queue_count(Config) ->
set_up_vhost(Config, VHost1),
set_up_vhost(Config, VHost2),
?assertEqual(0, count_queues_in(Config, VHost1)),
?assertEqual(0, count_queues_in(Config, VHost2)),
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
Conn1 = open_unmanaged_connection(Config, 0, VHost1),
{ok, Ch1} = amqp_connection:open_channel(Conn1),
@ -159,17 +162,17 @@ single_node_multiple_vhosts_queue_count(Config) ->
{ok, Ch2} = amqp_connection:open_channel(Conn2),
declare_exclusive_queues(Ch1, 10),
?assertEqual(10, count_queues_in(Config, VHost1)),
?awaitMatch(10, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
declare_durable_queues(Ch1, 10),
?assertEqual(20, count_queues_in(Config, VHost1)),
?awaitMatch(20, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
delete_durable_queues(Ch1, 10),
?assertEqual(10, count_queues_in(Config, VHost1)),
?awaitMatch(10, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
declare_exclusive_queues(Ch2, 30),
?assertEqual(30, count_queues_in(Config, VHost2)),
?awaitMatch(30, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
close_connection_and_channel(Conn1, Ch1),
?assertEqual(0, count_queues_in(Config, VHost1)),
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
close_connection_and_channel(Conn2, Ch2),
?assertEqual(0, count_queues_in(Config, VHost2)),
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
@ -183,7 +186,7 @@ single_node_single_vhost_zero_limit(Config) ->
single_node_single_vhost_limit_with_durable_named_queue(Config) ->
VHost = <<"queue-limits">>,
set_up_vhost(Config, VHost),
?assertEqual(0, count_queues_in(Config, VHost)),
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
set_vhost_queue_limit(Config, VHost, 3),
Conn = open_unmanaged_connection(Config, 0, VHost),
@ -219,7 +222,7 @@ single_node_single_vhost_zero_limit_with_durable_named_queue(Config) ->
single_node_single_vhost_limit_with(Config, WatermarkLimit) ->
VHost = <<"queue-limits">>,
set_up_vhost(Config, VHost),
?assertEqual(0, count_queues_in(Config, VHost)),
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
set_vhost_queue_limit(Config, VHost, 3),
Conn = open_unmanaged_connection(Config, 0, VHost),
@ -243,7 +246,7 @@ single_node_single_vhost_limit_with(Config, WatermarkLimit) ->
single_node_single_vhost_zero_limit_with(Config, QueueDeclare) ->
VHost = <<"queue-limits">>,
set_up_vhost(Config, VHost),
?assertEqual(0, count_queues_in(Config, VHost)),
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
Conn1 = open_unmanaged_connection(Config, 0, VHost),
{ok, Ch1} = amqp_connection:open_channel(Conn1),
@ -272,7 +275,7 @@ single_node_single_vhost_zero_limit_with(Config, QueueDeclare) ->
single_node_single_vhost_limit_with_queue_ttl(Config) ->
VHost = <<"queue-limits">>,
set_up_vhost(Config, VHost),
?assertEqual(0, count_queues_in(Config, VHost)),
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
Conn1 = open_unmanaged_connection(Config, 0, VHost),
{ok, Ch1} = amqp_connection:open_channel(Conn1),
@ -297,7 +300,10 @@ single_node_single_vhost_limit_with_queue_ttl(Config) ->
{ok, Ch2} = amqp_connection:open_channel(Conn2),
%% wait for the queues to expire
timer:sleep(3000),
?awaitMatch(
[],
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
?AWAIT_TIMEOUT),
#'queue.declare_ok'{queue = _} =
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>,
@ -309,7 +315,7 @@ single_node_single_vhost_limit_with_queue_ttl(Config) ->
single_node_single_vhost_limit_with_redeclaration(Config) ->
VHost = <<"queue-limits">>,
set_up_vhost(Config, VHost),
?assertEqual(0, count_queues_in(Config, VHost)),
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
set_vhost_queue_limit(Config, VHost, 3),
Conn1 = open_unmanaged_connection(Config, 0, VHost),
@ -367,55 +373,55 @@ single_node_single_vhost_limit_with_redeclaration(Config) ->
most_basic_cluster_queue_count(Config) ->
VHost = <<"queue-limits">>,
set_up_vhost(Config, VHost),
?assertEqual(0, count_queues_in(Config, VHost, 0)),
?assertEqual(0, count_queues_in(Config, VHost, 1)),
?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
Conn1 = open_unmanaged_connection(Config, 0, VHost),
{ok, Ch1} = amqp_connection:open_channel(Conn1),
declare_exclusive_queues(Ch1, 10),
?assertEqual(10, count_queues_in(Config, VHost, 0)),
?assertEqual(10, count_queues_in(Config, VHost, 1)),
?awaitMatch(10, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
?awaitMatch(10, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
Conn2 = open_unmanaged_connection(Config, 0, VHost),
{ok, Ch2} = amqp_connection:open_channel(Conn2),
declare_exclusive_queues(Ch2, 15),
?assertEqual(25, count_queues_in(Config, VHost, 0)),
?assertEqual(25, count_queues_in(Config, VHost, 1)),
?awaitMatch(25, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
?awaitMatch(25, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
close_connection_and_channel(Conn1, Ch1),
close_connection_and_channel(Conn2, Ch2),
?assertEqual(0, count_queues_in(Config, VHost, 0)),
?assertEqual(0, count_queues_in(Config, VHost, 1)),
?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
cluster_node_restart_queue_count(Config) ->
VHost = <<"queue-limits">>,
set_up_vhost(Config, VHost),
?assertEqual(0, count_queues_in(Config, VHost, 0)),
?assertEqual(0, count_queues_in(Config, VHost, 1)),
?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
Conn1 = open_unmanaged_connection(Config, 0, VHost),
{ok, Ch1} = amqp_connection:open_channel(Conn1),
declare_exclusive_queues(Ch1, 10),
?assertEqual(10, count_queues_in(Config, VHost, 0)),
?assertEqual(10, count_queues_in(Config, VHost, 1)),
?awaitMatch(10, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
?awaitMatch(10, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:restart_broker(Config, 0),
?assertEqual(0, count_queues_in(Config, VHost, 0)),
?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
Conn2 = open_unmanaged_connection(Config, 1, VHost),
{ok, Ch2} = amqp_connection:open_channel(Conn2),
declare_exclusive_queues(Ch2, 15),
?assertEqual(15, count_queues_in(Config, VHost, 0)),
?assertEqual(15, count_queues_in(Config, VHost, 1)),
?awaitMatch(15, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
?awaitMatch(15, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
declare_durable_queues(Ch2, 10),
?assertEqual(25, count_queues_in(Config, VHost, 0)),
?assertEqual(25, count_queues_in(Config, VHost, 1)),
?awaitMatch(25, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
?awaitMatch(25, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:restart_broker(Config, 1),
?assertEqual(10, count_queues_in(Config, VHost, 0)),
?assertEqual(10, count_queues_in(Config, VHost, 1)),
?awaitMatch(10, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
?awaitMatch(10, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
@ -425,28 +431,28 @@ cluster_multiple_vhosts_queue_count(Config) ->
set_up_vhost(Config, VHost1),
set_up_vhost(Config, VHost2),
?assertEqual(0, count_queues_in(Config, VHost1)),
?assertEqual(0, count_queues_in(Config, VHost2)),
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
Conn1 = open_unmanaged_connection(Config, 0, VHost1),
{ok, Ch1} = amqp_connection:open_channel(Conn1),
declare_exclusive_queues(Ch1, 10),
?assertEqual(10, count_queues_in(Config, VHost1, 0)),
?assertEqual(10, count_queues_in(Config, VHost1, 1)),
?assertEqual(0, count_queues_in(Config, VHost2, 0)),
?assertEqual(0, count_queues_in(Config, VHost2, 1)),
?awaitMatch(10, count_queues_in(Config, VHost1, 0), ?AWAIT_TIMEOUT),
?awaitMatch(10, count_queues_in(Config, VHost1, 1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost2, 0), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost2, 1), ?AWAIT_TIMEOUT),
Conn2 = open_unmanaged_connection(Config, 0, VHost2),
{ok, Ch2} = amqp_connection:open_channel(Conn2),
declare_exclusive_queues(Ch2, 15),
?assertEqual(15, count_queues_in(Config, VHost2, 0)),
?assertEqual(15, count_queues_in(Config, VHost2, 1)),
?awaitMatch(15, count_queues_in(Config, VHost2, 0), ?AWAIT_TIMEOUT),
?awaitMatch(15, count_queues_in(Config, VHost2, 1), ?AWAIT_TIMEOUT),
close_connection_and_channel(Conn1, Ch1),
close_connection_and_channel(Conn2, Ch2),
?assertEqual(0, count_queues_in(Config, VHost1, 0)),
?assertEqual(0, count_queues_in(Config, VHost1, 1)),
?assertEqual(0, count_queues_in(Config, VHost2, 0)),
?assertEqual(0, count_queues_in(Config, VHost2, 1)),
?awaitMatch(0, count_queues_in(Config, VHost1, 0), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost1, 1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost2, 0), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost2, 1), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
@ -460,8 +466,8 @@ cluster_multiple_vhosts_limit_with(Config, WatermarkLimit) ->
VHost2 = <<"queue-limits2">>,
set_up_vhost(Config, VHost1),
set_up_vhost(Config, VHost2),
?assertEqual(0, count_queues_in(Config, VHost1)),
?assertEqual(0, count_queues_in(Config, VHost2)),
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
set_vhost_queue_limit(Config, VHost1, 3),
set_vhost_queue_limit(Config, VHost2, 3),
@ -510,8 +516,8 @@ cluster_multiple_vhosts_limit_with_durable_named_queue(Config) ->
VHost2 = <<"queue-limits2">>,
set_up_vhost(Config, VHost1),
set_up_vhost(Config, VHost2),
?assertEqual(0, count_queues_in(Config, VHost1)),
?assertEqual(0, count_queues_in(Config, VHost2)),
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
set_vhost_queue_limit(Config, VHost1, 3),
set_vhost_queue_limit(Config, VHost2, 3),
@ -574,8 +580,8 @@ cluster_multiple_vhosts_zero_limit_with(Config, QueueDeclare) ->
VHost2 = <<"queue-limits2">>,
set_up_vhost(Config, VHost1),
set_up_vhost(Config, VHost2),
?assertEqual(0, count_queues_in(Config, VHost1)),
?assertEqual(0, count_queues_in(Config, VHost2)),
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
Conn1 = open_unmanaged_connection(Config, 0, VHost1),
{ok, Ch1} = amqp_connection:open_channel(Conn1),
@ -640,7 +646,6 @@ set_vhost_queue_limit(Config, NodeIndex, VHost, Count) ->
count_queues_in(Config, VHost) ->
count_queues_in(Config, VHost, 0).
count_queues_in(Config, VHost, NodeIndex) ->
timer:sleep(200),
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_amqqueue,
count, [VHost]).

View File

@ -26,6 +26,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile(export_all).
@ -203,7 +204,7 @@ declare_policy_exactly(Config) ->
Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
rabbit_ct_broker_helpers:control_action(sync_queue, Node0,
[binary_to_list(Q)], [{"-p", "/"}]),
wait_for_sync(Config, Node0, QueueRes, 1),
?awaitMatch(true, synced(Config, Node0, QueueRes, 1), 60000),
{ok, Queue} = rabbit_ct_broker_helpers:rpc(Config, Node0,
rabbit_amqqueue, lookup, [QueueRes]),
@ -459,18 +460,6 @@ set_location_policy(Config, Name, Strategy) ->
ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
Name, <<".*">>, <<"queues">>, [{<<"queue-master-locator">>, Strategy}]).
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen) ->
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, 600).
wait_for_sync(_, _, _, _, 0) ->
throw(sync_timeout);
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N) ->
case synced(Config, Nodename, Q, ExpectedSSPidLen) of
true -> ok;
false -> timer:sleep(100),
wait_for_sync(Config, Nodename, Q, ExpectedSSPidLen, N-1)
end.
synced(Config, Nodename, Q, ExpectedSSPidLen) ->
Args = [<<"/">>, [name, synchronised_slave_pids]],
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,

View File

@ -1401,7 +1401,7 @@ publishing_to_unavailable_queue(Config) ->
exit(confirm_timeout)
end,
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
timer:sleep(2000),
?awaitMatch(2, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT),
publish_many(Ch, QQ, 1),
%% this should now be acked
ok = receive
@ -1841,23 +1841,25 @@ delete_member_queue_not_found(Config) ->
[<<"/">>, QQ, Server])).
delete_member(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
NServers = length(Servers),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(100),
?awaitMatch(NServers, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT),
?assertEqual(ok,
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
delete_member_not_a_member(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
NServers = length(Servers),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(100),
?awaitMatch(NServers, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT),
?assertEqual(ok,
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])),
@ -1875,7 +1877,7 @@ delete_member_during_node_down(Config) ->
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(200),
?awaitMatch(2, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT),
?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Remove])),
@ -1889,11 +1891,12 @@ delete_member_during_node_down(Config) ->
node_removal_is_quorum_critical(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
NServers = length(Servers),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QName = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QName, 0, 0},
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(100),
?awaitMatch(NServers, count_online_nodes(Server, <<"/">>, QName), ?DEFAULT_AWAIT),
[begin
Qs = rpc:call(S, rabbit_quorum_queue, list_with_minimum_quorum, []),
?assertEqual([QName], queue_names(Qs))
@ -1905,7 +1908,7 @@ node_removal_is_not_quorum_critical(Config) ->
QName = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QName, 0, 0},
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(100),
?awaitMatch(3, count_online_nodes(Server, <<"/">>, QName), ?DEFAULT_AWAIT),
Qs = rpc:call(Server, rabbit_quorum_queue, list_with_minimum_quorum, []),
?assertEqual([], Qs).
@ -1975,7 +1978,7 @@ cleanup_data_dir(Config) ->
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(100),
?awaitMatch(2, count_online_nodes(Server1, <<"/">>, QQ), ?DEFAULT_AWAIT),
UId1 = proplists:get_value(ra_name(QQ), rpc:call(Server1, ra_directory, list_registered, [quorum_queues])),
UId2 = proplists:get_value(ra_name(QQ), rpc:call(Server2, ra_directory, list_registered, [quorum_queues])),
@ -2406,13 +2409,11 @@ consume_redelivery_count(Config) ->
multiple = false,
requeue = true}),
%% wait for requeuing
timer:sleep(500),
{#'basic.get_ok'{delivery_tag = DeliveryTag1,
redelivered = true},
#amqp_msg{props = #'P_basic'{headers = H1}}} =
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = false}),
basic_get(Ch, QQ, false, 300),
?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
multiple = false,
@ -2497,9 +2498,16 @@ memory_alarm_rolls_wal(Config) ->
rabbit_ct_helpers:await_condition(
fun() -> rabbit_ct_broker_helpers:get_alarms(Config, Server) =/= [] end
),
timer:sleep(1000),
[Wal1] = filelib:wildcard(WalDataDir ++ "/*.wal"),
?assert(Wal0 =/= Wal1),
rabbit_ct_helpers:await_condition(
fun() ->
List = filelib:wildcard(WalDataDir ++ "/*.wal"),
%% There is a small time window where there could be no
%% file, but we need to wait for it to ensure it is not
%% roll over again later on
[Wal0] =/= List andalso [] =/= List
end, 30000),
Wal1 = lists:last(lists:sort(filelib:wildcard(WalDataDir ++ "/*.wal"))),
%% roll over shouldn't happen if we trigger a new alarm in less than
%% min_wal_roll_over_interval
rabbit_ct_broker_helpers:set_alarm(Config, Server, memory),
@ -2507,7 +2515,7 @@ memory_alarm_rolls_wal(Config) ->
fun() -> rabbit_ct_broker_helpers:get_alarms(Config, Server) =/= [] end
),
timer:sleep(1000),
[Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"),
Wal2 = lists:last(lists:sort(filelib:wildcard(WalDataDir ++ "/*.wal"))),
?assert(Wal1 == Wal2),
lists:foreach(fun (Node) ->
ok = rabbit_ct_broker_helpers:clear_alarm(Config, Node, memory)
@ -2676,16 +2684,42 @@ message_ttl(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-message-ttl">>, long, 2000}])),
%% Checking for messages that are short-lived could cause intermitent
%% failures, as these could have expired before the check takes
%% place. Thus, we're goig to split this testcase in two:
%% 1. Check that messages published with ttl reach the queue
%% This can be easily achieved by having a consumer already
%% subscribed to the queue.
%% 2. Check that messages published eventually disappear from the
%% queue.
Msg1 = <<"msg1">>,
Msg2 = <<"msg11">>,
%% 1. Subscribe, publish and consume two messages
subscribe(Ch, QQ, false),
publish(Ch, QQ, Msg1),
publish(Ch, QQ, Msg2),
wait_for_messages(Config, [[QQ, <<"2">>, <<"0">>, <<"2">>]]),
receive_and_ack(Ch),
receive_and_ack(Ch),
cancel(Ch),
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
%% 2. Publish two messages and wait until queue is empty
publish(Ch, QQ, Msg1),
publish(Ch, QQ, Msg2),
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
timer:sleep(2000),
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
ok.
receive_and_ack(Ch) ->
receive
{#'basic.deliver'{delivery_tag = DeliveryTag,
redelivered = false}, _} ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false})
end.
message_ttl_policy(Config) ->
%% Using ttl is very difficul to guarantee 100% test rate success, unless
%% using really high ttl values. Previously, this test used 1s and 3s ttl,
@ -2824,12 +2858,17 @@ consumer_metrics(Config) ->
RaName = ra_name(QQ),
{ok, _, {_, Leader}} = ra:members({RaName, Server}),
timer:sleep(5000),
QNameRes = rabbit_misc:r(<<"/">>, queue, QQ),
[{_, PropList, _}] = rpc:call(Leader, ets, lookup, [queue_metrics, QNameRes]),
?assertMatch([{consumers, 1}], lists:filter(fun({Key, _}) ->
Key == consumers
end, PropList)).
?awaitMatch(1,
begin
case rpc:call(Leader, ets, lookup, [queue_metrics, QNameRes]) of
[{_, PropList, _}] ->
proplists:get_value(consumers, PropList, undefined);
_ ->
undefined
end
end,
30000).
delete_if_empty(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@ -3271,6 +3310,11 @@ get_queue_type(Server, VHost, Q0) ->
{ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
amqqueue:get_type(Q1).
count_online_nodes(Server, VHost, Q0) ->
QNameRes = rabbit_misc:r(VHost, queue, Q0),
Info = rpc:call(Server, rabbit_quorum_queue, infos, [QNameRes]),
length(proplists:get_value(online, Info, [])).
publish_many(Ch, Queue, Count) ->
[publish(Ch, Queue) || _ <- lists:seq(1, Count)].

View File

@ -416,6 +416,7 @@ add_replicas(Config) ->
[<<"/">>, Q, Server1])),
timer:sleep(1000),
check_leader_and_replicas(Config, [Server0, Server1]),
%% it is almost impossible to reliably catch this situation.
%% increasing number of messages published and the data size could help
@ -424,6 +425,8 @@ add_replicas(Config) ->
rpc:call(Server0, rabbit_stream_queue, add_replica,
[<<"/">>, Q, Server2])),
timer:sleep(1000),
check_leader_and_replicas(Config, [Server0, Server1, Server2]),
%% validate we can read the last entry
qos(Ch, 10, false),
amqp_channel:subscribe(
@ -998,22 +1001,23 @@ consume_timestamp_last_offset(Config) ->
%% Subscribe from now/future
Offset = erlang:system_time(second) + 60,
CTag = <<"consume_timestamp_last_offset">>,
amqp_channel:subscribe(
Ch1,
#'basic.consume'{queue = Q,
no_ack = false,
consumer_tag = <<"ctag">>,
consumer_tag = CTag,
arguments = [{<<"x-stream-offset">>, timestamp, Offset}]},
self()),
receive
#'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
after 5000 ->
exit(missing_consume_ok)
end,
receive
{_,
{#'basic.deliver'{consumer_tag = CTag},
#amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}}
when S < 100 ->
exit({unexpected_offset, S})
@ -1770,12 +1774,10 @@ max_age(Config) ->
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
amqp_channel:wait_for_confirms(Ch, 5),
timer:sleep(5000),
%% Let's give it some margin if some messages fall between segments
quorum_queue_utils:wait_for_min_messages(Config, Q, 100),
quorum_queue_utils:wait_for_max_messages(Config, Q, 150),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 200, false),
subscribe(Ch1, Q, false, 0),
?assertEqual(100, length(receive_batch())),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
replica_recovery(Config) ->
@ -1834,7 +1836,6 @@ leader_failover(Config) ->
publish_confirm(Ch1, Q, [<<"msg">> || _ <- lists:seq(1, 100)]),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
timer:sleep(30000),
rabbit_ct_helpers:await_condition(
fun () ->
@ -1842,7 +1843,7 @@ leader_failover(Config) ->
NewLeader = proplists:get_value(leader, Info),
NewLeader =/= Server1
end),
end, 45000),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
@ -2082,7 +2083,8 @@ leader_locator_balanced_maintenance(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
select_nodes_with_least_replicas(Config) ->
[Server1 | _ ] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server1 | _ ] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Servers = lists:usort(Servers0),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
Bin = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
@ -2102,8 +2104,9 @@ select_nodes_with_least_replicas(Config) ->
end, Qs),
%% We expect that the second stream chose nodes where the first stream does not have replicas.
?assertEqual(lists:usort(Servers),
lists:usort(Q1Members ++ QMembers)),
?awaitMatch(Servers,
lists:usort(Q1Members ++ QMembers),
30000),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [Qs]).
leader_locator_policy(Config) ->
@ -2209,13 +2212,15 @@ max_age_policy(Config) ->
Config, 0, PolicyName, <<"max_age_policy.*">>, <<"queues">>,
[{<<"max-age">>, <<"1Y">>}]),
%% Policies are asynchronous, must wait until it has been applied everywhere
ensure_retention_applied(Config, Server),
?awaitMatch(
{PolicyName, '', [{<<"max-age">>, <<"1Y">>}]},
begin
Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition]),
?assertEqual(PolicyName, proplists:get_value(policy, Info)),
?assertEqual('', proplists:get_value(operator_policy, Info)),
?assertEqual([{<<"max-age">>, <<"1Y">>}],
proplists:get_value(effective_policy_definition, Info)),
{proplists:get_value(policy, Info), proplists:get_value(operator_policy, Info), proplists:get_value(effective_policy_definition, Info)}
end,
30000),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).

View File

@ -12,6 +12,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("kernel/include/file.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile(export_all).
@ -71,7 +72,7 @@ app_management(Config) ->
?MODULE, app_management1, [Config]).
app_management1(_Config) ->
wait_for_application(rabbit),
?awaitMatch(true, lists:keymember(rabbit, 1, application:which_applications()), 30000),
%% Starting, stopping and diagnostics. Note that we don't try
%% 'report' when the rabbit app is stopped and that we enable
%% tracing for the duration of this function.
@ -92,22 +93,6 @@ no_exceptions(Mod, Fun, Args) ->
catch Type:Ex -> {Type, Ex}
end.
wait_for_application(Application) ->
wait_for_application(Application, 5000).
wait_for_application(_, Time) when Time =< 0 ->
{error, timeout};
wait_for_application(Application, Time) ->
Interval = 100,
case lists:keyfind(Application, 1, application:which_applications()) of
false ->
timer:sleep(Interval),
wait_for_application(Application, Time - Interval);
_ -> ok
end.
%% -------------------------------------------------------------------
%% Log management.
%% -------------------------------------------------------------------
@ -245,8 +230,9 @@ non_empty_files(Files) ->
test_logs_working(LogFiles) ->
ok = rabbit_log:error("Log a test message"),
%% give the error loggers some time to catch up
timer:sleep(1000),
?awaitMatch(true,
lists:all(fun(LogFile) -> [true] =:= non_empty_files([LogFile]) end, LogFiles),
30000),
ok.
set_permissions(Path, Mode) ->

View File

@ -7,12 +7,15 @@
-module(vhost_SUITE).
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-define(AWAIT_TIMEOUT, 30000).
all() ->
[
{group, cluster_size_1_network},
@ -129,21 +132,20 @@ single_node_vhost_deletion_forces_connection_closure(Config) ->
set_up_vhost(Config, VHost1),
set_up_vhost(Config, VHost2),
?assertEqual(0, count_connections_in(Config, VHost1)),
?assertEqual(0, count_connections_in(Config, VHost2)),
?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
[Conn1] = open_connections(Config, [{0, VHost1}]),
?assertEqual(1, count_connections_in(Config, VHost1)),
?awaitMatch(1, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT),
[_Conn2] = open_connections(Config, [{0, VHost2}]),
?assertEqual(1, count_connections_in(Config, VHost2)),
?awaitMatch(1, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, VHost2)),
?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
close_connections([Conn1]),
?assertEqual(0, count_connections_in(Config, VHost1)).
?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT).
vhost_failure_forces_connection_closure(Config) ->
VHost1 = <<"vhost1">>,
@ -152,21 +154,20 @@ vhost_failure_forces_connection_closure(Config) ->
set_up_vhost(Config, VHost1),
set_up_vhost(Config, VHost2),
?assertEqual(0, count_connections_in(Config, VHost1)),
?assertEqual(0, count_connections_in(Config, VHost2)),
?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
[Conn1] = open_connections(Config, [{0, VHost1}]),
?assertEqual(1, count_connections_in(Config, VHost1)),
?awaitMatch(1, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT),
[_Conn2] = open_connections(Config, [{0, VHost2}]),
?assertEqual(1, count_connections_in(Config, VHost2)),
?awaitMatch(1, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, VHost2)),
?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
close_connections([Conn1]),
?assertEqual(0, count_connections_in(Config, VHost1)).
?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT).
vhost_failure_forces_connection_closure_on_failure_node(Config) ->
@ -176,25 +177,24 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) ->
set_up_vhost(Config, VHost1),
set_up_vhost(Config, VHost2),
?assertEqual(0, count_connections_in(Config, VHost1)),
?assertEqual(0, count_connections_in(Config, VHost2)),
?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
[Conn1] = open_connections(Config, [{0, VHost1}]),
?assertEqual(1, count_connections_in(Config, VHost1)),
?awaitMatch(1, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT),
[_Conn20] = open_connections(Config, [{0, VHost2}]),
[_Conn21] = open_connections(Config, [{1, VHost2}]),
?assertEqual(2, count_connections_in(Config, VHost2)),
?awaitMatch(2, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2),
timer:sleep(200),
%% Vhost2 connection on node 1 is still alive
?assertEqual(1, count_connections_in(Config, VHost2)),
?awaitMatch(1, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
%% Vhost1 connection on node 0 is still alive
?assertEqual(1, count_connections_in(Config, VHost1)),
?awaitMatch(1, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT),
close_connections([Conn1]),
?assertEqual(0, count_connections_in(Config, VHost1)).
?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT).
cluster_vhost_deletion_forces_connection_closure(Config) ->
@ -204,21 +204,20 @@ cluster_vhost_deletion_forces_connection_closure(Config) ->
set_up_vhost(Config, VHost1),
set_up_vhost(Config, VHost2),
?assertEqual(0, count_connections_in(Config, VHost1)),
?assertEqual(0, count_connections_in(Config, VHost2)),
?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
[Conn1] = open_connections(Config, [{0, VHost1}]),
?assertEqual(1, count_connections_in(Config, VHost1)),
?awaitMatch(1, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT),
[_Conn2] = open_connections(Config, [{1, VHost2}]),
?assertEqual(1, count_connections_in(Config, VHost2)),
?awaitMatch(1, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, VHost2)),
?awaitMatch(0, count_connections_in(Config, VHost2), ?AWAIT_TIMEOUT),
close_connections([Conn1]),
?assertEqual(0, count_connections_in(Config, VHost1)).
?awaitMatch(0, count_connections_in(Config, VHost1), ?AWAIT_TIMEOUT).
node_starts_with_dead_vhosts(Config) ->
VHost1 = <<"vhost1">>,
@ -244,10 +243,11 @@ node_starts_with_dead_vhosts(Config) ->
%% The node should start without a vhost
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
timer:sleep(3000),
?assertEqual(true, rabbit_ct_broker_helpers:rpc(Config, 1,
rabbit_vhost_sup_sup, is_vhost_alive, [VHost2])).
?awaitMatch(
true,
rabbit_ct_broker_helpers:rpc(Config, 1,
rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]),
?AWAIT_TIMEOUT).
node_starts_with_dead_vhosts_with_mirrors(Config) ->
VHost1 = <<"vhost1">>,
@ -274,7 +274,15 @@ node_starts_with_dead_vhosts_with_mirrors(Config) ->
0, <<"queues">>, <<"acting-user">>]),
%% Wait for the queue to start a mirror
timer:sleep(500),
?awaitMatch([_],
begin
{ok, Q0} = rabbit_ct_broker_helpers:rpc(
Config, 0,
rabbit_amqqueue, lookup,
[rabbit_misc:r(VHost1, queue, QName)], infinity),
amqqueue:get_sync_slave_pids(Q0)
end,
?AWAIT_TIMEOUT),
rabbit_ct_client_helpers:publish(Chan, QName, 10),
@ -299,7 +307,9 @@ node_starts_with_dead_vhosts_with_mirrors(Config) ->
%% The node should start without a vhost
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
timer:sleep(3000),
?awaitMatch(true,
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit, is_running, []),
?AWAIT_TIMEOUT),
?assertEqual(true, rabbit_ct_broker_helpers:rpc(Config, 1,
rabbit_vhost_sup_sup, is_vhost_alive, [VHost2])).
@ -435,27 +445,23 @@ open_connections(Config, NodesAndVHosts) ->
network -> open_unmanaged_connection;
direct -> open_unmanaged_connection_direct
end,
Conns = lists:map(fun
lists:map(fun
({Node, VHost}) ->
rabbit_ct_client_helpers:OpenConnectionFun(Config, Node,
VHost);
(Node) ->
rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
end, NodesAndVHosts),
timer:sleep(500),
Conns.
end, NodesAndVHosts).
close_connections(Conns) ->
lists:foreach(fun
(Conn) ->
rabbit_ct_client_helpers:close_connection(Conn)
end, Conns),
timer:sleep(500).
end, Conns).
count_connections_in(Config, VHost) ->
count_connections_in(Config, VHost, 0).
count_connections_in(Config, VHost, NodeIndex) ->
timer:sleep(200),
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
count_tracked_items_in, [{vhost, VHost}]).