Fix force_checkpoint tests and CLI command

This commit is contained in:
Aaron Seo 2025-02-04 14:06:29 -08:00
parent b54ab1d5e5
commit 12bf3e094e
No known key found for this signature in database
GPG Key ID: 7F5C877C31189F37
5 changed files with 39 additions and 76 deletions

View File

@ -925,26 +925,6 @@ which_module(3) -> rabbit_fifo_v3;
which_module(4) -> ?MODULE; which_module(4) -> ?MODULE;
which_module(5) -> ?MODULE. which_module(5) -> ?MODULE.
-define(AUX, aux_v3).
-record(checkpoint, {index :: ra:index(),
timestamp :: milliseconds(),
smallest_index :: undefined | ra:index(),
messages_total :: non_neg_integer(),
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
unused_1 = ?NIL}).
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
-record(aux, {name :: atom(),
capacity :: term(),
gc = #aux_gc{} :: #aux_gc{}}).
-record(?AUX, {name :: atom(),
last_decorators_state :: term(),
capacity :: term(),
gc = #aux_gc{} :: #aux_gc{},
tick_pid :: undefined | pid(),
cache = #{} :: map(),
last_checkpoint :: #checkpoint{}}).
init_aux(Name) when is_atom(Name) -> init_aux(Name) when is_atom(Name) ->
%% TODO: catch specific exception throw if table already exists %% TODO: catch specific exception throw if table already exists
ok = ra_machine_ets:create_table(rabbit_fifo_usage, ok = ra_machine_ets:create_table(rabbit_fifo_usage,

View File

@ -227,3 +227,23 @@
msg_ttl => non_neg_integer(), msg_ttl => non_neg_integer(),
created => non_neg_integer() created => non_neg_integer()
}. }.
-define(AUX, aux_v3).
-record(checkpoint, {index :: ra:index(),
timestamp :: milliseconds(),
smallest_index :: undefined | ra:index(),
messages_total :: non_neg_integer(),
indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
unused_1 = ?NIL}).
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
-record(aux, {name :: atom(),
capacity :: term(),
gc = #aux_gc{} :: #aux_gc{}}).
-record(?AUX, {name :: atom(),
last_decorators_state :: term(),
capacity :: term(),
gc = #aux_gc{} :: #aux_gc{},
tick_pid :: undefined | pid(),
cache = #{} :: map(),
last_checkpoint :: #checkpoint{}}).

View File

@ -2089,13 +2089,13 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
force_checkpoint_on_queue(QName) -> force_checkpoint_on_queue(QName) ->
Node = node(), Node = node(),
QNameFmt = rabbit_misc:rs(QName), QNameFmt = rabbit_misc:rs(QName),
case rabbit_amqqueue:lookup(QName) of case rabbit_db_queue:get_durable(QName) of
{ok, Q} when ?amqqueue_is_classic(Q) -> {ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported}; {error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) -> {ok, Q} when ?amqqueue_is_quorum(Q) ->
{RaName, _} = amqqueue:get_pid(Q), {RaName, _} = amqqueue:get_pid(Q),
rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint]), rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
rabbit_log:debug("Sent command to force checkpoint ~ts", [QNameFmt]); rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint]);
{ok, _Q} -> {ok, _Q} ->
{error, not_quorum_queue}; {error, not_quorum_queue};
{error, _} = E -> {error, _} = E ->
@ -2114,8 +2114,7 @@ force_checkpoint(VhostSpec, QueueSpec) ->
{QName, {error, Err}} {QName, {error, Err}}
end end
end end
|| Q <- rabbit_amqqueue:list(), || Q <- rabbit_db_queue:get_all_durable_by_type(?MODULE),
amqqueue:get_type(Q) == ?MODULE,
is_match(amqqueue:get_vhost(Q), VhostSpec) is_match(amqqueue:get_vhost(Q), VhostSpec)
andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]. andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)].
@ -2179,4 +2178,3 @@ file_handle_other_reservation() ->
file_handle_release_reservation() -> file_handle_release_reservation() ->
ok. ok.

View File

@ -10,6 +10,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("rabbit/src/rabbit_fifo.hrl").
-import(queue_utils, [wait_for_messages_ready/3, -import(queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3, wait_for_messages_pending_ack/3,
@ -1327,12 +1328,9 @@ force_checkpoint_on_queue(Config) ->
% Wait for initial checkpoint and make sure it's 0; checkpoint hasn't been triggered yet. % Wait for initial checkpoint and make sure it's 0; checkpoint hasn't been triggered yet.
rabbit_ct_helpers:await_condition( rabbit_ct_helpers:await_condition(
fun() -> fun() ->
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), {ok, #{aux := Aux}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1, #aux_v3{last_checkpoint = #checkpoint{index = Index}} = Aux,
case Index of Index =:= 0
0 -> true;
_ -> false
end
end), end),
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
@ -1341,12 +1339,9 @@ force_checkpoint_on_queue(Config) ->
% Wait for initial checkpoint and make sure it's not 0 % Wait for initial checkpoint and make sure it's not 0
rabbit_ct_helpers:await_condition( rabbit_ct_helpers:await_condition(
fun() -> fun() ->
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), {ok, #{aux := Aux}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1, #aux_v3{last_checkpoint = #checkpoint{index = Index}} = Aux,
case Index of Index =/= 0
0 -> false;
_ -> true
end
end). end).
force_checkpoint(Config) -> force_checkpoint(Config) ->
@ -1354,6 +1349,7 @@ force_checkpoint(Config) ->
rabbit_ct_broker_helpers:get_node_configs(Config, nodename), rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config), QQ = ?config(queue_name, Config),
QQName = rabbit_misc:r(<<"/">>, queue, QQ),
CQ = <<"force_checkpoint_cq">>, CQ = <<"force_checkpoint_cq">>,
RaName = ra_name(QQ), RaName = ra_name(QQ),
@ -1366,24 +1362,12 @@ force_checkpoint(Config) ->
rabbit_ct_client_helpers:publish(Ch, QQ, 3), rabbit_ct_client_helpers:publish(Ch, QQ, 3),
wait_for_messages_ready([Server0], RaName, 3), wait_for_messages_ready([Server0], RaName, 3),
meck:expect(rabbit_quorum_queue, force_checkpoint_on_queue, fun(Q) -> ok end), ForceCheckpointRes = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_checkpoint, [<<".*">>, <<".*">>]), force_checkpoint, [<<".*">>, <<".*">>]),
ExpectedRes = [{QQName, {ok}}],
% Waiting here to make sure checkpoint has been forced % Result should only have quorum queue
rabbit_ct_helpers:await_condition( ?assertEqual(ExpectedRes, ForceCheckpointRes).
fun() ->
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1,
case Index of
0 -> false;
_ -> true
end
end),
% Make sure force_checkpoint_on_queue was only called for the quorun queue
?assertEqual(1, meck:num_calls(rabbit_quorum_queue, force_checkpoint_on_queue, '_')).
% Tests that, if the process of a QQ is dead in the moment of declaring a policy % Tests that, if the process of a QQ is dead in the moment of declaring a policy
% that affects such queue, when the process is made available again, the policy % that affects such queue, when the process is made available again, the policy

View File

@ -35,9 +35,6 @@ defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
args = [vhost_pat, queue_pat] args = [vhost_pat, queue_pat]
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
{:error, _} = error ->
error
{:badrpc, _} = error -> {:badrpc, _} = error ->
error error
@ -46,7 +43,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
do: [ do: [
{:vhost, vhost}, {:vhost, vhost},
{:name, name}, {:name, name},
{:result, format_result(res)} {:result, res}
] ]
results -> results ->
@ -54,7 +51,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
do: [ do: [
{:vhost, vhost}, {:vhost, vhost},
{:name, name}, {:name, name},
{:result, format_result(res)} {:result, res}
] ]
end end
end end
@ -88,20 +85,4 @@ defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
def banner([], _) do def banner([], _) do
"Forcing checkpoint for all matching quorum queues..." "Forcing checkpoint for all matching quorum queues..."
end end
#
# Implementation
#
defp format_result({:ok}) do
"ok"
end
defp format_result({:error, :timeout}) do
"error: the operation timed out and may not have been completed"
end
defp format_result({:error, err}) do
to_string(:io_lib.format("error: ~W", [err, 10]))
end
end end