Add inclusive aliases to ctl info keys

Includes generic ability to provide aliases in other commands.
This commit is contained in:
Alex Valiushko 2022-11-06 15:53:19 -08:00
parent 149a893d63
commit d70660dac7
8 changed files with 198 additions and 25 deletions

View File

@ -1394,11 +1394,11 @@ or prefetch count.
.It Cm memory
Bytes of memory allocated by the runtime for the
queue, including stack, heap and internal structures.
.It Cm slave_pids
.It Cm mirror_pids
If the queue is mirrored, this lists the IDs of the mirrors (follower replicas).
To learn more, see the
.Lk https://www.rabbitmq.com/ha.html "RabbitMQ Mirroring guide"
.It Cm synchronised_slave_pids
.It Cm synchronised_mirror_pids
If the queue is mirrored, this gives the IDs of the mirrors (follower replicas) which
are in sync with the leader replica. To learn more, see the
.Lk https://www.rabbitmq.com/ha.html "RabbitMQ Mirroring guide"
@ -1416,6 +1416,8 @@ be shown with a status of
(and most other
.Ar queueinfoitem
will be unavailable).
.It Cm type
Queue type, one of: quorum, stream, classic.
.El
.Pp
If no
@ -1428,9 +1430,45 @@ each queue of the virtual host named
.sp
.Dl rabbitmqctl list_queues -p my-vhost messages consumers
.\" ------------------------------------------------------------------
.It Cm list_unresponsive_queues Oo Fl -local Oc Oo Fl -queue-timeout Ar milliseconds Oc Oo Ar column ... Oc Op Fl -no-table-headers
.It Cm list_unresponsive_queues Oo Fl -local Oc Oo Fl -queue-timeout Ar milliseconds Oc Oo Ar queueinfoitem ... Oc Op Fl -no-table-headers
.Pp
Tests queues to respond within timeout. Lists those which did not respond
Tests queues to respond within timeout. Lists those which did not respond.
.Pp
Displayed queues can be filtered by their status or location using one
of the following mutually exclusive options:
.Bl -tag -width Ds
.It Fl -all
List all queues.
.It Fl -local
List only those queues whose leader replica is located on the current
node.
.El
.Pp
The
.Ar queueinfoitem
parameter is used to indicate which queue information items to include
in the results.
The column order in the results will match the order of the parameters.
.Ar queueinfoitem
can take any value from the list that follows:
.Bl -tag -width Ds
.It Cm name
The name of the queue with non\-ASCII characters escaped as in C.
.It Cm durable
Whether or not the queue survives server restarts.
.It Cm auto_delete
Whether the queue will be deleted automatically when no longer used.
.It Cm arguments
Queue arguments.
.It Cm policy
Effective policy name for the queue.
.It Cm pid
Erlang process identifier of the leader replica.
.It Cm recoverable_mirrors
Erlang process identifiers of the mirror replicas that did respond in time.
.It Cm type
Queue type, one of: quorum, stream, classic.
.El
.Pp
For example, this command lists only those unresponsive queues whose leader replica
is hosted on the target node.

View File

@ -47,7 +47,7 @@ CLI core consists of several modules implementing command execution process:
#### Arguments parsing
Command line arguments are parsed with [OptionParser](https://elixir-lang.org/docs/stable/elixir/OptionParser.html)
Command line arguments are parsed with [OptionParser](https://hexdocs.pm/elixir/OptionParser.html)
Parser returns a list of unnamed arguments and a map of options (named arguments)
First unnamed argument is a command name.
Named arguments can be global or command specific.
@ -412,7 +412,7 @@ and returns a list of strings, that should be printed.
format_stream(output_stream :: Enumerable.t, options :: Map.t) :: Enumerable.t
Format a stream of return values. This function uses elixir
Stream [https://elixir-lang.org/docs/stable/elixir/Stream.html] abstraction
Stream [https://hexdocs.pm/elixir/Stream.html] abstraction
to define processing of continuous data, so the CLI can output data in realtime.
Used in `list_*` commands, that emit data asynchronously.

View File

@ -43,13 +43,14 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListChannelsCommand do
def run([_ | _] = args, %{node: node_name, timeout: timeout}) do
info_keys = InfoKeys.prepare_info_keys(args)
broker_keys = InfoKeys.broker_keys(info_keys)
Helpers.with_nodes_in_cluster(node_name, fn nodes ->
RpcStream.receive_list_items(
node_name,
:rabbit_channel,
:emit_info_all,
[nodes, info_keys],
[nodes, broker_keys],
timeout,
info_keys,
Kernel.length(nodes)

View File

@ -42,13 +42,14 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListConnectionsCommand do
def run([_ | _] = args, %{node: node_name, timeout: timeout}) do
info_keys = InfoKeys.prepare_info_keys(args)
broker_keys = InfoKeys.broker_keys(info_keys)
Helpers.with_nodes_in_cluster(node_name, fn nodes ->
RpcStream.receive_list_items(
node_name,
:rabbit_networking,
:emit_connection_info_all,
[nodes, info_keys],
[nodes, broker_keys],
timeout,
info_keys,
Kernel.length(nodes)

View File

@ -23,7 +23,12 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
head_message_timestamp disk_reads disk_writes consumers
consumer_utilisation consumer_capacity
memory slave_pids synchronised_slave_pids state type
leader members online)a
leader members online
mirror_pids synchronised_mirror_pids)a
@info_key_aliases [
{:mirror_pids, :slave_pids},
{:synchronised_mirror_pids, :synchronised_slave_pids}
]
def description(), do: "Lists queues and their properties"
@ -61,7 +66,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
end
def validate(args, _opts) do
case InfoKeys.validate_info_keys(args, @info_keys) do
case InfoKeys.validate_info_keys(args, @info_keys, @info_key_aliases) do
{:ok, _} -> :ok
err -> err
end
@ -85,12 +90,13 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
other -> other
end
info_keys = InfoKeys.prepare_info_keys(args)
info_keys = InfoKeys.prepare_info_keys(args, @info_key_aliases)
broker_keys = InfoKeys.broker_keys(info_keys)
Helpers.with_nodes_in_cluster(node_name, fn nodes ->
offline_mfa = {:rabbit_amqqueue, :emit_info_down, [vhost, info_keys]}
local_mfa = {:rabbit_amqqueue, :emit_info_local, [vhost, info_keys]}
online_mfa = {:rabbit_amqqueue, :emit_info_all, [nodes, vhost, info_keys]}
offline_mfa = {:rabbit_amqqueue, :emit_info_down, [vhost, broker_keys]}
local_mfa = {:rabbit_amqqueue, :emit_info_local, [vhost, broker_keys]}
online_mfa = {:rabbit_amqqueue, :emit_info_all, [nodes, vhost, broker_keys]}
{chunks, mfas} =
case {local_opt, offline, online} do

View File

@ -14,7 +14,9 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do
@behaviour RabbitMQ.CLI.CommandBehaviour
@info_keys ~w(name durable auto_delete
arguments pid recoverable_slaves)a
arguments pid recoverable_slaves
recoverable_mirrors)a
@info_key_aliases [recoverable_mirrors: :recoverable_slaves]
def info_keys(), do: @info_keys
@ -39,7 +41,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do
end
def validate(args, _opts) do
case InfoKeys.validate_info_keys(args, @info_keys) do
case InfoKeys.validate_info_keys(args, @info_keys, @info_key_aliases) do
{:ok, _} -> :ok
err -> err
end
@ -54,12 +56,15 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do
queue_timeout: qtimeout,
local: local_opt
}) do
info_keys = InfoKeys.prepare_info_keys(args)
info_keys = InfoKeys.prepare_info_keys(args, @info_key_aliases)
broker_keys = InfoKeys.broker_keys(info_keys)
queue_timeout = qtimeout * 1000
Helpers.with_nodes_in_cluster(node_name, fn nodes ->
local_mfa = {:rabbit_amqqueue, :emit_unresponsive_local, [vhost, info_keys, queue_timeout]}
all_mfa = {:rabbit_amqqueue, :emit_unresponsive, [nodes, vhost, info_keys, queue_timeout]}
local_mfa =
{:rabbit_amqqueue, :emit_unresponsive_local, [vhost, broker_keys, queue_timeout]}
all_mfa = {:rabbit_amqqueue, :emit_unresponsive, [nodes, vhost, broker_keys, queue_timeout]}
{chunks, mfas} =
case local_opt do

View File

@ -8,8 +8,19 @@ defmodule RabbitMQ.CLI.Ctl.InfoKeys do
import RabbitCommon.Records
alias RabbitMQ.CLI.Core.DataCoercion
# internal to requested keys
@type info_keys :: Erlang.proplist()
# requested to internal keys
@type aliases :: keyword(atom)
def validate_info_keys(args, valid_keys) do
info_keys = prepare_info_keys(args)
validate_info_keys(args, valid_keys, [])
end
@spec validate_info_keys([charlist], [charlist], aliases) ::
{:ok, info_keys} | {:validation_failure, any}
def validate_info_keys(args, valid_keys, aliases) do
info_keys = prepare_info_keys(args, aliases)
case invalid_info_keys(info_keys, Enum.map(valid_keys, &DataCoercion.to_atom/1)) do
[_ | _] = bad_info_keys ->
@ -21,35 +32,81 @@ defmodule RabbitMQ.CLI.Ctl.InfoKeys do
end
def prepare_info_keys(args) do
prepare_info_keys(args, [])
end
@spec prepare_info_keys([charlist], aliases) :: info_keys
def prepare_info_keys(args, aliases) do
args
|> Enum.flat_map(fn arg -> String.split(arg, ",", trim: true) end)
|> Enum.map(fn s -> String.replace(s, ",", "") end)
|> Enum.map(&String.trim/1)
|> Enum.map(&String.to_atom/1)
|> Enum.map(fn k ->
case Keyword.get(aliases, k) do
nil -> k
v -> {v, k}
end
end)
|> Enum.uniq()
|> :proplists.compact()
end
def broker_keys(info_keys) do
Enum.map(
info_keys,
fn
{k, _} -> k
k -> k
end
)
end
def with_valid_info_keys(args, valid_keys, fun) do
case validate_info_keys(args, valid_keys) do
{:ok, info_keys} -> fun.(info_keys)
with_valid_info_keys(args, valid_keys, [], fun)
end
@spec with_valid_info_keys([charlist], [charlist], aliases, fun([atom])) :: any
def with_valid_info_keys(args, valid_keys, aliases, fun) do
case validate_info_keys(args, valid_keys, aliases) do
{:ok, info_keys} -> fun.(:proplists.get_keys(info_keys))
err -> err
end
end
@spec invalid_info_keys(info_keys, [atom]) :: [atom]
defp invalid_info_keys(info_keys, valid_keys) do
MapSet.new(info_keys)
info_keys
|> :proplists.get_keys()
|> MapSet.new()
|> MapSet.difference(MapSet.new(valid_keys))
|> MapSet.to_list()
|> Enum.map(fn k ->
case :proplists.get_value(k, info_keys, k) do
true -> k
v -> v
end
end)
end
@spec info_for_keys(keyword, info_keys) :: keyword
def info_for_keys(item, []) do
item
end
def info_for_keys([{_, _} | _] = item, info_keys) do
item
|> Enum.filter(fn {k, _} -> Enum.member?(info_keys, k) end)
|> Enum.map(fn {k, v} -> {k, format_info_item(v)} end)
|> Enum.filter(fn {k, _} -> :proplists.is_defined(k, info_keys) end)
|> Enum.map(fn {k, v} ->
original =
case :proplists.get_value(k, info_keys) do
true -> k
v -> v
end
{original, format_info_item(v)}
end)
end
defp format_info_item(resource(name: name)) do

View File

@ -0,0 +1,65 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
defmodule InfoKeysTest do
use ExUnit.Case
import RabbitMQ.CLI.Ctl.InfoKeys
test "prepare translates aliases" do
assert prepare_info_keys(["apple"], apple: :banana) == [banana: :apple]
end
test "prepare works without aliases" do
assert prepare_info_keys(["apple"], []) == [:apple]
assert prepare_info_keys(["apple"]) == [:apple]
end
test "validate translates aliases" do
assert validate_info_keys(["apple"], ["banana"], apple: :banana) ==
{:ok, [banana: :apple]}
end
test "validate works without aliases" do
assert validate_info_keys(["apple"], ["apple"], []) == {:ok, [:apple]}
assert validate_info_keys(["apple"], ["apple"]) == {:ok, [:apple]}
end
test "with_valid translates aliases" do
assert with_valid_info_keys(["apple"], ["banana"], [apple: :banana], fn v -> v end) ==
[:banana]
end
test "with_valid works without aliases" do
assert with_valid_info_keys(["apple"], ["apple"], [], fn v -> v end) == [:apple]
assert with_valid_info_keys(["apple"], ["apple"], fn v -> v end) == [:apple]
end
test "broker_keys preserves order" do
keys = ["a", "b", "c"]
broker_keys = prepare_info_keys(keys) |> broker_keys()
assert broker_keys == [:a, :b, :c]
end
test "info_keys preserves requested key names" do
aliases = [apple: :banana]
broker_response = [banana: "bonono", carrot: "corrot"]
keysA = ["banana", "carrot"]
keysB = ["apple", "carrot"]
normalizedA = prepare_info_keys(keysA, aliases)
normalizedB = prepare_info_keys(keysB, aliases)
assert :proplists.get_keys(normalizedA) == :proplists.get_keys(normalizedB)
returnA = info_for_keys(broker_response, normalizedA)
returnB = info_for_keys(broker_response, normalizedB)
assert broker_keys(returnA) == Enum.map(keysA, &String.to_atom/1)
assert broker_keys(returnB) == Enum.map(keysB, &String.to_atom/1)
end
end