From a22486211ac73ebf7900b49d892d522f3c07b0d1 Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Thu, 16 Feb 2023 10:35:34 -0800 Subject: [PATCH 1/4] See #7323. Oper policy for ha-mode and ha-params --- deps/rabbit/src/rabbit_mirror_queue_misc.erl | 43 ++++++++++ deps/rabbit/test/policy_SUITE.erl | 84 ++++++++++++++++++++ 2 files changed, 127 insertions(+) diff --git a/deps/rabbit/src/rabbit_mirror_queue_misc.erl b/deps/rabbit/src/rabbit_mirror_queue_misc.erl index 8e2f8660d0..a45cd795f7 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_misc.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_misc.erl @@ -7,6 +7,7 @@ -module(rabbit_mirror_queue_misc). -behaviour(rabbit_policy_validator). +-behaviour(rabbit_policy_merge_strategy). -include("amqqueue.hrl"). @@ -15,6 +16,7 @@ initial_queue_node/2, suggested_queue_nodes/1, actual_queue_nodes/1, is_mirrored/1, is_mirrored_ha_nodes/1, update_mirrors/2, update_mirrors/1, validate_policy/1, + merge_policy_value/3, maybe_auto_sync/1, maybe_drop_master_after_sync/1, sync_batch_size/1, default_max_sync_throughput/0, log_info/3, log_warning/3]). @@ -46,6 +48,14 @@ [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}}, {mfa, {rabbit_registry, register, [policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [operator_policy_validator, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [operator_policy_validator, <<"ha-params">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_merge_strategy, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_merge_strategy, <<"ha-params">>, ?MODULE]}}, {requires, rabbit_registry}, {enables, recovery}]}). @@ -788,3 +798,36 @@ validate_pof(PromoteOnShutdown) -> Mode -> {error, "ha-promote-on-failure must be " "\"always\" or \"when-synced\", got ~tp", [Mode]} end. + +merge_policy_value(<<"ha-mode">>, Val, Val) -> + Val; +merge_policy_value(<<"ha-mode">>, <<"all">> = Val, _OpVal) -> + Val; +merge_policy_value(<<"ha-mode">>, _Val, <<"all">> = OpVal) -> + OpVal; +merge_policy_value(<<"ha-mode">>, <<"exactly">> = Val, _OpVal) -> + Val; +merge_policy_value(<<"ha-mode">>, _Val, <<"exactly">> = OpVal) -> + OpVal; +%% Both values are integers, both are ha-mode 'exactly' +merge_policy_value(<<"ha-params">>, Val, OpVal) when is_integer(Val) + andalso + is_integer(OpVal)-> + if Val > OpVal -> + Val; + true -> + OpVal + end; +%% The integer values is of ha-mode 'exactly', the other is a list and of +%% ha-mode 'nodes'. 'exactly' takes precedence +merge_policy_value(<<"ha-params">>, Val, _OpVal) when is_integer(Val) -> + Val; +merge_policy_value(<<"ha-params">>, _Val, OpVal) when is_integer(OpVal) -> + OpVal; +%% Both values are lists, of ha-mode 'nodes', max length takes precedence. +merge_policy_value(<<"ha-params">>, Val, OpVal) -> + if length(Val) > length(OpVal) -> + Val; + true -> + OpVal + end. diff --git a/deps/rabbit/test/policy_SUITE.erl b/deps/rabbit/test/policy_SUITE.erl index df321f931c..9c22932457 100644 --- a/deps/rabbit/test/policy_SUITE.erl +++ b/deps/rabbit/test/policy_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). + -compile(export_all). all() -> @@ -20,6 +21,7 @@ all() -> groups() -> [ {cluster_size_2, [], [ + target_count_policy, policy_ttl, operator_policy_ttl, operator_retroactive_policy_ttl, @@ -149,6 +151,59 @@ operator_retroactive_policy_publish_ttl(Config) -> rabbit_ct_client_helpers:close_connection(Conn), passed. +target_count_policy(Config) -> + [Server | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = <<"policy_ha">>, + declare(Ch, QName), + BNodes = [atom_to_binary(N) || N <- Nodes], + + AllPolicy = [{<<"ha-mode">>, <<"all">>}], + ExactlyPolicyOne = [{<<"ha-mode">>, <<"exactly">>}, + {<<"ha-params">>, 1}], + ExactlyPolicyTwo = [{<<"ha-mode">>, <<"exactly">>}, + {<<"ha-params">>, 2}], + NodesPolicyAll = [{<<"ha-mode">>, <<"nodes">>}, + {<<"ha-params">>, BNodes}], + NodesPolicyOne = [{<<"ha-mode">>, <<"nodes">>}, + {<<"ha-params">>, [hd(BNodes)]}], + + %% ALL has precedence + Opts = #{config => Config, + server => Server, + qname => QName}, + + verify_policies(AllPolicy, ExactlyPolicyTwo, [{<<"ha-mode">>, <<"all">>}], Opts), + + verify_policies(ExactlyPolicyTwo, AllPolicy, [{<<"ha-mode">>, <<"all">>}], Opts), + + verify_policies(AllPolicy, NodesPolicyAll, [{<<"ha-mode">>, <<"all">>}], Opts), + + verify_policies(NodesPolicyAll, AllPolicy, [{<<"ha-mode">>, <<"all">>}], Opts), + + %% exactly has precedence over nodes + verify_policies(ExactlyPolicyTwo, NodesPolicyAll,[{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts), + + verify_policies(NodesPolicyAll, ExactlyPolicyTwo, [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts), + + %% Highest exactly value has precedence + verify_policies(ExactlyPolicyTwo, ExactlyPolicyOne, [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts), + + verify_policies(ExactlyPolicyOne, ExactlyPolicyTwo, [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts), + + %% Longest node count has precedence + SortedNodes = lists:sort(BNodes), + verify_policies(NodesPolicyAll, NodesPolicyOne, [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, SortedNodes}], Opts), + verify_policies(NodesPolicyOne, NodesPolicyAll, [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, SortedNodes}], Opts), + + delete(Ch, QName), + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"policy">>), + rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"op_policy">>), + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + passed. + + %%---------------------------------------------------------------------------- @@ -156,6 +211,12 @@ declare(Ch, Q) -> amqp_channel:call(Ch, #'queue.declare'{queue = Q, durable = true}). +declare(Ch, Q, Args) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true, + auto_delete = false, + arguments = Args}). + delete(Ch, Q) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q}). @@ -201,4 +262,27 @@ get_messages(Number, Ch, Q) -> exit(failed) end. +check_policy_value(Server, QName, Value) -> + {ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [rabbit_misc:r(<<"/">>, queue, QName)]), + proplists:get_value(Value, rpc:call(Server, rabbit_policy, effective_definition, [Q])). + +verify_policies(Policy, OperPolicy, VerifyFuns, #{config := Config, + server := Server, + qname := QName}) -> + rabbit_ct_broker_helpers:set_policy(Config, 0, <<"policy">>, + <<"policy_ha">>, <<"queues">>, + Policy), + rabbit_ct_broker_helpers:set_operator_policy(Config, 0, <<"op_policy">>, + <<"policy_ha">>, <<"queues">>, + OperPolicy), + verify_policy(VerifyFuns, Server, QName). + +verify_policy([], _, _) -> + ok; +verify_policy([{HA, Expect} | Tail], Server, QName) -> + ct:print(">>> Expect: ~p >>> actual ~p",[Expect, check_policy_value(Server, QName, HA)]), + Expect = check_policy_value(Server, QName, HA), + verify_policy(Tail, Server, QName). + + %%---------------------------------------------------------------------------- From 36a559da518f7eb85ef04ca5b60fc4c8c68a1411 Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Tue, 21 Feb 2023 11:32:54 -0800 Subject: [PATCH 2/4] See #7323. Cleanup testcase. --- deps/rabbit/test/policy_SUITE.erl | 7 ------- 1 file changed, 7 deletions(-) diff --git a/deps/rabbit/test/policy_SUITE.erl b/deps/rabbit/test/policy_SUITE.erl index 9c22932457..26457c4389 100644 --- a/deps/rabbit/test/policy_SUITE.erl +++ b/deps/rabbit/test/policy_SUITE.erl @@ -211,12 +211,6 @@ declare(Ch, Q) -> amqp_channel:call(Ch, #'queue.declare'{queue = Q, durable = true}). -declare(Ch, Q, Args) -> - amqp_channel:call(Ch, #'queue.declare'{queue = Q, - durable = true, - auto_delete = false, - arguments = Args}). - delete(Ch, Q) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q}). @@ -280,7 +274,6 @@ verify_policies(Policy, OperPolicy, VerifyFuns, #{config := Config, verify_policy([], _, _) -> ok; verify_policy([{HA, Expect} | Tail], Server, QName) -> - ct:print(">>> Expect: ~p >>> actual ~p",[Expect, check_policy_value(Server, QName, HA)]), Expect = check_policy_value(Server, QName, HA), verify_policy(Tail, Server, QName). From 245a5e07b311b60affb9c965468d6a1fe3a397e1 Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Tue, 21 Feb 2023 11:45:49 -0800 Subject: [PATCH 3/4] See #7323. Add ha-mode and ha-params to default_policies --- deps/rabbit/priv/schema/rabbit.schema | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index a4cbc2b170..15c4afa10f 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -743,6 +743,14 @@ end}. {datatype, integer} ]}. +{mapping, "default_policies.operator.$id.ha_mode", "rabbit.default_policies.operator", [ + {datatype, string} +]}. + +{mapping, "default_policies.operator.$id.ha_params", "rabbit.default_policies.operator", [ + {datatype, [integer, {list, string}]} +]}. + {translation, "rabbit.default_policies.operator", fun(Conf) -> Props = rabbit_cuttlefish:aggregate_props(Conf, ["default_policies", "operator"]), Props1 = lists:map( From d66b38d333c0bf6aa6d49b9e32b2ce8bb596e897 Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Wed, 22 Feb 2023 11:46:03 -0800 Subject: [PATCH 4/4] See #7323. Rename default policy for ha-* and add option to massage key/value for aggregate_props --- deps/rabbit/priv/schema/rabbit.schema | 12 +++++++++--- deps/rabbit/src/rabbit_cuttlefish.erl | 13 ++++++++++--- .../test/config_schema_SUITE_data/rabbit.snippets | 4 ++++ 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 15c4afa10f..fec9737cff 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -743,16 +743,22 @@ end}. {datatype, integer} ]}. -{mapping, "default_policies.operator.$id.ha_mode", "rabbit.default_policies.operator", [ +{mapping, "default_policies.operator.$id.classic_queues.ha_mode", "rabbit.default_policies.operator", [ {datatype, string} ]}. -{mapping, "default_policies.operator.$id.ha_params", "rabbit.default_policies.operator", [ +{mapping, "default_policies.operator.$id.classic_queues.ha_params", "rabbit.default_policies.operator", [ {datatype, [integer, {list, string}]} ]}. {translation, "rabbit.default_policies.operator", fun(Conf) -> - Props = rabbit_cuttlefish:aggregate_props(Conf, ["default_policies", "operator"]), + Props = rabbit_cuttlefish:aggregate_props( + Conf, + ["default_policies", "operator"], + fun({["default_policies","operator",ID,"classic_queues"|T], V}) -> + {["default_policies","operator",ID|T],V}; + (E) -> E + end), Props1 = lists:map( fun({K, Ss}) -> {K, diff --git a/deps/rabbit/src/rabbit_cuttlefish.erl b/deps/rabbit/src/rabbit_cuttlefish.erl index 3ab4694853..a1326fb94a 100644 --- a/deps/rabbit/src/rabbit_cuttlefish.erl +++ b/deps/rabbit/src/rabbit_cuttlefish.erl @@ -8,7 +8,8 @@ -module(rabbit_cuttlefish). -export([ - aggregate_props/2 + aggregate_props/2, + aggregate_props/3 ]). -type keyed_props() :: [{binary(), [{binary(), any()}]}]. @@ -16,13 +17,19 @@ -spec aggregate_props([{string(), any()}], [string()]) -> keyed_props(). aggregate_props(Conf, Prefix) -> + aggregate_props(Conf, Prefix, fun(E) -> E end). + +-spec aggregate_props([{string(), any()}], [string()], function()) -> + keyed_props(). +aggregate_props(Conf, Prefix, KeyFun) -> Pattern = Prefix ++ ["$id", "$_"], PrefixLen = length(Prefix), FlatList = lists:filtermap( - fun({K, V}) -> + fun(E) -> + {K, V} = KeyFun(E), case cuttlefish_variable:is_fuzzy_match(K, Pattern) of true -> {true, {lists:nthtail(PrefixLen, K), V}}; - _ -> false + false -> false end end, Conf diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index b4917eab52..95ff9e84e6 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -123,9 +123,13 @@ ssl_options.fail_if_no_peer_cert = true", default_policies.operator.a.expires = 1h default_policies.operator.a.queue_pattern = apple default_policies.operator.a.vhost_pattern = banana + default_policies.operator.a.classic_queues.ha_mode = exactly + default_policies.operator.a.classic_queues.ha_params = 2 ", [{rabbit, [{default_policies, [{operator, [ {<<"a">>, [{<<"expires">>, 3600000}, + {<<"ha-mode">>, "exactly"}, + {<<"ha-params">>, 2}, {<<"queue-pattern">>, "apple"}, {<<"vhost-pattern">>, "banana"}]}]}]}]}], []},