Merge pull request #14054 from rabbitmq/terminate-links-when-federation-plugins-stop

rabbitmq_*_federation: Stop links during plugin stop
This commit is contained in:
Jean-Sébastien Pédron 2025-06-11 09:17:23 +02:00 committed by GitHub
commit f84828ec62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 30 additions and 2 deletions

View File

@ -184,6 +184,9 @@ handle_info(check_internal_exchange, State = #state{internal_exchange = IntXName
{noreply, State#state{internal_exchange_timer = TRef}} {noreply, State#state{internal_exchange_timer = TRef}}
end; end;
handle_info({'EXIT', _From, Reason}, State) ->
{stop, Reason, State};
handle_info(Msg, State) -> handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}. {stop, {unexpected_info, Msg}, State}.

View File

@ -99,12 +99,12 @@ specs(LinkMod, XorQ) ->
spec(LinkMod, U = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) -> spec(LinkMod, U = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) ->
{U, {LinkMod, start_link, [{U, XName}]}, {U, {LinkMod, start_link, [{U, XName}]},
{permanent, Delay}, ?WORKER_WAIT, worker, {transient, Delay}, ?WORKER_WAIT, worker,
[LinkMod]}; [LinkMod]};
spec(LinkMod, Upstream = #upstream{reconnect_delay = Delay}, Q) when ?is_amqqueue(Q) -> spec(LinkMod, Upstream = #upstream{reconnect_delay = Delay}, Q) when ?is_amqqueue(Q) ->
{Upstream, {LinkMod, start_link, [{Upstream, Q}]}, {Upstream, {LinkMod, start_link, [{Upstream, Q}]},
{permanent, Delay}, ?WORKER_WAIT, worker, {transient, Delay}, ?WORKER_WAIT, worker,
[LinkMod]}. [LinkMod]}.
name(#exchange{name = XName}) -> XName; name(#exchange{name = XName}) -> XName;

View File

@ -17,7 +17,29 @@ stop_scope(Scope) ->
case whereis(Scope) of case whereis(Scope) of
Pid when is_pid(Pid) -> Pid when is_pid(Pid) ->
rabbit_log_federation:debug("Stopping pg scope ~ts", [Scope]), rabbit_log_federation:debug("Stopping pg scope ~ts", [Scope]),
Groups = pg:which_groups(Scope),
lists:foreach(
fun(Group) ->
stop_group(Scope, Group)
end, Groups),
exit(Pid, normal); exit(Pid, normal);
_ -> _ ->
ok ok
end. end.
stop_group(Scope, Group) ->
Members = pg:get_local_members(Scope, Group),
MRefs = [erlang:monitor(process, Member) || Member <- Members],
lists:foreach(
fun(Member) ->
exit(Member, normal)
end, Members),
lists:foreach(
fun(MRef) ->
receive
{'DOWN', MRef, process, _Member, _Info} ->
logger:alert("Member ~p stopped: ~0p", [_Member, _Info]),
ok
end
end, MRefs),
ok.

View File

@ -162,6 +162,9 @@ handle_info({'DOWN', _Ref, process, Pid, Reason},
QName = amqqueue:get_name(Q), QName = amqqueue:get_name(Q),
handle_down(Pid, Reason, Ch, DCh, {Upstream, UParams, QName}, State); handle_down(Pid, Reason, Ch, DCh, {Upstream, UParams, QName}, State);
handle_info({'EXIT', _From, Reason}, State) ->
{stop, Reason, State};
handle_info(Msg, State) -> handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}. {stop, {unexpected_info, Msg}, State}.