fix indentation and comments
This commit is contained in:
parent
8512cde5b3
commit
b865db3a6d
|
|
@ -37,7 +37,7 @@
|
|||
-export([delete_bindings_for_queue/1]).
|
||||
-export([check_type/1, assert_type/2, topic_matches/2]).
|
||||
|
||||
% EXTENDED API
|
||||
%% EXTENDED API
|
||||
-export([list_exchange_bindings/1]).
|
||||
|
||||
-import(mnesia).
|
||||
|
|
@ -69,7 +69,7 @@
|
|||
-spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
|
||||
-spec(add_binding/4 ::
|
||||
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
|
||||
bind_res() | {'error', 'durability_settings_incompatible'}).
|
||||
bind_res() | {'error', 'durability_settings_incompatible'}).
|
||||
-spec(delete_binding/4 ::
|
||||
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
|
||||
bind_res() | {'error', 'binding_not_found'}).
|
||||
|
|
@ -255,11 +255,11 @@ delete_forward_routes(Route) ->
|
|||
|
||||
exchanges_for_queue(QueueName) ->
|
||||
MatchHead = #reverse_route{reverse_binding =
|
||||
#reverse_binding{exchange_name = '$1',
|
||||
queue_name = QueueName,
|
||||
key = '_'}},
|
||||
#reverse_binding{exchange_name = '$1',
|
||||
queue_name = QueueName,
|
||||
key = '_'}},
|
||||
sets:to_list(sets:from_list(
|
||||
mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))).
|
||||
mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))).
|
||||
|
||||
has_bindings(ExchangeName) ->
|
||||
MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
|
||||
|
|
@ -276,54 +276,53 @@ continue(_) ->
|
|||
|
||||
call_with_exchange(Exchange, Fun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
fun() -> case mnesia:read({exchange, Exchange}) of
|
||||
[] -> {error, exchange_not_found};
|
||||
[X] -> Fun(X)
|
||||
end
|
||||
end).
|
||||
fun() -> case mnesia:read({exchange, Exchange}) of
|
||||
[] -> {error, exchange_not_found};
|
||||
[X] -> Fun(X)
|
||||
end
|
||||
end).
|
||||
|
||||
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
|
||||
call_with_exchange(Exchange,
|
||||
fun(X) ->
|
||||
case mnesia:read({amqqueue, Queue}) of
|
||||
[] -> {error, queue_not_found};
|
||||
[Q] -> Fun(X, Q)
|
||||
end
|
||||
end).
|
||||
call_with_exchange(
|
||||
Exchange,
|
||||
fun(X) -> case mnesia:read({amqqueue, Queue}) of
|
||||
[] -> {error, queue_not_found};
|
||||
[Q] -> Fun(X, Q)
|
||||
end
|
||||
end).
|
||||
|
||||
add_binding(ExchangeName, QueueName, RoutingKey, _Arguments) ->
|
||||
call_with_exchange_and_queue(
|
||||
ExchangeName, QueueName,
|
||||
fun (X, Q) ->
|
||||
if Q#amqqueue.durable and not(X#exchange.durable) ->
|
||||
{error, durability_settings_incompatible};
|
||||
true -> ok = sync_binding(
|
||||
ExchangeName, QueueName, RoutingKey,
|
||||
Q#amqqueue.durable, fun mnesia:write/3)
|
||||
end
|
||||
end).
|
||||
ExchangeName, QueueName,
|
||||
fun (X, Q) ->
|
||||
if Q#amqqueue.durable and not(X#exchange.durable) ->
|
||||
{error, durability_settings_incompatible};
|
||||
true -> ok = sync_binding(
|
||||
ExchangeName, QueueName, RoutingKey,
|
||||
Q#amqqueue.durable, fun mnesia:write/3)
|
||||
end
|
||||
end).
|
||||
|
||||
delete_binding(ExchangeName, QueueName, RoutingKey, _Arguments) ->
|
||||
call_with_exchange_and_queue(
|
||||
ExchangeName, QueueName,
|
||||
fun (X, Q) ->
|
||||
ok = sync_binding(
|
||||
ExchangeName, QueueName, RoutingKey,
|
||||
Q#amqqueue.durable, fun mnesia:delete_object/3),
|
||||
maybe_auto_delete(X)
|
||||
end).
|
||||
ExchangeName, QueueName,
|
||||
fun (X, Q) ->
|
||||
ok = sync_binding(
|
||||
ExchangeName, QueueName, RoutingKey,
|
||||
Q#amqqueue.durable, fun mnesia:delete_object/3),
|
||||
maybe_auto_delete(X)
|
||||
end).
|
||||
|
||||
%% Must run within a transaction.
|
||||
sync_binding(ExchangeName, QueueName, RoutingKey, Durable, Fun) ->
|
||||
Binding = #binding{exchange_name = ExchangeName,
|
||||
queue_name = QueueName,
|
||||
key = RoutingKey},
|
||||
ok = case Durable of
|
||||
true -> Fun(durable_routes, #route{binding = Binding}, write);
|
||||
false -> ok
|
||||
end,
|
||||
[ok, ok] = [Fun(element(1, R), R, write) || R
|
||||
<- tuple_to_list(route_with_reverse(Binding))],
|
||||
true -> Fun(durable_routes, #route{binding = Binding}, write);
|
||||
false -> ok
|
||||
end,
|
||||
[ok, ok] = [Fun(element(1, R), R, write) ||
|
||||
R <- tuple_to_list(route_with_reverse(Binding))],
|
||||
ok.
|
||||
|
||||
route_with_reverse(#route{binding = Binding}) ->
|
||||
|
|
@ -339,11 +338,11 @@ reverse_route(#reverse_route{reverse_binding = Binding}) ->
|
|||
#route{binding = reverse_binding(Binding)}.
|
||||
|
||||
reverse_binding(#reverse_binding{exchange_name = Exchange,
|
||||
queue_name = Queue,
|
||||
key = Key}) ->
|
||||
queue_name = Queue,
|
||||
key = Key}) ->
|
||||
#binding{exchange_name = Exchange,
|
||||
queue_name = Queue,
|
||||
key = Key};
|
||||
queue_name = Queue,
|
||||
key = Key};
|
||||
|
||||
reverse_binding(#binding{exchange_name = Exchange,
|
||||
queue_name = Queue,
|
||||
|
|
@ -378,50 +377,40 @@ last_topic_match(P, R, []) ->
|
|||
topic_matches1(P, R);
|
||||
last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
|
||||
topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
|
||||
|
||||
%----------------------------------------------------------------------------
|
||||
% These functions deal with removing exchanges and any bindings attached to
|
||||
% them depending on whether if-unused flag is set.
|
||||
|
||||
delete(ExchangeName, _IfUnused = true) ->
|
||||
call_with_exchange(ExchangeName, fun conditional_delete/1);
|
||||
delete(ExchangeName, _IfUnused = false) ->
|
||||
call_with_exchange(ExchangeName, fun unconditional_delete/1).
|
||||
|
||||
%----------------------------------------------------------------------------
|
||||
% The following functions must run within a transaction and assume that the
|
||||
% exchange record for which they are performed actually exists.
|
||||
maybe_auto_delete(#exchange{auto_delete = false}) ->
|
||||
ok;
|
||||
maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
|
||||
conditional_delete(Exchange).
|
||||
|
||||
% This will only delete the exchange if and only if there is
|
||||
% no route bound to it
|
||||
|
||||
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
|
||||
case has_bindings(ExchangeName) of
|
||||
false -> unconditional_delete(Exchange);
|
||||
true -> {error, in_use}
|
||||
end.
|
||||
|
||||
% This will unconditionally delete an exchange together with any route
|
||||
% that may have been bound to it
|
||||
unconditional_delete(#exchange{name = ExchangeName}) ->
|
||||
ok = delete_bindings_for_exchange(ExchangeName),
|
||||
ok = mnesia:delete({durable_exchanges, ExchangeName}),
|
||||
ok = mnesia:delete({exchange, ExchangeName}).
|
||||
|
||||
%----------------------------------------------------------------------------
|
||||
% EXTENDED API
|
||||
% These are API calls that are not used by the server internally,
|
||||
% they are exported for embedded clients to use
|
||||
|
||||
% This is currently used in mod_rabbit.erl (XMPP) and expects this to return
|
||||
% {QueueName, RoutingKey, Arguments} tuples
|
||||
%%----------------------------------------------------------------------------
|
||||
%% EXTENDED API
|
||||
%% These are API calls that are not used by the server internally,
|
||||
%% they are exported for embedded clients to use
|
||||
|
||||
%% This is currently used in mod_rabbit.erl (XMPP) and expects this to
|
||||
%% return {QueueName, RoutingKey, Arguments} tuples
|
||||
list_exchange_bindings(ExchangeName) ->
|
||||
Route = #route{binding = #binding{exchange_name = ExchangeName,
|
||||
queue_name = '_',
|
||||
key = '_'}},
|
||||
[{QueueName, RoutingKey, []} ||
|
||||
#route{binding = #binding{queue_name = QueueName,
|
||||
key = RoutingKey}}
|
||||
<- mnesia:dirty_match_object(Route)].
|
||||
#route{binding = #binding{queue_name = QueueName,
|
||||
key = RoutingKey}}
|
||||
<- mnesia:dirty_match_object(Route)].
|
||||
|
|
|
|||
Loading…
Reference in New Issue