Protect receiving app from being overloaded

What?

Protect receiving application from being overloaded with new messages
while still processing existing messages if the auto credit renewal
feature of the Erlang AMQP 1.0 client library is used.

This feature can therefore be thought of as a prefetch window equivalent
in AMQP 0.9.1 or MQTT 5.0 property Receive Maximum.

How?

The credit auto renewal feature in RabbitMQ 3.x was wrongly implemented.
This commit takes the same approach as done in the server:
The incoming_unsettled map is hold in the link instead of in the session
to accurately and quickly determine the number of unsettled messages for
a receiving link.

The amqp10_client lib will grant more credits to the sender when the sum
of remaining link credits and number of unsettled deliveries falls below
the threshold RenewWhenBelow.

This avoids maintaning additional state like the `link_credit_unsettled`
or an alternative delivery_count_settled sequence number which is more
complex to implement correctly.

This commit breaks the amqp10_client_session:disposition/6 API:
This commit forces the client application to only range settle for a
given link, i.e. not across multiple links on a given session at once.
The latter is allowed according to the AMQP spec.
This commit is contained in:
David Ansari 2024-02-20 11:22:21 +01:00
parent 6485948dda
commit b0142287c7
10 changed files with 319 additions and 197 deletions

View File

@ -51,9 +51,6 @@ include erlang.mk
HEX_TARBALL_FILES += rabbitmq-components.mk \
git-revisions.txt
# Dialyze the tests.
DIALYZER_OPTS += --src -r test
# --------------------------------------------------------------------
# ActiveMQ for the testsuite.
# --------------------------------------------------------------------

View File

@ -301,16 +301,19 @@ attach_link(Session, AttachArgs) ->
%% This is asynchronous and will notify completion of the attach request to the
%% caller using an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, {detached, Why}}}
-spec detach_link(link_ref()) -> _.
-spec detach_link(link_ref()) -> ok | {error, term()}.
detach_link(#link_ref{link_handle = Handle, session = Session}) ->
amqp10_client_session:detach(Session, Handle).
%% @doc Grant credit to a sender.
%% The amqp10_client will automatically grant Credit to the sender when
%% the remaining link credit falls below the value of RenewWhenBelow.
%% If RenewWhenBelow is 'never' the client will never grant more credit. Instead
%% the caller will be notified when the link_credit reaches 0 with an
%% amqp10_event of the following format:
%% @doc Grant Credit to a sender.
%%
%% In addition, if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
%% Credit to the sender when the sum of the remaining link credit and the number of unsettled
%% messages falls below the value of RenewWhenBelow.
%% `Credit + RenewWhenBelow - 1` is the maximum number of in-flight unsettled messages.
%%
%% If RenewWhenBelow is `never` the amqp10_client will never grant more credit. Instead the caller
%% will be notified when the link_credit reaches 0 with an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, credit_exhausted}}
-spec flow_link_credit(link_ref(), Credit :: non_neg_integer(),
RenewWhenBelow :: never | pos_integer()) -> ok.
@ -323,10 +326,16 @@ flow_link_credit(Ref, Credit, RenewWhenBelow) ->
flow_link_credit(#link_ref{role = receiver, session = Session,
link_handle = Handle},
Credit, RenewWhenBelow, Drain)
when RenewWhenBelow =:= never orelse
when
%% Drain together with auto renewal doesn't make sense, so disallow it in the API.
((Drain) andalso RenewWhenBelow =:= never
orelse not(Drain))
andalso
%% Check that the RenewWhenBelow value make sense.
(RenewWhenBelow =:= never orelse
is_integer(RenewWhenBelow) andalso
RenewWhenBelow > 0 andalso
RenewWhenBelow =< Credit ->
RenewWhenBelow =< Credit) ->
Flow = #'v1_0.flow'{link_credit = {uint, Credit},
drain = Drain},
ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow).
@ -359,11 +368,10 @@ accept_msg(LinkRef, Msg) ->
%% the chosen delivery state.
-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(),
amqp10_client_types:delivery_state()) -> ok.
settle_msg(#link_ref{role = receiver,
session = Session}, Msg, Settlement) ->
settle_msg(LinkRef, Msg, Settlement) ->
DeliveryId = amqp10_msg:delivery_id(Msg),
amqp10_client_session:disposition(Session, receiver, DeliveryId,
DeliveryId, true, Settlement).
amqp10_client_session:disposition(LinkRef, DeliveryId, DeliveryId, true, Settlement).
%% @doc Get a single message from a link.
%% Flows a single link credit then awaits delivery or timeout.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.

View File

@ -20,4 +20,5 @@
-record(link_ref, {role :: sender | receiver,
session :: pid(),
%% locally chosen output handle
link_handle :: non_neg_integer()}).

View File

@ -21,7 +21,7 @@
detach/2,
transfer/3,
flow/4,
disposition/6
disposition/5
]).
%% Private API
@ -131,8 +131,9 @@
available = 0 :: non_neg_integer(),
drain = false :: boolean(),
partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]},
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()}
}).
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()},
incoming_unsettled = #{} :: #{delivery_number() => ok}
}).
-record(state,
{channel :: pos_integer(),
@ -155,7 +156,6 @@
connection_config :: amqp10_client_connection:connection_config(),
outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID :: delivery_number(),
outgoing_unsettled = #{} :: #{delivery_number() => {amqp10_msg:delivery_tag(), Notify :: pid()}},
incoming_unsettled = #{} :: #{delivery_number() => output_handle()},
notify :: pid()
}).
@ -204,14 +204,18 @@ transfer(Session, Amqp10Msg, Timeout) ->
flow(Session, Handle, Flow, RenewWhenBelow) ->
gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}).
-spec disposition(pid(), link_role(), delivery_number(), delivery_number(), boolean(),
%% Sending a disposition on a sender link (with receiver-settle-mode = second)
%% is currently unsupported.
-spec disposition(link_ref(), delivery_number(), delivery_number(), boolean(),
amqp10_client_types:delivery_state()) -> ok.
disposition(Session, Role, First, Last, Settled, DeliveryState) ->
gen_statem:call(Session, {disposition, Role, First, Last, Settled,
disposition(#link_ref{role = receiver,
session = Session,
link_handle = Handle},
First, Last, Settled, DeliveryState) ->
gen_statem:call(Session, {disposition, Handle, First, Last, Settled,
DeliveryState}, ?TIMEOUT).
%% -------------------------------------------------------------------
%% Private API.
%% -------------------------------------------------------------------
@ -277,7 +281,7 @@ mapped(cast, 'end', State) ->
send_end(State),
{next_state, end_sent, State};
mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) ->
State = send_flow_link(fun send/2, OutHandle, Flow0, RenewWhenBelow, State0),
State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0),
{keep_state, State};
mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}},
#state{next_incoming_id = NII,
@ -367,45 +371,43 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
State = book_partial_transfer_received(
State0#state{links = Links#{OutHandle => Link1}}),
{keep_state, State};
mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
delivery_id = MaybeDeliveryId,
settled = Settled} = Transfer0, Payload0},
#state{incoming_unsettled = Unsettled0} = State0) ->
mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
Payload0}, State0) ->
{ok, #link{target = {pid, TargetPid},
output_handle = OutHandle,
ref = LinkRef} = Link0} =
find_link_by_input_handle(InHandle, State0),
ref = LinkRef,
incoming_unsettled = Unsettled
} = Link0} = find_link_by_input_handle(InHandle, State0),
{Transfer = #'v1_0.transfer'{settled = Settled,
delivery_id = {uint, DeliveryId}},
Payload, Link1} = complete_partial_transfer(Transfer0, Payload0, Link0),
{Transfer, Payload, Link1} = complete_partial_transfer(Transfer0, Payload0, Link0),
Msg = decode_as_msg(Transfer, Payload),
% stash the DeliveryId - not sure for what yet
Unsettled = case MaybeDeliveryId of
{uint, DeliveryId} when Settled =/= true ->
Unsettled0#{DeliveryId => OutHandle};
_ ->
Unsettled0
end,
Link2 = case Settled of
true ->
Link1;
_ ->
%% "If not set on the first (or only) transfer for a (multi-transfer) delivery,
%% then the settled flag MUST be interpreted as being false." [2.7.5]
Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}}
end,
% link bookkeeping
% notify when credit is exhausted (link_credit = 0)
% detach the Link with a transfer-limit-exceeded error code if further
% transfers are received
State1 = State0#state{incoming_unsettled = Unsettled},
case book_transfer_received(State1, Link1) of
{ok, Link2, State2} ->
case book_transfer_received(State0, Link2) of
{ok, Link3, State1} ->
% deliver
TargetPid ! {amqp10_msg, LinkRef, Msg},
State = auto_flow(Link2, State2),
State = auto_flow(Link3, State1),
{keep_state, State};
{credit_exhausted, Link2, State} ->
{credit_exhausted, Link3, State} ->
TargetPid ! {amqp10_msg, LinkRef, Msg},
notify_credit_exhausted(Link2),
notify_credit_exhausted(Link3),
{keep_state, State};
{transfer_limit_exceeded, Link2, State} ->
logger:warning("transfer_limit_exceeded for link ~tp", [Link2]),
Link = detach_with_error_cond(Link2, State,
{transfer_limit_exceeded, Link3, State} ->
logger:warning("transfer_limit_exceeded for link ~tp", [Link3]),
Link = detach_with_error_cond(Link3, State,
?V_1_0_LINK_ERROR_TRANSFER_LIMIT_EXCEEDED),
{keep_state, update_link(Link, State)}
end;
@ -501,12 +503,15 @@ mapped({call, From},
end;
mapped({call, From},
{disposition, Role, First, Last, Settled0, DeliveryState},
#state{incoming_unsettled = Unsettled0} = State0) ->
{disposition, OutputHandle, First, Last, Settled0, DeliveryState},
#state{links = Links} = State0) ->
#{OutputHandle := Link0 = #link{incoming_unsettled = Unsettled0}} = Links,
Unsettled = serial_number:foldl(fun maps:remove/2, Unsettled0, First, Last),
State = State0#state{incoming_unsettled = Unsettled},
Link = Link0#link{incoming_unsettled = Unsettled},
State1 = State0#state{links = Links#{OutputHandle := Link}},
State = auto_flow(Link, State1),
Disposition = #'v1_0.disposition'{
role = translate_role(Role),
role = translate_role(receiver),
first = {uint, First},
last = {uint, Last},
settled = Settled0,
@ -599,7 +604,7 @@ send_transfer(Transfer0, Parts0, MaxMessageSize, #state{socket = Socket,
{ok, length(Frames)}
end.
send_flow_link(Send, OutHandle,
send_flow_link(OutHandle,
#'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow,
#state{links = Links,
next_incoming_id = NII,
@ -625,7 +630,7 @@ send_flow_link(Send, OutHandle,
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
delivery_count = maybe_uint(DeliveryCount),
available = uint(Available)},
ok = Send(Flow, State),
ok = send(Flow, State),
State#state{links = Links#{OutHandle =>
Link#link{link_credit = Credit,
auto_flow = AutoFlow}}}.
@ -777,8 +782,9 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
max_message_size = MaxMessageSize},
ok = Send(Attach, State),
LinkRef = make_link_ref(element(1, Role), self(), OutHandle),
Link = #link{name = Name,
ref = make_link_ref(element(1, Role), self(), OutHandle),
ref = LinkRef,
output_handle = OutHandle,
state = attach_sent,
role = element(1, Role),
@ -790,7 +796,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
{State#state{links = Links#{OutHandle => Link},
next_link_handle = NextLinkHandle,
link_index = LinkIndex#{Name => OutHandle}}, Link#link.ref}.
link_index = LinkIndex#{Name => OutHandle}}, LinkRef}.
-spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}.
handle_session_flow(#'v1_0.flow'{next_incoming_id = MaybeNII,
@ -908,7 +914,6 @@ translate_delivery_state({modified,
translate_delivery_state(released) -> #'v1_0.released'{};
translate_delivery_state(received) -> #'v1_0.received'{}.
translate_role(sender) -> false;
translate_role(receiver) -> true.
maybe_notify_link_credit(#link{role = sender,
@ -987,9 +992,11 @@ book_transfer_received(#state{next_incoming_id = NID,
auto_flow(#link{link_credit = LC,
auto_flow = {auto, RenewWhenBelow, Credit},
output_handle = OutHandle}, State)
when LC < RenewWhenBelow ->
send_flow_link(fun send/2, OutHandle,
output_handle = OutHandle,
incoming_unsettled = Unsettled},
State)
when LC + map_size(Unsettled) < RenewWhenBelow ->
send_flow_link(OutHandle,
#'v1_0.flow'{link_credit = {uint, Credit}},
RenewWhenBelow, State);
auto_flow(_, State) ->
@ -1045,7 +1052,8 @@ socket_send0({tcp, Socket}, Data) ->
socket_send0({ssl, Socket}, Data) ->
ssl:send(Socket, Data).
-spec make_link_ref(_, _, _) -> link_ref().
-spec make_link_ref(link_role(), pid(), output_handle()) ->
link_ref().
make_link_ref(Role, Session, Handle) ->
#link_ref{role = Role, session = Session, link_handle = Handle}.
@ -1100,7 +1108,6 @@ format_status(Status = #{data := Data0}) ->
connection_config = ConnectionConfig,
outgoing_delivery_id = OutgoingDeliveryId,
outgoing_unsettled = OutgoingUnsettled,
incoming_unsettled = IncomingUnsettled,
notify = Notify
} = Data0,
Links = maps:map(
@ -1119,7 +1126,8 @@ format_status(Status = #{data := Data0}) ->
available = Available,
drain = Drain,
partial_transfers = PartialTransfers0,
auto_flow = AutoFlow
auto_flow = AutoFlow,
incoming_unsettled = IncomingUnsettled
}) ->
PartialTransfers = case PartialTransfers0 of
undefined ->
@ -1141,7 +1149,9 @@ format_status(Status = #{data := Data0}) ->
available => Available,
drain => Drain,
partial_transfers => PartialTransfers,
auto_flow => AutoFlow}
auto_flow => AutoFlow,
incoming_unsettled => maps:size(IncomingUnsettled)
}
end, Links0),
Data = #{channel => Channel,
remote_channel => RemoteChannel,
@ -1160,7 +1170,6 @@ format_status(Status = #{data := Data0}) ->
connection_config => maps:remove(sasl, ConnectionConfig),
outgoing_delivery_id => OutgoingDeliveryId,
outgoing_unsettled => maps:size(OutgoingUnsettled),
incoming_unsettled => maps:size(IncomingUnsettled),
notify => Notify},
Status#{data := Data}.

View File

@ -514,16 +514,17 @@ subscribe(Config) ->
<<"sub-receiver">>,
QueueName, unsettled),
ok = amqp10_client:flow_link_credit(Receiver, 10, never),
[begin
receive {amqp10_msg, Receiver, Msg} ->
ok = amqp10_client:accept_msg(Receiver, Msg)
after 2000 -> ct:fail(timeout)
end
end || _ <- lists:seq(1, 10)],
ok = assert_no_message(Receiver),
_ = receive_messages(Receiver, 10),
% assert no further messages are delivered
timeout = receive_one(Receiver),
receive
{amqp10_event, {link, Receiver, credit_exhausted}} ->
ok
after 5000 ->
flush(),
exit(credit_exhausted_assert)
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
after 5000 -> flush(),
exit(credit_exhausted_assert)
end,
ok = amqp10_client:end_session(Session),
@ -539,16 +540,121 @@ subscribe_with_auto_flow(Config) ->
<<"sub-sender">>,
QueueName),
await_link(Sender, credited, link_credit_timeout),
_ = publish_messages(Sender, <<"banana">>, 10),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session,
<<"sub-receiver">>,
QueueName, unsettled),
ok = amqp10_client:flow_link_credit(Receiver, 5, 2),
_ = receive_messages(Receiver, 10),
_ = publish_messages(Sender, <<"banana">>, 20),
%% Use sender settle mode 'settled'.
{ok, R1} = amqp10_client:attach_receiver_link(
Session, <<"sub-receiver-1">>, QueueName, settled),
await_link(R1, attached, attached_timeout),
ok = amqp10_client:flow_link_credit(R1, 5, 2),
?assertEqual(20, count_received_messages(R1)),
ok = amqp10_client:detach_link(R1),
% assert no further messages are delivered
timeout = receive_one(Receiver),
_ = publish_messages(Sender, <<"banana">>, 30),
%% Use sender settle mode 'unsettled'.
%% This should require us to manually settle message in order to receive more messages.
{ok, R2} = amqp10_client:attach_receiver_link(Session, <<"sub-receiver-2">>, QueueName, unsettled),
await_link(R2, attached, attached_timeout),
ok = amqp10_client:flow_link_credit(R2, 5, 2),
%% We should receive exactly 5 messages.
[M1, _M2, M3, M4, M5] = receive_messages(R2, 5),
ok = assert_no_message(R2),
%% Even when we accept the first 3 messages, the number of unsettled messages has not yet fallen below 2.
%% Therefore, the client should not yet grant more credits to the sender.
ok = amqp10_client_session:disposition(
R2, amqp10_msg:delivery_id(M1), amqp10_msg:delivery_id(M3), true, accepted),
ok = assert_no_message(R2),
%% When we accept 1 more message (the order in which we accept shouldn't matter, here we accept M5 before M4),
%% the number of unsettled messages now falls below 2 (since only M4 is left unsettled).
%% Therefore, the client should grant 5 credits to the sender.
%% Therefore, we should receive 5 more messages.
ok = amqp10_client:accept_msg(R2, M5),
[_M6, _M7, _M8, _M9, M10] = receive_messages(R2, 5),
ok = assert_no_message(R2),
%% It shouldn't matter how we settle messages, therefore we use 'rejected' this time.
%% Settling all in flight messages should cause us to receive exactly 5 more messages.
ok = amqp10_client_session:disposition(
R2, amqp10_msg:delivery_id(M4), amqp10_msg:delivery_id(M10), true, rejected),
[M11, _M12, _M13, _M14, M15] = receive_messages(R2, 5),
ok = assert_no_message(R2),
%% Dynamically decrease link credit.
%% Since we explicitly tell to grant 3 new credits now, we expect to receive 3 more messages.
ok = amqp10_client:flow_link_credit(R2, 3, 3),
[M16, _M17, M18] = receive_messages(R2, 3),
ok = assert_no_message(R2),
ok = amqp10_client_session:disposition(
R2, amqp10_msg:delivery_id(M11), amqp10_msg:delivery_id(M15), true, accepted),
%% However, the RenewWhenBelow=3 still refers to all unsettled messages.
%% Right now we have 3 messages (M16, M17, M18) unsettled.
ok = assert_no_message(R2),
%% Settling 1 out of these 3 messages causes RenewWhenBelow to fall below 3 resulting
%% in 3 new messages to be received.
ok = amqp10_client:accept_msg(R2, M18),
[_M19, _M20, _M21] = receive_messages(R2, 3),
ok = assert_no_message(R2),
ok = amqp10_client:flow_link_credit(R2, 3, never, true),
[_M22, _M23, M24] = receive_messages(R2, 3),
ok = assert_no_message(R2),
%% Since RenewWhenBelow = never, we expect to receive no new messages despite settling.
ok = amqp10_client_session:disposition(
R2, amqp10_msg:delivery_id(M16), amqp10_msg:delivery_id(M24), true, rejected),
ok = assert_no_message(R2),
ok = amqp10_client:flow_link_credit(R2, 2, never, false),
[M25, _M26] = receive_messages(R2, 2),
ok = assert_no_message(R2),
ok = amqp10_client:flow_link_credit(R2, 3, 3),
[_M27, _M28, M29] = receive_messages(R2, 3),
ok = assert_no_message(R2),
ok = amqp10_client_session:disposition(
R2, amqp10_msg:delivery_id(M25), amqp10_msg:delivery_id(M29), true, accepted),
[M30] = receive_messages(R2, 1),
ok = assert_no_message(R2),
ok = amqp10_client:accept_msg(R2, M30),
%% The sender queue is empty now.
ok = assert_no_message(R2),
ok = amqp10_client:flow_link_credit(R2, 3, 1),
_ = publish_messages(Sender, <<"banana">>, 1),
[M31] = receive_messages(R2, 1),
ok = amqp10_client:accept_msg(R2, M31),
%% Since function flow_link_credit/3 documents
%% "if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
%% Credit to the sender when the sum of the remaining link credit and the number of
%% unsettled messages falls below the value of RenewWhenBelow."
%% our expectation is that the amqp10_client has not renewed credit since the sum of
%% remaining link credit (2) and unsettled messages (0) is 2.
%%
%% Therefore, when we publish another 3 messages, we expect to only receive only 2 messages!
_ = publish_messages(Sender, <<"banana">>, 5),
[M32, M33] = receive_messages(R2, 2),
ok = assert_no_message(R2),
%% When we accept both messages, the sum of the remaining link credit (0) and unsettled messages (0)
%% falls below RenewWhenBelow=1 causing the amqp10_client to grant 3 new credits.
ok = amqp10_client:accept_msg(R2, M32),
ok = assert_no_message(R2),
ok = amqp10_client:accept_msg(R2, M33),
[M35, M36, M37] = receive_messages(R2, 3),
ok = amqp10_client:accept_msg(R2, M35),
ok = amqp10_client:accept_msg(R2, M36),
ok = amqp10_client:accept_msg(R2, M37),
%% The sender queue is empty now.
ok = assert_no_message(R2),
ok = amqp10_client:detach_link(R2),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).
@ -703,38 +809,6 @@ incoming_heartbeat(Config) ->
%%% HELPERS
%%%
receive_messages(Receiver, Num) ->
[begin
ct:pal("receive_messages ~tp", [T]),
ok = receive_one(Receiver)
end || T <- lists:seq(1, Num)].
publish_messages(Sender, Data, Num) ->
[begin
Tag = integer_to_binary(T),
Msg = amqp10_msg:new(Tag, Data, false),
ok = amqp10_client:send_msg(Sender, Msg),
ok = await_disposition(Tag)
end || T <- lists:seq(1, Num)].
receive_one(Receiver) ->
receive
{amqp10_msg, Receiver0, Msg}
when Receiver0 =:= Receiver ->
amqp10_client:accept_msg(Receiver, Msg)
after 2000 ->
timeout
end.
await_disposition(DeliveryTag) ->
receive
{amqp10_disposition, {accepted, DeliveryTag0}}
when DeliveryTag0 =:= DeliveryTag -> ok
after 3000 ->
flush(),
exit(dispostion_timeout)
end.
await_link(Who, What, Err) ->
receive
{amqp10_event, {link, Who0, What0}}
@ -749,6 +823,52 @@ await_link(Who, What, Err) ->
exit(Err)
end.
publish_messages(Sender, Data, Num) ->
[begin
Tag = integer_to_binary(T),
Msg = amqp10_msg:new(Tag, Data, false),
ok = amqp10_client:send_msg(Sender, Msg),
ok = await_disposition(Tag)
end || T <- lists:seq(1, Num)].
await_disposition(DeliveryTag) ->
receive
{amqp10_disposition, {accepted, DeliveryTag0}}
when DeliveryTag0 =:= DeliveryTag -> ok
after 3000 ->
flush(),
ct:fail(dispostion_timeout)
end.
count_received_messages(Receiver) ->
count_received_messages0(Receiver, 0).
count_received_messages0(Receiver, Count) ->
receive
{amqp10_msg, Receiver, _Msg} ->
count_received_messages0(Receiver, Count + 1)
after 200 ->
Count
end.
receive_messages(Receiver, N) ->
receive_messages0(Receiver, N, []).
receive_messages0(_Receiver, 0, Acc) ->
lists:reverse(Acc);
receive_messages0(Receiver, N, Acc) ->
receive
{amqp10_msg, Receiver, Msg} ->
receive_messages0(Receiver, N - 1, [Msg | Acc])
after 5000 ->
ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}})
end.
assert_no_message(Receiver) ->
receive {amqp10_msg, Receiver, Msg} -> ct:fail({unexpected_message, Msg})
after 50 -> ok
end.
to_bin(X) when is_list(X) ->
list_to_binary(X).

View File

@ -492,7 +492,7 @@ receiver_settle_mode_first(Config) ->
?assertEqual(DeliveryIdMsg9, serial_number_increment(DeliveryIdMsg8)),
Last1 = serial_number_increment(serial_number_increment(DeliveryIdMsg9)),
ok = amqp10_client_session:disposition(
Session, receiver, DeliveryIdMsg8, Last1, true, accepted),
Receiver, DeliveryIdMsg8, Last1, true, accepted),
assert_messages(QName, 8, 7, Config),
%% 2. Ack a range smaller than the number of unacked messages where all delivery IDs
@ -501,7 +501,7 @@ receiver_settle_mode_first(Config) ->
DeliveryIdMsg4 = amqp10_msg:delivery_id(Msg4),
DeliveryIdMsg6 = amqp10_msg:delivery_id(Msg6),
ok = amqp10_client_session:disposition(
Session, receiver, DeliveryIdMsg4, DeliveryIdMsg6, true, accepted),
Receiver, DeliveryIdMsg4, DeliveryIdMsg6, true, accepted),
assert_messages(QName, 5, 4, Config),
%% 3. Ack a range larger than the number of unacked messages where all delivery IDs
@ -509,7 +509,7 @@ receiver_settle_mode_first(Config) ->
DeliveryIdMsg2 = amqp10_msg:delivery_id(Msg2),
DeliveryIdMsg7 = amqp10_msg:delivery_id(Msg7),
ok = amqp10_client_session:disposition(
Session, receiver, DeliveryIdMsg2, DeliveryIdMsg7, true, accepted),
Receiver, DeliveryIdMsg2, DeliveryIdMsg7, true, accepted),
assert_messages(QName, 2, 1, Config),
%% Consume the last message.
@ -523,16 +523,16 @@ receiver_settle_mode_first(Config) ->
DeliveryIdMsg10 = amqp10_msg:delivery_id(Msg10),
Last2 = serial_number_increment(DeliveryIdMsg10),
ok = amqp10_client_session:disposition(
Session, receiver, DeliveryIdMsg1, Last2, true, accepted),
Receiver, DeliveryIdMsg1, Last2, true, accepted),
assert_messages(QName, 0, 0, Config),
%% 5. Ack single delivery ID when there are no unacked messages.
ok = amqp10_client_session:disposition(
Session, receiver, DeliveryIdMsg1, DeliveryIdMsg1, true, accepted),
Receiver, DeliveryIdMsg1, DeliveryIdMsg1, true, accepted),
%% 6. Ack multiple delivery IDs when there are no unacked messages.
ok = amqp10_client_session:disposition(
Session, receiver, DeliveryIdMsg1, DeliveryIdMsg6, true, accepted),
Receiver, DeliveryIdMsg1, DeliveryIdMsg6, true, accepted),
assert_messages(QName, 0, 0, Config),
ok = amqp10_client:detach_link(Receiver),
@ -684,7 +684,7 @@ amqp_stream_amqpl(Config) ->
#amqp_msg{props = #'P_basic'{type = <<"amqp-1.0">>}}} ->
ok
after 5000 ->
exit(basic_deliver_timeout)
ct:fail(basic_deliver_timeout)
end,
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).
@ -1736,7 +1736,7 @@ detach_requeues(Config) ->
%% Receiver2 accepts all 4 messages.
ok = amqp10_client_session:disposition(
Session, receiver,
Receiver2,
amqp10_msg:delivery_id(Msg2),
amqp10_msg:delivery_id(Msg3b),
true, accepted),
@ -2328,29 +2328,20 @@ async_notify(SenderSettleMode, QType, Config) ->
end,
%% Initially, grant 10 credits to the sending queue.
%% Whenever credits drops below 5, renew back to 10.
%% Whenever the sum of credits and number of unsettled messages drops below 5, renew back to 10.
ok = amqp10_client:flow_link_credit(Receiver, 10, 5),
%% We should receive all messages.
Msgs = receive_messages(Receiver, NumMsgs),
Accept = case SenderSettleMode of
settled -> false;
unsettled -> true
end,
Msgs = receive_all_messages(Receiver, Accept),
FirstMsg = hd(Msgs),
LastMsg = lists:last(Msgs),
?assertEqual([<<"1">>], amqp10_msg:body(FirstMsg)),
?assertEqual([integer_to_binary(NumMsgs)], amqp10_msg:body(LastMsg)),
case SenderSettleMode of
settled ->
ok;
unsettled ->
ok = amqp10_client_session:disposition(
Session,
receiver,
amqp10_msg:delivery_id(FirstMsg),
amqp10_msg:delivery_id(LastMsg),
true,
accepted)
end,
%% No further messages should be delivered.
receive Unexpected -> ct:fail({received_unexpected_message, Unexpected})
after 50 -> ok
@ -2503,8 +2494,7 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config)
?assertEqual([<<"1">>], amqp10_msg:body(FirstMsg)),
?assertEqual([integer_to_binary(NumMsgs)], amqp10_msg:body(LastMsg)),
ok = amqp10_client_session:disposition(
Session,
receiver,
Receiver,
amqp10_msg:delivery_id(FirstMsg),
amqp10_msg:delivery_id(LastMsg),
true,
@ -2803,7 +2793,7 @@ stream_filtering(Config) ->
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
<<"rabbitmq:stream-filter">> => <<"apple">>}),
ok = amqp10_client:flow_link_credit(AppleReceiver, 100, 10),
AppleMessages = receive_all_messages(AppleReceiver, []),
AppleMessages = receive_all_messages(AppleReceiver, true),
%% we should get less than all the waves combined
?assert(length(AppleMessages) < WaveCount * 3),
%% client-side filtering
@ -2824,7 +2814,7 @@ stream_filtering(Config) ->
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
<<"rabbitmq:stream-filter">> => [<<"apple">>, <<"orange">>]}),
ok = amqp10_client:flow_link_credit(AppleOrangeReceiver, 100, 10),
AppleOrangeMessages = receive_all_messages(AppleOrangeReceiver, []),
AppleOrangeMessages = receive_all_messages(AppleOrangeReceiver, true),
%% we should get less than all the waves combined
?assert(length(AppleOrangeMessages) < WaveCount * 3),
%% client-side filtering
@ -2848,7 +2838,7 @@ stream_filtering(Config) ->
<<"rabbitmq:stream-match-unfiltered">> => {boolean, true}}),
ok = amqp10_client:flow_link_credit(AppleUnfilteredReceiver, 100, 10),
AppleUnfilteredMessages = receive_all_messages(AppleUnfilteredReceiver, []),
AppleUnfilteredMessages = receive_all_messages(AppleUnfilteredReceiver, true),
%% we should get less than all the waves combined
?assert(length(AppleUnfilteredMessages) < WaveCount * 3),
%% client-side filtering
@ -3351,10 +3341,16 @@ classic_priority_queue(Config) ->
%% internal
%%
receive_all_messages(Receiver, Acc) ->
receive_all_messages(Receiver, Accept) ->
receive_all_messages0(Receiver, Accept, []).
receive_all_messages0(Receiver, Accept, Acc) ->
receive {amqp10_msg, Receiver, Msg} ->
ok = amqp10_client:accept_msg(Receiver, Msg),
receive_all_messages(Receiver, [Msg | Acc])
case Accept of
true -> ok = amqp10_client:accept_msg(Receiver, Msg);
false -> ok
end,
receive_all_messages0(Receiver, Accept, [Msg | Acc])
after 500 ->
lists:reverse(Acc)
end.
@ -3501,7 +3497,7 @@ receive_messages0(Receiver, N, Acc) ->
{amqp10_msg, Receiver, Msg} ->
receive_messages0(Receiver, N - 1, [Msg | Acc])
after 5000 ->
exit({timeout, {num_received, length(Acc)}, {num_missing, N}})
ct:fail({timeout, {num_received, length(Acc)}, {num_missing, N}})
end.
count_received_messages(Receiver) ->

View File

@ -121,8 +121,8 @@ credit_api_v2(Config) ->
filter => #{}},
{ok, QQReceiver1} = amqp10_client:attach_link(Session, QQAttachArgs),
ok = consume_and_accept(10, CQReceiver1, Session),
ok = consume_and_accept(10, QQReceiver1, Session),
ok = consume_and_accept(10, CQReceiver1),
ok = consume_and_accept(10, QQReceiver1),
?assertEqual(ok,
rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FUNCTION_NAME)),
@ -133,12 +133,12 @@ credit_api_v2(Config) ->
Session, <<"cq receiver 2">>, CQAddr, unsettled),
{ok, QQReceiver2} = amqp10_client:attach_receiver_link(
Session, <<"qq receiver 2">>, QQAddr, unsettled),
ok = consume_and_accept(10, CQReceiver2, Session),
ok = consume_and_accept(10, QQReceiver2, Session),
ok = consume_and_accept(10, CQReceiver2),
ok = consume_and_accept(10, QQReceiver2),
%% Consume via with credit API v1
ok = consume_and_accept(10, CQReceiver1, Session),
ok = consume_and_accept(10, QQReceiver1, Session),
ok = consume_and_accept(10, CQReceiver1),
ok = consume_and_accept(10, QQReceiver1),
%% Detach the credit API v1 links and attach with the same output handle.
ok = detach_sync(CQReceiver1),
@ -147,8 +147,8 @@ credit_api_v2(Config) ->
{ok, QQReceiver3} = amqp10_client:attach_link(Session, QQAttachArgs),
%% The new links should use credit API v2
ok = consume_and_accept(10, CQReceiver3, Session),
ok = consume_and_accept(10, QQReceiver3, Session),
ok = consume_and_accept(10, CQReceiver3),
ok = consume_and_accept(10, QQReceiver3),
flush(pre_drain),
%% Draining should also work.
@ -181,12 +181,11 @@ credit_api_v2(Config) ->
after 5000 -> ct:fail(missing_closed)
end.
consume_and_accept(NumMsgs, Receiver, Session) ->
consume_and_accept(NumMsgs, Receiver) ->
ok = amqp10_client:flow_link_credit(Receiver, NumMsgs, never),
Msgs = receive_messages(Receiver, NumMsgs),
ok = amqp10_client_session:disposition(
Session,
receiver,
Receiver,
amqp10_msg:delivery_id(hd(Msgs)),
amqp10_msg:delivery_id(lists:last(Msgs)),
true,

View File

@ -283,30 +283,24 @@ close_dest(#{dest := #{current := #{conn := Conn}}}) ->
close_dest(_Config) -> ok.
-spec ack(Tag :: tag(), Multi :: boolean(), state()) -> state().
ack(Tag, true, State = #{source := #{current := #{session := Session},
ack(Tag, true, State = #{source := #{current := #{link := LinkRef},
last_acked_tag := LastTag} = Src}) ->
First = LastTag + 1,
ok = amqp10_client_session:disposition(Session, receiver, First,
Tag, true, accepted),
ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, accepted),
State#{source => Src#{last_acked_tag => Tag}};
ack(Tag, false, State = #{source := #{current :=
#{session := Session}} = Src}) ->
ok = amqp10_client_session:disposition(Session, receiver, Tag,
Tag, true, accepted),
ack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) ->
ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, accepted),
State#{source => Src#{last_acked_tag => Tag}}.
-spec nack(Tag :: tag(), Multi :: boolean(), state()) -> state().
nack(Tag, false, State = #{source :=
#{current := #{session := Session}} = Src}) ->
nack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) ->
% the tag is the same as the deliveryid
ok = amqp10_client_session:disposition(Session, receiver, Tag,
Tag, false, rejected),
ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, rejected),
State#{source => Src#{last_nacked_tag => Tag}};
nack(Tag, true, State = #{source := #{current := #{session := Session},
last_nacked_tag := LastTag} = Src}) ->
nack(Tag, true, State = #{source := #{current := #{link := LinkRef},
last_nacked_tag := LastTag} = Src}) ->
First = LastTag + 1,
ok = amqp10_client_session:disposition(Session, receiver, First,
Tag, true, accepted),
ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, rejected),
State#{source => Src#{last_nacked_tag => Tag}}.
status(#{dest := #{current := #{link_state := attached}}}) ->

View File

@ -178,8 +178,7 @@ amqp_credit_multiple_grants(Config) ->
%% Let's ack all of them.
ok = amqp10_client_session:disposition(
Session,
receiver,
Receiver,
amqp10_msg:delivery_id(M1),
amqp10_msg:delivery_id(M4),
true,
@ -226,8 +225,7 @@ amqp_credit_multiple_grants(Config) ->
%% Let's ack them all.
ok = amqp10_client_session:disposition(
Session,
receiver,
Receiver,
amqp10_msg:delivery_id(M5),
amqp10_msg:delivery_id(M11),
true,

View File

@ -3,6 +3,23 @@ accept:
- accept_header
- accept_neg
- accept_parser
amqp10_client:
- amqp10_client
- amqp10_client_app
- amqp10_client_connection
- amqp10_client_connection_sup
- amqp10_client_frame_reader
- amqp10_client_session
- amqp10_client_sessions_sup
- amqp10_client_sup
- amqp10_client_types
- amqp10_msg
amqp10_common:
- amqp10_binary_generator
- amqp10_binary_parser
- amqp10_framing
- amqp10_framing0
- serial_number
amqp_client:
- amqp_auth_mechanisms
- amqp_channel
@ -28,23 +45,6 @@ amqp_client:
- amqp_util
- rabbit_routing_util
- uri_parser
amqp10_client:
- amqp10_client
- amqp10_client_app
- amqp10_client_connection
- amqp10_client_connection_sup
- amqp10_client_frame_reader
- amqp10_client_session
- amqp10_client_sessions_sup
- amqp10_client_sup
- amqp10_client_types
- amqp10_msg
amqp10_common:
- amqp10_binary_generator
- amqp10_binary_parser
- amqp10_framing
- amqp10_framing0
- serial_number
aten:
- aten
- aten_app