Wait in stages.

Make wait command work in stages with timeouts.
First wait for a pid file to appear (times out)
Then wait for erlang distribution to start on the node (times out)
Then wait for application to be running in the node (does not time out)

Default timeout is set to 10 seconds, which should be enough to
write a pid file and start distribution.
This commit is contained in:
Daniil Fedotov 2017-06-02 16:48:34 +01:00
parent 6cf003ed03
commit b4a8789ed1
5 changed files with 212 additions and 51 deletions

View File

@ -205,18 +205,56 @@ defmodule RabbitMQ.CLI.Core.Helpers do
# Streamify function sequence.
# Execution can be terminated by an error {:error, _}.
# The error will be the last element in the stream.
def stream_until_error(funs) do
# def stream_until_error(funs) do
# Stream.transform(
# funs, :just,
# fn(f, :just) ->
# case f.() do
# {:error, _} = err -> {[err], :nothing};
# other -> {[other], :just}
# end;
# (_, :nothing) ->
# {:halt, :nothing}
# end)
# end
# Streamify a function sequence passing result
# Execution can be terminated by an error {:error, _}.
# The error will be the last element in the stream.
# Functions can return {:ok, val}, so val will be passed
# to then next function, or {:ok, val, output} where
# val will be passed and output will be put into the stream
def stream_until_error_parametrised(funs, init) do
Stream.transform(
funs, :just,
fn
(f, :just) ->
case f.() do
funs, {:just, init},
fn(f, {:just, val}) ->
case f.(val) do
{:error, _} = err -> {[err], :nothing};
other -> {[other], :just}
:ok -> {[], {:just, val}};
{:ok, new_val} -> {[], {:just, new_val}};
{:ok, new_val, out} -> {[out], {:just, new_val}}
end;
(_, :nothing) ->
{:halt, :nothing}
end)
end)
end
# Streamify function sequence.
# Execution can be terminated by an error {:error, _}.
# The error will be the last element in the stream.
def stream_until_error(funs) do
stream_until_error_parametrised(
Enum.map(
funs,
fn(fun) ->
fn(:no_param) ->
case fun.() do
{:error, _} = err -> err;
other -> {:ok, :no_param, other}
end
end
end),
:no_param)
end
def apply_if_exported(mod, fun, args, default) do

View File

@ -13,24 +13,75 @@
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
alias RabbitMQ.CLI.Core.Helpers, as: Helpers
defmodule RabbitMQ.CLI.Ctl.Commands.WaitCommand do
@behaviour RabbitMQ.CLI.CommandBehaviour
@default_timeout 10_000
def merge_defaults(args, opts), do: {args, opts}
def merge_defaults(args, opts) do
timeout = case opts[:timeout] do
nil -> @default_timeout;
:infinity -> @default_timeout;
val -> val
end
{args, Map.put(opts, :timeout, timeout)}
end
def validate([_|_] = args, _) when length(args) > 1, do: {:validation_failure, :too_many_args}
def validate([], _), do: {:validation_failure, :not_enough_args}
def validate([_], %{pid: _}), do: {:validation_failure, "Cannot specify both pid and pidfile"}
def validate([], %{pid: _}), do: :ok
def validate([], _), do: {:validation_failure, "No pid or pidfile specified"}
def validate([_], _), do: :ok
def switches(), do: [pid: :integer]
def aliases(), do: ['P': :pid]
def scopes(), do: [:ctl, :diagnostics]
def run([pid_file], %{node: node_name}) do
wait_for_application(node_name, pid_file, :rabbit_and_plugins);
def run([pid_file], %{node: node_name, timeout: timeout}) do
app_names = :rabbit_and_plugins
Helpers.stream_until_error_parametrised(
[
log("Waiting for pid file '#{pid_file}' to appear"),
fn(_) -> wait_for_pid_file(pid_file, timeout) end,
log_param(fn(pid) -> "pid is #{pid}" end),
]
++
wait_for_pid_funs(node_name, app_names, timeout),
:init)
end
def run([], %{node: node_name, pid: pid, timeout: timeout}) do
app_names = :rabbit_and_plugins
Helpers.stream_until_error_parametrised(
wait_for_pid_funs(node_name, app_names, timeout),
pid)
end
def usage, do: "wait <pid_file>"
defp wait_for_pid_funs(node_name, app_names, timeout) do
app_names_formatted = :io_lib.format('~p', [app_names])
[
log_param(fn(pid) -> "Waiting for erlang distribution on node '#{node_name}' while OS process '#{pid}' is running" end),
fn(pid) -> wait_for_erlang_distribution(pid, node_name, timeout) end,
log("Waiting for applications '#{app_names_formatted}' to start on node '#{node_name}'"),
fn(_) -> wait_for_application(node_name, app_names) end,
log("Applications '#{app_names_formatted}' are running on node '#{node_name}'")
]
end
defp log(string) do
fn(val) -> {:ok, val, string} end
end
defp log_param(fun) do
fn(val) -> {:ok, val, fun.(val)} end
end
def usage, do: "wait [<pid_file>] [--pid|-P <pid>]"
def banner(_, %{node: node_name}), do: "Waiting for node #{node_name} ..."
@ -67,50 +118,74 @@ defmodule RabbitMQ.CLI.Ctl.Commands.WaitCommand do
:undefined
end
defp wait_for_application(node, pid_file, :rabbit_and_plugins) do
case read_pid_file(pid_file, true) do
{:error, _} = err -> err
pid ->
{:stream, Stream.concat([["pid is #{pid}"],
RabbitMQ.CLI.Core.Helpers.defer(
fn() ->
wait_for_startup(node, pid)
end)])}
end
defp wait_for_application(node_name, :rabbit_and_plugins) do
case :rpc.call(node_name, :rabbit, :await_startup, []) do
:ok -> :ok;
{:badrpc, reason} -> {:error, {:badrpc, reason}}
end
end
defp wait_for_startup(node, pid) do
while_process_is_alive(
node, pid, fn() -> :rpc.call(node, :rabbit, :await_startup, []) == :ok end)
end
defp while_process_is_alive(node, pid, activity) do
case :rabbit_misc.is_os_process_alive(pid) do
true ->
case activity.() do
true -> :ok
false ->
:timer.sleep(1000)
while_process_is_alive(node, pid, activity)
defp wait_for_erlang_distribution(pid, node_name, timeout) do
wait_for(timeout,
fn() ->
case check_distribution(pid, node_name) do
# Loop while node is available.
{:error, :pang} -> {:error, :loop};
other -> other
end
end)
end
defp check_distribution(pid, node_name) do
case is_os_process_alive(pid) do
true ->
case Node.ping(node_name) do
:pong -> :ok;
:pang -> {:error, :pang}
end;
false -> {:error, :process_not_running}
end
end
defp read_pid_file(pid_file, wait) do
case {:file.read_file(pid_file), wait} do
{{:ok, bin}, _} ->
case Integer.parse(bin) do
:error ->
{:error, {:garbage_in_pid_file, pid_file}}
{int, _} -> Integer.to_char_list int
defp is_os_process_alive(pid) do
:rabbit_misc.is_os_process_alive(to_charlist(pid))
end
defp wait_for_pid_file(pid_file, timeout) do
wait_for(timeout,
fn() ->
case :file.read_file(pid_file) do
{:ok, bin} ->
case Integer.parse(bin) do
:error ->
{:error, {:garbage_in_pid_file, pid_file}}
{int, _} -> {:ok, int}
end
{:error, :enoent} ->
{:error, :loop};
{:error, err} ->
{:error, {:could_not_read_pid, err}}
end
{{:error, :enoent}, true} ->
:timer.sleep(1000)
read_pid_file(pid_file, wait)
{{:error, err}, _} ->
{:error, {:could_not_read_pid, err}}
end)
end
def wait_for(timeout, fun) do
sleep = round(timeout / 10)
case wait_for_loop(timeout, sleep, fun) do
{:error, :timeout} -> {:error, {:timeout, timeout}}
other -> other
end
end
def wait_for_loop(timeout, _, _) when timeout <= 0 do
{:error, :timeout}
end
def wait_for_loop(timeout, sleep, fun) do
case fun.() do
{:error, :loop} ->
:timer.sleep(sleep)
wait_for_loop(timeout - sleep, sleep, fun);
other -> other
end
end
end

View File

@ -44,7 +44,7 @@ defmodule ReportTest do
test "run: report request on a named, active RMQ node is successful", context do
output = @command.run([], context[:opts]) |> Enum.to_list
true = Enum.all?(output, fn({:error, _}) -> false; ({:error, _, _}) -> false; (_) -> true end)
assert_stream_without_errors(output)
end
test "run: report request on nonexistent RabbitMQ node returns nodedown" do

View File

@ -364,6 +364,12 @@ defmodule TestHelper do
end
end
def assert_stream_without_errors(stream) do
true = Enum.all?(stream, fn({:error, _}) -> false;
({:error, _, _}) -> false;
(_) -> true end)
end
def vhost_exists?(vhost) do
Enum.any?(list_vhosts(), fn(v) -> v[:name] == vhost end)
end

View File

@ -33,12 +33,47 @@ defmodule WaitCommandTest do
end
setup do
{:ok, opts: %{node: get_rabbit_hostname()}}
{:ok, opts: %{node: get_rabbit_hostname(), timeout: 500}}
end
test "validate: with extra arguments returns an arg count error", context do
test "validate: cannot have both pid and pidfile", context do
{:validation_failure, "Cannot specify both pid and pidfile"} =
@command.validate(["pid_file"], Map.merge(context[:opts], %{pid: 123}))
end
test "validate: should have either pid or pidfile", context do
{:validation_failure, "No pid or pidfile specified"} =
@command.validate([], context[:opts])
end
test "validate: with more than one argument returns an arg count error", context do
assert @command.validate(["pid_file", "extra"], context[:opts]) == {:validation_failure, :too_many_args}
assert @command.validate([], context[:opts]) == {:validation_failure, :not_enough_args}
end
test "run: times out waiting for non-existent pid file", context do
{:error, {:timeout, _}} = @command.run(["pid_file"], context[:opts]) |> Enum.to_list |> List.last
end
test "run: fails if pid process does not exist", context do
non_existent_pid = get_non_existent_os_pid()
{:error, :process_not_running} =
@command.run([], Map.merge(context[:opts], %{pid: non_existent_pid}))
|> Enum.to_list
|> List.last
end
test "run: times out if unable to communicate with the node", context do
pid = String.to_integer(System.get_pid())
{:error, {:timeout, _}} =
@command.run([], Map.merge(context[:opts], %{pid: pid, node: :nonode@nohost}))
|> Enum.to_list
|> List.last
end
test "run: happy path", context do
pid = :erlang.list_to_integer(:rpc.call(context[:opts][:node], :os, :getpid, []))
output = @command.run([], Map.merge(context[:opts], %{pid: pid}))
assert_stream_without_errors(output)
end
test "banner", context do
@ -71,4 +106,11 @@ defmodule WaitCommandTest do
assert match?(:ok, @command.output(:ok, context[:opts]))
assert match?({:ok, "ok"}, @command.output("ok", context[:opts]))
end
def get_non_existent_os_pid(pid \\ 2) do
case :rabbit_misc.is_os_process_alive(to_charlist(pid)) do
true -> get_non_existent_os_pid(pid + 1)
false -> pid
end
end
end