|
|
|
@ -17,7 +17,7 @@
|
|
|
|
|
-compile([export_all, nowarn_export_all]).
|
|
|
|
|
|
|
|
|
|
suite() ->
|
|
|
|
|
[{timetrap, {seconds, 120}}].
|
|
|
|
|
[{timetrap, {minutes, 4}}].
|
|
|
|
|
|
|
|
|
|
all() ->
|
|
|
|
|
[
|
|
|
|
@ -64,7 +64,8 @@ shared() ->
|
|
|
|
|
split_transfer,
|
|
|
|
|
transfer_unsettled,
|
|
|
|
|
subscribe,
|
|
|
|
|
subscribe_with_auto_flow,
|
|
|
|
|
subscribe_with_auto_flow_settled,
|
|
|
|
|
subscribe_with_auto_flow_unsettled,
|
|
|
|
|
outgoing_heartbeat,
|
|
|
|
|
roundtrip_large_messages,
|
|
|
|
|
transfer_id_vs_delivery_id
|
|
|
|
@ -290,12 +291,15 @@ roundtrip_large_messages(Config) ->
|
|
|
|
|
Hostname = ?config(rmq_hostname, Config),
|
|
|
|
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
|
|
|
|
|
OpenConf = #{address => Hostname, port => Port, sasl => anon},
|
|
|
|
|
DataKb = crypto:strong_rand_bytes(1024),
|
|
|
|
|
roundtrip(OpenConf, DataKb),
|
|
|
|
|
Data1Mb = binary:copy(DataKb, 1024),
|
|
|
|
|
roundtrip(OpenConf, Data1Mb),
|
|
|
|
|
roundtrip(OpenConf, binary:copy(Data1Mb, 8)),
|
|
|
|
|
ok = roundtrip(OpenConf, binary:copy(Data1Mb, 64)).
|
|
|
|
|
|
|
|
|
|
DataKb = rand:bytes(1024),
|
|
|
|
|
DataMb = rand:bytes(1024 * 1024),
|
|
|
|
|
Data8Mb = rand:bytes(8 * 1024 * 1024),
|
|
|
|
|
Data64Mb = rand:bytes(64 * 1024 * 1024),
|
|
|
|
|
ok = roundtrip(OpenConf, DataKb),
|
|
|
|
|
ok = roundtrip(OpenConf, DataMb),
|
|
|
|
|
ok = roundtrip(OpenConf, Data8Mb),
|
|
|
|
|
ok = roundtrip(OpenConf, Data64Mb).
|
|
|
|
|
|
|
|
|
|
roundtrip(OpenConf) ->
|
|
|
|
|
roundtrip(OpenConf, <<"banana">>).
|
|
|
|
@ -321,9 +325,10 @@ roundtrip(OpenConf, Body) ->
|
|
|
|
|
{error, link_not_found} = amqp10_client:detach_link(Sender),
|
|
|
|
|
{ok, Receiver} = amqp10_client:attach_receiver_link(
|
|
|
|
|
Session, <<"banana-receiver">>, <<"test1">>, settled, unsettled_state),
|
|
|
|
|
{ok, OutMsg} = amqp10_client:get_msg(Receiver, 60_000 * 4),
|
|
|
|
|
{ok, OutMsg} = amqp10_client:get_msg(Receiver, 4 * 60_000),
|
|
|
|
|
ok = amqp10_client:end_session(Session),
|
|
|
|
|
ok = amqp10_client:close_connection(Connection),
|
|
|
|
|
|
|
|
|
|
% ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]),
|
|
|
|
|
#{creation_time := Now} = amqp10_msg:properties(OutMsg),
|
|
|
|
|
#{<<"a_key">> := <<"a_value">>} = amqp10_msg:application_properties(OutMsg),
|
|
|
|
@ -502,7 +507,7 @@ transfer_unsettled(Config) ->
|
|
|
|
|
subscribe(Config) ->
|
|
|
|
|
Hostname = ?config(rmq_hostname, Config),
|
|
|
|
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
|
|
|
|
|
QueueName = <<"test-sub">>,
|
|
|
|
|
QueueName = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
|
{ok, Connection} = amqp10_client:open_connection(Hostname, Port),
|
|
|
|
|
{ok, Session} = amqp10_client:begin_session(Connection),
|
|
|
|
|
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session,
|
|
|
|
@ -530,10 +535,11 @@ subscribe(Config) ->
|
|
|
|
|
ok = amqp10_client:end_session(Session),
|
|
|
|
|
ok = amqp10_client:close_connection(Connection).
|
|
|
|
|
|
|
|
|
|
subscribe_with_auto_flow(Config) ->
|
|
|
|
|
subscribe_with_auto_flow_settled(Config) ->
|
|
|
|
|
SenderSettleMode = settled,
|
|
|
|
|
Hostname = ?config(rmq_hostname, Config),
|
|
|
|
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
|
|
|
|
|
QueueName = <<"test-sub">>,
|
|
|
|
|
QueueName = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
|
{ok, Connection} = amqp10_client:open_connection(Hostname, Port),
|
|
|
|
|
{ok, Session} = amqp10_client:begin_session(Connection),
|
|
|
|
|
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session,
|
|
|
|
@ -541,93 +547,109 @@ subscribe_with_auto_flow(Config) ->
|
|
|
|
|
QueueName),
|
|
|
|
|
await_link(Sender, credited, link_credit_timeout),
|
|
|
|
|
|
|
|
|
|
_ = publish_messages(Sender, <<"banana">>, 20),
|
|
|
|
|
%% Use sender settle mode 'settled'.
|
|
|
|
|
{ok, R1} = amqp10_client:attach_receiver_link(
|
|
|
|
|
Session, <<"sub-receiver-1">>, QueueName, settled),
|
|
|
|
|
await_link(R1, attached, attached_timeout),
|
|
|
|
|
ok = amqp10_client:flow_link_credit(R1, 5, 2),
|
|
|
|
|
?assertEqual(20, count_received_messages(R1)),
|
|
|
|
|
ok = amqp10_client:detach_link(R1),
|
|
|
|
|
publish_messages(Sender, <<"banana">>, 20),
|
|
|
|
|
{ok, Receiver} = amqp10_client:attach_receiver_link(
|
|
|
|
|
Session, <<"sub-receiver">>, QueueName, SenderSettleMode),
|
|
|
|
|
await_link(Receiver, attached, attached_timeout),
|
|
|
|
|
|
|
|
|
|
_ = publish_messages(Sender, <<"banana">>, 30),
|
|
|
|
|
ok = amqp10_client:flow_link_credit(Receiver, 5, 2),
|
|
|
|
|
?assertEqual(20, count_received_messages(Receiver)),
|
|
|
|
|
|
|
|
|
|
ok = amqp10_client:detach_link(Receiver),
|
|
|
|
|
ok = amqp10_client:detach_link(Sender),
|
|
|
|
|
ok = amqp10_client:end_session(Session),
|
|
|
|
|
ok = amqp10_client:close_connection(Connection).
|
|
|
|
|
|
|
|
|
|
subscribe_with_auto_flow_unsettled(Config) ->
|
|
|
|
|
SenderSettleMode = unsettled,
|
|
|
|
|
Hostname = ?config(rmq_hostname, Config),
|
|
|
|
|
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
|
|
|
|
|
QueueName = atom_to_binary(?FUNCTION_NAME),
|
|
|
|
|
{ok, Connection} = amqp10_client:open_connection(Hostname, Port),
|
|
|
|
|
{ok, Session} = amqp10_client:begin_session(Connection),
|
|
|
|
|
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session,
|
|
|
|
|
<<"sub-sender">>,
|
|
|
|
|
QueueName),
|
|
|
|
|
await_link(Sender, credited, link_credit_timeout),
|
|
|
|
|
|
|
|
|
|
_ = publish_messages(Sender, <<"1-">>, 30),
|
|
|
|
|
%% Use sender settle mode 'unsettled'.
|
|
|
|
|
%% This should require us to manually settle message in order to receive more messages.
|
|
|
|
|
{ok, R2} = amqp10_client:attach_receiver_link(Session, <<"sub-receiver-2">>, QueueName, unsettled),
|
|
|
|
|
await_link(R2, attached, attached_timeout),
|
|
|
|
|
ok = amqp10_client:flow_link_credit(R2, 5, 2),
|
|
|
|
|
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"sub-receiver-2">>, QueueName, SenderSettleMode),
|
|
|
|
|
await_link(Receiver, attached, attached_timeout),
|
|
|
|
|
ok = amqp10_client:flow_link_credit(Receiver, 5, 2),
|
|
|
|
|
%% We should receive exactly 5 messages.
|
|
|
|
|
[M1, _M2, M3, M4, M5] = receive_messages(R2, 5),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
[M1, _M2, M3, M4, M5] = receive_messages(Receiver, 5),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
%% Even when we accept the first 3 messages, the number of unsettled messages has not yet fallen below 2.
|
|
|
|
|
%% Therefore, the client should not yet grant more credits to the sender.
|
|
|
|
|
ok = amqp10_client_session:disposition(
|
|
|
|
|
R2, amqp10_msg:delivery_id(M1), amqp10_msg:delivery_id(M3), true, accepted),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
Receiver, amqp10_msg:delivery_id(M1), amqp10_msg:delivery_id(M3), true, accepted),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
%% When we accept 1 more message (the order in which we accept shouldn't matter, here we accept M5 before M4),
|
|
|
|
|
%% the number of unsettled messages now falls below 2 (since only M4 is left unsettled).
|
|
|
|
|
%% Therefore, the client should grant 5 credits to the sender.
|
|
|
|
|
%% Therefore, we should receive 5 more messages.
|
|
|
|
|
ok = amqp10_client:accept_msg(R2, M5),
|
|
|
|
|
[_M6, _M7, _M8, _M9, M10] = receive_messages(R2, 5),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = amqp10_client:accept_msg(Receiver, M5),
|
|
|
|
|
[_M6, _M7, _M8, _M9, M10] = receive_messages(Receiver, 5),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
%% It shouldn't matter how we settle messages, therefore we use 'rejected' this time.
|
|
|
|
|
%% Settling all in flight messages should cause us to receive exactly 5 more messages.
|
|
|
|
|
ok = amqp10_client_session:disposition(
|
|
|
|
|
R2, amqp10_msg:delivery_id(M4), amqp10_msg:delivery_id(M10), true, rejected),
|
|
|
|
|
[M11, _M12, _M13, _M14, M15] = receive_messages(R2, 5),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
Receiver, amqp10_msg:delivery_id(M4), amqp10_msg:delivery_id(M10), true, rejected),
|
|
|
|
|
[M11, _M12, _M13, _M14, M15] = receive_messages(Receiver, 5),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
%% Dynamically decrease link credit.
|
|
|
|
|
%% Since we explicitly tell to grant 3 new credits now, we expect to receive 3 more messages.
|
|
|
|
|
ok = amqp10_client:flow_link_credit(R2, 3, 3),
|
|
|
|
|
[M16, _M17, M18] = receive_messages(R2, 3),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = amqp10_client:flow_link_credit(Receiver, 3, 3),
|
|
|
|
|
[M16, _M17, M18] = receive_messages(Receiver, 3),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
ok = amqp10_client_session:disposition(
|
|
|
|
|
R2, amqp10_msg:delivery_id(M11), amqp10_msg:delivery_id(M15), true, accepted),
|
|
|
|
|
Receiver, amqp10_msg:delivery_id(M11), amqp10_msg:delivery_id(M15), true, accepted),
|
|
|
|
|
%% However, the RenewWhenBelow=3 still refers to all unsettled messages.
|
|
|
|
|
%% Right now we have 3 messages (M16, M17, M18) unsettled.
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
%% Settling 1 out of these 3 messages causes RenewWhenBelow to fall below 3 resulting
|
|
|
|
|
%% in 3 new messages to be received.
|
|
|
|
|
ok = amqp10_client:accept_msg(R2, M18),
|
|
|
|
|
[_M19, _M20, _M21] = receive_messages(R2, 3),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = amqp10_client:accept_msg(Receiver, M18),
|
|
|
|
|
[_M19, _M20, _M21] = receive_messages(Receiver, 3),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
ok = amqp10_client:flow_link_credit(R2, 3, never, true),
|
|
|
|
|
[_M22, _M23, M24] = receive_messages(R2, 3),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = amqp10_client:flow_link_credit(Receiver, 3, never, true),
|
|
|
|
|
[_M22, _M23, M24] = receive_messages(Receiver, 3),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
%% Since RenewWhenBelow = never, we expect to receive no new messages despite settling.
|
|
|
|
|
ok = amqp10_client_session:disposition(
|
|
|
|
|
R2, amqp10_msg:delivery_id(M16), amqp10_msg:delivery_id(M24), true, rejected),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
Receiver, amqp10_msg:delivery_id(M16), amqp10_msg:delivery_id(M24), true, rejected),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
ok = amqp10_client:flow_link_credit(R2, 2, never, false),
|
|
|
|
|
[M25, _M26] = receive_messages(R2, 2),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = amqp10_client:flow_link_credit(Receiver, 2, never, false),
|
|
|
|
|
[M25, _M26] = receive_messages(Receiver, 2),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
ok = amqp10_client:flow_link_credit(R2, 3, 3),
|
|
|
|
|
[_M27, _M28, M29] = receive_messages(R2, 3),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = amqp10_client:flow_link_credit(Receiver, 3, 3),
|
|
|
|
|
[_M27, _M28, M29] = receive_messages(Receiver, 3),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
ok = amqp10_client_session:disposition(
|
|
|
|
|
R2, amqp10_msg:delivery_id(M25), amqp10_msg:delivery_id(M29), true, accepted),
|
|
|
|
|
[M30] = receive_messages(R2, 1),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = amqp10_client:accept_msg(R2, M30),
|
|
|
|
|
Receiver, amqp10_msg:delivery_id(M25), amqp10_msg:delivery_id(M29), true, accepted),
|
|
|
|
|
[M30] = receive_messages(Receiver, 1),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
ok = amqp10_client:accept_msg(Receiver, M30),
|
|
|
|
|
%% The sender queue is empty now.
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
ok = amqp10_client:flow_link_credit(R2, 3, 1),
|
|
|
|
|
_ = publish_messages(Sender, <<"banana">>, 1),
|
|
|
|
|
[M31] = receive_messages(R2, 1),
|
|
|
|
|
ok = amqp10_client:accept_msg(R2, M31),
|
|
|
|
|
ok = amqp10_client:flow_link_credit(Receiver, 3, 1),
|
|
|
|
|
_ = publish_messages(Sender, <<"2-">>, 1),
|
|
|
|
|
[M31] = receive_messages(Receiver, 1),
|
|
|
|
|
ok = amqp10_client:accept_msg(Receiver, M31),
|
|
|
|
|
|
|
|
|
|
%% Since function flow_link_credit/3 documents
|
|
|
|
|
%% "if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
|
|
|
|
@ -637,24 +659,25 @@ subscribe_with_auto_flow(Config) ->
|
|
|
|
|
%% remaining link credit (2) and unsettled messages (0) is 2.
|
|
|
|
|
%%
|
|
|
|
|
%% Therefore, when we publish another 3 messages, we expect to only receive only 2 messages!
|
|
|
|
|
_ = publish_messages(Sender, <<"banana">>, 5),
|
|
|
|
|
[M32, M33] = receive_messages(R2, 2),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
_ = publish_messages(Sender, <<"3-">>, 5),
|
|
|
|
|
[M32, M33] = receive_messages(Receiver, 2),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
%% When we accept both messages, the sum of the remaining link credit (0) and unsettled messages (0)
|
|
|
|
|
%% falls below RenewWhenBelow=1 causing the amqp10_client to grant 3 new credits.
|
|
|
|
|
ok = amqp10_client:accept_msg(R2, M32),
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = amqp10_client:accept_msg(R2, M33),
|
|
|
|
|
ok = amqp10_client:accept_msg(Receiver, M32),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
ok = amqp10_client:accept_msg(Receiver, M33),
|
|
|
|
|
|
|
|
|
|
[M35, M36, M37] = receive_messages(R2, 3),
|
|
|
|
|
ok = amqp10_client:accept_msg(R2, M35),
|
|
|
|
|
ok = amqp10_client:accept_msg(R2, M36),
|
|
|
|
|
ok = amqp10_client:accept_msg(R2, M37),
|
|
|
|
|
[M35, M36, M37] = receive_messages(Receiver, 3),
|
|
|
|
|
ok = amqp10_client:accept_msg(Receiver, M35),
|
|
|
|
|
ok = amqp10_client:accept_msg(Receiver, M36),
|
|
|
|
|
ok = amqp10_client:accept_msg(Receiver, M37),
|
|
|
|
|
%% The sender queue is empty now.
|
|
|
|
|
ok = assert_no_message(R2),
|
|
|
|
|
ok = assert_no_message(Receiver),
|
|
|
|
|
|
|
|
|
|
ok = amqp10_client:detach_link(R2),
|
|
|
|
|
ok = amqp10_client:detach_link(Receiver),
|
|
|
|
|
ok = amqp10_client:detach_link(Sender),
|
|
|
|
|
ok = amqp10_client:end_session(Session),
|
|
|
|
|
ok = amqp10_client:close_connection(Connection).
|
|
|
|
|
|
|
|
|
@ -817,18 +840,18 @@ await_link(Who, What, Err) ->
|
|
|
|
|
ok;
|
|
|
|
|
{amqp10_event, {link, Who0, {detached, Why}}}
|
|
|
|
|
when Who0 =:= Who ->
|
|
|
|
|
exit(Why)
|
|
|
|
|
ct:fail(Why)
|
|
|
|
|
after 5000 ->
|
|
|
|
|
flush(),
|
|
|
|
|
exit(Err)
|
|
|
|
|
ct:fail(Err)
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
publish_messages(Sender, Data, Num) ->
|
|
|
|
|
publish_messages(Sender, BodyPrefix, Num) ->
|
|
|
|
|
[begin
|
|
|
|
|
Tag = integer_to_binary(T),
|
|
|
|
|
Msg = amqp10_msg:new(Tag, Data, false),
|
|
|
|
|
ok = amqp10_client:send_msg(Sender, Msg),
|
|
|
|
|
ok = await_disposition(Tag)
|
|
|
|
|
Tag = integer_to_binary(T),
|
|
|
|
|
Msg = amqp10_msg:new(Tag, <<BodyPrefix/binary, Tag/binary>>, false),
|
|
|
|
|
ok = amqp10_client:send_msg(Sender, Msg),
|
|
|
|
|
ok = await_disposition(Tag)
|
|
|
|
|
end || T <- lists:seq(1, Num)].
|
|
|
|
|
|
|
|
|
|
await_disposition(DeliveryTag) ->
|
|
|
|
@ -847,7 +870,7 @@ count_received_messages0(Receiver, Count) ->
|
|
|
|
|
receive
|
|
|
|
|
{amqp10_msg, Receiver, _Msg} ->
|
|
|
|
|
count_received_messages0(Receiver, Count + 1)
|
|
|
|
|
after 200 ->
|
|
|
|
|
after 500 ->
|
|
|
|
|
Count
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
@ -861,7 +884,15 @@ receive_messages0(Receiver, N, Acc) ->
|
|
|
|
|
{amqp10_msg, Receiver, Msg} ->
|
|
|
|
|
receive_messages0(Receiver, N - 1, [Msg | Acc])
|
|
|
|
|
after 5000 ->
|
|
|
|
|
ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}})
|
|
|
|
|
LastReceivedMsg = case Acc of
|
|
|
|
|
[] -> none;
|
|
|
|
|
[M | _] -> M
|
|
|
|
|
end,
|
|
|
|
|
ct:fail({timeout,
|
|
|
|
|
{num_received, length(Acc)},
|
|
|
|
|
{num_missing, N},
|
|
|
|
|
{last_received_msg, LastReceivedMsg}
|
|
|
|
|
})
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
assert_no_message(Receiver) ->
|
|
|
|
|