Merge branch 'stable'

This commit is contained in:
Michael Klishin 2017-07-24 18:07:54 +03:00
commit c1d9733e12
3 changed files with 55 additions and 5 deletions

View File

@ -4,7 +4,8 @@ PROJECT_MOD = rabbit_federation_app
define PROJECT_ENV
[
{pgroup_name_cluster_id, false}
{pgroup_name_cluster_id, false},
{internal_exchange_check_interval, 30000}
]
endef

View File

@ -46,7 +46,9 @@
downstream_connection,
downstream_channel,
downstream_exchange,
unacked}).
unacked,
internal_exchange_timer,
internal_exchange_interval}).
%%----------------------------------------------------------------------------
@ -175,6 +177,16 @@ handle_info({'DOWN', _Ref, process, Pid, Reason},
rabbit_federation_link_util:handle_down(
Pid, Reason, Ch, DCh, {Upstream, UParams, XName}, State);
handle_info(check_internal_exchange, State = #state{internal_exchange = IntXNameBin,
internal_exchange_interval = Int}) ->
case check_internal_exchange(IntXNameBin, State) of
upstream_not_found ->
{stop, {shutdown, restart}, State};
_ ->
TRef = erlang:send_after(Int, self(), check_internal_exchange),
{noreply, State#state{internal_exchange_timer = TRef}}
end;
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@ -185,7 +197,9 @@ terminate(Reason, #state{downstream_connection = DConn,
connection = Conn,
upstream = Upstream,
upstream_params = UParams,
downstream_exchange = XName}) ->
downstream_exchange = XName,
internal_exchange_timer = TRef}) ->
timer:cancel(TRef),
rabbit_federation_link_util:ensure_connection_closed(DConn),
rabbit_federation_link_util:ensure_connection_closed(Conn),
rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, XName),
@ -404,6 +418,8 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
%% serial we will process. Since it compares larger than
%% any number we never process any commands. And we will
%% soon get told to stop anyway.
{ok, Interval} = application:get_env(rabbitmq_federation,
internal_exchange_check_interval),
State = ensure_upstream_bindings(
consume_from_upstream_queue(
#state{upstream = Upstream,
@ -415,9 +431,11 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
downstream_connection = DConn,
downstream_channel = DCh,
downstream_exchange = DownXName,
unacked = Unacked}),
unacked = Unacked,
internal_exchange_interval = Interval}),
Bindings),
{noreply, State}
TRef = erlang:send_after(Interval, self(), check_internal_exchange),
{noreply, State#state{internal_exchange_timer = TRef}}
end, Upstream, UParams, DownXName, S0).
consume_from_upstream_queue(
@ -523,6 +541,35 @@ ensure_internal_exchange(IntXNameBin,
amqp_channel:call(Ch, Fan)
end).
check_internal_exchange(IntXNameBin,
#state{upstream = #upstream{max_hops = MaxHops},
upstream_params = UParams,
downstream_exchange = XName}) ->
#upstream_params{params = Params} = UParams,
Base = #'exchange.declare'{exchange = IntXNameBin,
passive = true,
durable = true,
internal = true,
auto_delete = true},
Purpose = [{<<"x-internal-purpose">>, longstr, <<"federation">>}],
XFUArgs = [{?MAX_HOPS_ARG, long, MaxHops},
{?NODE_NAME_ARG, longstr, rabbit_nodes:cluster_name()}
| Purpose],
XFU = Base#'exchange.declare'{type = <<"x-federation-upstream">>,
arguments = XFUArgs},
rabbit_federation_link_util:disposable_connection_call(
Params, XFU, fun(404, Text) ->
rabbit_federation_link_util:log_warning(
XName, "detected internal upstream exchange changes,"
" restarting link: ~p~n", [Text]),
upstream_not_found;
(Code, Text) ->
rabbit_federation_link_util:log_warning(
XName, "internal upstream exchange check failed: ~p ~p~n",
[Code, Text]),
error
end).
upstream_queue_name(XNameBin, VHost, #resource{name = DownXNameBin,
virtual_host = DownVHost}) ->
Node = rabbit_nodes:cluster_name(),

View File

@ -300,6 +300,8 @@ disposable_connection_call(Params, Method, ErrFun) ->
amqp_channel:call(Ch, Method)
catch exit:{{shutdown, {connection_closing,
{server_initiated_close, Code, Txt}}}, _} ->
ErrFun(Code, Txt);
exit:{{shutdown, {server_initiated_close, Code, Txt}}, _} ->
ErrFun(Code, Txt)
after
ensure_connection_closed(Conn)