Merge pull request #13548 from rabbitmq/rabbitmq-server-13175-mk
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Has been cancelled
Details
Test (make) / Build and Xref (1.17, 26) (push) Has been cancelled
Details
Test (make) / Build and Xref (1.17, 27) (push) Has been cancelled
Details
Test (make) / Test (1.17, 27, khepri) (push) Has been cancelled
Details
Test (make) / Test (1.17, 27, mnesia) (push) Has been cancelled
Details
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Has been cancelled
Details
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Has been cancelled
Details
Test (make) / Type check (1.17, 27) (push) Has been cancelled
Details
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Has been cancelled
Details
Test (make) / Build and Xref (1.17, 26) (push) Has been cancelled
Details
Test (make) / Build and Xref (1.17, 27) (push) Has been cancelled
Details
Test (make) / Test (1.17, 27, khepri) (push) Has been cancelled
Details
Test (make) / Test (1.17, 27, mnesia) (push) Has been cancelled
Details
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Has been cancelled
Details
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Has been cancelled
Details
Test (make) / Type check (1.17, 27) (push) Has been cancelled
Details
For 4.1.x, by @aaron-seo: introduce a command that would force QQs to take a checkpoint and truncate its segments
This commit is contained in:
commit
5f1bef6141
|
@ -84,6 +84,8 @@
|
||||||
queue_vm_stats_sups/0,
|
queue_vm_stats_sups/0,
|
||||||
queue_vm_ets/0]).
|
queue_vm_ets/0]).
|
||||||
|
|
||||||
|
-export([force_checkpoint/2, force_checkpoint_on_queue/1]).
|
||||||
|
|
||||||
%% for backwards compatibility
|
%% for backwards compatibility
|
||||||
-export([file_handle_leader_reservation/1,
|
-export([file_handle_leader_reservation/1,
|
||||||
file_handle_other_reservation/0,
|
file_handle_other_reservation/0,
|
||||||
|
@ -157,6 +159,7 @@
|
||||||
-define(RPC_TIMEOUT, 1000).
|
-define(RPC_TIMEOUT, 1000).
|
||||||
-define(START_CLUSTER_TIMEOUT, 5000).
|
-define(START_CLUSTER_TIMEOUT, 5000).
|
||||||
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
|
-define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT
|
||||||
|
-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000).
|
||||||
-define(TICK_INTERVAL, 5000). %% the ra server tick time
|
-define(TICK_INTERVAL, 5000). %% the ra server tick time
|
||||||
-define(DELETE_TIMEOUT, 5000).
|
-define(DELETE_TIMEOUT, 5000).
|
||||||
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
|
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
|
||||||
|
@ -2115,6 +2118,40 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis
|
||||||
rabbit_log:warning("Shrinking finished"),
|
rabbit_log:warning("Shrinking finished"),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
force_checkpoint_on_queue(QName) ->
|
||||||
|
QNameFmt = rabbit_misc:rs(QName),
|
||||||
|
case rabbit_db_queue:get_durable(QName) of
|
||||||
|
{ok, Q} when ?amqqueue_is_classic(Q) ->
|
||||||
|
{error, classic_queue_not_supported};
|
||||||
|
{ok, Q} when ?amqqueue_is_quorum(Q) ->
|
||||||
|
{RaName, _} = amqqueue:get_pid(Q),
|
||||||
|
rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
|
||||||
|
Nodes = amqqueue:get_nodes(Q),
|
||||||
|
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
|
||||||
|
|| Node <- Nodes],
|
||||||
|
ok;
|
||||||
|
{ok, _Q} ->
|
||||||
|
{error, not_quorum_queue};
|
||||||
|
{error, _} = E ->
|
||||||
|
E
|
||||||
|
end.
|
||||||
|
|
||||||
|
force_checkpoint(VhostSpec, QueueSpec) ->
|
||||||
|
[begin
|
||||||
|
QName = amqqueue:get_name(Q),
|
||||||
|
case force_checkpoint_on_queue(QName) of
|
||||||
|
ok ->
|
||||||
|
{QName, {ok}};
|
||||||
|
{error, Err} ->
|
||||||
|
rabbit_log:warning("~ts: failed to force checkpoint, error: ~w",
|
||||||
|
[rabbit_misc:rs(QName), Err]),
|
||||||
|
{QName, {error, Err}}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|| Q <- rabbit_db_queue:get_all_durable_by_type(?MODULE),
|
||||||
|
is_match(amqqueue:get_vhost(Q), VhostSpec)
|
||||||
|
andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)].
|
||||||
|
|
||||||
is_minority(All, Up) ->
|
is_minority(All, Up) ->
|
||||||
MinQuorum = length(All) div 2 + 1,
|
MinQuorum = length(All) div 2 + 1,
|
||||||
length(Up) < MinQuorum.
|
length(Up) < MinQuorum.
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
|
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
|
||||||
|
-include_lib("rabbit/src/rabbit_fifo.hrl").
|
||||||
|
|
||||||
-import(queue_utils, [wait_for_messages_ready/3,
|
-import(queue_utils, [wait_for_messages_ready/3,
|
||||||
wait_for_messages_pending_ack/3,
|
wait_for_messages_pending_ack/3,
|
||||||
|
@ -98,6 +99,8 @@ groups() ->
|
||||||
force_shrink_member_to_current_member,
|
force_shrink_member_to_current_member,
|
||||||
force_all_queues_shrink_member_to_current_member,
|
force_all_queues_shrink_member_to_current_member,
|
||||||
force_vhost_queues_shrink_member_to_current_member,
|
force_vhost_queues_shrink_member_to_current_member,
|
||||||
|
force_checkpoint_on_queue,
|
||||||
|
force_checkpoint,
|
||||||
policy_repair,
|
policy_repair,
|
||||||
gh_12635,
|
gh_12635,
|
||||||
replica_states
|
replica_states
|
||||||
|
@ -1339,6 +1342,96 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
|
||||||
?assertEqual(3, length(Nodes0))
|
?assertEqual(3, length(Nodes0))
|
||||||
end || Q <- QQs, VHost <- VHosts].
|
end || Q <- QQs, VHost <- VHosts].
|
||||||
|
|
||||||
|
force_checkpoint_on_queue(Config) ->
|
||||||
|
[Server0, Server1, Server2] =
|
||||||
|
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||||
|
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
|
||||||
|
QQ = ?config(queue_name, Config),
|
||||||
|
RaName = ra_name(QQ),
|
||||||
|
QName = rabbit_misc:r(<<"/">>, queue, QQ),
|
||||||
|
|
||||||
|
?assertEqual({'queue.declare_ok', QQ, 0, 0},
|
||||||
|
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
|
||||||
|
|
||||||
|
N = 20_000,
|
||||||
|
rabbit_ct_client_helpers:publish(Ch, QQ, N),
|
||||||
|
wait_for_messages_ready([Server0], RaName, N),
|
||||||
|
|
||||||
|
%% The state before any checkpoints
|
||||||
|
rabbit_ct_helpers:await_condition(
|
||||||
|
fun() ->
|
||||||
|
{ok, State, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
|
||||||
|
#{log := #{latest_checkpoint_index := LCI}} = State,
|
||||||
|
LCI =:= undefined
|
||||||
|
end),
|
||||||
|
rabbit_ct_helpers:await_condition(
|
||||||
|
fun() ->
|
||||||
|
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
|
||||||
|
#{log := #{latest_checkpoint_index := LCI}} = State,
|
||||||
|
LCI =:= undefined
|
||||||
|
end),
|
||||||
|
rabbit_ct_helpers:await_condition(
|
||||||
|
fun() ->
|
||||||
|
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
|
||||||
|
#{log := #{latest_checkpoint_index := LCI}} = State,
|
||||||
|
LCI =:= undefined
|
||||||
|
end),
|
||||||
|
|
||||||
|
{ok, State0, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
|
||||||
|
ct:pal("Ra server state before forcing a checkpoint: ~tp~n", [State0]),
|
||||||
|
|
||||||
|
%% wait for longer than ?CHECK_MIN_INTERVAL_MS ms
|
||||||
|
timer:sleep(?CHECK_MIN_INTERVAL_MS + 1000),
|
||||||
|
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
|
||||||
|
force_checkpoint_on_queue, [QName]),
|
||||||
|
|
||||||
|
%% Wait for initial checkpoint and make sure it's not 0
|
||||||
|
rabbit_ct_helpers:await_condition(
|
||||||
|
fun() ->
|
||||||
|
{ok, State, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
|
||||||
|
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
|
||||||
|
#{log := #{latest_checkpoint_index := LCI}} = State,
|
||||||
|
(LCI =/= undefined) andalso (LCI >= N)
|
||||||
|
end),
|
||||||
|
rabbit_ct_helpers:await_condition(
|
||||||
|
fun() ->
|
||||||
|
{ok, State, _} = rpc:call(Server1, ra, member_overview, [{RaName, Server1}]),
|
||||||
|
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
|
||||||
|
#{log := #{latest_checkpoint_index := LCI}} = State,
|
||||||
|
(LCI =/= undefined) andalso (LCI >= N)
|
||||||
|
end),
|
||||||
|
rabbit_ct_helpers:await_condition(
|
||||||
|
fun() ->
|
||||||
|
{ok, State, _} = rpc:call(Server2, ra, member_overview, [{RaName, Server2}]),
|
||||||
|
ct:pal("Ra server state post forced checkpoint: ~tp~n", [State]),
|
||||||
|
#{log := #{latest_checkpoint_index := LCI}} = State,
|
||||||
|
(LCI =/= undefined) andalso (LCI >= N)
|
||||||
|
end).
|
||||||
|
|
||||||
|
force_checkpoint(Config) ->
|
||||||
|
[Server0, _Server1, _Server2] =
|
||||||
|
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||||
|
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
|
||||||
|
QQ = ?config(queue_name, Config),
|
||||||
|
QQName = rabbit_misc:r(<<"/">>, queue, QQ),
|
||||||
|
CQ = <<"force_checkpoint_cq">>,
|
||||||
|
RaName = ra_name(QQ),
|
||||||
|
|
||||||
|
?assertEqual({'queue.declare_ok', QQ, 0, 0},
|
||||||
|
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
|
||||||
|
|
||||||
|
?assertEqual({'queue.declare_ok', CQ, 0, 0},
|
||||||
|
declare(Ch, CQ, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
|
||||||
|
|
||||||
|
rabbit_ct_client_helpers:publish(Ch, QQ, 3),
|
||||||
|
wait_for_messages_ready([Server0], RaName, 3),
|
||||||
|
|
||||||
|
ForceCheckpointRes = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
|
||||||
|
force_checkpoint, [<<".*">>, <<".*">>]),
|
||||||
|
ExpectedRes = [{QQName, {ok}}],
|
||||||
|
|
||||||
|
% Result should only have quorum queue
|
||||||
|
?assertEqual(ExpectedRes, ForceCheckpointRes).
|
||||||
|
|
||||||
% Tests that, if the process of a QQ is dead in the moment of declaring a policy
|
% Tests that, if the process of a QQ is dead in the moment of declaring a policy
|
||||||
% that affects such queue, when the process is made available again, the policy
|
% that affects such queue, when the process is made available again, the policy
|
||||||
|
|
88
deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex
vendored
Normal file
88
deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex
vendored
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
## This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
## License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
##
|
||||||
|
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||||
|
|
||||||
|
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do
|
||||||
|
alias RabbitMQ.CLI.Core.{DocGuide}
|
||||||
|
|
||||||
|
@behaviour RabbitMQ.CLI.CommandBehaviour
|
||||||
|
|
||||||
|
defp default_opts,
|
||||||
|
do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}
|
||||||
|
|
||||||
|
def switches(),
|
||||||
|
do: [
|
||||||
|
vhost_pattern: :string,
|
||||||
|
queue_pattern: :string,
|
||||||
|
errors_only: :boolean
|
||||||
|
]
|
||||||
|
|
||||||
|
def merge_defaults(args, opts) do
|
||||||
|
{args, Map.merge(default_opts(), opts)}
|
||||||
|
end
|
||||||
|
|
||||||
|
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
|
||||||
|
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
|
||||||
|
|
||||||
|
def run([], %{
|
||||||
|
node: node_name,
|
||||||
|
vhost_pattern: vhost_pat,
|
||||||
|
queue_pattern: queue_pat,
|
||||||
|
errors_only: errors_only
|
||||||
|
}) do
|
||||||
|
args = [vhost_pat, queue_pat]
|
||||||
|
|
||||||
|
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do
|
||||||
|
{:badrpc, _} = error ->
|
||||||
|
error
|
||||||
|
|
||||||
|
results when errors_only ->
|
||||||
|
for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results,
|
||||||
|
do: [
|
||||||
|
{:vhost, vhost},
|
||||||
|
{:name, name},
|
||||||
|
{:result, res}
|
||||||
|
]
|
||||||
|
|
||||||
|
results ->
|
||||||
|
for {{:resource, vhost, _kind, name}, res} <- results,
|
||||||
|
do: [
|
||||||
|
{:vhost, vhost},
|
||||||
|
{:name, name},
|
||||||
|
{:result, res}
|
||||||
|
]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
use RabbitMQ.CLI.DefaultOutput
|
||||||
|
|
||||||
|
def formatter(), do: RabbitMQ.CLI.Formatters.Table
|
||||||
|
|
||||||
|
def usage,
|
||||||
|
do: "force_checkpoint [--vhost-pattern <pattern>] [--queue-pattern <pattern>]"
|
||||||
|
|
||||||
|
def usage_additional do
|
||||||
|
[
|
||||||
|
["--queue-pattern <pattern>", "regular expression to match queue names"],
|
||||||
|
["--vhost-pattern <pattern>", "regular expression to match virtual host names"],
|
||||||
|
["--errors-only", "only list queues which reported an error"]
|
||||||
|
]
|
||||||
|
end
|
||||||
|
|
||||||
|
def usage_doc_guides() do
|
||||||
|
[
|
||||||
|
DocGuide.quorum_queues()
|
||||||
|
]
|
||||||
|
end
|
||||||
|
|
||||||
|
def help_section, do: :replication
|
||||||
|
|
||||||
|
def description,
|
||||||
|
do: "Forces checkpoints for all matching quorum queues"
|
||||||
|
|
||||||
|
def banner([], _) do
|
||||||
|
"Forcing checkpoint for all matching quorum queues..."
|
||||||
|
end
|
||||||
|
end
|
|
@ -0,0 +1,64 @@
|
||||||
|
## This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
## License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
##
|
||||||
|
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||||
|
|
||||||
|
defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do
|
||||||
|
use ExUnit.Case, async: false
|
||||||
|
import TestHelper
|
||||||
|
|
||||||
|
@command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand
|
||||||
|
|
||||||
|
setup_all do
|
||||||
|
RabbitMQ.CLI.Core.Distribution.start()
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
setup context do
|
||||||
|
{:ok,
|
||||||
|
opts: %{
|
||||||
|
node: get_rabbit_hostname(),
|
||||||
|
timeout: context[:test_timeout] || 30000,
|
||||||
|
vhost_pattern: ".*",
|
||||||
|
queue_pattern: ".*",
|
||||||
|
errors_only: false
|
||||||
|
}}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "merge_defaults: defaults to reporting complete results" do
|
||||||
|
assert @command.merge_defaults([], %{}) ==
|
||||||
|
{[],
|
||||||
|
%{
|
||||||
|
vhost_pattern: ".*",
|
||||||
|
queue_pattern: ".*",
|
||||||
|
errors_only: false
|
||||||
|
}}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "validate: accepts no positional arguments" do
|
||||||
|
assert @command.validate([], %{}) == :ok
|
||||||
|
end
|
||||||
|
|
||||||
|
test "validate: any positional arguments fail validation" do
|
||||||
|
assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :too_many_args}
|
||||||
|
|
||||||
|
assert @command.validate(["quorum-queue-a", "two"], %{}) ==
|
||||||
|
{:validation_failure, :too_many_args}
|
||||||
|
|
||||||
|
assert @command.validate(["quorum-queue-a", "two", "three"], %{}) ==
|
||||||
|
{:validation_failure, :too_many_args}
|
||||||
|
end
|
||||||
|
|
||||||
|
@tag test_timeout: 3000
|
||||||
|
test "run: targeting an unreachable node throws a badrpc", context do
|
||||||
|
assert match?(
|
||||||
|
{:badrpc, _},
|
||||||
|
@command.run(
|
||||||
|
[],
|
||||||
|
Map.merge(context[:opts], %{node: :jake@thedog})
|
||||||
|
)
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in New Issue