Add Classic Queue version to operator policies

This commit is contained in:
Jean-Sébastien Dominique 2023-09-26 15:09:55 -04:00 committed by Michael Klishin
parent 1e8701f2ca
commit 8c6ba6daca
5 changed files with 40 additions and 4 deletions

View File

@ -782,6 +782,12 @@ end}.
{datatype, string}
]}.
{mapping, "default_policies.operator.$id.classic_queues.queue_version", "rabbit.default_policies.operator",
[
{validators, ["non_zero_positive_integer"]},
{datatype, integer}
]}.
{translation, "rabbit.default_policies.operator", fun(Conf) ->
Props = rabbit_cuttlefish:aggregate_props(
Conf,

View File

@ -54,13 +54,15 @@ register() ->
{operator_policy_validator, <<"max-in-memory-length">>},
{operator_policy_validator, <<"max-in-memory-bytes">>},
{operator_policy_validator, <<"delivery-limit">>},
{operator_policy_validator, <<"queue-version">>},
{policy_merge_strategy, <<"expires">>},
{policy_merge_strategy, <<"message-ttl">>},
{policy_merge_strategy, <<"max-length">>},
{policy_merge_strategy, <<"max-length-bytes">>},
{policy_merge_strategy, <<"max-in-memory-length">>},
{policy_merge_strategy, <<"max-in-memory-bytes">>},
{policy_merge_strategy, <<"delivery-limit">>}]],
{policy_merge_strategy, <<"delivery-limit">>},
{policy_merge_strategy, <<"queue-version">>}]],
ok.
-spec validate_policy([{binary(), term()}]) -> rabbit_policy_validator:validate_results().
@ -211,5 +213,6 @@ merge_policy_value(<<"max-in-memory-length">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"queue-version">>, _Val, OpVal) -> OpVal;
%% use operator policy value for booleans
merge_policy_value(_Key, Val, OpVal) when is_boolean(Val) andalso is_boolean(OpVal) -> OpVal.

View File

@ -151,6 +151,7 @@ ssl_options.fail_if_no_peer_cert = true",
default_policies.operator.a.classic_queues.ha_mode = exactly
default_policies.operator.a.classic_queues.ha_params = 2
default_policies.operator.a.classic_queues.ha_sync_mode = automatic
default_policies.operator.a.classic_queues.queue_version = 2
",
[{rabbit, [{default_policies, [{operator, [
@ -159,6 +160,7 @@ ssl_options.fail_if_no_peer_cert = true",
{<<"ha_params">>, 2},
{<<"ha_sync_mode">>, <<"automatic">>},
{<<"queue_pattern">>, <<"apple">>},
{<<"queue_version">>, 2},
{<<"vhost_pattern">>, "banana"}]}]}]}]}],
[]},

View File

@ -28,6 +28,7 @@ groups() ->
operator_retroactive_policy_ttl,
operator_retroactive_policy_publish_ttl,
queue_type_specific_policies,
queue_version_specific_policies,
is_supported_operator_policy_expires,
is_supported_operator_policy_message_ttl,
is_supported_operator_policy_max_length,
@ -256,6 +257,29 @@ queue_type_specific_policies(Config) ->
rabbit_ct_client_helpers:close_connection(Conn),
passed.
queue_version_specific_policies(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = <<"policy_queue_version">>,
declare(Ch, QName),
QueueVersionOnePolicy = [{<<"queue-version">>, 1}],
QueueVersionTwoPolicy = [{<<"queue-version">>, 2}],
Opts = #{config => Config,
server => Server,
qname => QName},
%% Queue version OperPolicy has precedence always
verify_policies(QueueVersionOnePolicy, QueueVersionTwoPolicy, QueueVersionTwoPolicy, Opts),
verify_policies(QueueVersionTwoPolicy, QueueVersionOnePolicy, QueueVersionOnePolicy, Opts),
delete(Ch, QName),
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"policy">>),
rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"op_policy">>),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
%% See supported policies in https://www.rabbitmq.com/parameters.html#operator-policies
%% This test applies all supported operator policies to all queue types,
%% and later verifies the effective policy definitions.
@ -433,10 +457,10 @@ verify_policies(Policy, OperPolicy, VerifyFuns, #{config := Config,
server := Server,
qname := QName}) ->
rabbit_ct_broker_helpers:set_policy(Config, 0, <<"policy">>,
<<"policy_ha">>, <<"queues">>,
QName, <<"queues">>,
Policy),
rabbit_ct_broker_helpers:set_operator_policy(Config, 0, <<"op_policy">>,
<<"policy_ha">>, <<"queues">>,
QName, <<"queues">>,
OperPolicy),
verify_policy(VerifyFuns, Server, QName).

View File

@ -288,7 +288,8 @@
<span class="argument-link" field="definitionop" key="ha-sync-mode" type="string">HA sync mode</span> <span class="help" id="policy-ha-sync-mode"></span> </br>
<span class="argument-link" field="definitionop" key="max-length" type="number">Max length</span> |
<span class="argument-link" field="definitionop" key="max-length-bytes" type="number">Max length bytes</span> |
<span class="argument-link" field="definitionop" key="message-ttl" type="number">Message TTL</span>
<span class="argument-link" field="definitionop" key="message-ttl" type="number">Message TTL</span> |
<span class="argument-link" field="definitionop" key="queue-version" type="number">Version</span> <span class="help" id="queue-version"></span> </br>
<span class="help" id="queue-message-ttl"></span>
</td>
</tr>