Pass upstream sets around less, simplifying a bunch of APIs.
This commit is contained in:
		
							parent
							
								
									c6befb533d
								
							
						
					
					
						commit
						59b9fcfcda
					
				| 
						 | 
					@ -23,7 +23,7 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-import(rabbit_federation_util, [name/1]).
 | 
					-import(rabbit_federation_util, [name/1]).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-export([get_active_suffix/3, set_active_suffix/3, prune_scratch/2]).
 | 
					-export([get_active_suffix/3, set_active_suffix/3, prune_scratch/1]).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
%%----------------------------------------------------------------------------
 | 
					%%----------------------------------------------------------------------------
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -43,7 +43,8 @@ set_active_suffix(XName, Upstream, Suffix) ->
 | 
				
			||||||
           XName, federation,
 | 
					           XName, federation,
 | 
				
			||||||
           fun(D) -> ?DICT:store(key(Upstream), Suffix, D) end).
 | 
					           fun(D) -> ?DICT:store(key(Upstream), Suffix, D) end).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
prune_scratch(XName, Upstreams) ->
 | 
					prune_scratch(X = #exchange{name = XName}) ->
 | 
				
			||||||
 | 
					    Upstreams = rabbit_federation_upstream:for(X),
 | 
				
			||||||
    ok = rabbit_exchange:update_scratch(
 | 
					    ok = rabbit_exchange:update_scratch(
 | 
				
			||||||
           XName, federation,
 | 
					           XName, federation,
 | 
				
			||||||
           fun(undefined) -> ?DICT:new();
 | 
					           fun(undefined) -> ?DICT:new();
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -86,23 +86,17 @@ federate(#exchange{internal = true}) ->
 | 
				
			||||||
    false;
 | 
					    false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
federate(X) ->
 | 
					federate(X) ->
 | 
				
			||||||
    case rabbit_federation_upstream:for(X) of
 | 
					    case rabbit_federation_upstream:set_for(X) of
 | 
				
			||||||
        {ok, _}    -> true;
 | 
					        {ok, _}    -> true;
 | 
				
			||||||
        {error, _} -> false
 | 
					        {error, _} -> false
 | 
				
			||||||
    end.
 | 
					    end.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
maybe_start(X = #exchange{name = XName})->
 | 
					maybe_start(X = #exchange{name = XName})->
 | 
				
			||||||
    case federate(X) of
 | 
					    case federate(X) of
 | 
				
			||||||
        true ->
 | 
					        true  -> ok = rabbit_federation_db:prune_scratch(X),
 | 
				
			||||||
            %% TODO the extent to which we pass Set around can
 | 
					                 {ok, _} = rabbit_federation_link_sup_sup:start_child(X),
 | 
				
			||||||
            %% probably be simplified.
 | 
					                 ok;
 | 
				
			||||||
            {ok, Set} = rabbit_federation_upstream:for(X),
 | 
					        false -> ok
 | 
				
			||||||
            Upstreams = rabbit_federation_upstream:from_set(Set, X),
 | 
					 | 
				
			||||||
            ok = rabbit_federation_db:prune_scratch(XName, Upstreams),
 | 
					 | 
				
			||||||
            {ok, _} = rabbit_federation_link_sup_sup:start_child(X, {Set, X}),
 | 
					 | 
				
			||||||
            ok;
 | 
					 | 
				
			||||||
        false ->
 | 
					 | 
				
			||||||
            ok
 | 
					 | 
				
			||||||
    end.
 | 
					    end.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
maybe_stop(X = #exchange{name = XName}) ->
 | 
					maybe_stop(X = #exchange{name = XName}) ->
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -26,22 +26,16 @@
 | 
				
			||||||
-export([start_link/1, adjust/3]).
 | 
					-export([start_link/1, adjust/3]).
 | 
				
			||||||
-export([init/1]).
 | 
					-export([init/1]).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
start_link(Args) -> supervisor2:start_link(?MODULE, Args).
 | 
					start_link(X) -> supervisor2:start_link(?MODULE, X).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
adjust(Sup, X, everything) ->
 | 
					adjust(Sup, X, everything) ->
 | 
				
			||||||
    [stop(Sup, Upstream) ||
 | 
					    [stop(Sup, Upstream) ||
 | 
				
			||||||
        {Upstream, _, _, _} <- supervisor2:which_children(Sup)],
 | 
					        {Upstream, _, _, _} <- supervisor2:which_children(Sup)],
 | 
				
			||||||
    case rabbit_federation_upstream:for(X) of
 | 
					    [{ok, _Pid} = supervisor2:start_child(Sup, Spec) || Spec <- specs(X)];
 | 
				
			||||||
        {ok, UpstreamSet} ->
 | 
					 | 
				
			||||||
            [{ok, _Pid} = supervisor2:start_child(Sup, Spec) ||
 | 
					 | 
				
			||||||
                Spec <- specs(UpstreamSet, X)];
 | 
					 | 
				
			||||||
        {error, not_found} ->
 | 
					 | 
				
			||||||
            ok
 | 
					 | 
				
			||||||
    end;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
adjust(Sup, X, {connection, ConnName}) ->
 | 
					adjust(Sup, X, {connection, ConnName}) ->
 | 
				
			||||||
    OldUpstreams0 = children(Sup, ConnName),
 | 
					    OldUpstreams0 = children(Sup, ConnName),
 | 
				
			||||||
    NewUpstreams0 = upstreams(X, ConnName),
 | 
					    NewUpstreams0 = rabbit_federation_upstream:for(X, ConnName),
 | 
				
			||||||
    %% If any haven't changed, don't restart them. The broker will
 | 
					    %% If any haven't changed, don't restart them. The broker will
 | 
				
			||||||
    %% avoid telling us about connections that have not changed
 | 
					    %% avoid telling us about connections that have not changed
 | 
				
			||||||
    %% syntactically, but even if one has, this X may not have that
 | 
					    %% syntactically, but even if one has, this X may not have that
 | 
				
			||||||
| 
						 | 
					@ -58,14 +52,13 @@ adjust(Sup, X, {connection, ConnName}) ->
 | 
				
			||||||
    [start(Sup, NewUpstream, X) || NewUpstream <- NewUpstreams];
 | 
					    [start(Sup, NewUpstream, X) || NewUpstream <- NewUpstreams];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
adjust(Sup, X = #exchange{name = XName}, {clear_connection, ConnName}) ->
 | 
					adjust(Sup, X = #exchange{name = XName}, {clear_connection, ConnName}) ->
 | 
				
			||||||
    ok = rabbit_federation_db:prune_scratch(XName, upstreams(X)),
 | 
					    ok = rabbit_federation_db:prune_scratch(X),
 | 
				
			||||||
    [stop(Sup, Upstream) || Upstream <- children(Sup, ConnName)];
 | 
					    [stop(Sup, Upstream) || Upstream <- children(Sup, ConnName)];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
%% TODO handle changes of upstream sets minimally (bug 24853)
 | 
					%% TODO handle changes of upstream sets minimally (bug 24853)
 | 
				
			||||||
adjust(Sup, X = #exchange{name = XName}, {upstream_set, Set}) ->
 | 
					adjust(Sup, X = #exchange{name = XName}, {upstream_set, Set}) ->
 | 
				
			||||||
    case rabbit_federation_upstream:for(X) of
 | 
					    case rabbit_federation_upstream:set_for(X) of
 | 
				
			||||||
        {ok, Set} -> Us = rabbit_federation_upstream:from_set(Set, X),
 | 
					        {ok, Set} -> ok = rabbit_federation_db:prune_scratch(X);
 | 
				
			||||||
                     ok = rabbit_federation_db:prune_scratch(XName, Us);
 | 
					 | 
				
			||||||
        _         -> ok
 | 
					        _         -> ok
 | 
				
			||||||
    end,
 | 
					    end,
 | 
				
			||||||
    adjust(Sup, X, everything);
 | 
					    adjust(Sup, X, everything);
 | 
				
			||||||
| 
						 | 
					@ -90,32 +83,15 @@ children(Sup, ConnName) ->
 | 
				
			||||||
    rabbit_federation_util:find_upstreams(
 | 
					    rabbit_federation_util:find_upstreams(
 | 
				
			||||||
      ConnName, [U || {U, _, _, _} <- supervisor2:which_children(Sup)]).
 | 
					      ConnName, [U || {U, _, _, _} <- supervisor2:which_children(Sup)]).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
upstreams(X, ConnName) ->
 | 
					 | 
				
			||||||
    case rabbit_federation_upstream:for(X) of
 | 
					 | 
				
			||||||
        {ok, UpstreamSet} ->
 | 
					 | 
				
			||||||
            rabbit_federation_upstream:from_set(UpstreamSet, X, ConnName);
 | 
					 | 
				
			||||||
        {error, not_found} ->
 | 
					 | 
				
			||||||
            []
 | 
					 | 
				
			||||||
    end.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
upstreams(X) ->
 | 
					 | 
				
			||||||
    case rabbit_federation_upstream:for(X) of
 | 
					 | 
				
			||||||
        {ok, UpstreamSet} ->
 | 
					 | 
				
			||||||
            rabbit_federation_upstream:from_set(UpstreamSet, X);
 | 
					 | 
				
			||||||
        {error, not_found} ->
 | 
					 | 
				
			||||||
            []
 | 
					 | 
				
			||||||
    end.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
%%----------------------------------------------------------------------------
 | 
					%%----------------------------------------------------------------------------
 | 
				
			||||||
 | 
					
 | 
				
			||||||
init({UpstreamSet, X}) ->
 | 
					init(X) ->
 | 
				
			||||||
    %% 1, 1 so that the supervisor can give up and get into waiting
 | 
					    %% 1, 1 so that the supervisor can give up and get into waiting
 | 
				
			||||||
    %% for the reconnect_delay quickly.
 | 
					    %% for the reconnect_delay quickly.
 | 
				
			||||||
    {ok, {{one_for_one, 1, 1}, specs(UpstreamSet, X)}}.
 | 
					    {ok, {{one_for_one, 1, 1}, specs(X)}}.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
specs(UpstreamSet, X) ->
 | 
					specs(X) ->
 | 
				
			||||||
    [spec(Upstream, X) ||
 | 
					    [spec(Upstream, X) || Upstream <- rabbit_federation_upstream:for(X)].
 | 
				
			||||||
        Upstream <- rabbit_federation_upstream:from_set(UpstreamSet, X)].
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
spec(Upstream = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) ->
 | 
					spec(Upstream = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) ->
 | 
				
			||||||
    {Upstream, {rabbit_federation_link, start_link, [{Upstream, XName}]},
 | 
					    {Upstream, {rabbit_federation_link, start_link, [{Upstream, XName}]},
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -21,7 +21,7 @@
 | 
				
			||||||
-include_lib("rabbit_common/include/rabbit.hrl").
 | 
					-include_lib("rabbit_common/include/rabbit.hrl").
 | 
				
			||||||
-define(SUPERVISOR, rabbit_federation_link_sup_sup).
 | 
					-define(SUPERVISOR, rabbit_federation_link_sup_sup).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-export([start_link/0, start_child/2, adjust/1, stop_child/1]).
 | 
					-export([start_link/0, start_child/1, adjust/1, stop_child/1]).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-export([init/1]).
 | 
					-export([init/1]).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -31,10 +31,10 @@ start_link() ->
 | 
				
			||||||
    mirrored_supervisor:start_link({local, ?SUPERVISOR},
 | 
					    mirrored_supervisor:start_link({local, ?SUPERVISOR},
 | 
				
			||||||
                                   ?SUPERVISOR, ?MODULE, []).
 | 
					                                   ?SUPERVISOR, ?MODULE, []).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
start_child(X, Args) ->
 | 
					start_child(X) ->
 | 
				
			||||||
    {ok, _Pid} = mirrored_supervisor:start_child(
 | 
					    {ok, _Pid} = mirrored_supervisor:start_child(
 | 
				
			||||||
                   ?SUPERVISOR,
 | 
					                   ?SUPERVISOR,
 | 
				
			||||||
                   {id(X), {rabbit_federation_link_sup, start_link, [Args]},
 | 
					                   {id(X), {rabbit_federation_link_sup, start_link, [X]},
 | 
				
			||||||
                    transient, ?MAX_WAIT, supervisor,
 | 
					                    transient, ?MAX_WAIT, supervisor,
 | 
				
			||||||
                    [rabbit_federation_link_sup]}).
 | 
					                    [rabbit_federation_link_sup]}).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -19,14 +19,26 @@
 | 
				
			||||||
-include("rabbit_federation.hrl").
 | 
					-include("rabbit_federation.hrl").
 | 
				
			||||||
-include_lib("amqp_client/include/amqp_client.hrl").
 | 
					-include_lib("amqp_client/include/amqp_client.hrl").
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-export([for/1, to_table/1, to_string/1, from_set/2, from_set/3]).
 | 
					-export([set_for/1, for/1, for/2, to_table/1, to_string/1]).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-import(rabbit_misc, [pget/2, pget/3]).
 | 
					-import(rabbit_misc, [pget/2, pget/3]).
 | 
				
			||||||
-import(rabbit_federation_util, [name/1, vhost/1]).
 | 
					-import(rabbit_federation_util, [name/1, vhost/1]).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
%%----------------------------------------------------------------------------
 | 
					%%----------------------------------------------------------------------------
 | 
				
			||||||
 | 
					
 | 
				
			||||||
for(X) -> rabbit_policy:get(<<"federation-upstream-set">>, X).
 | 
					set_for(X) -> rabbit_policy:get(<<"federation-upstream-set">>, X).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					for(X) ->
 | 
				
			||||||
 | 
					    case set_for(X) of
 | 
				
			||||||
 | 
					        {ok, UpstreamSet}  -> from_set(UpstreamSet, X);
 | 
				
			||||||
 | 
					        {error, not_found} -> []
 | 
				
			||||||
 | 
					    end.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					for(X, ConnName) ->
 | 
				
			||||||
 | 
					    case set_for(X) of
 | 
				
			||||||
 | 
					        {ok, UpstreamSet}  -> from_set(UpstreamSet, X, ConnName);
 | 
				
			||||||
 | 
					        {error, not_found} -> []
 | 
				
			||||||
 | 
					    end.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
to_table(#upstream{original_uri = URI,
 | 
					to_table(#upstream{original_uri = URI,
 | 
				
			||||||
                   params       = Params,
 | 
					                   params       = Params,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue