diff --git a/deps/rabbitmq_federation/Makefile b/deps/rabbitmq_federation/Makefile index 4c291dd686..6527bc99a6 100644 --- a/deps/rabbitmq_federation/Makefile +++ b/deps/rabbitmq_federation/Makefile @@ -4,7 +4,8 @@ PROJECT_MOD = rabbit_federation_app define PROJECT_ENV [ - {pgroup_name_cluster_id, false} + {pgroup_name_cluster_id, false}, + {internal_exchange_check_interval, 30000} ] endef diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl index 37a314fdf1..e3266433ab 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl @@ -46,7 +46,9 @@ downstream_connection, downstream_channel, downstream_exchange, - unacked}). + unacked, + internal_exchange_timer, + internal_exchange_interval}). %%---------------------------------------------------------------------------- @@ -175,6 +177,16 @@ handle_info({'DOWN', _Ref, process, Pid, Reason}, rabbit_federation_link_util:handle_down( Pid, Reason, Ch, DCh, {Upstream, UParams, XName}, State); +handle_info(check_internal_exchange, State = #state{internal_exchange = IntXNameBin, + internal_exchange_interval = Int}) -> + case check_internal_exchange(IntXNameBin, State) of + upstream_not_found -> + {stop, {shutdown, restart}, State}; + _ -> + TRef = erlang:send_after(Int, self(), check_internal_exchange), + {noreply, State#state{internal_exchange_timer = TRef}} + end; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -185,7 +197,9 @@ terminate(Reason, #state{downstream_connection = DConn, connection = Conn, upstream = Upstream, upstream_params = UParams, - downstream_exchange = XName}) -> + downstream_exchange = XName, + internal_exchange_timer = TRef}) -> + timer:cancel(TRef), rabbit_federation_link_util:ensure_connection_closed(DConn), rabbit_federation_link_util:ensure_connection_closed(Conn), rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, XName), @@ -404,6 +418,8 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) -> %% serial we will process. Since it compares larger than %% any number we never process any commands. And we will %% soon get told to stop anyway. + {ok, Interval} = application:get_env(rabbitmq_federation, + internal_exchange_check_interval), State = ensure_upstream_bindings( consume_from_upstream_queue( #state{upstream = Upstream, @@ -415,9 +431,11 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) -> downstream_connection = DConn, downstream_channel = DCh, downstream_exchange = DownXName, - unacked = Unacked}), + unacked = Unacked, + internal_exchange_interval = Interval}), Bindings), - {noreply, State} + TRef = erlang:send_after(Interval, self(), check_internal_exchange), + {noreply, State#state{internal_exchange_timer = TRef}} end, Upstream, UParams, DownXName, S0). consume_from_upstream_queue( @@ -523,6 +541,35 @@ ensure_internal_exchange(IntXNameBin, amqp_channel:call(Ch, Fan) end). +check_internal_exchange(IntXNameBin, + #state{upstream = #upstream{max_hops = MaxHops}, + upstream_params = UParams, + downstream_exchange = XName}) -> + #upstream_params{params = Params} = UParams, + Base = #'exchange.declare'{exchange = IntXNameBin, + passive = true, + durable = true, + internal = true, + auto_delete = true}, + Purpose = [{<<"x-internal-purpose">>, longstr, <<"federation">>}], + XFUArgs = [{?MAX_HOPS_ARG, long, MaxHops}, + {?NODE_NAME_ARG, longstr, rabbit_nodes:cluster_name()} + | Purpose], + XFU = Base#'exchange.declare'{type = <<"x-federation-upstream">>, + arguments = XFUArgs}, + rabbit_federation_link_util:disposable_connection_call( + Params, XFU, fun(404, Text) -> + rabbit_federation_link_util:log_warning( + XName, "detected internal upstream exchange changes," + " restarting link: ~p~n", [Text]), + upstream_not_found; + (Code, Text) -> + rabbit_federation_link_util:log_warning( + XName, "internal upstream exchange check failed: ~p ~p~n", + [Code, Text]), + error + end). + upstream_queue_name(XNameBin, VHost, #resource{name = DownXNameBin, virtual_host = DownVHost}) -> Node = rabbit_nodes:cluster_name(), diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl index 04371f4a22..d99281bcdd 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl @@ -300,6 +300,8 @@ disposable_connection_call(Params, Method, ErrFun) -> amqp_channel:call(Ch, Method) catch exit:{{shutdown, {connection_closing, {server_initiated_close, Code, Txt}}}, _} -> + ErrFun(Code, Txt); + exit:{{shutdown, {server_initiated_close, Code, Txt}}, _} -> ErrFun(Code, Txt) after ensure_connection_closed(Conn)