Use tick-related timeout to repair leader record

A quorum queue tries to repair its record in a tick handler. This can
happen during a network partition and the metadata store may itself be
unavailable, making the update likely to time out.

The default metadata store timeout is usually higher than the tick
interval, so the tick handler may be stuck during several ticks. The
record takes some time to be updated (timeout + tick interval, 30 + 5
seconds by default), significantly longer than it takes the metadata
store to trigger an election and recover.

Client applications may rely on the quorum queue topology to connect to
an appropriate node, so making the system reflect the actual topology
faster is important to them.

This commit makes the record update operations use a timeout 1-second
lower than the tick interval. The tick handler process should finish
earlier in case of metadata datastore unavailability and it should not
take more than a couple of ticks once the datastore is available to
update the record.

(cherry picked from commit 8387d739e0)
This commit is contained in:
Arnaud Cogoluègnes 2025-10-03 12:07:26 +00:00 committed by Mergify
parent 9027e2a31d
commit e8afe2bf32
3 changed files with 40 additions and 12 deletions

View File

@ -34,7 +34,7 @@
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2]).
-export([on_node_up/1, on_node_down/1]).
-export([update/2, store_queue/1, update_decorators/2, policy_changed/2]).
-export([update/2, update/3, store_queue/1, update_decorators/2, policy_changed/2]).
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([is_match/2, is_in_virtual_host/2]).
-export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
@ -298,12 +298,18 @@ do_internal_declare(Q0, false) ->
Queue = rabbit_queue_decorator:set(Q),
rabbit_db_queue:create_or_get(Queue).
-spec update
(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
'not_found' | amqqueue:amqqueue().
-spec update(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
'not_found' | amqqueue:amqqueue().
update(Name, Fun) ->
rabbit_db_queue:update(Name, Fun).
update(Name, Fun, #{}).
-spec update(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue()),
#{timeout => timeout()}) ->
'not_found' | amqqueue:amqqueue().
update(Name, Fun, Options) ->
rabbit_db_queue:update(Name, Fun, Options).
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),

View File

@ -31,6 +31,7 @@
set/1,
delete/2,
update/2,
update/3,
update_decorators/2,
exists/1,
foreach/2
@ -637,9 +638,23 @@ get_many_in_ets(Table, Names) ->
%% @private
update(QName, Fun) ->
update(QName, Fun, #{}).
-spec update(QName, UpdateFun, Options) -> Ret when
QName :: rabbit_amqqueue:name(),
Queue :: amqqueue:amqqueue(),
UpdateFun :: fun((Queue) -> NewQueue),
NewQueue :: amqqueue:amqqueue(),
Options :: #{timeout => timeout()},
Ret :: Queue | not_found.
%% @doc Updates an existing queue record using `UpdateFun'.
%%
%% @private
update(QName, Fun, Options) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> update_in_mnesia(QName, Fun) end,
khepri => fun() -> update_in_khepri(QName, Fun) end
khepri => fun() -> update_in_khepri(QName, Fun, Options) end
}).
update_in_mnesia(QName, Fun) ->
@ -648,15 +663,19 @@ update_in_mnesia(QName, Fun) ->
update_in_mnesia_tx(QName, Fun)
end).
update_in_khepri(QName, Fun) ->
update_in_khepri(QName, Fun, #{}).
update_in_khepri(QName, Fun, Options) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
Ret1 = rabbit_khepri:adv_get(Path, Options),
case Ret1 of
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Q1 = Fun(Q),
Ret2 = rabbit_khepri:put(UpdatePath, Q1),
Ret2 = rabbit_khepri:put(UpdatePath, Q1, Options),
case Ret2 of
ok -> Q1;
{error, {khepri, mismatching_node, _}} ->

View File

@ -446,7 +446,8 @@ become_leader0(QName, Name) ->
amqqueue:set_pid(Q1, {Name, node()}),
live)
end,
_ = rabbit_amqqueue:update(QName, Fun),
Timeout = max(tick_interval() - 1000, 1000),
_ = rabbit_amqqueue:update(QName, Fun, #{timeout => Timeout}),
case rabbit_amqqueue:lookup(QName) of
{ok, Q0} when ?is_amqqueue(Q0) ->
Nodes = get_nodes(Q0),
@ -656,7 +657,7 @@ handle_tick(QName,
ok;
repaired ->
?LOG_DEBUG("Repaired quorum queue ~ts amqqueue record",
[rabbit_misc:rs(QName)])
[rabbit_misc:rs(QName)])
end,
ExpectedNodes = rabbit_nodes:list_members(),
case Nodes -- ExpectedNodes of
@ -1981,8 +1982,7 @@ make_ra_conf(Q, ServerId) ->
make_ra_conf(Q, ServerId, Membership, MacVersion)
when is_integer(MacVersion) ->
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
?TICK_INTERVAL),
TickTimeout = tick_interval(),
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
?SNAPSHOT_INTERVAL),
CheckpointInterval = application:get_env(rabbit,
@ -2408,3 +2408,6 @@ queue_vm_stats_sups() ->
queue_vm_ets() ->
{[quorum_ets],
[[ra_log_ets]]}.
tick_interval() ->
application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL).