Bump Khepri from 0.14.0 to 0.15.0
Release notes: https://github.com/rabbitmq/khepri/releases/tag/v0.15.0
This commit is contained in:
parent
b90a606f47
commit
bc416757e8
|
|
@ -210,8 +210,8 @@ erlang_package.hex_package(
|
|||
erlang_package.hex_package(
|
||||
name = "khepri",
|
||||
build_file = "@rabbitmq-server//bazel:BUILD.khepri",
|
||||
sha256 = "dccfaeb3583a04722e2258911f7f906ce67f8efac80504be4923aaafae6d4e21",
|
||||
version = "0.14.0",
|
||||
sha256 = "3fca316af28f0a7524be01164a3e9dd484505f18887c5c2065e0db40802522d1",
|
||||
version = "0.15.0",
|
||||
)
|
||||
|
||||
erlang_package.hex_package(
|
||||
|
|
|
|||
|
|
@ -155,11 +155,7 @@ get_consistent_in_mnesia(Node) ->
|
|||
|
||||
get_consistent_in_khepri(Node) ->
|
||||
Path = khepri_maintenance_path(Node),
|
||||
%% FIXME: Ra consistent queries are fragile in the sense that the query
|
||||
%% function may run on a remote node and the function reference or MFA may
|
||||
%% not be valid on that node. That's why we force a local query for now.
|
||||
%Options = #{favor => consistent},
|
||||
Options = #{favor => local},
|
||||
Options = #{favor => consistency},
|
||||
case rabbit_khepri:get(Path, Options) of
|
||||
{ok, #node_maintenance_state{status = Status}} ->
|
||||
Status;
|
||||
|
|
|
|||
|
|
@ -264,19 +264,23 @@ setup(_) ->
|
|||
friendly_name => ?RA_FRIENDLY_NAME},
|
||||
case khepri:start(?RA_SYSTEM, RaServerConfig) of
|
||||
{ok, ?STORE_ID} ->
|
||||
wait_for_leader(),
|
||||
wait_for_register_projections(),
|
||||
?LOG_DEBUG(
|
||||
"Khepri-based " ?RA_FRIENDLY_NAME " ready",
|
||||
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
|
||||
ok;
|
||||
RetryTimeout = retry_timeout(),
|
||||
case khepri_cluster:wait_for_leader(?STORE_ID, RetryTimeout) of
|
||||
ok ->
|
||||
wait_for_register_projections(),
|
||||
?LOG_DEBUG(
|
||||
"Khepri-based " ?RA_FRIENDLY_NAME " ready",
|
||||
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
|
||||
ok;
|
||||
{error, timeout} ->
|
||||
exit(timeout_waiting_for_leader);
|
||||
{error, _} = Error ->
|
||||
exit(Error)
|
||||
end;
|
||||
{error, _} = Error ->
|
||||
exit(Error)
|
||||
end.
|
||||
|
||||
wait_for_leader() ->
|
||||
wait_for_leader(retry_timeout(), retry_limit()).
|
||||
|
||||
retry_timeout() ->
|
||||
case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of
|
||||
{ok, T} -> T;
|
||||
|
|
@ -289,25 +293,6 @@ retry_limit() ->
|
|||
undefined -> 10
|
||||
end.
|
||||
|
||||
wait_for_leader(_Timeout, 0) ->
|
||||
exit(timeout_waiting_for_leader);
|
||||
wait_for_leader(Timeout, Retries) ->
|
||||
rabbit_log:info("Waiting for Khepri leader for ~tp ms, ~tp retries left",
|
||||
[Timeout, Retries - 1]),
|
||||
Options = #{timeout => Timeout,
|
||||
favor => low_latency},
|
||||
case khepri:exists(?STORE_ID, [], Options) of
|
||||
Exists when is_boolean(Exists) ->
|
||||
rabbit_log:info("Khepri leader elected"),
|
||||
ok;
|
||||
{error, timeout} -> %% Khepri >= 0.14.0
|
||||
wait_for_leader(Timeout, Retries -1);
|
||||
{error, {timeout, _ServerId}} -> %% Khepri < 0.14.0
|
||||
wait_for_leader(Timeout, Retries -1);
|
||||
{error, Reason} ->
|
||||
throw(Reason)
|
||||
end.
|
||||
|
||||
wait_for_register_projections() ->
|
||||
wait_for_register_projections(retry_timeout(), retry_limit()).
|
||||
|
||||
|
|
@ -940,50 +925,46 @@ cas(Path, Pattern, Data) ->
|
|||
?STORE_ID, Path, Pattern, Data, ?DEFAULT_COMMAND_OPTIONS).
|
||||
|
||||
fold(Path, Pred, Acc) ->
|
||||
khepri:fold(?STORE_ID, Path, Pred, Acc, #{favor => low_latency}).
|
||||
khepri:fold(?STORE_ID, Path, Pred, Acc).
|
||||
|
||||
fold(Path, Pred, Acc, Options) ->
|
||||
Options1 = Options#{favor => low_latency},
|
||||
khepri:fold(?STORE_ID, Path, Pred, Acc, Options1).
|
||||
khepri:fold(?STORE_ID, Path, Pred, Acc, Options).
|
||||
|
||||
foreach(Path, Pred) ->
|
||||
khepri:foreach(?STORE_ID, Path, Pred, #{favor => low_latency}).
|
||||
khepri:foreach(?STORE_ID, Path, Pred).
|
||||
|
||||
filter(Path, Pred) ->
|
||||
khepri:filter(?STORE_ID, Path, Pred, #{favor => low_latency}).
|
||||
khepri:filter(?STORE_ID, Path, Pred).
|
||||
|
||||
get(Path) ->
|
||||
khepri:get(?STORE_ID, Path, #{favor => low_latency}).
|
||||
khepri:get(?STORE_ID, Path).
|
||||
|
||||
get(Path, Options) ->
|
||||
Options1 = Options#{favor => low_latency},
|
||||
khepri:get(?STORE_ID, Path, Options1).
|
||||
khepri:get(?STORE_ID, Path, Options).
|
||||
|
||||
get_many(PathPattern) ->
|
||||
khepri:get_many(?STORE_ID, PathPattern, #{favor => low_latency}).
|
||||
khepri:get_many(?STORE_ID, PathPattern).
|
||||
|
||||
adv_get(Path) ->
|
||||
khepri_adv:get(?STORE_ID, Path, #{favor => low_latency}).
|
||||
khepri_adv:get(?STORE_ID, Path).
|
||||
|
||||
adv_get_many(PathPattern) ->
|
||||
khepri_adv:get_many(?STORE_ID, PathPattern, #{favor => low_latency}).
|
||||
khepri_adv:get_many(?STORE_ID, PathPattern).
|
||||
|
||||
match(Path) ->
|
||||
match(Path, #{}).
|
||||
|
||||
match(Path, Options) ->
|
||||
Options1 = Options#{favor => low_latency},
|
||||
khepri:get_many(?STORE_ID, Path, Options1).
|
||||
khepri:get_many(?STORE_ID, Path, Options).
|
||||
|
||||
exists(Path) -> khepri:exists(?STORE_ID, Path, #{favor => low_latency}).
|
||||
exists(Path) -> khepri:exists(?STORE_ID, Path).
|
||||
|
||||
list(Path) ->
|
||||
khepri:get_many(
|
||||
?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR], #{favor => low_latency}).
|
||||
?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR]).
|
||||
|
||||
list_child_nodes(Path) ->
|
||||
Options = #{props_to_return => [child_names],
|
||||
favor => low_latency},
|
||||
Options = #{props_to_return => [child_names]},
|
||||
case khepri_adv:get_many(?STORE_ID, Path, Options) of
|
||||
{ok, Result} ->
|
||||
case maps:values(Result) of
|
||||
|
|
@ -997,8 +978,7 @@ list_child_nodes(Path) ->
|
|||
end.
|
||||
|
||||
count_children(Path) ->
|
||||
Options = #{props_to_return => [child_list_length],
|
||||
favor => low_latency},
|
||||
Options = #{props_to_return => [child_list_length]},
|
||||
case khepri_adv:get_many(?STORE_ID, Path, Options) of
|
||||
{ok, Map} ->
|
||||
lists:sum([L || #{child_list_length := L} <- maps:values(Map)]);
|
||||
|
|
@ -1049,18 +1029,9 @@ transaction(Fun) ->
|
|||
transaction(Fun, ReadWrite) ->
|
||||
transaction(Fun, ReadWrite, #{}).
|
||||
|
||||
transaction(Fun, ReadWrite, Options0) ->
|
||||
%% If the transaction is read-only, use the same default options we use
|
||||
%% for most queries.
|
||||
DefaultQueryOptions = case ReadWrite of
|
||||
ro ->
|
||||
#{favor => low_latency};
|
||||
_ ->
|
||||
#{}
|
||||
end,
|
||||
Options1 = maps:merge(DefaultQueryOptions, Options0),
|
||||
Options = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options1),
|
||||
case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options) of
|
||||
transaction(Fun, ReadWrite, Options) ->
|
||||
Options1 = maps:merge(?DEFAULT_COMMAND_OPTIONS, Options),
|
||||
case khepri:transaction(?STORE_ID, Fun, ReadWrite, Options1) of
|
||||
ok -> ok;
|
||||
{ok, Result} -> Result;
|
||||
{error, Reason} -> throw({error, Reason})
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ dep_credentials_obfuscation = hex 3.4.0
|
|||
dep_cuttlefish = hex 3.4.0
|
||||
dep_gen_batch_server = hex 0.8.8
|
||||
dep_jose = hex 1.11.10
|
||||
dep_khepri = hex 0.14.0
|
||||
dep_khepri = hex 0.15.0
|
||||
dep_khepri_mnesia_migration = hex 0.5.0
|
||||
dep_prometheus = hex 4.11.0
|
||||
dep_ra = hex 2.14.0
|
||||
|
|
|
|||
Loading…
Reference in New Issue