Give the federation exchange another type of exchange to emulate. This sorta works now (we can federate a message! woo!), but in a couple of cases we need the backing module before we have it. Hmm.

This commit is contained in:
Simon MacMullen 2011-02-10 17:11:33 +00:00
parent 8f4cf84e23
commit da339a0513
4 changed files with 57 additions and 28 deletions

View File

@ -3,4 +3,11 @@ This is all very experimental.
Configuration might look like:
{rabbit_federation,
[ {exchanges, [{"local", "amqp://amqp.example.com/%2f/remote"}] } ]}
[ {exchanges, [{ %% local exchange people can bind to
"local",
%% remote exchange they effectively bind to
"amqp://amqp.example.com/%2f/remote",
%% type of remote exchange
"direct"}]
}
]}

View File

@ -33,7 +33,7 @@ stop(_State) ->
%%----------------------------------------------------------------------------
declare_exchange({Local, Remote}) ->
declare_exchange({Local, Remote, Type}) ->
{ok, Conn} = amqp_connection:start(direct),
{ok, Ch} = amqp_connection:open_channel(Conn),
%% TODO make durable, recover bindings etc
@ -41,6 +41,7 @@ declare_exchange({Local, Remote}) ->
exchange = list_to_binary(Local),
type = <<"x-federation">>,
arguments =
[{<<"remote">>, longstr, list_to_binary(Remote)}]}),
[{<<"remote">>, longstr, list_to_binary(Remote)},
{<<"type">>, longstr, list_to_binary(Type)}]}),
amqp_channel:close(Ch),
amqp_connection:close(Conn).

View File

@ -36,7 +36,7 @@
-export([validate/1, create/2, recover/2, delete/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-export([start_link/2]).
-export([start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@ -46,8 +46,7 @@
-define(TX, false).
-record(state, { local_connection, local_channel,
remote_connection, remote_channel,
local, remote,
remote_queue }).
local, remote, remote_queue }).
%%----------------------------------------------------------------------------
@ -58,39 +57,61 @@ description() ->
[{name, <<"x-federation">>},
{description, <<"Federation exchange">>}].
route(_X, _Delivery) ->
exit(not_implemented).
route(X, Delivery) ->
with_module(X, fun (M) -> M:route(X, Delivery) end).
validate(_X) -> ok.
validate(X) ->
ok.
%%with_module(X, fun (M) -> M:validate(X) end).
create(?TX, #exchange{ name = Local, arguments = Args }) ->
create(?TX, X = #exchange{ name = Local, arguments = Args }) ->
{longstr, Remote} = rabbit_misc:table_lookup(Args, <<"remote">>),
rabbit_federation_sup:start_child(Local, binary_to_list(Remote));
create(_, _) -> ok.
{longstr, Type} = rabbit_misc:table_lookup(Args, <<"type">>),
{ok, Module} = rabbit_registry:lookup_module(
exchange, rabbit_exchange:check_type(Type)),
rabbit_federation_sup:start_child(Local, binary_to_list(Remote), Module),
with_module(X, fun (M) -> M:create(?TX, X) end);
create(Tx, X) ->
ok.
%%with_module(X, fun (M) -> M:create(Tx, X) end).
recover(X, Bs) ->
with_module(X, fun (M) -> M:recover(X, Bs) end).
delete(Tx, X, Bs) ->
with_module(X, fun (M) -> M:delete(Tx, X, Bs) end).
recover(_X, _Bs) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_binding(?TX, X, B) ->
call(X, {add_binding, B});
add_binding(_, _, _) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
call(X, {add_binding, B}),
with_module(X, fun (M) -> M:add_binding(?TX, X, B) end);
add_binding(Tx, X, B) ->
with_module(X, fun (M) -> M:add_binding(Tx, X, B) end).
remove_bindings(Tx, X, Bs) ->
with_module(X, fun (M) -> M:remove_bindings(Tx, X, Bs) end).
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
with_module(X, fun (M) -> M:assert_args_equivalence(X, Args) end).
%%----------------------------------------------------------------------------
call(#exchange{ name = Local }, Msg) ->
[{_, Pid}] = ets:lookup(?ETS_NAME, Local),
[{_, Pid, _}] = ets:lookup(?ETS_NAME, Local),
gen_server:call(Pid, Msg, infinity).
%%----------------------------------------------------------------------------
start_link(Local, Remote) ->
gen_server:start_link(?MODULE, {Local, Remote}, [{timeout, infinity}]).
with_module(#exchange{ name = Local }, Fun) ->
[{_, _, Module}] = ets:lookup(?ETS_NAME, Local),
Fun(Module).
%%----------------------------------------------------------------------------
init({Local, RemoteURI}) ->
start_link(Local, Remote, Module) ->
gen_server:start_link(?MODULE, {Local, Remote, Module},
[{timeout, infinity}]).
%%----------------------------------------------------------------------------
init({Local, RemoteURI, Module}) ->
Remote0 = uri_parser:parse(RemoteURI, [{host, undefined}, {path, "/"},
{port, undefined}, {'query', []}]),
[VHostEnc, XEnc] = string:tokens(proplists:get_value(path, Remote0), "/"),
@ -108,7 +129,7 @@ init({Local, RemoteURI}) ->
self()),
{ok, LConn} = amqp_connection:start(direct),
{ok, LCh} = amqp_connection:open_channel(LConn),
true = ets:insert(?ETS_NAME, {Local, self()}),
true = ets:insert(?ETS_NAME, {Local, self(), Module}),
{ok, #state{local_connection = LConn, local_channel = LCh,
remote_connection = RConn, remote_channel = RCh,
local = Local,

View File

@ -18,7 +18,7 @@
-behaviour(supervisor).
-export([start_link/0, start_child/2]).
-export([start_link/0, start_child/3]).
-export([init/1]).
@ -27,10 +27,10 @@
start_link() ->
supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
start_child(Local, Remote) ->
start_child(Local, Remote, Module) ->
supervisor:start_child(?SUPERVISOR,
{exchange, {rabbit_federation_exchange, start_link,
[Local, Remote]},
[Local, Remote, Module]},
permanent, brutal_kill, worker,
[rabbit_federation_exchange]}).