Auto widen session incoming-window in AMQP 1.0 client

This commit fixes a bug in the Erlang AMQP 1.0 client.

Prior to this commit, to repro this bug:
1. Send more than 2^16 messages to a queue.
2. Grant more than a total of 2^16 link credit initially (on a single link
   or across multiple links) on a single session without any
   auto or manual link credit renewal.

The expectation is that thanks to sufficiently granted initial link-credit,
the client will receive all messages.
However, consumption stops after exactly 2^16-1 messages.

That's because the client lib was never sending a flow frame to the server.
So, after the client received all 2^16-1 messages (the initial
incoming-window set by the client), the server's remote-incoming-window
reached 0 causing the server to stop delivering messages.

The expectation is that the client lib automatically handles session
flow control without any manual involvement of the client app.

This commit implements this fix:
* We keep the server's remote-incoming window always large by default as
  explained in https://www.rabbitmq.com/blog/2024/09/02/amqp-flow-control#incoming-window
* Hence, the client lib sets its incoming-window to 100,000 initially.
* The client lib tracks its incoming-window decrementing it by 1 for
  every transfer it received. (This wasn't done prior to this commit.)
* Whenever this window shrinks below 50,000, the client sends a flow
  frame without any link information widening its incoming-window back to 100,000.
* For test cases (maybe later for apps as well), there is a new function
  `amqp10_client_session:flow/3`, which allows for a test case to do manual
  session flow control. Its API is designed very similar to
  `amqp10_client_session:flow_link/4` in that the test can optionally request
  the lib to auto widen the session window whenever it falls below a certain threshold.
This commit is contained in:
David Ansari 2025-03-18 17:29:37 +00:00
parent e93afc5c5b
commit 32854e8d34
4 changed files with 223 additions and 57 deletions

View File

@ -339,7 +339,7 @@ flow_link_credit(#link_ref{role = receiver, session = Session,
RenewWhenBelow =< Credit) ->
Flow = #'v1_0.flow'{link_credit = {uint, Credit},
drain = Drain},
ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow).
ok = amqp10_client_session:flow_link(Session, Handle, Flow, RenewWhenBelow).
%% @doc Stop a receiving link.
%% See AMQP 1.0 spec §2.6.10.
@ -348,7 +348,7 @@ stop_receiver_link(#link_ref{role = receiver,
link_handle = Handle}) ->
Flow = #'v1_0.flow'{link_credit = {uint, 0},
echo = true},
ok = amqp10_client_session:flow(Session, Handle, Flow, never).
ok = amqp10_client_session:flow_link(Session, Handle, Flow, never).
%%% messages

View File

@ -20,10 +20,13 @@
attach/2,
detach/2,
transfer/3,
flow/4,
disposition/5
disposition/5,
flow_link/4
]).
%% Manual session flow control is currently only used in tests.
-export([flow/3]).
%% Private API
-export([start_link/4,
socket_ready/2
@ -51,7 +54,8 @@
[add/2,
diff/2]).
-define(MAX_SESSION_WINDOW_SIZE, 65535).
%% By default, we want to keep the server's remote-incoming-window large at all times.
-define(DEFAULT_MAX_INCOMING_WINDOW, 100_000).
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
-define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX).
%% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
@ -129,7 +133,8 @@
available = 0 :: non_neg_integer(),
drain = false :: boolean(),
partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]},
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()},
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
Credit :: pos_integer()},
incoming_unsettled = #{} :: #{delivery_number() => ok},
footer_opt :: footer_opt() | undefined
}).
@ -140,7 +145,10 @@
%% session flow control, see section 2.5.6
next_incoming_id :: transfer_number() | undefined,
incoming_window = ?MAX_SESSION_WINDOW_SIZE :: non_neg_integer(),
%% Can become negative if the peer overshoots our window.
incoming_window :: integer(),
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
NewWindowSize :: pos_integer()},
next_outgoing_id = ?INITIAL_OUTGOING_TRANSFER_ID :: transfer_number(),
remote_incoming_window = 0 :: non_neg_integer(),
remote_outgoing_window = 0 :: non_neg_integer(),
@ -200,7 +208,17 @@ transfer(Session, Amqp10Msg, Timeout) ->
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).
flow(Session, Handle, Flow, RenewWhenBelow) ->
-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
flow(Session, IncomingWindow, RenewWhenBelow) when
%% Check that the RenewWhenBelow value make sense.
RenewWhenBelow =:= never orelse
is_integer(RenewWhenBelow) andalso
RenewWhenBelow > 0 andalso
RenewWhenBelow =< IncomingWindow ->
gen_statem:cast(Session, {flow_session, IncomingWindow, RenewWhenBelow}).
-spec flow_link(pid(), link_handle(), #'v1_0.flow'{}, never | pos_integer()) -> ok.
flow_link(Session, Handle, Flow, RenewWhenBelow) ->
gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}).
%% Sending a disposition on a sender link (with receiver-settle-mode = second)
@ -239,6 +257,9 @@ init([FromPid, Channel, Reader, ConnConfig]) ->
channel = Channel,
reader = Reader,
connection_config = ConnConfig,
incoming_window = ?DEFAULT_MAX_INCOMING_WINDOW,
auto_flow = {?DEFAULT_MAX_INCOMING_WINDOW div 2,
?DEFAULT_MAX_INCOMING_WINDOW},
early_attach_requests = []},
{ok, unmapped, State}.
@ -282,15 +303,15 @@ mapped(cast, 'end', State) ->
mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) ->
State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0),
{keep_state, State};
mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}},
#state{next_incoming_id = NII,
next_outgoing_id = NOI} = State) ->
Flow = Flow0#'v1_0.flow'{
next_incoming_id = maybe_uint(NII),
next_outgoing_id = uint(NOI),
outgoing_window = ?UINT_OUTGOING_WINDOW},
ok = send(Flow, State),
{keep_state, State#state{incoming_window = IncomingWindow}};
mapped(cast, {flow_session, IncomingWindow, RenewWhenBelow}, State0) ->
AutoFlow = case RenewWhenBelow of
never -> never;
_ -> {RenewWhenBelow, IncomingWindow}
end,
State = State0#state{incoming_window = IncomingWindow,
auto_flow = AutoFlow},
send_flow_session(State),
{keep_state, State};
mapped(cast, #'v1_0.end'{} = End, State) ->
%% We receive the first end frame, reply and terminate.
_ = send_end(State),
@ -656,35 +677,44 @@ is_bare_message_section(_Section) ->
send_flow_link(OutHandle,
#'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow,
#state{links = Links,
next_incoming_id = NII,
next_outgoing_id = NOI,
incoming_window = InWin} = State) ->
#state{links = Links} = State) ->
AutoFlow = case RenewWhenBelow of
never -> never;
Limit -> {auto, Limit, Credit}
_ -> {RenewWhenBelow, Credit}
end,
#{OutHandle := #link{output_handle = H,
role = receiver,
delivery_count = DeliveryCount,
available = Available} = Link} = Links,
Flow = Flow0#'v1_0.flow'{
handle = uint(H),
%% "This value MUST be set if the peer has received the begin
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
next_incoming_id = maybe_uint(NII),
next_outgoing_id = uint(NOI),
outgoing_window = ?UINT_OUTGOING_WINDOW,
incoming_window = uint(InWin),
%% "In the event that the receiving link endpoint has not yet seen the
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
delivery_count = maybe_uint(DeliveryCount),
available = uint(Available)},
Flow1 = Flow0#'v1_0.flow'{
handle = uint(H),
%% "In the event that the receiving link endpoint has not yet seen the
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
delivery_count = maybe_uint(DeliveryCount),
available = uint(Available)},
Flow = set_flow_session_fields(Flow1, State),
ok = send(Flow, State),
State#state{links = Links#{OutHandle =>
Link#link{link_credit = Credit,
auto_flow = AutoFlow}}}.
send_flow_session(State) ->
Flow = set_flow_session_fields(#'v1_0.flow'{}, State),
ok = send(Flow, State).
set_flow_session_fields(Flow, #state{next_incoming_id = NID,
incoming_window = IW,
next_outgoing_id = NOI}) ->
Flow#'v1_0.flow'{
%% "This value MUST be set if the peer has received the begin
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
next_incoming_id = maybe_uint(NID),
%% IncomingWindow0 can be negative when the sending server overshoots our window.
%% We must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
incoming_window = uint(max(0, IW)),
next_outgoing_id = uint(NOI),
outgoing_window = ?UINT_OUTGOING_WINDOW}.
build_frames(Channel, Trf, Bin, MaxPayloadSize, Acc)
when byte_size(Bin) =< MaxPayloadSize ->
T = amqp10_framing:encode_bin(Trf#'v1_0.transfer'{more = false}),
@ -1059,17 +1089,21 @@ book_transfer_send(Num, #link{output_handle = Handle} = Link,
links = Links#{Handle => book_link_transfer_send(Link)}}.
book_partial_transfer_received(#state{next_incoming_id = NID,
remote_outgoing_window = ROW} = State) ->
State#state{next_incoming_id = add(NID, 1),
remote_outgoing_window = ROW - 1}.
incoming_window = IW,
remote_outgoing_window = ROW} = State0) ->
State = State0#state{next_incoming_id = add(NID, 1),
incoming_window = IW - 1,
remote_outgoing_window = ROW - 1},
maybe_widen_incoming_window(State).
book_transfer_received(State = #state{connection_config =
#{transfer_limit_margin := Margin}},
#link{link_credit = Margin} = Link) ->
{transfer_limit_exceeded, Link, State};
book_transfer_received(#state{next_incoming_id = NID,
incoming_window = IW,
remote_outgoing_window = ROW,
links = Links} = State,
links = Links} = State0,
#link{output_handle = OutHandle,
delivery_count = DC,
link_credit = LC,
@ -1079,19 +1113,31 @@ book_transfer_received(#state{next_incoming_id = NID,
%% "the receiver MUST maintain a floor of zero in its
%% calculation of the value of available" [2.6.7]
available = max(0, Avail - 1)},
State1 = State#state{links = Links#{OutHandle => Link1},
next_incoming_id = add(NID, 1),
remote_outgoing_window = ROW - 1},
State1 = State0#state{links = Links#{OutHandle => Link1},
next_incoming_id = add(NID, 1),
incoming_window = IW - 1,
remote_outgoing_window = ROW - 1},
State = maybe_widen_incoming_window(State1),
case Link1 of
#link{link_credit = 0,
auto_flow = never} ->
{credit_exhausted, Link1, State1};
{credit_exhausted, Link1, State};
_ ->
{ok, Link1, State1}
{ok, Link1, State}
end.
maybe_widen_incoming_window(
State0 = #state{incoming_window = IncomingWindow,
auto_flow = {RenewWhenBelow, NewWindowSize}})
when IncomingWindow < RenewWhenBelow ->
State = State0#state{incoming_window = NewWindowSize},
send_flow_session(State),
State;
maybe_widen_incoming_window(State) ->
State.
auto_flow(#link{link_credit = LC,
auto_flow = {auto, RenewWhenBelow, Credit},
auto_flow = {RenewWhenBelow, Credit},
output_handle = OutHandle,
incoming_unsettled = Unsettled},
State)
@ -1230,6 +1276,7 @@ format_status(Status = #{data := Data0}) ->
remote_channel = RemoteChannel,
next_incoming_id = NextIncomingId,
incoming_window = IncomingWindow,
auto_flow = SessionAutoFlow,
next_outgoing_id = NextOutgoingId,
remote_incoming_window = RemoteIncomingWindow,
remote_outgoing_window = RemoteOutgoingWindow,
@ -1294,6 +1341,7 @@ format_status(Status = #{data := Data0}) ->
remote_channel => RemoteChannel,
next_incoming_id => NextIncomingId,
incoming_window => IncomingWindow,
auto_flow => SessionAutoFlow,
next_outgoing_id => NextOutgoingId,
remote_incoming_window => RemoteIncomingWindow,
remote_outgoing_window => RemoteOutgoingWindow,

View File

@ -163,6 +163,8 @@ groups() ->
incoming_window_closed_rabbitmq_internal_flow_quorum_queue,
tcp_back_pressure_rabbitmq_internal_flow_classic_queue,
tcp_back_pressure_rabbitmq_internal_flow_quorum_queue,
session_flow_control_default_max_frame_size,
session_flow_control_small_max_frame_size,
session_max_per_connection,
link_max_per_session,
reserved_annotation,
@ -1644,7 +1646,7 @@ server_closes_link(QType, Config) ->
receive {amqp10_msg, Receiver, Msg} ->
?assertEqual([Body], amqp10_msg:body(Msg))
after 30000 -> ct:fail("missing msg")
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
[SessionPid] = rpc(Config, rabbit_amqp_session, list_local, []),
@ -2994,7 +2996,7 @@ detach_requeues_two_connections(QType, Config) ->
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session1, <<"my link pair">>),
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}},
{ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
flush(link_pair_attached),
flush(queue_declared),
%% Attach 1 sender and 2 receivers.
{ok, Sender} = amqp10_client:attach_sender_link(Session0, <<"sender">>, Address, settled),
@ -3004,7 +3006,7 @@ detach_requeues_two_connections(QType, Config) ->
receive {amqp10_event, {link, Receiver0, attached}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
end,
ok = gen_statem:cast(Session0, {flow_session, #'v1_0.flow'{incoming_window = {uint, 1}}}),
ok = amqp10_client_session:flow(Session0, 1, never),
ok = amqp10_client:flow_link_credit(Receiver0, 50, never),
%% Wait for credit being applied to the queue.
timer:sleep(100),
@ -4319,7 +4321,7 @@ available_messages(QType, Config) ->
link_credit = {uint, 1},
%% Request sending queue to send us a FLOW including available messages.
echo = true},
ok = amqp10_client_session:flow(Session, OutputHandle, Flow0, never),
ok = amqp10_client_session:flow_link(Session, OutputHandle, Flow0, never),
receive_messages(Receiver, 1),
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
@ -4360,8 +4362,8 @@ available_messages(QType, Config) ->
link_credit = {uint, 1},
echo = true},
%% Send both FLOW frames in sequence.
ok = amqp10_client_session:flow(Session, OutputHandle, Flow1, never),
ok = amqp10_client_session:flow(Session, OutputHandle, Flow2, never),
ok = amqp10_client_session:flow_link(Session, OutputHandle, Flow1, never),
ok = amqp10_client_session:flow_link(Session, OutputHandle, Flow2, never),
receive_messages(Receiver, 1),
receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok
after 30000 -> ct:fail({missing_event, ?LINE})
@ -5916,7 +5918,7 @@ incoming_window_closed_transfer_flow_order(Config) ->
end,
%% Open our incoming window
gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}),
ok = amqp10_client_session:flow(Session, 5, never),
%% Important: We should first receive the TRANSFER,
%% and only thereafter the FLOW (and hence the credit_exhausted notification).
receive First ->
@ -5969,7 +5971,7 @@ incoming_window_closed_stop_link(Config) ->
end,
%% Open our incoming window
gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}),
ok = amqp10_client_session:flow(Session, 5, never),
%% Since we decreased link credit dynamically, we may or may not receive the 1st message.
receive {amqp10_msg, Receiver, Msg1} ->
@ -6015,7 +6017,7 @@ incoming_window_closed_close_link(Config) ->
%% Close the link while our session incoming-window is closed.
ok = detach_link_sync(Receiver),
%% Open our incoming window.
gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}),
ok = amqp10_client_session:flow(Session, 5, never),
%% Given that both endpoints have now destroyed the link, we do not
%% expect to receive any TRANSFER or FLOW frame referencing the destroyed link.
receive Unexpected2 -> ct:fail({unexpected, Unexpected2})
@ -6069,7 +6071,7 @@ incoming_window_closed_rabbitmq_internal_flow(QType, Config) ->
?assert(MsgsReady > 0),
%% Open our incoming window.
gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, Num}}}),
ok = amqp10_client_session:flow(Session, 100, 50),
receive_messages(Receiver, Num),
ok = detach_link_sync(Receiver),
@ -6168,6 +6170,122 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = close({Connection, Session, LinkPair}).
session_flow_control_default_max_frame_size(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(QName),
{_, Session, LinkPair} = Init = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address),
receive {amqp10_event, {link, Receiver, attached}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
Num = 1000,
ok = send_messages(Sender, Num, false),
ok = wait_for_accepts(Num),
ok = amqp10_client_session:flow(Session, 2, never),
%% Grant link credit worth of all messages that we are going to receive
%% in this test case.
ok = amqp10_client:flow_link_credit(Receiver, Num * 2, never),
[Msg1000, Msg999] = receive_messages(Receiver, 2),
?assertEqual(<<"1000">>, amqp10_msg:body_bin(Msg1000)),
?assertEqual(<<"999">>, amqp10_msg:body_bin(Msg999)),
receive {amqp10_msg, _, _} = Unexpected0 ->
ct:fail({unexpected_msg, Unexpected0, ?LINE})
after 50 -> ok
end,
ok = amqp10_client_session:flow(Session, 1, never),
[Msg998] = receive_messages(Receiver, 1),
?assertEqual(<<"998">>, amqp10_msg:body_bin(Msg998)),
receive {amqp10_msg, _, _} = Unexpected1 ->
ct:fail({unexpected_msg, Unexpected1, ?LINE})
after 50 -> ok
end,
ok = amqp10_client_session:flow(Session, 0, never),
receive {amqp10_msg, _, _} = Unexpected2 ->
ct:fail({unexpected_msg, Unexpected2, ?LINE})
after 50 -> ok
end,
%% When the client automatically widens the session window,
%% we should receive all remaining messages.
ok = amqp10_client_session:flow(Session, 2, 1),
receive_messages(Receiver, Num - 3),
%% Let's test with a different auto renew session flow config (100, 100).
ok = amqp10_client_session:flow(Session, 0, never),
ok = send_messages(Sender, Num, false),
ok = wait_for_accepts(Num),
receive {amqp10_msg, _, _} = Unexpected3 ->
ct:fail({unexpected_msg, Unexpected3, ?LINE})
after 50 -> ok
end,
ok = amqp10_client_session:flow(Session, 100, 100),
receive_messages(Receiver, Num),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(Receiver),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = close(Init).
%% Test session flow control with large messages split into multiple transfer frames.
session_flow_control_small_max_frame_size(Config) ->
OpnConf0 = connection_config(Config),
OpnConf = OpnConf0#{max_frame_size => 1000},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"pair">>),
QName = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(QName),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address),
receive {amqp10_event, {link, Receiver, attached}} -> ok
after 9000 -> ct:fail({missing_event, ?LINE})
end,
Suffix = binary:copy(<<"x">>, 2500),
Num = 10,
ok = send_messages(Sender, Num, false, Suffix),
ok = wait_for_accepts(Num),
%% 1 message of size ~2500 bytes gets split into 3 transfer frames
%% because each transfer frame has max size of 1000 bytes.
%% Hence, if we set our incoming-window to 3, we should receive exactly 1 message.
ok = amqp10_client_session:flow(Session, 3, never),
%% Grant plenty of link credit.
ok = amqp10_client:flow_link_credit(Receiver, Num * 5, never),
receive {amqp10_msg, Receiver, Msg10} ->
?assertEqual(<<"10", Suffix/binary>>,
amqp10_msg:body_bin(Msg10))
after 9000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, _, _} = Unexpected0 ->
ct:fail({unexpected_msg, Unexpected0, ?LINE})
after 50 -> ok
end,
%% When the client automatically widens the session window,
%% we should receive all remaining messages.
ok = amqp10_client_session:flow(Session, 2, 1),
Msgs = receive_messages(Receiver, Num - 1),
Msg1 = lists:last(Msgs),
?assertEqual(<<"1", Suffix/binary>>,
amqp10_msg:body_bin(Msg1)),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(Receiver),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = close_connection_sync(Connection).
session_max_per_connection(Config) ->
App = rabbit,
Par = session_max_per_connection,
@ -6703,4 +6821,4 @@ find_event(Type, Props, Events) when is_list(Props), is_list(Events) ->
end, Events).
close_incoming_window(Session) ->
gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 0}}}).
amqp10_client_session:flow(Session, 0, never).

View File

@ -1015,7 +1015,7 @@ session_flow_control(Config) ->
ok = amqp10_client:flow_link_credit(IncomingLink, 1, never),
%% Close our incoming window.
gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 0}}}),
amqp10_client_session:flow(Session, 0, never),
Request0 = amqp10_msg:new(<<>>, #'v1_0.amqp_value'{content = null}, true),
MessageId = <<1>>,
@ -1031,7 +1031,7 @@ session_flow_control(Config) ->
end,
%% Open our incoming window
gen_statem:cast(Session, {flow_session, #'v1_0.flow'{incoming_window = {uint, 5}}}),
amqp10_client_session:flow(Session, 1, never),
receive {amqp10_msg, IncomingLink, Response} ->
?assertMatch(#{correlation_id := MessageId,