Use info items to list unresponsive queues

rabbitmq-cli#207
[#149059849]
This commit is contained in:
Diana Corbacho 2017-08-07 15:08:33 +02:00
parent 3c6a59079c
commit 829a918c5a
1 changed files with 26 additions and 9 deletions

View File

@ -15,8 +15,10 @@
defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do
require RabbitMQ.CLI.Ctl.InfoKeys
require RabbitMQ.CLI.Ctl.RpcStream
alias RabbitMQ.CLI.Ctl.InfoKeys, as: InfoKeys
alias RabbitMQ.CLI.Ctl.RpcStream, as: RpcStream
alias RabbitMQ.CLI.Core.Helpers, as: Helpers
@ -25,35 +27,50 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do
def formatter(), do: RabbitMQ.CLI.Formatters.Table
@info_keys ~w(name durable auto_delete
arguments pid recoverable_slaves)a
def info_keys(), do: @info_keys
def scopes(), do: [:ctl, :diagnostics]
def validate(_, _), do: :ok
def validate(args, _opts) do
case InfoKeys.validate_info_keys(args, @info_keys) do
{:ok, _} -> :ok
err -> err
end
end
def merge_defaults(args, opts) do
def merge_defaults([_|_] = args, opts) do
{args, Map.merge(default_opts(), opts)}
end
def merge_defaults([], opts) do
merge_defaults(~w(name), opts)
end
def switches(), do: [queue_timeout: :integer, local: :boolean]
def usage() do
"list_unresponsive_queues [--local] [--queue-timeout <queue-timeout>]"
"list_unresponsive_queues [--local] [--queue-timeout <queue-timeout>] [<queueinfoitem> ...]"
end
def run([], %{node: node_name, vhost: vhost, timeout: timeout,
queue_timeout: queue_timeout, local: local_opt}) do
def run(args, %{node: node_name, vhost: vhost, timeout: timeout,
queue_timeout: qtimeout, local: local_opt}) do
info_keys = InfoKeys.prepare_info_keys(args)
queue_timeout = qtimeout * 1000
Helpers.with_nodes_in_cluster(node_name, fn(nodes) ->
local_mfa = {:rabbit_amqqueue, :emit_unresponsive_local, [vhost, queue_timeout]}
all_mfa = {:rabbit_amqqueue, :emit_unresponsive, [nodes, vhost, queue_timeout]}
local_mfa = {:rabbit_amqqueue, :emit_unresponsive_local, [vhost, info_keys, queue_timeout]}
all_mfa = {:rabbit_amqqueue, :emit_unresponsive, [nodes, vhost, info_keys, queue_timeout]}
{chunks, mfas} = case local_opt do
true -> {1, [local_mfa]};
false -> {Kernel.length(nodes), [all_mfa]}
end
RpcStream.receive_list_items(node_name, mfas, timeout, [:name], chunks)
RpcStream.receive_list_items(node_name, mfas, timeout, info_keys, chunks)
end)
end
defp default_opts() do
%{vhost: "/", local: false, queue_timeout: 30000}
%{vhost: "/", local: false, queue_timeout: 15}
end
def banner(_,%{vhost: vhost}), do: "Listing unresponsive queues for vhost #{vhost} ..."