Merge pull request #12948 from rabbitmq/fix-flakes
Test fixes for a few more CI flakes
This commit is contained in:
commit
62ce1c954a
|
|
@ -22,6 +22,8 @@
|
|||
[flush/1,
|
||||
wait_for_credit/1]).
|
||||
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, v1_permitted},
|
||||
|
|
@ -216,7 +218,7 @@ target_exchange_absent(Config) ->
|
|||
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
|
||||
description = {utf8, <<"no exchange '", XName:(byte_size(XName))/binary,
|
||||
"' in vhost '/'">>}}}}} -> ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
Reason = {missing_event, ?LINE},
|
||||
flush(Reason),
|
||||
ct:fail(Reason)
|
||||
|
|
@ -275,7 +277,7 @@ target_queue_absent(Config) ->
|
|||
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
|
||||
description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary,
|
||||
"' in vhost '/'">>}}}}} -> ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
Reason = {missing_event, ?LINE},
|
||||
flush(Reason),
|
||||
ct:fail(Reason)
|
||||
|
|
@ -403,7 +405,7 @@ target_per_message_unset_to_address(Config) ->
|
|||
|
||||
ok = amqp10_client:detach_link(Sender),
|
||||
receive {amqp10_event, {link, Sender, {detached, normal}}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
ok = amqp10_client:end_session(Session),
|
||||
ok = amqp10_client:close_connection(Connection).
|
||||
|
|
@ -467,7 +469,7 @@ target_per_message_bad_to_address(Config) ->
|
|||
?assertMatch(#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
|
||||
description = {utf8, <<"bad 'to' address", _Rest/binary>>}},
|
||||
Error)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(missing_disposition),
|
||||
ct:fail(missing_disposition)
|
||||
end
|
||||
|
|
@ -507,7 +509,7 @@ target_per_message_exchange_absent_settled(Config) ->
|
|||
info = {map, [{{symbol, <<"delivery-tag">>}, {binary, DTag2}}]}
|
||||
},
|
||||
Error)
|
||||
after 5000 -> ct:fail("server did not close our outgoing link")
|
||||
after ?TIMEOUT -> ct:fail("server did not close our outgoing link")
|
||||
end,
|
||||
|
||||
ok = cleanup(Init).
|
||||
|
|
@ -566,7 +568,7 @@ target_bad_address0(TargetAddress, Config) ->
|
|||
{session, Session,
|
||||
{ended,
|
||||
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
Reason = {missing_event, ?LINE, TargetAddress},
|
||||
flush(Reason),
|
||||
ct:fail(Reason)
|
||||
|
|
@ -593,7 +595,7 @@ source_queue_absent(Config) ->
|
|||
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
|
||||
description = {utf8, <<"no queue '", QName:(byte_size(QName))/binary,
|
||||
"' in vhost '/'">>}}}}} -> ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
Reason = {missing_event, ?LINE},
|
||||
flush(Reason),
|
||||
ct:fail(Reason)
|
||||
|
|
@ -626,7 +628,7 @@ source_bad_address0(SourceAddress, Config) ->
|
|||
{session, Session,
|
||||
{ended,
|
||||
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD}}}} -> ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
Reason = {missing_event, ?LINE},
|
||||
flush(Reason),
|
||||
ct:fail(Reason)
|
||||
|
|
@ -657,7 +659,7 @@ wait_for_settled(State, Tag) ->
|
|||
receive
|
||||
{amqp10_disposition, {State, Tag}} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
Reason = {?FUNCTION_NAME, State, Tag},
|
||||
flush(Reason),
|
||||
ct:fail(Reason)
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@
|
|||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
-include_lib("amqp10_common/include/amqp10_framing.hrl").
|
||||
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
-import(rabbit_ct_broker_helpers,
|
||||
[rpc/4]).
|
||||
-import(rabbit_ct_helpers,
|
||||
|
|
@ -153,8 +155,8 @@ v1_attach_target_queue(Config) ->
|
|||
<<"configure access to queue 'test queue' in vhost "
|
||||
"'test vhost' refused for user 'test user'">>),
|
||||
receive {amqp10_event, {session, Session1, {ended, ExpectedErr1}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
|
||||
%% Give the user configure permissions on the queue.
|
||||
|
|
@ -166,7 +168,7 @@ v1_attach_target_queue(Config) ->
|
|||
<<"write access to exchange 'amq.default' in vhost "
|
||||
"'test vhost' refused for user 'test user'">>),
|
||||
receive {amqp10_event, {session, Session2, {ended, ExpectedErr2}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
|
||||
|
|
@ -177,8 +179,8 @@ v1_attach_target_queue(Config) ->
|
|||
{ok, Sender3} = amqp10_client:attach_sender_link(
|
||||
Session3, <<"test-sender-3">>, TargetAddress),
|
||||
receive {amqp10_event, {link, Sender3, attached}} -> ok
|
||||
after 5000 -> flush(missing_attached),
|
||||
ct:fail("missing ATTACH from server")
|
||||
after ?TIMEOUT -> flush(missing_attached),
|
||||
ct:fail("missing ATTACH from server")
|
||||
end,
|
||||
|
||||
ok = close_connection_sync(Connection).
|
||||
|
|
@ -202,8 +204,8 @@ v1_attach_source_exchange(Config) ->
|
|||
#'v1_0.error'{
|
||||
condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
|
||||
description = {utf8, <<"configure access to queue 'amq.gen", _/binary>>}}}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
|
||||
%% Give the user configure permissions on the queue.
|
||||
|
|
@ -218,7 +220,7 @@ v1_attach_source_exchange(Config) ->
|
|||
#'v1_0.error'{
|
||||
condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
|
||||
description = {utf8, <<"write access to queue 'amq.gen", _/binary>>}}}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
|
||||
|
|
@ -231,7 +233,7 @@ v1_attach_source_exchange(Config) ->
|
|||
<<"read access to exchange 'amq.fanout' in vhost "
|
||||
"'test vhost' refused for user 'test user'">>),
|
||||
receive {amqp10_event, {session, Session3, {ended, ExpectedErr1}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
|
||||
|
|
@ -247,7 +249,7 @@ v1_attach_source_exchange(Config) ->
|
|||
#'v1_0.error'{
|
||||
condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
|
||||
description = {utf8, <<"read access to queue 'amq.gen", _/binary>>}}}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
|
||||
|
|
@ -258,7 +260,7 @@ v1_attach_source_exchange(Config) ->
|
|||
{ok, Recv5} = amqp10_client:attach_receiver_link(
|
||||
Session5, <<"receiver-5">>, SourceAddress),
|
||||
receive {amqp10_event, {link, Recv5, attached}} -> ok
|
||||
after 5000 -> flush(missing_attached),
|
||||
after ?TIMEOUT -> flush(missing_attached),
|
||||
ct:fail("missing ATTACH from server")
|
||||
end,
|
||||
|
||||
|
|
@ -290,7 +292,7 @@ send_to_topic(TargetAddress, Config) ->
|
|||
<<"write access to topic 'test vhost.test user.a.b' in exchange "
|
||||
"'amq.topic' in vhost 'test vhost' refused for user 'test user'">>),
|
||||
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
|
||||
|
|
@ -304,7 +306,7 @@ send_to_topic(TargetAddress, Config) ->
|
|||
ok = amqp10_client:send_msg(Sender2, Msg2),
|
||||
%% We expect RELEASED since no queue is bound.
|
||||
receive {amqp10_disposition, {released, Dtag}} -> ok
|
||||
after 5000 -> ct:fail(released_timeout)
|
||||
after ?TIMEOUT -> ct:fail(released_timeout)
|
||||
end,
|
||||
|
||||
ok = amqp10_client:detach_link(Sender2),
|
||||
|
|
@ -330,7 +332,7 @@ v1_send_to_topic_using_subject(Config) ->
|
|||
ok = amqp10_client:send_msg(Sender, Msg1b),
|
||||
%% We have sufficient authorization, but expect RELEASED since no queue is bound.
|
||||
receive {amqp10_disposition, {released, Dtag1}} -> ok
|
||||
after 5000 -> ct:fail(released_timeout)
|
||||
after ?TIMEOUT -> ct:fail(released_timeout)
|
||||
end,
|
||||
|
||||
Dtag2 = <<"dtag 2">>,
|
||||
|
|
@ -342,7 +344,7 @@ v1_send_to_topic_using_subject(Config) ->
|
|||
<<"write access to topic '.a.b' in exchange 'amq.topic' in "
|
||||
"vhost 'test vhost' refused for user 'test user'">>),
|
||||
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
|
||||
|
|
@ -374,7 +376,7 @@ attach_source_topic0(SourceAddress, Config) ->
|
|||
<<"read access to topic 'test vhost.test user.a.b' in exchange "
|
||||
"'amq.topic' in vhost 'test vhost' refused for user 'test user'">>),
|
||||
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
|
||||
|
|
@ -383,7 +385,7 @@ attach_source_topic0(SourceAddress, Config) ->
|
|||
{ok, Recv2} = amqp10_client:attach_receiver_link(
|
||||
Session2, <<"receiver-2">>, SourceAddress),
|
||||
receive {amqp10_event, {link, Recv2, attached}} -> ok
|
||||
after 5000 -> flush(missing_attached),
|
||||
after ?TIMEOUT -> flush(missing_attached),
|
||||
ct:fail("missing ATTACH from server")
|
||||
end,
|
||||
|
||||
|
|
@ -405,7 +407,7 @@ v1_attach_target_internal_exchange(Config) ->
|
|||
ExpectedErr = error_unauthorized(
|
||||
<<"forbidden to publish to internal exchange 'test exchange' in vhost '/'">>),
|
||||
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
|
||||
|
|
@ -429,7 +431,7 @@ attach_source_queue(Config) ->
|
|||
receive {amqp10_event,
|
||||
{session, Session,
|
||||
{ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive AMQP_ERROR_UNAUTHORIZED_ACCESS")
|
||||
end,
|
||||
ok = close_connection_sync(Conn).
|
||||
|
|
@ -448,13 +450,13 @@ attach_target_exchange(Config) ->
|
|||
<<"write access to exchange '", XName/binary,
|
||||
"' in vhost 'test vhost' refused for user 'test user'">>),
|
||||
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
{ok, Session2} = amqp10_client:begin_session_sync(Connection),
|
||||
{ok, _} = amqp10_client:attach_sender_link(Session2, <<"test-sender">>, Address2),
|
||||
receive {amqp10_event, {session, Session2, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
ok = amqp10_client:close_connection(Connection).
|
||||
|
|
@ -478,7 +480,7 @@ attach_target_queue(Config) ->
|
|||
<<"write access to exchange 'amq.default' ",
|
||||
"in vhost 'test vhost' refused for user 'test user'">>),
|
||||
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
ok = amqp10_client:close_connection(Conn).
|
||||
|
||||
|
|
@ -500,7 +502,7 @@ target_per_message_exchange(Config) ->
|
|||
Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, <<"m1">>)),
|
||||
ok = amqp10_client:send_msg(Sender, Msg1),
|
||||
receive {amqp10_disposition, {released, Tag1}} -> ok
|
||||
after 5000 -> ct:fail(released_timeout)
|
||||
after ?TIMEOUT -> ct:fail(released_timeout)
|
||||
end,
|
||||
|
||||
%% We don't have sufficient authorization.
|
||||
|
|
@ -511,7 +513,7 @@ target_per_message_exchange(Config) ->
|
|||
<<"write access to exchange 'amq.default' in "
|
||||
"vhost 'test vhost' refused for user 'test user'">>),
|
||||
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
ok = close_connection_sync(Connection).
|
||||
|
|
@ -534,7 +536,7 @@ target_per_message_internal_exchange(Config) ->
|
|||
ExpectedErr = error_unauthorized(
|
||||
<<"forbidden to publish to internal exchange '", XName/binary, "' in vhost 'test vhost'">>),
|
||||
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> flush(missing_event),
|
||||
after ?TIMEOUT -> flush(missing_event),
|
||||
ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
ok = close_connection_sync(Conn1),
|
||||
|
|
@ -563,7 +565,7 @@ target_per_message_topic(Config) ->
|
|||
Msg1 = amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, <<"m1">>)),
|
||||
ok = amqp10_client:send_msg(Sender, Msg1),
|
||||
receive {amqp10_disposition, {released, Tag1}} -> ok
|
||||
after 5000 -> ct:fail(released_timeout)
|
||||
after ?TIMEOUT -> ct:fail(released_timeout)
|
||||
end,
|
||||
|
||||
%% We don't have sufficient authorization.
|
||||
|
|
@ -574,7 +576,7 @@ target_per_message_topic(Config) ->
|
|||
<<"write access to topic '.a.b' in exchange 'amq.topic' in "
|
||||
"vhost 'test vhost' refused for user 'test user'">>),
|
||||
receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
ok = close_connection_sync(Connection).
|
||||
|
|
@ -594,7 +596,7 @@ authn_failure_event(Config) ->
|
|||
|
||||
{ok, Connection} = amqp10_client:open_connection(OpnConf),
|
||||
receive {amqp10_event, {connection, Connection, {closed, sasl_auth_failure}}} -> ok
|
||||
after 5000 -> flush(missing_closed),
|
||||
after ?TIMEOUT -> flush(missing_closed),
|
||||
ct:fail("did not receive sasl_auth_failure")
|
||||
end,
|
||||
|
||||
|
|
@ -621,7 +623,7 @@ sasl_success(Mechanism, Config) ->
|
|||
OpnConf = OpnConf0#{sasl := Mechanism},
|
||||
{ok, Connection} = amqp10_client:open_connection(OpnConf),
|
||||
receive {amqp10_event, {connection, Connection, opened}} -> ok
|
||||
after 5000 -> ct:fail(missing_opened)
|
||||
after ?TIMEOUT -> ct:fail(missing_opened)
|
||||
end,
|
||||
ok = amqp10_client:close_connection(Connection).
|
||||
|
||||
|
|
@ -638,7 +640,7 @@ sasl_anonymous_failure(Config) ->
|
|||
{ok, Connection} = amqp10_client:open_connection(OpnConf),
|
||||
receive {amqp10_event, {connection, Connection, {closed, Reason}}} ->
|
||||
?assertEqual({sasl_not_supported, Mechanism}, Reason)
|
||||
after 5000 -> ct:fail(missing_closed)
|
||||
after ?TIMEOUT -> ct:fail(missing_closed)
|
||||
end,
|
||||
|
||||
ok = rpc(Config, application, set_env, [App, Par, Default]).
|
||||
|
|
@ -649,7 +651,7 @@ sasl_plain_failure(Config) ->
|
|||
{ok, Connection} = amqp10_client:open_connection(OpnConf),
|
||||
receive {amqp10_event, {connection, Connection, {closed, Reason}}} ->
|
||||
?assertEqual(sasl_auth_failure, Reason)
|
||||
after 5000 -> ct:fail(missing_closed)
|
||||
after ?TIMEOUT -> ct:fail(missing_closed)
|
||||
end.
|
||||
|
||||
%% Skipping SASL is disallowed in RabbitMQ.
|
||||
|
|
@ -658,14 +660,14 @@ sasl_none_failure(Config) ->
|
|||
OpnConf = OpnConf0#{sasl := none},
|
||||
{ok, Connection} = amqp10_client:open_connection(OpnConf),
|
||||
receive {amqp10_event, {connection, Connection, {closed, _Reason}}} -> ok
|
||||
after 5000 -> ct:fail(missing_closed)
|
||||
after ?TIMEOUT -> ct:fail(missing_closed)
|
||||
end.
|
||||
|
||||
vhost_absent(Config) ->
|
||||
OpnConf = connection_config(Config, <<"this vhost does not exist">>),
|
||||
{ok, Connection} = amqp10_client:open_connection(OpnConf),
|
||||
receive {amqp10_event, {connection, Connection, {closed, _}}} -> ok
|
||||
after 5000 -> ct:fail(missing_closed)
|
||||
after ?TIMEOUT -> ct:fail(missing_closed)
|
||||
end.
|
||||
|
||||
vhost_connection_limit(Config) ->
|
||||
|
|
@ -675,22 +677,22 @@ vhost_connection_limit(Config) ->
|
|||
OpnConf = connection_config(Config),
|
||||
{ok, C1} = amqp10_client:open_connection(OpnConf),
|
||||
receive {amqp10_event, {connection, C1, opened}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
{ok, C2} = amqp10_client:open_connection(OpnConf),
|
||||
receive {amqp10_event, {connection, C2, {closed, _}}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
OpnConf0 = connection_config(Config, <<"/">>),
|
||||
OpnConf1 = OpnConf0#{sasl := anon},
|
||||
{ok, C3} = amqp10_client:open_connection(OpnConf1),
|
||||
receive {amqp10_event, {connection, C3, opened}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
{ok, C4} = amqp10_client:open_connection(OpnConf1),
|
||||
receive {amqp10_event, {connection, C4, opened}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
[ok = close_connection_sync(C) || C <- [C1, C3, C4]],
|
||||
|
|
@ -704,12 +706,12 @@ user_connection_limit(Config) ->
|
|||
OpnConf = OpnConf0#{sasl := anon},
|
||||
{ok, C1} = amqp10_client:open_connection(OpnConf),
|
||||
receive {amqp10_event, {connection, C1, {closed, _}}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
{ok, C2} = amqp10_client:open_connection(connection_config(Config)),
|
||||
receive {amqp10_event, {connection, C2, opened}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
ok = close_connection_sync(C2),
|
||||
|
|
@ -731,7 +733,7 @@ v1_vhost_queue_limit(Config) ->
|
|||
?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED,
|
||||
<<"cannot declare queue 'q1': queue limit in vhost 'test vhost' (0) is reached">>),
|
||||
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
|
||||
after 5000 -> flush(missing_ended),
|
||||
after ?TIMEOUT -> flush(missing_ended),
|
||||
ct:fail("did not receive expected error")
|
||||
end,
|
||||
|
||||
|
|
@ -742,8 +744,8 @@ v1_vhost_queue_limit(Config) ->
|
|||
{ok, Sender2} = amqp10_client:attach_sender_link(
|
||||
Session2, <<"test-sender-2">>, TargetAddress),
|
||||
receive {amqp10_event, {link, Sender2, attached}} -> ok
|
||||
after 5000 -> flush(missing_attached),
|
||||
ct:fail("missing ATTACH from server")
|
||||
after ?TIMEOUT -> flush(missing_attached),
|
||||
ct:fail("missing ATTACH from server")
|
||||
end,
|
||||
|
||||
ok = close_connection_sync(C1),
|
||||
|
|
|
|||
|
|
@ -1260,7 +1260,7 @@ drain_many(Config, QueueType, QName)
|
|||
after 30000 -> ct:fail({missing_delivery, ?LINE})
|
||||
end,
|
||||
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
|
||||
after 300 -> ct:fail("expected credit_exhausted")
|
||||
after 30000 -> ct:fail("expected credit_exhausted")
|
||||
end,
|
||||
|
||||
ok = amqp10_client:detach_link(Sender),
|
||||
|
|
|
|||
|
|
@ -80,10 +80,10 @@ credit_api_v2(Config) ->
|
|||
{ok, CQSender} = amqp10_client:attach_sender_link(Session, <<"cq sender">>, CQAddr),
|
||||
{ok, QQSender} = amqp10_client:attach_sender_link(Session, <<"qq sender">>, QQAddr),
|
||||
receive {amqp10_event, {link, CQSender, credited}} -> ok
|
||||
after 5000 -> ct:fail(credited_timeout)
|
||||
after 30_000 -> ct:fail(credited_timeout)
|
||||
end,
|
||||
receive {amqp10_event, {link, QQSender, credited}} -> ok
|
||||
after 5000 -> ct:fail(credited_timeout)
|
||||
after 30_000 -> ct:fail(credited_timeout)
|
||||
end,
|
||||
|
||||
%% Send 40 messages to each queue.
|
||||
|
|
@ -146,7 +146,7 @@ credit_api_v2(Config) ->
|
|||
%% Draining should also work.
|
||||
ok = amqp10_client:flow_link_credit(CQReceiver3, 10, never, true),
|
||||
receive {amqp10_event, {link, CQReceiver3, credit_exhausted}} -> ok
|
||||
after 5000 -> ct:fail({missing_credit_exhausted, ?LINE})
|
||||
after 30_000 -> ct:fail({missing_credit_exhausted, ?LINE})
|
||||
end,
|
||||
receive Unexpected1 -> ct:fail({unexpected, ?LINE, Unexpected1})
|
||||
after 20 -> ok
|
||||
|
|
@ -154,7 +154,7 @@ credit_api_v2(Config) ->
|
|||
|
||||
ok = amqp10_client:flow_link_credit(QQReceiver3, 10, never, true),
|
||||
receive {amqp10_event, {link, QQReceiver3, credit_exhausted}} -> ok
|
||||
after 5000 -> ct:fail({missing_credit_exhausted, ?LINE})
|
||||
after 30_000 -> ct:fail({missing_credit_exhausted, ?LINE})
|
||||
end,
|
||||
receive Unexpected2 -> ct:fail({unexpected, ?LINE, Unexpected2})
|
||||
after 20 -> ok
|
||||
|
|
@ -166,11 +166,11 @@ credit_api_v2(Config) ->
|
|||
ok = detach_sync(QQReceiver3),
|
||||
ok = amqp10_client:end_session(Session),
|
||||
receive {amqp10_event, {session, Session, {ended, _}}} -> ok
|
||||
after 5000 -> ct:fail(missing_ended)
|
||||
after 30_000 -> ct:fail(missing_ended)
|
||||
end,
|
||||
ok = amqp10_client:close_connection(Connection),
|
||||
receive {amqp10_event, {connection, Connection, {closed, normal}}} -> ok
|
||||
after 5000 -> ct:fail(missing_closed)
|
||||
after 30_000 -> ct:fail(missing_closed)
|
||||
end.
|
||||
|
||||
consume_and_accept(NumMsgs, Receiver) ->
|
||||
|
|
@ -192,14 +192,14 @@ receive_messages0(Receiver, N, Acc) ->
|
|||
receive
|
||||
{amqp10_msg, Receiver, Msg} ->
|
||||
receive_messages0(Receiver, N - 1, [Msg | Acc])
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
exit({timeout, {num_received, length(Acc)}, {num_missing, N}})
|
||||
end.
|
||||
|
||||
detach_sync(Receiver) ->
|
||||
ok = amqp10_client:detach_link(Receiver),
|
||||
receive {amqp10_event, {link, Receiver, {detached, normal}}} -> ok
|
||||
after 5000 -> ct:fail({missing_detached, Receiver})
|
||||
after 30_000 -> ct:fail({missing_detached, Receiver})
|
||||
end.
|
||||
|
||||
flush(Prefix) ->
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ wait_for_credit(Sender) ->
|
|||
receive
|
||||
{amqp10_event, {link, Sender, credited}} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
flush("wait_for_credit timed out"),
|
||||
ct:fail(credited_timeout)
|
||||
end.
|
||||
|
|
@ -66,7 +66,7 @@ wait_for_accepts(N) ->
|
|||
receive
|
||||
{amqp10_disposition, {accepted, _}} ->
|
||||
wait_for_accepts(N - 1)
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
ct:fail({missing_accepted, N})
|
||||
end.
|
||||
|
||||
|
|
@ -108,9 +108,9 @@ wait_for_link_detach(Link) ->
|
|||
{amqp10_event, {link, Link, {detached, #'v1_0.detach'{}}}} ->
|
||||
flush(?FUNCTION_NAME),
|
||||
ok
|
||||
after 5000 ->
|
||||
flush("wait_for_link_detach timed out"),
|
||||
ct:fail({link_detach_timeout, Link})
|
||||
after 30_000 ->
|
||||
flush("wait_for_link_detach timed out"),
|
||||
ct:fail({link_detach_timeout, Link})
|
||||
end.
|
||||
|
||||
end_session_sync(Session)
|
||||
|
|
@ -123,9 +123,9 @@ wait_for_session_end(Session) ->
|
|||
{amqp10_event, {session, Session, {ended, _}}} ->
|
||||
flush(?FUNCTION_NAME),
|
||||
ok
|
||||
after 5000 ->
|
||||
flush("wait_for_session_end timed out"),
|
||||
ct:fail({session_end_timeout, Session})
|
||||
after 30_000 ->
|
||||
flush("wait_for_session_end timed out"),
|
||||
ct:fail({session_end_timeout, Session})
|
||||
end.
|
||||
|
||||
close_connection_sync(Connection)
|
||||
|
|
@ -138,7 +138,7 @@ wait_for_connection_close(Connection) ->
|
|||
{amqp10_event, {connection, Connection, {closed, normal}}} ->
|
||||
flush(?FUNCTION_NAME),
|
||||
ok
|
||||
after 5000 ->
|
||||
flush("wait_for_connection_close timed out"),
|
||||
ct:fail({connection_close_timeout, Connection})
|
||||
after 30_000 ->
|
||||
flush("wait_for_connection_close timed out"),
|
||||
ct:fail({connection_close_timeout, Connection})
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@
|
|||
-import(rabbit_ct_helpers,
|
||||
[eventually/3]).
|
||||
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, tests}
|
||||
|
|
@ -95,7 +97,7 @@ requeue_one_channel(QType, Config) ->
|
|||
self()),
|
||||
|
||||
receive #'basic.consume_ok'{consumer_tag = Ctag} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
[begin
|
||||
|
|
@ -107,19 +109,19 @@ requeue_one_channel(QType, Config) ->
|
|||
|
||||
receive {#'basic.deliver'{},
|
||||
#amqp_msg{payload = <<"1">>}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
receive {#'basic.deliver'{},
|
||||
#amqp_msg{payload = <<"2">>}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
D3 = receive {#'basic.deliver'{delivery_tag = Del3},
|
||||
#amqp_msg{payload = <<"3">>}} -> Del3
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
receive {#'basic.deliver'{},
|
||||
#amqp_msg{payload = <<"4">>}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
assert_messages(QName, 4, 4, Config),
|
||||
|
||||
|
|
@ -132,18 +134,18 @@ requeue_one_channel(QType, Config) ->
|
|||
receive {#'basic.deliver'{},
|
||||
#amqp_msg{payload = P1}} ->
|
||||
?assertEqual(<<"1">>, P1)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
receive {#'basic.deliver'{},
|
||||
#amqp_msg{payload = P2}} ->
|
||||
?assertEqual(<<"2">>, P2)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
D3b = receive {#'basic.deliver'{delivery_tag = Del3b},
|
||||
#amqp_msg{payload = P3}} ->
|
||||
?assertEqual(<<"3">>, P3),
|
||||
Del3b
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
assert_messages(QName, 4, 4, Config),
|
||||
|
||||
|
|
@ -181,7 +183,7 @@ requeue_two_channels(QType, Config) ->
|
|||
self()),
|
||||
|
||||
receive #'basic.consume_ok'{consumer_tag = Ctag1} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
amqp_channel:subscribe(Ch2,
|
||||
|
|
@ -189,7 +191,7 @@ requeue_two_channels(QType, Config) ->
|
|||
consumer_tag = Ctag2},
|
||||
self()),
|
||||
receive #'basic.consume_ok'{consumer_tag = Ctag2} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
[begin
|
||||
|
|
@ -203,22 +205,22 @@ requeue_two_channels(QType, Config) ->
|
|||
receive {#'basic.deliver'{consumer_tag = C1},
|
||||
#amqp_msg{payload = <<"1">>}} ->
|
||||
?assertEqual(Ctag1, C1)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
receive {#'basic.deliver'{consumer_tag = C2},
|
||||
#amqp_msg{payload = <<"2">>}} ->
|
||||
?assertEqual(Ctag2, C2)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
receive {#'basic.deliver'{consumer_tag = C3},
|
||||
#amqp_msg{payload = <<"3">>}} ->
|
||||
?assertEqual(Ctag1, C3)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
receive {#'basic.deliver'{consumer_tag = C4},
|
||||
#amqp_msg{payload = <<"4">>}} ->
|
||||
?assertEqual(Ctag2, C4)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
assert_messages(QName, 4, 4, Config),
|
||||
|
||||
|
|
@ -228,14 +230,14 @@ requeue_two_channels(QType, Config) ->
|
|||
receive {#'basic.deliver'{consumer_tag = C5},
|
||||
#amqp_msg{payload = <<"1">>}} ->
|
||||
?assertEqual(Ctag2, C5)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
DelTag = receive {#'basic.deliver'{consumer_tag = C6,
|
||||
delivery_tag = D},
|
||||
#amqp_msg{payload = <<"3">>}} ->
|
||||
?assertEqual(Ctag2, C6),
|
||||
D
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
assert_messages(QName, 4, 4, Config),
|
||||
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-import(rabbit_ct_helpers, [eventually/1]).
|
||||
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, cluster_size_1},
|
||||
|
|
@ -116,7 +118,7 @@ trace(Config) ->
|
|||
correlation_id = CorrelationId},
|
||||
payload = RequestPayload}),
|
||||
receive #'basic.ack'{} -> ok
|
||||
after 5000 -> ct:fail(confirm_timeout)
|
||||
after ?TIMEOUT -> ct:fail(confirm_timeout)
|
||||
end,
|
||||
|
||||
%% Receive the request.
|
||||
|
|
@ -138,7 +140,7 @@ trace(Config) ->
|
|||
#amqp_msg{payload = ReplyPayload,
|
||||
props = #'P_basic'{correlation_id = CorrelationId}}} ->
|
||||
ok
|
||||
after 5000 -> ct:fail(missing_reply)
|
||||
after ?TIMEOUT -> ct:fail(missing_reply)
|
||||
end,
|
||||
|
||||
%% 2 messages should have entered RabbitMQ:
|
||||
|
|
@ -217,7 +219,7 @@ rpc(RequesterNode, ResponderNode, Config) ->
|
|||
correlation_id = CorrelationId},
|
||||
payload = RequestPayload}),
|
||||
receive #'basic.ack'{} -> ok
|
||||
after 5000 -> ct:fail(confirm_timeout)
|
||||
after ?TIMEOUT -> ct:fail(confirm_timeout)
|
||||
end,
|
||||
|
||||
ok = wait_for_queue_declared(RequestQueue, ResponderNode, Config),
|
||||
|
|
@ -239,7 +241,7 @@ rpc(RequesterNode, ResponderNode, Config) ->
|
|||
#amqp_msg{payload = ReplyPayload,
|
||||
props = #'P_basic'{correlation_id = CorrelationId}}} ->
|
||||
ok
|
||||
after 5000 -> ct:fail(missing_reply)
|
||||
after ?TIMEOUT -> ct:fail(missing_reply)
|
||||
end.
|
||||
|
||||
wait_for_queue_declared(Queue, Node, Config) ->
|
||||
|
|
|
|||
|
|
@ -1052,7 +1052,7 @@ bq_queue_recover1(Config) ->
|
|||
exit(QPid, kill),
|
||||
MRef = erlang:monitor(process, QPid),
|
||||
receive {'DOWN', MRef, process, QPid, _Info} -> ok
|
||||
after 10000 -> exit(timeout_waiting_for_queue_death)
|
||||
after ?TIMEOUT -> exit(timeout_waiting_for_queue_death)
|
||||
end,
|
||||
rabbit_amqqueue:stop(?VHOST),
|
||||
{Recovered, []} = rabbit_amqqueue:recover(?VHOST),
|
||||
|
|
|
|||
|
|
@ -362,7 +362,7 @@ subscribe(Ch, QName) ->
|
|||
receive
|
||||
#'basic.consume_ok'{consumer_tag = CTag} ->
|
||||
ok
|
||||
after 10000 ->
|
||||
after 30000 ->
|
||||
exit(consume_ok_timeout)
|
||||
end.
|
||||
|
||||
|
|
@ -372,6 +372,6 @@ consume(N) ->
|
|||
receive
|
||||
{#'basic.deliver'{consumer_tag = <<"ctag">>}, _} ->
|
||||
consume(N - 1)
|
||||
after 10000 ->
|
||||
after 30000 ->
|
||||
exit(deliver_timeout)
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@
|
|||
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
|
||||
-compile(export_all).
|
||||
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, parallel_tests}
|
||||
|
|
@ -132,7 +134,7 @@ dead_queue_rejects(Config) ->
|
|||
|
||||
receive
|
||||
{'basic.ack',_,_} -> ok
|
||||
after 10000 ->
|
||||
after ?TIMEOUT ->
|
||||
error(timeout_waiting_for_initial_ack)
|
||||
end,
|
||||
|
||||
|
|
@ -191,7 +193,7 @@ kill_queue_expect_nack(Config, Ch, QueueName, BasicPublish, AmqpMsg, Tries) ->
|
|||
ok;
|
||||
{'basic.ack',_,_} ->
|
||||
retry
|
||||
after 10000 ->
|
||||
after ?TIMEOUT ->
|
||||
error({timeout_waiting_for_nack, process_info(self(), messages)})
|
||||
end,
|
||||
case R of
|
||||
|
|
@ -343,19 +345,19 @@ consume_all_messages(Ch, QueueName, Msgs) ->
|
|||
|
||||
assert_ack() ->
|
||||
receive {'basic.ack', _, _} -> ok
|
||||
after 10000 -> error(timeout_waiting_for_ack)
|
||||
after ?TIMEOUT -> error(timeout_waiting_for_ack)
|
||||
end,
|
||||
clean_acks_mailbox().
|
||||
|
||||
assert_nack() ->
|
||||
receive {'basic.nack', _, _, _} -> ok
|
||||
after 10000 -> error(timeout_waiting_for_nack)
|
||||
after ?TIMEOUT -> error(timeout_waiting_for_nack)
|
||||
end,
|
||||
clean_acks_mailbox().
|
||||
|
||||
assert_acks(N) ->
|
||||
receive {'basic.ack', N, _} -> ok
|
||||
after 10000 -> error({timeout_waiting_for_ack, N})
|
||||
after ?TIMEOUT -> error({timeout_waiting_for_ack, N})
|
||||
end,
|
||||
clean_acks_mailbox().
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,9 @@
|
|||
-compile(export_all).
|
||||
|
||||
-define(CONSUMER_TIMEOUT, 2000).
|
||||
-define(RECEIVE_TIMEOUT, ?CONSUMER_TIMEOUT * 2).
|
||||
%% Sometimes CI machines are really slow,
|
||||
%% expecting CONSUMER_TIMEOUT*2 might not be enough
|
||||
-define(RECEIVE_TIMEOUT, 30_000).
|
||||
|
||||
-define(GROUP_CONFIG,
|
||||
#{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]},
|
||||
|
|
@ -147,7 +149,7 @@ consumer_timeout(Config) ->
|
|||
{'DOWN', _, process, Conn, _} ->
|
||||
flush(1),
|
||||
exit(unexpected_connection_exit)
|
||||
after 2000 ->
|
||||
after ?RECEIVE_TIMEOUT ->
|
||||
ok
|
||||
end,
|
||||
rabbit_ct_client_helpers:close_channel(Ch),
|
||||
|
|
@ -172,7 +174,7 @@ consumer_timeout_basic_get(Config) ->
|
|||
{'DOWN', _, process, Conn, _} ->
|
||||
flush(1),
|
||||
exit(unexpected_connection_exit)
|
||||
after 2000 ->
|
||||
after ?RECEIVE_TIMEOUT ->
|
||||
ok
|
||||
end,
|
||||
ok.
|
||||
|
|
@ -221,7 +223,7 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
|
|||
{'DOWN', _, process, Conn, _} ->
|
||||
flush(1),
|
||||
exit(unexpected_connection_exit)
|
||||
after 2000 ->
|
||||
after ?RECEIVE_TIMEOUT ->
|
||||
ok
|
||||
end,
|
||||
ok.
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
-import(queue_utils, [wait_for_messages/2]).
|
||||
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, tests}
|
||||
|
|
@ -455,7 +457,7 @@ dead_letter_reject_many(Config) ->
|
|||
[begin
|
||||
receive {#'basic.deliver'{consumer_tag = CTag, delivery_tag = DTag}, #amqp_msg{payload = P}} ->
|
||||
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, requeue = false})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
|
||||
exit(timeout)
|
||||
end
|
||||
|
|
@ -628,7 +630,7 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) ->
|
|||
consumer_tag = Ctag1},
|
||||
self()),
|
||||
receive #'basic.consume_ok'{consumer_tag = Ctag1} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
Ctag2 = <<"ctag 2">>,
|
||||
|
|
@ -637,20 +639,20 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) ->
|
|||
consumer_tag = Ctag2},
|
||||
self()),
|
||||
receive #'basic.consume_ok'{consumer_tag = Ctag2} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
receive {#'basic.deliver'{},
|
||||
#amqp_msg{payload = <<"m1">>}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
D2 = receive {#'basic.deliver'{delivery_tag = Del2},
|
||||
#amqp_msg{payload = <<"m2">>}} -> Del2
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
receive {#'basic.deliver'{},
|
||||
#amqp_msg{payload = <<"m3">>}} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
|
||||
|
||||
|
|
@ -663,13 +665,13 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) ->
|
|||
receive {#'basic.deliver'{},
|
||||
#amqp_msg{payload = P1a}} ->
|
||||
?assertEqual(<<"m1">>, P1a)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
D5 = receive {#'basic.deliver'{delivery_tag = Del5},
|
||||
#amqp_msg{payload = P2a}} ->
|
||||
?assertEqual(<<"m2">>, P2a),
|
||||
Del5
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
%% Nack all 3 without requeue
|
||||
|
|
@ -681,18 +683,18 @@ dead_letter_nack_requeue_nack_norequeue_basic_consume(Config) ->
|
|||
receive {#'basic.deliver'{},
|
||||
#amqp_msg{payload = P3b}} ->
|
||||
?assertEqual(<<"m3">>, P3b)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
receive {#'basic.deliver'{},
|
||||
#amqp_msg{payload = P1b}} ->
|
||||
?assertEqual(<<"m1">>, P1b)
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
LastD = receive {#'basic.deliver'{delivery_tag = LastDel},
|
||||
#amqp_msg{payload = P2b}} ->
|
||||
?assertEqual(<<"m2">>, P2b),
|
||||
LastDel
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
wait_for_messages(Config, [[DLXQName, <<"3">>, <<"0">>, <<"3">>]]),
|
||||
|
||||
|
|
@ -1728,7 +1730,7 @@ metric_rejected(Config) ->
|
|||
[begin
|
||||
receive {#'basic.deliver'{consumer_tag = CTag, delivery_tag = DTag}, #amqp_msg{payload = P}} ->
|
||||
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, requeue = false})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
|
||||
exit(timeout)
|
||||
end
|
||||
|
|
@ -1813,7 +1815,7 @@ stream(Config) ->
|
|||
self()),
|
||||
receive
|
||||
#'basic.consume_ok'{consumer_tag = Ctag} -> ok
|
||||
after 5000 -> ct:fail({missing_event, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
Headers = receive {#'basic.deliver'{delivery_tag = DeliveryTag},
|
||||
|
|
|
|||
|
|
@ -421,7 +421,7 @@ assert_confirm() ->
|
|||
receive
|
||||
#'basic.ack'{} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
throw(missing_confirm)
|
||||
end.
|
||||
|
||||
|
|
@ -429,7 +429,7 @@ assert_return() ->
|
|||
receive
|
||||
{#'basic.return'{}, _} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
throw(missing_return)
|
||||
end.
|
||||
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ disconnect_detected_during_alarm(Config) ->
|
|||
% Check that connection was indeed blocked
|
||||
#'connection.blocked'{} -> ok
|
||||
after
|
||||
1000 -> exit(connection_was_not_blocked)
|
||||
30_000 -> exit(connection_was_not_blocked)
|
||||
end,
|
||||
|
||||
%% Connection is blocked, now we should forcefully kill it
|
||||
|
|
|
|||
|
|
@ -482,7 +482,7 @@ join_khepri_while_in_minority(Config) ->
|
|||
receive
|
||||
#'basic.consume_ok'{consumer_tag = CTag} ->
|
||||
ok
|
||||
after 10000 ->
|
||||
after 30_000 ->
|
||||
exit(consume_ok_timeout)
|
||||
end,
|
||||
|
||||
|
|
@ -490,7 +490,7 @@ join_khepri_while_in_minority(Config) ->
|
|||
receive
|
||||
{#'basic.deliver'{consumer_tag = <<"ctag">>}, _} ->
|
||||
ok
|
||||
after 10000 ->
|
||||
after 30_000 ->
|
||||
exit(deliver_timeout)
|
||||
end,
|
||||
|
||||
|
|
|
|||
|
|
@ -294,7 +294,7 @@ test_startup_failure(Fail, Group) ->
|
|||
receive
|
||||
{'EXIT', _, shutdown} ->
|
||||
ok
|
||||
after 1000 ->
|
||||
after 30_000 ->
|
||||
exit({did_not_exit, Fail})
|
||||
end,
|
||||
process_flag(trap_exit, false),
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ message_size(Config) ->
|
|||
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
|
||||
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
|
||||
receive {amqp10_event, {link, Sender, credited}} -> ok
|
||||
after 5000 -> ct:fail(credited_timeout)
|
||||
after 30_000 -> ct:fail(credited_timeout)
|
||||
end,
|
||||
|
||||
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
|
||||
|
|
@ -149,6 +149,6 @@ wait_for_settlement(State, Tag) ->
|
|||
receive
|
||||
{amqp10_disposition, {State, Tag}} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
ct:fail({disposition_timeout, Tag})
|
||||
after 30_000 ->
|
||||
ct:fail({disposition_timeout, Tag})
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ confirm_ack(Config) ->
|
|||
receive
|
||||
#'basic.ack'{delivery_tag = 1} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
throw(missing_ack)
|
||||
end.
|
||||
|
||||
|
|
@ -196,13 +196,13 @@ confirm_mandatory_unroutable(Config) ->
|
|||
receive
|
||||
{#'basic.return'{}, _} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
throw(missing_return)
|
||||
end,
|
||||
receive
|
||||
#'basic.ack'{delivery_tag = 1} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
throw(missing_ack)
|
||||
end.
|
||||
|
||||
|
|
@ -218,7 +218,7 @@ confirm_unroutable_message(Config) ->
|
|||
throw(unexpected_basic_return);
|
||||
#'basic.ack'{delivery_tag = 1} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
throw(missing_ack)
|
||||
end.
|
||||
|
||||
|
|
@ -333,6 +333,6 @@ receive_many(DTags) ->
|
|||
receive_many(DTags -- lists:seq(1, DTag));
|
||||
#'basic.ack'{delivery_tag = DTag, multiple = false} ->
|
||||
receive_many(DTags -- [DTag])
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
throw(missing_ack)
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -306,19 +306,19 @@ check_max_length_rejects(QName, Ch, Payload1, Payload2, Payload3) ->
|
|||
%% First message can be enqueued and acks
|
||||
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
|
||||
receive #'basic.ack'{} -> ok
|
||||
after 1000 -> error(expected_ack)
|
||||
after 30_000 -> error(expected_ack)
|
||||
end,
|
||||
|
||||
%% The message cannot be enqueued and nacks
|
||||
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
|
||||
receive #'basic.nack'{} -> ok
|
||||
after 1000 -> error(expected_nack)
|
||||
after 30_000 -> error(expected_nack)
|
||||
end,
|
||||
|
||||
%% The message cannot be enqueued and nacks
|
||||
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
|
||||
receive #'basic.nack'{} -> ok
|
||||
after 1000 -> error(expected_nack)
|
||||
after 30_000 -> error(expected_nack)
|
||||
end,
|
||||
|
||||
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
|
||||
|
|
@ -326,7 +326,7 @@ check_max_length_rejects(QName, Ch, Payload1, Payload2, Payload3) ->
|
|||
%% Now we can publish message 2.
|
||||
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
|
||||
receive #'basic.ack'{} -> ok
|
||||
after 1000 -> error(expected_ack)
|
||||
after 30_000 -> error(expected_ack)
|
||||
end,
|
||||
|
||||
{#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
|
||||
|
|
|
|||
|
|
@ -538,7 +538,7 @@ basic_cancel(Config) ->
|
|||
wait_for_messages(Config, QName, 3, 2, 1),
|
||||
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
|
||||
wait_for_messages(Config, QName, 2, 2, 0)
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
exit(basic_deliver_timeout)
|
||||
end,
|
||||
rabbit_ct_client_helpers:close_channel(Ch),
|
||||
|
|
@ -667,7 +667,7 @@ cc_header_non_array_should_close_channel(Config) ->
|
|||
receive
|
||||
{'DOWN', Ref, process, Ch, {shutdown, {server_initiated_close, 406, _}}} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
exit(channel_closed_timeout)
|
||||
end,
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
%%%===================================================================
|
||||
%%% Common Test callbacks
|
||||
%%%===================================================================
|
||||
|
|
@ -132,7 +134,7 @@ smoke(Config) ->
|
|||
redelivered = false},
|
||||
#amqp_msg{}} ->
|
||||
basic_ack(Ch, DeliveryTag)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(),
|
||||
exit(basic_deliver_timeout)
|
||||
end,
|
||||
|
|
@ -151,7 +153,7 @@ smoke(Config) ->
|
|||
#amqp_msg{}} ->
|
||||
basic_cancel(Ch, ConsumerTag2),
|
||||
basic_nack(Ch, T)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(basic_deliver_timeout)
|
||||
end,
|
||||
%% get and ack
|
||||
|
|
@ -255,7 +257,7 @@ stream(Config) ->
|
|||
redelivered = false},
|
||||
#amqp_msg{}} ->
|
||||
basic_ack(SubCh, T)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(basic_deliver_timeout)
|
||||
end
|
||||
catch
|
||||
|
|
@ -294,7 +296,7 @@ publish_and_confirm(Ch, Queue, Msg) ->
|
|||
ok = receive
|
||||
#'basic.ack'{} -> ok;
|
||||
#'basic.nack'{} -> fail
|
||||
after 2500 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(),
|
||||
exit(confirm_timeout)
|
||||
end.
|
||||
|
|
@ -318,7 +320,7 @@ subscribe(Ch, Queue, CTag) ->
|
|||
receive
|
||||
#'basic.consume_ok'{consumer_tag = CTag} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(basic_consume_timeout)
|
||||
end.
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@
|
|||
|
||||
-define(NET_TICKTIME_S, 5).
|
||||
-define(DEFAULT_AWAIT, 10_000).
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
suite() ->
|
||||
[{timetrap, 5 * 60_000}].
|
||||
|
|
@ -604,7 +605,7 @@ start_queue_concurrent(Config) ->
|
|||
|
||||
[begin
|
||||
receive {done, Server} -> ok
|
||||
after 10000 -> exit({await_done_timeout, Server})
|
||||
after ?TIMEOUT -> exit({await_done_timeout, Server})
|
||||
end
|
||||
end || Server <- Servers],
|
||||
|
||||
|
|
@ -1077,7 +1078,7 @@ single_active_consumer_priority_take_over(Config) ->
|
|||
delivery_tag = DeliveryTag}, _} ->
|
||||
amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
|
||||
multiple = false})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(1),
|
||||
exit(basic_deliver_timeout)
|
||||
end,
|
||||
|
|
@ -2407,7 +2408,7 @@ confirm_availability_on_leader_change(Config) ->
|
|||
{'EXIT', Publisher, Err} ->
|
||||
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
|
||||
exit(Err)
|
||||
after 30000 ->
|
||||
after ?TIMEOUT ->
|
||||
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
|
||||
flush(100),
|
||||
exit(nothing_received_from_publisher_process)
|
||||
|
|
@ -2872,7 +2873,7 @@ subscribe_redelivery_count(Config) ->
|
|||
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
|
||||
multiple = false,
|
||||
requeue = true})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(basic_deliver_timeout)
|
||||
end,
|
||||
|
||||
|
|
@ -2885,7 +2886,7 @@ subscribe_redelivery_count(Config) ->
|
|||
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
|
||||
multiple = false,
|
||||
requeue = true})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(1),
|
||||
exit(basic_deliver_timeout_2)
|
||||
end,
|
||||
|
|
@ -2901,7 +2902,7 @@ subscribe_redelivery_count(Config) ->
|
|||
wait_for_messages_ready(Servers, RaName, 0),
|
||||
ct:pal("wait_for_messages_pending_ack", []),
|
||||
wait_for_messages_pending_ack(Servers, RaName, 0)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(500),
|
||||
exit(basic_deliver_timeout_3)
|
||||
end.
|
||||
|
|
@ -2986,7 +2987,7 @@ subscribe_redelivery_limit_disable(Config) ->
|
|||
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
|
||||
multiple = false,
|
||||
requeue = true})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(1),
|
||||
ct:fail("message did not arrive as expected")
|
||||
end,
|
||||
|
|
@ -3632,7 +3633,7 @@ receive_and_ack(Ch) ->
|
|||
redelivered = false}, _} ->
|
||||
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
|
||||
multiple = false})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
ct:fail("receive_and_ack timed out", [])
|
||||
end.
|
||||
|
||||
|
|
@ -3733,7 +3734,7 @@ per_message_ttl_mixed_expiry(Config) ->
|
|||
#amqp_msg{payload = Msg1}} ->
|
||||
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
|
||||
multiple = false})
|
||||
after 2000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(10),
|
||||
ct:fail("basic deliver timeout")
|
||||
end,
|
||||
|
|
@ -3761,7 +3762,7 @@ per_message_ttl_expiration_too_high(Config) ->
|
|||
{'DOWN', MonitorRef, process, Ch,
|
||||
{shutdown, {server_initiated_close, 406, <<"PRECONDITION_FAILED - invalid expiration", _/binary>>}}} ->
|
||||
ok
|
||||
after 1000 ->
|
||||
after ?TIMEOUT ->
|
||||
ct:fail("expected channel error")
|
||||
end.
|
||||
|
||||
|
|
@ -3883,7 +3884,7 @@ consumer_priorities(Config) ->
|
|||
{#'basic.deliver'{delivery_tag = D1,
|
||||
consumer_tag = Tag2}, _} ->
|
||||
D1
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(100),
|
||||
ct:fail("basic.deliver timeout")
|
||||
end,
|
||||
|
|
@ -3893,7 +3894,7 @@ consumer_priorities(Config) ->
|
|||
{#'basic.deliver'{delivery_tag = _,
|
||||
consumer_tag = Tag2}, _} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(100),
|
||||
ct:fail("basic.deliver timeout")
|
||||
end,
|
||||
|
|
@ -3904,7 +3905,7 @@ consumer_priorities(Config) ->
|
|||
{#'basic.deliver'{delivery_tag = _,
|
||||
consumer_tag = Tag1}, _} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(100),
|
||||
ct:fail("basic.deliver timeout")
|
||||
end,
|
||||
|
|
@ -3917,7 +3918,7 @@ consumer_priorities(Config) ->
|
|||
{#'basic.deliver'{delivery_tag = _,
|
||||
consumer_tag = Tag2}, _} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(100),
|
||||
ct:fail("basic.deliver timeout")
|
||||
end,
|
||||
|
|
@ -3945,7 +3946,7 @@ cancel_consumer_gh_3729(Config) ->
|
|||
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
|
||||
R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = true},
|
||||
ok = amqp_channel:cast(Ch, R)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(100),
|
||||
ct:fail("basic.deliver timeout")
|
||||
end,
|
||||
|
|
@ -3954,7 +3955,7 @@ cancel_consumer_gh_3729(Config) ->
|
|||
|
||||
receive
|
||||
#'basic.cancel_ok'{consumer_tag = <<"ctag">>} -> ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(100),
|
||||
ct:fail("basic.cancel_ok timeout")
|
||||
end,
|
||||
|
|
@ -3990,7 +3991,7 @@ cancel_consumer_gh_12424(Config) ->
|
|||
DeliveryTag = receive
|
||||
{#'basic.deliver'{delivery_tag = DT}, _} ->
|
||||
DT
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(100),
|
||||
ct:fail("basic.deliver timeout")
|
||||
end,
|
||||
|
|
@ -4023,7 +4024,7 @@ cancel_and_consume_with_same_tag(Config) ->
|
|||
{#'basic.deliver'{delivery_tag = D},
|
||||
#amqp_msg{payload = <<"msg1">>}} ->
|
||||
D
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(100),
|
||||
ct:fail("basic.deliver timeout")
|
||||
end,
|
||||
|
|
@ -4038,7 +4039,7 @@ cancel_and_consume_with_same_tag(Config) ->
|
|||
{#'basic.deliver'{delivery_tag = _},
|
||||
#amqp_msg{payload = <<"msg2">>}} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(100),
|
||||
ct:fail("basic.deliver timeout 2")
|
||||
end,
|
||||
|
|
@ -4290,7 +4291,7 @@ requeue_multiple_true(Config) ->
|
|||
#amqp_msg{payload = P0}} ->
|
||||
?assertEqual(P, P0),
|
||||
D
|
||||
after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE})
|
||||
end || P <- Payloads],
|
||||
|
||||
%% Requeue all messages.
|
||||
|
|
@ -4303,7 +4304,7 @@ requeue_multiple_true(Config) ->
|
|||
[receive {#'basic.deliver'{redelivered = true},
|
||||
#amqp_msg{payload = P1}} ->
|
||||
?assertEqual(P, P1)
|
||||
after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE})
|
||||
end || P <- Payloads],
|
||||
|
||||
?assertEqual(#'queue.delete_ok'{message_count = 0},
|
||||
|
|
@ -4328,7 +4329,7 @@ requeue_multiple_false(Config) ->
|
|||
#amqp_msg{payload = P0}} ->
|
||||
?assertEqual(P, P0),
|
||||
D
|
||||
after 5000 -> ct:fail({basic_deliver_timeout, P, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({basic_deliver_timeout, P, ?LINE})
|
||||
end || P <- Payloads],
|
||||
|
||||
%% The delivery tags we received via AMQP 0.9.1 are ordered from 1-100.
|
||||
|
|
@ -4347,7 +4348,7 @@ requeue_multiple_false(Config) ->
|
|||
[receive {#'basic.deliver'{redelivered = true},
|
||||
#amqp_msg{payload = P1}} ->
|
||||
?assertEqual(integer_to_binary(D), P1)
|
||||
after 5000 -> ct:fail({basic_deliver_timeout, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({basic_deliver_timeout, ?LINE})
|
||||
end || D <- DTagsShuffled],
|
||||
|
||||
?assertEqual(#'queue.delete_ok'{message_count = 0},
|
||||
|
|
@ -4474,7 +4475,7 @@ subscribe(Ch, Queue, NoAck, Tag, Args) ->
|
|||
receive
|
||||
#'basic.consume_ok'{consumer_tag = Tag} ->
|
||||
ok
|
||||
after 30000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(100),
|
||||
exit(subscribe_timeout)
|
||||
end.
|
||||
|
|
@ -4499,7 +4500,7 @@ nack(Ch, Multiple, Requeue) ->
|
|||
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
|
||||
multiple = Multiple,
|
||||
requeue = Requeue})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(10),
|
||||
ct:fail("basic deliver timeout")
|
||||
end.
|
||||
|
|
@ -4572,7 +4573,7 @@ validate_queue(Ch, Queue, ExpectedMsgs) ->
|
|||
#amqp_msg{payload = M}} ->
|
||||
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
|
||||
multiple = false})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(10),
|
||||
exit({validate_queue_timeout, M})
|
||||
end
|
||||
|
|
|
|||
|
|
@ -482,6 +482,6 @@ receive_delete_events(N, Evts) ->
|
|||
receive
|
||||
{mnesia_table_event, {delete, _, Record, _, _}} ->
|
||||
receive_delete_events(N - 1, [Record | Evts])
|
||||
after 10000 ->
|
||||
after 30000 ->
|
||||
Evts
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -755,7 +755,7 @@ publish_confirm(Ch, QName) ->
|
|||
ok;
|
||||
#'basic.nack'{} ->
|
||||
fail
|
||||
after 2500 ->
|
||||
after 30_000 ->
|
||||
ct:fail(confirm_timeout)
|
||||
end.
|
||||
|
||||
|
|
@ -871,14 +871,14 @@ many_target_queues(Config) ->
|
|||
receive
|
||||
#'basic.consume_ok'{consumer_tag = CTag} ->
|
||||
ok
|
||||
after 2000 ->
|
||||
after 30_000 ->
|
||||
exit(consume_ok_timeout)
|
||||
end,
|
||||
receive
|
||||
{#'basic.deliver'{consumer_tag = CTag},
|
||||
#amqp_msg{payload = Msg1}} ->
|
||||
ok
|
||||
after 2000 ->
|
||||
after 30_000 ->
|
||||
exit(deliver_timeout)
|
||||
end,
|
||||
?awaitMatch([{0, 0}],
|
||||
|
|
@ -911,7 +911,7 @@ many_target_queues(Config) ->
|
|||
{#'basic.deliver'{consumer_tag = CTag},
|
||||
#amqp_msg{payload = Msg2}} ->
|
||||
ok
|
||||
after 0 ->
|
||||
after 30_000 ->
|
||||
exit(deliver_timeout)
|
||||
end,
|
||||
?assertEqual(2, counted(messages_dead_lettered_expired_total, Config)),
|
||||
|
|
|
|||
|
|
@ -10,8 +10,9 @@
|
|||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
-include_lib("rabbit_common/include/rabbit_framing.hrl").
|
||||
|
||||
-define(RA_EVENT_TIMEOUT, 5000).
|
||||
-define(RA_EVENT_TIMEOUT, 30_000).
|
||||
-define(RA_SYSTEM, quorum_queues).
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
all() ->
|
||||
[
|
||||
|
|
@ -118,7 +119,7 @@ basics(Config) ->
|
|||
{ok, S, _} ->
|
||||
DeliverFun(S, F)
|
||||
end
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(),
|
||||
exit(await_delivery_timeout)
|
||||
end
|
||||
|
|
@ -136,7 +137,7 @@ basics(Config) ->
|
|||
ct:pal("ra_event ~p", [Evt]),
|
||||
{ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, From, Evt, FState5),
|
||||
F6
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(leader_change_timeout)
|
||||
end,
|
||||
|
||||
|
|
@ -175,7 +176,7 @@ rabbit_fifo_returns_correlation(Config) ->
|
|||
Del ->
|
||||
exit({unexpected, Del})
|
||||
end
|
||||
after 2000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(await_msg_timeout)
|
||||
end,
|
||||
rabbit_quorum_queue:stop_server(ServerId),
|
||||
|
|
@ -208,7 +209,7 @@ duplicate_delivery(Config) ->
|
|||
end
|
||||
end
|
||||
end
|
||||
after 2000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(await_msg_timeout)
|
||||
end
|
||||
end,
|
||||
|
|
@ -281,7 +282,7 @@ detects_lost_delivery(Config) ->
|
|||
receive
|
||||
{ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(await_delivery_timeout)
|
||||
end,
|
||||
|
||||
|
|
@ -316,7 +317,7 @@ returns(Config) ->
|
|||
?assertEqual(undefined, mc:get_annotation(<<"x-delivery-count">>, Msg1Out0)),
|
||||
?assertEqual(undefined, mc:get_annotation(delivery_count, Msg1Out0)),
|
||||
rabbit_fifo_client:return(<<"tag">>, [MsgId], FC2)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(),
|
||||
exit(await_delivery_timeout)
|
||||
end,
|
||||
|
|
@ -333,7 +334,7 @@ returns(Config) ->
|
|||
%% delivery_count should _not_ be incremented for a return
|
||||
?assertEqual(undefined, mc:get_annotation(delivery_count, Msg1Out)),
|
||||
rabbit_fifo_client:modify(<<"tag">>, [MsgId1], true, false, #{}, FC4)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(),
|
||||
exit(await_delivery_timeout_2)
|
||||
end,
|
||||
|
|
@ -349,7 +350,7 @@ returns(Config) ->
|
|||
%% delivery_count should be incremented for a modify with delivery_failed = true
|
||||
?assertEqual(1, mc:get_annotation(delivery_count, Msg2Out)),
|
||||
rabbit_fifo_client:settle(<<"tag">>, [MsgId2], FC6)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(),
|
||||
exit(await_delivery_timeout_3)
|
||||
end,
|
||||
|
|
@ -377,7 +378,7 @@ returns_after_down(Config) ->
|
|||
receive
|
||||
{'DOWN', MonRef, _, _, _} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
ct:fail("waiting for process exit timed out")
|
||||
end,
|
||||
rabbit_ct_helpers:await_condition(
|
||||
|
|
@ -411,7 +412,7 @@ resends_after_lost_applied(Config) ->
|
|||
receive
|
||||
{ra_event, _, {applied, _}} ->
|
||||
ok
|
||||
after 500 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(await_ra_event_timeout)
|
||||
end,
|
||||
% send another message
|
||||
|
|
@ -477,7 +478,7 @@ discard(Config) ->
|
|||
[msg1] = Letters,
|
||||
rejected = Reason,
|
||||
ok
|
||||
after 500 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(),
|
||||
exit(dead_letter_timeout)
|
||||
end,
|
||||
|
|
@ -571,7 +572,7 @@ lost_delivery(Config) ->
|
|||
{ra_event, _, Evt} ->
|
||||
ct:pal("dropping event ~tp", [Evt]),
|
||||
ok
|
||||
after 500 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(await_ra_event_timeout)
|
||||
end,
|
||||
% send another message
|
||||
|
|
@ -744,7 +745,7 @@ test_queries(Config) ->
|
|||
end),
|
||||
receive
|
||||
ready -> ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(ready_timeout)
|
||||
end,
|
||||
F0 = rabbit_fifo_client:init([ServerId], 4),
|
||||
|
|
@ -820,7 +821,7 @@ receive_ra_events(Applied, Deliveries, Acc) ->
|
|||
receive_ra_events(Applied, Deliveries - length(MsgIds), [Evt | Acc]);
|
||||
{ra_event, _, _} = Evt ->
|
||||
receive_ra_events(Applied, Deliveries, [Evt | Acc])
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit({missing_events, Applied, Deliveries, Acc})
|
||||
end.
|
||||
|
||||
|
|
@ -832,7 +833,7 @@ receive_ra_events(Acc) ->
|
|||
receive
|
||||
{ra_event, _, _} = Evt ->
|
||||
receive_ra_events([Evt | Acc])
|
||||
after 500 ->
|
||||
after 1000 ->
|
||||
Acc
|
||||
end.
|
||||
|
||||
|
|
@ -892,7 +893,7 @@ flush() ->
|
|||
Msg ->
|
||||
ct:pal("flushed: ~w~n", [Msg]),
|
||||
flush()
|
||||
after 10 ->
|
||||
after 100 ->
|
||||
ok
|
||||
end.
|
||||
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ publish_expect_return(Config, E, Node) ->
|
|||
receive
|
||||
{#'basic.return'{}, _} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
flush(100),
|
||||
ct:fail("no return received")
|
||||
end
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
-import(rabbit_ct_helpers, [await_condition/2]).
|
||||
-define(WAIT, 5000).
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
suite() ->
|
||||
[{timetrap, 15 * 60_000}].
|
||||
|
|
@ -1009,7 +1010,7 @@ consume(Config) ->
|
|||
multiple = false}),
|
||||
_ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
|
||||
ok = amqp_channel:close(Ch1)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
ct:fail(timeout)
|
||||
end,
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
|
||||
|
|
@ -1073,8 +1074,8 @@ consume_timestamp_offset(Config) ->
|
|||
receive
|
||||
#'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
flush(),
|
||||
after ?TIMEOUT ->
|
||||
flush(),
|
||||
exit(consume_ok_timeout)
|
||||
end,
|
||||
|
||||
|
|
@ -1108,7 +1109,7 @@ consume_timestamp_last_offset(Config) ->
|
|||
receive
|
||||
#'basic.consume_ok'{consumer_tag = CTag} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(missing_consume_ok)
|
||||
end,
|
||||
|
||||
|
|
@ -1178,7 +1179,7 @@ basic_cancel(Config) ->
|
|||
{#'basic.deliver'{}, _} ->
|
||||
amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = CTag}),
|
||||
?assertMatch([], filter_consumers(Config, Server, CTag))
|
||||
after 10000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(timeout)
|
||||
end,
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
|
||||
|
|
@ -1203,7 +1204,7 @@ receive_basic_cancel_on_queue_deletion(Config) ->
|
|||
receive
|
||||
#'basic.cancel'{consumer_tag = CTag} ->
|
||||
ok
|
||||
after 10000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(timeout)
|
||||
end.
|
||||
|
||||
|
|
@ -1261,7 +1262,7 @@ keep_consuming_on_leader_restart(Config) ->
|
|||
{#'basic.deliver'{delivery_tag = DeliveryTag1}, _} ->
|
||||
ok = amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag1,
|
||||
multiple = false})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(timeout)
|
||||
end,
|
||||
|
||||
|
|
@ -1275,7 +1276,7 @@ keep_consuming_on_leader_restart(Config) ->
|
|||
{#'basic.deliver'{delivery_tag = DeliveryTag2}, _} ->
|
||||
ok = amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag2,
|
||||
multiple = false})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(timeout)
|
||||
end,
|
||||
|
||||
|
|
@ -1396,7 +1397,7 @@ consume_and_(Config, AckFun) ->
|
|||
?assertMatch({'queue.declare_ok', Q, _MsgCount, 0},
|
||||
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
|
||||
queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]])
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(timeout)
|
||||
end,
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
|
||||
|
|
@ -1588,7 +1589,7 @@ consume_from_next(Config, Args) ->
|
|||
receive
|
||||
#'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
|
||||
ok
|
||||
after 10000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(consume_ok_failed)
|
||||
end,
|
||||
|
||||
|
|
@ -1684,7 +1685,7 @@ consume_while_deleting_replica(Config) ->
|
|||
ok;
|
||||
{_, #amqp_msg{}} ->
|
||||
exit(unexpected_message)
|
||||
after 30000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(missing_consumer_cancel)
|
||||
end,
|
||||
|
||||
|
|
@ -1712,10 +1713,10 @@ consume_credit(Config) ->
|
|||
|
||||
%% We expect to receive exactly 2 messages.
|
||||
DTag1 = receive {#'basic.deliver'{delivery_tag = Tag1}, _} -> Tag1
|
||||
after 5000 -> ct:fail({missing_delivery, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE})
|
||||
end,
|
||||
_DTag2 = receive {#'basic.deliver'{delivery_tag = Tag2}, _} -> Tag2
|
||||
after 5000 -> ct:fail({missing_delivery, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE})
|
||||
end,
|
||||
receive {#'basic.deliver'{}, _} -> ct:fail({unexpected_delivery, ?LINE})
|
||||
after 100 -> ok
|
||||
|
|
@ -1725,7 +1726,7 @@ consume_credit(Config) ->
|
|||
ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DTag1,
|
||||
multiple = false}),
|
||||
DTag3 = receive {#'basic.deliver'{delivery_tag = Tag3}, _} -> Tag3
|
||||
after 5000 -> ct:fail({missing_delivery, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE})
|
||||
end,
|
||||
receive {#'basic.deliver'{}, _} ->
|
||||
ct:fail({unexpected_delivery, ?LINE})
|
||||
|
|
@ -1747,11 +1748,11 @@ consume_credit0(Ch, DTagPrev) ->
|
|||
multiple = true}),
|
||||
%% Receive 1st message.
|
||||
receive {#'basic.deliver'{}, _} -> ok
|
||||
after 5000 -> ct:fail({missing_delivery, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE})
|
||||
end,
|
||||
%% Receive 2nd message.
|
||||
DTag = receive {#'basic.deliver'{delivery_tag = T}, _} -> T
|
||||
after 5000 -> ct:fail({missing_delivery, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_delivery, ?LINE})
|
||||
end,
|
||||
%% We shouldn't receive more messages given that AMQP 0.9.1 prefetch count is 2.
|
||||
receive {#'basic.deliver'{}, _} -> ct:fail({unexpected_delivery, ?LINE})
|
||||
|
|
@ -1813,7 +1814,7 @@ consume_credit_out_of_order_ack(Config) ->
|
|||
receive
|
||||
{#'basic.deliver'{}, _} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(timeout)
|
||||
end,
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
|
||||
|
|
@ -1853,7 +1854,7 @@ consume_credit_multiple_ack(Config) ->
|
|||
receive
|
||||
{#'basic.deliver'{}, _} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(timeout)
|
||||
end,
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
|
||||
|
|
@ -2066,7 +2067,7 @@ leader_failover_dedupe(Config) ->
|
|||
|
||||
N = receive
|
||||
{last_msg, X} -> X
|
||||
after 2000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(last_msg_timeout)
|
||||
end,
|
||||
%% validate that no duplicates were written even though an internal
|
||||
|
|
@ -2508,7 +2509,7 @@ dead_letter_target(Config) ->
|
|||
receive
|
||||
#'basic.consume_ok'{consumer_tag = CTag} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(basic_consume_ok_timeout)
|
||||
end,
|
||||
receive
|
||||
|
|
@ -2517,7 +2518,7 @@ dead_letter_target(Config) ->
|
|||
requeue =false,
|
||||
multiple = false}),
|
||||
queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]])
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(timeout)
|
||||
end,
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
|
||||
|
|
@ -2602,7 +2603,7 @@ receive_filtered_batch(Ch, Count, ExpectedSize) ->
|
|||
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
|
||||
multiple = false}),
|
||||
receive_filtered_batch(Ch, Count + 1, ExpectedSize)
|
||||
after 10000 ->
|
||||
after ?TIMEOUT ->
|
||||
flush(),
|
||||
exit({not_enough_messages, Count})
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -161,6 +161,6 @@ publish_confirm(Ch, QName) ->
|
|||
#'basic.nack'{} ->
|
||||
ct:pal("NOT CONFIRMED! ~ts", [QName]),
|
||||
fail
|
||||
after 10000 ->
|
||||
after 30000 ->
|
||||
exit(confirm_timeout)
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -179,13 +179,13 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) ->
|
|||
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
|
||||
consumer_tag = CTag1}, self()),
|
||||
receive #'basic.consume_ok'{consumer_tag = CTag1} -> ok
|
||||
after 5000 -> ct:fail(timeout_ctag1)
|
||||
after ?TIMEOUT -> ct:fail(timeout_ctag1)
|
||||
end,
|
||||
|
||||
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
|
||||
consumer_tag = CTag2}, self()),
|
||||
receive #'basic.consume_ok'{consumer_tag = CTag2} -> ok
|
||||
after 5000 -> ct:fail(timeout_ctag2)
|
||||
after ?TIMEOUT -> ct:fail(timeout_ctag2)
|
||||
end,
|
||||
|
||||
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
|
||||
|
|
@ -195,12 +195,12 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) ->
|
|||
DTag1 = receive {#'basic.deliver'{consumer_tag = CTag1,
|
||||
delivery_tag = DTag},
|
||||
#amqp_msg{payload = <<"m1">>}} -> DTag
|
||||
after 5000 -> ct:fail(timeout_m1)
|
||||
after ?TIMEOUT -> ct:fail(timeout_m1)
|
||||
end,
|
||||
|
||||
#'basic.cancel_ok'{consumer_tag = CTag1} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag1}),
|
||||
receive #'basic.cancel_ok'{consumer_tag = CTag1} -> ok
|
||||
after 5000 -> ct:fail(missing_cancel)
|
||||
after ?TIMEOUT -> ct:fail(missing_cancel)
|
||||
end,
|
||||
|
||||
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}),
|
||||
|
|
@ -208,7 +208,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled_qos1(Config) ->
|
|||
receive {#'basic.deliver'{consumer_tag = CTag2},
|
||||
#amqp_msg{payload = <<"m2">>}} -> ok;
|
||||
Unexpected -> ct:fail({unexpected, Unexpected})
|
||||
after 5000 -> ct:fail(timeout_m2)
|
||||
after ?TIMEOUT -> ct:fail(timeout_m2)
|
||||
end,
|
||||
amqp_connection:close(C).
|
||||
|
||||
|
|
@ -385,7 +385,7 @@ wait_for_messages(ExpectedCount, State) ->
|
|||
receive
|
||||
{message, {MessagesPerConsumer, MessageCount}} ->
|
||||
wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
{missing, ExpectedCount, State}
|
||||
end.
|
||||
|
||||
|
|
@ -393,7 +393,7 @@ wait_for_cancel_ok() ->
|
|||
receive
|
||||
{cancel_ok, CTag} ->
|
||||
{cancel_ok, CTag}
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
throw(consumer_cancel_ok_timeout)
|
||||
end.
|
||||
|
||||
|
|
@ -402,7 +402,7 @@ receive_deliver() ->
|
|||
{#'basic.deliver'{consumer_tag = CTag,
|
||||
delivery_tag = DTag}, _} ->
|
||||
{CTag, DTag}
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
exit(deliver_timeout)
|
||||
end.
|
||||
|
||||
|
|
|
|||
|
|
@ -111,7 +111,7 @@ amqp_x_cc_annotation(Config) ->
|
|||
?assertEqual(
|
||||
<<"write access to topic 'x.1' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>,
|
||||
Description1)
|
||||
after 5000 -> amqp_utils:flush(missing_ended),
|
||||
after 30_000 -> amqp_utils:flush(missing_ended),
|
||||
ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
|
|
@ -134,7 +134,7 @@ amqp_x_cc_annotation(Config) ->
|
|||
?assertEqual(
|
||||
<<"write access to topic 'x.2' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>,
|
||||
Description2)
|
||||
after 5000 -> amqp_utils:flush(missing_ended),
|
||||
after 30_000 -> amqp_utils:flush(missing_ended),
|
||||
ct:fail({missing_event, ?LINE})
|
||||
end,
|
||||
|
||||
|
|
@ -178,7 +178,7 @@ amqpl_headers(Header, Config) ->
|
|||
#amqp_msg{payload = <<"m1">>,
|
||||
props = #'P_basic'{headers = [{Header, array, [{longstr, <<"a.2">>}]}]}}),
|
||||
receive #'basic.ack'{} -> ok
|
||||
after 5000 -> ct:fail({missing_confirm, ?LINE})
|
||||
after 30_000 -> ct:fail({missing_confirm, ?LINE})
|
||||
end,
|
||||
|
||||
monitor(process, Ch1),
|
||||
|
|
@ -412,6 +412,6 @@ assert_channel_down(Ch, Reason) ->
|
|||
{shutdown,
|
||||
{server_initiated_close, 403, Reason}}} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
ct:fail({did_not_receive, Reason})
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ return_after_commit(Config) ->
|
|||
Result = receive
|
||||
{#'basic.return'{}, _} ->
|
||||
return_before_commit
|
||||
after 1000 ->
|
||||
after 30_000 ->
|
||||
return_after_commit
|
||||
end,
|
||||
?assertEqual(return_after_commit, Result),
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ stream(Config) ->
|
|||
DelTag = receive
|
||||
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
|
||||
DeliveryTag
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
ct:fail(timeout)
|
||||
end,
|
||||
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DelTag,
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ file_handle_cache_reserve_open_file_above_limit1(_Config) ->
|
|||
receive
|
||||
opened ->
|
||||
throw(error_file_opened)
|
||||
after 1000 ->
|
||||
after 30_000 ->
|
||||
%% Let's release 5 file handles, that should leave
|
||||
%% enough free for the `open` to go through
|
||||
file_handle_cache:set_reservation(2),
|
||||
|
|
@ -183,7 +183,7 @@ file_handle_cache_reserve_open_file_above_limit1(_Config) ->
|
|||
opened ->
|
||||
ok = file_handle_cache:set_limit(Limit),
|
||||
passed
|
||||
after 5000 ->
|
||||
after 30_000 ->
|
||||
throw(error_file_not_released)
|
||||
end
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -924,7 +924,7 @@ test_failed_token_refresh_case1(Config) ->
|
|||
?assertExit({{shutdown, {server_initiated_close, 403, _}}, _},
|
||||
amqp_channel:call(Ch2, #'queue.declare'{queue = <<"a.q">>, exclusive = true})),
|
||||
|
||||
close_connection(Conn).
|
||||
catch close_connection(Conn).
|
||||
|
||||
refreshed_token_cannot_change_username(Config) ->
|
||||
{_, Token} = generate_valid_token_with_sub(Config, <<"username">>),
|
||||
|
|
@ -950,7 +950,7 @@ test_failed_token_refresh_case2(Config) ->
|
|||
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 530, _}}}, _},
|
||||
amqp_connection:open_channel(Conn)),
|
||||
|
||||
close_connection(Conn).
|
||||
catch close_connection(Conn).
|
||||
|
||||
|
||||
test_successful_connection_with_with_single_scope_alias_in_extra_scopes_source(Config) ->
|
||||
|
|
|
|||
|
|
@ -672,7 +672,7 @@ setup_rabbit_auth_backend_mqtt_mock(Config) ->
|
|||
receive
|
||||
{ok, SP} -> SP
|
||||
after
|
||||
3000 -> ct:fail("timeout waiting for rabbit_auth_backend_mqtt_mock:setup/1")
|
||||
30_000 -> ct:fail("timeout waiting for rabbit_auth_backend_mqtt_mock:setup/1")
|
||||
end.
|
||||
|
||||
client_id_propagation(Config) ->
|
||||
|
|
@ -1333,6 +1333,6 @@ assert_connection_closed(ClientPid) ->
|
|||
{'EXIT', ClientPid, {shutdown, tcp_closed}} ->
|
||||
ok
|
||||
after
|
||||
2000 ->
|
||||
30_000 ->
|
||||
ct:fail("timed out waiting for exit message")
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@
|
|||
-define(RC_SERVER_SHUTTING_DOWN, 16#8B).
|
||||
-define(RC_KEEP_ALIVE_TIMEOUT, 16#8D).
|
||||
-define(RC_SESSION_TAKEN_OVER, 16#8E).
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
all() ->
|
||||
[{group, mqtt}].
|
||||
|
|
@ -738,28 +739,28 @@ pubsub(Config) ->
|
|||
receive {publish, #{client_pid := C1,
|
||||
qos := 1,
|
||||
payload := <<"m1">>}} -> ok
|
||||
after 1000 -> ct:fail("missing m1")
|
||||
after ?TIMEOUT -> ct:fail("missing m1")
|
||||
end,
|
||||
|
||||
ok = emqtt:publish(C0, Topic1, <<"m2">>, qos0),
|
||||
receive {publish, #{client_pid := C1,
|
||||
qos := 0,
|
||||
payload := <<"m2">>}} -> ok
|
||||
after 1000 -> ct:fail("missing m2")
|
||||
after ?TIMEOUT -> ct:fail("missing m2")
|
||||
end,
|
||||
|
||||
{ok, _} = emqtt:publish(C1, Topic0, <<"m3">>, qos1),
|
||||
receive {publish, #{client_pid := C0,
|
||||
qos := 1,
|
||||
payload := <<"m3">>}} -> ok
|
||||
after 1000 -> ct:fail("missing m3")
|
||||
after ?TIMEOUT -> ct:fail("missing m3")
|
||||
end,
|
||||
|
||||
ok = emqtt:publish(C1, Topic0, <<"m4">>, qos0),
|
||||
receive {publish, #{client_pid := C0,
|
||||
qos := 0,
|
||||
payload := <<"m4">>}} -> ok
|
||||
after 1000 -> ct:fail("missing m4")
|
||||
after ?TIMEOUT -> ct:fail("missing m4")
|
||||
end,
|
||||
|
||||
ok = emqtt:disconnect(C0),
|
||||
|
|
@ -1130,7 +1131,7 @@ many_qos1_messages(Config) ->
|
|||
end, Payloads),
|
||||
receive
|
||||
proceed -> ok
|
||||
after 30000 ->
|
||||
after ?TIMEOUT ->
|
||||
ct:fail("message to proceed never received")
|
||||
end,
|
||||
ok = expect_publishes(C, Topic, Payloads),
|
||||
|
|
@ -1383,7 +1384,7 @@ keepalive(Config) ->
|
|||
retain := true,
|
||||
topic := WillTopic,
|
||||
payload := WillPayload}} -> ok
|
||||
after 3000 -> ct:fail("missing will")
|
||||
after ?TIMEOUT -> ct:fail("missing will")
|
||||
end,
|
||||
ok = emqtt:disconnect(C2).
|
||||
|
||||
|
|
@ -1453,7 +1454,7 @@ session_switch(Config, Disconnect) ->
|
|||
receive {publish, #{client_pid := C2,
|
||||
payload := <<"m1">>,
|
||||
qos := 0}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m1 with QoS 0")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m1 with QoS 0")
|
||||
end,
|
||||
%% New connection should be able to unsubscribe.
|
||||
?assertMatch({ok, _, _}, emqtt:unsubscribe(C2, Topic)),
|
||||
|
|
@ -1720,7 +1721,7 @@ max_packet_size_authenticated(Config) ->
|
|||
v4 -> ok;
|
||||
v5 -> ?assertMatch(#{'Maximum-Packet-Size' := MaxSize}, ConnAckProps),
|
||||
receive {disconnected, _ReasonCodePacketTooLarge = 149, _Props} -> ok
|
||||
after 1000 -> ct:fail("missing DISCONNECT packet from server")
|
||||
after ?TIMEOUT -> ct:fail("missing DISCONNECT packet from server")
|
||||
end
|
||||
end,
|
||||
ok = rpc(Config, persistent_term, put, [Key, OldMaxSize]).
|
||||
|
|
@ -1805,7 +1806,7 @@ incoming_message_interceptors(Config) ->
|
|||
headers = [{<<"timestamp_in_ms">>, long, Millis} | _XHeaders]
|
||||
}}} ->
|
||||
ok
|
||||
after 5000 -> ct:fail(missing_deliver)
|
||||
after ?TIMEOUT -> ct:fail(missing_deliver)
|
||||
end,
|
||||
|
||||
delete_queue(Ch, Stream),
|
||||
|
|
@ -1831,7 +1832,7 @@ retained_message_conversion(Config) ->
|
|||
retain := true,
|
||||
topic := Topic,
|
||||
payload := Payload}} -> ok
|
||||
after 1000 -> ct:fail("missing retained message")
|
||||
after ?TIMEOUT -> ct:fail("missing retained message")
|
||||
end,
|
||||
ok = emqtt:publish(C, Topic, <<>>, [{retain, true}]),
|
||||
ok = emqtt:disconnect(C).
|
||||
|
|
@ -1917,7 +1918,7 @@ await_confirms_ordered(From, N, To) ->
|
|||
await_confirms_ordered(From, N + 1, To);
|
||||
Got ->
|
||||
ct:fail("Received unexpected message. Expected: ~p Got: ~p", [Expected, Got])
|
||||
after 10_000 ->
|
||||
after ?TIMEOUT ->
|
||||
ct:fail("Did not receive expected message: ~p", [Expected])
|
||||
end.
|
||||
|
||||
|
|
@ -1929,7 +1930,7 @@ await_confirms_unordered(From, Left) ->
|
|||
await_confirms_unordered(From, Left - 1);
|
||||
Other ->
|
||||
ct:fail("Received unexpected message: ~p", [Other])
|
||||
after 10_000 ->
|
||||
after ?TIMEOUT ->
|
||||
ct:fail("~b confirms are missing", [Left])
|
||||
end.
|
||||
|
||||
|
|
@ -1976,6 +1977,6 @@ assert_v5_disconnect_reason_code(Config, ReasonCode) ->
|
|||
v3 -> ok;
|
||||
v4 -> ok;
|
||||
v5 -> receive {disconnected, ReasonCode, _Props} -> ok
|
||||
after 1000 -> ct:fail("missing DISCONNECT packet from server")
|
||||
after ?TIMEOUT -> ct:fail("missing DISCONNECT packet from server")
|
||||
end
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@
|
|||
-include_lib("amqp10_common/include/amqp10_framing.hrl").
|
||||
-include_lib("rabbitmq_stomp/include/rabbit_stomp_frame.hrl").
|
||||
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
-import(util,
|
||||
[connect/2,
|
||||
connect/4]).
|
||||
|
|
@ -156,7 +158,7 @@ mqtt_amqpl_mqtt(Config) ->
|
|||
{<<"key">>, <<"val">>},
|
||||
{<<"rabbit🐇"/utf8>>, <<"carrot🥕"/utf8>>}],
|
||||
lists:sort(UserProperty1))
|
||||
after 1000 -> ct:fail("did not receive reply")
|
||||
after ?TIMEOUT -> ct:fail("did not receive reply")
|
||||
end,
|
||||
|
||||
%% Another message MQTT 5.0 to AMQP 0.9.1, this time with QoS 0
|
||||
|
|
@ -188,7 +190,7 @@ amqpl_mqtt_gh_12707(Config) ->
|
|||
payload := MqttPayload}} ->
|
||||
?assertEqual(Topic, MqttTopic),
|
||||
?assertEqual(Payload, MqttPayload)
|
||||
after 5000 ->
|
||||
after ?TIMEOUT ->
|
||||
ct:fail("did not receive a delivery")
|
||||
end,
|
||||
|
||||
|
|
@ -277,7 +279,7 @@ mqtt_amqp_mqtt(Config) ->
|
|||
{ok, Sender} = amqp10_client:attach_sender_link(
|
||||
Session2, SenderLinkName, ReplyToAddress, unsettled),
|
||||
receive {amqp10_event, {link, Sender, credited}} -> ok
|
||||
after 1000 -> ct:fail(credited_timeout)
|
||||
after ?TIMEOUT -> ct:fail(credited_timeout)
|
||||
end,
|
||||
|
||||
DTag = <<"my-dtag">>,
|
||||
|
|
@ -292,7 +294,7 @@ mqtt_amqp_mqtt(Config) ->
|
|||
Msg2 = amqp10_msg:set_headers(#{durable => True}, Msg2b),
|
||||
ok = amqp10_client:send_msg(Sender, Msg2),
|
||||
receive {amqp10_disposition, {accepted, DTag}} -> ok
|
||||
after 1000 -> ct:fail(settled_timeout)
|
||||
after ?TIMEOUT -> ct:fail(settled_timeout)
|
||||
end,
|
||||
|
||||
ok = amqp10_client:detach_link(Sender),
|
||||
|
|
@ -311,7 +313,7 @@ mqtt_amqp_mqtt(Config) ->
|
|||
'Subscription-Identifier' := 999}
|
||||
},
|
||||
MqttMsg)
|
||||
after 1000 -> ct:fail("did not receive reply")
|
||||
after ?TIMEOUT -> ct:fail("did not receive reply")
|
||||
end,
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
|
|
@ -342,7 +344,7 @@ amqp_mqtt_amqp(Config) ->
|
|||
<<"sender">>,
|
||||
rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"t.1">>)),
|
||||
receive {amqp10_event, {link, Sender, credited}} -> ok
|
||||
after 2000 -> ct:fail(credited_timeout)
|
||||
after ?TIMEOUT -> ct:fail(credited_timeout)
|
||||
end,
|
||||
RequestBody = <<"my request">>,
|
||||
|
||||
|
|
@ -371,7 +373,7 @@ amqp_mqtt_amqp(Config) ->
|
|||
false ->
|
||||
ok
|
||||
end
|
||||
after 2000 -> ct:fail("did not receive request")
|
||||
after ?TIMEOUT -> ct:fail("did not receive request")
|
||||
end,
|
||||
|
||||
%% MQTT 5.0 to AMQP 1.0
|
||||
|
|
@ -420,7 +422,7 @@ amqp_mqtt(Qos, Config) ->
|
|||
<<"sender">>,
|
||||
rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"my.topic">>)),
|
||||
receive {amqp10_event, {link, Sender, credited}} -> ok
|
||||
after 2000 -> ct:fail(credited_timeout)
|
||||
after ?TIMEOUT -> ct:fail(credited_timeout)
|
||||
end,
|
||||
|
||||
%% single amqp-value section
|
||||
|
|
@ -463,28 +465,28 @@ amqp_mqtt(Qos, Config) ->
|
|||
false ->
|
||||
ok
|
||||
end
|
||||
after 5000 -> ct:fail({missing_publish, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_publish, ?LINE})
|
||||
end,
|
||||
receive {publish, #{payload := Payload2}} ->
|
||||
?assertEqual([Body2], amqp10_framing:decode_bin(Payload2))
|
||||
after 5000 -> ct:fail({missing_publish, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_publish, ?LINE})
|
||||
end,
|
||||
receive {publish, #{payload := Payload3}} ->
|
||||
?assertEqual(Body3, amqp10_framing:decode_bin(Payload3))
|
||||
after 5000 -> ct:fail({missing_publish, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_publish, ?LINE})
|
||||
end,
|
||||
receive {publish, #{payload := Payload4}} ->
|
||||
?assertEqual(Body4, amqp10_framing:decode_bin(Payload4))
|
||||
after 5000 -> ct:fail({missing_publish, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_publish, ?LINE})
|
||||
end,
|
||||
receive {publish, #{payload := Payload5}} ->
|
||||
?assertEqual(<<0, 255>>, Payload5)
|
||||
after 5000 -> ct:fail({missing_publish, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_publish, ?LINE})
|
||||
end,
|
||||
receive {publish, #{payload := Payload6}} ->
|
||||
%% We expect that RabbitMQ concatenates the binaries of multiple data sections.
|
||||
?assertEqual(<<0, 1, 2, 3>>, Payload6)
|
||||
after 5000 -> ct:fail({missing_publish, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_publish, ?LINE})
|
||||
end,
|
||||
|
||||
ok = emqtt:disconnect(C).
|
||||
|
|
@ -567,7 +569,7 @@ mqtt_stomp_mqtt(Config) ->
|
|||
'Correlation-Data' := Correlation,
|
||||
'User-Property' := UserProp}} = MqttMsg,
|
||||
?assert(lists:member({<<"x-key">>, <<"val4">>}, UserProp))
|
||||
after 1000 -> ct:fail("did not receive reply")
|
||||
after ?TIMEOUT -> ct:fail("did not receive reply")
|
||||
end,
|
||||
|
||||
ok = emqtt:disconnect(C).
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ block_connack_timeout(Config) ->
|
|||
ClientMRef = monitor(process, Client),
|
||||
{error, connack_timeout} = emqtt:connect(Client),
|
||||
receive {'DOWN', ClientMRef, process, Client, connack_timeout} -> ok
|
||||
after 200 -> ct:fail("missing connack_timeout in client")
|
||||
after 30_000 -> ct:fail("missing connack_timeout in client")
|
||||
end,
|
||||
|
||||
MqttReader = rpc(Config, ?MODULE, mqtt_connection_pid, [Ports]),
|
||||
|
|
@ -122,7 +122,7 @@ block_connack_timeout(Config) ->
|
|||
%% We expect that MQTT reader process exits (without crashing)
|
||||
%% because our client already disconnected.
|
||||
ok
|
||||
after 2000 -> ct:fail("missing peername_not_known from server")
|
||||
after 30_000 -> ct:fail("missing peername_not_known from server")
|
||||
end,
|
||||
%% Ensure that our client is not registered.
|
||||
?assertEqual([], all_connection_pids(Config)),
|
||||
|
|
@ -331,6 +331,6 @@ num_received(Topic, Payload, N) ->
|
|||
{publish, #{topic := Topic,
|
||||
payload := Payload}} ->
|
||||
num_received(Topic, Payload, N + 1)
|
||||
after 1000 ->
|
||||
after 3000 ->
|
||||
N
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -213,7 +213,7 @@ recover_with_message_expiry_interval(Config) ->
|
|||
payload := <<"m1">>,
|
||||
properties := Props}}
|
||||
when map_size(Props) =:= 0 -> ok
|
||||
after 100 -> ct:fail("did not topic/1")
|
||||
after 30_000 -> ct:fail("did not topic/1")
|
||||
end,
|
||||
|
||||
receive {publish, #{client_pid := C2,
|
||||
|
|
@ -222,7 +222,7 @@ recover_with_message_expiry_interval(Config) ->
|
|||
payload := <<"m2">>,
|
||||
properties := #{'Message-Expiry-Interval' := MEI}}} ->
|
||||
assert_message_expiry_interval(100 - ElapsedSeconds2, MEI)
|
||||
after 100 -> ct:fail("did not topic/2")
|
||||
after 30_000 -> ct:fail("did not topic/2")
|
||||
end,
|
||||
|
||||
receive Unexpected -> ct:fail("Received unexpectedly: ~p", [Unexpected])
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ expect_publishes(Client, Topic, [Payload|Rest])
|
|||
payload := Other}} ->
|
||||
ct:fail("Received unexpected PUBLISH payload. Expected: ~p Got: ~p",
|
||||
[Payload, Other])
|
||||
after 3000 ->
|
||||
after 30_000 ->
|
||||
{publish_not_received, Payload}
|
||||
end.
|
||||
|
||||
|
|
@ -120,14 +120,14 @@ await_exit(Pid) ->
|
|||
receive
|
||||
{'EXIT', Pid, _} -> ok
|
||||
after
|
||||
20_000 -> ct:fail({missing_exit, Pid})
|
||||
30_000 -> ct:fail({missing_exit, Pid})
|
||||
end.
|
||||
|
||||
await_exit(Pid, Reason) ->
|
||||
receive
|
||||
{'EXIT', Pid, Reason} -> ok
|
||||
after
|
||||
20_000 -> ct:fail({missing_exit, Pid})
|
||||
30_000 -> ct:fail({missing_exit, Pid})
|
||||
end.
|
||||
|
||||
%% "CleanStart=0 and SessionExpiry=0xFFFFFFFF (UINT_MAX) for
|
||||
|
|
|
|||
|
|
@ -41,6 +41,8 @@
|
|||
-define(RC_SESSION_TAKEN_OVER, 16#8E).
|
||||
-define(RC_TOPIC_ALIAS_INVALID, 16#94).
|
||||
|
||||
-define(TIMEOUT, 30_000).
|
||||
|
||||
all() ->
|
||||
[{group, mqtt}].
|
||||
|
||||
|
|
@ -265,7 +267,7 @@ message_expiry(Config) ->
|
|||
payload := <<"m2">>,
|
||||
properties := Props}}
|
||||
when map_size(Props) =:= 0 -> ok
|
||||
after 1000 -> ct:fail("did not receive m2")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m2")
|
||||
end,
|
||||
|
||||
receive {publish, #{client_pid := Sub2,
|
||||
|
|
@ -276,7 +278,7 @@ message_expiry(Config) ->
|
|||
%% Application Message has been waiting in the Server" [MQTT-3.3.2-6]
|
||||
properties := #{'Message-Expiry-Interval' := MEI}}} ->
|
||||
assert_message_expiry_interval(10 - 2, MEI)
|
||||
after 100 -> ct:fail("did not receive m3")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m3")
|
||||
end,
|
||||
assert_nothing_received(),
|
||||
NumExpired = dead_letter_metric(messages_dead_lettered_expired_total, Config) - NumExpiredBefore,
|
||||
|
|
@ -346,7 +348,7 @@ message_expiry_retained_message(Config) ->
|
|||
payload := <<"m3.2">>,
|
||||
properties := Props}}
|
||||
when map_size(Props) =:= 0 -> ok
|
||||
after 100 -> ct:fail("did not topic3")
|
||||
after ?TIMEOUT -> ct:fail("did not topic3")
|
||||
end,
|
||||
|
||||
receive {publish, #{client_pid := Sub,
|
||||
|
|
@ -355,7 +357,7 @@ message_expiry_retained_message(Config) ->
|
|||
payload := <<"m4">>,
|
||||
properties := #{'Message-Expiry-Interval' := MEI}}} ->
|
||||
assert_message_expiry_interval(100 - 2, MEI)
|
||||
after 100 -> ct:fail("did not receive topic4")
|
||||
after ?TIMEOUT -> ct:fail("did not receive topic4")
|
||||
end,
|
||||
assert_nothing_received(),
|
||||
|
||||
|
|
@ -513,7 +515,7 @@ client_rejects_publish(Config) ->
|
|||
packet_id := PacketId}} ->
|
||||
%% Negatively ack the PUBLISH.
|
||||
emqtt:puback(C, PacketId, ?RC_UNSPECIFIED_ERROR)
|
||||
after 1000 ->
|
||||
after ?TIMEOUT ->
|
||||
ct:fail("did not receive PUBLISH")
|
||||
end,
|
||||
%% Even though we nacked the PUBLISH, we expect the server to not re-send the same message:
|
||||
|
|
@ -538,7 +540,7 @@ client_receive_maximum_min(Config) ->
|
|||
PacketId1 = receive {publish, #{payload := <<"m1">>,
|
||||
packet_id := Id}} ->
|
||||
Id
|
||||
after 1000 ->
|
||||
after ?TIMEOUT ->
|
||||
ct:fail("did not receive m1")
|
||||
end,
|
||||
assert_nothing_received(),
|
||||
|
|
@ -625,19 +627,19 @@ subscription_option_retain_as_published(Config) ->
|
|||
topic := <<"t/1">>,
|
||||
payload := <<"m1">>,
|
||||
retain := true}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m1")
|
||||
end,
|
||||
receive {publish, #{client_pid := C1,
|
||||
topic := <<"t/2">>,
|
||||
payload := <<"m2">>,
|
||||
retain := false}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m2")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m2")
|
||||
end,
|
||||
receive {publish, #{client_pid := C2,
|
||||
topic := <<"t/1">>,
|
||||
payload := <<"m1">>,
|
||||
retain := true}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m1")
|
||||
end,
|
||||
{ok, _} = emqtt:publish(C1, <<"t/1">>, <<>>, [{retain, true}, {qos, 1}]),
|
||||
{ok, _} = emqtt:publish(C1, <<"t/2">>, <<>>, [{retain, true}, {qos, 1}]),
|
||||
|
|
@ -659,14 +661,14 @@ subscription_option_retain_as_published_wildcards(Config) ->
|
|||
%% No matching subscription has the
|
||||
%% Retain As Published option set.
|
||||
retain := false}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m1")
|
||||
end,
|
||||
receive {publish, #{topic := <<"t/2">>,
|
||||
payload := <<"m2">>,
|
||||
%% (At least) one matching subscription has the
|
||||
%% Retain As Published option set.
|
||||
retain := true}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m2")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m2")
|
||||
end,
|
||||
{ok, _} = emqtt:publish(C, <<"t/1">>, <<>>, [{retain, true}, {qos, 1}]),
|
||||
{ok, _} = emqtt:publish(C, <<"t/2">>, <<>>, [{retain, true}, {qos, 1}]),
|
||||
|
|
@ -736,7 +738,7 @@ subscription_identifier(Config) ->
|
|||
%% and that it used the same identifier for more than one of them. In this case the
|
||||
%% PUBLISH packet will carry multiple identical Subscription Identifiers." [v5 3.3.4]
|
||||
properties := #{'Subscription-Identifier' := [1, 1]}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m1")
|
||||
end,
|
||||
receive {publish,
|
||||
#{client_pid := C2,
|
||||
|
|
@ -747,14 +749,14 @@ subscription_identifier(Config) ->
|
|||
%% packet the Subscription Identifiers for all matching subscriptions which have a
|
||||
%% Subscription Identifiers, their order is not significant [MQTT-3.3.4-4]." [v5 3.3.4]
|
||||
?assertEqual([1, 16#fffffff], lists:sort(Ids))
|
||||
after 1000 -> ct:fail("did not receive m2")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m2")
|
||||
end,
|
||||
receive {publish,
|
||||
#{client_pid := C2,
|
||||
topic := <<"t/3">>,
|
||||
payload := <<"m3">>,
|
||||
properties := #{'Subscription-Identifier' := 16#fffffff}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m3")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m3")
|
||||
end,
|
||||
receive {publish,
|
||||
#{client_pid := C2,
|
||||
|
|
@ -762,7 +764,7 @@ subscription_identifier(Config) ->
|
|||
payload := <<"m4">>,
|
||||
properties := Props}} ->
|
||||
?assertNot(maps:is_key('Subscription-Identifier', Props))
|
||||
after 1000 -> ct:fail("did not receive m4")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m4")
|
||||
end,
|
||||
assert_nothing_received(),
|
||||
ok = emqtt:disconnect(C1),
|
||||
|
|
@ -784,7 +786,7 @@ subscription_identifier_amqp091(Config) ->
|
|||
topic := <<"a/a">>,
|
||||
payload := <<"m1">>,
|
||||
properties := #{'Subscription-Identifier' := 1}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive message m1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive message m1")
|
||||
end,
|
||||
|
||||
%% Test routing to multiple queues.
|
||||
|
|
@ -796,14 +798,14 @@ subscription_identifier_amqp091(Config) ->
|
|||
topic := <<"a/b">>,
|
||||
payload := <<"m2">>,
|
||||
properties := #{'Subscription-Identifier' := 1}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive message m2")
|
||||
after ?TIMEOUT -> ct:fail("did not receive message m2")
|
||||
end,
|
||||
receive {publish,
|
||||
#{client_pid := C2,
|
||||
topic := <<"a/b">>,
|
||||
payload := <<"m2">>,
|
||||
properties := #{'Subscription-Identifier' := 16#fffffff}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive message m2")
|
||||
after ?TIMEOUT -> ct:fail("did not receive message m2")
|
||||
end,
|
||||
|
||||
ok = emqtt:disconnect(C1),
|
||||
|
|
@ -829,7 +831,7 @@ subscription_identifier_at_most_once_dead_letter(Config) ->
|
|||
topic := <<"dead letter/a">>,
|
||||
payload := <<"msg">>,
|
||||
properties := #{'Subscription-Identifier' := 1}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive msg")
|
||||
after ?TIMEOUT -> ct:fail("did not receive msg")
|
||||
end,
|
||||
ok = emqtt:disconnect(C),
|
||||
ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, 0).
|
||||
|
|
@ -880,7 +882,7 @@ subscription_options_persisted(Config) ->
|
|||
retain := true,
|
||||
qos := 0,
|
||||
properties := #{'Subscription-Identifier' := 99}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m2")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m2")
|
||||
end,
|
||||
assert_nothing_received(),
|
||||
{ok, _} = emqtt:publish(C2, <<"t2">>, <<>>, [{retain, true}, {qos, 1}]),
|
||||
|
|
@ -909,7 +911,7 @@ subscription_options_modify(Config) ->
|
|||
{ok, _} = emqtt:publish(C, Topic, <<"m2">>, qos1),
|
||||
receive {publish, #{payload := <<"m2">>,
|
||||
qos := 0 }} -> ok
|
||||
after 1000 -> ct:fail("did not receive m2")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m2")
|
||||
end,
|
||||
|
||||
%% modify QoS
|
||||
|
|
@ -918,7 +920,7 @@ subscription_options_modify(Config) ->
|
|||
receive {publish, #{payload := <<"m3">>,
|
||||
qos := 1,
|
||||
properties := #{'Subscription-Identifier' := 1}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m3")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m3")
|
||||
end,
|
||||
|
||||
%% modify Subscription Identifier
|
||||
|
|
@ -926,7 +928,7 @@ subscription_options_modify(Config) ->
|
|||
{ok, _} = emqtt:publish(C, Topic, <<"m4">>, qos1),
|
||||
receive {publish, #{payload := <<"m4">>,
|
||||
properties := #{'Subscription-Identifier' := 2}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m4")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m4")
|
||||
end,
|
||||
|
||||
%% remove Subscription Identifier
|
||||
|
|
@ -935,19 +937,19 @@ subscription_options_modify(Config) ->
|
|||
receive {publish, #{payload := <<"m5">>,
|
||||
retain := false,
|
||||
properties := Props}} when map_size(Props) =:= 0 -> ok
|
||||
after 1000 -> ct:fail("did not receive m5")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m5")
|
||||
end,
|
||||
|
||||
%% modify Retain As Published
|
||||
{ok, _, [1]} = emqtt:subscribe(C, Topic, [{rap, true}, {qos, 1}]),
|
||||
receive {publish, #{payload := <<"m5">>,
|
||||
retain := true}} -> ok
|
||||
after 1000 -> ct:fail("did not receive retained m5")
|
||||
after ?TIMEOUT -> ct:fail("did not receive retained m5")
|
||||
end,
|
||||
{ok, _} = emqtt:publish(C, Topic, <<"m6">>, [{retain, true}, {qos, 1}]),
|
||||
receive {publish, #{payload := <<"m6">>,
|
||||
retain := true}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m6")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m6")
|
||||
end,
|
||||
|
||||
assert_nothing_received(),
|
||||
|
|
@ -979,13 +981,13 @@ subscription_options_modify_qos(Qos, Config) ->
|
|||
receive {publish, #{payload := <<"1">>,
|
||||
properties := Props}} ->
|
||||
?assertEqual(0, maps:size(Props))
|
||||
after 1000 -> ct:fail("did not receive 1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive 1")
|
||||
end,
|
||||
%% Replace subscription while another client is sending messages.
|
||||
{ok, _, [Qos]} = emqtt:subscribe(Sub, #{'Subscription-Identifier' => 1}, Topic, Qos),
|
||||
Sender ! stop,
|
||||
NumSent = receive {N, Sender} -> N
|
||||
after 1000 -> ct:fail("could not stop publisher")
|
||||
after ?TIMEOUT -> ct:fail("could not stop publisher")
|
||||
end,
|
||||
ct:pal("Publisher sent ~b messages", [NumSent]),
|
||||
LastExpectedPayload = integer_to_binary(NumSent),
|
||||
|
|
@ -993,7 +995,7 @@ subscription_options_modify_qos(Qos, Config) ->
|
|||
qos := Qos,
|
||||
client_pid := Sub,
|
||||
properties := #{'Subscription-Identifier' := 1}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive ~s", [LastExpectedPayload])
|
||||
after ?TIMEOUT -> ct:fail("did not receive ~s", [LastExpectedPayload])
|
||||
end,
|
||||
case Qos of
|
||||
0 ->
|
||||
|
|
@ -1024,7 +1026,7 @@ session_upgrade_v3_v5_qos(Qos, Config) ->
|
|||
Sender = spawn_link(?MODULE, send, [self(), Pub, Topic, 0]),
|
||||
receive {publish, #{payload := <<"1">>,
|
||||
client_pid := Subv3}} -> ok
|
||||
after 1000 -> ct:fail("did not receive 1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive 1")
|
||||
end,
|
||||
%% Upgrade session from v3 to v5 while another client is sending messages.
|
||||
ok = emqtt:disconnect(Subv3),
|
||||
|
|
@ -1032,14 +1034,14 @@ session_upgrade_v3_v5_qos(Qos, Config) ->
|
|||
?assertEqual(5, proplists:get_value(proto_ver, emqtt:info(Subv5))),
|
||||
Sender ! stop,
|
||||
NumSent = receive {N, Sender} -> N
|
||||
after 1000 -> ct:fail("could not stop publisher")
|
||||
after ?TIMEOUT -> ct:fail("could not stop publisher")
|
||||
end,
|
||||
ct:pal("Publisher sent ~b messages", [NumSent]),
|
||||
LastExpectedPayload = integer_to_binary(NumSent),
|
||||
receive {publish, #{payload := LastExpectedPayload,
|
||||
qos := Qos,
|
||||
client_pid := Subv5}} -> ok
|
||||
after 1000 -> ct:fail("did not receive ~s", [LastExpectedPayload])
|
||||
after ?TIMEOUT -> ct:fail("did not receive ~s", [LastExpectedPayload])
|
||||
end,
|
||||
case Qos of
|
||||
0 ->
|
||||
|
|
@ -1093,7 +1095,7 @@ session_upgrade_v3_v5_amqp091_pub(Config) ->
|
|||
receive {publish, #{payload := Payload,
|
||||
qos := 1,
|
||||
client_pid := Subv5}} -> ok
|
||||
after 1000 -> ct:fail("did not receive message")
|
||||
after ?TIMEOUT -> ct:fail("did not receive message")
|
||||
end,
|
||||
ok = emqtt:disconnect(Subv5),
|
||||
ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, 0).
|
||||
|
|
@ -1118,7 +1120,7 @@ compatibility_v3_v5(Config) ->
|
|||
%% v5 features should work even when message comes from a v3 client.
|
||||
retain := true,
|
||||
properties := #{'Subscription-Identifier' := 99}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive from v3")
|
||||
after ?TIMEOUT -> ct:fail("did not receive from v3")
|
||||
end,
|
||||
{ok, _} = emqtt:publish(Cv3, <<"v5">>, <<>>, [{retain, true}, {qos, 1}]),
|
||||
ok = emqtt:disconnect(Cv3),
|
||||
|
|
@ -1180,7 +1182,7 @@ amqp091_cc_header(Config) ->
|
|||
#{topic := <<"first/key">>,
|
||||
payload := <<"msg">>,
|
||||
properties := #{'Subscription-Identifier' := 1}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive msg")
|
||||
after ?TIMEOUT -> ct:fail("did not receive msg")
|
||||
end,
|
||||
assert_nothing_received(),
|
||||
ok = emqtt:disconnect(C).
|
||||
|
|
@ -1193,7 +1195,7 @@ publish_property_content_type(Config) ->
|
|||
{ok, _} = emqtt:publish(C, Topic, #{'Content-Type' => <<"text/plain😎;charset=UTF-8"/utf8>>}, Payload, [{qos, 1}]),
|
||||
receive {publish, #{payload := Payload,
|
||||
properties := #{'Content-Type' := <<"text/plain😎;charset=UTF-8"/utf8>>}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive message")
|
||||
after ?TIMEOUT -> ct:fail("did not receive message")
|
||||
end,
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
|
|
@ -1205,11 +1207,11 @@ publish_property_payload_format_indicator(Config) ->
|
|||
{ok, _} = emqtt:publish(C, Topic, #{'Payload-Format-Indicator' => 1}, <<"m2">>, [{qos, 1}]),
|
||||
receive {publish, #{payload := <<"m1">>,
|
||||
properties := #{'Payload-Format-Indicator' := 0}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m1")
|
||||
end,
|
||||
receive {publish, #{payload := <<"m2">>,
|
||||
properties := #{'Payload-Format-Indicator' := 1}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m2")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m2")
|
||||
end,
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
|
|
@ -1241,7 +1243,7 @@ publish_property_response_topic_correlation_data(Config) ->
|
|||
ok = emqtt:publish(FrenchResponder, ResponseTopic,
|
||||
#{'Correlation-Data' => Corr0},
|
||||
<<"Bonjour Henri">>, [{qos, 0}])
|
||||
after 1000 -> ct:fail("French responder did not receive request")
|
||||
after ?TIMEOUT -> ct:fail("French responder did not receive request")
|
||||
end,
|
||||
receive {publish, #{client_pid := ItalianResponder,
|
||||
payload := <<"Harry">>,
|
||||
|
|
@ -1250,21 +1252,21 @@ publish_property_response_topic_correlation_data(Config) ->
|
|||
ok = emqtt:publish(ItalianResponder, ResponseTopic,
|
||||
#{'Correlation-Data' => Corr1},
|
||||
<<"Buongiorno Enrico">>, [{qos, 0}])
|
||||
after 1000 -> ct:fail("Italian responder did not receive request")
|
||||
after ?TIMEOUT -> ct:fail("Italian responder did not receive request")
|
||||
end,
|
||||
receive {publish, #{client_pid := Requester,
|
||||
properties := #{'Correlation-Data' := CorrelationItalian},
|
||||
payload := Payload0
|
||||
}} ->
|
||||
?assertEqual(<<"Buongiorno Enrico">>, Payload0)
|
||||
after 1000 -> ct:fail("did not receive Italian response")
|
||||
after ?TIMEOUT -> ct:fail("did not receive Italian response")
|
||||
end,
|
||||
receive {publish, #{client_pid := Requester,
|
||||
properties := #{'Correlation-Data' := CorrelationFrench},
|
||||
payload := Payload1
|
||||
}} ->
|
||||
?assertEqual(<<"Bonjour Henri">>, Payload1)
|
||||
after 1000 -> ct:fail("did not receive French response")
|
||||
after ?TIMEOUT -> ct:fail("did not receive French response")
|
||||
end,
|
||||
[ok = emqtt:disconnect(C) || C <- [Requester, FrenchResponder, ItalianResponder]].
|
||||
|
||||
|
|
@ -1285,7 +1287,7 @@ publish_property_user_property(Config) ->
|
|||
{ok, _} = emqtt:publish(C, Topic, #{'User-Property' => UserProperty}, Payload, [{qos, 1}]),
|
||||
receive {publish, #{payload := Payload,
|
||||
properties := #{'User-Property' := UserProperty}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive message")
|
||||
after ?TIMEOUT -> ct:fail("did not receive message")
|
||||
end,
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
|
|
@ -1338,7 +1340,7 @@ will_delay(WillDelay, SessionExpiry, ClientId, Config)
|
|||
end,
|
||||
receive {publish, #{payload := Msg}} -> ok;
|
||||
Unexpected -> ct:fail({unexpected_message, Unexpected})
|
||||
after 3000 -> ct:fail(will_message_timeout)
|
||||
after ?TIMEOUT -> ct:fail(will_message_timeout)
|
||||
end,
|
||||
%% Cleanup
|
||||
C2 = connect(ClientId, Config),
|
||||
|
|
@ -1358,7 +1360,7 @@ will_delay_session_expiry_zero(Config) ->
|
|||
erlang:exit(C, trigger_will_message),
|
||||
%% Since default Session Expiry Interval is 0, we expect Will Message immediately.
|
||||
receive {publish, #{payload := Msg}} -> ok
|
||||
after 500 -> ct:fail(will_message_timeout)
|
||||
after ?TIMEOUT -> ct:fail(will_message_timeout)
|
||||
end,
|
||||
ok = emqtt:disconnect(Sub).
|
||||
|
||||
|
|
@ -1467,16 +1469,16 @@ will_delay_session_takeover(Config) ->
|
|||
{will_topic, Topic},
|
||||
{will_payload, <<"will-4b">>}]),
|
||||
[receive {disconnected, ?RC_SESSION_TAKEN_OVER, #{}} -> ok
|
||||
after 1000 -> ct:fail("server did not disconnect us")
|
||||
after ?TIMEOUT -> ct:fail("server did not disconnect us")
|
||||
end || _ <- Clients],
|
||||
|
||||
receive {publish, #{client_pid := Sub,
|
||||
payload := <<"will-3a">>}} -> ok
|
||||
after 5000 -> ct:fail({missing_msg, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_msg, ?LINE})
|
||||
end,
|
||||
receive {publish, #{client_pid := Sub,
|
||||
payload := <<"will-4a">>}} -> ok
|
||||
after 5000 -> ct:fail({missing_msg, ?LINE})
|
||||
after ?TIMEOUT -> ct:fail({missing_msg, ?LINE})
|
||||
end,
|
||||
assert_nothing_received(),
|
||||
|
||||
|
|
@ -1544,7 +1546,7 @@ will_delay_message_expiry_publish_properties(Config) ->
|
|||
%% it has been waiting in the Server for 2 seconds to be consumed.
|
||||
assert_message_expiry_interval(20 - 2, MEI);
|
||||
Other -> ct:fail("received unexpected message: ~p", [Other])
|
||||
after 500 -> ct:fail("did not receive Will Message")
|
||||
after ?TIMEOUT -> ct:fail("did not receive Will Message")
|
||||
end,
|
||||
ok = emqtt:disconnect(Sub2).
|
||||
|
||||
|
|
@ -1594,7 +1596,7 @@ will_properties0(Config, WillDelayInterval) ->
|
|||
'Response-Topic' := <<"response/topic">>,
|
||||
'Correlation-Data' := CorrelationData} = Props}}
|
||||
when map_size(Props) =:= 5 -> ok
|
||||
after 1500 -> ct:fail("did not receive Will Message")
|
||||
after ?TIMEOUT -> ct:fail("did not receive Will Message")
|
||||
end,
|
||||
ok = emqtt:disconnect(Sub).
|
||||
|
||||
|
|
@ -1633,7 +1635,7 @@ retain_properties(Config) ->
|
|||
retain := true,
|
||||
qos := 1,
|
||||
properties := Props}} -> ok
|
||||
after 500 -> ct:fail("did not receive m1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m1")
|
||||
end,
|
||||
receive {publish,
|
||||
#{client_pid := Sub,
|
||||
|
|
@ -1642,7 +1644,7 @@ retain_properties(Config) ->
|
|||
retain := true,
|
||||
qos := 1,
|
||||
properties := Props}} -> ok
|
||||
after 500 -> ct:fail("did not receive m2")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m2")
|
||||
end,
|
||||
{ok, _} = emqtt:publish(Sub, <<"t/1">>, <<>>, [{retain, true}, {qos, 1}]),
|
||||
{ok, _} = emqtt:publish(Sub, <<"t/2">>, <<>>, [{retain, true}, {qos, 1}]),
|
||||
|
|
@ -1675,7 +1677,7 @@ will_delay_node_restart(Config) ->
|
|||
T = erlang:monotonic_time(millisecond),
|
||||
ok = rabbit_ct_broker_helpers:drain_node(Config, 0),
|
||||
[receive {disconnected, ?RC_SERVER_SHUTTING_DOWN, #{}} -> ok
|
||||
after 10_000 -> ct:fail("server did not disconnect us")
|
||||
after ?TIMEOUT -> ct:fail("server did not disconnect us")
|
||||
end || _ <- ClientsNode0],
|
||||
ok = rabbit_ct_broker_helpers:stop_node(Config, 0),
|
||||
ElapsedMs = erlang:monotonic_time(millisecond) - T,
|
||||
|
|
@ -1687,12 +1689,12 @@ will_delay_node_restart(Config) ->
|
|||
%% After node 0 restarts, we should receive the Will Message promptly on both nodes 0 and 1.
|
||||
receive {publish, #{client_pid := Sub1,
|
||||
payload := Payload}} -> ok
|
||||
after 1000 -> ct:fail("did not receive Will Message on node 1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive Will Message on node 1")
|
||||
end,
|
||||
Sub0b = connect(<<"sub0">>, Config, 0, [{clean_start, false}]),
|
||||
receive {publish, #{client_pid := Sub0b,
|
||||
payload := Payload}} -> ok
|
||||
after 1000 -> ct:fail("did not receive Will Message on node 0")
|
||||
after ?TIMEOUT -> ct:fail("did not receive Will Message on node 0")
|
||||
end,
|
||||
|
||||
ok = emqtt:disconnect(Sub0b),
|
||||
|
|
@ -1726,7 +1728,7 @@ session_switch_v3_v5(Config, Disconnect) ->
|
|||
#{client_pid := C2,
|
||||
payload := <<"m1">>,
|
||||
qos := 1}} -> ok
|
||||
after 1000 -> ct:fail("did not receive from m1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive from m1")
|
||||
end,
|
||||
%% Modifying subscription with v5 specific feature should work.
|
||||
{ok, _, [1]} = emqtt:subscribe(C2, Topic, [{nl, true}, {qos, 1}]),
|
||||
|
|
@ -1745,7 +1747,7 @@ session_switch_v3_v5(Config, Disconnect) ->
|
|||
case Disconnect of
|
||||
true -> ok;
|
||||
false -> receive {disconnected, ?RC_SESSION_TAKEN_OVER, #{}} -> ok
|
||||
after 1000 -> ct:fail("missing DISCONNECT packet for C2")
|
||||
after ?TIMEOUT -> ct:fail("missing DISCONNECT packet for C2")
|
||||
end
|
||||
end,
|
||||
%% We expect that v5 specific subscription feature does not apply
|
||||
|
|
@ -1755,7 +1757,7 @@ session_switch_v3_v5(Config, Disconnect) ->
|
|||
#{client_pid := C3,
|
||||
payload := <<"m3">>,
|
||||
qos := 1}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m3 with QoS 1")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m3 with QoS 1")
|
||||
end,
|
||||
%% Modifying the subscription once more with v3 client should work.
|
||||
{ok, _, [0]} = emqtt:subscribe(C3, Topic, qos0),
|
||||
|
|
@ -1764,7 +1766,7 @@ session_switch_v3_v5(Config, Disconnect) ->
|
|||
#{client_pid := C3,
|
||||
payload := <<"m4">>,
|
||||
qos := 0}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m3 with QoS 0")
|
||||
after ?TIMEOUT -> ct:fail("did not receive m3 with QoS 0")
|
||||
end,
|
||||
|
||||
%% Unsubscribing in v3 should work.
|
||||
|
|
@ -1821,7 +1823,7 @@ topic_alias_server_to_client(Config) ->
|
|||
A1 = receive {publish, #{payload := <<"m1">>,
|
||||
topic := <<"t/1">>,
|
||||
properties := #{'Topic-Alias' := A1a}}} -> A1a
|
||||
after 500 -> ct:fail("Did not receive m1")
|
||||
after ?TIMEOUT -> ct:fail("Did not receive m1")
|
||||
end,
|
||||
|
||||
%% We don't expect a Topic Alias when the Topic Name consists of a single byte.
|
||||
|
|
@ -1830,14 +1832,14 @@ topic_alias_server_to_client(Config) ->
|
|||
topic := <<"t">>,
|
||||
properties := Props1}}
|
||||
when map_size(Props1) =:= 0 -> ok
|
||||
after 500 -> ct:fail("Did not receive m2")
|
||||
after ?TIMEOUT -> ct:fail("Did not receive m2")
|
||||
end,
|
||||
|
||||
{ok, _} = emqtt:publish(C1, <<"t/2">>, <<"m3">>, qos1),
|
||||
A2 = receive {publish, #{payload := <<"m3">>,
|
||||
topic := <<"t/2">>,
|
||||
properties := #{'Topic-Alias' := A2a}}} -> A2a
|
||||
after 500 -> ct:fail("Did not receive m3")
|
||||
after ?TIMEOUT -> ct:fail("Did not receive m3")
|
||||
end,
|
||||
?assertEqual([1, 2], lists:sort([A1, A2])),
|
||||
|
||||
|
|
@ -1848,7 +1850,7 @@ topic_alias_server_to_client(Config) ->
|
|||
topic := <<"t/3">>,
|
||||
properties := Props2}}
|
||||
when map_size(Props2) =:= 0 -> ok
|
||||
after 500 -> ct:fail("Did not receive m4")
|
||||
after ?TIMEOUT -> ct:fail("Did not receive m4")
|
||||
end,
|
||||
|
||||
%% Existing topic aliases should still be sent.
|
||||
|
|
@ -1858,13 +1860,13 @@ topic_alias_server_to_client(Config) ->
|
|||
topic := <<>>,
|
||||
properties := #{'Topic-Alias' := A1b}}} ->
|
||||
?assertEqual(A1, A1b)
|
||||
after 500 -> ct:fail("Did not receive m5")
|
||||
after ?TIMEOUT -> ct:fail("Did not receive m5")
|
||||
end,
|
||||
receive {publish, #{payload := <<"m6">>,
|
||||
topic := <<>>,
|
||||
properties := #{'Topic-Alias' := A2b}}} ->
|
||||
?assertEqual(A2, A2b)
|
||||
after 500 -> ct:fail("Did not receive m6")
|
||||
after ?TIMEOUT -> ct:fail("Did not receive m6")
|
||||
end,
|
||||
|
||||
ok = emqtt:disconnect(C1),
|
||||
|
|
@ -1890,13 +1892,13 @@ topic_alias_bidirectional(Config) ->
|
|||
payload := <<"m2">>,
|
||||
topic := Topic1,
|
||||
properties := #{'Topic-Alias' := 1}}} -> ok
|
||||
after 500 -> ct:fail("Did not receive m2")
|
||||
after ?TIMEOUT -> ct:fail("Did not receive m2")
|
||||
end,
|
||||
receive {publish, #{client_pid := C1,
|
||||
payload := <<"m4">>,
|
||||
topic := <<>>,
|
||||
properties := #{'Topic-Alias' := 1}}} -> ok
|
||||
after 500 -> ct:fail("Did not receive m4")
|
||||
after ?TIMEOUT -> ct:fail("Did not receive m4")
|
||||
end,
|
||||
ok = emqtt:disconnect(C1),
|
||||
ok = emqtt:disconnect(C2).
|
||||
|
|
@ -1950,7 +1952,7 @@ topic_alias_in_retained_message0(Config, TopicAliasMax, TopicAlias, ExpectedProp
|
|||
retain := true,
|
||||
properties := Props}} ->
|
||||
?assertEqual(ExpectedProps, Props)
|
||||
after 500 -> ct:fail("Did not receive retained message")
|
||||
after ?TIMEOUT -> ct:fail("Did not receive retained message")
|
||||
end,
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
|
|
@ -2107,7 +2109,7 @@ receive_correlations(Ctag, N, Set) ->
|
|||
#amqp_msg{props = #'P_basic'{correlation_id = Corr}}} ->
|
||||
?assert(is_binary(Corr)),
|
||||
receive_correlations(Ctag, N + 1, sets:add_element(Corr, Set))
|
||||
after 200 ->
|
||||
after 1000 ->
|
||||
{N, Set}
|
||||
end.
|
||||
|
||||
|
|
|
|||
|
|
@ -759,7 +759,7 @@ stream_pub_sub_metrics(Config) ->
|
|||
maps:with([rabbitmq_stream_consumer_max_offset_lag],
|
||||
parse_response(Body1))
|
||||
end,
|
||||
100),
|
||||
30000),
|
||||
|
||||
%% per-object metrics
|
||||
{_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics",
|
||||
|
|
|
|||
Loading…
Reference in New Issue