add test for ctl delete_queue for a stopped queue
This commit is contained in:
parent
1097ebe66e
commit
290c1c05a4
|
|
@ -72,7 +72,7 @@
|
|||
-export([prepend_extra_bcc/1]).
|
||||
-export([queue/1, queue_names/1]).
|
||||
|
||||
-export([kill_queue/2, kill_queue/3, kill_queue_hard/2]).
|
||||
-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).
|
||||
|
||||
%% internal
|
||||
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
|
||||
|
|
@ -119,6 +119,7 @@
|
|||
-define(CONSUMER_INFO_KEYS,
|
||||
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
|
||||
active, activity_status, arguments]).
|
||||
-define(KILL_QUEUE_DELAY_INTERVAL, 100).
|
||||
|
||||
warn_file_limit() ->
|
||||
DurableQueues = find_recoverable_queues(),
|
||||
|
|
@ -2112,17 +2113,27 @@ is_queue_args_combination_permitted(Durable, Exclusive) ->
|
|||
|
||||
-spec kill_queue_hard(node(), name()) -> ok.
|
||||
kill_queue_hard(Node, QRes = #resource{kind = queue}) ->
|
||||
case kill_queue(Node, QRes) of
|
||||
kill_queue_hard(Node, QRes, boom).
|
||||
|
||||
-spec kill_queue_hard(node(), name(), atom()) -> ok.
|
||||
kill_queue_hard(Node, QRes = #resource{kind = queue}, Reason) ->
|
||||
case kill_queue(Node, QRes, Reason) of
|
||||
crashed -> ok;
|
||||
_NewPid -> timer:sleep(100),
|
||||
kill_queue_hard(Node, QRes)
|
||||
stopped -> ok;
|
||||
_NewPid -> timer:sleep(?KILL_QUEUE_DELAY_INTERVAL),
|
||||
kill_queue_hard(Node, QRes, Reason)
|
||||
end.
|
||||
|
||||
-spec kill_queue(node(), name()) -> ok.
|
||||
-spec kill_queue(node(), name()) -> pid() | crashed.
|
||||
kill_queue(Node, QRes = #resource{kind = queue}) ->
|
||||
kill_queue(Node, QRes, boom).
|
||||
|
||||
-spec kill_queue(node(), name(), atom()) -> ok.
|
||||
-spec kill_queue(node(), name(), atom()) -> pid() | crashed | stopped.
|
||||
kill_queue(Node, QRes = #resource{kind = queue}, Reason = shutdown) ->
|
||||
Pid1 = pid_or_crashed(Node, QRes),
|
||||
exit(Pid1, Reason),
|
||||
rabbit_control_misc:await_state(Node, QRes, stopped),
|
||||
stopped;
|
||||
kill_queue(Node, QRes = #resource{kind = queue}, Reason) ->
|
||||
Pid1 = pid_or_crashed(Node, QRes),
|
||||
exit(Pid1, Reason),
|
||||
|
|
|
|||
|
|
@ -94,6 +94,24 @@ defmodule DeleteQueueCommandTest do
|
|||
{:error, :not_found} = lookup_queue(q, @vhost)
|
||||
end
|
||||
|
||||
@tag test_timeout: 30000
|
||||
test "run: request to an existing stopped queue on active node succeeds", context do
|
||||
add_vhost(@vhost)
|
||||
set_permissions(@user, @vhost, [".*", ".*", ".*"])
|
||||
on_exit(context, fn -> delete_vhost(@vhost) end)
|
||||
|
||||
q = "bar"
|
||||
n = 20
|
||||
|
||||
declare_queue(q, @vhost, true)
|
||||
publish_messages(@vhost, q, n)
|
||||
q_resource = :rabbit_misc.r(@vhost, :queue, q)
|
||||
stop_queue(q_resource)
|
||||
|
||||
assert @command.run([q], context[:opts]) == {:ok, 0}
|
||||
{:error, :not_found} = lookup_queue(q, @vhost)
|
||||
end
|
||||
|
||||
@tag test_timeout: 30000
|
||||
test "run: request to a non-existent queue on active node returns not found", context do
|
||||
assert @command.run(["non-existent"], context[:opts]) == {:error, :not_found}
|
||||
|
|
|
|||
|
|
@ -583,6 +583,19 @@ defmodule TestHelper do
|
|||
:crashed = :amqqueue.get_state(existing_amqqueue)
|
||||
end
|
||||
|
||||
def stop_queue(queue_resource = {:resource, vhost, :queue, queue_name}) do
|
||||
node = get_rabbit_hostname()
|
||||
|
||||
:rabbit_misc.rpc_call(node, :rabbit_amqqueue, :kill_queue_hard, [
|
||||
node,
|
||||
queue_resource,
|
||||
:shutdown
|
||||
])
|
||||
|
||||
{:existing, existing_amqqueue} = declare_queue(queue_name, vhost, true)
|
||||
:stopped = :amqqueue.get_state(existing_amqqueue)
|
||||
end
|
||||
|
||||
def delete_all_queues() do
|
||||
try do
|
||||
immediately_delete_all_queues(:rabbit_amqqueue.list())
|
||||
|
|
|
|||
Loading…
Reference in New Issue