amqp_client_SUITE: Use a dedicated AMQP-0-9-1 connection per testcase

... instead of a global one. Otherwise, one connection failure, even if
expected by a testcase, will affect all subsequent testcases negatively.
This commit is contained in:
Jean-Sébastien Pédron 2025-02-26 23:45:20 +01:00
parent 6084055183
commit ce5ba6da04
No known key found for this signature in database
GPG Key ID: 39E99761A5FD94CC
1 changed files with 67 additions and 62 deletions

View File

@ -587,7 +587,7 @@ modified_quorum_queue(Config) ->
ok = amqp10_client:settle_msg(Receiver1, M2e, modified), ok = amqp10_client:settle_msg(Receiver1, M2e, modified),
%% Test that we can consume via AMQP 0.9.1 %% Test that we can consume via AMQP 0.9.1
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
{#'basic.get_ok'{}, {#'basic.get_ok'{},
#amqp_msg{payload = <<"m2">>, #amqp_msg{payload = <<"m2">>,
props = #'P_basic'{headers = Headers}} props = #'P_basic'{headers = Headers}}
@ -598,7 +598,7 @@ modified_quorum_queue(Config) ->
lists:keysearch(<<"x-other">>, 1, Headers)), lists:keysearch(<<"x-other">>, 1, Headers)),
?assertEqual({value, {<<"x-delivery-count">>, long, 5}}, ?assertEqual({value, {<<"x-delivery-count">>, long, 5}},
lists:keysearch(<<"x-delivery-count">>, 1, Headers)), lists:keysearch(<<"x-delivery-count">>, 1, Headers)),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = amqp10_client:detach_link(Receiver1), ok = amqp10_client:detach_link(Receiver1),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
@ -1344,7 +1344,7 @@ amqp_amqpl(QType, Config) ->
ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Sender),
flush(detached), flush(detached),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{global = false, #'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{global = false,
prefetch_count = 100}), prefetch_count = 100}),
CTag = <<"my-tag">>, CTag = <<"my-tag">>,
@ -1427,7 +1427,7 @@ amqp_amqpl(QType, Config) ->
after 30000 -> ct:fail({missing_deliver, ?LINE}) after 30000 -> ct:fail({missing_deliver, ?LINE})
end, end,
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = close_connection_sync(Connection). ok = close_connection_sync(Connection).
@ -1436,7 +1436,7 @@ message_headers_conversion(Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(QName), Address = rabbitmq_amqp_address:queue(QName),
%% declare a quorum queue %% declare a quorum queue
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
amqp_channel:call(Ch, #'queue.declare'{ amqp_channel:call(Ch, #'queue.declare'{
queue = QName, queue = QName,
durable = true, durable = true,
@ -1448,7 +1448,7 @@ message_headers_conversion(Config) ->
amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address), amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address),
amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address), amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = delete_queue(Session, QName), ok = delete_queue(Session, QName),
ok = close_connection_sync(Connection). ok = close_connection_sync(Connection).
@ -1554,11 +1554,11 @@ multiple_sessions(Config) ->
ok = amqp10_client:flow_link_credit(Receiver2, NMsgsPerReceiver, never), ok = amqp10_client:flow_link_credit(Receiver2, NMsgsPerReceiver, never),
flush("receiver attached"), flush("receiver attached"),
Ch = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
[#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName, [#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
exchange = <<"amq.fanout">>}) exchange = <<"amq.fanout">>})
|| QName <- Qs], || QName <- Qs],
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
%% Send on each session. %% Send on each session.
TargetAddr = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), TargetAddr = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
@ -1614,13 +1614,13 @@ server_closes_link_stream(Config) ->
server_closes_link(QType, Config) -> server_closes_link(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
queue = QName, queue = QName,
durable = true, durable = true,
arguments = [{<<"x-queue-type">>, longstr, QType}]}), arguments = [{<<"x-queue-type">>, longstr, QType}]}),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Connection} = amqp10_client:open_connection(OpnConf),
@ -1695,7 +1695,7 @@ server_closes_link_exchange(Settled, Config) ->
XName = atom_to_binary(?FUNCTION_NAME), XName = atom_to_binary(?FUNCTION_NAME),
QName = <<"my queue">>, QName = <<"my queue">>,
RoutingKey = <<"my routing key">>, RoutingKey = <<"my routing key">>,
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = XName}), #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = XName}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName, #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
@ -1737,7 +1737,7 @@ server_closes_link_exchange(Settled, Config) ->
?assertMatch(#{publishers := 0}, get_global_counters(Config)), ?assertMatch(#{publishers := 0}, get_global_counters(Config)),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection). ok = close_connection_sync(Connection).
@ -1749,13 +1749,13 @@ link_target_quorum_queue_deleted(Config) ->
link_target_queue_deleted(QType, Config) -> link_target_queue_deleted(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
queue = QName, queue = QName,
durable = true, durable = true,
arguments = [{<<"x-queue-type">>, longstr, QType}]}), arguments = [{<<"x-queue-type">>, longstr, QType}]}),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Connection} = amqp10_client:open_connection(OpnConf),
@ -1810,7 +1810,7 @@ target_queues_deleted_accepted(Config) ->
Q2 = <<"q2">>, Q2 = <<"q2">>,
Q3 = <<"q3">>, Q3 = <<"q3">>,
QNames = [Q1, Q2, Q3], QNames = [Q1, Q2, Q3],
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
[begin [begin
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName, #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
@ -1859,7 +1859,7 @@ target_queues_deleted_accepted(Config) ->
?assertEqual(#'queue.delete_ok'{message_count = 2}, ?assertEqual(#'queue.delete_ok'{message_count = 2},
amqp_channel:call(Ch, #'queue.delete'{queue = Q1})), amqp_channel:call(Ch, #'queue.delete'{queue = Q1})),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
?assert(rpc(Config, meck, validate, [Mod])), ?assert(rpc(Config, meck, validate, [Mod])),
ok = rpc(Config, meck, unload, [Mod]), ok = rpc(Config, meck, unload, [Mod]),
ok = end_session_sync(Session), ok = end_session_sync(Session),
@ -1944,7 +1944,7 @@ sync_get_unsettled_stream(Config) ->
sync_get_unsettled(QType, Config) -> sync_get_unsettled(QType, Config) ->
SenderSettleMode = unsettled, SenderSettleMode = unsettled,
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
queue = QName, queue = QName,
@ -2033,7 +2033,7 @@ sync_get_unsettled(QType, Config) ->
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
sync_get_unsettled_2_classic_queue(Config) -> sync_get_unsettled_2_classic_queue(Config) ->
sync_get_unsettled_2(<<"classic">>, Config). sync_get_unsettled_2(<<"classic">>, Config).
@ -2048,7 +2048,7 @@ sync_get_unsettled_2_stream(Config) ->
sync_get_unsettled_2(QType, Config) -> sync_get_unsettled_2(QType, Config) ->
SenderSettleMode = unsettled, SenderSettleMode = unsettled,
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
queue = QName, queue = QName,
@ -2123,7 +2123,7 @@ sync_get_unsettled_2(QType, Config) ->
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
sync_get_settled_classic_queue(Config) -> sync_get_settled_classic_queue(Config) ->
sync_get_settled(<<"classic">>, Config). sync_get_settled(<<"classic">>, Config).
@ -2138,7 +2138,7 @@ sync_get_settled_stream(Config) ->
sync_get_settled(QType, Config) -> sync_get_settled(QType, Config) ->
SenderSettleMode = settled, SenderSettleMode = settled,
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
queue = QName, queue = QName,
@ -2203,7 +2203,7 @@ sync_get_settled(QType, Config) ->
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
timed_get_classic_queue(Config) -> timed_get_classic_queue(Config) ->
timed_get(<<"classic">>, Config). timed_get(<<"classic">>, Config).
@ -2217,7 +2217,7 @@ timed_get_stream(Config) ->
%% Synchronous get with a timeout, figure 2.44. %% Synchronous get with a timeout, figure 2.44.
timed_get(QType, Config) -> timed_get(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
queue = QName, queue = QName,
@ -2275,7 +2275,7 @@ timed_get(QType, Config) ->
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
stop_classic_queue(Config) -> stop_classic_queue(Config) ->
stop(<<"classic">>, Config). stop(<<"classic">>, Config).
@ -2288,7 +2288,7 @@ stop_stream(Config) ->
%% Test stopping a link, figure 2.46. %% Test stopping a link, figure 2.46.
stop(QType, Config) -> stop(QType, Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
@ -2354,7 +2354,7 @@ stop(QType, Config) ->
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
consumer_priority_classic_queue(Config) -> consumer_priority_classic_queue(Config) ->
consumer_priority(<<"classic">>, Config). consumer_priority(<<"classic">>, Config).
@ -2832,7 +2832,7 @@ detach_requeues_one_session_quorum_queue(Config) ->
detach_requeue_one_session(QType, Config) -> detach_requeue_one_session(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
queue = QName, queue = QName,
@ -2910,7 +2910,7 @@ detach_requeue_one_session(QType, Config) ->
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
detach_requeues_drop_head_classic_queue(Config) -> detach_requeues_drop_head_classic_queue(Config) ->
QName1 = <<"q1">>, QName1 = <<"q1">>,
@ -3080,7 +3080,7 @@ detach_requeues_two_connections(QType, Config) ->
resource_alarm_before_session_begin(Config) -> resource_alarm_before_session_begin(Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Connection} = amqp10_client:open_connection(OpnConf),
@ -3131,11 +3131,11 @@ resource_alarm_before_session_begin(Config) ->
ok = end_session_sync(Session1), ok = end_session_sync(Session1),
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
resource_alarm_after_session_begin(Config) -> resource_alarm_after_session_begin(Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
Address = rabbitmq_amqp_address:queue(QName), Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
@ -3198,13 +3198,13 @@ resource_alarm_after_session_begin(Config) ->
ok = close_connection_sync(Connection1), ok = close_connection_sync(Connection1),
ok = close_connection_sync(Connection2), ok = close_connection_sync(Connection2),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
%% Test case for %% Test case for
%% https://github.com/rabbitmq/rabbitmq-server/issues/12816 %% https://github.com/rabbitmq/rabbitmq-server/issues/12816
resource_alarm_send_many(Config) -> resource_alarm_send_many(Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
Address = rabbitmq_amqp_address:queue(QName), Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
@ -3234,7 +3234,7 @@ resource_alarm_send_many(Config) ->
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
auth_attempt_metrics(Config) -> auth_attempt_metrics(Config) ->
open_and_close_connection(Config), open_and_close_connection(Config),
@ -3267,7 +3267,7 @@ max_message_size_client_to_server(Config) ->
ok = rpc(Config, persistent_term, put, [max_message_size, MaxMessageSize]), ok = rpc(Config, persistent_term, put, [max_message_size, MaxMessageSize]),
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
Address = rabbitmq_amqp_address:queue(QName), Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
@ -3291,12 +3291,12 @@ max_message_size_client_to_server(Config) ->
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]). ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]).
max_message_size_server_to_client(Config) -> max_message_size_server_to_client(Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
Address = rabbitmq_amqp_address:queue(QName), Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
@ -3345,13 +3345,13 @@ max_message_size_server_to_client(Config) ->
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
last_queue_confirms(Config) -> last_queue_confirms(Config) ->
ClassicQ = <<"my classic queue">>, ClassicQ = <<"my classic queue">>,
QuorumQ = <<"my quorum queue">>, QuorumQ = <<"my quorum queue">>,
Qs = [ClassicQ, QuorumQ], Qs = [ClassicQ, QuorumQ],
Ch = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{queue = ClassicQ}), Ch, #'queue.declare'{queue = ClassicQ}),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
@ -3417,13 +3417,13 @@ last_queue_confirms(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = ClassicQ})), amqp_channel:call(Ch, #'queue.delete'{queue = ClassicQ})),
?assertEqual(#'queue.delete_ok'{message_count = 2}, ?assertEqual(#'queue.delete_ok'{message_count = 2},
amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})), amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
target_queue_deleted(Config) -> target_queue_deleted(Config) ->
ClassicQ = <<"my classic queue">>, ClassicQ = <<"my classic queue">>,
QuorumQ = <<"my quorum queue">>, QuorumQ = <<"my quorum queue">>,
Qs = [ClassicQ, QuorumQ], Qs = [ClassicQ, QuorumQ],
Ch = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{queue = ClassicQ}), Ch, #'queue.declare'{queue = ClassicQ}),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
@ -3489,11 +3489,12 @@ target_queue_deleted(Config) ->
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
?assertEqual(#'queue.delete_ok'{message_count = 2}, ?assertEqual(#'queue.delete_ok'{message_count = 2},
amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})), amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
target_classic_queue_down(Config) -> target_classic_queue_down(Config) ->
ClassicQueueNode = 2, ClassicQueueNode = 2,
Ch = rabbit_ct_client_helpers:open_channel(Config, ClassicQueueNode), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(
Config, ClassicQueueNode),
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(QName), Address = rabbitmq_amqp_address:queue(QName),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
@ -3501,7 +3502,7 @@ target_classic_queue_down(Config) ->
queue = QName, queue = QName,
durable = true, durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}), arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}),
ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, ClassicQueueNode), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Connection} = amqp10_client:open_connection(OpnConf),
@ -3579,7 +3580,8 @@ async_notify_unsettled_stream(Config) ->
%% Test asynchronous notification, figure 2.45. %% Test asynchronous notification, figure 2.45.
async_notify(SenderSettleMode, QType, Config) -> async_notify(SenderSettleMode, QType, Config) ->
%% Place queue leader on the old node. %% Place queue leader on the old node.
Ch = rabbit_ct_client_helpers:open_channel(Config, 1), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(
Config, 1),
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
@ -3636,7 +3638,7 @@ async_notify(SenderSettleMode, QType, Config) ->
end, end,
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection). ok = close_connection_sync(Connection).
@ -3644,7 +3646,7 @@ async_notify(SenderSettleMode, QType, Config) ->
%% (slow queue) does not impact other link receivers (fast queues) on the **same** session. %% (slow queue) does not impact other link receivers (fast queues) on the **same** session.
%% (This is unlike AMQP legacy where a single slow queue will block the entire connection.) %% (This is unlike AMQP legacy where a single slow queue will block the entire connection.)
link_flow_control(Config) -> link_flow_control(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
CQ = <<"cq">>, CQ = <<"cq">>,
QQ = <<"qq">>, QQ = <<"qq">>,
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
@ -3657,6 +3659,7 @@ link_flow_control(Config) ->
queue = QQ, queue = QQ,
durable = true, durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection), {ok, Session} = amqp10_client:begin_session_sync(Connection),
@ -3744,7 +3747,8 @@ quorum_queue_on_new_node(Config) ->
%% In mixed version tests, run the queue leader with old code %% In mixed version tests, run the queue leader with old code
%% and queue client with new code, or vice versa. %% and queue client with new code, or vice versa.
queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) -> queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, QueueLeaderNode), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(
Config, QueueLeaderNode),
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{queue = QName, Ch, #'queue.declare'{queue = QName,
@ -3813,7 +3817,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config)
ExpectedReadyMsgs = 0, ExpectedReadyMsgs = 0,
?assertEqual(#'queue.delete_ok'{message_count = ExpectedReadyMsgs}, ?assertEqual(#'queue.delete_ok'{message_count = ExpectedReadyMsgs},
amqp_channel:call(Ch, #'queue.delete'{queue = QName})), amqp_channel:call(Ch, #'queue.delete'{queue = QName})),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = close_connection_sync(Connection). ok = close_connection_sync(Connection).
maintenance(Config) -> maintenance(Config) ->
@ -4013,7 +4017,7 @@ global_counters(Config) ->
messages_redelivered_total := QQRedelivered0, messages_redelivered_total := QQRedelivered0,
messages_acknowledged_total := QQAcknowledged0} = get_global_counters(Config, rabbit_quorum_queue), messages_acknowledged_total := QQAcknowledged0} = get_global_counters(Config, rabbit_quorum_queue),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
CQ = <<"my classic queue">>, CQ = <<"my classic queue">>,
QQ = <<"my quorum queue">>, QQ = <<"my quorum queue">>,
CQAddress = rabbitmq_amqp_address:queue(CQ), CQAddress = rabbitmq_amqp_address:queue(CQ),
@ -4138,7 +4142,7 @@ global_counters(Config) ->
%% m4 was returned %% m4 was returned
?assertEqual(UnroutableReturned1 + 1, UnroutableReturned2), ?assertEqual(UnroutableReturned1 + 1, UnroutableReturned2),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Sender),
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection). ok = close_connection_sync(Connection).
@ -4146,12 +4150,12 @@ global_counters(Config) ->
stream_bloom_filter(Config) -> stream_bloom_filter(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME), Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream), Address = rabbitmq_amqp_address:queue(Stream),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
amqp_channel:call(Ch, #'queue.declare'{ amqp_channel:call(Ch, #'queue.declare'{
queue = Stream, queue = Stream,
durable = true, durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}), arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Connection} = amqp10_client:open_connection(OpnConf),
@ -4278,7 +4282,7 @@ available_messages_stream(Config) ->
available_messages(QType, Config) -> available_messages(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
queue = QName, queue = QName,
@ -4370,7 +4374,7 @@ available_messages(QType, Config) ->
ok = end_session_sync(Session), ok = end_session_sync(Session),
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
incoming_message_interceptors(Config) -> incoming_message_interceptors(Config) ->
Key = ?FUNCTION_NAME, Key = ?FUNCTION_NAME,
@ -4437,7 +4441,7 @@ trace(Q, QType, Config) ->
RoutingKey = <<"my routing key">>, RoutingKey = <<"my routing key">>,
Payload = <<"my payload">>, Payload = <<"my payload">>,
CorrelationId = <<"my correlation 👀"/utf8>>, CorrelationId = <<"my correlation 👀"/utf8>>,
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{ Ch, #'queue.declare'{
queue = Q, queue = Q,
@ -4516,6 +4520,7 @@ trace(Q, QType, Config) ->
timer:sleep(20), timer:sleep(20),
?assertMatch(#'basic.get_empty'{}, ?assertMatch(#'basic.get_empty'{},
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})), amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
ok = amqp10_client:detach_link(Sender), ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(Receiver), ok = amqp10_client:detach_link(Receiver),
@ -4560,9 +4565,9 @@ user_id(Config) ->
message_ttl(Config) -> message_ttl(Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(QName), Address = rabbitmq_amqp_address:queue(QName),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection), {ok, Session} = amqp10_client:begin_session_sync(Connection),
@ -4747,7 +4752,7 @@ credential_expires(Config) ->
%% Attaching to an exclusive source queue should fail. %% Attaching to an exclusive source queue should fail.
attach_to_exclusive_queue(Config) -> attach_to_exclusive_queue(Config) ->
QName = <<"my queue">>, QName = <<"my queue">>,
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call( #'queue.declare_ok'{} = amqp_channel:call(
Ch, #'queue.declare'{queue = QName, Ch, #'queue.declare'{queue = QName,
durable = true, durable = true,
@ -4770,7 +4775,7 @@ attach_to_exclusive_queue(Config) ->
ok = close_connection_sync(Connection), ok = close_connection_sync(Connection),
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch). ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
dynamic_target_short_link_name(Config) -> dynamic_target_short_link_name(Config) ->
OpnConf0 = connection_config(Config), OpnConf0 = connection_config(Config),
@ -5883,9 +5888,9 @@ receive_many_auto_flow(QType, Config) ->
%% incoming-window being closed. %% incoming-window being closed.
incoming_window_closed_transfer_flow_order(Config) -> incoming_window_closed_transfer_flow_order(Config) ->
QName = atom_to_binary(?FUNCTION_NAME), QName = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
Address = rabbitmq_amqp_address:queue(QName), Address = rabbitmq_amqp_address:queue(QName),
OpnConf = connection_config(Config), OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Connection} = amqp10_client:open_connection(OpnConf),