rabbit_khepri: Use local queries exclusively
[Why] Ra consistent queries are currently fragile in the sense that the query function may run on a remote node and the function reference or MFA may not be valid on that node: * A different Erlang compiler may produce difference function references for the same module source code. We observed a difference between Erlang/OTP 25.x and Erlang/OTP 26.x compilers for instance. * There is no way to be sure that the remote function copy, whether it is described by a function reference of an MFA tuple, is the same as the copy local to the caller. Indeed, the remote node may run a different version after an upgrade to one of the local or remote nodes. [How] That's why we force local queries for now. This is fine for now, especially that we use Khepri projections in many places and they are local by design.
This commit is contained in:
parent
d5624d976f
commit
6ea19b9fa1
|
|
@ -276,7 +276,7 @@ wait_for_leader(Timeout, Retries) ->
|
|||
rabbit_log:info("Waiting for Khepri leader for ~tp ms, ~tp retries left",
|
||||
[Timeout, Retries - 1]),
|
||||
Options = #{timeout => Timeout,
|
||||
favor => compromise},
|
||||
favor => low_latency},
|
||||
case khepri:exists(?STORE_ID, [], Options) of
|
||||
Exists when is_boolean(Exists) ->
|
||||
rabbit_log:info("Khepri leader elected"),
|
||||
|
|
@ -858,20 +858,24 @@ cas(Path, Pattern, Data) ->
|
|||
?STORE_ID, Path, Pattern, Data, ?DEFAULT_COMMAND_OPTIONS).
|
||||
|
||||
fold(Path, Pred, Acc) ->
|
||||
khepri:fold(?STORE_ID, Path, Pred, Acc).
|
||||
khepri:fold(?STORE_ID, Path, Pred, Acc, #{favor => low_latency}).
|
||||
|
||||
fold(Path, Pred, Acc, Options) ->
|
||||
khepri:fold(?STORE_ID, Path, Pred, Acc, Options).
|
||||
Options1 = Options#{favor => low_latency},
|
||||
khepri:fold(?STORE_ID, Path, Pred, Acc, Options1).
|
||||
|
||||
foreach(Path, Pred) -> khepri:foreach(?STORE_ID, Path, Pred).
|
||||
foreach(Path, Pred) ->
|
||||
khepri:foreach(?STORE_ID, Path, Pred, #{favor => low_latency}).
|
||||
|
||||
filter(Path, Pred) -> khepri:filter(?STORE_ID, Path, Pred).
|
||||
filter(Path, Pred) ->
|
||||
khepri:filter(?STORE_ID, Path, Pred, #{favor => low_latency}).
|
||||
|
||||
get(Path) ->
|
||||
khepri:get(?STORE_ID, Path, #{favor => low_latency}).
|
||||
|
||||
get(Path, Options) ->
|
||||
khepri:get(?STORE_ID, Path, Options).
|
||||
Options1 = Options#{favor => low_latency},
|
||||
khepri:get(?STORE_ID, Path, Options1).
|
||||
|
||||
get_many(PathPattern) ->
|
||||
khepri:get_many(?STORE_ID, PathPattern, #{favor => low_latency}).
|
||||
|
|
@ -882,14 +886,19 @@ adv_get(Path) ->
|
|||
match(Path) ->
|
||||
match(Path, #{}).
|
||||
|
||||
match(Path, Options) -> khepri:get_many(?STORE_ID, Path, Options).
|
||||
match(Path, Options) ->
|
||||
Options1 = Options#{favor => low_latency},
|
||||
khepri:get_many(?STORE_ID, Path, Options1).
|
||||
|
||||
exists(Path) -> khepri:exists(?STORE_ID, Path, #{favor => low_latency}).
|
||||
|
||||
list(Path) -> khepri:get_many(?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR]).
|
||||
list(Path) ->
|
||||
khepri:get_many(
|
||||
?STORE_ID, Path ++ [?KHEPRI_WILDCARD_STAR], #{favor => low_latency}).
|
||||
|
||||
list_child_nodes(Path) ->
|
||||
Options = #{props_to_return => [child_names]},
|
||||
Options = #{props_to_return => [child_names],
|
||||
favor => low_latency},
|
||||
case khepri_adv:get_many(?STORE_ID, Path, Options) of
|
||||
{ok, Result} ->
|
||||
case maps:values(Result) of
|
||||
|
|
@ -903,7 +912,8 @@ list_child_nodes(Path) ->
|
|||
end.
|
||||
|
||||
count_children(Path) ->
|
||||
Options = #{props_to_return => [child_list_length]},
|
||||
Options = #{props_to_return => [child_list_length],
|
||||
favor => low_latency},
|
||||
case khepri_adv:get_many(?STORE_ID, Path, Options) of
|
||||
{ok, Map} ->
|
||||
lists:sum([L || #{child_list_length := L} <- maps:values(Map)]);
|
||||
|
|
|
|||
Loading…
Reference in New Issue