2020-07-13 23:45:00 +08:00
|
|
|
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
|
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
|
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
2016-06-24 18:27:05 +08:00
|
|
|
%%
|
2022-03-21 05:21:56 +08:00
|
|
|
%% Copyright (c) 2016-2022 VMware, Inc. or its affiliates. All rights reserved.
|
2016-06-24 18:27:05 +08:00
|
|
|
%%
|
|
|
|
|
|
2016-10-06 19:15:22 +08:00
|
|
|
-module(clustering_SUITE).
|
2016-06-24 18:27:05 +08:00
|
|
|
|
2017-05-04 15:09:39 +08:00
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
2016-06-24 18:27:05 +08:00
|
|
|
-include_lib("common_test/include/ct.hrl").
|
2016-07-19 21:16:43 +08:00
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
2016-09-29 00:33:51 +08:00
|
|
|
-include_lib("rabbit_common/include/rabbit_core_metrics.hrl").
|
2016-11-29 19:26:58 +08:00
|
|
|
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_metrics.hrl").
|
2017-04-28 19:42:21 +08:00
|
|
|
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
|
2016-06-24 18:27:05 +08:00
|
|
|
|
|
|
|
|
-import(rabbit_ct_broker_helpers, [get_node_config/3, restart_node/2]).
|
|
|
|
|
-import(rabbit_mgmt_test_util, [http_get/2, http_put/4, http_delete/3]).
|
|
|
|
|
-import(rabbit_misc, [pget/2]).
|
|
|
|
|
|
2022-09-30 16:45:40 +08:00
|
|
|
-compile(nowarn_export_all).
|
2016-06-24 18:27:05 +08:00
|
|
|
-compile(export_all).
|
|
|
|
|
|
|
|
|
|
all() ->
|
|
|
|
|
[
|
|
|
|
|
{group, non_parallel_tests}
|
|
|
|
|
].
|
|
|
|
|
|
|
|
|
|
groups() ->
|
2016-09-27 20:59:46 +08:00
|
|
|
[{non_parallel_tests, [], [
|
2016-06-24 18:27:05 +08:00
|
|
|
list_cluster_nodes_test,
|
2016-09-09 19:43:29 +08:00
|
|
|
multi_node_case1_test,
|
2016-09-13 22:53:51 +08:00
|
|
|
ha_queue_hosted_on_other_node,
|
|
|
|
|
ha_queue_with_multiple_consumers,
|
2018-03-07 14:37:12 +08:00
|
|
|
queue_on_other_node,
|
2016-09-13 22:53:51 +08:00
|
|
|
queue_with_multiple_consumers,
|
|
|
|
|
queue_consumer_cancelled,
|
2016-09-14 21:44:15 +08:00
|
|
|
queue_consumer_channel_closed,
|
2016-09-30 16:26:14 +08:00
|
|
|
queue,
|
2016-09-14 21:44:15 +08:00
|
|
|
queues_single,
|
|
|
|
|
queues_multiple,
|
2016-09-16 16:41:19 +08:00
|
|
|
queues_removed,
|
|
|
|
|
channels_multiple_on_different_nodes,
|
2018-03-07 14:37:12 +08:00
|
|
|
channel_closed,
|
2016-09-16 16:41:19 +08:00
|
|
|
channel,
|
|
|
|
|
channel_other_node,
|
|
|
|
|
channel_with_consumer_on_other_node,
|
2018-03-07 14:37:12 +08:00
|
|
|
channel_with_consumer_on_one_node,
|
2016-09-26 17:55:37 +08:00
|
|
|
consumers,
|
2016-09-26 19:50:32 +08:00
|
|
|
connections,
|
|
|
|
|
exchanges,
|
2016-09-26 23:31:25 +08:00
|
|
|
exchange,
|
2016-09-27 15:31:39 +08:00
|
|
|
vhosts,
|
2016-09-27 20:59:46 +08:00
|
|
|
nodes,
|
2018-11-03 00:00:18 +08:00
|
|
|
overview,
|
|
|
|
|
disable_plugin
|
2016-06-24 18:27:05 +08:00
|
|
|
]}
|
|
|
|
|
].
|
|
|
|
|
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
%% Testsuite setup/teardown.
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
merge_app_env(Config) ->
|
|
|
|
|
Config1 = rabbit_ct_helpers:merge_app_env(Config,
|
|
|
|
|
{rabbit, [
|
|
|
|
|
{collect_statistics, fine},
|
|
|
|
|
{collect_statistics_interval, 500}
|
|
|
|
|
]}),
|
|
|
|
|
rabbit_ct_helpers:merge_app_env(Config1,
|
2016-11-29 19:26:58 +08:00
|
|
|
{rabbitmq_management_agent, [
|
2016-09-30 16:26:14 +08:00
|
|
|
{rates_mode, detailed},
|
2016-09-13 22:53:51 +08:00
|
|
|
{sample_retention_policies,
|
|
|
|
|
%% List of {MaxAgeInSeconds, SampleEveryNSeconds}
|
|
|
|
|
[{global, [{605, 5}, {3660, 60}, {29400, 600}, {86400, 1800}]},
|
2016-09-21 20:49:34 +08:00
|
|
|
{basic, [{605, 5}, {3600, 60}]},
|
2016-09-13 22:53:51 +08:00
|
|
|
{detailed, [{10, 5}]}] }]}).
|
|
|
|
|
|
2016-06-24 18:27:05 +08:00
|
|
|
init_per_suite(Config) ->
|
|
|
|
|
rabbit_ct_helpers:log_environment(),
|
|
|
|
|
inets:start(),
|
|
|
|
|
Config1 = rabbit_ct_helpers:set_config(Config, [
|
|
|
|
|
{rmq_nodename_suffix, ?MODULE},
|
|
|
|
|
{rmq_nodes_count, 2}
|
|
|
|
|
]),
|
2016-09-13 22:53:51 +08:00
|
|
|
Config2 = merge_app_env(Config1),
|
|
|
|
|
rabbit_ct_helpers:run_setup_steps(Config2,
|
2016-06-24 18:27:05 +08:00
|
|
|
rabbit_ct_broker_helpers:setup_steps()).
|
|
|
|
|
|
|
|
|
|
end_per_suite(Config) ->
|
|
|
|
|
rabbit_ct_helpers:run_teardown_steps(Config,
|
|
|
|
|
rabbit_ct_broker_helpers:teardown_steps()).
|
|
|
|
|
|
|
|
|
|
init_per_group(_, Config) ->
|
|
|
|
|
Config.
|
|
|
|
|
|
|
|
|
|
end_per_group(_, Config) ->
|
|
|
|
|
Config.
|
|
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
init_per_testcase(multi_node_case1_test = Testcase, Config) ->
|
|
|
|
|
rabbit_ct_helpers:testcase_started(Config, Testcase);
|
2016-06-24 18:27:05 +08:00
|
|
|
init_per_testcase(Testcase, Config) ->
|
2016-09-29 00:33:51 +08:00
|
|
|
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, clear_all_table_data, []),
|
|
|
|
|
rabbit_ct_broker_helpers:rpc(Config, 1, ?MODULE, clear_all_table_data, []),
|
2020-09-24 19:18:18 +08:00
|
|
|
rabbit_ct_broker_helpers:close_all_connections(Config, 0, <<"clustering_SUITE:init_per_testcase">>),
|
2016-09-13 22:53:51 +08:00
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
|
|
|
|
|
Config1 = rabbit_ct_helpers:set_config(Config, {conn, Conn}),
|
|
|
|
|
rabbit_ct_helpers:testcase_started(Config1, Testcase).
|
2016-06-24 18:27:05 +08:00
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
end_per_testcase(multi_node_case1_test = Testcase, Config) ->
|
2020-09-24 19:18:18 +08:00
|
|
|
rabbit_ct_broker_helpers:close_all_connections(Config, 0, <<"clustering_SUITE:end_per_testcase">>),
|
2016-09-13 22:53:51 +08:00
|
|
|
rabbit_ct_helpers:testcase_finished(Config, Testcase);
|
2016-06-24 18:27:05 +08:00
|
|
|
end_per_testcase(Testcase, Config) ->
|
2016-09-13 22:53:51 +08:00
|
|
|
rabbit_ct_client_helpers:close_connection(?config(conn, Config)),
|
2020-09-24 19:18:18 +08:00
|
|
|
rabbit_ct_broker_helpers:close_all_connections(Config, 0, <<"clustering_SUITE:end_per_testcase">>),
|
2016-06-24 18:27:05 +08:00
|
|
|
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
|
|
|
|
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
%% Testcases.
|
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
list_cluster_nodes_test(Config) ->
|
|
|
|
|
%% see rmq_nodes_count in init_per_suite
|
|
|
|
|
?assertEqual(2, length(http_get(Config, "/nodes"))),
|
|
|
|
|
passed.
|
|
|
|
|
|
|
|
|
|
multi_node_case1_test(Config) ->
|
2018-03-07 14:37:12 +08:00
|
|
|
Nodename1 = rabbit_data_coercion:to_binary(get_node_config(Config, 0, nodename)),
|
|
|
|
|
Nodename2 = rabbit_data_coercion:to_binary(get_node_config(Config, 1, nodename)),
|
2016-06-24 18:27:05 +08:00
|
|
|
Policy = [{pattern, <<".*">>},
|
|
|
|
|
{definition, [{'ha-mode', <<"all">>}]}],
|
2018-06-12 17:34:34 +08:00
|
|
|
http_put(Config, "/policies/%2F/HA", Policy, [?CREATED, ?NO_CONTENT]),
|
|
|
|
|
http_delete(Config, "/queues/%2F/multi-node-test-queue", [?NO_CONTENT, ?NOT_FOUND]),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
|
|
|
|
_ = queue_declare(Chan, <<"multi-node-test-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
Q = wait_for_mirrored_queue(Config, "/queues/%2F/multi-node-test-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
?assert(lists:member(maps:get(node, Q), [Nodename1, Nodename2])),
|
|
|
|
|
[Mirror] = maps:get(slave_nodes, Q),
|
|
|
|
|
[Mirror] = maps:get(synchronised_slave_nodes, Q),
|
|
|
|
|
?assert(lists:member(Mirror, [Nodename1, Nodename2])),
|
|
|
|
|
|
|
|
|
|
%% restart node2 so that queue master migrates
|
2016-06-24 18:27:05 +08:00
|
|
|
restart_node(Config, 1),
|
|
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
Q2 = wait_for_mirrored_queue(Config, "/queues/%2F/multi-node-test-queue"),
|
|
|
|
|
http_delete(Config, "/queues/%2F/multi-node-test-queue", ?NO_CONTENT),
|
|
|
|
|
http_delete(Config, "/policies/%2F/HA", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
?assert(lists:member(maps:get(node, Q2), [Nodename1, Nodename2])),
|
|
|
|
|
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
2016-06-24 18:27:05 +08:00
|
|
|
|
|
|
|
|
passed.
|
|
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
ha_queue_hosted_on_other_node(Config) ->
|
2016-09-09 19:43:29 +08:00
|
|
|
Policy = [{pattern, <<".*">>},
|
|
|
|
|
{definition, [{'ha-mode', <<"all">>}]}],
|
2018-06-12 17:34:34 +08:00
|
|
|
http_put(Config, "/policies/%2F/HA", Policy, [?CREATED, ?NO_CONTENT]),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
|
|
|
|
_ = queue_declare_durable(Chan, <<"ha-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_mirrored_queue(Config, "/queues/%2F/ha-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
{ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)),
|
2016-09-14 16:14:58 +08:00
|
|
|
consume(Chan, <<"ha-queue">>),
|
2016-09-13 22:53:51 +08:00
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-06-12 17:34:34 +08:00
|
|
|
Res = http_get(Config, "/queues/%2F/ha-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
% assert some basic data is there
|
2016-12-06 01:18:57 +08:00
|
|
|
[Cons] = maps:get(consumer_details, Res),
|
|
|
|
|
#{} = maps:get(channel_details, Cons), % channel details proplist must not be empty
|
|
|
|
|
0 = maps:get(prefetch_count, Cons), % check one of the augmented properties
|
|
|
|
|
<<"ha-queue">> = maps:get(name, Res),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
amqp_channel:close(Chan2),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
|
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/ha-queue", ?NO_CONTENT),
|
|
|
|
|
http_delete(Config, "/policies/%2F/HA", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
ha_queue_with_multiple_consumers(Config) ->
|
|
|
|
|
Policy = [{pattern, <<".*">>},
|
|
|
|
|
{definition, [{'ha-mode', <<"all">>}]}],
|
2018-06-12 17:34:34 +08:00
|
|
|
http_put(Config, "/policies/%2F/HA", Policy, [?CREATED, ?NO_CONTENT]),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
2018-03-07 14:37:12 +08:00
|
|
|
_ = queue_declare_durable(Chan, <<"ha-queue3">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_mirrored_queue(Config, "/queues/%2F/ha-queue3"),
|
2016-09-13 22:53:51 +08:00
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
consume(Chan, <<"ha-queue3">>),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
{ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)),
|
|
|
|
|
consume(Chan2, <<"ha-queue3">>),
|
|
|
|
|
|
|
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
Res = http_get(Config, "/queues/%2F/ha-queue3"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
% assert some basic data is there
|
2016-12-06 01:18:57 +08:00
|
|
|
[C1, C2] = maps:get(consumer_details, Res),
|
2016-09-13 22:53:51 +08:00
|
|
|
% channel details proplist must not be empty
|
2016-12-06 01:18:57 +08:00
|
|
|
#{} = maps:get(channel_details, C1),
|
|
|
|
|
#{} = maps:get(channel_details, C2),
|
2016-09-13 22:53:51 +08:00
|
|
|
% check one of the augmented properties
|
2016-12-06 01:18:57 +08:00
|
|
|
0 = maps:get(prefetch_count, C1),
|
|
|
|
|
0 = maps:get(prefetch_count, C2),
|
2018-03-07 14:37:12 +08:00
|
|
|
<<"ha-queue3">> = maps:get(name, Res),
|
|
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
amqp_channel:close(Chan2),
|
|
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/ha-queue3", ?NO_CONTENT),
|
|
|
|
|
http_delete(Config, "/policies/%2F/HA", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
queue_on_other_node(Config) ->
|
2018-03-07 14:37:12 +08:00
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
|
|
|
|
_ = queue_declare(Chan, <<"some-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2016-09-13 22:53:51 +08:00
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
{ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)),
|
|
|
|
|
consume(Chan2, <<"some-queue">>),
|
|
|
|
|
|
|
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-06-12 17:34:34 +08:00
|
|
|
Res = http_get(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
% assert some basic data is present
|
2016-12-06 01:18:57 +08:00
|
|
|
[Cons] = maps:get(consumer_details, Res),
|
|
|
|
|
#{} = maps:get(channel_details, Cons), % channel details proplist must not be empty
|
|
|
|
|
0 = maps:get(prefetch_count, Cons), % check one of the augmented properties
|
|
|
|
|
<<"some-queue">> = maps:get(name, Res),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
amqp_channel:close(Chan2),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
|
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
queue_with_multiple_consumers(Config) ->
|
2022-09-30 16:45:40 +08:00
|
|
|
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue),
|
|
|
|
|
%% this may not be supported in mixed mode
|
|
|
|
|
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config, classic_queue_type_delivery_support),
|
2016-09-13 22:53:51 +08:00
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
2018-03-07 14:37:12 +08:00
|
|
|
Q = <<"multi-consumer-queue1">>,
|
|
|
|
|
_ = queue_declare(Chan, Q),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/multi-consumer-queue1"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
|
2016-09-21 20:49:34 +08:00
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
|
|
|
|
{ok, Chan2} = amqp_connection:open_channel(Conn),
|
2018-03-07 14:37:12 +08:00
|
|
|
consume(Chan, Q),
|
|
|
|
|
consume(Chan2, Q),
|
|
|
|
|
publish(Chan2, Q),
|
|
|
|
|
publish(Chan, Q),
|
2016-09-29 19:10:47 +08:00
|
|
|
% ensure a message has been consumed and acked
|
2016-09-21 20:49:34 +08:00
|
|
|
receive
|
|
|
|
|
{#'basic.deliver'{delivery_tag = T}, _} ->
|
|
|
|
|
amqp_channel:cast(Chan, #'basic.ack'{delivery_tag = T})
|
|
|
|
|
end,
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
Res = http_get(Config, "/queues/%2F/multi-consumer-queue1"),
|
|
|
|
|
http_delete(Config, "/queues/%2F/multi-consumer-queue1", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
% assert some basic data is there
|
2016-12-06 01:18:57 +08:00
|
|
|
[C1, C2] = maps:get(consumer_details, Res),
|
2016-09-13 22:53:51 +08:00
|
|
|
% channel details proplist must not be empty
|
2016-12-06 01:18:57 +08:00
|
|
|
#{} = maps:get(channel_details, C1),
|
|
|
|
|
#{} = maps:get(channel_details, C2),
|
2016-09-13 22:53:51 +08:00
|
|
|
% check one of the augmented properties
|
2016-12-06 01:18:57 +08:00
|
|
|
0 = maps:get(prefetch_count, C1),
|
|
|
|
|
0 = maps:get(prefetch_count, C2),
|
2018-03-07 14:37:12 +08:00
|
|
|
Q = maps:get(name, Res),
|
|
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
amqp_channel:close(Chan2),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
|
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
queue_consumer_cancelled(Config) ->
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
2018-03-07 14:37:12 +08:00
|
|
|
_ = queue_declare(Chan, <<"some-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-14 16:14:58 +08:00
|
|
|
Tag = consume(Chan, <<"some-queue">>),
|
2016-09-13 22:53:51 +08:00
|
|
|
|
|
|
|
|
#'basic.cancel_ok'{} =
|
|
|
|
|
amqp_channel:call(Chan, #'basic.cancel'{consumer_tag = Tag}),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-06-12 17:34:34 +08:00
|
|
|
Res = http_get(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
% assert there are no consumer details
|
2016-12-06 01:18:57 +08:00
|
|
|
[] = maps:get(consumer_details, Res),
|
|
|
|
|
<<"some-queue">> = maps:get(name, Res),
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2016-09-13 22:53:51 +08:00
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
queue_consumer_channel_closed(Config) ->
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
2018-03-07 14:37:12 +08:00
|
|
|
_ = queue_declare(Chan, <<"some-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-14 16:14:58 +08:00
|
|
|
consume(Chan, <<"some-queue">>),
|
2016-09-29 19:10:47 +08:00
|
|
|
force_stats(), % ensure channel stats have been written
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-13 22:53:51 +08:00
|
|
|
amqp_channel:close(Chan),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
Res = http_get(Config, "/queues/%2F/some-queue"),
|
2016-09-13 22:53:51 +08:00
|
|
|
% assert there are no consumer details
|
2016-12-06 01:18:57 +08:00
|
|
|
[] = maps:get(consumer_details, Res),
|
|
|
|
|
<<"some-queue">> = maps:get(name, Res),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2016-09-09 19:43:29 +08:00
|
|
|
ok.
|
|
|
|
|
|
2016-09-30 16:26:14 +08:00
|
|
|
queue(Config) ->
|
2018-06-12 17:34:34 +08:00
|
|
|
http_put(Config, "/queues/%2F/some-queue", none, [?CREATED, ?NO_CONTENT]),
|
|
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
|
|
|
|
{ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)),
|
|
|
|
|
|
2016-09-30 16:26:14 +08:00
|
|
|
publish(Chan, <<"some-queue">>),
|
|
|
|
|
basic_get(Chan, <<"some-queue">>),
|
|
|
|
|
publish(Chan2, <<"some-queue">>),
|
|
|
|
|
basic_get(Chan2, <<"some-queue">>),
|
|
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
timer:sleep(5100),
|
2018-06-12 17:34:34 +08:00
|
|
|
Res = http_get(Config, "/queues/%2F/some-queue"),
|
2016-09-30 16:26:14 +08:00
|
|
|
% assert single queue is returned
|
2016-12-06 01:18:57 +08:00
|
|
|
[#{} | _] = maps:get(deliveries, Res),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
amqp_channel:close(Chan2),
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-30 16:26:14 +08:00
|
|
|
ok.
|
|
|
|
|
|
2016-09-14 21:44:15 +08:00
|
|
|
queues_single(Config) ->
|
2018-06-12 17:34:34 +08:00
|
|
|
http_put(Config, "/queues/%2F/some-queue", none, [?CREATED, ?NO_CONTENT]),
|
|
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-06-12 17:34:34 +08:00
|
|
|
Res = http_get(Config, "/queues/%2F"),
|
|
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
% assert at least one queue is returned
|
|
|
|
|
?assert(length(Res) >= 1),
|
|
|
|
|
|
2016-09-14 21:44:15 +08:00
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
queues_multiple(Config) ->
|
2018-03-07 14:37:12 +08:00
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
|
|
|
|
_ = queue_declare(Chan, <<"some-queue">>),
|
|
|
|
|
_ = queue_declare(Chan, <<"some-other-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
|
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-other-queue"),
|
2016-09-14 21:44:15 +08:00
|
|
|
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
timer:sleep(5100),
|
|
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
Res = http_get(Config, "/queues/%2F"),
|
2018-03-07 14:37:12 +08:00
|
|
|
[Q1, Q2 | _] = Res,
|
|
|
|
|
|
2016-09-14 21:44:15 +08:00
|
|
|
% assert some basic data is present
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
|
|
|
|
http_delete(Config, "/queues/%2F/some-other-queue", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
false = (maps:get(name, Q1) =:= maps:get(name, Q2)),
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
|
2016-09-14 21:44:15 +08:00
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
queues_removed(Config) ->
|
2018-06-12 17:34:34 +08:00
|
|
|
http_put(Config, "/queues/%2F/some-queue", none, [?CREATED, ?NO_CONTENT]),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-06-12 17:34:34 +08:00
|
|
|
N = length(http_get(Config, "/queues/%2F")),
|
|
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-06-12 17:34:34 +08:00
|
|
|
?assertEqual(N - 1, length(http_get(Config, "/queues/%2F"))),
|
2016-09-14 21:44:15 +08:00
|
|
|
ok.
|
|
|
|
|
|
2016-09-16 16:41:19 +08:00
|
|
|
channels_multiple_on_different_nodes(Config) ->
|
|
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
2018-03-07 14:37:12 +08:00
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
|
|
|
|
_ = queue_declare(Chan, <<"some-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
|
|
|
|
{ok, Chan2} = amqp_connection:open_channel(Conn2),
|
2016-09-16 16:41:19 +08:00
|
|
|
consume(Chan, <<"some-queue">>),
|
|
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-16 16:41:19 +08:00
|
|
|
Res = http_get(Config, "/channels"),
|
|
|
|
|
% assert two channels are present
|
|
|
|
|
[_,_] = Res,
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
amqp_channel:close(Chan2),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn2),
|
|
|
|
|
|
2016-09-16 16:41:19 +08:00
|
|
|
ok.
|
|
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
channel_closed(Config) ->
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
|
|
|
|
_ = queue_declare(Chan, <<"some-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
{ok, Chan2} = amqp_connection:open_channel(?config(conn, Config)),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
consume(Chan2, <<"some-queue">>),
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
|
|
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2016-09-16 16:41:19 +08:00
|
|
|
|
|
|
|
|
Res = http_get(Config, "/channels"),
|
|
|
|
|
% assert one channel is present
|
|
|
|
|
[_] = Res,
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan2),
|
|
|
|
|
|
2016-09-16 16:41:19 +08:00
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
channel(Config) ->
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
|
|
|
|
[{_, ChData}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, tab2list, [channel_created]),
|
|
|
|
|
|
2020-08-28 04:34:18 +08:00
|
|
|
ChName = uri_string:recompose(#{path => binary_to_list(pget(name, ChData))}),
|
2018-03-07 14:37:12 +08:00
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2016-09-16 16:41:19 +08:00
|
|
|
Res = http_get(Config, "/channels/" ++ ChName ),
|
|
|
|
|
% assert channel is non empty
|
2016-12-06 01:18:57 +08:00
|
|
|
#{} = Res,
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
2016-09-16 16:41:19 +08:00
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
channel_other_node(Config) ->
|
2018-03-07 14:37:12 +08:00
|
|
|
Q = <<"some-queue">>,
|
2018-06-12 17:34:34 +08:00
|
|
|
http_put(Config, "/queues/%2F/some-queue", none, [?CREATED, ?NO_CONTENT]),
|
2016-09-16 16:41:19 +08:00
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
2016-09-30 17:21:24 +08:00
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
2016-09-16 16:41:19 +08:00
|
|
|
[{_, ChData}] = rabbit_ct_broker_helpers:rpc(Config, 1, ets, tab2list,
|
|
|
|
|
[channel_created]),
|
2020-08-28 04:34:18 +08:00
|
|
|
ChName = uri_string:recompose(#{path => binary_to_list(pget(name, ChData))}),
|
2018-03-07 14:37:12 +08:00
|
|
|
consume(Chan, Q),
|
|
|
|
|
publish(Chan, Q),
|
|
|
|
|
|
|
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-16 16:41:19 +08:00
|
|
|
Res = http_get(Config, "/channels/" ++ ChName ),
|
|
|
|
|
% assert channel is non empty
|
2016-12-06 01:18:57 +08:00
|
|
|
#{} = Res,
|
|
|
|
|
[#{}] = maps:get(deliveries, Res),
|
|
|
|
|
#{} = maps:get(connection_details, Res),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
amqp_connection:close(Conn),
|
|
|
|
|
|
2016-09-16 16:41:19 +08:00
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
channel_with_consumer_on_other_node(Config) ->
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
2018-03-07 14:37:12 +08:00
|
|
|
Q = <<"some-queue">>,
|
|
|
|
|
_ = queue_declare(Chan, Q),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-16 16:41:19 +08:00
|
|
|
ChName = get_channel_name(Config, 0),
|
2018-03-07 14:37:12 +08:00
|
|
|
consume(Chan, Q),
|
|
|
|
|
publish(Chan, Q),
|
|
|
|
|
|
|
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
Res = http_get(Config, "/channels/" ++ ChName),
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2016-09-16 16:41:19 +08:00
|
|
|
% assert channel is non empty
|
2016-12-06 01:18:57 +08:00
|
|
|
#{} = Res,
|
|
|
|
|
[#{}] = maps:get(consumer_details, Res),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
|
2016-09-16 16:41:19 +08:00
|
|
|
ok.
|
|
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
channel_with_consumer_on_one_node(Config) ->
|
2016-09-16 16:41:19 +08:00
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
2018-03-07 14:37:12 +08:00
|
|
|
Q = <<"some-queue">>,
|
|
|
|
|
_ = queue_declare(Chan, Q),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-16 16:41:19 +08:00
|
|
|
ChName = get_channel_name(Config, 0),
|
2018-03-07 14:37:12 +08:00
|
|
|
consume(Chan, Q),
|
|
|
|
|
|
|
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
Res = http_get(Config, "/channels/" ++ ChName),
|
2016-09-16 16:41:19 +08:00
|
|
|
amqp_channel:close(Chan),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2016-09-16 16:41:19 +08:00
|
|
|
% assert channel is non empty
|
2016-12-06 01:18:57 +08:00
|
|
|
#{} = Res,
|
|
|
|
|
[#{}] = maps:get(consumer_details, Res),
|
2016-09-16 16:41:19 +08:00
|
|
|
ok.
|
|
|
|
|
|
2016-09-20 17:13:08 +08:00
|
|
|
consumers(Config) ->
|
2018-03-07 14:37:12 +08:00
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
|
|
|
|
_ = queue_declare(Chan, <<"some-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
|
|
|
|
{ok, Chan2} = amqp_connection:open_channel(Conn2),
|
2016-09-20 17:13:08 +08:00
|
|
|
consume(Chan, <<"some-queue">>),
|
|
|
|
|
consume(Chan2, <<"some-queue">>),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
timer:sleep(5100),
|
2016-09-21 20:49:34 +08:00
|
|
|
force_stats(),
|
2016-09-20 17:13:08 +08:00
|
|
|
Res = http_get(Config, "/consumers"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-20 17:13:08 +08:00
|
|
|
% assert there are two non-empty consumer records
|
2016-12-06 01:18:57 +08:00
|
|
|
[#{} = C1, #{} = C2] = Res,
|
|
|
|
|
#{} = maps:get(channel_details, C1),
|
|
|
|
|
#{} = maps:get(channel_details, C2),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn2),
|
|
|
|
|
|
2016-09-20 17:13:08 +08:00
|
|
|
ok.
|
2016-09-26 17:55:37 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
connections(Config) ->
|
2018-03-07 14:37:12 +08:00
|
|
|
%% one connection is maintained by CT helpers
|
2016-09-26 17:55:37 +08:00
|
|
|
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
|
|
|
|
|
{ok, _Chan2} = amqp_connection:open_channel(Conn2),
|
|
|
|
|
|
|
|
|
|
%% channel count needs a bit longer for 2nd chan
|
|
|
|
|
timer:sleep(5100),
|
2016-09-26 17:55:37 +08:00
|
|
|
force_stats(),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-26 17:55:37 +08:00
|
|
|
Res = http_get(Config, "/connections"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-26 17:55:37 +08:00
|
|
|
% assert there are two non-empty connection records
|
2018-03-07 14:37:12 +08:00
|
|
|
[#{} = C1, #{} = C2] = Res,
|
|
|
|
|
1 = maps:get(channels, C1),
|
|
|
|
|
1 = maps:get(channels, C2),
|
|
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn2),
|
|
|
|
|
|
2016-09-26 17:55:37 +08:00
|
|
|
ok.
|
|
|
|
|
|
2016-09-26 19:50:32 +08:00
|
|
|
|
|
|
|
|
exchanges(Config) ->
|
|
|
|
|
{ok, _Chan0} = amqp_connection:open_channel(?config(conn, Config)),
|
|
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
|
|
|
|
QName = <<"exchanges-test">>,
|
|
|
|
|
XName = <<"some-exchange">>,
|
|
|
|
|
Q = queue_declare(Chan, QName),
|
|
|
|
|
exchange_declare(Chan, XName),
|
|
|
|
|
queue_bind(Chan, XName, Q, <<"some-key">>),
|
|
|
|
|
consume(Chan, QName),
|
|
|
|
|
publish_to(Chan, XName, <<"some-key">>),
|
|
|
|
|
|
|
|
|
|
force_stats(),
|
|
|
|
|
Res = http_get(Config, "/exchanges"),
|
2018-03-07 14:37:12 +08:00
|
|
|
[X] = [X || X <- Res, maps:get(name, X) =:= XName],
|
|
|
|
|
|
|
|
|
|
?assertEqual(<<"direct">>, maps:get(type, X)),
|
|
|
|
|
|
2016-09-26 19:50:32 +08:00
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
|
|
|
|
|
|
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
exchange(Config) ->
|
|
|
|
|
{ok, _Chan0} = amqp_connection:open_channel(?config(conn, Config)),
|
|
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
|
|
|
|
QName = <<"exchanges-test">>,
|
|
|
|
|
XName = <<"some-other-exchange">>,
|
|
|
|
|
Q = queue_declare(Chan, QName),
|
|
|
|
|
exchange_declare(Chan, XName),
|
|
|
|
|
queue_bind(Chan, XName, Q, <<"some-key">>),
|
|
|
|
|
consume(Chan, QName),
|
|
|
|
|
publish_to(Chan, XName, <<"some-key">>),
|
|
|
|
|
|
|
|
|
|
force_stats(),
|
|
|
|
|
force_stats(),
|
|
|
|
|
Res = http_get(Config, "/exchanges/%2F/some-other-exchange"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
?assertEqual(<<"direct">>, maps:get(type, Res)),
|
|
|
|
|
|
2016-09-26 19:50:32 +08:00
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
|
|
|
|
|
|
|
|
|
ok.
|
|
|
|
|
|
2016-09-26 23:31:25 +08:00
|
|
|
vhosts(Config) ->
|
2018-03-07 14:37:12 +08:00
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
|
|
|
|
_ = queue_declare(Chan, <<"some-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
|
|
|
|
{ok, Chan2} = amqp_connection:open_channel(Conn2),
|
2016-09-26 23:31:25 +08:00
|
|
|
publish(Chan2, <<"some-queue">>),
|
2018-03-07 14:37:12 +08:00
|
|
|
timer:sleep(5100), % TODO force stat emission
|
2016-09-26 23:31:25 +08:00
|
|
|
force_stats(),
|
|
|
|
|
Res = http_get(Config, "/vhosts"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2016-09-27 15:31:39 +08:00
|
|
|
% default vhost
|
2016-12-06 01:18:57 +08:00
|
|
|
[#{} = Vhost] = Res,
|
2016-09-26 23:31:25 +08:00
|
|
|
% assert vhost has some message stats
|
2016-12-06 01:18:57 +08:00
|
|
|
#{} = maps:get(message_stats, Vhost),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
amqp_channel:close(Chan2),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
|
|
|
|
|
2016-09-26 23:31:25 +08:00
|
|
|
ok.
|
|
|
|
|
|
2016-09-27 15:31:39 +08:00
|
|
|
nodes(Config) ->
|
2018-03-07 14:37:12 +08:00
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
|
|
|
|
_ = queue_declare(Chan, <<"some-queue">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/some-queue"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2016-09-27 15:31:39 +08:00
|
|
|
{ok, Chan2} = amqp_connection:open_channel(Conn),
|
|
|
|
|
publish(Chan2, <<"some-queue">>),
|
2018-03-07 14:37:12 +08:00
|
|
|
timer:sleep(5100), % TODO force stat emission
|
2016-09-27 15:31:39 +08:00
|
|
|
force_stats(),
|
|
|
|
|
Res = http_get(Config, "/nodes"),
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),
|
2016-09-27 15:31:39 +08:00
|
|
|
|
2016-12-06 01:18:57 +08:00
|
|
|
[#{} = N1 , #{} = N2] = Res,
|
2018-03-07 14:37:12 +08:00
|
|
|
?assert(is_binary(maps:get(name, N1))),
|
|
|
|
|
?assert(is_binary(maps:get(name, N2))),
|
2016-12-06 01:18:57 +08:00
|
|
|
[#{} | _] = maps:get(cluster_links, N1),
|
|
|
|
|
[#{} | _] = maps:get(cluster_links, N2),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
amqp_channel:close(Chan2),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
|
|
|
|
|
2016-09-27 15:31:39 +08:00
|
|
|
ok.
|
|
|
|
|
|
2016-09-27 20:59:46 +08:00
|
|
|
overview(Config) ->
|
2018-03-07 14:37:12 +08:00
|
|
|
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
|
|
|
|
|
{ok, Chan} = amqp_connection:open_channel(Conn),
|
|
|
|
|
_ = queue_declare(Chan, <<"queue-n1">>),
|
|
|
|
|
_ = queue_declare(Chan, <<"queue-n2">>),
|
2018-06-12 17:34:34 +08:00
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/queue-n1"),
|
|
|
|
|
_ = wait_for_queue(Config, "/queues/%2F/queue-n2"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
|
|
|
|
|
{ok, Chan2} = amqp_connection:open_channel(Conn2),
|
2016-09-28 19:09:09 +08:00
|
|
|
publish(Chan, <<"queue-n1">>),
|
|
|
|
|
publish(Chan2, <<"queue-n2">>),
|
2018-03-07 14:37:12 +08:00
|
|
|
timer:sleep(5100), % TODO force stat emission
|
2016-09-27 20:59:46 +08:00
|
|
|
force_stats(), % channel count needs a bit longer for 2nd chan
|
|
|
|
|
Res = http_get(Config, "/overview"),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
2018-06-12 17:34:34 +08:00
|
|
|
http_delete(Config, "/queues/%2F/queue-n1", ?NO_CONTENT),
|
|
|
|
|
http_delete(Config, "/queues/%2F/queue-n2", ?NO_CONTENT),
|
2016-09-27 20:59:46 +08:00
|
|
|
% assert there are two non-empty connection records
|
2016-12-06 01:18:57 +08:00
|
|
|
ObjTots = maps:get(object_totals, Res),
|
2018-03-07 14:37:12 +08:00
|
|
|
?assert(maps:get(connections, ObjTots) >= 2),
|
|
|
|
|
?assert(maps:get(channels, ObjTots) >= 2),
|
2016-12-06 01:18:57 +08:00
|
|
|
#{} = QT = maps:get(queue_totals, Res),
|
2018-03-07 11:45:42 +08:00
|
|
|
?assert(maps:get(messages_ready, QT) >= 2),
|
2016-12-06 01:18:57 +08:00
|
|
|
MS = maps:get(message_stats, Res),
|
2018-03-07 11:45:42 +08:00
|
|
|
?assert(maps:get(publish, MS) >= 2),
|
2018-10-09 22:35:50 +08:00
|
|
|
ChurnRates = maps:get(churn_rates, Res),
|
|
|
|
|
?assertEqual(maps:get(queue_declared, ChurnRates), 2),
|
|
|
|
|
?assertEqual(maps:get(queue_created, ChurnRates), 2),
|
|
|
|
|
?assertEqual(maps:get(queue_deleted, ChurnRates), 0),
|
|
|
|
|
?assertEqual(maps:get(channel_created, ChurnRates), 2),
|
|
|
|
|
?assertEqual(maps:get(channel_closed, ChurnRates), 0),
|
|
|
|
|
?assertEqual(maps:get(connection_closed, ChurnRates), 0),
|
2018-03-07 14:37:12 +08:00
|
|
|
|
|
|
|
|
amqp_channel:close(Chan),
|
|
|
|
|
amqp_channel:close(Chan2),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn),
|
|
|
|
|
rabbit_ct_client_helpers:close_connection(Conn2),
|
|
|
|
|
|
2016-09-27 20:59:46 +08:00
|
|
|
ok.
|
|
|
|
|
|
2018-11-03 00:00:18 +08:00
|
|
|
disable_plugin(Config) ->
|
|
|
|
|
Node = get_node_config(Config, 0, nodename),
|
|
|
|
|
Status0 = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit, status, []),
|
|
|
|
|
Listeners0 = proplists:get_value(listeners, Status0),
|
2019-04-12 17:14:02 +08:00
|
|
|
?assert(lists:member(http, listener_protos(Listeners0))),
|
2018-11-03 00:00:18 +08:00
|
|
|
rabbit_ct_broker_helpers:disable_plugin(Config, Node, 'rabbitmq_web_dispatch'),
|
|
|
|
|
Status = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit, status, []),
|
|
|
|
|
Listeners = proplists:get_value(listeners, Status),
|
2019-04-12 17:14:02 +08:00
|
|
|
?assert(not lists:member(http, listener_protos(Listeners))),
|
2018-11-03 00:00:18 +08:00
|
|
|
rabbit_ct_broker_helpers:enable_plugin(Config, Node, 'rabbitmq_management').
|
|
|
|
|
|
2016-06-24 18:27:05 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
2016-09-29 00:33:51 +08:00
|
|
|
%%
|
|
|
|
|
|
|
|
|
|
clear_all_table_data() ->
|
|
|
|
|
[ets:delete_all_objects(T) || {T, _} <- ?CORE_TABLES],
|
2016-10-06 19:15:22 +08:00
|
|
|
[ets:delete_all_objects(T) || {T, _} <- ?TABLES],
|
2016-10-06 23:49:14 +08:00
|
|
|
[gen_server:call(P, purge_cache)
|
2016-12-06 01:18:57 +08:00
|
|
|
|| {_, P, _, _} <- supervisor:which_children(rabbit_mgmt_db_cache_sup)],
|
|
|
|
|
send_to_all_collectors(purge_old_stats).
|
2016-06-24 18:27:05 +08:00
|
|
|
|
2016-09-16 16:41:19 +08:00
|
|
|
get_channel_name(Config, Node) ->
|
|
|
|
|
[{_, ChData}|_] = rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list,
|
|
|
|
|
[channel_created]),
|
2020-08-28 04:34:18 +08:00
|
|
|
uri_string:recompose(#{path => binary_to_list(pget(name, ChData))}).
|
2016-09-16 16:41:19 +08:00
|
|
|
|
2016-09-14 16:14:58 +08:00
|
|
|
consume(Channel, Queue) ->
|
|
|
|
|
#'basic.consume_ok'{consumer_tag = Tag} =
|
|
|
|
|
amqp_channel:call(Channel, #'basic.consume'{queue = Queue}),
|
|
|
|
|
Tag.
|
|
|
|
|
|
2016-09-21 20:49:34 +08:00
|
|
|
publish(Channel, Key) ->
|
2016-09-28 19:09:09 +08:00
|
|
|
Payload = <<"foobar">>,
|
|
|
|
|
Publish = #'basic.publish'{routing_key = Key},
|
|
|
|
|
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}).
|
2016-09-26 19:50:32 +08:00
|
|
|
|
2016-09-30 16:26:14 +08:00
|
|
|
basic_get(Channel, Queue) ->
|
|
|
|
|
Publish = #'basic.get'{queue = Queue},
|
|
|
|
|
amqp_channel:call(Channel, Publish).
|
|
|
|
|
|
2016-09-26 19:50:32 +08:00
|
|
|
publish_to(Channel, Exchange, Key) ->
|
2016-09-21 20:49:34 +08:00
|
|
|
Payload = <<"foobar">>,
|
2016-09-26 19:50:32 +08:00
|
|
|
Publish = #'basic.publish'{routing_key = Key,
|
|
|
|
|
exchange = Exchange},
|
2016-09-21 20:49:34 +08:00
|
|
|
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}).
|
|
|
|
|
|
2016-09-26 19:50:32 +08:00
|
|
|
exchange_declare(Chan, Name) ->
|
|
|
|
|
Declare = #'exchange.declare'{exchange = Name},
|
|
|
|
|
#'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare).
|
|
|
|
|
|
2016-10-03 23:14:44 +08:00
|
|
|
queue_declare(Chan) ->
|
|
|
|
|
Declare = #'queue.declare'{},
|
|
|
|
|
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Chan, Declare),
|
|
|
|
|
Q.
|
|
|
|
|
|
2016-09-26 19:50:32 +08:00
|
|
|
queue_declare(Chan, Name) ->
|
|
|
|
|
Declare = #'queue.declare'{queue = Name},
|
|
|
|
|
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Chan, Declare),
|
|
|
|
|
Q.
|
|
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
queue_declare_durable(Chan, Name) ->
|
|
|
|
|
Declare = #'queue.declare'{queue = Name, durable = true, exclusive = false},
|
|
|
|
|
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Chan, Declare),
|
|
|
|
|
Q.
|
|
|
|
|
|
2016-09-26 19:50:32 +08:00
|
|
|
queue_bind(Chan, Ex, Q, Key) ->
|
|
|
|
|
Binding = #'queue.bind'{queue = Q,
|
|
|
|
|
exchange = Ex,
|
|
|
|
|
routing_key = Key},
|
|
|
|
|
#'queue.bind_ok'{} = amqp_channel:call(Chan, Binding).
|
|
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
wait_for_mirrored_queue(Config, Path) ->
|
|
|
|
|
wait_for_queue(Config, Path, [slave_nodes, synchronised_slave_nodes]).
|
2016-06-24 18:27:05 +08:00
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
wait_for_queue(Config, Path) ->
|
|
|
|
|
wait_for_queue(Config, Path, []).
|
2016-06-24 18:27:05 +08:00
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
wait_for_queue(Config, Path, Keys) ->
|
|
|
|
|
wait_for_queue(Config, Path, Keys, 1000).
|
|
|
|
|
|
|
|
|
|
wait_for_queue(_Config, Path, Keys, 0) ->
|
2016-06-24 18:27:05 +08:00
|
|
|
exit({timeout, {Path, Keys}});
|
|
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
wait_for_queue(Config, Path, Keys, Count) ->
|
2016-06-24 18:27:05 +08:00
|
|
|
Res = http_get(Config, Path),
|
|
|
|
|
case present(Keys, Res) of
|
|
|
|
|
false -> timer:sleep(10),
|
2018-03-07 14:37:12 +08:00
|
|
|
wait_for_queue(Config, Path, Keys, Count - 1);
|
2016-06-24 18:27:05 +08:00
|
|
|
true -> Res
|
|
|
|
|
end.
|
|
|
|
|
|
2018-03-07 14:37:12 +08:00
|
|
|
present([], _Res) ->
|
|
|
|
|
true;
|
2016-06-24 18:27:05 +08:00
|
|
|
present(Keys, Res) ->
|
|
|
|
|
lists:all(fun (Key) ->
|
2016-12-06 01:18:57 +08:00
|
|
|
X = maps:get(Key, Res, undefined),
|
2016-06-24 18:27:05 +08:00
|
|
|
X =/= [] andalso X =/= undefined
|
|
|
|
|
end, Keys).
|
|
|
|
|
|
|
|
|
|
extract_node(N) ->
|
|
|
|
|
list_to_atom(hd(string:tokens(binary_to_list(N), "@"))).
|
2016-09-29 19:10:47 +08:00
|
|
|
|
|
|
|
|
%% debugging utilities
|
|
|
|
|
|
|
|
|
|
trace_fun(Config, MFs) ->
|
|
|
|
|
Nodename1 = get_node_config(Config, 0, nodename),
|
|
|
|
|
Nodename2 = get_node_config(Config, 1, nodename),
|
|
|
|
|
dbg:tracer(process, {fun(A,_) ->
|
|
|
|
|
ct:pal(?LOW_IMPORTANCE,
|
|
|
|
|
"TRACE: ~p", [A])
|
|
|
|
|
end, ok}),
|
|
|
|
|
dbg:n(Nodename1),
|
|
|
|
|
dbg:n(Nodename2),
|
|
|
|
|
dbg:p(all,c),
|
2016-09-30 16:26:14 +08:00
|
|
|
[ dbg:tpl(M, F, cx) || {M, F} <- MFs],
|
|
|
|
|
[ dbg:tpl(M, F, A, cx) || {M, F, A} <- MFs].
|
2016-09-29 19:10:47 +08:00
|
|
|
|
|
|
|
|
dump_table(Config, Table) ->
|
|
|
|
|
Data = rabbit_ct_broker_helpers:rpc(Config, 0, ets, tab2list, [Table]),
|
|
|
|
|
ct:pal(?LOW_IMPORTANCE, "Node 0: Dump of table ~p:~n~p~n", [Table, Data]),
|
|
|
|
|
Data0 = rabbit_ct_broker_helpers:rpc(Config, 1, ets, tab2list, [Table]),
|
|
|
|
|
ct:pal(?LOW_IMPORTANCE, "Node 1: Dump of table ~p:~n~p~n", [Table, Data0]).
|
|
|
|
|
|
2016-12-06 01:18:57 +08:00
|
|
|
force_stats() ->
|
|
|
|
|
force_all(),
|
2018-03-07 14:37:12 +08:00
|
|
|
timer:sleep(2000).
|
2016-12-06 01:18:57 +08:00
|
|
|
|
|
|
|
|
force_all() ->
|
|
|
|
|
[begin
|
2016-12-02 20:36:56 +08:00
|
|
|
{rabbit_mgmt_external_stats, N} ! emit_update,
|
2018-03-07 09:47:22 +08:00
|
|
|
timer:sleep(125)
|
2016-12-06 01:18:57 +08:00
|
|
|
end || N <- [node() | nodes()]],
|
|
|
|
|
send_to_all_collectors(collect_metrics).
|
|
|
|
|
|
|
|
|
|
send_to_all_collectors(Msg) ->
|
|
|
|
|
[begin
|
2016-12-02 20:36:56 +08:00
|
|
|
[{rabbit_mgmt_metrics_collector:name(Table), N} ! Msg
|
|
|
|
|
|| {Table, _} <- ?CORE_TABLES]
|
2016-12-06 01:18:57 +08:00
|
|
|
end || N <- [node() | nodes()]].
|
2019-04-12 17:14:02 +08:00
|
|
|
|
|
|
|
|
listener_protos(Listeners) ->
|
|
|
|
|
[listener_proto(L) || L <- Listeners].
|
|
|
|
|
|
|
|
|
|
listener_proto(#listener{protocol = Proto}) ->
|
|
|
|
|
Proto;
|
|
|
|
|
listener_proto(Proto) when is_atom(Proto) ->
|
|
|
|
|
Proto;
|
|
|
|
|
%% rabbit:status/0 used this formatting before rabbitmq/rabbitmq-cli#340
|
|
|
|
|
listener_proto({Proto, _Port, _Interface}) ->
|
|
|
|
|
Proto.
|