further speed up quorum_queue_SUITE

This commit is contained in:
Karl Nilsson 2024-06-25 11:00:23 +01:00
parent e84cb65287
commit e2767a750d
3 changed files with 39 additions and 27 deletions

View File

@ -98,6 +98,7 @@ init_per_suite(Config0) ->
rabbit_ct_helpers:log_environment(),
Config = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{dead_letter_worker_publisher_confirm_timeout, 2000},
{collect_statistics_interval, Tick},
{channel_tick_interval, Tick},
{quorum_tick_interval, Tick},
{stream_tick_interval, Tick}]}),

View File

@ -18,17 +18,23 @@
has_local_stream_member_rpc/1
]).
-define(WFM_SLEEP, 256).
-define(WFM_DEFAULT_NUMS, 30_000 div ?WFM_SLEEP). %% ~30s
wait_for_messages_ready(Servers, QName, Ready) ->
wait_for_messages(Servers, QName, Ready,
fun rabbit_fifo:query_messages_ready/1, 60).
fun rabbit_fifo:query_messages_ready/1,
?WFM_DEFAULT_NUMS).
wait_for_messages_pending_ack(Servers, QName, Ready) ->
wait_for_messages(Servers, QName, Ready,
fun rabbit_fifo:query_messages_checked_out/1, 60).
fun rabbit_fifo:query_messages_checked_out/1,
?WFM_DEFAULT_NUMS).
wait_for_messages_total(Servers, QName, Total) ->
wait_for_messages(Servers, QName, Total,
fun rabbit_fifo:query_messages_total/1, 60).
fun rabbit_fifo:query_messages_total/1,
?WFM_DEFAULT_NUMS).
wait_for_messages(Servers, QName, Number, Fun, 0) ->
Msgs = dirty_query(Servers, QName, Fun),
@ -52,12 +58,12 @@ wait_for_messages(Servers, QName, Number, Fun, N) ->
true ->
ok;
_ ->
timer:sleep(500),
timer:sleep(?WFM_SLEEP),
wait_for_messages(Servers, QName, Number, Fun, N - 1)
end.
wait_for_messages(Config, Stats) ->
wait_for_messages(Config, lists:sort(Stats), 60).
wait_for_messages(Config, lists:sort(Stats), ?WFM_DEFAULT_NUMS).
wait_for_messages(Config, Stats, 0) ->
?assertEqual(Stats,
@ -75,12 +81,12 @@ wait_for_messages(Config, Stats, N) ->
Stats0 when Stats0 == Stats ->
ok;
_ ->
timer:sleep(500),
timer:sleep(?WFM_SLEEP),
wait_for_messages(Config, Stats, N - 1)
end.
wait_for_min_messages(Config, Queue, Msgs) ->
wait_for_min_messages(Config, Queue, Msgs, 60).
wait_for_min_messages(Config, Queue, Msgs, ?WFM_DEFAULT_NUMS).
wait_for_min_messages(Config, Queue, Msgs, 0) ->
[[_, Got]] = filter_queues([[Queue, Msgs]],
@ -97,16 +103,16 @@ wait_for_min_messages(Config, Queue, Msgs, N) ->
true ->
ok;
false ->
timer:sleep(500),
timer:sleep(?WFM_SLEEP),
wait_for_min_messages(Config, Queue, Msgs, N - 1)
end;
_ ->
timer:sleep(500),
timer:sleep(?WFM_SLEEP),
wait_for_min_messages(Config, Queue, Msgs, N - 1)
end.
wait_for_max_messages(Config, Queue, Msgs) ->
wait_for_max_messages(Config, Queue, Msgs, 60).
wait_for_max_messages(Config, Queue, Msgs, ?WFM_DEFAULT_NUMS).
wait_for_max_messages(Config, Queue, Msgs, 0) ->
[[_, Got]] = filter_queues([[Queue, Msgs]],
@ -123,11 +129,11 @@ wait_for_max_messages(Config, Queue, Msgs, N) ->
true ->
ok;
false ->
timer:sleep(500),
timer:sleep(?WFM_SLEEP),
wait_for_max_messages(Config, Queue, Msgs, N - 1)
end;
_ ->
timer:sleep(500),
timer:sleep(?WFM_SLEEP),
wait_for_max_messages(Config, Queue, Msgs, N - 1)
end.

View File

@ -190,7 +190,7 @@ memory_tests() ->
init_per_suite(Config0) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{quorum_tick_interval, 1000}]}),
Config0, {rabbit, [{quorum_tick_interval, 256}]}),
rabbit_ct_helpers:run_setup_steps(Config1, []).
end_per_suite(Config) ->
@ -977,18 +977,19 @@ consume_in_minority(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
RaName = binary_to_atom(<<"%2F_", QQ/binary>>, utf8),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(stop_app, Server2),
rabbit_quorum_queue:stop_server({RaName, Server1}),
rabbit_quorum_queue:stop_server({RaName, Server2}),
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = false})),
ok = rabbit_control_helper:command(start_app, Server1),
ok = rabbit_control_helper:command(start_app, Server2),
rabbit_quorum_queue:restart_server({RaName, Server1}),
rabbit_quorum_queue:restart_server({RaName, Server2}),
ok.
reject_after_leader_transfer(Config) ->
@ -1430,17 +1431,17 @@ publishing_to_unavailable_queue(Config) ->
%% publishing to an unavailable queue but with a reachable member should result
%% in the initial enqueuer command that is send syncronously to set up
%% the enqueuer session timing out and the message being nacked
[Server, Server1, Server2] = Servers =
[Server, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
TCh = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
RaName = binary_to_atom(<<"%2F_", QQ/binary>>, utf8),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(stop_app, Server2),
assert_cluster_status({Servers, Servers, [Server]}, [Server]),
rabbit_quorum_queue:stop_server({RaName, Server1}),
rabbit_quorum_queue:stop_server({RaName, Server2}),
ct:pal("opening channel to ~w", [Server]),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
@ -1452,10 +1453,10 @@ publishing_to_unavailable_queue(Config) ->
#'basic.ack'{} -> fail;
#'basic.nack'{} -> ok
after 90000 ->
flush(1),
exit(confirm_timeout)
end,
ok = rabbit_control_helper:command(start_app, Server1),
?awaitMatch(2, count_online_nodes(Server, <<"/">>, QQ), ?DEFAULT_AWAIT),
rabbit_quorum_queue:restart_server({RaName, Server1}),
publish_many(Ch, QQ, 1),
%% this should now be acked
%% check we get at least on ack
@ -1463,10 +1464,11 @@ publishing_to_unavailable_queue(Config) ->
#'basic.ack'{} -> ok;
#'basic.nack'{} -> fail
after 90000 ->
flush(1),
exit(confirm_timeout)
end,
flush(1),
ok = rabbit_control_helper:command(start_app, Server2),
rabbit_quorum_queue:restart_server({RaName, Server2}),
ok.
leadership_takeover(Config) ->
@ -3046,9 +3048,9 @@ message_ttl_policy(Config) ->
ok = rabbit_ct_broker_helpers:set_policy(Config, 0, <<"msg-ttl">>,
QQ, <<"queues">>,
[{<<"message-ttl">>, 10000}]),
[{<<"message-ttl">>, 1000}]),
{ok, {_, Overview2}, _} = rpc:call(Server, ra, local_query, [RaName, QueryFun]),
?assertMatch(#{config := #{msg_ttl := 10000}}, Overview2),
?assertMatch(#{config := #{msg_ttl := 1000}}, Overview2),
publish(Ch, QQ, Msg1),
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
@ -3096,11 +3098,13 @@ per_message_ttl_mixed_expiry(Config) ->
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{props = #'P_basic'{delivery_mode = 2,
expiration = <<"500">>},
expiration = <<"100">>},
payload = Msg2}),
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
%% twice the expiry interval
timer:sleep(100 * 2),
subscribe(Ch, QQ, false),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag},
@ -3112,6 +3116,7 @@ per_message_ttl_mixed_expiry(Config) ->
ct:fail("basic deliver timeout")
end,
%% the second message should NOT be received as it has expired
receive
{#'basic.deliver'{}, #amqp_msg{payload = Msg2}} ->