RMQ-1263: Add a --force option to rabbitmqctl delete_queue command
RMQ-1263: Add a --force option to rabbitmqctl delete_queue command. This work was originally done by Iliia Khaprov <iliia.khaprov@broadcom.net>. --------- Co-authored-by: Iliia Khaprov <iliia.khaprov@broadcom.net> Co-authored-by: Michael Klishin <klishinm@vmware.com> (cherry picked from commit d9522d3ee708250cc84443af5c3556b14f7c5ab9)
This commit is contained in:
parent
f1396b5695
commit
d2f66ced1b
|
@ -9,13 +9,13 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do
|
|||
|
||||
@behaviour RabbitMQ.CLI.CommandBehaviour
|
||||
|
||||
def switches(), do: [if_empty: :boolean, if_unused: :boolean, timeout: :integer]
|
||||
def switches(), do: [if_empty: :boolean, if_unused: :boolean, force: :boolean, timeout: :integer]
|
||||
def aliases(), do: [e: :if_empty, u: :if_unused, t: :timeout]
|
||||
|
||||
def merge_defaults(args, opts) do
|
||||
{
|
||||
args,
|
||||
Map.merge(%{if_empty: false, if_unused: false, vhost: "/"}, opts)
|
||||
Map.merge(%{if_empty: false, if_unused: false, force: false, vhost: "/"}, opts)
|
||||
}
|
||||
end
|
||||
|
||||
|
@ -46,37 +46,49 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do
|
|||
vhost: vhost,
|
||||
if_empty: if_empty,
|
||||
if_unused: if_unused,
|
||||
force: force,
|
||||
timeout: timeout
|
||||
}) do
|
||||
## Generate queue resource name from queue name and vhost
|
||||
queue_resource = :rabbit_misc.r(vhost, :queue, qname)
|
||||
user = if force, do: RabbitMQ.CLI.Common.internal_user, else: "cli_user"
|
||||
## Lookup a queue on broker node using resource name
|
||||
case :rabbit_misc.rpc_call(node, :rabbit_amqqueue, :lookup, [queue_resource]) do
|
||||
{:ok, queue} ->
|
||||
## Delete queue
|
||||
:rabbit_misc.rpc_call(
|
||||
node,
|
||||
:rabbit_amqqueue,
|
||||
:delete_with,
|
||||
[queue, if_unused, if_empty, "cli_user"],
|
||||
timeout
|
||||
)
|
||||
case :rabbit_misc.rpc_call(node,
|
||||
:rabbit_amqqueue,
|
||||
:delete_with,
|
||||
[queue, if_unused, if_empty, user],
|
||||
timeout
|
||||
) do
|
||||
{:ok, _} = ok -> ok
|
||||
|
||||
{:badrpc, {:EXIT, {:amqp_error, :resource_locked, _, :none}}} ->
|
||||
{:error, :protected}
|
||||
|
||||
other_error -> other_error
|
||||
end
|
||||
|
||||
{:error, _} = error ->
|
||||
error
|
||||
end
|
||||
end
|
||||
|
||||
def output({:error, :protected}, _options) do
|
||||
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "The queue is locked or protected from deletion"}
|
||||
end
|
||||
|
||||
def output({:error, :not_found}, _options) do
|
||||
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue not found"}
|
||||
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "No such queue was found"}
|
||||
end
|
||||
|
||||
def output({:error, :not_empty}, _options) do
|
||||
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue is not empty"}
|
||||
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "The queue is not empty"}
|
||||
end
|
||||
|
||||
def output({:error, :in_use}, _options) do
|
||||
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "Queue is in use"}
|
||||
{:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), "The queue is in use"}
|
||||
end
|
||||
|
||||
def output({:ok, qlen}, _options) do
|
||||
|
@ -103,14 +115,15 @@ defmodule RabbitMQ.CLI.Ctl.Commands.DeleteQueueCommand do
|
|||
Enum.join(Enum.concat([if_empty_str, if_unused_str]), "and ") <> "..."
|
||||
end
|
||||
|
||||
def usage(), do: "delete_queue [--vhost <vhost>] <queue_name> [--if-empty|-e] [--if-unused|-u]"
|
||||
def usage(), do: "delete_queue [--vhost <vhost>] <queue_name> [--if-empty|-e] [--if-unused|-u] [--force]"
|
||||
|
||||
def usage_additional() do
|
||||
[
|
||||
["--vhost", "Virtual host name"],
|
||||
["<queue_name>", "name of the queue to delete"],
|
||||
["--if-empty", "delete the queue if it is empty (has no messages ready for delivery)"],
|
||||
["--if-unused", "delete the queue only if it has no consumers"]
|
||||
["--if-unused", "delete the queue only if it has no consumers"],
|
||||
["--force", "delete the queue even if it is protected"]
|
||||
]
|
||||
end
|
||||
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
%%
|
||||
|
||||
-module('Elixir.RabbitMQ.CLI.Common').
|
||||
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
|
||||
-export([internal_user/0]).
|
||||
|
||||
internal_user() ->
|
||||
?INTERNAL_USER.
|
|
@ -25,16 +25,17 @@ defmodule DeleteQueueCommandTest do
|
|||
vhost: @vhost,
|
||||
timeout: context[:test_timeout],
|
||||
if_empty: false,
|
||||
if_unused: false
|
||||
if_unused: false,
|
||||
force: false
|
||||
}}
|
||||
end
|
||||
|
||||
test "merge_defaults: defaults can be overridden" do
|
||||
assert @command.merge_defaults([], %{}) ==
|
||||
{[], %{vhost: "/", if_empty: false, if_unused: false}}
|
||||
{[], %{vhost: "/", if_empty: false, if_unused: false, force: false}}
|
||||
|
||||
assert @command.merge_defaults([], %{vhost: "non_default", if_empty: true}) ==
|
||||
{[], %{vhost: "non_default", if_empty: true, if_unused: false}}
|
||||
{[], %{vhost: "non_default", if_empty: true, if_unused: false, force: false}}
|
||||
end
|
||||
|
||||
test "validate: providing no queue name fails validation", context do
|
||||
|
@ -76,6 +77,25 @@ defmodule DeleteQueueCommandTest do
|
|||
{:error, :not_found} = lookup_queue(q, @vhost)
|
||||
end
|
||||
|
||||
@tag test_timeout: 30000
|
||||
test "run: protected queue can be deleted only with --force", context do
|
||||
add_vhost(@vhost)
|
||||
set_permissions(@user, @vhost, [".*", ".*", ".*"])
|
||||
on_exit(context, fn -> delete_vhost(@vhost) end)
|
||||
|
||||
q = "foo"
|
||||
n = 20
|
||||
|
||||
declare_internal_queue(q, @vhost)
|
||||
publish_messages(@vhost, q, n)
|
||||
|
||||
assert @command.run([q], context[:opts]) == {:error, :protected}
|
||||
{:ok, _queue} = lookup_queue(q, @vhost)
|
||||
|
||||
assert @command.run([q], %{context[:opts] | force: true}) == {:ok, n}
|
||||
{:error, :not_found} = lookup_queue(q, @vhost)
|
||||
end
|
||||
|
||||
@tag test_timeout: 30000
|
||||
test "run: request to an existing crashed queue on active node succeeds", context do
|
||||
add_vhost(@vhost)
|
||||
|
@ -135,7 +155,7 @@ defmodule DeleteQueueCommandTest do
|
|||
|
||||
test "defaults to vhost /" do
|
||||
assert @command.merge_defaults(["foo"], %{bar: "baz"}) ==
|
||||
{["foo"], %{bar: "baz", vhost: "/", if_unused: false, if_empty: false}}
|
||||
{["foo"], %{bar: "baz", vhost: "/", if_unused: false, if_empty: false, force: false}}
|
||||
end
|
||||
|
||||
test "validate: with extra arguments returns an arg count error" do
|
||||
|
@ -152,13 +172,13 @@ defmodule DeleteQueueCommandTest do
|
|||
end
|
||||
|
||||
test "banner informs that vhost's queue is deleted" do
|
||||
assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: false, if_unused: false}) ==
|
||||
assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: false, if_unused: false, force: false}) ==
|
||||
"Deleting queue 'my-q' on vhost '/foo' ..."
|
||||
|
||||
assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: false}) ==
|
||||
assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: false, force: false}) ==
|
||||
"Deleting queue 'my-q' on vhost '/foo' if queue is empty ..."
|
||||
|
||||
assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: true}) ==
|
||||
assert @command.banner(["my-q"], %{vhost: "/foo", if_empty: true, if_unused: true, force: false}) ==
|
||||
"Deleting queue 'my-q' on vhost '/foo' if queue is empty and if queue is unused ..."
|
||||
end
|
||||
end
|
||||
|
|
|
@ -302,6 +302,34 @@ defmodule TestHelper do
|
|||
])
|
||||
end
|
||||
|
||||
def declare_internal_queue(
|
||||
name,
|
||||
vhost,
|
||||
durable \\ false,
|
||||
auto_delete \\ false,
|
||||
args \\ [],
|
||||
owner \\ :none
|
||||
) do
|
||||
queue_name = :rabbit_misc.r(vhost, :queue, name)
|
||||
|
||||
amqqueue = :amqqueue.new(
|
||||
queue_name,
|
||||
:none,
|
||||
durable,
|
||||
auto_delete,
|
||||
owner,
|
||||
args,
|
||||
vhost,
|
||||
%{})
|
||||
|
||||
internal_amqqueue = :amqqueue.make_internal(amqqueue)
|
||||
|
||||
:rpc.call(get_rabbit_hostname(), :rabbit_queue_type, :declare, [
|
||||
internal_amqqueue,
|
||||
get_rabbit_hostname()
|
||||
])
|
||||
end
|
||||
|
||||
def declare_stream(name, vhost) do
|
||||
declare_queue(name, vhost, true, false, [{"x-queue-type", :longstr, "stream"}])
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue