Merge 3a1e3eeab6 into 1c264fa471
This commit is contained in:
commit
f0d0b0aaa5
|
|
@ -68,6 +68,9 @@
|
|||
notify_decorators/3,
|
||||
spawn_notify_decorators/3]).
|
||||
|
||||
-export([get_member_with_highest_index/3,
|
||||
get_member_with_highest_index/4]).
|
||||
|
||||
-export([is_enabled/0,
|
||||
is_compatible/3,
|
||||
declare/2]).
|
||||
|
|
@ -1246,7 +1249,7 @@ key_metrics_rpc(ServerId) ->
|
|||
Metrics = ra:key_metrics(ServerId),
|
||||
Metrics#{machine_version => rabbit_fifo:version()}.
|
||||
|
||||
-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) ->
|
||||
-spec status(rabbit_types:vhost(), rabbit_misc:resource_name()) ->
|
||||
[[{binary(), term()}]] | {error, term()}.
|
||||
status(Vhost, QueueName) ->
|
||||
%% Handle not found queues
|
||||
|
|
@ -1336,6 +1339,48 @@ get_sys_status(Proc) ->
|
|||
|
||||
end.
|
||||
|
||||
-spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom()) ->
|
||||
[[{binary(), term()}]] | {error, term()}.
|
||||
get_member_with_highest_index(Vhost, QueueName, IndexName) ->
|
||||
get_member_with_highest_index(Vhost, QueueName, IndexName, false).
|
||||
|
||||
-spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom(), boolean()) ->
|
||||
[[{binary(), term()}]] | {error, term()}.
|
||||
get_member_with_highest_index(Vhost, QueueName, IndexName, IncludeOfflineMembers) ->
|
||||
case ?MODULE:status(Vhost, QueueName) of
|
||||
Status when is_list(Status) ->
|
||||
IndexNameInternal = rabbit_data_coercion:to_atom(IndexName),
|
||||
case index_name_to_status_key(IndexNameInternal) of
|
||||
Key when is_binary(Key) ->
|
||||
{_HighestIndexValue, HighestEntry} =
|
||||
lists:foldl(
|
||||
fun(Entry, {PreviousIndexValue, _PreviousEntry} = Acc) ->
|
||||
State = rabbit_misc:pget(<<"Raft State">>, Entry),
|
||||
case {rabbit_misc:pget(Key, Entry), IncludeOfflineMembers} of
|
||||
{CurrentIndexValue, false} when is_integer(CurrentIndexValue),
|
||||
CurrentIndexValue > PreviousIndexValue,
|
||||
State /= noproc ->
|
||||
{CurrentIndexValue, Entry};
|
||||
{CurrentIndexValue, true} when is_integer(CurrentIndexValue),
|
||||
CurrentIndexValue > PreviousIndexValue ->
|
||||
{CurrentIndexValue, Entry};
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, {-100, []}, Status),
|
||||
[HighestEntry];
|
||||
undefined ->
|
||||
[]
|
||||
end;
|
||||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
index_name_to_status_key(I) when I =:= commit; I =:= commit_index -> <<"Commit Index">>;
|
||||
index_name_to_status_key(I) when I =:= log; I =:= log_index -> <<"Last Log Index">>;
|
||||
index_name_to_status_key(I) when I =:= snapshot; I =:= snapshot_index -> <<"Snapshot Index">>;
|
||||
index_name_to_status_key(_I) -> undefined.
|
||||
|
||||
add_member(VHost, Name, Node, Membership, Timeout)
|
||||
when is_binary(VHost) andalso
|
||||
is_binary(Name) andalso
|
||||
|
|
|
|||
|
|
@ -115,7 +115,8 @@ groups() ->
|
|||
node_removal_is_not_quorum_critical,
|
||||
select_nodes_with_least_replicas,
|
||||
select_nodes_with_least_replicas_node_down,
|
||||
subscribe_from_each
|
||||
subscribe_from_each,
|
||||
get_member_with_highest_index
|
||||
|
||||
|
||||
]},
|
||||
|
|
@ -367,6 +368,8 @@ init_per_testcase(Testcase, Config) ->
|
|||
{skip, "peek_with_wrong_queue_type isn't mixed versions compatible"};
|
||||
cancel_consumer_gh_3729 when IsMixed andalso RabbitMQ3 ->
|
||||
{skip, "this test is not compatible with RabbitMQ 3.13.x"};
|
||||
get_member_with_highest_index when IsMixed ->
|
||||
{skip, "get_member_with_highest_index isn't mixed versions compatible"};
|
||||
_ ->
|
||||
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
|
||||
|
|
@ -4616,6 +4619,110 @@ leader_health_check(Config) ->
|
|||
amqp_connection:close(Conn1),
|
||||
amqp_connection:close(Conn2).
|
||||
|
||||
get_member_with_highest_index(Config) ->
|
||||
[Node1, Node2, Node3, Node4, Node5] =
|
||||
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
|
||||
Q = ?config(queue_name, Config),
|
||||
VHost = <<"/">>,
|
||||
|
||||
Statuses =
|
||||
%% [{Node, Member, LogIdx, CommitIdx, SnapshotIdx}, ...]
|
||||
[{Node1, leader, 1015, 1010, 1010}, %% highest SnapshotIdx
|
||||
{Node2, follower, 1015, 1010, 1010}, %% highest SnapshotIdx (duplicate)
|
||||
{Node3, follower, 1013, 1013, 1009}, %% highest CommitIdx
|
||||
{Node4, follower, 1016, 1009, undefined}, %% highest LogIdx
|
||||
{Node5, noproc, 1050, 1050, 1050}], %% highest but noproc
|
||||
|
||||
Term = 1,
|
||||
MachineVersion = 7,
|
||||
|
||||
meck:new(rabbit_quorum_queue, [passthrough, no_link]),
|
||||
meck:expect(
|
||||
rabbit_quorum_queue, status,
|
||||
fun(_, _) ->
|
||||
[[{<<"Node Name">>, Node},
|
||||
{<<"Raft State">>, Member},
|
||||
{<<"Last Log Index">>, LogIndex},
|
||||
{<<"Last Written">>, LogIndex},
|
||||
{<<"Last Applied">>, LogIndex},
|
||||
{<<"Commit Index">>, CommitIndex},
|
||||
{<<"Snapshot Index">>, SnapshotIdx},
|
||||
{<<"Term">>, Term},
|
||||
{<<"Machine Version">>, MachineVersion}]
|
||||
|| {Node, Member, LogIndex, CommitIndex, SnapshotIdx} <- Statuses]
|
||||
end),
|
||||
|
||||
ct:pal("quorum status: ~tp", [rabbit_quorum_queue:status(VHost, Q)]),
|
||||
|
||||
ExpectedHighestLogIdx =
|
||||
[[{<<"Node Name">>, Node4},
|
||||
{<<"Raft State">>, follower},
|
||||
{<<"Last Log Index">>, 1016},
|
||||
{<<"Last Written">>,1016},
|
||||
{<<"Last Applied">>,1016},
|
||||
{<<"Commit Index">>, 1009},
|
||||
{<<"Snapshot Index">>, undefined},
|
||||
{<<"Term">>, Term},
|
||||
{<<"Machine Version">>, MachineVersion}]],
|
||||
|
||||
[?assertEqual(ExpectedHighestLogIdx,
|
||||
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [log, log_index]],
|
||||
|
||||
ExpectedHighestCommitIdx =
|
||||
[[{<<"Node Name">>, Node3},
|
||||
{<<"Raft State">>, follower},
|
||||
{<<"Last Log Index">>, 1013},
|
||||
{<<"Last Written">>,1013},
|
||||
{<<"Last Applied">>,1013},
|
||||
{<<"Commit Index">>, 1013},
|
||||
{<<"Snapshot Index">>, 1009},
|
||||
{<<"Term">>, Term},
|
||||
{<<"Machine Version">>, MachineVersion}]],
|
||||
|
||||
[?assertEqual(ExpectedHighestCommitIdx,
|
||||
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [commit, commit_index]],
|
||||
|
||||
ExpectedHighestSnapshotIdx =
|
||||
[[{<<"Node Name">>, Node1},
|
||||
{<<"Raft State">>, leader},
|
||||
{<<"Last Log Index">>, 1015},
|
||||
{<<"Last Written">>,1015},
|
||||
{<<"Last Applied">>,1015},
|
||||
{<<"Commit Index">>, 1010},
|
||||
{<<"Snapshot Index">>, 1010},
|
||||
{<<"Term">>, Term},
|
||||
{<<"Machine Version">>, MachineVersion}]],
|
||||
% Duplicate:
|
||||
% [{<<"Node Name">>, Node2},
|
||||
% {<<"Raft State">>, follower},
|
||||
% {<<"Last Log Index">>, 1015},
|
||||
% {<<"Last Written">>,1015},
|
||||
% {<<"Last Applied">>,1015},
|
||||
% {<<"Commit Index">>, 1010},
|
||||
% {<<"Snapshot Index">>, 1010},
|
||||
% {<<"Term">>, Term},
|
||||
% {<<"Machine Version">>, MachineVersion}],
|
||||
|
||||
[?assertEqual(ExpectedHighestSnapshotIdx,
|
||||
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [snapshot, snapshot_index]],
|
||||
|
||||
ExpectedHighestIdxForAll =
|
||||
[[{<<"Node Name">>, Node5},
|
||||
{<<"Raft State">>, noproc},
|
||||
{<<"Last Log Index">>, 1050},
|
||||
{<<"Last Written">>,1050},
|
||||
{<<"Last Applied">>,1050},
|
||||
{<<"Commit Index">>, 1050},
|
||||
{<<"Snapshot Index">>, 1050},
|
||||
{<<"Term">>, Term},
|
||||
{<<"Machine Version">>, MachineVersion}]],
|
||||
|
||||
[?assertEqual(ExpectedHighestIdxForAll,
|
||||
rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I, true))
|
||||
|| I <- [log, log_index, commit, commit_index, snapshot, snapshot_index]],
|
||||
|
||||
ok.
|
||||
|
||||
leader_locator_client_local(Config) ->
|
||||
[Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
|
|
|
|||
75
deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex
vendored
Normal file
75
deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex
vendored
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
## This Source Code Form is subject to the terms of the Mozilla Public
|
||||
## License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
##
|
||||
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
|
||||
defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand do
|
||||
alias RabbitMQ.CLI.Core.DocGuide
|
||||
import RabbitMQ.CLI.Core.DataCoercion
|
||||
|
||||
@behaviour RabbitMQ.CLI.CommandBehaviour
|
||||
|
||||
use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument
|
||||
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
|
||||
|
||||
def switches(), do: [offline_members: :boolean, index: :string, timeout: :integer]
|
||||
def aliases(), do: [o: :offline_members, i: :index, t: :timeout]
|
||||
|
||||
def merge_defaults(args, opts) do
|
||||
{args, Map.merge(%{vhost: "/", index: "commit", offline_members: false}, opts)}
|
||||
end
|
||||
|
||||
def run([name] = _args, %{vhost: vhost, index: index, node: node_name, offline_members: offline_members}) do
|
||||
case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :get_member_with_highest_index, [
|
||||
vhost,
|
||||
name,
|
||||
to_atom(String.downcase(index)),
|
||||
offline_members
|
||||
]) do
|
||||
{:error, :classic_queue_not_supported} ->
|
||||
index = format_index(String.downcase(index))
|
||||
{:error, "Cannot get #{index} index from a classic queue"}
|
||||
|
||||
{:error, :not_found} ->
|
||||
{:error, {:not_found, :queue, vhost, name}}
|
||||
|
||||
other ->
|
||||
other
|
||||
end
|
||||
end
|
||||
|
||||
use RabbitMQ.CLI.DefaultOutput
|
||||
|
||||
def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable
|
||||
|
||||
def usage, do: "member_with_highest_index <queue> [--vhost <vhost>] [--offline-members] [--index <commit|commit_index|log|log_index|snapshot|snapshot_index>]"
|
||||
|
||||
def usage_additional do
|
||||
[
|
||||
["<queue>", "quorum queue name"],
|
||||
["--offline-members", "include members which are down (in noproc state)"],
|
||||
["--index <commit|commit_index|log|log_index|snapshot|snapshot_index>", "name of the index to use to lookup highest member"]
|
||||
]
|
||||
end
|
||||
|
||||
def usage_doc_guides() do
|
||||
[
|
||||
DocGuide.quorum_queues()
|
||||
]
|
||||
end
|
||||
|
||||
def help_section, do: :replication
|
||||
|
||||
def description, do: "Look up first member of a quorum queue with the highest commit, log or snapshot index."
|
||||
|
||||
def banner([name], %{node: node, index: index, vhost: vhost}) do
|
||||
index = format_index(String.downcase(index))
|
||||
"Member with highest #{index} index for queue #{name} in vhost #{vhost} on node #{node}..."
|
||||
end
|
||||
|
||||
defp format_index("log_index"), do: "log"
|
||||
defp format_index("commit_index"), do: "commit"
|
||||
defp format_index("snapshot_index"), do: "snapshot"
|
||||
defp format_index(index_name), do: index_name
|
||||
end
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
## This Source Code Form is subject to the terms of the Mozilla Public
|
||||
## License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
##
|
||||
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
|
||||
defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommandTest do
|
||||
use ExUnit.Case, async: false
|
||||
import TestHelper
|
||||
|
||||
@command RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand
|
||||
|
||||
setup_all do
|
||||
RabbitMQ.CLI.Core.Distribution.start()
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
setup context do
|
||||
{:ok,
|
||||
opts: %{
|
||||
node: get_rabbit_hostname(),
|
||||
timeout: context[:test_timeout] || 30000
|
||||
}}
|
||||
end
|
||||
|
||||
test "validate: when no arguments are provided, returns a failure" do
|
||||
assert @command.validate([], %{}) == {:validation_failure, :not_enough_args}
|
||||
end
|
||||
|
||||
test "validate: when two or more arguments are provided, returns a failure" do
|
||||
assert @command.validate(["quorum-queue-a", "one-extra-arg"], %{}) ==
|
||||
{:validation_failure, :too_many_args}
|
||||
|
||||
assert @command.validate(
|
||||
["quorum-queue-a", "extra-arg", "another-extra-arg"],
|
||||
%{}
|
||||
) == {:validation_failure, :too_many_args}
|
||||
end
|
||||
|
||||
test "validate: treats one positional arguments and default switches as a success" do
|
||||
assert @command.validate(["quorum-queue-a"], %{}) == :ok
|
||||
end
|
||||
|
||||
@tag test_timeout: 3000
|
||||
test "run: targeting an unreachable node throws a badrpc" do
|
||||
assert match?(
|
||||
{:badrpc, _},
|
||||
@command.run(
|
||||
["quorum-queue-a"],
|
||||
%{node: :jake@thedog, vhost: "/", index: "log", offline_members: true, timeout: 200}
|
||||
)
|
||||
)
|
||||
end
|
||||
end
|
||||
Loading…
Reference in New Issue