Experiment with #exchange records stored as maps

This commit is contained in:
Jean-Sébastien Pédron 2024-08-01 17:12:29 +02:00
parent b934dc597c
commit b7f217a88f
No known key found for this signature in database
GPG Key ID: 39E99761A5FD94CC
3 changed files with 135 additions and 28 deletions

View File

@ -48,6 +48,11 @@
%% For testing
-export([clear/0]).
-export([
record_to_storable_map/1,
storable_map_to_record/1
]).
-export([
khepri_exchange_path/1, khepri_exchange_path/2,
khepri_exchange_serial_path/1, khepri_exchange_serial_path/2
@ -81,7 +86,7 @@ get_all_in_mnesia() ->
get_all_in_khepri() ->
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, #if_has_data{}),
rabbit_db:list_in_khepri(Path).
fold_storable_map_to_record(Path).
-spec get_all(VHostName) -> [Exchange] when
VHostName :: vhost:name(),
@ -104,7 +109,28 @@ get_all_in_mnesia(VHost) ->
get_all_in_khepri(VHost) ->
Path = khepri_exchange_path(VHost, #if_has_data{}),
rabbit_db:list_in_khepri(Path).
fold_storable_map_to_record(Path).
fold_storable_map_to_record(Path) ->
Ret = rabbit_khepri:fold(Path, fun fold_storable_map_to_record1/3, []),
case Ret of
{ok, Acc} -> Acc;
_ -> []
end.
fold_storable_map_to_record(Path, Options) ->
Ret = rabbit_khepri:fold(
Path, fun fold_storable_map_to_record1/3, [], Options),
case Ret of
{ok, Acc} -> Acc;
_ -> []
end.
fold_storable_map_to_record1(_Path, #{data := Map}, Acc) ->
Record = storable_map_to_record(Map),
[Record | Acc];
fold_storable_map_to_record1(_Path, _NoPayload, Acc) ->
Acc.
%% -------------------------------------------------------------------
%% get_all_durable().
@ -155,7 +181,13 @@ list_in_khepri() ->
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, #if_has_data{}),
case rabbit_khepri:match(Path) of
{ok, Map} ->
maps:fold(fun(_K, X, Acc) -> [X#exchange.name | Acc] end, [], Map);
maps:fold(fun(_K, M, Acc) ->
%% We could skip the conversion and access the
%% map directly. For now, let's do this to leave
%% the conversion knowledge in a single place.
X = storable_map_to_record(M),
[X#exchange.name | Acc]
end, [], Map);
_ ->
[]
end.
@ -203,8 +235,11 @@ get_in_khepri(Name) ->
get_in_khepri_tx(Name) ->
Path = khepri_exchange_path(Name),
case khepri_tx:get(Path) of
{ok, X} -> [X];
_ -> []
{ok, M} ->
X = storable_map_to_record(M),
[X];
_ ->
[]
end.
%% -------------------------------------------------------------------
@ -326,12 +361,14 @@ update_in_khepri(XName, Fun) ->
Path = khepri_exchange_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := X, payload_version := Vsn}} ->
{ok, #{data := M, payload_version := Vsn}} ->
X = storable_map_to_record(M),
X1 = Fun(X),
M1 = record_to_storable_map(X1),
UpdatePath =
khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Ret2 = rabbit_khepri:put(UpdatePath, X1),
Ret2 = rabbit_khepri:put(UpdatePath, M1),
case Ret2 of
ok ->
ok;
@ -361,9 +398,11 @@ update_in_khepri(XName, Fun) ->
update_in_khepri_tx(Name, Fun) ->
Path = khepri_exchange_path(Name),
case khepri_tx:get(Path) of
{ok, X} ->
{ok, M} ->
X = storable_map_to_record(M),
X1 = Fun(X),
ok = khepri_tx:put(Path, X1),
M1 = record_to_storable_map(X1),
ok = khepri_tx:put(Path, M1),
X1;
_ -> not_found
end.
@ -404,10 +443,12 @@ create_or_get_in_mnesia(#exchange{name = XName} = X) ->
create_or_get_in_khepri(#exchange{name = XName} = X) ->
Path = khepri_exchange_path(XName),
case rabbit_khepri:create(Path, X) of
M = record_to_storable_map(X),
case rabbit_khepri:create(Path, M) of
ok ->
{new, X};
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingM}}}} ->
ExistingX = storable_map_to_record(ExistingM),
{existing, ExistingX};
{error, timeout} = Err ->
Err
@ -447,7 +488,8 @@ set_in_khepri(Xs) when is_list(Xs) ->
set_in_khepri_tx(X) ->
Path = khepri_exchange_path(X#exchange.name),
ok = khepri_tx:put(Path, X),
M = record_to_storable_map(X),
ok = khepri_tx:put(Path, M),
X.
%% -------------------------------------------------------------------
@ -635,8 +677,11 @@ delete_in_khepri(XName, IfUnused) ->
rabbit_khepri:transaction(
fun() ->
case khepri_tx:get(khepri_exchange_path(XName)) of
{ok, X} -> DeletionFun(X, false);
_ -> {error, not_found}
{ok, M} ->
X = storable_map_to_record(M),
DeletionFun(X, false);
_ ->
{error, not_found}
end
end, rw).
@ -724,7 +769,7 @@ recover_in_khepri(VHost) ->
%% the next boot most likely would behave the same way.
%% Any other request stays with the default timeout, currently 30s.
Path = khepri_exchange_path(VHost, #if_has_data{}),
Exchanges0 = rabbit_db:list_in_khepri(Path, #{timeout => infinity}),
Exchanges0 = fold_storable_map_to_record(Path, #{timeout => infinity}),
Exchanges = [rabbit_exchange_decorator:set(X) || X <- Exchanges0],
rabbit_khepri:transaction(
@ -767,10 +812,11 @@ match_in_mnesia(Pattern) ->
{aborted, Err} -> {error, Err}
end.
match_in_khepri(Pattern0) ->
Pattern = #if_data_matches{pattern = Pattern0},
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, Pattern),
rabbit_db:list_in_khepri(Path).
match_in_khepri(Pattern) ->
Pattern1 = record_to_storable_map(Pattern),
Pattern2 = #if_data_matches{pattern = Pattern1},
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, Pattern2),
fold_storable_map_to_record(Path).
%% -------------------------------------------------------------------
%% exists().
@ -874,17 +920,73 @@ maybe_auto_delete_in_mnesia(XName, OnlyDurable) ->
maybe_auto_delete_in_khepri(XName, OnlyDurable) ->
case khepri_tx:get(khepri_exchange_path(XName)) of
{ok, #exchange{auto_delete = false} = X} ->
{not_deleted, X};
{ok, #exchange{auto_delete = true} = X} ->
case conditional_delete_in_khepri(X, OnlyDurable) of
{error, in_use} -> {not_deleted, X};
{deleted, X, [], Deletions} -> {deleted, X, Deletions}
{ok, M} ->
X = storable_map_to_record(M),
case X of
#exchange{auto_delete = false} ->
{not_deleted, X};
#exchange{auto_delete = true} ->
case conditional_delete_in_khepri(X, OnlyDurable) of
{error, in_use} -> {not_deleted, X};
{deleted, X, [], Deletions} -> {deleted, X, Deletions}
end
end;
{error, _} ->
{not_deleted, undefined}
end.
%% -------------------------------------------------------------------
%% Record <-> map conversion for Khepri storage.
%% -------------------------------------------------------------------
record_to_storable_map(#exchange{name = Name,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
internal = Internal,
arguments = Arguments,
scratches = Scratches,
policy = Policy,
operator_policy = OperatorPolicy,
decorators = Decorators,
options = Options}) ->
Map = #{name => Name,
type => Type,
durable => Durable,
auto_delete => AutoDelete,
internal => Internal,
arguments => Arguments,
scratches => Scratches,
policy => Policy,
operator_policy => OperatorPolicy,
decorators => Decorators,
options => Options},
Map.
storable_map_to_record(#{name := Name,
type := Type,
durable := Durable,
auto_delete := AutoDelete,
internal := Internal,
arguments := Arguments,
scratches := Scratches,
policy := Policy,
operator_policy := OperatorPolicy,
decorators := Decorators,
options := Options}) ->
Record = #exchange{name = Name,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
internal = Internal,
arguments = Arguments,
scratches = Scratches,
policy = Policy,
operator_policy = OperatorPolicy,
decorators = Decorators,
options = Options},
Record.
%% -------------------------------------------------------------------
%% Khepri paths
%% -------------------------------------------------------------------

View File

@ -52,6 +52,7 @@ copy_to_khepri(
[Table, Name],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
Path = rabbit_db_exchange:khepri_exchange_path(Name),
Map = rabbit_db_exchange:record_to_storable_map(Record),
rabbit_db_m2k_converter:with_correlation_id(
fun(CorrId) ->
Extra = #{async => CorrId},
@ -59,7 +60,7 @@ copy_to_khepri(
"Mnesia->Khepri data copy: [~0p] path: ~0p corr: ~0p",
[Table, Path, CorrId],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
rabbit_khepri:put(Path, Record, Extra)
rabbit_khepri:put(Path, Map, Extra)
end, State);
copy_to_khepri(rabbit_exchange_serial = Table,
#exchange_serial{name = Resource, next = Serial},

View File

@ -1090,8 +1090,12 @@ register_rabbit_exchange_projection() ->
exchanges,
_VHost = ?KHEPRI_WILDCARD_STAR,
_Name = ?KHEPRI_WILDCARD_STAR],
KeyPos = #exchange.name,
register_simple_projection(Name, PathPattern, KeyPos).
ProjectionFun = fun(_Path, Map) ->
rabbit_db_exchange:storable_map_to_record(Map)
end,
Options = #{keypos => #exchange.name},
Projection = khepri_projection:new(Name, ProjectionFun, Options),
khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection).
register_rabbit_queue_projection() ->
Name = rabbit_khepri_queue,