diff --git a/deps/rabbitmq_exchange_federation/src/rabbit_federation_exchange_link.erl b/deps/rabbitmq_exchange_federation/src/rabbit_federation_exchange_link.erl index 81d8a49333..951dd67e4d 100644 --- a/deps/rabbitmq_exchange_federation/src/rabbit_federation_exchange_link.erl +++ b/deps/rabbitmq_exchange_federation/src/rabbit_federation_exchange_link.erl @@ -184,6 +184,9 @@ handle_info(check_internal_exchange, State = #state{internal_exchange = IntXName {noreply, State#state{internal_exchange_timer = TRef}} end; +handle_info({'EXIT', _From, Reason}, State) -> + {stop, Reason, State}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. diff --git a/deps/rabbitmq_federation_common/src/rabbit_federation_link_sup.erl b/deps/rabbitmq_federation_common/src/rabbit_federation_link_sup.erl index 7c76aafbd9..10f04a96ef 100644 --- a/deps/rabbitmq_federation_common/src/rabbit_federation_link_sup.erl +++ b/deps/rabbitmq_federation_common/src/rabbit_federation_link_sup.erl @@ -99,12 +99,12 @@ specs(LinkMod, XorQ) -> spec(LinkMod, U = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) -> {U, {LinkMod, start_link, [{U, XName}]}, - {permanent, Delay}, ?WORKER_WAIT, worker, + {transient, Delay}, ?WORKER_WAIT, worker, [LinkMod]}; spec(LinkMod, Upstream = #upstream{reconnect_delay = Delay}, Q) when ?is_amqqueue(Q) -> {Upstream, {LinkMod, start_link, [{Upstream, Q}]}, - {permanent, Delay}, ?WORKER_WAIT, worker, + {transient, Delay}, ?WORKER_WAIT, worker, [LinkMod]}. name(#exchange{name = XName}) -> XName; diff --git a/deps/rabbitmq_federation_common/src/rabbit_federation_pg.erl b/deps/rabbitmq_federation_common/src/rabbit_federation_pg.erl index 2f3ee5f244..32ec7cbe95 100644 --- a/deps/rabbitmq_federation_common/src/rabbit_federation_pg.erl +++ b/deps/rabbitmq_federation_common/src/rabbit_federation_pg.erl @@ -17,7 +17,29 @@ stop_scope(Scope) -> case whereis(Scope) of Pid when is_pid(Pid) -> rabbit_log_federation:debug("Stopping pg scope ~ts", [Scope]), + Groups = pg:which_groups(Scope), + lists:foreach( + fun(Group) -> + stop_group(Scope, Group) + end, Groups), exit(Pid, normal); _ -> ok end. + +stop_group(Scope, Group) -> + Members = pg:get_local_members(Scope, Group), + MRefs = [erlang:monitor(process, Member) || Member <- Members], + lists:foreach( + fun(Member) -> + exit(Member, normal) + end, Members), + lists:foreach( + fun(MRef) -> + receive + {'DOWN', MRef, process, _Member, _Info} -> + logger:alert("Member ~p stopped: ~0p", [_Member, _Info]), + ok + end + end, MRefs), + ok. diff --git a/deps/rabbitmq_queue_federation/src/rabbit_federation_queue_link.erl b/deps/rabbitmq_queue_federation/src/rabbit_federation_queue_link.erl index fda313f63d..11d0598ba3 100644 --- a/deps/rabbitmq_queue_federation/src/rabbit_federation_queue_link.erl +++ b/deps/rabbitmq_queue_federation/src/rabbit_federation_queue_link.erl @@ -162,6 +162,9 @@ handle_info({'DOWN', _Ref, process, Pid, Reason}, QName = amqqueue:get_name(Q), handle_down(Pid, Reason, Ch, DCh, {Upstream, UParams, QName}, State); +handle_info({'EXIT', _From, Reason}, State) -> + {stop, Reason, State}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}.