Use tick-related timeout to repair leader record
A quorum queue tries to repair its record in a tick handler. This can happen during a network partition and the metadata store may itself be unavailable, making the update likely to time out. The default metadata store timeout is usually higher than the tick interval, so the tick handler may be stuck during several ticks. The record takes some time to be updated (timeout + tick interval, 30 + 5 seconds by default), significantly longer than it takes the metadata store to trigger an election and recover. Client applications may rely on the quorum queue topology to connect to an appropriate node, so making the system reflect the actual topology faster is important to them. This commit makes the record update operations use a timeout 1-second lower than the tick interval. The tick handler process should finish earlier in case of metadata datastore unavailability and it should not take more than a couple of ticks once the datastore is available to update the record.
This commit is contained in:
parent
7a972e050b
commit
8387d739e0
|
@ -34,7 +34,7 @@
|
|||
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
|
||||
-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2]).
|
||||
-export([on_node_up/1, on_node_down/1]).
|
||||
-export([update/2, store_queue/1, update_decorators/2, policy_changed/2]).
|
||||
-export([update/2, update/3, store_queue/1, update_decorators/2, policy_changed/2]).
|
||||
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
|
||||
-export([is_match/2, is_in_virtual_host/2]).
|
||||
-export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
|
||||
|
@ -298,12 +298,18 @@ do_internal_declare(Q0, false) ->
|
|||
Queue = rabbit_queue_decorator:set(Q),
|
||||
rabbit_db_queue:create_or_get(Queue).
|
||||
|
||||
-spec update
|
||||
(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
|
||||
'not_found' | amqqueue:amqqueue().
|
||||
-spec update(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
|
||||
'not_found' | amqqueue:amqqueue().
|
||||
|
||||
update(Name, Fun) ->
|
||||
rabbit_db_queue:update(Name, Fun).
|
||||
update(Name, Fun, #{}).
|
||||
|
||||
-spec update(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue()),
|
||||
#{timeout => timeout()}) ->
|
||||
'not_found' | amqqueue:amqqueue().
|
||||
|
||||
update(Name, Fun, Options) ->
|
||||
rabbit_db_queue:update(Name, Fun, Options).
|
||||
|
||||
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
|
||||
Queue :: amqqueue:amqqueue(),
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
set/1,
|
||||
delete/2,
|
||||
update/2,
|
||||
update/3,
|
||||
update_decorators/2,
|
||||
exists/1,
|
||||
foreach/2
|
||||
|
@ -637,9 +638,23 @@ get_many_in_ets(Table, Names) ->
|
|||
%% @private
|
||||
|
||||
update(QName, Fun) ->
|
||||
update(QName, Fun, #{}).
|
||||
|
||||
-spec update(QName, UpdateFun, Options) -> Ret when
|
||||
QName :: rabbit_amqqueue:name(),
|
||||
Queue :: amqqueue:amqqueue(),
|
||||
UpdateFun :: fun((Queue) -> NewQueue),
|
||||
NewQueue :: amqqueue:amqqueue(),
|
||||
Options :: #{timeout => timeout()},
|
||||
Ret :: Queue | not_found.
|
||||
%% @doc Updates an existing queue record using `UpdateFun'.
|
||||
%%
|
||||
%% @private
|
||||
|
||||
update(QName, Fun, Options) ->
|
||||
rabbit_khepri:handle_fallback(
|
||||
#{mnesia => fun() -> update_in_mnesia(QName, Fun) end,
|
||||
khepri => fun() -> update_in_khepri(QName, Fun) end
|
||||
khepri => fun() -> update_in_khepri(QName, Fun, Options) end
|
||||
}).
|
||||
|
||||
update_in_mnesia(QName, Fun) ->
|
||||
|
@ -648,15 +663,19 @@ update_in_mnesia(QName, Fun) ->
|
|||
update_in_mnesia_tx(QName, Fun)
|
||||
end).
|
||||
|
||||
|
||||
update_in_khepri(QName, Fun) ->
|
||||
update_in_khepri(QName, Fun, #{}).
|
||||
|
||||
update_in_khepri(QName, Fun, Options) ->
|
||||
Path = khepri_queue_path(QName),
|
||||
Ret1 = rabbit_khepri:adv_get(Path),
|
||||
Ret1 = rabbit_khepri:adv_get(Path, Options),
|
||||
case Ret1 of
|
||||
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
|
||||
UpdatePath = khepri_path:combine_with_conditions(
|
||||
Path, [#if_payload_version{version = Vsn}]),
|
||||
Q1 = Fun(Q),
|
||||
Ret2 = rabbit_khepri:put(UpdatePath, Q1),
|
||||
Ret2 = rabbit_khepri:put(UpdatePath, Q1, Options),
|
||||
case Ret2 of
|
||||
ok -> Q1;
|
||||
{error, {khepri, mismatching_node, _}} ->
|
||||
|
|
|
@ -446,7 +446,8 @@ become_leader0(QName, Name) ->
|
|||
amqqueue:set_pid(Q1, {Name, node()}),
|
||||
live)
|
||||
end,
|
||||
_ = rabbit_amqqueue:update(QName, Fun),
|
||||
Timeout = max(tick_interval() - 1000, 1000),
|
||||
_ = rabbit_amqqueue:update(QName, Fun, #{timeout => Timeout}),
|
||||
case rabbit_amqqueue:lookup(QName) of
|
||||
{ok, Q0} when ?is_amqqueue(Q0) ->
|
||||
Nodes = get_nodes(Q0),
|
||||
|
@ -656,7 +657,7 @@ handle_tick(QName,
|
|||
ok;
|
||||
repaired ->
|
||||
?LOG_DEBUG("Repaired quorum queue ~ts amqqueue record",
|
||||
[rabbit_misc:rs(QName)])
|
||||
[rabbit_misc:rs(QName)])
|
||||
end,
|
||||
ExpectedNodes = rabbit_nodes:list_members(),
|
||||
case Nodes -- ExpectedNodes of
|
||||
|
@ -1981,8 +1982,7 @@ make_ra_conf(Q, ServerId) ->
|
|||
|
||||
make_ra_conf(Q, ServerId, Membership, MacVersion)
|
||||
when is_integer(MacVersion) ->
|
||||
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
|
||||
?TICK_INTERVAL),
|
||||
TickTimeout = tick_interval(),
|
||||
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
|
||||
?SNAPSHOT_INTERVAL),
|
||||
CheckpointInterval = application:get_env(rabbit,
|
||||
|
@ -2408,3 +2408,6 @@ queue_vm_stats_sups() ->
|
|||
queue_vm_ets() ->
|
||||
{[quorum_ets],
|
||||
[[ra_log_ets]]}.
|
||||
|
||||
tick_interval() ->
|
||||
application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL).
|
||||
|
|
Loading…
Reference in New Issue