rabbit_exchange_type_consistent_hash_SUITE: Open/close connection explicitly

[Why]
In CI, we observe that the channel hangs sometimes.
rabbitmq_ct_client_helpers implicit connection is quite fragile in the
sense that a test case can disturb the next one in some cases.

[How]
Let's use a dedicated connection and see if it fixes the problem.
This commit is contained in:
Jean-Sébastien Pédron 2025-07-31 17:34:41 +02:00
parent 267445680f
commit 17feaa158c
No known key found for this signature in database
GPG Key ID: 39E99761A5FD94CC
1 changed files with 46 additions and 42 deletions

View File

@ -156,7 +156,7 @@ custom_header_undefined(Config) ->
Exchange = <<"my exchange">>, Exchange = <<"my exchange">>,
Queue = <<"my queue">>, Queue = <<"my queue">>,
Ch = rabbit_ct_client_helpers:open_channel(Config), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
#'exchange.declare_ok'{} = amqp_channel:call( #'exchange.declare_ok'{} = amqp_channel:call(
Ch, #'exchange.declare' { Ch, #'exchange.declare' {
@ -179,7 +179,7 @@ custom_header_undefined(Config) ->
?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}},
amqp_channel:call(Ch, #'basic.get'{queue = Queue})), amqp_channel:call(Ch, #'basic.get'{queue = Queue})),
rabbit_ct_client_helpers:close_channel(Ch), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
clean_up_test_topology(Config, Exchange, [Queue]), clean_up_test_topology(Config, Exchange, [Queue]),
ok. ok.
@ -373,7 +373,7 @@ test_with_timestamp(Config, Qs) ->
Qs). Qs).
test_mutually_exclusive_arguments(Config) -> test_mutually_exclusive_arguments(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
process_flag(trap_exit, true), process_flag(trap_exit, true),
Cmd = #'exchange.declare'{ Cmd = #'exchange.declare'{
@ -384,11 +384,11 @@ test_mutually_exclusive_arguments(Config) ->
}, },
?assertExit(_, amqp_channel:call(Chan, Cmd)), ?assertExit(_, amqp_channel:call(Chan, Cmd)),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok. ok.
test_non_supported_property(Config) -> test_non_supported_property(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
process_flag(trap_exit, true), process_flag(trap_exit, true),
Cmd = #'exchange.declare'{ Cmd = #'exchange.declare'{
@ -398,7 +398,7 @@ test_non_supported_property(Config) ->
}, },
?assertExit(_, amqp_channel:call(Chan, Cmd)), ?assertExit(_, amqp_channel:call(Chan, Cmd)),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok. ok.
rnd() -> rnd() ->
@ -411,13 +411,13 @@ test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues) ->
test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues, ?DEFAULT_SAMPLE_COUNT). test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues, ?DEFAULT_SAMPLE_COUNT).
test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, IterationCount) -> test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, IterationCount) ->
Chan = rabbit_ct_client_helpers:open_channel(Config),
#'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}),
CHX = <<"e">>, CHX = <<"e">>,
clean_up_test_topology(Config, CHX, Queues), clean_up_test_topology(Config, CHX, Queues),
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
#'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}),
#'exchange.declare_ok'{} = #'exchange.declare_ok'{} =
amqp_channel:call(Chan, amqp_channel:call(Chan,
#'exchange.declare' { #'exchange.declare' {
@ -464,11 +464,11 @@ test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, Itera
[Chi, Obs]), [Chi, Obs]),
clean_up_test_topology(Config, CHX, Queues), clean_up_test_topology(Config, CHX, Queues),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok. ok.
test_binding_with_negative_routing_key(Config) -> test_binding_with_negative_routing_key(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
X = <<"bind-fail">>, X = <<"bind-fail">>,
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
@ -482,15 +482,15 @@ test_binding_with_negative_routing_key(Config) ->
Cmd = #'queue.bind'{exchange = <<"bind-fail">>, Cmd = #'queue.bind'{exchange = <<"bind-fail">>,
routing_key = <<"-1">>}, routing_key = <<"-1">>},
?assertExit(_, amqp_channel:call(Chan, Cmd)), ?assertExit(_, amqp_channel:call(Chan, Cmd)),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
amqp_channel:call(Ch2, #'queue.delete'{queue = Q}), amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
rabbit_ct_client_helpers:close_channel(Ch2), rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Ch2),
ok. ok.
test_binding_with_non_numeric_routing_key(Config) -> test_binding_with_non_numeric_routing_key(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
X = <<"bind-fail">>, X = <<"bind-fail">>,
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
@ -505,10 +505,11 @@ test_binding_with_non_numeric_routing_key(Config) ->
routing_key = <<"not-a-number">>}, routing_key = <<"not-a-number">>},
?assertExit(_, amqp_channel:call(Chan, Cmd)), ?assertExit(_, amqp_channel:call(Chan, Cmd)),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
amqp_channel:call(Ch2, #'queue.delete'{queue = Q}), amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Ch2),
ok. ok.
%% %%
@ -516,7 +517,7 @@ test_binding_with_non_numeric_routing_key(Config) ->
%% %%
test_durable_exchange_hash_ring_recovery_between_node_restarts(Config) -> test_durable_exchange_hash_ring_recovery_between_node_restarts(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
X = <<"test_hash_ring_recovery_between_node_restarts">>, X = <<"test_hash_ring_recovery_between_node_restarts">>,
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
@ -547,11 +548,11 @@ test_durable_exchange_hash_ring_recovery_between_node_restarts(Config) ->
assert_ring_consistency(Config, X), assert_ring_consistency(Config, X),
clean_up_test_topology(Config, X, Queues), clean_up_test_topology(Config, X, Queues),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok. ok.
test_hash_ring_updates_when_queue_is_deleted(Config) -> test_hash_ring_updates_when_queue_is_deleted(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
X = <<"test_hash_ring_updates_when_queue_is_deleted">>, X = <<"test_hash_ring_updates_when_queue_is_deleted">>,
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
@ -576,11 +577,11 @@ test_hash_ring_updates_when_queue_is_deleted(Config) ->
?assertEqual(0, count_buckets_of_exchange(Config, X)), ?assertEqual(0, count_buckets_of_exchange(Config, X)),
clean_up_test_topology(Config, X, [Q]), clean_up_test_topology(Config, X, [Q]),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok. ok.
test_hash_ring_updates_when_multiple_queues_are_deleted(Config) -> test_hash_ring_updates_when_multiple_queues_are_deleted(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
X = <<"test_hash_ring_updates_when_multiple_queues_are_deleted">>, X = <<"test_hash_ring_updates_when_multiple_queues_are_deleted">>,
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
@ -611,7 +612,7 @@ test_hash_ring_updates_when_multiple_queues_are_deleted(Config) ->
?assertEqual(0, count_buckets_of_exchange(Config, X)), ?assertEqual(0, count_buckets_of_exchange(Config, X)),
clean_up_test_topology(Config, X, Queues), clean_up_test_topology(Config, X, Queues),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok. ok.
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure(Config) -> test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure(Config) ->
@ -706,7 +707,7 @@ test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closu
ok. ok.
test_hash_ring_updates_when_exchange_is_deleted(Config) -> test_hash_ring_updates_when_exchange_is_deleted(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
X = <<"test_hash_ring_updates_when_exchange_is_deleted">>, X = <<"test_hash_ring_updates_when_exchange_is_deleted">>,
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
@ -734,11 +735,11 @@ test_hash_ring_updates_when_exchange_is_deleted(Config) ->
?assertEqual(0, count_buckets_of_exchange(Config, X)), ?assertEqual(0, count_buckets_of_exchange(Config, X)),
clean_up_test_topology(Config, X, Queues), clean_up_test_topology(Config, X, Queues),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok. ok.
test_hash_ring_updates_when_queue_is_unbound(Config) -> test_hash_ring_updates_when_queue_is_unbound(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
X = <<"test_hash_ring_updates_when_queue_is_unbound">>, X = <<"test_hash_ring_updates_when_queue_is_unbound">>,
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
@ -769,11 +770,11 @@ test_hash_ring_updates_when_queue_is_unbound(Config) ->
?assertEqual(8, count_buckets_of_exchange(Config, X)), ?assertEqual(8, count_buckets_of_exchange(Config, X)),
clean_up_test_topology(Config, X, Queues), clean_up_test_topology(Config, X, Queues),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok. ok.
test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Config) -> test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted">>, X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted">>,
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
@ -818,11 +819,11 @@ test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Co
assert_ring_consistency(Config, X), assert_ring_consistency(Config, X),
clean_up_test_topology(Config, X, [Q1, Q2]), clean_up_test_topology(Config, X, [Q1, Q2]),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok. ok.
test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted(Config) -> test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted">>, X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted">>,
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
@ -872,14 +873,14 @@ test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted(
?assertEqual(0, count_buckets_of_exchange(Config, X)), ?assertEqual(0, count_buckets_of_exchange(Config, X)),
clean_up_test_topology(Config, X, [Q1, Q2]), clean_up_test_topology(Config, X, [Q1, Q2]),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok. ok.
%% Follows the setup described in %% Follows the setup described in
%% https://github.com/rabbitmq/rabbitmq-server/issues/3386#issuecomment-1103929292 %% https://github.com/rabbitmq/rabbitmq-server/issues/3386#issuecomment-1103929292
node_restart(Config) -> node_restart(Config) ->
Chan1 = rabbit_ct_client_helpers:open_channel(Config, 1), {Conn1, Chan1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1),
Chan2 = rabbit_ct_client_helpers:open_channel(Config, 2), {Conn2, Chan2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 2),
X = atom_to_binary(?FUNCTION_NAME), X = atom_to_binary(?FUNCTION_NAME),
#'exchange.declare_ok'{} = amqp_channel:call(Chan1, #'exchange.declare_ok'{} = amqp_channel:call(Chan1,
@ -903,8 +904,8 @@ node_restart(Config) ->
F(Chan1, QsNode1), F(Chan1, QsNode1),
F(Chan2, QsNode2), F(Chan2, QsNode2),
rabbit_ct_client_helpers:close_channel(Chan1), rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Chan1),
rabbit_ct_client_helpers:close_channel(Chan2), rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Chan2),
rabbit_ct_broker_helpers:restart_node(Config, 1), rabbit_ct_broker_helpers:restart_node(Config, 1),
rabbit_ct_broker_helpers:restart_node(Config, 2), rabbit_ct_broker_helpers:restart_node(Config, 2),
@ -942,13 +943,14 @@ count_buckets_of_exchange(Config, X) ->
from_mnesia_to_khepri(Config) -> from_mnesia_to_khepri(Config) ->
Queues = [Q1, Q2, Q3, Q4] = ?RoutingTestQs, Queues = [Q1, Q2, Q3, Q4] = ?RoutingTestQs,
IterationCount = ?DEFAULT_SAMPLE_COUNT, IterationCount = ?DEFAULT_SAMPLE_COUNT,
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
#'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}),
CHX = <<"e">>, CHX = <<"e">>,
clean_up_test_topology(Config, CHX, Queues), clean_up_test_topology(Config, CHX, Queues),
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
#'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}),
#'exchange.declare_ok'{} = #'exchange.declare_ok'{} =
amqp_channel:call(Chan, amqp_channel:call(Chan,
#'exchange.declare' { #'exchange.declare' {
@ -997,12 +999,14 @@ from_mnesia_to_khepri(Config) ->
ct:pal("Chi-square test for 3 degrees of freedom is ~p, p = 0.01 is 11.35, observations (counts, expected): ~p", ct:pal("Chi-square test for 3 degrees of freedom is ~p, p = 0.01 is 11.35, observations (counts, expected): ~p",
[Chi, Obs]), [Chi, Obs]),
clean_up_test_topology(Config, CHX, Queues), clean_up_test_topology(Config, CHX, Queues),
rabbit_ct_client_helpers:close_channel(Chan), rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
ok; ok;
Skip -> Skip ->
rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
Skip Skip
end; end;
Skip -> Skip ->
rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
Skip Skip
end. end.
@ -1010,12 +1014,12 @@ clean_up_test_topology(Config) ->
clean_up_test_topology(Config, none, ?AllQs). clean_up_test_topology(Config, none, ?AllQs).
clean_up_test_topology(Config, none, Qs) -> clean_up_test_topology(Config, none, Qs) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
[amqp_channel:call(Ch, #'queue.delete' {queue = Q}) || Q <- Qs], [amqp_channel:call(Ch, #'queue.delete' {queue = Q}) || Q <- Qs],
rabbit_ct_client_helpers:close_channel(Ch); rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch);
clean_up_test_topology(Config, X, Qs) -> clean_up_test_topology(Config, X, Qs) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
amqp_channel:call(Ch, #'exchange.delete' {exchange = X}), amqp_channel:call(Ch, #'exchange.delete' {exchange = X}),
[amqp_channel:call(Ch, #'queue.delete' {queue = Q}) || Q <- Qs], [amqp_channel:call(Ch, #'queue.delete' {queue = Q}) || Q <- Qs],
rabbit_ct_client_helpers:close_channel(Ch). rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).