From fe7a1413316eb88a9bd972ef4a67fcda11fe6cfa Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 12 Dec 2024 15:51:59 +0100 Subject: [PATCH] Test: Increase receive timeout in all rabbit test suites --- deps/rabbit/test/amqp_address_SUITE.erl | 20 +++-- deps/rabbit/test/amqp_auth_SUITE.erl | 86 ++++++++++--------- deps/rabbit/test/amqp_client_SUITE.erl | 2 +- deps/rabbit/test/amqp_credit_api_v2_SUITE.erl | 16 ++-- deps/rabbit/test/amqp_utils.erl | 22 ++--- deps/rabbit/test/amqpl_consumer_ack_SUITE.erl | 34 ++++---- .../test/amqpl_direct_reply_to_SUITE.erl | 10 ++- deps/rabbit/test/backing_queue_SUITE.erl | 2 +- .../rabbit/test/clustering_recovery_SUITE.erl | 4 +- deps/rabbit/test/confirms_rejects_SUITE.erl | 12 +-- deps/rabbit/test/consumer_timeout_SUITE.erl | 10 ++- deps/rabbit/test/dead_lettering_SUITE.erl | 28 +++--- .../test/direct_exchange_routing_v2_SUITE.erl | 4 +- ...disconnect_detected_during_alarm_SUITE.erl | 2 +- .../test/metadata_store_clustering_SUITE.erl | 4 +- .../rabbit/test/mirrored_supervisor_SUITE.erl | 2 +- deps/rabbit/test/msg_size_metrics_SUITE.erl | 6 +- .../publisher_confirms_parallel_SUITE.erl | 10 +-- .../rabbit/test/queue_length_limits_SUITE.erl | 8 +- deps/rabbit/test/queue_parallel_SUITE.erl | 4 +- deps/rabbit/test/queue_type_SUITE.erl | 12 +-- deps/rabbit/test/quorum_queue_SUITE.erl | 53 ++++++------ .../test/rabbit_db_topic_exchange_SUITE.erl | 2 +- .../rabbit_fifo_dlx_integration_SUITE.erl | 8 +- deps/rabbit/test/rabbit_fifo_int_SUITE.erl | 35 ++++---- .../rabbit_local_random_exchange_SUITE.erl | 2 +- .../rabbit/test/rabbit_stream_queue_SUITE.erl | 45 +++++----- .../rabbitmq_queues_cli_integration_SUITE.erl | 2 +- .../test/single_active_consumer_SUITE.erl | 16 ++-- deps/rabbit/test/topic_permission_SUITE.erl | 8 +- deps/rabbit/test/transactions_SUITE.erl | 2 +- deps/rabbit/test/unicode_SUITE.erl | 2 +- .../test/unit_file_handle_cache_SUITE.erl | 4 +- 33 files changed, 248 insertions(+), 229 deletions(-) diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index 607aa11473..a914442d97 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -22,6 +22,8 @@ [flush/1, wait_for_credit/1]). +-define(TIMEOUT, 30_000). + all() -> [ {group, v1_permitted}, @@ -216,7 +218,7 @@ target_exchange_absent(Config) -> condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, <<"no exchange '", XName:(byte_size(XName))/binary, "' in vhost '/'">>}}}}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {missing_event, ?LINE}, flush(Reason), ct:fail(Reason) @@ -275,7 +277,7 @@ target_queue_absent(Config) -> condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary, "' in vhost '/'">>}}}}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {missing_event, ?LINE}, flush(Reason), ct:fail(Reason) @@ -403,7 +405,7 @@ target_per_message_unset_to_address(Config) -> ok = amqp10_client:detach_link(Sender), receive {amqp10_event, {link, Sender, {detached, normal}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). @@ -467,7 +469,7 @@ target_per_message_bad_to_address(Config) -> ?assertMatch(#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, description = {utf8, <<"bad 'to' address", _Rest/binary>>}}, Error) - after 5000 -> + after ?TIMEOUT -> flush(missing_disposition), ct:fail(missing_disposition) end @@ -507,7 +509,7 @@ target_per_message_exchange_absent_settled(Config) -> info = {map, [{{symbol, <<"delivery-tag">>}, {binary, DTag2}}]} }, Error) - after 5000 -> ct:fail("server did not close our outgoing link") + after ?TIMEOUT -> ct:fail("server did not close our outgoing link") end, ok = cleanup(Init). @@ -566,7 +568,7 @@ target_bad_address0(TargetAddress, Config) -> {session, Session, {ended, #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {missing_event, ?LINE, TargetAddress}, flush(Reason), ct:fail(Reason) @@ -593,7 +595,7 @@ source_queue_absent(Config) -> condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary, "' in vhost '/'">>}}}}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {missing_event, ?LINE}, flush(Reason), ct:fail(Reason) @@ -626,7 +628,7 @@ source_bad_address0(SourceAddress, Config) -> {session, Session, {ended, #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {missing_event, ?LINE}, flush(Reason), ct:fail(Reason) @@ -657,7 +659,7 @@ wait_for_settled(State, Tag) -> receive {amqp10_disposition, {State, Tag}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {?FUNCTION_NAME, State, Tag}, flush(Reason), ct:fail(Reason) diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 6bd905a924..f9328aab96 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -14,6 +14,8 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). +-define(TIMEOUT, 30_000). + -import(rabbit_ct_broker_helpers, [rpc/4]). -import(rabbit_ct_helpers, @@ -153,8 +155,8 @@ v1_attach_target_queue(Config) -> <<"configure access to queue 'test queue' in vhost " "'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr1}}} -> ok - after 5000 -> flush(missing_ended), - ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") + after ?TIMEOUT -> flush(missing_ended), + ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, %% Give the user configure permissions on the queue. @@ -166,7 +168,7 @@ v1_attach_target_queue(Config) -> <<"write access to exchange 'amq.default' in vhost " "'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session2, {ended, ExpectedErr2}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -177,8 +179,8 @@ v1_attach_target_queue(Config) -> {ok, Sender3} = amqp10_client:attach_sender_link( Session3, <<"test-sender-3">>, TargetAddress), receive {amqp10_event, {link, Sender3, attached}} -> ok - after 5000 -> flush(missing_attached), - ct:fail("missing ATTACH from server") + after ?TIMEOUT -> flush(missing_attached), + ct:fail("missing ATTACH from server") end, ok = close_connection_sync(Connection). @@ -202,8 +204,8 @@ v1_attach_source_exchange(Config) -> #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, description = {utf8, <<"configure access to queue 'amq.gen", _/binary>>}}}}} -> ok - after 5000 -> flush(missing_ended), - ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") + after ?TIMEOUT -> flush(missing_ended), + ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, %% Give the user configure permissions on the queue. @@ -218,7 +220,7 @@ v1_attach_source_exchange(Config) -> #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, description = {utf8, <<"write access to queue 'amq.gen", _/binary>>}}}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -231,7 +233,7 @@ v1_attach_source_exchange(Config) -> <<"read access to exchange 'amq.fanout' in vhost " "'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session3, {ended, ExpectedErr1}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -247,7 +249,7 @@ v1_attach_source_exchange(Config) -> #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, description = {utf8, <<"read access to queue 'amq.gen", _/binary>>}}}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -258,7 +260,7 @@ v1_attach_source_exchange(Config) -> {ok, Recv5} = amqp10_client:attach_receiver_link( Session5, <<"receiver-5">>, SourceAddress), receive {amqp10_event, {link, Recv5, attached}} -> ok - after 5000 -> flush(missing_attached), + after ?TIMEOUT -> flush(missing_attached), ct:fail("missing ATTACH from server") end, @@ -290,7 +292,7 @@ send_to_topic(TargetAddress, Config) -> <<"write access to topic 'test vhost.test user.a.b' in exchange " "'amq.topic' in vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -304,7 +306,7 @@ send_to_topic(TargetAddress, Config) -> ok = amqp10_client:send_msg(Sender2, Msg2), %% We expect RELEASED since no queue is bound. receive {amqp10_disposition, {released, Dtag}} -> ok - after 5000 -> ct:fail(released_timeout) + after ?TIMEOUT -> ct:fail(released_timeout) end, ok = amqp10_client:detach_link(Sender2), @@ -330,7 +332,7 @@ v1_send_to_topic_using_subject(Config) -> ok = amqp10_client:send_msg(Sender, Msg1b), %% We have sufficient authorization, but expect RELEASED since no queue is bound. receive {amqp10_disposition, {released, Dtag1}} -> ok - after 5000 -> ct:fail(released_timeout) + after ?TIMEOUT -> ct:fail(released_timeout) end, Dtag2 = <<"dtag 2">>, @@ -342,7 +344,7 @@ v1_send_to_topic_using_subject(Config) -> <<"write access to topic '.a.b' in exchange 'amq.topic' in " "vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -374,7 +376,7 @@ attach_source_topic0(SourceAddress, Config) -> <<"read access to topic 'test vhost.test user.a.b' in exchange " "'amq.topic' in vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -383,7 +385,7 @@ attach_source_topic0(SourceAddress, Config) -> {ok, Recv2} = amqp10_client:attach_receiver_link( Session2, <<"receiver-2">>, SourceAddress), receive {amqp10_event, {link, Recv2, attached}} -> ok - after 5000 -> flush(missing_attached), + after ?TIMEOUT -> flush(missing_attached), ct:fail("missing ATTACH from server") end, @@ -405,7 +407,7 @@ v1_attach_target_internal_exchange(Config) -> ExpectedErr = error_unauthorized( <<"forbidden to publish to internal exchange 'test exchange' in vhost '/'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -429,7 +431,7 @@ attach_source_queue(Config) -> receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, ok = close_connection_sync(Conn). @@ -448,13 +450,13 @@ attach_target_exchange(Config) -> <<"write access to exchange '", XName/binary, "' in vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, {ok, Session2} = amqp10_client:begin_session_sync(Connection), {ok, _} = amqp10_client:attach_sender_link(Session2, <<"test-sender">>, Address2), receive {amqp10_event, {session, Session2, {ended, ExpectedErr}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:close_connection(Connection). @@ -478,7 +480,7 @@ attach_target_queue(Config) -> <<"write access to exchange 'amq.default' ", "in vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:close_connection(Conn). @@ -500,7 +502,7 @@ target_per_message_exchange(Config) -> Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, <<"m1">>)), ok = amqp10_client:send_msg(Sender, Msg1), receive {amqp10_disposition, {released, Tag1}} -> ok - after 5000 -> ct:fail(released_timeout) + after ?TIMEOUT -> ct:fail(released_timeout) end, %% We don't have sufficient authorization. @@ -511,7 +513,7 @@ target_per_message_exchange(Config) -> <<"write access to exchange 'amq.default' in " "vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = close_connection_sync(Connection). @@ -534,7 +536,7 @@ target_per_message_internal_exchange(Config) -> ExpectedErr = error_unauthorized( <<"forbidden to publish to internal exchange '", XName/binary, "' in vhost 'test vhost'">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_event), + after ?TIMEOUT -> flush(missing_event), ct:fail({missing_event, ?LINE}) end, ok = close_connection_sync(Conn1), @@ -563,7 +565,7 @@ target_per_message_topic(Config) -> Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, <<"m1">>)), ok = amqp10_client:send_msg(Sender, Msg1), receive {amqp10_disposition, {released, Tag1}} -> ok - after 5000 -> ct:fail(released_timeout) + after ?TIMEOUT -> ct:fail(released_timeout) end, %% We don't have sufficient authorization. @@ -574,7 +576,7 @@ target_per_message_topic(Config) -> <<"write access to topic '.a.b' in exchange 'amq.topic' in " "vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = close_connection_sync(Connection). @@ -594,7 +596,7 @@ authn_failure_event(Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, sasl_auth_failure}}} -> ok - after 5000 -> flush(missing_closed), + after ?TIMEOUT -> flush(missing_closed), ct:fail("did not receive sasl_auth_failure") end, @@ -621,7 +623,7 @@ sasl_success(Mechanism, Config) -> OpnConf = OpnConf0#{sasl := Mechanism}, {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, opened}} -> ok - after 5000 -> ct:fail(missing_opened) + after ?TIMEOUT -> ct:fail(missing_opened) end, ok = amqp10_client:close_connection(Connection). @@ -638,7 +640,7 @@ sasl_anonymous_failure(Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, Reason}}} -> ?assertEqual({sasl_not_supported, Mechanism}, Reason) - after 5000 -> ct:fail(missing_closed) + after ?TIMEOUT -> ct:fail(missing_closed) end, ok = rpc(Config, application, set_env, [App, Par, Default]). @@ -649,7 +651,7 @@ sasl_plain_failure(Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, Reason}}} -> ?assertEqual(sasl_auth_failure, Reason) - after 5000 -> ct:fail(missing_closed) + after ?TIMEOUT -> ct:fail(missing_closed) end. %% Skipping SASL is disallowed in RabbitMQ. @@ -658,14 +660,14 @@ sasl_none_failure(Config) -> OpnConf = OpnConf0#{sasl := none}, {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, _Reason}}} -> ok - after 5000 -> ct:fail(missing_closed) + after ?TIMEOUT -> ct:fail(missing_closed) end. vhost_absent(Config) -> OpnConf = connection_config(Config, <<"this vhost does not exist">>), {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, _}}} -> ok - after 5000 -> ct:fail(missing_closed) + after ?TIMEOUT -> ct:fail(missing_closed) end. vhost_connection_limit(Config) -> @@ -675,22 +677,22 @@ vhost_connection_limit(Config) -> OpnConf = connection_config(Config), {ok, C1} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, C1, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, {ok, C2} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, C2, {closed, _}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, OpnConf0 = connection_config(Config, <<"/">>), OpnConf1 = OpnConf0#{sasl := anon}, {ok, C3} = amqp10_client:open_connection(OpnConf1), receive {amqp10_event, {connection, C3, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, {ok, C4} = amqp10_client:open_connection(OpnConf1), receive {amqp10_event, {connection, C4, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, [ok = close_connection_sync(C) || C <- [C1, C3, C4]], @@ -704,12 +706,12 @@ user_connection_limit(Config) -> OpnConf = OpnConf0#{sasl := anon}, {ok, C1} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, C1, {closed, _}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, {ok, C2} = amqp10_client:open_connection(connection_config(Config)), receive {amqp10_event, {connection, C2, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = close_connection_sync(C2), @@ -731,7 +733,7 @@ v1_vhost_queue_limit(Config) -> ?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED, <<"cannot declare queue 'q1': queue limit in vhost 'test vhost' (0) is reached">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive expected error") end, @@ -742,8 +744,8 @@ v1_vhost_queue_limit(Config) -> {ok, Sender2} = amqp10_client:attach_sender_link( Session2, <<"test-sender-2">>, TargetAddress), receive {amqp10_event, {link, Sender2, attached}} -> ok - after 5000 -> flush(missing_attached), - ct:fail("missing ATTACH from server") + after ?TIMEOUT -> flush(missing_attached), + ct:fail("missing ATTACH from server") end, ok = close_connection_sync(C1), diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index ab4addfd69..78d8c69938 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -1260,7 +1260,7 @@ drain_many(Config, QueueType, QName) after 30000 -> ct:fail({missing_delivery, ?LINE}) end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 300 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, ok = amqp10_client:detach_link(Sender), diff --git a/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl b/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl index ba465e396f..83f91cfda6 100644 --- a/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl +++ b/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl @@ -80,10 +80,10 @@ credit_api_v2(Config) -> {ok, CQSender} = amqp10_client:attach_sender_link(Session, <<"cq sender">>, CQAddr), {ok, QQSender} = amqp10_client:attach_sender_link(Session, <<"qq sender">>, QQAddr), receive {amqp10_event, {link, CQSender, credited}} -> ok - after 5000 -> ct:fail(credited_timeout) + after 30_000 -> ct:fail(credited_timeout) end, receive {amqp10_event, {link, QQSender, credited}} -> ok - after 5000 -> ct:fail(credited_timeout) + after 30_000 -> ct:fail(credited_timeout) end, %% Send 40 messages to each queue. @@ -146,7 +146,7 @@ credit_api_v2(Config) -> %% Draining should also work. ok = amqp10_client:flow_link_credit(CQReceiver3, 10, never, true), receive {amqp10_event, {link, CQReceiver3, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_credit_exhausted, ?LINE}) + after 30_000 -> ct:fail({missing_credit_exhausted, ?LINE}) end, receive Unexpected1 -> ct:fail({unexpected, ?LINE, Unexpected1}) after 20 -> ok @@ -154,7 +154,7 @@ credit_api_v2(Config) -> ok = amqp10_client:flow_link_credit(QQReceiver3, 10, never, true), receive {amqp10_event, {link, QQReceiver3, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_credit_exhausted, ?LINE}) + after 30_000 -> ct:fail({missing_credit_exhausted, ?LINE}) end, receive Unexpected2 -> ct:fail({unexpected, ?LINE, Unexpected2}) after 20 -> ok @@ -166,11 +166,11 @@ credit_api_v2(Config) -> ok = detach_sync(QQReceiver3), ok = amqp10_client:end_session(Session), receive {amqp10_event, {session, Session, {ended, _}}} -> ok - after 5000 -> ct:fail(missing_ended) + after 30_000 -> ct:fail(missing_ended) end, ok = amqp10_client:close_connection(Connection), receive {amqp10_event, {connection, Connection, {closed, normal}}} -> ok - after 5000 -> ct:fail(missing_closed) + after 30_000 -> ct:fail(missing_closed) end. consume_and_accept(NumMsgs, Receiver) -> @@ -192,14 +192,14 @@ receive_messages0(Receiver, N, Acc) -> receive {amqp10_msg, Receiver, Msg} -> receive_messages0(Receiver, N - 1, [Msg | Acc]) - after 5000 -> + after 30_000 -> exit({timeout, {num_received, length(Acc)}, {num_missing, N}}) end. detach_sync(Receiver) -> ok = amqp10_client:detach_link(Receiver), receive {amqp10_event, {link, Receiver, {detached, normal}}} -> ok - after 5000 -> ct:fail({missing_detached, Receiver}) + after 30_000 -> ct:fail({missing_detached, Receiver}) end. flush(Prefix) -> diff --git a/deps/rabbit/test/amqp_utils.erl b/deps/rabbit/test/amqp_utils.erl index 22865df919..ebc2d4f03b 100644 --- a/deps/rabbit/test/amqp_utils.erl +++ b/deps/rabbit/test/amqp_utils.erl @@ -55,7 +55,7 @@ wait_for_credit(Sender) -> receive {amqp10_event, {link, Sender, credited}} -> ok - after 5000 -> + after 30_000 -> flush("wait_for_credit timed out"), ct:fail(credited_timeout) end. @@ -66,7 +66,7 @@ wait_for_accepts(N) -> receive {amqp10_disposition, {accepted, _}} -> wait_for_accepts(N - 1) - after 5000 -> + after 30_000 -> ct:fail({missing_accepted, N}) end. @@ -108,9 +108,9 @@ wait_for_link_detach(Link) -> {amqp10_event, {link, Link, {detached, #'v1_0.detach'{}}}} -> flush(?FUNCTION_NAME), ok - after 5000 -> - flush("wait_for_link_detach timed out"), - ct:fail({link_detach_timeout, Link}) + after 30_000 -> + flush("wait_for_link_detach timed out"), + ct:fail({link_detach_timeout, Link}) end. end_session_sync(Session) @@ -123,9 +123,9 @@ wait_for_session_end(Session) -> {amqp10_event, {session, Session, {ended, _}}} -> flush(?FUNCTION_NAME), ok - after 5000 -> - flush("wait_for_session_end timed out"), - ct:fail({session_end_timeout, Session}) + after 30_000 -> + flush("wait_for_session_end timed out"), + ct:fail({session_end_timeout, Session}) end. close_connection_sync(Connection) @@ -138,7 +138,7 @@ wait_for_connection_close(Connection) -> {amqp10_event, {connection, Connection, {closed, normal}}} -> flush(?FUNCTION_NAME), ok - after 5000 -> - flush("wait_for_connection_close timed out"), - ct:fail({connection_close_timeout, Connection}) + after 30_000 -> + flush("wait_for_connection_close timed out"), + ct:fail({connection_close_timeout, Connection}) end. diff --git a/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl b/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl index 1a3a878ccd..e1093fe3ac 100644 --- a/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl +++ b/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl @@ -19,6 +19,8 @@ -import(rabbit_ct_helpers, [eventually/3]). +-define(TIMEOUT, 30_000). + all() -> [ {group, tests} @@ -95,7 +97,7 @@ requeue_one_channel(QType, Config) -> self()), receive #'basic.consume_ok'{consumer_tag = Ctag} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, [begin @@ -107,19 +109,19 @@ requeue_one_channel(QType, Config) -> receive {#'basic.deliver'{}, #amqp_msg{payload = <<"1">>}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = <<"2">>}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, D3 = receive {#'basic.deliver'{delivery_tag = Del3}, #amqp_msg{payload = <<"3">>}} -> Del3 - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = <<"4">>}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, assert_messages(QName, 4, 4, Config), @@ -132,18 +134,18 @@ requeue_one_channel(QType, Config) -> receive {#'basic.deliver'{}, #amqp_msg{payload = P1}} -> ?assertEqual(<<"1">>, P1) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = P2}} -> ?assertEqual(<<"2">>, P2) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, D3b = receive {#'basic.deliver'{delivery_tag = Del3b}, #amqp_msg{payload = P3}} -> ?assertEqual(<<"3">>, P3), Del3b - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, assert_messages(QName, 4, 4, Config), @@ -181,7 +183,7 @@ requeue_two_channels(QType, Config) -> self()), receive #'basic.consume_ok'{consumer_tag = Ctag1} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, amqp_channel:subscribe(Ch2, @@ -189,7 +191,7 @@ requeue_two_channels(QType, Config) -> consumer_tag = Ctag2}, self()), receive #'basic.consume_ok'{consumer_tag = Ctag2} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, [begin @@ -203,22 +205,22 @@ requeue_two_channels(QType, Config) -> receive {#'basic.deliver'{consumer_tag = C1}, #amqp_msg{payload = <<"1">>}} -> ?assertEqual(Ctag1, C1) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{consumer_tag = C2}, #amqp_msg{payload = <<"2">>}} -> ?assertEqual(Ctag2, C2) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{consumer_tag = C3}, #amqp_msg{payload = <<"3">>}} -> ?assertEqual(Ctag1, C3) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{consumer_tag = C4}, #amqp_msg{payload = <<"4">>}} -> ?assertEqual(Ctag2, C4) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, assert_messages(QName, 4, 4, Config), @@ -228,14 +230,14 @@ requeue_two_channels(QType, Config) -> receive {#'basic.deliver'{consumer_tag = C5}, #amqp_msg{payload = <<"1">>}} -> ?assertEqual(Ctag2, C5) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, DelTag = receive {#'basic.deliver'{consumer_tag = C6, delivery_tag = D}, #amqp_msg{payload = <<"3">>}} -> ?assertEqual(Ctag2, C6), D - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, assert_messages(QName, 4, 4, Config), diff --git a/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl b/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl index 8cd6079669..720476ac26 100644 --- a/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl +++ b/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl @@ -16,6 +16,8 @@ -import(rabbit_ct_helpers, [eventually/1]). +-define(TIMEOUT, 30_000). + all() -> [ {group, cluster_size_1}, @@ -116,7 +118,7 @@ trace(Config) -> correlation_id = CorrelationId}, payload = RequestPayload}), receive #'basic.ack'{} -> ok - after 5000 -> ct:fail(confirm_timeout) + after ?TIMEOUT -> ct:fail(confirm_timeout) end, %% Receive the request. @@ -138,7 +140,7 @@ trace(Config) -> #amqp_msg{payload = ReplyPayload, props = #'P_basic'{correlation_id = CorrelationId}}} -> ok - after 5000 -> ct:fail(missing_reply) + after ?TIMEOUT -> ct:fail(missing_reply) end, %% 2 messages should have entered RabbitMQ: @@ -217,7 +219,7 @@ rpc(RequesterNode, ResponderNode, Config) -> correlation_id = CorrelationId}, payload = RequestPayload}), receive #'basic.ack'{} -> ok - after 5000 -> ct:fail(confirm_timeout) + after ?TIMEOUT -> ct:fail(confirm_timeout) end, ok = wait_for_queue_declared(RequestQueue, ResponderNode, Config), @@ -239,7 +241,7 @@ rpc(RequesterNode, ResponderNode, Config) -> #amqp_msg{payload = ReplyPayload, props = #'P_basic'{correlation_id = CorrelationId}}} -> ok - after 5000 -> ct:fail(missing_reply) + after ?TIMEOUT -> ct:fail(missing_reply) end. wait_for_queue_declared(Queue, Node, Config) -> diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 2735478986..534a05d4a9 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -1052,7 +1052,7 @@ bq_queue_recover1(Config) -> exit(QPid, kill), MRef = erlang:monitor(process, QPid), receive {'DOWN', MRef, process, QPid, _Info} -> ok - after 10000 -> exit(timeout_waiting_for_queue_death) + after ?TIMEOUT -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(?VHOST), {Recovered, []} = rabbit_amqqueue:recover(?VHOST), diff --git a/deps/rabbit/test/clustering_recovery_SUITE.erl b/deps/rabbit/test/clustering_recovery_SUITE.erl index b5dd042608..503a939bb1 100644 --- a/deps/rabbit/test/clustering_recovery_SUITE.erl +++ b/deps/rabbit/test/clustering_recovery_SUITE.erl @@ -362,7 +362,7 @@ subscribe(Ch, QName) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 10000 -> + after 30000 -> exit(consume_ok_timeout) end. @@ -372,6 +372,6 @@ consume(N) -> receive {#'basic.deliver'{consumer_tag = <<"ctag">>}, _} -> consume(N - 1) - after 10000 -> + after 30000 -> exit(deliver_timeout) end. diff --git a/deps/rabbit/test/confirms_rejects_SUITE.erl b/deps/rabbit/test/confirms_rejects_SUITE.erl index 0e51ac8cd5..e477c44471 100644 --- a/deps/rabbit/test/confirms_rejects_SUITE.erl +++ b/deps/rabbit/test/confirms_rejects_SUITE.erl @@ -6,6 +6,8 @@ -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). +-define(TIMEOUT, 30_000). + all() -> [ {group, parallel_tests} @@ -132,7 +134,7 @@ dead_queue_rejects(Config) -> receive {'basic.ack',_,_} -> ok - after 10000 -> + after ?TIMEOUT -> error(timeout_waiting_for_initial_ack) end, @@ -191,7 +193,7 @@ kill_queue_expect_nack(Config, Ch, QueueName, BasicPublish, AmqpMsg, Tries) -> ok; {'basic.ack',_,_} -> retry - after 10000 -> + after ?TIMEOUT -> error({timeout_waiting_for_nack, process_info(self(), messages)}) end, case R of @@ -343,19 +345,19 @@ consume_all_messages(Ch, QueueName, Msgs) -> assert_ack() -> receive {'basic.ack', _, _} -> ok - after 10000 -> error(timeout_waiting_for_ack) + after ?TIMEOUT -> error(timeout_waiting_for_ack) end, clean_acks_mailbox(). assert_nack() -> receive {'basic.nack', _, _, _} -> ok - after 10000 -> error(timeout_waiting_for_nack) + after ?TIMEOUT -> error(timeout_waiting_for_nack) end, clean_acks_mailbox(). assert_acks(N) -> receive {'basic.ack', N, _} -> ok - after 10000 -> error({timeout_waiting_for_ack, N}) + after ?TIMEOUT -> error({timeout_waiting_for_ack, N}) end, clean_acks_mailbox(). diff --git a/deps/rabbit/test/consumer_timeout_SUITE.erl b/deps/rabbit/test/consumer_timeout_SUITE.erl index c3988571a5..8fe6872b9f 100644 --- a/deps/rabbit/test/consumer_timeout_SUITE.erl +++ b/deps/rabbit/test/consumer_timeout_SUITE.erl @@ -14,7 +14,9 @@ -compile(export_all). -define(CONSUMER_TIMEOUT, 2000). --define(RECEIVE_TIMEOUT, ?CONSUMER_TIMEOUT * 2). +%% Sometimes CI machines are really slow, +%% expecting CONSUMER_TIMEOUT*2 might not be enough +-define(RECEIVE_TIMEOUT, 30_000). -define(GROUP_CONFIG, #{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]}, @@ -147,7 +149,7 @@ consumer_timeout(Config) -> {'DOWN', _, process, Conn, _} -> flush(1), exit(unexpected_connection_exit) - after 2000 -> + after ?RECEIVE_TIMEOUT -> ok end, rabbit_ct_client_helpers:close_channel(Ch), @@ -172,7 +174,7 @@ consumer_timeout_basic_get(Config) -> {'DOWN', _, process, Conn, _} -> flush(1), exit(unexpected_connection_exit) - after 2000 -> + after ?RECEIVE_TIMEOUT -> ok end, ok. @@ -221,7 +223,7 @@ consumer_timeout_no_basic_cancel_capability(Config) -> {'DOWN', _, process, Conn, _} -> flush(1), exit(unexpected_connection_exit) - after 2000 -> + after ?RECEIVE_TIMEOUT -> ok end, ok. diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index b793cb3abe..5598745bf4 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -17,6 +17,8 @@ -import(queue_utils, [wait_for_messages/2]). +-define(TIMEOUT, 30_000). + all() -> [ {group, tests} @@ -455,7 +457,7 @@ dead_letter_reject_many(Config) -> [begin receive {#'basic.deliver'{consumer_tag = CTag, delivery_tag = DTag}, #amqp_msg{payload = P}} -> amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, requeue = false}) - after 5000 -> + after ?TIMEOUT -> amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), exit(timeout) end @@ -628,7 +630,7 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) -> consumer_tag = Ctag1}, self()), receive #'basic.consume_ok'{consumer_tag = Ctag1} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, Ctag2 = <<"ctag 2">>, @@ -637,20 +639,20 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) -> consumer_tag = Ctag2}, self()), receive #'basic.consume_ok'{consumer_tag = Ctag2} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = <<"m1">>}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, D2 = receive {#'basic.deliver'{delivery_tag = Del2}, #amqp_msg{payload = <<"m2">>}} -> Del2 - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = <<"m3">>}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), @@ -663,13 +665,13 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) -> receive {#'basic.deliver'{}, #amqp_msg{payload = P1a}} -> ?assertEqual(<<"m1">>, P1a) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, D5 = receive {#'basic.deliver'{delivery_tag = Del5}, #amqp_msg{payload = P2a}} -> ?assertEqual(<<"m2">>, P2a), Del5 - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, %% Nack all 3 without requeue @@ -681,18 +683,18 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) -> receive {#'basic.deliver'{}, #amqp_msg{payload = P3b}} -> ?assertEqual(<<"m3">>, P3b) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = P1b}} -> ?assertEqual(<<"m1">>, P1b) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, LastD = receive {#'basic.deliver'{delivery_tag = LastDel}, #amqp_msg{payload = P2b}} -> ?assertEqual(<<"m2">>, P2b), LastDel - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, wait_for_messages(Config, [[DLXQName, <<"3">>, <<"0">>, <<"3">>]]), @@ -1726,7 +1728,7 @@ metric_rejected(Config) -> [begin receive {#'basic.deliver'{consumer_tag = CTag, delivery_tag = DTag}, #amqp_msg{payload = P}} -> amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, requeue = false}) - after 5000 -> + after ?TIMEOUT -> amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), exit(timeout) end @@ -1811,7 +1813,7 @@ stream(Config) -> self()), receive #'basic.consume_ok'{consumer_tag = Ctag} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, Headers = receive {#'basic.deliver'{delivery_tag = DeliveryTag}, diff --git a/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl b/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl index abef0dd187..e53988860b 100644 --- a/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl +++ b/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl @@ -421,7 +421,7 @@ assert_confirm() -> receive #'basic.ack'{} -> ok - after 5000 -> + after 30_000 -> throw(missing_confirm) end. @@ -429,7 +429,7 @@ assert_return() -> receive {#'basic.return'{}, _} -> ok - after 5000 -> + after 30_000 -> throw(missing_return) end. diff --git a/deps/rabbit/test/disconnect_detected_during_alarm_SUITE.erl b/deps/rabbit/test/disconnect_detected_during_alarm_SUITE.erl index 92bf9aedd8..579e42b9d0 100644 --- a/deps/rabbit/test/disconnect_detected_during_alarm_SUITE.erl +++ b/deps/rabbit/test/disconnect_detected_during_alarm_SUITE.erl @@ -88,7 +88,7 @@ disconnect_detected_during_alarm(Config) -> % Check that connection was indeed blocked #'connection.blocked'{} -> ok after - 1000 -> exit(connection_was_not_blocked) + 30_000 -> exit(connection_was_not_blocked) end, %% Connection is blocked, now we should forcefully kill it diff --git a/deps/rabbit/test/metadata_store_clustering_SUITE.erl b/deps/rabbit/test/metadata_store_clustering_SUITE.erl index a33241d263..8b761378da 100644 --- a/deps/rabbit/test/metadata_store_clustering_SUITE.erl +++ b/deps/rabbit/test/metadata_store_clustering_SUITE.erl @@ -482,7 +482,7 @@ join_khepri_while_in_minority(Config) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 10000 -> + after 30_000 -> exit(consume_ok_timeout) end, @@ -490,7 +490,7 @@ join_khepri_while_in_minority(Config) -> receive {#'basic.deliver'{consumer_tag = <<"ctag">>}, _} -> ok - after 10000 -> + after 30_000 -> exit(deliver_timeout) end, diff --git a/deps/rabbit/test/mirrored_supervisor_SUITE.erl b/deps/rabbit/test/mirrored_supervisor_SUITE.erl index 7ce527a684..6b37f517a5 100644 --- a/deps/rabbit/test/mirrored_supervisor_SUITE.erl +++ b/deps/rabbit/test/mirrored_supervisor_SUITE.erl @@ -294,7 +294,7 @@ test_startup_failure(Fail, Group) -> receive {'EXIT', _, shutdown} -> ok - after 1000 -> + after 30_000 -> exit({did_not_exit, Fail}) end, process_flag(trap_exit, false), diff --git a/deps/rabbit/test/msg_size_metrics_SUITE.erl b/deps/rabbit/test/msg_size_metrics_SUITE.erl index 0b33ecf1a3..4fd9303843 100644 --- a/deps/rabbit/test/msg_size_metrics_SUITE.erl +++ b/deps/rabbit/test/msg_size_metrics_SUITE.erl @@ -81,7 +81,7 @@ message_size(Config) -> Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address), receive {amqp10_event, {link, Sender, credited}} -> ok - after 5000 -> ct:fail(credited_timeout) + after 30_000 -> ct:fail(credited_timeout) end, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)), @@ -149,6 +149,6 @@ wait_for_settlement(State, Tag) -> receive {amqp10_disposition, {State, Tag}} -> ok - after 5000 -> - ct:fail({disposition_timeout, Tag}) + after 30_000 -> + ct:fail({disposition_timeout, Tag}) end. diff --git a/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl b/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl index f2e2c3370e..ec7dfe10d9 100644 --- a/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl +++ b/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl @@ -155,7 +155,7 @@ confirm_ack(Config) -> receive #'basic.ack'{delivery_tag = 1} -> ok - after 5000 -> + after ?TIMEOUT -> throw(missing_ack) end. @@ -196,13 +196,13 @@ confirm_mandatory_unroutable(Config) -> receive {#'basic.return'{}, _} -> ok - after 5000 -> + after ?TIMEOUT -> throw(missing_return) end, receive #'basic.ack'{delivery_tag = 1} -> ok - after 5000 -> + after ?TIMEOUT -> throw(missing_ack) end. @@ -218,7 +218,7 @@ confirm_unroutable_message(Config) -> throw(unexpected_basic_return); #'basic.ack'{delivery_tag = 1} -> ok - after 5000 -> + after ?TIMEOUT -> throw(missing_ack) end. @@ -333,6 +333,6 @@ receive_many(DTags) -> receive_many(DTags -- lists:seq(1, DTag)); #'basic.ack'{delivery_tag = DTag, multiple = false} -> receive_many(DTags -- [DTag]) - after 5000 -> + after ?TIMEOUT -> throw(missing_ack) end. diff --git a/deps/rabbit/test/queue_length_limits_SUITE.erl b/deps/rabbit/test/queue_length_limits_SUITE.erl index b40cab4aa9..83499b0eab 100644 --- a/deps/rabbit/test/queue_length_limits_SUITE.erl +++ b/deps/rabbit/test/queue_length_limits_SUITE.erl @@ -306,19 +306,19 @@ check_max_length_rejects(QName, Ch, Payload1, Payload2, Payload3) -> %% First message can be enqueued and acks amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), receive #'basic.ack'{} -> ok - after 1000 -> error(expected_ack) + after 30_000 -> error(expected_ack) end, %% The message cannot be enqueued and nacks amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), receive #'basic.nack'{} -> ok - after 1000 -> error(expected_nack) + after 30_000 -> error(expected_nack) end, %% The message cannot be enqueued and nacks amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), receive #'basic.nack'{} -> ok - after 1000 -> error(expected_nack) + after 30_000 -> error(expected_nack) end, {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -326,7 +326,7 @@ check_max_length_rejects(QName, Ch, Payload1, Payload2, Payload3) -> %% Now we can publish message 2. amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), receive #'basic.ack'{} -> ok - after 1000 -> error(expected_ack) + after 30_000 -> error(expected_ack) end, {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). diff --git a/deps/rabbit/test/queue_parallel_SUITE.erl b/deps/rabbit/test/queue_parallel_SUITE.erl index 5ee1c32326..208556bd5b 100644 --- a/deps/rabbit/test/queue_parallel_SUITE.erl +++ b/deps/rabbit/test/queue_parallel_SUITE.erl @@ -538,7 +538,7 @@ basic_cancel(Config) -> wait_for_messages(Config, QName, 3, 2, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), wait_for_messages(Config, QName, 2, 2, 0) - after 5000 -> + after 30_000 -> exit(basic_deliver_timeout) end, rabbit_ct_client_helpers:close_channel(Ch), @@ -667,7 +667,7 @@ cc_header_non_array_should_close_channel(Config) -> receive {'DOWN', Ref, process, Ch, {shutdown, {server_initiated_close, 406, _}}} -> ok - after 5000 -> + after 30_000 -> exit(channel_closed_timeout) end, diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index 28352212df..80ba120db3 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -6,6 +6,8 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-define(TIMEOUT, 30_000). + %%%=================================================================== %%% Common Test callbacks %%%=================================================================== @@ -132,7 +134,7 @@ smoke(Config) -> redelivered = false}, #amqp_msg{}} -> basic_ack(Ch, DeliveryTag) - after 5000 -> + after ?TIMEOUT -> flush(), exit(basic_deliver_timeout) end, @@ -151,7 +153,7 @@ smoke(Config) -> #amqp_msg{}} -> basic_cancel(Ch, ConsumerTag2), basic_nack(Ch, T) - after 5000 -> + after ?TIMEOUT -> exit(basic_deliver_timeout) end, %% get and ack @@ -255,7 +257,7 @@ stream(Config) -> redelivered = false}, #amqp_msg{}} -> basic_ack(SubCh, T) - after 5000 -> + after ?TIMEOUT -> exit(basic_deliver_timeout) end catch @@ -294,7 +296,7 @@ publish_and_confirm(Ch, Queue, Msg) -> ok = receive #'basic.ack'{} -> ok; #'basic.nack'{} -> fail - after 2500 -> + after ?TIMEOUT -> flush(), exit(confirm_timeout) end. @@ -318,7 +320,7 @@ subscribe(Ch, Queue, CTag) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 5000 -> + after ?TIMEOUT -> exit(basic_consume_timeout) end. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 6b9f5d485a..2ac95b85b4 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -28,6 +28,7 @@ -define(NET_TICKTIME_S, 5). -define(DEFAULT_AWAIT, 10_000). +-define(TIMEOUT, 30_000). suite() -> [{timetrap, 5 * 60_000}]. @@ -604,7 +605,7 @@ start_queue_concurrent(Config) -> [begin receive {done, Server} -> ok - after 10000 -> exit({await_done_timeout, Server}) + after ?TIMEOUT -> exit({await_done_timeout, Server}) end end || Server <- Servers], @@ -1077,7 +1078,7 @@ single_active_consumer_priority_take_over(Config) -> delivery_tag = DeliveryTag}, _} -> amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}) - after 5000 -> + after ?TIMEOUT -> flush(1), exit(basic_deliver_timeout) end, @@ -2407,7 +2408,7 @@ confirm_availability_on_leader_change(Config) -> {'EXIT', Publisher, Err} -> ok = rabbit_ct_broker_helpers:start_node(Config, Node2), exit(Err) - after 30000 -> + after ?TIMEOUT -> ok = rabbit_ct_broker_helpers:start_node(Config, Node2), flush(100), exit(nothing_received_from_publisher_process) @@ -2872,7 +2873,7 @@ subscribe_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}) - after 5000 -> + after ?TIMEOUT -> exit(basic_deliver_timeout) end, @@ -2885,7 +2886,7 @@ subscribe_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, multiple = false, requeue = true}) - after 5000 -> + after ?TIMEOUT -> flush(1), exit(basic_deliver_timeout_2) end, @@ -2901,7 +2902,7 @@ subscribe_redelivery_count(Config) -> wait_for_messages_ready(Servers, RaName, 0), ct:pal("wait_for_messages_pending_ack", []), wait_for_messages_pending_ack(Servers, RaName, 0) - after 5000 -> + after ?TIMEOUT -> flush(500), exit(basic_deliver_timeout_3) end. @@ -2986,7 +2987,7 @@ subscribe_redelivery_limit_disable(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2, multiple = false, requeue = true}) - after 5000 -> + after ?TIMEOUT -> flush(1), ct:fail("message did not arrive as expected") end, @@ -3632,7 +3633,7 @@ receive_and_ack(Ch) -> redelivered = false}, _} -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}) - after 5000 -> + after ?TIMEOUT -> ct:fail("receive_and_ack timed out", []) end. @@ -3733,7 +3734,7 @@ per_message_ttl_mixed_expiry(Config) -> #amqp_msg{payload = Msg1}} -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}) - after 2000 -> + after ?TIMEOUT -> flush(10), ct:fail("basic deliver timeout") end, @@ -3761,7 +3762,7 @@ per_message_ttl_expiration_too_high(Config) -> {'DOWN', MonitorRef, process, Ch, {shutdown, {server_initiated_close, 406, <<"PRECONDITION_FAILED - invalid expiration", _/binary>>}}} -> ok - after 1000 -> + after ?TIMEOUT -> ct:fail("expected channel error") end. @@ -3883,7 +3884,7 @@ consumer_priorities(Config) -> {#'basic.deliver'{delivery_tag = D1, consumer_tag = Tag2}, _} -> D1 - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -3893,7 +3894,7 @@ consumer_priorities(Config) -> {#'basic.deliver'{delivery_tag = _, consumer_tag = Tag2}, _} -> ok - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -3904,7 +3905,7 @@ consumer_priorities(Config) -> {#'basic.deliver'{delivery_tag = _, consumer_tag = Tag1}, _} -> ok - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -3917,7 +3918,7 @@ consumer_priorities(Config) -> {#'basic.deliver'{delivery_tag = _, consumer_tag = Tag2}, _} -> ok - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -3945,7 +3946,7 @@ cancel_consumer_gh_3729(Config) -> {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = true}, ok = amqp_channel:cast(Ch, R) - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -3954,7 +3955,7 @@ cancel_consumer_gh_3729(Config) -> receive #'basic.cancel_ok'{consumer_tag = <<"ctag">>} -> ok - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.cancel_ok timeout") end, @@ -3990,7 +3991,7 @@ cancel_consumer_gh_12424(Config) -> DeliveryTag = receive {#'basic.deliver'{delivery_tag = DT}, _} -> DT - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -4023,7 +4024,7 @@ cancel_and_consume_with_same_tag(Config) -> {#'basic.deliver'{delivery_tag = D}, #amqp_msg{payload = <<"msg1">>}} -> D - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -4038,7 +4039,7 @@ cancel_and_consume_with_same_tag(Config) -> {#'basic.deliver'{delivery_tag = _}, #amqp_msg{payload = <<"msg2">>}} -> ok - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout 2") end, @@ -4290,7 +4291,7 @@ requeue_multiple_true(Config) -> #amqp_msg{payload = P0}} -> ?assertEqual(P, P0), D - after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE}) + after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE}) end || P <- Payloads], %% Requeue all messages. @@ -4303,7 +4304,7 @@ requeue_multiple_true(Config) -> [receive {#'basic.deliver'{redelivered = true}, #amqp_msg{payload = P1}} -> ?assertEqual(P, P1) - after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE}) + after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE}) end || P <- Payloads], ?assertEqual(#'queue.delete_ok'{message_count = 0}, @@ -4328,7 +4329,7 @@ requeue_multiple_false(Config) -> #amqp_msg{payload = P0}} -> ?assertEqual(P, P0), D - after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE}) + after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE}) end || P <- Payloads], %% The delivery tags we received via AMQP 0.9.1 are ordered from 1-100. @@ -4347,7 +4348,7 @@ requeue_multiple_false(Config) -> [receive {#'basic.deliver'{redelivered = true}, #amqp_msg{payload = P1}} -> ?assertEqual(integer_to_binary(D), P1) - after 5000 -> ct:fail({basic_deliver_timeout, ?LINE}) + after ?TIMEOUT -> ct:fail({basic_deliver_timeout, ?LINE}) end || D <- DTagsShuffled], ?assertEqual(#'queue.delete_ok'{message_count = 0}, @@ -4474,7 +4475,7 @@ subscribe(Ch, Queue, NoAck, Tag, Args) -> receive #'basic.consume_ok'{consumer_tag = Tag} -> ok - after 30000 -> + after ?TIMEOUT -> flush(100), exit(subscribe_timeout) end. @@ -4499,7 +4500,7 @@ nack(Ch, Multiple, Requeue) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, requeue = Requeue}) - after 5000 -> + after ?TIMEOUT -> flush(10), ct:fail("basic deliver timeout") end. @@ -4572,7 +4573,7 @@ validate_queue(Ch, Queue, ExpectedMsgs) -> #amqp_msg{payload = M}} -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, multiple = false}) - after 5000 -> + after ?TIMEOUT -> flush(10), exit({validate_queue_timeout, M}) end diff --git a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl index ec907a8411..3909c6e3a2 100644 --- a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl @@ -482,6 +482,6 @@ receive_delete_events(N, Evts) -> receive {mnesia_table_event, {delete, _, Record, _, _}} -> receive_delete_events(N - 1, [Record | Evts]) - after 10000 -> + after 30000 -> Evts end. diff --git a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl index 5d4c39958e..eda2593be7 100644 --- a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl @@ -755,7 +755,7 @@ publish_confirm(Ch, QName) -> ok; #'basic.nack'{} -> fail - after 2500 -> + after 30_000 -> ct:fail(confirm_timeout) end. @@ -871,14 +871,14 @@ many_target_queues(Config) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 2000 -> + after 30_000 -> exit(consume_ok_timeout) end, receive {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = Msg1}} -> ok - after 2000 -> + after 30_000 -> exit(deliver_timeout) end, ?awaitMatch([{0, 0}], @@ -911,7 +911,7 @@ many_target_queues(Config) -> {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = Msg2}} -> ok - after 0 -> + after 30_000 -> exit(deliver_timeout) end, ?assertEqual(2, counted(messages_dead_lettered_expired_total, Config)), diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index fae1251d47..202a729b64 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -10,8 +10,9 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). --define(RA_EVENT_TIMEOUT, 5000). +-define(RA_EVENT_TIMEOUT, 30_000). -define(RA_SYSTEM, quorum_queues). +-define(TIMEOUT, 30_000). all() -> [ @@ -118,7 +119,7 @@ basics(Config) -> {ok, S, _} -> DeliverFun(S, F) end - after 5000 -> + after ?TIMEOUT -> flush(), exit(await_delivery_timeout) end @@ -136,7 +137,7 @@ basics(Config) -> ct:pal("ra_event ~p", [Evt]), {ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, From, Evt, FState5), F6 - after 5000 -> + after ?TIMEOUT -> exit(leader_change_timeout) end, @@ -175,7 +176,7 @@ rabbit_fifo_returns_correlation(Config) -> Del -> exit({unexpected, Del}) end - after 2000 -> + after ?TIMEOUT -> exit(await_msg_timeout) end, rabbit_quorum_queue:stop_server(ServerId), @@ -208,7 +209,7 @@ duplicate_delivery(Config) -> end end end - after 2000 -> + after ?TIMEOUT -> exit(await_msg_timeout) end end, @@ -281,7 +282,7 @@ detects_lost_delivery(Config) -> receive {ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} -> ok - after 5000 -> + after ?TIMEOUT -> exit(await_delivery_timeout) end, @@ -316,7 +317,7 @@ returns(Config) -> ?assertEqual(undefined, mc:get_annotation(<<"x-delivery-count">>, Msg1Out0)), ?assertEqual(undefined, mc:get_annotation(delivery_count, Msg1Out0)), rabbit_fifo_client:return(<<"tag">>, [MsgId], FC2) - after 5000 -> + after ?TIMEOUT -> flush(), exit(await_delivery_timeout) end, @@ -333,7 +334,7 @@ returns(Config) -> %% delivery_count should _not_ be incremented for a return ?assertEqual(undefined, mc:get_annotation(delivery_count, Msg1Out)), rabbit_fifo_client:modify(<<"tag">>, [MsgId1], true, false, #{}, FC4) - after 5000 -> + after ?TIMEOUT -> flush(), exit(await_delivery_timeout_2) end, @@ -349,7 +350,7 @@ returns(Config) -> %% delivery_count should be incremented for a modify with delivery_failed = true ?assertEqual(1, mc:get_annotation(delivery_count, Msg2Out)), rabbit_fifo_client:settle(<<"tag">>, [MsgId2], FC6) - after 5000 -> + after ?TIMEOUT -> flush(), exit(await_delivery_timeout_3) end, @@ -377,7 +378,7 @@ returns_after_down(Config) -> receive {'DOWN', MonRef, _, _, _} -> ok - after 5000 -> + after ?TIMEOUT -> ct:fail("waiting for process exit timed out") end, rabbit_ct_helpers:await_condition( @@ -411,7 +412,7 @@ resends_after_lost_applied(Config) -> receive {ra_event, _, {applied, _}} -> ok - after 500 -> + after ?TIMEOUT -> exit(await_ra_event_timeout) end, % send another message @@ -477,7 +478,7 @@ discard(Config) -> [msg1] = Letters, rejected = Reason, ok - after 500 -> + after ?TIMEOUT -> flush(), exit(dead_letter_timeout) end, @@ -571,7 +572,7 @@ lost_delivery(Config) -> {ra_event, _, Evt} -> ct:pal("dropping event ~tp", [Evt]), ok - after 500 -> + after ?TIMEOUT -> exit(await_ra_event_timeout) end, % send another message @@ -744,7 +745,7 @@ test_queries(Config) -> end), receive ready -> ok - after 5000 -> + after ?TIMEOUT -> exit(ready_timeout) end, F0 = rabbit_fifo_client:init([ServerId], 4), @@ -820,7 +821,7 @@ receive_ra_events(Applied, Deliveries, Acc) -> receive_ra_events(Applied, Deliveries - length(MsgIds), [Evt | Acc]); {ra_event, _, _} = Evt -> receive_ra_events(Applied, Deliveries, [Evt | Acc]) - after 5000 -> + after ?TIMEOUT -> exit({missing_events, Applied, Deliveries, Acc}) end. @@ -832,7 +833,7 @@ receive_ra_events(Acc) -> receive {ra_event, _, _} = Evt -> receive_ra_events([Evt | Acc]) - after 500 -> + after 1000 -> Acc end. @@ -892,7 +893,7 @@ flush() -> Msg -> ct:pal("flushed: ~w~n", [Msg]), flush() - after 10 -> + after 100 -> ok end. diff --git a/deps/rabbit/test/rabbit_local_random_exchange_SUITE.erl b/deps/rabbit/test/rabbit_local_random_exchange_SUITE.erl index a2e57f4761..a51eec8797 100644 --- a/deps/rabbit/test/rabbit_local_random_exchange_SUITE.erl +++ b/deps/rabbit/test/rabbit_local_random_exchange_SUITE.erl @@ -135,7 +135,7 @@ publish_expect_return(Config, E, Node) -> receive {#'basic.return'{}, _} -> ok - after 5000 -> + after 30_000 -> flush(100), ct:fail("no return received") end diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index c8b2f8aabc..2650f286e4 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -18,6 +18,7 @@ -import(rabbit_ct_helpers, [await_condition/2]). -define(WAIT, 5000). +-define(TIMEOUT, 30_000). suite() -> [{timetrap, 15 * 60_000}]. @@ -1009,7 +1010,7 @@ consume(Config) -> multiple = false}), _ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), ok = amqp_channel:close(Ch1) - after 5000 -> + after ?TIMEOUT -> ct:fail(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -1073,8 +1074,8 @@ consume_timestamp_offset(Config) -> receive #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> ok - after 5000 -> - flush(), + after ?TIMEOUT -> + flush(), exit(consume_ok_timeout) end, @@ -1108,7 +1109,7 @@ consume_timestamp_last_offset(Config) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 5000 -> + after ?TIMEOUT -> exit(missing_consume_ok) end, @@ -1178,7 +1179,7 @@ basic_cancel(Config) -> {#'basic.deliver'{}, _} -> amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = CTag}), ?assertMatch([], filter_consumers(Config, Server, CTag)) - after 10000 -> + after ?TIMEOUT -> exit(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -1203,7 +1204,7 @@ receive_basic_cancel_on_queue_deletion(Config) -> receive #'basic.cancel'{consumer_tag = CTag} -> ok - after 10000 -> + after ?TIMEOUT -> exit(timeout) end. @@ -1261,7 +1262,7 @@ keep_consuming_on_leader_restart(Config) -> {#'basic.deliver'{delivery_tag = DeliveryTag1}, _} -> ok = amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag1, multiple = false}) - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, @@ -1275,7 +1276,7 @@ keep_consuming_on_leader_restart(Config) -> {#'basic.deliver'{delivery_tag = DeliveryTag2}, _} -> ok = amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag2, multiple = false}) - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, @@ -1396,7 +1397,7 @@ consume_and_(Config, AckFun) -> ?assertMatch({'queue.declare_ok', Q, _MsgCount, 0}, declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]) - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -1588,7 +1589,7 @@ consume_from_next(Config, Args) -> receive #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> ok - after 10000 -> + after ?TIMEOUT -> exit(consume_ok_failed) end, @@ -1684,7 +1685,7 @@ consume_while_deleting_replica(Config) -> ok; {_, #amqp_msg{}} -> exit(unexpected_message) - after 30000 -> + after ?TIMEOUT -> exit(missing_consumer_cancel) end, @@ -1712,10 +1713,10 @@ consume_credit(Config) -> %% We expect to receive exactly 2 messages. DTag1 = receive {#'basic.deliver'{delivery_tag = Tag1}, _} -> Tag1 - after 5000 -> ct:fail({missing_delivery, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE}) end, _DTag2 = receive {#'basic.deliver'{delivery_tag = Tag2}, _} -> Tag2 - after 5000 -> ct:fail({missing_delivery, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE}) end, receive {#'basic.deliver'{}, _} -> ct:fail({unexpected_delivery, ?LINE}) after 100 -> ok @@ -1725,7 +1726,7 @@ consume_credit(Config) -> ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DTag1, multiple = false}), DTag3 = receive {#'basic.deliver'{delivery_tag = Tag3}, _} -> Tag3 - after 5000 -> ct:fail({missing_delivery, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE}) end, receive {#'basic.deliver'{}, _} -> ct:fail({unexpected_delivery, ?LINE}) @@ -1747,11 +1748,11 @@ consume_credit0(Ch, DTagPrev) -> multiple = true}), %% Receive 1st message. receive {#'basic.deliver'{}, _} -> ok - after 5000 -> ct:fail({missing_delivery, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE}) end, %% Receive 2nd message. DTag = receive {#'basic.deliver'{delivery_tag = T}, _} -> T - after 5000 -> ct:fail({missing_delivery, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE}) end, %% We shouldn't receive more messages given that AMQP 0.9.1 prefetch count is 2. receive {#'basic.deliver'{}, _} -> ct:fail({unexpected_delivery, ?LINE}) @@ -1813,7 +1814,7 @@ consume_credit_out_of_order_ack(Config) -> receive {#'basic.deliver'{}, _} -> ok - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -1853,7 +1854,7 @@ consume_credit_multiple_ack(Config) -> receive {#'basic.deliver'{}, _} -> ok - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -2066,7 +2067,7 @@ leader_failover_dedupe(Config) -> N = receive {last_msg, X} -> X - after 2000 -> + after ?TIMEOUT -> exit(last_msg_timeout) end, %% validate that no duplicates were written even though an internal @@ -2508,7 +2509,7 @@ dead_letter_target(Config) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 5000 -> + after ?TIMEOUT -> exit(basic_consume_ok_timeout) end, receive @@ -2517,7 +2518,7 @@ dead_letter_target(Config) -> requeue =false, multiple = false}), queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]) - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -2602,7 +2603,7 @@ receive_filtered_batch(Ch, Count, ExpectedSize) -> ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}), receive_filtered_batch(Ch, Count + 1, ExpectedSize) - after 10000 -> + after ?TIMEOUT -> flush(), exit({not_enough_messages, Count}) end. diff --git a/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl b/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl index fc56dfbed9..2d869be8f8 100644 --- a/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl +++ b/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl @@ -161,6 +161,6 @@ publish_confirm(Ch, QName) -> #'basic.nack'{} -> ct:pal("NOT CONFIRMED! ~ts", [QName]), fail - after 10000 -> + after 30000 -> exit(confirm_timeout) end. diff --git a/deps/rabbit/test/single_active_consumer_SUITE.erl b/deps/rabbit/test/single_active_consumer_SUITE.erl index ac682ad957..7db3314d1a 100644 --- a/deps/rabbit/test/single_active_consumer_SUITE.erl +++ b/deps/rabbit/test/single_active_consumer_SUITE.erl @@ -179,13 +179,13 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, consumer_tag = CTag1}, self()), receive #'basic.consume_ok'{consumer_tag = CTag1} -> ok - after 5000 -> ct:fail(timeout_ctag1) + after ?TIMEOUT -> ct:fail(timeout_ctag1) end, amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, consumer_tag = CTag2}, self()), receive #'basic.consume_ok'{consumer_tag = CTag2} -> ok - after 5000 -> ct:fail(timeout_ctag2) + after ?TIMEOUT -> ct:fail(timeout_ctag2) end, Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, @@ -195,12 +195,12 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) -> DTag1 = receive {#'basic.deliver'{consumer_tag = CTag1, delivery_tag = DTag}, #amqp_msg{payload = <<"m1">>}} -> DTag - after 5000 -> ct:fail(timeout_m1) + after ?TIMEOUT -> ct:fail(timeout_m1) end, #'basic.cancel_ok'{consumer_tag = CTag1} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag1}), receive #'basic.cancel_ok'{consumer_tag = CTag1} -> ok - after 5000 -> ct:fail(missing_cancel) + after ?TIMEOUT -> ct:fail(missing_cancel) end, amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}), @@ -208,7 +208,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) -> receive {#'basic.deliver'{consumer_tag = CTag2}, #amqp_msg{payload = <<"m2">>}} -> ok; Unexpected -> ct:fail({unexpected, Unexpected}) - after 5000 -> ct:fail(timeout_m2) + after ?TIMEOUT -> ct:fail(timeout_m2) end, amqp_connection:close(C). @@ -385,7 +385,7 @@ wait_for_messages(ExpectedCount, State) -> receive {message, {MessagesPerConsumer, MessageCount}} -> wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount}) - after 5000 -> + after ?TIMEOUT -> {missing, ExpectedCount, State} end. @@ -393,7 +393,7 @@ wait_for_cancel_ok() -> receive {cancel_ok, CTag} -> {cancel_ok, CTag} - after 5000 -> + after ?TIMEOUT -> throw(consumer_cancel_ok_timeout) end. @@ -402,7 +402,7 @@ receive_deliver() -> {#'basic.deliver'{consumer_tag = CTag, delivery_tag = DTag}, _} -> {CTag, DTag} - after 5000 -> + after ?TIMEOUT -> exit(deliver_timeout) end. diff --git a/deps/rabbit/test/topic_permission_SUITE.erl b/deps/rabbit/test/topic_permission_SUITE.erl index b7c2e10b24..fd5bf398be 100644 --- a/deps/rabbit/test/topic_permission_SUITE.erl +++ b/deps/rabbit/test/topic_permission_SUITE.erl @@ -111,7 +111,7 @@ amqp_x_cc_annotation(Config) -> ?assertEqual( <<"write access to topic 'x.1' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>, Description1) - after 5000 -> amqp_utils:flush(missing_ended), + after 30_000 -> amqp_utils:flush(missing_ended), ct:fail({missing_event, ?LINE}) end, @@ -134,7 +134,7 @@ amqp_x_cc_annotation(Config) -> ?assertEqual( <<"write access to topic 'x.2' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>, Description2) - after 5000 -> amqp_utils:flush(missing_ended), + after 30_000 -> amqp_utils:flush(missing_ended), ct:fail({missing_event, ?LINE}) end, @@ -178,7 +178,7 @@ amqpl_headers(Header, Config) -> #amqp_msg{payload = <<"m1">>, props = #'P_basic'{headers = [{Header, array, [{longstr, <<"a.2">>}]}]}}), receive #'basic.ack'{} -> ok - after 5000 -> ct:fail({missing_confirm, ?LINE}) + after 30_000 -> ct:fail({missing_confirm, ?LINE}) end, monitor(process, Ch1), @@ -412,6 +412,6 @@ assert_channel_down(Ch, Reason) -> {shutdown, {server_initiated_close, 403, Reason}}} -> ok - after 5000 -> + after 30_000 -> ct:fail({did_not_receive, Reason}) end. diff --git a/deps/rabbit/test/transactions_SUITE.erl b/deps/rabbit/test/transactions_SUITE.erl index 5dae3b8ebf..a5e7628792 100644 --- a/deps/rabbit/test/transactions_SUITE.erl +++ b/deps/rabbit/test/transactions_SUITE.erl @@ -90,7 +90,7 @@ return_after_commit(Config) -> Result = receive {#'basic.return'{}, _} -> return_before_commit - after 1000 -> + after 30_000 -> return_after_commit end, ?assertEqual(return_after_commit, Result), diff --git a/deps/rabbit/test/unicode_SUITE.erl b/deps/rabbit/test/unicode_SUITE.erl index 4f28d1362c..10798fa452 100644 --- a/deps/rabbit/test/unicode_SUITE.erl +++ b/deps/rabbit/test/unicode_SUITE.erl @@ -103,7 +103,7 @@ stream(Config) -> DelTag = receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> DeliveryTag - after 5000 -> + after 30_000 -> ct:fail(timeout) end, ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DelTag, diff --git a/deps/rabbit/test/unit_file_handle_cache_SUITE.erl b/deps/rabbit/test/unit_file_handle_cache_SUITE.erl index 5ba8931598..b75da557d1 100644 --- a/deps/rabbit/test/unit_file_handle_cache_SUITE.erl +++ b/deps/rabbit/test/unit_file_handle_cache_SUITE.erl @@ -173,7 +173,7 @@ file_handle_cache_reserve_open_file_above_limit1(_Config) -> receive opened -> throw(error_file_opened) - after 1000 -> + after 30_000 -> %% Let's release 5 file handles, that should leave %% enough free for the `open` to go through file_handle_cache:set_reservation(2), @@ -183,7 +183,7 @@ file_handle_cache_reserve_open_file_above_limit1(_Config) -> opened -> ok = file_handle_cache:set_limit(Limit), passed - after 5000 -> + after 30_000 -> throw(error_file_not_released) end end.