Avoid policy name collisions in stream queue suite

This commit is contained in:
Karl Nilsson 2022-07-26 12:05:22 +01:00
parent a5ced5d97e
commit c9b414a26e
1 changed files with 24 additions and 17 deletions

View File

@ -1828,9 +1828,11 @@ initial_cluster_size_two(Config) ->
initial_cluster_size_one_policy(Config) -> initial_cluster_size_one_policy(Config) ->
[Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
PolicyName = atom_to_binary(?FUNCTION_NAME),
ok = rabbit_ct_broker_helpers:set_policy( ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"cluster-size">>, <<"initial_cluster_size_one_policy">>, <<"queues">>, Config, 0, PolicyName, <<"initial_cluster_size_one_policy">>,
<<"queues">>,
[{<<"initial-cluster-size">>, 1}]), [{<<"initial-cluster-size">>, 1}]),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
@ -1844,7 +1846,7 @@ initial_cluster_size_one_policy(Config) ->
?assertMatch(#'queue.delete_ok'{}, ?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})), amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"cluster-size">>), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
declare_delete_same_stream(Config) -> declare_delete_same_stream(Config) ->
@ -1984,8 +1986,10 @@ leader_locator_policy(Config) ->
Bin = rabbit_data_coercion:to_binary(?FUNCTION_NAME), Bin = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
Q1 = <<Bin/binary, "_q1">>, Q1 = <<Bin/binary, "_q1">>,
PolicyName = atom_to_binary(?FUNCTION_NAME),
ok = rabbit_ct_broker_helpers:set_policy( ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"my-leader-locator">>, Q, <<"queues">>, Config, 0, PolicyName, Q, <<"queues">>,
[{<<"queue-leader-locator">>, <<"balanced">>}]), [{<<"queue-leader-locator">>, <<"balanced">>}]),
?assertEqual({'queue.declare_ok', Q1, 0, 0}, ?assertEqual({'queue.declare_ok', Q1, 0, 0},
@ -1995,14 +1999,14 @@ leader_locator_policy(Config) ->
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition, leader]), Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition, leader]),
?assertEqual(<<"my-leader-locator">>, proplists:get_value(policy, Info)), ?assertEqual(PolicyName, proplists:get_value(policy, Info)),
?assertEqual('', proplists:get_value(operator_policy, Info)), ?assertEqual('', proplists:get_value(operator_policy, Info)),
?assertEqual([{<<"queue-leader-locator">>, <<"balanced">>}], ?assertEqual([{<<"queue-leader-locator">>, <<"balanced">>}],
proplists:get_value(effective_policy_definition, Info)), proplists:get_value(effective_policy_definition, Info)),
Leader = proplists:get_value(leader, Info), Leader = proplists:get_value(leader, Info),
?assert(lists:member(Leader, [Server2, Server3])), ?assert(lists:member(Leader, [Server2, Server3])),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"my-leader-locator">>), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[Q1, Q]]). rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[Q1, Q]]).
queue_size_on_declare(Config) -> queue_size_on_declare(Config) ->
@ -2071,24 +2075,26 @@ max_age_policy(Config) ->
Q = ?config(queue_name, Config), Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0}, ?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
PolicyName = atom_to_binary(?FUNCTION_NAME),
ok = rabbit_ct_broker_helpers:set_policy( ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"age">>, <<"max_age_policy.*">>, <<"queues">>, Config, 0, PolicyName, <<"max_age_policy.*">>, <<"queues">>,
[{<<"max-age">>, <<"1Y">>}]), [{<<"max-age">>, <<"1Y">>}]),
Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition]), Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition]),
?assertEqual(<<"age">>, proplists:get_value(policy, Info)), ?assertEqual(PolicyName, proplists:get_value(policy, Info)),
?assertEqual('', proplists:get_value(operator_policy, Info)), ?assertEqual('', proplists:get_value(operator_policy, Info)),
?assertEqual([{<<"max-age">>, <<"1Y">>}], ?assertEqual([{<<"max-age">>, <<"1Y">>}],
proplists:get_value(effective_policy_definition, Info)), proplists:get_value(effective_policy_definition, Info)),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
update_retention_policy(Config) -> update_retention_policy(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
PolicyName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server), Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config), Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0}, ?assertEqual({'queue.declare_ok', Q, 0, 0},
@ -2104,7 +2110,7 @@ update_retention_policy(Config) ->
[rabbit_misc:r(<<"/">>, queue, Q)]), [rabbit_misc:r(<<"/">>, queue, Q)]),
%% Don't use time based retention, it's really hard to get those tests right %% Don't use time based retention, it's really hard to get those tests right
ok = rabbit_ct_broker_helpers:set_policy( ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"retention">>, <<"update_retention_policy.*">>, <<"queues">>, Config, 0, PolicyName, <<"update_retention_policy.*">>, <<"queues">>,
[{<<"max-length-bytes">>, 10000}]), [{<<"max-length-bytes">>, 10000}]),
ensure_retention_applied(Config, Server), ensure_retention_applied(Config, Server),
@ -2118,7 +2124,7 @@ update_retention_policy(Config) ->
%% If there are changes only in the retention policy, processes should not be restarted %% If there are changes only in the retention policy, processes should not be restarted
?assertEqual(amqqueue:get_pid(Q0), amqqueue:get_pid(Q1)), ?assertEqual(amqqueue:get_pid(Q0), amqqueue:get_pid(Q1)),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"retention">>), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
queue_info(Config) -> queue_info(Config) ->
@ -2139,17 +2145,18 @@ queue_info(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
max_segment_size_bytes_policy_validation(Config) -> max_segment_size_bytes_policy_validation(Config) ->
PolicyName = atom_to_binary(?FUNCTION_NAME),
ok = rabbit_ct_broker_helpers:set_policy( ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"segment">>, <<"max_segment_size_bytes.*">>, <<"queues">>, Config, 0, PolicyName, <<"max_segment_size_bytes.*">>, <<"queues">>,
[{<<"stream-max-segment-size-bytes">>, ?MAX_STREAM_MAX_SEGMENT_SIZE - 1_000}]), [{<<"stream-max-segment-size-bytes">>, ?MAX_STREAM_MAX_SEGMENT_SIZE - 1_000}]),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"segment">>), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName),
{error_string, _} = rabbit_ct_broker_helpers:rpc( {error_string, _} = rabbit_ct_broker_helpers:rpc(
Config, 0, Config, 0,
rabbit_policy, set, rabbit_policy, set,
[<<"/">>, [<<"/">>,
<<"segment">>, PolicyName,
<<"max_segment_size_bytes.*">>, <<"max_segment_size_bytes.*">>,
[{<<"stream-max-segment-size-bytes">>, ?MAX_STREAM_MAX_SEGMENT_SIZE + 1_000}], [{<<"stream-max-segment-size-bytes">>, ?MAX_STREAM_MAX_SEGMENT_SIZE + 1_000}],
0, 0,
@ -2162,23 +2169,23 @@ max_segment_size_bytes_policy(Config) ->
%% config update but will pick it up the next time a stream is restarted. %% config update but will pick it up the next time a stream is restarted.
%% This is a limitation that we may want to address at some %% This is a limitation that we may want to address at some
%% point but for now we need to set the policy _before_ creating the stream. %% point but for now we need to set the policy _before_ creating the stream.
PolicyName = atom_to_binary(?FUNCTION_NAME),
ok = rabbit_ct_broker_helpers:set_policy( ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"segment">>, <<"max_segment_size_bytes.*">>, <<"queues">>, Config, 0, PolicyName, <<"max_segment_size_bytes.*">>, <<"queues">>,
[{<<"stream-max-segment-size-bytes">>, 5000}]), [{<<"stream-max-segment-size-bytes">>, 5000}]),
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server), Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config), Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0}, ?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition]), Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition]),
?assertEqual(<<"segment">>, proplists:get_value(policy, Info)), ?assertEqual(PolicyName, proplists:get_value(policy, Info)),
?assertEqual('', proplists:get_value(operator_policy, Info)), ?assertEqual('', proplists:get_value(operator_policy, Info)),
?assertEqual([{<<"stream-max-segment-size-bytes">>, 5000}], ?assertEqual([{<<"stream-max-segment-size-bytes">>, 5000}],
proplists:get_value(effective_policy_definition, Info)), proplists:get_value(effective_policy_definition, Info)),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"segment">>), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
purge(Config) -> purge(Config) ->