198 lines
6.4 KiB
Erlang
198 lines
6.4 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
%%
|
|
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
%%
|
|
|
|
-module(channel_operation_timeout_SUITE).
|
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
-include("amqqueue.hrl").
|
|
|
|
-compile([export_all]).
|
|
|
|
-import(rabbit_misc, [pget/2]).
|
|
|
|
-define(CONFIG, [cluster_ab]).
|
|
-define(DEFAULT_VHOST, <<"/">>).
|
|
-define(QRESOURCE(Q), rabbit_misc:r(?DEFAULT_VHOST, queue, Q)).
|
|
-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
|
|
-define(DELAY, 25).
|
|
|
|
all() ->
|
|
[
|
|
notify_down_all
|
|
].
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testsuite setup/teardown.
|
|
%% -------------------------------------------------------------------
|
|
|
|
init_per_suite(Config) ->
|
|
rabbit_ct_helpers:log_environment(),
|
|
rabbit_ct_helpers:run_setup_steps(Config).
|
|
|
|
end_per_suite(Config) ->
|
|
rabbit_ct_helpers:run_teardown_steps(Config).
|
|
|
|
init_per_group(_, Config) ->
|
|
Config.
|
|
|
|
end_per_group(_, Config) ->
|
|
Config.
|
|
|
|
init_per_testcase(Testcase, Config) ->
|
|
rabbit_ct_helpers:testcase_started(Config, Testcase),
|
|
ClusterSize = 2,
|
|
TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
|
|
Config1 = rabbit_ct_helpers:set_config(Config, [
|
|
{rmq_nodes_count, ClusterSize},
|
|
{rmq_nodename_suffix, Testcase},
|
|
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
|
|
]),
|
|
rabbit_ct_helpers:run_steps(Config1,
|
|
rabbit_ct_broker_helpers:setup_steps() ++
|
|
rabbit_ct_client_helpers:setup_steps()).
|
|
|
|
end_per_testcase(Testcase, Config) ->
|
|
Config1 = rabbit_ct_helpers:run_steps(Config,
|
|
rabbit_ct_client_helpers:teardown_steps() ++
|
|
rabbit_ct_broker_helpers:teardown_steps()),
|
|
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
|
|
|
|
%% -------------------------------------------------------------------
|
|
%% Testcases.
|
|
%% -------------------------------------------------------------------
|
|
|
|
notify_down_all(Config) ->
|
|
Rabbit = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
|
RabbitCh = rabbit_ct_client_helpers:open_channel(Config, 0),
|
|
HareCh = rabbit_ct_client_helpers:open_channel(Config, 1),
|
|
|
|
ct:pal("one"),
|
|
%% success
|
|
set_channel_operation_timeout_config(Config, 1000),
|
|
configure_bq(Config),
|
|
QCfg0 = qconfig(RabbitCh, <<"q0">>, <<"ex0">>, true, false),
|
|
declare(QCfg0),
|
|
ct:pal("two"),
|
|
%% Testing rabbit_amqqueue:notify_down_all via rabbit_channel.
|
|
%% Consumer count = 0 after correct channel termination and
|
|
%% notification of queues via delegate:call/3
|
|
true = (0 =/= length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST))),
|
|
rabbit_ct_client_helpers:close_channel(RabbitCh),
|
|
0 = length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST)),
|
|
false = is_process_alive(RabbitCh),
|
|
ct:pal("three"),
|
|
|
|
%% fail
|
|
set_channel_operation_timeout_config(Config, 10),
|
|
QCfg2 = qconfig(HareCh, <<"q1">>, <<"ex1">>, true, false),
|
|
declare(QCfg2),
|
|
publish(QCfg2, ?TIMEOUT_TEST_MSG),
|
|
timer:sleep(?DELAY),
|
|
rabbit_ct_client_helpers:close_channel(HareCh),
|
|
timer:sleep(?DELAY),
|
|
false = is_process_alive(HareCh),
|
|
|
|
pass.
|
|
|
|
%% -------------------------
|
|
%% Internal helper functions
|
|
%% -------------------------
|
|
|
|
set_channel_operation_timeout_config(Config, Timeout) ->
|
|
[ok = Ret
|
|
|| Ret <- rabbit_ct_broker_helpers:rpc_all(Config,
|
|
application, set_env, [rabbit, channel_operation_timeout, Timeout])],
|
|
ok.
|
|
|
|
set_channel_operation_backing_queue(Config) ->
|
|
[ok = Ret
|
|
|| Ret <- rabbit_ct_broker_helpers:rpc_all(Config,
|
|
application, set_env,
|
|
[rabbit, backing_queue_module, channel_operation_timeout_test_queue])],
|
|
ok.
|
|
|
|
re_enable_priority_queue(Config) ->
|
|
[ok = Ret
|
|
|| Ret <- rabbit_ct_broker_helpers:rpc_all(Config,
|
|
rabbit_priority_queue, enable, [])],
|
|
ok.
|
|
|
|
declare(QCfg) ->
|
|
QDeclare = #'queue.declare'{queue = Q = pget(name, QCfg), durable = true},
|
|
#'queue.declare_ok'{} = amqp_channel:call(Ch = pget(ch, QCfg), QDeclare),
|
|
|
|
ExDeclare = #'exchange.declare'{exchange = Ex = pget(ex, QCfg)},
|
|
#'exchange.declare_ok'{} = amqp_channel:call(Ch, ExDeclare),
|
|
|
|
#'queue.bind_ok'{} =
|
|
amqp_channel:call(Ch, #'queue.bind'{queue = Q,
|
|
exchange = Ex,
|
|
routing_key = Q}),
|
|
maybe_subscribe(QCfg).
|
|
|
|
maybe_subscribe(QCfg) ->
|
|
case pget(consume, QCfg) of
|
|
true ->
|
|
Sub = #'basic.consume'{queue = pget(name, QCfg)},
|
|
Ch = pget(ch, QCfg),
|
|
Del = pget(deliver, QCfg),
|
|
amqp_channel:subscribe(Ch, Sub,
|
|
spawn(fun() -> consume(Ch, Del) end));
|
|
_ -> ok
|
|
end.
|
|
|
|
consume(_Ch, false) -> receive_nothing();
|
|
consume(Ch, Deliver = true) ->
|
|
receive
|
|
{#'basic.deliver'{}, _Msg} ->
|
|
consume(Ch, Deliver)
|
|
end.
|
|
|
|
publish(QCfg, Msg) ->
|
|
Publish = #'basic.publish'{exchange = pget(ex, QCfg),
|
|
routing_key = pget(name, QCfg)},
|
|
amqp_channel:call(pget(ch, QCfg), Publish,
|
|
#amqp_msg{payload = Msg}).
|
|
|
|
get_consumers(Config, Node, VHost) when is_atom(Node),
|
|
is_binary(VHost) ->
|
|
rabbit_ct_broker_helpers:rpc(Config, Node,
|
|
rabbit_amqqueue, consumers_all, [VHost]).
|
|
|
|
get_amqqueue(QName0, []) ->
|
|
throw({not_found, QName0});
|
|
get_amqqueue(QName0, [Q | Rem]) when ?is_amqqueue(Q) ->
|
|
QName1 = amqqueue:get_name(Q),
|
|
compare_amqqueue(QName0, QName1, Q, Rem).
|
|
|
|
compare_amqqueue(QName, QName, Q, _Rem) ->
|
|
Q;
|
|
compare_amqqueue(QName, _, _, Rem) ->
|
|
get_amqqueue(QName, Rem).
|
|
|
|
qconfig(Ch, Name, Ex, Consume, Deliver) ->
|
|
[{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}].
|
|
|
|
receive_nothing() ->
|
|
receive
|
|
after infinity -> void
|
|
end.
|
|
|
|
unhandled_req(Fun) ->
|
|
try
|
|
Fun()
|
|
catch
|
|
exit:{{shutdown,{_, ?NOT_FOUND, _}}, _} -> ok;
|
|
_:Reason -> {error, Reason}
|
|
end.
|
|
|
|
configure_bq(Config) ->
|
|
ok = set_channel_operation_backing_queue(Config),
|
|
ok = re_enable_priority_queue(Config),
|
|
ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config,
|
|
?MODULE).
|