578 lines
22 KiB
Erlang
578 lines
22 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
%%
|
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
%%
|
|
|
|
-module(rabbitmq_4_0_deprecations_SUITE).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
|
|
-export([suite/0,
|
|
all/0,
|
|
groups/0,
|
|
init_per_suite/1,
|
|
end_per_suite/1,
|
|
init_per_group/2,
|
|
end_per_group/2,
|
|
init_per_testcase/2,
|
|
end_per_testcase/2,
|
|
|
|
when_global_qos_is_permitted_by_default/1,
|
|
when_global_qos_is_not_permitted_from_conf/1,
|
|
|
|
join_when_ram_node_type_is_permitted_by_default/1,
|
|
join_when_ram_node_type_is_not_permitted_from_conf/1,
|
|
|
|
set_policy_when_cmq_is_permitted_by_default/1,
|
|
set_policy_when_cmq_is_not_permitted_from_conf/1,
|
|
|
|
when_transient_nonexcl_is_permitted_by_default/1,
|
|
when_transient_nonexcl_is_not_permitted_from_conf/1,
|
|
|
|
when_queue_master_locator_is_permitted_by_default/1,
|
|
when_queue_master_locator_is_not_permitted_from_conf/1
|
|
]).
|
|
|
|
%% Common Test suite defaults: a 5-minute timetrap.
suite() ->
    Timetrap = {timetrap, {minutes, 5}},
    [Timetrap].
|
|
|
|
%% The whole suite is executed twice: once in the `mnesia_store' group and
%% once in the `khepri_store' group.
all() ->
    [{group, Store} || Store <- [mnesia_store, khepri_store]].
|
|
|
|
%% Group definitions.
%%
%% There is one sub-group per deprecated feature, each holding a "permitted by
%% default" and a "not permitted from configuration" testcase. The same list
%% of sub-groups is nested under both the `mnesia_store' and `khepri_store'
%% top-level groups.
groups() ->
    TestcasesPerFeature =
        [{global_qos,
          [when_global_qos_is_permitted_by_default,
           when_global_qos_is_not_permitted_from_conf]},
         {ram_node_type,
          [join_when_ram_node_type_is_permitted_by_default,
           join_when_ram_node_type_is_not_permitted_from_conf]},
         {classic_queue_mirroring,
          [set_policy_when_cmq_is_permitted_by_default,
           set_policy_when_cmq_is_not_permitted_from_conf]},
         {transient_nonexcl_queues,
          [when_transient_nonexcl_is_permitted_by_default,
           when_transient_nonexcl_is_not_permitted_from_conf]},
         {queue_master_locator,
          [when_queue_master_locator_is_permitted_by_default,
           when_queue_master_locator_is_not_permitted_from_conf]}],
    %% No group property is set on any group, hence the empty lists.
    Subgroups = [{Feature, [], Testcases}
                 || {Feature, Testcases} <- TestcasesPerFeature],
    [{Store, [], Subgroups} || Store <- [mnesia_store, khepri_store]].
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testsuite setup/teardown.
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% Suite-level setup.
%%
%% Calls `rabbit_ct_helpers:log_environment/0', sets the primary logger level
%% of the Common Test node to `debug', then runs the helper's setup steps
%% with no additional step. Returns whatever `run_setup_steps/2' returns.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    logger:set_primary_config(level, debug),
    NoExtraSteps = [],
    rabbit_ct_helpers:run_setup_steps(Config, NoExtraSteps).
|
|
|
|
%% Suite-level teardown: nothing is undone here, the configuration is
%% returned unchanged.
end_per_suite(SuiteConfig) ->
    SuiteConfig.
|
|
|
|
%% Group-level setup.
%%
%% The two top-level groups record which metadata store is under test in the
%% `metadata_store' config key. The per-feature groups set the number of
%% nodes to start: `ram_node_type' asks for two unclustered nodes, the other
%% feature groups ask for a single node. Any other group leaves the
%% configuration untouched.
init_per_group(mnesia_store, Config) ->
    rabbit_ct_helpers:set_config(Config, [{metadata_store, mnesia}]);
init_per_group(khepri_store, Config) ->
    rabbit_ct_helpers:set_config(Config, [{metadata_store, khepri}]);
init_per_group(ram_node_type, Config) ->
    TwoStandaloneNodes = [{rmq_nodes_count, 2},
                          {rmq_nodes_clustered, false}],
    rabbit_ct_helpers:set_config(Config, TwoStandaloneNodes);
init_per_group(Group, Config)
  when Group =:= global_qos;
       Group =:= classic_queue_mirroring;
       Group =:= transient_nonexcl_queues;
       Group =:= queue_master_locator ->
    rabbit_ct_helpers:set_config(Config, {rmq_nodes_count, 1});
init_per_group(_OtherGroup, Config) ->
    Config.
|
|
|
|
%% Group-level teardown: nothing to undo for any group, the configuration is
%% returned unchanged.
end_per_group(_AnyGroup, GroupConfig) ->
    GroupConfig.
|
|
|
|
%% Testcase-level setup.
%%
%% For each "not permitted from conf" testcase, the `rabbit' application
%% environment is extended with `permit_deprecated_features' set to
%% `#{Feature => false}' for the deprecated feature that testcase covers.
%% Every other testcase keeps the configuration as is. In both cases, the
%% common setup in init_per_testcase1/2 is then run.
init_per_testcase(Testcase, Config) ->
    DeniedFeaturePerTestcase =
        #{when_global_qos_is_not_permitted_from_conf =>
              global_qos,
          join_when_ram_node_type_is_not_permitted_from_conf =>
              ram_node_type,
          set_policy_when_cmq_is_not_permitted_from_conf =>
              classic_queue_mirroring,
          when_transient_nonexcl_is_not_permitted_from_conf =>
              transient_nonexcl_queues,
          when_queue_master_locator_is_not_permitted_from_conf =>
              queue_master_locator},
    Config1 = case maps:find(Testcase, DeniedFeaturePerTestcase) of
                  {ok, Feature} ->
                      rabbit_ct_helpers:merge_app_env(
                        Config,
                        {rabbit,
                         [{permit_deprecated_features,
                           #{Feature => false}}]});
                  error ->
                      Config
              end,
    init_per_testcase1(Testcase, Config1).
|
|
|
|
%% Common part of the testcase setup.
%%
%% Marks the testcase as started, then configures the node(s) for this
%% testcase: the node name suffix is the testcase name and the TCP ports base
%% skips `TestNumber * ClusterSize' nodes (presumably so that testcases do
%% not reuse each other's ports -- confirm against rabbit_ct_broker_helpers).
%% Finally runs the broker setup steps followed by the client setup steps.
init_per_testcase1(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase),
    NodesCount = ?config(rmq_nodes_count, Config),
    TestIndex = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
    PortsBase = {skip_n_nodes, TestIndex * NodesCount},
    NodeSettings = [{rmq_nodename_suffix, Testcase},
                    {tcp_ports_base, PortsBase},
                    {keep_pid_file_on_exit, true}],
    Config1 = rabbit_ct_helpers:set_config(Config, NodeSettings),
    SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
                 rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_steps(Config1, SetupSteps).
|
|
|
|
%% Testcase-level teardown: runs the client teardown steps followed by the
%% broker teardown steps, then marks the testcase as finished.
end_per_testcase(Testcase, Config) ->
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    Config1 = rabbit_ct_helpers:run_steps(Config, TeardownSteps),
    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Global QoS.
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% With the default configuration, a `basic.qos' request with `global = true'
%% is accepted and applied by the server, and the deprecation warning for
%% `global_qos' is logged.
when_global_qos_is_permitted_by_default(Config) ->
    [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

    %% The server-side channel process is the one that appears in the
    %% channel list once the client channel is opened.
    ChannelsBefore = list_server_channels(Config, Node),
    ClientCh = rabbit_ct_client_helpers:open_channel(Config, Node),
    ChannelsAfter = list_server_channels(Config, Node),
    [ServerCh] = ChannelsAfter -- ChannelsBefore,

    ?assertNot(is_prefetch_limited(ServerCh)),

    %% Global QoS can be requested and the server honours it.
    QosRequest = #'basic.qos'{global = true, prefetch_count = 10},
    ?assertMatch(#'basic.qos_ok'{}, amqp_channel:call(ClientCh, QosRequest)),
    ?assert(is_prefetch_limited(ServerCh)),

    ExpectedLog =
        ["Deprecated features: `global_qos`: Feature `global_qos` is deprecated",
         "By default, this feature can still be used for now."],
    ?assert(log_file_contains_message(Config, Node, ExpectedLog)).
|
|
|
|
%% When `global_qos' is denied in the configuration, a `basic.qos' request
%% with `global = true' still gets a `basic.qos_ok' reply, but the prefetch
%% limit is not applied and the "not permitted" message is logged.
when_global_qos_is_not_permitted_from_conf(Config) ->
    [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

    %% The server-side channel process is the one that appears in the
    %% channel list once the client channel is opened.
    ChannelsBefore = list_server_channels(Config, Node),
    ClientCh = rabbit_ct_client_helpers:open_channel(Config, Node),
    ChannelsAfter = list_server_channels(Config, Node),
    [ServerCh] = ChannelsAfter -- ChannelsBefore,

    ?assertNot(is_prefetch_limited(ServerCh)),

    %% Global QoS can be requested, but the server ignores it.
    QosRequest = #'basic.qos'{global = true, prefetch_count = 10},
    ?assertMatch(#'basic.qos_ok'{}, amqp_channel:call(ClientCh, QosRequest)),
    ?assertNot(is_prefetch_limited(ServerCh)),

    ExpectedLog =
        ["Deprecated features: `global_qos`: Feature `global_qos` is deprecated",
         "Its use is not permitted per the configuration"],
    ?assert(log_file_contains_message(Config, Node, ExpectedLog)).
|
|
|
|
%% Returns the result of `rabbit_channel:list/0' evaluated on `Node'.
list_server_channels(Config, Node) ->
    NoArgs = [],
    rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_channel, list, NoArgs).
|
|
|
|
%% Digs into the internal state of a server channel process to tell whether
%% a prefetch limit is in effect.
%%
%% NOTE(review): this relies on tuple positions only: element 4 of the
%% process state is taken as the channel state, element 3 of that as the
%% limiter state, and element 3 of the limiter state as the returned flag.
%% Confirm these positions against gen_server2, rabbit_channel and
%% rabbit_limiter whenever their state records change.
is_prefetch_limited(ServerCh) ->
    ChState = element(4, sys:get_state(ServerCh)),
    ct:pal("Server channel (~p) state: ~p", [ServerCh, ChState]),
    Limiter = element(3, ChState),
    element(3, Limiter).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% RAM node type.
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% Dispatches to the Mnesia or Khepri flavour of the testcase, depending on
%% the `metadata_store' config key set by the enclosing group.
join_when_ram_node_type_is_permitted_by_default(Config) ->
    Store = ?config(metadata_store, Config),
    Flavour =
        case Store of
            mnesia ->
                fun join_when_ram_node_type_is_permitted_by_default_mnesia/1;
            khepri ->
                fun join_when_ram_node_type_is_permitted_by_default_khepri/1
        end,
    Flavour(Config).
|
|
|
|
%% Mnesia flavour: with the default configuration, node A can join node B as
%% a RAM node and the deprecation warning is logged.
%%
%% The testcase then denies `ram_node_type' in node A's Erlang configuration
%% file and restarts the application: node A is expected to come back as a
%% disc node. If the restart fails with exit code 69 and an
%% "incompatible_feature_flags" message, the testcase is skipped; any other
%% exit-code-69 failure is thrown.
join_when_ram_node_type_is_permitted_by_default_mnesia(Config) ->
    [NodeA, NodeB] = rabbit_ct_broker_helpers:get_node_configs(
                       Config, nodename),
    Cluster = [NodeA, NodeB],

    %% Join node A to node B, asking for the RAM node type.
    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(
                Config, NodeA, ["stop_app"]),
    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(
                Config, NodeA,
                ["join_cluster", "--ram", atom_to_list(NodeB)]),
    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(
                Config, NodeA, ["start_app"]),

    %% Both nodes see the two-node cluster; only node B is a disc node.
    ?assertEqual(Cluster, get_all_nodes(Config, NodeA)),
    ?assertEqual(Cluster, get_all_nodes(Config, NodeB)),
    ?assertEqual([NodeB], get_disc_nodes(Config, NodeA)),
    ?assertEqual([NodeB], get_disc_nodes(Config, NodeB)),

    ExpectedLog =
        ["Deprecated features: `ram_node_type`: Feature `ram_node_type` is "
         "deprecated",
         "By default, this feature can still be used for now."],
    ?assert(log_file_contains_message(Config, NodeA, ExpectedLog)),

    %% Rewrite node A's Erlang configuration file so that the RAM node type
    %% is no longer permitted.
    ConfigBasename = rabbit_ct_broker_helpers:get_node_config(
                       Config, NodeA, erlang_node_config_filename),
    ConfigPath = ConfigBasename ++ ".config",
    {ok, [ErlConf]} = file:consult(ConfigPath),
    DeniedErlConf = rabbit_ct_helpers:merge_app_env_in_erlconf(
                      ErlConf,
                      {rabbit, [{permit_deprecated_features,
                                 #{ram_node_type => false}}]}),
    Serialized = lists:flatten(io_lib:format("~p.~n", [DeniedErlConf])),
    ok = file:write_file(ConfigPath, Serialized),
    %% Read the file back to check it parses to the term we meant to write.
    ?assertEqual({ok, [DeniedErlConf]}, file:consult(ConfigPath)),

    %% Restart the application on node A and check the node was converted to
    %% a disc node.
    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(
                Config, NodeA, ["stop_app"]),
    StartRet = rabbit_ct_broker_helpers:rabbitmqctl(
                 Config, NodeA, ["start_app"]),

    case StartRet of
        {ok, _} ->
            ?assertEqual(Cluster, get_all_nodes(Config, NodeA)),
            ?assertEqual(Cluster, get_all_nodes(Config, NodeB)),
            ?assertEqual(Cluster, get_disc_nodes(Config, NodeA)),
            ?assertEqual(Cluster, get_disc_nodes(Config, NodeB));
        {error, 69, Output} ->
            case re:run(Output, "incompatible_feature_flags",
                        [{capture, none}]) of
                match ->
                    {skip, "Incompatible feature flags between nodes A and B"};
                _ ->
                    throw(StartRet)
            end
    end.
|
|
|
|
%% Khepri flavour: joining a cluster with `--ram' must be rejected with exit
%% code 70 and a "`ram` node type is unsupported" error. After the
%% application is started again, the permission state of `ram_node_type' is
%% the same as before and the two nodes are still not clustered.
join_when_ram_node_type_is_permitted_by_default_khepri(Config) ->
    [NodeA, NodeB] = rabbit_ct_broker_helpers:get_node_configs(
                       Config, nodename),

    %% Remember the permission state to compare it after the failed join.
    PermittedBefore = rabbit_ct_broker_helpers:rpc(
                        Config, NodeA,
                        rabbit_deprecated_features, is_permitted,
                        [ram_node_type]),

    ok = rabbit_control_helper:command(stop_app, NodeA),
    JoinRet = rabbit_control_helper:command_with_output(
                join_cluster, NodeA,
                [atom_to_list(NodeB)], [{"--ram", true}]),
    ?assertMatch(
       {error, 70,
        <<"Error:\nError: `ram` node type is unsupported", _/binary>>},
       JoinRet),
    ok = rabbit_control_helper:command(start_app, NodeA),

    PermittedAfter = rabbit_ct_broker_helpers:rpc(
                       Config, NodeA,
                       rabbit_deprecated_features, is_permitted,
                       [ram_node_type]),
    ?assertEqual(PermittedBefore, PermittedAfter),

    %% Each node still only knows about itself.
    ?assertEqual([NodeA], get_all_nodes(Config, NodeA)),
    ?assertEqual([NodeB], get_all_nodes(Config, NodeB)),
    ?assertEqual([NodeA], get_disc_nodes(Config, NodeA)),
    ?assertEqual([NodeB], get_disc_nodes(Config, NodeB)).
|
|
|
|
%% Dispatches to the Mnesia or Khepri flavour of the testcase, depending on
%% the `metadata_store' config key set by the enclosing group.
join_when_ram_node_type_is_not_permitted_from_conf(Config) ->
    Store = ?config(metadata_store, Config),
    Flavour =
        case Store of
            mnesia ->
                fun join_when_ram_node_type_is_not_permitted_from_conf_mnesia/1;
            khepri ->
                fun join_when_ram_node_type_is_not_permitted_from_conf_khepri/1
        end,
    Flavour(Config).
|
|
|
|
%% Mnesia flavour: when `ram_node_type' is denied in the configuration,
%% joining with `--ram' still succeeds but node A ends up as a disc node,
%% and the "not permitted" message is logged.
%%
%% If the join fails with exit code 69 and an "incompatible_feature_flags"
%% message, the testcase is skipped; any other exit-code-69 failure is
%% thrown.
join_when_ram_node_type_is_not_permitted_from_conf_mnesia(Config) ->
    [NodeA, NodeB] = rabbit_ct_broker_helpers:get_node_configs(
                       Config, nodename),
    Cluster = [NodeA, NodeB],

    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(
                Config, NodeA, ["stop_app"]),
    JoinRet = rabbit_ct_broker_helpers:rabbitmqctl(
                Config, NodeA,
                ["join_cluster", "--ram", atom_to_list(NodeB)]),
    case JoinRet of
        {ok, _} ->
            {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(
                        Config, NodeA, ["start_app"]),

            %% Both nodes are clustered and both are disc nodes, despite the
            %% `--ram' flag.
            ?assertEqual(Cluster, get_all_nodes(Config, NodeA)),
            ?assertEqual(Cluster, get_all_nodes(Config, NodeB)),
            ?assertEqual(Cluster, get_disc_nodes(Config, NodeA)),
            ?assertEqual(Cluster, get_disc_nodes(Config, NodeB)),

            ExpectedLog =
                ["Deprecated features: `ram_node_type`: Feature `ram_node_type` is deprecated",
                 "Its use is not permitted per the configuration"],
            ?assert(log_file_contains_message(Config, NodeA, ExpectedLog));
        {error, 69, Output} ->
            case re:run(Output, "incompatible_feature_flags",
                        [{capture, none}]) of
                match ->
                    {skip, "Incompatible feature flags between nodes A and B"};
                _ ->
                    throw(JoinRet)
            end
    end.
|
|
|
|
%% Khepri flavour: joining a cluster with `--ram' must be rejected with exit
%% code 70 and a "`ram` node type is unsupported" error. The permission
%% state of `ram_node_type' must be the same before and after the attempt,
%% and the two nodes must remain standalone.
join_when_ram_node_type_is_not_permitted_from_conf_khepri(Config) ->
    [NodeA, NodeB] = rabbit_ct_broker_helpers:get_node_configs(
                       Config, nodename),

    %% Query on node A whether `ram_node_type' is permitted.
    QueryIsPermitted =
        fun() ->
                rabbit_ct_broker_helpers:rpc(
                  Config, NodeA,
                  rabbit_deprecated_features, is_permitted,
                  [ram_node_type])
        end,
    IsPermittedBefore = QueryIsPermitted(),

    ok = rabbit_control_helper:command(stop_app, NodeA),
    ?assertMatch(
       {error, 70,
        <<"Error:\nError: `ram` node type is unsupported", _/binary>>},
       rabbit_control_helper:command_with_output(
         join_cluster, NodeA,
         [atom_to_list(NodeB)], [{"--ram", true}])),
    ok = rabbit_control_helper:command(start_app, NodeA),

    ?assertEqual(IsPermittedBefore, QueryIsPermitted()),

    %% The failed join left both nodes on their own.
    ?assertEqual([NodeA], get_all_nodes(Config, NodeA)),
    ?assertEqual([NodeB], get_all_nodes(Config, NodeB)),
    ?assertEqual([NodeA], get_disc_nodes(Config, NodeA)),
    ?assertEqual([NodeB], get_disc_nodes(Config, NodeB)).
|
|
|
|
%% Returns the sorted list of cluster members as seen by `Node'.
%%
%% The query depends on `rabbit_khepri:is_enabled(Node)':
%% `rabbit_khepri:locally_known_nodes/0' when it returns `true',
%% `rabbit_mnesia:cluster_nodes(all)' otherwise.
get_all_nodes(Config, Node) ->
    {Mod, Fun, Args} =
        case rabbit_khepri:is_enabled(Node) of
            true  -> {rabbit_khepri, locally_known_nodes, []};
            false -> {rabbit_mnesia, cluster_nodes, [all]}
        end,
    Members = rabbit_ct_broker_helpers:rpc(Config, Node, Mod, Fun, Args),
    lists:sort(Members).
|
|
|
|
%% Returns the sorted list of disc nodes as seen by `Node'.
%%
%% When `rabbit_khepri:is_enabled(Node)' returns `true', this is the same
%% query as for all nodes (`rabbit_khepri:locally_known_nodes/0').
%% Otherwise, it is `rabbit_mnesia:cluster_nodes(disc)'.
get_disc_nodes(Config, Node) ->
    {Mod, Fun, Args} =
        case rabbit_khepri:is_enabled(Node) of
            true  -> {rabbit_khepri, locally_known_nodes, []};
            false -> {rabbit_mnesia, cluster_nodes, [disc]}
        end,
    DiscNodes = rabbit_ct_broker_helpers:rpc(Config, Node, Mod, Fun, Args),
    lists:sort(DiscNodes).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Classic queue mirroring.
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% With the default configuration: runs the shared checks in
%% set_cmq_policy/1.
set_policy_when_cmq_is_permitted_by_default(CtConfig) ->
    set_cmq_policy(CtConfig).
|
|
|
|
%% With `classic_queue_mirroring' denied in the configuration: runs the same
%% shared checks in set_cmq_policy/1 as the "permitted by default" testcase.
set_policy_when_cmq_is_not_permitted_from_conf(CtConfig) ->
    set_cmq_policy(CtConfig).
|
|
|
|
%% Classic mirrored queues have been removed: setting an `ha-mode' policy
%% must fail like any other unknown policy setting, and no deprecation
%% message about classic queue mirroring may be logged.
set_cmq_policy(Config) ->
    HaDefinition = [{<<"ha-mode">>, <<"all">>}],
    ?assertError(
       {badmatch,
        {error_string,
         "Validation failed\n\n[{<<\"ha-mode\">>,<<\"all\">>}] are not recognised policy settings" ++ _}},
       rabbit_ct_broker_helpers:set_policy(
         Config, 0, <<"ha">>, <<".*">>, <<"queues">>, HaDefinition)),

    [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

    UnexpectedLog =
        ["Deprecated features: `classic_queue_mirroring`: Classic mirrored queues have been removed."],
    ?assertNot(log_file_contains_message(Config, Node, UnexpectedLog)).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Transient non-exclusive queues.
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% With the default configuration, a transient (non-durable) non-exclusive
%% queue can be declared and the deprecation warning is logged.
when_transient_nonexcl_is_permitted_by_default(Config) ->
    [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Channel = rabbit_ct_client_helpers:open_channel(Config, Node),

    %% The queue is named after this testcase.
    QName = list_to_binary(atom_to_list(?FUNCTION_NAME)),
    Declare = #'queue.declare'{queue = QName,
                               durable = false,
                               exclusive = false},
    ?assertEqual(
       {'queue.declare_ok', QName, 0, 0},
       amqp_channel:call(Channel, Declare)),

    ExpectedLog =
        ["Deprecated features: `transient_nonexcl_queues`: Feature `transient_nonexcl_queues` is deprecated",
         "By default, this feature can still be used for now."],
    ?assert(log_file_contains_message(Config, Node, ExpectedLog)).
|
|
|
|
%% When `transient_nonexcl_queues' is denied in the configuration, declaring
%% a transient non-exclusive queue makes the server close the connection
%% with code 541 (INTERNAL_ERROR), and the "not permitted" message is logged.
when_transient_nonexcl_is_not_permitted_from_conf(Config) ->
    [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Channel = rabbit_ct_client_helpers:open_channel(Config, Node),

    %% The queue is named after this testcase.
    QName = list_to_binary(atom_to_list(?FUNCTION_NAME)),
    Declare = #'queue.declare'{queue = QName,
                               durable = false,
                               exclusive = false},
    ?assertExit(
       {{shutdown,
         {connection_closing,
          {server_initiated_close, 541,
           <<"INTERNAL_ERROR - Feature `transient_nonexcl_queues` is "
             "deprecated.", _/binary>>}}}, _},
       amqp_channel:call(Channel, Declare)),

    ExpectedLog =
        ["Deprecated features: `transient_nonexcl_queues`: Feature `transient_nonexcl_queues` is deprecated",
         "Its use is not permitted per the configuration"],
    ?assert(log_file_contains_message(Config, Node, ExpectedLog)).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% (x-)queue-master-locator
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% With the default configuration, both the `x-queue-master-locator' queue
%% argument and the `queue-master-locator' policy key are accepted, and the
%% deprecation warning is logged.
when_queue_master_locator_is_permitted_by_default(Config) ->
    [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Channel = rabbit_ct_client_helpers:open_channel(Config, Node),

    %% Queue argument: the declaration succeeds. The queue is named after
    %% this testcase.
    QName = list_to_binary(atom_to_list(?FUNCTION_NAME)),
    LocatorArg = {<<"x-queue-master-locator">>, longstr, <<"client-local">>},
    Declare = #'queue.declare'{queue = QName,
                               arguments = [LocatorArg]},
    ?assertEqual(
       {'queue.declare_ok', QName, 0, 0},
       amqp_channel:call(Channel, Declare)),

    %% Policy key: the policy is accepted.
    PolicyDefinition = [{<<"queue-master-locator">>, <<"client-local">>}],
    ?assertEqual(
       ok,
       rabbit_ct_broker_helpers:set_policy(
         Config, 0, <<"client-local">>, <<".*">>, <<"queues">>,
         PolicyDefinition)),

    ExpectedLog =
        ["Deprecated features: `queue_master_locator`: queue-master-locator is deprecated"],
    ?assert(log_file_contains_message(Config, Node, ExpectedLog)).
|
|
|
|
%% When `queue_master_locator' is denied in the configuration:
%%   - declaring a queue with the `x-queue-master-locator' argument makes the
%%     server close the connection with code 541 (INTERNAL_ERROR);
%%   - setting a policy with the `queue-master-locator' key fails validation;
%%   - the "not permitted" message is logged.
when_queue_master_locator_is_not_permitted_from_conf(Config) ->
    [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    Channel = rabbit_ct_client_helpers:open_channel(Config, Node),

    %% Queue argument: the declaration is refused. The queue is named after
    %% this testcase.
    QName = list_to_binary(atom_to_list(?FUNCTION_NAME)),
    LocatorArg = {<<"x-queue-master-locator">>, longstr, <<"client-local">>},
    Declare = #'queue.declare'{queue = QName,
                               arguments = [LocatorArg]},
    ?assertExit(
       {{shutdown,
         {connection_closing,
          {server_initiated_close, 541,
           <<"INTERNAL_ERROR - Feature `queue_master_locator` is "
             "deprecated.", _/binary>>}}}, _},
       amqp_channel:call(Channel, Declare)),

    %% Policy key: the policy is refused.
    PolicyDefinition = [{<<"queue-master-locator">>, <<"client-local">>}],
    ?assertError(
       {badmatch,
        {error_string,
         "Validation failed\n\nuse of deprecated queue-master-locator argument is not permitted\n"}},
       rabbit_ct_broker_helpers:set_policy(
         Config, 0, <<"client-local">>, <<".*">>, <<"queues">>,
         PolicyDefinition)),

    ExpectedLog =
        ["Deprecated features: `queue_master_locator`: Feature `queue_master_locator` is deprecated",
         "Its use is not permitted per the configuration"],
    ?assert(log_file_contains_message(Config, Node, ExpectedLog)).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Helpers.
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% Returns `true' if at least one regular log file of `Node' contains all
%% `Messages' (see find_messages/2), `false' otherwise.
%%
%% Before reading, a `filesync' is requested for the `rmq_1_file_1' and
%% `rmq_2_file_1' handlers. This is best effort: any failure of these calls
%% is deliberately ignored. The log files are read from the Common Test
%% node's own filesystem, and the function asserts there is at least one of
%% them.
log_file_contains_message(Config, Node, Messages) ->
    lists:foreach(
      fun(HandlerId) ->
              _ = catch rabbit_ct_broker_helpers:rpc(
                          Config, Node,
                          rabbit_logger_std_h, filesync, [HandlerId])
      end, [rmq_1_file_1, rmq_2_file_1]),

    Locations = rabbit_ct_broker_helpers:rpc(
                  Config, Node, rabbit, log_locations, []),
    %% Only keep locations which are regular files.
    RegularFiles = lists:filter(fun filelib:is_regular/1, Locations),
    ?assertNotEqual([], RegularFiles),

    ContainsAll = fun(Path) ->
                          {ok, Bytes} = file:read_file(Path),
                          find_messages(Bytes, Messages)
                  end,
    lists:any(ContainsAll, RegularFiles).
|
|
|
|
%% Returns `true' if all `Messages' appear in `Content', in the given order
%% and without overlapping each other; `false' otherwise. An empty list of
%% messages always matches.
%%
%% `Content' and each message are chardata (binary or string).
find_messages(_Content, []) ->
    true;
find_messages(Content, [Message | Rest]) ->
    case string:find(Content, Message) of
        nomatch ->
            false;
        Found ->
            %% string:find/2 returns the remainder starting AT the match.
            %% Drop the matched message so the next one is searched strictly
            %% after it; otherwise a fragment contained in the previous one
            %% would be accepted (e.g. "bc" right after "abc").
            AfterMatch = case string:prefix(Found, Message) of
                             nomatch -> Found;
                             Tail    -> Tail
                         end,
            find_messages(AfterMatch, Rest)
    end.
|