stable to default

This commit is contained in:
Simon MacMullen 2014-02-11 16:42:31 +00:00
commit 6b1349bd2f
16 changed files with 252 additions and 190 deletions

View File

@ -0,0 +1,5 @@
#!/bin/sh
CTL=$1
$CTL set_parameter federation-upstream cycle2 '{"uri": "amqp://localhost:5675", "max-hops": 99}'
$CTL set_policy cycle "^cycle$" '{"federation-upstream-set": "all"}'

View File

@ -0,0 +1,5 @@
#!/bin/sh
CTL=$1
$CTL set_parameter federation-upstream cycle1 '{"uri": "amqp://localhost:5674", "max-hops": 99}'
$CTL set_policy cycle "^cycle$" '{"federation-upstream-set": "all"}'

View File

@ -38,5 +38,6 @@
-define(ROUTING_HEADER, <<"x-received-from">>).
-define(BINDING_HEADER, <<"x-bound-from">>).
-define(MAX_HOPS_ARG, <<"x-max-hops">>).
-define(MAX_HOPS_ARG, <<"x-max-hops">>).
-define(NODE_NAME_ARG, <<"x-downstream-name">>).
-define(DEF_PREFETCH, 1000).

View File

@ -6,6 +6,6 @@ WITH_BROKER_SETUP_SCRIPTS:=$(PACKAGE_DIR)/etc/setup-rabbit-test.sh
$(PACKAGE_DIR)+pre-test::
rm -rf tmp /tmp/rabbitmq-*-mnesia
for R in hare flopsy mopsy cottontail dylan bugs jessica ; do \
for R in hare flopsy mopsy cottontail dylan bugs jessica cycle1 cycle2; do \
erl_call -sname $$R -q ; \
done

View File

@ -29,10 +29,12 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-import(rabbit_misc, [pget/2]).
-import(rabbit_federation_util, [name/1, vhost/1]).
-record(state, {upstream,
upstream_params,
upstream_name,
connection,
channel,
consumer_tag,
@ -129,6 +131,7 @@ handle_info({#'basic.deliver'{routing_key = Key,
State = #state{
upstream = Upstream = #upstream{max_hops = MaxH},
upstream_params = UParams,
upstream_name = UName,
downstream_exchange = #resource{name = XNameBin},
downstream_channel = DCh,
channel = Ch,
@ -136,10 +139,13 @@ handle_info({#'basic.deliver'{routing_key = Key,
PublishMethod = #'basic.publish'{exchange = XNameBin,
routing_key = Key},
%% TODO add user information here?
HeadersFun = fun (H) -> update_headers(UParams, Redelivered, H) end,
HeadersFun = fun (H) -> update_headers(UParams, UName, Redelivered, H) end,
%% We need to check should_forward/2 here in case the upstream
%% does not have federation and thus is using a fanout exchange.
ForwardFun = fun (H) -> rabbit_federation_util:should_forward(H, MaxH) end,
ForwardFun = fun (H) ->
DName = rabbit_nodes:cluster_name(),
rabbit_federation_util:should_forward(H, MaxH, DName)
end,
Unacked1 = rabbit_federation_link_util:forward(
Upstream, DeliverMethod, Ch, DCh, PublishMethod,
HeadersFun, ForwardFun, Msg, Unacked),
@ -329,26 +335,31 @@ bind_cmd0(unbind, Source, Destination, RoutingKey, Arguments) ->
%% restrictive max_hops we have yet passed through.
update_binding(Args, #state{downstream_exchange = X,
upstream = Upstream}) ->
upstream = Upstream,
upstream_name = UName}) ->
#upstream{max_hops = MaxHops} = Upstream,
Hops = case rabbit_misc:table_lookup(Args, ?BINDING_HEADER) of
undefined -> MaxHops;
{array, All} -> [{table, Prev} | _] = All,
{short, PrevHops} =
rabbit_misc:table_lookup(Prev, <<"hops">>),
lists:min([PrevHops - 1, MaxHops])
case rabbit_federation_util:already_seen(
UName, All) of
true -> 0;
false -> lists:min([PrevHops - 1, MaxHops])
end
end,
case Hops of
0 -> ignore;
_ -> Node = rabbit_federation_util:local_nodename(vhost(X)),
_ -> Cluster = rabbit_nodes:cluster_name(),
ABSuffix = rabbit_federation_db:get_active_suffix(
X, Upstream, <<"A">>),
DVHost = vhost(X),
DName = name(X),
Down = <<Node/binary, ":", DVHost/binary,":", DName/binary, " ",
ABSuffix/binary>>,
Info = [{<<"downstream">>, longstr, Down},
{<<"hops">>, short, Hops}],
Down = <<DVHost/binary,":", DName/binary, " ", ABSuffix/binary>>,
Info = [{<<"cluster-name">>, longstr, Cluster},
{<<"exchange">>, longstr, Down},
{<<"hops">>, short, Hops}],
rabbit_basic:prepend_table_header(?BINDING_HEADER, Info, Args)
end.
@ -358,6 +369,13 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
Unacked = rabbit_federation_link_util:unacked_new(),
rabbit_federation_link_util:start_conn_ch(
fun (Conn, Ch, DConn, DCh) ->
Props = pget(server_properties,
amqp_connection:info(Conn, [server_properties])),
UName = case rabbit_misc:table_lookup(
Props, <<"cluster_name">>) of
{longstr, N} -> N;
_ -> unknown
end,
{Serial, Bindings} =
rabbit_misc:execute_mnesia_transaction(
fun () ->
@ -376,6 +394,7 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
consume_from_upstream_queue(
#state{upstream = Upstream,
upstream_params = UParams,
upstream_name = UName,
connection = Conn,
channel = Ch,
next_serial = Serial,
@ -480,9 +499,11 @@ ensure_internal_exchange(IntXNameBin,
internal = true,
auto_delete = true},
Purpose = [{<<"x-internal-purpose">>, longstr, <<"federation">>}],
XFUArgs = [{?MAX_HOPS_ARG, long, MaxHops},
{?NODE_NAME_ARG, longstr, rabbit_nodes:cluster_name()}
| Purpose],
XFU = Base#'exchange.declare'{type = <<"x-federation-upstream">>,
arguments = [{?MAX_HOPS_ARG, long, MaxHops} |
Purpose]},
arguments = XFUArgs},
Fan = Base#'exchange.declare'{type = <<"fanout">>,
arguments = Purpose},
rabbit_federation_link_util:disposable_connection_call(
@ -492,7 +513,7 @@ ensure_internal_exchange(IntXNameBin,
upstream_queue_name(XNameBin, VHost, #resource{name = DownXNameBin,
virtual_host = DownVHost}) ->
Node = rabbit_federation_util:local_nodename(DownVHost),
Node = rabbit_nodes:cluster_name(),
DownPart = case DownVHost of
VHost -> case DownXNameBin of
XNameBin -> <<"">>;
@ -511,7 +532,11 @@ delete_upstream_exchange(Conn, XNameBin) ->
rabbit_federation_link_util:disposable_channel_call(
Conn, #'exchange.delete'{exchange = XNameBin}).
update_headers(#upstream_params{table = Table}, Redelivered, Headers) ->
update_headers(#upstream_params{table = Table}, UName, Redelivered, Headers) ->
rabbit_basic:prepend_table_header(
?ROUTING_HEADER, Table ++ [{<<"redelivered">>, bool, Redelivered}],
?ROUTING_HEADER, Table ++ [{<<"redelivered">>, bool, Redelivered}] ++
header_for_name(UName),
Headers).
header_for_name(unknown) -> [];
header_for_name(Name) -> [{<<"cluster-name">>, longstr, Name}].

View File

@ -58,7 +58,7 @@ adjust(Sup, XorQ, {clear_upstream, UpstreamName}) ->
[stop(Sup, Upstream, XorQ) || Upstream <- children(Sup, UpstreamName)];
%% TODO handle changes of upstream sets minimally (bug 24853)
adjust(Sup, X = #exchange{name = XName}, {upstream_set, Set}) ->
adjust(Sup, X = #exchange{name = XName}, {upstream_set, _Set}) ->
case rabbit_federation_upstream:federate(X) of
false -> ok;
true -> ok = rabbit_federation_db:prune_scratch(

View File

@ -43,7 +43,7 @@ start_conn_ch(Fun, Upstream, UParams,
%% us to exit. We can therefore only trap exits when past that
%% point. Bug 24372 may help us do something nicer.
process_flag(trap_exit, true),
case open_monitor(local_params(Upstream, DownVHost)) of
case open_monitor(#amqp_params_direct{virtual_host = DownVHost}) of
{ok, DConn, DCh} ->
case Upstream#upstream.ack_mode of
'on-confirm' ->
@ -286,20 +286,3 @@ disposable_connection_call(Params, Method, ErrFun) ->
E ->
E
end.
local_params(#upstream{trust_user_id = Trust}, VHost) ->
{ok, DefaultUser} = application:get_env(rabbit, default_user),
Username = rabbit_runtime_parameters:value(
VHost, <<"federation">>, <<"local-username">>, DefaultUser),
case rabbit_access_control:check_user_login(Username, []) of
{ok, User0} -> User = maybe_impersonator(Trust, User0),
#amqp_params_direct{username = User,
virtual_host = VHost};
{refused, _M, _A} -> exit({error, user_does_not_exist})
end.
maybe_impersonator(Trust, User = #user{tags = Tags}) ->
case Trust andalso not lists:member(impersonator, Tags) of
true -> User#user{tags = [impersonator | Tags]};
false -> User
end.

View File

@ -50,12 +50,6 @@ validate(_VHost, <<"federation-upstream">>, Name, Term) ->
Name, [{<<"uri">>, fun validate_uri/2, mandatory} |
shared_validation()], Term);
validate(_VHost, <<"federation">>, <<"local-nodename">>, Term) ->
rabbit_parameter_validation:binary(<<"local-nodename">>, Term);
validate(_VHost, <<"federation">>, <<"local-username">>, Term) ->
rabbit_parameter_validation:binary(<<"local-username">>, Term);
validate(_VHost, _Component, Name, _Term) ->
{error, "name not recognised: ~p", [Name]}.
@ -63,25 +57,13 @@ notify(_VHost, <<"federation-upstream-set">>, Name, _Term) ->
adjust({upstream_set, Name});
notify(_VHost, <<"federation-upstream">>, Name, _Term) ->
adjust({upstream, Name});
notify(_VHost, <<"federation">>, <<"local-nodename">>, _Term) ->
adjust(everything);
notify(_VHost, <<"federation">>, <<"local-username">>, _Term) ->
adjust(everything).
adjust({upstream, Name}).
notify_clear(_VHost, <<"federation-upstream-set">>, Name) ->
adjust({clear_upstream_set, Name});
notify_clear(_VHost, <<"federation-upstream">>, Name) ->
adjust({clear_upstream, Name});
notify_clear(_VHost, <<"federation">>, <<"local-nodename">>) ->
adjust(everything);
notify_clear(_VHost, <<"federation">>, <<"local-username">>) ->
adjust(everything).
adjust({clear_upstream, Name}).
adjust(Thing) ->
rabbit_federation_exchange_link_sup_sup:adjust(Thing),

View File

@ -28,7 +28,8 @@
-behaviour(rabbit_queue_decorator).
-export([startup/1, shutdown/1, policy_changed/2, active_for/1, notify/3]).
-export([startup/1, shutdown/1, policy_changed/2, active_for/1,
consumer_state_changed/3]).
-export([policy_changed_local/2]).
-import(rabbit_misc, [pget/2]).
@ -73,34 +74,36 @@ active_for(Q = #amqqueue{arguments = Args}) ->
%% that doesn't want to be federated.
%% We need to reconsider whether we need to run or pause every time
%% something significant changes in the queue. In theory we don't need
%% to respond to absolutely every event the queue emits, but in
%% practice we need to respond to most of them and it doesn't really
%% cost much to respond to all of them. So that's why we ignore the
%% Event parameter.
%% the consumer state changes in the queue. But why can the state
%% change?
%%
%% For the record, the events, and why we care about them:
%%
%% consumer_blocked | We may have no more active consumers, and thus need to
%% consumer blocked | We may have no more active consumers, and thus need to
%% | pause
%% |
%% consumer_unblocked | We don't care
%% consumer unblocked | We don't care
%% |
%% queue_empty | The queue has become empty therefore we need to run to
%% queue empty | The queue has become empty therefore we need to run to
%% | get more messages
%% |
%% basic_consume | We don't care
%% basic consume | We don't care
%% |
%% basic_cancel | We may have no more active consumers, and thus need to
%% basic cancel | We may have no more active consumers, and thus need to
%% | pause
%% |
%% refresh | We asked for it (we have started a new link after
%% | failover and need something to prod us into action
%% | (or not)).
%%
%% In the cases where we don't care it's not prohibitively expensive
%% for us to be here anyway, so never mind.
%%
%% Note that there is no "queue became non-empty" state change - that's
%% because of the queue invariant. If the queue transitions from empty to
%% non-empty then it must have no active consumers - in which case it stays
%% the same from our POV.
notify(#amqqueue{name = QName}, _Event, Props) ->
case pget(is_empty, Props) andalso
active_unfederated(pget(max_active_consumer_priority, Props)) of
consumer_state_changed(#amqqueue{name = QName}, MaxActivePriority, IsEmpty) ->
case IsEmpty andalso active_unfederated(MaxActivePriority) of
true -> rabbit_federation_queue_link:run(QName);
false -> rabbit_federation_queue_link:pause(QName)
end,

View File

@ -89,18 +89,21 @@ code_change(_OldVsn, State, _Extra) ->
format(#entry{key = {#resource{virtual_host = VHost,
kind = Type,
name = XorQNameBin},
Connection, UXorQNameBin},
UpstreamName, UXorQNameBin},
status = Status,
uri = URI,
timestamp = Timestamp}) ->
[{type, Type},
{name, XorQNameBin},
{upstream_name, UXorQNameBin},
{vhost, VHost},
{connection, Connection},
{uri, URI},
{status, Status},
{timestamp, Timestamp}].
case Type of
exchange -> [{exchange, XorQNameBin},
{upstream_exchange, UXorQNameBin}];
queue -> [{queue, XorQNameBin},
{upstream_queue, UXorQNameBin}]
end ++ [{type, Type},
{vhost, VHost},
{upstream, UpstreamName},
{uri, URI},
{status, Status},
{timestamp, Timestamp}].
%% We don't want to key off the entire upstream, bits of it may change
key(XName = #resource{kind = exchange}, #upstream{name = UpstreamName,

View File

@ -55,13 +55,12 @@ upstreams(XorQ) ->
{_, undefined} -> [[{<<"upstream">>, UName}]]
end.
params_table(SafeURI, Params, XorQ) ->
params_table(SafeURI, XorQ) ->
Key = case XorQ of
#exchange{} -> <<"exchange">>;
#amqqueue{} -> <<"queue">>
end,
[{<<"uri">>, longstr, SafeURI},
{<<"virtual_host">>, longstr, vhost(Params)},
{Key, longstr, name(XorQ)}].
params_to_string(#upstream_params{safe_uri = SafeURI,
@ -69,18 +68,7 @@ params_to_string(#upstream_params{safe_uri = SafeURI,
print("~s on ~s", [rabbit_misc:rs(r(XorQ)), SafeURI]).
remove_credentials(URI) ->
Props = uri_parser:parse(binary_to_list(URI),
[{host, undefined}, {path, undefined},
{port, undefined}, {'query', []}]),
PortPart = case pget(port, Props) of
undefined -> "";
Port -> rabbit_misc:format(":~B", [Port])
end,
PGet = fun(K, P) -> case pget(K, P) of undefined -> ""; R -> R end end,
list_to_binary(
rabbit_misc:format(
"~s://~s~s~s", [pget(scheme, Props), PGet(host, Props),
PortPart, PGet(path, Props)])).
list_to_binary(amqp_uri:remove_credentials(binary_to_list(URI))).
to_params(Upstream = #upstream{uris = URIs}, XorQ) ->
random:seed(now()),
@ -92,7 +80,7 @@ to_params(Upstream = #upstream{uris = URIs}, XorQ) ->
uri = URI,
x_or_q = XorQ1,
safe_uri = SafeURI,
table = params_table(SafeURI, Params, XorQ)}.
table = params_table(SafeURI, XorQ)}.
print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).

View File

@ -43,9 +43,16 @@ serialise_events() -> false.
route(X = #exchange{arguments = Args},
D = #delivery{message = #basic_message{content = Content}}) ->
%% This arg was introduced in the same release as this exchange type;
%% it must be set
{long, MaxHops} = rabbit_misc:table_lookup(Args, ?MAX_HOPS_ARG),
%% This was introduced later; it might be missing
DName = case rabbit_misc:table_lookup(Args, ?NODE_NAME_ARG) of
{longstr, N} -> N;
_ -> unknown
end,
Headers = rabbit_basic:extract_headers(Content),
case rabbit_federation_util:should_forward(Headers, MaxHops) of
case rabbit_federation_util:should_forward(Headers, MaxHops, DName) of
true -> rabbit_exchange_type_fanout:route(X, D);
false -> []
end.

View File

@ -16,35 +16,30 @@
-module(rabbit_federation_util).
-include_lib("kernel/include/inet.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_federation.hrl").
-export([local_nodename/1, should_forward/2, find_upstreams/2]).
-export([should_forward/3, find_upstreams/2, already_seen/2]).
-export([validate_arg/3, fail/2, name/1, vhost/1, r/1]).
-import(rabbit_misc, [pget_or_die/2, pget/3]).
%%----------------------------------------------------------------------------
local_nodename(VHost) ->
rabbit_runtime_parameters:value(
VHost, <<"federation">>, <<"local-nodename">>, local_nodename_implicit()).
local_nodename_implicit() ->
{ID, _} = rabbit_nodes:parts(node()),
{ok, Host} = inet:gethostname(),
{ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
should_forward(undefined, _MaxHops) ->
should_forward(undefined, _MaxHops, _DName) ->
true;
should_forward(Headers, MaxHops) ->
should_forward(Headers, MaxHops, DName) ->
case rabbit_misc:table_lookup(Headers, ?ROUTING_HEADER) of
{array, A} -> length(A) < MaxHops;
{array, A} -> length(A) < MaxHops andalso not already_seen(DName, A);
_ -> true
end.
already_seen(Name, Array) ->
lists:any(fun ({table, T}) -> {longstr, Name} =:= rabbit_misc:table_lookup(
T, <<"cluster-name">>);
(_) -> false
end, Array).
find_upstreams(Name, Upstreams) ->
[U || U = #upstream{name = Name2} <- Upstreams,
Name =:= Name2].

View File

@ -26,12 +26,14 @@
-import(rabbit_federation_test_util,
[expect/3, expect_empty/2, set_param/3, clear_param/2,
set_pol/3, clear_pol/1, plugin_dir/0, policy/1,
start_other_node/1, start_other_node/2, start_other_node/3,
stop_other_node/1]).
start_other_node/1, start_other_node/2, start_other_node/3]).
-define(UPSTREAM_DOWNSTREAM, [x(<<"upstream">>),
x(<<"fed.downstream">>)]).
%% Used everywhere
-define(RABBIT, {"rabbit-test", 5672}).
%% Used in restart_upstream_test
-define(HARE, {"hare", 5673}).
@ -45,10 +47,15 @@
-define(BUGS, {"bugs", 5675}).
-define(JESSICA, {"jessica", 5676}).
%% Used in cycle_detection_test
-define(CYCLE1, {"cycle1", 5674}).
-define(CYCLE2, {"cycle2", 5675}).
simple_test() ->
with_ch(
fun (Ch) ->
Q = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
await_binding(<<"upstream">>, <<"key">>),
publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO">>)
end, ?UPSTREAM_DOWNSTREAM).
@ -56,6 +63,8 @@ multiple_upstreams_test() ->
with_ch(
fun (Ch) ->
Q = bind_queue(Ch, <<"fed12.downstream">>, <<"key">>),
await_binding(<<"upstream">>, <<"key">>),
await_binding(<<"upstream2">>, <<"key">>),
publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO1">>),
publish_expect(Ch, <<"upstream2">>, <<"key">>, Q, <<"HELLO2">>)
end, [x(<<"upstream">>),
@ -86,15 +95,20 @@ expect_uris(URIs) -> [Link] = rabbit_federation_status:status(),
kill_only_connection(Node) ->
case connection_pids(Node) of
[Pid] -> rabbit_networking:close_connection(Pid, "why not?"),
[Pid] -> catch rabbit_networking:close_connection(Pid, "boom"), %% [1]
wait_for_pid_to_die(Node, Pid);
_ -> timer:sleep(1000),
_ -> timer:sleep(100),
kill_only_connection(Node)
end.
%% [1] the catch is because we could still see a connection from a
%% previous time round. If so that's fine (we'll just loop around
%% again) but we don't want the test to fail because a connection
%% closed as we were trying to close it.
wait_for_pid_to_die(Node, Pid) ->
case connection_pids(Node) of
[Pid] -> timer:sleep(1000),
[Pid] -> timer:sleep(100),
wait_for_pid_to_die(Node, Pid);
_ -> ok
end.
@ -105,6 +119,8 @@ multiple_downstreams_test() ->
fun (Ch) ->
Q1 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
Q12 = bind_queue(Ch, <<"fed12.downstream2">>, <<"key">>),
await_binding(<<"upstream">>, <<"key">>, 2),
await_binding(<<"upstream2">>, <<"key">>),
publish(Ch, <<"upstream">>, <<"key">>, <<"HELLO1">>),
publish(Ch, <<"upstream2">>, <<"key">>, <<"HELLO2">>),
expect(Ch, Q1, [<<"HELLO1">>]),
@ -118,6 +134,7 @@ e2e_test() ->
fun (Ch) ->
bind_exchange(Ch, <<"downstream2">>, <<"fed.downstream">>,
<<"key">>),
await_binding(<<"upstream">>, <<"key">>),
Q = bind_queue(Ch, <<"downstream2">>, <<"key">>),
publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO1">>)
end, ?UPSTREAM_DOWNSTREAM ++ [x(<<"downstream2">>)]).
@ -127,6 +144,7 @@ unbind_on_delete_test() ->
fun (Ch) ->
Q1 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
Q2 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
await_binding(<<"upstream">>, <<"key">>),
delete_queue(Ch, Q2),
publish_expect(Ch, <<"upstream">>, <<"key">>, Q1, <<"HELLO">>)
end, ?UPSTREAM_DOWNSTREAM).
@ -136,6 +154,7 @@ unbind_on_unbind_test() ->
fun (Ch) ->
Q1 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
Q2 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
await_binding(<<"upstream">>, <<"key">>),
unbind_queue(Ch, Q2, <<"fed.downstream">>, <<"key">>),
publish_expect(Ch, <<"upstream">>, <<"key">>, Q1, <<"HELLO">>),
delete_queue(Ch, Q2)
@ -154,6 +173,7 @@ user_id_test() ->
declare_exchange(Ch2, x(<<"upstream">>)),
declare_exchange(Ch, x(<<"hare.downstream">>)),
Q = bind_queue(Ch, <<"hare.downstream">>, <<"key">>),
await_binding(?HARE, <<"upstream">>, <<"key">>),
Msg = #amqp_msg{props = #'P_basic'{user_id = <<"hare-user">>},
payload = <<"HELLO">>},
@ -207,6 +227,8 @@ unbind_gets_transmitted_test() ->
Q12 = bind_queue(Ch, <<"fed.downstream">>, <<"key1">>),
Q21 = bind_queue(Ch, <<"fed.downstream">>, <<"key2">>),
Q22 = bind_queue(Ch, <<"fed.downstream">>, <<"key2">>),
await_binding(<<"upstream">>, <<"key1">>),
await_binding(<<"upstream">>, <<"key2">>),
[delete_queue(Ch, Q) || Q <- [Q12, Q21, Q22]],
publish(Ch, <<"upstream">>, <<"key1">>, <<"YES">>),
publish(Ch, <<"upstream">>, <<"key2">>, <<"NO">>),
@ -220,6 +242,8 @@ no_loop_test() ->
fun (Ch) ->
Q1 = bind_queue(Ch, <<"one">>, <<"key">>),
Q2 = bind_queue(Ch, <<"two">>, <<"key">>),
await_binding(<<"one">>, <<"key">>, 2),
await_binding(<<"two">>, <<"key">>, 2),
publish(Ch, <<"one">>, <<"key">>, <<"Hello from one">>),
publish(Ch, <<"two">>, <<"key">>, <<"Hello from two">>),
expect(Ch, Q1, [<<"Hello from one">>, <<"Hello from two">>]),
@ -242,13 +266,15 @@ binding_recovery_test() ->
bind_queue(Ch, Q, <<"fed.downstream">>, <<"key">>),
timer:sleep(100), %% To get the suffix written
stop_other_node(?HARE),
%% i.e. don't clean up
rabbit_federation_test_util:stop_other_node(?HARE),
start_other_node(?HARE, "hare-two-upstreams"),
?assert(none =/= suffix(?HARE, "upstream")),
?assert(none =/= suffix(?HARE, "upstream2")),
stop_other_node(?HARE),
%% again don't clean up
rabbit_federation_test_util:stop_other_node(?HARE),
Ch2 = start_other_node(?HARE),
@ -291,12 +317,11 @@ restart_upstream_test() ->
unbind_queue(
Downstream, Qgoes, <<"hare.downstream">>, <<"goes">>),
Upstream1 = start_other_node(?HARE),
publish(Upstream1, <<"upstream">>, <<"goes">>, <<"GOES">>),
publish(Upstream1, <<"upstream">>, <<"stays">>, <<"STAYS">>),
%% Give the link a chance to come up and for this binding
%% to be transferred
timer:sleep(1000),
publish(Upstream1, <<"upstream">>, <<"comes">>, <<"COMES">>),
expect(Downstream, Qstays, [<<"STAYS">>]),
expect(Downstream, Qcomes, [<<"COMES">>]),
expect_empty(Downstream, Qgoes),
@ -321,8 +346,9 @@ max_hops_test() ->
Q2 = bind_queue(Mopsy, <<"ring">>, <<"key">>),
Q3 = bind_queue(Cottontail, <<"ring">>, <<"key">>),
%% Wait for federation to come up on all nodes
timer:sleep(5000),
await_binding(?FLOPSY, <<"ring">>, <<"key">>, 3),
await_binding(?MOPSY, <<"ring">>, <<"key">>, 3),
await_binding(?COTTONTAIL, <<"ring">>, <<"key">>, 3),
publish(Flopsy, <<"ring">>, <<"key">>, <<"HELLO flopsy">>),
publish(Mopsy, <<"ring">>, <<"key">>, <<"HELLO mopsy">>),
@ -341,6 +367,36 @@ max_hops_test() ->
stop_other_node(?COTTONTAIL),
ok.
%% Two nodes, both federated with each other, and max_hops set to a
%% high value. Things should not get out of hand.
cycle_detection_test() ->
Cycle1 = start_other_node(?CYCLE1),
Cycle2 = start_other_node(?CYCLE2),
declare_exchange(Cycle1, x(<<"cycle">>)),
declare_exchange(Cycle2, x(<<"cycle">>)),
Q1 = bind_queue(Cycle1, <<"cycle">>, <<"key">>),
Q2 = bind_queue(Cycle2, <<"cycle">>, <<"key">>),
%% "key" present twice because once for the local queue and once
%% for federation in each case
await_binding(?CYCLE1, <<"cycle">>, <<"key">>, 2),
await_binding(?CYCLE2, <<"cycle">>, <<"key">>, 2),
publish(Cycle1, <<"cycle">>, <<"key">>, <<"HELLO1">>),
publish(Cycle2, <<"cycle">>, <<"key">>, <<"HELLO2">>),
Msgs = [<<"HELLO1">>, <<"HELLO2">>],
expect(Cycle1, Q1, Msgs),
expect(Cycle2, Q2, Msgs),
expect_empty(Cycle1, Q1),
expect_empty(Cycle2, Q2),
stop_other_node(?CYCLE1),
stop_other_node(?CYCLE2),
ok.
%% Arrows indicate message flow. Numbers indicate max_hops.
%%
%% Dylan ---1--> Bugs ---2--> Jessica
@ -379,24 +435,18 @@ binding_propagation_test() ->
Q2 = bind_queue(Bugs, <<"x">>, <<"bugs">>),
Q3 = bind_queue(Jessica, <<"x">>, <<"jessica">>),
%% Wait for bindings to propagate
timer:sleep(5000),
assert_bindings("dylan", <<"x">>, [<<"jessica">>, <<"jessica">>,
<<"bugs">>, <<"dylan">>]),
assert_bindings("bugs", <<"x">>, [<<"jessica">>, <<"bugs">>]),
assert_bindings("jessica", <<"x">>, [<<"dylan">>, <<"jessica">>]),
await_binding( ?DYLAN, <<"x">>, <<"jessica">>, 2),
await_bindings(?DYLAN, <<"x">>, [<<"bugs">>, <<"dylan">>]),
await_bindings(?BUGS, <<"x">>, [<<"jessica">>, <<"bugs">>]),
await_bindings(?JESSICA, <<"x">>, [<<"dylan">>, <<"jessica">>]),
delete_queue(Dylan, Q1),
delete_queue(Bugs, Q2),
delete_queue(Jessica, Q3),
%% Wait for bindings to propagate
timer:sleep(5000),
assert_bindings("dylan", <<"x">>, []),
assert_bindings("bugs", <<"x">>, []),
assert_bindings("jessica", <<"x">>, []),
await_bindings(?DYLAN, <<"x">>, []),
await_bindings(?BUGS, <<"x">>, []),
await_bindings(?JESSICA, <<"x">>, []),
stop_other_node(?DYLAN),
stop_other_node(?BUGS),
@ -412,6 +462,7 @@ upstream_has_no_federation_test() ->
declare_exchange(Upstream, x(<<"upstream">>)),
declare_exchange(Downstream, x(<<"hare.downstream">>)),
Q = bind_queue(Downstream, <<"hare.downstream">>, <<"routing">>),
await_binding(?HARE, <<"upstream">>, <<"routing">>),
publish(Upstream, <<"upstream">>, <<"routing">>, <<"HELLO">>),
expect(Downstream, Q, [<<"HELLO">>]),
delete_exchange(Downstream, <<"hare.downstream">>),
@ -426,10 +477,6 @@ dynamic_reconfiguration_test() ->
%% Left from the conf we set up for previous tests
assert_connections(Xs, [<<"localhost">>, <<"local5673">>]),
%% Test this at least does not blow up
set_param("federation", "local-nodename", "\"test\""),
assert_connections(Xs, [<<"localhost">>, <<"local5673">>]),
%% Test that clearing connections works
clear_param("federation-upstream", "localhost"),
clear_param("federation-upstream", "local5673"),
@ -502,12 +549,28 @@ with_ch(Fun, Xs) ->
{ok, Conn} = amqp_connection:start(#amqp_params_network{}),
{ok, Ch} = amqp_connection:open_channel(Conn),
declare_all(Ch, Xs),
rabbit_federation_test_util:assert_status(Xs),
rabbit_federation_test_util:assert_status(
Xs, {exchange, upstream_exchange}),
Fun(Ch),
delete_all(Ch, Xs),
amqp_connection:close(Conn),
cleanup(?RABBIT),
ok.
cleanup({Nodename, _}) ->
[rpc:call(n(Nodename), rabbit_amqqueue, delete, [Q, false, false]) ||
Q <- queues(Nodename)].
queues(Nodename) ->
case rpc:call(n(Nodename), rabbit_amqqueue, list, [<<"/">>]) of
{badrpc, _} -> [];
Qs -> Qs
end.
stop_other_node(Node) ->
cleanup(Node),
rabbit_federation_test_util:stop_other_node(Node).
declare_all(Ch, Xs) -> [declare_exchange(Ch, X) || X <- Xs].
delete_all(Ch, Xs) ->
[delete_exchange(Ch, X) || #'exchange.declare'{exchange = X} <- Xs].
@ -555,12 +618,30 @@ delete_exchange(Ch, X) ->
delete_queue(Ch, Q) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
await_binding(X, Key) -> await_binding(?RABBIT, X, Key, 1).
await_binding(B = {_, _}, X, Key) -> await_binding(B, X, Key, 1);
await_binding(X, Key, Count) -> await_binding(?RABBIT, X, Key, Count).
await_binding(Broker = {Nodename, _Port}, X, Key, Count) ->
Bs = bindings_from(Nodename, X),
case [K || #binding{key = K} <- Bs, K =:= Key] of
L when length(L) < Count -> timer:sleep(100),
await_binding(Broker, X, Key, Count);
L when length(L) =:= Count -> ok;
L -> exit({too_many_bindings,
X, Key, Count, L, Bs})
end.
await_bindings(Broker, X, Keys) ->
[await_binding(Broker, X, Key) || Key <- Keys].
bindings_from(Nodename, X) ->
rpc:call(n(Nodename), rabbit_binding, list_for_source, [r(X)]).
publish(Ch, X, Key, Payload) when is_binary(Payload) ->
publish(Ch, X, Key, #amqp_msg{payload = Payload});
publish(Ch, X, Key, Msg = #amqp_msg{}) ->
%% The trouble is that we transmit bindings upstream asynchronously...
timer:sleep(5000),
amqp_channel:call(Ch, #'basic.publish'{exchange = X,
routing_key = Key}, Msg).
@ -568,25 +649,6 @@ publish_expect(Ch, X, Key, Q, Payload) ->
publish(Ch, X, Key, Payload),
expect(Ch, Q, [Payload]).
assert_bindings(Nodename, X, BindingsExp) ->
Bindings0 = rpc:call(n(Nodename), rabbit_binding, list_for_source, [r(X)]),
BindingsAct = [Key || #binding{key = Key} <- Bindings0],
assert_list(BindingsExp, BindingsAct).
assert_list(Exp, Act) ->
case assert_list0(Exp, Act) of
ok -> ok;
fail -> exit({assert_failed, Exp, Act})
end.
assert_list0([], []) -> ok;
assert_list0(Exp, []) -> fail;
assert_list0([], Act) -> fail;
assert_list0(Exp, [H | T]) -> case lists:member(H, Exp) of
true -> assert_list0(Exp -- [H], T);
false -> fail
end.
%%----------------------------------------------------------------------------
assert_connections(Xs, Conns) ->
@ -594,8 +656,10 @@ assert_connections(Xs, Conns) ->
X <- Xs,
C <- Conns],
Remaining = lists:foldl(
fun rabbit_federation_test_util:assert_link_status/2,
rabbit_federation_status:status(), Links),
fun (Link, Status) ->
rabbit_federation_test_util:assert_link_status(
Link, Status, {exchange, upstream_exchange})
end, rabbit_federation_status:status(), Links),
?assertEqual([], Remaining),
ok.

View File

@ -78,10 +78,6 @@ dynamic_reconfiguration_test() ->
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
%% Test this at least does not blow up
set_param("federation", "local-nodename", "\"test\""),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
%% Test that clearing connections works
clear_param("federation-upstream", "localhost"),
expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
@ -160,7 +156,8 @@ with_ch(Fun, Qs) ->
{ok, Ch} = amqp_connection:open_channel(Conn),
declare_all(Ch, Qs),
timer:sleep(1000), %% Time for statuses to get updated
rabbit_federation_test_util:assert_status(Qs),
rabbit_federation_test_util:assert_status(
Qs, {queue, upstream_queue}),
Fun(Ch),
delete_all(Ch, Qs),
amqp_connection:close(Conn),

View File

@ -74,30 +74,32 @@ start_other_node({Name, Port}, Config) ->
os:getenv("RABBITMQ_ENABLED_PLUGINS_FILE")).
start_other_node({Name, Port}, Config, PluginsFile) ->
%% ?assertCmd seems to hang if you background anything. Bah!
Res = os:cmd("make -C " ++ plugin_dir() ++ " OTHER_NODE=" ++ Name ++
" OTHER_PORT=" ++ integer_to_list(Port) ++
" OTHER_CONFIG=" ++ Config ++
" OTHER_PLUGINS=" ++ PluginsFile ++
" start-other-node ; echo $?"),
case lists:reverse(string:tokens(Res, "\n")) of
["0" | _] -> ok;
_ -> exit(broker_start_failed, Res)
end,
execute("make -C " ++ plugin_dir() ++ " OTHER_NODE=" ++ Name ++
" OTHER_PORT=" ++ integer_to_list(Port) ++
" OTHER_CONFIG=" ++ Config ++
" OTHER_PLUGINS=" ++ PluginsFile ++
" start-other-node"),
{ok, Conn} = amqp_connection:start(#amqp_params_network{port = Port}),
{ok, Ch} = amqp_connection:open_channel(Conn),
Ch.
stop_other_node({Name, _Port}) ->
?assertCmd("make -C " ++ plugin_dir() ++ " OTHER_NODE=" ++ Name ++
" stop-other-node"),
execute("make -C " ++ plugin_dir() ++ " OTHER_NODE=" ++ Name ++
" stop-other-node"),
timer:sleep(1000).
rabbitmqctl(Args) ->
?assertCmd(
plugin_dir() ++ "/../rabbitmq-server/scripts/rabbitmqctl " ++ Args),
execute(plugin_dir() ++ "/../rabbitmq-server/scripts/rabbitmqctl " ++ Args),
timer:sleep(100).
%% ?assertCmd seems to hang if you background anything. Bah!
execute(Cmd) ->
Res = os:cmd(Cmd ++ " ; echo $?"),
case lists:reverse(string:tokens(Res, "\n")) of
["0" | _] -> ok;
_ -> exit({command_failed, Cmd, Res})
end.
policy(UpstreamSet) ->
rabbit_misc:format("{\"federation-upstream-set\": \"~s\"}", [UpstreamSet]).
@ -107,19 +109,21 @@ plugin_dir() ->
%%----------------------------------------------------------------------------
assert_status(XorQs) ->
assert_status(XorQs, Names) ->
Links = lists:append([links(XorQ) || XorQ <- XorQs]),
Remaining = lists:foldl(fun assert_link_status/2,
rabbit_federation_status:status(), Links),
Remaining = lists:foldl(fun (Link, Status) ->
assert_link_status(Link, Status, Names)
end, rabbit_federation_status:status(), Links),
?assertEqual([], Remaining),
ok.
assert_link_status({DXNameBin, ConnectionName, UXNameBin}, Status) ->
assert_link_status({DXorQNameBin, UpstreamName, UXorQNameBin}, Status,
{TypeName, UpstreamTypeName}) ->
{This, Rest} = lists:partition(
fun(St) ->
pget(connection, St) =:= ConnectionName andalso
pget(name, St) =:= DXNameBin andalso
pget(upstream_name, St) =:= UXNameBin
pget(upstream, St) =:= UpstreamName andalso
pget(TypeName, St) =:= DXorQNameBin andalso
pget(UpstreamTypeName, St) =:= UXorQNameBin
end, Status),
?assertMatch([_], This),
Rest.