978 lines
36 KiB
Erlang
978 lines
36 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
|
|
|
|
-module(clustering_SUITE).
|
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("rabbit_common/include/rabbit_core_metrics.hrl").
|
|
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
|
|
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
|
|
|
|
-import(rabbit_ct_broker_helpers, [get_node_config/3, restart_node/2]).
|
|
-import(rabbit_ct_helpers, [eventually/3]).
|
|
-import(rabbit_mgmt_test_util, [http_get/2, http_put/4, http_post/4, http_delete/3, http_delete/4]).
|
|
-import(rabbit_misc, [pget/2]).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-define(STATS_INTERVAL, 250).
|
|
|
|
%% Common Test entry point: the suite consists of the single group
%% `non_parallel_tests'.
all() ->
    [{group, non_parallel_tests}].
|
|
|
|
%% Group definitions: one group, `non_parallel_tests', declared with an empty
%% property list and containing every test case of the suite in order.
groups() ->
    Testcases = [
        list_cluster_nodes_test,
        queue_on_other_node,
        queue_with_multiple_consumers,
        queue_consumer_cancelled,
        queue_consumer_channel_closed,
        queue,
        queues_single,
        queues_multiple,
        queues_removed,
        channels_multiple_on_different_nodes,
        channel_closed,
        channel,
        channel_other_node,
        channel_with_consumer_on_other_node,
        channel_with_consumer_on_one_node,
        consumers,
        connections,
        exchanges,
        exchange,
        vhosts,
        nodes,
        overview,
        disable_plugin,
        qq_replicas_add,
        qq_replicas_delete,
        qq_replicas_grow,
        qq_replicas_shrink
    ],
    [{non_parallel_tests, [], Testcases}].
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testsuite setup/teardown.
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% Merges the application environment used by this suite into Config:
%% fine-grained statistics with a ?STATS_INTERVAL collection interval for
%% `rabbit', and detailed rates plus short sample retention policies for
%% `rabbitmq_management_agent'. Returns the updated Config.
merge_app_env(Config) ->
    RabbitEnv = {rabbit, [
        {collect_statistics, fine},
        {collect_statistics_interval, ?STATS_INTERVAL},
        {core_metrics_gc_interval, 500}
    ]},
    %% Each retention entry is {MaxAgeInSeconds, SampleEveryNSeconds}.
    RetentionPolicies = [
        {global, [{605, 5}, {3660, 60}, {29400, 600}, {86400, 1800}]},
        {basic, [{605, 5}, {3600, 60}]},
        {detailed, [{10, 5}]}
    ],
    AgentEnv = {rabbitmq_management_agent, [
        {rates_mode, detailed},
        {sample_retention_policies, RetentionPolicies}
    ]},
    WithRabbitEnv = rabbit_ct_helpers:merge_app_env(Config, RabbitEnv),
    rabbit_ct_helpers:merge_app_env(WithRabbitEnv, AgentEnv).
|
|
|
|
%% Suite setup: logs the environment, starts inets, requests two broker nodes
%% (rmq_nodes_count = 2) with the module name as node-name suffix, merges the
%% suite's application environment and runs the broker setup steps.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    inets:start(),
    SuiteSettings = [{rmq_nodename_suffix, ?MODULE},
                     {rmq_nodes_count, 2}],
    WithSettings = rabbit_ct_helpers:set_config(Config, SuiteSettings),
    WithAppEnv = merge_app_env(WithSettings),
    SetupSteps = rabbit_ct_broker_helpers:setup_steps(),
    rabbit_ct_helpers:run_setup_steps(WithAppEnv, SetupSteps).
|
|
|
|
%% Suite teardown: runs the broker teardown steps against Config.
end_per_suite(Config) ->
    TeardownSteps = rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(Config, TeardownSteps).
|
|
|
|
%% No per-group setup is needed; Config is returned unchanged.
init_per_group(_Group, Config) ->
    Config.
|
|
|
|
%% No per-group teardown is needed; Config is returned unchanged.
end_per_group(_Group, Config) ->
    Config.
|
|
|
|
%% Per-testcase setup: runs clear_all_table_data/0 via rpc for node indexes 0
%% and 1, closes all connections for index 0, opens a fresh unmanaged
%% connection and stores it in Config under the `conn' key.
init_per_testcase(Testcase, Config) ->
    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, clear_all_table_data, []),
    rabbit_ct_broker_helpers:rpc(Config, 1, ?MODULE, clear_all_table_data, []),
    CloseLabel = <<"clustering_SUITE:init_per_testcase">>,
    rabbit_ct_broker_helpers:close_all_connections(Config, 0, CloseLabel),
    FreshConn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
    WithConn = rabbit_ct_helpers:set_config(Config, {conn, FreshConn}),
    rabbit_ct_helpers:testcase_started(WithConn, Testcase).
|
|
|
|
%% Per-testcase teardown: closes the connection stored under `conn', closes
%% any remaining connections for node index 0 and marks the test case as
%% finished.
end_per_testcase(Testcase, Config) ->
    TestConn = ?config(conn, Config),
    rabbit_ct_client_helpers:close_connection(TestConn),
    CloseLabel = <<"clustering_SUITE:end_per_testcase">>,
    rabbit_ct_broker_helpers:close_all_connections(Config, 0, CloseLabel),
    rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testcases.
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% GET /nodes must eventually list two entries, matching rmq_nodes_count
%% as set in init_per_suite/1.
list_cluster_nodes_test(Config) ->
    ExpectedNodes = 2,
    eventually(?_assertEqual(ExpectedNodes,
                             length(http_get(Config, "/nodes"))),
               1000, 30),
    passed.
|
|
|
|
%% Declares quorum queue "qq.22", then POSTs to its replicas/add endpoint with
%% the nodename configured for node index 1 and expects ?NO_CONTENT. The queue
%% is deleted afterwards.
qq_replicas_add(Config) ->
    QueuePath = "/queues/%2F/qq.22",
    Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
    {ok, Ch} = amqp_connection:open_channel(Conn),
    _ = queue_declare_quorum(Ch, <<"qq.22">>),
    _ = wait_for_queue(Config, QueuePath),

    TargetNode = rabbit_data_coercion:to_binary(
                   get_node_config(Config, 1, nodename)),
    http_post(Config, "/queues/quorum/%2F/qq.22/replicas/add",
              [{node, TargetNode}], ?NO_CONTENT),

    http_delete(Config, QueuePath, ?NO_CONTENT),

    amqp_channel:close(Ch),
    rabbit_ct_client_helpers:close_connection(Conn),
    ok.
|
|
|
|
%% Declares quorum queue "qq.23" and waits for two members. Removes the member
%% for node index 1 through the replicas/delete endpoint (one member left),
%% adds it back through replicas/add (two members again), then deletes the
%% queue.
qq_replicas_delete(Config) ->
    QueuePath = "/queues/%2F/qq.23",
    Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
    {ok, Ch} = amqp_connection:open_channel(Conn),
    _ = queue_declare_quorum(Ch, <<"qq.23">>),

    ?awaitMatch(#{members := [_, _]}, http_get(Config, QueuePath), 30000),

    TargetNode = rabbit_data_coercion:to_binary(
                   get_node_config(Config, 1, nodename)),
    NodeBody = [{node, TargetNode}],

    http_delete(Config, "/queues/quorum/%2F/qq.23/replicas/delete",
                ?ACCEPTED, NodeBody),
    ?awaitMatch(#{members := [_]}, http_get(Config, QueuePath), 30000),

    http_post(Config, "/queues/quorum/%2F/qq.23/replicas/add",
              NodeBody, ?NO_CONTENT),
    ?awaitMatch(#{members := [_, _]}, http_get(Config, QueuePath), 30000),

    http_delete(Config, QueuePath, ?NO_CONTENT),

    amqp_channel:close(Ch),
    rabbit_ct_client_helpers:close_connection(Conn),
    ok.
|
|
|
|
%% Declares quorum queue "qq.24", removes the member for node index 1 (one
%% member left) and then calls the per-node "grow" endpoint with strategy
%% "all" and a queue pattern matching the queue; the queue must get back to
%% two members. The queue is deleted afterwards.
qq_replicas_grow(Config) ->
    QueuePath = "/queues/%2F/qq.24",
    Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
    {ok, Ch} = amqp_connection:open_channel(Conn),
    _ = queue_declare_quorum(Ch, <<"qq.24">>),

    %% kept as a string because it is spliced into the URL path below
    TargetNode = rabbit_data_coercion:to_list(
                   get_node_config(Config, 1, nodename)),
    ?awaitMatch(#{members := [_, _]}, http_get(Config, QueuePath), 30000),
    http_delete(Config, "/queues/quorum/%2F/qq.24/replicas/delete",
                ?ACCEPTED, [{node, TargetNode}]),
    ?awaitMatch(#{members := [_]}, http_get(Config, QueuePath), 30000),

    GrowBody = [{strategy, <<"all">>},
                {queue_pattern, <<"qq.24">>},
                {vhost_pattern, <<".*">>}],
    GrowPath = "/queues/quorum/replicas/on/" ++ TargetNode ++ "/grow",
    http_post(Config, GrowPath, GrowBody, ?NO_CONTENT),
    ?awaitMatch(#{members := [_, _]}, http_get(Config, QueuePath), 30000),

    http_delete(Config, QueuePath, ?NO_CONTENT),

    amqp_channel:close(Ch),
    rabbit_ct_client_helpers:close_connection(Conn),
    ok.
|
|
|
|
%% Declares quorum queue "qq.25", waits for it to report two members, then
%% calls the per-node "shrink" endpoint for the nodename of node index 1 and
%% waits until the queue reports a single member. The queue is deleted
%% afterwards.
qq_replicas_shrink(Config) ->
    Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    _ = queue_declare_quorum(Chan, <<"qq.25">>),
    %% A single wait is enough here; the original repeated this exact
    %% ?awaitMatch twice in a row, the second of which could only succeed
    %% immediately.
    ?awaitMatch(#{members := [_, _]}, http_get(Config, "/queues/%2F/qq.25"), 30000),

    %% kept as a string because it is spliced into the URL path below
    Nodename1 = rabbit_data_coercion:to_list(get_node_config(Config, 1, nodename)),

    http_delete(Config, "/queues/quorum/replicas/on/" ++ Nodename1 ++ "/shrink",
                ?ACCEPTED),
    ?awaitMatch(#{members := [_]}, http_get(Config, "/queues/%2F/qq.25"), 30000),

    http_delete(Config, "/queues/%2F/qq.25", ?NO_CONTENT),

    amqp_channel:close(Chan),
    rabbit_ct_client_helpers:close_connection(Conn),

    ok.
|
|
|
|
%% Declares "some-queue" over a connection opened with node index 1 and
%% consumes from it through the per-testcase connection. The queue's HTTP API
%% record must then show exactly one consumer carrying channel details and a
%% prefetch count of 0.
queue_on_other_node(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    OtherConn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
    {ok, DeclaringCh} = amqp_connection:open_channel(OtherConn),
    _ = queue_declare(DeclaringCh, QueueName),
    _ = wait_for_queue(Config, QueuePath),

    {ok, ConsumingCh} = amqp_connection:open_channel(?config(conn, Config)),
    consume(ConsumingCh, QueueName),

    %% wait for exactly one consumer to show up on the queue record
    ?awaitMatch([_],
                begin
                    force_stats(Config),
                    maps:get(consumer_details, http_get(Config, QueuePath))
                end,
                60000),

    %% then check that some basic data is present on that consumer
    ?awaitMatch({#{}, 0, <<"some-queue">>},
                begin
                    QueueInfo = http_get(Config, QueuePath),
                    case maps:get(consumer_details, QueueInfo, undefined) of
                        [Consumer] ->
                            %% channel_details must be a map; prefetch_count
                            %% is one of the augmented properties
                            {maps:get(channel_details, Consumer, undefined),
                             maps:get(prefetch_count, Consumer, undefined),
                             maps:get(name, QueueInfo, undefined)};
                        Other ->
                            {unexpected_consumer_details, Other}
                    end
                end,
                60000),

    http_delete(Config, QueuePath, ?NO_CONTENT),

    amqp_channel:close(DeclaringCh),
    amqp_channel:close(ConsumingCh),
    rabbit_ct_client_helpers:close_connection(OtherConn),
    ok.
|
|
|
|
%% Declares "multi-consumer-queue1" on the per-testcase connection, attaches
%% one consumer from that connection and one from a connection opened with
%% node index 1, publishes two messages and acks one delivery. The queue's
%% HTTP API record must then list two consumers, each with channel details
%% and a prefetch count of 0.
queue_with_multiple_consumers(Config) ->
    {ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
    Q = <<"multi-consumer-queue1">>,
    _ = queue_declare(Chan, Q),
    _ = wait_for_queue(Config, "/queues/%2F/multi-consumer-queue1"),

    Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
    {ok, Chan2} = amqp_connection:open_channel(Conn),
    consume(Chan, Q),
    consume(Chan2, Q),
    publish(Chan2, Q),
    publish(Chan, Q),
    %% Ensure a message has been consumed and acked. The wait is bounded so
    %% that a missing delivery fails this test case quickly instead of
    %% blocking until the Common Test timetrap fires.
    %% NOTE(review): this process consumes on both channels, so the first
    %% delivery received may belong to Chan2 while the ack is always sent on
    %% Chan -- confirm this is intended.
    receive
        {#'basic.deliver'{delivery_tag = T}, _} ->
            amqp_channel:cast(Chan, #'basic.ack'{delivery_tag = T})
    after 30000 ->
        exit({timeout, waiting_for_basic_deliver})
    end,

    eventually(?_assertMatch(
                  {#{}, #{}, 0, 0, Q},
                  begin
                      force_stats(Config),
                      Res = http_get(Config, "/queues/%2F/multi-consumer-queue1"),
                      %% assert some basic data is there
                      case maps:get(consumer_details, Res) of
                          [C1, C2] ->
                              %% channel details must be maps
                              {maps:get(channel_details, C1),
                               maps:get(channel_details, C2),
                               %% check one of the augmented properties
                               maps:get(prefetch_count, C1),
                               maps:get(prefetch_count, C2),
                               maps:get(name, Res)};
                          Any ->
                              {unexpected_consumer_details, Any}
                      end
                  end),
               1000, 60),

    http_delete(Config, "/queues/%2F/multi-consumer-queue1", ?NO_CONTENT),

    amqp_channel:close(Chan),
    amqp_channel:close(Chan2),
    rabbit_ct_client_helpers:close_connection(Conn),

    ok.
|
|
|
|
%% Starts a consumer on "some-queue", cancels it with basic.cancel and waits
%% until the queue's HTTP API record reports an empty consumer_details list.
queue_consumer_cancelled(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    {ok, Ch} = amqp_connection:open_channel(?config(conn, Config)),
    _ = queue_declare(Ch, QueueName),
    _ = wait_for_queue(Config, QueuePath),

    ConsumerTag = consume(Ch, QueueName),
    Cancel = #'basic.cancel'{consumer_tag = ConsumerTag},
    #'basic.cancel_ok'{} = amqp_channel:call(Ch, Cancel),

    eventually(?_assertMatch(
                  {[], <<"some-queue">>},
                  begin
                      force_stats(Config),
                      QueueInfo = http_get(Config, QueuePath),
                      %% no consumer details may remain
                      {maps:get(consumer_details, QueueInfo),
                       maps:get(name, QueueInfo)}
                  end),
               1000, 60),

    amqp_channel:close(Ch),
    http_delete(Config, QueuePath, ?NO_CONTENT),
    ok.
|
|
|
|
%% Starts a consumer on "some-queue", closes the consuming channel and waits
%% until the queue's HTTP API record reports no consumer details while still
%% being retrievable by name.
queue_consumer_channel_closed(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    {ok, Ch} = amqp_connection:open_channel(?config(conn, Config)),
    _ = queue_declare(Ch, QueueName),
    _ = wait_for_queue(Config, QueuePath),

    consume(Ch, QueueName),
    %% make sure channel stats have been written before the channel goes away
    force_stats(Config),

    amqp_channel:close(Ch),
    force_stats(Config),

    %% no consumer details may remain
    ?awaitMatch([],
                maps:get(consumer_details, http_get(Config, QueuePath)),
                30000),
    ?awaitMatch(<<"some-queue">>,
                maps:get(name, http_get(Config, QueuePath)),
                30000),

    http_delete(Config, QueuePath, ?NO_CONTENT),
    ok.
|
|
|
|
%% Creates "some-queue" over HTTP, publishes to it and issues basic.get on two
%% channels of the per-testcase connection, then waits until the queue's HTTP
%% API record has a non-empty `deliveries' list whose first element is a map.
queue(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    http_put(Config, QueuePath, none, [?CREATED, ?NO_CONTENT]),
    _ = wait_for_queue(Config, QueuePath),

    TestConn = ?config(conn, Config),
    {ok, ChA} = amqp_connection:open_channel(TestConn),
    {ok, ChB} = amqp_connection:open_channel(?config(conn, Config)),

    publish(ChA, QueueName),
    basic_get(ChA, QueueName),
    publish(ChB, QueueName),
    basic_get(ChB, QueueName),
    force_stats(Config),

    %% at least one delivery record must be reported for the queue
    ?awaitMatch([#{} | _],
                maps:get(deliveries, http_get(Config, QueuePath)),
                30000),

    amqp_channel:close(ChA),
    amqp_channel:close(ChB),
    http_delete(Config, QueuePath, ?NO_CONTENT),
    ok.
|
|
|
|
%% Creates "some-queue" over HTTP and waits until GET /queues/%2F returns at
%% least one queue.
queues_single(Config) ->
    QueuePath = "/queues/%2F/some-queue",
    http_put(Config, QueuePath, none, [?CREATED, ?NO_CONTENT]),
    _ = wait_for_queue(Config, QueuePath),

    eventually(?_assertMatch(
                  true,
                  begin
                      force_stats(Config),
                      Queues = http_get(Config, "/queues/%2F"),
                      %% at least one queue must be listed
                      length(Queues) >= 1
                  end),
               1000, 60),

    http_delete(Config, QueuePath, ?NO_CONTENT),
    ok.
|
|
|
|
%% Declares "some-queue" and "some-other-queue" and waits until
%% GET /queues/%2F returns at least two entries whose first two names differ.
queues_multiple(Config) ->
    FirstPath = "/queues/%2F/some-queue",
    SecondPath = "/queues/%2F/some-other-queue",
    {ok, Ch} = amqp_connection:open_channel(?config(conn, Config)),
    _ = queue_declare(Ch, <<"some-queue">>),
    _ = queue_declare(Ch, <<"some-other-queue">>),
    _ = wait_for_queue(Config, FirstPath),
    _ = wait_for_queue(Config, SecondPath),

    %% The asserted expression is `false' only when two queues are listed and
    %% their names are different; any other listing yields a tuple, which
    %% makes the assertion fail and the check be retried.
    eventually(?_assertNot(
                  begin
                      force_stats(Config),
                      case http_get(Config, "/queues/%2F") of
                          [Q1, Q2 | _] ->
                              Name1 = maps:get(name, Q1),
                              Name2 = maps:get(name, Q2),
                              ct:pal("Name q1 ~p q2 ~p", [Name1, Name2]),
                              Name1 =:= Name2;
                          Other ->
                              {unexpected_queues, Other}
                      end
                  end),
               1000, 60),

    http_delete(Config, FirstPath, ?NO_CONTENT),
    http_delete(Config, SecondPath, ?NO_CONTENT),

    amqp_channel:close(Ch),
    ok.
|
|
|
|
%% Creates "some-queue", records how many queues GET /queues/%2F lists,
%% deletes the queue and waits until the listing is one entry shorter.
queues_removed(Config) ->
    QueuePath = "/queues/%2F/some-queue",
    http_put(Config, QueuePath, none, [?CREATED, ?NO_CONTENT]),
    force_stats(Config),
    CountBefore = length(http_get(Config, "/queues/%2F")),
    http_delete(Config, QueuePath, ?NO_CONTENT),
    ExpectedCount = CountBefore - 1,
    eventually(?_assertEqual(
                  ExpectedCount,
                  begin
                      force_stats(Config),
                      length(http_get(Config, "/queues/%2F"))
                  end),
               1000, 60),
    ok.
|
|
|
|
%% Opens two extra connections (both with node index 1), one channel on each,
%% consumes from "some-queue" on the first channel and waits until
%% GET /channels lists exactly two channels.
channels_multiple_on_different_nodes(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    ConnA = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
    {ok, ChA} = amqp_connection:open_channel(ConnA),
    _ = queue_declare(ChA, QueueName),
    _ = wait_for_queue(Config, QueuePath),

    ConnB = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
    {ok, ChB} = amqp_connection:open_channel(ConnB),
    consume(ChA, QueueName),

    %% exactly two channels must be reported
    ?awaitMatch([_,_],
                begin
                    force_stats(Config),
                    http_get(Config, "/channels")
                end,
                30000),

    http_delete(Config, QueuePath, ?NO_CONTENT),

    amqp_channel:close(ChA),
    amqp_channel:close(ChB),
    rabbit_ct_client_helpers:close_connection(ConnA),
    rabbit_ct_client_helpers:close_connection(ConnB),
    ok.
|
|
|
|
%% Opens two channels on the per-testcase connection, consumes on the second
%% and closes the first; GET /channels must then settle on exactly one
%% channel.
channel_closed(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    {ok, ClosedCh} = amqp_connection:open_channel(?config(conn, Config)),
    _ = queue_declare(ClosedCh, QueueName),
    _ = wait_for_queue(Config, QueuePath),

    {ok, SurvivingCh} = amqp_connection:open_channel(?config(conn, Config)),
    force_stats(Config),

    consume(SurvivingCh, QueueName),
    amqp_channel:close(ClosedCh),

    OneChannelLeft =
        fun() ->
                force_stats(Config),
                %% exactly one channel must be reported
                length(http_get(Config, "/channels")) == 1
        end,
    rabbit_ct_helpers:await_condition(OneChannelLeft, 60000),

    http_delete(Config, QueuePath, ?NO_CONTENT),

    amqp_channel:close(SurvivingCh),
    ok.
|
|
|
|
%% Opens a channel, reads its name from the single row of the
%% `channel_created' ETS table (fetched via rpc with node index 0) and waits
%% until GET /channels/<name> returns a map.
channel(Config) ->
    {ok, Ch} = amqp_connection:open_channel(?config(conn, Config)),
    [{_, ChannelProps}] = rabbit_ct_broker_helpers:rpc(
                            Config, 0, ets, tab2list, [channel_created]),

    RawName = binary_to_list(pget(name, ChannelProps)),
    EncodedName = uri_string:recompose(#{path => RawName}),
    ChannelPath = "/channels/" ++ EncodedName,

    eventually(?_assertMatch(
                  #{},
                  begin
                      force_stats(Config),
                      %% the channel record must be a map
                      http_get(Config, ChannelPath)
                  end),
               1000, 60),

    amqp_channel:close(Ch),
    ok.
|
|
|
|
%% Opens a channel on a connection created with node index 1, consumes from
%% and publishes to "some-queue" on it, and waits until GET /channels/<name>
%% reports one delivery entry together with connection details.
channel_other_node(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    http_put(Config, QueuePath, none, [?CREATED, ?NO_CONTENT]),
    OtherConn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
    {ok, Ch} = amqp_connection:open_channel(OtherConn),
    [{_, ChannelProps}] = rabbit_ct_broker_helpers:rpc(
                            Config, 1, ets, tab2list, [channel_created]),
    RawName = binary_to_list(pget(name, ChannelProps)),
    ChannelPath = "/channels/" ++ uri_string:recompose(#{path => RawName}),
    consume(Ch, QueueName),
    publish(Ch, QueueName),

    eventually(?_assertMatch(
                  {[#{}], #{}},
                  begin
                      force_stats(Config),
                      case http_get(Config, ChannelPath) of
                          #{} = ChannelInfo ->
                              {maps:get(deliveries, ChannelInfo),
                               maps:get(connection_details, ChannelInfo)};
                          Other ->
                              {unexpected_channels, Other}
                      end
                  end),
               1000, 60),

    http_delete(Config, QueuePath, ?NO_CONTENT),
    amqp_connection:close(OtherConn),
    ok.
|
|
|
|
%% Declares, consumes from and publishes to "some-queue" on one channel of
%% the per-testcase connection, then waits until GET /channels/<name> reports
%% exactly one consumer_details entry.
channel_with_consumer_on_other_node(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    {ok, Ch} = amqp_connection:open_channel(?config(conn, Config)),
    _ = queue_declare(Ch, QueueName),
    _ = wait_for_queue(Config, QueuePath),

    ChannelPath = "/channels/" ++ get_channel_name(Config, 0),
    consume(Ch, QueueName),
    publish(Ch, QueueName),

    eventually(?_assertMatch(
                  [#{}],
                  begin
                      force_stats(Config),
                      case http_get(Config, ChannelPath) of
                          #{} = ChannelInfo ->
                              maps:get(consumer_details, ChannelInfo);
                          Other ->
                              {unexpected_channels, Other}
                      end
                  end),
               1000, 60),

    http_delete(Config, QueuePath, ?NO_CONTENT),
    amqp_channel:close(Ch),
    ok.
|
|
|
|
%% Declares and consumes from "some-queue" on one channel of the per-testcase
%% connection, then waits until GET /channels/<name> reports exactly one
%% consumer_details entry.
channel_with_consumer_on_one_node(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    {ok, Ch} = amqp_connection:open_channel(?config(conn, Config)),
    _ = queue_declare(Ch, QueueName),
    _ = wait_for_queue(Config, QueuePath),

    ChannelPath = "/channels/" ++ get_channel_name(Config, 0),
    consume(Ch, QueueName),

    eventually(?_assertMatch(
                  [#{}],
                  begin
                      force_stats(Config),
                      ChannelInfo = http_get(Config, ChannelPath),
                      maps:get(consumer_details, ChannelInfo)
                  end),
               1000, 60),

    amqp_channel:close(Ch),
    http_delete(Config, QueuePath, ?NO_CONTENT),
    ok.
|
|
|
|
%% Consumes from "some-queue" on two connections (opened with node indexes 0
%% and 1) and waits until GET /consumers returns two records, each carrying
%% channel details.
consumers(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    ConnA = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
    {ok, ChA} = amqp_connection:open_channel(ConnA),
    _ = queue_declare(ChA, QueueName),
    _ = wait_for_queue(Config, QueuePath),

    ConnB = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
    {ok, ChB} = amqp_connection:open_channel(ConnB),
    consume(ChA, QueueName),
    consume(ChB, QueueName),

    %% two consumer records, both maps
    eventually(?_assertMatch([#{}, #{}],
                             begin
                                 force_stats(Config),
                                 http_get(Config, "/consumers")
                             end),
               1000, 30),
    %% and both records carry channel details that are maps
    eventually(?_assertMatch([#{}, #{}],
                             begin
                                 [First, Second] = http_get(Config, "/consumers"),
                                 [maps:get(channel_details, First),
                                  maps:get(channel_details, Second)]
                             end),
               1000, 30),

    http_delete(Config, QueuePath, ?NO_CONTENT),

    amqp_channel:close(ChA),
    rabbit_ct_client_helpers:close_connection(ConnA),
    rabbit_ct_client_helpers:close_connection(ConnB),
    ok.
|
|
|
|
|
|
%% Opens one channel on the per-testcase connection and one on a second
%% connection, then waits until GET /connections returns two records that
%% each report a `channels' value of 1.
connections(Config) ->
    %% the first connection is the one stored under `conn' by
    %% init_per_testcase/2
    {ok, Ch} = amqp_connection:open_channel(?config(conn, Config)),

    SecondConn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
    {ok, _SecondCh} = amqp_connection:open_channel(SecondConn),

    eventually(?_assertMatch(
                  [1, 1],
                  begin
                      force_stats(Config),
                      case http_get(Config, "/connections") of
                          [#{} = First, #{} = Second] ->
                              [maps:get(channels, First),
                               maps:get(channels, Second)];
                          Other ->
                              {unexpected_connections, Other}
                      end
                  end),
               1000, 30),

    amqp_channel:close(Ch),
    rabbit_ct_client_helpers:close_connection(SecondConn),
    ok.
|
|
|
|
|
|
%% Declares exchange "some-exchange", binds queue "exchanges-test" to it,
%% consumes and publishes through the exchange, then waits until the
%% GET /exchanges listing contains exactly one entry with that name and type
%% "direct".
exchanges(Config) ->
    {ok, _UnusedCh} = amqp_connection:open_channel(?config(conn, Config)),
    OtherConn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
    {ok, Ch} = amqp_connection:open_channel(OtherConn),
    QueueName = <<"exchanges-test">>,
    ExchangeName = <<"some-exchange">>,
    RoutingKey = <<"some-key">>,
    DeclaredQueue = queue_declare(Ch, QueueName),
    exchange_declare(Ch, ExchangeName),
    queue_bind(Ch, ExchangeName, DeclaredQueue, RoutingKey),
    consume(Ch, QueueName),
    publish_to(Ch, ExchangeName, RoutingKey),

    eventually(?_assertEqual(
                  [<<"direct">>],
                  begin
                      force_stats(Config),
                      Listing = http_get(Config, "/exchanges"),
                      [maps:get(type, Entry)
                       || Entry <- Listing,
                          maps:get(name, Entry) =:= ExchangeName]
                  end),
               1000, 30),

    amqp_channel:close(Ch),
    rabbit_ct_client_helpers:close_connection(OtherConn),
    ok.
|
|
|
|
|
|
%% Declares exchange "some-other-exchange", binds queue "exchanges-test" to
%% it, consumes and publishes through the exchange, then waits until
%% GET /exchanges/%2F/some-other-exchange reports type "direct".
exchange(Config) ->
    {ok, _UnusedCh} = amqp_connection:open_channel(?config(conn, Config)),
    OtherConn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
    {ok, Ch} = amqp_connection:open_channel(OtherConn),
    QueueName = <<"exchanges-test">>,
    ExchangeName = <<"some-other-exchange">>,
    RoutingKey = <<"some-key">>,
    DeclaredQueue = queue_declare(Ch, QueueName),
    exchange_declare(Ch, ExchangeName),
    queue_bind(Ch, ExchangeName, DeclaredQueue, RoutingKey),
    consume(Ch, QueueName),
    publish_to(Ch, ExchangeName, RoutingKey),

    eventually(?_assertEqual(
                  <<"direct">>,
                  begin
                      force_stats(Config),
                      ExchangeInfo = http_get(Config, "/exchanges/%2F/some-other-exchange"),
                      maps:get(type, ExchangeInfo)
                  end),
               1000, 30),

    amqp_channel:close(Ch),
    rabbit_ct_client_helpers:close_connection(OtherConn),
    ok.
|
|
|
|
%% Declares "some-queue" on a connection opened with node index 0, publishes
%% to it from a connection opened with node index 1, and waits until
%% GET /vhosts returns a single vhost record carrying a `message_stats' map.
vhosts(Config) ->
    Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    _ = queue_declare(Chan, <<"some-queue">>),
    _ = wait_for_queue(Config, "/queues/%2F/some-queue"),

    Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
    {ok, Chan2} = amqp_connection:open_channel(Conn2),
    publish(Chan2, <<"some-queue">>),

    eventually(?_assertMatch(#{},
                             begin
                                 force_stats(Config),
                                 %% only one vhost record is expected
                                 case http_get(Config, "/vhosts") of
                                     [#{} = Vhost] ->
                                         %% assert vhost has some message stats
                                         maps:get(message_stats, Vhost);
                                     Any ->
                                         {unexpected_vhosts, Any}
                                 end
                             end),
               1000, 30),

    http_delete(Config, "/queues/%2F/some-queue", ?NO_CONTENT),

    amqp_channel:close(Chan),
    amqp_channel:close(Chan2),
    rabbit_ct_client_helpers:close_connection(Conn),
    %% Conn2 must be closed here as well: end_per_testcase/2 only closes
    %% connections for node index 0, so an unclosed connection opened with
    %% index 1 would survive into the following test cases.
    rabbit_ct_client_helpers:close_connection(Conn2),

    ok.
|
|
|
|
%% Publishes one message to "some-queue" and waits until GET /nodes returns
%% two node records, each with a binary name and a non-empty `cluster_links'
%% list whose first element is a map.
nodes(Config) ->
    QueueName = <<"some-queue">>,
    QueuePath = "/queues/%2F/some-queue",
    Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
    {ok, DeclaringCh} = amqp_connection:open_channel(Conn),
    _ = queue_declare(DeclaringCh, QueueName),
    _ = wait_for_queue(Config, QueuePath),

    {ok, PublishingCh} = amqp_connection:open_channel(Conn),
    publish(PublishingCh, QueueName),

    eventually(?_assertMatch(
                  {true, true, [#{} | _], [#{} | _]},
                  begin
                      force_stats(Config),
                      case http_get(Config, "/nodes") of
                          [#{} = NodeA, #{} = NodeB] ->
                              {is_binary(maps:get(name, NodeA)),
                               is_binary(maps:get(name, NodeB)),
                               maps:get(cluster_links, NodeA),
                               maps:get(cluster_links, NodeB)};
                          Other ->
                              {unexpected_nodes, Other}
                      end
                  end),
               1000, 30),

    http_delete(Config, QueuePath, ?NO_CONTENT),

    amqp_channel:close(DeclaringCh),
    amqp_channel:close(PublishingCh),
    rabbit_ct_client_helpers:close_connection(Conn),
    ok.
|
|
|
|
%% Declares "queue-n1" and "queue-n2", publishes one message to each from
%% connections opened with node indexes 0 and 1, and waits until
%% GET /overview reports at least two connections, channels, ready messages
%% and publishes, plus the expected churn counters.
overview(Config) ->
    ConnA = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
    {ok, ChA} = amqp_connection:open_channel(ConnA),
    _ = queue_declare(ChA, <<"queue-n1">>),
    _ = queue_declare(ChA, <<"queue-n2">>),
    _ = wait_for_queue(Config, "/queues/%2F/queue-n1"),
    _ = wait_for_queue(Config, "/queues/%2F/queue-n2"),

    ConnB = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
    {ok, ChB} = amqp_connection:open_channel(ConnB),
    publish(ChA, <<"queue-n1">>),
    publish(ChB, <<"queue-n2">>),

    %% Tuple layout: four ">= 2" checks (connections, channels, ready
    %% messages, publishes) followed by the churn values queue_declared,
    %% queue_created, queue_deleted, channel_created, channel_closed and
    %% connection_closed.
    eventually(?_assertMatch(
                  {true, true, true, true, 2, 2, 0, 2, 0, 0},
                  begin
                      %% the channel count needs a bit longer for the 2nd channel
                      force_stats(Config),
                      Overview = http_get(Config, "/overview"),
                      Totals = maps:get(object_totals, Overview),
                      QueueTotals = maps:get(queue_totals, Overview),
                      MsgStats = maps:get(message_stats, Overview),
                      Churn = maps:get(churn_rates, Overview),
                      {maps:get(connections, Totals) >= 2,
                       maps:get(channels, Totals) >= 2,
                       maps:get(messages_ready, QueueTotals) >= 2,
                       maps:get(publish, MsgStats) >= 2,
                       maps:get(queue_declared, Churn),
                       maps:get(queue_created, Churn),
                       maps:get(queue_deleted, Churn),
                       maps:get(channel_created, Churn),
                       maps:get(channel_closed, Churn),
                       maps:get(connection_closed, Churn)}
                  end),
               1000, 60),

    http_delete(Config, "/queues/%2F/queue-n1", ?NO_CONTENT),
    http_delete(Config, "/queues/%2F/queue-n2", ?NO_CONTENT),

    amqp_channel:close(ChA),
    amqp_channel:close(ChB),
    rabbit_ct_client_helpers:close_connection(ConnA),
    rabbit_ct_client_helpers:close_connection(ConnB),
    ok.
|
|
|
|
%% Checks that the `http' listener is reported by rabbit:status/0 on the
%% first node, disables the rabbitmq_web_dispatch plugin, checks that the
%% listener is gone and finally enables rabbitmq_management again.
disable_plugin(Config) ->
    Node = get_node_config(Config, 0, nodename),
    %% protocols of the listeners currently reported by rabbit:status/0
    CurrentProtos =
        fun() ->
                Status = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit, status, []),
                listener_protos(proplists:get_value(listeners, Status))
        end,
    ?assert(lists:member(http, CurrentProtos())),
    rabbit_ct_broker_helpers:disable_plugin(Config, Node, 'rabbitmq_web_dispatch'),
    ?assert(not lists:member(http, CurrentProtos())),
    rabbit_ct_broker_helpers:enable_plugin(Config, Node, 'rabbitmq_management').
|
|
|
|
%%----------------------------------------------------------------------------
|
|
%%
|
|
|
|
%% Wipes collected statistics. Called via rpc from init_per_testcase/2.
%% Empties every table named in ?CORE_TABLES, resets rabbit_mgmt_storage,
%% sends `purge_cache' to each child of rabbit_mgmt_db_cache_sup and sends
%% `purge_old_stats' to all metrics collectors.
clear_all_table_data() ->
    _ = [ets:delete_all_objects(Table) || {Table, _} <- ?CORE_TABLES],
    rabbit_mgmt_storage:reset(),
    CacheChildren = supervisor:which_children(rabbit_mgmt_db_cache_sup),
    _ = [gen_server:call(CachePid, purge_cache)
         || {_, CachePid, _, _} <- CacheChildren],
    send_to_all_collectors(purge_old_stats).
|
|
|
|
%% Returns the `name' of the first row of the `channel_created' ETS table
%% (fetched via rpc for Node) as a string run through uri_string:recompose/1,
%% ready to be appended to "/channels/".
get_channel_name(Config, Node) ->
    Rows = rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list,
                                        [channel_created]),
    [{_, FirstChannelProps} | _] = Rows,
    RawName = binary_to_list(pget(name, FirstChannelProps)),
    uri_string:recompose(#{path => RawName}).
|
|
|
|
%% Issues basic.consume for Queue on Channel and returns the consumer tag
%% from the basic.consume_ok reply.
consume(Channel, Queue) ->
    Subscribe = #'basic.consume'{queue = Queue},
    #'basic.consume_ok'{consumer_tag = ConsumerTag} =
        amqp_channel:call(Channel, Subscribe),
    ConsumerTag.
|
|
|
|
%% Casts a basic.publish with routing key Key and payload <<"foobar">> on
%% Channel; no exchange is set on the method record.
publish(Channel, Key) ->
    Method = #'basic.publish'{routing_key = Key},
    Message = #amqp_msg{payload = <<"foobar">>},
    amqp_channel:cast(Channel, Method, Message).
|
|
|
|
%% Issues a synchronous basic.get for Queue on Channel and returns the reply.
basic_get(Channel, Queue) ->
    amqp_channel:call(Channel, #'basic.get'{queue = Queue}).
|
|
|
|
%% Casts a basic.publish to Exchange with routing key Key and payload
%% <<"foobar">> on Channel.
publish_to(Channel, Exchange, Key) ->
    Method = #'basic.publish'{exchange = Exchange,
                              routing_key = Key},
    Message = #amqp_msg{payload = <<"foobar">>},
    amqp_channel:cast(Channel, Method, Message).
|
|
|
|
%% Declares exchange Name on Chan with otherwise default exchange.declare
%% fields and asserts an exchange.declare_ok reply, which is returned.
exchange_declare(Chan, Name) ->
    Reply = amqp_channel:call(Chan, #'exchange.declare'{exchange = Name}),
    #'exchange.declare_ok'{} = Reply.
|
|
|
|
%% Declares a queue on Chan using an all-defaults queue.declare record and
%% returns the queue name from the queue.declare_ok reply.
queue_declare(Chan) ->
    #'queue.declare_ok'{queue = DeclaredName} =
        amqp_channel:call(Chan, #'queue.declare'{}),
    DeclaredName.
|
|
|
|
%% Declares queue Name on Chan with otherwise default queue.declare fields
%% and returns the queue name from the queue.declare_ok reply.
queue_declare(Chan, Name) ->
    #'queue.declare_ok'{queue = DeclaredName} =
        amqp_channel:call(Chan, #'queue.declare'{queue = Name}),
    DeclaredName.
|
|
|
|
%% Declares queue Name on Chan as durable and non-exclusive and returns the
%% queue name from the queue.declare_ok reply.
queue_declare_durable(Chan, Name) ->
    Request = #'queue.declare'{queue = Name,
                               durable = true,
                               exclusive = false},
    #'queue.declare_ok'{queue = DeclaredName} = amqp_channel:call(Chan, Request),
    DeclaredName.
|
|
|
|
%% Declares durable queue Name on Chan with the "x-queue-type" argument set
%% to "quorum" and returns the queue name from the queue.declare_ok reply.
queue_declare_quorum(Chan, Name) ->
    QuorumArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
    Request = #'queue.declare'{queue = Name,
                               durable = true,
                               arguments = QuorumArgs},
    #'queue.declare_ok'{queue = DeclaredName} = amqp_channel:call(Chan, Request),
    DeclaredName.
|
|
|
|
|
|
%% Binds queue Q to exchange Ex with routing key Key on Chan and asserts a
%% queue.bind_ok reply, which is returned.
queue_bind(Chan, Ex, Q, Key) ->
    Reply = amqp_channel:call(Chan, #'queue.bind'{exchange = Ex,
                                                  queue = Q,
                                                  routing_key = Key}),
    #'queue.bind_ok'{} = Reply.
|
|
|
|
%% Same as wait_for_queue/3 with no required keys.
wait_for_queue(Config, QueuePath) ->
    NoRequiredKeys = [],
    wait_for_queue(Config, QueuePath, NoRequiredKeys).
|
|
|
|
%% Same as wait_for_queue/4 with a budget of 1000 attempts.
wait_for_queue(Config, QueuePath, RequiredKeys) ->
    MaxAttempts = 1000,
    wait_for_queue(Config, QueuePath, RequiredKeys, MaxAttempts).
|
|
|
|
%% Polls GET Path until every key in Keys is present in the response
%% (see present/2), sleeping 10 ms between attempts. Returns the response on
%% success; exits with {timeout, {Path, Keys}} once the attempt counter
%% reaches 0.
wait_for_queue(_Config, Path, Keys, 0) ->
    exit({timeout, {Path, Keys}});
wait_for_queue(Config, Path, Keys, AttemptsLeft) ->
    Response = http_get(Config, Path),
    case present(Keys, Response) of
        true ->
            Response;
        false ->
            timer:sleep(10),
            wait_for_queue(Config, Path, Keys, AttemptsLeft - 1)
    end.
|
|
|
|
%% Returns true when every key in Keys maps to a value in Res that is neither
%% `[]' nor `undefined'; a missing key counts as `undefined'. An empty key
%% list is trivially satisfied.
present(Keys, Res) ->
    Missing = [[], undefined],
    lists:all(fun(Key) ->
                      Value = maps:get(Key, Res, undefined),
                      not lists:member(Value, Missing)
              end,
              Keys).
|
|
|
|
%% Returns, as an atom, the part of binary N that precedes the first "@"
%% separator (e.g. <<"rabbit@host">> gives `rabbit').
extract_node(N) ->
    FullName = binary_to_list(N),
    Parts = string:tokens(FullName, "@"),
    list_to_atom(hd(Parts)).
|
|
|
|
%% debugging utilities
|
|
|
|
%% Debugging aid. Starts a dbg tracer whose handler logs every trace message
%% through ct:pal/3, adds the two broker nodes to the traced set, enables
%% call tracing for all processes and installs local trace patterns (match
%% spec `cx') for each {Module, Function} and {Module, Function, Arity} entry
%% of MFs.
trace_fun(Config, MFs) ->
    FirstNode = get_node_config(Config, 0, nodename),
    SecondNode = get_node_config(Config, 1, nodename),
    LogTrace = fun(TraceMsg, _) ->
                       ct:pal(?LOW_IMPORTANCE, "TRACE: ~tp", [TraceMsg])
               end,
    dbg:tracer(process, {LogTrace, ok}),
    dbg:n(FirstNode),
    dbg:n(SecondNode),
    dbg:p(all,c),
    _ = [dbg:tpl(Mod, Fun, cx) || {Mod, Fun} <- MFs],
    [dbg:tpl(Mod, Fun, Arity, cx) || {Mod, Fun, Arity} <- MFs].
|
|
|
|
%% Debugging aid. Logs the full contents of ETS table Table as read via rpc
%% for node index 0 and then for node index 1.
dump_table(Config, Table) ->
    ReadTable = fun(NodeIndex) ->
                        rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
                                                     ets, tab2list, [Table])
                end,
    RowsOnNode0 = ReadTable(0),
    ct:pal(?LOW_IMPORTANCE, "Node 0: Dump of table ~tp:~n~tp~n", [Table, RowsOnNode0]),
    RowsOnNode1 = ReadTable(1),
    ct:pal(?LOW_IMPORTANCE, "Node 1: Dump of table ~tp:~n~tp~n", [Table, RowsOnNode1]).
|
|
|
|
%% Asks the stats processes on every configured node to emit and collect
%% their metrics now (see force_all/1). Always returns ok.
force_stats(Config) ->
    _ = force_all(rabbit_ct_broker_helpers:get_node_configs(Config, nodename)),
    ok.
|
|
|
|
%% For every node in Nodes, sends `emit_update' to rabbit_mgmt_external_stats
%% and `collect_metrics' to the metrics collector of each table in
%% ?CORE_TABLES. Returns the flat list of {RegisteredName, Node} destinations
%% that were messaged.
force_all(Nodes) ->
    PokeNode =
        fun(Node) ->
                ExternalStats = {rabbit_mgmt_external_stats, Node},
                ExternalStats ! emit_update,
                Collectors =
                    [begin
                         Collector = {rabbit_mgmt_metrics_collector:name(Table), Node},
                         Collector ! collect_metrics,
                         Collector
                     end || {Table, _} <- ?CORE_TABLES],
                [ExternalStats | Collectors]
        end,
    lists:append([PokeNode(Node) || Node <- Nodes]).
|
|
|
|
%% Sends Msg to the metrics collector of every table in ?CORE_TABLES on the
%% local node and on every node returned by nodes(). Returns one list of
%% send results per node.
send_to_all_collectors(Msg) ->
    Targets = [node() | nodes()],
    [[{rabbit_mgmt_metrics_collector:name(Table), Target} ! Msg
      || {Table, _} <- ?CORE_TABLES]
     || Target <- Targets].
|
|
|
|
%% Maps each listener description in Listeners to its protocol
%% (see listener_proto/1).
listener_protos(Listeners) ->
    [listener_proto(Listener) || Listener <- Listeners].
|
|
|
|
%% Extracts the protocol from one listener description. Three shapes are
%% accepted: a #listener{} record, a bare protocol atom, and the
%% {Protocol, Port, Interface} tuple that rabbit:status/0 used before
%% rabbitmq/rabbitmq-cli#340.
listener_proto(#listener{protocol = Protocol}) ->
    Protocol;
listener_proto(Protocol) when is_atom(Protocol) ->
    Protocol;
listener_proto({Protocol, _Port, _Interface}) ->
    Protocol.
|