Quorum queues: handle_tick bugfix

PR 7668 addressed the issue where multiple tick processes could be spawned
should the previous one take longer than the tick interval. This didn't quite
work as the rabbit_quorum_queue:handle_tick/3 function spawns it's own process
to do the work which isn't the process that the aux tick handler checks before
spawning.
This commit is contained in:
Karl Nilsson 2023-06-16 16:27:46 +01:00
parent b959a92cfa
commit d82804ea7b
2 changed files with 57 additions and 55 deletions

View File

@ -1031,13 +1031,14 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
_ ->
{no_reply, Aux0, Log0}
end;
handle_aux(leader, _, {handle_tick, Args},
handle_aux(leader, _, {handle_tick, [QName, Overview, Nodes]},
#?AUX{tick_pid = Pid} = Aux, Log, _) ->
NewPid =
case process_is_alive(Pid) of
false ->
%% No active TICK pid
spawn(rabbit_quorum_queue, handle_tick, Args);
%% this function spawns and returns the tick process pid
rabbit_quorum_queue:handle_tick(QName, Overview, Nodes);
true ->
%% Active TICK pid, do nothing
Pid

View File

@ -441,62 +441,63 @@ handle_tick(QName,
%% this makes calls to remote processes so cannot be run inside the
%% ra server
Self = self(),
_ = spawn(
fun() ->
try
Reductions = reductions(Name),
rabbit_core_metrics:queue_stats(QName, NumReadyMsgs,
NumCheckedOut, NumMessages,
Reductions),
Util = case NumConsumers of
0 -> 0;
_ -> rabbit_fifo:usage(Name)
end,
Keys = ?STATISTICS_KEYS -- [consumers,
messages_dlx,
message_bytes_dlx,
single_active_consumer_pid,
single_active_consumer_tag
],
{SacTag, SacPid} = maps:get(single_active_consumer_id,
Overview, {'', ''}),
MsgBytesDiscarded = DiscardBytes + DiscardCheckoutBytes,
MsgBytes = EnqueueBytes + CheckoutBytes + MsgBytesDiscarded,
Infos = [{consumers, NumConsumers},
{consumer_capacity, Util},
{consumer_utilisation, Util},
{message_bytes_ready, EnqueueBytes},
{message_bytes_unacknowledged, CheckoutBytes},
{message_bytes, MsgBytes},
{message_bytes_persistent, MsgBytes},
{messages_persistent, NumMessages},
{messages_dlx, NumDiscarded + NumDiscardedCheckedOut},
{message_bytes_dlx, MsgBytesDiscarded},
{single_active_consumer_tag, SacTag},
{single_active_consumer_pid, SacPid}
| infos(QName, Keys)],
rabbit_core_metrics:queue_stats(QName, Infos),
ok = repair_leader_record(QName, Self),
ExpectedNodes = rabbit_nodes:list_members(),
case Nodes -- ExpectedNodes of
[] ->
ok;
Stale ->
rabbit_log:info("~ts: stale nodes detected. Purging ~w",
[rabbit_misc:rs(QName), Stale]),
%% pipeline purge command
{ok, Q} = rabbit_amqqueue:lookup(QName),
ok = ra:pipeline_command(amqqueue:get_pid(Q),
rabbit_fifo:make_purge_nodes(Stale)),
spawn(
fun() ->
try
Reductions = reductions(Name),
rabbit_core_metrics:queue_stats(QName, NumReadyMsgs,
NumCheckedOut, NumMessages,
Reductions),
Util = case NumConsumers of
0 -> 0;
_ -> rabbit_fifo:usage(Name)
end,
Keys = ?STATISTICS_KEYS -- [consumers,
messages_dlx,
message_bytes_dlx,
single_active_consumer_pid,
single_active_consumer_tag
],
{SacTag, SacPid} = maps:get(single_active_consumer_id,
Overview, {'', ''}),
MsgBytesDiscarded = DiscardBytes + DiscardCheckoutBytes,
MsgBytes = EnqueueBytes + CheckoutBytes + MsgBytesDiscarded,
Infos = [{consumers, NumConsumers},
{consumer_capacity, Util},
{consumer_utilisation, Util},
{message_bytes_ready, EnqueueBytes},
{message_bytes_unacknowledged, CheckoutBytes},
{message_bytes, MsgBytes},
{message_bytes_persistent, MsgBytes},
{messages_persistent, NumMessages},
{messages_dlx, NumDiscarded + NumDiscardedCheckedOut},
{message_bytes_dlx, MsgBytesDiscarded},
{single_active_consumer_tag, SacTag},
{single_active_consumer_pid, SacPid}
| infos(QName, Keys)],
rabbit_core_metrics:queue_stats(QName, Infos),
ok = repair_leader_record(QName, Self),
ExpectedNodes = rabbit_nodes:list_members(),
case Nodes -- ExpectedNodes of
[] ->
ok;
Stale ->
rabbit_log:debug("~ts: stale nodes detected. Purging ~w",
[rabbit_misc:rs(QName), Stale]),
%% pipeline purge command
{ok, Q} = rabbit_amqqueue:lookup(QName),
ok = ra:pipeline_command(amqqueue:get_pid(Q),
rabbit_fifo:make_purge_nodes(Stale)),
ok
end
catch
_:_ ->
ok
end
end),
ok.
catch
_:Err ->
rabbit_log:debug("~ts: handle tick failed with ~p",
[rabbit_misc:rs(QName), Err]),
ok
end
end).
repair_leader_record(QName, Self) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),