Refactor rabbit_stream_queue:credit/5

And fix a race condition flake in amqp10_client_SUITE.
This commit is contained in:
Karl Nilsson 2024-01-25 10:31:49 +00:00
parent 0926f66416
commit 2eb3c2c900
2 changed files with 42 additions and 32 deletions

View File

@ -447,30 +447,30 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
name = Name,
local_pid = LocalPid} = State) ->
{Readers1, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
{Str, Msgs0} = stream_entries(QName, Name, LocalPid, Str1),
{Readers0#{CTag => Str}, Msgs0};
case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
{Str, Msgs} = stream_entries(QName, Name, LocalPid, Str1),
Actions = case Msgs of
[] ->
[{send_credit_reply, 0}];
_ ->
{Readers0, []}
[{send_credit_reply, length(Msgs)},
{deliver, CTag, true, Msgs}]
end,
{Readers, Actions} =
case Drain of
true ->
case Readers1 of
#{CTag := #stream{credit = Credit1} = Str2} ->
{Readers0#{CTag => Str2#stream{credit = 0}},
[{send_drained, {CTag, Credit1}}]};
_ ->
{Readers1, []}
end;
false ->
{Readers1, []}
end,
{State#stream_client{readers = Readers},
[{send_credit_reply, length(Msgs)},
{deliver, CTag, true, Msgs}] ++ Actions}.
case Drain of
true ->
Readers = Readers0#{CTag => Str#stream{credit = 0}},
{State#stream_client{readers = Readers},
%% send_drained needs to come after deliver
Actions ++ [{send_drained, {CTag, Str#stream.credit}}]};
false ->
Readers = Readers0#{CTag => Str},
{State#stream_client{readers = Readers}, Actions}
end;
_ ->
{State, []}
end.
deliver(QSs, Msg, Options) ->
lists:foldl(
@ -552,16 +552,20 @@ handle_event(QName, {osiris_offset, _From, _Offs},
State = #stream_client{local_pid = LocalPid,
readers = Readers0,
name = Name}) ->
Ack = true,
%% offset isn't actually needed as we use the atomic to read the
%% current committed
{Readers, TagMsgs} = maps:fold(
fun (Tag, Str0, {Acc, TM}) ->
{Str, Msgs} = stream_entries(QName, Name, LocalPid, Str0),
{Acc#{Tag => Str}, [{Tag, LocalPid, Msgs} | TM]}
end, {#{}, []}, Readers0),
Ack = true,
Deliveries = [{deliver, Tag, Ack, OffsetMsg}
|| {Tag, _LeaderPid, OffsetMsg} <- TagMsgs],
{Readers, Deliveries} =
maps:fold(
fun (Tag, Str0, {Acc, TM}) ->
case stream_entries(QName, Name, LocalPid, Str0) of
{Str, []} ->
{Acc#{Tag => Str}, TM};
{Str, Msgs} ->
{Acc#{Tag => Str},
[{deliver, Tag, Ack, Msgs} | TM]}
end
end, {#{}, []}, Readers0),
{ok, State#stream_client{readers = Readers}, Deliveries};
handle_event(_QName, {stream_leader_change, Pid}, State) ->
{ok, update_leader_pid(Pid, State), []};

View File

@ -254,11 +254,13 @@ roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) -
wait_for_credit(Sender),
Dtag = <<"my-tag">>,
% create a new message using a delivery-tag, body and indicate
% it's settlement status (true meaning no disposition confirmation
% will be sent by the receiver).
OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
OutMsg = amqp10_msg:new(Dtag, <<"my-body">>, false),
ok = amqp10_client:send_msg(Sender, OutMsg),
ok = wait_for_settlement(Dtag),
flush("pre-receive"),
% create a receiver link
@ -286,14 +288,18 @@ roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) -
wait_for_accepts(1),
ok
after 2000 ->
flush("delivery_timeout"),
exit(delivery_timeout)
end,
OutMsg2 = amqp10_msg:new(<<"my-tag">>, <<"my-body2">>, true),
Dtag = <<"my-tag">>,
OutMsg2 = amqp10_msg:new(Dtag, <<"my-body2">>, false),
ok = amqp10_client:send_msg(Sender, OutMsg2),
ok = wait_for_settlement(Dtag),
%% no delivery should be made at this point
receive
{amqp10_msg, _, _} ->
flush("unexpected_delivery"),
exit(unexpected_delivery)
after 500 ->
ok