From 40cb4f46e8bcbe4ea0715a4535187bc9c7df857c Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Mon, 2 Dec 2024 16:04:12 +0100 Subject: [PATCH 1/4] Tests: rabbit_prometheus_http_SUITE longer wait --- deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index bea19a4d4e..7886e1150c 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -759,7 +759,7 @@ stream_pub_sub_metrics(Config) -> maps:with([rabbitmq_stream_consumer_max_offset_lag], parse_response(Body1)) end, - 100), + 30000), %% per-object metrics {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics", From bdaa31e7eaeacc40a4f5d47a96524ac09ef39be0 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Tue, 10 Dec 2024 13:14:37 +0100 Subject: [PATCH 2/4] Tests: catch exception on connection closed The tests force closing the connection with an error --- deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl b/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl index 45a2f2046a..1b0bf06cac 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl +++ b/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl @@ -924,7 +924,7 @@ test_failed_token_refresh_case1(Config) -> ?assertExit({{shutdown, {server_initiated_close, 403, _}}, _}, amqp_channel:call(Ch2, #'queue.declare'{queue = <<"a.q">>, exclusive = true})), - close_connection(Conn). + catch close_connection(Conn). refreshed_token_cannot_change_username(Config) -> {_, Token} = generate_valid_token_with_sub(Config, <<"username">>), @@ -950,7 +950,7 @@ test_failed_token_refresh_case2(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 530, _}}}, _}, amqp_connection:open_channel(Conn)), - close_connection(Conn). + catch close_connection(Conn). test_successful_connection_with_with_single_scope_alias_in_extra_scopes_source(Config) -> From 43cfc3c93736520d631f68e2e6b2b868c657c9c0 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Tue, 10 Dec 2024 16:19:34 +0100 Subject: [PATCH 3/4] Tests: Increase receive-after timeout in all mqtt test suites --- deps/rabbitmq_mqtt/test/auth_SUITE.erl | 4 +- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 27 ++-- .../test/protocol_interop_SUITE.erl | 32 ++-- deps/rabbitmq_mqtt/test/reader_SUITE.erl | 6 +- deps/rabbitmq_mqtt/test/retainer_SUITE.erl | 4 +- deps/rabbitmq_mqtt/test/util.erl | 6 +- deps/rabbitmq_mqtt/test/v5_SUITE.erl | 140 +++++++++--------- 7 files changed, 112 insertions(+), 107 deletions(-) diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index 72a9be7260..6420c80297 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -672,7 +672,7 @@ setup_rabbit_auth_backend_mqtt_mock(Config) -> receive {ok, SP} -> SP after - 3000 -> ct:fail("timeout waiting for rabbit_auth_backend_mqtt_mock:setup/1") + 30_000 -> ct:fail("timeout waiting for rabbit_auth_backend_mqtt_mock:setup/1") end. client_id_propagation(Config) -> @@ -1333,6 +1333,6 @@ assert_connection_closed(ClientPid) -> {'EXIT', ClientPid, {shutdown, tcp_closed}} -> ok after - 2000 -> + 30_000 -> ct:fail("timed out waiting for exit message") end. diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 32aa231c9e..e1f7b4154f 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -53,6 +53,7 @@ -define(RC_SERVER_SHUTTING_DOWN, 16#8B). -define(RC_KEEP_ALIVE_TIMEOUT, 16#8D). -define(RC_SESSION_TAKEN_OVER, 16#8E). +-define(TIMEOUT, 30_000). all() -> [{group, mqtt}]. @@ -738,28 +739,28 @@ pubsub(Config) -> receive {publish, #{client_pid := C1, qos := 1, payload := <<"m1">>}} -> ok - after 1000 -> ct:fail("missing m1") + after ?TIMEOUT -> ct:fail("missing m1") end, ok = emqtt:publish(C0, Topic1, <<"m2">>, qos0), receive {publish, #{client_pid := C1, qos := 0, payload := <<"m2">>}} -> ok - after 1000 -> ct:fail("missing m2") + after ?TIMEOUT -> ct:fail("missing m2") end, {ok, _} = emqtt:publish(C1, Topic0, <<"m3">>, qos1), receive {publish, #{client_pid := C0, qos := 1, payload := <<"m3">>}} -> ok - after 1000 -> ct:fail("missing m3") + after ?TIMEOUT -> ct:fail("missing m3") end, ok = emqtt:publish(C1, Topic0, <<"m4">>, qos0), receive {publish, #{client_pid := C0, qos := 0, payload := <<"m4">>}} -> ok - after 1000 -> ct:fail("missing m4") + after ?TIMEOUT -> ct:fail("missing m4") end, ok = emqtt:disconnect(C0), @@ -1130,7 +1131,7 @@ many_qos1_messages(Config) -> end, Payloads), receive proceed -> ok - after 30000 -> + after ?TIMEOUT -> ct:fail("message to proceed never received") end, ok = expect_publishes(C, Topic, Payloads), @@ -1383,7 +1384,7 @@ keepalive(Config) -> retain := true, topic := WillTopic, payload := WillPayload}} -> ok - after 3000 -> ct:fail("missing will") + after ?TIMEOUT -> ct:fail("missing will") end, ok = emqtt:disconnect(C2). @@ -1453,7 +1454,7 @@ session_switch(Config, Disconnect) -> receive {publish, #{client_pid := C2, payload := <<"m1">>, qos := 0}} -> ok - after 1000 -> ct:fail("did not receive m1 with QoS 0") + after ?TIMEOUT -> ct:fail("did not receive m1 with QoS 0") end, %% New connection should be able to unsubscribe. ?assertMatch({ok, _, _}, emqtt:unsubscribe(C2, Topic)), @@ -1720,7 +1721,7 @@ max_packet_size_authenticated(Config) -> v4 -> ok; v5 -> ?assertMatch(#{'Maximum-Packet-Size' := MaxSize}, ConnAckProps), receive {disconnected, _ReasonCodePacketTooLarge = 149, _Props} -> ok - after 1000 -> ct:fail("missing DISCONNECT packet from server") + after ?TIMEOUT -> ct:fail("missing DISCONNECT packet from server") end end, ok = rpc(Config, persistent_term, put, [Key, OldMaxSize]). @@ -1805,7 +1806,7 @@ incoming_message_interceptors(Config) -> headers = [{<<"timestamp_in_ms">>, long, Millis} | _XHeaders] }}} -> ok - after 5000 -> ct:fail(missing_deliver) + after ?TIMEOUT -> ct:fail(missing_deliver) end, delete_queue(Ch, Stream), @@ -1831,7 +1832,7 @@ retained_message_conversion(Config) -> retain := true, topic := Topic, payload := Payload}} -> ok - after 1000 -> ct:fail("missing retained message") + after ?TIMEOUT -> ct:fail("missing retained message") end, ok = emqtt:publish(C, Topic, <<>>, [{retain, true}]), ok = emqtt:disconnect(C). @@ -1917,7 +1918,7 @@ await_confirms_ordered(From, N, To) -> await_confirms_ordered(From, N + 1, To); Got -> ct:fail("Received unexpected message. Expected: ~p Got: ~p", [Expected, Got]) - after 10_000 -> + after ?TIMEOUT -> ct:fail("Did not receive expected message: ~p", [Expected]) end. @@ -1929,7 +1930,7 @@ await_confirms_unordered(From, Left) -> await_confirms_unordered(From, Left - 1); Other -> ct:fail("Received unexpected message: ~p", [Other]) - after 10_000 -> + after ?TIMEOUT -> ct:fail("~b confirms are missing", [Left]) end. @@ -1976,6 +1977,6 @@ assert_v5_disconnect_reason_code(Config, ReasonCode) -> v3 -> ok; v4 -> ok; v5 -> receive {disconnected, ReasonCode, _Props} -> ok - after 1000 -> ct:fail("missing DISCONNECT packet from server") + after ?TIMEOUT -> ct:fail("missing DISCONNECT packet from server") end end. diff --git a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl index 723e4e43e4..be2dc431c3 100644 --- a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl @@ -18,6 +18,8 @@ -include_lib("amqp10_common/include/amqp10_framing.hrl"). -include_lib("rabbitmq_stomp/include/rabbit_stomp_frame.hrl"). +-define(TIMEOUT, 30_000). + -import(util, [connect/2, connect/4]). @@ -156,7 +158,7 @@ mqtt_amqpl_mqtt(Config) -> {<<"key">>, <<"val">>}, {<<"rabbit🐇"/utf8>>, <<"carrot🥕"/utf8>>}], lists:sort(UserProperty1)) - after 1000 -> ct:fail("did not receive reply") + after ?TIMEOUT -> ct:fail("did not receive reply") end, %% Another message MQTT 5.0 to AMQP 0.9.1, this time with QoS 0 @@ -188,7 +190,7 @@ amqpl_mqtt_gh_12707(Config) -> payload := MqttPayload}} -> ?assertEqual(Topic, MqttTopic), ?assertEqual(Payload, MqttPayload) - after 5000 -> + after ?TIMEOUT -> ct:fail("did not receive a delivery") end, @@ -277,7 +279,7 @@ mqtt_amqp_mqtt(Config) -> {ok, Sender} = amqp10_client:attach_sender_link( Session2, SenderLinkName, ReplyToAddress, unsettled), receive {amqp10_event, {link, Sender, credited}} -> ok - after 1000 -> ct:fail(credited_timeout) + after ?TIMEOUT -> ct:fail(credited_timeout) end, DTag = <<"my-dtag">>, @@ -292,7 +294,7 @@ mqtt_amqp_mqtt(Config) -> Msg2 = amqp10_msg:set_headers(#{durable => True}, Msg2b), ok = amqp10_client:send_msg(Sender, Msg2), receive {amqp10_disposition, {accepted, DTag}} -> ok - after 1000 -> ct:fail(settled_timeout) + after ?TIMEOUT -> ct:fail(settled_timeout) end, ok = amqp10_client:detach_link(Sender), @@ -311,7 +313,7 @@ mqtt_amqp_mqtt(Config) -> 'Subscription-Identifier' := 999} }, MqttMsg) - after 1000 -> ct:fail("did not receive reply") + after ?TIMEOUT -> ct:fail("did not receive reply") end, ok = emqtt:disconnect(C). @@ -342,7 +344,7 @@ amqp_mqtt_amqp(Config) -> <<"sender">>, rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"t.1">>)), receive {amqp10_event, {link, Sender, credited}} -> ok - after 2000 -> ct:fail(credited_timeout) + after ?TIMEOUT -> ct:fail(credited_timeout) end, RequestBody = <<"my request">>, @@ -371,7 +373,7 @@ amqp_mqtt_amqp(Config) -> false -> ok end - after 2000 -> ct:fail("did not receive request") + after ?TIMEOUT -> ct:fail("did not receive request") end, %% MQTT 5.0 to AMQP 1.0 @@ -420,7 +422,7 @@ amqp_mqtt(Qos, Config) -> <<"sender">>, rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"my.topic">>)), receive {amqp10_event, {link, Sender, credited}} -> ok - after 2000 -> ct:fail(credited_timeout) + after ?TIMEOUT -> ct:fail(credited_timeout) end, %% single amqp-value section @@ -463,28 +465,28 @@ amqp_mqtt(Qos, Config) -> false -> ok end - after 5000 -> ct:fail({missing_publish, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_publish, ?LINE}) end, receive {publish, #{payload := Payload2}} -> ?assertEqual([Body2], amqp10_framing:decode_bin(Payload2)) - after 5000 -> ct:fail({missing_publish, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_publish, ?LINE}) end, receive {publish, #{payload := Payload3}} -> ?assertEqual(Body3, amqp10_framing:decode_bin(Payload3)) - after 5000 -> ct:fail({missing_publish, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_publish, ?LINE}) end, receive {publish, #{payload := Payload4}} -> ?assertEqual(Body4, amqp10_framing:decode_bin(Payload4)) - after 5000 -> ct:fail({missing_publish, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_publish, ?LINE}) end, receive {publish, #{payload := Payload5}} -> ?assertEqual(<<0, 255>>, Payload5) - after 5000 -> ct:fail({missing_publish, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_publish, ?LINE}) end, receive {publish, #{payload := Payload6}} -> %% We expect that RabbitMQ concatenates the binaries of multiple data sections. ?assertEqual(<<0, 1, 2, 3>>, Payload6) - after 5000 -> ct:fail({missing_publish, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_publish, ?LINE}) end, ok = emqtt:disconnect(C). @@ -567,7 +569,7 @@ mqtt_stomp_mqtt(Config) -> 'Correlation-Data' := Correlation, 'User-Property' := UserProp}} = MqttMsg, ?assert(lists:member({<<"x-key">>, <<"val4">>}, UserProp)) - after 1000 -> ct:fail("did not receive reply") + after ?TIMEOUT -> ct:fail("did not receive reply") end, ok = emqtt:disconnect(C). diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl index 1c4fa13319..8378d77e8c 100644 --- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl @@ -109,7 +109,7 @@ block_connack_timeout(Config) -> ClientMRef = monitor(process, Client), {error, connack_timeout} = emqtt:connect(Client), receive {'DOWN', ClientMRef, process, Client, connack_timeout} -> ok - after 200 -> ct:fail("missing connack_timeout in client") + after 30_000 -> ct:fail("missing connack_timeout in client") end, MqttReader = rpc(Config, ?MODULE, mqtt_connection_pid, [Ports]), @@ -122,7 +122,7 @@ block_connack_timeout(Config) -> %% We expect that MQTT reader process exits (without crashing) %% because our client already disconnected. ok - after 2000 -> ct:fail("missing peername_not_known from server") + after 30_000 -> ct:fail("missing peername_not_known from server") end, %% Ensure that our client is not registered. ?assertEqual([], all_connection_pids(Config)), @@ -331,6 +331,6 @@ num_received(Topic, Payload, N) -> {publish, #{topic := Topic, payload := Payload}} -> num_received(Topic, Payload, N + 1) - after 1000 -> + after 3000 -> N end. diff --git a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl index a69df2e6e2..13d9753356 100644 --- a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl @@ -213,7 +213,7 @@ recover_with_message_expiry_interval(Config) -> payload := <<"m1">>, properties := Props}} when map_size(Props) =:= 0 -> ok - after 100 -> ct:fail("did not topic/1") + after 30_000 -> ct:fail("did not topic/1") end, receive {publish, #{client_pid := C2, @@ -222,7 +222,7 @@ recover_with_message_expiry_interval(Config) -> payload := <<"m2">>, properties := #{'Message-Expiry-Interval' := MEI}}} -> assert_message_expiry_interval(100 - ElapsedSeconds2, MEI) - after 100 -> ct:fail("did not topic/2") + after 30_000 -> ct:fail("did not topic/2") end, receive Unexpected -> ct:fail("Received unexpectedly: ~p", [Unexpected]) diff --git a/deps/rabbitmq_mqtt/test/util.erl b/deps/rabbitmq_mqtt/test/util.erl index 160235e158..4ab564714d 100644 --- a/deps/rabbitmq_mqtt/test/util.erl +++ b/deps/rabbitmq_mqtt/test/util.erl @@ -65,7 +65,7 @@ expect_publishes(Client, Topic, [Payload|Rest]) payload := Other}} -> ct:fail("Received unexpected PUBLISH payload. Expected: ~p Got: ~p", [Payload, Other]) - after 3000 -> + after 30_000 -> {publish_not_received, Payload} end. @@ -120,14 +120,14 @@ await_exit(Pid) -> receive {'EXIT', Pid, _} -> ok after - 20_000 -> ct:fail({missing_exit, Pid}) + 30_000 -> ct:fail({missing_exit, Pid}) end. await_exit(Pid, Reason) -> receive {'EXIT', Pid, Reason} -> ok after - 20_000 -> ct:fail({missing_exit, Pid}) + 30_000 -> ct:fail({missing_exit, Pid}) end. %% "CleanStart=0 and SessionExpiry=0xFFFFFFFF (UINT_MAX) for diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index fa977fd3a2..cfca785126 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -41,6 +41,8 @@ -define(RC_SESSION_TAKEN_OVER, 16#8E). -define(RC_TOPIC_ALIAS_INVALID, 16#94). +-define(TIMEOUT, 30_000). + all() -> [{group, mqtt}]. @@ -265,7 +267,7 @@ message_expiry(Config) -> payload := <<"m2">>, properties := Props}} when map_size(Props) =:= 0 -> ok - after 1000 -> ct:fail("did not receive m2") + after ?TIMEOUT -> ct:fail("did not receive m2") end, receive {publish, #{client_pid := Sub2, @@ -276,7 +278,7 @@ message_expiry(Config) -> %% Application Message has been waiting in the Server" [MQTT-3.3.2-6] properties := #{'Message-Expiry-Interval' := MEI}}} -> assert_message_expiry_interval(10 - 2, MEI) - after 100 -> ct:fail("did not receive m3") + after ?TIMEOUT -> ct:fail("did not receive m3") end, assert_nothing_received(), NumExpired = dead_letter_metric(messages_dead_lettered_expired_total, Config) - NumExpiredBefore, @@ -346,7 +348,7 @@ message_expiry_retained_message(Config) -> payload := <<"m3.2">>, properties := Props}} when map_size(Props) =:= 0 -> ok - after 100 -> ct:fail("did not topic3") + after ?TIMEOUT -> ct:fail("did not topic3") end, receive {publish, #{client_pid := Sub, @@ -355,7 +357,7 @@ message_expiry_retained_message(Config) -> payload := <<"m4">>, properties := #{'Message-Expiry-Interval' := MEI}}} -> assert_message_expiry_interval(100 - 2, MEI) - after 100 -> ct:fail("did not receive topic4") + after ?TIMEOUT -> ct:fail("did not receive topic4") end, assert_nothing_received(), @@ -513,7 +515,7 @@ client_rejects_publish(Config) -> packet_id := PacketId}} -> %% Negatively ack the PUBLISH. emqtt:puback(C, PacketId, ?RC_UNSPECIFIED_ERROR) - after 1000 -> + after ?TIMEOUT -> ct:fail("did not receive PUBLISH") end, %% Even though we nacked the PUBLISH, we expect the server to not re-send the same message: @@ -538,7 +540,7 @@ client_receive_maximum_min(Config) -> PacketId1 = receive {publish, #{payload := <<"m1">>, packet_id := Id}} -> Id - after 1000 -> + after ?TIMEOUT -> ct:fail("did not receive m1") end, assert_nothing_received(), @@ -625,19 +627,19 @@ subscription_option_retain_as_published(Config) -> topic := <<"t/1">>, payload := <<"m1">>, retain := true}} -> ok - after 1000 -> ct:fail("did not receive m1") + after ?TIMEOUT -> ct:fail("did not receive m1") end, receive {publish, #{client_pid := C1, topic := <<"t/2">>, payload := <<"m2">>, retain := false}} -> ok - after 1000 -> ct:fail("did not receive m2") + after ?TIMEOUT -> ct:fail("did not receive m2") end, receive {publish, #{client_pid := C2, topic := <<"t/1">>, payload := <<"m1">>, retain := true}} -> ok - after 1000 -> ct:fail("did not receive m1") + after ?TIMEOUT -> ct:fail("did not receive m1") end, {ok, _} = emqtt:publish(C1, <<"t/1">>, <<>>, [{retain, true}, {qos, 1}]), {ok, _} = emqtt:publish(C1, <<"t/2">>, <<>>, [{retain, true}, {qos, 1}]), @@ -659,14 +661,14 @@ subscription_option_retain_as_published_wildcards(Config) -> %% No matching subscription has the %% Retain As Published option set. retain := false}} -> ok - after 1000 -> ct:fail("did not receive m1") + after ?TIMEOUT -> ct:fail("did not receive m1") end, receive {publish, #{topic := <<"t/2">>, payload := <<"m2">>, %% (At least) one matching subscription has the %% Retain As Published option set. retain := true}} -> ok - after 1000 -> ct:fail("did not receive m2") + after ?TIMEOUT -> ct:fail("did not receive m2") end, {ok, _} = emqtt:publish(C, <<"t/1">>, <<>>, [{retain, true}, {qos, 1}]), {ok, _} = emqtt:publish(C, <<"t/2">>, <<>>, [{retain, true}, {qos, 1}]), @@ -736,7 +738,7 @@ subscription_identifier(Config) -> %% and that it used the same identifier for more than one of them. In this case the %% PUBLISH packet will carry multiple identical Subscription Identifiers." [v5 3.3.4] properties := #{'Subscription-Identifier' := [1, 1]}}} -> ok - after 1000 -> ct:fail("did not receive m1") + after ?TIMEOUT -> ct:fail("did not receive m1") end, receive {publish, #{client_pid := C2, @@ -747,14 +749,14 @@ subscription_identifier(Config) -> %% packet the Subscription Identifiers for all matching subscriptions which have a %% Subscription Identifiers, their order is not significant [MQTT-3.3.4-4]." [v5 3.3.4] ?assertEqual([1, 16#fffffff], lists:sort(Ids)) - after 1000 -> ct:fail("did not receive m2") + after ?TIMEOUT -> ct:fail("did not receive m2") end, receive {publish, #{client_pid := C2, topic := <<"t/3">>, payload := <<"m3">>, properties := #{'Subscription-Identifier' := 16#fffffff}}} -> ok - after 1000 -> ct:fail("did not receive m3") + after ?TIMEOUT -> ct:fail("did not receive m3") end, receive {publish, #{client_pid := C2, @@ -762,7 +764,7 @@ subscription_identifier(Config) -> payload := <<"m4">>, properties := Props}} -> ?assertNot(maps:is_key('Subscription-Identifier', Props)) - after 1000 -> ct:fail("did not receive m4") + after ?TIMEOUT -> ct:fail("did not receive m4") end, assert_nothing_received(), ok = emqtt:disconnect(C1), @@ -784,7 +786,7 @@ subscription_identifier_amqp091(Config) -> topic := <<"a/a">>, payload := <<"m1">>, properties := #{'Subscription-Identifier' := 1}}} -> ok - after 1000 -> ct:fail("did not receive message m1") + after ?TIMEOUT -> ct:fail("did not receive message m1") end, %% Test routing to multiple queues. @@ -796,14 +798,14 @@ subscription_identifier_amqp091(Config) -> topic := <<"a/b">>, payload := <<"m2">>, properties := #{'Subscription-Identifier' := 1}}} -> ok - after 1000 -> ct:fail("did not receive message m2") + after ?TIMEOUT -> ct:fail("did not receive message m2") end, receive {publish, #{client_pid := C2, topic := <<"a/b">>, payload := <<"m2">>, properties := #{'Subscription-Identifier' := 16#fffffff}}} -> ok - after 1000 -> ct:fail("did not receive message m2") + after ?TIMEOUT -> ct:fail("did not receive message m2") end, ok = emqtt:disconnect(C1), @@ -829,7 +831,7 @@ subscription_identifier_at_most_once_dead_letter(Config) -> topic := <<"dead letter/a">>, payload := <<"msg">>, properties := #{'Subscription-Identifier' := 1}}} -> ok - after 1000 -> ct:fail("did not receive msg") + after ?TIMEOUT -> ct:fail("did not receive msg") end, ok = emqtt:disconnect(C), ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, 0). @@ -880,7 +882,7 @@ subscription_options_persisted(Config) -> retain := true, qos := 0, properties := #{'Subscription-Identifier' := 99}}} -> ok - after 1000 -> ct:fail("did not receive m2") + after ?TIMEOUT -> ct:fail("did not receive m2") end, assert_nothing_received(), {ok, _} = emqtt:publish(C2, <<"t2">>, <<>>, [{retain, true}, {qos, 1}]), @@ -909,7 +911,7 @@ subscription_options_modify(Config) -> {ok, _} = emqtt:publish(C, Topic, <<"m2">>, qos1), receive {publish, #{payload := <<"m2">>, qos := 0 }} -> ok - after 1000 -> ct:fail("did not receive m2") + after ?TIMEOUT -> ct:fail("did not receive m2") end, %% modify QoS @@ -918,7 +920,7 @@ subscription_options_modify(Config) -> receive {publish, #{payload := <<"m3">>, qos := 1, properties := #{'Subscription-Identifier' := 1}}} -> ok - after 1000 -> ct:fail("did not receive m3") + after ?TIMEOUT -> ct:fail("did not receive m3") end, %% modify Subscription Identifier @@ -926,7 +928,7 @@ subscription_options_modify(Config) -> {ok, _} = emqtt:publish(C, Topic, <<"m4">>, qos1), receive {publish, #{payload := <<"m4">>, properties := #{'Subscription-Identifier' := 2}}} -> ok - after 1000 -> ct:fail("did not receive m4") + after ?TIMEOUT -> ct:fail("did not receive m4") end, %% remove Subscription Identifier @@ -935,19 +937,19 @@ subscription_options_modify(Config) -> receive {publish, #{payload := <<"m5">>, retain := false, properties := Props}} when map_size(Props) =:= 0 -> ok - after 1000 -> ct:fail("did not receive m5") + after ?TIMEOUT -> ct:fail("did not receive m5") end, %% modify Retain As Published {ok, _, [1]} = emqtt:subscribe(C, Topic, [{rap, true}, {qos, 1}]), receive {publish, #{payload := <<"m5">>, retain := true}} -> ok - after 1000 -> ct:fail("did not receive retained m5") + after ?TIMEOUT -> ct:fail("did not receive retained m5") end, {ok, _} = emqtt:publish(C, Topic, <<"m6">>, [{retain, true}, {qos, 1}]), receive {publish, #{payload := <<"m6">>, retain := true}} -> ok - after 1000 -> ct:fail("did not receive m6") + after ?TIMEOUT -> ct:fail("did not receive m6") end, assert_nothing_received(), @@ -979,13 +981,13 @@ subscription_options_modify_qos(Qos, Config) -> receive {publish, #{payload := <<"1">>, properties := Props}} -> ?assertEqual(0, maps:size(Props)) - after 1000 -> ct:fail("did not receive 1") + after ?TIMEOUT -> ct:fail("did not receive 1") end, %% Replace subscription while another client is sending messages. {ok, _, [Qos]} = emqtt:subscribe(Sub, #{'Subscription-Identifier' => 1}, Topic, Qos), Sender ! stop, NumSent = receive {N, Sender} -> N - after 1000 -> ct:fail("could not stop publisher") + after ?TIMEOUT -> ct:fail("could not stop publisher") end, ct:pal("Publisher sent ~b messages", [NumSent]), LastExpectedPayload = integer_to_binary(NumSent), @@ -993,7 +995,7 @@ subscription_options_modify_qos(Qos, Config) -> qos := Qos, client_pid := Sub, properties := #{'Subscription-Identifier' := 1}}} -> ok - after 1000 -> ct:fail("did not receive ~s", [LastExpectedPayload]) + after ?TIMEOUT -> ct:fail("did not receive ~s", [LastExpectedPayload]) end, case Qos of 0 -> @@ -1024,7 +1026,7 @@ session_upgrade_v3_v5_qos(Qos, Config) -> Sender = spawn_link(?MODULE, send, [self(), Pub, Topic, 0]), receive {publish, #{payload := <<"1">>, client_pid := Subv3}} -> ok - after 1000 -> ct:fail("did not receive 1") + after ?TIMEOUT -> ct:fail("did not receive 1") end, %% Upgrade session from v3 to v5 while another client is sending messages. ok = emqtt:disconnect(Subv3), @@ -1032,14 +1034,14 @@ session_upgrade_v3_v5_qos(Qos, Config) -> ?assertEqual(5, proplists:get_value(proto_ver, emqtt:info(Subv5))), Sender ! stop, NumSent = receive {N, Sender} -> N - after 1000 -> ct:fail("could not stop publisher") + after ?TIMEOUT -> ct:fail("could not stop publisher") end, ct:pal("Publisher sent ~b messages", [NumSent]), LastExpectedPayload = integer_to_binary(NumSent), receive {publish, #{payload := LastExpectedPayload, qos := Qos, client_pid := Subv5}} -> ok - after 1000 -> ct:fail("did not receive ~s", [LastExpectedPayload]) + after ?TIMEOUT -> ct:fail("did not receive ~s", [LastExpectedPayload]) end, case Qos of 0 -> @@ -1093,7 +1095,7 @@ session_upgrade_v3_v5_amqp091_pub(Config) -> receive {publish, #{payload := Payload, qos := 1, client_pid := Subv5}} -> ok - after 1000 -> ct:fail("did not receive message") + after ?TIMEOUT -> ct:fail("did not receive message") end, ok = emqtt:disconnect(Subv5), ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, 0). @@ -1118,7 +1120,7 @@ compatibility_v3_v5(Config) -> %% v5 features should work even when message comes from a v3 client. retain := true, properties := #{'Subscription-Identifier' := 99}}} -> ok - after 1000 -> ct:fail("did not receive from v3") + after ?TIMEOUT -> ct:fail("did not receive from v3") end, {ok, _} = emqtt:publish(Cv3, <<"v5">>, <<>>, [{retain, true}, {qos, 1}]), ok = emqtt:disconnect(Cv3), @@ -1180,7 +1182,7 @@ amqp091_cc_header(Config) -> #{topic := <<"first/key">>, payload := <<"msg">>, properties := #{'Subscription-Identifier' := 1}}} -> ok - after 1000 -> ct:fail("did not receive msg") + after ?TIMEOUT -> ct:fail("did not receive msg") end, assert_nothing_received(), ok = emqtt:disconnect(C). @@ -1193,7 +1195,7 @@ publish_property_content_type(Config) -> {ok, _} = emqtt:publish(C, Topic, #{'Content-Type' => <<"text/plain😎;charset=UTF-8"/utf8>>}, Payload, [{qos, 1}]), receive {publish, #{payload := Payload, properties := #{'Content-Type' := <<"text/plain😎;charset=UTF-8"/utf8>>}}} -> ok - after 1000 -> ct:fail("did not receive message") + after ?TIMEOUT -> ct:fail("did not receive message") end, ok = emqtt:disconnect(C). @@ -1205,11 +1207,11 @@ publish_property_payload_format_indicator(Config) -> {ok, _} = emqtt:publish(C, Topic, #{'Payload-Format-Indicator' => 1}, <<"m2">>, [{qos, 1}]), receive {publish, #{payload := <<"m1">>, properties := #{'Payload-Format-Indicator' := 0}}} -> ok - after 1000 -> ct:fail("did not receive m1") + after ?TIMEOUT -> ct:fail("did not receive m1") end, receive {publish, #{payload := <<"m2">>, properties := #{'Payload-Format-Indicator' := 1}}} -> ok - after 1000 -> ct:fail("did not receive m2") + after ?TIMEOUT -> ct:fail("did not receive m2") end, ok = emqtt:disconnect(C). @@ -1241,7 +1243,7 @@ publish_property_response_topic_correlation_data(Config) -> ok = emqtt:publish(FrenchResponder, ResponseTopic, #{'Correlation-Data' => Corr0}, <<"Bonjour Henri">>, [{qos, 0}]) - after 1000 -> ct:fail("French responder did not receive request") + after ?TIMEOUT -> ct:fail("French responder did not receive request") end, receive {publish, #{client_pid := ItalianResponder, payload := <<"Harry">>, @@ -1250,21 +1252,21 @@ publish_property_response_topic_correlation_data(Config) -> ok = emqtt:publish(ItalianResponder, ResponseTopic, #{'Correlation-Data' => Corr1}, <<"Buongiorno Enrico">>, [{qos, 0}]) - after 1000 -> ct:fail("Italian responder did not receive request") + after ?TIMEOUT -> ct:fail("Italian responder did not receive request") end, receive {publish, #{client_pid := Requester, properties := #{'Correlation-Data' := CorrelationItalian}, payload := Payload0 }} -> ?assertEqual(<<"Buongiorno Enrico">>, Payload0) - after 1000 -> ct:fail("did not receive Italian response") + after ?TIMEOUT -> ct:fail("did not receive Italian response") end, receive {publish, #{client_pid := Requester, properties := #{'Correlation-Data' := CorrelationFrench}, payload := Payload1 }} -> ?assertEqual(<<"Bonjour Henri">>, Payload1) - after 1000 -> ct:fail("did not receive French response") + after ?TIMEOUT -> ct:fail("did not receive French response") end, [ok = emqtt:disconnect(C) || C <- [Requester, FrenchResponder, ItalianResponder]]. @@ -1285,7 +1287,7 @@ publish_property_user_property(Config) -> {ok, _} = emqtt:publish(C, Topic, #{'User-Property' => UserProperty}, Payload, [{qos, 1}]), receive {publish, #{payload := Payload, properties := #{'User-Property' := UserProperty}}} -> ok - after 1000 -> ct:fail("did not receive message") + after ?TIMEOUT -> ct:fail("did not receive message") end, ok = emqtt:disconnect(C). @@ -1338,7 +1340,7 @@ will_delay(WillDelay, SessionExpiry, ClientId, Config) end, receive {publish, #{payload := Msg}} -> ok; Unexpected -> ct:fail({unexpected_message, Unexpected}) - after 3000 -> ct:fail(will_message_timeout) + after ?TIMEOUT -> ct:fail(will_message_timeout) end, %% Cleanup C2 = connect(ClientId, Config), @@ -1358,7 +1360,7 @@ will_delay_session_expiry_zero(Config) -> erlang:exit(C, trigger_will_message), %% Since default Session Expiry Interval is 0, we expect Will Message immediately. receive {publish, #{payload := Msg}} -> ok - after 500 -> ct:fail(will_message_timeout) + after ?TIMEOUT -> ct:fail(will_message_timeout) end, ok = emqtt:disconnect(Sub). @@ -1467,16 +1469,16 @@ will_delay_session_takeover(Config) -> {will_topic, Topic}, {will_payload, <<"will-4b">>}]), [receive {disconnected, ?RC_SESSION_TAKEN_OVER, #{}} -> ok - after 1000 -> ct:fail("server did not disconnect us") + after ?TIMEOUT -> ct:fail("server did not disconnect us") end || _ <- Clients], receive {publish, #{client_pid := Sub, payload := <<"will-3a">>}} -> ok - after 5000 -> ct:fail({missing_msg, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_msg, ?LINE}) end, receive {publish, #{client_pid := Sub, payload := <<"will-4a">>}} -> ok - after 5000 -> ct:fail({missing_msg, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_msg, ?LINE}) end, assert_nothing_received(), @@ -1544,7 +1546,7 @@ will_delay_message_expiry_publish_properties(Config) -> %% it has been waiting in the Server for 2 seconds to be consumed. assert_message_expiry_interval(20 - 2, MEI); Other -> ct:fail("received unexpected message: ~p", [Other]) - after 500 -> ct:fail("did not receive Will Message") + after ?TIMEOUT -> ct:fail("did not receive Will Message") end, ok = emqtt:disconnect(Sub2). @@ -1594,7 +1596,7 @@ will_properties0(Config, WillDelayInterval) -> 'Response-Topic' := <<"response/topic">>, 'Correlation-Data' := CorrelationData} = Props}} when map_size(Props) =:= 5 -> ok - after 1500 -> ct:fail("did not receive Will Message") + after ?TIMEOUT -> ct:fail("did not receive Will Message") end, ok = emqtt:disconnect(Sub). @@ -1633,7 +1635,7 @@ retain_properties(Config) -> retain := true, qos := 1, properties := Props}} -> ok - after 500 -> ct:fail("did not receive m1") + after ?TIMEOUT -> ct:fail("did not receive m1") end, receive {publish, #{client_pid := Sub, @@ -1642,7 +1644,7 @@ retain_properties(Config) -> retain := true, qos := 1, properties := Props}} -> ok - after 500 -> ct:fail("did not receive m2") + after ?TIMEOUT -> ct:fail("did not receive m2") end, {ok, _} = emqtt:publish(Sub, <<"t/1">>, <<>>, [{retain, true}, {qos, 1}]), {ok, _} = emqtt:publish(Sub, <<"t/2">>, <<>>, [{retain, true}, {qos, 1}]), @@ -1675,7 +1677,7 @@ will_delay_node_restart(Config) -> T = erlang:monotonic_time(millisecond), ok = rabbit_ct_broker_helpers:drain_node(Config, 0), [receive {disconnected, ?RC_SERVER_SHUTTING_DOWN, #{}} -> ok - after 10_000 -> ct:fail("server did not disconnect us") + after ?TIMEOUT -> ct:fail("server did not disconnect us") end || _ <- ClientsNode0], ok = rabbit_ct_broker_helpers:stop_node(Config, 0), ElapsedMs = erlang:monotonic_time(millisecond) - T, @@ -1687,12 +1689,12 @@ will_delay_node_restart(Config) -> %% After node 0 restarts, we should receive the Will Message promptly on both nodes 0 and 1. receive {publish, #{client_pid := Sub1, payload := Payload}} -> ok - after 1000 -> ct:fail("did not receive Will Message on node 1") + after ?TIMEOUT -> ct:fail("did not receive Will Message on node 1") end, Sub0b = connect(<<"sub0">>, Config, 0, [{clean_start, false}]), receive {publish, #{client_pid := Sub0b, payload := Payload}} -> ok - after 1000 -> ct:fail("did not receive Will Message on node 0") + after ?TIMEOUT -> ct:fail("did not receive Will Message on node 0") end, ok = emqtt:disconnect(Sub0b), @@ -1726,7 +1728,7 @@ session_switch_v3_v5(Config, Disconnect) -> #{client_pid := C2, payload := <<"m1">>, qos := 1}} -> ok - after 1000 -> ct:fail("did not receive from m1") + after ?TIMEOUT -> ct:fail("did not receive from m1") end, %% Modifying subscription with v5 specific feature should work. {ok, _, [1]} = emqtt:subscribe(C2, Topic, [{nl, true}, {qos, 1}]), @@ -1745,7 +1747,7 @@ session_switch_v3_v5(Config, Disconnect) -> case Disconnect of true -> ok; false -> receive {disconnected, ?RC_SESSION_TAKEN_OVER, #{}} -> ok - after 1000 -> ct:fail("missing DISCONNECT packet for C2") + after ?TIMEOUT -> ct:fail("missing DISCONNECT packet for C2") end end, %% We expect that v5 specific subscription feature does not apply @@ -1755,7 +1757,7 @@ session_switch_v3_v5(Config, Disconnect) -> #{client_pid := C3, payload := <<"m3">>, qos := 1}} -> ok - after 1000 -> ct:fail("did not receive m3 with QoS 1") + after ?TIMEOUT -> ct:fail("did not receive m3 with QoS 1") end, %% Modifying the subscription once more with v3 client should work. {ok, _, [0]} = emqtt:subscribe(C3, Topic, qos0), @@ -1764,7 +1766,7 @@ session_switch_v3_v5(Config, Disconnect) -> #{client_pid := C3, payload := <<"m4">>, qos := 0}} -> ok - after 1000 -> ct:fail("did not receive m3 with QoS 0") + after ?TIMEOUT -> ct:fail("did not receive m3 with QoS 0") end, %% Unsubscribing in v3 should work. @@ -1821,7 +1823,7 @@ topic_alias_server_to_client(Config) -> A1 = receive {publish, #{payload := <<"m1">>, topic := <<"t/1">>, properties := #{'Topic-Alias' := A1a}}} -> A1a - after 500 -> ct:fail("Did not receive m1") + after ?TIMEOUT -> ct:fail("Did not receive m1") end, %% We don't expect a Topic Alias when the Topic Name consists of a single byte. @@ -1830,14 +1832,14 @@ topic_alias_server_to_client(Config) -> topic := <<"t">>, properties := Props1}} when map_size(Props1) =:= 0 -> ok - after 500 -> ct:fail("Did not receive m2") + after ?TIMEOUT -> ct:fail("Did not receive m2") end, {ok, _} = emqtt:publish(C1, <<"t/2">>, <<"m3">>, qos1), A2 = receive {publish, #{payload := <<"m3">>, topic := <<"t/2">>, properties := #{'Topic-Alias' := A2a}}} -> A2a - after 500 -> ct:fail("Did not receive m3") + after ?TIMEOUT -> ct:fail("Did not receive m3") end, ?assertEqual([1, 2], lists:sort([A1, A2])), @@ -1848,7 +1850,7 @@ topic_alias_server_to_client(Config) -> topic := <<"t/3">>, properties := Props2}} when map_size(Props2) =:= 0 -> ok - after 500 -> ct:fail("Did not receive m4") + after ?TIMEOUT -> ct:fail("Did not receive m4") end, %% Existing topic aliases should still be sent. @@ -1858,13 +1860,13 @@ topic_alias_server_to_client(Config) -> topic := <<>>, properties := #{'Topic-Alias' := A1b}}} -> ?assertEqual(A1, A1b) - after 500 -> ct:fail("Did not receive m5") + after ?TIMEOUT -> ct:fail("Did not receive m5") end, receive {publish, #{payload := <<"m6">>, topic := <<>>, properties := #{'Topic-Alias' := A2b}}} -> ?assertEqual(A2, A2b) - after 500 -> ct:fail("Did not receive m6") + after ?TIMEOUT -> ct:fail("Did not receive m6") end, ok = emqtt:disconnect(C1), @@ -1890,13 +1892,13 @@ topic_alias_bidirectional(Config) -> payload := <<"m2">>, topic := Topic1, properties := #{'Topic-Alias' := 1}}} -> ok - after 500 -> ct:fail("Did not receive m2") + after ?TIMEOUT -> ct:fail("Did not receive m2") end, receive {publish, #{client_pid := C1, payload := <<"m4">>, topic := <<>>, properties := #{'Topic-Alias' := 1}}} -> ok - after 500 -> ct:fail("Did not receive m4") + after ?TIMEOUT -> ct:fail("Did not receive m4") end, ok = emqtt:disconnect(C1), ok = emqtt:disconnect(C2). @@ -1950,7 +1952,7 @@ topic_alias_in_retained_message0(Config, TopicAliasMax, TopicAlias, ExpectedProp retain := true, properties := Props}} -> ?assertEqual(ExpectedProps, Props) - after 500 -> ct:fail("Did not receive retained message") + after ?TIMEOUT -> ct:fail("Did not receive retained message") end, ok = emqtt:disconnect(C). @@ -2107,7 +2109,7 @@ receive_correlations(Ctag, N, Set) -> #amqp_msg{props = #'P_basic'{correlation_id = Corr}}} -> ?assert(is_binary(Corr)), receive_correlations(Ctag, N + 1, sets:add_element(Corr, Set)) - after 200 -> + after 1000 -> {N, Set} end. From fe7a1413316eb88a9bd972ef4a67fcda11fe6cfa Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 12 Dec 2024 15:51:59 +0100 Subject: [PATCH 4/4] Test: Increase receive timeout in all rabbit test suites --- deps/rabbit/test/amqp_address_SUITE.erl | 20 +++-- deps/rabbit/test/amqp_auth_SUITE.erl | 86 ++++++++++--------- deps/rabbit/test/amqp_client_SUITE.erl | 2 +- deps/rabbit/test/amqp_credit_api_v2_SUITE.erl | 16 ++-- deps/rabbit/test/amqp_utils.erl | 22 ++--- deps/rabbit/test/amqpl_consumer_ack_SUITE.erl | 34 ++++---- .../test/amqpl_direct_reply_to_SUITE.erl | 10 ++- deps/rabbit/test/backing_queue_SUITE.erl | 2 +- .../rabbit/test/clustering_recovery_SUITE.erl | 4 +- deps/rabbit/test/confirms_rejects_SUITE.erl | 12 +-- deps/rabbit/test/consumer_timeout_SUITE.erl | 10 ++- deps/rabbit/test/dead_lettering_SUITE.erl | 28 +++--- .../test/direct_exchange_routing_v2_SUITE.erl | 4 +- ...disconnect_detected_during_alarm_SUITE.erl | 2 +- .../test/metadata_store_clustering_SUITE.erl | 4 +- .../rabbit/test/mirrored_supervisor_SUITE.erl | 2 +- deps/rabbit/test/msg_size_metrics_SUITE.erl | 6 +- .../publisher_confirms_parallel_SUITE.erl | 10 +-- .../rabbit/test/queue_length_limits_SUITE.erl | 8 +- deps/rabbit/test/queue_parallel_SUITE.erl | 4 +- deps/rabbit/test/queue_type_SUITE.erl | 12 +-- deps/rabbit/test/quorum_queue_SUITE.erl | 53 ++++++------ .../test/rabbit_db_topic_exchange_SUITE.erl | 2 +- .../rabbit_fifo_dlx_integration_SUITE.erl | 8 +- deps/rabbit/test/rabbit_fifo_int_SUITE.erl | 35 ++++---- .../rabbit_local_random_exchange_SUITE.erl | 2 +- .../rabbit/test/rabbit_stream_queue_SUITE.erl | 45 +++++----- .../rabbitmq_queues_cli_integration_SUITE.erl | 2 +- .../test/single_active_consumer_SUITE.erl | 16 ++-- deps/rabbit/test/topic_permission_SUITE.erl | 8 +- deps/rabbit/test/transactions_SUITE.erl | 2 +- deps/rabbit/test/unicode_SUITE.erl | 2 +- .../test/unit_file_handle_cache_SUITE.erl | 4 +- 33 files changed, 248 insertions(+), 229 deletions(-) diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index 607aa11473..a914442d97 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -22,6 +22,8 @@ [flush/1, wait_for_credit/1]). +-define(TIMEOUT, 30_000). + all() -> [ {group, v1_permitted}, @@ -216,7 +218,7 @@ target_exchange_absent(Config) -> condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, <<"no exchange '", XName:(byte_size(XName))/binary, "' in vhost '/'">>}}}}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {missing_event, ?LINE}, flush(Reason), ct:fail(Reason) @@ -275,7 +277,7 @@ target_queue_absent(Config) -> condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary, "' in vhost '/'">>}}}}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {missing_event, ?LINE}, flush(Reason), ct:fail(Reason) @@ -403,7 +405,7 @@ target_per_message_unset_to_address(Config) -> ok = amqp10_client:detach_link(Sender), receive {amqp10_event, {link, Sender, {detached, normal}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). @@ -467,7 +469,7 @@ target_per_message_bad_to_address(Config) -> ?assertMatch(#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, description = {utf8, <<"bad 'to' address", _Rest/binary>>}}, Error) - after 5000 -> + after ?TIMEOUT -> flush(missing_disposition), ct:fail(missing_disposition) end @@ -507,7 +509,7 @@ target_per_message_exchange_absent_settled(Config) -> info = {map, [{{symbol, <<"delivery-tag">>}, {binary, DTag2}}]} }, Error) - after 5000 -> ct:fail("server did not close our outgoing link") + after ?TIMEOUT -> ct:fail("server did not close our outgoing link") end, ok = cleanup(Init). @@ -566,7 +568,7 @@ target_bad_address0(TargetAddress, Config) -> {session, Session, {ended, #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {missing_event, ?LINE, TargetAddress}, flush(Reason), ct:fail(Reason) @@ -593,7 +595,7 @@ source_queue_absent(Config) -> condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary, "' in vhost '/'">>}}}}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {missing_event, ?LINE}, flush(Reason), ct:fail(Reason) @@ -626,7 +628,7 @@ source_bad_address0(SourceAddress, Config) -> {session, Session, {ended, #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {missing_event, ?LINE}, flush(Reason), ct:fail(Reason) @@ -657,7 +659,7 @@ wait_for_settled(State, Tag) -> receive {amqp10_disposition, {State, Tag}} -> ok - after 5000 -> + after ?TIMEOUT -> Reason = {?FUNCTION_NAME, State, Tag}, flush(Reason), ct:fail(Reason) diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 6bd905a924..f9328aab96 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -14,6 +14,8 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). +-define(TIMEOUT, 30_000). + -import(rabbit_ct_broker_helpers, [rpc/4]). -import(rabbit_ct_helpers, @@ -153,8 +155,8 @@ v1_attach_target_queue(Config) -> <<"configure access to queue 'test queue' in vhost " "'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr1}}} -> ok - after 5000 -> flush(missing_ended), - ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") + after ?TIMEOUT -> flush(missing_ended), + ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, %% Give the user configure permissions on the queue. @@ -166,7 +168,7 @@ v1_attach_target_queue(Config) -> <<"write access to exchange 'amq.default' in vhost " "'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session2, {ended, ExpectedErr2}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -177,8 +179,8 @@ v1_attach_target_queue(Config) -> {ok, Sender3} = amqp10_client:attach_sender_link( Session3, <<"test-sender-3">>, TargetAddress), receive {amqp10_event, {link, Sender3, attached}} -> ok - after 5000 -> flush(missing_attached), - ct:fail("missing ATTACH from server") + after ?TIMEOUT -> flush(missing_attached), + ct:fail("missing ATTACH from server") end, ok = close_connection_sync(Connection). @@ -202,8 +204,8 @@ v1_attach_source_exchange(Config) -> #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, description = {utf8, <<"configure access to queue 'amq.gen", _/binary>>}}}}} -> ok - after 5000 -> flush(missing_ended), - ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") + after ?TIMEOUT -> flush(missing_ended), + ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, %% Give the user configure permissions on the queue. @@ -218,7 +220,7 @@ v1_attach_source_exchange(Config) -> #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, description = {utf8, <<"write access to queue 'amq.gen", _/binary>>}}}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -231,7 +233,7 @@ v1_attach_source_exchange(Config) -> <<"read access to exchange 'amq.fanout' in vhost " "'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session3, {ended, ExpectedErr1}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -247,7 +249,7 @@ v1_attach_source_exchange(Config) -> #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, description = {utf8, <<"read access to queue 'amq.gen", _/binary>>}}}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -258,7 +260,7 @@ v1_attach_source_exchange(Config) -> {ok, Recv5} = amqp10_client:attach_receiver_link( Session5, <<"receiver-5">>, SourceAddress), receive {amqp10_event, {link, Recv5, attached}} -> ok - after 5000 -> flush(missing_attached), + after ?TIMEOUT -> flush(missing_attached), ct:fail("missing ATTACH from server") end, @@ -290,7 +292,7 @@ send_to_topic(TargetAddress, Config) -> <<"write access to topic 'test vhost.test user.a.b' in exchange " "'amq.topic' in vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -304,7 +306,7 @@ send_to_topic(TargetAddress, Config) -> ok = amqp10_client:send_msg(Sender2, Msg2), %% We expect RELEASED since no queue is bound. receive {amqp10_disposition, {released, Dtag}} -> ok - after 5000 -> ct:fail(released_timeout) + after ?TIMEOUT -> ct:fail(released_timeout) end, ok = amqp10_client:detach_link(Sender2), @@ -330,7 +332,7 @@ v1_send_to_topic_using_subject(Config) -> ok = amqp10_client:send_msg(Sender, Msg1b), %% We have sufficient authorization, but expect RELEASED since no queue is bound. receive {amqp10_disposition, {released, Dtag1}} -> ok - after 5000 -> ct:fail(released_timeout) + after ?TIMEOUT -> ct:fail(released_timeout) end, Dtag2 = <<"dtag 2">>, @@ -342,7 +344,7 @@ v1_send_to_topic_using_subject(Config) -> <<"write access to topic '.a.b' in exchange 'amq.topic' in " "vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -374,7 +376,7 @@ attach_source_topic0(SourceAddress, Config) -> <<"read access to topic 'test vhost.test user.a.b' in exchange " "'amq.topic' in vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -383,7 +385,7 @@ attach_source_topic0(SourceAddress, Config) -> {ok, Recv2} = amqp10_client:attach_receiver_link( Session2, <<"receiver-2">>, SourceAddress), receive {amqp10_event, {link, Recv2, attached}} -> ok - after 5000 -> flush(missing_attached), + after ?TIMEOUT -> flush(missing_attached), ct:fail("missing ATTACH from server") end, @@ -405,7 +407,7 @@ v1_attach_target_internal_exchange(Config) -> ExpectedErr = error_unauthorized( <<"forbidden to publish to internal exchange 'test exchange' in vhost '/'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, @@ -429,7 +431,7 @@ attach_source_queue(Config) -> receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS") end, ok = close_connection_sync(Conn). @@ -448,13 +450,13 @@ attach_target_exchange(Config) -> <<"write access to exchange '", XName/binary, "' in vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, {ok, Session2} = amqp10_client:begin_session_sync(Connection), {ok, _} = amqp10_client:attach_sender_link(Session2, <<"test-sender">>, Address2), receive {amqp10_event, {session, Session2, {ended, ExpectedErr}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:close_connection(Connection). @@ -478,7 +480,7 @@ attach_target_queue(Config) -> <<"write access to exchange 'amq.default' ", "in vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:close_connection(Conn). @@ -500,7 +502,7 @@ target_per_message_exchange(Config) -> Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, <<"m1">>)), ok = amqp10_client:send_msg(Sender, Msg1), receive {amqp10_disposition, {released, Tag1}} -> ok - after 5000 -> ct:fail(released_timeout) + after ?TIMEOUT -> ct:fail(released_timeout) end, %% We don't have sufficient authorization. @@ -511,7 +513,7 @@ target_per_message_exchange(Config) -> <<"write access to exchange 'amq.default' in " "vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = close_connection_sync(Connection). @@ -534,7 +536,7 @@ target_per_message_internal_exchange(Config) -> ExpectedErr = error_unauthorized( <<"forbidden to publish to internal exchange '", XName/binary, "' in vhost 'test vhost'">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_event), + after ?TIMEOUT -> flush(missing_event), ct:fail({missing_event, ?LINE}) end, ok = close_connection_sync(Conn1), @@ -563,7 +565,7 @@ target_per_message_topic(Config) -> Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, <<"m1">>)), ok = amqp10_client:send_msg(Sender, Msg1), receive {amqp10_disposition, {released, Tag1}} -> ok - after 5000 -> ct:fail(released_timeout) + after ?TIMEOUT -> ct:fail(released_timeout) end, %% We don't have sufficient authorization. @@ -574,7 +576,7 @@ target_per_message_topic(Config) -> <<"write access to topic '.a.b' in exchange 'amq.topic' in " "vhost 'test vhost' refused for user 'test user'">>), receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = close_connection_sync(Connection). @@ -594,7 +596,7 @@ authn_failure_event(Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, sasl_auth_failure}}} -> ok - after 5000 -> flush(missing_closed), + after ?TIMEOUT -> flush(missing_closed), ct:fail("did not receive sasl_auth_failure") end, @@ -621,7 +623,7 @@ sasl_success(Mechanism, Config) -> OpnConf = OpnConf0#{sasl := Mechanism}, {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, opened}} -> ok - after 5000 -> ct:fail(missing_opened) + after ?TIMEOUT -> ct:fail(missing_opened) end, ok = amqp10_client:close_connection(Connection). @@ -638,7 +640,7 @@ sasl_anonymous_failure(Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, Reason}}} -> ?assertEqual({sasl_not_supported, Mechanism}, Reason) - after 5000 -> ct:fail(missing_closed) + after ?TIMEOUT -> ct:fail(missing_closed) end, ok = rpc(Config, application, set_env, [App, Par, Default]). @@ -649,7 +651,7 @@ sasl_plain_failure(Config) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, Reason}}} -> ?assertEqual(sasl_auth_failure, Reason) - after 5000 -> ct:fail(missing_closed) + after ?TIMEOUT -> ct:fail(missing_closed) end. %% Skipping SASL is disallowed in RabbitMQ. @@ -658,14 +660,14 @@ sasl_none_failure(Config) -> OpnConf = OpnConf0#{sasl := none}, {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, _Reason}}} -> ok - after 5000 -> ct:fail(missing_closed) + after ?TIMEOUT -> ct:fail(missing_closed) end. vhost_absent(Config) -> OpnConf = connection_config(Config, <<"this vhost does not exist">>), {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, _}}} -> ok - after 5000 -> ct:fail(missing_closed) + after ?TIMEOUT -> ct:fail(missing_closed) end. vhost_connection_limit(Config) -> @@ -675,22 +677,22 @@ vhost_connection_limit(Config) -> OpnConf = connection_config(Config), {ok, C1} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, C1, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, {ok, C2} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, C2, {closed, _}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, OpnConf0 = connection_config(Config, <<"/">>), OpnConf1 = OpnConf0#{sasl := anon}, {ok, C3} = amqp10_client:open_connection(OpnConf1), receive {amqp10_event, {connection, C3, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, {ok, C4} = amqp10_client:open_connection(OpnConf1), receive {amqp10_event, {connection, C4, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, [ok = close_connection_sync(C) || C <- [C1, C3, C4]], @@ -704,12 +706,12 @@ user_connection_limit(Config) -> OpnConf = OpnConf0#{sasl := anon}, {ok, C1} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, C1, {closed, _}}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, {ok, C2} = amqp10_client:open_connection(connection_config(Config)), receive {amqp10_event, {connection, C2, opened}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, ok = close_connection_sync(C2), @@ -731,7 +733,7 @@ v1_vhost_queue_limit(Config) -> ?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED, <<"cannot declare queue 'q1': queue limit in vhost 'test vhost' (0) is reached">>), receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok - after 5000 -> flush(missing_ended), + after ?TIMEOUT -> flush(missing_ended), ct:fail("did not receive expected error") end, @@ -742,8 +744,8 @@ v1_vhost_queue_limit(Config) -> {ok, Sender2} = amqp10_client:attach_sender_link( Session2, <<"test-sender-2">>, TargetAddress), receive {amqp10_event, {link, Sender2, attached}} -> ok - after 5000 -> flush(missing_attached), - ct:fail("missing ATTACH from server") + after ?TIMEOUT -> flush(missing_attached), + ct:fail("missing ATTACH from server") end, ok = close_connection_sync(C1), diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index ab4addfd69..78d8c69938 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -1260,7 +1260,7 @@ drain_many(Config, QueueType, QName) after 30000 -> ct:fail({missing_delivery, ?LINE}) end, receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok - after 300 -> ct:fail("expected credit_exhausted") + after 30000 -> ct:fail("expected credit_exhausted") end, ok = amqp10_client:detach_link(Sender), diff --git a/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl b/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl index ba465e396f..83f91cfda6 100644 --- a/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl +++ b/deps/rabbit/test/amqp_credit_api_v2_SUITE.erl @@ -80,10 +80,10 @@ credit_api_v2(Config) -> {ok, CQSender} = amqp10_client:attach_sender_link(Session, <<"cq sender">>, CQAddr), {ok, QQSender} = amqp10_client:attach_sender_link(Session, <<"qq sender">>, QQAddr), receive {amqp10_event, {link, CQSender, credited}} -> ok - after 5000 -> ct:fail(credited_timeout) + after 30_000 -> ct:fail(credited_timeout) end, receive {amqp10_event, {link, QQSender, credited}} -> ok - after 5000 -> ct:fail(credited_timeout) + after 30_000 -> ct:fail(credited_timeout) end, %% Send 40 messages to each queue. @@ -146,7 +146,7 @@ credit_api_v2(Config) -> %% Draining should also work. ok = amqp10_client:flow_link_credit(CQReceiver3, 10, never, true), receive {amqp10_event, {link, CQReceiver3, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_credit_exhausted, ?LINE}) + after 30_000 -> ct:fail({missing_credit_exhausted, ?LINE}) end, receive Unexpected1 -> ct:fail({unexpected, ?LINE, Unexpected1}) after 20 -> ok @@ -154,7 +154,7 @@ credit_api_v2(Config) -> ok = amqp10_client:flow_link_credit(QQReceiver3, 10, never, true), receive {amqp10_event, {link, QQReceiver3, credit_exhausted}} -> ok - after 5000 -> ct:fail({missing_credit_exhausted, ?LINE}) + after 30_000 -> ct:fail({missing_credit_exhausted, ?LINE}) end, receive Unexpected2 -> ct:fail({unexpected, ?LINE, Unexpected2}) after 20 -> ok @@ -166,11 +166,11 @@ credit_api_v2(Config) -> ok = detach_sync(QQReceiver3), ok = amqp10_client:end_session(Session), receive {amqp10_event, {session, Session, {ended, _}}} -> ok - after 5000 -> ct:fail(missing_ended) + after 30_000 -> ct:fail(missing_ended) end, ok = amqp10_client:close_connection(Connection), receive {amqp10_event, {connection, Connection, {closed, normal}}} -> ok - after 5000 -> ct:fail(missing_closed) + after 30_000 -> ct:fail(missing_closed) end. consume_and_accept(NumMsgs, Receiver) -> @@ -192,14 +192,14 @@ receive_messages0(Receiver, N, Acc) -> receive {amqp10_msg, Receiver, Msg} -> receive_messages0(Receiver, N - 1, [Msg | Acc]) - after 5000 -> + after 30_000 -> exit({timeout, {num_received, length(Acc)}, {num_missing, N}}) end. detach_sync(Receiver) -> ok = amqp10_client:detach_link(Receiver), receive {amqp10_event, {link, Receiver, {detached, normal}}} -> ok - after 5000 -> ct:fail({missing_detached, Receiver}) + after 30_000 -> ct:fail({missing_detached, Receiver}) end. flush(Prefix) -> diff --git a/deps/rabbit/test/amqp_utils.erl b/deps/rabbit/test/amqp_utils.erl index 22865df919..ebc2d4f03b 100644 --- a/deps/rabbit/test/amqp_utils.erl +++ b/deps/rabbit/test/amqp_utils.erl @@ -55,7 +55,7 @@ wait_for_credit(Sender) -> receive {amqp10_event, {link, Sender, credited}} -> ok - after 5000 -> + after 30_000 -> flush("wait_for_credit timed out"), ct:fail(credited_timeout) end. @@ -66,7 +66,7 @@ wait_for_accepts(N) -> receive {amqp10_disposition, {accepted, _}} -> wait_for_accepts(N - 1) - after 5000 -> + after 30_000 -> ct:fail({missing_accepted, N}) end. @@ -108,9 +108,9 @@ wait_for_link_detach(Link) -> {amqp10_event, {link, Link, {detached, #'v1_0.detach'{}}}} -> flush(?FUNCTION_NAME), ok - after 5000 -> - flush("wait_for_link_detach timed out"), - ct:fail({link_detach_timeout, Link}) + after 30_000 -> + flush("wait_for_link_detach timed out"), + ct:fail({link_detach_timeout, Link}) end. end_session_sync(Session) @@ -123,9 +123,9 @@ wait_for_session_end(Session) -> {amqp10_event, {session, Session, {ended, _}}} -> flush(?FUNCTION_NAME), ok - after 5000 -> - flush("wait_for_session_end timed out"), - ct:fail({session_end_timeout, Session}) + after 30_000 -> + flush("wait_for_session_end timed out"), + ct:fail({session_end_timeout, Session}) end. close_connection_sync(Connection) @@ -138,7 +138,7 @@ wait_for_connection_close(Connection) -> {amqp10_event, {connection, Connection, {closed, normal}}} -> flush(?FUNCTION_NAME), ok - after 5000 -> - flush("wait_for_connection_close timed out"), - ct:fail({connection_close_timeout, Connection}) + after 30_000 -> + flush("wait_for_connection_close timed out"), + ct:fail({connection_close_timeout, Connection}) end. diff --git a/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl b/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl index 1a3a878ccd..e1093fe3ac 100644 --- a/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl +++ b/deps/rabbit/test/amqpl_consumer_ack_SUITE.erl @@ -19,6 +19,8 @@ -import(rabbit_ct_helpers, [eventually/3]). +-define(TIMEOUT, 30_000). + all() -> [ {group, tests} @@ -95,7 +97,7 @@ requeue_one_channel(QType, Config) -> self()), receive #'basic.consume_ok'{consumer_tag = Ctag} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, [begin @@ -107,19 +109,19 @@ requeue_one_channel(QType, Config) -> receive {#'basic.deliver'{}, #amqp_msg{payload = <<"1">>}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = <<"2">>}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, D3 = receive {#'basic.deliver'{delivery_tag = Del3}, #amqp_msg{payload = <<"3">>}} -> Del3 - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = <<"4">>}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, assert_messages(QName, 4, 4, Config), @@ -132,18 +134,18 @@ requeue_one_channel(QType, Config) -> receive {#'basic.deliver'{}, #amqp_msg{payload = P1}} -> ?assertEqual(<<"1">>, P1) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = P2}} -> ?assertEqual(<<"2">>, P2) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, D3b = receive {#'basic.deliver'{delivery_tag = Del3b}, #amqp_msg{payload = P3}} -> ?assertEqual(<<"3">>, P3), Del3b - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, assert_messages(QName, 4, 4, Config), @@ -181,7 +183,7 @@ requeue_two_channels(QType, Config) -> self()), receive #'basic.consume_ok'{consumer_tag = Ctag1} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, amqp_channel:subscribe(Ch2, @@ -189,7 +191,7 @@ requeue_two_channels(QType, Config) -> consumer_tag = Ctag2}, self()), receive #'basic.consume_ok'{consumer_tag = Ctag2} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, [begin @@ -203,22 +205,22 @@ requeue_two_channels(QType, Config) -> receive {#'basic.deliver'{consumer_tag = C1}, #amqp_msg{payload = <<"1">>}} -> ?assertEqual(Ctag1, C1) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{consumer_tag = C2}, #amqp_msg{payload = <<"2">>}} -> ?assertEqual(Ctag2, C2) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{consumer_tag = C3}, #amqp_msg{payload = <<"3">>}} -> ?assertEqual(Ctag1, C3) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{consumer_tag = C4}, #amqp_msg{payload = <<"4">>}} -> ?assertEqual(Ctag2, C4) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, assert_messages(QName, 4, 4, Config), @@ -228,14 +230,14 @@ requeue_two_channels(QType, Config) -> receive {#'basic.deliver'{consumer_tag = C5}, #amqp_msg{payload = <<"1">>}} -> ?assertEqual(Ctag2, C5) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, DelTag = receive {#'basic.deliver'{consumer_tag = C6, delivery_tag = D}, #amqp_msg{payload = <<"3">>}} -> ?assertEqual(Ctag2, C6), D - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, assert_messages(QName, 4, 4, Config), diff --git a/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl b/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl index 8cd6079669..720476ac26 100644 --- a/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl +++ b/deps/rabbit/test/amqpl_direct_reply_to_SUITE.erl @@ -16,6 +16,8 @@ -import(rabbit_ct_helpers, [eventually/1]). +-define(TIMEOUT, 30_000). + all() -> [ {group, cluster_size_1}, @@ -116,7 +118,7 @@ trace(Config) -> correlation_id = CorrelationId}, payload = RequestPayload}), receive #'basic.ack'{} -> ok - after 5000 -> ct:fail(confirm_timeout) + after ?TIMEOUT -> ct:fail(confirm_timeout) end, %% Receive the request. @@ -138,7 +140,7 @@ trace(Config) -> #amqp_msg{payload = ReplyPayload, props = #'P_basic'{correlation_id = CorrelationId}}} -> ok - after 5000 -> ct:fail(missing_reply) + after ?TIMEOUT -> ct:fail(missing_reply) end, %% 2 messages should have entered RabbitMQ: @@ -217,7 +219,7 @@ rpc(RequesterNode, ResponderNode, Config) -> correlation_id = CorrelationId}, payload = RequestPayload}), receive #'basic.ack'{} -> ok - after 5000 -> ct:fail(confirm_timeout) + after ?TIMEOUT -> ct:fail(confirm_timeout) end, ok = wait_for_queue_declared(RequestQueue, ResponderNode, Config), @@ -239,7 +241,7 @@ rpc(RequesterNode, ResponderNode, Config) -> #amqp_msg{payload = ReplyPayload, props = #'P_basic'{correlation_id = CorrelationId}}} -> ok - after 5000 -> ct:fail(missing_reply) + after ?TIMEOUT -> ct:fail(missing_reply) end. wait_for_queue_declared(Queue, Node, Config) -> diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 2735478986..534a05d4a9 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -1052,7 +1052,7 @@ bq_queue_recover1(Config) -> exit(QPid, kill), MRef = erlang:monitor(process, QPid), receive {'DOWN', MRef, process, QPid, _Info} -> ok - after 10000 -> exit(timeout_waiting_for_queue_death) + after ?TIMEOUT -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(?VHOST), {Recovered, []} = rabbit_amqqueue:recover(?VHOST), diff --git a/deps/rabbit/test/clustering_recovery_SUITE.erl b/deps/rabbit/test/clustering_recovery_SUITE.erl index b5dd042608..503a939bb1 100644 --- a/deps/rabbit/test/clustering_recovery_SUITE.erl +++ b/deps/rabbit/test/clustering_recovery_SUITE.erl @@ -362,7 +362,7 @@ subscribe(Ch, QName) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 10000 -> + after 30000 -> exit(consume_ok_timeout) end. @@ -372,6 +372,6 @@ consume(N) -> receive {#'basic.deliver'{consumer_tag = <<"ctag">>}, _} -> consume(N - 1) - after 10000 -> + after 30000 -> exit(deliver_timeout) end. diff --git a/deps/rabbit/test/confirms_rejects_SUITE.erl b/deps/rabbit/test/confirms_rejects_SUITE.erl index 0e51ac8cd5..e477c44471 100644 --- a/deps/rabbit/test/confirms_rejects_SUITE.erl +++ b/deps/rabbit/test/confirms_rejects_SUITE.erl @@ -6,6 +6,8 @@ -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). +-define(TIMEOUT, 30_000). + all() -> [ {group, parallel_tests} @@ -132,7 +134,7 @@ dead_queue_rejects(Config) -> receive {'basic.ack',_,_} -> ok - after 10000 -> + after ?TIMEOUT -> error(timeout_waiting_for_initial_ack) end, @@ -191,7 +193,7 @@ kill_queue_expect_nack(Config, Ch, QueueName, BasicPublish, AmqpMsg, Tries) -> ok; {'basic.ack',_,_} -> retry - after 10000 -> + after ?TIMEOUT -> error({timeout_waiting_for_nack, process_info(self(), messages)}) end, case R of @@ -343,19 +345,19 @@ consume_all_messages(Ch, QueueName, Msgs) -> assert_ack() -> receive {'basic.ack', _, _} -> ok - after 10000 -> error(timeout_waiting_for_ack) + after ?TIMEOUT -> error(timeout_waiting_for_ack) end, clean_acks_mailbox(). assert_nack() -> receive {'basic.nack', _, _, _} -> ok - after 10000 -> error(timeout_waiting_for_nack) + after ?TIMEOUT -> error(timeout_waiting_for_nack) end, clean_acks_mailbox(). assert_acks(N) -> receive {'basic.ack', N, _} -> ok - after 10000 -> error({timeout_waiting_for_ack, N}) + after ?TIMEOUT -> error({timeout_waiting_for_ack, N}) end, clean_acks_mailbox(). diff --git a/deps/rabbit/test/consumer_timeout_SUITE.erl b/deps/rabbit/test/consumer_timeout_SUITE.erl index c3988571a5..8fe6872b9f 100644 --- a/deps/rabbit/test/consumer_timeout_SUITE.erl +++ b/deps/rabbit/test/consumer_timeout_SUITE.erl @@ -14,7 +14,9 @@ -compile(export_all). -define(CONSUMER_TIMEOUT, 2000). --define(RECEIVE_TIMEOUT, ?CONSUMER_TIMEOUT * 2). +%% Sometimes CI machines are really slow, +%% expecting CONSUMER_TIMEOUT*2 might not be enough +-define(RECEIVE_TIMEOUT, 30_000). -define(GROUP_CONFIG, #{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]}, @@ -147,7 +149,7 @@ consumer_timeout(Config) -> {'DOWN', _, process, Conn, _} -> flush(1), exit(unexpected_connection_exit) - after 2000 -> + after ?RECEIVE_TIMEOUT -> ok end, rabbit_ct_client_helpers:close_channel(Ch), @@ -172,7 +174,7 @@ consumer_timeout_basic_get(Config) -> {'DOWN', _, process, Conn, _} -> flush(1), exit(unexpected_connection_exit) - after 2000 -> + after ?RECEIVE_TIMEOUT -> ok end, ok. @@ -221,7 +223,7 @@ consumer_timeout_no_basic_cancel_capability(Config) -> {'DOWN', _, process, Conn, _} -> flush(1), exit(unexpected_connection_exit) - after 2000 -> + after ?RECEIVE_TIMEOUT -> ok end, ok. diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index b793cb3abe..5598745bf4 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -17,6 +17,8 @@ -import(queue_utils, [wait_for_messages/2]). +-define(TIMEOUT, 30_000). + all() -> [ {group, tests} @@ -455,7 +457,7 @@ dead_letter_reject_many(Config) -> [begin receive {#'basic.deliver'{consumer_tag = CTag, delivery_tag = DTag}, #amqp_msg{payload = P}} -> amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, requeue = false}) - after 5000 -> + after ?TIMEOUT -> amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), exit(timeout) end @@ -628,7 +630,7 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) -> consumer_tag = Ctag1}, self()), receive #'basic.consume_ok'{consumer_tag = Ctag1} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, Ctag2 = <<"ctag 2">>, @@ -637,20 +639,20 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) -> consumer_tag = Ctag2}, self()), receive #'basic.consume_ok'{consumer_tag = Ctag2} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = <<"m1">>}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, D2 = receive {#'basic.deliver'{delivery_tag = Del2}, #amqp_msg{payload = <<"m2">>}} -> Del2 - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = <<"m3">>}} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), @@ -663,13 +665,13 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) -> receive {#'basic.deliver'{}, #amqp_msg{payload = P1a}} -> ?assertEqual(<<"m1">>, P1a) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, D5 = receive {#'basic.deliver'{delivery_tag = Del5}, #amqp_msg{payload = P2a}} -> ?assertEqual(<<"m2">>, P2a), Del5 - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, %% Nack all 3 without requeue @@ -681,18 +683,18 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) -> receive {#'basic.deliver'{}, #amqp_msg{payload = P3b}} -> ?assertEqual(<<"m3">>, P3b) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, receive {#'basic.deliver'{}, #amqp_msg{payload = P1b}} -> ?assertEqual(<<"m1">>, P1b) - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, LastD = receive {#'basic.deliver'{delivery_tag = LastDel}, #amqp_msg{payload = P2b}} -> ?assertEqual(<<"m2">>, P2b), LastDel - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, wait_for_messages(Config, [[DLXQName, <<"3">>, <<"0">>, <<"3">>]]), @@ -1726,7 +1728,7 @@ metric_rejected(Config) -> [begin receive {#'basic.deliver'{consumer_tag = CTag, delivery_tag = DTag}, #amqp_msg{payload = P}} -> amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, requeue = false}) - after 5000 -> + after ?TIMEOUT -> amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), exit(timeout) end @@ -1811,7 +1813,7 @@ stream(Config) -> self()), receive #'basic.consume_ok'{consumer_tag = Ctag} -> ok - after 5000 -> ct:fail({missing_event, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) end, Headers = receive {#'basic.deliver'{delivery_tag = DeliveryTag}, diff --git a/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl b/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl index abef0dd187..e53988860b 100644 --- a/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl +++ b/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl @@ -421,7 +421,7 @@ assert_confirm() -> receive #'basic.ack'{} -> ok - after 5000 -> + after 30_000 -> throw(missing_confirm) end. @@ -429,7 +429,7 @@ assert_return() -> receive {#'basic.return'{}, _} -> ok - after 5000 -> + after 30_000 -> throw(missing_return) end. diff --git a/deps/rabbit/test/disconnect_detected_during_alarm_SUITE.erl b/deps/rabbit/test/disconnect_detected_during_alarm_SUITE.erl index 92bf9aedd8..579e42b9d0 100644 --- a/deps/rabbit/test/disconnect_detected_during_alarm_SUITE.erl +++ b/deps/rabbit/test/disconnect_detected_during_alarm_SUITE.erl @@ -88,7 +88,7 @@ disconnect_detected_during_alarm(Config) -> % Check that connection was indeed blocked #'connection.blocked'{} -> ok after - 1000 -> exit(connection_was_not_blocked) + 30_000 -> exit(connection_was_not_blocked) end, %% Connection is blocked, now we should forcefully kill it diff --git a/deps/rabbit/test/metadata_store_clustering_SUITE.erl b/deps/rabbit/test/metadata_store_clustering_SUITE.erl index a33241d263..8b761378da 100644 --- a/deps/rabbit/test/metadata_store_clustering_SUITE.erl +++ b/deps/rabbit/test/metadata_store_clustering_SUITE.erl @@ -482,7 +482,7 @@ join_khepri_while_in_minority(Config) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 10000 -> + after 30_000 -> exit(consume_ok_timeout) end, @@ -490,7 +490,7 @@ join_khepri_while_in_minority(Config) -> receive {#'basic.deliver'{consumer_tag = <<"ctag">>}, _} -> ok - after 10000 -> + after 30_000 -> exit(deliver_timeout) end, diff --git a/deps/rabbit/test/mirrored_supervisor_SUITE.erl b/deps/rabbit/test/mirrored_supervisor_SUITE.erl index 7ce527a684..6b37f517a5 100644 --- a/deps/rabbit/test/mirrored_supervisor_SUITE.erl +++ b/deps/rabbit/test/mirrored_supervisor_SUITE.erl @@ -294,7 +294,7 @@ test_startup_failure(Fail, Group) -> receive {'EXIT', _, shutdown} -> ok - after 1000 -> + after 30_000 -> exit({did_not_exit, Fail}) end, process_flag(trap_exit, false), diff --git a/deps/rabbit/test/msg_size_metrics_SUITE.erl b/deps/rabbit/test/msg_size_metrics_SUITE.erl index 0b33ecf1a3..4fd9303843 100644 --- a/deps/rabbit/test/msg_size_metrics_SUITE.erl +++ b/deps/rabbit/test/msg_size_metrics_SUITE.erl @@ -81,7 +81,7 @@ message_size(Config) -> Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address), receive {amqp10_event, {link, Sender, credited}} -> ok - after 5000 -> ct:fail(credited_timeout) + after 30_000 -> ct:fail(credited_timeout) end, ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)), @@ -149,6 +149,6 @@ wait_for_settlement(State, Tag) -> receive {amqp10_disposition, {State, Tag}} -> ok - after 5000 -> - ct:fail({disposition_timeout, Tag}) + after 30_000 -> + ct:fail({disposition_timeout, Tag}) end. diff --git a/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl b/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl index f2e2c3370e..ec7dfe10d9 100644 --- a/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl +++ b/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl @@ -155,7 +155,7 @@ confirm_ack(Config) -> receive #'basic.ack'{delivery_tag = 1} -> ok - after 5000 -> + after ?TIMEOUT -> throw(missing_ack) end. @@ -196,13 +196,13 @@ confirm_mandatory_unroutable(Config) -> receive {#'basic.return'{}, _} -> ok - after 5000 -> + after ?TIMEOUT -> throw(missing_return) end, receive #'basic.ack'{delivery_tag = 1} -> ok - after 5000 -> + after ?TIMEOUT -> throw(missing_ack) end. @@ -218,7 +218,7 @@ confirm_unroutable_message(Config) -> throw(unexpected_basic_return); #'basic.ack'{delivery_tag = 1} -> ok - after 5000 -> + after ?TIMEOUT -> throw(missing_ack) end. @@ -333,6 +333,6 @@ receive_many(DTags) -> receive_many(DTags -- lists:seq(1, DTag)); #'basic.ack'{delivery_tag = DTag, multiple = false} -> receive_many(DTags -- [DTag]) - after 5000 -> + after ?TIMEOUT -> throw(missing_ack) end. diff --git a/deps/rabbit/test/queue_length_limits_SUITE.erl b/deps/rabbit/test/queue_length_limits_SUITE.erl index b40cab4aa9..83499b0eab 100644 --- a/deps/rabbit/test/queue_length_limits_SUITE.erl +++ b/deps/rabbit/test/queue_length_limits_SUITE.erl @@ -306,19 +306,19 @@ check_max_length_rejects(QName, Ch, Payload1, Payload2, Payload3) -> %% First message can be enqueued and acks amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), receive #'basic.ack'{} -> ok - after 1000 -> error(expected_ack) + after 30_000 -> error(expected_ack) end, %% The message cannot be enqueued and nacks amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), receive #'basic.nack'{} -> ok - after 1000 -> error(expected_nack) + after 30_000 -> error(expected_nack) end, %% The message cannot be enqueued and nacks amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), receive #'basic.nack'{} -> ok - after 1000 -> error(expected_nack) + after 30_000 -> error(expected_nack) end, {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -326,7 +326,7 @@ check_max_length_rejects(QName, Ch, Payload1, Payload2, Payload3) -> %% Now we can publish message 2. amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), receive #'basic.ack'{} -> ok - after 1000 -> error(expected_ack) + after 30_000 -> error(expected_ack) end, {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). diff --git a/deps/rabbit/test/queue_parallel_SUITE.erl b/deps/rabbit/test/queue_parallel_SUITE.erl index 5ee1c32326..208556bd5b 100644 --- a/deps/rabbit/test/queue_parallel_SUITE.erl +++ b/deps/rabbit/test/queue_parallel_SUITE.erl @@ -538,7 +538,7 @@ basic_cancel(Config) -> wait_for_messages(Config, QName, 3, 2, 1), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), wait_for_messages(Config, QName, 2, 2, 0) - after 5000 -> + after 30_000 -> exit(basic_deliver_timeout) end, rabbit_ct_client_helpers:close_channel(Ch), @@ -667,7 +667,7 @@ cc_header_non_array_should_close_channel(Config) -> receive {'DOWN', Ref, process, Ch, {shutdown, {server_initiated_close, 406, _}}} -> ok - after 5000 -> + after 30_000 -> exit(channel_closed_timeout) end, diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index 28352212df..80ba120db3 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -6,6 +6,8 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-define(TIMEOUT, 30_000). + %%%=================================================================== %%% Common Test callbacks %%%=================================================================== @@ -132,7 +134,7 @@ smoke(Config) -> redelivered = false}, #amqp_msg{}} -> basic_ack(Ch, DeliveryTag) - after 5000 -> + after ?TIMEOUT -> flush(), exit(basic_deliver_timeout) end, @@ -151,7 +153,7 @@ smoke(Config) -> #amqp_msg{}} -> basic_cancel(Ch, ConsumerTag2), basic_nack(Ch, T) - after 5000 -> + after ?TIMEOUT -> exit(basic_deliver_timeout) end, %% get and ack @@ -255,7 +257,7 @@ stream(Config) -> redelivered = false}, #amqp_msg{}} -> basic_ack(SubCh, T) - after 5000 -> + after ?TIMEOUT -> exit(basic_deliver_timeout) end catch @@ -294,7 +296,7 @@ publish_and_confirm(Ch, Queue, Msg) -> ok = receive #'basic.ack'{} -> ok; #'basic.nack'{} -> fail - after 2500 -> + after ?TIMEOUT -> flush(), exit(confirm_timeout) end. @@ -318,7 +320,7 @@ subscribe(Ch, Queue, CTag) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 5000 -> + after ?TIMEOUT -> exit(basic_consume_timeout) end. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 6b9f5d485a..2ac95b85b4 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -28,6 +28,7 @@ -define(NET_TICKTIME_S, 5). -define(DEFAULT_AWAIT, 10_000). +-define(TIMEOUT, 30_000). suite() -> [{timetrap, 5 * 60_000}]. @@ -604,7 +605,7 @@ start_queue_concurrent(Config) -> [begin receive {done, Server} -> ok - after 10000 -> exit({await_done_timeout, Server}) + after ?TIMEOUT -> exit({await_done_timeout, Server}) end end || Server <- Servers], @@ -1077,7 +1078,7 @@ single_active_consumer_priority_take_over(Config) -> delivery_tag = DeliveryTag}, _} -> amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}) - after 5000 -> + after ?TIMEOUT -> flush(1), exit(basic_deliver_timeout) end, @@ -2407,7 +2408,7 @@ confirm_availability_on_leader_change(Config) -> {'EXIT', Publisher, Err} -> ok = rabbit_ct_broker_helpers:start_node(Config, Node2), exit(Err) - after 30000 -> + after ?TIMEOUT -> ok = rabbit_ct_broker_helpers:start_node(Config, Node2), flush(100), exit(nothing_received_from_publisher_process) @@ -2872,7 +2873,7 @@ subscribe_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}) - after 5000 -> + after ?TIMEOUT -> exit(basic_deliver_timeout) end, @@ -2885,7 +2886,7 @@ subscribe_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, multiple = false, requeue = true}) - after 5000 -> + after ?TIMEOUT -> flush(1), exit(basic_deliver_timeout_2) end, @@ -2901,7 +2902,7 @@ subscribe_redelivery_count(Config) -> wait_for_messages_ready(Servers, RaName, 0), ct:pal("wait_for_messages_pending_ack", []), wait_for_messages_pending_ack(Servers, RaName, 0) - after 5000 -> + after ?TIMEOUT -> flush(500), exit(basic_deliver_timeout_3) end. @@ -2986,7 +2987,7 @@ subscribe_redelivery_limit_disable(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2, multiple = false, requeue = true}) - after 5000 -> + after ?TIMEOUT -> flush(1), ct:fail("message did not arrive as expected") end, @@ -3632,7 +3633,7 @@ receive_and_ack(Ch) -> redelivered = false}, _} -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}) - after 5000 -> + after ?TIMEOUT -> ct:fail("receive_and_ack timed out", []) end. @@ -3733,7 +3734,7 @@ per_message_ttl_mixed_expiry(Config) -> #amqp_msg{payload = Msg1}} -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}) - after 2000 -> + after ?TIMEOUT -> flush(10), ct:fail("basic deliver timeout") end, @@ -3761,7 +3762,7 @@ per_message_ttl_expiration_too_high(Config) -> {'DOWN', MonitorRef, process, Ch, {shutdown, {server_initiated_close, 406, <<"PRECONDITION_FAILED - invalid expiration", _/binary>>}}} -> ok - after 1000 -> + after ?TIMEOUT -> ct:fail("expected channel error") end. @@ -3883,7 +3884,7 @@ consumer_priorities(Config) -> {#'basic.deliver'{delivery_tag = D1, consumer_tag = Tag2}, _} -> D1 - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -3893,7 +3894,7 @@ consumer_priorities(Config) -> {#'basic.deliver'{delivery_tag = _, consumer_tag = Tag2}, _} -> ok - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -3904,7 +3905,7 @@ consumer_priorities(Config) -> {#'basic.deliver'{delivery_tag = _, consumer_tag = Tag1}, _} -> ok - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -3917,7 +3918,7 @@ consumer_priorities(Config) -> {#'basic.deliver'{delivery_tag = _, consumer_tag = Tag2}, _} -> ok - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -3945,7 +3946,7 @@ cancel_consumer_gh_3729(Config) -> {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = true}, ok = amqp_channel:cast(Ch, R) - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -3954,7 +3955,7 @@ cancel_consumer_gh_3729(Config) -> receive #'basic.cancel_ok'{consumer_tag = <<"ctag">>} -> ok - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.cancel_ok timeout") end, @@ -3990,7 +3991,7 @@ cancel_consumer_gh_12424(Config) -> DeliveryTag = receive {#'basic.deliver'{delivery_tag = DT}, _} -> DT - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -4023,7 +4024,7 @@ cancel_and_consume_with_same_tag(Config) -> {#'basic.deliver'{delivery_tag = D}, #amqp_msg{payload = <<"msg1">>}} -> D - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout") end, @@ -4038,7 +4039,7 @@ cancel_and_consume_with_same_tag(Config) -> {#'basic.deliver'{delivery_tag = _}, #amqp_msg{payload = <<"msg2">>}} -> ok - after 5000 -> + after ?TIMEOUT -> flush(100), ct:fail("basic.deliver timeout 2") end, @@ -4290,7 +4291,7 @@ requeue_multiple_true(Config) -> #amqp_msg{payload = P0}} -> ?assertEqual(P, P0), D - after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE}) + after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE}) end || P <- Payloads], %% Requeue all messages. @@ -4303,7 +4304,7 @@ requeue_multiple_true(Config) -> [receive {#'basic.deliver'{redelivered = true}, #amqp_msg{payload = P1}} -> ?assertEqual(P, P1) - after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE}) + after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE}) end || P <- Payloads], ?assertEqual(#'queue.delete_ok'{message_count = 0}, @@ -4328,7 +4329,7 @@ requeue_multiple_false(Config) -> #amqp_msg{payload = P0}} -> ?assertEqual(P, P0), D - after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE}) + after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE}) end || P <- Payloads], %% The delivery tags we received via AMQP 0.9.1 are ordered from 1-100. @@ -4347,7 +4348,7 @@ requeue_multiple_false(Config) -> [receive {#'basic.deliver'{redelivered = true}, #amqp_msg{payload = P1}} -> ?assertEqual(integer_to_binary(D), P1) - after 5000 -> ct:fail({basic_deliver_timeout, ?LINE}) + after ?TIMEOUT -> ct:fail({basic_deliver_timeout, ?LINE}) end || D <- DTagsShuffled], ?assertEqual(#'queue.delete_ok'{message_count = 0}, @@ -4474,7 +4475,7 @@ subscribe(Ch, Queue, NoAck, Tag, Args) -> receive #'basic.consume_ok'{consumer_tag = Tag} -> ok - after 30000 -> + after ?TIMEOUT -> flush(100), exit(subscribe_timeout) end. @@ -4499,7 +4500,7 @@ nack(Ch, Multiple, Requeue) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, requeue = Requeue}) - after 5000 -> + after ?TIMEOUT -> flush(10), ct:fail("basic deliver timeout") end. @@ -4572,7 +4573,7 @@ validate_queue(Ch, Queue, ExpectedMsgs) -> #amqp_msg{payload = M}} -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, multiple = false}) - after 5000 -> + after ?TIMEOUT -> flush(10), exit({validate_queue_timeout, M}) end diff --git a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl index ec907a8411..3909c6e3a2 100644 --- a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl @@ -482,6 +482,6 @@ receive_delete_events(N, Evts) -> receive {mnesia_table_event, {delete, _, Record, _, _}} -> receive_delete_events(N - 1, [Record | Evts]) - after 10000 -> + after 30000 -> Evts end. diff --git a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl index 5d4c39958e..eda2593be7 100644 --- a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl @@ -755,7 +755,7 @@ publish_confirm(Ch, QName) -> ok; #'basic.nack'{} -> fail - after 2500 -> + after 30_000 -> ct:fail(confirm_timeout) end. @@ -871,14 +871,14 @@ many_target_queues(Config) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 2000 -> + after 30_000 -> exit(consume_ok_timeout) end, receive {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = Msg1}} -> ok - after 2000 -> + after 30_000 -> exit(deliver_timeout) end, ?awaitMatch([{0, 0}], @@ -911,7 +911,7 @@ many_target_queues(Config) -> {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = Msg2}} -> ok - after 0 -> + after 30_000 -> exit(deliver_timeout) end, ?assertEqual(2, counted(messages_dead_lettered_expired_total, Config)), diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index fae1251d47..202a729b64 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -10,8 +10,9 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). --define(RA_EVENT_TIMEOUT, 5000). +-define(RA_EVENT_TIMEOUT, 30_000). -define(RA_SYSTEM, quorum_queues). +-define(TIMEOUT, 30_000). all() -> [ @@ -118,7 +119,7 @@ basics(Config) -> {ok, S, _} -> DeliverFun(S, F) end - after 5000 -> + after ?TIMEOUT -> flush(), exit(await_delivery_timeout) end @@ -136,7 +137,7 @@ basics(Config) -> ct:pal("ra_event ~p", [Evt]), {ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, From, Evt, FState5), F6 - after 5000 -> + after ?TIMEOUT -> exit(leader_change_timeout) end, @@ -175,7 +176,7 @@ rabbit_fifo_returns_correlation(Config) -> Del -> exit({unexpected, Del}) end - after 2000 -> + after ?TIMEOUT -> exit(await_msg_timeout) end, rabbit_quorum_queue:stop_server(ServerId), @@ -208,7 +209,7 @@ duplicate_delivery(Config) -> end end end - after 2000 -> + after ?TIMEOUT -> exit(await_msg_timeout) end end, @@ -281,7 +282,7 @@ detects_lost_delivery(Config) -> receive {ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} -> ok - after 5000 -> + after ?TIMEOUT -> exit(await_delivery_timeout) end, @@ -316,7 +317,7 @@ returns(Config) -> ?assertEqual(undefined, mc:get_annotation(<<"x-delivery-count">>, Msg1Out0)), ?assertEqual(undefined, mc:get_annotation(delivery_count, Msg1Out0)), rabbit_fifo_client:return(<<"tag">>, [MsgId], FC2) - after 5000 -> + after ?TIMEOUT -> flush(), exit(await_delivery_timeout) end, @@ -333,7 +334,7 @@ returns(Config) -> %% delivery_count should _not_ be incremented for a return ?assertEqual(undefined, mc:get_annotation(delivery_count, Msg1Out)), rabbit_fifo_client:modify(<<"tag">>, [MsgId1], true, false, #{}, FC4) - after 5000 -> + after ?TIMEOUT -> flush(), exit(await_delivery_timeout_2) end, @@ -349,7 +350,7 @@ returns(Config) -> %% delivery_count should be incremented for a modify with delivery_failed = true ?assertEqual(1, mc:get_annotation(delivery_count, Msg2Out)), rabbit_fifo_client:settle(<<"tag">>, [MsgId2], FC6) - after 5000 -> + after ?TIMEOUT -> flush(), exit(await_delivery_timeout_3) end, @@ -377,7 +378,7 @@ returns_after_down(Config) -> receive {'DOWN', MonRef, _, _, _} -> ok - after 5000 -> + after ?TIMEOUT -> ct:fail("waiting for process exit timed out") end, rabbit_ct_helpers:await_condition( @@ -411,7 +412,7 @@ resends_after_lost_applied(Config) -> receive {ra_event, _, {applied, _}} -> ok - after 500 -> + after ?TIMEOUT -> exit(await_ra_event_timeout) end, % send another message @@ -477,7 +478,7 @@ discard(Config) -> [msg1] = Letters, rejected = Reason, ok - after 500 -> + after ?TIMEOUT -> flush(), exit(dead_letter_timeout) end, @@ -571,7 +572,7 @@ lost_delivery(Config) -> {ra_event, _, Evt} -> ct:pal("dropping event ~tp", [Evt]), ok - after 500 -> + after ?TIMEOUT -> exit(await_ra_event_timeout) end, % send another message @@ -744,7 +745,7 @@ test_queries(Config) -> end), receive ready -> ok - after 5000 -> + after ?TIMEOUT -> exit(ready_timeout) end, F0 = rabbit_fifo_client:init([ServerId], 4), @@ -820,7 +821,7 @@ receive_ra_events(Applied, Deliveries, Acc) -> receive_ra_events(Applied, Deliveries - length(MsgIds), [Evt | Acc]); {ra_event, _, _} = Evt -> receive_ra_events(Applied, Deliveries, [Evt | Acc]) - after 5000 -> + after ?TIMEOUT -> exit({missing_events, Applied, Deliveries, Acc}) end. @@ -832,7 +833,7 @@ receive_ra_events(Acc) -> receive {ra_event, _, _} = Evt -> receive_ra_events([Evt | Acc]) - after 500 -> + after 1000 -> Acc end. @@ -892,7 +893,7 @@ flush() -> Msg -> ct:pal("flushed: ~w~n", [Msg]), flush() - after 10 -> + after 100 -> ok end. diff --git a/deps/rabbit/test/rabbit_local_random_exchange_SUITE.erl b/deps/rabbit/test/rabbit_local_random_exchange_SUITE.erl index a2e57f4761..a51eec8797 100644 --- a/deps/rabbit/test/rabbit_local_random_exchange_SUITE.erl +++ b/deps/rabbit/test/rabbit_local_random_exchange_SUITE.erl @@ -135,7 +135,7 @@ publish_expect_return(Config, E, Node) -> receive {#'basic.return'{}, _} -> ok - after 5000 -> + after 30_000 -> flush(100), ct:fail("no return received") end diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index c8b2f8aabc..2650f286e4 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -18,6 +18,7 @@ -import(rabbit_ct_helpers, [await_condition/2]). -define(WAIT, 5000). +-define(TIMEOUT, 30_000). suite() -> [{timetrap, 15 * 60_000}]. @@ -1009,7 +1010,7 @@ consume(Config) -> multiple = false}), _ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), ok = amqp_channel:close(Ch1) - after 5000 -> + after ?TIMEOUT -> ct:fail(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -1073,8 +1074,8 @@ consume_timestamp_offset(Config) -> receive #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> ok - after 5000 -> - flush(), + after ?TIMEOUT -> + flush(), exit(consume_ok_timeout) end, @@ -1108,7 +1109,7 @@ consume_timestamp_last_offset(Config) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 5000 -> + after ?TIMEOUT -> exit(missing_consume_ok) end, @@ -1178,7 +1179,7 @@ basic_cancel(Config) -> {#'basic.deliver'{}, _} -> amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = CTag}), ?assertMatch([], filter_consumers(Config, Server, CTag)) - after 10000 -> + after ?TIMEOUT -> exit(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -1203,7 +1204,7 @@ receive_basic_cancel_on_queue_deletion(Config) -> receive #'basic.cancel'{consumer_tag = CTag} -> ok - after 10000 -> + after ?TIMEOUT -> exit(timeout) end. @@ -1261,7 +1262,7 @@ keep_consuming_on_leader_restart(Config) -> {#'basic.deliver'{delivery_tag = DeliveryTag1}, _} -> ok = amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag1, multiple = false}) - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, @@ -1275,7 +1276,7 @@ keep_consuming_on_leader_restart(Config) -> {#'basic.deliver'{delivery_tag = DeliveryTag2}, _} -> ok = amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag2, multiple = false}) - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, @@ -1396,7 +1397,7 @@ consume_and_(Config, AckFun) -> ?assertMatch({'queue.declare_ok', Q, _MsgCount, 0}, declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]) - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -1588,7 +1589,7 @@ consume_from_next(Config, Args) -> receive #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> ok - after 10000 -> + after ?TIMEOUT -> exit(consume_ok_failed) end, @@ -1684,7 +1685,7 @@ consume_while_deleting_replica(Config) -> ok; {_, #amqp_msg{}} -> exit(unexpected_message) - after 30000 -> + after ?TIMEOUT -> exit(missing_consumer_cancel) end, @@ -1712,10 +1713,10 @@ consume_credit(Config) -> %% We expect to receive exactly 2 messages. DTag1 = receive {#'basic.deliver'{delivery_tag = Tag1}, _} -> Tag1 - after 5000 -> ct:fail({missing_delivery, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE}) end, _DTag2 = receive {#'basic.deliver'{delivery_tag = Tag2}, _} -> Tag2 - after 5000 -> ct:fail({missing_delivery, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE}) end, receive {#'basic.deliver'{}, _} -> ct:fail({unexpected_delivery, ?LINE}) after 100 -> ok @@ -1725,7 +1726,7 @@ consume_credit(Config) -> ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DTag1, multiple = false}), DTag3 = receive {#'basic.deliver'{delivery_tag = Tag3}, _} -> Tag3 - after 5000 -> ct:fail({missing_delivery, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE}) end, receive {#'basic.deliver'{}, _} -> ct:fail({unexpected_delivery, ?LINE}) @@ -1747,11 +1748,11 @@ consume_credit0(Ch, DTagPrev) -> multiple = true}), %% Receive 1st message. receive {#'basic.deliver'{}, _} -> ok - after 5000 -> ct:fail({missing_delivery, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE}) end, %% Receive 2nd message. DTag = receive {#'basic.deliver'{delivery_tag = T}, _} -> T - after 5000 -> ct:fail({missing_delivery, ?LINE}) + after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE}) end, %% We shouldn't receive more messages given that AMQP 0.9.1 prefetch count is 2. receive {#'basic.deliver'{}, _} -> ct:fail({unexpected_delivery, ?LINE}) @@ -1813,7 +1814,7 @@ consume_credit_out_of_order_ack(Config) -> receive {#'basic.deliver'{}, _} -> ok - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -1853,7 +1854,7 @@ consume_credit_multiple_ack(Config) -> receive {#'basic.deliver'{}, _} -> ok - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -2066,7 +2067,7 @@ leader_failover_dedupe(Config) -> N = receive {last_msg, X} -> X - after 2000 -> + after ?TIMEOUT -> exit(last_msg_timeout) end, %% validate that no duplicates were written even though an internal @@ -2508,7 +2509,7 @@ dead_letter_target(Config) -> receive #'basic.consume_ok'{consumer_tag = CTag} -> ok - after 5000 -> + after ?TIMEOUT -> exit(basic_consume_ok_timeout) end, receive @@ -2517,7 +2518,7 @@ dead_letter_target(Config) -> requeue =false, multiple = false}), queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]) - after 5000 -> + after ?TIMEOUT -> exit(timeout) end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). @@ -2602,7 +2603,7 @@ receive_filtered_batch(Ch, Count, ExpectedSize) -> ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}), receive_filtered_batch(Ch, Count + 1, ExpectedSize) - after 10000 -> + after ?TIMEOUT -> flush(), exit({not_enough_messages, Count}) end. diff --git a/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl b/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl index fc56dfbed9..2d869be8f8 100644 --- a/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl +++ b/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl @@ -161,6 +161,6 @@ publish_confirm(Ch, QName) -> #'basic.nack'{} -> ct:pal("NOT CONFIRMED! ~ts", [QName]), fail - after 10000 -> + after 30000 -> exit(confirm_timeout) end. diff --git a/deps/rabbit/test/single_active_consumer_SUITE.erl b/deps/rabbit/test/single_active_consumer_SUITE.erl index ac682ad957..7db3314d1a 100644 --- a/deps/rabbit/test/single_active_consumer_SUITE.erl +++ b/deps/rabbit/test/single_active_consumer_SUITE.erl @@ -179,13 +179,13 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, consumer_tag = CTag1}, self()), receive #'basic.consume_ok'{consumer_tag = CTag1} -> ok - after 5000 -> ct:fail(timeout_ctag1) + after ?TIMEOUT -> ct:fail(timeout_ctag1) end, amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, consumer_tag = CTag2}, self()), receive #'basic.consume_ok'{consumer_tag = CTag2} -> ok - after 5000 -> ct:fail(timeout_ctag2) + after ?TIMEOUT -> ct:fail(timeout_ctag2) end, Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, @@ -195,12 +195,12 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) -> DTag1 = receive {#'basic.deliver'{consumer_tag = CTag1, delivery_tag = DTag}, #amqp_msg{payload = <<"m1">>}} -> DTag - after 5000 -> ct:fail(timeout_m1) + after ?TIMEOUT -> ct:fail(timeout_m1) end, #'basic.cancel_ok'{consumer_tag = CTag1} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag1}), receive #'basic.cancel_ok'{consumer_tag = CTag1} -> ok - after 5000 -> ct:fail(missing_cancel) + after ?TIMEOUT -> ct:fail(missing_cancel) end, amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}), @@ -208,7 +208,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) -> receive {#'basic.deliver'{consumer_tag = CTag2}, #amqp_msg{payload = <<"m2">>}} -> ok; Unexpected -> ct:fail({unexpected, Unexpected}) - after 5000 -> ct:fail(timeout_m2) + after ?TIMEOUT -> ct:fail(timeout_m2) end, amqp_connection:close(C). @@ -385,7 +385,7 @@ wait_for_messages(ExpectedCount, State) -> receive {message, {MessagesPerConsumer, MessageCount}} -> wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount}) - after 5000 -> + after ?TIMEOUT -> {missing, ExpectedCount, State} end. @@ -393,7 +393,7 @@ wait_for_cancel_ok() -> receive {cancel_ok, CTag} -> {cancel_ok, CTag} - after 5000 -> + after ?TIMEOUT -> throw(consumer_cancel_ok_timeout) end. @@ -402,7 +402,7 @@ receive_deliver() -> {#'basic.deliver'{consumer_tag = CTag, delivery_tag = DTag}, _} -> {CTag, DTag} - after 5000 -> + after ?TIMEOUT -> exit(deliver_timeout) end. diff --git a/deps/rabbit/test/topic_permission_SUITE.erl b/deps/rabbit/test/topic_permission_SUITE.erl index b7c2e10b24..fd5bf398be 100644 --- a/deps/rabbit/test/topic_permission_SUITE.erl +++ b/deps/rabbit/test/topic_permission_SUITE.erl @@ -111,7 +111,7 @@ amqp_x_cc_annotation(Config) -> ?assertEqual( <<"write access to topic 'x.1' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>, Description1) - after 5000 -> amqp_utils:flush(missing_ended), + after 30_000 -> amqp_utils:flush(missing_ended), ct:fail({missing_event, ?LINE}) end, @@ -134,7 +134,7 @@ amqp_x_cc_annotation(Config) -> ?assertEqual( <<"write access to topic 'x.2' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>, Description2) - after 5000 -> amqp_utils:flush(missing_ended), + after 30_000 -> amqp_utils:flush(missing_ended), ct:fail({missing_event, ?LINE}) end, @@ -178,7 +178,7 @@ amqpl_headers(Header, Config) -> #amqp_msg{payload = <<"m1">>, props = #'P_basic'{headers = [{Header, array, [{longstr, <<"a.2">>}]}]}}), receive #'basic.ack'{} -> ok - after 5000 -> ct:fail({missing_confirm, ?LINE}) + after 30_000 -> ct:fail({missing_confirm, ?LINE}) end, monitor(process, Ch1), @@ -412,6 +412,6 @@ assert_channel_down(Ch, Reason) -> {shutdown, {server_initiated_close, 403, Reason}}} -> ok - after 5000 -> + after 30_000 -> ct:fail({did_not_receive, Reason}) end. diff --git a/deps/rabbit/test/transactions_SUITE.erl b/deps/rabbit/test/transactions_SUITE.erl index 5dae3b8ebf..a5e7628792 100644 --- a/deps/rabbit/test/transactions_SUITE.erl +++ b/deps/rabbit/test/transactions_SUITE.erl @@ -90,7 +90,7 @@ return_after_commit(Config) -> Result = receive {#'basic.return'{}, _} -> return_before_commit - after 1000 -> + after 30_000 -> return_after_commit end, ?assertEqual(return_after_commit, Result), diff --git a/deps/rabbit/test/unicode_SUITE.erl b/deps/rabbit/test/unicode_SUITE.erl index 4f28d1362c..10798fa452 100644 --- a/deps/rabbit/test/unicode_SUITE.erl +++ b/deps/rabbit/test/unicode_SUITE.erl @@ -103,7 +103,7 @@ stream(Config) -> DelTag = receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> DeliveryTag - after 5000 -> + after 30_000 -> ct:fail(timeout) end, ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DelTag, diff --git a/deps/rabbit/test/unit_file_handle_cache_SUITE.erl b/deps/rabbit/test/unit_file_handle_cache_SUITE.erl index 5ba8931598..b75da557d1 100644 --- a/deps/rabbit/test/unit_file_handle_cache_SUITE.erl +++ b/deps/rabbit/test/unit_file_handle_cache_SUITE.erl @@ -173,7 +173,7 @@ file_handle_cache_reserve_open_file_above_limit1(_Config) -> receive opened -> throw(error_file_opened) - after 1000 -> + after 30_000 -> %% Let's release 5 file handles, that should leave %% enough free for the `open` to go through file_handle_cache:set_reservation(2), @@ -183,7 +183,7 @@ file_handle_cache_reserve_open_file_above_limit1(_Config) -> opened -> ok = file_handle_cache:set_limit(Limit), passed - after 5000 -> + after 30_000 -> throw(error_file_not_released) end end.