689 lines
29 KiB
Erlang
689 lines
29 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
%%
|
|
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
%%
|
|
|
|
-module(per_vhost_queue_limit_SUITE).
|
|
|
|
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-import(rabbit_ct_client_helpers, [open_unmanaged_connection/3,
|
|
close_connection_and_channel/2]).
|
|
|
|
-define(AWAIT_TIMEOUT, 30000).
|
|
|
|
all() ->
|
|
[
|
|
{group, tests}
|
|
].
|
|
|
|
groups() ->
|
|
[
|
|
{tests, [], [
|
|
{cluster_size_1, [], cluster_size_1_tests()},
|
|
{cluster_size_2, [], cluster_size_2_tests()}
|
|
]}
|
|
].
|
|
|
|
cluster_size_1_tests() ->
|
|
[
|
|
most_basic_single_node_queue_count,
|
|
single_node_single_vhost_queue_count,
|
|
single_node_multiple_vhosts_queue_count,
|
|
single_node_single_vhost_limit,
|
|
single_node_single_vhost_zero_limit,
|
|
single_node_single_vhost_limit_with_durable_named_queue,
|
|
single_node_single_vhost_zero_limit_with_durable_named_queue,
|
|
single_node_single_vhost_limit_with_queue_ttl,
|
|
single_node_single_vhost_limit_with_redeclaration
|
|
].
|
|
|
|
cluster_size_2_tests() ->
|
|
[
|
|
most_basic_cluster_queue_count,
|
|
cluster_multiple_vhosts_queue_count,
|
|
cluster_multiple_vhosts_limit,
|
|
cluster_multiple_vhosts_zero_limit,
|
|
cluster_multiple_vhosts_limit_with_durable_named_queue,
|
|
cluster_multiple_vhosts_zero_limit_with_durable_named_queue,
|
|
cluster_node_restart_queue_count
|
|
].
|
|
|
|
suite() ->
|
|
[
|
|
%% If a test hangs, no need to wait for 30 minutes.
|
|
{timetrap, {minutes, 8}}
|
|
].
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testsuite setup/teardown.
|
|
%% -------------------------------------------------------------------
|
|
|
|
init_per_suite(Config) ->
|
|
rabbit_ct_helpers:log_environment(),
|
|
rabbit_ct_helpers:run_setup_steps(Config).
|
|
|
|
end_per_suite(Config) ->
|
|
rabbit_ct_helpers:run_teardown_steps(Config).
|
|
|
|
init_per_group(tests, Config) ->
|
|
Config;
|
|
init_per_group(cluster_size_1 = Group, Config) ->
|
|
init_per_multinode_group(Group, Config, 1);
|
|
init_per_group(cluster_size_2 = Group, Config) ->
|
|
init_per_multinode_group(Group, Config, 2).
|
|
|
|
init_per_multinode_group(_Group, Config, NodeCount) ->
|
|
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
|
|
Config1 = rabbit_ct_helpers:set_config(Config, [
|
|
{rmq_nodes_count, NodeCount},
|
|
{rmq_nodename_suffix, Suffix}
|
|
]),
|
|
rabbit_ct_helpers:run_steps(Config1,
|
|
rabbit_ct_broker_helpers:setup_steps() ++
|
|
rabbit_ct_client_helpers:setup_steps()).
|
|
|
|
end_per_group(tests, Config) ->
|
|
Config;
|
|
end_per_group(_Group, Config) ->
|
|
rabbit_ct_helpers:run_steps(Config,
|
|
rabbit_ct_client_helpers:teardown_steps() ++
|
|
rabbit_ct_broker_helpers:teardown_steps()).
|
|
|
|
init_per_testcase(vhost_limit_after_node_renamed = Testcase, Config) ->
|
|
rabbit_ct_helpers:testcase_started(Config, Testcase),
|
|
rabbit_ct_helpers:run_steps(Config,
|
|
rabbit_ct_broker_helpers:setup_steps() ++
|
|
rabbit_ct_client_helpers:setup_steps());
|
|
init_per_testcase(Testcase, Config) ->
|
|
rabbit_ct_helpers:testcase_started(Config, Testcase),
|
|
Config.
|
|
|
|
end_per_testcase(vhost_limit_after_node_renamed = Testcase, Config) ->
|
|
Config1 = ?config(save_config, Config),
|
|
rabbit_ct_helpers:run_steps(Config1,
|
|
rabbit_ct_client_helpers:teardown_steps() ++
|
|
rabbit_ct_broker_helpers:teardown_steps()),
|
|
rabbit_ct_helpers:testcase_finished(Config1, Testcase);
|
|
end_per_testcase(Testcase, Config) ->
|
|
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Test cases.
|
|
%% -------------------------------------------------------------------
|
|
|
|
most_basic_single_node_queue_count(Config) ->
|
|
VHost = <<"queue-limits">>,
|
|
set_up_vhost(Config, VHost),
|
|
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
Conn = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch} = amqp_connection:open_channel(Conn),
|
|
declare_exclusive_queues(Ch, 10),
|
|
?awaitMatch(10, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
close_connection_and_channel(Conn, Ch),
|
|
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
|
|
|
|
single_node_single_vhost_queue_count(Config) ->
|
|
VHost = <<"queue-limits">>,
|
|
set_up_vhost(Config, VHost),
|
|
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
Conn = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch} = amqp_connection:open_channel(Conn),
|
|
declare_exclusive_queues(Ch, 10),
|
|
?awaitMatch(10, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
declare_durable_queues(Ch, 10),
|
|
?awaitMatch(20, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
delete_durable_queues(Ch, 10),
|
|
?awaitMatch(10, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
close_connection_and_channel(Conn, Ch),
|
|
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
|
|
|
|
single_node_multiple_vhosts_queue_count(Config) ->
|
|
VHost1 = <<"queue-limits1">>,
|
|
VHost2 = <<"queue-limits2">>,
|
|
set_up_vhost(Config, VHost1),
|
|
set_up_vhost(Config, VHost2),
|
|
|
|
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
|
|
|
|
Conn1 = open_unmanaged_connection(Config, 0, VHost1),
|
|
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
|
Conn2 = open_unmanaged_connection(Config, 0, VHost2),
|
|
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
|
|
|
declare_exclusive_queues(Ch1, 10),
|
|
?awaitMatch(10, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
|
|
declare_durable_queues(Ch1, 10),
|
|
?awaitMatch(20, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
|
|
delete_durable_queues(Ch1, 10),
|
|
?awaitMatch(10, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
|
|
declare_exclusive_queues(Ch2, 30),
|
|
?awaitMatch(30, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
|
|
close_connection_and_channel(Conn1, Ch1),
|
|
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
|
|
close_connection_and_channel(Conn2, Ch2),
|
|
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
|
|
|
|
single_node_single_vhost_limit(Config) ->
|
|
single_node_single_vhost_limit_with(Config, 5),
|
|
single_node_single_vhost_limit_with(Config, 10).
|
|
single_node_single_vhost_zero_limit(Config) ->
|
|
single_node_single_vhost_zero_limit_with(Config, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true}).
|
|
|
|
single_node_single_vhost_limit_with_durable_named_queue(Config) ->
|
|
VHost = <<"queue-limits">>,
|
|
set_up_vhost(Config, VHost),
|
|
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
|
|
set_vhost_queue_limit(Config, VHost, 3),
|
|
Conn = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch} = amqp_connection:open_channel(Conn),
|
|
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q1">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q2">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q3">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q4">>,
|
|
exclusive = false,
|
|
durable = true})
|
|
end),
|
|
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
|
|
|
|
single_node_single_vhost_zero_limit_with_durable_named_queue(Config) ->
|
|
single_node_single_vhost_zero_limit_with(Config, #'queue.declare'{queue = <<"Q4">>,
|
|
exclusive = false,
|
|
durable = true}).
|
|
|
|
single_node_single_vhost_limit_with(Config, WatermarkLimit) ->
|
|
VHost = <<"queue-limits">>,
|
|
set_up_vhost(Config, VHost),
|
|
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
|
|
set_vhost_queue_limit(Config, VHost, 3),
|
|
Conn = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch} = amqp_connection:open_channel(Conn),
|
|
|
|
set_vhost_queue_limit(Config, VHost, WatermarkLimit),
|
|
lists:foreach(fun (_) ->
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true})
|
|
end, lists:seq(1, WatermarkLimit)),
|
|
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true})
|
|
end),
|
|
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
|
|
|
|
single_node_single_vhost_zero_limit_with(Config, QueueDeclare) ->
|
|
VHost = <<"queue-limits">>,
|
|
set_up_vhost(Config, VHost),
|
|
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
|
|
Conn1 = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
|
|
|
set_vhost_queue_limit(Config, VHost, 0),
|
|
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch1, QueueDeclare)
|
|
end),
|
|
|
|
Conn2 = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
|
|
|
%% lift the limit
|
|
set_vhost_queue_limit(Config, VHost, -1),
|
|
lists:foreach(fun (_) ->
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true})
|
|
end, lists:seq(1, 100)),
|
|
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
|
|
|
|
|
|
single_node_single_vhost_limit_with_queue_ttl(Config) ->
|
|
VHost = <<"queue-limits">>,
|
|
set_up_vhost(Config, VHost),
|
|
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
|
|
Conn1 = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
|
|
|
set_vhost_queue_limit(Config, VHost, 3),
|
|
|
|
lists:foreach(fun (_) ->
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true,
|
|
arguments = [{<<"x-expires">>, long, 2000}]})
|
|
end, lists:seq(1, 3)),
|
|
|
|
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true})
|
|
end),
|
|
|
|
Conn2 = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
|
|
|
%% wait for the queues to expire
|
|
?awaitMatch(
|
|
[],
|
|
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
|
|
?AWAIT_TIMEOUT),
|
|
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true}),
|
|
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
|
|
|
|
|
|
single_node_single_vhost_limit_with_redeclaration(Config) ->
|
|
VHost = <<"queue-limits">>,
|
|
set_up_vhost(Config, VHost),
|
|
?awaitMatch(0, count_queues_in(Config, VHost), ?AWAIT_TIMEOUT),
|
|
|
|
set_vhost_queue_limit(Config, VHost, 3),
|
|
Conn1 = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
|
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q1">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q2">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q3">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
|
|
%% can't declare a new queue...
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q4">>,
|
|
exclusive = false,
|
|
durable = true})
|
|
end),
|
|
|
|
|
|
Conn2 = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
|
|
|
%% ...but re-declarations succeed
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q1">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q2">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q3">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q4">>,
|
|
exclusive = false,
|
|
durable = true})
|
|
end),
|
|
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
|
|
|
|
|
|
most_basic_cluster_queue_count(Config) ->
|
|
VHost = <<"queue-limits">>,
|
|
set_up_vhost(Config, VHost),
|
|
?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
|
|
|
|
Conn1 = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
|
declare_exclusive_queues(Ch1, 10),
|
|
?awaitMatch(10, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(10, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
|
|
|
|
Conn2 = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
|
declare_exclusive_queues(Ch2, 15),
|
|
?awaitMatch(25, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(25, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
|
|
close_connection_and_channel(Conn1, Ch1),
|
|
close_connection_and_channel(Conn2, Ch2),
|
|
?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
|
|
|
|
cluster_node_restart_queue_count(Config) ->
|
|
VHost = <<"queue-limits">>,
|
|
set_up_vhost(Config, VHost),
|
|
?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
|
|
|
|
Conn1 = open_unmanaged_connection(Config, 0, VHost),
|
|
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
|
declare_exclusive_queues(Ch1, 10),
|
|
?awaitMatch(10, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(10, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
|
|
|
|
rabbit_ct_broker_helpers:restart_broker(Config, 0),
|
|
?awaitMatch(0, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
|
|
|
|
Conn2 = open_unmanaged_connection(Config, 1, VHost),
|
|
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
|
declare_exclusive_queues(Ch2, 15),
|
|
?awaitMatch(15, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(15, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
|
|
|
|
declare_durable_queues(Ch2, 10),
|
|
?awaitMatch(25, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(25, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
|
|
|
|
rabbit_ct_broker_helpers:restart_broker(Config, 1),
|
|
|
|
?awaitMatch(10, count_queues_in(Config, VHost, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(10, count_queues_in(Config, VHost, 1), ?AWAIT_TIMEOUT),
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
|
|
|
|
|
|
cluster_multiple_vhosts_queue_count(Config) ->
|
|
VHost1 = <<"queue-limits1">>,
|
|
VHost2 = <<"queue-limits2">>,
|
|
set_up_vhost(Config, VHost1),
|
|
set_up_vhost(Config, VHost2),
|
|
|
|
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
|
|
|
|
Conn1 = open_unmanaged_connection(Config, 0, VHost1),
|
|
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
|
declare_exclusive_queues(Ch1, 10),
|
|
?awaitMatch(10, count_queues_in(Config, VHost1, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(10, count_queues_in(Config, VHost1, 1), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost2, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost2, 1), ?AWAIT_TIMEOUT),
|
|
|
|
Conn2 = open_unmanaged_connection(Config, 0, VHost2),
|
|
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
|
declare_exclusive_queues(Ch2, 15),
|
|
?awaitMatch(15, count_queues_in(Config, VHost2, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(15, count_queues_in(Config, VHost2, 1), ?AWAIT_TIMEOUT),
|
|
close_connection_and_channel(Conn1, Ch1),
|
|
close_connection_and_channel(Conn2, Ch2),
|
|
?awaitMatch(0, count_queues_in(Config, VHost1, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost1, 1), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost2, 0), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost2, 1), ?AWAIT_TIMEOUT),
|
|
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
|
|
|
|
cluster_multiple_vhosts_limit(Config) ->
|
|
cluster_multiple_vhosts_limit_with(Config, 10),
|
|
cluster_multiple_vhosts_limit_with(Config, 20).
|
|
|
|
cluster_multiple_vhosts_limit_with(Config, WatermarkLimit) ->
|
|
VHost1 = <<"queue-limits1">>,
|
|
VHost2 = <<"queue-limits2">>,
|
|
set_up_vhost(Config, VHost1),
|
|
set_up_vhost(Config, VHost2),
|
|
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
|
|
|
|
set_vhost_queue_limit(Config, VHost1, 3),
|
|
set_vhost_queue_limit(Config, VHost2, 3),
|
|
|
|
Conn1 = open_unmanaged_connection(Config, 0, VHost1),
|
|
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
|
set_vhost_queue_limit(Config, VHost1, WatermarkLimit),
|
|
|
|
lists:foreach(fun (_) ->
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true})
|
|
end, lists:seq(1, WatermarkLimit)),
|
|
|
|
Conn2 = open_unmanaged_connection(Config, 1, VHost2),
|
|
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
|
set_vhost_queue_limit(Config, VHost2, WatermarkLimit),
|
|
|
|
lists:foreach(fun (_) ->
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true})
|
|
end, lists:seq(1, WatermarkLimit)),
|
|
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true})
|
|
end),
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true})
|
|
end),
|
|
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
|
|
|
|
|
|
cluster_multiple_vhosts_zero_limit(Config) ->
|
|
cluster_multiple_vhosts_zero_limit_with(Config, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true}).
|
|
|
|
cluster_multiple_vhosts_limit_with_durable_named_queue(Config) ->
|
|
VHost1 = <<"queue-limits1">>,
|
|
VHost2 = <<"queue-limits2">>,
|
|
set_up_vhost(Config, VHost1),
|
|
set_up_vhost(Config, VHost2),
|
|
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
|
|
|
|
set_vhost_queue_limit(Config, VHost1, 3),
|
|
set_vhost_queue_limit(Config, VHost2, 3),
|
|
|
|
Conn1 = open_unmanaged_connection(Config, 0, VHost1),
|
|
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
|
|
|
Conn2 = open_unmanaged_connection(Config, 1, VHost2),
|
|
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
|
|
|
#'queue.declare_ok'{} =
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q1">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
#'queue.declare_ok'{} =
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q2">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
#'queue.declare_ok'{} =
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q3">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
|
|
#'queue.declare_ok'{} =
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q1">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
#'queue.declare_ok'{} =
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q2">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
#'queue.declare_ok'{} =
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q3">>,
|
|
exclusive = false,
|
|
durable = true}),
|
|
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q3">>,
|
|
exclusive = false,
|
|
durable = true})
|
|
end),
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q3">>,
|
|
exclusive = false,
|
|
durable = true})
|
|
end),
|
|
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
|
|
|
|
cluster_multiple_vhosts_zero_limit_with_durable_named_queue(Config) ->
|
|
cluster_multiple_vhosts_zero_limit_with(Config, #'queue.declare'{queue = <<"Q4">>,
|
|
exclusive = false,
|
|
durable = true}).
|
|
|
|
cluster_multiple_vhosts_zero_limit_with(Config, QueueDeclare) ->
|
|
VHost1 = <<"queue-limits1">>,
|
|
VHost2 = <<"queue-limits2">>,
|
|
set_up_vhost(Config, VHost1),
|
|
set_up_vhost(Config, VHost2),
|
|
?awaitMatch(0, count_queues_in(Config, VHost1), ?AWAIT_TIMEOUT),
|
|
?awaitMatch(0, count_queues_in(Config, VHost2), ?AWAIT_TIMEOUT),
|
|
|
|
Conn1 = open_unmanaged_connection(Config, 0, VHost1),
|
|
{ok, Ch1} = amqp_connection:open_channel(Conn1),
|
|
Conn2 = open_unmanaged_connection(Config, 1, VHost2),
|
|
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
|
|
|
set_vhost_queue_limit(Config, VHost1, 0),
|
|
set_vhost_queue_limit(Config, VHost2, 0),
|
|
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch1, QueueDeclare)
|
|
end),
|
|
expect_shutdown_due_to_precondition_failed(
|
|
fun () ->
|
|
amqp_channel:call(Ch2, QueueDeclare)
|
|
end),
|
|
|
|
|
|
Conn3 = open_unmanaged_connection(Config, 0, VHost1),
|
|
{ok, Ch3} = amqp_connection:open_channel(Conn3),
|
|
Conn4 = open_unmanaged_connection(Config, 1, VHost2),
|
|
{ok, Ch4} = amqp_connection:open_channel(Conn4),
|
|
|
|
%% lift the limits
|
|
set_vhost_queue_limit(Config, VHost1, -1),
|
|
set_vhost_queue_limit(Config, VHost2, -1),
|
|
lists:foreach(fun (_) ->
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch3, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true}),
|
|
#'queue.declare_ok'{queue = _} =
|
|
amqp_channel:call(Ch4, #'queue.declare'{queue = <<"">>,
|
|
exclusive = true})
|
|
end, lists:seq(1, 400)),
|
|
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
|
|
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
|
|
|
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Helpers
|
|
%% -------------------------------------------------------------------
|
|
|
|
set_up_vhost(Config, VHost) ->
|
|
rabbit_ct_broker_helpers:add_vhost(Config, VHost),
|
|
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
|
|
set_vhost_queue_limit(Config, VHost, -1).
|
|
|
|
set_vhost_queue_limit(Config, VHost, Count) ->
|
|
set_vhost_queue_limit(Config, 0, VHost, Count).
|
|
|
|
set_vhost_queue_limit(Config, NodeIndex, VHost, Count) ->
|
|
Node = rabbit_ct_broker_helpers:get_node_config(
|
|
Config, NodeIndex, nodename),
|
|
rabbit_ct_broker_helpers:control_action(
|
|
set_vhost_limits, Node,
|
|
["{\"max-queues\": " ++ integer_to_list(Count) ++ "}"],
|
|
[{"-p", binary_to_list(VHost)}]).
|
|
|
|
count_queues_in(Config, VHost) ->
|
|
count_queues_in(Config, VHost, 0).
|
|
count_queues_in(Config, VHost, NodeIndex) ->
|
|
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
|
|
rabbit_amqqueue,
|
|
count, [VHost]).
|
|
|
|
declare_exclusive_queues(Ch, N) ->
|
|
lists:foreach(fun (_) ->
|
|
amqp_channel:call(Ch,
|
|
#'queue.declare'{queue = <<"">>,
|
|
exclusive = true})
|
|
end,
|
|
lists:seq(1, N)).
|
|
|
|
declare_durable_queues(Ch, N) ->
|
|
lists:foreach(fun (I) ->
|
|
amqp_channel:call(Ch,
|
|
#'queue.declare'{queue = durable_queue_name(I),
|
|
exclusive = false,
|
|
durable = true})
|
|
end,
|
|
lists:seq(1, N)).
|
|
|
|
delete_durable_queues(Ch, N) ->
|
|
lists:foreach(fun (I) ->
|
|
amqp_channel:call(Ch,
|
|
#'queue.delete'{queue = durable_queue_name(I)})
|
|
end,
|
|
lists:seq(1, N)).
|
|
|
|
durable_queue_name(N) when is_integer(N) ->
|
|
iolist_to_binary(io_lib:format("queue-limits-durable-~tp", [N])).
|
|
|
|
expect_shutdown_due_to_precondition_failed(Thunk) ->
|
|
try
|
|
Thunk(),
|
|
ok
|
|
catch _:{{shutdown, {server_initiated_close, 406, _}}, _} ->
|
|
%% expected
|
|
ok
|
|
end.
|