Shovel: introduce operating modes
Sometimes you want a plugin to act as a library and not an application. That is, for its modules to be available at compile time and on a running node but, say, the actual runtime parameter handling and supervision of shovels to be handled by another plugin. Since we do not currently have a concept of "library plugins" or "library dependencies", this approach demonstrates one example of how some plugins can be used as libraries.
This commit is contained in:
parent
e4bc525b5e
commit
3043dc621f
|
@ -5,6 +5,8 @@
|
||||||
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
-define(SHOVEL_APP, rabbitmq_shovel).
|
||||||
|
|
||||||
-record(endpoint,
|
-record(endpoint,
|
||||||
{uris,
|
{uris,
|
||||||
resource_declaration
|
resource_declaration
|
||||||
|
|
|
@ -2,6 +2,10 @@
|
||||||
%% RabbitMQ Shovel plugin
|
%% RabbitMQ Shovel plugin
|
||||||
%% ----------------------------------------------------------------------------
|
%% ----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
{mapping, "shovel.operating_mode", "rabbitmq_shovel.operating_mode", [
|
||||||
|
[{datatype, {enum, [standard, alternative, library]}}]
|
||||||
|
]}.
|
||||||
|
|
||||||
{mapping, "shovel.topology.predeclared", "rabbitmq_shovel.topology.predeclared", [
|
{mapping, "shovel.topology.predeclared", "rabbitmq_shovel.topology.predeclared", [
|
||||||
[{datatype, {enum, [true, false]}}]
|
[{datatype, {enum, [true, false]}}]
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -26,7 +26,14 @@ start_link() ->
|
||||||
{ok, Pid0} -> Pid0;
|
{ok, Pid0} -> Pid0;
|
||||||
{error, {already_started, Pid0}} -> Pid0
|
{error, {already_started, Pid0}} -> Pid0
|
||||||
end,
|
end,
|
||||||
Shovels = rabbit_runtime_parameters:list_component(<<"shovel">>),
|
IsStandard = rabbit_shovel_operating_mode:is_standard(),
|
||||||
|
Shovels = case IsStandard of
|
||||||
|
false ->
|
||||||
|
%% when operating in a non-standard mode, do not start any shovels
|
||||||
|
[];
|
||||||
|
true ->
|
||||||
|
rabbit_runtime_parameters:list_component(<<"shovel">>)
|
||||||
|
end,
|
||||||
[start_child({pget(vhost, Shovel), pget(name, Shovel)},
|
[start_child({pget(vhost, Shovel), pget(name, Shovel)},
|
||||||
pget(value, Shovel)) || Shovel <- Shovels],
|
pget(value, Shovel)) || Shovel <- Shovels],
|
||||||
{ok, Pid}.
|
{ok, Pid}.
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||||
|
%%
|
||||||
|
|
||||||
|
-module(rabbit_shovel_operating_mode).
|
||||||
|
|
||||||
|
-include("rabbit_shovel.hrl").
|
||||||
|
|
||||||
|
-export([
|
||||||
|
operating_mode/0,
|
||||||
|
is_standard/0,
|
||||||
|
is_alternative/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-type operating_mode() :: 'standard' | atom().
|
||||||
|
|
||||||
|
%%
|
||||||
|
%% API
|
||||||
|
%%
|
||||||
|
|
||||||
|
-spec operating_mode() -> operating_mode().
|
||||||
|
operating_mode() ->
|
||||||
|
application:get_env(?SHOVEL_APP, operating_mode, standard).
|
||||||
|
|
||||||
|
-spec is_standard() -> boolean().
|
||||||
|
is_standard() ->
|
||||||
|
operating_mode() =:= standard.
|
||||||
|
|
||||||
|
-spec is_alternative() -> boolean().
|
||||||
|
is_alternative() ->
|
||||||
|
operating_mode() =/= standard.
|
|
@ -39,10 +39,20 @@
|
||||||
{enables, recovery}]}).
|
{enables, recovery}]}).
|
||||||
|
|
||||||
register() ->
|
register() ->
|
||||||
rabbit_registry:register(runtime_parameter, <<"shovel">>, ?MODULE).
|
case rabbit_shovel_operating_mode:is_standard() of
|
||||||
|
true ->
|
||||||
|
rabbit_registry:register(runtime_parameter, <<"shovel">>, ?MODULE);
|
||||||
|
false ->
|
||||||
|
?LOG_DEBUG("Shovel: skipping runtime parameter registration, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()])
|
||||||
|
end.
|
||||||
|
|
||||||
unregister() ->
|
unregister() ->
|
||||||
rabbit_registry:unregister(runtime_parameter, <<"shovel">>).
|
case rabbit_shovel_operating_mode:is_standard() of
|
||||||
|
true ->
|
||||||
|
rabbit_registry:unregister(runtime_parameter, <<"shovel">>);
|
||||||
|
false ->
|
||||||
|
?LOG_DEBUG("Shovel: skipping runtime parameter deregistration, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()])
|
||||||
|
end.
|
||||||
|
|
||||||
validate(_VHost, <<"shovel">>, Name, Def0, User) ->
|
validate(_VHost, <<"shovel">>, Name, Def0, User) ->
|
||||||
Def = rabbit_data_coercion:to_proplist(Def0),
|
Def = rabbit_data_coercion:to_proplist(Def0),
|
||||||
|
@ -65,10 +75,20 @@ pget2(K1, K2, Defs) -> case {pget(K1, Defs), pget(K2, Defs)} of
|
||||||
end.
|
end.
|
||||||
|
|
||||||
notify(VHost, <<"shovel">>, Name, Definition, _Username) ->
|
notify(VHost, <<"shovel">>, Name, Definition, _Username) ->
|
||||||
rabbit_shovel_dyn_worker_sup_sup:adjust({VHost, Name}, Definition).
|
case rabbit_shovel_operating_mode:is_standard() of
|
||||||
|
true ->
|
||||||
|
rabbit_shovel_dyn_worker_sup_sup:adjust({VHost, Name}, Definition);
|
||||||
|
false ->
|
||||||
|
?LOG_DEBUG("Shovel: ignoring a runtime parameter update, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()])
|
||||||
|
end.
|
||||||
|
|
||||||
notify_clear(VHost, <<"shovel">>, Name, _Username) ->
|
notify_clear(VHost, <<"shovel">>, Name, _Username) ->
|
||||||
rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name}).
|
case rabbit_shovel_operating_mode:is_standard() of
|
||||||
|
true ->
|
||||||
|
rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name});
|
||||||
|
false ->
|
||||||
|
?LOG_DEBUG("Shovel: ignoring a cleared runtime parameter, operating mode: ~ts", [rabbit_shovel_operating_mode:operating_mode()])
|
||||||
|
end.
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,8 @@ start_link() ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
init([Configurations]) ->
|
init([Configurations]) ->
|
||||||
|
IsStandard = rabbit_shovel_operating_mode:is_standard(),
|
||||||
|
StaticShovelSpecs = make_child_specs(IsStandard, Configurations),
|
||||||
Len = dict:size(Configurations),
|
Len = dict:size(Configurations),
|
||||||
ChildSpecs = [
|
ChildSpecs = [
|
||||||
#{
|
#{
|
||||||
|
@ -39,11 +41,15 @@ init([Configurations]) ->
|
||||||
type => supervisor,
|
type => supervisor,
|
||||||
modules => [rabbit_shovel_dyn_worker_sup_sup]
|
modules => [rabbit_shovel_dyn_worker_sup_sup]
|
||||||
}
|
}
|
||||||
| make_child_specs(Configurations)
|
| StaticShovelSpecs
|
||||||
],
|
],
|
||||||
{ok, {#{strategy => one_for_one, intensity => 2 * Len, period => 2}, ChildSpecs}}.
|
Opts = #{strategy => one_for_one, intensity => 2 * Len, period => 2},
|
||||||
|
{ok, {Opts, ChildSpecs}}.
|
||||||
|
|
||||||
make_child_specs(Configurations) ->
|
make_child_specs(false = _StandardOperatingMode, _Configurations) ->
|
||||||
|
%% when operating in a non-standard mode, do not start any shovels
|
||||||
|
[];
|
||||||
|
make_child_specs(true, Configurations) ->
|
||||||
dict:fold(
|
dict:fold(
|
||||||
fun (ShovelName, ShovelConfig, Acc) ->
|
fun (ShovelName, ShovelConfig, Acc) ->
|
||||||
[
|
[
|
||||||
|
|
Loading…
Reference in New Issue