speed up quorum_queues_SUITE

AFTER: gmake -C deps/rabbit ct-quorum_queue  6.15s user 4.25s system 2% cpu 6:25.29 total
This commit is contained in:
Karl Nilsson 2024-06-24 12:31:59 +01:00
parent 3551309baf
commit f919fee7f1
1 changed files with 40 additions and 42 deletions

View File

@ -25,6 +25,8 @@
-compile([nowarn_export_all, export_all]).
-define(NET_TICKTIME_S, 5).
-define(DEFAULT_AWAIT, 10_000).
suite() ->
@ -206,7 +208,8 @@ init_per_group(clustered_with_partitions, Config0) ->
Config1 = rabbit_ct_helpers:run_setup_steps(
Config0,
[fun rabbit_ct_broker_helpers:configure_dist_proxy/1]),
Config2 = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
Config2 = rabbit_ct_helpers:set_config(Config1,
[{net_ticktime, ?NET_TICKTIME_S}]),
Config2
end;
init_per_group(Group, Config) ->
@ -225,10 +228,10 @@ init_per_group(Group, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_nodes_count, ClusterSize},
{rmq_nodename_suffix, Group},
{tcp_ports_base, {skip_n_nodes, ClusterSize}}
{tcp_ports_base, {skip_n_nodes, ClusterSize}},
{net_ticktime, ?NET_TICKTIME_S}
]),
Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
Ret = rabbit_ct_helpers:run_steps(Config1b,
Ret = rabbit_ct_helpers:run_steps(Config1,
[fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps()),
case Ret of
@ -238,10 +241,6 @@ init_per_group(Group, Config) ->
ok = rabbit_ct_broker_helpers:rpc(
Config2, 0, application, set_env,
[rabbit, channel_tick_interval, 100]),
%% HACK: the larger cluster sizes benefit for a bit
%% more time after clustering before running the
%% tests.
timer:sleep(ClusterSize * 1000),
Config2
end
end.
@ -580,7 +579,7 @@ start_queue_concurrent(Config) ->
[{<<"x-queue-type">>,
longstr,
<<"quorum">>}])),
timer:sleep(500),
timer:sleep(100),
rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
Self ! {done, Server}
end)
@ -740,7 +739,10 @@ server_system_recover(Config) ->
%% validate quorum queue is still functional
?awaitMatch({ok, _, _},
begin
ra:members({RaName, Server})
%% there is a small chance that a quorum queue process will crash
%% due to missing ETS table, in this case we need to keep
%% retrying awaiting the restart
catch ra:members({RaName, Server})
end, ?DEFAULT_AWAIT),
ok.
@ -978,17 +980,15 @@ consume_in_minority(Config) ->
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(stop_app, Server2),
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = false})),
ok = rabbit_ct_broker_helpers:async_start_node(Config, Server1),
ok = rabbit_ct_broker_helpers:async_start_node(Config, Server2),
ok = rabbit_ct_broker_helpers:wait_for_async_start_node(Server1),
ok = rabbit_ct_broker_helpers:wait_for_async_start_node(Server2),
ok = rabbit_control_helper:command(start_app, Server1),
ok = rabbit_control_helper:command(start_app, Server2),
ok.
reject_after_leader_transfer(Config) ->
@ -1427,17 +1427,19 @@ recover_from_multiple_failures(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 0).
publishing_to_unavailable_queue(Config) ->
%% publishing to an unavialable queue but with a reachable member should result
%% in the initial enqueuer session timing out and the message being nacked
[Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
%% publishing to an unavailable queue but with a reachable member should result
%% in the initial enqueuer command that is send syncronously to set up
%% the enqueuer session timing out and the message being nacked
[Server, Server1, Server2] = Servers =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
TCh = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(stop_app, Server2),
assert_cluster_status({Servers, Servers, [Server]}, [Server]),
ct:pal("opening channel to ~w", [Server]),
@ -1452,31 +1454,33 @@ publishing_to_unavailable_queue(Config) ->
after 90000 ->
exit(confirm_timeout)
end,
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
ok = rabbit_control_helper:command(start_app, Server1),
?awaitMatch(2, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT),
publish_many(Ch, QQ, 1),
%% this should now be acked
%% check we get at least on ack
ok = receive
#'basic.ack'{} -> ok;
#'basic.nack'{} -> fail
after 90000 ->
exit(confirm_timeout)
end,
%% check we get at least on ack
ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
flush(1),
ok = rabbit_control_helper:command(start_app, Server2),
ok.
leadership_takeover(Config) ->
%% Kill nodes in succession forcing the takeover of leadership, and all messages that
%% are in the queue.
[Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server, Server1, Server2] = Servers =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
ok = rabbit_control_helper:command(stop_app, Server1),
Running = Servers -- [Server1],
assert_cluster_status({Servers, Servers, Running}, Running),
@ -1489,18 +1493,17 @@ leadership_takeover(Config) ->
wait_for_messages_ready([Server], RaName, 3),
wait_for_messages_pending_ack([Server], RaName, 0),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
ok = rabbit_ct_broker_helpers:start_node(Config, Server),
ok = rabbit_control_helper:command(stop_app, Server2),
ok = rabbit_control_helper:command(start_app, Server1),
ok = rabbit_control_helper:command(stop_app, Server),
ok = rabbit_control_helper:command(start_app, Server2),
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(start_app, Server),
wait_for_messages_ready([Server2, Server], RaName, 3),
wait_for_messages_pending_ack([Server2, Server], RaName, 0),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
ok = rabbit_control_helper:command(start_app, Server1),
wait_for_messages_ready(Servers, RaName, 3),
wait_for_messages_pending_ack(Servers, RaName, 0).
@ -2268,7 +2271,6 @@ reconnect_consumer_and_wait_channel_down(Config) ->
%% Let's give it a few seconds to ensure it doesn't attempt to
%% deliver to the down channel - it shouldn't be monitored
%% at this time!
timer:sleep(5000),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
@ -3067,11 +3069,10 @@ per_message_ttl(Config) ->
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{props = #'P_basic'{delivery_mode = 2,
expiration = <<"2000">>},
expiration = <<"1000">>},
payload = Msg1}),
amqp_channel:wait_for_confirms(Ch, 5),
%% we know the message got to the queue in 2s it should be gone
timer:sleep(2000),
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
ok.
@ -3099,8 +3100,6 @@ per_message_ttl_mixed_expiry(Config) ->
payload = Msg2}),
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
timer:sleep(1000),
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
subscribe(Ch, QQ, false),
receive
@ -3593,7 +3592,6 @@ select_nodes_with_least_replicas_node_down(Config) ->
Qs = [?config(queue_name, Config),
?config(alt_queue_name, Config)],
timer:sleep(1000),
Members = [begin
?assertMatch({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q,
@ -3710,7 +3708,7 @@ nack(Ch, Multiple, Requeue) ->
end.
wait_for_cleanup(Server, Channel, Number) ->
wait_for_cleanup(Server, Channel, Number, 60).
wait_for_cleanup(Server, Channel, Number, 120).
wait_for_cleanup(Server, Channel, Number, 0) ->
?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])),
@ -3720,7 +3718,7 @@ wait_for_cleanup(Server, Channel, Number, N) ->
Length when Number == Length ->
ok;
_ ->
timer:sleep(500),
timer:sleep(250),
wait_for_cleanup(Server, Channel, Number, N - 1)
end.