Test: Increase receive timeout in all rabbit test suites

This commit is contained in:
Diana Parra Corbacho 2024-12-12 15:51:59 +01:00
parent 43cfc3c937
commit fe7a141331
33 changed files with 248 additions and 229 deletions

View File

@ -22,6 +22,8 @@
[flush/1,
wait_for_credit/1]).
-define(TIMEOUT, 30_000).
all() ->
[
{group, v1_permitted},
@ -216,7 +218,7 @@ target_exchange_absent(Config) ->
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no exchange '", XName:(byte_size(XName))/binary,
"' in vhost '/'">>}}}}} -> ok
after 5000 ->
after ?TIMEOUT ->
Reason = {missing_event, ?LINE},
flush(Reason),
ct:fail(Reason)
@ -275,7 +277,7 @@ target_queue_absent(Config) ->
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary,
"' in vhost '/'">>}}}}} -> ok
after 5000 ->
after ?TIMEOUT ->
Reason = {missing_event, ?LINE},
flush(Reason),
ct:fail(Reason)
@ -403,7 +405,7 @@ target_per_message_unset_to_address(Config) ->
ok = amqp10_client:detach_link(Sender),
receive {amqp10_event, {link, Sender, {detached, normal}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).
@ -467,7 +469,7 @@ target_per_message_bad_to_address(Config) ->
?assertMatch(#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address", _Rest/binary>>}},
Error)
after 5000 ->
after ?TIMEOUT ->
flush(missing_disposition),
ct:fail(missing_disposition)
end
@ -507,7 +509,7 @@ target_per_message_exchange_absent_settled(Config) ->
info = {map, [{{symbol, <<"delivery-tag">>}, {binary, DTag2}}]}
},
Error)
after 5000 -> ct:fail("server did not close our outgoing link")
after ?TIMEOUT -> ct:fail("server did not close our outgoing link")
end,
ok = cleanup(Init).
@ -566,7 +568,7 @@ target_bad_address0(TargetAddress, Config) ->
{session, Session,
{ended,
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok
after 5000 ->
after ?TIMEOUT ->
Reason = {missing_event, ?LINE, TargetAddress},
flush(Reason),
ct:fail(Reason)
@ -593,7 +595,7 @@ source_queue_absent(Config) ->
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary,
"' in vhost '/'">>}}}}} -> ok
after 5000 ->
after ?TIMEOUT ->
Reason = {missing_event, ?LINE},
flush(Reason),
ct:fail(Reason)
@ -626,7 +628,7 @@ source_bad_address0(SourceAddress, Config) ->
{session, Session,
{ended,
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok
after 5000 ->
after ?TIMEOUT ->
Reason = {missing_event, ?LINE},
flush(Reason),
ct:fail(Reason)
@ -657,7 +659,7 @@ wait_for_settled(State, Tag) ->
receive
{amqp10_disposition, {State, Tag}} ->
ok
after 5000 ->
after ?TIMEOUT ->
Reason = {?FUNCTION_NAME, State, Tag},
flush(Reason),
ct:fail(Reason)

View File

@ -14,6 +14,8 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-define(TIMEOUT, 30_000).
-import(rabbit_ct_broker_helpers,
[rpc/4]).
-import(rabbit_ct_helpers,
@ -153,8 +155,8 @@ v1_attach_target_queue(Config) ->
<<"configure access to queue 'test queue' in vhost "
"'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session1, {ended, ExpectedErr1}}} -> ok
after 5000 -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
%% Give the user configure permissions on the queue.
@ -166,7 +168,7 @@ v1_attach_target_queue(Config) ->
<<"write access to exchange 'amq.default' in vhost "
"'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session2, {ended, ExpectedErr2}}} -> ok
after 5000 -> flush(missing_ended),
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
@ -177,8 +179,8 @@ v1_attach_target_queue(Config) ->
{ok, Sender3} = amqp10_client:attach_sender_link(
Session3, <<"test-sender-3">>, TargetAddress),
receive {amqp10_event, {link, Sender3, attached}} -> ok
after 5000 -> flush(missing_attached),
ct:fail("missing ATTACH from server")
after ?TIMEOUT -> flush(missing_attached),
ct:fail("missing ATTACH from server")
end,
ok = close_connection_sync(Connection).
@ -202,8 +204,8 @@ v1_attach_source_exchange(Config) ->
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
description = {utf8, <<"configure access to queue 'amq.gen", _/binary>>}}}}} -> ok
after 5000 -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
%% Give the user configure permissions on the queue.
@ -218,7 +220,7 @@ v1_attach_source_exchange(Config) ->
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
description = {utf8, <<"write access to queue 'amq.gen", _/binary>>}}}}} -> ok
after 5000 -> flush(missing_ended),
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
@ -231,7 +233,7 @@ v1_attach_source_exchange(Config) ->
<<"read access to exchange 'amq.fanout' in vhost "
"'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session3, {ended, ExpectedErr1}}} -> ok
after 5000 -> flush(missing_ended),
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
@ -247,7 +249,7 @@ v1_attach_source_exchange(Config) ->
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
description = {utf8, <<"read access to queue 'amq.gen", _/binary>>}}}}} -> ok
after 5000 -> flush(missing_ended),
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
@ -258,7 +260,7 @@ v1_attach_source_exchange(Config) ->
{ok, Recv5} = amqp10_client:attach_receiver_link(
Session5, <<"receiver-5">>, SourceAddress),
receive {amqp10_event, {link, Recv5, attached}} -> ok
after 5000 -> flush(missing_attached),
after ?TIMEOUT -> flush(missing_attached),
ct:fail("missing ATTACH from server")
end,
@ -290,7 +292,7 @@ send_to_topic(TargetAddress, Config) ->
<<"write access to topic 'test vhost.test user.a.b' in exchange "
"'amq.topic' in vhost 'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
after 5000 -> flush(missing_ended),
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
@ -304,7 +306,7 @@ send_to_topic(TargetAddress, Config) ->
ok = amqp10_client:send_msg(Sender2, Msg2),
%% We expect RELEASED since no queue is bound.
receive {amqp10_disposition, {released, Dtag}} -> ok
after 5000 -> ct:fail(released_timeout)
after ?TIMEOUT -> ct:fail(released_timeout)
end,
ok = amqp10_client:detach_link(Sender2),
@ -330,7 +332,7 @@ v1_send_to_topic_using_subject(Config) ->
ok = amqp10_client:send_msg(Sender, Msg1b),
%% We have sufficient authorization, but expect RELEASED since no queue is bound.
receive {amqp10_disposition, {released, Dtag1}} -> ok
after 5000 -> ct:fail(released_timeout)
after ?TIMEOUT -> ct:fail(released_timeout)
end,
Dtag2 = <<"dtag 2">>,
@ -342,7 +344,7 @@ v1_send_to_topic_using_subject(Config) ->
<<"write access to topic '.a.b' in exchange 'amq.topic' in "
"vhost 'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
after 5000 -> flush(missing_ended),
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
@ -374,7 +376,7 @@ attach_source_topic0(SourceAddress, Config) ->
<<"read access to topic 'test vhost.test user.a.b' in exchange "
"'amq.topic' in vhost 'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
after 5000 -> flush(missing_ended),
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
@ -383,7 +385,7 @@ attach_source_topic0(SourceAddress, Config) ->
{ok, Recv2} = amqp10_client:attach_receiver_link(
Session2, <<"receiver-2">>, SourceAddress),
receive {amqp10_event, {link, Recv2, attached}} -> ok
after 5000 -> flush(missing_attached),
after ?TIMEOUT -> flush(missing_attached),
ct:fail("missing ATTACH from server")
end,
@ -405,7 +407,7 @@ v1_attach_target_internal_exchange(Config) ->
ExpectedErr = error_unauthorized(
<<"forbidden to publish to internal exchange 'test exchange' in vhost '/'">>),
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
after 5000 -> flush(missing_ended),
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
@ -429,7 +431,7 @@ attach_source_queue(Config) ->
receive {amqp10_event,
{session, Session,
{ended, ExpectedErr}}} -> ok
after 5000 -> flush(missing_ended),
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
end,
ok = close_connection_sync(Conn).
@ -448,13 +450,13 @@ attach_target_exchange(Config) ->
<<"write access to exchange '", XName/binary,
"' in vhost 'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
{ok, Session2} = amqp10_client:begin_session_sync(Connection),
{ok, _} = amqp10_client:attach_sender_link(Session2, <<"test-sender">>, Address2),
receive {amqp10_event, {session, Session2, {ended, ExpectedErr}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:close_connection(Connection).
@ -478,7 +480,7 @@ attach_target_queue(Config) ->
<<"write access to exchange 'amq.default' ",
"in vhost 'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = amqp10_client:close_connection(Conn).
@ -500,7 +502,7 @@ target_per_message_exchange(Config) ->
Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, <<"m1">>)),
ok = amqp10_client:send_msg(Sender, Msg1),
receive {amqp10_disposition, {released, Tag1}} -> ok
after 5000 -> ct:fail(released_timeout)
after ?TIMEOUT -> ct:fail(released_timeout)
end,
%% We don't have sufficient authorization.
@ -511,7 +513,7 @@ target_per_message_exchange(Config) ->
<<"write access to exchange 'amq.default' in "
"vhost 'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
@ -534,7 +536,7 @@ target_per_message_internal_exchange(Config) ->
ExpectedErr = error_unauthorized(
<<"forbidden to publish to internal exchange '", XName/binary, "' in vhost 'test vhost'">>),
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
after 5000 -> flush(missing_event),
after ?TIMEOUT -> flush(missing_event),
ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Conn1),
@ -563,7 +565,7 @@ target_per_message_topic(Config) ->
Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, <<"m1">>)),
ok = amqp10_client:send_msg(Sender, Msg1),
receive {amqp10_disposition, {released, Tag1}} -> ok
after 5000 -> ct:fail(released_timeout)
after ?TIMEOUT -> ct:fail(released_timeout)
end,
%% We don't have sufficient authorization.
@ -574,7 +576,7 @@ target_per_message_topic(Config) ->
<<"write access to topic '.a.b' in exchange 'amq.topic' in "
"vhost 'test vhost' refused for user 'test user'">>),
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Connection).
@ -594,7 +596,7 @@ authn_failure_event(Config) ->
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, {closed, sasl_auth_failure}}} -> ok
after 5000 -> flush(missing_closed),
after ?TIMEOUT -> flush(missing_closed),
ct:fail("did not receive sasl_auth_failure")
end,
@ -621,7 +623,7 @@ sasl_success(Mechanism, Config) ->
OpnConf = OpnConf0#{sasl := Mechanism},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, opened}} -> ok
after 5000 -> ct:fail(missing_opened)
after ?TIMEOUT -> ct:fail(missing_opened)
end,
ok = amqp10_client:close_connection(Connection).
@ -638,7 +640,7 @@ sasl_anonymous_failure(Config) ->
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, {closed, Reason}}} ->
?assertEqual({sasl_not_supported, Mechanism}, Reason)
after 5000 -> ct:fail(missing_closed)
after ?TIMEOUT -> ct:fail(missing_closed)
end,
ok = rpc(Config, application, set_env, [App, Par, Default]).
@ -649,7 +651,7 @@ sasl_plain_failure(Config) ->
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, {closed, Reason}}} ->
?assertEqual(sasl_auth_failure, Reason)
after 5000 -> ct:fail(missing_closed)
after ?TIMEOUT -> ct:fail(missing_closed)
end.
%% Skipping SASL is disallowed in RabbitMQ.
@ -658,14 +660,14 @@ sasl_none_failure(Config) ->
OpnConf = OpnConf0#{sasl := none},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, {closed, _Reason}}} -> ok
after 5000 -> ct:fail(missing_closed)
after ?TIMEOUT -> ct:fail(missing_closed)
end.
vhost_absent(Config) ->
OpnConf = connection_config(Config, <<"this vhost does not exist">>),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, Connection, {closed, _}}} -> ok
after 5000 -> ct:fail(missing_closed)
after ?TIMEOUT -> ct:fail(missing_closed)
end.
vhost_connection_limit(Config) ->
@ -675,22 +677,22 @@ vhost_connection_limit(Config) ->
OpnConf = connection_config(Config),
{ok, C1} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, C1, opened}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
{ok, C2} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, C2, {closed, _}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
OpnConf0 = connection_config(Config, <<"/">>),
OpnConf1 = OpnConf0#{sasl := anon},
{ok, C3} = amqp10_client:open_connection(OpnConf1),
receive {amqp10_event, {connection, C3, opened}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
{ok, C4} = amqp10_client:open_connection(OpnConf1),
receive {amqp10_event, {connection, C4, opened}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
[ok = close_connection_sync(C) || C <- [C1, C3, C4]],
@ -704,12 +706,12 @@ user_connection_limit(Config) ->
OpnConf = OpnConf0#{sasl := anon},
{ok, C1} = amqp10_client:open_connection(OpnConf),
receive {amqp10_event, {connection, C1, {closed, _}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
{ok, C2} = amqp10_client:open_connection(connection_config(Config)),
receive {amqp10_event, {connection, C2, opened}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(C2),
@ -731,7 +733,7 @@ v1_vhost_queue_limit(Config) ->
?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED,
<<"cannot declare queue 'q1': queue limit in vhost 'test vhost' (0) is reached">>),
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
after 5000 -> flush(missing_ended),
after ?TIMEOUT -> flush(missing_ended),
ct:fail("did not receive expected error")
end,
@ -742,8 +744,8 @@ v1_vhost_queue_limit(Config) ->
{ok, Sender2} = amqp10_client:attach_sender_link(
Session2, <<"test-sender-2">>, TargetAddress),
receive {amqp10_event, {link, Sender2, attached}} -> ok
after 5000 -> flush(missing_attached),
ct:fail("missing ATTACH from server")
after ?TIMEOUT -> flush(missing_attached),
ct:fail("missing ATTACH from server")
end,
ok = close_connection_sync(C1),

View File

@ -1260,7 +1260,7 @@ drain_many(Config, QueueType, QName)
after 30000 -> ct:fail({missing_delivery, ?LINE})
end,
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
after 300 -> ct:fail("expected credit_exhausted")
after 30000 -> ct:fail("expected credit_exhausted")
end,
ok = amqp10_client:detach_link(Sender),

View File

@ -80,10 +80,10 @@ credit_api_v2(Config) ->
{ok, CQSender} = amqp10_client:attach_sender_link(Session, <<"cq sender">>, CQAddr),
{ok, QQSender} = amqp10_client:attach_sender_link(Session, <<"qq sender">>, QQAddr),
receive {amqp10_event, {link, CQSender, credited}} -> ok
after 5000 -> ct:fail(credited_timeout)
after 30_000 -> ct:fail(credited_timeout)
end,
receive {amqp10_event, {link, QQSender, credited}} -> ok
after 5000 -> ct:fail(credited_timeout)
after 30_000 -> ct:fail(credited_timeout)
end,
%% Send 40 messages to each queue.
@ -146,7 +146,7 @@ credit_api_v2(Config) ->
%% Draining should also work.
ok = amqp10_client:flow_link_credit(CQReceiver3, 10, never, true),
receive {amqp10_event, {link, CQReceiver3, credit_exhausted}} -> ok
after 5000 -> ct:fail({missing_credit_exhausted, ?LINE})
after 30_000 -> ct:fail({missing_credit_exhausted, ?LINE})
end,
receive Unexpected1 -> ct:fail({unexpected, ?LINE, Unexpected1})
after 20 -> ok
@ -154,7 +154,7 @@ credit_api_v2(Config) ->
ok = amqp10_client:flow_link_credit(QQReceiver3, 10, never, true),
receive {amqp10_event, {link, QQReceiver3, credit_exhausted}} -> ok
after 5000 -> ct:fail({missing_credit_exhausted, ?LINE})
after 30_000 -> ct:fail({missing_credit_exhausted, ?LINE})
end,
receive Unexpected2 -> ct:fail({unexpected, ?LINE, Unexpected2})
after 20 -> ok
@ -166,11 +166,11 @@ credit_api_v2(Config) ->
ok = detach_sync(QQReceiver3),
ok = amqp10_client:end_session(Session),
receive {amqp10_event, {session, Session, {ended, _}}} -> ok
after 5000 -> ct:fail(missing_ended)
after 30_000 -> ct:fail(missing_ended)
end,
ok = amqp10_client:close_connection(Connection),
receive {amqp10_event, {connection, Connection, {closed, normal}}} -> ok
after 5000 -> ct:fail(missing_closed)
after 30_000 -> ct:fail(missing_closed)
end.
consume_and_accept(NumMsgs, Receiver) ->
@ -192,14 +192,14 @@ receive_messages0(Receiver, N, Acc) ->
receive
{amqp10_msg, Receiver, Msg} ->
receive_messages0(Receiver, N - 1, [Msg | Acc])
after 5000 ->
after 30_000 ->
exit({timeout, {num_received, length(Acc)}, {num_missing, N}})
end.
detach_sync(Receiver) ->
ok = amqp10_client:detach_link(Receiver),
receive {amqp10_event, {link, Receiver, {detached, normal}}} -> ok
after 5000 -> ct:fail({missing_detached, Receiver})
after 30_000 -> ct:fail({missing_detached, Receiver})
end.
flush(Prefix) ->

View File

@ -55,7 +55,7 @@ wait_for_credit(Sender) ->
receive
{amqp10_event, {link, Sender, credited}} ->
ok
after 5000 ->
after 30_000 ->
flush("wait_for_credit timed out"),
ct:fail(credited_timeout)
end.
@ -66,7 +66,7 @@ wait_for_accepts(N) ->
receive
{amqp10_disposition, {accepted, _}} ->
wait_for_accepts(N - 1)
after 5000 ->
after 30_000 ->
ct:fail({missing_accepted, N})
end.
@ -108,9 +108,9 @@ wait_for_link_detach(Link) ->
{amqp10_event, {link, Link, {detached, #'v1_0.detach'{}}}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->
flush("wait_for_link_detach timed out"),
ct:fail({link_detach_timeout, Link})
after 30_000 ->
flush("wait_for_link_detach timed out"),
ct:fail({link_detach_timeout, Link})
end.
end_session_sync(Session)
@ -123,9 +123,9 @@ wait_for_session_end(Session) ->
{amqp10_event, {session, Session, {ended, _}}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->
flush("wait_for_session_end timed out"),
ct:fail({session_end_timeout, Session})
after 30_000 ->
flush("wait_for_session_end timed out"),
ct:fail({session_end_timeout, Session})
end.
close_connection_sync(Connection)
@ -138,7 +138,7 @@ wait_for_connection_close(Connection) ->
{amqp10_event, {connection, Connection, {closed, normal}}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->
flush("wait_for_connection_close timed out"),
ct:fail({connection_close_timeout, Connection})
after 30_000 ->
flush("wait_for_connection_close timed out"),
ct:fail({connection_close_timeout, Connection})
end.

View File

@ -19,6 +19,8 @@
-import(rabbit_ct_helpers,
[eventually/3]).
-define(TIMEOUT, 30_000).
all() ->
[
{group, tests}
@ -95,7 +97,7 @@ requeue_one_channel(QType, Config) ->
self()),
receive #'basic.consume_ok'{consumer_tag = Ctag} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
[begin
@ -107,19 +109,19 @@ requeue_one_channel(QType, Config) ->
receive {#'basic.deliver'{},
#amqp_msg{payload = <<"1">>}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
receive {#'basic.deliver'{},
#amqp_msg{payload = <<"2">>}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
D3 = receive {#'basic.deliver'{delivery_tag = Del3},
#amqp_msg{payload = <<"3">>}} -> Del3
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
receive {#'basic.deliver'{},
#amqp_msg{payload = <<"4">>}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
assert_messages(QName, 4, 4, Config),
@ -132,18 +134,18 @@ requeue_one_channel(QType, Config) ->
receive {#'basic.deliver'{},
#amqp_msg{payload = P1}} ->
?assertEqual(<<"1">>, P1)
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
receive {#'basic.deliver'{},
#amqp_msg{payload = P2}} ->
?assertEqual(<<"2">>, P2)
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
D3b = receive {#'basic.deliver'{delivery_tag = Del3b},
#amqp_msg{payload = P3}} ->
?assertEqual(<<"3">>, P3),
Del3b
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
assert_messages(QName, 4, 4, Config),
@ -181,7 +183,7 @@ requeue_two_channels(QType, Config) ->
self()),
receive #'basic.consume_ok'{consumer_tag = Ctag1} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
amqp_channel:subscribe(Ch2,
@ -189,7 +191,7 @@ requeue_two_channels(QType, Config) ->
consumer_tag = Ctag2},
self()),
receive #'basic.consume_ok'{consumer_tag = Ctag2} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
[begin
@ -203,22 +205,22 @@ requeue_two_channels(QType, Config) ->
receive {#'basic.deliver'{consumer_tag = C1},
#amqp_msg{payload = <<"1">>}} ->
?assertEqual(Ctag1, C1)
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
receive {#'basic.deliver'{consumer_tag = C2},
#amqp_msg{payload = <<"2">>}} ->
?assertEqual(Ctag2, C2)
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
receive {#'basic.deliver'{consumer_tag = C3},
#amqp_msg{payload = <<"3">>}} ->
?assertEqual(Ctag1, C3)
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
receive {#'basic.deliver'{consumer_tag = C4},
#amqp_msg{payload = <<"4">>}} ->
?assertEqual(Ctag2, C4)
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
assert_messages(QName, 4, 4, Config),
@ -228,14 +230,14 @@ requeue_two_channels(QType, Config) ->
receive {#'basic.deliver'{consumer_tag = C5},
#amqp_msg{payload = <<"1">>}} ->
?assertEqual(Ctag2, C5)
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
DelTag = receive {#'basic.deliver'{consumer_tag = C6,
delivery_tag = D},
#amqp_msg{payload = <<"3">>}} ->
?assertEqual(Ctag2, C6),
D
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
assert_messages(QName, 4, 4, Config),

View File

@ -16,6 +16,8 @@
-import(rabbit_ct_helpers, [eventually/1]).
-define(TIMEOUT, 30_000).
all() ->
[
{group, cluster_size_1},
@ -116,7 +118,7 @@ trace(Config) ->
correlation_id = CorrelationId},
payload = RequestPayload}),
receive #'basic.ack'{} -> ok
after 5000 -> ct:fail(confirm_timeout)
after ?TIMEOUT -> ct:fail(confirm_timeout)
end,
%% Receive the request.
@ -138,7 +140,7 @@ trace(Config) ->
#amqp_msg{payload = ReplyPayload,
props = #'P_basic'{correlation_id = CorrelationId}}} ->
ok
after 5000 -> ct:fail(missing_reply)
after ?TIMEOUT -> ct:fail(missing_reply)
end,
%% 2 messages should have entered RabbitMQ:
@ -217,7 +219,7 @@ rpc(RequesterNode, ResponderNode, Config) ->
correlation_id = CorrelationId},
payload = RequestPayload}),
receive #'basic.ack'{} -> ok
after 5000 -> ct:fail(confirm_timeout)
after ?TIMEOUT -> ct:fail(confirm_timeout)
end,
ok = wait_for_queue_declared(RequestQueue, ResponderNode, Config),
@ -239,7 +241,7 @@ rpc(RequesterNode, ResponderNode, Config) ->
#amqp_msg{payload = ReplyPayload,
props = #'P_basic'{correlation_id = CorrelationId}}} ->
ok
after 5000 -> ct:fail(missing_reply)
after ?TIMEOUT -> ct:fail(missing_reply)
end.
wait_for_queue_declared(Queue, Node, Config) ->

View File

@ -1052,7 +1052,7 @@ bq_queue_recover1(Config) ->
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),
receive {'DOWN', MRef, process, QPid, _Info} -> ok
after 10000 -> exit(timeout_waiting_for_queue_death)
after ?TIMEOUT -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(?VHOST),
{Recovered, []} = rabbit_amqqueue:recover(?VHOST),

View File

@ -362,7 +362,7 @@ subscribe(Ch, QName) ->
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
after 10000 ->
after 30000 ->
exit(consume_ok_timeout)
end.
@ -372,6 +372,6 @@ consume(N) ->
receive
{#'basic.deliver'{consumer_tag = <<"ctag">>}, _} ->
consume(N - 1)
after 10000 ->
after 30000 ->
exit(deliver_timeout)
end.

View File

@ -6,6 +6,8 @@
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile(export_all).
-define(TIMEOUT, 30_000).
all() ->
[
{group, parallel_tests}
@ -132,7 +134,7 @@ dead_queue_rejects(Config) ->
receive
{'basic.ack',_,_} -> ok
after 10000 ->
after ?TIMEOUT ->
error(timeout_waiting_for_initial_ack)
end,
@ -191,7 +193,7 @@ kill_queue_expect_nack(Config, Ch, QueueName, BasicPublish, AmqpMsg, Tries) ->
ok;
{'basic.ack',_,_} ->
retry
after 10000 ->
after ?TIMEOUT ->
error({timeout_waiting_for_nack, process_info(self(), messages)})
end,
case R of
@ -343,19 +345,19 @@ consume_all_messages(Ch, QueueName, Msgs) ->
assert_ack() ->
receive {'basic.ack', _, _} -> ok
after 10000 -> error(timeout_waiting_for_ack)
after ?TIMEOUT -> error(timeout_waiting_for_ack)
end,
clean_acks_mailbox().
assert_nack() ->
receive {'basic.nack', _, _, _} -> ok
after 10000 -> error(timeout_waiting_for_nack)
after ?TIMEOUT -> error(timeout_waiting_for_nack)
end,
clean_acks_mailbox().
assert_acks(N) ->
receive {'basic.ack', N, _} -> ok
after 10000 -> error({timeout_waiting_for_ack, N})
after ?TIMEOUT -> error({timeout_waiting_for_ack, N})
end,
clean_acks_mailbox().

View File

@ -14,7 +14,9 @@
-compile(export_all).
-define(CONSUMER_TIMEOUT, 2000).
-define(RECEIVE_TIMEOUT, ?CONSUMER_TIMEOUT * 2).
%% Sometimes CI machines are really slow,
%% expecting CONSUMER_TIMEOUT*2 might not be enough
-define(RECEIVE_TIMEOUT, 30_000).
-define(GROUP_CONFIG,
#{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]},
@ -147,7 +149,7 @@ consumer_timeout(Config) ->
{'DOWN', _, process, Conn, _} ->
flush(1),
exit(unexpected_connection_exit)
after 2000 ->
after ?RECEIVE_TIMEOUT ->
ok
end,
rabbit_ct_client_helpers:close_channel(Ch),
@ -172,7 +174,7 @@ consumer_timeout_basic_get(Config) ->
{'DOWN', _, process, Conn, _} ->
flush(1),
exit(unexpected_connection_exit)
after 2000 ->
after ?RECEIVE_TIMEOUT ->
ok
end,
ok.
@ -221,7 +223,7 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
{'DOWN', _, process, Conn, _} ->
flush(1),
exit(unexpected_connection_exit)
after 2000 ->
after ?RECEIVE_TIMEOUT ->
ok
end,
ok.

View File

@ -17,6 +17,8 @@
-import(queue_utils, [wait_for_messages/2]).
-define(TIMEOUT, 30_000).
all() ->
[
{group, tests}
@ -455,7 +457,7 @@ dead_letter_reject_many(Config) ->
[begin
receive {#'basic.deliver'{consumer_tag = CTag, delivery_tag = DTag}, #amqp_msg{payload = P}} ->
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, requeue = false})
after 5000 ->
after ?TIMEOUT ->
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
exit(timeout)
end
@ -628,7 +630,7 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) ->
consumer_tag = Ctag1},
self()),
receive #'basic.consume_ok'{consumer_tag = Ctag1} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
Ctag2 = <<"ctag 2">>,
@ -637,20 +639,20 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) ->
consumer_tag = Ctag2},
self()),
receive #'basic.consume_ok'{consumer_tag = Ctag2} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
receive {#'basic.deliver'{},
#amqp_msg{payload = <<"m1">>}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
D2 = receive {#'basic.deliver'{delivery_tag = Del2},
#amqp_msg{payload = <<"m2">>}} -> Del2
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
receive {#'basic.deliver'{},
#amqp_msg{payload = <<"m3">>}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
@ -663,13 +665,13 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) ->
receive {#'basic.deliver'{},
#amqp_msg{payload = P1a}} ->
?assertEqual(<<"m1">>, P1a)
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
D5 = receive {#'basic.deliver'{delivery_tag = Del5},
#amqp_msg{payload = P2a}} ->
?assertEqual(<<"m2">>, P2a),
Del5
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
%% Nack all 3 without requeue
@ -681,18 +683,18 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) ->
receive {#'basic.deliver'{},
#amqp_msg{payload = P3b}} ->
?assertEqual(<<"m3">>, P3b)
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
receive {#'basic.deliver'{},
#amqp_msg{payload = P1b}} ->
?assertEqual(<<"m1">>, P1b)
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
LastD = receive {#'basic.deliver'{delivery_tag = LastDel},
#amqp_msg{payload = P2b}} ->
?assertEqual(<<"m2">>, P2b),
LastDel
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
wait_for_messages(Config, [[DLXQName, <<"3">>, <<"0">>, <<"3">>]]),
@ -1726,7 +1728,7 @@ metric_rejected(Config) ->
[begin
receive {#'basic.deliver'{consumer_tag = CTag, delivery_tag = DTag}, #amqp_msg{payload = P}} ->
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, requeue = false})
after 5000 ->
after ?TIMEOUT ->
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
exit(timeout)
end
@ -1811,7 +1813,7 @@ stream(Config) ->
self()),
receive
#'basic.consume_ok'{consumer_tag = Ctag} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
end,
Headers = receive {#'basic.deliver'{delivery_tag = DeliveryTag},

View File

@ -421,7 +421,7 @@ assert_confirm() ->
receive
#'basic.ack'{} ->
ok
after 5000 ->
after 30_000 ->
throw(missing_confirm)
end.
@ -429,7 +429,7 @@ assert_return() ->
receive
{#'basic.return'{}, _} ->
ok
after 5000 ->
after 30_000 ->
throw(missing_return)
end.

View File

@ -88,7 +88,7 @@ disconnect_detected_during_alarm(Config) ->
% Check that connection was indeed blocked
#'connection.blocked'{} -> ok
after
1000 -> exit(connection_was_not_blocked)
30_000 -> exit(connection_was_not_blocked)
end,
%% Connection is blocked, now we should forcefully kill it

View File

@ -482,7 +482,7 @@ join_khepri_while_in_minority(Config) ->
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
after 10000 ->
after 30_000 ->
exit(consume_ok_timeout)
end,
@ -490,7 +490,7 @@ join_khepri_while_in_minority(Config) ->
receive
{#'basic.deliver'{consumer_tag = <<"ctag">>}, _} ->
ok
after 10000 ->
after 30_000 ->
exit(deliver_timeout)
end,

View File

@ -294,7 +294,7 @@ test_startup_failure(Fail, Group) ->
receive
{'EXIT', _, shutdown} ->
ok
after 1000 ->
after 30_000 ->
exit({did_not_exit, Fail})
end,
process_flag(trap_exit, false),

View File

@ -81,7 +81,7 @@ message_size(Config) ->
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
receive {amqp10_event, {link, Sender, credited}} -> ok
after 5000 -> ct:fail(credited_timeout)
after 30_000 -> ct:fail(credited_timeout)
end,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
@ -149,6 +149,6 @@ wait_for_settlement(State, Tag) ->
receive
{amqp10_disposition, {State, Tag}} ->
ok
after 5000 ->
ct:fail({disposition_timeout, Tag})
after 30_000 ->
ct:fail({disposition_timeout, Tag})
end.

View File

@ -155,7 +155,7 @@ confirm_ack(Config) ->
receive
#'basic.ack'{delivery_tag = 1} ->
ok
after 5000 ->
after ?TIMEOUT ->
throw(missing_ack)
end.
@ -196,13 +196,13 @@ confirm_mandatory_unroutable(Config) ->
receive
{#'basic.return'{}, _} ->
ok
after 5000 ->
after ?TIMEOUT ->
throw(missing_return)
end,
receive
#'basic.ack'{delivery_tag = 1} ->
ok
after 5000 ->
after ?TIMEOUT ->
throw(missing_ack)
end.
@ -218,7 +218,7 @@ confirm_unroutable_message(Config) ->
throw(unexpected_basic_return);
#'basic.ack'{delivery_tag = 1} ->
ok
after 5000 ->
after ?TIMEOUT ->
throw(missing_ack)
end.
@ -333,6 +333,6 @@ receive_many(DTags) ->
receive_many(DTags -- lists:seq(1, DTag));
#'basic.ack'{delivery_tag = DTag, multiple = false} ->
receive_many(DTags -- [DTag])
after 5000 ->
after ?TIMEOUT ->
throw(missing_ack)
end.

View File

@ -306,19 +306,19 @@ check_max_length_rejects(QName, Ch, Payload1, Payload2, Payload3) ->
%% First message can be enqueued and acks
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
receive #'basic.ack'{} -> ok
after 1000 -> error(expected_ack)
after 30_000 -> error(expected_ack)
end,
%% The message cannot be enqueued and nacks
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
receive #'basic.nack'{} -> ok
after 1000 -> error(expected_nack)
after 30_000 -> error(expected_nack)
end,
%% The message cannot be enqueued and nacks
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
receive #'basic.nack'{} -> ok
after 1000 -> error(expected_nack)
after 30_000 -> error(expected_nack)
end,
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@ -326,7 +326,7 @@ check_max_length_rejects(QName, Ch, Payload1, Payload2, Payload3) ->
%% Now we can publish message 2.
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
receive #'basic.ack'{} -> ok
after 1000 -> error(expected_ack)
after 30_000 -> error(expected_ack)
end,
{#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).

View File

@ -538,7 +538,7 @@ basic_cancel(Config) ->
wait_for_messages(Config, QName, 3, 2, 1),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
wait_for_messages(Config, QName, 2, 2, 0)
after 5000 ->
after 30_000 ->
exit(basic_deliver_timeout)
end,
rabbit_ct_client_helpers:close_channel(Ch),
@ -667,7 +667,7 @@ cc_header_non_array_should_close_channel(Config) ->
receive
{'DOWN', Ref, process, Ch, {shutdown, {server_initiated_close, 406, _}}} ->
ok
after 5000 ->
after 30_000 ->
exit(channel_closed_timeout)
end,

View File

@ -6,6 +6,8 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-define(TIMEOUT, 30_000).
%%%===================================================================
%%% Common Test callbacks
%%%===================================================================
@ -132,7 +134,7 @@ smoke(Config) ->
redelivered = false},
#amqp_msg{}} ->
basic_ack(Ch, DeliveryTag)
after 5000 ->
after ?TIMEOUT ->
flush(),
exit(basic_deliver_timeout)
end,
@ -151,7 +153,7 @@ smoke(Config) ->
#amqp_msg{}} ->
basic_cancel(Ch, ConsumerTag2),
basic_nack(Ch, T)
after 5000 ->
after ?TIMEOUT ->
exit(basic_deliver_timeout)
end,
%% get and ack
@ -255,7 +257,7 @@ stream(Config) ->
redelivered = false},
#amqp_msg{}} ->
basic_ack(SubCh, T)
after 5000 ->
after ?TIMEOUT ->
exit(basic_deliver_timeout)
end
catch
@ -294,7 +296,7 @@ publish_and_confirm(Ch, Queue, Msg) ->
ok = receive
#'basic.ack'{} -> ok;
#'basic.nack'{} -> fail
after 2500 ->
after ?TIMEOUT ->
flush(),
exit(confirm_timeout)
end.
@ -318,7 +320,7 @@ subscribe(Ch, Queue, CTag) ->
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
after 5000 ->
after ?TIMEOUT ->
exit(basic_consume_timeout)
end.

View File

@ -28,6 +28,7 @@
-define(NET_TICKTIME_S, 5).
-define(DEFAULT_AWAIT, 10_000).
-define(TIMEOUT, 30_000).
suite() ->
[{timetrap, 5 * 60_000}].
@ -604,7 +605,7 @@ start_queue_concurrent(Config) ->
[begin
receive {done, Server} -> ok
after 10000 -> exit({await_done_timeout, Server})
after ?TIMEOUT -> exit({await_done_timeout, Server})
end
end || Server <- Servers],
@ -1077,7 +1078,7 @@ single_active_consumer_priority_take_over(Config) ->
delivery_tag = DeliveryTag}, _} ->
amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false})
after 5000 ->
after ?TIMEOUT ->
flush(1),
exit(basic_deliver_timeout)
end,
@ -2407,7 +2408,7 @@ confirm_availability_on_leader_change(Config) ->
{'EXIT', Publisher, Err} ->
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
exit(Err)
after 30000 ->
after ?TIMEOUT ->
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
flush(100),
exit(nothing_received_from_publisher_process)
@ -2872,7 +2873,7 @@ subscribe_redelivery_count(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true})
after 5000 ->
after ?TIMEOUT ->
exit(basic_deliver_timeout)
end,
@ -2885,7 +2886,7 @@ subscribe_redelivery_count(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
multiple = false,
requeue = true})
after 5000 ->
after ?TIMEOUT ->
flush(1),
exit(basic_deliver_timeout_2)
end,
@ -2901,7 +2902,7 @@ subscribe_redelivery_count(Config) ->
wait_for_messages_ready(Servers, RaName, 0),
ct:pal("wait_for_messages_pending_ack", []),
wait_for_messages_pending_ack(Servers, RaName, 0)
after 5000 ->
after ?TIMEOUT ->
flush(500),
exit(basic_deliver_timeout_3)
end.
@ -2986,7 +2987,7 @@ subscribe_redelivery_limit_disable(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
multiple = false,
requeue = true})
after 5000 ->
after ?TIMEOUT ->
flush(1),
ct:fail("message did not arrive as expected")
end,
@ -3632,7 +3633,7 @@ receive_and_ack(Ch) ->
redelivered = false}, _} ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false})
after 5000 ->
after ?TIMEOUT ->
ct:fail("receive_and_ack timed out", [])
end.
@ -3733,7 +3734,7 @@ per_message_ttl_mixed_expiry(Config) ->
#amqp_msg{payload = Msg1}} ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false})
after 2000 ->
after ?TIMEOUT ->
flush(10),
ct:fail("basic deliver timeout")
end,
@ -3761,7 +3762,7 @@ per_message_ttl_expiration_too_high(Config) ->
{'DOWN', MonitorRef, process, Ch,
{shutdown, {server_initiated_close, 406, <<"PRECONDITION_FAILED - invalid expiration", _/binary>>}}} ->
ok
after 1000 ->
after ?TIMEOUT ->
ct:fail("expected channel error")
end.
@ -3883,7 +3884,7 @@ consumer_priorities(Config) ->
{#'basic.deliver'{delivery_tag = D1,
consumer_tag = Tag2}, _} ->
D1
after 5000 ->
after ?TIMEOUT ->
flush(100),
ct:fail("basic.deliver timeout")
end,
@ -3893,7 +3894,7 @@ consumer_priorities(Config) ->
{#'basic.deliver'{delivery_tag = _,
consumer_tag = Tag2}, _} ->
ok
after 5000 ->
after ?TIMEOUT ->
flush(100),
ct:fail("basic.deliver timeout")
end,
@ -3904,7 +3905,7 @@ consumer_priorities(Config) ->
{#'basic.deliver'{delivery_tag = _,
consumer_tag = Tag1}, _} ->
ok
after 5000 ->
after ?TIMEOUT ->
flush(100),
ct:fail("basic.deliver timeout")
end,
@ -3917,7 +3918,7 @@ consumer_priorities(Config) ->
{#'basic.deliver'{delivery_tag = _,
consumer_tag = Tag2}, _} ->
ok
after 5000 ->
after ?TIMEOUT ->
flush(100),
ct:fail("basic.deliver timeout")
end,
@ -3945,7 +3946,7 @@ cancel_consumer_gh_3729(Config) ->
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = true},
ok = amqp_channel:cast(Ch, R)
after 5000 ->
after ?TIMEOUT ->
flush(100),
ct:fail("basic.deliver timeout")
end,
@ -3954,7 +3955,7 @@ cancel_consumer_gh_3729(Config) ->
receive
#'basic.cancel_ok'{consumer_tag = <<"ctag">>} -> ok
after 5000 ->
after ?TIMEOUT ->
flush(100),
ct:fail("basic.cancel_ok timeout")
end,
@ -3990,7 +3991,7 @@ cancel_consumer_gh_12424(Config) ->
DeliveryTag = receive
{#'basic.deliver'{delivery_tag = DT}, _} ->
DT
after 5000 ->
after ?TIMEOUT ->
flush(100),
ct:fail("basic.deliver timeout")
end,
@ -4023,7 +4024,7 @@ cancel_and_consume_with_same_tag(Config) ->
{#'basic.deliver'{delivery_tag = D},
#amqp_msg{payload = <<"msg1">>}} ->
D
after 5000 ->
after ?TIMEOUT ->
flush(100),
ct:fail("basic.deliver timeout")
end,
@ -4038,7 +4039,7 @@ cancel_and_consume_with_same_tag(Config) ->
{#'basic.deliver'{delivery_tag = _},
#amqp_msg{payload = <<"msg2">>}} ->
ok
after 5000 ->
after ?TIMEOUT ->
flush(100),
ct:fail("basic.deliver timeout 2")
end,
@ -4290,7 +4291,7 @@ requeue_multiple_true(Config) ->
#amqp_msg{payload = P0}} ->
?assertEqual(P, P0),
D
after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE})
after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE})
end || P <- Payloads],
%% Requeue all messages.
@ -4303,7 +4304,7 @@ requeue_multiple_true(Config) ->
[receive {#'basic.deliver'{redelivered = true},
#amqp_msg{payload = P1}} ->
?assertEqual(P, P1)
after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE})
after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE})
end || P <- Payloads],
?assertEqual(#'queue.delete_ok'{message_count = 0},
@ -4328,7 +4329,7 @@ requeue_multiple_false(Config) ->
#amqp_msg{payload = P0}} ->
?assertEqual(P, P0),
D
after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE})
after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE})
end || P <- Payloads],
%% The delivery tags we received via AMQP 0.9.1 are ordered from 1-100.
@ -4347,7 +4348,7 @@ requeue_multiple_false(Config) ->
[receive {#'basic.deliver'{redelivered = true},
#amqp_msg{payload = P1}} ->
?assertEqual(integer_to_binary(D), P1)
after 5000 -> ct:fail({basic_deliver_timeout, ?LINE})
after ?TIMEOUT -> ct:fail({basic_deliver_timeout, ?LINE})
end || D <- DTagsShuffled],
?assertEqual(#'queue.delete_ok'{message_count = 0},
@ -4474,7 +4475,7 @@ subscribe(Ch, Queue, NoAck, Tag, Args) ->
receive
#'basic.consume_ok'{consumer_tag = Tag} ->
ok
after 30000 ->
after ?TIMEOUT ->
flush(100),
exit(subscribe_timeout)
end.
@ -4499,7 +4500,7 @@ nack(Ch, Multiple, Requeue) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = Multiple,
requeue = Requeue})
after 5000 ->
after ?TIMEOUT ->
flush(10),
ct:fail("basic deliver timeout")
end.
@ -4572,7 +4573,7 @@ validate_queue(Ch, Queue, ExpectedMsgs) ->
#amqp_msg{payload = M}} ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
multiple = false})
after 5000 ->
after ?TIMEOUT ->
flush(10),
exit({validate_queue_timeout, M})
end

View File

@ -482,6 +482,6 @@ receive_delete_events(N, Evts) ->
receive
{mnesia_table_event, {delete, _, Record, _, _}} ->
receive_delete_events(N - 1, [Record | Evts])
after 10000 ->
after 30000 ->
Evts
end.

View File

@ -755,7 +755,7 @@ publish_confirm(Ch, QName) ->
ok;
#'basic.nack'{} ->
fail
after 2500 ->
after 30_000 ->
ct:fail(confirm_timeout)
end.
@ -871,14 +871,14 @@ many_target_queues(Config) ->
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
after 2000 ->
after 30_000 ->
exit(consume_ok_timeout)
end,
receive
{#'basic.deliver'{consumer_tag = CTag},
#amqp_msg{payload = Msg1}} ->
ok
after 2000 ->
after 30_000 ->
exit(deliver_timeout)
end,
?awaitMatch([{0, 0}],
@ -911,7 +911,7 @@ many_target_queues(Config) ->
{#'basic.deliver'{consumer_tag = CTag},
#amqp_msg{payload = Msg2}} ->
ok
after 0 ->
after 30_000 ->
exit(deliver_timeout)
end,
?assertEqual(2, counted(messages_dead_lettered_expired_total, Config)),

View File

@ -10,8 +10,9 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-define(RA_EVENT_TIMEOUT, 5000).
-define(RA_EVENT_TIMEOUT, 30_000).
-define(RA_SYSTEM, quorum_queues).
-define(TIMEOUT, 30_000).
all() ->
[
@ -118,7 +119,7 @@ basics(Config) ->
{ok, S, _} ->
DeliverFun(S, F)
end
after 5000 ->
after ?TIMEOUT ->
flush(),
exit(await_delivery_timeout)
end
@ -136,7 +137,7 @@ basics(Config) ->
ct:pal("ra_event ~p", [Evt]),
{ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, From, Evt, FState5),
F6
after 5000 ->
after ?TIMEOUT ->
exit(leader_change_timeout)
end,
@ -175,7 +176,7 @@ rabbit_fifo_returns_correlation(Config) ->
Del ->
exit({unexpected, Del})
end
after 2000 ->
after ?TIMEOUT ->
exit(await_msg_timeout)
end,
rabbit_quorum_queue:stop_server(ServerId),
@ -208,7 +209,7 @@ duplicate_delivery(Config) ->
end
end
end
after 2000 ->
after ?TIMEOUT ->
exit(await_msg_timeout)
end
end,
@ -281,7 +282,7 @@ detects_lost_delivery(Config) ->
receive
{ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} ->
ok
after 5000 ->
after ?TIMEOUT ->
exit(await_delivery_timeout)
end,
@ -316,7 +317,7 @@ returns(Config) ->
?assertEqual(undefined, mc:get_annotation(<<"x-delivery-count">>, Msg1Out0)),
?assertEqual(undefined, mc:get_annotation(delivery_count, Msg1Out0)),
rabbit_fifo_client:return(<<"tag">>, [MsgId], FC2)
after 5000 ->
after ?TIMEOUT ->
flush(),
exit(await_delivery_timeout)
end,
@ -333,7 +334,7 @@ returns(Config) ->
%% delivery_count should _not_ be incremented for a return
?assertEqual(undefined, mc:get_annotation(delivery_count, Msg1Out)),
rabbit_fifo_client:modify(<<"tag">>, [MsgId1], true, false, #{}, FC4)
after 5000 ->
after ?TIMEOUT ->
flush(),
exit(await_delivery_timeout_2)
end,
@ -349,7 +350,7 @@ returns(Config) ->
%% delivery_count should be incremented for a modify with delivery_failed = true
?assertEqual(1, mc:get_annotation(delivery_count, Msg2Out)),
rabbit_fifo_client:settle(<<"tag">>, [MsgId2], FC6)
after 5000 ->
after ?TIMEOUT ->
flush(),
exit(await_delivery_timeout_3)
end,
@ -377,7 +378,7 @@ returns_after_down(Config) ->
receive
{'DOWN', MonRef, _, _, _} ->
ok
after 5000 ->
after ?TIMEOUT ->
ct:fail("waiting for process exit timed out")
end,
rabbit_ct_helpers:await_condition(
@ -411,7 +412,7 @@ resends_after_lost_applied(Config) ->
receive
{ra_event, _, {applied, _}} ->
ok
after 500 ->
after ?TIMEOUT ->
exit(await_ra_event_timeout)
end,
% send another message
@ -477,7 +478,7 @@ discard(Config) ->
[msg1] = Letters,
rejected = Reason,
ok
after 500 ->
after ?TIMEOUT ->
flush(),
exit(dead_letter_timeout)
end,
@ -571,7 +572,7 @@ lost_delivery(Config) ->
{ra_event, _, Evt} ->
ct:pal("dropping event ~tp", [Evt]),
ok
after 500 ->
after ?TIMEOUT ->
exit(await_ra_event_timeout)
end,
% send another message
@ -744,7 +745,7 @@ test_queries(Config) ->
end),
receive
ready -> ok
after 5000 ->
after ?TIMEOUT ->
exit(ready_timeout)
end,
F0 = rabbit_fifo_client:init([ServerId], 4),
@ -820,7 +821,7 @@ receive_ra_events(Applied, Deliveries, Acc) ->
receive_ra_events(Applied, Deliveries - length(MsgIds), [Evt | Acc]);
{ra_event, _, _} = Evt ->
receive_ra_events(Applied, Deliveries, [Evt | Acc])
after 5000 ->
after ?TIMEOUT ->
exit({missing_events, Applied, Deliveries, Acc})
end.
@ -832,7 +833,7 @@ receive_ra_events(Acc) ->
receive
{ra_event, _, _} = Evt ->
receive_ra_events([Evt | Acc])
after 500 ->
after 1000 ->
Acc
end.
@ -892,7 +893,7 @@ flush() ->
Msg ->
ct:pal("flushed: ~w~n", [Msg]),
flush()
after 10 ->
after 100 ->
ok
end.

View File

@ -135,7 +135,7 @@ publish_expect_return(Config, E, Node) ->
receive
{#'basic.return'{}, _} ->
ok
after 5000 ->
after 30_000 ->
flush(100),
ct:fail("no return received")
end

View File

@ -18,6 +18,7 @@
-import(rabbit_ct_helpers, [await_condition/2]).
-define(WAIT, 5000).
-define(TIMEOUT, 30_000).
suite() ->
[{timetrap, 15 * 60_000}].
@ -1009,7 +1010,7 @@ consume(Config) ->
multiple = false}),
_ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
ok = amqp_channel:close(Ch1)
after 5000 ->
after ?TIMEOUT ->
ct:fail(timeout)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
@ -1073,8 +1074,8 @@ consume_timestamp_offset(Config) ->
receive
#'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
ok
after 5000 ->
flush(),
after ?TIMEOUT ->
flush(),
exit(consume_ok_timeout)
end,
@ -1108,7 +1109,7 @@ consume_timestamp_last_offset(Config) ->
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
after 5000 ->
after ?TIMEOUT ->
exit(missing_consume_ok)
end,
@ -1178,7 +1179,7 @@ basic_cancel(Config) ->
{#'basic.deliver'{}, _} ->
amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = CTag}),
?assertMatch([], filter_consumers(Config, Server, CTag))
after 10000 ->
after ?TIMEOUT ->
exit(timeout)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
@ -1203,7 +1204,7 @@ receive_basic_cancel_on_queue_deletion(Config) ->
receive
#'basic.cancel'{consumer_tag = CTag} ->
ok
after 10000 ->
after ?TIMEOUT ->
exit(timeout)
end.
@ -1261,7 +1262,7 @@ keep_consuming_on_leader_restart(Config) ->
{#'basic.deliver'{delivery_tag = DeliveryTag1}, _} ->
ok = amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag1,
multiple = false})
after 5000 ->
after ?TIMEOUT ->
exit(timeout)
end,
@ -1275,7 +1276,7 @@ keep_consuming_on_leader_restart(Config) ->
{#'basic.deliver'{delivery_tag = DeliveryTag2}, _} ->
ok = amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag2,
multiple = false})
after 5000 ->
after ?TIMEOUT ->
exit(timeout)
end,
@ -1396,7 +1397,7 @@ consume_and_(Config, AckFun) ->
?assertMatch({'queue.declare_ok', Q, _MsgCount, 0},
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]])
after 5000 ->
after ?TIMEOUT ->
exit(timeout)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
@ -1588,7 +1589,7 @@ consume_from_next(Config, Args) ->
receive
#'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
ok
after 10000 ->
after ?TIMEOUT ->
exit(consume_ok_failed)
end,
@ -1684,7 +1685,7 @@ consume_while_deleting_replica(Config) ->
ok;
{_, #amqp_msg{}} ->
exit(unexpected_message)
after 30000 ->
after ?TIMEOUT ->
exit(missing_consumer_cancel)
end,
@ -1712,10 +1713,10 @@ consume_credit(Config) ->
%% We expect to receive exactly 2 messages.
DTag1 = receive {#'basic.deliver'{delivery_tag = Tag1}, _} -> Tag1
after 5000 -> ct:fail({missing_delivery, ?LINE})
after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE})
end,
_DTag2 = receive {#'basic.deliver'{delivery_tag = Tag2}, _} -> Tag2
after 5000 -> ct:fail({missing_delivery, ?LINE})
after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE})
end,
receive {#'basic.deliver'{}, _} -> ct:fail({unexpected_delivery, ?LINE})
after 100 -> ok
@ -1725,7 +1726,7 @@ consume_credit(Config) ->
ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DTag1,
multiple = false}),
DTag3 = receive {#'basic.deliver'{delivery_tag = Tag3}, _} -> Tag3
after 5000 -> ct:fail({missing_delivery, ?LINE})
after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE})
end,
receive {#'basic.deliver'{}, _} ->
ct:fail({unexpected_delivery, ?LINE})
@ -1747,11 +1748,11 @@ consume_credit0(Ch, DTagPrev) ->
multiple = true}),
%% Receive 1st message.
receive {#'basic.deliver'{}, _} -> ok
after 5000 -> ct:fail({missing_delivery, ?LINE})
after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE})
end,
%% Receive 2nd message.
DTag = receive {#'basic.deliver'{delivery_tag = T}, _} -> T
after 5000 -> ct:fail({missing_delivery, ?LINE})
after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE})
end,
%% We shouldn't receive more messages given that AMQP 0.9.1 prefetch count is 2.
receive {#'basic.deliver'{}, _} -> ct:fail({unexpected_delivery, ?LINE})
@ -1813,7 +1814,7 @@ consume_credit_out_of_order_ack(Config) ->
receive
{#'basic.deliver'{}, _} ->
ok
after 5000 ->
after ?TIMEOUT ->
exit(timeout)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
@ -1853,7 +1854,7 @@ consume_credit_multiple_ack(Config) ->
receive
{#'basic.deliver'{}, _} ->
ok
after 5000 ->
after ?TIMEOUT ->
exit(timeout)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
@ -2066,7 +2067,7 @@ leader_failover_dedupe(Config) ->
N = receive
{last_msg, X} -> X
after 2000 ->
after ?TIMEOUT ->
exit(last_msg_timeout)
end,
%% validate that no duplicates were written even though an internal
@ -2508,7 +2509,7 @@ dead_letter_target(Config) ->
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
after 5000 ->
after ?TIMEOUT ->
exit(basic_consume_ok_timeout)
end,
receive
@ -2517,7 +2518,7 @@ dead_letter_target(Config) ->
requeue =false,
multiple = false}),
queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]])
after 5000 ->
after ?TIMEOUT ->
exit(timeout)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
@ -2602,7 +2603,7 @@ receive_filtered_batch(Ch, Count, ExpectedSize) ->
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false}),
receive_filtered_batch(Ch, Count + 1, ExpectedSize)
after 10000 ->
after ?TIMEOUT ->
flush(),
exit({not_enough_messages, Count})
end.

View File

@ -161,6 +161,6 @@ publish_confirm(Ch, QName) ->
#'basic.nack'{} ->
ct:pal("NOT CONFIRMED! ~ts", [QName]),
fail
after 10000 ->
after 30000 ->
exit(confirm_timeout)
end.

View File

@ -179,13 +179,13 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
consumer_tag = CTag1}, self()),
receive #'basic.consume_ok'{consumer_tag = CTag1} -> ok
after 5000 -> ct:fail(timeout_ctag1)
after ?TIMEOUT -> ct:fail(timeout_ctag1)
end,
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
consumer_tag = CTag2}, self()),
receive #'basic.consume_ok'{consumer_tag = CTag2} -> ok
after 5000 -> ct:fail(timeout_ctag2)
after ?TIMEOUT -> ct:fail(timeout_ctag2)
end,
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
@ -195,12 +195,12 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) ->
DTag1 = receive {#'basic.deliver'{consumer_tag = CTag1,
delivery_tag = DTag},
#amqp_msg{payload = <<"m1">>}} -> DTag
after 5000 -> ct:fail(timeout_m1)
after ?TIMEOUT -> ct:fail(timeout_m1)
end,
#'basic.cancel_ok'{consumer_tag = CTag1} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag1}),
receive #'basic.cancel_ok'{consumer_tag = CTag1} -> ok
after 5000 -> ct:fail(missing_cancel)
after ?TIMEOUT -> ct:fail(missing_cancel)
end,
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}),
@ -208,7 +208,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) ->
receive {#'basic.deliver'{consumer_tag = CTag2},
#amqp_msg{payload = <<"m2">>}} -> ok;
Unexpected -> ct:fail({unexpected, Unexpected})
after 5000 -> ct:fail(timeout_m2)
after ?TIMEOUT -> ct:fail(timeout_m2)
end,
amqp_connection:close(C).
@ -385,7 +385,7 @@ wait_for_messages(ExpectedCount, State) ->
receive
{message, {MessagesPerConsumer, MessageCount}} ->
wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
after 5000 ->
after ?TIMEOUT ->
{missing, ExpectedCount, State}
end.
@ -393,7 +393,7 @@ wait_for_cancel_ok() ->
receive
{cancel_ok, CTag} ->
{cancel_ok, CTag}
after 5000 ->
after ?TIMEOUT ->
throw(consumer_cancel_ok_timeout)
end.
@ -402,7 +402,7 @@ receive_deliver() ->
{#'basic.deliver'{consumer_tag = CTag,
delivery_tag = DTag}, _} ->
{CTag, DTag}
after 5000 ->
after ?TIMEOUT ->
exit(deliver_timeout)
end.

View File

@ -111,7 +111,7 @@ amqp_x_cc_annotation(Config) ->
?assertEqual(
<<"write access to topic 'x.1' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>,
Description1)
after 5000 -> amqp_utils:flush(missing_ended),
after 30_000 -> amqp_utils:flush(missing_ended),
ct:fail({missing_event, ?LINE})
end,
@ -134,7 +134,7 @@ amqp_x_cc_annotation(Config) ->
?assertEqual(
<<"write access to topic 'x.2' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>,
Description2)
after 5000 -> amqp_utils:flush(missing_ended),
after 30_000 -> amqp_utils:flush(missing_ended),
ct:fail({missing_event, ?LINE})
end,
@ -178,7 +178,7 @@ amqpl_headers(Header, Config) ->
#amqp_msg{payload = <<"m1">>,
props = #'P_basic'{headers = [{Header, array, [{longstr, <<"a.2">>}]}]}}),
receive #'basic.ack'{} -> ok
after 5000 -> ct:fail({missing_confirm, ?LINE})
after 30_000 -> ct:fail({missing_confirm, ?LINE})
end,
monitor(process, Ch1),
@ -412,6 +412,6 @@ assert_channel_down(Ch, Reason) ->
{shutdown,
{server_initiated_close, 403, Reason}}} ->
ok
after 5000 ->
after 30_000 ->
ct:fail({did_not_receive, Reason})
end.

View File

@ -90,7 +90,7 @@ return_after_commit(Config) ->
Result = receive
{#'basic.return'{}, _} ->
return_before_commit
after 1000 ->
after 30_000 ->
return_after_commit
end,
?assertEqual(return_after_commit, Result),

View File

@ -103,7 +103,7 @@ stream(Config) ->
DelTag = receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
DeliveryTag
after 5000 ->
after 30_000 ->
ct:fail(timeout)
end,
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DelTag,

View File

@ -173,7 +173,7 @@ file_handle_cache_reserve_open_file_above_limit1(_Config) ->
receive
opened ->
throw(error_file_opened)
after 1000 ->
after 30_000 ->
%% Let's release 5 file handles, that should leave
%% enough free for the `open` to go through
file_handle_cache:set_reservation(2),
@ -183,7 +183,7 @@ file_handle_cache_reserve_open_file_above_limit1(_Config) ->
opened ->
ok = file_handle_cache:set_limit(Limit),
passed
after 5000 ->
after 30_000 ->
throw(error_file_not_released)
end
end.