From da339a051336ad9ba3ac169a0b8c76db46e9d83b Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 10 Feb 2011 17:11:33 +0000 Subject: [PATCH] Give the federation exchange another type of exchange to emulate. This sorta works now (we can federate a message! woo!), but in a couple of cases we need the backing module before we have it. Hmm. --- deps/rabbitmq_federation/README | 9 ++- .../src/rabbit_federation_app.erl | 5 +- .../src/rabbit_federation_exchange.erl | 65 ++++++++++++------- .../src/rabbit_federation_sup.erl | 6 +- 4 files changed, 57 insertions(+), 28 deletions(-) diff --git a/deps/rabbitmq_federation/README b/deps/rabbitmq_federation/README index efc4f5a856..1452c90c0f 100644 --- a/deps/rabbitmq_federation/README +++ b/deps/rabbitmq_federation/README @@ -3,4 +3,11 @@ This is all very experimental. Configuration might look like: {rabbit_federation, - [ {exchanges, [{"local", "amqp://amqp.example.com/%2f/remote"}] } ]} + [ {exchanges, [{ %% local exchange people can bind to + "local", + %% remote exchange they effectively bind to + "amqp://amqp.example.com/%2f/remote", + %% type of remote exchange + "direct"}] + } + ]} diff --git a/deps/rabbitmq_federation/src/rabbit_federation_app.erl b/deps/rabbitmq_federation/src/rabbit_federation_app.erl index 104c70b196..c72c8c47cb 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_app.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_app.erl @@ -33,7 +33,7 @@ stop(_State) -> %%---------------------------------------------------------------------------- -declare_exchange({Local, Remote}) -> +declare_exchange({Local, Remote, Type}) -> {ok, Conn} = amqp_connection:start(direct), {ok, Ch} = amqp_connection:open_channel(Conn), %% TODO make durable, recover bindings etc @@ -41,6 +41,7 @@ declare_exchange({Local, Remote}) -> exchange = list_to_binary(Local), type = <<"x-federation">>, arguments = - [{<<"remote">>, longstr, list_to_binary(Remote)}]}), + [{<<"remote">>, longstr, list_to_binary(Remote)}, + {<<"type">>, longstr, list_to_binary(Type)}]}), amqp_channel:close(Ch), amqp_connection:close(Conn). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl index cac2bba49d..36bdd3a497 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl @@ -36,7 +36,7 @@ -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). --export([start_link/2]). +-export([start_link/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -46,8 +46,7 @@ -define(TX, false). -record(state, { local_connection, local_channel, remote_connection, remote_channel, - local, remote, - remote_queue }). + local, remote, remote_queue }). %%---------------------------------------------------------------------------- @@ -58,39 +57,61 @@ description() -> [{name, <<"x-federation">>}, {description, <<"Federation exchange">>}]. -route(_X, _Delivery) -> - exit(not_implemented). +route(X, Delivery) -> + with_module(X, fun (M) -> M:route(X, Delivery) end). -validate(_X) -> ok. +validate(X) -> + ok. + %%with_module(X, fun (M) -> M:validate(X) end). -create(?TX, #exchange{ name = Local, arguments = Args }) -> +create(?TX, X = #exchange{ name = Local, arguments = Args }) -> {longstr, Remote} = rabbit_misc:table_lookup(Args, <<"remote">>), - rabbit_federation_sup:start_child(Local, binary_to_list(Remote)); -create(_, _) -> ok. + {longstr, Type} = rabbit_misc:table_lookup(Args, <<"type">>), + {ok, Module} = rabbit_registry:lookup_module( + exchange, rabbit_exchange:check_type(Type)), + rabbit_federation_sup:start_child(Local, binary_to_list(Remote), Module), + with_module(X, fun (M) -> M:create(?TX, X) end); +create(Tx, X) -> + ok. + %%with_module(X, fun (M) -> M:create(Tx, X) end). + +recover(X, Bs) -> + with_module(X, fun (M) -> M:recover(X, Bs) end). + +delete(Tx, X, Bs) -> + with_module(X, fun (M) -> M:delete(Tx, X, Bs) end). -recover(_X, _Bs) -> ok. -delete(_Tx, _X, _Bs) -> ok. add_binding(?TX, X, B) -> - call(X, {add_binding, B}); -add_binding(_, _, _) -> ok. -remove_bindings(_Tx, _X, _Bs) -> ok. + call(X, {add_binding, B}), + with_module(X, fun (M) -> M:add_binding(?TX, X, B) end); +add_binding(Tx, X, B) -> + with_module(X, fun (M) -> M:add_binding(Tx, X, B) end). + +remove_bindings(Tx, X, Bs) -> + with_module(X, fun (M) -> M:remove_bindings(Tx, X, Bs) end). + assert_args_equivalence(X, Args) -> - rabbit_exchange:assert_args_equivalence(X, Args). + with_module(X, fun (M) -> M:assert_args_equivalence(X, Args) end). %%---------------------------------------------------------------------------- call(#exchange{ name = Local }, Msg) -> - [{_, Pid}] = ets:lookup(?ETS_NAME, Local), + [{_, Pid, _}] = ets:lookup(?ETS_NAME, Local), gen_server:call(Pid, Msg, infinity). -%%---------------------------------------------------------------------------- - -start_link(Local, Remote) -> - gen_server:start_link(?MODULE, {Local, Remote}, [{timeout, infinity}]). +with_module(#exchange{ name = Local }, Fun) -> + [{_, _, Module}] = ets:lookup(?ETS_NAME, Local), + Fun(Module). %%---------------------------------------------------------------------------- -init({Local, RemoteURI}) -> +start_link(Local, Remote, Module) -> + gen_server:start_link(?MODULE, {Local, Remote, Module}, + [{timeout, infinity}]). + +%%---------------------------------------------------------------------------- + +init({Local, RemoteURI, Module}) -> Remote0 = uri_parser:parse(RemoteURI, [{host, undefined}, {path, "/"}, {port, undefined}, {'query', []}]), [VHostEnc, XEnc] = string:tokens(proplists:get_value(path, Remote0), "/"), @@ -108,7 +129,7 @@ init({Local, RemoteURI}) -> self()), {ok, LConn} = amqp_connection:start(direct), {ok, LCh} = amqp_connection:open_channel(LConn), - true = ets:insert(?ETS_NAME, {Local, self()}), + true = ets:insert(?ETS_NAME, {Local, self(), Module}), {ok, #state{local_connection = LConn, local_channel = LCh, remote_connection = RConn, remote_channel = RCh, local = Local, diff --git a/deps/rabbitmq_federation/src/rabbit_federation_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_sup.erl index 77ff5b9757..839c03e371 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor). --export([start_link/0, start_child/2]). +-export([start_link/0, start_child/3]). -export([init/1]). @@ -27,10 +27,10 @@ start_link() -> supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). -start_child(Local, Remote) -> +start_child(Local, Remote, Module) -> supervisor:start_child(?SUPERVISOR, {exchange, {rabbit_federation_exchange, start_link, - [Local, Remote]}, + [Local, Remote, Module]}, permanent, brutal_kill, worker, [rabbit_federation_exchange]}).