Allow applying policies to specific queue types

Rather than relying on queue name conventions, allow applying policies
based on the queue type. For example, this allows multiple policies that
apply to all queue names (".*") that specify different parameters for
different queue types.
This commit is contained in:
Michal Kuratczyk 2023-03-13 09:15:31 +01:00
parent 7bdc06a6c8
commit 0a3136a916
No known key found for this signature in database
5 changed files with 77 additions and 9 deletions

View File

@ -1724,7 +1724,13 @@ Which types of object this policy should apply to.
Possible values are:
.Bl -bullet -compact
.It
queues
queues (all queue types, including streams)
.It
classic_queues (classic queues only)
.It
quorum_queues (quorum queues only)
.It
streams (streams only)
.It
exchanges
.It

View File

@ -193,8 +193,8 @@ match_all(NameOrQueue, Policies) ->
lists:sort(fun priority_comparator/2, [P || P <- Policies, matches(NameOrQueue, P)]).
matches(Q, Policy) when ?is_amqqueue(Q) ->
#resource{name = Name, kind = Kind, virtual_host = VHost} = amqqueue:get_name(Q),
matches_type(Kind, pget('apply-to', Policy)) andalso
#resource{name = Name, virtual_host = VHost} = amqqueue:get_name(Q),
matches_queue_type(queue, amqqueue:get_type(Q), pget('apply-to', Policy)) andalso
is_applicable(Q, pget(definition, Policy)) andalso
match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
VHost =:= pget(vhost, Policy);
@ -517,11 +517,16 @@ maybe_notify_of_policy_change({Q1, Q2}, PolicyDef, ActingUser) when ?is_amqqueue
rabbit_amqqueue:policy_changed(Q1, Q2).
matches_type(exchange, <<"exchanges">>) -> true;
matches_type(queue, <<"queues">>) -> true;
matches_type(exchange, <<"all">>) -> true;
matches_type(queue, <<"all">>) -> true;
matches_type(_, _) -> false.
matches_queue_type(queue, _, <<"all">>) -> true;
matches_queue_type(queue, _, <<"queues">>) -> true;
matches_queue_type(queue, rabbit_classic_queue, <<"classic_queues">>) -> true;
matches_queue_type(queue, rabbit_quorum_queue, <<"quorum_queues">>) -> true;
matches_queue_type(queue, rabbit_stream_queue, <<"streams">>) -> true;
matches_queue_type(queue, _, _) -> false.
priority_comparator(A, B) -> pget(priority, A) >= pget(priority, B).
is_applicable(Q, Policy) when ?is_amqqueue(Q) ->
@ -602,6 +607,9 @@ is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]).
apply_to_validation(_Name, <<"all">>) -> ok;
apply_to_validation(_Name, <<"exchanges">>) -> ok;
apply_to_validation(_Name, <<"queues">>) -> ok;
apply_to_validation(_Name, <<"classic_queues">>) -> ok;
apply_to_validation(_Name, <<"quorum_queues">>) -> ok;
apply_to_validation(_Name, <<"streams">>) -> ok;
apply_to_validation(_Name, Term) ->
{error, "apply-to '~ts' unrecognised; should be 'queues', 'exchanges' "
"or 'all'", [Term]}.
{error, "apply-to '~ts' unrecognised; should be one of: 'queues', 'classic_queues', "
" 'quorum_queues', 'streams', 'exchanges', or 'all'", [Term]}.

View File

@ -9,7 +9,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("stdlib/include/assert.hrl").
-compile(export_all).
@ -25,7 +25,8 @@ groups() ->
policy_ttl,
operator_policy_ttl,
operator_retroactive_policy_ttl,
operator_retroactive_policy_publish_ttl
operator_retroactive_policy_publish_ttl,
queue_type_specific_policies
]}
].
@ -203,6 +204,43 @@ target_count_policy(Config) ->
rabbit_ct_client_helpers:close_connection(Conn),
passed.
queue_type_specific_policies(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
ClassicQ = <<"policy_ttl-classic_queue">>,
QuorumQ = <<"policy_ttl-quorum_queue">>,
StreamQ = <<"policy_ttl-stream_queue">>,
%% all policies match ".*" but different values should be applied based on queue type
rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy-classic">>,
<<".*">>, <<"classic_queues">>, [{<<"message-ttl">>, 20}]),
rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy-quorum">>,
<<".*">>, <<"quorum_queues">>, [{<<"message-ttl">>, 40}]),
rabbit_ct_broker_helpers:set_policy(Config, 0, <<"ttl-policy-stream">>,
<<".*">>, <<"streams">>, [{<<"max-age">>, "1h"}]),
declare(Ch, ClassicQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
declare(Ch, QuorumQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
declare(Ch, StreamQ, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
timer:sleep(1),
?assertMatch(20, check_policy_value(Server, ClassicQ, <<"message-ttl">>)),
?assertMatch(40, check_policy_value(Server, QuorumQ, <<"message-ttl">>)),
?assertMatch("1h", check_policy_value(Server, StreamQ, <<"max-age">>)),
delete(Ch, ClassicQ),
delete(Ch, QuorumQ),
delete(Ch, StreamQ),
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy-classic">>),
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy-quorum">>),
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl-policy-stream">>),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.
%%----------------------------------------------------------------------------
@ -211,6 +249,11 @@ declare(Ch, Q) ->
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true}).
declare(Ch, Q, Args) ->
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true,
arguments = Args}).
delete(Ch, Q) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}).

View File

@ -83,6 +83,9 @@
<option value="all">Exchanges and queues</option>
<option value="exchanges">Exchanges</option>
<option value="queues">Queues</option>
<option value="classic_queues">Classic Queues</option>
<option value="quorum_queues">Quorum Queues</option>
<option value="streams">Streams</option>
</select>
</td>
</tr>
@ -259,6 +262,9 @@
<td>
<select name="apply-to">
<option value="queues">Queues</option>
<option value="classic_queues">Classic Queues</option>
<option value="quorum_queues">Quorum Queues</option>
<option value="streams">Streams</option>
</select>
</td>
</tr>

View File

@ -141,6 +141,11 @@ in the `3.11.x` release series.
GitHub issue: [#7208](https://github.com/rabbitmq/rabbitmq-server/issues/7208).
* Policies can now be defined to only apply to specific queue types. For example, you can have two policies that match all queue names ('.*')
but different queue types, so that different parameters are applied to all queues of a specific type. Example usage:
```
rabbitmqctl set_policy at-least-once-dead-lettering ".*" '{"dead-letter-strategy": "at-least-once"}' --apply-to quorum_queues
```
### CLI Tools