Savepoint for changing the routing structure

This commit is contained in:
Ben Hood 2008-07-31 00:44:02 +01:00
parent fbab5e9e49
commit 0a1d35c530
4 changed files with 190 additions and 172 deletions

View File

@ -41,11 +41,14 @@
-record(exchange, {name, type, durable, auto_delete, arguments}).
-record(amqqueue, {name, durable, auto_delete, arguments, binding_specs, pid}).
-record(binding_spec, {exchange_name, routing_key, arguments}).
-record(amqqueue, {name, durable, auto_delete, arguments, pid}).
%% This constant field seems to be required because the underlying storage is
%% ets, which stores key value pairs
-record(binding, {key, handlers}).
-record(handler, {binding_spec, queue, qpid}).
%% The spec field is made up of an {Exchange, Binding, Queue}
-record(forwards_binding, {spec, value = const}).
%% The spec field is made up of an {Queue, Binding, Exchange}
-record(reverse_binding, {spec, value = const}).
-record(listener, {node, protocol, host, port}).
@ -76,16 +79,11 @@
#user{username :: username(),
password :: password()}).
-type(permission() :: 'passive' | 'active' | 'write' | 'read').
-type(binding_spec() ::
#binding_spec{exchange_name :: exchange_name(),
routing_key :: routing_key(),
arguments :: amqp_table()}).
-type(amqqueue() ::
#amqqueue{name :: queue_name(),
durable :: bool(),
auto_delete :: bool(),
arguments :: amqp_table(),
binding_specs :: [binding_spec()],
pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),

View File

@ -84,7 +84,7 @@
-spec(commit/2 :: (pid(), txn()) -> 'ok').
-spec(rollback/2 :: (pid(), txn()) -> 'ok').
-spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok').
-spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok').
%-spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok').
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
@ -135,7 +135,6 @@ declare(Resource = #resource{}, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
binding_specs = [],
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
@ -168,48 +167,52 @@ recover_queue(Q) ->
ok.
default_binding_spec(#resource{virtual_host = VHostPath, name = Name}) ->
#binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>),
routing_key = Name,
arguments = []}.
recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) ->
ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q),
lists:foreach(fun (B) ->
ok = rabbit_exchange:add_binding(B, Q)
end, Specs),
ok.
exit(default_binding_spec).
% #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>),
% routing_key = Name,
% arguments = []}.
recover_bindings(Q = #amqqueue{name = QueueName}) ->
exit(recover_bindings).
% ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q),
% lists:foreach(fun (B) ->
% ok = rabbit_exchange:add_binding(B, Q)
% end, Specs),
% ok.
modify_bindings(Queue = #resource{}, X = #resource{}, RoutingKey, Arguments,
SpecPresentFun, SpecAbsentFun) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({amqqueue, Queue}) of
[Q = #amqqueue{binding_specs = Specs0}] ->
Spec = #binding_spec{exchange_name = X,
routing_key = RoutingKey,
arguments = Arguments},
case (case lists:member(Spec, Specs0) of
true -> SpecPresentFun;
false -> SpecAbsentFun
end)(Q, Spec) of
{ok, #amqqueue{binding_specs = Specs}} ->
{ok, length(Specs)};
{error, not_found} ->
{error, exchange_not_found};
Other -> Other
end;
[] -> {error, queue_not_found}
end
end).
update_bindings(Q = #amqqueue{binding_specs = Specs0}, Spec,
exit(modify_bindings).
% rabbit_misc:execute_mnesia_transaction(
% fun () ->
% case mnesia:wread({amqqueue, Queue}) of
% [Q = #amqqueue{binding_specs = Specs0}] ->
% Spec = #binding_spec{exchange_name = X,
% routing_key = RoutingKey,
% arguments = Arguments},
% case (case lists:member(Spec, Specs0) of
% true -> SpecPresentFun;
% false -> SpecAbsentFun
% end)(Q, Spec) of
% {ok, #amqqueue{binding_specs = Specs}} ->
% {ok, length(Specs)};
% {error, not_found} ->
% {error, exchange_not_found};
% Other -> Other
% end;
% [] -> {error, queue_not_found}
% end
% end).
update_bindings(Q = #amqqueue{}, Spec,
UpdateSpecFun, UpdateExchangeFun) ->
Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)},
case UpdateExchangeFun(Spec, Q1) of
ok -> store_queue(Q1),
{ok, Q1};
Other -> Other
end.
exit(update_bindings).
% Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)},
% case UpdateExchangeFun(Spec, Q1) of
% ok -> store_queue(Q1),
% {ok, Q1};
% Other -> Other
% end.
add_binding(QueueName, ExchangeName, RoutingKey, Arguments) ->
modify_bindings(
@ -297,15 +300,16 @@ notify_down(#amqqueue{ pid = QPid }, ChPid) ->
gen_server:call(QPid, {notify_down, ChPid}).
binding_forcibly_removed(BindingSpec, QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({amqqueue, QueueName}) of
[] -> ok;
[Q = #amqqueue{binding_specs = Specs}] ->
store_queue(Q#amqqueue{binding_specs =
lists:delete(BindingSpec, Specs)})
end
end).
exit(binding_forcibly_removed).
% rabbit_misc:execute_mnesia_transaction(
% fun () ->
% case mnesia:wread({amqqueue, QueueName}) of
% [] -> ok;
% [Q = #amqqueue{binding_specs = Specs}] ->
% store_queue(Q#amqqueue{binding_specs =
% lists:delete(BindingSpec, Specs)})
% end
% end).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
gen_server:call(QPid, {claim_queue, ReaderPid}).
@ -324,11 +328,12 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server:cast(QPid, {notify_sent, ChPid}).
delete_bindings(Q = #amqqueue{binding_specs = Specs}) ->
lists:foreach(fun (BindingSpec) ->
ok = rabbit_exchange:delete_binding(
BindingSpec, Q)
end, Specs).
delete_bindings(Q = #amqqueue{}) ->
exit(delete_bindings).
% lists:foreach(fun (BindingSpec) ->
% ok = rabbit_exchange:delete_binding(
% BindingSpec, Q)
% end, Specs).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
@ -368,5 +373,4 @@ pseudo_queue(NameBin, Pid) ->
durable = false,
auto_delete = false,
arguments = [],
binding_specs = [],
pid = Pid}.

View File

@ -64,11 +64,11 @@
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
-spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
-spec(add_binding/2 :: (binding_spec(), amqqueue()) ->
'ok' | not_found() |
{'error', 'durability_settings_incompatible'}).
-spec(delete_binding/2 :: (binding_spec(), amqqueue()) ->
'ok' | not_found()).
% -spec(add_binding/2 :: (binding_spec(), amqqueue()) ->
% 'ok' | not_found() |
% {'error', 'durability_settings_incompatible'}).
% -spec(delete_binding/2 :: (binding_spec(), amqqueue()) ->
% 'ok' | not_found()).
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
@ -144,15 +144,17 @@ list_vhost_exchanges(VHostPath) ->
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
list_exchange_bindings(Name) ->
[{QueueName, RoutingKey, Arguments} ||
#binding{handlers = Handlers} <- bindings_for_exchange(Name),
#handler{binding_spec = #binding_spec{routing_key = RoutingKey,
arguments = Arguments},
queue = QueueName} <- Handlers].
exit(list_exchange_bindings).
% [{QueueName, RoutingKey, Arguments} ||
% #binding{handlers = Handlers} <- bindings_for_exchange(Name),
% #handler{binding_spec = #binding_spec{routing_key = RoutingKey,
% arguments = Arguments},
% queue = QueueName} <- Handlers].
bindings_for_exchange(Name) ->
qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding),
element(1, K) == Name])).
exit(bindings_for_exchange).
% qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding),
% element(1, K) == Name])).
empty_handlers() ->
[].
@ -189,25 +191,27 @@ simple_publish(Mandatory, Immediate,
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
route(#exchange{name = Name, type = topic}, RoutingKey) ->
sets:to_list(
sets:union(
mnesia:activity(
async_dirty,
fun () ->
qlc:e(qlc:q([handler_qpids(H) ||
#binding{key = {Name1, PatternKey},
handlers = H}
<- mnesia:table(binding),
Name == Name1,
topic_matches(PatternKey, RoutingKey)]))
end)));
exit(route);
% sets:to_list(
% sets:union(
% mnesia:activity(
% async_dirty,
% fun () ->
% qlc:e(qlc:q([handler_qpids(H) ||
% #binding{key = {Name1, PatternKey},
% handlers = H}
% <- mnesia:table(binding),
% Name == Name1,
% topic_matches(PatternKey, RoutingKey)]))
% end)));
route(#exchange{name = Name, type = Type}, RoutingKey) ->
BindingKey = delivery_key_for_type(Type, Name, RoutingKey),
case rabbit_misc:dirty_read({binding, BindingKey}) of
{ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H));
{error, not_found} -> []
end.
exit(route).
% BindingKey = delivery_key_for_type(Type, Name, RoutingKey),
% case rabbit_misc:dirty_read({binding, BindingKey}) of
% {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H));
% {error, not_found} -> []
% end.
delivery_key_for_type(fanout, Name, _RoutingKey) ->
{Name, fanout};
@ -221,28 +225,33 @@ call_with_exchange(Name, Fun) ->
end.
make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) ->
#handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}.
exit(make_handler).
%#handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}.
add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName,
routing_key = RoutingKey}, Q) ->
call_with_exchange(
ExchangeName,
fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
internal_add_binding(
X, RoutingKey, make_handler(BindingSpec, Q))
end
end).
add_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName,
% routing_key = RoutingKey},
,Q) ->
exit(add_binding).
% call_with_exchange(
% ExchangeName,
% fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
% {error, durability_settings_incompatible};
% true ->
% internal_add_binding(
% X, RoutingKey, make_handler(BindingSpec, Q))
% end
% end).
delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName,
routing_key = RoutingKey}, Q) ->
call_with_exchange(
ExchangeName,
fun (X) -> ok = internal_delete_binding(
X, RoutingKey, make_handler(BindingSpec, Q)),
maybe_auto_delete(X)
end).
delete_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName,
% routing_key = RoutingKey},
,Q) ->
exit(delete_binding).
% call_with_exchange(
% ExchangeName,
% fun (X) -> ok = internal_delete_binding(
% X, RoutingKey, make_handler(BindingSpec, Q)),
% maybe_auto_delete(X)
% end).
%% Must run within a transaction.
maybe_auto_delete(#exchange{auto_delete = false}) ->
@ -261,7 +270,8 @@ extend_handlers(Handlers, Handler) -> [Handler | Handlers].
delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers).
handler_qpids(Handlers) ->
sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]).
exit(handler_qpids).
%sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]).
%% Must run within a transaction.
internal_add_binding(#exchange{name = ExchangeName, type = Type},
@ -277,32 +287,34 @@ internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey,
%% Must run within a transaction.
add_handler_to_binding(BindingKey, Handler) ->
ok = case mnesia:wread({binding, BindingKey}) of
[] ->
ok = mnesia:write(
#binding{key = BindingKey,
handlers = extend_handlers(
empty_handlers(), Handler)});
[B = #binding{handlers = H}] ->
ok = mnesia:write(
B#binding{handlers = extend_handlers(H, Handler)})
end.
exit(add_handler_to_binding).
% ok = case mnesia:wread({binding, BindingKey}) of
% [] ->
% ok = mnesia:write(
% #binding{key = BindingKey,
% handlers = extend_handlers(
% empty_handlers(), Handler)});
% [B = #binding{handlers = H}] ->
% ok = mnesia:write(
% B#binding{handlers = extend_handlers(H, Handler)})
% end.
%% Must run within a transaction.
remove_handler_from_binding(BindingKey, Handler) ->
case mnesia:wread({binding, BindingKey}) of
[] -> empty;
[B = #binding{handlers = H}] ->
H1 = delete_handler(H, Handler),
case handlers_isempty(H1) of
true ->
ok = mnesia:delete({binding, BindingKey}),
empty;
_ ->
ok = mnesia:write(B#binding{handlers = H1}),
not_empty
end
end.
exit(remove_handler_from_binding).
% case mnesia:wread({binding, BindingKey}) of
% [] -> empty;
% [B = #binding{handlers = H}] ->
% H1 = delete_handler(H, Handler),
% case handlers_isempty(H1) of
% true ->
% ok = mnesia:delete({binding, BindingKey}),
% empty;
% _ ->
% ok,% = mnesia:write(B#binding{handlers = H1}),
% not_empty
% end
% end.
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
@ -336,41 +348,44 @@ delete(ExchangeName, IfUnused) ->
fun () -> internal_delete(ExchangeName, IfUnused) end).
internal_delete(ExchangeName, _IfUnused = true) ->
Bindings = bindings_for_exchange(ExchangeName),
case Bindings of
[] -> do_internal_delete(ExchangeName, Bindings);
_ ->
case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end,
Bindings) of
true ->
%% There are no handlers anywhere in any of the
%% bindings for this exchange.
do_internal_delete(ExchangeName, Bindings);
false ->
%% There was at least one real handler
%% present. It's still in use.
{error, in_use}
end
end;
exit(internal_delete);
% Bindings = bindings_for_exchange(ExchangeName),
% case Bindings of
% [] -> do_internal_delete(ExchangeName, Bindings);
% _ ->
% case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end,
% Bindings) of
% true ->
% %% There are no handlers anywhere in any of the
% %% bindings for this exchange.
% do_internal_delete(ExchangeName, Bindings);
% false ->
% %% There was at least one real handler
% %% present. It's still in use.
% {error, in_use}
% end
% end;
internal_delete(ExchangeName, false) ->
do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)).
forcibly_remove_handlers(Handlers) ->
lists:foreach(
fun (#handler{binding_spec = BindingSpec, queue = QueueName}) ->
ok = rabbit_amqqueue:binding_forcibly_removed(
BindingSpec, QueueName)
end, Handlers),
ok.
exit(forcibly_remove_handlers).
% lists:foreach(
% fun (#handler{binding_spec = BindingSpec, queue = QueueName}) ->
% ok = rabbit_amqqueue:binding_forcibly_removed(
% BindingSpec, QueueName)
% end, Handlers),
% ok.
do_internal_delete(ExchangeName, Bindings) ->
case mnesia:wread({exchange, ExchangeName}) of
[] -> {error, not_found};
_ ->
lists:foreach(fun (#binding{key = K, handlers = H}) ->
ok = forcibly_remove_handlers(H),
ok = mnesia:delete({binding, K})
end, Bindings),
ok = mnesia:delete({durable_exchanges, ExchangeName}),
ok = mnesia:delete({exchange, ExchangeName})
end.
exit(do_internal_delete).
% case mnesia:wread({exchange, ExchangeName}) of
% [] -> {error, not_found};
% _ ->
% lists:foreach(fun (#binding{key = K, handlers = H}) ->
% ok = forcibly_remove_handlers(H),
% ok = mnesia:delete({binding, K})
% end, Bindings),
% ok = mnesia:delete({durable_exchanges, ExchangeName}),
% ok = mnesia:delete({exchange, ExchangeName})
% end.

View File

@ -105,7 +105,8 @@ table_definitions() ->
{rabbit_config, [{disc_copies, [node()]}]},
{listener, [{type, bag},
{attributes, record_info(fields, listener)}]},
{binding, [{attributes, record_info(fields, binding)}]},
{forwards_binding, [{type,ordered_set},{attributes, record_info(fields, forwards_binding)}]},
{reverse_binding, [{type,ordered_set},{attributes, record_info(fields, reverse_binding)}]},
{durable_exchanges, [{disc_copies, [node()]},
{record_name, exchange},
{attributes, record_info(fields, exchange)}]},