Introduce 'rabbitmq-diagnostics check_port_connectivity'

This commit is contained in:
Michael Klishin 2019-01-23 22:30:20 +03:00
parent ad23b37e3f
commit e8729c5de0
4 changed files with 215 additions and 21 deletions

View File

@ -0,0 +1,105 @@
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
## at http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
## the License for the specific language governing rights and
## limitations under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
defmodule RabbitMQ.CLI.Diagnostics.Commands.CheckPortConnectivityCommand do
@moduledoc """
Displays all listeners on a node.
Returns a code of 0 unless there were connectivity and authentication
errors. This command is not meant to be used in health checks.
"""
alias RabbitMQ.CLI.Core.Helpers
import RabbitMQ.CLI.Diagnostics.Helpers, only: [listeners_on: 2,
listener_lines: 1,
listener_map: 1,
listener_maps: 1,
check_listener_connectivity: 3]
@behaviour RabbitMQ.CLI.CommandBehaviour
@default_timeout 30_000
use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout
def merge_defaults(args, opts) do
timeout = case opts[:timeout] do
nil -> @default_timeout;
:infinity -> @default_timeout;
other -> other
end
{args, Map.merge(opts, %{timeout: timeout})}
end
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
def run([], %{node: node_name, timeout: timeout}) do
case :rabbit_misc.rpc_call(node_name,
:rabbit_networking, :active_listeners, [], timeout) do
{:error, _} = err -> err
{:error, _, _} = err -> err
xs when is_list(xs) ->
locals = listeners_on(xs, node_name)
case locals do
[] -> {true, locals}
_ -> check_connectivity_of(locals, node_name, timeout)
end;
other -> other
end
end
def output({true, listeners}, %{node: node_name, formatter: "json"}) do
{:ok, %{"result" => "ok",
"node" => node_name,
"listeners" => listener_maps(listeners)}}
end
def output({true, listeners}, %{node: node_name}) do
ports = listeners |> listener_maps |> Enum.map(fn %{port: p} -> p end)
|> Enum.sort |> Enum.join(", ")
{:ok, "Successfully connected to ports #{ports} on node #{node_name}."}
end
def output({false, failures}, %{formatter: "json", node: node_name}) do
{:error, %{"result" => "error",
"node" => node_name,
"failures" => listener_maps(failures)}}
end
def output({false, failures}, %{node: node_name}) do
lines = ["Connection to ports of the following listeners on node #{node_name} failed: "
| listener_lines(failures)]
{:error, Enum.join(lines, Helpers.line_separator())}
end
def usage, do: "check_port_connectivity"
def banner([], %{node: node_name}) do
"Testing TCP connections to all active listeners on node #{node_name} ..."
end
#
# Implementation
#
defp check_connectivity_of(listeners, node_name, timeout) do
# per listener timeout
t = Kernel.trunc(timeout / (length(listeners) + 1))
failures = Enum.reject(listeners,
fn l -> check_listener_connectivity(listener_map(l), node_name, t) end)
case failures do
[] -> {true, listeners}
fs -> {false, fs}
end
end
end

View File

@ -14,6 +14,7 @@
## Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
defmodule RabbitMQ.CLI.Diagnostics.Helpers do
import Record, only: [defrecord: 2, extract: 2]
import RabbitCommon.Records
import Rabbitmq.Atom.Coerce
@ -21,6 +22,8 @@ defmodule RabbitMQ.CLI.Diagnostics.Helpers do
# Listeners
#
defrecord :hostent, extract(:hostent, from_lib: "kernel/include/inet.hrl")
def listeners_on(listeners, target_node) do
Enum.filter(listeners, fn listener(node: node) ->
node == target_node
@ -34,22 +37,25 @@ defmodule RabbitMQ.CLI.Diagnostics.Helpers do
end)
end
def listener_map(listener) do
# Listener options are left out intentionally: they can contain deeply nested values
# that are impossible to serialise to JSON.
#
# Management plugin/HTTP API had its fair share of bugs because of that
# and now filters out a lot of options. Raw listener data can be seen in
# rabbitmq-diagnostics status.
listener(node: node, protocol: protocol, ip_address: interface, port: port) = listener
%{
node: node,
protocol: protocol,
interface: :inet.ntoa(interface) |> to_string |> maybe_enquote_interface,
port: port,
purpose: protocol_label(to_atom(protocol))
}
end
def listener_maps(listeners) do
for listener(node: node, protocol: protocol, ip_address: interface, port: port) <- listeners do
# Listener options are left out intentionally: they can contain deeply nested values
# that are impossible to serialise to JSON.
#
# Management plugin/HTTP API had its fair share of bugs because of that
# and now filters out a lot of options. Raw listener data can be seen in
# rabbitmq-diagnostics status.
%{
node: node,
protocol: protocol,
interface: :inet.ntoa(interface) |> to_string |> maybe_enquote_interface,
port: port,
purpose: protocol_label(to_atom(protocol))
}
end
Enum.map(listeners, &listener_map/1)
end
def listener_rows(listeners) do
@ -63,6 +69,25 @@ defmodule RabbitMQ.CLI.Diagnostics.Helpers do
end
end
def check_port_connectivity(port, node_name, timeout) do
hostname = Regex.replace(~r/^(.+)@/, to_string(node_name), "") |> to_charlist
try do
case :gen_tcp.connect(hostname, port, [], timeout) do
{:error, _} -> false
{:ok, port} ->
:ok = :gen_tcp.close(port)
true
end
# `gen_tcp:connect/4` will throw if the port is outside of its
# expected domain
catch :exit, _ -> false
end
end
def check_listener_connectivity(%{port: port}, node_name, timeout) do
check_port_connectivity(port, node_name, timeout)
end
def normalize_protocol(proto) do
val = proto |> to_string |> String.downcase
case val do

View File

@ -19,9 +19,7 @@ defmodule CloseAllConnectionsCommandTest do
import TestHelper
alias RabbitMQ.CLI.Ctl.RpcStream
@helpers RabbitMQ.CLI.Core.Helpers
@command RabbitMQ.CLI.Ctl.Commands.CloseAllConnectionsCommand
@vhost "/"
@ -33,8 +31,6 @@ defmodule CloseAllConnectionsCommandTest do
on_exit([], fn ->
close_all_connections(get_rabbit_hostname())
end)
:ok
@ -140,7 +136,7 @@ defmodule CloseAllConnectionsCommandTest do
defp fetch_connection_vhosts(node, nodes) do
fetch_connection_vhosts(node, nodes, 10)
end
defp fetch_connection_vhosts(node, nodes, retries) do
stream = RpcStream.receive_list_items(node,
:rabbit_networking,
@ -159,7 +155,7 @@ defmodule CloseAllConnectionsCommandTest do
fetch_connection_vhosts(node, nodes, retries - 1)
_ ->
xs
end
end
end
end

View File

@ -0,0 +1,68 @@
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
## at http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
## the License for the specific language governing rights and
## limitations under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
defmodule CheckPortConnectivityCommandTest do
use ExUnit.Case
import TestHelper
@command RabbitMQ.CLI.Diagnostics.Commands.CheckPortConnectivityCommand
setup_all do
RabbitMQ.CLI.Core.Distribution.start()
:ok
end
setup context do
{:ok, opts: %{
node: get_rabbit_hostname(),
timeout: context[:test_timeout] || 30000
}}
end
test "merge_defaults: provides a default timeout" do
assert @command.merge_defaults([], %{}) == {[], %{timeout: 30000}}
end
test "validate: treats positional arguments as a failure" do
assert @command.validate(["extra-arg"], %{}) == {:validation_failure, :too_many_args}
end
test "validate: treats empty positional arguments and default switches as a success" do
assert @command.validate([], %{}) == :ok
end
@tag test_timeout: 3000
test "run: targeting an unreachable node throws a badrpc", context do
assert @command.run([], Map.merge(context[:opts], %{node: :jake@thedog})) == {:badrpc, :nodedown}
end
test "run: tries to connect to every inferred active listener", context do
assert match?({true, _}, @command.run([], context[:opts]))
end
test "output: when all connections succeeded, returns a success", context do
assert match?({:ok, _}, @command.output({true, []}, context[:opts]))
end
# note: it's run/2 that filters out non-local alarms
test "output: when target node has a local alarm in effect, returns a failure", context do
failure = {:listener, :rabbit@mercurio, :lolz, 'mercurio',
{0, 0, 0, 0, 0, 0, 0, 0}, 7761613,
[backlog: 128, nodelay: true]}
assert match?({:error, _}, @command.output({false, [failure]}, context[:opts]))
end
end