Merge pull request #12466 from rabbitmq/enable-required-feature-flags-during-sync

rabbit_feature_flags: Introduce hard vs. soft required feature flags
This commit is contained in:
Jean-Sébastien Pédron 2024-10-30 14:31:45 +01:00 committed by GitHub
commit b9328fe586
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 301 additions and 64 deletions

View File

@ -10,14 +10,16 @@
-rabbit_feature_flag(
{classic_mirrored_queue_version,
#{desc => "Support setting version for classic mirrored queues",
stability => required
stability => required,
require_level => hard
}}).
-rabbit_feature_flag(
{quorum_queue,
#{desc => "Support queues of type `quorum`",
doc_url => "https://www.rabbitmq.com/docs/quorum-queues",
stability => required
stability => required,
require_level => hard
}}).
-rabbit_feature_flag(
@ -25,6 +27,7 @@
#{desc => "Support queues of type `stream`",
doc_url => "https://www.rabbitmq.com/docs/stream",
stability => required,
require_level => hard,
depends_on => [quorum_queue]
}}).
@ -32,25 +35,29 @@
{implicit_default_bindings,
#{desc => "Default bindings are now implicit, instead of "
"being stored in the database",
stability => required
stability => required,
require_level => hard
}}).
-rabbit_feature_flag(
{virtual_host_metadata,
#{desc => "Virtual host metadata (description, tags, etc)",
stability => required
stability => required,
require_level => hard
}}).
-rabbit_feature_flag(
{maintenance_mode_status,
#{desc => "Maintenance mode status",
stability => required
stability => required,
require_level => hard
}}).
-rabbit_feature_flag(
{user_limits,
#{desc => "Configure connection and channel limits for a user",
stability => required
{user_limits,
#{desc => "Configure connection and channel limits for a user",
stability => required,
require_level => hard
}}).
-rabbit_feature_flag(
@ -58,33 +65,38 @@
#{desc => "Single active consumer for streams",
doc_url => "https://www.rabbitmq.com/docs/stream",
stability => required,
require_level => hard,
depends_on => [stream_queue]
}}).
-rabbit_feature_flag(
{feature_flags_v2,
#{desc => "Feature flags subsystem V2",
stability => required
{feature_flags_v2,
#{desc => "Feature flags subsystem V2",
stability => required,
require_level => hard
}}).
-rabbit_feature_flag(
{direct_exchange_routing_v2,
#{desc => "v2 direct exchange routing implementation",
stability => required,
depends_on => [feature_flags_v2, implicit_default_bindings]
#{desc => "v2 direct exchange routing implementation",
stability => required,
require_level => hard,
depends_on => [feature_flags_v2, implicit_default_bindings]
}}).
-rabbit_feature_flag(
{listener_records_in_ets,
#{desc => "Store listener records in ETS instead of Mnesia",
stability => required,
depends_on => [feature_flags_v2]
#{desc => "Store listener records in ETS instead of Mnesia",
stability => required,
require_level => hard,
depends_on => [feature_flags_v2]
}}).
-rabbit_feature_flag(
{tracking_records_in_ets,
#{desc => "Store tracking records in ETS instead of Mnesia",
stability => required,
require_level => hard,
depends_on => [feature_flags_v2]
}}).
@ -94,6 +106,7 @@
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/5931",
%%TODO remove compatibility code
stability => required,
require_level => hard,
depends_on => [stream_queue]
}}).
@ -102,6 +115,7 @@
#{desc => "Support for restarting streams with optional preferred next leader argument."
"Used to implement stream leader rebalancing",
stability => required,
require_level => hard,
depends_on => [stream_queue]
}}).
@ -110,6 +124,7 @@
#{desc => "Bug fix to unblock a group of consumers in a super stream partition",
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743",
stability => required,
require_level => hard,
depends_on => [stream_single_active_consumer]
}}).
@ -117,6 +132,7 @@
{stream_filtering,
#{desc => "Support for stream filtering.",
stability => required,
require_level => hard,
depends_on => [stream_queue]
}}).
@ -124,6 +140,7 @@
{message_containers,
#{desc => "Message containers.",
stability => required,
require_level => hard,
depends_on => [feature_flags_v2]
}}).
@ -154,6 +171,7 @@
#{desc => "A new internal command that is used to update streams as "
"part of a policy.",
stability => required,
require_level => hard,
depends_on => [stream_queue]
}}).

View File

@ -105,6 +105,7 @@
init/0,
get_state/1,
get_stability/1,
get_require_level/1,
check_node_compatibility/1, check_node_compatibility/2,
sync_feature_flags_with_cluster/2,
refresh_feature_flags_after_app_load/0,
@ -147,6 +148,7 @@
-type feature_props() :: #{desc => string(),
doc_url => string(),
stability => stability(),
require_level => require_level(),
depends_on => [feature_name()],
callbacks =>
#{callback_name() => callback_fun_name()}}.
@ -183,6 +185,7 @@
desc => string(),
doc_url => string(),
stability => stability(),
require_level => require_level(),
depends_on => [feature_name()],
callbacks =>
#{callback_name() => callback_fun_name()},
@ -207,6 +210,15 @@
%% Experimental feature flags are not enabled by default on a fresh RabbitMQ
%% node. They must be enabled by the user.
-type require_level() :: hard | soft.
%% The level of requirement of a feature flag.
%%
%% A hard required feature flags must be enabled before a RabbitMQ node is
%% upgraded to a version where it is required.
%%
%% A soft required feature flag will be automatically enabled when a RabbitMQ
%% node is upgraded to a version where it is required.
-type callback_fun_name() :: {Module :: module(), Function :: atom()}.
%% The name of the module and function to call when changing the state of
%% the feature flag.
@ -755,6 +767,48 @@ get_stability(FeatureProps) when ?IS_DEPRECATION(FeatureProps) ->
permitted_by_default -> experimental
end.
-spec get_require_level
(FeatureName) -> RequireLevel | undefined when
FeatureName :: feature_name(),
RequireLevel :: require_level() | none;
(FeatureProps) -> RequireLevel when
FeatureProps ::
feature_props_extended() |
rabbit_deprecated_features:feature_props_extended(),
RequireLevel :: require_level() | none.
%% @doc
%% Returns the requirement level of a feature flag.
%%
%% The possible requirement levels are:
%% <ul>
%% <li>`hard': the feature flag must be enabled before the RabbitMQ node is
%% upgraded to a version where it is hard required.</li>
%% <li>`soft': the feature flag will be automatically enabled wher a RabbitMQ
%% node is upgraded to a version where it is soft required.</li>
%% <li>`none': the feature flag is not required.</li>
%% </ul>
%%
%% @param FeatureName The name of the feature flag to check.
%% @param FeatureProps A feature flag properties map.
%% @returns `hard', `soft' or `none', or `undefined' if the given feature flag
%% name doesn't correspond to a known feature flag.
get_require_level(FeatureName) when is_atom(FeatureName) ->
case rabbit_ff_registry_wrapper:get(FeatureName) of
undefined -> undefined;
FeatureProps -> get_require_level(FeatureProps)
end;
get_require_level(FeatureProps) when ?IS_FEATURE_FLAG(FeatureProps) ->
case get_stability(FeatureProps) of
required -> maps:get(require_level, FeatureProps, soft);
_ -> none
end;
get_require_level(FeatureProps) when ?IS_DEPRECATION(FeatureProps) ->
case get_stability(FeatureProps) of
required -> hard;
_ -> none
end.
%% -------------------------------------------------------------------
%% Feature flags registry.
%% -------------------------------------------------------------------
@ -913,6 +967,7 @@ assert_feature_flag_is_valid(FeatureName, FeatureProps) ->
ValidProps = [desc,
doc_url,
stability,
require_level,
depends_on,
callbacks],
?assertEqual([], maps:keys(FeatureProps) -- ValidProps),
@ -1363,7 +1418,7 @@ run_feature_flags_mod_on_remote_node(Node, Function, Args, Timeout) ->
sync_feature_flags_with_cluster([] = _Nodes, true = _NodeIsVirgin) ->
rabbit_ff_controller:enable_default();
sync_feature_flags_with_cluster([] = _Nodes, false = _NodeIsVirgin) ->
ok;
rabbit_ff_controller:enable_required();
sync_feature_flags_with_cluster(Nodes, _NodeIsVirgin) ->
%% We don't use `rabbit_nodes:filter_running()' here because the given
%% `Nodes' list may contain nodes which are not members yet (the cluster

View File

@ -38,6 +38,7 @@
-export([is_supported/1, is_supported/2,
enable/1,
enable_default/0,
enable_required/0,
check_node_compatibility/2,
sync_cluster/1,
refresh_after_app_load/0,
@ -136,6 +137,24 @@ enable_default() ->
Ret
end.
enable_required() ->
?LOG_DEBUG(
"Feature flags: enable required feature flags",
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
case erlang:whereis(?LOCAL_NAME) of
Pid when is_pid(Pid) ->
%% The function is called while `rabbit' is running.
gen_statem:call(?LOCAL_NAME, enable_required);
undefined ->
%% The function is called while `rabbit' is stopped. We need to
%% start a one-off controller, again to make sure concurrent
%% changes are blocked.
{ok, Pid} = start_link(),
Ret = gen_statem:call(Pid, enable_required),
gen_statem:stop(Pid),
Ret
end.
check_node_compatibility(RemoteNode, LocalNodeAsVirgin) ->
ThisNode = node(),
case LocalNodeAsVirgin of
@ -206,7 +225,7 @@ standing_by(
when EventContent =/= notify_when_done ->
?LOG_DEBUG(
"Feature flags: registering controller globally before "
"proceeding with task: ~tp",
"proceeding with task: ~0tp",
[EventContent],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
@ -304,6 +323,8 @@ proceed_with_task({enable, FeatureNames}) ->
enable_task(FeatureNames);
proceed_with_task(enable_default) ->
enable_default_task();
proceed_with_task(enable_required) ->
enable_required_task();
proceed_with_task({sync_cluster, Nodes}) ->
sync_cluster_task(Nodes);
proceed_with_task(refresh_after_app_load) ->
@ -841,6 +862,24 @@ get_forced_feature_flag_names_from_config() ->
_ when is_list(Value) -> {ok, Value}
end.
-spec enable_required_task() -> Ret when
Ret :: ok | {error, Reason},
Reason :: term().
enable_required_task() ->
{ok, Inventory} = collect_inventory_on_nodes([node()]),
RequiredFeatureNames = list_soft_required_feature_flags(Inventory),
case RequiredFeatureNames of
[] ->
ok;
_ ->
?LOG_DEBUG(
"Feature flags: enabling required feature flags: ~0p",
[RequiredFeatureNames],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS})
end,
enable_many(Inventory, RequiredFeatureNames).
-spec sync_cluster_task() -> Ret when
Ret :: ok | {error, Reason},
Reason :: term().
@ -855,23 +894,6 @@ sync_cluster_task() ->
Reason :: term().
sync_cluster_task(Nodes) ->
%% We assume that a feature flag can only be enabled, not disabled.
%% Therefore this synchronization searches for feature flags enabled on
%% some nodes but not all, and make sure they are enabled everywhere.
%%
%% This happens when a node joins a cluster and that node has a different
%% set of enabled feature flags.
%%
%% FIXME: `enable_task()' requires that all nodes in the cluster run to
%% enable anything. Should we require the same here? On one hand, this
%% would make sure a feature flag isn't enabled while there is a network
%% partition. On the other hand, this would require that all nodes are
%% running before we can expand the cluster...
?LOG_DEBUG(
"Feature flags: synchronizing feature flags on nodes: ~tp",
[Nodes],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
case collect_inventory_on_nodes(Nodes) of
{ok, Inventory} ->
CantEnable = list_deprecated_features_that_cant_be_denied(
@ -880,7 +902,27 @@ sync_cluster_task(Nodes) ->
[] ->
FeatureNames = list_feature_flags_enabled_somewhere(
Inventory, false),
enable_many(Inventory, FeatureNames);
%% In addition to feature flags enabled somewhere, we also
%% ensure required feature flags are enabled accross the
%% board.
RequiredFeatureNames = list_soft_required_feature_flags(
Inventory),
case RequiredFeatureNames of
[] ->
ok;
_ ->
?LOG_DEBUG(
"Feature flags: enabling required feature "
"flags as part of cluster sync: ~0p",
[RequiredFeatureNames],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS})
end,
FeatureNamesToEnable = lists:usort(
FeatureNames ++
RequiredFeatureNames),
enable_many(Inventory, FeatureNamesToEnable);
_ ->
?LOG_ERROR(
"Feature flags: the following deprecated features "
@ -1026,13 +1068,13 @@ check_required_and_enable(
FeatureName) ->
%% Required feature flags vs. virgin nodes.
FeatureProps = maps:get(FeatureName, FeatureFlags),
Stability = rabbit_feature_flags:get_stability(FeatureProps),
RequireLevel = rabbit_feature_flags:get_require_level(FeatureProps),
ProvidedBy = maps:get(provided_by, FeatureProps),
NodesWhereDisabled = list_nodes_where_feature_flag_is_disabled(
Inventory, FeatureName),
MarkDirectly = case Stability of
required when ProvidedBy =:= rabbit ->
MarkDirectly = case RequireLevel of
hard when ProvidedBy =:= rabbit ->
?LOG_DEBUG(
"Feature flags: `~s`: the feature flag is "
"required on some nodes; list virgin nodes "
@ -1051,7 +1093,7 @@ check_required_and_enable(
end
end, NodesWhereDisabled),
VirginNodesWhereDisabled =:= NodesWhereDisabled;
required when ProvidedBy =/= rabbit ->
hard when ProvidedBy =/= rabbit ->
%% A plugin can be enabled/disabled at runtime and
%% between restarts. Thus we have no way to
%% distinguish a newly enabled plugin from a plugin
@ -1076,8 +1118,8 @@ check_required_and_enable(
case MarkDirectly of
false ->
case Stability of
required ->
case RequireLevel of
hard ->
?LOG_DEBUG(
"Feature flags: `~s`: some nodes where the feature "
"flag is disabled are not virgin, we need to perform "
@ -1445,6 +1487,26 @@ list_feature_flags_enabled_somewhere(
end, #{}, StatesPerNode),
lists:sort(maps:keys(MergedStates)).
list_soft_required_feature_flags(
#{feature_flags := FeatureFlags, states_per_node := StatesPerNode}) ->
FeatureStates = maps:get(node(), StatesPerNode),
RequiredFeatureNames = maps:fold(
fun(FeatureName, FeatureProps, Acc) ->
RequireLevel = (
rabbit_feature_flags:get_require_level(
FeatureProps)),
IsEnabled = maps:get(
FeatureName, FeatureStates,
false),
case RequireLevel of
soft when IsEnabled =:= false ->
[FeatureName | Acc];
_ ->
Acc
end
end, [], FeatureFlags),
lists:sort(RequiredFeatureNames).
-spec list_deprecated_features_that_cant_be_denied(Inventory) ->
Ret when
Inventory :: rabbit_feature_flags:cluster_inventory(),
@ -1517,7 +1579,7 @@ list_nodes_where_feature_flag_is_disabled(
%% disabled.
not Enabled;
_ ->
%% The feature flags is unknown on this
%% The feature flag is unknown on this
%% node, don't run the migration function.
false
end

View File

@ -261,26 +261,27 @@ maybe_initialize_registry(NewSupportedFeatureFlags,
maps:map(
fun
(FeatureName, FeatureProps) when ?IS_FEATURE_FLAG(FeatureProps) ->
Stability = rabbit_feature_flags:get_stability(FeatureProps),
RequireLevel = (
rabbit_feature_flags:get_require_level(FeatureProps)),
ProvidedBy = maps:get(provided_by, FeatureProps),
State = case FeatureStates0 of
#{FeatureName := FeatureState} -> FeatureState;
_ -> false
end,
case Stability of
required when State =:= true ->
case RequireLevel of
hard when State =:= true ->
%% The required feature flag is already enabled, we keep
%% it this way.
State;
required when NewNode ->
hard when NewNode ->
%% This is the very first time the node starts, we
%% already mark the required feature flag as enabled.
?assertNotEqual(state_changing, State),
true;
required when ProvidedBy =/= rabbit ->
hard when ProvidedBy =/= rabbit ->
?assertNotEqual(state_changing, State),
true;
required ->
hard ->
%% This is not a new node and the required feature flag
%% is disabled. This is an error and RabbitMQ must be
%% downgraded to enable the feature flag.

View File

@ -37,7 +37,9 @@ setup(#{feature_flags_file := FFFile}) ->
"Failed to initialize feature flags registry: ~tp",
[Reason],
#{domain => ?RMQLOG_DOMAIN_PRELAUNCH}),
throw({error, failed_to_initialize_feature_flags_registry})
throw({error,
{failed_to_initialize_feature_flags_registry,
Reason}})
end;
{error, Reason} ->
?LOG_ERROR(

View File

@ -47,8 +47,9 @@
enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2/1,
enable_feature_flag_with_post_enable/1,
failed_enable_feature_flag_with_post_enable/1,
have_required_feature_flag_in_cluster_and_add_member_with_it_disabled/1,
have_required_feature_flag_in_cluster_and_add_member_without_it/1,
have_soft_required_feature_flag_in_cluster_and_add_member_with_it_disabled/1,
have_soft_required_feature_flag_in_cluster_and_add_member_without_it/1,
have_hard_required_feature_flag_in_cluster_and_add_member_without_it/1,
have_unknown_feature_flag_in_cluster_and_add_member_with_it_enabled/1,
error_during_migration_after_initial_success/1,
controller_waits_for_own_task_to_finish_before_exiting/1,
@ -97,8 +98,9 @@ groups() ->
enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2,
enable_feature_flag_with_post_enable,
failed_enable_feature_flag_with_post_enable,
have_required_feature_flag_in_cluster_and_add_member_with_it_disabled,
have_required_feature_flag_in_cluster_and_add_member_without_it,
have_soft_required_feature_flag_in_cluster_and_add_member_with_it_disabled,
have_soft_required_feature_flag_in_cluster_and_add_member_without_it,
have_hard_required_feature_flag_in_cluster_and_add_member_without_it,
have_unknown_feature_flag_in_cluster_and_add_member_with_it_enabled,
error_during_migration_after_initial_success,
controller_waits_for_own_task_to_finish_before_exiting,
@ -1327,7 +1329,7 @@ failed_enable_feature_flag_with_post_enable(Config) ->
ok.
have_required_feature_flag_in_cluster_and_add_member_with_it_disabled(
have_soft_required_feature_flag_in_cluster_and_add_member_with_it_disabled(
Config) ->
AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
connect_nodes(Nodes),
@ -1410,7 +1412,7 @@ have_required_feature_flag_in_cluster_and_add_member_with_it_disabled(
|| Node <- AllNodes],
ok.
have_required_feature_flag_in_cluster_and_add_member_without_it(
have_soft_required_feature_flag_in_cluster_and_add_member_without_it(
Config) ->
AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
connect_nodes(Nodes),
@ -1427,6 +1429,98 @@ have_required_feature_flag_in_cluster_and_add_member_without_it(
?assertEqual(ok, inject_on_nodes([NewNode], FeatureFlags)),
?assertEqual(ok, inject_on_nodes(Nodes, RequiredFeatureFlags)),
ct:pal(
"Checking the feature flag is supported and enabled on existing the "
"cluster only"),
ok = run_on_node(
NewNode,
fun() ->
?assert(rabbit_feature_flags:is_supported(FeatureName)),
?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
DBDir = rabbit_db:dir(),
ok = filelib:ensure_path(DBDir),
SomeFile = filename:join(DBDir, "some-file.db"),
ok = file:write_file(SomeFile, <<>>),
?assertNot(rabbit_db:is_virgin_node()),
ok
end,
[]),
_ = [ok =
run_on_node(
Node,
fun() ->
?assert(rabbit_feature_flags:is_supported(FeatureName)),
?assert(rabbit_feature_flags:is_enabled(FeatureName)),
ok
end,
[])
|| Node <- Nodes],
%% Check compatibility between NewNodes and Nodes.
ok = run_on_node(
NewNode,
fun() ->
?assertEqual(
ok,
rabbit_feature_flags:check_node_compatibility(
FirstNode)),
ok
end, []),
%% Add node to cluster and synchronize feature flags.
connect_nodes(AllNodes),
override_running_nodes(AllNodes),
ct:pal(
"Synchronizing feature flags in the expanded cluster~n"
"~n"
"NOTE: Error messages about crashed migration functions can be "
"ignored for feature~n"
" flags other than `~ts`~n"
" because they assume they run inside RabbitMQ.",
[FeatureName]),
ok = run_on_node(
NewNode,
fun() ->
?assertEqual(
ok,
rabbit_feature_flags:sync_feature_flags_with_cluster(
Nodes, false)),
ok
end, []),
ct:pal("Checking the feature flag state is unchanged"),
_ = [ok =
run_on_node(
Node,
fun() ->
?assertEqual(
true,
rabbit_feature_flags:is_enabled(FeatureName)),
ok
end,
[])
|| Node <- AllNodes],
ok.
have_hard_required_feature_flag_in_cluster_and_add_member_without_it(
Config) ->
AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
connect_nodes(Nodes),
override_running_nodes([NewNode]),
override_running_nodes(Nodes),
FeatureName = ?FUNCTION_NAME,
FeatureFlags = #{FeatureName =>
#{provided_by => rabbit,
stability => stable}},
RequiredFeatureFlags = #{FeatureName =>
#{provided_by => rabbit,
stability => required,
require_level => hard}},
?assertEqual(ok, inject_on_nodes([NewNode], FeatureFlags)),
?assertEqual(ok, inject_on_nodes(Nodes, RequiredFeatureFlags)),
ct:pal(
"Checking the feature flag is supported and enabled on existing the "
"cluster only"),

View File

@ -10,11 +10,13 @@
-rabbit_feature_flag(
{empty_basic_get_metric,
#{desc => "Count AMQP `basic.get` on empty queues in stats",
stability => required
stability => required,
require_level => hard
}}).
-rabbit_feature_flag(
{drop_unroutable_metric,
#{desc => "Count unroutable publishes to be dropped in stats",
stability => required
}}).
{drop_unroutable_metric,
#{desc => "Count unroutable publishes to be dropped in stats",
stability => required,
require_level => hard
}}).

View File

@ -16,13 +16,15 @@
-rabbit_feature_flag(
{?QUEUE_TYPE_QOS_0,
#{desc => "Support pseudo queue type for MQTT QoS 0 subscribers omitting a queue process",
stability => required
stability => required,
require_level => hard
}}).
-rabbit_feature_flag(
{delete_ra_cluster_mqtt_node,
#{desc => "Delete Ra cluster 'mqtt_node' since MQTT client IDs are tracked locally",
stability => required
stability => required,
require_level => hard
}}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -38,6 +40,7 @@
{mqtt_v5,
#{desc => "Support MQTT 5.0",
stability => required,
require_level => hard,
depends_on => [
%% MQTT 5.0 feature Will Delay Interval depends on client ID tracking in pg local.
delete_ra_cluster_mqtt_node,