Merge branch 'master' into rabbitmq-cli-207
This commit is contained in:
commit
e8c73d9bea
|
|
@ -46,6 +46,7 @@ defmodule RabbitMQ.CLI.Core.ExitCodes do
|
||||||
def exit_code_for({:badrpc_multi, :timeout, _}), do: exit_tempfail()
|
def exit_code_for({:badrpc_multi, :timeout, _}), do: exit_tempfail()
|
||||||
def exit_code_for({:badrpc, :timeout}), do: exit_tempfail()
|
def exit_code_for({:badrpc, :timeout}), do: exit_tempfail()
|
||||||
def exit_code_for({:badrpc, {:timeout, _}}), do: exit_tempfail()
|
def exit_code_for({:badrpc, {:timeout, _}}), do: exit_tempfail()
|
||||||
|
def exit_code_for({:badrpc, {:timeout, _, _}}), do: exit_tempfail()
|
||||||
def exit_code_for(:timeout), do: exit_tempfail()
|
def exit_code_for(:timeout), do: exit_tempfail()
|
||||||
def exit_code_for({:timeout, _}), do: exit_tempfail()
|
def exit_code_for({:timeout, _}), do: exit_tempfail()
|
||||||
def exit_code_for({:badrpc_multi, :nodedown, _}), do: exit_unavailable()
|
def exit_code_for({:badrpc_multi, :nodedown, _}), do: exit_unavailable()
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
|
||||||
|
|
||||||
def formatter(), do: RabbitMQ.CLI.Formatters.Table
|
def formatter(), do: RabbitMQ.CLI.Formatters.Table
|
||||||
|
|
||||||
|
@default_timeout 60_000
|
||||||
@info_keys ~w(name durable auto_delete
|
@info_keys ~w(name durable auto_delete
|
||||||
arguments policy pid owner_pid exclusive exclusive_consumer_pid
|
arguments policy pid owner_pid exclusive exclusive_consumer_pid
|
||||||
exclusive_consumer_tag messages_ready messages_unacknowledged messages
|
exclusive_consumer_tag messages_ready messages_unacknowledged messages
|
||||||
|
|
@ -47,7 +48,13 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
def merge_defaults([_|_] = args, opts) do
|
def merge_defaults([_|_] = args, opts) do
|
||||||
{args, Map.merge(default_opts(), opts)}
|
timeout = case opts[:timeout] do
|
||||||
|
nil -> @default_timeout;
|
||||||
|
:infinity -> @default_timeout;
|
||||||
|
other -> other
|
||||||
|
end
|
||||||
|
{args, Map.merge(default_opts(),
|
||||||
|
Map.merge(opts, %{timeout: timeout}))}
|
||||||
end
|
end
|
||||||
def merge_defaults([], opts) do
|
def merge_defaults([], opts) do
|
||||||
merge_defaults(~w(name messages), opts)
|
merge_defaults(~w(name messages), opts)
|
||||||
|
|
@ -83,7 +90,11 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
|
||||||
{_, false, true} -> {Kernel.length(nodes), [online_mfa]};
|
{_, false, true} -> {Kernel.length(nodes), [online_mfa]};
|
||||||
{_, true, false} -> {1, [offline_mfa]}
|
{_, true, false} -> {1, [offline_mfa]}
|
||||||
end
|
end
|
||||||
RpcStream.receive_list_items(node_name, mfas, timeout, info_keys, chunks)
|
RpcStream.receive_list_items_with_fun(node_name, mfas, timeout, info_keys, chunks,
|
||||||
|
fn({{:error, {:badrpc, {:timeout, to}}}, :finished}) ->
|
||||||
|
{{:error, {:badrpc, {:timeout, to, "Some queue(s) are unresponsive, use list_unresponsive_queues command."}}}, :finished};
|
||||||
|
(any) -> any
|
||||||
|
end)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -91,5 +102,8 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListQueuesCommand do
|
||||||
%{vhost: "/", offline: false, online: false, local: false}
|
%{vhost: "/", offline: false, online: false, local: false}
|
||||||
end
|
end
|
||||||
|
|
||||||
def banner(_,%{vhost: vhost}), do: "Listing queues for vhost #{vhost} ..."
|
def banner(_,%{vhost: vhost, timeout: timeout}) do
|
||||||
|
["Timeout: #{timeout / 1000} seconds ...",
|
||||||
|
"Listing queues for vhost #{vhost} ..."]
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListVhostsCommand do
|
||||||
|
|
||||||
def formatter(), do: RabbitMQ.CLI.Formatters.Table
|
def formatter(), do: RabbitMQ.CLI.Formatters.Table
|
||||||
|
|
||||||
@info_keys ~w(name tracing state)a
|
@info_keys ~w(name tracing cluster_state)a
|
||||||
|
|
||||||
def info_keys(), do: @info_keys
|
def info_keys(), do: @info_keys
|
||||||
|
|
||||||
|
|
@ -45,7 +45,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListVhostsCommand do
|
||||||
def usage, do: "list_vhosts [<vhostinfoitem> ...]"
|
def usage, do: "list_vhosts [<vhostinfoitem> ...]"
|
||||||
|
|
||||||
def usage_additional() do
|
def usage_additional() do
|
||||||
"<vhostinfoitem> must be a member of the list [name, tracing, state]."
|
"<vhostinfoitem> must be a member of the list [name, tracing, cluster_state]."
|
||||||
end
|
end
|
||||||
|
|
||||||
defp filter_by_arg(vhosts, _) when is_tuple(vhosts) do
|
defp filter_by_arg(vhosts, _) when is_tuple(vhosts) do
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,49 @@
|
||||||
|
## The contents of this file are subject to the Mozilla Public License
|
||||||
|
## Version 1.1 (the "License"); you may not use this file except in
|
||||||
|
## compliance with the License. You may obtain a copy of the License
|
||||||
|
## at http://www.mozilla.org/MPL/
|
||||||
|
##
|
||||||
|
## Software distributed under the License is distributed on an "AS IS"
|
||||||
|
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||||
|
## the License for the specific language governing rights and
|
||||||
|
## limitations under the License.
|
||||||
|
##
|
||||||
|
## The Original Code is RabbitMQ.
|
||||||
|
##
|
||||||
|
## The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||||
|
## Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
|
||||||
|
|
||||||
|
alias RabbitMQ.CLI.Core.ExitCodes, as: ExitCodes
|
||||||
|
|
||||||
|
defmodule RabbitMQ.CLI.Ctl.Commands.RestartVhostCommand do
|
||||||
|
@behaviour RabbitMQ.CLI.CommandBehaviour
|
||||||
|
|
||||||
|
def validate([], _), do: :ok
|
||||||
|
def validate(_, _), do: {:validation_failure, :too_many_args}
|
||||||
|
|
||||||
|
def merge_defaults(args, opts), do: {args, Map.merge(%{vhost: "/"}, opts)}
|
||||||
|
|
||||||
|
def run([], %{node: node_name, vhost: vhost, timeout: timeout}) do
|
||||||
|
:rabbit_misc.rpc_call(node_name, :rabbit_vhost_sup_sup, :start_vhost, [vhost], timeout)
|
||||||
|
end
|
||||||
|
|
||||||
|
def usage, do: "restart_vhost [-p <vhost>]"
|
||||||
|
|
||||||
|
def banner(_,%{node: node_name, vhost: vhost}) do
|
||||||
|
"Trying to restart vhost '#{vhost}' on node '#{node_name}' ..."
|
||||||
|
end
|
||||||
|
|
||||||
|
def output({:ok, _pid}, %{vhost: vhost, node: node_name}) do
|
||||||
|
{:ok, "Successfully restarted vhost '#{vhost}' on node '#{node_name}'"}
|
||||||
|
end
|
||||||
|
def output({:error, {:already_started, _pid}}, %{vhost: vhost, node: node_name}) do
|
||||||
|
{:ok, "Vhost '#{vhost}' is already running on node '#{node_name}'"}
|
||||||
|
end
|
||||||
|
def output({:error, err}, %{vhost: vhost, node: node_name}) do
|
||||||
|
{:error, ExitCodes.exit_software(),
|
||||||
|
["Failed to start vhost '#{vhost}' on node '#{node_name}'",
|
||||||
|
"Reason: #{inspect(err)}"]}
|
||||||
|
end
|
||||||
|
use RabbitMQ.CLI.DefaultOutput
|
||||||
|
|
||||||
|
end
|
||||||
|
|
@ -29,14 +29,18 @@ defmodule RabbitMQ.CLI.Ctl.RpcStream do
|
||||||
end
|
end
|
||||||
|
|
||||||
def receive_list_items(node, mfas, timeout, info_keys, chunks_init) do
|
def receive_list_items(node, mfas, timeout, info_keys, chunks_init) do
|
||||||
|
receive_list_items_with_fun(node, mfas, timeout, info_keys, chunks_init, fn(v) -> v end)
|
||||||
|
end
|
||||||
|
|
||||||
|
def receive_list_items_with_fun(node, mfas, timeout, info_keys, chunks_init, response_fun) do
|
||||||
pid = Kernel.self
|
pid = Kernel.self
|
||||||
ref = Kernel.make_ref
|
ref = Kernel.make_ref
|
||||||
for {m,f,a} <- mfas, do: init_items_stream(node, m, f, a, timeout, pid, ref)
|
for {m,f,a} <- mfas, do: init_items_stream(node, m, f, a, timeout, pid, ref)
|
||||||
Stream.unfold({chunks_init, :continue},
|
Stream.unfold({chunks_init, :continue},
|
||||||
fn
|
fn
|
||||||
:finished -> nil;
|
:finished -> response_fun.(nil);
|
||||||
{chunks, :continue} ->
|
{chunks, :continue} ->
|
||||||
receive do
|
received = receive do
|
||||||
{^ref, :finished} when chunks === 1 -> nil;
|
{^ref, :finished} when chunks === 1 -> nil;
|
||||||
{^ref, :finished} -> {[], {chunks - 1, :continue}};
|
{^ref, :finished} -> {[], {chunks - 1, :continue}};
|
||||||
{^ref, {:timeout, t}} -> {{:error, {:badrpc, {:timeout, (t / 1000)}}}, :finished};
|
{^ref, {:timeout, t}} -> {{:error, {:badrpc, {:timeout, (t / 1000)}}}, :finished};
|
||||||
|
|
@ -48,6 +52,7 @@ defmodule RabbitMQ.CLI.Ctl.RpcStream do
|
||||||
{:DOWN, _mref, :process, _pid, :normal} -> {[], {chunks, :continue}};
|
{:DOWN, _mref, :process, _pid, :normal} -> {[], {chunks, :continue}};
|
||||||
{:DOWN, _mref, :process, _pid, reason} -> {{:error, simplify_emission_error(reason)}, :finished}
|
{:DOWN, _mref, :process, _pid, reason} -> {{:error, simplify_emission_error(reason)}, :finished}
|
||||||
end
|
end
|
||||||
|
response_fun.(received)
|
||||||
end)
|
end)
|
||||||
|> display_list_items(info_keys)
|
|> display_list_items(info_keys)
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,7 @@ defmodule RabbitMQ.CLI.DefaultOutput do
|
||||||
defp normalize_output({:badrpc_multi, _, _} = input), do: {:error, input}
|
defp normalize_output({:badrpc_multi, _, _} = input), do: {:error, input}
|
||||||
defp normalize_output({:badrpc, :nodedown} = input), do: {:error, input}
|
defp normalize_output({:badrpc, :nodedown} = input), do: {:error, input}
|
||||||
defp normalize_output({:badrpc, :timeout} = input), do: {:error, input}
|
defp normalize_output({:badrpc, :timeout} = input), do: {:error, input}
|
||||||
|
defp normalize_output({:badrpc, {:EXIT, reason}}), do: {:error, reason}
|
||||||
defp normalize_output({:error, format, args})
|
defp normalize_output({:error, format, args})
|
||||||
when (is_list(format) or is_binary(format)) and is_list(args) do
|
when (is_list(format) or is_binary(format)) and is_list(args) do
|
||||||
{:error, to_string(:rabbit_misc.format(format, args))}
|
{:error, to_string(:rabbit_misc.format(format, args))}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,6 @@ defmodule RabbitMQ.CLI.Plugins.Commands.EnableCommand do
|
||||||
|
|
||||||
alias RabbitMQ.CLI.Plugins.Helpers, as: PluginHelpers
|
alias RabbitMQ.CLI.Plugins.Helpers, as: PluginHelpers
|
||||||
alias RabbitMQ.CLI.Core.Helpers, as: Helpers
|
alias RabbitMQ.CLI.Core.Helpers, as: Helpers
|
||||||
alias RabbitMQ.CLI.Core.ExitCodes, as: ExitCodes
|
|
||||||
|
|
||||||
@behaviour RabbitMQ.CLI.CommandBehaviour
|
@behaviour RabbitMQ.CLI.CommandBehaviour
|
||||||
|
|
||||||
|
|
@ -124,7 +123,7 @@ defmodule RabbitMQ.CLI.Plugins.Commands.EnableCommand do
|
||||||
end
|
end
|
||||||
|
|
||||||
def output({:error, err}, _opts) do
|
def output({:error, err}, _opts) do
|
||||||
{:error, ExitCodes.exit_software(), to_string(err)}
|
{:error, err}
|
||||||
end
|
end
|
||||||
def output({:stream, stream}, _opts) do
|
def output({:stream, stream}, _opts) do
|
||||||
{:stream, stream}
|
{:stream, stream}
|
||||||
|
|
|
||||||
|
|
@ -283,6 +283,11 @@ defmodule RabbitMQCtl do
|
||||||
{:error, ExitCodes.exit_code_for(result),
|
{:error, ExitCodes.exit_code_for(result),
|
||||||
"Error: operation #{op} on node #{opts[:node]} timed out. Timeout value used: #{to}"}
|
"Error: operation #{op} on node #{opts[:node]} timed out. Timeout value used: #{to}"}
|
||||||
end
|
end
|
||||||
|
defp format_error({:error, {:badrpc, {:timeout, to, warning}}}, opts, module) do
|
||||||
|
op = CommandModules.module_to_command(module)
|
||||||
|
{:error, ExitCodes.exit_code_for({:timeout, to}),
|
||||||
|
"Error: operation #{op} on node #{opts[:node]} timed out. Timeout value used: #{to}. #{warning}"}
|
||||||
|
end
|
||||||
defp format_error({:error, {:no_such_vhost, vhost} = result}, _opts, _) do
|
defp format_error({:error, {:no_such_vhost, vhost} = result}, _opts, _) do
|
||||||
{:error, ExitCodes.exit_code_for(result),
|
{:error, ExitCodes.exit_code_for(result),
|
||||||
"Virtual host '#{vhost}' does not exist"}
|
"Virtual host '#{vhost}' does not exist"}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ defmodule RabbitMQCtl.MixfileBase do
|
||||||
[
|
[
|
||||||
app: :rabbitmqctl,
|
app: :rabbitmqctl,
|
||||||
version: "0.0.1",
|
version: "0.0.1",
|
||||||
elixir: "~> 1.4.4 or 1.5.0",
|
elixir: "~> 1.4.4 or ~> 1.5.0",
|
||||||
build_embedded: Mix.env == :prod,
|
build_embedded: Mix.env == :prod,
|
||||||
start_permanent: Mix.env == :prod,
|
start_permanent: Mix.env == :prod,
|
||||||
escript: [main_module: RabbitMQCtl,
|
escript: [main_module: RabbitMQCtl,
|
||||||
|
|
|
||||||
|
|
@ -82,7 +82,6 @@ defmodule DecodeCommandTest do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp encrypt_decrypt(secret) do
|
defp encrypt_decrypt(secret) do
|
||||||
secret_as_erlang_term = format_as_erlang_term(secret)
|
|
||||||
passphrase = "passphrase"
|
passphrase = "passphrase"
|
||||||
cipher = :rabbit_pbe.default_cipher()
|
cipher = :rabbit_pbe.default_cipher()
|
||||||
hash = :rabbit_pbe.default_hash()
|
hash = :rabbit_pbe.default_hash()
|
||||||
|
|
|
||||||
|
|
@ -72,7 +72,7 @@ defmodule ListQueuesCommandTest do
|
||||||
@tag test_timeout: 0
|
@tag test_timeout: 0
|
||||||
test "run: zero timeout causes command to return badrpc", context do
|
test "run: zero timeout causes command to return badrpc", context do
|
||||||
assert run_command_to_list(@command, [["name"], context[:opts]]) ==
|
assert run_command_to_list(@command, [["name"], context[:opts]]) ==
|
||||||
[{:error, {:badrpc, {:timeout, 0.0}}}]
|
[{:error, {:badrpc, {:timeout, 0.0, "Some queue(s) are unresponsive, use list_unresponsive_queues command."}}}]
|
||||||
end
|
end
|
||||||
|
|
||||||
@tag test_timeout: 1
|
@tag test_timeout: 1
|
||||||
|
|
@ -83,7 +83,7 @@ defmodule ListQueuesCommandTest do
|
||||||
declare_queue("test_queue_" <> Integer.to_string(i), @vhost)
|
declare_queue("test_queue_" <> Integer.to_string(i), @vhost)
|
||||||
end
|
end
|
||||||
assert run_command_to_list(@command, [["name"], context[:opts]]) ==
|
assert run_command_to_list(@command, [["name"], context[:opts]]) ==
|
||||||
[{:error, {:badrpc, {:timeout, 0.001}}}]
|
[{:error, {:badrpc, {:timeout, 0.001, "Some queue(s) are unresponsive, use list_unresponsive_queues command."}}}]
|
||||||
for i <- 1..n do
|
for i <- 1..n do
|
||||||
delete_queue("test_queue_" <> Integer.to_string(i), @vhost)
|
delete_queue("test_queue_" <> Integer.to_string(i), @vhost)
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -114,8 +114,6 @@ defmodule ListVhostLimitsCommandTest do
|
||||||
end
|
end
|
||||||
|
|
||||||
test "banner", context do
|
test "banner", context do
|
||||||
vhost_opts = Map.merge(context[:opts], %{vhost: context[:vhost]})
|
|
||||||
|
|
||||||
assert @command.banner([], %{vhost: context[:vhost]})
|
assert @command.banner([], %{vhost: context[:vhost]})
|
||||||
== "Listing limits for vhost \"#{context[:vhost]}\" ..."
|
== "Listing limits for vhost \"#{context[:vhost]}\" ..."
|
||||||
assert @command.banner([], %{global: true})
|
assert @command.banner([], %{global: true})
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,91 @@
|
||||||
|
## The contents of this file are subject to the Mozilla Public License
|
||||||
|
## Version 1.1 (the "License"); you may not use this file except in
|
||||||
|
## compliance with the License. You may obtain a copy of the License
|
||||||
|
## at http://www.mozilla.org/MPL/
|
||||||
|
##
|
||||||
|
## Software distributed under the License is distributed on an "AS IS"
|
||||||
|
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||||
|
## the License for the specific language governing rights and
|
||||||
|
## limitations under the License.
|
||||||
|
##
|
||||||
|
## The Original Code is RabbitMQ.
|
||||||
|
##
|
||||||
|
## The Initial Developer of the Original Code is Pivotal Software, Inc.
|
||||||
|
## Copyright (c) 2016-2017 Pivotal Software, Inc. All rights reserved.
|
||||||
|
|
||||||
|
|
||||||
|
defmodule RestartVhostCommandTest do
|
||||||
|
use ExUnit.Case, async: false
|
||||||
|
import TestHelper
|
||||||
|
|
||||||
|
@command RabbitMQ.CLI.Ctl.Commands.RestartVhostCommand
|
||||||
|
|
||||||
|
setup_all do
|
||||||
|
RabbitMQ.CLI.Core.Distribution.start()
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
@vhost "vhost_to_restart"
|
||||||
|
@timeout 10000
|
||||||
|
|
||||||
|
setup do
|
||||||
|
add_vhost @vhost
|
||||||
|
on_exit(fn ->
|
||||||
|
delete_vhost @vhost
|
||||||
|
end)
|
||||||
|
{:ok, opts: %{
|
||||||
|
node: get_rabbit_hostname(),
|
||||||
|
vhost: @vhost,
|
||||||
|
timeout: @timeout
|
||||||
|
}}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "validate: specifying arguments is reported as an error", context do
|
||||||
|
assert @command.validate(["a"], context[:opts]) ==
|
||||||
|
{:validation_failure, :too_many_args}
|
||||||
|
assert @command.validate(["a", "b"], context[:opts]) ==
|
||||||
|
{:validation_failure, :too_many_args}
|
||||||
|
assert @command.validate(["a", "b", "c"], context[:opts]) ==
|
||||||
|
{:validation_failure, :too_many_args}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "run: request to a non-existent node returns nodedown", _context do
|
||||||
|
target = :jake@thedog
|
||||||
|
|
||||||
|
opts = %{node: target, vhost: @vhost, timeout: @timeout}
|
||||||
|
# We use "self" node as the target. It's enough to trigger the error.
|
||||||
|
assert match?(
|
||||||
|
{:badrpc, :nodedown},
|
||||||
|
@command.run([], opts))
|
||||||
|
end
|
||||||
|
|
||||||
|
test "banner", context do
|
||||||
|
expected = "Trying to restart vhost '#{@vhost}' on node '#{get_rabbit_hostname()}' ..."
|
||||||
|
^expected = @command.banner([], context[:opts])
|
||||||
|
end
|
||||||
|
|
||||||
|
test "run: restarting an existing vhost returns already_started", context do
|
||||||
|
{:error, {:already_started, _}} = @command.run([], context[:opts])
|
||||||
|
end
|
||||||
|
|
||||||
|
test "run: restarting an failed vhost returns ok", context do
|
||||||
|
vhost = context[:opts][:vhost]
|
||||||
|
node_name = context[:opts][:node]
|
||||||
|
force_vhost_failure(node_name, vhost)
|
||||||
|
{:ok, _} = @command.run([], context[:opts])
|
||||||
|
{:ok, _} = :rpc.call(node_name, :rabbit_vhost_sup_sup, :get_vhost_sup, [vhost])
|
||||||
|
end
|
||||||
|
|
||||||
|
def force_vhost_failure(node_name, vhost) do
|
||||||
|
case :rpc.call(node_name, :rabbit_vhost_sup_sup, :get_vhost_sup, [vhost]) do
|
||||||
|
{:ok, sup} ->
|
||||||
|
{_, pid, _, _} = :lists.keyfind(:msg_store_persistent, 1, :supervisor.which_children(sup))
|
||||||
|
Process.exit(pid, :foo)
|
||||||
|
:timer.sleep(100)
|
||||||
|
force_vhost_failure(node_name, vhost);
|
||||||
|
{:error, {:vhost_supervisor_not_running, _}} ->
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
||||||
Loading…
Reference in New Issue