Merge pull request #218 from rabbitmq/timeout_list_queues
Set a timeout on list_queues and a warning if unresponsive ones are found
This commit is contained in:
commit
035e7305ac
|
@ -46,6 +46,7 @@ defmodule RabbitMQ.CLI.Core.ExitCodes do
|
|||
def exit_code_for({:badrpc_multi, :timeout, _}), do: exit_tempfail()
|
||||
def exit_code_for({:badrpc, :timeout}), do: exit_tempfail()
|
||||
def exit_code_for({:badrpc, {:timeout, _}}), do: exit_tempfail()
|
||||
def exit_code_for({:badrpc, {:timeout, _, _}}), do: exit_tempfail()
|
||||
def exit_code_for(:timeout), do: exit_tempfail()
|
||||
def exit_code_for({:timeout, _}), do: exit_tempfail()
|
||||
def exit_code_for({:badrpc_multi, :nodedown, _}), do: exit_unavailable()
|
||||
|
|
|
@ -27,6 +27,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
|
|||
|
||||
def formatter(), do: RabbitMQ.CLI.Formatters.Table
|
||||
|
||||
@default_timeout 60_000
|
||||
@info_keys ~w(name durable auto_delete
|
||||
arguments policy pid owner_pid exclusive exclusive_consumer_pid
|
||||
exclusive_consumer_tag messages_ready messages_unacknowledged messages
|
||||
|
@ -47,7 +48,13 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
|
|||
end
|
||||
end
|
||||
def merge_defaults([_|_] = args, opts) do
|
||||
{args, Map.merge(default_opts(), opts)}
|
||||
timeout = case opts[:timeout] do
|
||||
nil -> @default_timeout;
|
||||
:infinity -> @default_timeout;
|
||||
other -> other
|
||||
end
|
||||
{args, Map.merge(default_opts(),
|
||||
Map.merge(opts, %{timeout: timeout}))}
|
||||
end
|
||||
def merge_defaults([], opts) do
|
||||
merge_defaults(~w(name messages), opts)
|
||||
|
@ -83,7 +90,11 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
|
|||
{_, false, true} -> {Kernel.length(nodes), [online_mfa]};
|
||||
{_, true, false} -> {1, [offline_mfa]}
|
||||
end
|
||||
RpcStream.receive_list_items(node_name, mfas, timeout, info_keys, chunks)
|
||||
RpcStream.receive_list_items_with_fun(node_name, mfas, timeout, info_keys, chunks,
|
||||
fn({{:error, {:badrpc, {:timeout, to}}}, :finished}) ->
|
||||
{{:error, {:badrpc, {:timeout, to, "Some queue(s) are unresponsive, use list_unresponsive_queues command."}}}, :finished};
|
||||
(any) -> any
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
|
@ -91,5 +102,8 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
|
|||
%{vhost: "/", offline: false, online: false, local: false}
|
||||
end
|
||||
|
||||
def banner(_,%{vhost: vhost}), do: "Listing queues for vhost #{vhost} ..."
|
||||
def banner(_,%{vhost: vhost, timeout: timeout}) do
|
||||
["Timeout: #{timeout / 1000} seconds ...",
|
||||
"Listing queues for vhost #{vhost} ..."]
|
||||
end
|
||||
end
|
||||
|
|
|
@ -29,14 +29,18 @@ defmodule RabbitMQ.CLI.Ctl.RpcStream do
|
|||
end
|
||||
|
||||
def receive_list_items(node, mfas, timeout, info_keys, chunks_init) do
|
||||
receive_list_items_with_fun(node, mfas, timeout, info_keys, chunks_init, fn(v) -> v end)
|
||||
end
|
||||
|
||||
def receive_list_items_with_fun(node, mfas, timeout, info_keys, chunks_init, response_fun) do
|
||||
pid = Kernel.self
|
||||
ref = Kernel.make_ref
|
||||
for {m,f,a} <- mfas, do: init_items_stream(node, m, f, a, timeout, pid, ref)
|
||||
Stream.unfold({chunks_init, :continue},
|
||||
fn
|
||||
:finished -> nil;
|
||||
:finished -> response_fun.(nil);
|
||||
{chunks, :continue} ->
|
||||
receive do
|
||||
received = receive do
|
||||
{^ref, :finished} when chunks === 1 -> nil;
|
||||
{^ref, :finished} -> {[], {chunks - 1, :continue}};
|
||||
{^ref, {:timeout, t}} -> {{:error, {:badrpc, {:timeout, (t / 1000)}}}, :finished};
|
||||
|
@ -48,6 +52,7 @@ defmodule RabbitMQ.CLI.Ctl.RpcStream do
|
|||
{:DOWN, _mref, :process, _pid, :normal} -> {[], {chunks, :continue}};
|
||||
{:DOWN, _mref, :process, _pid, reason} -> {{:error, simplify_emission_error(reason)}, :finished}
|
||||
end
|
||||
response_fun.(received)
|
||||
end)
|
||||
|> display_list_items(info_keys)
|
||||
end
|
||||
|
|
|
@ -283,6 +283,11 @@ defmodule RabbitMQCtl do
|
|||
{:error, ExitCodes.exit_code_for(result),
|
||||
"Error: operation #{op} on node #{opts[:node]} timed out. Timeout value used: #{to}"}
|
||||
end
|
||||
defp format_error({:error, {:badrpc, {:timeout, to, warning}}}, opts, module) do
|
||||
op = CommandModules.module_to_command(module)
|
||||
{:error, ExitCodes.exit_code_for({:timeout, to}),
|
||||
"Error: operation #{op} on node #{opts[:node]} timed out. Timeout value used: #{to}. #{warning}"}
|
||||
end
|
||||
defp format_error({:error, {:no_such_vhost, vhost} = result}, _opts, _) do
|
||||
{:error, ExitCodes.exit_code_for(result),
|
||||
"Virtual host '#{vhost}' does not exist"}
|
||||
|
|
|
@ -72,7 +72,7 @@ defmodule ListQueuesCommandTest do
|
|||
@tag test_timeout: 0
|
||||
test "run: zero timeout causes command to return badrpc", context do
|
||||
assert run_command_to_list(@command, [["name"], context[:opts]]) ==
|
||||
[{:error, {:badrpc, {:timeout, 0.0}}}]
|
||||
[{:error, {:badrpc, {:timeout, 0.0, "Some queue(s) are unresponsive, use list_unresponsive_queues command."}}}]
|
||||
end
|
||||
|
||||
@tag test_timeout: 1
|
||||
|
@ -83,7 +83,7 @@ defmodule ListQueuesCommandTest do
|
|||
declare_queue("test_queue_" <> Integer.to_string(i), @vhost)
|
||||
end
|
||||
assert run_command_to_list(@command, [["name"], context[:opts]]) ==
|
||||
[{:error, {:badrpc, {:timeout, 0.001}}}]
|
||||
[{:error, {:badrpc, {:timeout, 0.001, "Some queue(s) are unresponsive, use list_unresponsive_queues command."}}}]
|
||||
for i <- 1..n do
|
||||
delete_queue("test_queue_" <> Integer.to_string(i), @vhost)
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue