Use info items to list unresponsive queues
rabbitmq-cli#207 [#149059849]
This commit is contained in:
		
							parent
							
								
									3c6a59079c
								
							
						
					
					
						commit
						829a918c5a
					
				|  | @ -15,8 +15,10 @@ | |||
| 
 | ||||
| 
 | ||||
| defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do | ||||
|   require RabbitMQ.CLI.Ctl.InfoKeys | ||||
|   require RabbitMQ.CLI.Ctl.RpcStream | ||||
| 
 | ||||
|   alias RabbitMQ.CLI.Ctl.InfoKeys, as: InfoKeys | ||||
|   alias RabbitMQ.CLI.Ctl.RpcStream, as: RpcStream | ||||
|   alias RabbitMQ.CLI.Core.Helpers, as: Helpers | ||||
| 
 | ||||
|  | @ -25,35 +27,50 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListUnresponsiveQueuesCommand do | |||
| 
 | ||||
|   def formatter(), do: RabbitMQ.CLI.Formatters.Table | ||||
| 
 | ||||
|   @info_keys ~w(name durable auto_delete | ||||
|             arguments pid recoverable_slaves)a | ||||
| 
 | ||||
|   def info_keys(), do: @info_keys | ||||
| 
 | ||||
|   def scopes(), do: [:ctl, :diagnostics] | ||||
| 
 | ||||
|   def validate(_, _), do: :ok | ||||
|   def validate(args, _opts) do | ||||
|       case InfoKeys.validate_info_keys(args, @info_keys) do | ||||
|         {:ok, _} -> :ok | ||||
|         err -> err | ||||
|       end | ||||
|   end | ||||
| 
 | ||||
|   def merge_defaults(args, opts) do | ||||
|   def merge_defaults([_|_] = args, opts) do | ||||
|     {args, Map.merge(default_opts(), opts)} | ||||
|   end | ||||
|   def merge_defaults([], opts) do | ||||
|       merge_defaults(~w(name), opts) | ||||
|   end | ||||
| 
 | ||||
|   def switches(), do: [queue_timeout: :integer, local: :boolean] | ||||
| 
 | ||||
|   def usage() do | ||||
|     "list_unresponsive_queues [--local] [--queue-timeout <queue-timeout>]" | ||||
|     "list_unresponsive_queues [--local] [--queue-timeout <queue-timeout>] [<queueinfoitem> ...]" | ||||
|   end | ||||
| 
 | ||||
|   def run([], %{node: node_name, vhost: vhost, timeout: timeout, | ||||
|                 queue_timeout: queue_timeout, local: local_opt}) do | ||||
|   def run(args, %{node: node_name, vhost: vhost, timeout: timeout, | ||||
|                   queue_timeout: qtimeout, local: local_opt}) do | ||||
|     info_keys = InfoKeys.prepare_info_keys(args) | ||||
|     queue_timeout = qtimeout * 1000 | ||||
|     Helpers.with_nodes_in_cluster(node_name, fn(nodes) -> | ||||
|       local_mfa  = {:rabbit_amqqueue, :emit_unresponsive_local, [vhost, queue_timeout]} | ||||
|       all_mfa  = {:rabbit_amqqueue, :emit_unresponsive, [nodes, vhost, queue_timeout]} | ||||
|       local_mfa  = {:rabbit_amqqueue, :emit_unresponsive_local, [vhost, info_keys, queue_timeout]} | ||||
|       all_mfa  = {:rabbit_amqqueue, :emit_unresponsive, [nodes, vhost, info_keys, queue_timeout]} | ||||
|       {chunks, mfas} = case local_opt do | ||||
|           true  -> {1, [local_mfa]}; | ||||
|           false -> {Kernel.length(nodes), [all_mfa]} | ||||
|         end | ||||
|         RpcStream.receive_list_items(node_name, mfas, timeout, [:name], chunks) | ||||
|         RpcStream.receive_list_items(node_name, mfas, timeout, info_keys, chunks) | ||||
|       end) | ||||
|   end | ||||
| 
 | ||||
|   defp default_opts() do | ||||
|     %{vhost: "/", local: false, queue_timeout: 30000} | ||||
|     %{vhost: "/", local: false, queue_timeout: 15} | ||||
|   end | ||||
| 
 | ||||
|   def banner(_,%{vhost: vhost}), do: "Listing unresponsive queues for vhost #{vhost} ..." | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue