Add test for cleaning up dead exclusive queues
This commit is contained in:
parent
23884dd83b
commit
af397dd1b4
|
|
@ -35,7 +35,7 @@
|
|||
-export([on_node_up/1, on_node_down/1]).
|
||||
-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
|
||||
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
|
||||
-export([is_mirrored/1, is_exclusive/1]). % Note: exported due to use in qlc expression.
|
||||
-export([is_mirrored/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression.
|
||||
|
||||
-export([pid_of/1, pid_of/2]).
|
||||
|
||||
|
|
@ -920,10 +920,10 @@ cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mi
|
|||
is_mirrored(Q) ->
|
||||
rabbit_mirror_queue_misc:is_mirrored(Q).
|
||||
|
||||
is_exclusive(#amqqueue{exclusive_owner = none}) ->
|
||||
is_dead_exclusive(#amqqueue{exclusive_owner = none}) ->
|
||||
false;
|
||||
is_exclusive(#amqqueue{exclusive_owner = Pid}) when is_pid(Pid) ->
|
||||
true.
|
||||
is_dead_exclusive(#amqqueue{exclusive_owner = Pid}) when is_pid(Pid) ->
|
||||
not rabbit_mnesia:is_process_alive(Pid).
|
||||
|
||||
on_node_up(Node) ->
|
||||
ok = rabbit_misc:execute_mnesia_transaction(
|
||||
|
|
@ -974,7 +974,7 @@ on_node_down(Node) ->
|
|||
node(Pid) == Node andalso
|
||||
not rabbit_mnesia:is_process_alive(Pid) andalso
|
||||
(not rabbit_amqqueue:is_mirrored(Q) orelse
|
||||
rabbit_amqqueue:is_exclusive(Q))])),
|
||||
rabbit_amqqueue:is_dead_exclusive(Q))])),
|
||||
{Qs, Dels} = lists:unzip(QsDels),
|
||||
T = rabbit_binding:process_deletions(
|
||||
lists:foldl(fun rabbit_binding:combine_deletions/2,
|
||||
|
|
|
|||
|
|
@ -43,8 +43,7 @@ groups() ->
|
|||
{net_ticktime_1, [], [
|
||||
{cluster_size_2, [], [
|
||||
ctl_ticktime_sync,
|
||||
prompt_disconnect_detection,
|
||||
clean_up_exclusive_queues
|
||||
prompt_disconnect_detection
|
||||
]},
|
||||
{cluster_size_3, [], [
|
||||
autoheal,
|
||||
|
|
@ -283,15 +282,6 @@ prompt_disconnect_detection(Config) ->
|
|||
rabbit_ct_client_helpers:close_channel(ChB),
|
||||
ok.
|
||||
|
||||
clean_up_exclusive_queues(Config) ->
|
||||
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<".*">>, <<"all">>),
|
||||
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
ChA = rabbit_ct_client_helpers:open_channel(Config, A),
|
||||
amqp_channel:call(ChA, #'queue.declare'{queue = <<"excl">>,
|
||||
exclusive = true}),
|
||||
block_unblock([{A, B}]),
|
||||
ok.
|
||||
|
||||
ctl_ticktime_sync(Config) ->
|
||||
%% Server has 1s net_ticktime, make sure ctl doesn't get disconnected
|
||||
Cmd = ["eval", "timer:sleep(5000)."],
|
||||
|
|
|
|||
|
|
@ -31,7 +31,8 @@ groups() ->
|
|||
[
|
||||
{cluster_size_2, [], [
|
||||
rapid_redeclare,
|
||||
declare_synchrony
|
||||
declare_synchrony,
|
||||
clean_up_exclusive_queues
|
||||
]},
|
||||
{cluster_size_3, [], [
|
||||
consume_survives_stop,
|
||||
|
|
@ -125,6 +126,21 @@ declare_synchrony(Config) ->
|
|||
declare(Ch, Name) ->
|
||||
amqp_channel:call(Ch, #'queue.declare'{durable = true, queue = Name}).
|
||||
|
||||
%% Ensure that exclusive queues are cleaned up when part of ha cluster
|
||||
%% and node is killed abruptly then restarted
|
||||
clean_up_exclusive_queues(Config) ->
|
||||
QName = <<"excl">>,
|
||||
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<".*">>, <<"all">>),
|
||||
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
ChA = rabbit_ct_client_helpers:open_channel(Config, A),
|
||||
amqp_channel:call(ChA, #'queue.declare'{queue = QName,
|
||||
exclusive = true}),
|
||||
ok = rabbit_ct_broker_helpers:kill_node(Config, A),
|
||||
[] = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_amqqueue, list, []),
|
||||
ok = rabbit_ct_broker_helpers:start_node(Config, A),
|
||||
[[],[]] = rabbit_ct_broker_helpers:rpc_all(Config, rabbit_amqqueue, list, []),
|
||||
ok.
|
||||
|
||||
consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true).
|
||||
consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true).
|
||||
consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true).
|
||||
|
|
|
|||
Loading…
Reference in New Issue