Merge default

This commit is contained in:
Simon MacMullen 2014-02-05 15:15:57 +00:00
commit 80bfbe2272
9 changed files with 108 additions and 23 deletions

View File

@ -0,0 +1,5 @@
#!/bin/sh
CTL=$1
$CTL set_parameter federation-upstream cycle2 '{"uri": "amqp://localhost:5675", "max-hops": 99}'
$CTL set_policy cycle "^cycle$" '{"federation-upstream-set": "all"}'

View File

@ -0,0 +1,5 @@
#!/bin/sh
CTL=$1
$CTL set_parameter federation-upstream cycle1 '{"uri": "amqp://localhost:5674", "max-hops": 99}'
$CTL set_policy cycle "^cycle$" '{"federation-upstream-set": "all"}'

View File

@ -38,5 +38,6 @@
-define(ROUTING_HEADER, <<"x-received-from">>).
-define(BINDING_HEADER, <<"x-bound-from">>).
-define(MAX_HOPS_ARG, <<"x-max-hops">>).
-define(MAX_HOPS_ARG, <<"x-max-hops">>).
-define(NODE_NAME_ARG, <<"x-downstream-name">>).
-define(DEF_PREFETCH, 1000).

View File

@ -6,6 +6,6 @@ WITH_BROKER_SETUP_SCRIPTS:=$(PACKAGE_DIR)/etc/setup-rabbit-test.sh
$(PACKAGE_DIR)+pre-test::
rm -rf tmp /tmp/rabbitmq-*-mnesia
for R in hare flopsy mopsy cottontail dylan bugs jessica ; do \
for R in hare flopsy mopsy cottontail dylan bugs jessica cycle1 cycle2; do \
erl_call -sname $$R -q ; \
done

View File

@ -29,10 +29,12 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-import(rabbit_misc, [pget/2]).
-import(rabbit_federation_util, [name/1, vhost/1]).
-record(state, {upstream,
upstream_params,
upstream_name,
connection,
channel,
consumer_tag,
@ -129,6 +131,7 @@ handle_info({#'basic.deliver'{routing_key = Key,
State = #state{
upstream = Upstream = #upstream{max_hops = MaxH},
upstream_params = UParams,
upstream_name = UName,
downstream_exchange = #resource{name = XNameBin},
downstream_channel = DCh,
channel = Ch,
@ -136,10 +139,13 @@ handle_info({#'basic.deliver'{routing_key = Key,
PublishMethod = #'basic.publish'{exchange = XNameBin,
routing_key = Key},
%% TODO add user information here?
HeadersFun = fun (H) -> update_headers(UParams, Redelivered, H) end,
HeadersFun = fun (H) -> update_headers(UParams, UName, Redelivered, H) end,
%% We need to check should_forward/2 here in case the upstream
%% does not have federation and thus is using a fanout exchange.
ForwardFun = fun (H) -> rabbit_federation_util:should_forward(H, MaxH) end,
ForwardFun = fun (H) ->
DName = rabbit_nodes:cluster_name(),
rabbit_federation_util:should_forward(H, MaxH, DName)
end,
Unacked1 = rabbit_federation_link_util:forward(
Upstream, DeliverMethod, Ch, DCh, PublishMethod,
HeadersFun, ForwardFun, Msg, Unacked),
@ -329,26 +335,31 @@ bind_cmd0(unbind, Source, Destination, RoutingKey, Arguments) ->
%% restrictive max_hops we have yet passed through.
update_binding(Args, #state{downstream_exchange = X,
upstream = Upstream}) ->
upstream = Upstream,
upstream_name = UName}) ->
#upstream{max_hops = MaxHops} = Upstream,
Hops = case rabbit_misc:table_lookup(Args, ?BINDING_HEADER) of
undefined -> MaxHops;
{array, All} -> [{table, Prev} | _] = All,
{short, PrevHops} =
rabbit_misc:table_lookup(Prev, <<"hops">>),
lists:min([PrevHops - 1, MaxHops])
case rabbit_federation_util:already_seen(
UName, All) of
true -> 0;
false -> lists:min([PrevHops - 1, MaxHops])
end
end,
case Hops of
0 -> ignore;
_ -> Node = rabbit_nodes:cluster_name(),
_ -> Cluster = rabbit_nodes:cluster_name(),
ABSuffix = rabbit_federation_db:get_active_suffix(
X, Upstream, <<"A">>),
DVHost = vhost(X),
DName = name(X),
Down = <<Node/binary, ":", DVHost/binary,":", DName/binary, " ",
ABSuffix/binary>>,
Info = [{<<"downstream">>, longstr, Down},
{<<"hops">>, short, Hops}],
Down = <<DVHost/binary,":", DName/binary, " ", ABSuffix/binary>>,
Info = [{<<"cluster-name">>, longstr, Cluster},
{<<"exchange">>, longstr, Down},
{<<"hops">>, short, Hops}],
rabbit_basic:prepend_table_header(?BINDING_HEADER, Info, Args)
end.
@ -365,6 +376,13 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
Unacked = rabbit_federation_link_util:unacked_new(),
rabbit_federation_link_util:start_conn_ch(
fun (Conn, Ch, DConn, DCh) ->
Props = pget(server_properties,
amqp_connection:info(Conn, [server_properties])),
UName = case rabbit_misc:table_lookup(
Props, <<"cluster_name">>) of
{longstr, N} -> N;
_ -> unknown
end,
{Serial, Bindings} =
rabbit_misc:execute_mnesia_transaction(
fun () ->
@ -383,6 +401,7 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
consume_from_upstream_queue(
#state{upstream = Upstream,
upstream_params = UParams,
upstream_name = UName,
connection = Conn,
channel = Ch,
next_serial = Serial,
@ -487,9 +506,11 @@ ensure_internal_exchange(IntXNameBin,
internal = true,
auto_delete = true},
Purpose = [{<<"x-internal-purpose">>, longstr, <<"federation">>}],
XFUArgs = [{?MAX_HOPS_ARG, long, MaxHops},
{?NODE_NAME_ARG, longstr, rabbit_nodes:cluster_name()}
| Purpose],
XFU = Base#'exchange.declare'{type = <<"x-federation-upstream">>,
arguments = [{?MAX_HOPS_ARG, long, MaxHops} |
Purpose]},
arguments = XFUArgs},
Fan = Base#'exchange.declare'{type = <<"fanout">>,
arguments = Purpose},
rabbit_federation_link_util:disposable_connection_call(
@ -518,7 +539,11 @@ delete_upstream_exchange(Conn, XNameBin) ->
rabbit_federation_link_util:disposable_channel_call(
Conn, #'exchange.delete'{exchange = XNameBin}).
update_headers(#upstream_params{table = Table}, Redelivered, Headers) ->
update_headers(#upstream_params{table = Table}, UName, Redelivered, Headers) ->
rabbit_basic:prepend_table_header(
?ROUTING_HEADER, Table ++ [{<<"redelivered">>, bool, Redelivered}],
?ROUTING_HEADER, Table ++ [{<<"redelivered">>, bool, Redelivered}] ++
header_for_name(UName),
Headers).
header_for_name(unknown) -> [];
header_for_name(Name) -> [{<<"cluster-name">>, longstr, Name}].

View File

@ -55,13 +55,12 @@ upstreams(XorQ) ->
{_, undefined} -> [[{<<"upstream">>, UName}]]
end.
params_table(SafeURI, Params, XorQ) ->
params_table(SafeURI, XorQ) ->
Key = case XorQ of
#exchange{} -> <<"exchange">>;
#amqqueue{} -> <<"queue">>
end,
[{<<"uri">>, longstr, SafeURI},
{<<"virtual_host">>, longstr, vhost(Params)},
{Key, longstr, name(XorQ)}].
params_to_string(#upstream_params{safe_uri = SafeURI,
@ -81,7 +80,7 @@ to_params(Upstream = #upstream{uris = URIs}, XorQ) ->
uri = URI,
x_or_q = XorQ1,
safe_uri = SafeURI,
table = params_table(SafeURI, Params, XorQ)}.
table = params_table(SafeURI, XorQ)}.
print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).

View File

@ -43,9 +43,16 @@ serialise_events() -> false.
route(X = #exchange{arguments = Args},
D = #delivery{message = #basic_message{content = Content}}) ->
%% This arg was introduced in the same release as this exchange type;
%% it must be set
{long, MaxHops} = rabbit_misc:table_lookup(Args, ?MAX_HOPS_ARG),
%% This was introduced later; it might be missing
DName = case rabbit_misc:table_lookup(Args, ?NODE_NAME_ARG) of
{longstr, N} -> N;
_ -> unknown
end,
Headers = rabbit_basic:extract_headers(Content),
case rabbit_federation_util:should_forward(Headers, MaxHops) of
case rabbit_federation_util:should_forward(Headers, MaxHops, DName) of
true -> rabbit_exchange_type_fanout:route(X, D);
false -> []
end.

View File

@ -19,21 +19,27 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_federation.hrl").
-export([should_forward/2, find_upstreams/2]).
-export([should_forward/3, find_upstreams/2, already_seen/2]).
-export([validate_arg/3, fail/2, name/1, vhost/1, r/1]).
-import(rabbit_misc, [pget_or_die/2, pget/3]).
%%----------------------------------------------------------------------------
should_forward(undefined, _MaxHops) ->
should_forward(undefined, _MaxHops, _DName) ->
true;
should_forward(Headers, MaxHops) ->
should_forward(Headers, MaxHops, DName) ->
case rabbit_misc:table_lookup(Headers, ?ROUTING_HEADER) of
{array, A} -> length(A) < MaxHops;
{array, A} -> length(A) < MaxHops andalso not already_seen(DName, A);
_ -> true
end.
already_seen(Name, Array) ->
lists:any(fun ({table, T}) -> {longstr, Name} =:= rabbit_misc:table_lookup(
T, <<"cluster-name">>);
(_) -> false
end, Array).
find_upstreams(Name, Upstreams) ->
[U || U = #upstream{name = Name2} <- Upstreams,
Name =:= Name2].

View File

@ -47,6 +47,10 @@
-define(BUGS, {"bugs", 5675}).
-define(JESSICA, {"jessica", 5676}).
%% Used in cycle_detection_test
-define(CYCLE1, {"cycle1", 5674}).
-define(CYCLE2, {"cycle2", 5675}).
simple_test() ->
with_ch(
fun (Ch) ->
@ -358,6 +362,39 @@ max_hops_test() ->
stop_other_node(?COTTONTAIL),
ok.
%% Two nodes, both federated with each other, and max_hops set to a
%% high value. Things should not get out of hand.
cycle_detection_test() ->
Cycle1 = start_other_node(?CYCLE1),
Cycle2 = start_other_node(?CYCLE2),
declare_exchange(Cycle1, x(<<"cycle">>)),
declare_exchange(Cycle2, x(<<"cycle">>)),
Q1 = bind_queue(Cycle1, <<"cycle">>, <<"key">>),
Q2 = bind_queue(Cycle2, <<"cycle">>, <<"key">>),
%% Wait for federation to come up on all nodes
timer:sleep(5000),
%% "key" listed twice because once for the local queue and once
%% for federation in each case
assert_bindings(?CYCLE1, <<"cycle">>, [<<"key">>, <<"key">>]),
assert_bindings(?CYCLE2, <<"cycle">>, [<<"key">>, <<"key">>]),
publish(Cycle1, <<"cycle">>, <<"key">>, <<"HELLO1">>),
publish(Cycle2, <<"cycle">>, <<"key">>, <<"HELLO2">>),
Msgs = [<<"HELLO1">>, <<"HELLO2">>],
expect(Cycle1, Q1, Msgs),
expect(Cycle2, Q2, Msgs),
expect_empty(Cycle1, Q1),
expect_empty(Cycle2, Q2),
stop_other_node(?CYCLE1),
stop_other_node(?CYCLE2),
ok.
%% Arrows indicate message flow. Numbers indicate max_hops.
%%
%% Dylan ---1--> Bugs ---2--> Jessica