Move decorators to exchange record

This commit is contained in:
Emile Joubert 2013-03-22 12:47:21 +00:00
parent 9b8ca7c15d
commit 6fa8df03f5
5 changed files with 73 additions and 49 deletions

View File

@ -40,7 +40,7 @@
-record(resource, {virtual_host, kind, name}). -record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments, -record(exchange, {name, type, durable, auto_delete, internal, arguments,
scratches, policy}). scratches, policy, decorators}).
-record(exchange_serial, {name, next}). -record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,

View File

@ -115,23 +115,23 @@ recover() ->
rabbit_durable_exchange), rabbit_durable_exchange),
[XName || #exchange{name = XName} <- Xs]. [XName || #exchange{name = XName} <- Xs].
callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> callback(X = #exchange{type = XType,
decorators = Decorators}, Fun, Serial0, Args) ->
Serial = if is_function(Serial0) -> Serial0; Serial = if is_function(Serial0) -> Serial0;
is_atom(Serial0) -> fun (_Bool) -> Serial0 end is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end, end,
[ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) || [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
M <- registry_lookup(exchange_decorator)], M <- rabbit_exchange_decorator:select(all, Decorators)],
Module = type_to_module(XType), Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
policy_changed(X = #exchange{type = XType}, X1) -> policy_changed(X = #exchange{type = XType}, X1) ->
[ok = M:policy_changed(X, X1) || ok = (type_to_module(XType)):policy_changed(X, X1).
M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]],
ok.
serialise_events(X = #exchange{type = Type}) -> serialise_events(X = #exchange{type = Type, decorators = Decorators}) ->
lists:any(fun (M) -> M:serialise_events(X) end, lists:any(fun (M) ->
registry_lookup(exchange_decorator)) M:serialise_events(X)
end, rabbit_exchange_decorator:select(all, Decorators))
orelse (type_to_module(Type)):serialise_events(). orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) -> serial(#exchange{name = XName} = X) ->
@ -143,23 +143,14 @@ serial(#exchange{name = XName} = X) ->
(false) -> none (false) -> none
end. end.
registry_lookup(exchange_decorator_route = Class) ->
case get(exchange_decorator_route_modules) of
undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)],
put(exchange_decorator_route_modules, Mods),
Mods;
Mods -> Mods
end;
registry_lookup(Class) ->
[M || {_, M} <- rabbit_registry:lookup_all(Class)].
declare(XName, Type, Durable, AutoDelete, Internal, Args) -> declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = rabbit_policy:set(#exchange{name = XName, X0 = rabbit_policy:set(#exchange{name = XName,
type = Type, type = Type,
durable = Durable, durable = Durable,
auto_delete = AutoDelete, auto_delete = AutoDelete,
internal = Internal, internal = Internal,
arguments = Args}), arguments = Args}),
X = rabbit_exchange_decorator:record(X0, rabbit_exchange_decorator:list()),
XT = type_to_module(Type), XT = type_to_module(Type),
%% We want to upset things if it isn't ok %% We want to upset things if it isn't ok
ok = XT:validate(X), ok = XT:validate(X),
@ -318,25 +309,25 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
route(#exchange{name = #resource{virtual_host = VHost, route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
name = RName} = XName} = X, decorators = Decorators} = X,
#delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
case {registry_lookup(exchange_decorator_route), RName == <<"">>} of case {RName, rabbit_exchange_decorator:select(route, Decorators)} of
{[], true} -> {<<"">>, []} ->
%% Optimisation %% Optimisation
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
{Decorators, _} -> {_, RDecorators} ->
lists:usort(route1(Delivery, Decorators, {[X], XName, []})) lists:usort(route1(Delivery, RDecorators, {[X], XName, []}))
end. end.
route1(_, _, {[], _, QNames}) -> route1(_, _, {[], _, QNames}) ->
QNames; QNames;
route1(Delivery, Decorators, route1(Delivery, RDecorators,
{[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
ExchangeDests = (type_to_module(Type)):route(X, Delivery), ExchangeDests = (type_to_module(Type)):route(X, Delivery),
DecorateDests = process_decorators(X, Decorators, Delivery), DecorateDests = process_decorators(X, RDecorators, Delivery),
AlternateDests = process_alternate(X, ExchangeDests), AlternateDests = process_alternate(X, ExchangeDests),
route1(Delivery, Decorators, route1(Delivery, RDecorators,
lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames}, lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
AlternateDests ++ DecorateDests ++ ExchangeDests)). AlternateDests ++ DecorateDests ++ ExchangeDests)).

View File

@ -16,6 +16,10 @@
-module(rabbit_exchange_decorator). -module(rabbit_exchange_decorator).
-include("rabbit.hrl").
-export([list/0, select/2, record/2]).
%% This is like an exchange type except that: %% This is like an exchange type except that:
%% %%
%% 1) It applies to all exchanges as soon as it is installed, therefore %% 1) It applies to all exchanges as soon as it is installed, therefore
@ -45,10 +49,6 @@
-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> -callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
'ok'. 'ok'.
%% called when the policy attached to this exchange changes.
-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) ->
'ok'.
%% called after a binding has been added or recovered %% called after a binding has been added or recovered
-callback add_binding(serial(), rabbit_types:exchange(), -callback add_binding(serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok'. rabbit_types:binding()) -> 'ok'.
@ -59,8 +59,12 @@
%% Decorators can optionally implement route/2 which allows additional %% Decorators can optionally implement route/2 which allows additional
%% destinations to be added to the routing decision. %% destinations to be added to the routing decision.
%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> -callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
%% [rabbit_amqqueue:name() | rabbit_exchange:name()]. [rabbit_amqqueue:name() | rabbit_exchange:name()] | ok.
%% Whether the decorator wishes to receive callbacks for the exchange
%% none:no callbacks, noroute:all callbacks except route, all:all callbacks
-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'.
-else. -else.
@ -68,8 +72,29 @@
behaviour_info(callbacks) -> behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
{policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}]; {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3},
{active_for, 1}];
behaviour_info(_Other) -> behaviour_info(_Other) ->
undefined. undefined.
-endif. -endif.
%%----------------------------------------------------------------------------
list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
%% select a subset of active decorators
select(all, {Route, NoRoute}) -> Route ++ NoRoute;
select(route, {Route, _NoRoute}) -> Route.
%% record active decorators in an exchange
record(X, Decorators) ->
X#exchange{decorators =
lists:foldl(fun (D, {Route, NoRoute}) ->
Callbacks = D:active_for(X),
{cons_if_eq(all, Callbacks, D, Route),
cons_if_eq(noroute, Callbacks, D, NoRoute)}
end, {[], []}, Decorators)}.
cons_if_eq(Select, Select, Item, List) -> [Item | List];
cons_if_eq(_Select, _Other, _Item, List) -> List.

View File

@ -156,9 +156,10 @@ notify_clear(VHost, <<"policy">>, _Name) ->
update_policies(VHost) -> update_policies(VHost) ->
Policies = list(VHost), Policies = list(VHost),
Decorators = rabbit_exchange_decorator:list(),
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction( {Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
fun() -> fun() ->
{[update_exchange(X, Policies) || {[update_exchange(X, Policies, Decorators) ||
X <- rabbit_exchange:list(VHost)], X <- rabbit_exchange:list(VHost)],
[update_queue(Q, Policies) || [update_queue(Q, Policies) ||
Q <- rabbit_amqqueue:list(VHost)]} Q <- rabbit_amqqueue:list(VHost)]}
@ -167,12 +168,18 @@ update_policies(VHost) ->
[catch notify(Q) || Q <- Qs], [catch notify(Q) || Q <- Qs],
ok. ok.
update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> update_exchange(X = #exchange{name = XName, policy = OldPolicy},
Policies, Decorators) ->
case match(XName, Policies) of case match(XName, Policies) of
OldPolicy -> no_change; OldPolicy ->
NewPolicy -> rabbit_exchange:update( no_change;
XName, fun(X1) -> X1#exchange{policy = NewPolicy} end), NewPolicy ->
{X, X#exchange{policy = NewPolicy}} rabbit_exchange:update(
XName, fun(X1) ->
rabbit_exchange_decorator:record(
X1#exchange{policy = NewPolicy}, Decorators)
end),
{X, X#exchange{policy = NewPolicy}}
end. end.
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->

View File

@ -563,8 +563,9 @@ test_topic_matching() ->
XName = #resource{virtual_host = <<"/">>, XName = #resource{virtual_host = <<"/">>,
kind = exchange, kind = exchange,
name = <<"test_exchange">>}, name = <<"test_exchange">>},
X = #exchange{name = XName, type = topic, durable = false, X0 = #exchange{name = XName, type = topic, durable = false,
auto_delete = false, arguments = []}, auto_delete = false, arguments = []},
X = rabbit_exchange_decorator:record(X0, []),
%% create %% create
rabbit_exchange_type_topic:validate(X), rabbit_exchange_type_topic:validate(X),
exchange_op_callback(X, create, []), exchange_op_callback(X, create, []),