Fixes for auto-delete exchanges
This commit is contained in:
parent
9ce6c93cb9
commit
b003c245d5
|
|
@ -43,6 +43,8 @@
|
|||
-import(qlc).
|
||||
-import(regexp).
|
||||
|
||||
-define(CHUNK_SIZE, 10).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
-ifdef(use_specs).
|
||||
|
|
@ -224,38 +226,73 @@ delete_bindings(Binding = #binding{}) ->
|
|||
ok = mnesia:delete_object(Route),
|
||||
ok = mnesia:delete_object(ReverseRoute),
|
||||
ok = mnesia:delete_object(durable_routes, Route, write);
|
||||
|
||||
|
||||
% Must be called in a transaction
|
||||
delete_bindings(QueueName) ->
|
||||
% TODO: The head of this list *SHOULD* always be the default exchange
|
||||
% what if somebody nukes it?
|
||||
[_|Exchanges] = exchanges_for_queue(QueueName),
|
||||
lists:foreach(fun (Name) ->
|
||||
Exchange = #exchange{name = Name, auto_delete = true,
|
||||
type = '_', durable = '_',
|
||||
arguments = '_'},
|
||||
ok = mnesia:delete_object(Exchange),
|
||||
ok = mnesia:delete_object(durable_exchanges,
|
||||
Exchange, write)
|
||||
end, Exchanges),
|
||||
|
||||
% TODO: What about auto_delete on durable exchanges?
|
||||
delete_bindings(#binding{exchange_name = '_',
|
||||
queue_name = QueueName,
|
||||
key = '_'}).
|
||||
|
||||
exchanges_for_queue(QueueName) ->
|
||||
MatchHead = #route{binding = #binding{exchange_name = '$1',
|
||||
queue_name = QueueName,
|
||||
key = '_'}},
|
||||
mnesia:dirty_select(route, [{MatchHead, [], ['$1']}]).
|
||||
key = '_'}),
|
||||
lists:foreach(fun(ExchangeName) ->
|
||||
call_with_exchange(ExchangeName,
|
||||
fun(Exchange) ->
|
||||
if Exchange#exchange.auto_delete ->
|
||||
Predicate = fun(E) -> E == QueueName end,
|
||||
case has_bindings(ExchangeName, Predicate) of
|
||||
true -> ok;
|
||||
false -> do_internal_delete(ExchangeName)
|
||||
end;
|
||||
true -> ok
|
||||
end
|
||||
end)
|
||||
end, exchanges_for_queue(QueueName)).
|
||||
|
||||
routes_for_exchange(ExchangeName) ->
|
||||
exchanges_for_queue(QueueName) ->
|
||||
MatchHead = #reverse_route{reverse_binding =
|
||||
#reverse_binding{exchange_name = '$1',
|
||||
queue_name = QueueName,
|
||||
key = '_'}},
|
||||
sets:to_list(sets:from_list(
|
||||
mnesia:dirty_select(reverse_route, [{MatchHead, [], ['$1']}]))).
|
||||
|
||||
|
||||
has_bindings(ExchangeName) ->
|
||||
has_bindings(ExchangeName, fun(_) -> false end).
|
||||
|
||||
has_bindings(ExchangeName, Predicate) ->
|
||||
MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
|
||||
queue_name = '_',
|
||||
key = '$1'}},
|
||||
mnesia:dirty_select(route, [{MatchHead, [], ['$1']}]).
|
||||
queue_name = '$1',
|
||||
key = '_'}},
|
||||
case mnesia:select(route,
|
||||
[{MatchHead, [], ['$1']}], ?CHUNK_SIZE, write) of
|
||||
'$end_of_table' -> ok;
|
||||
{Routes, Continuation} ->
|
||||
case lists:dropwhile(Predicate, Routes) of
|
||||
[] -> continue(Continuation, Predicate);
|
||||
_ -> true
|
||||
end
|
||||
end.
|
||||
|
||||
continue(Continuation, Predicate) ->
|
||||
case mnesia:select(Continuation) of
|
||||
'$end_of_table' -> false;
|
||||
{Routes, Cont} ->
|
||||
case lists:dropwhile(Predicate, Routes) of
|
||||
[] -> continue(Cont, Predicate);
|
||||
_ -> true
|
||||
end
|
||||
end.
|
||||
|
||||
call_with_exchange(Exchange, Fun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
case mnesia:wread({exchange, Exchange}) of
|
||||
[] -> {error, exchange_not_found};
|
||||
[X] -> Fun(X)
|
||||
end
|
||||
end).
|
||||
|
||||
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
|
||||
% TODO Refactor to avoid duplication with call_with_exchange/2
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
case mnesia:wread({exchange, Exchange}) of
|
||||
|
|
@ -362,14 +399,17 @@ delete(ExchangeName, IfUnused) ->
|
|||
fun () -> internal_delete(ExchangeName, IfUnused) end).
|
||||
|
||||
internal_delete(ExchangeName, _IfUnused = true) ->
|
||||
case routes_for_exchange(ExchangeName) of
|
||||
[] -> do_internal_delete(ExchangeName);
|
||||
_ -> {error, in_use}
|
||||
case has_bindings(ExchangeName) of
|
||||
false -> do_internal_delete(ExchangeName);
|
||||
true -> {error, in_use}
|
||||
end;
|
||||
|
||||
internal_delete(ExchangeName, false) ->
|
||||
do_internal_delete(ExchangeName).
|
||||
|
||||
% TODO: Think about an optional do_internal_delete that takes an Exchange
|
||||
% instead of an Exchange, i.e. something that has already done the lookup
|
||||
% already, e.g. delete_bindings/1
|
||||
do_internal_delete(ExchangeName) ->
|
||||
case mnesia:wread({exchange, ExchangeName}) of
|
||||
[] -> {error, not_found};
|
||||
|
|
|
|||
Loading…
Reference in New Issue