From 2eb3c2c90024d8a3419603e5dc5e349f20b259da Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Thu, 25 Jan 2024 10:31:49 +0000 Subject: [PATCH] Refactor rabbit_stream_queue:credit/5 And fix a race condition flake in amqp10_client_SUITE. --- deps/rabbit/src/rabbit_stream_queue.erl | 64 ++++++++++--------- .../test/amqp10_client_SUITE.erl | 10 ++- 2 files changed, 42 insertions(+), 32 deletions(-) diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a7140e0aa9..b0f94dd4fd 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -447,30 +447,30 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0, credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0, name = Name, local_pid = LocalPid} = State) -> - {Readers1, Msgs} = case Readers0 of - #{CTag := #stream{credit = Credit0} = Str0} -> - Str1 = Str0#stream{credit = Credit0 + Credit}, - {Str, Msgs0} = stream_entries(QName, Name, LocalPid, Str1), - {Readers0#{CTag => Str}, Msgs0}; + case Readers0 of + #{CTag := #stream{credit = Credit0} = Str0} -> + Str1 = Str0#stream{credit = Credit0 + Credit}, + {Str, Msgs} = stream_entries(QName, Name, LocalPid, Str1), + Actions = case Msgs of + [] -> + [{send_credit_reply, 0}]; _ -> - {Readers0, []} + [{send_credit_reply, length(Msgs)}, + {deliver, CTag, true, Msgs}] end, - {Readers, Actions} = - case Drain of - true -> - case Readers1 of - #{CTag := #stream{credit = Credit1} = Str2} -> - {Readers0#{CTag => Str2#stream{credit = 0}}, - [{send_drained, {CTag, Credit1}}]}; - _ -> - {Readers1, []} - end; - false -> - {Readers1, []} - end, - {State#stream_client{readers = Readers}, - [{send_credit_reply, length(Msgs)}, - {deliver, CTag, true, Msgs}] ++ Actions}. + case Drain of + true -> + Readers = Readers0#{CTag => Str#stream{credit = 0}}, + {State#stream_client{readers = Readers}, + %% send_drained needs to come after deliver + Actions ++ [{send_drained, {CTag, Str#stream.credit}}]}; + false -> + Readers = Readers0#{CTag => Str}, + {State#stream_client{readers = Readers}, Actions} + end; + _ -> + {State, []} + end. deliver(QSs, Msg, Options) -> lists:foldl( @@ -552,16 +552,20 @@ handle_event(QName, {osiris_offset, _From, _Offs}, State = #stream_client{local_pid = LocalPid, readers = Readers0, name = Name}) -> + Ack = true, %% offset isn't actually needed as we use the atomic to read the %% current committed - {Readers, TagMsgs} = maps:fold( - fun (Tag, Str0, {Acc, TM}) -> - {Str, Msgs} = stream_entries(QName, Name, LocalPid, Str0), - {Acc#{Tag => Str}, [{Tag, LocalPid, Msgs} | TM]} - end, {#{}, []}, Readers0), - Ack = true, - Deliveries = [{deliver, Tag, Ack, OffsetMsg} - || {Tag, _LeaderPid, OffsetMsg} <- TagMsgs], + {Readers, Deliveries} = + maps:fold( + fun (Tag, Str0, {Acc, TM}) -> + case stream_entries(QName, Name, LocalPid, Str0) of + {Str, []} -> + {Acc#{Tag => Str}, TM}; + {Str, Msgs} -> + {Acc#{Tag => Str}, + [{deliver, Tag, Ack, Msgs} | TM]} + end + end, {#{}, []}, Readers0), {ok, State#stream_client{readers = Readers}, Deliveries}; handle_event(_QName, {stream_leader_change, Pid}, State) -> {ok, update_leader_pid(Pid, State), []}; diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index 0a3b017a35..ba7b5a2a83 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -254,11 +254,13 @@ roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) - wait_for_credit(Sender), + Dtag = <<"my-tag">>, % create a new message using a delivery-tag, body and indicate % it's settlement status (true meaning no disposition confirmation % will be sent by the receiver). - OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true), + OutMsg = amqp10_msg:new(Dtag, <<"my-body">>, false), ok = amqp10_client:send_msg(Sender, OutMsg), + ok = wait_for_settlement(Dtag), flush("pre-receive"), % create a receiver link @@ -286,14 +288,18 @@ roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) - wait_for_accepts(1), ok after 2000 -> + flush("delivery_timeout"), exit(delivery_timeout) end, - OutMsg2 = amqp10_msg:new(<<"my-tag">>, <<"my-body2">>, true), + Dtag = <<"my-tag">>, + OutMsg2 = amqp10_msg:new(Dtag, <<"my-body2">>, false), ok = amqp10_client:send_msg(Sender, OutMsg2), + ok = wait_for_settlement(Dtag), %% no delivery should be made at this point receive {amqp10_msg, _, _} -> + flush("unexpected_delivery"), exit(unexpected_delivery) after 500 -> ok