Merge pull request #49 from rabbitmq/rabbitmq-federation-management-17

Link restart
This commit is contained in:
Michael Klishin 2016-12-31 01:34:44 +08:00 committed by GitHub
commit 0469d84899
5 changed files with 210 additions and 33 deletions

View File

@ -23,7 +23,7 @@
%% Supervises the upstream links for an exchange or queue.
-export([start_link/1, adjust/3]).
-export([start_link/1, adjust/3, restart/2]).
-export([init/1]).
start_link(XorQ) ->
@ -70,6 +70,11 @@ adjust(Sup, Q = #amqqueue{}, {upstream_set, _}) ->
adjust(Sup, XorQ, {clear_upstream_set, _}) ->
adjust(Sup, XorQ, everything).
restart(Sup, Upstream) ->
ok = supervisor2:terminate_child(Sup, Upstream),
{ok, _Pid} = supervisor2:restart_child(Sup, Upstream),
ok.
start(Sup, Upstream, XorQ) ->
{ok, _Pid} = supervisor2:start_child(Sup, spec(Upstream, XorQ)),
ok.

View File

@ -22,7 +22,7 @@
-export([start_link/0]).
-export([report/4, remove_exchange_or_queue/1, remove/2, status/0]).
-export([report/4, remove_exchange_or_queue/1, remove/2, status/0, lookup/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@ -33,14 +33,15 @@
-define(ETS_NAME, ?MODULE).
-record(state, {}).
-record(entry, {key, uri, status, timestamp}).
-record(entry, {key, uri, status, timestamp, id, supervisor, upstream}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
report(Upstream, UParams, XorQName, Status) ->
gen_server:cast(?SERVER, {report, Upstream, UParams, XorQName, Status,
calendar:local_time()}).
[Supervisor | _] = get('$ancestors'),
gen_server:cast(?SERVER, {report, Supervisor, Upstream, UParams, XorQName,
Status, calendar:local_time()}).
remove_exchange_or_queue(XorQName) ->
gen_server:call(?SERVER, {remove_exchange_or_queue, XorQName}, infinity).
@ -51,6 +52,9 @@ remove(Upstream, XorQName) ->
status() ->
gen_server:call(?SERVER, status, infinity).
lookup(Id) ->
gen_server:call(?SERVER, {lookup, Id}, infinity).
init([]) ->
?ETS_NAME = ets:new(?ETS_NAME,
[named_table, {keypos, #entry.key}, private]),
@ -68,16 +72,33 @@ handle_call({remove, Upstream, XorQName}, _From, State) ->
end,
{reply, ok, State};
handle_call({lookup, Id}, _From, State) ->
Link = case ets:match_object(?ETS_NAME, match_id(Id)) of
[Entry] -> [{key, Entry#entry.key},
{uri, Entry#entry.uri},
{status, Entry#entry.status},
{timestamp, Entry#entry.timestamp},
{id, Entry#entry.id},
{supervisor, Entry#entry.supervisor},
{upstream, Entry#entry.upstream}];
[] -> not_found
end,
{reply, Link, State};
handle_call(status, _From, State) ->
Entries = ets:tab2list(?ETS_NAME),
{reply, [format(Entry) || Entry <- Entries], State}.
handle_cast({report, Upstream, #upstream_params{safe_uri = URI},
handle_cast({report, Supervisor, Upstream, #upstream_params{safe_uri = URI},
XorQName, Status, Timestamp}, State) ->
Entry = #entry{key = key(XorQName, Upstream),
Key = key(XorQName, Upstream),
Entry = #entry{key = Key,
status = Status,
uri = URI,
timestamp = Timestamp},
timestamp = Timestamp,
supervisor = Supervisor,
upstream = Upstream,
id = unique_id(Key)},
true = ets:insert(?ETS_NAME, Entry),
rabbit_event:notify(federation_link_status, format(Entry)),
{noreply, State}.
@ -100,7 +121,8 @@ format(#entry{status = Status,
identity(#entry{key = {#resource{virtual_host = VHost,
kind = Type,
name = XorQNameBin},
UpstreamName, UXorQNameBin}}) ->
UpstreamName, UXorQNameBin},
id = Id}) ->
case Type of
exchange -> [{exchange, XorQNameBin},
{upstream_exchange, UXorQNameBin}];
@ -108,7 +130,14 @@ identity(#entry{key = {#resource{virtual_host = VHost,
{upstream_queue, UXorQNameBin}]
end ++ [{type, Type},
{vhost, VHost},
{upstream, UpstreamName}].
{upstream, UpstreamName},
{id, Id}].
unique_id(Key) ->
<< << case N >= 10 of
true -> N - 10 + $a;
false -> N + $0 end >>
|| <<N:4>> <= crypto:hash(sha, term_to_binary(Key)) >>.
split_status({running, ConnName}) -> [{status, running},
{local_connection, ConnName}];
@ -136,4 +165,16 @@ match_entry(Key) ->
#entry{key = Key,
uri = '_',
status = '_',
timestamp = '_'}.
timestamp = '_',
id = '_',
supervisor = '_',
upstream = '_'}.
match_id(Id) ->
#entry{key = '_',
uri = '_',
status = '_',
timestamp = '_',
id = Id,
supervisor = '_',
upstream = '_'}.

View File

@ -30,9 +30,6 @@
set_policy_upstream/5, set_policy_upstreams/4,
no_plugins/1]).
-define(UPSTREAM_DOWNSTREAM, [x(<<"upstream">>),
x(<<"fed.downstream">>)]).
all() ->
[
{group, without_disambiguate},
@ -163,7 +160,7 @@ simple(Config) ->
Q = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
await_binding(Config, 0, <<"upstream">>, <<"key">>),
publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO">>)
end, ?UPSTREAM_DOWNSTREAM).
end, upstream_downstream()).
multiple_upstreams(Config) ->
with_ch(Config,
@ -190,9 +187,9 @@ multiple_uris(Config) ->
rabbit_ct_client_helpers:close_channels_and_connection(
Config, 0)
end,
WithCh(fun (Ch) -> declare_all(Ch, ?UPSTREAM_DOWNSTREAM) end),
WithCh(fun (Ch) -> declare_all(Ch, upstream_downstream()) end),
expect_uris(Config, 0, URIs),
WithCh(fun (Ch) -> delete_all(Ch, ?UPSTREAM_DOWNSTREAM) end),
WithCh(fun (Ch) -> delete_all(Ch, upstream_downstream()) end),
%% Put back how it was
rabbit_federation_test_util:setup_federation(Config),
ok.
@ -239,7 +236,7 @@ multiple_downstreams(Config) ->
publish(Ch, <<"upstream2">>, <<"key">>, <<"HELLO2">>),
expect(Ch, Q1, [<<"HELLO1">>]),
expect(Ch, Q12, [<<"HELLO1">>, <<"HELLO2">>])
end, ?UPSTREAM_DOWNSTREAM ++
end, upstream_downstream() ++
[x(<<"upstream2">>),
x(<<"fed12.downstream2">>)]).
@ -251,7 +248,7 @@ e2e(Config) ->
await_binding(Config, 0, <<"upstream">>, <<"key">>),
Q = bind_queue(Ch, <<"downstream2">>, <<"key">>),
publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO1">>)
end, ?UPSTREAM_DOWNSTREAM ++ [x(<<"downstream2">>)]).
end, upstream_downstream() ++ [x(<<"downstream2">>)]).
unbind_on_delete(Config) ->
with_ch(Config,
@ -261,7 +258,7 @@ unbind_on_delete(Config) ->
await_binding(Config, 0, <<"upstream">>, <<"key">>),
delete_queue(Ch, Q2),
publish_expect(Ch, <<"upstream">>, <<"key">>, Q1, <<"HELLO">>)
end, ?UPSTREAM_DOWNSTREAM).
end, upstream_downstream()).
unbind_on_unbind(Config) ->
with_ch(Config,
@ -272,7 +269,7 @@ unbind_on_unbind(Config) ->
unbind_queue(Ch, Q2, <<"fed.downstream">>, <<"key">>),
publish_expect(Ch, <<"upstream">>, <<"key">>, Q1, <<"HELLO">>),
delete_queue(Ch, Q2)
end, ?UPSTREAM_DOWNSTREAM).
end, upstream_downstream()).
user_id(Config) ->
[Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -394,7 +391,7 @@ binding_recovery(Config) ->
rabbit_federation_test_util:set_policy(Config,
Rabbit, <<"fed">>, <<"^fed\\.">>, <<"upstream">>),
declare_all(Ch, [x(<<"upstream2">>) | ?UPSTREAM_DOWNSTREAM]),
declare_all(Ch, [x(<<"upstream2">>) | upstream_downstream()]),
#'queue.declare_ok'{} =
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true}),
@ -419,7 +416,7 @@ binding_recovery(Config) ->
publish_expect(Ch3, <<"upstream">>, <<"key">>, Q, <<"HELLO">>),
true = (none =/= suffix(Config, Rabbit, <<"rabbit">>, "upstream")),
none = suffix(Config, Rabbit, <<"rabbit">>, "upstream2"),
delete_all(Ch3, [x(<<"upstream2">>) | ?UPSTREAM_DOWNSTREAM]),
delete_all(Ch3, [x(<<"upstream2">>) | upstream_downstream()]),
delete_queue(Ch3, Q),
ok.
@ -883,3 +880,6 @@ connection_pids(Config, Node) ->
[P || [{pid, P}] <-
rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_networking, connection_info_all, [[pid]])].
upstream_downstream() ->
[x(<<"upstream">>), x(<<"fed.downstream">>)].

View File

@ -26,9 +26,6 @@
set_upstream/4, clear_upstream/3, set_policy/5, clear_policy/3,
set_policy_upstream/5, set_policy_upstreams/4]).
-define(UPSTREAM_DOWNSTREAM, [q(<<"upstream">>),
q(<<"fed.downstream">>)]).
all() ->
[
{group, without_disambiguate},
@ -122,8 +119,7 @@ simple(Config) ->
with_ch(Config,
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
end, [q(<<"upstream">>),
q(<<"fed.downstream">>)]).
end, upstream_downstream()).
multiple_upstreams(Config) ->
with_ch(Config,
@ -139,9 +135,7 @@ multiple_downstreams(Config) ->
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>)
end, [q(<<"upstream">>),
q(<<"fed.downstream">>),
q(<<"fed.downstream2">>)]).
end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]).
bidirectional(Config) ->
with_ch(Config,
@ -175,8 +169,7 @@ dynamic_reconfiguration(Config) ->
set_upstream(Config, 0, <<"localhost">>, URI),
set_upstream(Config, 0, <<"localhost">>, URI),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
end, [q(<<"upstream">>),
q(<<"fed.downstream">>)]).
end, upstream_downstream()).
federate_unfederate(Config) ->
with_ch(Config,
@ -320,3 +313,6 @@ expect_no_federation(Ch, UpstreamQ, DownstreamQ) ->
publish(Ch, <<>>, UpstreamQ, <<"HELLO">>),
expect_empty(Ch, DownstreamQ),
expect(Ch, UpstreamQ, [<<"HELLO">>]).
upstream_downstream() ->
[q(<<"upstream">>), q(<<"fed.downstream">>)].

View File

@ -0,0 +1,135 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ Federation.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_federation_status_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_federation.hrl").
-compile(export_all).
-import(rabbit_federation_test_util,
[expect/3, expect_empty/2,
set_upstream/4, clear_upstream/3, set_upstream_set/4,
set_policy/5, clear_policy/3,
set_policy_upstream/5, set_policy_upstreams/4,
no_plugins/1]).
all() ->
[
{group, non_parallel_tests}
].
groups() ->
[
{non_parallel_tests, [], [
exchange_status,
queue_status,
lookup_exchange_status,
lookup_queue_status,
lookup_bad_status
]}
].
suite() ->
[{timetrap, {minutes, 5}}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps() ++
[fun rabbit_federation_test_util:setup_federation/1]).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(_, Config) ->
Config.
end_per_group(_, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------
exchange_status(Config) ->
exchange_SUITE:with_ch(
Config,
fun (_Ch) ->
[Link] = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_federation_status, status, []),
true = is_binary(proplists:get_value(id, Link))
end, exchange_SUITE:upstream_downstream()).
queue_status(Config) ->
queue_SUITE:with_ch(
Config,
fun (_Ch) ->
[Link] = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_federation_status, status, []),
true = is_binary(proplists:get_value(id, Link))
end, queue_SUITE:upstream_downstream()).
lookup_exchange_status(Config) ->
exchange_SUITE:with_ch(
Config,
fun (_Ch) ->
[Link] = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_federation_status, status, []),
Id = proplists:get_value(id, Link),
Props = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_federation_status, lookup, [Id]),
lists:all(fun(K) -> lists:keymember(K, 1, Props) end,
[key, uri, status, timestamp, id, supervisor, upstream])
end, exchange_SUITE:upstream_downstream()).
lookup_queue_status(Config) ->
queue_SUITE:with_ch(
Config,
fun (_Ch) ->
[Link] = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_federation_status, status, []),
Id = proplists:get_value(id, Link),
Props = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_federation_status, lookup, [Id]),
lists:all(fun(K) -> lists:keymember(K, 1, Props) end,
[key, uri, status, timestamp, id, supervisor, upstream])
end, queue_SUITE:upstream_downstream()).
lookup_bad_status(Config) ->
queue_SUITE:with_ch(
Config,
fun (_Ch) ->
not_found = rabbit_ct_broker_helpers:rpc(
Config, 0,
rabbit_federation_status, lookup, [<<"justmadeitup">>])
end, queue_SUITE:upstream_downstream()).