Use info items to list unresponsive queues
rabbitmq-cli#207 [#149059849]
This commit is contained in:
parent
3c6a59079c
commit
829a918c5a
|
@ -15,8 +15,10 @@
|
|||
|
||||
|
||||
defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do
|
||||
require RabbitMQ.CLI.Ctl.InfoKeys
|
||||
require RabbitMQ.CLI.Ctl.RpcStream
|
||||
|
||||
alias RabbitMQ.CLI.Ctl.InfoKeys, as: InfoKeys
|
||||
alias RabbitMQ.CLI.Ctl.RpcStream, as: RpcStream
|
||||
alias RabbitMQ.CLI.Core.Helpers, as: Helpers
|
||||
|
||||
|
@ -25,35 +27,50 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do
|
|||
|
||||
def formatter(), do: RabbitMQ.CLI.Formatters.Table
|
||||
|
||||
@info_keys ~w(name durable auto_delete
|
||||
arguments pid recoverable_slaves)a
|
||||
|
||||
def info_keys(), do: @info_keys
|
||||
|
||||
def scopes(), do: [:ctl, :diagnostics]
|
||||
|
||||
def validate(_, _), do: :ok
|
||||
def validate(args, _opts) do
|
||||
case InfoKeys.validate_info_keys(args, @info_keys) do
|
||||
{:ok, _} -> :ok
|
||||
err -> err
|
||||
end
|
||||
end
|
||||
|
||||
def merge_defaults(args, opts) do
|
||||
def merge_defaults([_|_] = args, opts) do
|
||||
{args, Map.merge(default_opts(), opts)}
|
||||
end
|
||||
def merge_defaults([], opts) do
|
||||
merge_defaults(~w(name), opts)
|
||||
end
|
||||
|
||||
def switches(), do: [queue_timeout: :integer, local: :boolean]
|
||||
|
||||
def usage() do
|
||||
"list_unresponsive_queues [--local] [--queue-timeout <queue-timeout>]"
|
||||
"list_unresponsive_queues [--local] [--queue-timeout <queue-timeout>] [<queueinfoitem> ...]"
|
||||
end
|
||||
|
||||
def run([], %{node: node_name, vhost: vhost, timeout: timeout,
|
||||
queue_timeout: queue_timeout, local: local_opt}) do
|
||||
def run(args, %{node: node_name, vhost: vhost, timeout: timeout,
|
||||
queue_timeout: qtimeout, local: local_opt}) do
|
||||
info_keys = InfoKeys.prepare_info_keys(args)
|
||||
queue_timeout = qtimeout * 1000
|
||||
Helpers.with_nodes_in_cluster(node_name, fn(nodes) ->
|
||||
local_mfa = {:rabbit_amqqueue, :emit_unresponsive_local, [vhost, queue_timeout]}
|
||||
all_mfa = {:rabbit_amqqueue, :emit_unresponsive, [nodes, vhost, queue_timeout]}
|
||||
local_mfa = {:rabbit_amqqueue, :emit_unresponsive_local, [vhost, info_keys, queue_timeout]}
|
||||
all_mfa = {:rabbit_amqqueue, :emit_unresponsive, [nodes, vhost, info_keys, queue_timeout]}
|
||||
{chunks, mfas} = case local_opt do
|
||||
true -> {1, [local_mfa]};
|
||||
false -> {Kernel.length(nodes), [all_mfa]}
|
||||
end
|
||||
RpcStream.receive_list_items(node_name, mfas, timeout, [:name], chunks)
|
||||
RpcStream.receive_list_items(node_name, mfas, timeout, info_keys, chunks)
|
||||
end)
|
||||
end
|
||||
|
||||
defp default_opts() do
|
||||
%{vhost: "/", local: false, queue_timeout: 30000}
|
||||
%{vhost: "/", local: false, queue_timeout: 15}
|
||||
end
|
||||
|
||||
def banner(_,%{vhost: vhost}), do: "Listing unresponsive queues for vhost #{vhost} ..."
|
||||
|
|
Loading…
Reference in New Issue