Add a mechanism for federation to run multiple nodes in one distributed system, and thus port the rest of the mutli-node tests. There is still a great deal of detritus to be removed...
This commit is contained in:
parent
6a88d64fac
commit
2b556b38be
|
|
@ -30,7 +30,7 @@
|
|||
terminate/2, code_change/3]).
|
||||
|
||||
-import(rabbit_misc, [pget/2]).
|
||||
-import(rabbit_federation_util, [name/1, vhost/1]).
|
||||
-import(rabbit_federation_util, [name/1, vhost/1, pgname/1]).
|
||||
|
||||
-record(state, {upstream,
|
||||
upstream_params,
|
||||
|
|
@ -192,16 +192,16 @@ cast(Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- all()].
|
|||
cast(XName, Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- x(XName)].
|
||||
|
||||
join(Name) ->
|
||||
pg2_fixed:create(Name),
|
||||
ok = pg2_fixed:join(Name, self()).
|
||||
pg2_fixed:create(pgname(Name)),
|
||||
ok = pg2_fixed:join(pgname(Name), self()).
|
||||
|
||||
all() ->
|
||||
pg2_fixed:create(rabbit_federation_exchanges),
|
||||
pg2_fixed:get_members(rabbit_federation_exchanges).
|
||||
pg2_fixed:create(pgname(rabbit_federation_exchanges)),
|
||||
pg2_fixed:get_members(pgname(rabbit_federation_exchanges)).
|
||||
|
||||
x(XName) ->
|
||||
pg2_fixed:create({rabbit_federation_exchange, XName}),
|
||||
pg2_fixed:get_members({rabbit_federation_exchange, XName}).
|
||||
pg2_fixed:create(pgname({rabbit_federation_exchange, XName})),
|
||||
pg2_fixed:get_members(pgname({rabbit_federation_exchange, XName})).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@
|
|||
terminate/2, code_change/3]).
|
||||
|
||||
-import(rabbit_misc, [pget/2]).
|
||||
-import(rabbit_federation_util, [name/1]).
|
||||
-import(rabbit_federation_util, [name/1, pgname/1]).
|
||||
|
||||
-record(not_started, {queue, run, upstream, upstream_params}).
|
||||
-record(state, {queue, run, conn, ch, dconn, dch, upstream, upstream_params,
|
||||
|
|
@ -46,16 +46,16 @@ cast(Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- all()].
|
|||
cast(QName, Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- q(QName)].
|
||||
|
||||
join(Name) ->
|
||||
pg2_fixed:create(Name),
|
||||
ok = pg2_fixed:join(Name, self()).
|
||||
pg2_fixed:create(pgname(Name)),
|
||||
ok = pg2_fixed:join(pgname(Name), self()).
|
||||
|
||||
all() ->
|
||||
pg2_fixed:create(rabbit_federation_queues),
|
||||
pg2_fixed:get_members(rabbit_federation_queues).
|
||||
pg2_fixed:create(pgname(rabbit_federation_queues)),
|
||||
pg2_fixed:get_members(pgname(rabbit_federation_queues)).
|
||||
|
||||
q(QName) ->
|
||||
pg2_fixed:create({rabbit_federation_queue, QName}),
|
||||
pg2_fixed:get_members({rabbit_federation_queue, QName}).
|
||||
pg2_fixed:create(pgname({rabbit_federation_queue, QName})),
|
||||
pg2_fixed:get_members(pgname({rabbit_federation_queue, QName})).
|
||||
|
||||
federation_up() ->
|
||||
proplists:is_defined(rabbitmq_federation,
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@
|
|||
-include("rabbit_federation.hrl").
|
||||
|
||||
-export([should_forward/3, find_upstreams/2, already_seen/2]).
|
||||
-export([validate_arg/3, fail/2, name/1, vhost/1, r/1]).
|
||||
-export([validate_arg/3, fail/2, name/1, vhost/1, r/1, pgname/1]).
|
||||
|
||||
-import(rabbit_misc, [pget_or_die/2, pget/3]).
|
||||
|
||||
|
|
@ -65,3 +65,9 @@ vhost(#amqp_params_network{virtual_host = VHost}) -> VHost.
|
|||
|
||||
r(#exchange{name = XName}) -> XName;
|
||||
r(#amqqueue{name = QName}) -> QName.
|
||||
|
||||
pgname(Name) ->
|
||||
case application:get_env(rabbitmq_federation, pgroup_name_cluster_id) of
|
||||
{ok, false} -> Name;
|
||||
{ok, true} -> {rabbit_nodes:cluster_name(), Name}
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -4,5 +4,5 @@
|
|||
{modules, []},
|
||||
{registered, []},
|
||||
{mod, {rabbit_federation_app, []}},
|
||||
{env, []},
|
||||
{env, [{pgroup_name_cluster_id, false}]},
|
||||
{applications, [kernel, stdlib, rabbit, amqp_client]}]}.
|
||||
|
|
|
|||
|
|
@ -27,6 +27,8 @@
|
|||
-import(rabbit_federation_test_util,
|
||||
[expect/3, expect_empty/2, set_param/3, clear_param/2,
|
||||
set_pol/3, clear_pol/1, plugin_dir/0, policy/1,
|
||||
set_policy_upstream/4, set_policy_upstreams/3,
|
||||
disambiguate/1, no_plugins/1,
|
||||
start_other_node/1, start_other_node/2, start_other_node/3]).
|
||||
|
||||
-define(UPSTREAM_DOWNSTREAM, [x(<<"upstream">>),
|
||||
|
|
@ -161,61 +163,66 @@ unbind_on_unbind_test() ->
|
|||
delete_queue(Ch, Q2)
|
||||
end, ?UPSTREAM_DOWNSTREAM).
|
||||
|
||||
user_id_test() ->
|
||||
with_ch(
|
||||
fun (Ch) ->
|
||||
stop_other_node(?HARE),
|
||||
start_other_node(?HARE),
|
||||
{ok, Conn2} = amqp_connection:start(
|
||||
#amqp_params_network{username = <<"hare-user">>,
|
||||
password = <<"hare-user">>,
|
||||
port = 5673}),
|
||||
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
||||
declare_exchange(Ch2, x(<<"upstream">>)),
|
||||
declare_exchange(Ch, x(<<"hare.downstream">>)),
|
||||
Q = bind_queue(Ch, <<"hare.downstream">>, <<"key">>),
|
||||
await_binding(?HARE, <<"upstream">>, <<"key">>),
|
||||
user_id_with() -> disambiguate(start_ab).
|
||||
user_id([Rabbit, Hare]) ->
|
||||
set_policy_upstream(Rabbit, <<"^test$">>, <<"amqp://localhost:5673">>, []),
|
||||
Perm = fun (F, A) ->
|
||||
ok = rpc:call(pget(node, Hare),
|
||||
rabbit_auth_backend_internal, F, A)
|
||||
end,
|
||||
Perm(add_user, [<<"hare-user">>, <<"hare-user">>]),
|
||||
Perm(set_permissions, [<<"hare-user">>,
|
||||
<<"/">>, <<".*">>, <<".*">>, <<".*">>]),
|
||||
|
||||
Msg = #amqp_msg{props = #'P_basic'{user_id = <<"hare-user">>},
|
||||
payload = <<"HELLO">>},
|
||||
{_, Ch} = rabbit_test_util:connect(Rabbit),
|
||||
{ok, Conn2} = amqp_connection:start(
|
||||
#amqp_params_network{username = <<"hare-user">>,
|
||||
password = <<"hare-user">>,
|
||||
port = pget(port, Hare)}),
|
||||
{ok, Ch2} = amqp_connection:open_channel(Conn2),
|
||||
|
||||
SafeUri = fun (H) ->
|
||||
{array, [{table, Recv}]} =
|
||||
rabbit_misc:table_lookup(
|
||||
H, <<"x-received-from">>),
|
||||
?assertEqual(
|
||||
{longstr, <<"amqp://localhost:5673">>},
|
||||
rabbit_misc:table_lookup(Recv, <<"uri">>))
|
||||
end,
|
||||
ExpectUser =
|
||||
fun (ExpUser) ->
|
||||
fun () ->
|
||||
receive
|
||||
{#'basic.deliver'{},
|
||||
#amqp_msg{props = Props,
|
||||
payload = Payload}} ->
|
||||
#'P_basic'{user_id = ActUser,
|
||||
headers = Headers} = Props,
|
||||
SafeUri(Headers),
|
||||
?assertEqual(<<"HELLO">>, Payload),
|
||||
?assertEqual(ExpUser, ActUser)
|
||||
end
|
||||
end
|
||||
end,
|
||||
declare_exchange(Ch2, x(<<"test">>)),
|
||||
declare_exchange(Ch, x(<<"test">>)),
|
||||
Q = bind_queue(Ch, <<"test">>, <<"key">>),
|
||||
await_binding(Hare, <<"test">>, <<"key">>),
|
||||
|
||||
publish(Ch2, <<"upstream">>, <<"key">>, Msg),
|
||||
expect(Ch, Q, ExpectUser(undefined)),
|
||||
Msg = #amqp_msg{props = #'P_basic'{user_id = <<"hare-user">>},
|
||||
payload = <<"HELLO">>},
|
||||
|
||||
set_param("federation-upstream", "local5673",
|
||||
"{\"uri\": \"amqp://localhost:5673\","
|
||||
" \"trust-user-id\": true}"),
|
||||
SafeUri = fun (H) ->
|
||||
{array, [{table, Recv}]} =
|
||||
rabbit_misc:table_lookup(
|
||||
H, <<"x-received-from">>),
|
||||
?assertEqual(
|
||||
{longstr, <<"amqp://localhost:5673">>},
|
||||
rabbit_misc:table_lookup(Recv, <<"uri">>))
|
||||
end,
|
||||
ExpectUser =
|
||||
fun (ExpUser) ->
|
||||
fun () ->
|
||||
receive
|
||||
{#'basic.deliver'{},
|
||||
#amqp_msg{props = Props,
|
||||
payload = Payload}} ->
|
||||
#'P_basic'{user_id = ActUser,
|
||||
headers = Headers} = Props,
|
||||
SafeUri(Headers),
|
||||
?assertEqual(<<"HELLO">>, Payload),
|
||||
?assertEqual(ExpUser, ActUser)
|
||||
end
|
||||
end
|
||||
end,
|
||||
|
||||
publish(Ch2, <<"upstream">>, <<"key">>, Msg),
|
||||
expect(Ch, Q, ExpectUser(<<"hare-user">>)),
|
||||
publish(Ch2, <<"test">>, <<"key">>, Msg),
|
||||
expect(Ch, Q, ExpectUser(undefined)),
|
||||
|
||||
delete_exchange(Ch, <<"hare.downstream">>),
|
||||
delete_exchange(Ch2, <<"upstream">>)
|
||||
end, []).
|
||||
set_policy_upstream(Rabbit, <<"^test$">>, <<"amqp://localhost:5673">>,
|
||||
[{<<"trust-user-id">>, true}]),
|
||||
|
||||
publish(Ch2, <<"test">>, <<"key">>, Msg),
|
||||
expect(Ch, Q, ExpectUser(<<"hare-user">>)),
|
||||
|
||||
ok.
|
||||
|
||||
%% In order to test that unbinds get sent we deliberately set up a
|
||||
%% broken config - with topic upstream and fanout downstream. You
|
||||
|
|
@ -254,19 +261,22 @@ no_loop_test() ->
|
|||
end, [x(<<"one">>),
|
||||
x(<<"two">>)]).
|
||||
|
||||
binding_recovery_with() -> start_ab.
|
||||
binding_recovery([_Rabbit, Hare]) ->
|
||||
binding_recovery_with() -> disambiguate(
|
||||
fun (Init) ->
|
||||
rabbit_test_configs:start_nodes(Init, [a])
|
||||
end).
|
||||
binding_recovery([Rabbit]) ->
|
||||
Q = <<"durable-Q">>,
|
||||
{_, Ch} = rabbit_test_util:connect(Hare),
|
||||
{_, Ch} = rabbit_test_util:connect(Rabbit),
|
||||
|
||||
rabbit_federation_test_util:set_upstream(
|
||||
Hare, <<"hare">>, <<"amqp://localhost:5673">>),
|
||||
Rabbit, <<"rabbit">>, <<"amqp://localhost:5672">>),
|
||||
rabbit_federation_test_util:set_upstream_set(
|
||||
Hare, <<"upstream">>,
|
||||
[{<<"hare">>, [{<<"exchange">>, <<"upstream">>}]},
|
||||
{<<"hare">>, [{<<"exchange">>, <<"upstream2">>}]}]),
|
||||
Rabbit, <<"upstream">>,
|
||||
[{<<"rabbit">>, [{<<"exchange">>, <<"upstream">>}]},
|
||||
{<<"rabbit">>, [{<<"exchange">>, <<"upstream2">>}]}]),
|
||||
rabbit_federation_test_util:set_policy(
|
||||
Hare, <<"fed">>, <<"^fed\\.">>, <<"upstream">>),
|
||||
Rabbit, <<"fed">>, <<"^fed\\.">>, <<"upstream">>),
|
||||
|
||||
declare_all(Ch, [x(<<"upstream2">>) | ?UPSTREAM_DOWNSTREAM]),
|
||||
#'queue.declare_ok'{} =
|
||||
|
|
@ -276,22 +286,22 @@ binding_recovery([_Rabbit, Hare]) ->
|
|||
timer:sleep(100), %% To get the suffix written
|
||||
|
||||
%% i.e. don't clean up
|
||||
Hare2 = rabbit_test_configs:restart_node(Hare),
|
||||
Rabbit2 = rabbit_test_configs:restart_node(Rabbit),
|
||||
|
||||
?assert(none =/= suffix(Hare2, <<"hare">>, "upstream")),
|
||||
?assert(none =/= suffix(Hare2, <<"hare">>, "upstream2")),
|
||||
?assert(none =/= suffix(Rabbit2, <<"rabbit">>, "upstream")),
|
||||
?assert(none =/= suffix(Rabbit2, <<"rabbit">>, "upstream2")),
|
||||
|
||||
%% again don't clean up
|
||||
Hare3 = rabbit_test_configs:restart_node(Hare2),
|
||||
{_, Ch3} = rabbit_test_util:connect(Hare3),
|
||||
Rabbit3 = rabbit_test_configs:restart_node(Rabbit2),
|
||||
{_, Ch3} = rabbit_test_util:connect(Rabbit3),
|
||||
|
||||
rabbit_test_util:set_param(
|
||||
Hare, <<"federation-upstream-set">>, <<"upstream">>,
|
||||
[[{<<"upstream">>, <<"hare">>}, {<<"exchange">>, <<"upstream">>}]]),
|
||||
Rabbit, <<"federation-upstream-set">>, <<"upstream">>,
|
||||
[[{<<"upstream">>, <<"rabbit">>}, {<<"exchange">>, <<"upstream">>}]]),
|
||||
|
||||
publish_expect(Ch3, <<"upstream">>, <<"key">>, Q, <<"HELLO">>),
|
||||
?assert(none =/= suffix(Hare3, <<"hare">>, "upstream")),
|
||||
?assertEqual(none, suffix(Hare3, <<"hare">>, "upstream2")),
|
||||
?assert(none =/= suffix(Rabbit3, <<"rabbit">>, "upstream")),
|
||||
?assertEqual(none, suffix(Rabbit3, <<"rabbit">>, "upstream2")),
|
||||
delete_all(Ch3, [x(<<"upstream2">>) | ?UPSTREAM_DOWNSTREAM]),
|
||||
delete_queue(Ch3, Q),
|
||||
ok.
|
||||
|
|
@ -307,7 +317,7 @@ n(Nodename) ->
|
|||
{_, NodeHost} = rabbit_nodes:parts(node()),
|
||||
rabbit_nodes:make({Nodename, NodeHost}).
|
||||
|
||||
restart_upstream_with() -> start_ab.
|
||||
restart_upstream_with() -> disambiguate(start_ab).
|
||||
restart_upstream([Rabbit, Hare]) ->
|
||||
{_, Downstream} = rabbit_test_util:connect(Rabbit),
|
||||
{_, Upstream} = rabbit_test_util:connect(Hare),
|
||||
|
|
@ -355,18 +365,15 @@ restart_upstream([Rabbit, Hare]) ->
|
|||
%% flopsy, mopsy and cottontail, connected in a ring with max_hops = 2
|
||||
%% for each connection. We should not see any duplicates.
|
||||
|
||||
max_hops_with() -> start_abc.
|
||||
max_hops_with() -> disambiguate(start_abc).
|
||||
max_hops([Flopsy, Mopsy, Cottontail]) ->
|
||||
[begin
|
||||
rabbit_federation_test_util:set_upstream(
|
||||
Cfg, <<"ring">>,
|
||||
list_to_binary("amqp://localhost:" ++ integer_to_list(Port)),
|
||||
[{<<"max-hops">>, 2}]),
|
||||
rabbit_federation_test_util:set_policy1(
|
||||
Cfg, <<"ring">>, <<"^ring$">>, <<"ring">>)
|
||||
end || {Cfg, Port} <- [{Flopsy, pget(port, Cottontail)},
|
||||
{Mopsy, pget(port, Flopsy)},
|
||||
{Cottontail, pget(port, Mopsy)}]],
|
||||
[set_policy_upstream(
|
||||
Cfg, <<"^ring$">>,
|
||||
list_to_binary("amqp://localhost:" ++ integer_to_list(Port)),
|
||||
[{<<"max-hops">>, 2}])
|
||||
|| {Cfg, Port} <- [{Flopsy, pget(port, Cottontail)},
|
||||
{Mopsy, pget(port, Flopsy)},
|
||||
{Cottontail, pget(port, Mopsy)}]],
|
||||
|
||||
{_, FlopsyCh} = rabbit_test_util:connect(Flopsy),
|
||||
{_, MopsyCh} = rabbit_test_util:connect(Mopsy),
|
||||
|
|
@ -399,17 +406,14 @@ max_hops([Flopsy, Mopsy, Cottontail]) ->
|
|||
|
||||
%% Two nodes, both federated with each other, and max_hops set to a
|
||||
%% high value. Things should not get out of hand.
|
||||
cycle_detection_with() -> start_ab.
|
||||
cycle_detection_with() -> disambiguate(start_ab).
|
||||
cycle_detection([Cycle1, Cycle2]) ->
|
||||
[begin
|
||||
rabbit_federation_test_util:set_upstream(
|
||||
Cfg, <<"other">>,
|
||||
list_to_binary("amqp://localhost:" ++ integer_to_list(Port)),
|
||||
[{<<"max-hops">>, 10}]),
|
||||
rabbit_federation_test_util:set_policy1(
|
||||
Cfg, <<"cycle">>, <<"^cycle$">>, <<"other">>)
|
||||
end || {Cfg, Port} <- [{Cycle1, pget(port, Cycle2)},
|
||||
{Cycle2, pget(port, Cycle1)}]],
|
||||
[set_policy_upstream(
|
||||
Cfg, <<"^cycle$">>,
|
||||
list_to_binary("amqp://localhost:" ++ integer_to_list(Port)),
|
||||
[{<<"max-hops">>, 10}])
|
||||
|| {Cfg, Port} <- [{Cycle1, pget(port, Cycle2)},
|
||||
{Cycle2, pget(port, Cycle1)}]],
|
||||
|
||||
{_, Cycle1Ch} = rabbit_test_util:connect(Cycle1),
|
||||
{_, Cycle2Ch} = rabbit_test_util:connect(Cycle2),
|
||||
|
|
@ -461,53 +465,56 @@ cycle_detection([Cycle1, Cycle2]) ->
|
|||
%% Also we check that when we tear down the original bindings
|
||||
%% that we get rid of everything again.
|
||||
|
||||
binding_propagation_test() ->
|
||||
Dylan = start_other_node(?DYLAN),
|
||||
Bugs = start_other_node(?BUGS),
|
||||
Jessica = start_other_node(?JESSICA),
|
||||
binding_propagation_with() -> disambiguate(start_abc).
|
||||
binding_propagation([Dylan, Bugs, Jessica]) ->
|
||||
set_policy_upstream( Dylan, <<"^x$">>, <<"amqp://localhost:5674">>, []),
|
||||
set_policy_upstream( Bugs, <<"^x$">>, <<"amqp://localhost:5672">>, []),
|
||||
set_policy_upstreams(Jessica, <<"^x$">>, [{<<"amqp://localhost:5672">>, []},
|
||||
{<<"amqp://localhost:5673">>,
|
||||
[{<<"max-hops">>, 2}]}]),
|
||||
{_, DylanCh} = rabbit_test_util:connect(Dylan),
|
||||
{_, BugsCh} = rabbit_test_util:connect(Bugs),
|
||||
{_, JessicaCh} = rabbit_test_util:connect(Jessica),
|
||||
|
||||
declare_exchange(Dylan, x(<<"x">>)),
|
||||
declare_exchange(Bugs, x(<<"x">>)),
|
||||
declare_exchange(Jessica, x(<<"x">>)),
|
||||
declare_exchange(DylanCh, x(<<"x">>)),
|
||||
declare_exchange(BugsCh, x(<<"x">>)),
|
||||
declare_exchange(JessicaCh, x(<<"x">>)),
|
||||
|
||||
Q1 = bind_queue(Dylan, <<"x">>, <<"dylan">>),
|
||||
Q2 = bind_queue(Bugs, <<"x">>, <<"bugs">>),
|
||||
Q3 = bind_queue(Jessica, <<"x">>, <<"jessica">>),
|
||||
Q1 = bind_queue(DylanCh, <<"x">>, <<"dylan">>),
|
||||
Q2 = bind_queue(BugsCh, <<"x">>, <<"bugs">>),
|
||||
Q3 = bind_queue(JessicaCh, <<"x">>, <<"jessica">>),
|
||||
|
||||
await_binding( ?DYLAN, <<"x">>, <<"jessica">>, 2),
|
||||
await_bindings(?DYLAN, <<"x">>, [<<"bugs">>, <<"dylan">>]),
|
||||
await_bindings(?BUGS, <<"x">>, [<<"jessica">>, <<"bugs">>]),
|
||||
await_bindings(?JESSICA, <<"x">>, [<<"dylan">>, <<"jessica">>]),
|
||||
await_binding( Dylan, <<"x">>, <<"jessica">>, 2),
|
||||
await_bindings(Dylan, <<"x">>, [<<"bugs">>, <<"dylan">>]),
|
||||
await_bindings(Bugs, <<"x">>, [<<"jessica">>, <<"bugs">>]),
|
||||
await_bindings(Jessica, <<"x">>, [<<"dylan">>, <<"jessica">>]),
|
||||
|
||||
delete_queue(Dylan, Q1),
|
||||
delete_queue(Bugs, Q2),
|
||||
delete_queue(Jessica, Q3),
|
||||
delete_queue(DylanCh, Q1),
|
||||
delete_queue(BugsCh, Q2),
|
||||
delete_queue(JessicaCh, Q3),
|
||||
|
||||
await_bindings(?DYLAN, <<"x">>, []),
|
||||
await_bindings(?BUGS, <<"x">>, []),
|
||||
await_bindings(?JESSICA, <<"x">>, []),
|
||||
await_bindings(Dylan, <<"x">>, []),
|
||||
await_bindings(Bugs, <<"x">>, []),
|
||||
await_bindings(Jessica, <<"x">>, []),
|
||||
|
||||
stop_other_node(?DYLAN),
|
||||
stop_other_node(?BUGS),
|
||||
stop_other_node(?JESSICA),
|
||||
ok.
|
||||
|
||||
upstream_has_no_federation_test() ->
|
||||
with_ch(
|
||||
fun (Downstream) ->
|
||||
stop_other_node(?HARE),
|
||||
Upstream = start_other_node(
|
||||
?HARE, "hare-no-federation", "no_plugins"),
|
||||
declare_exchange(Upstream, x(<<"upstream">>)),
|
||||
declare_exchange(Downstream, x(<<"hare.downstream">>)),
|
||||
Q = bind_queue(Downstream, <<"hare.downstream">>, <<"routing">>),
|
||||
await_binding(?HARE, <<"upstream">>, <<"routing">>),
|
||||
publish(Upstream, <<"upstream">>, <<"routing">>, <<"HELLO">>),
|
||||
expect(Downstream, Q, [<<"HELLO">>]),
|
||||
delete_exchange(Downstream, <<"hare.downstream">>),
|
||||
delete_exchange(Upstream, <<"upstream">>),
|
||||
stop_other_node(?HARE)
|
||||
end, []).
|
||||
upstream_has_no_federation_with() ->
|
||||
disambiguate(fun (Init) ->
|
||||
Inits = [Init, no_plugins(Init)],
|
||||
rabbit_test_configs:start_nodes(Inits, [a, b])
|
||||
end).
|
||||
upstream_has_no_federation([Rabbit, Hare]) ->
|
||||
set_policy_upstream(Rabbit, <<"^test$">>, <<"amqp://localhost:5673">>, []),
|
||||
{_, Downstream} = rabbit_test_util:connect(Rabbit),
|
||||
{_, Upstream} = rabbit_test_util:connect(Hare),
|
||||
declare_exchange(Upstream, x(<<"test">>)),
|
||||
declare_exchange(Downstream, x(<<"test">>)),
|
||||
Q = bind_queue(Downstream, <<"test">>, <<"routing">>),
|
||||
await_binding(Hare, <<"test">>, <<"routing">>),
|
||||
publish(Upstream, <<"test">>, <<"routing">>, <<"HELLO">>),
|
||||
expect(Downstream, Q, [<<"HELLO">>]),
|
||||
ok.
|
||||
|
||||
dynamic_reconfiguration_test() ->
|
||||
with_ch(
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
-module(rabbit_federation_queue_test).
|
||||
|
||||
-compile(export_all).
|
||||
-include("rabbit_federation.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
|
@ -25,6 +26,8 @@
|
|||
|
||||
-import(rabbit_federation_test_util, [expect/3, set_param/3, clear_param/2,
|
||||
set_pol/3, clear_pol/1, policy/1, start_other_node/1,
|
||||
set_policy_upstream/4, set_policy_upstreams/3,
|
||||
disambiguate/1,
|
||||
stop_other_node/1]).
|
||||
|
||||
-define(UPSTREAM_DOWNSTREAM, [q(<<"upstream">>),
|
||||
|
|
@ -121,29 +124,29 @@ federate_unfederate_test() ->
|
|||
%% Downstream: rabbit-test, port 5672
|
||||
%% Upstream: hare, port 5673
|
||||
|
||||
restart_upstream_test() ->
|
||||
with_ch(
|
||||
fun (Downstream) ->
|
||||
stop_other_node(?HARE),
|
||||
Upstream = start_other_node(?HARE),
|
||||
restart_upstream_with() -> disambiguate(start_ab).
|
||||
restart_upstream([Rabbit, Hare]) ->
|
||||
set_policy_upstream(Rabbit, <<"^test$">>, <<"amqp://localhost:5673">>, []),
|
||||
|
||||
declare_queue(Upstream, q(<<"upstream">>)),
|
||||
declare_queue(Downstream, q(<<"hare.downstream">>)),
|
||||
Seq = lists:seq(1, 100),
|
||||
[publish(Upstream, <<>>, <<"upstream">>, <<"bulk">>) || _ <- Seq],
|
||||
expect(Upstream, <<"upstream">>, repeat(25, <<"bulk">>)),
|
||||
expect(Downstream, <<"hare.downstream">>, repeat(25, <<"bulk">>)),
|
||||
{_, Downstream} = rabbit_test_util:connect(Rabbit),
|
||||
{_, Upstream} = rabbit_test_util:connect(Hare),
|
||||
|
||||
stop_other_node(?HARE),
|
||||
Upstream2 = start_other_node(?HARE),
|
||||
declare_queue(Upstream, q(<<"test">>)),
|
||||
declare_queue(Downstream, q(<<"test">>)),
|
||||
Seq = lists:seq(1, 100),
|
||||
[publish(Upstream, <<>>, <<"test">>, <<"bulk">>) || _ <- Seq],
|
||||
expect(Upstream, <<"test">>, repeat(25, <<"bulk">>)),
|
||||
expect(Downstream, <<"test">>, repeat(25, <<"bulk">>)),
|
||||
|
||||
expect(Upstream2, <<"upstream">>, repeat(25, <<"bulk">>)),
|
||||
expect(Downstream, <<"hare.downstream">>, repeat(25, <<"bulk">>)),
|
||||
expect_empty(Upstream2, <<"upstream">>),
|
||||
expect_empty(Downstream, <<"hare.downstream">>),
|
||||
Hare2 = rabbit_test_configs:restart_node(Hare),
|
||||
{_, Upstream2} = rabbit_test_util:connect(Hare2),
|
||||
|
||||
stop_other_node(?HARE)
|
||||
end, []).
|
||||
expect(Upstream2, <<"test">>, repeat(25, <<"bulk">>)),
|
||||
expect(Downstream, <<"test">>, repeat(25, <<"bulk">>)),
|
||||
expect_empty(Upstream2, <<"test">>),
|
||||
expect_empty(Downstream, <<"test">>),
|
||||
|
||||
ok.
|
||||
|
||||
upstream_has_no_federation_test() ->
|
||||
%% TODO
|
||||
|
|
|
|||
|
|
@ -68,10 +68,44 @@ set_policy(Cfg, Name, Pattern, UpstreamSet) ->
|
|||
Cfg, Name, Pattern, <<"all">>,
|
||||
[{<<"federation-upstream-set">>, UpstreamSet}]).
|
||||
|
||||
set_policy1(Cfg, Name, Pattern, UpstreamSet) ->
|
||||
set_policy1(Cfg, Name, Pattern, Upstream) ->
|
||||
rabbit_test_util:set_policy(
|
||||
Cfg, Name, Pattern, <<"all">>,
|
||||
[{<<"federation-upstream">>, UpstreamSet}]).
|
||||
[{<<"federation-upstream">>, Upstream}]).
|
||||
|
||||
set_policy_upstream(Cfg, Pattern, URI, Extra) ->
|
||||
set_policy_upstreams(Cfg, Pattern, [{URI, Extra}]).
|
||||
|
||||
set_policy_upstreams(Cfg, Pattern, URIExtras) ->
|
||||
put(upstream_num, 1),
|
||||
[set_upstream(Cfg, gen_upstream_name(), URI, Extra)
|
||||
|| {URI, Extra} <- URIExtras],
|
||||
set_policy(Cfg, Pattern, Pattern, <<"all">>).
|
||||
|
||||
gen_upstream_name() ->
|
||||
list_to_binary("upstream-" ++ integer_to_list(next_upstream_num())).
|
||||
|
||||
next_upstream_num() ->
|
||||
R = get(upstream_num) + 1,
|
||||
put (upstream_num, R),
|
||||
R.
|
||||
|
||||
%% Make sure that even though multiple nodes are in a single
|
||||
%% distributed system, we still keep all our process groups separate.
|
||||
disambiguate(Rest) ->
|
||||
[Rest,
|
||||
fun (Cfgs) ->
|
||||
[rpc:call(pget(node, Cfg), application, set_env,
|
||||
[rabbitmq_federation, pgroup_name_cluster_id, true])
|
||||
|| Cfg <- Cfgs],
|
||||
Cfgs
|
||||
end].
|
||||
|
||||
no_plugins(Cfg) ->
|
||||
[{K, case K of
|
||||
plugins -> none;
|
||||
_ -> V
|
||||
end} || {K, V} <- Cfg].
|
||||
|
||||
%% set_param(Component, Name, Value) ->
|
||||
%% rabbitmqctl(fmt("set_parameter ~s ~s '~s'", [Component, Name, Value])).
|
||||
|
|
|
|||
Loading…
Reference in New Issue