Add force checkpoint functions for quorum queues and command line tool
(cherry picked from commit b54ab1d5e5
)
This commit is contained in:
parent
6dd7447b92
commit
0d3dfd9695
|
@ -84,6 +84,8 @@
|
|||
queue_vm_stats_sups/0,
|
||||
queue_vm_ets/0]).
|
||||
|
||||
-export([force_checkpoint/2, force_checkpoint_on_queue/1]).
|
||||
|
||||
%% for backwards compatibility
|
||||
-export([file_handle_leader_reservation/1,
|
||||
file_handle_other_reservation/0,
|
||||
|
@ -2115,6 +2117,39 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
|
|||
rabbit_log:warning("Shrinking finished"),
|
||||
ok.
|
||||
|
||||
force_checkpoint_on_queue(QName) ->
|
||||
Node = node(),
|
||||
QNameFmt = rabbit_misc:rs(QName),
|
||||
case rabbit_amqqueue:lookup(QName) of
|
||||
{ok, Q} when ?amqqueue_is_classic(Q) ->
|
||||
{error, classic_queue_not_supported};
|
||||
{ok, Q} when ?amqqueue_is_quorum(Q) ->
|
||||
{RaName, _} = amqqueue:get_pid(Q),
|
||||
rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint]),
|
||||
rabbit_log:debug("Sent command to force checkpoint ~ts", [QNameFmt]);
|
||||
{ok, _Q} ->
|
||||
{error, not_quorum_queue};
|
||||
{error, _} = E ->
|
||||
E
|
||||
end.
|
||||
|
||||
force_checkpoint(VhostSpec, QueueSpec) ->
|
||||
[begin
|
||||
QName = amqqueue:get_name(Q),
|
||||
case force_checkpoint_on_queue(QName) of
|
||||
ok ->
|
||||
{QName, {ok}};
|
||||
{error, Err} ->
|
||||
rabbit_log:warning("~ts: failed to force checkpoint, error: ~w",
|
||||
[rabbit_misc:rs(QName), Err]),
|
||||
{QName, {error, Err}}
|
||||
end
|
||||
end
|
||||
|| Q <- rabbit_amqqueue:list(),
|
||||
amqqueue:get_type(Q) == ?MODULE,
|
||||
is_match(amqqueue:get_vhost(Q), VhostSpec)
|
||||
andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)].
|
||||
|
||||
is_minority(All, Up) ->
|
||||
MinQuorum = length(All) div 2 + 1,
|
||||
length(Up) < MinQuorum.
|
||||
|
|
|
@ -98,6 +98,8 @@ groups() ->
|
|||
force_shrink_member_to_current_member,
|
||||
force_all_queues_shrink_member_to_current_member,
|
||||
force_vhost_queues_shrink_member_to_current_member,
|
||||
force_checkpoint_on_queue,
|
||||
force_checkpoint,
|
||||
policy_repair,
|
||||
gh_12635,
|
||||
replica_states
|
||||
|
@ -1339,6 +1341,80 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
|
|||
?assertEqual(3, length(Nodes0))
|
||||
end || Q <- QQs, VHost <- VHosts].
|
||||
|
||||
force_checkpoint_on_queue(Config) ->
|
||||
[Server0, _Server1, _Server2] =
|
||||
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
|
||||
QQ = ?config(queue_name, Config),
|
||||
RaName = ra_name(QQ),
|
||||
QName = rabbit_misc:r(<<"/">>, queue, QQ),
|
||||
|
||||
?assertEqual({'queue.declare_ok', QQ, 0, 0},
|
||||
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
|
||||
|
||||
rabbit_ct_client_helpers:publish(Ch, QQ, 3),
|
||||
wait_for_messages_ready([Server0], RaName, 3),
|
||||
|
||||
% Wait for initial checkpoint and make sure it's 0; checkpoint hasn't been triggered yet.
|
||||
rabbit_ct_helpers:await_condition(
|
||||
fun() ->
|
||||
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
|
||||
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1,
|
||||
case Index of
|
||||
0 -> true;
|
||||
_ -> false
|
||||
end
|
||||
end),
|
||||
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
|
||||
force_checkpoint_on_queue, [QName]),
|
||||
|
||||
% Wait for initial checkpoint and make sure it's not 0
|
||||
rabbit_ct_helpers:await_condition(
|
||||
fun() ->
|
||||
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
|
||||
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1,
|
||||
case Index of
|
||||
0 -> false;
|
||||
_ -> true
|
||||
end
|
||||
end).
|
||||
|
||||
force_checkpoint(Config) ->
|
||||
[Server0, _Server1, _Server2] =
|
||||
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
|
||||
QQ = ?config(queue_name, Config),
|
||||
CQ = <<"force_checkpoint_cq">>,
|
||||
RaName = ra_name(QQ),
|
||||
|
||||
?assertEqual({'queue.declare_ok', QQ, 0, 0},
|
||||
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
|
||||
|
||||
?assertEqual({'queue.declare_ok', CQ, 0, 0},
|
||||
declare(Ch, CQ, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
|
||||
|
||||
rabbit_ct_client_helpers:publish(Ch, QQ, 3),
|
||||
wait_for_messages_ready([Server0], RaName, 3),
|
||||
|
||||
meck:expect(rabbit_quorum_queue, force_checkpoint_on_queue, fun(Q) -> ok end),
|
||||
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
|
||||
force_checkpoint, [<<".*">>, <<".*">>]),
|
||||
|
||||
% Waiting here to make sure checkpoint has been forced
|
||||
rabbit_ct_helpers:await_condition(
|
||||
fun() ->
|
||||
{ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
|
||||
{aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1,
|
||||
case Index of
|
||||
0 -> false;
|
||||
_ -> true
|
||||
end
|
||||
end),
|
||||
|
||||
% Make sure force_checkpoint_on_queue was only called for the quorun queue
|
||||
?assertEqual(1, meck:num_calls(rabbit_quorum_queue, force_checkpoint_on_queue, '_')).
|
||||
|
||||
% Tests that, if the process of a QQ is dead in the moment of declaring a policy
|
||||
% that affects such queue, when the process is made available again, the policy
|
||||
|
|
107
deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex
vendored
Normal file
107
deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex
vendored
Normal file
|
@ -0,0 +1,107 @@
|
|||
## This Source Code Form is subject to the terms of the Mozilla Public
|
||||
## License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
##
|
||||
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
|
||||
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
|
||||
alias RabbitMQ.CLI.Core.{DocGuide}
|
||||
|
||||
@behaviour RabbitMQ.CLI.CommandBehaviour
|
||||
|
||||
defp default_opts,
|
||||
do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}
|
||||
|
||||
def switches(),
|
||||
do: [
|
||||
vhost_pattern: :string,
|
||||
queue_pattern: :string,
|
||||
errors_only: :boolean
|
||||
]
|
||||
|
||||
def merge_defaults(args, opts) do
|
||||
{args, Map.merge(default_opts(), opts)}
|
||||
end
|
||||
|
||||
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
|
||||
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
|
||||
|
||||
def run([], %{
|
||||
node: node_name,
|
||||
vhost_pattern: vhost_pat,
|
||||
queue_pattern: queue_pat,
|
||||
errors_only: errors_only
|
||||
}) do
|
||||
args = [vhost_pat, queue_pat]
|
||||
|
||||
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
|
||||
{:error, _} = error ->
|
||||
error
|
||||
|
||||
{:badrpc, _} = error ->
|
||||
error
|
||||
|
||||
results when errors_only ->
|
||||
for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results,
|
||||
do: [
|
||||
{:vhost, vhost},
|
||||
{:name, name},
|
||||
{:result, format_result(res)}
|
||||
]
|
||||
|
||||
results ->
|
||||
for {{:resource, vhost, _kind, name}, res} <- results,
|
||||
do: [
|
||||
{:vhost, vhost},
|
||||
{:name, name},
|
||||
{:result, format_result(res)}
|
||||
]
|
||||
end
|
||||
end
|
||||
|
||||
use RabbitMQ.CLI.DefaultOutput
|
||||
|
||||
def formatter(), do: RabbitMQ.CLI.Formatters.Table
|
||||
|
||||
def usage,
|
||||
do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>]"
|
||||
|
||||
def usage_additional do
|
||||
[
|
||||
["--queue-pattern <pattern>", "regular expression to match queue names"],
|
||||
["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
|
||||
["--errors-only", "only list queues which reported an error"]
|
||||
]
|
||||
end
|
||||
|
||||
def usage_doc_guides() do
|
||||
[
|
||||
DocGuide.quorum_queues()
|
||||
]
|
||||
end
|
||||
|
||||
def help_section, do: :replication
|
||||
|
||||
def description,
|
||||
do: "Forces checkpoints for all matching quorum queues"
|
||||
|
||||
def banner([], _) do
|
||||
"Forcing checkpoint for all matching quorum queues..."
|
||||
end
|
||||
|
||||
#
|
||||
# Implementation
|
||||
#
|
||||
|
||||
defp format_result({:ok}) do
|
||||
"ok"
|
||||
end
|
||||
|
||||
defp format_result({:error, :timeout}) do
|
||||
"error: the operation timed out and may not have been completed"
|
||||
end
|
||||
|
||||
defp format_result({:error, err}) do
|
||||
to_string(:io_lib.format("error: ~W", [err, 10]))
|
||||
end
|
||||
end
|
|
@ -0,0 +1,64 @@
|
|||
## This Source Code Form is subject to the terms of the Mozilla Public
|
||||
## License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
##
|
||||
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
|
||||
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do
|
||||
use ExUnit.Case, async: false
|
||||
import TestHelper
|
||||
|
||||
@command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand
|
||||
|
||||
setup_all do
|
||||
RabbitMQ.CLI.Core.Distribution.start()
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
setup context do
|
||||
{:ok,
|
||||
opts: %{
|
||||
node: get_rabbit_hostname(),
|
||||
timeout: context[:test_timeout] || 30000,
|
||||
vhost_pattern: ".*",
|
||||
queue_pattern: ".*",
|
||||
errors_only: false
|
||||
}}
|
||||
end
|
||||
|
||||
test "merge_defaults: defaults to reporting complete results" do
|
||||
assert @command.merge_defaults([], %{}) ==
|
||||
{[],
|
||||
%{
|
||||
vhost_pattern: ".*",
|
||||
queue_pattern: ".*",
|
||||
errors_only: false
|
||||
}}
|
||||
end
|
||||
|
||||
test "validate: accepts no positional arguments" do
|
||||
assert @command.validate([], %{}) == :ok
|
||||
end
|
||||
|
||||
test "validate: any positional arguments fail validation" do
|
||||
assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :too_many_args}
|
||||
|
||||
assert @command.validate(["quorum-queue-a", "two"], %{}) ==
|
||||
{:validation_failure, :too_many_args}
|
||||
|
||||
assert @command.validate(["quorum-queue-a", "two", "three"], %{}) ==
|
||||
{:validation_failure, :too_many_args}
|
||||
end
|
||||
|
||||
@tag test_timeout: 3000
|
||||
test "run: targeting an unreachable node throws a badrpc", context do
|
||||
assert match?(
|
||||
{:badrpc, _},
|
||||
@command.run(
|
||||
[],
|
||||
Map.merge(context[:opts], %{node: :jake@thedog})
|
||||
)
|
||||
)
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue