Khepri: Handle breaking change in khepri adv API return type
[Why] All callers of `khepri_adv` and `khepri_tx_adv` need updates to handle the now uniform return type of `khepri:node_props_map()` in Khepri 0.17.0. [How] We don't need any compatibility code to handle "either the old return type or the new return type" from the khepri_adv API because the translation is done entirely in the "client side" code in Khepri - meaning that the return value from the Ra server is the same but it is translated differently by the functions in `khepri_adv`. However, we need to adapt transaction functions because they may be executed on different versions of Khepri and the behaviour of `khepri_tx_adv` can be different. To take the possible change of return value format, we use the new `khepri_tx:does_api_comply_with/1` to know what to expect.
This commit is contained in:
parent
9b5ab14faf
commit
f5805b83d2
|
@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
|
||||||
end,
|
end,
|
||||||
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.
|
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.
|
||||||
|
|
||||||
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
|
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) ->
|
||||||
Path = khepri_route_path(
|
Pattern = khepri_route_path(
|
||||||
VHost,
|
VHost,
|
||||||
Name,
|
SrcName,
|
||||||
_Kind = ?KHEPRI_WILDCARD_STAR,
|
?KHEPRI_WILDCARD_STAR, %% Kind
|
||||||
_DstName = ?KHEPRI_WILDCARD_STAR,
|
?KHEPRI_WILDCARD_STAR, %% DstName
|
||||||
_RoutingKey = #if_has_data{}),
|
#if_has_data{}), %% RoutingKey
|
||||||
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
|
{ok, Bindings} = khepri_tx_adv:delete_many(Pattern),
|
||||||
maps:fold(fun(_P, #{data := Set}, Acc) ->
|
maps:fold(
|
||||||
sets:to_list(Set) ++ Acc
|
fun(Path, Props, Acc) ->
|
||||||
end, [], Bindings).
|
case {Path, Props} of
|
||||||
|
{?RABBITMQ_KHEPRI_ROUTE_PATH(
|
||||||
|
VHost, SrcName, _Kind, _Name, _RoutingKey),
|
||||||
|
#{data := Set}} ->
|
||||||
|
sets:to_list(Set) ++ Acc;
|
||||||
|
{_, _} ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end, [], Bindings).
|
||||||
|
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
%% delete_for_destination_in_mnesia().
|
%% delete_for_destination_in_mnesia().
|
||||||
|
@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
|
||||||
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
|
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
|
||||||
Pattern = khepri_route_path(
|
Pattern = khepri_route_path(
|
||||||
VHost,
|
VHost,
|
||||||
_SrcName = ?KHEPRI_WILDCARD_STAR,
|
?KHEPRI_WILDCARD_STAR, %% SrcName
|
||||||
Kind,
|
Kind,
|
||||||
Name,
|
Name,
|
||||||
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
|
?KHEPRI_WILDCARD_STAR), %% RoutingKey
|
||||||
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
|
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
|
||||||
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
|
Bindings = maps:fold(
|
||||||
sets:to_list(Set) ++ Acc
|
fun(Path, Props, Acc) ->
|
||||||
end, [], BindingsMap),
|
case {Path, Props} of
|
||||||
|
{?RABBITMQ_KHEPRI_ROUTE_PATH(
|
||||||
|
VHost, _SrcName, Kind, Name, _RoutingKey),
|
||||||
|
#{data := Set}} ->
|
||||||
|
sets:to_list(Set) ++ Acc;
|
||||||
|
{_, _} ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end, [], BindingsMap),
|
||||||
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
|
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
|
||||||
lists:keysort(#binding.source, Bindings), OnlyDurable).
|
lists:keysort(#binding.source, Bindings), OnlyDurable).
|
||||||
|
|
||||||
|
|
|
@ -331,7 +331,7 @@ update_in_khepri(XName, Fun) ->
|
||||||
Path = khepri_exchange_path(XName),
|
Path = khepri_exchange_path(XName),
|
||||||
Ret1 = rabbit_khepri:adv_get(Path),
|
Ret1 = rabbit_khepri:adv_get(Path),
|
||||||
case Ret1 of
|
case Ret1 of
|
||||||
{ok, #{data := X, payload_version := Vsn}} ->
|
{ok, #{Path := #{data := X, payload_version := Vsn}}} ->
|
||||||
X1 = Fun(X),
|
X1 = Fun(X),
|
||||||
UpdatePath =
|
UpdatePath =
|
||||||
khepri_path:combine_with_conditions(
|
khepri_path:combine_with_conditions(
|
||||||
|
@ -534,8 +534,7 @@ next_serial_in_khepri(XName) ->
|
||||||
Path = khepri_exchange_serial_path(XName),
|
Path = khepri_exchange_serial_path(XName),
|
||||||
Ret1 = rabbit_khepri:adv_get(Path),
|
Ret1 = rabbit_khepri:adv_get(Path),
|
||||||
case Ret1 of
|
case Ret1 of
|
||||||
{ok, #{data := Serial,
|
{ok, #{Path := #{data := Serial, payload_version := Vsn}}} ->
|
||||||
payload_version := Vsn}} ->
|
|
||||||
UpdatePath =
|
UpdatePath =
|
||||||
khepri_path:combine_with_conditions(
|
khepri_path:combine_with_conditions(
|
||||||
Path, [#if_payload_version{version = Vsn}]),
|
Path, [#if_payload_version{version = Vsn}]),
|
||||||
|
@ -711,13 +710,20 @@ delete_all_in_khepri_tx(VHostName) ->
|
||||||
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
|
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
|
||||||
Deletions =
|
Deletions =
|
||||||
maps:fold(
|
maps:fold(
|
||||||
fun(_Path, #{data := X}, Deletions) ->
|
fun(Path, Props, Deletions) ->
|
||||||
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
|
case {Path, Props} of
|
||||||
rabbit_db_binding:delete_all_for_exchange_in_khepri(
|
{?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _),
|
||||||
X, false, true),
|
#{data := X}} ->
|
||||||
Deletions1 = rabbit_binding:add_deletion(
|
{deleted,
|
||||||
XName, X, deleted, Bindings, XDeletions),
|
#exchange{name = XName}, Bindings, XDeletions} =
|
||||||
rabbit_binding:combine_deletions(Deletions, Deletions1)
|
rabbit_db_binding:delete_all_for_exchange_in_khepri(
|
||||||
|
X, false, true),
|
||||||
|
Deletions1 = rabbit_binding:add_deletion(
|
||||||
|
XName, X, deleted, Bindings, XDeletions),
|
||||||
|
rabbit_binding:combine_deletions(Deletions, Deletions1);
|
||||||
|
{_, _} ->
|
||||||
|
Deletions
|
||||||
|
end
|
||||||
end, rabbit_binding:new_deletions(), NodeProps),
|
end, rabbit_binding:new_deletions(), NodeProps),
|
||||||
{ok, Deletions}.
|
{ok, Deletions}.
|
||||||
|
|
||||||
|
|
|
@ -135,8 +135,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
|
||||||
mirroring_pid = Overall,
|
mirroring_pid = Overall,
|
||||||
childspec = ChildSpec},
|
childspec = ChildSpec},
|
||||||
case rabbit_khepri:adv_get(Path) of
|
case rabbit_khepri:adv_get(Path) of
|
||||||
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
|
{ok, #{Path := #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
|
||||||
payload_version := Vsn}} ->
|
payload_version := Vsn}}} ->
|
||||||
case Overall of
|
case Overall of
|
||||||
Pid ->
|
Pid ->
|
||||||
Delegate;
|
Delegate;
|
||||||
|
|
|
@ -411,8 +411,18 @@ delete_in_khepri(QueueName, OnlyDurable) ->
|
||||||
rabbit_khepri:transaction(
|
rabbit_khepri:transaction(
|
||||||
fun () ->
|
fun () ->
|
||||||
Path = khepri_queue_path(QueueName),
|
Path = khepri_queue_path(QueueName),
|
||||||
|
UsesUniformWriteRet = try
|
||||||
|
khepri_tx:does_api_comply_with(uniform_write_ret)
|
||||||
|
catch
|
||||||
|
error:undef ->
|
||||||
|
false
|
||||||
|
end,
|
||||||
case khepri_tx_adv:delete(Path) of
|
case khepri_tx_adv:delete(Path) of
|
||||||
{ok, #{data := _}} ->
|
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
|
||||||
|
%% we want to execute some things, as decided by rabbit_exchange,
|
||||||
|
%% after the transaction.
|
||||||
|
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
|
||||||
|
{ok, #{data := _}} when not UsesUniformWriteRet ->
|
||||||
%% we want to execute some things, as decided by rabbit_exchange,
|
%% we want to execute some things, as decided by rabbit_exchange,
|
||||||
%% after the transaction.
|
%% after the transaction.
|
||||||
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
|
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
|
||||||
|
@ -607,7 +617,7 @@ update_in_khepri(QName, Fun) ->
|
||||||
Path = khepri_queue_path(QName),
|
Path = khepri_queue_path(QName),
|
||||||
Ret1 = rabbit_khepri:adv_get(Path),
|
Ret1 = rabbit_khepri:adv_get(Path),
|
||||||
case Ret1 of
|
case Ret1 of
|
||||||
{ok, #{data := Q, payload_version := Vsn}} ->
|
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
|
||||||
UpdatePath = khepri_path:combine_with_conditions(
|
UpdatePath = khepri_path:combine_with_conditions(
|
||||||
Path, [#if_payload_version{version = Vsn}]),
|
Path, [#if_payload_version{version = Vsn}]),
|
||||||
Q1 = Fun(Q),
|
Q1 = Fun(Q),
|
||||||
|
@ -658,7 +668,7 @@ update_decorators_in_khepri(QName, Decorators) ->
|
||||||
Path = khepri_queue_path(QName),
|
Path = khepri_queue_path(QName),
|
||||||
Ret1 = rabbit_khepri:adv_get(Path),
|
Ret1 = rabbit_khepri:adv_get(Path),
|
||||||
case Ret1 of
|
case Ret1 of
|
||||||
{ok, #{data := Q1, payload_version := Vsn}} ->
|
{ok, #{Path := #{data := Q1, payload_version := Vsn}}} ->
|
||||||
Q2 = amqqueue:set_decorators(Q1, Decorators),
|
Q2 = amqqueue:set_decorators(Q1, Decorators),
|
||||||
UpdatePath = khepri_path:combine_with_conditions(
|
UpdatePath = khepri_path:combine_with_conditions(
|
||||||
Path, [#if_payload_version{version = Vsn}]),
|
Path, [#if_payload_version{version = Vsn}]),
|
||||||
|
@ -1098,15 +1108,12 @@ delete_transient_in_khepri(FilterFun) ->
|
||||||
case rabbit_khepri:adv_get_many(PathPattern) of
|
case rabbit_khepri:adv_get_many(PathPattern) of
|
||||||
{ok, Props} ->
|
{ok, Props} ->
|
||||||
Qs = maps:fold(
|
Qs = maps:fold(
|
||||||
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
|
fun(Path, #{data := Q, payload_version := Vsn}, Acc)
|
||||||
when ?is_amqqueue(Q) ->
|
when ?is_amqqueue(Q) ->
|
||||||
case FilterFun(Q) of
|
case FilterFun(Q) of
|
||||||
true ->
|
true ->
|
||||||
Path = khepri_path:combine_with_conditions(
|
|
||||||
Path0,
|
|
||||||
[#if_payload_version{version = Vsn}]),
|
|
||||||
QName = amqqueue:get_name(Q),
|
QName = amqqueue:get_name(Q),
|
||||||
[{Path, QName} | Acc];
|
[{Path, Vsn, QName} | Acc];
|
||||||
false ->
|
false ->
|
||||||
Acc
|
Acc
|
||||||
end
|
end
|
||||||
|
@ -1125,20 +1132,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
|
||||||
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
|
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
|
||||||
Res = rabbit_khepri:transaction(
|
Res = rabbit_khepri:transaction(
|
||||||
fun() ->
|
fun() ->
|
||||||
rabbit_misc:fold_while_ok(
|
do_delete_transient_queues_in_khepri_tx(Qs, [])
|
||||||
fun({Path, QName}, Acc) ->
|
|
||||||
%% Also see `delete_in_khepri/2'.
|
|
||||||
case khepri_tx_adv:delete(Path) of
|
|
||||||
{ok, #{data := _}} ->
|
|
||||||
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
|
|
||||||
QName, false),
|
|
||||||
{ok, [{QName, Deletions} | Acc]};
|
|
||||||
{ok, _} ->
|
|
||||||
{ok, Acc};
|
|
||||||
{error, _} = Error ->
|
|
||||||
Error
|
|
||||||
end
|
|
||||||
end, [], Qs)
|
|
||||||
end),
|
end),
|
||||||
case Res of
|
case Res of
|
||||||
{ok, Items} ->
|
{ok, Items} ->
|
||||||
|
@ -1152,6 +1146,35 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_delete_transient_queues_in_khepri_tx([], Acc) ->
|
||||||
|
{ok, Acc};
|
||||||
|
do_delete_transient_queues_in_khepri_tx([{Path, Vsn, QName} | Rest], Acc) ->
|
||||||
|
%% Also see `delete_in_khepri/2'.
|
||||||
|
VersionedPath = khepri_path:combine_with_conditions(
|
||||||
|
Path, [#if_payload_version{version = Vsn}]),
|
||||||
|
UsesUniformWriteRet = try
|
||||||
|
khepri_tx:does_api_comply_with(uniform_write_ret)
|
||||||
|
catch
|
||||||
|
error:undef ->
|
||||||
|
false
|
||||||
|
end,
|
||||||
|
case khepri_tx_adv:delete(VersionedPath) of
|
||||||
|
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
|
||||||
|
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
|
||||||
|
QName, false),
|
||||||
|
Acc1 = [{QName, Deletions} | Acc],
|
||||||
|
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
|
||||||
|
{ok, #{data := _}} when not UsesUniformWriteRet ->
|
||||||
|
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
|
||||||
|
QName, false),
|
||||||
|
Acc1 = [{QName, Deletions} | Acc],
|
||||||
|
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
|
||||||
|
{ok, _} ->
|
||||||
|
do_delete_transient_queues_in_khepri_tx(Rest, Acc);
|
||||||
|
{error, _} = Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
%% foreach_transient().
|
%% foreach_transient().
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
|
|
|
@ -59,7 +59,7 @@ set_in_khepri(Key, Term) ->
|
||||||
Record = #runtime_parameters{key = Key,
|
Record = #runtime_parameters{key = Key,
|
||||||
value = Term},
|
value = Term},
|
||||||
case rabbit_khepri:adv_put(Path, Record) of
|
case rabbit_khepri:adv_put(Path, Record) of
|
||||||
{ok, #{data := Params}} ->
|
{ok, #{Path := #{data := Params}}} ->
|
||||||
{old, Params#runtime_parameters.value};
|
{old, Params#runtime_parameters.value};
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
new
|
new
|
||||||
|
@ -113,8 +113,16 @@ set_in_khepri_tx(Key, Term) ->
|
||||||
Path = khepri_rp_path(Key),
|
Path = khepri_rp_path(Key),
|
||||||
Record = #runtime_parameters{key = Key,
|
Record = #runtime_parameters{key = Key,
|
||||||
value = Term},
|
value = Term},
|
||||||
|
UsesUniformWriteRet = try
|
||||||
|
khepri_tx:does_api_comply_with(uniform_write_ret)
|
||||||
|
catch
|
||||||
|
error:undef ->
|
||||||
|
false
|
||||||
|
end,
|
||||||
case khepri_tx_adv:put(Path, Record) of
|
case khepri_tx_adv:put(Path, Record) of
|
||||||
{ok, #{data := Params}} ->
|
{ok, #{Path := #{data := Params}}} when UsesUniformWriteRet ->
|
||||||
|
{old, Params#runtime_parameters.value};
|
||||||
|
{ok, #{data := Params}} when not UsesUniformWriteRet ->
|
||||||
{old, Params#runtime_parameters.value};
|
{old, Params#runtime_parameters.value};
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
new
|
new
|
||||||
|
@ -347,11 +355,23 @@ delete_vhost_in_mnesia_tx(VHostName) ->
|
||||||
<- mnesia:match_object(?MNESIA_TABLE, Match, read)].
|
<- mnesia:match_object(?MNESIA_TABLE, Match, read)].
|
||||||
|
|
||||||
delete_vhost_in_khepri(VHostName) ->
|
delete_vhost_in_khepri(VHostName) ->
|
||||||
Path = khepri_vhost_rp_path(
|
Pattern = khepri_vhost_rp_path(
|
||||||
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
|
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
|
||||||
case rabbit_khepri:adv_delete_many(Path) of
|
case rabbit_khepri:adv_delete_many(Pattern) of
|
||||||
{ok, Props} ->
|
{ok, NodePropsMap} ->
|
||||||
{ok, rabbit_khepri:collect_payloads(Props)};
|
RTParams =
|
||||||
|
maps:fold(
|
||||||
|
fun(Path, Props, Acc) ->
|
||||||
|
case {Path, Props} of
|
||||||
|
{?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(
|
||||||
|
VHostName, _, _),
|
||||||
|
#{data := RTParam}} ->
|
||||||
|
[RTParam | Acc];
|
||||||
|
{_, _} ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end, [], NodePropsMap),
|
||||||
|
{ok, RTParams};
|
||||||
{error, _} = Err ->
|
{error, _} = Err ->
|
||||||
Err
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -628,20 +628,42 @@ clear_all_permissions_for_vhost_in_mnesia(VHostName) ->
|
||||||
clear_all_permissions_for_vhost_in_khepri(VHostName) ->
|
clear_all_permissions_for_vhost_in_khepri(VHostName) ->
|
||||||
rabbit_khepri:transaction(
|
rabbit_khepri:transaction(
|
||||||
fun() ->
|
fun() ->
|
||||||
UserPermissionsPath = khepri_user_permission_path(
|
clear_all_permissions_for_vhost_in_khepri_tx(VHostName)
|
||||||
?KHEPRI_WILDCARD_STAR, VHostName),
|
|
||||||
TopicPermissionsPath = khepri_topic_permission_path(
|
|
||||||
?KHEPRI_WILDCARD_STAR, VHostName,
|
|
||||||
?KHEPRI_WILDCARD_STAR),
|
|
||||||
{ok, UserProps} = khepri_tx_adv:delete_many(UserPermissionsPath),
|
|
||||||
{ok, TopicProps} = khepri_tx_adv:delete_many(
|
|
||||||
TopicPermissionsPath),
|
|
||||||
Deletions = rabbit_khepri:collect_payloads(
|
|
||||||
TopicProps,
|
|
||||||
rabbit_khepri:collect_payloads(UserProps)),
|
|
||||||
{ok, Deletions}
|
|
||||||
end, rw, #{timeout => infinity}).
|
end, rw, #{timeout => infinity}).
|
||||||
|
|
||||||
|
clear_all_permissions_for_vhost_in_khepri_tx(VHostName) ->
|
||||||
|
UserPermissionsPattern = khepri_user_permission_path(
|
||||||
|
?KHEPRI_WILDCARD_STAR, VHostName),
|
||||||
|
TopicPermissionsPattern = khepri_topic_permission_path(
|
||||||
|
?KHEPRI_WILDCARD_STAR, VHostName,
|
||||||
|
?KHEPRI_WILDCARD_STAR),
|
||||||
|
{ok, UserNodePropsMap} = khepri_tx_adv:delete_many(UserPermissionsPattern),
|
||||||
|
{ok, TopicNodePropsMap} = khepri_tx_adv:delete_many(
|
||||||
|
TopicPermissionsPattern),
|
||||||
|
Deletions0 =
|
||||||
|
maps:fold(
|
||||||
|
fun(Path, Props, Acc) ->
|
||||||
|
case {Path, Props} of
|
||||||
|
{?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, _),
|
||||||
|
#{data := Permission}} ->
|
||||||
|
[Permission | Acc];
|
||||||
|
{_, _} ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end, [], UserNodePropsMap),
|
||||||
|
Deletions1 =
|
||||||
|
maps:fold(
|
||||||
|
fun(Path, Props, Acc) ->
|
||||||
|
case {Path, Props} of
|
||||||
|
{?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, _, _),
|
||||||
|
#{data := Permission}} ->
|
||||||
|
[Permission | Acc];
|
||||||
|
{_, _} ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end, Deletions0, TopicNodePropsMap),
|
||||||
|
{ok, Deletions1}.
|
||||||
|
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
%% get_topic_permissions().
|
%% get_topic_permissions().
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
|
|
|
@ -167,7 +167,7 @@ merge_metadata_in_khepri(VHostName, Metadata) ->
|
||||||
Path = khepri_vhost_path(VHostName),
|
Path = khepri_vhost_path(VHostName),
|
||||||
Ret1 = rabbit_khepri:adv_get(Path),
|
Ret1 = rabbit_khepri:adv_get(Path),
|
||||||
case Ret1 of
|
case Ret1 of
|
||||||
{ok, #{data := VHost0, payload_version := DVersion}} ->
|
{ok, #{Path := #{data := VHost0, payload_version := DVersion}}} ->
|
||||||
VHost = vhost:merge_metadata(VHost0, Metadata),
|
VHost = vhost:merge_metadata(VHost0, Metadata),
|
||||||
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
|
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
|
||||||
Path1 = khepri_path:combine_with_conditions(
|
Path1 = khepri_path:combine_with_conditions(
|
||||||
|
@ -443,10 +443,10 @@ update_in_mnesia_tx(VHostName, UpdateFun)
|
||||||
update_in_khepri(VHostName, UpdateFun) ->
|
update_in_khepri(VHostName, UpdateFun) ->
|
||||||
Path = khepri_vhost_path(VHostName),
|
Path = khepri_vhost_path(VHostName),
|
||||||
case rabbit_khepri:adv_get(Path) of
|
case rabbit_khepri:adv_get(Path) of
|
||||||
{ok, #{data := V, payload_version := DVersion}} ->
|
{ok, #{Path := #{data := V, payload_version := Vsn}}} ->
|
||||||
V1 = UpdateFun(V),
|
V1 = UpdateFun(V),
|
||||||
Path1 = khepri_path:combine_with_conditions(
|
Path1 = khepri_path:combine_with_conditions(
|
||||||
Path, [#if_payload_version{version = DVersion}]),
|
Path, [#if_payload_version{version = Vsn}]),
|
||||||
case rabbit_khepri:put(Path1, V1) of
|
case rabbit_khepri:put(Path1, V1) of
|
||||||
ok ->
|
ok ->
|
||||||
V1;
|
V1;
|
||||||
|
|
|
@ -174,10 +174,6 @@
|
||||||
|
|
||||||
-export([force_shrink_member_to_current_member/0]).
|
-export([force_shrink_member_to_current_member/0]).
|
||||||
|
|
||||||
%% Helpers for working with the Khepri API / types.
|
|
||||||
-export([collect_payloads/1,
|
|
||||||
collect_payloads/2]).
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([force_metadata_store/1,
|
-export([force_metadata_store/1,
|
||||||
clear_forced_metadata_store/0]).
|
clear_forced_metadata_store/0]).
|
||||||
|
@ -1020,12 +1016,14 @@ delete(Path, Options0) ->
|
||||||
|
|
||||||
delete_or_fail(Path) ->
|
delete_or_fail(Path) ->
|
||||||
case khepri_adv:delete(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS) of
|
case khepri_adv:delete(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS) of
|
||||||
{ok, Result} ->
|
{ok, #{Path := NodeProps}} ->
|
||||||
case maps:size(Result) of
|
case maps:size(NodeProps) of
|
||||||
0 -> {error, {node_not_found, #{}}};
|
0 -> {error, {node_not_found, #{}}};
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end;
|
end;
|
||||||
Error ->
|
{ok, #{} = NodePropsMap} when NodePropsMap =:= #{} ->
|
||||||
|
{error, {node_not_found, #{}}};
|
||||||
|
{error, _} = Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -1072,48 +1070,6 @@ handle_async_ret(RaEvent) ->
|
||||||
fence(Timeout) ->
|
fence(Timeout) ->
|
||||||
khepri:fence(?STORE_ID, Timeout).
|
khepri:fence(?STORE_ID, Timeout).
|
||||||
|
|
||||||
%% -------------------------------------------------------------------
|
|
||||||
%% collect_payloads().
|
|
||||||
%% -------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec collect_payloads(Props) -> Ret when
|
|
||||||
Props :: khepri:node_props(),
|
|
||||||
Ret :: [Payload],
|
|
||||||
Payload :: term().
|
|
||||||
|
|
||||||
%% @doc Collects all payloads from a node props map.
|
|
||||||
%%
|
|
||||||
%% This is the same as calling `collect_payloads(Props, [])'.
|
|
||||||
%%
|
|
||||||
%% @private
|
|
||||||
|
|
||||||
collect_payloads(Props) when is_map(Props) ->
|
|
||||||
collect_payloads(Props, []).
|
|
||||||
|
|
||||||
-spec collect_payloads(Props, Acc0) -> Ret when
|
|
||||||
Props :: khepri:node_props(),
|
|
||||||
Acc0 :: [Payload],
|
|
||||||
Ret :: [Payload],
|
|
||||||
Payload :: term().
|
|
||||||
|
|
||||||
%% @doc Collects all payloads from a node props map into the accumulator list.
|
|
||||||
%%
|
|
||||||
%% This is meant to be used with the `khepri_adv' API to easily collect the
|
|
||||||
%% payloads from the return value of `khepri_adv:delete_many/4' for example.
|
|
||||||
%%
|
|
||||||
%% @returns all payloads in the node props map collected into a list, with
|
|
||||||
%% `Acc0' as the tail.
|
|
||||||
%%
|
|
||||||
%% @private
|
|
||||||
|
|
||||||
collect_payloads(Props, Acc0) when is_map(Props) andalso is_list(Acc0) ->
|
|
||||||
maps:fold(
|
|
||||||
fun (_Path, #{data := Payload}, Acc) ->
|
|
||||||
[Payload | Acc];
|
|
||||||
(_Path, _NoPayload, Acc) ->
|
|
||||||
Acc
|
|
||||||
end, Acc0, Props).
|
|
||||||
|
|
||||||
-spec unregister_legacy_projections() -> Ret when
|
-spec unregister_legacy_projections() -> Ret when
|
||||||
Ret :: ok | timeout_error().
|
Ret :: ok | timeout_error().
|
||||||
%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x
|
%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x
|
||||||
|
|
|
@ -104,13 +104,13 @@ create_binding_in_mnesia_tx(Src, Dst, Weight, UpdateFun) ->
|
||||||
create_binding_in_khepri(Src, Dst, Weight, UpdateFun) ->
|
create_binding_in_khepri(Src, Dst, Weight, UpdateFun) ->
|
||||||
Path = khepri_consistent_hash_path(Src),
|
Path = khepri_consistent_hash_path(Src),
|
||||||
case rabbit_khepri:adv_get(Path) of
|
case rabbit_khepri:adv_get(Path) of
|
||||||
{ok, #{data := Chx0, payload_version := DVersion}} ->
|
{ok, #{Path := #{data := Chx0, payload_version := Vsn}}} ->
|
||||||
case UpdateFun(Chx0, Dst, Weight) of
|
case UpdateFun(Chx0, Dst, Weight) of
|
||||||
already_exists ->
|
already_exists ->
|
||||||
already_exists;
|
already_exists;
|
||||||
Chx ->
|
Chx ->
|
||||||
Path1 = khepri_path:combine_with_conditions(
|
Path1 = khepri_path:combine_with_conditions(
|
||||||
Path, [#if_payload_version{version = DVersion}]),
|
Path, [#if_payload_version{version = Vsn}]),
|
||||||
Ret2 = rabbit_khepri:put(Path1, Chx),
|
Ret2 = rabbit_khepri:put(Path1, Chx),
|
||||||
case Ret2 of
|
case Ret2 of
|
||||||
ok ->
|
ok ->
|
||||||
|
|
|
@ -108,9 +108,9 @@ create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) ->
|
||||||
update_in_khepri(XName, BindingKeyAndFun, UpdateFun, ErrorFun) ->
|
update_in_khepri(XName, BindingKeyAndFun, UpdateFun, ErrorFun) ->
|
||||||
Path = khepri_jms_topic_exchange_path(XName),
|
Path = khepri_jms_topic_exchange_path(XName),
|
||||||
case rabbit_khepri:adv_get(Path) of
|
case rabbit_khepri:adv_get(Path) of
|
||||||
{ok, #{data := BindingFuns, payload_version := DVersion}} ->
|
{ok, #{Path := #{data := BindingFuns, payload_version := Vsn}}} ->
|
||||||
Path1 = khepri_path:combine_with_conditions(
|
Path1 = khepri_path:combine_with_conditions(
|
||||||
Path, [#if_payload_version{version = DVersion}]),
|
Path, [#if_payload_version{version = Vsn}]),
|
||||||
Ret = rabbit_khepri:put(Path1, UpdateFun(BindingFuns, BindingKeyAndFun)),
|
Ret = rabbit_khepri:put(Path1, UpdateFun(BindingFuns, BindingKeyAndFun)),
|
||||||
case Ret of
|
case Ret of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
|
|
|
@ -106,10 +106,10 @@ insert0_in_mnesia(Key, Cached, Message, Length) ->
|
||||||
insert_in_khepri(XName, Message, Length) ->
|
insert_in_khepri(XName, Message, Length) ->
|
||||||
Path = khepri_recent_history_path(XName),
|
Path = khepri_recent_history_path(XName),
|
||||||
case rabbit_khepri:adv_get(Path) of
|
case rabbit_khepri:adv_get(Path) of
|
||||||
{ok, #{data := Cached0, payload_version := DVersion}} ->
|
{ok, #{Path := #{data := Cached0, payload_version := Vsn}}} ->
|
||||||
Cached = add_to_cache(Cached0, Message, Length),
|
Cached = add_to_cache(Cached0, Message, Length),
|
||||||
Path1 = khepri_path:combine_with_conditions(
|
Path1 = khepri_path:combine_with_conditions(
|
||||||
Path, [#if_payload_version{version = DVersion}]),
|
Path, [#if_payload_version{version = Vsn}]),
|
||||||
Ret = rabbit_khepri:put(Path1, Cached),
|
Ret = rabbit_khepri:put(Path1, Cached),
|
||||||
case Ret of
|
case Ret of
|
||||||
ok ->
|
ok ->
|
||||||
|
|
Loading…
Reference in New Issue