Cleanup internal exchanges and queues after plugin is disabled or policy removed

rabbitmq-federation#63
[#153011041]
This commit is contained in:
Diana Corbacho 2017-11-20 11:27:59 +00:00
parent f6949e37e8
commit a8953cd6ce
2 changed files with 94 additions and 3 deletions

View File

@ -198,8 +198,14 @@ terminate(Reason, #state{downstream_connection = DConn,
upstream = Upstream,
upstream_params = UParams,
downstream_exchange = XName,
internal_exchange_timer = TRef}) ->
internal_exchange_timer = TRef,
internal_exchange = IntExchange,
queue = Queue}) ->
timer:cancel(TRef),
%% Cleanup of internal queue and exchange
delete_upstream_queue(Conn, Queue),
delete_upstream_exchange(Conn, IntExchange),
rabbit_federation_link_util:ensure_connection_closed(DConn),
rabbit_federation_link_util:ensure_connection_closed(Conn),
rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, XName),
@ -591,6 +597,10 @@ delete_upstream_exchange(Conn, XNameBin) ->
rabbit_federation_link_util:disposable_channel_call(
Conn, #'exchange.delete'{exchange = XNameBin}).
delete_upstream_queue(Conn, Queue) ->
rabbit_federation_link_util:disposable_channel_call(
Conn, #'queue.delete'{queue = Queue}).
update_headers(#upstream_params{table = Table}, UName, Redelivered, Headers) ->
rabbit_basic:prepend_table_header(
?ROUTING_HEADER, Table ++ [{<<"redelivered">>, bool, Redelivered}] ++

View File

@ -51,7 +51,9 @@ groups() ->
dynamic_reconfiguration,
dynamic_reconfiguration_integrity,
federate_unfederate,
dynamic_plugin_stop_start
dynamic_plugin_stop_start,
dynamic_plugin_cleanup_stop_start,
dynamic_policy_cleanup
]}
]},
{with_disambiguate, [], [
@ -412,6 +414,7 @@ binding_recovery(Config) ->
rabbit_ct_broker_helpers:set_parameter(Config,
Rabbit, <<"federation-upstream-set">>, <<"upstream">>,
[[{<<"upstream">>, <<"rabbit">>}, {<<"exchange">>, <<"upstream">>}]]),
wait_for_federation(120, Config, Rabbit, <<"/">>),
publish_expect(Ch3, <<"upstream">>, <<"key">>, Q, <<"HELLO">>),
true = (none =/= suffix(Config, Rabbit, <<"rabbit">>, "upstream")),
@ -747,9 +750,87 @@ dynamic_plugin_stop_start(Config) ->
end || X <- [X1, X2]],
clear_policy(Config, 0, <<"dyn">>),
assert_connections(Config, 0, [X1, X2], [])
assert_connections(Config, 0, [X1, X2], []),
delete_exchange(Ch, X2)
end, [x(X1)]).
dynamic_plugin_cleanup_stop_start(Config) ->
X1 = <<"dyn.exch1">>,
with_ch(Config,
fun (_Ch) ->
set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"localhost">>),
%% Declare federated exchange - get link
assert_connections(Config, 0, [X1], [<<"localhost">>]),
wait_for_federation(120, Config, 0, <<"/">>),
true = has_internal_federated_exchange(Config, 0, <<"/">>),
true = has_internal_federated_queue(Config, 0, <<"/">>),
%% Disable plugin, link goes
ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0,
"rabbitmq_federation"),
%% Internal exchanges and queues need cleanup
false = has_internal_federated_exchange(Config, 0, <<"/">>),
false = has_internal_federated_queue(Config, 0, <<"/">>),
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0,
"rabbitmq_federation"),
clear_policy(Config, 0, <<"dyn">>),
assert_connections(Config, 0, [X1], [])
end, [x(X1)]).
dynamic_policy_cleanup(Config) ->
X1 = <<"dyn.exch1">>,
with_ch(Config,
fun (_Ch) ->
set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"localhost">>),
%% Declare federated exchange - get link
assert_connections(Config, 0, [X1], [<<"localhost">>]),
wait_for_federation(120, Config, 0, <<"/">>),
true = has_internal_federated_exchange(Config, 0, <<"/">>),
true = has_internal_federated_queue(Config, 0, <<"/">>),
clear_policy(Config, 0, <<"dyn">>),
timer:sleep(5000),
%% Internal exchanges and queues need cleanup
false = has_internal_federated_exchange(Config, 0, <<"/">>),
false = has_internal_federated_queue(Config, 0, <<"/">>),
clear_policy(Config, 0, <<"dyn">>),
assert_connections(Config, 0, [X1], [])
end, [x(X1)]).
wait_for_federation(0, _, _, _) ->
ok;
wait_for_federation(N, Config, Node, VHost) ->
case has_internal_federated_exchange(Config, Node, VHost) andalso
has_internal_federated_queue(Config, Node, VHost) of
true ->
ok;
false ->
timer:sleep(500),
wait_for_federation(N - 1, Config, Node, VHost)
end.
has_internal_federated_exchange(Config, Node, VHost) ->
lists:any(fun(X) ->
X#exchange.type == 'x-federation-upstream'
end, rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_exchange, list, [VHost])).
has_internal_federated_queue(Config, Node, VHost) ->
lists:any(
fun(Q) ->
{'longstr', <<"federation">>} ==
rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-internal-purpose">>)
end, rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_amqqueue, list, [VHost])).
%%----------------------------------------------------------------------------
with_ch(Config, Fun, Xs) ->