Create a links API.

This commit is contained in:
Simon MacMullen 2011-03-24 12:46:20 +00:00
parent 6ae679b635
commit 85e2ab33fd
6 changed files with 51 additions and 22 deletions

View File

@ -19,3 +19,5 @@
prefetch_count,
reconnect_delay,
queue_expires}).
-define(SUPERVISOR, rabbit_federation_sup).

View File

@ -29,7 +29,7 @@
start(_Type, _StartArgs) ->
{ok, Xs} = application:get_env(exchanges),
[declare_exchange(X) || X <- Xs],
rabbit_federation_sup:go_all(),
rabbit_federation_links:go_all(),
supervisor:start_link({local,?MODULE},?MODULE,[]).
stop(_State) ->

View File

@ -70,7 +70,7 @@ recover(X, Bs) ->
delete(transaction, X, Bs) ->
with_module(X, fun (M) -> M:delete(transaction, X, Bs) end);
delete(Serial, X, Bs) ->
call(X, stop),
rabbit_federation_links:stop(X),
ok = rabbit_federation_sup:stop_child(exchange_to_sup_args(X)),
with_module(X, fun (M) -> M:delete(serial(Serial, X), X, Bs) end).
@ -80,7 +80,7 @@ add_binding(Serial, X, B = #binding{destination = Dest}) ->
%% TODO add bindings only if needed.
case is_federation_exchange(Dest) of
true -> ok;
false -> call(X, {enqueue, Serial, {add_binding, B}})
false -> rabbit_federation_links:add_binding(Serial, X, B)
end,
with_module(X, fun (M) -> M:add_binding(serial(Serial, X), X, B) end).
@ -89,7 +89,7 @@ remove_bindings(transaction, X, Bs) ->
remove_bindings(Serial, X, Bs) ->
[case is_federation_exchange(Dest) of
true -> ok;
false -> call(X, {enqueue, Serial, {remove_binding, B}})
false -> rabbit_federation_links:remove_binding(Serial, X, B)
end || B = #binding{destination = Dest} <- Bs],
with_module(X, fun (M) -> M:remove_bindings(serial(Serial, X), X, Bs) end).
@ -109,10 +109,6 @@ serial(Serial, X) ->
%%----------------------------------------------------------------------------
call(#exchange{ name = Downstream }, Msg) ->
SupPid = rabbit_federation_db:sup_for_exchange(Downstream),
rabbit_federation_link_sup:call_all(SupPid, Msg).
with_module(#exchange{ arguments = Args }, Fun) ->
%% TODO should this be cached? It's on the publish path.
{longstr, Type} = rabbit_misc:table_lookup(Args, <<"type">>),

View File

@ -25,17 +25,10 @@
-export([start_link/1]).
-export([init/1]).
-export([call_all/2]).
-define(SUPERVISOR, ?MODULE).
start_link(Args) ->
supervisor2:start_link(?MODULE, Args).
call_all(Sup, Msg) ->
[gen_server2:call(Pid, Msg, infinity) ||
{_, Pid, _, _} <- supervisor2:which_children(Sup), Pid =/= undefined].
%%----------------------------------------------------------------------------
init({Upstreams, DownstreamX, Durable}) ->

View File

@ -0,0 +1,43 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ Federation.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
-module(rabbit_federation_links).
-include("rabbit_federation.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-export([go_all/0]).
-export([add_binding/3, remove_binding/3, stop/1]).
go_all() ->
[{ok, _} = rabbit_federation_exchange_sup:go(Pid) ||
{_, Pid, _, _} <- supervisor:which_children(?SUPERVISOR)].
add_binding(Serial, X, B) ->
call(X, {enqueue, Serial, {add_binding, B}}).
remove_binding(Serial, X, B) ->
call(X, {enqueue, Serial, {remove_binding, B}}).
stop(X) ->
call(X, stop).
%%----------------------------------------------------------------------------
call(#exchange{ name = Downstream }, Msg) ->
Sup = rabbit_federation_db:sup_for_exchange(Downstream),
[gen_server2:call(Pid, Msg, infinity) ||
{_, Pid, _, _} <- supervisor2:which_children(Sup), Pid =/= undefined].

View File

@ -20,14 +20,13 @@
%% Supervises everything. There is just one of these.
-include("rabbit_federation.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-export([start_link/0, start_child/1, stop_child/1, go_all/0]).
-export([start_link/0, start_child/1, stop_child/1]).
-export([init/1]).
-define(SUPERVISOR, ?MODULE).
%% This supervisor needs to be part of the rabbit application since
%% a) it needs to be in place when exchange recovery takes place
%% b) it needs to go up and down with rabbit
@ -60,10 +59,6 @@ stop_child(Args) ->
ok = supervisor:delete_child(?SUPERVISOR, id(Args)),
ok.
go_all() ->
[{ok, _} = rabbit_federation_exchange_sup:go(Pid) ||
{_, Pid, _, _} <- supervisor:which_children(?SUPERVISOR)].
%%----------------------------------------------------------------------------
federation_up() ->