From f919fee7f1e3613caa78cb5362cf7819b8617348 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 24 Jun 2024 12:31:59 +0100 Subject: [PATCH] speed up quorum_queues_SUITE AFTER: gmake -C deps/rabbit ct-quorum_queue 6.15s user 4.25s system 2% cpu 6:25.29 total --- deps/rabbit/test/quorum_queue_SUITE.erl | 82 ++++++++++++------------- 1 file changed, 40 insertions(+), 42 deletions(-) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 726f9bb378..4ecbe3a705 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -25,6 +25,8 @@ -compile([nowarn_export_all, export_all]). + +-define(NET_TICKTIME_S, 5). -define(DEFAULT_AWAIT, 10_000). suite() -> @@ -206,7 +208,8 @@ init_per_group(clustered_with_partitions, Config0) -> Config1 = rabbit_ct_helpers:run_setup_steps( Config0, [fun rabbit_ct_broker_helpers:configure_dist_proxy/1]), - Config2 = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{net_ticktime, ?NET_TICKTIME_S}]), Config2 end; init_per_group(Group, Config) -> @@ -225,10 +228,10 @@ init_per_group(Group, Config) -> Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, ClusterSize}, {rmq_nodename_suffix, Group}, - {tcp_ports_base, {skip_n_nodes, ClusterSize}} + {tcp_ports_base, {skip_n_nodes, ClusterSize}}, + {net_ticktime, ?NET_TICKTIME_S} ]), - Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), - Ret = rabbit_ct_helpers:run_steps(Config1b, + Ret = rabbit_ct_helpers:run_steps(Config1, [fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()), case Ret of @@ -238,10 +241,6 @@ init_per_group(Group, Config) -> ok = rabbit_ct_broker_helpers:rpc( Config2, 0, application, set_env, [rabbit, channel_tick_interval, 100]), - %% HACK: the larger cluster sizes benefit for a bit - %% more time after clustering before running the - %% tests. - timer:sleep(ClusterSize * 1000), Config2 end end. @@ -580,7 +579,7 @@ start_queue_concurrent(Config) -> [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - timer:sleep(500), + timer:sleep(100), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), Self ! {done, Server} end) @@ -740,7 +739,10 @@ server_system_recover(Config) -> %% validate quorum queue is still functional ?awaitMatch({ok, _, _}, begin - ra:members({RaName, Server}) + %% there is a small chance that a quorum queue process will crash + %% due to missing ETS table, in this case we need to keep + %% retrying awaiting the restart + catch ra:members({RaName, Server}) end, ?DEFAULT_AWAIT), ok. @@ -978,17 +980,15 @@ consume_in_minority(Config) -> ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), - ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(stop_app, Server2), ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = false})), - ok = rabbit_ct_broker_helpers:async_start_node(Config, Server1), - ok = rabbit_ct_broker_helpers:async_start_node(Config, Server2), - ok = rabbit_ct_broker_helpers:wait_for_async_start_node(Server1), - ok = rabbit_ct_broker_helpers:wait_for_async_start_node(Server2), + ok = rabbit_control_helper:command(start_app, Server1), + ok = rabbit_control_helper:command(start_app, Server2), ok. reject_after_leader_transfer(Config) -> @@ -1427,17 +1427,19 @@ recover_from_multiple_failures(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0). publishing_to_unavailable_queue(Config) -> - %% publishing to an unavialable queue but with a reachable member should result - %% in the initial enqueuer session timing out and the message being nacked - [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + %% publishing to an unavailable queue but with a reachable member should result + %% in the initial enqueuer command that is send syncronously to set up + %% the enqueuer session timing out and the message being nacked + [Server, Server1, Server2] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), TCh = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), - ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(stop_app, Server2), assert_cluster_status({Servers, Servers, [Server]}, [Server]), ct:pal("opening channel to ~w", [Server]), @@ -1452,31 +1454,33 @@ publishing_to_unavailable_queue(Config) -> after 90000 -> exit(confirm_timeout) end, - ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + ok = rabbit_control_helper:command(start_app, Server1), ?awaitMatch(2, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT), publish_many(Ch, QQ, 1), %% this should now be acked + %% check we get at least on ack ok = receive #'basic.ack'{} -> ok; #'basic.nack'{} -> fail after 90000 -> exit(confirm_timeout) end, - %% check we get at least on ack - ok = rabbit_ct_broker_helpers:start_node(Config, Server2), + flush(1), + ok = rabbit_control_helper:command(start_app, Server2), ok. leadership_takeover(Config) -> %% Kill nodes in succession forcing the takeover of leadership, and all messages that %% are in the queue. - [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server, Server1, Server2] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + ok = rabbit_control_helper:command(stop_app, Server1), Running = Servers -- [Server1], assert_cluster_status({Servers, Servers, Running}, Running), @@ -1489,18 +1493,17 @@ leadership_takeover(Config) -> wait_for_messages_ready([Server], RaName, 3), wait_for_messages_pending_ack([Server], RaName, 0), - ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), - - ok = rabbit_ct_broker_helpers:start_node(Config, Server1), - ok = rabbit_ct_broker_helpers:stop_node(Config, Server), - ok = rabbit_ct_broker_helpers:start_node(Config, Server2), - ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), - ok = rabbit_ct_broker_helpers:start_node(Config, Server), + ok = rabbit_control_helper:command(stop_app, Server2), + ok = rabbit_control_helper:command(start_app, Server1), + ok = rabbit_control_helper:command(stop_app, Server), + ok = rabbit_control_helper:command(start_app, Server2), + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(start_app, Server), wait_for_messages_ready([Server2, Server], RaName, 3), wait_for_messages_pending_ack([Server2, Server], RaName, 0), - ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + ok = rabbit_control_helper:command(start_app, Server1), wait_for_messages_ready(Servers, RaName, 3), wait_for_messages_pending_ack(Servers, RaName, 0). @@ -2268,7 +2271,6 @@ reconnect_consumer_and_wait_channel_down(Config) -> %% Let's give it a few seconds to ensure it doesn't attempt to %% deliver to the down channel - it shouldn't be monitored %% at this time! - timer:sleep(5000), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). @@ -3067,11 +3069,10 @@ per_message_ttl(Config) -> ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = QQ}, #amqp_msg{props = #'P_basic'{delivery_mode = 2, - expiration = <<"2000">>}, + expiration = <<"1000">>}, payload = Msg1}), amqp_channel:wait_for_confirms(Ch, 5), %% we know the message got to the queue in 2s it should be gone - timer:sleep(2000), wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), ok. @@ -3099,8 +3100,6 @@ per_message_ttl_mixed_expiry(Config) -> payload = Msg2}), - wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), - timer:sleep(1000), wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), subscribe(Ch, QQ, false), receive @@ -3593,7 +3592,6 @@ select_nodes_with_least_replicas_node_down(Config) -> Qs = [?config(queue_name, Config), ?config(alt_queue_name, Config)], - timer:sleep(1000), Members = [begin ?assertMatch({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, @@ -3710,7 +3708,7 @@ nack(Ch, Multiple, Requeue) -> end. wait_for_cleanup(Server, Channel, Number) -> - wait_for_cleanup(Server, Channel, Number, 60). + wait_for_cleanup(Server, Channel, Number, 120). wait_for_cleanup(Server, Channel, Number, 0) -> ?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])), @@ -3720,7 +3718,7 @@ wait_for_cleanup(Server, Channel, Number, N) -> Length when Number == Length -> ok; _ -> - timer:sleep(500), + timer:sleep(250), wait_for_cleanup(Server, Channel, Number, N - 1) end.