Move jms topic exchange Mnesia-specific code to rabbit_db_* modules

This commit is contained in:
Diana Parra Corbacho 2023-01-13 14:01:52 +01:00
parent ba1670a95a
commit 5b39e7e4ce
3 changed files with 162 additions and 91 deletions

View File

@ -0,0 +1,143 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
%% -----------------------------------------------------------------------------
-module(rabbit_db_jms_exchange).
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_jms_topic_exchange.hrl").
-export([
setup_schema/0,
create_or_update/3,
insert/2,
get/1,
delete/1,
delete/3
]).
%% -------------------------------------------------------------------
%% setup_schema()
%% -------------------------------------------------------------------
setup_schema() ->
rabbit_db:run(
#{mnesia => fun() -> setup_schema_in_mnesia() end
}).
setup_schema_in_mnesia() ->
case mnesia:create_table( ?JMS_TOPIC_TABLE
, [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)}
, {record_name, ?JMS_TOPIC_RECORD}
, {type, set} ]
) of
{atomic, ok} -> ok;
{aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok
end,
ok.
%% -------------------------------------------------------------------
%% create_or_update().
%% -------------------------------------------------------------------
create_or_update(XName, BindingKeyAndFun, ErrorFun) ->
rabbit_db:run(
#{mnesia =>
fun() -> create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) end
}).
create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} =
read_state_in_mnesia(XName, ErrorFun),
write_state_fun_in_mnesia(XName, put_item(BindingFuns, BindingKeyAndFun))
end).
%% -------------------------------------------------------------------
%% insert().
%% -------------------------------------------------------------------
insert(XName, BFuns) ->
rabbit_db:run(
#{mnesia => fun() -> insert_in_mnesia(XName, BFuns) end
}).
insert_in_mnesia(XName, BFuns) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
write_state_fun_in_mnesia(XName, BFuns)
end).
%% -------------------------------------------------------------------
%% get().
%% -------------------------------------------------------------------
get(XName) ->
rabbit_db:run(
#{mnesia => fun() -> get_in_mnesia(XName) end
}).
get_in_mnesia(XName) ->
mnesia:async_dirty(
fun() ->
case mnesia:read(?JMS_TOPIC_TABLE, XName, read) of
[#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns}] ->
BindingFuns;
_ ->
not_found
end
end,
[]
).
%% -------------------------------------------------------------------
%% delete().
%% -------------------------------------------------------------------
delete(XName) ->
rabbit_db:run(
#{mnesia => fun() -> delete_in_mnesia(XName) end
}).
delete_in_mnesia(XName) ->
rabbit_misc:execute_mnesia_transaction(
fun() -> mnesia:delete(?JMS_TOPIC_TABLE, XName, write) end).
delete(XName, BindingKeys, ErrorFun) ->
rabbit_db:run(
#{mnesia =>
fun() -> delete_in_mnesia(XName, BindingKeys, ErrorFun) end
}).
delete_in_mnesia(XName, BindingKeys, ErrorFun) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} =
read_state_in_mnesia(XName, ErrorFun),
write_state_fun_in_mnesia(XName, remove_items(BindingFuns, BindingKeys))
end).
read_state_in_mnesia(XName, ErrorFun) ->
case mnesia:read(?JMS_TOPIC_TABLE, XName, write) of
[Rec] -> Rec;
_ -> ErrorFun(XName)
end.
write_state_fun_in_mnesia(XName, BFuns) ->
mnesia:write( ?JMS_TOPIC_TABLE
, #?JMS_TOPIC_RECORD{x_name = XName, x_selector_funs = BFuns}
, write ).
%% -------------------------------------------------------------------
%% dictionary handling
%% -------------------------------------------------------------------
% add an item to the dictionary of binding functions
put_item(Dict, {Key, Item}) -> dict:store(Key, Item, Dict).
% remove a list of keyed items from the dictionary, by key
remove_items(Dict, []) -> Dict;
remove_items(Dict, [Key | Keys]) -> remove_items(dict:erase(Key, Dict), Keys).

View File

@ -22,7 +22,7 @@
, route/2
, validate/1
, create/2
, delete/3
, delete/2
, validate_binding/2
, add_binding/3
, remove_bindings/3
@ -55,14 +55,7 @@
% Initialise database table for all exchanges of type <<"x-jms-topic">>
setup_db_schema() ->
case mnesia:create_table( ?JMS_TOPIC_TABLE
, [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)}
, {record_name, ?JMS_TOPIC_RECORD}
, {type, set} ]
) of
{atomic, ok} -> ok;
{aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok
end.
rabbit_db_jms_exchange:setup_schema().
%%----------------------------------------------------------------------------
%% R E F E R E N C E T Y P E I N F O R M A T I O N
@ -111,30 +104,25 @@ route( #exchange{name = XName}
validate(_X) -> ok.
% After exchange declaration and recovery
create(transaction, #exchange{name = XName}) ->
add_initial_record(XName);
create(_Tx, _X) ->
ok.
create(none, #exchange{name = XName}) ->
add_initial_record(XName).
% Delete an exchange
delete(transaction, #exchange{name = XName}, _Bs) ->
delete_state(XName),
ok;
delete(_Tx, _X, _Bs) ->
ok.
delete(none, #exchange{name = XName}) ->
delete_state(XName).
% Before add binding
validate_binding(_X, _B) -> ok.
% A new binding has ben added or recovered
add_binding( Tx
add_binding( none
, #exchange{name = XName}
, #binding{key = BindingKey, destination = Dest, args = Args}
) ->
Selector = get_string_arg(Args, ?RJMS_COMPILED_SELECTOR_ARG),
BindGen = generate_binding_fun(Selector),
case {Tx, BindGen} of
{transaction, {ok, BindFun}} ->
case BindGen of
{ok, BindFun} ->
add_binding_fun(XName, {{BindingKey, Dest}, BindFun});
{none, error} ->
parsing_error(XName, Selector, Dest);
@ -144,13 +132,11 @@ add_binding( Tx
ok.
% Binding removal
remove_bindings( transaction
remove_bindings( none
, #exchange{name = XName}
, Bindings
) ->
remove_binding_funs(XName, Bindings),
ok;
remove_bindings(_Tx, _X, _Bs) ->
ok.
% Exchange argument equivalence
@ -234,66 +220,27 @@ selector_match(Selector, Headers) ->
% get binding funs from state (using dirty_reads)
get_binding_funs_x(XName) ->
mnesia:async_dirty(
fun() ->
case read_state_no_error(XName) of
not_found ->
not_found;
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} ->
BindingFuns
end
end,
[]
).
rabbit_db_jms_exchange:get(XName).
add_initial_record(XName) ->
write_state_fun(XName, dict:new()).
% add binding fun to binding fun dictionary
add_binding_fun(XName, BindingKeyAndFun) ->
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} = read_state_for_update(XName),
write_state_fun(XName, put_item(BindingFuns, BindingKeyAndFun)).
rabbit_db_jms_exchange:create_or_update(XName, BindingKeyAndFun, fun exchange_state_corrupt_error/1).
% remove binding funs from binding fun dictionary
remove_binding_funs(XName, Bindings) ->
BindingKeys = [ {BindingKey, DestName} || #binding{key = BindingKey, destination = DestName} <- Bindings ],
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} = read_state_for_update(XName),
write_state_fun(XName, remove_items(BindingFuns, BindingKeys)).
% add an item to the dictionary of binding functions
put_item(Dict, {Key, Item}) -> dict:store(Key, Item, Dict).
% remove a list of keyed items from the dictionary, by key
remove_items(Dict, []) -> Dict;
remove_items(Dict, [Key | Keys]) -> remove_items(dict:erase(Key, Dict), Keys).
rabbit_db_jms_exchange:delete(XName, BindingKeys, fun exchange_state_corrupt_error/1).
% delete all the state saved for this exchange
delete_state(XName) ->
mnesia:delete(?JMS_TOPIC_TABLE, XName, write).
% Basic read for update
read_state_for_update(XName) -> read_state(XName, write).
% Lockable read
read_state(XName, Lock) ->
case mnesia:read(?JMS_TOPIC_TABLE, XName, Lock) of
[Rec] -> Rec;
_ -> exchange_state_corrupt_error(XName)
end.
read_state_no_error(XName) ->
case mnesia:read(?JMS_TOPIC_TABLE, XName, read) of
[Rec] -> Rec;
_ -> not_found
end.
rabbit_db_jms_exchange:delete(XName).
% Basic write
write_state_fun(XName, BFuns) ->
mnesia:write( ?JMS_TOPIC_TABLE
, #?JMS_TOPIC_RECORD{x_name = XName, x_selector_funs = BFuns}
, write ).
rabbit_db_jms_exchange:insert(XName, BFuns).
%%----------------------------------------------------------------------------
%% E R R O R S

View File

@ -20,15 +20,8 @@
-import(rabbit_jms_topic_exchange, [ description/0
, serialise_events/0
, route/2
, validate/1
, create/2
, delete/3
, validate_binding/2
, add_binding/3
, remove_bindings/3
, assert_args_equivalence/2
, policy_changed/3 ]).
, validate_binding/2 ]).
all() ->
@ -42,10 +35,7 @@ groups() ->
description_test,
serialise_events_test,
validate_test,
create_test,
delete_test,
validate_binding_test,
add_binding_test
validate_binding_test
]}
].
@ -82,19 +72,10 @@ serialise_events_test(_Config) ->
?assertMatch(false, serialise_events()).
validate_test(_Config) ->
?assertEqual(ok, validate(any_exchange)).
create_test(_Config) ->
?assertEqual(ok, create(none, any_exchange)).
delete_test(_Config) ->
?assertEqual(ok, delete(none, any_exchange, any_bindings)).
?assertEqual(ok, validate(dummy_exchange())).
validate_binding_test(_Config) ->
?assertEqual(ok, validate_binding(any_exchange, any_bindings)).
add_binding_test(_Config) ->
?assertEqual(ok, add_binding(none, dummy_exchange(), dummy_binding())).
?assertEqual(ok, validate_binding(dummy_exchange(), dummy_binding())).
dummy_exchange() ->
#exchange{name = <<"XName">>, arguments = []}.