Rename local / remote to downstream / upstream.

This commit is contained in:
Simon MacMullen 2011-02-10 17:59:33 +00:00
parent 438df4c513
commit 6371fa1cd9
3 changed files with 60 additions and 57 deletions

View File

@ -33,15 +33,15 @@ stop(_State) ->
%%----------------------------------------------------------------------------
declare_exchange({Local, Remote, Type}) ->
declare_exchange({Downstream, Upstream, Type}) ->
{ok, Conn} = amqp_connection:start(direct),
{ok, Ch} = amqp_connection:open_channel(Conn),
%% TODO make durable, recover bindings etc
amqp_channel:call(Ch, #'exchange.declare'{
exchange = list_to_binary(Local),
type = <<"x-federation">>,
arguments =
[{<<"remote">>, longstr, list_to_binary(Remote)},
{<<"type">>, longstr, list_to_binary(Type)}]}),
amqp_channel:call(
Ch, #'exchange.declare'{
exchange = list_to_binary(Downstream),
type = <<"x-federation">>,
arguments = [{<<"upstream">>, longstr, list_to_binary(Upstream)},
{<<"type">>, longstr, list_to_binary(Type)}]}),
amqp_channel:close(Ch),
amqp_connection:close(Conn).

View File

@ -44,9 +44,9 @@
-define(ETS_NAME, ?MODULE).
-define(TX, false).
-record(state, { local_connection, local_channel,
remote_connection, remote_channel,
local, remote, remote_queue }).
-record(state, { downstream_connection, downstream_channel,
upstream_connection, upstream_channel,
downstream_exchange, upstream_queue, upstream_properties }).
%%----------------------------------------------------------------------------
@ -61,18 +61,18 @@ description() ->
route(X, Delivery) ->
with_module(X, fun (M) -> M:route(X, Delivery) end).
validate(X) ->
validate(_X) ->
ok.
%%with_module(X, fun (M) -> M:validate(X) end).
create(?TX, X = #exchange{ name = Local, arguments = Args }) ->
{longstr, Remote} = rabbit_misc:table_lookup(Args, <<"remote">>),
create(?TX, X = #exchange{ name = Downstream, arguments = Args }) ->
{longstr, Upstream} = rabbit_misc:table_lookup(Args, <<"upstream">>),
{longstr, Type} = rabbit_misc:table_lookup(Args, <<"type">>),
{ok, Module} = rabbit_registry:lookup_module(
exchange, rabbit_exchange:check_type(Type)),
rabbit_federation_sup:start_child(Local, binary_to_list(Remote), Module),
rabbit_federation_sup:start_child(Downstream, binary_to_list(Upstream), Module),
with_module(X, fun (M) -> M:create(?TX, X) end);
create(Tx, X) ->
create(_Tx, _X) ->
ok.
%%with_module(X, fun (M) -> M:create(Tx, X) end).
@ -98,55 +98,58 @@ assert_args_equivalence(X, Args) ->
%%----------------------------------------------------------------------------
call(#exchange{ name = Local }, Msg) ->
[{_, Pid, _}] = ets:lookup(?ETS_NAME, Local),
call(#exchange{ name = Downstream }, Msg) ->
[{_, Pid, _}] = ets:lookup(?ETS_NAME, Downstream),
gen_server:call(Pid, Msg, infinity).
with_module(#exchange{ name = Local }, Fun) ->
[{_, _, Module}] = ets:lookup(?ETS_NAME, Local),
with_module(#exchange{ name = Downstream }, Fun) ->
[{_, _, Module}] = ets:lookup(?ETS_NAME, Downstream),
Fun(Module).
%%----------------------------------------------------------------------------
start_link(Local, Remote, Module) ->
gen_server:start_link(?MODULE, {Local, Remote, Module},
start_link(Downstream, Upstream, Module) ->
gen_server:start_link(?MODULE, {Downstream, Upstream, Module},
[{timeout, infinity}]).
%%----------------------------------------------------------------------------
init({Local, RemoteURI, Module}) ->
Remote0 = uri_parser:parse(RemoteURI, [{host, undefined}, {path, "/"},
{port, undefined}, {'query', []}]),
[VHostEnc, XEnc] = string:tokens(proplists:get_value(path, Remote0), "/"),
init({DownstreamX, UpstreamURI, Module}) ->
UpstreamProps0 = uri_parser:parse(
UpstreamURI, [{host, undefined}, {path, "/"},
{port, undefined}, {'query', []}]),
[VHostEnc, XEnc] = string:tokens(
proplists:get_value(path, UpstreamProps0), "/"),
VHost = httpd_util:decode_hex(VHostEnc),
X = httpd_util:decode_hex(XEnc),
Remote = [{vhost, VHost}, {exchange, X}] ++ Remote0,
Params = #amqp_params{host = proplists:get_value(host, Remote),
UpstreamProps = [{vhost, VHost}, {exchange, X}] ++ UpstreamProps0,
Params = #amqp_params{host = proplists:get_value(host, UpstreamProps),
virtual_host = list_to_binary(VHost)},
{ok, RConn} = amqp_connection:start(network, Params),
{ok, RCh} = amqp_connection:open_channel(RConn),
{ok, UConn} = amqp_connection:start(network, Params),
{ok, UCh} = amqp_connection:open_channel(UConn),
#'queue.declare_ok' {queue = Q} =
amqp_channel:call(RCh, #'queue.declare'{ exclusive = true}),
amqp_channel:subscribe(RCh, #'basic.consume'{ queue = Q,
amqp_channel:call(UCh, #'queue.declare'{ exclusive = true}),
amqp_channel:subscribe(UCh, #'basic.consume'{ queue = Q,
no_ack = true }, %% FIXME
self()),
{ok, LConn} = amqp_connection:start(direct),
{ok, LCh} = amqp_connection:open_channel(LConn),
true = ets:insert(?ETS_NAME, {Local, self(), Module}),
{ok, #state{local_connection = LConn, local_channel = LCh,
remote_connection = RConn, remote_channel = RCh,
local = Local,
remote = Remote, remote_queue = Q} }.
{ok, DConn} = amqp_connection:start(direct),
{ok, DCh} = amqp_connection:open_channel(DConn),
true = ets:insert(?ETS_NAME, {DownstreamX, self(), Module}),
{ok, #state{downstream_connection = DConn, downstream_channel = DCh,
upstream_connection = UConn, upstream_channel = UCh,
downstream_exchange = DownstreamX,
upstream_properties = UpstreamProps, upstream_queue = Q} }.
handle_call({add_binding, #binding{key = Key, args = Args} }, _From,
State = #state{ remote_channel = RCh,
remote = Remote, remote_queue = Q}) ->
amqp_channel:call(RCh, #'queue.bind'{
queue = Q,
exchange = list_to_binary(
proplists:get_value(exchange, Remote)),
routing_key = Key,
arguments = Args}),
State = #state{ upstream_channel = UCh,
upstream_properties = UpstreamProps,
upstream_queue = Q}) ->
amqp_channel:call(
UCh, #'queue.bind'{
queue = Q,
exchange = list_to_binary(proplists:get_value(exchange, UpstreamProps)),
routing_key = Key,
arguments = Args}),
{reply, ok, State};
handle_call(Msg, _From, State) ->
@ -162,9 +165,9 @@ handle_info({#'basic.deliver'{delivery_tag = _DTag,
%%redelivered = Redelivered,
%%exchange = Exchange,
routing_key = Key},
Msg}, State = #state{local = #resource {name = X},
local_channel = LCh}) ->
amqp_channel:cast(LCh, #'basic.publish'{exchange = X,
Msg}, State = #state{downstream_exchange = #resource {name = X},
downstream_channel = DCh}) ->
amqp_channel:cast(DCh, #'basic.publish'{exchange = X,
routing_key = Key}, Msg),
{noreply, State};
@ -174,10 +177,10 @@ handle_info(Msg, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, State = #state { local_connection = LConn,
remote_connection = RConn,
local = Local }) ->
amqp_connection:close(LConn),
amqp_connection:close(RConn),
true = ets:delete(?ETS_NAME, Local),
terminate(_Reason, State = #state { downstream_connection = DConn,
upstream_connection = UConn,
downstream_exchange = DownstreamX }) ->
amqp_connection:close(DConn),
amqp_connection:close(UConn),
true = ets:delete(?ETS_NAME, DownstreamX),
State.

View File

@ -27,10 +27,10 @@
start_link() ->
supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
start_child(Local, Remote, Module) ->
start_child(Downstream, Upstream, Module) ->
supervisor:start_child(?SUPERVISOR,
{exchange, {rabbit_federation_exchange, start_link,
[Local, Remote, Module]},
[Downstream, Upstream, Module]},
permanent, brutal_kill, worker,
[rabbit_federation_exchange]}).