rabbitmq-server/deps/rabbit/test/channel_operation_timeout_S...

198 lines
6.4 KiB
Erlang

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(channel_operation_timeout_SUITE).
-include_lib("amqp_client/include/amqp_client.hrl").
-include("amqqueue.hrl").
-compile([export_all]).
-import(rabbit_misc, [pget/2]).
-define(CONFIG, [cluster_ab]).
-define(DEFAULT_VHOST, <<"/">>).
-define(QRESOURCE(Q), rabbit_misc:r(?DEFAULT_VHOST, queue, Q)).
-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
-define(DELAY, 25).
all() ->
[
notify_down_all
].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(_, Config) ->
Config.
end_per_group(_, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
ClusterSize = 2,
TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodes_count, ClusterSize},
{rmq_nodename_suffix, Testcase},
{tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------
notify_down_all(Config) ->
Rabbit = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
RabbitCh = rabbit_ct_client_helpers:open_channel(Config, 0),
HareCh = rabbit_ct_client_helpers:open_channel(Config, 1),
ct:pal("one"),
%% success
set_channel_operation_timeout_config(Config, 1000),
configure_bq(Config),
QCfg0 = qconfig(RabbitCh, <<"q0">>, <<"ex0">>, true, false),
declare(QCfg0),
ct:pal("two"),
%% Testing rabbit_amqqueue:notify_down_all via rabbit_channel.
%% Consumer count = 0 after correct channel termination and
%% notification of queues via delegate:call/3
true = (0 =/= length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST))),
rabbit_ct_client_helpers:close_channel(RabbitCh),
0 = length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST)),
false = is_process_alive(RabbitCh),
ct:pal("three"),
%% fail
set_channel_operation_timeout_config(Config, 10),
QCfg2 = qconfig(HareCh, <<"q1">>, <<"ex1">>, true, false),
declare(QCfg2),
publish(QCfg2, ?TIMEOUT_TEST_MSG),
timer:sleep(?DELAY),
rabbit_ct_client_helpers:close_channel(HareCh),
timer:sleep(?DELAY),
false = is_process_alive(HareCh),
pass.
%% -------------------------
%% Internal helper functions
%% -------------------------
set_channel_operation_timeout_config(Config, Timeout) ->
[ok = Ret
|| Ret <- rabbit_ct_broker_helpers:rpc_all(Config,
application, set_env, [rabbit, channel_operation_timeout, Timeout])],
ok.
set_channel_operation_backing_queue(Config) ->
[ok = Ret
|| Ret <- rabbit_ct_broker_helpers:rpc_all(Config,
application, set_env,
[rabbit, backing_queue_module, channel_operation_timeout_test_queue])],
ok.
re_enable_priority_queue(Config) ->
[ok = Ret
|| Ret <- rabbit_ct_broker_helpers:rpc_all(Config,
rabbit_priority_queue, enable, [])],
ok.
declare(QCfg) ->
QDeclare = #'queue.declare'{queue = Q = pget(name, QCfg), durable = true},
#'queue.declare_ok'{} = amqp_channel:call(Ch = pget(ch, QCfg), QDeclare),
ExDeclare = #'exchange.declare'{exchange = Ex = pget(ex, QCfg)},
#'exchange.declare_ok'{} = amqp_channel:call(Ch, ExDeclare),
#'queue.bind_ok'{} =
amqp_channel:call(Ch, #'queue.bind'{queue = Q,
exchange = Ex,
routing_key = Q}),
maybe_subscribe(QCfg).
maybe_subscribe(QCfg) ->
case pget(consume, QCfg) of
true ->
Sub = #'basic.consume'{queue = pget(name, QCfg)},
Ch = pget(ch, QCfg),
Del = pget(deliver, QCfg),
amqp_channel:subscribe(Ch, Sub,
spawn(fun() -> consume(Ch, Del) end));
_ -> ok
end.
consume(_Ch, false) -> receive_nothing();
consume(Ch, Deliver = true) ->
receive
{#'basic.deliver'{}, _Msg} ->
consume(Ch, Deliver)
end.
publish(QCfg, Msg) ->
Publish = #'basic.publish'{exchange = pget(ex, QCfg),
routing_key = pget(name, QCfg)},
amqp_channel:call(pget(ch, QCfg), Publish,
#amqp_msg{payload = Msg}).
get_consumers(Config, Node, VHost) when is_atom(Node),
is_binary(VHost) ->
rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_amqqueue, consumers_all, [VHost]).
get_amqqueue(QName0, []) ->
throw({not_found, QName0});
get_amqqueue(QName0, [Q | Rem]) when ?is_amqqueue(Q) ->
QName1 = amqqueue:get_name(Q),
compare_amqqueue(QName0, QName1, Q, Rem).
compare_amqqueue(QName, QName, Q, _Rem) ->
Q;
compare_amqqueue(QName, _, _, Rem) ->
get_amqqueue(QName, Rem).
qconfig(Ch, Name, Ex, Consume, Deliver) ->
[{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}].
receive_nothing() ->
receive
after infinity -> void
end.
unhandled_req(Fun) ->
try
Fun()
catch
exit:{{shutdown,{_, ?NOT_FOUND, _}}, _} -> ok;
_:Reason -> {error, Reason}
end.
configure_bq(Config) ->
ok = set_channel_operation_backing_queue(Config),
ok = re_enable_priority_queue(Config),
ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config,
?MODULE).