Merge pull request #13409 from rabbitmq/fix-test-flakes-in-amqp_client_SUITE

amqp_client_SUITE: Fix frequent test failures
This commit is contained in:
Jean-Sébastien Pédron 2025-03-07 17:52:52 +01:00 committed by GitHub
commit f661c1ef9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 147 additions and 115 deletions

View File

@ -25,6 +25,7 @@ jobs:
- parallel-ct-set-2
- parallel-ct-set-3
- parallel-ct-set-4
- ct-amqp_client
- ct-clustering_management
- eunit ct-dead_lettering
- ct-feature_flags

View File

@ -287,6 +287,20 @@ open_sent({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
{keep_state, State1};
open_sent(_EvtType, {close, Reason}, State) ->
case send_close(State, Reason) of
ok ->
%% "After writing this frame the peer SHOULD continue to read from the connection
%% until it receives the partner's close frame (in order to guard against
%% erroneously or maliciously implemented partners, a peer SHOULD implement a
%% timeout to give its partner a reasonable time to receive and process the close
%% before giving up and simply closing the underlying transport mechanism)." [§2.4.3]
{next_state, close_sent, State, {state_timeout, ?TIMEOUT, received_no_close_frame}};
{error, closed} ->
{stop, normal, State};
Error ->
{stop, Error, State}
end;
open_sent(info, {'DOWN', MRef, process, _, _},
#state{reader_m_ref = MRef}) ->
{stop, {shutdown, reader_down}}.
@ -345,7 +359,10 @@ close_sent(_EvtType, #'v1_0.close'{} = Close, #state{config = Config}) ->
ok = notify_closed(Config, Close),
{stop, normal};
close_sent(state_timeout, received_no_close_frame, _Data) ->
{stop, normal}.
{stop, normal};
close_sent(_EvtType, #'v1_0.open'{}, _Data) ->
%% Transition from CLOSE_PIPE to CLOSE_SENT in figure 2.23.
keep_state_and_data.
set_other_procs0(OtherProcs, State) ->
#{sessions_sup := SessionsSup,

View File

@ -175,7 +175,8 @@ bats: $(BATS)
tests:: bats
SLOW_CT_SUITES := backing_queue \
SLOW_CT_SUITES := amqp_client \
backing_queue \
channel_interceptor \
cluster \
cluster_rename \
@ -257,7 +258,7 @@ define ct_master.erl
halt(0)
endef
PARALLEL_CT_SET_1_A = amqp_client unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_A = unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
@ -282,7 +283,7 @@ PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARAL
PARALLEL_CT_SET_3 = $(sort $(PARALLEL_CT_SET_3_A) $(PARALLEL_CT_SET_3_B) $(PARALLEL_CT_SET_3_C) $(PARALLEL_CT_SET_3_D))
PARALLEL_CT_SET_4 = $(sort $(PARALLEL_CT_SET_4_A) $(PARALLEL_CT_SET_4_B) $(PARALLEL_CT_SET_4_C) $(PARALLEL_CT_SET_4_D))
SEQUENTIAL_CT_SUITES = clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue
SEQUENTIAL_CT_SUITES = amqp_client clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue
PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4)
ifeq ($(filter-out $(SEQUENTIAL_CT_SUITES) $(PARALLEL_CT_SUITES),$(CT_SUITES)),)

View File

@ -11,6 +11,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile([nowarn_export_all,
export_all]).
@ -355,10 +356,9 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
%% Assert that every testcase cleaned up.
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
%% Wait for sessions to terminate before starting the next test case.
%% Clean up any queues, connections, and sessions.
rpc(Config, ?MODULE, delete_queues, []),
ok = rpc(Config, rabbit_networking, close_all_connections, [<<"test finished">>]),
eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, []))),
%% Assert that global counters count correctly.
eventually(?_assertMatch(#{publishers := 0,
@ -586,7 +586,7 @@ modified_quorum_queue(Config) ->
ok = amqp10_client:settle_msg(Receiver1, M2e, modified),
%% Test that we can consume via AMQP 0.9.1
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
{#'basic.get_ok'{},
#amqp_msg{payload = <<"m2">>,
props = #'P_basic'{headers = Headers}}
@ -597,7 +597,7 @@ modified_quorum_queue(Config) ->
lists:keysearch(<<"x-other">>, 1, Headers)),
?assertEqual({value, {<<"x-delivery-count">>, long, 5}},
lists:keysearch(<<"x-delivery-count">>, 1, Headers)),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = amqp10_client:detach_link(Receiver1),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
@ -1343,7 +1343,7 @@ amqp_amqpl(QType, Config) ->
ok = amqp10_client:detach_link(Sender),
flush(detached),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{global = false,
prefetch_count = 100}),
CTag = <<"my-tag">>,
@ -1426,7 +1426,7 @@ amqp_amqpl(QType, Config) ->
after 30000 -> ct:fail({missing_deliver, ?LINE})
end,
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = close_connection_sync(Connection).
@ -1435,7 +1435,7 @@ message_headers_conversion(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(QName),
%% declare a quorum queue
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
amqp_channel:call(Ch, #'queue.declare'{
queue = QName,
durable = true,
@ -1447,7 +1447,7 @@ message_headers_conversion(Config) ->
amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address),
amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = delete_queue(Session, QName),
ok = close_connection_sync(Connection).
@ -1553,11 +1553,11 @@ multiple_sessions(Config) ->
ok = amqp10_client:flow_link_credit(Receiver2, NMsgsPerReceiver, never),
flush("receiver attached"),
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
[#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
exchange = <<"amq.fanout">>})
|| QName <- Qs],
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
%% Send on each session.
TargetAddr = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
@ -1613,13 +1613,13 @@ server_closes_link_stream(Config) ->
server_closes_link(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, QType}]}),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
@ -1694,7 +1694,7 @@ server_closes_link_exchange(Settled, Config) ->
XName = atom_to_binary(?FUNCTION_NAME),
QName = <<"my queue">>,
RoutingKey = <<"my routing key">>,
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = XName}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
@ -1736,7 +1736,7 @@ server_closes_link_exchange(Settled, Config) ->
?assertMatch(#{publishers := 0}, get_global_counters(Config)),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
@ -1748,13 +1748,13 @@ link_target_quorum_queue_deleted(Config) ->
link_target_queue_deleted(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, QType}]}),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
@ -1809,7 +1809,7 @@ target_queues_deleted_accepted(Config) ->
Q2 = <<"q2">>,
Q3 = <<"q3">>,
QNames = [Q1, Q2, Q3],
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
[begin
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
@ -1858,7 +1858,7 @@ target_queues_deleted_accepted(Config) ->
?assertEqual(#'queue.delete_ok'{message_count = 2},
amqp_channel:call(Ch, #'queue.delete'{queue = Q1})),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
?assert(rpc(Config, meck, validate, [Mod])),
ok = rpc(Config, meck, unload, [Mod]),
ok = end_session_sync(Session),
@ -1943,7 +1943,7 @@ sync_get_unsettled_stream(Config) ->
sync_get_unsettled(QType, Config) ->
SenderSettleMode = unsettled,
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
queue = QName,
@ -2032,7 +2032,7 @@ sync_get_unsettled(QType, Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
sync_get_unsettled_2_classic_queue(Config) ->
sync_get_unsettled_2(<<"classic">>, Config).
@ -2047,7 +2047,7 @@ sync_get_unsettled_2_stream(Config) ->
sync_get_unsettled_2(QType, Config) ->
SenderSettleMode = unsettled,
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
queue = QName,
@ -2122,7 +2122,7 @@ sync_get_unsettled_2(QType, Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
sync_get_settled_classic_queue(Config) ->
sync_get_settled(<<"classic">>, Config).
@ -2137,7 +2137,7 @@ sync_get_settled_stream(Config) ->
sync_get_settled(QType, Config) ->
SenderSettleMode = settled,
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
queue = QName,
@ -2202,7 +2202,7 @@ sync_get_settled(QType, Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
timed_get_classic_queue(Config) ->
timed_get(<<"classic">>, Config).
@ -2216,7 +2216,7 @@ timed_get_stream(Config) ->
%% Synchronous get with a timeout, figure 2.44.
timed_get(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
queue = QName,
@ -2274,7 +2274,7 @@ timed_get(QType, Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
stop_classic_queue(Config) ->
stop(<<"classic">>, Config).
@ -2287,7 +2287,7 @@ stop_stream(Config) ->
%% Test stopping a link, figure 2.46.
stop(QType, Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
QName = atom_to_binary(?FUNCTION_NAME),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
@ -2353,7 +2353,7 @@ stop(QType, Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
consumer_priority_classic_queue(Config) ->
consumer_priority(<<"classic">>, Config).
@ -2831,7 +2831,7 @@ detach_requeues_one_session_quorum_queue(Config) ->
detach_requeue_one_session(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
queue = QName,
@ -2909,7 +2909,7 @@ detach_requeue_one_session(QType, Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection),
#'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
detach_requeues_drop_head_classic_queue(Config) ->
QName1 = <<"q1">>,
@ -3079,7 +3079,7 @@ detach_requeues_two_connections(QType, Config) ->
resource_alarm_before_session_begin(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
@ -3130,11 +3130,11 @@ resource_alarm_before_session_begin(Config) ->
ok = end_session_sync(Session1),
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
resource_alarm_after_session_begin(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config),
@ -3197,13 +3197,13 @@ resource_alarm_after_session_begin(Config) ->
ok = close_connection_sync(Connection1),
ok = close_connection_sync(Connection2),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
%% Test case for
%% https://github.com/rabbitmq/rabbitmq-server/issues/12816
resource_alarm_send_many(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config),
@ -3233,7 +3233,7 @@ resource_alarm_send_many(Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
auth_attempt_metrics(Config) ->
open_and_close_connection(Config),
@ -3266,7 +3266,7 @@ max_message_size_client_to_server(Config) ->
ok = rpc(Config, persistent_term, put, [max_message_size, MaxMessageSize]),
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config),
@ -3290,12 +3290,12 @@ max_message_size_client_to_server(Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]).
max_message_size_server_to_client(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config),
@ -3344,13 +3344,13 @@ max_message_size_server_to_client(Config) ->
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
last_queue_confirms(Config) ->
ClassicQ = <<"my classic queue">>,
QuorumQ = <<"my quorum queue">>,
Qs = [ClassicQ, QuorumQ],
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{queue = ClassicQ}),
#'queue.declare_ok'{} = amqp_channel:call(
@ -3416,13 +3416,13 @@ last_queue_confirms(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = ClassicQ})),
?assertEqual(#'queue.delete_ok'{message_count = 2},
amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
target_queue_deleted(Config) ->
ClassicQ = <<"my classic queue">>,
QuorumQ = <<"my quorum queue">>,
Qs = [ClassicQ, QuorumQ],
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{queue = ClassicQ}),
#'queue.declare_ok'{} = amqp_channel:call(
@ -3488,11 +3488,12 @@ target_queue_deleted(Config) ->
ok = close_connection_sync(Connection),
?assertEqual(#'queue.delete_ok'{message_count = 2},
amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
target_classic_queue_down(Config) ->
ClassicQueueNode = 2,
Ch = rabbit_ct_client_helpers:open_channel(Config, ClassicQueueNode),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(
Config, ClassicQueueNode),
QName = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(QName),
#'queue.declare_ok'{} = amqp_channel:call(
@ -3500,7 +3501,7 @@ target_classic_queue_down(Config) ->
queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}),
ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, ClassicQueueNode),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
@ -3578,7 +3579,8 @@ async_notify_unsettled_stream(Config) ->
%% Test asynchronous notification, figure 2.45.
async_notify(SenderSettleMode, QType, Config) ->
%% Place queue leader on the old node.
Ch = rabbit_ct_client_helpers:open_channel(Config, 1),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(
Config, 1),
QName = atom_to_binary(?FUNCTION_NAME),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
@ -3635,7 +3637,7 @@ async_notify(SenderSettleMode, QType, Config) ->
end,
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
@ -3643,7 +3645,7 @@ async_notify(SenderSettleMode, QType, Config) ->
%% (slow queue) does not impact other link receivers (fast queues) on the **same** session.
%% (This is unlike AMQP legacy where a single slow queue will block the entire connection.)
link_flow_control(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
CQ = <<"cq">>,
QQ = <<"qq">>,
#'queue.declare_ok'{} = amqp_channel:call(
@ -3656,6 +3658,7 @@ link_flow_control(Config) ->
queue = QQ,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
@ -3743,7 +3746,8 @@ quorum_queue_on_new_node(Config) ->
%% In mixed version tests, run the queue leader with old code
%% and queue client with new code, or vice versa.
queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, QueueLeaderNode),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(
Config, QueueLeaderNode),
QName = atom_to_binary(?FUNCTION_NAME),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{queue = QName,
@ -3812,7 +3816,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config)
ExpectedReadyMsgs = 0,
?assertEqual(#'queue.delete_ok'{message_count = ExpectedReadyMsgs},
amqp_channel:call(Ch, #'queue.delete'{queue = QName})),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = close_connection_sync(Connection).
maintenance(Config) ->
@ -3860,11 +3864,14 @@ leader_transfer_stream_credit_batches(Config) ->
leader_transfer_credit(QName, QType, Credit, Config) ->
%% Create queue with leader on node 1.
{_, _, LinkPair1} = Init = init(1, Config),
{ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(
LinkPair1,
QName,
#{arguments => #{<<"x-queue-type">> => {utf8, QType},
<<"x-queue-leader-locator">> => {utf8, <<"client-local">>}}}),
?awaitMatch(
{ok, #{type := QType}},
rabbitmq_amqp_client:declare_queue(
LinkPair1,
QName,
#{arguments => #{<<"x-queue-type">> => {utf8, QType},
<<"x-queue-leader-locator">> => {utf8, <<"client-local">>}}}),
60000),
ok = close(Init),
OpnConf = connection_config(0, Config),
@ -4009,7 +4016,7 @@ global_counters(Config) ->
messages_redelivered_total := QQRedelivered0,
messages_acknowledged_total := QQAcknowledged0} = get_global_counters(Config, rabbit_quorum_queue),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
CQ = <<"my classic queue">>,
QQ = <<"my quorum queue">>,
CQAddress = rabbitmq_amqp_address:queue(CQ),
@ -4134,7 +4141,7 @@ global_counters(Config) ->
%% m4 was returned
?assertEqual(UnroutableReturned1 + 1, UnroutableReturned2),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = amqp10_client:detach_link(Sender),
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
@ -4142,12 +4149,12 @@ global_counters(Config) ->
stream_bloom_filter(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
amqp_channel:call(Ch, #'queue.declare'{
queue = Stream,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
@ -4274,7 +4281,7 @@ available_messages_stream(Config) ->
available_messages(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
queue = QName,
@ -4366,7 +4373,7 @@ available_messages(QType, Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
incoming_message_interceptors(Config) ->
Key = ?FUNCTION_NAME,
@ -4433,7 +4440,7 @@ trace(Q, QType, Config) ->
RoutingKey = <<"my routing key">>,
Payload = <<"my payload">>,
CorrelationId = <<"my correlation 👀"/utf8>>,
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{
queue = Q,
@ -4512,6 +4519,7 @@ trace(Q, QType, Config) ->
timer:sleep(20),
?assertMatch(#'basic.get_empty'{},
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(Receiver),
@ -4556,9 +4564,9 @@ user_id(Config) ->
message_ttl(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(QName),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
@ -4606,43 +4614,45 @@ plugin(Config) ->
idle_time_out_on_server(Config) ->
App = rabbit,
Par = heartbeat,
{ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]),
%% Configure RabbitMQ to use an idle-time-out of 1 second.
ok = rpc(Config, application, set_env, [App, Par, 1]),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, opened}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
end,
%% Mock the server socket to not have received any bytes.
rabbit_ct_broker_helpers:setup_meck(Config),
Mod = rabbit_net,
ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]),
ok = rpc(Config, meck, expect, [Mod, getstat, fun(_Sock, [recv_oct]) ->
{ok, [{recv_oct, 999}]};
(Sock, Opts) ->
meck:passthrough([Sock, Opts])
end]),
{ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]),
try
%% Configure RabbitMQ to use an idle-time-out of 1 second.
ok = rpc(Config, application, set_env, [App, Par, 1]),
%% The server "SHOULD try to gracefully close the connection using a close
%% frame with an error explaining why" [2.4.5].
%% Since we chose a heartbeat value of 1 second, the server should easily
%% close the connection within 5 seconds.
receive
{amqp10_event,
{connection, Connection,
{closed,
{resource_limit_exceeded,
<<"no frame received from client within idle timeout threshold">>}}}} -> ok
after 30000 ->
ct:fail({missing_event, ?LINE})
end,
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, opened}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
end,
?assert(rpc(Config, meck, validate, [Mod])),
ok = rpc(Config, meck, unload, [Mod]),
ok = rpc(Config, application, set_env, [App, Par, DefaultVal]).
%% Mock the server socket to not have received any bytes.
rabbit_ct_broker_helpers:setup_meck(Config),
ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]),
ok = rpc(Config, meck, expect, [Mod, getstat, fun(_Sock, [recv_oct]) ->
{ok, [{recv_oct, 999}]};
(Sock, Opts) ->
meck:passthrough([Sock, Opts])
end]),
%% The server "SHOULD try to gracefully close the connection using a close
%% frame with an error explaining why" [2.4.5].
%% Since we chose a heartbeat value of 1 second, the server should easily
%% close the connection within 5 seconds.
receive
{amqp10_event,
{connection, Connection,
{closed,
{resource_limit_exceeded,
<<"no frame received from client within idle timeout threshold">>}}}} -> ok
after 30000 ->
ct:fail({missing_event, ?LINE})
end
after
?assert(rpc(Config, meck, validate, [Mod])),
ok = rpc(Config, meck, unload, [Mod]),
ok = rpc(Config, application, set_env, [App, Par, DefaultVal])
end.
%% Test that the idle timeout threshold is exceeded on the client
%% when no frames are sent from server to client.
@ -4741,7 +4751,7 @@ credential_expires(Config) ->
%% Attaching to an exclusive source queue should fail.
attach_to_exclusive_queue(Config) ->
QName = <<"my queue">>,
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{queue = QName,
durable = true,
@ -4764,7 +4774,7 @@ attach_to_exclusive_queue(Config) ->
ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
dynamic_target_short_link_name(Config) ->
OpnConf0 = connection_config(Config),
@ -5436,12 +5446,15 @@ dead_letter_into_stream(Config) ->
<<"x-dead-letter-exchange">> => {utf8, <<>>},
<<"x-dead-letter-routing-key">> => {utf8, QName1}
}}),
{ok, #{type := <<"stream">>}} = rabbitmq_amqp_client:declare_queue(
LinkPair1,
QName1,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>},
<<"x-initial-cluster-size">> => {ulong, 1}
}}),
?awaitMatch(
{ok, #{type := <<"stream">>}},
rabbitmq_amqp_client:declare_queue(
LinkPair1,
QName1,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>},
<<"x-initial-cluster-size">> => {ulong, 1}
}}),
60000),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session1, <<"receiver">>, <<"/amq/queue/", QName1/binary>>,
settled, configuration,
@ -5874,9 +5887,9 @@ receive_many_auto_flow(QType, Config) ->
%% incoming-window being closed.
incoming_window_closed_transfer_flow_order(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),