rabbit_feature_flags: Use one callback per command

In the initial Feature flags subsystem implementation, we used a
migration function taking three arguments:
* the name of the feature flag
* its properties
* a command (`enable` or `is_enabled`)

However we wanted to implement a command (`post_enable`) and pass more
variables to the migration function. With the rework in #3940, the
migration function was therefore changed to take a single argument. That
argument was a record containing the command and much more information.
The content of the record could be different, depending on the command.

This solved the extensibility and the flexilibity of how we could call
the migration function. Unfortunately, this didn't improve its return
value as we wanted to return different things depending on the command.

In this patch, we change this completely by using a map of callbacks,
one per command, instead of that single migration function.

So before, where we had:

    #{migration_fun => {Module, Function}}

The new property is now:

    #{callbacks => #{enable => {Module, Function},
                     post_enable => {Module, Function}}}

All callbacks are still optional. You don't have to implement a fallback
empty function clause to skip commands you don't want to use oryou don't
support, as you would have to with the migration function.

Callbacks may be called with a callback-specific map of argument and
they should return the expected callback-specific return values.
Everything is defined with specs.

If a feature flag uses this new callbacks map, it will automatically
depend on `feature_flags_v2`, like the previous arity-1 migration
function. A feature flag can't define both the old migration function
and the new callbacks map.

Note that this arity-1 migration function never made it to a release of
RabbitMQ, so its support is entirely dropped with no backward
compatibility support. Likewise for the `#ffcommand{}` record.
This commit is contained in:
Jean-Sébastien Pédron 2022-07-22 16:33:18 +02:00
parent 1155808e45
commit bc6e28f5f3
No known key found for this signature in database
GPG Key ID: 39E99761A5FD94CC
5 changed files with 280 additions and 212 deletions

View File

@ -1,13 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
%%
-record(ffcommand, {
name :: rabbit_feature_flags:feature_name(),
props :: rabbit_feature_flags:feature_props_extended(),
command :: enable | post_enable,
extra :: #{nodes := [node()]}
}).

View File

@ -7,10 +7,9 @@
-module(rabbit_core_ff).
-include("feature_flags.hrl").
-export([direct_exchange_routing_v2_migration/1,
listener_records_in_ets_migration/1]).
-export([direct_exchange_routing_v2_enable/1,
listener_records_in_ets_enable/1,
listener_records_in_ets_post_enable/1]).
-rabbit_feature_flag(
{classic_mirrored_queue_version,
@ -74,55 +73,64 @@
-rabbit_feature_flag(
{direct_exchange_routing_v2,
#{desc => "v2 direct exchange routing implementation",
stability => stable,
depends_on => [feature_flags_v2, implicit_default_bindings],
migration_fun => {?MODULE, direct_exchange_routing_v2_migration}
#{desc => "v2 direct exchange routing implementation",
stability => stable,
depends_on => [feature_flags_v2, implicit_default_bindings],
callbacks => #{enable => {?MODULE, direct_exchange_routing_v2_enable}}
}}).
-rabbit_feature_flag(
{listener_records_in_ets,
#{desc => "Store listener records in ETS instead of Mnesia",
stability => stable,
depends_on => [feature_flags_v2],
migration_fun => {?MODULE, listener_records_in_ets_migration}
#{desc => "Store listener records in ETS instead of Mnesia",
stability => stable,
depends_on => [feature_flags_v2],
callbacks => #{enable =>
{?MODULE, listener_records_in_ets_enable},
post_enable =>
{?MODULE, listener_records_in_ets_post_enable}}
}}).
%% -------------------------------------------------------------------
%% Direct exchange routing v2.
%% -------------------------------------------------------------------
-spec direct_exchange_routing_v2_migration(#ffcommand{}) ->
ok | {error, term()}.
direct_exchange_routing_v2_migration(#ffcommand{name = FeatureName,
command = enable}) ->
-spec direct_exchange_routing_v2_enable(Args) -> Ret when
Args :: rabbit_feature_flags:enable_callback_args(),
Ret :: rabbit_feature_flags:enable_callback_ret().
direct_exchange_routing_v2_enable(#{feature_name := FeatureName}) ->
TableName = rabbit_index_route,
ok = rabbit_table:wait([rabbit_route], _Retry = true),
try
ok = rabbit_table:create(TableName, rabbit_table:rabbit_index_route_definition()),
ok = rabbit_table:create(
TableName, rabbit_table:rabbit_index_route_definition()),
case rabbit_table:ensure_table_copy(TableName, node(), ram_copies) of
ok ->
ok = rabbit_binding:populate_index_route_table();
{error, Err} = Error ->
rabbit_log_feature_flags:error("Failed to add copy of table ~s to node ~p: ~p",
[TableName, node(), Err]),
rabbit_log_feature_flags:error(
"Feature flags: `~s`: failed to add copy of table ~s to "
"node ~p: ~p",
[FeatureName, TableName, node(), Err]),
Error
end
catch throw:Reason ->
rabbit_log_feature_flags:error("Enabling feature flag ~s failed: ~p",
[FeatureName, Reason]),
catch throw:Reason ->
rabbit_log_feature_flags:error(
"Feature flags: `~s`: enable callback failure: ~p",
[FeatureName, Reason]),
{error, Reason}
end;
direct_exchange_routing_v2_migration(_) ->
ok.
end.
listener_records_in_ets_migration(#ffcommand{name = FeatureName,
command = enable}) ->
%% -------------------------------------------------------------------
%% Listener records moved from Mnesia to ETS.
%% -------------------------------------------------------------------
listener_records_in_ets_enable(#{feature_name := FeatureName}) ->
try
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:lock({table, rabbit_listener}, read),
Listeners = mnesia:select(rabbit_listener, [{'$1',[],['$1']}]),
Listeners = mnesia:select(
rabbit_listener, [{'$1',[],['$1']}]),
lists:foreach(
fun(Listener) ->
ets:insert(rabbit_listener_ets, Listener)
@ -132,12 +140,13 @@ listener_records_in_ets_migration(#ffcommand{name = FeatureName,
throw:{error, {no_exists, rabbit_listener}} ->
ok;
throw:{error, Reason} ->
rabbit_log_feature_flags:error("Enabling feature flag ~s failed: ~p",
[FeatureName, Reason]),
rabbit_log_feature_flags:error(
"Feature flags: `~s`: failed to migrate Mnesia table: ~p",
[FeatureName, Reason]),
{error, Reason}
end;
listener_records_in_ets_migration(#ffcommand{name = FeatureName,
command = post_enable}) ->
end.
listener_records_in_ets_post_enable(#{feature_name := FeatureName}) ->
try
case mnesia:delete_table(rabbit_listener) of
{atomic, ok} ->
@ -145,15 +154,15 @@ listener_records_in_ets_migration(#ffcommand{name = FeatureName,
{aborted, {no_exists, _}} ->
ok;
{aborted, Err} ->
rabbit_log_feature_flags:error("Enabling feature flag ~s failed to delete mnesia table: ~p",
[FeatureName, Err]),
%% adheres to the callback interface
rabbit_log_feature_flags:error(
"Feature flags: `~s`: failed to delete Mnesia table: ~p",
[FeatureName, Err]),
ok
end
catch
throw:{error, Reason} ->
rabbit_log_feature_flags:error("Enabling feature flag ~s failed: ~p",
[FeatureName, Reason]),
%% adheres to the callback interface
rabbit_log_feature_flags:error(
"Feature flags: `~s`: failed to delete Mnesia table: ~p",
[FeatureName, Reason]),
ok
end.

View File

@ -82,8 +82,6 @@
-include_lib("rabbit_common/include/logging.hrl").
-include("feature_flags.hrl").
-export([list/0,
list/1,
list/2,
@ -130,8 +128,7 @@
does_enabled_feature_flags_list_file_exist/0,
read_enabled_feature_flags_list/0,
run_migration_fun/3,
uses_migration_fun_v2/1,
uses_migration_fun_v2/2]).
uses_callbacks/1]).
-ifdef(TEST).
-export([mark_as_enabled_remotely/4,
@ -160,7 +157,9 @@
doc_url => string(),
stability => stability(),
depends_on => [feature_name()],
migration_fun => migration_fun_name()}.
migration_fun => migration_fun_name(),
callbacks =>
#{callback_name() => migration_fun_name()}}.
%% The feature flag properties.
%%
%% All properties are optional.
@ -174,7 +173,8 @@
%% <li>`depends_on': a list of feature flags name which must be enabled
%% before this one</li>
%% <li>`migration_fun': a migration function specified by its module and
%% function names</li>
%% function names (deprecated for new feature flags)</li>
%% <li>`callbacks': a map of callback names</li>
%% </ul>
%%
%% Note that the `migration_fun' is a {@type migration_fun_name()},
@ -183,6 +183,9 @@
%% is that we must be able to represent it as an Erlang term when
%% we regenerate the registry module source code (using {@link
%% erl_syntax:abstract/1}).
%%
%% Likewise for `callbacks': each one is a {@type migration_fun_name()} but
%% functions must conform to the appropriate signature for this callback.
-type feature_flags() :: #{feature_name() => feature_props_extended()}.
%% The feature flags map as returned or accepted by several functions in
@ -193,6 +196,8 @@
doc_url => string(),
stability => stability(),
migration_fun => migration_fun_name(),
callbacks =>
#{callback_name() => migration_fun_name()},
depends_on => [feature_name()],
provided_by => atom()}.
%% The feature flag properties, once expanded by this module when feature
@ -219,16 +224,12 @@
%% The name of the module and function to call when changing the state of
%% the feature flag.
-type migration_fun() :: migration_fun_v1() | migration_fun_v2().
%% The migration function signature.
-type migration_fun_context() :: enable | is_enabled.
-type migration_fun_v1() :: fun((feature_name(),
feature_props_extended(),
migration_fun_context())
-> ok | {error, any()} |
boolean() | undefined).
-type migration_fun() :: fun((feature_name(),
feature_props_extended(),
migration_fun_context())
-> ok | {error, any()} | boolean() | undefined).
%% The migration function signature (v1).
%%
%% It is called with context `enable' when a feature flag is being enabled.
@ -244,10 +245,29 @@
%% is actually enabled. It is useful on RabbitMQ startup, just in case
%% the previous instance failed to write the feature flags list file.
-type ffcommand() :: #ffcommand{}.
-type callbacks() :: enable_callback() | post_enable_callback().
%% All possible callbacks.
-type migration_fun_v2() :: fun((ffcommand()) -> ok | no_return()).
%% The migration function signature (v2).
-type callbacks_args() :: enable_callback_args() | post_enable_callback_args().
%% All possible callbacks arguments.
-type callbacks_rets() :: enable_callback_ret() | post_enable_callback_ret().
%% All possible callbacks return values.
-type callback_name() :: enable | post_enable.
%% Name of the callback.
-type enable_callback() :: fun((feature_name(),
feature_props_extended(),
enable_callback_args())
-> enable_callback_ret()).
%% The callback called while enabling a feature flag.
%%
%% It is called when a feature flag is being enabled. The function is
%% responsible for this feature-flag-specific verification and data
%% conversion. It returns `ok' if RabbitMQ can mark the feature flag as
%% enabled an continue with the next one, if any. `{error, Reason}' and
%% exceptions are an error and the feature flag will remain disabled.
%%
%% The migration function is called on all nodes which fulfill the following
%% conditions:
@ -257,30 +277,66 @@
%% function.</li>
%% </ol>
%%
%% All executions of the migration function on these nodes will run in
%% parallel (concurrently). The migration function is responsible for its own
%% locking and synchronization.
%%
%% It is first called with the command `enable' when a feature flag is being
%% enabled. The function is responsible for this feature-flag-specific
%% verification and data conversion. It returns `ok' if RabbitMQ can mark the
%% feature flag as enabled an continue with the next one, if any. Other return
%% values or exceptions are an error and the feature flag should remain
%% disabled.
%% All executions of the callback on these nodes will run in parallel
%% (concurrently). The callback is responsible for its own locking and
%% synchronization.
%%
%% It is then called with the command `post_enable' after a feature flag has
%% been marked as enabled. The return value or enay exceptions are ignored and
%% the feature flag will remain enabled even if there is a failure.
%%
%% When a node is joining a cluster where one side has a feature flag enabled,
%% that feature flag will be enabled on the other side. It means the migration
%% function will run on the nodes where it is disabled. Therefore the
%% migration function can run in clusters where some nodes previously executed
%% it and the feature flag was already enabled.
%% that feature flag will be enabled on the other side. It means the callback
%% will run on the nodes where it is disabled. Therefore the callback can run
%% in clusters where some nodes previously executed it and the feature flag
%% was already enabled.
%%
%% The migration function should also be idempotent. For instance, if the
%% feature flag couldn't be marked as enabled everywhere after the migration
%% function was called with the `enable' command, it may be called again.
%% The callback should also be idempotent. For instance, if the feature flag
%% couldn't be marked as enabled everywhere after the callback was called, it
%% may be called again.
-type enable_callback_args() :: #{feature_name := feature_name(),
feature_props := feature_props_extended(),
command := enable,
nodes := [node()]}.
%% A map passed to {@type enable_callback()}.
-type enable_callback_ret() :: ok | {error, term()}.
%% Return value of the `enable' callback.
-type post_enable_callback() :: fun((feature_name(),
feature_props_extended(),
post_enable_callback_args())
-> post_enable_callback_ret()).
%% The callback called after enabling a feature flag.
%%
%% It is called after a feature flag has been marked as enabled. The return
%% value or any exceptions are ignored and the feature flag will remain
%% enabled even if there is a failure.
%%
%% All executions of the callback on nodes will run in parallel
%% (concurrently). The callback is responsible for its own locking and
%% synchronization.
%%
%% When a node is joining a cluster where one side has a feature flag enabled,
%% that feature flag will be enabled on the other side. It means the callback
%% will run on the nodes where it is disabled. Therefore the callback can run
%% in clusters where some nodes previously executed it and the feature flag
%% was already enabled.
%%
%% The callback should also be idempotent. For instance, if the feature flag
%% couldn't be marked as enabled everywhere after the callback was called, it
%% may be called again.
-type post_enable_callback_args() :: #{feature_name := feature_name(),
feature_props :=
feature_props_extended(),
command := post_enable,
nodes := [node()]}.
%% A map passed to {@type post_enable_callback()}.
-type post_enable_callback_ret() :: ok.
%% Return value of the `post_enable' callback.
-type inventory() :: #{applications := [atom()],
feature_flags := feature_flags(),
@ -302,10 +358,17 @@
stability/0,
migration_fun_name/0,
migration_fun/0,
migration_fun_v1/0,
migration_fun_v2/0,
migration_fun_context/0,
ffcommand/0,
callbacks/0,
callback_name/0,
callbacks_args/0,
callbacks_rets/0,
enable_callback/0,
enable_callback_args/0,
enable_callback_ret/0,
post_enable_callback/0,
post_enable_callback_args/0,
post_enable_callback_ret/0,
inventory/0,
cluster_inventory/0]).
@ -397,24 +460,15 @@ sort_feature_flags_v2_first(FeatureNames) ->
end, FeatureNames).
requires_feature_flags_v2(FeatureName) ->
uses_migration_fun_v2(FeatureName).
uses_callbacks(FeatureName).
uses_migration_fun_v2(FeatureName) ->
uses_callbacks(FeatureName) when is_atom(FeatureName) ->
case rabbit_ff_registry:get(FeatureName) of
undefined ->
false;
FeatureProps ->
case maps:get(migration_fun, FeatureProps, none) of
{MigrationMod, MigrationFun}
when is_atom(MigrationMod) andalso is_atom(MigrationFun) ->
uses_migration_fun_v2(MigrationMod, MigrationFun);
_ ->
false
end
end.
uses_migration_fun_v2(MigrationMod, MigrationFun) ->
erlang:function_exported(MigrationMod, MigrationFun, 1).
undefined -> false;
FeatureProps -> uses_callbacks(FeatureProps)
end;
uses_callbacks(FeatureProps) when is_map(FeatureProps) ->
maps:is_key(callbacks, FeatureProps).
enable_v1(FeatureName) ->
rabbit_log_feature_flags:debug(
@ -964,29 +1018,57 @@ prepare_queried_feature_flags([], AllFeatureFlags) ->
AllFeatureFlags.
assert_feature_flag_is_valid(FeatureName, FeatureProps) ->
?assert(is_atom(FeatureName)),
?assert(is_map(FeatureProps)),
Stability = get_stability(FeatureProps),
?assert(Stability =:= stable orelse
Stability =:= experimental orelse
Stability =:= required),
case FeatureProps of
#{migration_fun := _} when Stability =:= required ->
try
?assert(is_atom(FeatureName)),
?assert(is_map(FeatureProps)),
Stability = get_stability(FeatureProps),
?assert(Stability =:= stable orelse
Stability =:= experimental orelse
Stability =:= required),
case FeatureProps of
#{migration_fun := _} when Stability =:= required ->
rabbit_log_feature_flags:error(
"Feature flags: `~s`: a required feature flag can't have a "
"migration function",
[FeatureName]),
throw(
{required_feature_flag_with_migration_fun, FeatureName});
#{migration_fun := MigrationFunMF} ->
?assertMatch({_, _}, MigrationFunMF),
{MigrationMod, MigrationFun} = MigrationFunMF,
?assert(is_atom(MigrationMod)),
?assert(is_atom(MigrationFun)),
?assert(
erlang:function_exported(MigrationMod, MigrationFun, 3)),
?assertNot(maps:is_key(callbacks, FeatureProps));
#{callbacks := Callbacks} ->
Known = [enable,
post_enable],
?assert(is_map(Callbacks)),
?assertEqual([], maps:keys(Callbacks) -- Known),
lists:foreach(
fun(CallbackMF) ->
?assertMatch({_, _}, CallbackMF),
{CallbackMod, CallbackFun} = CallbackMF,
?assert(is_atom(CallbackMod)),
?assert(is_atom(CallbackFun)),
?assert(erlang:function_exported(
CallbackMod, CallbackFun, 1))
end, maps:values(Callbacks)),
?assertNot(maps:is_key(migration_fun, FeatureProps));
_ ->
ok
end
catch
Class:Reason:Stacktrace ->
rabbit_log_feature_flags:error(
"Feature flags: `~s`: a required feature flag can't have a "
"migration function",
[FeatureName]),
throw({required_feature_flag_with_migration_fun, FeatureName});
#{migration_fun := MigrationFunMF} ->
?assertMatch({_, _}, MigrationFunMF),
{MigrationMod, MigrationFun} = MigrationFunMF,
?assert(is_atom(MigrationMod)),
?assert(is_atom(MigrationFun)),
?assert(
erlang:function_exported(MigrationMod, MigrationFun, 1) orelse
erlang:function_exported(MigrationMod, MigrationFun, 3));
_ ->
ok
"Feature flags: `~s`: invalid properties:~n"
"Feature flags: `~s`: Properties: ~p~n"
"Feature flags: `~s`: Error: ~p",
[FeatureName,
FeatureName, FeatureProps,
FeatureName, Reason]),
erlang:raise(Class, Reason, Stacktrace)
end.
-spec merge_new_feature_flags(feature_flags(),
@ -2021,7 +2103,7 @@ verify_which_feature_flags_are_actually_enabled() ->
%% function fails), we keep the current state of the feature flag.
List1 = maps:fold(
fun(Name, Props, Acc) ->
case uses_migration_fun_v2(Name) of
case uses_callbacks(Name) of
true ->
Acc;
false ->

View File

@ -31,8 +31,6 @@
-include_lib("rabbit_common/include/logging.hrl").
-include("feature_flags.hrl").
-export([is_supported/1, is_supported/2,
enable/1,
check_node_compatibility/1,
@ -662,9 +660,7 @@ do_enable(#{states_per_node := _} = Inventory, FeatureName, Nodes) ->
%% several other feature flags.
case enable_dependencies(Inventory, FeatureName) of
{ok, Inventory1} ->
Extra = #{nodes => Nodes},
Rets = run_migration_fun(
Nodes, FeatureName, enable, Extra, infinity),
Rets = run_callback(Nodes, FeatureName, enable, #{}, infinity),
maps:fold(
fun
(_Node, ok, {ok, _} = Ret) -> Ret;
@ -682,11 +678,9 @@ do_enable(#{states_per_node := _} = Inventory, FeatureName, Nodes) ->
Ret :: ok.
post_enable(#{states_per_node := _}, FeatureName, Nodes) ->
case rabbit_feature_flags:uses_migration_fun_v2(FeatureName) of
case rabbit_feature_flags:uses_callbacks(FeatureName) of
true ->
Extra = #{nodes => Nodes},
_ = run_migration_fun(
Nodes, FeatureName, post_enable, Extra, infinity),
_ = run_callback(Nodes, FeatureName, post_enable, #{}, infinity),
ok;
false ->
ok
@ -1062,81 +1056,81 @@ enable_dependencies1(
%% Migration function.
%% --------------------------------------------------------------------
-spec run_migration_fun(Nodes, FeatureName, Command, Extra, Timeout) ->
-spec run_callback(Nodes, FeatureName, Command, Extra, Timeout) ->
Rets when
Nodes :: [node()],
FeatureName :: rabbit_feature_flags:feature_name(),
Command :: atom(),
Extra :: map(),
Command :: rabbit_feature_flags:callback_name(),
Extra :: rabbit_feature_flags:callbacks_args(),
Timeout :: timeout(),
Rets :: #{node() => term()}.
run_migration_fun(Nodes, FeatureName, Command, Extra, Timeout) ->
run_callback(Nodes, FeatureName, Command, Extra, Timeout) ->
FeatureProps = rabbit_ff_registry:get(FeatureName),
case maps:get(migration_fun, FeatureProps, none) of
{MigrationMod, MigrationFun}
when is_atom(MigrationMod) andalso is_atom(MigrationFun) ->
UsesMFv2 = rabbit_feature_flags:uses_migration_fun_v2(
MigrationMod, MigrationFun),
case UsesMFv2 of
true ->
%% The migration fun API v2 is of the form:
%% MigrationMod:MigrationFun(#ffcommand{...}).
%%
%% Also, the function is executed on all nodes in
%% parallel.
FFCommand = #ffcommand{
name = FeatureName,
props = FeatureProps,
command = Command,
extra = Extra},
run_migration_fun_v2(
Nodes, MigrationMod, MigrationFun, FFCommand, Timeout);
false ->
%% The migration fun API v1 is of the form:
%% MigrationMod:MigrationFun(
%% FeatureName, FeatureProps, Command).
%%
%% Also, the function is executed once on the calling node
%% only.
Ret = rabbit_feature_flags:run_migration_fun(
FeatureName, FeatureProps, Command),
#{node() => Ret}
end;
none ->
#{};
Invalid ->
Callbacks = maps:get(callbacks, FeatureProps, #{}),
case Callbacks of
#{Command := {CallbackMod, CallbackFun}}
when is_atom(CallbackMod) andalso is_atom(CallbackFun) ->
%% The migration fun API v2 is of the form:
%% CallbackMod:CallbackFun(...)
%%
%% Also, the function is executed on all nodes in parallel.
Args = Extra#{feature_name => FeatureName,
feature_props => FeatureProps,
command => Command,
nodes => Nodes},
do_run_callback(Nodes, CallbackMod, CallbackFun, Args, Timeout);
#{Command := Invalid} ->
?LOG_ERROR(
"Feature flags: `~s`: invalid migration function: ~p",
[FeatureName, Invalid],
"Feature flags: `~s`: invalid callback for `~s`: ~p",
[FeatureName, Command, Invalid],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
#{node() => {error, {invalid_migration_fun, Invalid}}}
#{node() =>
{error, {invalid_callback, FeatureName, Command, Invalid}}};
_ ->
%% The migration fun API v1 is of the form:
%% MigrationMod:MigrationFun(
%% FeatureName, FeatureProps, Command).
%%
%% Also, the function is executed once on the calling node only.
%%
%% Only `enable' is supported by the v1 migration function.
Ret = case Command of
enable ->
Ret0 = rabbit_feature_flags:run_migration_fun(
FeatureName, FeatureProps, Command),
case Ret0 of
{error, no_migration_fun} -> ok;
_ -> Ret0
end;
_ ->
ok
end,
#{node() => Ret}
end.
-spec run_migration_fun_v2(Nodes, MigrationMod, MigrationFun, FFCommand,
Timeout) -> Rets when
-spec do_run_callback(Nodes, CallbackMod, CallbackFun, Args, Timeout) ->
Rets when
Nodes :: [node()],
MigrationMod :: module(),
MigrationFun :: atom(),
FFCommand :: rabbit_feature_flags:ffcommand(),
CallbackMod :: module(),
CallbackFun :: atom(),
Args :: rabbit_feature_flags:callbacks_args(),
Timeout :: timeout(),
Rets :: #{node() => term}.
Rets :: #{node() => rabbit_feature_flags:callbacks_rets()}.
run_migration_fun_v2(
Nodes, MigrationMod, MigrationFun,
#ffcommand{name = FeatureName} = FFCommand,
Timeout) ->
do_run_callback(Nodes, CallbackMod, CallbackFun, Args, Timeout) ->
#{feature_name := FeatureName,
command := Command} = Args,
?LOG_DEBUG(
"Feature flags: `~s`: run migration function (v2) ~s:~s~n"
"Feature flags: with command=~p~n"
"Feature flags: `~s`: run callback ~s:~s (~s callback)~n"
"Feature flags: with args = ~p~n"
"Feature flags: on nodes ~p",
[FeatureName, MigrationMod, MigrationFun, FFCommand, Nodes],
[FeatureName, CallbackMod, CallbackFun, Command, Args, Nodes],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
Rets = rpc_calls(
Nodes, MigrationMod, MigrationFun, [FFCommand], Timeout),
Rets = rpc_calls(Nodes, CallbackMod, CallbackFun, [Args], Timeout),
?LOG_DEBUG(
"Feature flags: `~s`: migration function (v2) ~s:~s returned:~n"
"Feature flags: `~s`: callback ~s:~s (~s callback) returned:~n"
"Feature flags: ~p",
[FeatureName, MigrationMod, MigrationFun, Rets],
[FeatureName, CallbackMod, CallbackFun, Command, Rets],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
Rets.

View File

@ -13,8 +13,6 @@
-include_lib("rabbit_common/include/logging.hrl").
-include("feature_flags.hrl").
-export([suite/0,
all/0,
groups/0,
@ -27,8 +25,8 @@
mf_count_runs/3,
mf_wait_and_count_runs/3,
mf_wait_and_count_runs_v2/1,
mf_wait_and_count_runs_in_post_enable/1,
mf_wait_and_count_runs_v2_enable/1,
mf_wait_and_count_runs_v2_post_enable/1,
enable_unknown_feature_flag_on_a_single_node/1,
enable_supported_feature_flag_on_a_single_node/1,
@ -257,7 +255,7 @@ inject_on_nodes(Nodes, FeatureFlags) ->
"Injecting feature flags on nodes~n"
" Nodes: ~p~n"
" Feature flags: ~p~n",
[FeatureFlags, Nodes]),
[Nodes, FeatureFlags]),
_ = [ok =
run_on_node(
Node,
@ -296,26 +294,22 @@ mf_wait_and_count_runs(_FeatureName, _FeatureProps, enable) ->
bump_runs(),
ok.
mf_wait_and_count_runs_v2(#ffcommand{command = enable}) ->
mf_wait_and_count_runs_v2_enable(_Args) ->
Peer = get_peer_proc(),
Peer ! {node(), self(), waiting},
?LOG_NOTICE("Migration function: waiting for signal from ~p...", [Peer]),
receive proceed -> ok end,
?LOG_NOTICE("Migration function: unblocked!", []),
bump_runs(),
ok;
mf_wait_and_count_runs_v2(_) ->
ok.
mf_wait_and_count_runs_in_post_enable(#ffcommand{command = post_enable}) ->
mf_wait_and_count_runs_v2_post_enable(_Args) ->
Peer = get_peer_proc(),
Peer ! {node(), self(), waiting},
?LOG_NOTICE("Migration function: waiting for signal from ~p...", [Peer]),
receive proceed -> ok end,
?LOG_NOTICE("Migration function: unblocked!", []),
bump_runs(),
ok;
mf_wait_and_count_runs_in_post_enable(_) ->
ok.
-define(PT_PEER_PROC, {?MODULE, peer_proc}).
@ -929,8 +923,9 @@ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2(Config) ->
FeatureFlags = #{FeatureName =>
#{provided_by => ?MODULE,
stability => stable,
migration_fun => {?MODULE,
mf_wait_and_count_runs_v2}}},
callbacks =>
#{enable =>
{?MODULE, mf_wait_and_count_runs_v2_enable}}}},
inject_on_nodes(AllNodes, FeatureFlags),
ct:pal(
@ -1226,8 +1221,9 @@ enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2(Config) ->
FeatureFlags = #{FeatureName =>
#{provided_by => ?MODULE,
stability => stable,
migration_fun => {?MODULE,
mf_wait_and_count_runs_v2}}},
callbacks =>
#{enable =>
{?MODULE, mf_wait_and_count_runs_v2_enable}}}},
inject_on_nodes(AllNodes, FeatureFlags),
UsingFFv1 = not ?config(enable_feature_flags_v2, Config),
@ -1359,9 +1355,9 @@ enable_feature_flag_with_post_enable(Config) ->
FeatureFlags = #{FeatureName =>
#{provided_by => ?MODULE,
stability => stable,
migration_fun =>
{?MODULE,
mf_wait_and_count_runs_in_post_enable}}},
callbacks =>
#{post_enable =>
{?MODULE, mf_wait_and_count_runs_v2_post_enable}}}},
inject_on_nodes(AllNodes, FeatureFlags),
ct:pal(