Reduce priority_queue_SUITE to single node tests

Other tests (that produce flakes) arguably test classic mirrored
queues, a deprecated feature reasonably well
covered in other suites.

Per discussion with @gerhard.
This commit is contained in:
Michael Klishin 2021-06-28 21:51:22 +03:00
parent 24c29733a9
commit bed64f2cc9
No known key found for this signature in database
GPG Key ID: E80EDCFA0CDB21EE
1 changed files with 23 additions and 273 deletions

View File

@ -15,40 +15,29 @@
all() ->
[
{group, cluster_size_2},
{group, cluster_size_3}
{group, single_node}
].
groups() ->
[
{cluster_size_2, [], [
ackfold,
drop,
{overflow_reject_publish, [], [reject]},
{overflow_reject_publish_dlx, [], [reject]},
dropwhile_fetchwhile,
info_head_message_timestamp,
matching,
mirror_queue_sync,
mirror_queue_sync_priority_above_max,
mirror_queue_sync_priority_above_max_pending_ack,
mirror_queue_sync_order,
purge,
requeue,
resume,
simple_order,
straight_through,
invoke,
gen_server2_stats,
negative_max_priorities,
max_priorities_above_hard_limit
]},
{cluster_size_3, [], [
mirror_queue_auto_ack,
mirror_fast_reset_policy,
mirror_reset_policy,
mirror_stop_pending_followers
]}
{single_node, [], [
ackfold,
drop,
{overflow_reject_publish, [], [reject]},
{overflow_reject_publish_dlx, [], [reject]},
dropwhile_fetchwhile,
info_head_message_timestamp,
matching,
purge,
requeue,
resume,
simple_order,
straight_through,
invoke,
gen_server2_stats,
negative_max_priorities,
max_priorities_above_hard_limit
]}
].
%% -------------------------------------------------------------------
@ -62,21 +51,12 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(cluster_size_2, Config) ->
init_per_group(single_node, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodes_count, 2},
{rmq_nodename_suffix, Suffix}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps());
init_per_group(cluster_size_3, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodes_count, 3},
{rmq_nodename_suffix, Suffix}
]),
{rmq_nodes_count, 1},
{rmq_nodename_suffix, Suffix}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps());
@ -430,199 +410,6 @@ ram_duration(_Config) ->
PQ:delete_and_terminate(a_whim, BQS5),
passed.
mirror_queue_sync(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"mirror_queue_sync-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3]),
ok = rabbit_ct_broker_helpers:set_ha_policy(Config, 0,
<<"^mirror_queue_sync-queue$">>, <<"all">>),
publish(Ch, Q, [1, 2, 3, 1, 2, 3]),
%% master now has 9, mirror 6.
get_partial(Ch, Q, manual_ack, [3, 3, 3, 2, 2, 2]),
%% So some but not all are unacked at the mirror
Nodename0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
rabbit_ct_broker_helpers:control_action(sync_queue, Nodename0,
[binary_to_list(Q)], [{"-p", "/"}]),
wait_for_sync(Config, Nodename0, rabbit_misc:r(<<"/">>, queue, Q)),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
mirror_queue_sync_priority_above_max(Config) ->
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
%% Tests synchronisation of mirrors when priority is higher than max priority.
%% This causes an infinity loop (and test timeout) before rabbitmq-server-795
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Q = <<"mirror_queue_sync_priority_above_max-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [5, 5, 5]),
ok = rabbit_ct_broker_helpers:set_ha_policy(Config, A,
<<".*">>, <<"all">>),
rabbit_ct_broker_helpers:control_action(sync_queue, A,
[binary_to_list(Q)], [{"-p", "/"}]),
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
delete(Ch, Q),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
mirror_queue_sync_priority_above_max_pending_ack(Config) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
%% Tests synchronisation of mirrors when priority is higher than max priority
%% and there are pending acks.
%% This causes an infinity loop (and test timeout) before rabbitmq-server-795
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Q = <<"mirror_queue_sync_priority_above_max_pending_ack-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [5, 5, 5]),
%% Consume but 'forget' to acknowledge
get_without_ack(Ch, Q),
get_without_ack(Ch, Q),
ok = rabbit_ct_broker_helpers:set_ha_policy(Config, A,
<<".*">>, <<"all">>),
rabbit_ct_broker_helpers:control_action(sync_queue, A,
[binary_to_list(Q)], [{"-p", "/"}]),
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
synced_msgs(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 3),
synced_msgs(Config, B, rabbit_misc:r(<<"/">>, queue, Q), 3),
delete(Ch, Q),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
mirror_queue_auto_ack(Config) ->
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
%% Check correct use of AckRequired in the notifications to the mirrors.
%% If mirrors are notified with AckRequired == true when it is false,
%% the mirrors will crash with the depth notification as they will not
%% match the master delta.
%% Bug rabbitmq-server 687
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Q = <<"mirror_queue_auto_ack-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3]),
ok = rabbit_ct_broker_helpers:set_ha_policy(Config, A,
<<".*">>, <<"all">>),
get_partial(Ch, Q, no_ack, [3, 2, 1]),
%% Retrieve mirrors
SPids = slave_pids(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
[{SNode1, _SPid1}, {SNode2, SPid2}] = nodes_and_pids(SPids),
%% Restart one of the mirrors so `request_depth` is triggered
rabbit_ct_broker_helpers:restart_node(Config, SNode1),
%% The alive mirror must have the same pid after its neighbour is restarted
timer:sleep(3000), %% ugly but we can't know when the `depth` instruction arrives
Slaves = nodes_and_pids(slave_pids(Config, A, rabbit_misc:r(<<"/">>, queue, Q))),
SPid2 = proplists:get_value(SNode2, Slaves),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
mirror_queue_sync_order(Config) ->
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
{Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, B),
Q = <<"mirror_queue_sync_order-queue">>,
declare(Ch, Q, 3),
publish_payload(Ch, Q, [{1, <<"msg1">>}, {2, <<"msg2">>},
{2, <<"msg3">>}, {2, <<"msg4">>},
{3, <<"msg5">>}]),
rabbit_ct_client_helpers:close_channel(Ch),
%% Add and sync mirror
ok = rabbit_ct_broker_helpers:set_ha_policy(
Config, A, <<"^mirror_queue_sync_order-queue$">>, <<"all">>),
rabbit_ct_broker_helpers:control_action(sync_queue, A,
[binary_to_list(Q)], [{"-p", "/"}]),
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
%% Stop the master
rabbit_ct_broker_helpers:stop_node(Config, A),
get_payload(Ch2, Q, do_ack, [<<"msg5">>, <<"msg2">>, <<"msg3">>,
<<"msg4">>, <<"msg1">>]),
delete(Ch2, Q),
rabbit_ct_broker_helpers:start_node(Config, A),
rabbit_ct_client_helpers:close_connection(Conn),
rabbit_ct_client_helpers:close_connection(Conn2),
passed.
mirror_reset_policy(Config) ->
%% Gives time to the master to go through all stages.
%% Might eventually trigger some race conditions from #802,
%% although for that I would expect a longer run and higher
%% number of messages in the system.
mirror_reset_policy(Config, 5000).
mirror_fast_reset_policy(Config) ->
%% This test seems to trigger the bug tested in invoke/1, but it
%% cannot guarantee it will always happen. Thus, both tests
%% should stay in the test suite.
mirror_reset_policy(Config, 5).
mirror_reset_policy(Config, Wait) ->
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Q = <<"mirror_reset_policy-queue">>,
declare(Ch, Q, 5),
Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
publish_many(Ch, Q, 20000),
[begin
rabbit_ct_broker_helpers:set_ha_policy(
Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>,
[{<<"ha-sync-mode">>, <<"automatic">>}]),
timer:sleep(Wait),
rabbit_ct_broker_helpers:clear_policy(
Config, A, <<"^mirror_reset_policy-queue$">>),
timer:sleep(Wait)
end || _ <- lists:seq(1, 10)],
timer:sleep(1000),
ok = rabbit_ct_broker_helpers:set_ha_policy(
Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>,
[{<<"ha-sync-mode">>, <<"automatic">>}]),
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 2),
%% Verify master has not crashed
Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
delete(Ch, Q),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
mirror_stop_pending_followers(Config) ->
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
C = rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename),
[ok = rabbit_ct_broker_helpers:rpc(
Config, Nodename, application, set_env, [rabbit, slave_wait_timeout, 0]) || Nodename <- [A, B, C]],
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Q = <<"mirror_stop_pending_followers-queue">>,
declare(Ch, Q, 5),
publish_many(Ch, Q, 20000),
[begin
rabbit_ct_broker_helpers:set_ha_policy(
Config, A, <<"^mirror_stop_pending_followers-queue$">>, <<"all">>,
[{<<"ha-sync-mode">>, <<"automatic">>}]),
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 2),
rabbit_ct_broker_helpers:clear_policy(
Config, A, <<"^mirror_stop_pending_followers-queue$">>)
end || _ <- lists:seq(1, 15)],
delete(Ch, Q),
[ok = rabbit_ct_broker_helpers:rpc(
Config, Nodename, application, set_env, [rabbit, slave_wait_timeout, 15000]) || Nodename <- [A, B, C]],
rabbit_ct_client_helpers:close_connection(Conn),
passed.
%%----------------------------------------------------------------------------
declare(Ch, Q, Args) when is_list(Args) ->
@ -723,43 +510,6 @@ priority2bin(Int) -> list_to_binary(integer_to_list(Int)).
%%----------------------------------------------------------------------------
wait_for_sync(Config, Nodename, Q) ->
wait_for_sync(Config, Nodename, Q, 1).
wait_for_sync(Config, Nodename, Q, Nodes) ->
wait_for_sync(Config, Nodename, Q, Nodes, 600).
wait_for_sync(_, _, _, _, 0) ->
throw(sync_timeout);
wait_for_sync(Config, Nodename, Q, Nodes, N) ->
case synced(Config, Nodename, Q, Nodes) of
true -> ok;
false -> timer:sleep(100),
wait_for_sync(Config, Nodename, Q, Nodes, N-1)
end.
synced(Config, Nodename, Q, Nodes) ->
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
rabbit_amqqueue, info_all, [<<"/">>, [name, synchronised_slave_pids]]),
[SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info,
Q =:= Q1],
length(SSPids) =:= Nodes.
synced_msgs(Config, Nodename, Q, Expected) ->
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
rabbit_amqqueue, info_all, [<<"/">>, [name, messages]]),
[M] = [M || [{name, Q1}, {messages, M}] <- Info, Q =:= Q1],
M =:= Expected.
nodes_and_pids(SPids) ->
lists:zip([node(S) || S <- SPids], SPids).
slave_pids(Config, Nodename, Q) ->
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
rabbit_amqqueue, info_all, [<<"/">>, [name, slave_pids]]),
[SPids] = [SPids || [{name, Q1}, {slave_pids, SPids}] <- Info,
Q =:= Q1],
SPids.
queue_pid(Config, Nodename, Q) ->
Info = rabbit_ct_broker_helpers:rpc(