Add test for QQ force_vhost_queues_shrink_member_to_current_member/1
This commit is contained in:
parent
c26aa3b1c7
commit
de0c0dbd89
|
|
@ -94,7 +94,8 @@ groups() ->
|
|||
single_active_consumer_priority_take_over,
|
||||
single_active_consumer_priority,
|
||||
force_shrink_member_to_current_member,
|
||||
force_all_queues_shrink_member_to_current_member
|
||||
force_all_queues_shrink_member_to_current_member,
|
||||
force_vhost_queues_shrink_member_to_current_member
|
||||
]
|
||||
++ all_tests()},
|
||||
{cluster_size_5, [], [start_queue,
|
||||
|
|
@ -1232,6 +1233,72 @@ force_all_queues_shrink_member_to_current_member(Config) ->
|
|||
?assertEqual(3, length(Nodes0))
|
||||
end || Q <- QQs].
|
||||
|
||||
force_vhost_queues_shrink_member_to_current_member(Config) ->
|
||||
[Server0, Server1, Server2] =
|
||||
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
|
||||
Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0),
|
||||
QQ = ?config(queue_name, Config),
|
||||
AQ = ?config(alt_queue_name, Config),
|
||||
?assertEqual({'queue.declare_ok', QQ, 0, 0},
|
||||
declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
|
||||
?assertEqual({'queue.declare_ok', AQ, 0, 0},
|
||||
declare(Ch0, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
|
||||
|
||||
QQs = [QQ, AQ],
|
||||
|
||||
VHost1 = <<"/">>,
|
||||
VHost2 = <<"another-vhost">>,
|
||||
VHosts = [VHost1, VHost2],
|
||||
|
||||
User = ?config(rmq_username, Config),
|
||||
ok = rabbit_ct_broker_helpers:add_vhost(Config, Server0, VHost2, User),
|
||||
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost2),
|
||||
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Server0, VHost2),
|
||||
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
||||
?assertEqual({'queue.declare_ok', QQ, 0, 0},
|
||||
declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
|
||||
?assertEqual({'queue.declare_ok', AQ, 0, 0},
|
||||
declare(Ch1, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
|
||||
|
||||
[rabbit_ct_client_helpers:publish(Ch, Q, 3) || Q <- QQs, Ch <- [Ch0, Ch1]],
|
||||
|
||||
[begin
|
||||
QQRes = rabbit_misc:r(VHost, queue, Q),
|
||||
{ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]),
|
||||
wait_for_messages_ready([Server0], RaName, 3),
|
||||
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]),
|
||||
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
|
||||
?assertEqual(3, length(Nodes0))
|
||||
end || Q <- QQs, VHost <- VHosts],
|
||||
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
|
||||
force_vhost_queues_shrink_member_to_current_member, [VHost2]),
|
||||
|
||||
[begin
|
||||
QQRes = rabbit_misc:r(VHost, queue, Q),
|
||||
{ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]),
|
||||
wait_for_messages_ready([Server0], RaName, 3),
|
||||
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]),
|
||||
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
|
||||
case VHost of
|
||||
VHost1 -> ?assertEqual(3, length(Nodes0));
|
||||
VHost2 -> ?assertEqual(1, length(Nodes0))
|
||||
end
|
||||
end || Q <- QQs, VHost <- VHosts],
|
||||
|
||||
%% grow queues back to all nodes in VHost2 only
|
||||
[rpc:call(Server0, rabbit_quorum_queue, grow, [S, VHost2, <<".*">>, all]) || S <- [Server1, Server2]],
|
||||
|
||||
[begin
|
||||
QQRes = rabbit_misc:r(VHost, queue, Q),
|
||||
{ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]),
|
||||
wait_for_messages_ready([Server0], RaName, 3),
|
||||
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]),
|
||||
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
|
||||
?assertEqual(3, length(Nodes0))
|
||||
end || Q <- QQs, VHost <- VHosts].
|
||||
|
||||
priority_queue_fifo(Config) ->
|
||||
%% testing: if hi priority messages are published before lo priority
|
||||
%% messages they are always consumed first (fifo)
|
||||
|
|
|
|||
Loading…
Reference in New Issue