Merge branch 'stable' into rabbitmq-cli-29

This commit is contained in:
Michael Klishin 2016-05-21 17:33:51 +03:00
commit f4e9d04751
23 changed files with 1575 additions and 32 deletions

42
deps/rabbitmq_cli/lib/info_keys.ex vendored Normal file
View File

@ -0,0 +1,42 @@
defmodule InfoKeys do
require Record
Record.defrecord(:resource, :resource,
Record.extract(:resource,
from_lib: "rabbit_common/include/rabbit.hrl"))
def with_valid_info_keys(args, valid_keys, fun) do
info_keys = Enum.map(args, &String.to_atom/1)
case invalid_info_keys(info_keys, valid_keys) do
[_|_] = bad_info_keys ->
{:error, {:bad_info_key, bad_info_keys}}
[] -> fun.(info_keys)
end
end
defp invalid_info_keys(info_keys, valid_keys) do
# Difference between enums.
# It's faster than converting to sets for small lists
info_keys -- valid_keys
end
def info_for_keys(item, []) do
item
end
def info_for_keys([{_,_}|_] = item, info_keys) do
Enum.filter_map(item,
fn({k, _}) -> Enum.member?(info_keys, k) end,
fn({k, v}) -> {k, format_info_item(v)} end)
end
defp format_info_item(res) when Record.is_record(res, :resource) do
rec = resource()
resource(res, :name)
end
defp format_info_item(any) do
any
end
end

View File

@ -0,0 +1,64 @@
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
## at http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
## the License for the specific language governing rights and
## limitations under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
defmodule ListBindingsCommand do
@behaviour CommandBehaviour
@info_keys ~w(source_name source_kind destination_name destination_kind routing_key arguments)a
def flags() do
[:param]
end
def usage() do
"list_bindings [-p <vhost>] [<bindinginfoitem> ...]"
end
def usage_additional() do
"<bindinginfoitem> must be a member of the list ["<>
Enum.join(@info_keys, ", ") <>"]."
end
def run([], opts) do
run(~w(source_name source_kind
destination_name destination_kind
routing_key arguments),
opts)
end
def run([_|_] = args, %{node: node_name, timeout: timeout, param: vhost} = opts) do
InfoKeys.with_valid_info_keys(args, @info_keys,
fn(info_keys) ->
info(opts)
node_name
|> Helpers.parse_node
|> RpcStream.receive_list_items(:rabbit_binding, :info_all,
[vhost, info_keys],
timeout,
info_keys)
end)
end
def run([_|_] = args, %{node: _node_name, timeout: _timeout} = opts) do
run(args, Map.merge(default_opts, opts))
end
defp default_opts() do
%{param: "/"}
end
defp info(%{quiet: true}), do: nil
defp info(%{param: vhost}), do: IO.puts "Listing bindings for vhost #{vhost} ..."
end

View File

@ -0,0 +1,62 @@
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
## at http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
## the License for the specific language governing rights and
## limitations under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
##
defmodule ListChannelsCommand do
@behaviour CommandBehaviour
@info_keys ~w(pid connection name number user vhost transactional
confirm consumer_count messages_unacknowledged
messages_uncommitted acks_uncommitted messages_unconfirmed
prefetch_count global_prefetch_count)a
def flags() do
[]
end
def usage() do
"list_channels [<channelinfoitem> ...]"
end
def usage_additional() do
"<channelinfoitem> must be a member of the list ["<>
Enum.join(@info_keys, ", ") <>"]."
end
def run([], opts) do
run(~w(pid user consumer_count messages_unacknowledged), opts)
end
def run([_|_] = args, %{node: node_name, timeout: timeout} = opts) do
info_keys = Enum.map(args, &String.to_atom/1)
InfoKeys.with_valid_info_keys(args, @info_keys,
fn(info_keys) ->
info(opts)
node_name
|> Helpers.parse_node
|> RpcStream.receive_list_items(:rabbit_channel, :info_all,
[info_keys],
timeout,
info_keys)
end)
end
defp default_opts() do
%{}
end
defp info(%{quiet: true}), do: nil
defp info(_), do: IO.puts "Listing channels ..."
end

View File

@ -0,0 +1,64 @@
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
## at http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
## the License for the specific language governing rights and
## limitations under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
defmodule ListConnectionsCommand do
@behaviour CommandBehaviour
@info_keys ~w(pid name port host peer_port peer_host ssl ssl_protocol
ssl_key_exchange ssl_cipher ssl_hash peer_cert_subject
peer_cert_issuer peer_cert_validity state
channels protocol auth_mechanism user vhost timeout frame_max
channel_max client_properties recv_oct recv_cnt send_oct
send_cnt send_pend connected_at)a
def flags() do
[]
end
def usage() do
"list_connections [<connectioninfoitem> ...]"
end
def usage_additional() do
"<connectioninfoitem> must be a member of the list ["<>
Enum.join(@info_keys, ", ") <>"]."
end
def run([], opts) do
run(~w(user peer_host peer_port state), opts)
end
def run([_|_] = args, %{node: node_name, timeout: timeout} = opts) do
info_keys = Enum.map(args, &String.to_atom/1)
InfoKeys.with_valid_info_keys(args, @info_keys,
fn(info_keys) ->
info(opts)
node_name
|> Helpers.parse_node
|> RpcStream.receive_list_items(:rabbit_networking,
:connection_info_all,
[info_keys],
timeout,
info_keys)
end)
end
defp default_opts() do
%{}
end
defp info(%{quiet: true}), do: nil
defp info(_), do: IO.puts "Listing connections ..."
end

View File

@ -0,0 +1,60 @@
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
## at http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
## the License for the specific language governing rights and
## limitations under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
defmodule ListConsumersCommand do
@behaviour CommandBehaviour
@info_keys ~w(queue_name channel_pid consumer_tag
ack_required prefetch_count arguments)a
def flags() do
[]
end
def usage() do
"list_consumers [-p vhost] [<consumerinfoitem> ...]"
end
def usage_additional() do
"<consumerinfoitem> must be a member of the list ["<>
Enum.join(@info_keys, ", ") <>"]."
end
def run([], opts) do
run(Enum.map(@info_keys, &Atom.to_string/1), opts)
end
def run([_|_] = args, %{node: node_name, timeout: timeout, param: vhost} = opts) do
InfoKeys.with_valid_info_keys(args, @info_keys,
fn(info_keys) ->
info(opts)
node_name
|> Helpers.parse_node
|> RpcStream.receive_list_items(:rabbit_amqqueue, :consumers_all,
[vhost], timeout, info_keys)
end)
end
def run(args, %{node: _node_name, timeout: _timeout} = opts) do
run(args, Map.merge(default_opts, opts))
end
defp default_opts() do
%{param: "/"}
end
defp info(%{quiet: true}), do: nil
defp info(%{param: vhost}), do: IO.puts "Listing consumers on vhost #{vhost} ..."
end

View File

@ -0,0 +1,62 @@
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
## at http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
## the License for the specific language governing rights and
## limitations under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
defmodule ListExchangesCommand do
@behaviour CommandBehaviour
@info_keys ~w(name type durable auto_delete internal arguments policy)a
def flags() do
[:param]
end
def usage() do
"list_exchanges [-p <vhost>] [<exchangeinfoitem> ...]"
end
def usage_additional() do
"<exchangeinfoitem> must be a member of the list ["<>
Enum.join(@info_keys, ", ") <>"]."
end
def run([], opts) do
run(~w(name type), opts)
end
def run([_|_] = args, %{node: node_name, timeout: timeout, param: vhost} = opts) do
info_keys = Enum.map(args, &String.to_atom/1)
InfoKeys.with_valid_info_keys(args, @info_keys,
fn(info_keys) ->
info(opts)
node_name
|> Helpers.parse_node
|> RpcStream.receive_list_items(:rabbit_exchange, :info_all,
[vhost, info_keys],
timeout,
info_keys)
end)
end
def run([_|_] = args, %{node: node_name, timeout: _timeout} = opts) do
run(args, Map.merge(default_opts, opts))
end
defp default_opts() do
%{param: "/"}
end
defp info(%{quiet: true}), do: nil
defp info(_), do: IO.puts "Listing exchanges ..."
end

View File

@ -0,0 +1,72 @@
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
## at http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
## the License for the specific language governing rights and
## limitations under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
## Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
defmodule ListQueuesCommand do
@behaviour CommandBehaviour
@info_keys ~w(name durable auto_delete
arguments policy pid owner_pid exclusive exclusive_consumer_pid
exclusive_consumer_tag messages_ready messages_unacknowledged messages
messages_ready_ram messages_unacknowledged_ram messages_ram
messages_persistent message_bytes message_bytes_ready
message_bytes_unacknowledged message_bytes_ram message_bytes_persistent
head_message_timestamp disk_reads disk_writes consumers
consumer_utilisation memory slave_pids synchronised_slave_pids state)a
def flags() do
[:param, :offline, :online]
end
def usage() do
"list_queues [-p <vhost>] [--online] [--offline] [<queueinfoitem> ...]"
end
def usage_additional() do
"<queueinfoitem> must be a member of the list ["<>
Enum.join(@info_keys, ", ") <>"]."
end
def run([], opts) do
run(~w(name messages), opts)
end
def run([_|_] = args, %{node: node_name, timeout: timeout, param: vhost,
online: online_opt, offline: offline_opt} = opts) do
{online, offline} = case {online_opt, offline_opt} do
{false, false} -> {true, true};
other -> other
end
InfoKeys.with_valid_info_keys(args, @info_keys,
fn(info_keys) ->
info(opts)
node_name
|> Helpers.parse_node
|> RpcStream.receive_list_items(:rabbit_amqqueue, :info_all,
[vhost, info_keys, online, offline],
timeout,
info_keys)
end)
end
def run([_|_] = args, opts) do
run(args, Map.merge(default_opts, opts))
end
defp default_opts() do
%{param: "/", offline: false, online: false}
end
defp info(%{quiet: true}), do: nil
defp info(_), do: IO.puts "Listing queues ..."
end

View File

@ -17,12 +17,13 @@
defmodule Parser do
# Input: A list of strings
# Output: A 2-tuple of lists: one containing the command,
# Output: A 2-tuple of lists: one containing the command,
# one containing flagged options.
def parse(command) do
{options, cmd, invalid} = OptionParser.parse(
command,
switches: [node: :atom, quiet: :boolean, timeout: :integer],
switches: [node: :atom, quiet: :boolean, timeout: :integer,
online: :boolean, offline: :boolean],
aliases: [p: :vhost, n: :node, q: :quiet, t: :timeout]
)
{clear_on_empty_command(cmd), options_map(options, invalid)}

54
deps/rabbitmq_cli/lib/rpc_stream.ex vendored Normal file
View File

@ -0,0 +1,54 @@
defmodule RpcStream do
def receive_list_items(node, mod, fun, args, timeout, info_keys) do
pid = Kernel.self
ref = Kernel.make_ref
init_items_stream(node, mod, fun, args, timeout, pid, ref)
Stream.unfold(:continue,
fn
:finished -> nil;
:continue ->
receive do
{^ref, :finished} -> nil;
{^ref, {:timeout, t}} -> {{:error, {:badrpc, {:timeout, (t / 1000)}}}, :finished};
{^ref, result, :continue} -> {result, :continue};
{:error, _} = error -> {error, :finished}
end
end)
|> display_list_items(info_keys)
end
defp display_list_items(items, info_keys) do
items
|> Stream.filter(fn([]) -> false; (_) -> true end)
|> Enum.map(
fn({:error, error}) -> error;
# if item is list of keyword lists:
([[{_,_}|_]|_] = item) ->
Enum.map(item, fn(i) -> InfoKeys.info_for_keys(i, info_keys) end);
(item) ->
InfoKeys.info_for_keys(item, info_keys)
end)
end
defp init_items_stream(node, mod, fun, args, timeout, pid, ref) do
Kernel.spawn_link(
fn() ->
case :rabbit_misc.rpc_call(node, mod, fun, args, ref, pid, timeout) do
{:error, _} = error -> send(pid, {:error, error});
{:bad_argument, _} = error -> send(pid, {:error, error});
{:badrpc, :timeout} -> send(pid, {:timeout, timeout});
{:badrpc, _} = error -> send(pid, {:error, error});
_ -> :ok
end
end)
set_stream_timeout(pid, ref, timeout)
end
defp set_stream_timeout(_, _, :infinity) do
:ok
end
defp set_stream_timeout(pid, ref, timeout) do
Process.send_after(pid, {ref, {:timeout, timeout}}, timeout)
end
end

View File

@ -15,7 +15,6 @@
defmodule StatusPrint do
import Helpers
import TablePrint
@otp_version_tag "otp_version"

View File

@ -52,6 +52,23 @@ defmodule RabbitMQCtl.Mixfile do
:rabbit_common,
git: "https://github.com/rabbitmq/rabbitmq-common.git",
branch: "stable"
},
# for test helper(s) that close connections and so on
{
:rabbit,
git: "https://github.com/rabbitmq/rabbitmq-server.git",
branch: "stable"
},
{
:amqp_client,
git: "https://github.com/rabbitmq/rabbitmq-erlang-client.git",
branch: "stable",
override: true
},
{
:amqp,
git: "https://github.com/pma/amqp.git",
branch: "master"
}
]
end

View File

@ -0,0 +1,102 @@
defmodule ListBindingsCommandTest do
use ExUnit.Case, async: false
import ExUnit.CaptureIO
import TestHelper
@vhost "test1"
@user "guest"
@root "/"
@default_timeout :infinity
setup_all do
:net_kernel.start([:rabbitmqctl, :shortnames])
:net_kernel.connect_node(get_rabbit_hostname)
on_exit([], fn ->
:erlang.disconnect_node(get_rabbit_hostname)
:net_kernel.stop()
end)
:ok
end
setup context do
add_vhost @vhost
set_permissions @user, @vhost, [".*", ".*", ".*"]
on_exit(fn ->
delete_vhost @vhost
end)
{
:ok,
opts: %{
node: get_rabbit_hostname,
timeout: context[:test_timeout] || @default_timeout,
param: @vhost
}
}
end
@tag test_timeout: :infinity
test "return bad_info_key on a single bad arg", context do
capture_io(fn ->
assert ListBindingsCommand.run(["quack"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
end)
end
@tag test_timeout: :infinity
test "multiple bad args return a list of bad info key values", context do
capture_io(fn ->
assert ListBindingsCommand.run(["quack", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:quack, :oink]}}
end)
end
@tag test_timeout: :infinity
test "return bad_info_key on mix of good and bad args", context do
capture_io(fn ->
assert ListBindingsCommand.run(["quack", "source_name"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
assert ListBindingsCommand.run(["source_name", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
assert ListBindingsCommand.run(["source_kind", "oink", "source_name"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
end)
end
@tag test_timeout: 0
test "zero timeout causes command to return badrpc", context do
capture_io(fn ->
assert ListBindingsCommand.run([], context[:opts]) ==
[{:badrpc, {:timeout, 0.0}}]
end)
end
test "no bindings for no queues", context do
capture_io(fn ->
[] = ListBindingsCommand.run([], context[:opts])
end)
end
test "by default returns all info keys", context do
default_keys = ~w(source_name source_kind destination_name destination_kind routing_key arguments)a
capture_io(fn ->
declare_queue("test_queue", @vhost)
:timer.sleep(100)
[binding] = ListBindingsCommand.run([], context[:opts])
assert default_keys == Keyword.keys(binding)
end)
end
test "can filter info keys", context do
wanted_keys = ~w(source_name destination_name routing_key)
capture_io(fn ->
declare_queue("test_queue", @vhost)
assert ListBindingsCommand.run(wanted_keys, context[:opts]) ==
[[source_name: "", destination_name: "test_queue", routing_key: "test_queue"]]
end)
end
end

View File

@ -0,0 +1,125 @@
defmodule ListChannelsCommandTest do
use ExUnit.Case, async: false
import ExUnit.CaptureIO
import TestHelper
@user "guest"
@default_timeout :infinity
setup_all do
:net_kernel.start([:rabbitmqctl, :shortnames])
:net_kernel.connect_node(get_rabbit_hostname)
close_all_connections()
on_exit([], fn ->
close_all_connections()
:erlang.disconnect_node(get_rabbit_hostname)
:net_kernel.stop()
end)
:ok
end
setup context do
{
:ok,
opts: %{
node: get_rabbit_hostname,
timeout: context[:test_timeout] || @default_timeout
}
}
end
test "return bad_info_key on a single bad arg", context do
capture_io(fn ->
assert ListChannelsCommand.run(["quack"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
end)
end
test "multiple bad args return a list of bad info key values", context do
capture_io(fn ->
assert ListChannelsCommand.run(["quack", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:quack, :oink]}}
end)
end
test "return bad_info_key on mix of good and bad args", context do
capture_io(fn ->
assert ListChannelsCommand.run(["quack", "pid"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
assert ListChannelsCommand.run(["user", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
assert ListChannelsCommand.run(["user", "oink", "pid"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
end)
end
@tag test_timeout: 0
test "zero timeout causes command to return badrpc", context do
capture_io(fn ->
assert ListChannelsCommand.run([], context[:opts]) ==
[{:badrpc, {:timeout, 0.0}}]
end)
end
test "default channel info keys are pid, user, consumer_count, and messages_unacknowledged", context do
close_all_connections()
capture_io(fn ->
with_channel("/", fn(_channel) ->
channels = ListChannelsCommand.run([], context[:opts])
chan = Enum.at(channels, 0)
assert Keyword.keys(chan) == ~w(pid user consumer_count messages_unacknowledged)a
assert [user: "guest", consumer_count: 0, messages_unacknowledged: 0] == Keyword.delete(chan, :pid)
end)
end)
end
test "multiple channels on multiple connections", context do
close_all_connections()
capture_io(fn ->
with_channel("/", fn(_channel1) ->
with_channel("/", fn(_channel2) ->
channels = ListChannelsCommand.run(["pid", "user", "connection"], context[:opts])
chan1 = Enum.at(channels, 0)
chan2 = Enum.at(channels, 1)
assert Keyword.keys(chan1) == ~w(pid user connection)a
assert Keyword.keys(chan2) == ~w(pid user connection)a
assert "guest" == chan1[:user]
assert "guest" == chan2[:user]
assert chan1[:pid] !== chan2[:pid]
end)
end)
end)
end
test "multiple channels on single connection", context do
close_all_connections()
capture_io(fn ->
with_connection("/", fn(conn) ->
{:ok, _} = AMQP.Channel.open(conn)
{:ok, _} = AMQP.Channel.open(conn)
channels = ListChannelsCommand.run(["pid", "user", "connection"], context[:opts])
chan1 = Enum.at(channels, 0)
chan2 = Enum.at(channels, 1)
assert Keyword.keys(chan1) == ~w(pid user connection)a
assert Keyword.keys(chan2) == ~w(pid user connection)a
assert "guest" == chan1[:user]
assert "guest" == chan2[:user]
assert chan1[:pid] !== chan2[:pid]
end)
end)
end
test "info keys order is preserved", context do
close_all_connections()
capture_io(fn ->
with_channel("/", fn(_channel) ->
channels = ListChannelsCommand.run(~w(connection vhost name pid number user), context[:opts])
chan = Enum.at(channels, 0)
assert Keyword.keys(chan) == ~w(connection vhost name pid number user)a
end)
end)
end
end

View File

@ -0,0 +1,105 @@
defmodule ListConnectionsCommandTest do
use ExUnit.Case, async: false
import ExUnit.CaptureIO
import TestHelper
@user "guest"
@default_timeout 15000
setup_all do
:net_kernel.start([:rabbitmqctl, :shortnames])
:net_kernel.connect_node(get_rabbit_hostname)
close_all_connections()
on_exit([], fn ->
close_all_connections()
:erlang.disconnect_node(get_rabbit_hostname)
:net_kernel.stop()
end)
:ok
end
setup context do
{
:ok,
opts: %{
node: get_rabbit_hostname,
timeout: context[:test_timeout] || @default_timeout
}
}
end
test "return bad_info_key on a single bad arg", context do
capture_io(fn ->
assert ListConnectionsCommand.run(["quack"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
end)
end
test "multiple bad args return a list of bad info key values", context do
capture_io(fn ->
assert ListConnectionsCommand.run(["quack", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:quack, :oink]}}
end)
end
test "return bad_info_key on mix of good and bad args", context do
capture_io(fn ->
assert ListConnectionsCommand.run(["quack", "peer_host"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
assert ListConnectionsCommand.run(["user", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
assert ListConnectionsCommand.run(["user", "oink", "peer_host"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
end)
end
@tag test_timeout: 0
test "zero timeout causes command to return badrpc", context do
capture_io(fn ->
assert ListConnectionsCommand.run([], context[:opts]) ==
[{:badrpc, {:timeout, 0.0}}]
end)
end
test "user, peer_host, peer_port and state by default", context do
vhost = "/"
capture_io(fn ->
with_connection(vhost, fn(_conn) ->
conns = ListConnectionsCommand.run([], context[:opts])
assert Enum.any?(conns, fn(conn) -> conn[:state] != nil end)
end)
end)
end
test "filter single key", context do
vhost = "/"
capture_io(fn ->
with_connection(vhost, fn(_conn) ->
conns = ListConnectionsCommand.run(["name"], context[:opts])
assert (Enum.map(conns, &Keyword.keys/1) |> Enum.uniq) == [[:name]]
assert Enum.any?(conns, fn(conn) -> conn[:name] != nil end)
end)
end)
end
test "show connection vhost", context do
vhost = "custom_vhost"
add_vhost vhost
set_permissions @user, vhost, [".*", ".*", ".*"]
on_exit(fn ->
delete_vhost vhost
end)
capture_io(fn ->
with_connection(vhost, fn(_conn) ->
conns = ListConnectionsCommand.run(["vhost"], context[:opts])
assert (Enum.map(conns, &Keyword.keys/1) |> Enum.uniq) == [[:vhost]]
assert Enum.any?(conns, fn(conn) -> conn[:vhost] == vhost end)
end)
end)
end
end

View File

@ -0,0 +1,127 @@
defmodule ListConsumersCommandTest do
use ExUnit.Case, async: false
import ExUnit.CaptureIO
import TestHelper
@vhost "test1"
@user "guest"
@default_timeout :infinity
setup_all do
:net_kernel.start([:rabbitmqctl, :shortnames])
:net_kernel.connect_node(get_rabbit_hostname)
on_exit([], fn ->
:erlang.disconnect_node(get_rabbit_hostname)
:net_kernel.stop()
end)
:ok
end
setup context do
add_vhost @vhost
set_permissions @user, @vhost, [".*", ".*", ".*"]
on_exit(fn ->
delete_vhost @vhost
end)
{
:ok,
opts: %{
node: get_rabbit_hostname,
timeout: context[:test_timeout] || @default_timeout,
param: @vhost
}
}
end
@tag test_timeout: :infinity
test "return bad_info_key on a single bad arg", context do
capture_io(fn ->
assert ListConsumersCommand.run(["quack"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
end)
end
@tag test_timeout: :infinity
test "multiple bad args return a list of bad info key values", context do
capture_io(fn ->
assert ListConsumersCommand.run(["quack", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:quack, :oink]}}
end)
end
@tag test_timeout: :infinity
test "return bad_info_key on mix of good and bad args", context do
capture_io(fn ->
assert ListConsumersCommand.run(["quack", "queue_name"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
assert ListConsumersCommand.run(["queue_name", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
assert ListConsumersCommand.run(["channel_pid", "oink", "queue_name"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
end)
end
@tag test_timeout: 0
test "zero timeout causes command to return badrpc", context do
capture_io(fn ->
assert ListConsumersCommand.run([], context[:opts]) ==
[{:badrpc, {:timeout, 0.0}}]
end)
end
test "no consumers for no open connections", context do
close_all_connections
capture_io(fn ->
[] = ListConsumersCommand.run([], context[:opts])
end)
end
test "all info keys by default", context do
queue_name = "test_queue1"
consumer_tag = "i_am_consumer"
info_keys = ~w(queue_name channel_pid consumer_tag ack_required prefetch_count arguments)a
capture_io(fn ->
declare_queue(queue_name, @vhost)
with_channel(@vhost, fn(channel) ->
{:ok, _} = AMQP.Basic.consume(channel, queue_name, nil, [consumer_tag: consumer_tag])
:timer.sleep(100)
[[consumer]] = ListConsumersCommand.run([], context[:opts])
assert info_keys == Keyword.keys(consumer)
assert consumer[:consumer_tag] == consumer_tag
assert consumer[:queue_name] == queue_name
assert Keyword.delete(consumer, :channel_pid) ==
[queue_name: queue_name, consumer_tag: consumer_tag,
ack_required: true, prefetch_count: 0, arguments: []]
end)
end)
end
test "consumers are grouped by queues (multiple consumer per queue)", context do
queue_name1 = "test_queue1"
queue_name2 = "test_queue2"
capture_io(fn ->
declare_queue("test_queue1", @vhost)
declare_queue("test_queue2", @vhost)
with_channel(@vhost, fn(channel) ->
{:ok, tag1} = AMQP.Basic.consume(channel, queue_name1)
{:ok, tag2} = AMQP.Basic.consume(channel, queue_name2)
{:ok, tag3} = AMQP.Basic.consume(channel, queue_name2)
:timer.sleep(100)
try do
consumers = ListConsumersCommand.run(["queue_name", "consumer_tag"], context[:opts])
{[[consumer1]], [consumers2]} = Enum.partition(consumers, fn([_]) -> true; ([_,_]) -> false end)
assert [queue_name: queue_name1, consumer_tag: tag1] == consumer1
assert Keyword.equal?([{tag2, queue_name2}, {tag3, queue_name2}],
for([queue_name: q, consumer_tag: t] <- consumers2, do: {t, q}))
after
AMQP.Basic.cancel(channel, tag1)
AMQP.Basic.cancel(channel, tag2)
AMQP.Basic.cancel(channel, tag3)
end
end)
end)
end
end

View File

@ -0,0 +1,177 @@
defmodule ListExchangesCommandTest do
use ExUnit.Case, async: false
import ExUnit.CaptureIO
import TestHelper
@vhost "test1"
@user "guest"
@root "/"
@default_timeout :infinity
@default_exchanges [{"amq.direct", :direct},
{"amq.fanout", :fanout},
{"amq.match", :headers},
{"amq.rabbitmq.trace", :topic},
{"amq.headers", :headers},
{"amq.topic", :topic},
{"", :direct}]
defp default_exchange_names() do
{names, _types} = Enum.unzip(@default_exchanges)
names
end
setup_all do
:net_kernel.start([:rabbitmqctl, :shortnames])
:net_kernel.connect_node(get_rabbit_hostname)
on_exit([], fn ->
:erlang.disconnect_node(get_rabbit_hostname)
:net_kernel.stop()
end)
:ok
end
setup context do
add_vhost @vhost
set_permissions @user, @vhost, [".*", ".*", ".*"]
on_exit(fn ->
delete_vhost @vhost
end)
{
:ok,
opts: %{
quiet: true,
node: get_rabbit_hostname,
timeout: context[:test_timeout] || @default_timeout,
param: @vhost
}
}
end
@tag test_timeout: :infinity
test "return bad_info_key on a single bad arg", context do
capture_io(fn ->
assert ListExchangesCommand.run(["quack"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
end)
end
@tag test_timeout: :infinity
test "multiple bad args return a list of bad info key values", context do
capture_io(fn ->
assert ListExchangesCommand.run(["quack", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:quack, :oink]}}
end)
end
@tag test_timeout: :infinity
test "return bad_info_key on mix of good and bad args", context do
capture_io(fn ->
assert ListExchangesCommand.run(["quack", "type"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
assert ListExchangesCommand.run(["name", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
assert ListExchangesCommand.run(["name", "oink", "type"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
end)
end
@tag test_timeout: 0
test "zero timeout causes command to return badrpc", context do
capture_io(fn ->
assert ListExchangesCommand.run([], context[:opts]) ==
[{:badrpc, {:timeout, 0.0}}]
end)
end
test "show default exchanges by default", context do
capture_io(fn ->
assert MapSet.new(ListExchangesCommand.run(["name"], context[:opts])) ==
MapSet.new(for {ex_name, _ex_type} <- @default_exchanges, do: [name: ex_name])
end)
end
test "no info keys returns name and type", context do
exchange_name = "test_exchange"
declare_exchange(exchange_name, @vhost)
capture_io(fn ->
assert MapSet.new(ListExchangesCommand.run([], context[:opts])) ==
MapSet.new(
for({ex_name, ex_type} <- @default_exchanges, do: [name: ex_name, type: ex_type]) ++
[[name: exchange_name, type: :direct]])
end)
end
test "list multiple excahnges", context do
declare_exchange("test_exchange_1", @vhost, :direct)
declare_exchange("test_exchange_2", @vhost, :fanout)
capture_io(fn ->
non_default_exchanges = ListExchangesCommand.run(["name", "type"], context[:opts])
|> without_default_exchanges
assert_set_equal(
non_default_exchanges,
[[name: "test_exchange_1", type: :direct],
[name: "test_exchange_2", type: :fanout]])
end)
end
def assert_set_equal(one, two) do
assert MapSet.new(one) == MapSet.new(two)
end
test "info keys filter single key", context do
declare_exchange("test_exchange_1", @vhost)
declare_exchange("test_exchange_2", @vhost)
capture_io(fn ->
non_default_exchanges = ListExchangesCommand.run(["name"], context[:opts])
|> without_default_exchanges
assert_set_equal(
non_default_exchanges,
[[name: "test_exchange_1"],
[name: "test_exchange_2"]])
end)
end
test "info keys add additional keys", context do
declare_exchange("durable_exchange", @vhost, :direct, true)
declare_exchange("auto_delete_exchange", @vhost, :fanout, false, true)
capture_io(fn ->
non_default_exchanges = ListExchangesCommand.run(["name", "type", "durable", "auto_delete"], context[:opts])
|> without_default_exchanges
assert_set_equal(
non_default_exchanges,
[[name: "auto_delete_exchange", type: :fanout, durable: false, auto_delete: true],
[name: "durable_exchange", type: :direct, durable: true, auto_delete: false]])
end)
end
test "specifying a vhost returns the targeted vhost exchanges", context do
other_vhost = "other_vhost"
add_vhost other_vhost
on_exit(fn ->
delete_vhost other_vhost
end)
declare_exchange("test_exchange_1", @vhost)
declare_exchange("test_exchange_2", other_vhost)
capture_io(fn ->
non_default_exchanges1 = ListExchangesCommand.run(["name"], context[:opts])
|> without_default_exchanges
non_default_exchanges2 = ListExchangesCommand.run(["name"], %{context[:opts] | :param => other_vhost})
|> without_default_exchanges
assert non_default_exchanges1 == [[name: "test_exchange_1"]]
assert non_default_exchanges2 == [[name: "test_exchange_2"]]
end)
end
defp without_default_exchanges(xs) do
Enum.filter(xs,
fn(x) ->
not Enum.member?(default_exchange_names(), x[:name])
end)
end
end

View File

@ -0,0 +1,214 @@
defmodule ListQueuesCommandTest do
use ExUnit.Case, async: false
import ExUnit.CaptureIO
import TestHelper
@vhost "test1"
@user "guest"
@root "/"
@default_timeout 15000
setup_all do
:net_kernel.start([:rabbitmqctl, :shortnames])
:net_kernel.connect_node(get_rabbit_hostname)
reset_vm_memory_high_watermark()
delete_all_queues()
close_all_connections()
on_exit([], fn ->
delete_all_queues()
close_all_connections()
:erlang.disconnect_node(get_rabbit_hostname)
:net_kernel.stop()
end)
:ok
end
setup context do
add_vhost @vhost
set_permissions @user, @vhost, [".*", ".*", ".*"]
on_exit(fn ->
delete_vhost @vhost
end)
{
:ok,
opts: %{
quiet: true,
node: get_rabbit_hostname,
timeout: context[:test_timeout] || @default_timeout,
param: @vhost
}
}
end
@tag test_timeout: 30000
test "return bad_info_key on a single bad arg", context do
capture_io(fn ->
assert ListQueuesCommand.run(["quack"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
end)
end
@tag test_timeout: 30000
test "multiple bad args return a list of bad info key values", context do
capture_io(fn ->
assert ListQueuesCommand.run(["quack", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:quack, :oink]}}
end)
end
@tag test_timeout: 30000
test "return bad_info_key on mix of good and bad args", context do
capture_io(fn ->
assert ListQueuesCommand.run(["quack", "messages"], context[:opts]) ==
{:error, {:bad_info_key, [:quack]}}
assert ListQueuesCommand.run(["name", "oink"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
assert ListQueuesCommand.run(["name", "oink", "messages"], context[:opts]) ==
{:error, {:bad_info_key, [:oink]}}
end)
end
@tag test_timeout: 0
test "zero timeout causes command to return badrpc", context do
capture_io(fn ->
assert ListQueuesCommand.run([], context[:opts]) ==
[{:badrpc, {:timeout, 0.0}}]
end)
end
@tag test_timeout: 1
test "command timeout (several thousands queues in 1ms) return badrpc with timeout value in seconds", context do
# we assume it will take longer than 1 ms to list thousands of queues
n = 5000
for i <- 1..n do
declare_queue("test_queue_" <> Integer.to_string(i), @vhost)
end
capture_io(fn ->
assert ListQueuesCommand.run([], context[:opts]) ==
[{:badrpc, {:timeout, 0.001}}]
end)
for i <- 1..n do
delete_queue("test_queue_" <> Integer.to_string(i), @vhost)
end
end
@tag test_timeout: 5000
test "no info keys returns names and message count", context do
queue_name = "test_queue"
message_count = 3
declare_queue(queue_name, @vhost)
publish_messages(queue_name, 3)
capture_io(fn ->
assert ListQueuesCommand.run([], context[:opts]) ==
[[name: queue_name, messages: message_count]]
end)
end
@tag test_timeout: 5000
test "return multiple queues", context do
declare_queue("test_queue_1", @vhost)
publish_messages("test_queue_1", 3)
declare_queue("test_queue_2", @vhost)
publish_messages("test_queue_2", 1)
capture_io(fn ->
assert ListQueuesCommand.run([], context[:opts]) ==
[[name: "test_queue_1", messages: 3],
[name: "test_queue_2", messages: 1]]
end)
end
@tag test_timeout: 5000
test "info keys filter single key", context do
declare_queue("test_queue_1", @vhost)
declare_queue("test_queue_2", @vhost)
capture_io(fn ->
assert ListQueuesCommand.run(["name"], context[:opts]) ==
[[name: "test_queue_1"],
[name: "test_queue_2"]]
end)
end
@tag test_timeout: 5000
test "info keys add additional keys", context do
declare_queue("durable_queue", @vhost, true)
publish_messages("durable_queue", 3)
declare_queue("auto_delete_queue", @vhost, false, true)
publish_messages("auto_delete_queue", 1)
capture_io(fn ->
assert Keyword.equal?(
ListQueuesCommand.run(["name", "messages", "durable", "auto_delete"], context[:opts]),
[[name: "durable_queue", messages: 3, durable: true, auto_delete: false],
[name: "auto_delete_queue", messages: 1, durable: false, auto_delete: true]])
end)
end
@tag test_timeout: 5000
test "info keys order is preserved", context do
declare_queue("durable_queue", @vhost, true)
publish_messages("durable_queue", 3)
declare_queue("auto_delete_queue", @vhost, false, true)
publish_messages("auto_delete_queue", 1)
capture_io(fn ->
assert Keyword.equal?(
ListQueuesCommand.run(["messages", "durable", "name", "auto_delete"], context[:opts]),
[[messages: 3, durable: true, name: "durable_queue", auto_delete: false],
[messages: 1, durable: false, name: "auto_delete_queue", auto_delete: true]])
end)
end
@tag test_timeout: 5000
test "specifying a vhost returns the targeted vhost queues", context do
other_vhost = "other_vhost"
add_vhost other_vhost
on_exit(fn ->
delete_vhost other_vhost
end)
declare_queue("test_queue_1", @vhost)
declare_queue("test_queue_2", other_vhost)
capture_io(fn ->
assert ListQueuesCommand.run(["name"], context[:opts]) == [[name: "test_queue_1"]]
assert ListQueuesCommand.run(["name"], %{context[:opts] | :param => other_vhost}) == [[name: "test_queue_2"]]
end)
end
# TODO: list online/offline queues. Require cluster add/remove
# test "list online queues do not show offline queues", context do
# other_node = @secondary_node
# declare_queue("online_queue", @vhost, true)
# publish_messages("online_queue", 3)
# #declare on another node
# declare_queue_on_node(other_node, "offline_queue", @vhost, true)
# publish_messages("offline_queue", 3)
# stop_node(other_node)
# assert ListQueuesCommand.run(["name"], %{context[:opts] | online: true}) == [[name: "online_queue"]]
# end
# test "list offline queues do not show online queues", context do
# other_node = @secondary_node
# declare_queue("online_queue", @vhost, true)
# publish_messages("online_queue", 3)
# #declare on another node
# declare_queue_on_node(other_node, "offline_queue", @vhost, true)
# publish_messages("offline_queue", 3)
# stop_node(other_node)
# assert ListQueuesCommand.run(["name"], %{context[:opts] | offline: true}) == [[name: "offline_queue"]]
# end
def publish_messages(name, count) do
with_channel(@vhost, fn(channel) ->
AMQP.Queue.purge(channel, name)
for i <- 1..count do
AMQP.Basic.publish(channel, "", name,
"test_message" <> Integer.to_string(i))
end
AMQP.Confirm.wait_for_confirms(channel, 30)
end)
end
end

View File

@ -64,8 +64,10 @@ defmodule ListUserPermissionsCommandTest do
@tag test_timeout: :infinity, username: "guest"
test "valid user returns a list of permissions", context do
capture_io(fn ->
assert(ListUserPermissionsCommand.run(
[context[:username]], context[:opts]) == context[:result])
results = ListUserPermissionsCommand.run([context[:username]], context[:opts])
assert Enum.all?(context[:result], fn(perm) ->
Enum.find(results, fn(found) -> found == perm end)
end)
end)
end
@ -92,10 +94,10 @@ defmodule ListUserPermissionsCommandTest do
@tag test_timeout: 30, username: "guest"
test "long user-defined timeout doesn't interfere with operation", context do
capture_io(fn ->
assert ListUserPermissionsCommand.run(
[context[:username]],
context[:opts]
) == context[:result]
results = ListUserPermissionsCommand.run([context[:username]], context[:opts])
Enum.all?(context[:result], fn(perm) ->
Enum.find(results, fn(found) -> found == perm end)
end)
end)
end

View File

@ -51,13 +51,13 @@ defmodule ListUsersCommandTest do
assert ListUsersCommand.run(["extra"], %{}) == {:too_many_args, ["extra"]}
end
@tag test_timeout: :infinity
@tag test_timeout: 15
test "On a successful query, return an array of lists of tuples", context do
capture_io(fn ->
matches_found = ListUsersCommand.run([], context[:opts])
assert Enum.all?(matches_found, fn(user) ->
Enum.find(context[:std_result], fn(found) -> found == user end)
assert Enum.all?(context[:std_result], fn(user) ->
Enum.find(matches_found, fn(found) -> found == user end)
end)
end)
end
@ -74,8 +74,8 @@ defmodule ListUsersCommandTest do
capture_io(fn ->
matches_found = ListUsersCommand.run([], context[:opts])
assert Enum.all?(matches_found, fn(user) ->
Enum.find(context[:std_result], fn(found) -> found == user end)
assert Enum.all?(context[:std_result], fn(user) ->
Enum.find(matches_found, fn(found) -> found == user end)
end)
end)
end

View File

@ -93,8 +93,8 @@ defmodule ListVhostsCommandTest do
capture_io(fn ->
matches_found = ListVhostsCommand.run([], context[:opts])
assert Enum.all?(matches_found, fn(vhost) ->
Enum.find(context[:name_result], fn(found) -> found == vhost end)
assert Enum.all?(context[:name_result], fn(vhost) ->
Enum.find(matches_found, fn(found) -> found == vhost end)
end)
end)
end
@ -104,10 +104,8 @@ defmodule ListVhostsCommandTest do
# checks to ensure that all expected vhosts are in the results
capture_io(fn ->
matches_found = ListVhostsCommand.run(["name"], context[:opts])
assert matches_found
|> Enum.all?(fn(vhost) ->
Enum.find(context[:name_result], fn(found) -> found == vhost end)
assert Enum.all?(context[:name_result], fn(vhost) ->
Enum.find(matches_found, fn(found) -> found == vhost end)
end)
end)
end
@ -116,8 +114,10 @@ defmodule ListVhostsCommandTest do
test "with the tracing tag, print just say if tracing is on", context do
# checks to ensure that all expected vhosts are in the results
capture_io(fn ->
found = ListVhostsCommand.run(["tracing"], context[:opts])
assert found == context[:tracing_result]
matches_found = ListVhostsCommand.run(["tracing"], context[:opts])
assert Enum.all?(context[:tracing_result], fn(vhost) ->
Enum.find(matches_found, fn(found) -> found == vhost end)
end)
end)
end
@ -154,16 +154,16 @@ defmodule ListVhostsCommandTest do
# checks to ensure that all expected vhosts are in the results
capture_io(fn ->
matches_found = ListVhostsCommand.run(["name", "tracing"], context[:opts])
assert Enum.all?(matches_found, fn(vhost) ->
Enum.find(context[:full_result], fn(found) -> found == vhost end)
assert Enum.all?(context[:full_result], fn(vhost) ->
Enum.find(matches_found, fn(found) -> found == vhost end)
end)
end)
# checks to ensure that all expected vhosts are in the results
capture_io(fn ->
matches_found = ListVhostsCommand.run(["tracing", "name"], context[:opts])
assert Enum.all?(matches_found, fn(vhost) ->
Enum.find(context[:transposed_result], fn(found) -> found == vhost end)
assert Enum.all?(context[:transposed_result], fn(vhost) ->
Enum.find(matches_found, fn(found) -> found == vhost end)
end)
end)
end
@ -172,9 +172,9 @@ defmodule ListVhostsCommandTest do
test "duplicate args do not produce duplicate entries", context do
# checks to ensure that all expected vhosts are in the results
capture_io(fn ->
assert ListVhostsCommand.run(["name", "name"], context[:opts])
|> Enum.all?(fn(vhost) ->
Enum.find(context[:name_result], fn(found) -> found == vhost end)
matches_found = ListVhostsCommand.run(["name", "name"], context[:opts])
assert Enum.all?(context[:name_result], fn(vhost) ->
Enum.find(matches_found, fn(found) -> found == vhost end)
end)
end)
end
@ -183,9 +183,9 @@ defmodule ListVhostsCommandTest do
test "sufficiently long timeouts don't interfere with results", context do
# checks to ensure that all expected vhosts are in the results
capture_io(fn ->
assert ListVhostsCommand.run(["name", "tracing"], context[:opts])
|> Enum.all?(fn(vhost) ->
Enum.find(context[:full_result], fn(found) -> found == vhost end)
matches_found = ListVhostsCommand.run(["name", "tracing"], context[:opts])
assert Enum.all?(context[:full_result], fn(vhost) ->
Enum.find(matches_found, fn(found) -> found == vhost end)
end)
end)
end

View File

@ -0,0 +1,84 @@
defmodule RpcStreamTest do
use ExUnit.Case, async: false
import TestHelper
setup_all do
:net_kernel.start([:rabbitmqctl, :shortnames])
:net_kernel.connect_node(get_rabbit_hostname)
on_exit([], fn ->
:erlang.disconnect_node(get_rabbit_hostname)
:net_kernel.stop()
end)
:ok
end
test "emit empty list" do
items = RpcStream.receive_list_items(Kernel.node, TestHelper, :emit_list, [[]], :infinity, [])
assert [] == items
end
test "emit list without filters" do
list = [:one, :two, :three]
items = RpcStream.receive_list_items(Kernel.node, TestHelper, :emit_list, [list], :infinity, [])
assert list == items
end
test "emit list with filters" do
list = [[one: 1, two: 2, three: 3], [one: 11, two: 12, three: 13]]
items = RpcStream.receive_list_items(Kernel.node, TestHelper, :emit_list, [list], :infinity, [:one, :two])
assert [[one: 1, two: 2], [one: 11, two: 12]] == items
end
test "emit list of lists with filters" do
list = [[[one: 1, two: 2, three: 3], [one: 11, two: 12, three: 13]],
[[one: 21, two: 22, three: 23], [one: 31, two: 32, three: 33]]]
items = RpcStream.receive_list_items(Kernel.node, TestHelper, :emit_list, [list], :infinity, [:one, :two])
assert [[[one: 1, two: 2], [one: 11, two: 12]], [[one: 21, two: 22], [one: 31, two: 32]]] == items
end
test "emission timeout 0 return badrpc" do
items = RpcStream.receive_list_items(Kernel.node, TestHelper, :emit_list, [[]], 0, [])
assert [{:badrpc, {:timeout, 0.0}}] == items
end
test "emission timeout return badrpc with timeout value in seconds" do
timeout_fun = fn(x) -> :timer.sleep(1000); x end
items = RpcStream.receive_list_items(Kernel.node, TestHelper, :emit_list_map, [[1,2,3], timeout_fun], 100, [])
assert [{:badrpc, {:timeout, 0.1}}] == items
end
test "emission timeout in progress return badrpc with timeout value in seconds as last element" do
timeout_fun = fn(x) -> :timer.sleep(100); x end
items = RpcStream.receive_list_items(Kernel.node, TestHelper, :emit_list_map, [[1,2,3], timeout_fun], 200, [])
assert [1, {:badrpc, {:timeout, 0.2}}] == items
end
test "parallel emission do not mix values" do
{:ok, agent} = Agent.start_link(fn() -> :init end)
list1 = [:one, :two, :three]
list2 = [:dog, :cat, :pig]
# Adding timeout to make sure emissions are executed in parallel
timeout_fun = fn(x) -> :timer.sleep(10); x end
Agent.update(agent,
fn(:init) ->
RpcStream.receive_list_items(Kernel.node, TestHelper, :emit_list_map, [list2, timeout_fun], :infinity, [])
end)
items1 = RpcStream.receive_list_items(Kernel.node, TestHelper, :emit_list_map, [list1, timeout_fun], :infinity, [])
items2 = Agent.get(agent, fn(x) -> x end)
assert items1 == list1
assert items2 == list2
end
end

View File

@ -24,8 +24,11 @@ defmodule SetVmMemoryHighWatermarkCommandTest do
setup_all do
:net_kernel.start([:rabbitmqctl, :shortnames])
:net_kernel.connect_node(get_rabbit_hostname)
reset_vm_memory_high_watermark()
on_exit([], fn ->
reset_vm_memory_high_watermark()
:erlang.disconnect_node(get_rabbit_hostname)
:net_kernel.stop()
end)

View File

@ -79,6 +79,27 @@ defmodule TestHelper do
:rpc.call(get_rabbit_hostname, :rabbit_auth_backend_internal, :set_permissions, [user, vhost, conf, write, read])
end
def declare_queue(name, vhost, durable \\ false, auto_delete \\ false, args \\ [], owner \\ :none) do
queue_name = :rabbit_misc.r(vhost, :queue, name)
:rpc.call(get_rabbit_hostname,
:rabbit_amqqueue, :declare,
[queue_name, durable, auto_delete, args, owner])
end
def delete_queue(name, vhost) do
queue_name = :rabbit_misc.r(vhost, :queue, name)
:rpc.call(get_rabbit_hostname,
:rabbit_amqqueue, :delete,
[queue_name, false, false])
end
def declare_exchange(name, vhost, type \\ :direct, durable \\ false, auto_delete \\ false, internal \\ false, args \\ []) do
exchange_name = :rabbit_misc.r(vhost, :exchange, name)
:rpc.call(get_rabbit_hostname,
:rabbit_exchange, :declare,
[exchange_name, type, durable, auto_delete, internal, args])
end
def list_permissions(vhost) do
:rpc.call(
get_rabbit_hostname,
@ -100,4 +121,90 @@ defmodule TestHelper do
def error_check(cmd_line, code) do
assert catch_exit(RabbitMQCtl.main(cmd_line)) == {:shutdown, code}
end
def with_channel(vhost, fun) do
with_connection(vhost,
fn(conn) ->
{:ok, chan} = AMQP.Channel.open(conn)
AMQP.Confirm.select(chan)
fun.(chan)
end)
end
def with_connection(vhost, fun) do
{:ok, conn} = AMQP.Connection.open(virtual_host: vhost)
ExUnit.Callbacks.on_exit(fn ->
try do
:amqp_connection.close(conn, 1000)
catch
:exit, _ -> :ok
end
end)
fun.(conn)
end
def close_all_connections() do
# we intentionally use connections_local/0 here because connections/0,
# the cluster-wide version, loads some bits around cluster membership
# that are not normally ready with a single node. MK.
#
# when/if we decide to test
# this project against a cluster of nodes this will need revisiting. MK.
for pid <- :rabbit_networking.connections_local(), do: :rabbit_networking.close_connection(pid, :force_closed)
end
def delete_all_queues() do
try do
immediately_delete_all_queues(:rabbit_amqqueue.list())
catch
_, _ -> :ok
end
end
def delete_all_queues(vhost) do
try do
immediately_delete_all_queues(:rabbit_amqqueue.list(vhost))
catch
_, _ -> :ok
end
end
def immediately_delete_all_queues(qs) do
for q <- qs do
try do
:rpc.call(
get_rabbit_hostname,
:rabbit_amqeueue,
:delete,
[q, false, false],
5000
)
catch
_, _ -> :ok
end
end
end
def reset_vm_memory_high_watermark() do
try do
:rpc.call(
get_rabbit_hostname,
:vm_memory_monitor,
:set_vm_memory_high_watermark,
[0.4],
5000
)
catch
_, _ -> :ok
end
end
def emit_list(list, ref, pid) do
emit_list_map(list, &(&1), ref, pid)
end
def emit_list_map(list, fun, ref, pid) do
:rabbit_control_misc.emitting_map(pid, ref, fun, list)
end
end