pg2 => pg for OTP 24 compatibility

there is still one failing queue federation test.
This commit is contained in:
Michael Klishin 2021-03-02 18:30:34 +03:00
parent 82512ce206
commit ad47eba2fa
No known key found for this signature in database
GPG Key ID: E80EDCFA0CDB21EE
12 changed files with 105 additions and 47 deletions

View File

@ -7,9 +7,6 @@
-module(mirrored_supervisor).
%% pg2 is deprecated in OTP 23.
-compile(nowarn_deprecated_function).
%% Mirrored Supervisor
%% ===================
%%
@ -211,6 +208,7 @@ start_link0(Prefix, Group, TxFun, Init) ->
end.
init(Mod, Args) ->
_ = pg:start_link(),
case Mod:init(Args) of
{ok, {{Bad, _, _}, _ChildSpecs}} when
Bad =:= simple_one_for_one -> erlang:error(badarg);
@ -247,7 +245,7 @@ fold(FunAtom, Sup, AggFun) ->
Group = call(Sup, group),
lists:foldl(AggFun, [],
[apply(?SUPERVISOR, FunAtom, [D]) ||
M <- pg2:get_members(Group),
M <- pg:get_members(Group),
D <- [delegate(M)]]).
child(Sup, Id) ->
@ -279,9 +277,8 @@ handle_call({init, Overall}, _From,
tx_fun = TxFun,
initial_childspecs = ChildSpecs}) ->
process_flag(trap_exit, true),
pg2:create(Group),
ok = pg2:join(Group, Overall),
Rest = pg2:get_members(Group) -- [Overall],
ok = pg:join(Group, Overall),
Rest = pg:get_members(Group) -- [Overall],
case Rest of
[] -> TxFun(fun() -> delete_all(Group) end);
_ -> ok
@ -355,9 +352,8 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
tx_fun = TxFun,
overall = O,
child_order = ChildOrder}) ->
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
R = case lists:sort(pg2:get_members(Group)) -- [Pid] of
%% No guarantee pg will have received the DOWN before us.
R = case lists:sort(pg:get_members(Group)) -- [Pid] of
[O | _] -> ChildSpecs =
TxFun(fun() -> update_all(O, Pid) end),
[start(Delegate, ChildSpec)
@ -382,7 +378,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
tell_all_peers_to_die(Group, Reason) ->
[cast(P, {die, Reason}) || P <- pg2:get_members(Group) -- [self()]].
[cast(P, {die, Reason}) || P <- pg:get_members(Group) -- [self()]].
maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) ->
try TxFun(fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of

View File

@ -44,3 +44,5 @@
-define(DEF_PREFETCH, 1000).
-define(FEDERATION_GUIDE_URL, <<"https://rabbitmq.com/federation.html">>).
-define(FEDERATION_PG_SCOPE, rabbitmq_federation_pg_scope).

View File

@ -7,6 +7,8 @@
-module(rabbit_federation_app).
-include("rabbit_federation.hrl").
-behaviour(application).
-export([start/2, stop/1]).
@ -32,7 +34,14 @@ start(_Type, _StartArgs) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
stop(_State) ->
rabbit_federation_pg:stop_scope(),
ok.
%%----------------------------------------------------------------------------
init([]) -> {ok, {{one_for_one, 3, 10}, []}}.
init([]) ->
Flags = #{
strategy => one_for_one,
intensity => 3,
period => 10
},
{ok, {Flags, []}}.

View File

@ -7,9 +7,6 @@
-module(rabbit_federation_exchange_link).
%% pg2 is deprecated in OTP 23.
-compile(nowarn_deprecated_function).
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_federation.hrl").
@ -51,7 +48,9 @@
%% start during exchange recovery, when rabbit is not fully started
%% and the Erlang client is not running. This then gets invoked when
%% the federation app is started.
go() -> cast(go).
go() ->
rabbit_federation_pg:start_scope(),
cast(go).
add_binding(S, XN, B) -> cast(XN, {enqueue, S, {add_binding, B}}).
remove_bindings(S, XN, Bs) -> cast(XN, {enqueue, S, {remove_bindings, Bs}}).
@ -247,16 +246,13 @@ cast(Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- all()].
cast(XName, Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- x(XName)].
join(Name) ->
pg2:create(pgname(Name)),
ok = pg2:join(pgname(Name), self()).
ok = pg:join(?FEDERATION_PG_SCOPE, pgname(Name), self()).
all() ->
pg2:create(pgname(rabbit_federation_exchanges)),
pg2:get_members(pgname(rabbit_federation_exchanges)).
pg:get_members(?FEDERATION_PG_SCOPE, pgname(rabbit_federation_exchanges)).
x(XName) ->
pg2:create(pgname({rabbit_federation_exchange, XName})),
pg2:get_members(pgname({rabbit_federation_exchange, XName})).
pg:get_members(?FEDERATION_PG_SCOPE, pgname({rabbit_federation_exchange, XName})).
%%----------------------------------------------------------------------------

View File

@ -21,6 +21,11 @@
%%----------------------------------------------------------------------------
start_link() ->
_ = pg:start_link(),
%% This scope is used by concurrently starting exchange and queue links,
%% and other places, so we have to start it very early outside of the supervision tree.
%% The scope is stopped in stop/1.
rabbit_federation_pg:start_scope(),
mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR,
fun rabbit_misc:execute_mnesia_transaction/1,
?MODULE, []).

View File

@ -0,0 +1,25 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_federation_pg).
-include("rabbit_federation.hrl").
-export([start_scope/0, stop_scope/0]).
start_scope() ->
rabbit_log_federation:debug("Starting pg scope ~s", [?FEDERATION_PG_SCOPE]),
_ = pg:start_link(?FEDERATION_PG_SCOPE).
stop_scope() ->
case whereis(?FEDERATION_PG_SCOPE) of
Pid when is_pid(Pid) ->
rabbit_log_federation:debug("Stopping pg scope ~s", [?FEDERATION_PG_SCOPE]),
exit(Pid, normal);
_ ->
ok
end.

View File

@ -7,9 +7,6 @@
-module(rabbit_federation_queue_link).
%% pg2 is deprecated in OTP 23.
-compile(nowarn_deprecated_function).
-include_lib("rabbit/include/amqqueue.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_federation.hrl").
@ -33,7 +30,9 @@ start_link(Args) ->
run(QName) -> cast(QName, run).
pause(QName) -> cast(QName, pause).
go() -> cast(go).
go() ->
rabbit_federation_pg:start_scope(),
cast(go).
%%----------------------------------------------------------------------------
%%call(QName, Msg) -> [gen_server2:call(Pid, Msg, infinity) || Pid <- q(QName)].
@ -41,15 +40,13 @@ cast(Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- all()].
cast(QName, Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- q(QName)].
join(Name) ->
pg2:create(pgname(Name)),
ok = pg2:join(pgname(Name), self()).
ok = pg:join(?FEDERATION_PG_SCOPE, pgname(Name), self()).
all() ->
pg2:create(pgname(rabbit_federation_queues)),
pg2:get_members(pgname(rabbit_federation_queues)).
pg:get_members(pgname(rabbit_federation_queues)).
q(QName) ->
case pg2:get_members(pgname({rabbit_federation_queue, QName})) of
case pg:get_members(?FEDERATION_PG_SCOPE, pgname({rabbit_federation_queue, QName})) of
{error, {no_such_group, _}} ->
[];
Members ->
@ -184,7 +181,7 @@ terminate(Reason, #not_started{upstream = Upstream,
queue = Q}) when ?is_amqqueue(Q) ->
QName = amqqueue:get_name(Q),
rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, QName),
pg2:delete(pgname({rabbit_federation_queue, QName})),
_ = pg:leave(?FEDERATION_PG_SCOPE, pgname({rabbit_federation_queue, QName}), self()),
ok;
terminate(Reason, #state{dconn = DConn,
@ -196,7 +193,7 @@ terminate(Reason, #state{dconn = DConn,
rabbit_federation_link_util:ensure_connection_closed(DConn),
rabbit_federation_link_util:ensure_connection_closed(Conn),
rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, QName),
pg2:delete(pgname({rabbit_federation_queue, QName})),
_ = pg:leave(?FEDERATION_PG_SCOPE, pgname({rabbit_federation_queue, QName}), self()),
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -22,6 +22,11 @@
%%----------------------------------------------------------------------------
start_link() ->
_ = pg:start_link(),
%% This scope is used by concurrently starting exchange and queue links,
%% and other places, so we have to start it very early outside of the supervision tree.
%% The scope is stopped in stop/1.
rabbit_federation_pg:start_scope(),
mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR,
fun rabbit_misc:execute_mnesia_transaction/1,
?MODULE, []).

View File

@ -12,6 +12,7 @@
%% Supervises everything. There is just one of these.
-include_lib("rabbit_common/include/rabbit.hrl").
-define(SUPERVISOR, rabbit_federation_sup).
-export([start_link/0, stop/0]).
@ -45,19 +46,38 @@ stop() ->
%%----------------------------------------------------------------------------
init([]) ->
Status = {status, {rabbit_federation_status, start_link, []},
transient, ?WORKER_WAIT, worker,
[rabbit_federation_status]},
XLinkSupSup = {x_links,
{rabbit_federation_exchange_link_sup_sup, start_link, []},
transient, ?SUPERVISOR_WAIT, supervisor,
[rabbit_federation_exchange_link_sup_sup]},
QLinkSupSup = {q_links,
{rabbit_federation_queue_link_sup_sup, start_link, []},
transient, ?SUPERVISOR_WAIT, supervisor,
[rabbit_federation_queue_link_sup_sup]},
Status = #{
id => status,
start => {rabbit_federation_status, start_link, []},
restart => transient,
shutdown => ?WORKER_WAIT,
type => worker,
modules => [rabbit_federation_status]
},
XLinkSupSup = #{
id => x_links,
start => {rabbit_federation_exchange_link_sup_sup, start_link, []},
restart => transient,
shutdown => ?SUPERVISOR_WAIT,
type => supervisor,
modules =>[rabbit_federation_exchange_link_sup_sup]
},
QLinkSupSup = #{
id => q_links,
start => {rabbit_federation_queue_link_sup_sup, start_link, []},
restart => transient,
shutdown => ?SUPERVISOR_WAIT,
type => supervisor,
modules => [rabbit_federation_queue_link_sup_sup]
},
%% with default reconnect-delay of 5 second, this supports up to
%% 100 links constantly failing and being restarted a minute
%% (or 200 links if reconnect-delay is 10 seconds, 600 with 30 seconds,
%% etc: N * (60/reconnect-delay) <= 1200)
{ok, {{one_for_one, 1200, 60}, [Status, XLinkSupSup, QLinkSupSup]}}.
Flags = #{
strategy => one_for_one,
intensity => 1200,
period => 60
},
Specs = [Status, XLinkSupSup, QLinkSupSup],
{ok, {Flags, Specs}}.

View File

@ -274,6 +274,7 @@ dynamic_plugin_stop_start(Config) ->
expect_federation(Ch, UpQ, DownQ2, ?EXPECT_FEDERATION_TIMEOUT),
%% Disable the plugin, the link disappears
ct:pal("Stopping rabbitmq_federation"),
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, "rabbitmq_federation"),
expect_no_federation(Ch, UpQ, DownQ1),
@ -281,7 +282,9 @@ dynamic_plugin_stop_start(Config) ->
declare_queue(Ch, q(DownQ1, Args)),
declare_queue(Ch, q(DownQ2, Args)),
ct:pal("Re-starting rabbitmq_federation"),
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, "rabbitmq_federation"),
timer:sleep(?INITIAL_WAIT),
%% Declare a queue then re-enable the plugin, the links appear
wait_for_federation(

View File

@ -18,5 +18,5 @@
-define(HEALTH_CHECK_FAILURE_STATUS, 503).
-define(MANAGEMENT_PG_SCOPE, rabbit_mgmt).
-define(MANAGEMENT_PG_SCOPE, rabbitmq_management).
-define(MANAGEMENT_PG_GROUP, management_db).

View File

@ -14,5 +14,5 @@
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-define(MANAGEMENT_PG_SCOPE, rabbit_mgmt).
-define(MANAGEMENT_PG_SCOPE, rabbitmq_management).
-define(MANAGEMENT_PG_GROUP, management_db).