Merge pull request #79 from rabbitmq/rabbitmq-cli-69
Purge queue command
This commit is contained in:
		
						commit
						bc5ff3bf26
					
				|  | @ -0,0 +1,61 @@ | |||
| ## The contents of this file are subject to the Mozilla Public License | ||||
| ## Version 1.1 (the "License"); you may not use this file except in | ||||
| ## compliance with the License. You may obtain a copy of the License | ||||
| ## at http://www.mozilla.org/MPL/ | ||||
| ## | ||||
| ## Software distributed under the License is distributed on an "AS IS" | ||||
| ## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See | ||||
| ## the License for the specific language governing rights and | ||||
| ## limitations under the License. | ||||
| ## | ||||
| ## The Original Code is RabbitMQ. | ||||
| ## | ||||
| ## The Initial Developer of the Original Code is GoPivotal, Inc. | ||||
| ## Copyright (c) 2007-2016 Pivotal Software, Inc.  All rights reserved. | ||||
| 
 | ||||
| 
 | ||||
| defmodule RabbitMQ.CLI.Ctl.Commands.PurgeQueueCommand do | ||||
|   @behaviour RabbitMQ.CLI.CommandBehaviour | ||||
|   @flags [] | ||||
| 
 | ||||
|   def flags, do: [] | ||||
|   def switches, do: [] | ||||
|   def usage, do: "purge_queue <queue>" | ||||
| 
 | ||||
|   def run([queue], %{node: node_name, vhost: vhost, timeout: timeout}) do | ||||
|     res = :rabbit_misc.rpc_call(node_name, | ||||
|       :rabbit_amqqueue, :lookup, [:rabbit_misc.r(vhost, :queue, queue)], timeout) | ||||
| 
 | ||||
|     case res do | ||||
|       {:ok, q} -> purge(node_name, q, timeout) | ||||
|       _        -> res | ||||
|     end | ||||
|   end | ||||
| 
 | ||||
|   defp purge(node_name, q, timeout) do | ||||
|     res = :rabbit_misc.rpc_call(node_name, :rabbit_amqqueue, :purge, [q], timeout) | ||||
|     case res do | ||||
|       {:ok, _message_count} -> :ok | ||||
|       _                     -> res | ||||
|     end | ||||
|   end | ||||
| 
 | ||||
|   def merge_defaults(args, opts) do | ||||
|     default_opts = Map.merge(opts, %{vhost: "/"}) | ||||
|     {args, default_opts} | ||||
|   end | ||||
| 
 | ||||
|   def validate(args, _) when length(args) > 1 do | ||||
|     {:validation_failure, :too_many_args} | ||||
|   end | ||||
| 
 | ||||
|   def validate([], _) do | ||||
|     {:validation_failure, :not_enough_args} | ||||
|   end | ||||
| 
 | ||||
|   def validate(_, _), do: :ok | ||||
| 
 | ||||
|   def banner([queue], %{vhost: vhost}) do | ||||
|     "Purging queue '#{queue}' in vhost '#{vhost}' ..." | ||||
|   end | ||||
| end | ||||
|  | @ -92,9 +92,9 @@ defmodule ListQueuesCommandTest do | |||
|   @tag test_timeout: 5000 | ||||
|   test "run: return multiple queues", context do | ||||
|     declare_queue("test_queue_1", @vhost) | ||||
|     publish_messages("test_queue_1", 3) | ||||
|     publish_messages(@vhost, "test_queue_1", 3) | ||||
|     declare_queue("test_queue_2", @vhost) | ||||
|     publish_messages("test_queue_2", 1) | ||||
|     publish_messages(@vhost, "test_queue_2", 1) | ||||
|     assert Keyword.equal?(run_command_to_list(@command, [["name", "messages"], context[:opts]]), | ||||
|       [[name: "test_queue_1", messages: 3], | ||||
|        [name: "test_queue_2", messages: 1]]) | ||||
|  | @ -112,9 +112,9 @@ defmodule ListQueuesCommandTest do | |||
|   @tag test_timeout: 5000 | ||||
|   test "run: info keys add additional keys", context do | ||||
|     declare_queue("durable_queue", @vhost, true) | ||||
|     publish_messages("durable_queue", 3) | ||||
|     publish_messages(@vhost, "durable_queue", 3) | ||||
|     declare_queue("auto_delete_queue", @vhost, false, true) | ||||
|     publish_messages("auto_delete_queue", 1) | ||||
|     publish_messages(@vhost, "auto_delete_queue", 1) | ||||
|     assert Keyword.equal?( | ||||
|       run_command_to_list(@command, [["name", "messages", "durable", "auto_delete"], context[:opts]]), | ||||
|       [[name: "durable_queue", messages: 3, durable: true, auto_delete: false], | ||||
|  | @ -124,9 +124,9 @@ defmodule ListQueuesCommandTest do | |||
|   @tag test_timeout: 5000 | ||||
|   test "run: info keys order is preserved", context do | ||||
|     declare_queue("durable_queue", @vhost, true) | ||||
|     publish_messages("durable_queue", 3) | ||||
|     publish_messages(@vhost, "durable_queue", 3) | ||||
|     declare_queue("auto_delete_queue", @vhost, false, true) | ||||
|     publish_messages("auto_delete_queue", 1) | ||||
|     publish_messages(@vhost, "auto_delete_queue", 1) | ||||
|     assert Keyword.equal?( | ||||
|       run_command_to_list(@command, [["messages", "durable", "name", "auto_delete"], context[:opts]]), | ||||
|       [[messages: 3, durable: true, name: "durable_queue", auto_delete: false], | ||||
|  | @ -150,10 +150,10 @@ defmodule ListQueuesCommandTest do | |||
|   # test "list online queues do not show offline queues", context do | ||||
|   #   other_node = @secondary_node | ||||
|   #   declare_queue("online_queue", @vhost, true) | ||||
|   #   publish_messages("online_queue", 3) | ||||
|   #   publish_messages(@vhost, "online_queue", 3) | ||||
|   #   #declare on another node | ||||
|   #   declare_queue_on_node(other_node, "offline_queue", @vhost, true) | ||||
|   #   publish_messages("offline_queue", 3) | ||||
|   #   publish_messages(@vhost, "offline_queue", 3) | ||||
|   #   stop_node(other_node) | ||||
| 
 | ||||
|   #   assert run_command_to_list(@command, [["name"], %{context[:opts] | online: true}]) == [[name: "online_queue"]] | ||||
|  | @ -162,24 +162,12 @@ defmodule ListQueuesCommandTest do | |||
|   # test "list offline queues do not show online queues", context do | ||||
|   #   other_node = @secondary_node | ||||
|   #   declare_queue("online_queue", @vhost, true) | ||||
|   #   publish_messages("online_queue", 3) | ||||
|   #   publish_messages(@vhost, "online_queue", 3) | ||||
|   #   #declare on another node | ||||
|   #   declare_queue_on_node(other_node, "offline_queue", @vhost, true) | ||||
|   #   publish_messages("offline_queue", 3) | ||||
|   #   publish_messages(@vhost, "offline_queue", 3) | ||||
|   #   stop_node(other_node) | ||||
| 
 | ||||
|   #   assert run_command_to_list(@command, [["name"], %{context[:opts] | offline: true}]) == [[name: "offline_queue"]] | ||||
|   # end | ||||
| 
 | ||||
|   def publish_messages(name, count) do | ||||
|     with_channel(@vhost, fn(channel) -> | ||||
|       AMQP.Queue.purge(channel, name) | ||||
|       for i <- 1..count do | ||||
|         AMQP.Basic.publish(channel, "", name, | ||||
|                            "test_message" <> Integer.to_string(i)) | ||||
|       end | ||||
|       AMQP.Confirm.wait_for_confirms(channel, 30) | ||||
|     end) | ||||
|   end | ||||
| 
 | ||||
| end | ||||
|  |  | |||
|  | @ -0,0 +1,102 @@ | |||
| ## The contents of this file are subject to the Mozilla Public License | ||||
| ## Version 1.1 (the "License"); you may not use this file except in | ||||
| ## compliance with the License. You may obtain a copy of the License | ||||
| ## at http://www.mozilla.org/MPL/ | ||||
| ## | ||||
| ## Software distributed under the License is distributed on an "AS IS" | ||||
| ## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See | ||||
| ## the License for the specific language governing rights and | ||||
| ## limitations under the License. | ||||
| ## | ||||
| ## The Original Code is RabbitMQ. | ||||
| ## | ||||
| ## The Initial Developer of the Original Code is GoPivotal, Inc. | ||||
| ## Copyright (c) 2007-2016 Pivotal Software, Inc.  All rights reserved. | ||||
| 
 | ||||
| 
 | ||||
| defmodule PurgeQueueCommandTest do | ||||
|   use ExUnit.Case | ||||
|   import TestHelper | ||||
| 
 | ||||
|   @command RabbitMQ.CLI.Ctl.Commands.PurgeQueueCommand | ||||
|   @user "guest" | ||||
|   @vhost "purge-queue-vhost" | ||||
| 
 | ||||
|   setup_all do | ||||
|     RabbitMQ.CLI.Distribution.start() | ||||
|     :net_kernel.connect_node(get_rabbit_hostname) | ||||
| 
 | ||||
|     on_exit([], fn -> | ||||
|       :erlang.disconnect_node(get_rabbit_hostname) | ||||
|       :net_kernel.stop() | ||||
|     end) | ||||
| 
 | ||||
|     :ok | ||||
|   end | ||||
| 
 | ||||
|   setup context do | ||||
|     {:ok, opts: %{ | ||||
|         node: get_rabbit_hostname, | ||||
|         vhost: @vhost, | ||||
|         timeout: context[:test_timeout] | ||||
|       }} | ||||
|   end | ||||
| 
 | ||||
|   @tag test_timeout: 15 | ||||
|   test "request to an existent queue on active node succeeds", context do | ||||
|     add_vhost @vhost | ||||
|     set_permissions @user, @vhost, [".*", ".*", ".*"] | ||||
|     on_exit(context, fn -> delete_vhost(@vhost) end) | ||||
| 
 | ||||
|     q = "foo" | ||||
|     n = 20 | ||||
| 
 | ||||
|     declare_queue(q, @vhost) | ||||
|     assert message_count(@vhost, q) == 0 | ||||
| 
 | ||||
|     publish_messages(@vhost, q, n) | ||||
|     assert message_count(@vhost, q) == n | ||||
| 
 | ||||
|     assert @command.run([q], context[:opts]) == :ok | ||||
|     assert message_count(@vhost, q) == 0 | ||||
|   end | ||||
| 
 | ||||
|   @tag test_timeout: 15 | ||||
|   test "request to a non-existent queue on active node returns not found", context do | ||||
|     assert @command.run(["non-existent"], context[:opts]) == {:error, :not_found} | ||||
|   end | ||||
| 
 | ||||
|   @tag test_timeout: 0 | ||||
|   test "run: timeout causes command to return a bad RPC", context do | ||||
|     assert @command.run(["foo"], context[:opts]) == {:badrpc, :timeout} | ||||
|   end | ||||
| 
 | ||||
|   test "has no flags" do | ||||
|     assert @command.flags == [] | ||||
|   end | ||||
| 
 | ||||
|   test "shows up in help" do | ||||
|     s = @command.usage() | ||||
|     assert s =~ ~r/purge_queue/ | ||||
|   end | ||||
| 
 | ||||
|   test "defaults to vhost /" do | ||||
|     assert @command.merge_defaults(["foo"], %{bar: "baz"}) == {["foo"], %{bar: "baz", vhost: "/"}} | ||||
|   end | ||||
| 
 | ||||
|   test "validate: with extra arguments returns an arg count error" do | ||||
|     assert @command.validate(["queue-name", "extra"], %{}) == {:validation_failure, :too_many_args} | ||||
|   end | ||||
| 
 | ||||
|   test "validate: with no arguments returns an arg count error" do | ||||
|     assert @command.validate([], %{}) == {:validation_failure, :not_enough_args} | ||||
|   end | ||||
| 
 | ||||
|   test "validate: with correct args returns ok" do | ||||
|     assert @command.validate(["q"], %{}) == :ok | ||||
|   end | ||||
| 
 | ||||
|   test "banner informs that vhost's queue is purged" do | ||||
|     assert @command.banner(["my-q"], %{vhost: "/foo"}) == "Purging queue 'my-q' in vhost '/foo' ..." | ||||
|   end | ||||
| end | ||||
|  | @ -165,6 +165,24 @@ defmodule TestHelper do | |||
|     fun.(conn) | ||||
|   end | ||||
| 
 | ||||
|   def message_count(vhost, queue_name) do | ||||
|     with_channel(vhost, fn(channel) -> | ||||
|       {:ok, %{message_count: mc}} = AMQP.Queue.declare(channel, queue_name) | ||||
|       mc | ||||
|     end) | ||||
|   end | ||||
| 
 | ||||
|   def publish_messages(vhost, queue_name, count) do | ||||
|     with_channel(vhost, fn(channel) -> | ||||
|       AMQP.Queue.purge(channel, queue_name) | ||||
|       for i <- 1..count do | ||||
|         AMQP.Basic.publish(channel, "", queue_name, | ||||
|                            "test_message" <> Integer.to_string(i)) | ||||
|       end | ||||
|       AMQP.Confirm.wait_for_confirms(channel, 30) | ||||
|     end) | ||||
|   end | ||||
| 
 | ||||
|   def close_all_connections(node) do | ||||
|     # we intentionally use connections_local/0 here because connections/0, | ||||
|     # the cluster-wide version, loads some bits around cluster membership | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue