Move mnesia queries from rabbit_misc to rabbit_mnesia
This commit is contained in:
parent
783996f53d
commit
f2443f6d10
|
|
@ -467,7 +467,7 @@ close_channels(_TrackedChannels = []) -> ok.
|
|||
|
||||
migrate_tracking_records() ->
|
||||
Node = node(),
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
Table = tracked_channel_table_name_for(Node),
|
||||
_ = mnesia:lock({table, Table}, read),
|
||||
|
|
@ -477,7 +477,7 @@ migrate_tracking_records() ->
|
|||
ets:insert(tracked_channel, Channel)
|
||||
end, Channels)
|
||||
end),
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
Table = tracked_channel_per_user_table_name_for(Node),
|
||||
_ = mnesia:lock({table, Table}, read),
|
||||
|
|
|
|||
|
|
@ -731,7 +731,7 @@ close_connection(#tracked_connection{pid = Pid}, Message) ->
|
|||
|
||||
migrate_tracking_records() ->
|
||||
Node = node(),
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
Table = tracked_connection_table_name_for(Node),
|
||||
_ = mnesia:lock({table, Table}, read),
|
||||
|
|
@ -741,7 +741,7 @@ migrate_tracking_records() ->
|
|||
ets:insert(tracked_connection, Connection)
|
||||
end, Connections)
|
||||
end),
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
Table = tracked_connection_per_user_table_name_for(Node),
|
||||
_ = mnesia:lock({table, Table}, read),
|
||||
|
|
@ -752,7 +752,7 @@ migrate_tracking_records() ->
|
|||
ets:update_counter(tracked_connection_per_user, Username, C, {Username, 0})
|
||||
end, Connections)
|
||||
end),
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
Table = tracked_connection_per_vhost_table_name_for(Node),
|
||||
_ = mnesia:lock({table, Table}, read),
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ direct_exchange_routing_v2_enable(#{feature_name := FeatureName}) ->
|
|||
|
||||
listener_records_in_ets_enable(#{feature_name := FeatureName}) ->
|
||||
try
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
_ = mnesia:lock({table, rabbit_listener}, read),
|
||||
Listeners = mnesia:select(
|
||||
|
|
|
|||
|
|
@ -254,7 +254,7 @@ recover_in_mnesia(RecoverFun) ->
|
|||
[RecoverFun(Route, Src, Dst, fun recover_semi_durable_route/2) ||
|
||||
#route{binding = #binding{destination = Dst,
|
||||
source = Src}} = Route <-
|
||||
rabbit_misc:dirty_read_all(rabbit_semi_durable_route)].
|
||||
rabbit_mnesia:dirty_read_all(rabbit_semi_durable_route)].
|
||||
|
||||
create_index_route_table() ->
|
||||
rabbit_db:run(
|
||||
|
|
@ -333,7 +333,7 @@ binding_action_in_mnesia(#binding{source = SrcName,
|
|||
destination = DstName}, Fun, ErrFun) ->
|
||||
SrcTable = table_for_resource(SrcName),
|
||||
DstTable = table_for_resource(DstName),
|
||||
rabbit_misc:execute_mnesia_tx_with_tail(
|
||||
rabbit_mnesia:execute_mnesia_tx_with_tail(
|
||||
fun () ->
|
||||
case {mnesia:read({SrcTable, SrcName}),
|
||||
mnesia:read({DstTable, DstName})} of
|
||||
|
|
@ -371,11 +371,11 @@ create_in_mnesia(Binding, ChecksFun) ->
|
|||
end, fun not_found_or_absent_errs_in_mnesia/1).
|
||||
|
||||
populate_index_route_table_in_mnesia() ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
_ = mnesia:lock({table, rabbit_route}, read),
|
||||
_ = mnesia:lock({table, rabbit_index_route}, write),
|
||||
Routes = rabbit_misc:dirty_read_all(rabbit_route),
|
||||
Routes = rabbit_mnesia:dirty_read_all(rabbit_route),
|
||||
lists:foreach(fun(#route{binding = #binding{source = Exchange}} = Route) ->
|
||||
case rabbit_db_exchange:get(Exchange) of
|
||||
{ok, X} ->
|
||||
|
|
@ -542,11 +542,11 @@ not_found_or_absent_in_mnesia(#resource{kind = queue} = Name) ->
|
|||
end.
|
||||
|
||||
recover_in_mnesia() ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
_ = mnesia:lock({table, rabbit_durable_route}, read),
|
||||
_ = mnesia:lock({table, rabbit_semi_durable_route}, write),
|
||||
Routes = rabbit_misc:dirty_read_all(rabbit_durable_route),
|
||||
Routes = rabbit_mnesia:dirty_read_all(rabbit_durable_route),
|
||||
Fun = fun(Route) ->
|
||||
mnesia:dirty_write(rabbit_semi_durable_route, Route)
|
||||
end,
|
||||
|
|
@ -556,7 +556,7 @@ recover_in_mnesia() ->
|
|||
|
||||
recover_semi_durable_route(#route{binding = B} = Route, X) ->
|
||||
MaybeSerial = rabbit_exchange:serialise_events(X),
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
case mnesia:read(rabbit_semi_durable_route, B, read) of
|
||||
[] -> no_recover;
|
||||
|
|
|
|||
|
|
@ -132,7 +132,7 @@ get(Name) ->
|
|||
}).
|
||||
|
||||
get_in_mnesia(Name) ->
|
||||
rabbit_misc:dirty_read({rabbit_exchange, Name}).
|
||||
rabbit_mnesia:dirty_read({rabbit_exchange, Name}).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% get_many().
|
||||
|
|
@ -192,7 +192,7 @@ update(XName, Fun) ->
|
|||
}).
|
||||
|
||||
update_in_mnesia(XName, Fun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
update_in_mnesia_tx(XName, Fun)
|
||||
end).
|
||||
|
|
@ -218,7 +218,7 @@ create_or_get(X) ->
|
|||
}).
|
||||
|
||||
create_or_get_in_mnesia(#exchange{name = XName} = X) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
case mnesia:wread({rabbit_exchange, XName}) of
|
||||
[] ->
|
||||
|
|
@ -246,7 +246,7 @@ insert(Xs) ->
|
|||
}).
|
||||
|
||||
insert_in_mnesia(Xs) when is_list(Xs) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
[mnesia:write(rabbit_durable_exchange, X, write) || X <- Xs]
|
||||
end),
|
||||
|
|
@ -271,7 +271,7 @@ peek_serial(XName) ->
|
|||
}).
|
||||
|
||||
peek_serial_in_mnesia(XName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
peek_serial_in_mnesia_tx(XName, read)
|
||||
end).
|
||||
|
|
@ -295,7 +295,7 @@ next_serial(X) ->
|
|||
}).
|
||||
|
||||
next_serial_in_mnesia(X) ->
|
||||
rabbit_misc:execute_mnesia_transaction(fun() ->
|
||||
rabbit_mnesia:execute_mnesia_transaction(fun() ->
|
||||
next_serial_in_mnesia_tx(X)
|
||||
end).
|
||||
|
||||
|
|
@ -332,7 +332,7 @@ delete_in_mnesia(XName, IfUnused) ->
|
|||
true -> fun conditional_delete_in_mnesia/2;
|
||||
false -> fun unconditional_delete_in_mnesia/2
|
||||
end,
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
case mnesia:wread({rabbit_exchange, XName}) of
|
||||
[X] -> DeletionFun(X, false);
|
||||
|
|
@ -358,7 +358,7 @@ delete_serial(XName) ->
|
|||
}).
|
||||
|
||||
delete_serial_in_mnesia(XName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
mnesia:delete({rabbit_exchange_serial, XName})
|
||||
end).
|
||||
|
|
@ -458,7 +458,7 @@ delete_in_mnesia(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSour
|
|||
get_many_in_mnesia(Table, [Name]) -> ets:lookup(Table, Name);
|
||||
get_many_in_mnesia(Table, Names) when is_list(Names) ->
|
||||
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
|
||||
%% expensive for reasons explained in rabbit_misc:dirty_read/1.
|
||||
%% expensive for reasons explained in rabbit_mnesia:dirty_read/1.
|
||||
lists:append([ets:lookup(Table, Name) || Name <- Names]).
|
||||
|
||||
conditional_delete_in_mnesia(X = #exchange{name = XName}, OnlyDurable) ->
|
||||
|
|
@ -500,7 +500,7 @@ insert_ram_in_mnesia_tx(X) ->
|
|||
X1.
|
||||
|
||||
recover_in_mnesia(VHost) ->
|
||||
rabbit_misc:table_filter(
|
||||
rabbit_mnesia:table_filter(
|
||||
fun (#exchange{name = XName}) ->
|
||||
XName#resource.virtual_host =:= VHost andalso
|
||||
mnesia:read({rabbit_exchange, XName}) =:= []
|
||||
|
|
@ -508,7 +508,7 @@ recover_in_mnesia(VHost) ->
|
|||
fun (X, true) ->
|
||||
X;
|
||||
(X, false) ->
|
||||
X1 = rabbit_misc:execute_mnesia_transaction(
|
||||
X1 = rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> insert_in_mnesia_tx(X) end),
|
||||
Serial = rabbit_exchange:serial(X1),
|
||||
rabbit_exchange:callback(X1, create, Serial, [X1])
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ delete(Group, Id) ->
|
|||
}).
|
||||
|
||||
delete_in_mnesia(Group, Id) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> delete_in_mnesia_tx(Group, Id) end).
|
||||
|
||||
-spec find_mirror(Group, Id) -> Ret when
|
||||
|
|
@ -115,7 +115,7 @@ create_tables_in_mnesia([{Table, Attributes} | Ts]) ->
|
|||
end.
|
||||
|
||||
create_or_update_in_mnesia(Group, Overall, Delegate, ChildSpec, Id) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
ReadResult = mnesia:wread({?TABLE, {Group, Id}}),
|
||||
rabbit_log:debug("Mirrored supervisor: check_start table ~ts read for key ~tp returned ~tp",
|
||||
|
|
@ -145,7 +145,7 @@ create_or_update_in_mnesia(Group, Overall, Delegate, ChildSpec, Id) ->
|
|||
end).
|
||||
|
||||
update_all_in_mnesia(Overall, OldOverall) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
MatchHead = #mirrored_sup_childspec{mirroring_pid = OldOverall,
|
||||
key = '$1',
|
||||
|
|
@ -156,7 +156,7 @@ update_all_in_mnesia(Overall, OldOverall) ->
|
|||
end).
|
||||
|
||||
delete_all_in_mnesia(Group) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
MatchHead = #mirrored_sup_childspec{key = {Group, '$1'},
|
||||
_ = '_'},
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ update(VHost, GetUpdatedExchangeFun, GetUpdatedQueueFun) ->
|
|||
update_in_mnesia(VHost, GetUpdatedExchangeFun, GetUpdatedQueueFun) ->
|
||||
Tabs = [rabbit_queue, rabbit_durable_queue,
|
||||
rabbit_exchange, rabbit_durable_exchange],
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
[mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
|
||||
Exchanges0 = rabbit_db_exchange:get_all(VHost),
|
||||
|
|
|
|||
|
|
@ -195,7 +195,7 @@ get(Name) ->
|
|||
}).
|
||||
|
||||
get_in_mnesia(Name) ->
|
||||
rabbit_misc:dirty_read({rabbit_queue, Name}).
|
||||
rabbit_mnesia:dirty_read({rabbit_queue, Name}).
|
||||
|
||||
get_durable(Names) when is_list(Names) ->
|
||||
rabbit_db:run(
|
||||
|
|
@ -207,7 +207,7 @@ get_durable(Name) ->
|
|||
}).
|
||||
|
||||
get_durable_in_mnesia(Name) ->
|
||||
rabbit_misc:dirty_read({rabbit_durable_queue, Name}).
|
||||
rabbit_mnesia:dirty_read({rabbit_durable_queue, Name}).
|
||||
|
||||
delete_transient(Queues) ->
|
||||
rabbit_db:run(
|
||||
|
|
@ -215,7 +215,7 @@ delete_transient(Queues) ->
|
|||
}).
|
||||
|
||||
delete_transient_in_mnesia(Queues) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
[{QName, delete_transient_in_mnesia_tx(QName)}
|
||||
|| QName <- Queues]
|
||||
|
|
@ -227,7 +227,7 @@ on_node_up(Node, Fun) ->
|
|||
}).
|
||||
|
||||
on_node_up_in_mnesia(Node, Fun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
Qs = mnesia:match_object(rabbit_queue,
|
||||
amqqueue:pattern_match_all(), write),
|
||||
|
|
@ -241,7 +241,7 @@ on_node_down(Node, Fun) ->
|
|||
}).
|
||||
|
||||
on_node_down_in_mnesia(Node, Fun) ->
|
||||
Qs = rabbit_misc:execute_mnesia_transaction(
|
||||
Qs = rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
qlc:e(qlc:q([amqqueue:get_name(Q) || Q <- mnesia:table(rabbit_queue),
|
||||
Fun(Node, Q)
|
||||
|
|
@ -274,7 +274,7 @@ update(QName, Fun) ->
|
|||
}).
|
||||
|
||||
update_in_mnesia(QName, Fun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
update_in_mnesia_tx(QName, Fun)
|
||||
end).
|
||||
|
|
@ -327,7 +327,7 @@ create_or_get(DurableQ, Q) ->
|
|||
|
||||
create_or_get_in_mnesia(DurableQ, Q) ->
|
||||
QueueName = amqqueue:get_name(Q),
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
case mnesia:wread({rabbit_queue, QueueName}) of
|
||||
[] ->
|
||||
|
|
@ -349,7 +349,7 @@ insert(DurableQ, Q) ->
|
|||
}).
|
||||
|
||||
insert_in_mnesia(DurableQ, Q) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
insert_in_mnesia_tx(DurableQ, Q)
|
||||
end).
|
||||
|
|
@ -360,7 +360,7 @@ insert(Qs) ->
|
|||
}).
|
||||
|
||||
insert_many_in_mnesia(Qs) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
[ok = mnesia:write(rabbit_durable_queue, Q, write) || Q <- Qs]
|
||||
end).
|
||||
|
|
@ -420,7 +420,7 @@ get_many_in_mnesia(Table, [Name]) ->
|
|||
ets:lookup(Table, Name);
|
||||
get_many_in_mnesia(Table, Names) when is_list(Names) ->
|
||||
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
|
||||
%% expensive for reasons explained in rabbit_misc:dirty_read/1.
|
||||
%% expensive for reasons explained in rabbit_mnesia:dirty_read/1.
|
||||
lists:append([ets:lookup(Table, Name) || Name <- Names]).
|
||||
|
||||
delete_transient_in_mnesia_tx(QName) ->
|
||||
|
|
@ -438,7 +438,7 @@ not_found_or_absent_queue_dirty_in_mnesia(Name) ->
|
|||
%% We should read from both tables inside a tx, to get a
|
||||
%% consistent view. But the chances of an inconsistency are small,
|
||||
%% and only affect the error kind.
|
||||
case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of
|
||||
case rabbit_mnesia:dirty_read({rabbit_durable_queue, Name}) of
|
||||
{error, not_found} -> not_found;
|
||||
{ok, Q} -> {absent, Q, nodedown}
|
||||
end.
|
||||
|
|
@ -476,7 +476,7 @@ list_with_possible_retry_in_mnesia(Fun) ->
|
|||
end.
|
||||
|
||||
delete_in_mnesia(QueueName, Reason) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
case {mnesia:wread({rabbit_queue, QueueName}),
|
||||
mnesia:wread({rabbit_durable_queue, QueueName})} of
|
||||
|
|
@ -521,7 +521,7 @@ list_for_count_in_mnesia(VHost) ->
|
|||
end).
|
||||
|
||||
update_decorators_in_mnesia(Name) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
case mnesia:wread({rabbit_queue, Name}) of
|
||||
[Q] -> ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q),
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ set(Key, Term) when is_atom(Key) ->
|
|||
#{mnesia => fun() -> set_in_mnesia(Key, Term) end}).
|
||||
|
||||
set_in_mnesia(Key, Term) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> set_in_mnesia_tx(Key, Term) end).
|
||||
|
||||
-spec set(VHostName, Comp, Name, Term) -> Ret when
|
||||
|
|
@ -63,7 +63,7 @@ set(VHostName, Comp, Name, Term)
|
|||
#{mnesia => fun() -> set_in_mnesia(VHostName, Key, Term) end}).
|
||||
|
||||
set_in_mnesia(VHostName, Key, Term) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
rabbit_db_vhost:with_fun_in_mnesia_tx(
|
||||
VHostName,
|
||||
fun() -> set_in_mnesia_tx(Key, Term) end)).
|
||||
|
|
@ -131,7 +131,7 @@ get_or_set(Key, Default) ->
|
|||
#{mnesia => fun() -> get_or_set_in_mnesia(Key, Default) end}).
|
||||
|
||||
get_or_set_in_mnesia(Key, Default) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> get_or_set_in_mnesia_tx(Key, Default) end).
|
||||
|
||||
get_or_set_in_mnesia_tx(Key, Default) ->
|
||||
|
|
@ -162,7 +162,7 @@ get_all() ->
|
|||
#{mnesia => fun() -> get_all_in_mnesia() end}).
|
||||
|
||||
get_all_in_mnesia() ->
|
||||
rabbit_misc:dirty_read_all(?MNESIA_TABLE).
|
||||
rabbit_mnesia:dirty_read_all(?MNESIA_TABLE).
|
||||
|
||||
-spec get_all(VHostName, Comp) -> Ret when
|
||||
VHostName :: vhost:name() | '_',
|
||||
|
|
@ -230,14 +230,14 @@ delete(VHostName, Comp, Name)
|
|||
fun() -> delete_matching_in_mnesia(VHostName, Comp, Name) end}).
|
||||
|
||||
delete_in_mnesia(Key) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> delete_in_mnesia_tx(Key) end).
|
||||
|
||||
delete_in_mnesia_tx(Key) ->
|
||||
mnesia:delete(?MNESIA_TABLE, Key, write).
|
||||
|
||||
delete_matching_in_mnesia(VHostName, Comp, Name) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> delete_matching_in_mnesia_tx(VHostName, Comp, Name) end).
|
||||
|
||||
delete_matching_in_mnesia_tx(VHostName, Comp, Name) ->
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ split_topic_key(Key) ->
|
|||
split_topic_key(Key, [], []).
|
||||
|
||||
insert_in_mnesia(XName, RoutingKey, Destination, Args) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
FinalNode = follow_down_create(XName, split_topic_key(RoutingKey)),
|
||||
trie_add_binding(XName, FinalNode, Destination, Args),
|
||||
|
|
@ -93,7 +93,7 @@ insert_in_mnesia(XName, RoutingKey, Destination, Args) ->
|
|||
end).
|
||||
|
||||
delete_all_for_exchange_in_mnesia(XName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
trie_remove_all_nodes(XName),
|
||||
trie_remove_all_edges(XName),
|
||||
|
|
@ -149,7 +149,7 @@ delete_in_mnesia_tx(Bs) ->
|
|||
ok.
|
||||
|
||||
delete_in_mnesia(Bs) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> delete_in_mnesia_tx(Bs) end).
|
||||
|
||||
split_topic_key(<<>>, [], []) ->
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ create(User) ->
|
|||
#{mnesia => fun() -> create_in_mnesia(Username, User) end}).
|
||||
|
||||
create_in_mnesia(Username, User) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> create_in_mnesia_tx(Username, User) end).
|
||||
|
||||
create_in_mnesia_tx(Username, User) ->
|
||||
|
|
@ -78,7 +78,7 @@ update(Username, UpdateFun)
|
|||
#{mnesia => fun() -> update_in_mnesia(Username, UpdateFun) end}).
|
||||
|
||||
update_in_mnesia(Username, UpdateFun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> update_in_mnesia_tx(Username, UpdateFun) end).
|
||||
|
||||
update_in_mnesia_tx(Username, UpdateFun) ->
|
||||
|
|
@ -218,26 +218,26 @@ check_and_match_user_permissions(Username, VHostName)
|
|||
fun() -> match_user_permissions_in_mnesia(Username, VHostName) end}).
|
||||
|
||||
match_user_permissions_in_mnesia('_' = Username, '_' = VHostName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
match_user_permissions_in_mnesia_tx(Username, VHostName)
|
||||
end);
|
||||
match_user_permissions_in_mnesia('_' = Username, VHostName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
rabbit_db_vhost:with_fun_in_mnesia_tx(
|
||||
VHostName,
|
||||
fun() ->
|
||||
match_user_permissions_in_mnesia_tx(Username, VHostName)
|
||||
end));
|
||||
match_user_permissions_in_mnesia(Username, '_' = VHostName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
with_fun_in_mnesia_tx(
|
||||
Username,
|
||||
fun() ->
|
||||
match_user_permissions_in_mnesia_tx(Username, VHostName)
|
||||
end));
|
||||
match_user_permissions_in_mnesia(Username, VHostName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
with_fun_in_mnesia_tx(
|
||||
Username,
|
||||
rabbit_db_vhost:with_fun_in_mnesia_tx(
|
||||
|
|
@ -278,7 +278,7 @@ set_user_permissions(
|
|||
end}).
|
||||
|
||||
set_user_permissions_in_mnesia(Username, VHostName, UserPermission) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
with_fun_in_mnesia_tx(
|
||||
Username,
|
||||
rabbit_db_vhost:with_fun_in_mnesia_tx(
|
||||
|
|
@ -307,7 +307,7 @@ clear_user_permissions(Username, VHostName)
|
|||
fun() -> clear_user_permissions_in_mnesia(Username, VHostName) end}).
|
||||
|
||||
clear_user_permissions_in_mnesia(Username, VHostName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> clear_user_permissions_in_mnesia_tx(Username, VHostName) end).
|
||||
|
||||
clear_user_permissions_in_mnesia_tx(Username, VHostName) ->
|
||||
|
|
@ -340,7 +340,7 @@ clear_matching_user_permissions(Username, VHostName)
|
|||
}).
|
||||
|
||||
clear_matching_user_permissions_in_mnesia(Username, VHostName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
clear_matching_user_permissions_in_mnesia_tx( Username, VHostName)
|
||||
end).
|
||||
|
|
@ -422,14 +422,14 @@ check_and_match_topic_permissions(Username, VHostName, ExchangeName)
|
|||
|
||||
match_topic_permissions_in_mnesia(
|
||||
'_' = Username, '_' = VHostName, ExchangeName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
match_topic_permissions_in_mnesia_tx(
|
||||
Username, VHostName, ExchangeName)
|
||||
end);
|
||||
match_topic_permissions_in_mnesia(
|
||||
'_' = Username, VHostName, ExchangeName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
rabbit_db_vhost:with_fun_in_mnesia_tx(
|
||||
VHostName,
|
||||
fun() ->
|
||||
|
|
@ -438,7 +438,7 @@ match_topic_permissions_in_mnesia(
|
|||
end));
|
||||
match_topic_permissions_in_mnesia(
|
||||
Username, '_' = VHostName, ExchangeName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
with_fun_in_mnesia_tx(
|
||||
Username,
|
||||
fun() ->
|
||||
|
|
@ -447,7 +447,7 @@ match_topic_permissions_in_mnesia(
|
|||
end));
|
||||
match_topic_permissions_in_mnesia(
|
||||
Username, VHostName, ExchangeName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
with_fun_in_mnesia_tx(
|
||||
Username,
|
||||
rabbit_db_vhost:with_fun_in_mnesia_tx(
|
||||
|
|
@ -495,7 +495,7 @@ set_topic_permissions(
|
|||
end}).
|
||||
|
||||
set_topic_permissions_in_mnesia(Username, VHostName, TopicPermission) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
with_fun_in_mnesia_tx(
|
||||
Username,
|
||||
rabbit_db_vhost:with_fun_in_mnesia_tx(
|
||||
|
|
@ -531,7 +531,7 @@ clear_topic_permissions(Username, VHostName, ExchangeName)
|
|||
end}).
|
||||
|
||||
clear_topic_permissions_in_mnesia(Username, VHostName, ExchangeName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
clear_topic_permissions_in_mnesia_tx(
|
||||
Username, VHostName, ExchangeName)
|
||||
|
|
@ -569,7 +569,7 @@ clear_matching_topic_permissions(Username, VHostName, ExchangeName)
|
|||
|
||||
clear_matching_topic_permissions_in_mnesia(
|
||||
Username, VHostName, ExchangeName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
clear_matching_topic_permissions_in_mnesia_tx(
|
||||
Username, VHostName, ExchangeName)
|
||||
|
|
@ -604,7 +604,7 @@ delete(Username) when is_binary(Username) ->
|
|||
#{mnesia => fun() -> delete_in_mnesia(Username) end}).
|
||||
|
||||
delete_in_mnesia(Username) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> delete_in_mnesia_tx(Username) end).
|
||||
|
||||
delete_in_mnesia_tx(Username) ->
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ create_or_get(VHostName, Limits, Metadata)
|
|||
#{mnesia => fun() -> create_or_get_in_mnesia(VHostName, VHost) end}).
|
||||
|
||||
create_or_get_in_mnesia(VHostName, VHost) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> create_or_get_in_mnesia_tx(VHostName, VHost) end).
|
||||
|
||||
create_or_get_in_mnesia_tx(VHostName, VHost) ->
|
||||
|
|
@ -99,7 +99,7 @@ do_merge_metadata(VHostName, Metadata) ->
|
|||
#{mnesia => fun() -> merge_metadata_in_mnesia(VHostName, Metadata) end}).
|
||||
|
||||
merge_metadata_in_mnesia(VHostName, Metadata) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> merge_metadata_in_mnesia_tx(VHostName, Metadata) end).
|
||||
|
||||
merge_metadata_in_mnesia_tx(VHostName, Metadata) ->
|
||||
|
|
@ -135,7 +135,7 @@ set_tags(VHostName, Tags)
|
|||
#{mnesia => fun() -> set_tags_in_mnesia(VHostName, ConvertedTags) end}).
|
||||
|
||||
set_tags_in_mnesia(VHostName, Tags) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> set_tags_in_mnesia_tx(VHostName, Tags) end).
|
||||
|
||||
set_tags_in_mnesia_tx(VHostName, Tags) ->
|
||||
|
|
@ -251,7 +251,7 @@ update(VHostName, UpdateFun)
|
|||
#{mnesia => fun() -> update_in_mnesia(VHostName, UpdateFun) end}).
|
||||
|
||||
update_in_mnesia(VHostName, UpdateFun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> update_in_mnesia_tx(VHostName, UpdateFun) end).
|
||||
|
||||
update_in_mnesia_tx(VHostName, UpdateFun)
|
||||
|
|
@ -312,7 +312,7 @@ delete(VHostName) when is_binary(VHostName) ->
|
|||
#{mnesia => fun() -> delete_in_mnesia(VHostName) end}).
|
||||
|
||||
delete_in_mnesia(VHostName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> delete_in_mnesia_tx(VHostName) end).
|
||||
|
||||
delete_in_mnesia_tx(VHostName) ->
|
||||
|
|
|
|||
|
|
@ -320,7 +320,7 @@ init([Q, GM, DeathFun, DepthFun]) when ?is_amqqueue(Q) ->
|
|||
undefined ->
|
||||
{ok, GM2} = gm:start_link(
|
||||
QueueName, ?MODULE, [self()],
|
||||
fun rabbit_misc:execute_mnesia_transaction/1),
|
||||
fun rabbit_mnesia:execute_mnesia_transaction/1),
|
||||
receive {joined, GM2, _Members} ->
|
||||
ok
|
||||
end,
|
||||
|
|
|
|||
|
|
@ -106,7 +106,7 @@ init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) ->
|
|||
%% need to handle migration.
|
||||
ok = rabbit_amqqueue:store_queue(Q3)
|
||||
end,
|
||||
ok = rabbit_misc:execute_mnesia_transaction(Fun),
|
||||
ok = rabbit_mnesia:execute_mnesia_transaction(Fun),
|
||||
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
|
||||
%% We need synchronous add here (i.e. do not return until the
|
||||
%% mirror is running) so that when queue declaration is finished
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@
|
|||
{'error', {'not_synced', [pid()]}}.
|
||||
|
||||
remove_from_queue(QueueName, Self, DeadGMPids) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
%% Someone else could have deleted the queue before we
|
||||
%% get here. Or, gm group could've altered. see rabbitmq-server#914
|
||||
|
|
@ -162,7 +162,7 @@ slaves_to_start_on_failure(Q, DeadGMPids) ->
|
|||
|
||||
on_vhost_up(VHost) ->
|
||||
QNames =
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
mnesia:foldl(
|
||||
fun
|
||||
|
|
@ -353,7 +353,7 @@ stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) ->
|
|||
%% Normally when we remove a mirror another mirror or master will
|
||||
%% notice and update Mnesia. But we just removed them all, and
|
||||
%% have stopped listening ourselves. So manually clean up.
|
||||
rabbit_misc:execute_mnesia_transaction(fun () ->
|
||||
rabbit_mnesia:execute_mnesia_transaction(fun () ->
|
||||
[Q0] = mnesia:read({rabbit_queue, QName}),
|
||||
Q1 = amqqueue:set_gm_pids(Q0, []),
|
||||
Q2 = amqqueue:set_slave_pids(Q1, []),
|
||||
|
|
|
|||
|
|
@ -106,7 +106,7 @@ handle_go(Q0) when ?is_amqqueue(Q0) ->
|
|||
%%
|
||||
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
|
||||
{ok, GM} = gm:start_link(QName, ?MODULE, [self()],
|
||||
fun rabbit_misc:execute_mnesia_transaction/1),
|
||||
fun rabbit_mnesia:execute_mnesia_transaction/1),
|
||||
MRef = erlang:monitor(process, GM),
|
||||
%% We ignore the DOWN message because we are also linked and
|
||||
%% trapping exits, we just want to not get stuck and we will exit
|
||||
|
|
@ -118,7 +118,7 @@ handle_go(Q0) when ?is_amqqueue(Q0) ->
|
|||
end,
|
||||
Self = self(),
|
||||
Node = node(),
|
||||
case rabbit_misc:execute_mnesia_transaction(
|
||||
case rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> init_it(Self, GM, Node, QName) end) of
|
||||
{new, QPid, GMPids} ->
|
||||
ok = file_handle_cache:register_callback(
|
||||
|
|
@ -1088,7 +1088,7 @@ record_synchronised(Q0) when ?is_amqqueue(Q0) ->
|
|||
{ok, Q2}
|
||||
end
|
||||
end,
|
||||
case rabbit_misc:execute_mnesia_transaction(F) of
|
||||
case rabbit_mnesia:execute_mnesia_transaction(F) of
|
||||
ok -> ok;
|
||||
{ok, Q2} -> rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Q2)
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -43,6 +43,16 @@
|
|||
schema_info/1
|
||||
]).
|
||||
|
||||
%% Mnesia queries
|
||||
-export([
|
||||
table_filter/3,
|
||||
dirty_read_all/1,
|
||||
dirty_read/1,
|
||||
execute_mnesia_tx_with_tail/1,
|
||||
execute_mnesia_transaction/1,
|
||||
execute_mnesia_transaction/2
|
||||
]).
|
||||
|
||||
%% Used internally in rpc calls
|
||||
-export([node_info/0, remove_node_if_mnesia_running/1]).
|
||||
|
||||
|
|
@ -796,6 +806,103 @@ info(Table, Items) ->
|
|||
All = [{name, Table} | mnesia:table_info(Table, all)],
|
||||
[{Item, proplists:get_value(Item, All)} || Item <- Items].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Queries
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec table_filter
|
||||
(fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'), atom()) -> [A].
|
||||
%% Apply a pre-post-commit function to all entries in a table that
|
||||
%% satisfy a predicate, and return those entries.
|
||||
%%
|
||||
%% We ignore entries that have been modified or removed.
|
||||
table_filter(Pred, PrePostCommitFun, TableName) ->
|
||||
lists:foldl(
|
||||
fun (E, Acc) ->
|
||||
case execute_mnesia_transaction(
|
||||
fun () -> mnesia:match_object(TableName, E, read) =/= []
|
||||
andalso Pred(E) end,
|
||||
fun (false, _Tx) -> false;
|
||||
(true, Tx) -> PrePostCommitFun(E, Tx), true
|
||||
end) of
|
||||
false -> Acc;
|
||||
true -> [E | Acc]
|
||||
end
|
||||
end, [], dirty_read_all(TableName)).
|
||||
|
||||
-spec dirty_read_all(atom()) -> [any()].
|
||||
dirty_read_all(TableName) ->
|
||||
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
|
||||
|
||||
-spec dirty_read({atom(), any()}) ->
|
||||
rabbit_types:ok_or_error2(any(), 'not_found').
|
||||
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
|
||||
%% expensive due to general mnesia overheads (figuring out table types
|
||||
%% and locations, etc). We get away with bypassing these because we
|
||||
%% know that the tables we are looking at here
|
||||
%% - are not the schema table
|
||||
%% - have a local ram copy
|
||||
%% - do not have any indices
|
||||
dirty_read({Table, Key}) ->
|
||||
case ets:lookup(Table, Key) of
|
||||
[Result] -> {ok, Result};
|
||||
[] -> {error, not_found}
|
||||
end.
|
||||
|
||||
-spec execute_mnesia_tx_with_tail
|
||||
(rabbit_misc:thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B)).
|
||||
%% Like execute_mnesia_transaction/2, but TxFun is expected to return a
|
||||
%% TailFun which gets called (only) immediately after the tx commit
|
||||
execute_mnesia_tx_with_tail(TxFun) ->
|
||||
case mnesia:is_transaction() of
|
||||
true -> execute_mnesia_transaction(TxFun);
|
||||
false -> TailFun = execute_mnesia_transaction(TxFun),
|
||||
TailFun()
|
||||
end.
|
||||
|
||||
-spec execute_mnesia_transaction(rabbit_misc:thunk(A)) -> A.
|
||||
execute_mnesia_transaction(TxFun) ->
|
||||
%% Making this a sync_transaction allows us to use dirty_read
|
||||
%% elsewhere and get a consistent result even when that read
|
||||
%% executes on a different node.
|
||||
case worker_pool:submit(
|
||||
fun () ->
|
||||
case mnesia:is_transaction() of
|
||||
false -> DiskLogBefore = mnesia_dumper:get_log_writes(),
|
||||
Res = mnesia:sync_transaction(TxFun),
|
||||
DiskLogAfter = mnesia_dumper:get_log_writes(),
|
||||
case DiskLogAfter == DiskLogBefore of
|
||||
true -> file_handle_cache_stats:update(
|
||||
mnesia_ram_tx),
|
||||
Res;
|
||||
false -> file_handle_cache_stats:update(
|
||||
mnesia_disk_tx),
|
||||
{sync, Res}
|
||||
end;
|
||||
true -> mnesia:sync_transaction(TxFun)
|
||||
end
|
||||
end, single) of
|
||||
{sync, {atomic, Result}} -> mnesia_sync:sync(), Result;
|
||||
{sync, {aborted, Reason}} -> throw({error, Reason});
|
||||
{atomic, Result} -> Result;
|
||||
{aborted, Reason} -> throw({error, Reason})
|
||||
end.
|
||||
|
||||
-spec execute_mnesia_transaction(rabbit_misc:thunk(A), fun ((A, boolean()) -> B)) -> B.
|
||||
%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
|
||||
%% commit function
|
||||
execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
|
||||
case mnesia:is_transaction() of
|
||||
true -> throw(unexpected_transaction);
|
||||
false -> ok
|
||||
end,
|
||||
PrePostCommitFun(execute_mnesia_transaction(
|
||||
fun () ->
|
||||
Result = TxFun(),
|
||||
PrePostCommitFun(Result, true),
|
||||
Result
|
||||
end), false).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal helpers
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -257,7 +257,7 @@ listener_of_protocol(Protocol) ->
|
|||
end.
|
||||
|
||||
listener_of_protocol_mnesia(Protocol) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
MatchSpec = #listener{
|
||||
node = node(),
|
||||
|
|
|
|||
|
|
@ -1359,13 +1359,10 @@ dead_letter_extra_bcc(Config) ->
|
|||
ok.
|
||||
|
||||
set_queue_options(QName, Options) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName),
|
||||
fun(Q) ->
|
||||
amqqueue:set_options(Q, Options)
|
||||
end)
|
||||
end).
|
||||
rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName),
|
||||
fun(Q) ->
|
||||
amqqueue:set_options(Q, Options)
|
||||
end).
|
||||
|
||||
metric_maxlen(Config) ->
|
||||
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||
|
|
|
|||
|
|
@ -838,9 +838,7 @@ set_queue_options(Config, QName, Options) ->
|
|||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, set_queue_options1, [QName, Options]).
|
||||
|
||||
set_queue_options1(QName, Options) ->
|
||||
rabbit_misc:execute_mnesia_transaction(fun() ->
|
||||
rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName),
|
||||
fun(Q) ->
|
||||
amqqueue:set_options(Q, Options)
|
||||
end)
|
||||
end).
|
||||
rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName),
|
||||
fun(Q) ->
|
||||
amqqueue:set_options(Q, Options)
|
||||
end).
|
||||
|
|
|
|||
|
|
@ -145,7 +145,7 @@ topic_permission_checks(Config) ->
|
|||
|
||||
topic_permission_checks1(_Config) ->
|
||||
0 = length(ets:tab2list(rabbit_topic_permission)),
|
||||
rabbit_misc:execute_mnesia_transaction(fun() ->
|
||||
rabbit_mnesia:execute_mnesia_transaction(fun() ->
|
||||
ok = mnesia:write(rabbit_vhost,
|
||||
vhost:new(<<"/">>, []),
|
||||
write),
|
||||
|
|
|
|||
|
|
@ -422,7 +422,7 @@ topic_matching1(_Config) ->
|
|||
passed.
|
||||
|
||||
exchange_op_callback(X, Fun, Args) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () -> rabbit_exchange:callback(X, Fun, transaction, [X] ++ Args) end),
|
||||
rabbit_exchange:callback(X, Fun, none, [X] ++ Args).
|
||||
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ member_death(_Config) ->
|
|||
fun (Pid, Pid2) ->
|
||||
{ok, Pid3} = gm:start_link(
|
||||
?MODULE, ?MODULE, self(),
|
||||
fun rabbit_misc:execute_mnesia_transaction/1),
|
||||
fun rabbit_mnesia:execute_mnesia_transaction/1),
|
||||
passed = receive_joined(Pid3, [Pid, Pid2, Pid3],
|
||||
timeout_joining_gm_group_3),
|
||||
passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1),
|
||||
|
|
@ -120,10 +120,10 @@ down_in_members_change(_Config) ->
|
|||
%% Setup
|
||||
ok = gm:create_tables(),
|
||||
{ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
|
||||
fun rabbit_misc:execute_mnesia_transaction/1),
|
||||
fun rabbit_mnesia:execute_mnesia_transaction/1),
|
||||
passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1),
|
||||
{ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(),
|
||||
fun rabbit_misc:execute_mnesia_transaction/1),
|
||||
fun rabbit_mnesia:execute_mnesia_transaction/1),
|
||||
passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2),
|
||||
passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2),
|
||||
|
||||
|
|
@ -167,11 +167,11 @@ with_two_members(Fun) ->
|
|||
ok = gm:create_tables(),
|
||||
|
||||
{ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
|
||||
fun rabbit_misc:execute_mnesia_transaction/1),
|
||||
fun rabbit_mnesia:execute_mnesia_transaction/1),
|
||||
passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1),
|
||||
|
||||
{ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(),
|
||||
fun rabbit_misc:execute_mnesia_transaction/1),
|
||||
fun rabbit_mnesia:execute_mnesia_transaction/1),
|
||||
passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2),
|
||||
passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2),
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@
|
|||
-export([die/1, frame_error/2, amqp_error/4, quit/1,
|
||||
protocol_error/3, protocol_error/4, protocol_error/1]).
|
||||
-export([type_class/1, assert_args_equivalence/4, assert_field_equivalence/4]).
|
||||
-export([dirty_read/1]).
|
||||
-export([table_lookup/2, set_table_value/4, amqp_table/1, to_amqp_table/1]).
|
||||
-export([r/3, r/2, r_arg/4, rs/1]).
|
||||
-export([enable_cover/0, report_cover/0]).
|
||||
|
|
@ -31,15 +30,10 @@
|
|||
-export([start_cover/1]).
|
||||
-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1,
|
||||
filter_exit_map/2]).
|
||||
-export([with_user/2]).
|
||||
-export([execute_mnesia_transaction/1]).
|
||||
-export([execute_mnesia_transaction/2]).
|
||||
-export([execute_mnesia_tx_with_tail/1]).
|
||||
-export([ensure_ok/2]).
|
||||
-export([tcp_name/3, format_inet_error/1]).
|
||||
-export([upmap/2, map_in_order/2, utf8_safe/1]).
|
||||
-export([table_filter/3]).
|
||||
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
|
||||
-export([dirty_dump_log/1]).
|
||||
-export([format/2, format_many/1, format_stderr/2]).
|
||||
-export([unfold/2, ceil/1, queue_fold/3]).
|
||||
-export([sort_field_table/1]).
|
||||
|
|
@ -140,8 +134,6 @@
|
|||
-spec equivalence_fail
|
||||
(any(), any(), rabbit_types:r(any()), atom() | binary()) ->
|
||||
rabbit_types:connection_exit().
|
||||
-spec dirty_read({atom(), any()}) ->
|
||||
rabbit_types:ok_or_error2(any(), 'not_found').
|
||||
-spec table_lookup(rabbit_framing:amqp_table(), binary()) ->
|
||||
'undefined' | {rabbit_framing:amqp_field_type(), any()}.
|
||||
-spec set_table_value
|
||||
|
|
@ -172,22 +164,12 @@
|
|||
-spec with_exit_handler(thunk(A), thunk(A)) -> A.
|
||||
-spec is_abnormal_exit(any()) -> boolean().
|
||||
-spec filter_exit_map(fun ((A) -> B), [A]) -> [B].
|
||||
-spec with_user(rabbit_types:username(), thunk(A)) -> A.
|
||||
-spec execute_mnesia_transaction(thunk(A)) -> A.
|
||||
-spec execute_mnesia_transaction(thunk(A), fun ((A, boolean()) -> B)) -> B.
|
||||
-spec execute_mnesia_tx_with_tail
|
||||
(thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B)).
|
||||
-spec ensure_ok(ok_or_error(), atom()) -> 'ok'.
|
||||
-spec tcp_name(atom(), inet:ip_address(), rabbit_net:ip_port()) ->
|
||||
atom().
|
||||
-spec format_inet_error(atom()) -> string().
|
||||
-spec upmap(fun ((A) -> B), [A]) -> [B].
|
||||
-spec map_in_order(fun ((A) -> B), [A]) -> [B].
|
||||
-spec table_filter
|
||||
(fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'), atom()) -> [A].
|
||||
-spec dirty_read_all(atom()) -> [any()].
|
||||
-spec dirty_foreach_key(fun ((any()) -> any()), atom()) ->
|
||||
'ok' | 'aborted'.
|
||||
-spec dirty_dump_log(file:filename()) -> ok_or_error().
|
||||
-spec format(string(), [any()]) -> string().
|
||||
-spec format_many([{string(), [any()]}]) -> string().
|
||||
|
|
@ -361,19 +343,6 @@ val(Value) ->
|
|||
false -> "'~tp'"
|
||||
end, [Value]).
|
||||
|
||||
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
|
||||
%% expensive due to general mnesia overheads (figuring out table types
|
||||
%% and locations, etc). We get away with bypassing these because we
|
||||
%% know that the tables we are looking at here
|
||||
%% - are not the schema table
|
||||
%% - have a local ram copy
|
||||
%% - do not have any indices
|
||||
dirty_read({Table, Key}) ->
|
||||
case ets:lookup(Table, Key) of
|
||||
[Result] -> {ok, Result};
|
||||
[] -> {error, not_found}
|
||||
end.
|
||||
|
||||
%%
|
||||
%% Attribute Tables
|
||||
%%
|
||||
|
|
@ -544,67 +513,6 @@ filter_exit_map(F, L) ->
|
|||
fun () -> Ref end,
|
||||
fun () -> F(I) end) || I <- L]).
|
||||
|
||||
|
||||
with_user(Username, Thunk) ->
|
||||
fun () ->
|
||||
case mnesia:read({rabbit_user, Username}) of
|
||||
[] ->
|
||||
mnesia:abort({no_such_user, Username});
|
||||
[_U] ->
|
||||
Thunk()
|
||||
end
|
||||
end.
|
||||
|
||||
execute_mnesia_transaction(TxFun) ->
|
||||
%% Making this a sync_transaction allows us to use dirty_read
|
||||
%% elsewhere and get a consistent result even when that read
|
||||
%% executes on a different node.
|
||||
case worker_pool:submit(
|
||||
fun () ->
|
||||
case mnesia:is_transaction() of
|
||||
false -> DiskLogBefore = mnesia_dumper:get_log_writes(),
|
||||
Res = mnesia:sync_transaction(TxFun),
|
||||
DiskLogAfter = mnesia_dumper:get_log_writes(),
|
||||
case DiskLogAfter == DiskLogBefore of
|
||||
true -> file_handle_cache_stats:update(
|
||||
mnesia_ram_tx),
|
||||
Res;
|
||||
false -> file_handle_cache_stats:update(
|
||||
mnesia_disk_tx),
|
||||
{sync, Res}
|
||||
end;
|
||||
true -> mnesia:sync_transaction(TxFun)
|
||||
end
|
||||
end, single) of
|
||||
{sync, {atomic, Result}} -> mnesia_sync:sync(), Result;
|
||||
{sync, {aborted, Reason}} -> throw({error, Reason});
|
||||
{atomic, Result} -> Result;
|
||||
{aborted, Reason} -> throw({error, Reason})
|
||||
end.
|
||||
|
||||
%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
|
||||
%% commit function
|
||||
execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
|
||||
case mnesia:is_transaction() of
|
||||
true -> throw(unexpected_transaction);
|
||||
false -> ok
|
||||
end,
|
||||
PrePostCommitFun(execute_mnesia_transaction(
|
||||
fun () ->
|
||||
Result = TxFun(),
|
||||
PrePostCommitFun(Result, true),
|
||||
Result
|
||||
end), false).
|
||||
|
||||
%% Like execute_mnesia_transaction/2, but TxFun is expected to return a
|
||||
%% TailFun which gets called (only) immediately after the tx commit
|
||||
execute_mnesia_tx_with_tail(TxFun) ->
|
||||
case mnesia:is_transaction() of
|
||||
true -> execute_mnesia_transaction(TxFun);
|
||||
false -> TailFun = execute_mnesia_transaction(TxFun),
|
||||
TailFun()
|
||||
end.
|
||||
|
||||
ensure_ok(ok, _) -> ok;
|
||||
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
|
||||
|
||||
|
|
@ -660,41 +568,6 @@ map_in_order(F, L) ->
|
|||
lists:reverse(
|
||||
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
|
||||
|
||||
%% Apply a pre-post-commit function to all entries in a table that
|
||||
%% satisfy a predicate, and return those entries.
|
||||
%%
|
||||
%% We ignore entries that have been modified or removed.
|
||||
table_filter(Pred, PrePostCommitFun, TableName) ->
|
||||
lists:foldl(
|
||||
fun (E, Acc) ->
|
||||
case execute_mnesia_transaction(
|
||||
fun () -> mnesia:match_object(TableName, E, read) =/= []
|
||||
andalso Pred(E) end,
|
||||
fun (false, _Tx) -> false;
|
||||
(true, Tx) -> PrePostCommitFun(E, Tx), true
|
||||
end) of
|
||||
false -> Acc;
|
||||
true -> [E | Acc]
|
||||
end
|
||||
end, [], dirty_read_all(TableName)).
|
||||
|
||||
dirty_read_all(TableName) ->
|
||||
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
|
||||
|
||||
dirty_foreach_key(F, TableName) ->
|
||||
dirty_foreach_key1(F, TableName, mnesia:dirty_first(TableName)).
|
||||
|
||||
dirty_foreach_key1(_F, _TableName, '$end_of_table') ->
|
||||
ok;
|
||||
dirty_foreach_key1(F, TableName, K) ->
|
||||
case catch mnesia:dirty_next(TableName, K) of
|
||||
{'EXIT', _} ->
|
||||
aborted;
|
||||
NextKey ->
|
||||
F(K),
|
||||
dirty_foreach_key1(F, TableName, NextKey)
|
||||
end.
|
||||
|
||||
dirty_dump_log(FileName) ->
|
||||
{ok, LH} = disk_log:open([{name, dirty_dump_log},
|
||||
{mode, read_only},
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ create(X) ->
|
|||
}).
|
||||
|
||||
create_in_mnesia(X) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> create_in_mnesia_tx(X) end).
|
||||
|
||||
create_in_mnesia_tx(X) ->
|
||||
|
|
@ -59,7 +59,7 @@ create_binding(Src, Dst, Weight, UpdateFun) ->
|
|||
}).
|
||||
|
||||
create_binding_in_mnesia(Src, Dst, Weight, UpdateFun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
create_binding_in_mnesia_tx(Src, Dst, Weight, UpdateFun)
|
||||
end).
|
||||
|
|
@ -98,7 +98,7 @@ delete(XName) ->
|
|||
}).
|
||||
|
||||
delete_in_mnesia(XName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
|
||||
mnesia:delete({?HASH_RING_STATE_TABLE, XName})
|
||||
|
|
@ -110,7 +110,7 @@ delete_bindings(Bindings, DeleteFun) ->
|
|||
}).
|
||||
|
||||
delete_bindings_in_mnesia(Bindings, DeleteFun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
[delete_binding_in_mnesia(Binding, DeleteFun) || Binding <- Bindings]
|
||||
end).
|
||||
|
|
|
|||
|
|
@ -439,7 +439,7 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
|
|||
_ -> unknown
|
||||
end,
|
||||
{Serial, Bindings} =
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
{rabbit_exchange:peek_serial(DownXName),
|
||||
rabbit_binding:list_for_source(DownXName)}
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ create_or_update(XName, BindingKeyAndFun, ErrorFun) ->
|
|||
}).
|
||||
|
||||
create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} =
|
||||
read_state_in_mnesia(XName, ErrorFun),
|
||||
|
|
@ -66,7 +66,7 @@ insert(XName, BFuns) ->
|
|||
}).
|
||||
|
||||
insert_in_mnesia(XName, BFuns) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
write_state_fun_in_mnesia(XName, BFuns)
|
||||
end).
|
||||
|
|
@ -103,7 +103,7 @@ delete(XName) ->
|
|||
}).
|
||||
|
||||
delete_in_mnesia(XName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> mnesia:delete(?JMS_TOPIC_TABLE, XName, write) end).
|
||||
|
||||
delete(XName, BindingKeys, ErrorFun) ->
|
||||
|
|
@ -113,7 +113,7 @@ delete(XName, BindingKeys, ErrorFun) ->
|
|||
}).
|
||||
|
||||
delete_in_mnesia(XName, BindingKeys, ErrorFun) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} =
|
||||
read_state_in_mnesia(XName, ErrorFun),
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ get(XName) ->
|
|||
}).
|
||||
|
||||
get_in_mnesia(XName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() -> get_in_mnesia_tx(XName) end).
|
||||
|
||||
get_in_mnesia_tx(XName) ->
|
||||
|
|
@ -67,7 +67,7 @@ insert(XName, Message, Length) ->
|
|||
}).
|
||||
|
||||
insert_in_mnesia(XName, Message, Length) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun () ->
|
||||
Cached = get_in_mnesia_tx(XName),
|
||||
insert_in_mnesia(XName, Cached, Message, Length)
|
||||
|
|
@ -109,7 +109,7 @@ delete(XName) ->
|
|||
}).
|
||||
|
||||
delete_in_mnesia(XName) ->
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
rabbit_mnesia:execute_mnesia_transaction(
|
||||
fun() ->
|
||||
mnesia:delete(?RH_TABLE, XName, write)
|
||||
end).
|
||||
|
|
|
|||
Loading…
Reference in New Issue