Use info items to list unresponsive queues
rabbitmq-cli#207 [#149059849]
This commit is contained in:
parent
3c6a59079c
commit
829a918c5a
|
@ -15,8 +15,10 @@
|
||||||
|
|
||||||
|
|
||||||
defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do
|
defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do
|
||||||
|
require RabbitMQ.CLI.Ctl.InfoKeys
|
||||||
require RabbitMQ.CLI.Ctl.RpcStream
|
require RabbitMQ.CLI.Ctl.RpcStream
|
||||||
|
|
||||||
|
alias RabbitMQ.CLI.Ctl.InfoKeys, as: InfoKeys
|
||||||
alias RabbitMQ.CLI.Ctl.RpcStream, as: RpcStream
|
alias RabbitMQ.CLI.Ctl.RpcStream, as: RpcStream
|
||||||
alias RabbitMQ.CLI.Core.Helpers, as: Helpers
|
alias RabbitMQ.CLI.Core.Helpers, as: Helpers
|
||||||
|
|
||||||
|
@ -25,35 +27,50 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do
|
||||||
|
|
||||||
def formatter(), do: RabbitMQ.CLI.Formatters.Table
|
def formatter(), do: RabbitMQ.CLI.Formatters.Table
|
||||||
|
|
||||||
|
@info_keys ~w(name durable auto_delete
|
||||||
|
arguments pid recoverable_slaves)a
|
||||||
|
|
||||||
|
def info_keys(), do: @info_keys
|
||||||
|
|
||||||
def scopes(), do: [:ctl, :diagnostics]
|
def scopes(), do: [:ctl, :diagnostics]
|
||||||
|
|
||||||
def validate(_, _), do: :ok
|
def validate(args, _opts) do
|
||||||
|
case InfoKeys.validate_info_keys(args, @info_keys) do
|
||||||
|
{:ok, _} -> :ok
|
||||||
|
err -> err
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def merge_defaults(args, opts) do
|
def merge_defaults([_|_] = args, opts) do
|
||||||
{args, Map.merge(default_opts(), opts)}
|
{args, Map.merge(default_opts(), opts)}
|
||||||
end
|
end
|
||||||
|
def merge_defaults([], opts) do
|
||||||
|
merge_defaults(~w(name), opts)
|
||||||
|
end
|
||||||
|
|
||||||
def switches(), do: [queue_timeout: :integer, local: :boolean]
|
def switches(), do: [queue_timeout: :integer, local: :boolean]
|
||||||
|
|
||||||
def usage() do
|
def usage() do
|
||||||
"list_unresponsive_queues [--local] [--queue-timeout <queue-timeout>]"
|
"list_unresponsive_queues [--local] [--queue-timeout <queue-timeout>] [<queueinfoitem> ...]"
|
||||||
end
|
end
|
||||||
|
|
||||||
def run([], %{node: node_name, vhost: vhost, timeout: timeout,
|
def run(args, %{node: node_name, vhost: vhost, timeout: timeout,
|
||||||
queue_timeout: queue_timeout, local: local_opt}) do
|
queue_timeout: qtimeout, local: local_opt}) do
|
||||||
|
info_keys = InfoKeys.prepare_info_keys(args)
|
||||||
|
queue_timeout = qtimeout * 1000
|
||||||
Helpers.with_nodes_in_cluster(node_name, fn(nodes) ->
|
Helpers.with_nodes_in_cluster(node_name, fn(nodes) ->
|
||||||
local_mfa = {:rabbit_amqqueue, :emit_unresponsive_local, [vhost, queue_timeout]}
|
local_mfa = {:rabbit_amqqueue, :emit_unresponsive_local, [vhost, info_keys, queue_timeout]}
|
||||||
all_mfa = {:rabbit_amqqueue, :emit_unresponsive, [nodes, vhost, queue_timeout]}
|
all_mfa = {:rabbit_amqqueue, :emit_unresponsive, [nodes, vhost, info_keys, queue_timeout]}
|
||||||
{chunks, mfas} = case local_opt do
|
{chunks, mfas} = case local_opt do
|
||||||
true -> {1, [local_mfa]};
|
true -> {1, [local_mfa]};
|
||||||
false -> {Kernel.length(nodes), [all_mfa]}
|
false -> {Kernel.length(nodes), [all_mfa]}
|
||||||
end
|
end
|
||||||
RpcStream.receive_list_items(node_name, mfas, timeout, [:name], chunks)
|
RpcStream.receive_list_items(node_name, mfas, timeout, info_keys, chunks)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp default_opts() do
|
defp default_opts() do
|
||||||
%{vhost: "/", local: false, queue_timeout: 30000}
|
%{vhost: "/", local: false, queue_timeout: 15}
|
||||||
end
|
end
|
||||||
|
|
||||||
def banner(_,%{vhost: vhost}), do: "Listing unresponsive queues for vhost #{vhost} ..."
|
def banner(_,%{vhost: vhost}), do: "Listing unresponsive queues for vhost #{vhost} ..."
|
||||||
|
|
Loading…
Reference in New Issue