Apply PR feedback

This commit is contained in:
David Ansari 2024-02-28 12:48:21 +01:00 committed by Michael Klishin
parent 8fce918709
commit ad05b7929d
No known key found for this signature in database
GPG Key ID: FF4F6501646A9C9A
3 changed files with 66 additions and 17 deletions

View File

@ -11,6 +11,7 @@
-export([add/2,
compare/2,
ranges/1,
in_range/3,
diff/2,
foldl/4]).
@ -86,6 +87,21 @@ ranges0([H | Rest], [{First, Last} | AccRest] = Acc0) ->
ranges0(Rest, Acc)
end.
-spec in_range(serial_number(), serial_number(), serial_number()) ->
boolean().
in_range(S, First, Last) ->
case compare(S, First) of
less ->
false;
_ ->
case compare(S, Last) of
greater ->
false;
_ ->
true
end
end.
-define(SERIAL_DIFF_BOUND, 16#80000000).
-spec diff(serial_number(), serial_number()) -> integer().
diff(A, B) ->

View File

@ -15,6 +15,7 @@
compare/2,
usort/1,
ranges/1,
in_range/3,
diff/2,
foldl/4]).
@ -22,6 +23,7 @@ all() -> [test_add,
test_compare,
test_usort,
test_ranges,
test_in_range,
test_diff,
test_foldl].
@ -96,6 +98,31 @@ test_ranges(_Config) ->
?assertEqual([{4294967294, 1}, {3, 5}, {10, 10}, {18, 19}],
ranges([1, 10, 4294967294, 0, 3, 4, 5, 19, 18, 4294967295])).
test_in_range(_Config) ->
?assert(in_range(0, 0, 0)),
?assert(in_range(0, 0, 1)),
?assert(in_range(4294967295, 4294967295, 4294967295)),
?assert(in_range(4294967295, 4294967295, 0)),
?assert(in_range(0, 4294967295, 0)),
?assert(in_range(4294967230, 4294967200, 1000)),
?assert(in_range(88, 4294967200, 1000)),
?assertNot(in_range(1, 0, 0)),
?assertNot(in_range(4294967295, 0, 0)),
?assertNot(in_range(0, 1, 1)),
?assertNot(in_range(10, 1, 9)),
?assertNot(in_range(1005, 4294967200, 1000)),
?assertNot(in_range(4294967190, 4294967200, 1000)),
%% Pass wrong First and Last.
?assertNot(in_range(1, 3, 2)),
?assertNot(in_range(2, 3, 2)),
?assertNot(in_range(3, 3, 2)),
?assertNot(in_range(4, 3, 2)),
?assertExit({undefined_serial_comparison, 0, 16#80000000},
in_range(0, 16#80000000, 16#80000000)).
test_diff(_Config) ->
?assertEqual(0, diff(0, 0)),
?assertEqual(0, diff(1, 1)),

View File

@ -7,6 +7,8 @@
-module(rabbit_amqp_session).
-compile({inline, [maps_update_with/4]}).
-behaviour(gen_server).
-include_lib("rabbit_common/include/rabbit.hrl").
@ -1061,20 +1063,16 @@ handle_control(#'v1_0.disposition'{role = ?RECV_ROLE,
consumer_tag = Ctag,
msg_id = MsgId} = Unsettled,
{SettledAcc, UnsettledAcc}) ->
DeliveryIdComparedToFirst = compare(DeliveryId, First),
DeliveryIdComparedToLast = compare(DeliveryId, Last),
if DeliveryIdComparedToFirst =:= less orelse
DeliveryIdComparedToLast =:= greater ->
%% Delivery ID is outside the DISPOSITION range.
{SettledAcc, UnsettledAcc#{DeliveryId => Unsettled}};
true ->
%% Delivery ID is inside the DISPOSITION range.
SettledAcc1 = maps:update_with(
{QName, Ctag},
fun(MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
SettledAcc),
{SettledAcc1, UnsettledAcc}
case serial_number:in_range(DeliveryId, First, Last) of
true ->
SettledAcc1 = maps_update_with(
{QName, Ctag},
fun(MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
SettledAcc),
{SettledAcc1, UnsettledAcc};
false ->
{SettledAcc, UnsettledAcc#{DeliveryId => Unsettled}}
end
end,
{#{}, #{}}, UnsettledMap)
@ -1209,19 +1207,19 @@ session_flow_control_sent_transfers(
State#state{remote_incoming_window = RemoteIncomingWindow - NumTransfers,
next_outgoing_id = add(NextOutgoingId, NumTransfers)}.
settle_delivery_id(Current, {Settled, Unsettled}) ->
settle_delivery_id(Current, {Settled, Unsettled} = Acc) ->
case maps:take(Current, Unsettled) of
{#outgoing_unsettled{queue_name = QName,
consumer_tag = Ctag,
msg_id = MsgId}, Unsettled1} ->
Settled1 = maps:update_with(
Settled1 = maps_update_with(
{QName, Ctag},
fun(MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
Settled),
{Settled1, Unsettled1};
error ->
{Settled, Unsettled}
Acc
end.
settle_op_from_outcome(#'v1_0.accepted'{}) ->
@ -2276,6 +2274,14 @@ check_user_id(Mc, User) ->
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Reason, Args)
end.
maps_update_with(Key, Fun, Init, Map) ->
case Map of
#{Key := Value} ->
Map#{Key := Fun(Value)};
_ ->
Map#{Key => Init}
end.
format_status(
#{state := #state{cfg = Cfg,
outgoing_pending = OutgoingPending,