Merge pull request #14672 from rabbitmq/use-timeout-for-leader-record-repair

Use tick-related timeout to repair leader record
This commit is contained in:
Arnaud Cogoluègnes 2025-10-06 09:28:53 +00:00 committed by GitHub
commit 3f719d53b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 40 additions and 12 deletions

View File

@ -34,7 +34,7 @@
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2]).
-export([on_node_up/1, on_node_down/1]).
-export([update/2, store_queue/1, update_decorators/2, policy_changed/2]).
-export([update/2, update/3, store_queue/1, update_decorators/2, policy_changed/2]).
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([is_match/2, is_in_virtual_host/2]).
-export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
@ -298,12 +298,18 @@ do_internal_declare(Q0, false) ->
Queue = rabbit_queue_decorator:set(Q),
rabbit_db_queue:create_or_get(Queue).
-spec update
(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
'not_found' | amqqueue:amqqueue().
-spec update(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
'not_found' | amqqueue:amqqueue().
update(Name, Fun) ->
rabbit_db_queue:update(Name, Fun).
update(Name, Fun, #{}).
-spec update(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue()),
#{timeout => timeout()}) ->
'not_found' | amqqueue:amqqueue().
update(Name, Fun, Options) ->
rabbit_db_queue:update(Name, Fun, Options).
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),

View File

@ -31,6 +31,7 @@
set/1,
delete/2,
update/2,
update/3,
update_decorators/2,
exists/1,
foreach/2
@ -637,9 +638,23 @@ get_many_in_ets(Table, Names) ->
%% @private
update(QName, Fun) ->
update(QName, Fun, #{}).
-spec update(QName, UpdateFun, Options) -> Ret when
QName :: rabbit_amqqueue:name(),
Queue :: amqqueue:amqqueue(),
UpdateFun :: fun((Queue) -> NewQueue),
NewQueue :: amqqueue:amqqueue(),
Options :: #{timeout => timeout()},
Ret :: Queue | not_found.
%% @doc Updates an existing queue record using `UpdateFun'.
%%
%% @private
update(QName, Fun, Options) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> update_in_mnesia(QName, Fun) end,
khepri => fun() -> update_in_khepri(QName, Fun) end
khepri => fun() -> update_in_khepri(QName, Fun, Options) end
}).
update_in_mnesia(QName, Fun) ->
@ -648,15 +663,19 @@ update_in_mnesia(QName, Fun) ->
update_in_mnesia_tx(QName, Fun)
end).
update_in_khepri(QName, Fun) ->
update_in_khepri(QName, Fun, #{}).
update_in_khepri(QName, Fun, Options) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
Ret1 = rabbit_khepri:adv_get(Path, Options),
case Ret1 of
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Q1 = Fun(Q),
Ret2 = rabbit_khepri:put(UpdatePath, Q1),
Ret2 = rabbit_khepri:put(UpdatePath, Q1, Options),
case Ret2 of
ok -> Q1;
{error, {khepri, mismatching_node, _}} ->

View File

@ -446,7 +446,8 @@ become_leader0(QName, Name) ->
amqqueue:set_pid(Q1, {Name, node()}),
live)
end,
_ = rabbit_amqqueue:update(QName, Fun),
Timeout = max(tick_interval() - 1000, 1000),
_ = rabbit_amqqueue:update(QName, Fun, #{timeout => Timeout}),
case rabbit_amqqueue:lookup(QName) of
{ok, Q0} when ?is_amqqueue(Q0) ->
Nodes = get_nodes(Q0),
@ -656,7 +657,7 @@ handle_tick(QName,
ok;
repaired ->
?LOG_DEBUG("Repaired quorum queue ~ts amqqueue record",
[rabbit_misc:rs(QName)])
[rabbit_misc:rs(QName)])
end,
ExpectedNodes = rabbit_nodes:list_members(),
case Nodes -- ExpectedNodes of
@ -1981,8 +1982,7 @@ make_ra_conf(Q, ServerId) ->
make_ra_conf(Q, ServerId, Membership, MacVersion)
when is_integer(MacVersion) ->
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
?TICK_INTERVAL),
TickTimeout = tick_interval(),
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
?SNAPSHOT_INTERVAL),
CheckpointInterval = application:get_env(rabbit,
@ -2408,3 +2408,6 @@ queue_vm_stats_sups() ->
queue_vm_ets() ->
{[quorum_ets],
[[ra_log_ets]]}.
tick_interval() ->
application:get_env(rabbit, quorum_tick_interval, ?TICK_INTERVAL).