diff --git a/deps/rabbitmq_federation/src/rabbit_federation_db.erl b/deps/rabbitmq_federation/src/rabbit_federation_db.erl index 5774860a43..5c3f197f67 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_db.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_db.erl @@ -23,7 +23,7 @@ -import(rabbit_federation_util, [name/1]). --export([get_active_suffix/3, set_active_suffix/3, prune_scratch/2]). +-export([get_active_suffix/3, set_active_suffix/3, prune_scratch/1]). %%---------------------------------------------------------------------------- @@ -43,7 +43,8 @@ set_active_suffix(XName, Upstream, Suffix) -> XName, federation, fun(D) -> ?DICT:store(key(Upstream), Suffix, D) end). -prune_scratch(XName, Upstreams) -> +prune_scratch(X = #exchange{name = XName}) -> + Upstreams = rabbit_federation_upstream:for(X), ok = rabbit_exchange:update_scratch( XName, federation, fun(undefined) -> ?DICT:new(); diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl index 06289644da..d21e6be210 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl @@ -86,23 +86,17 @@ federate(#exchange{internal = true}) -> false; federate(X) -> - case rabbit_federation_upstream:for(X) of + case rabbit_federation_upstream:set_for(X) of {ok, _} -> true; {error, _} -> false end. maybe_start(X = #exchange{name = XName})-> case federate(X) of - true -> - %% TODO the extent to which we pass Set around can - %% probably be simplified. - {ok, Set} = rabbit_federation_upstream:for(X), - Upstreams = rabbit_federation_upstream:from_set(Set, X), - ok = rabbit_federation_db:prune_scratch(XName, Upstreams), - {ok, _} = rabbit_federation_link_sup_sup:start_child(X, {Set, X}), - ok; - false -> - ok + true -> ok = rabbit_federation_db:prune_scratch(X), + {ok, _} = rabbit_federation_link_sup_sup:start_child(X), + ok; + false -> ok end. maybe_stop(X = #exchange{name = XName}) -> diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl index a2766a1716..42d4d9a2fb 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl @@ -26,22 +26,16 @@ -export([start_link/1, adjust/3]). -export([init/1]). -start_link(Args) -> supervisor2:start_link(?MODULE, Args). +start_link(X) -> supervisor2:start_link(?MODULE, X). adjust(Sup, X, everything) -> [stop(Sup, Upstream) || {Upstream, _, _, _} <- supervisor2:which_children(Sup)], - case rabbit_federation_upstream:for(X) of - {ok, UpstreamSet} -> - [{ok, _Pid} = supervisor2:start_child(Sup, Spec) || - Spec <- specs(UpstreamSet, X)]; - {error, not_found} -> - ok - end; + [{ok, _Pid} = supervisor2:start_child(Sup, Spec) || Spec <- specs(X)]; adjust(Sup, X, {connection, ConnName}) -> OldUpstreams0 = children(Sup, ConnName), - NewUpstreams0 = upstreams(X, ConnName), + NewUpstreams0 = rabbit_federation_upstream:for(X, ConnName), %% If any haven't changed, don't restart them. The broker will %% avoid telling us about connections that have not changed %% syntactically, but even if one has, this X may not have that @@ -58,14 +52,13 @@ adjust(Sup, X, {connection, ConnName}) -> [start(Sup, NewUpstream, X) || NewUpstream <- NewUpstreams]; adjust(Sup, X = #exchange{name = XName}, {clear_connection, ConnName}) -> - ok = rabbit_federation_db:prune_scratch(XName, upstreams(X)), + ok = rabbit_federation_db:prune_scratch(X), [stop(Sup, Upstream) || Upstream <- children(Sup, ConnName)]; %% TODO handle changes of upstream sets minimally (bug 24853) adjust(Sup, X = #exchange{name = XName}, {upstream_set, Set}) -> - case rabbit_federation_upstream:for(X) of - {ok, Set} -> Us = rabbit_federation_upstream:from_set(Set, X), - ok = rabbit_federation_db:prune_scratch(XName, Us); + case rabbit_federation_upstream:set_for(X) of + {ok, Set} -> ok = rabbit_federation_db:prune_scratch(X); _ -> ok end, adjust(Sup, X, everything); @@ -90,32 +83,15 @@ children(Sup, ConnName) -> rabbit_federation_util:find_upstreams( ConnName, [U || {U, _, _, _} <- supervisor2:which_children(Sup)]). -upstreams(X, ConnName) -> - case rabbit_federation_upstream:for(X) of - {ok, UpstreamSet} -> - rabbit_federation_upstream:from_set(UpstreamSet, X, ConnName); - {error, not_found} -> - [] - end. - -upstreams(X) -> - case rabbit_federation_upstream:for(X) of - {ok, UpstreamSet} -> - rabbit_federation_upstream:from_set(UpstreamSet, X); - {error, not_found} -> - [] - end. - %%---------------------------------------------------------------------------- -init({UpstreamSet, X}) -> +init(X) -> %% 1, 1 so that the supervisor can give up and get into waiting %% for the reconnect_delay quickly. - {ok, {{one_for_one, 1, 1}, specs(UpstreamSet, X)}}. + {ok, {{one_for_one, 1, 1}, specs(X)}}. -specs(UpstreamSet, X) -> - [spec(Upstream, X) || - Upstream <- rabbit_federation_upstream:from_set(UpstreamSet, X)]. +specs(X) -> + [spec(Upstream, X) || Upstream <- rabbit_federation_upstream:for(X)]. spec(Upstream = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) -> {Upstream, {rabbit_federation_link, start_link, [{Upstream, XName}]}, diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_sup_sup.erl index 9e8970ab24..d622552f6f 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_link_sup_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_link_sup_sup.erl @@ -21,7 +21,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -define(SUPERVISOR, rabbit_federation_link_sup_sup). --export([start_link/0, start_child/2, adjust/1, stop_child/1]). +-export([start_link/0, start_child/1, adjust/1, stop_child/1]). -export([init/1]). @@ -31,10 +31,10 @@ start_link() -> mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR, ?MODULE, []). -start_child(X, Args) -> +start_child(X) -> {ok, _Pid} = mirrored_supervisor:start_child( ?SUPERVISOR, - {id(X), {rabbit_federation_link_sup, start_link, [Args]}, + {id(X), {rabbit_federation_link_sup, start_link, [X]}, transient, ?MAX_WAIT, supervisor, [rabbit_federation_link_sup]}). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index 183c4d38a0..7f56265cc6 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -19,14 +19,26 @@ -include("rabbit_federation.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). --export([for/1, to_table/1, to_string/1, from_set/2, from_set/3]). +-export([set_for/1, for/1, for/2, to_table/1, to_string/1]). -import(rabbit_misc, [pget/2, pget/3]). -import(rabbit_federation_util, [name/1, vhost/1]). %%---------------------------------------------------------------------------- -for(X) -> rabbit_policy:get(<<"federation-upstream-set">>, X). +set_for(X) -> rabbit_policy:get(<<"federation-upstream-set">>, X). + +for(X) -> + case set_for(X) of + {ok, UpstreamSet} -> from_set(UpstreamSet, X); + {error, not_found} -> [] + end. + +for(X, ConnName) -> + case set_for(X) of + {ok, UpstreamSet} -> from_set(UpstreamSet, X, ConnName); + {error, not_found} -> [] + end. to_table(#upstream{original_uri = URI, params = Params,