Avoid creation of binaries in AMQP 1.0 generator
When generating iodata() in the AMQP 1.0 generator, prefer integers over binaries. Rename functions and variable names to better reflect the AMQP 1.0 spec instead of using AMQP 0.9.1 wording.
This commit is contained in:
parent
5ad61fe9f3
commit
b0260cf4b3
|
@ -836,7 +836,7 @@ handle_link_flow(#'v1_0.flow'{delivery_count = MaybeTheirDC,
|
|||
{uint, DC} -> DC;
|
||||
undefined -> ?INITIAL_DELIVERY_COUNT
|
||||
end,
|
||||
LinkCredit = diff(add(TheirDC, TheirCredit), OurDC),
|
||||
LinkCredit = amqp10_util:link_credit_snd(TheirDC, TheirCredit, OurDC),
|
||||
{ok, Link#link{link_credit = LinkCredit}};
|
||||
handle_link_flow(#'v1_0.flow'{delivery_count = TheirDC,
|
||||
link_credit = {uint, TheirCredit},
|
||||
|
@ -1219,24 +1219,29 @@ handle_session_flow_pre_begin_test() ->
|
|||
?assertEqual(998 - 51, State#state.remote_incoming_window).
|
||||
|
||||
handle_link_flow_sender_test() ->
|
||||
Handle = 45,
|
||||
DeliveryCount = 55,
|
||||
Link = #link{role = sender, output_handle = 99,
|
||||
link_credit = 0, delivery_count = DeliveryCount + 2},
|
||||
Flow = #'v1_0.flow'{handle = {uint, Handle},
|
||||
link_credit = {uint, 42},
|
||||
delivery_count = {uint, DeliveryCount}
|
||||
DeliveryCountRcv = 55,
|
||||
DeliveryCountSnd = DeliveryCountRcv + 2,
|
||||
LinkCreditRcv = 42,
|
||||
Link = #link{role = sender,
|
||||
output_handle = 99,
|
||||
link_credit = 0,
|
||||
delivery_count = DeliveryCountSnd},
|
||||
Flow = #'v1_0.flow'{handle = {uint, 45},
|
||||
link_credit = {uint, LinkCreditRcv},
|
||||
delivery_count = {uint, DeliveryCountRcv}
|
||||
},
|
||||
{ok, Outcome} = handle_link_flow(Flow, Link),
|
||||
% see section 2.6.7
|
||||
?assertEqual(DeliveryCount + 42 - (DeliveryCount + 2), Outcome#link.link_credit),
|
||||
?assertEqual(DeliveryCountRcv + LinkCreditRcv - DeliveryCountSnd,
|
||||
Outcome#link.link_credit),
|
||||
|
||||
% receiver does not yet know the delivery_count
|
||||
{ok, Outcome2} = handle_link_flow(Flow#'v1_0.flow'{delivery_count = undefined},
|
||||
Link),
|
||||
% using serial number arithmetic:
|
||||
% ?INITIAL_DELIVERY_COUNT + 42 - (DeliveryCount + 2) = -18
|
||||
?assertEqual(-18, Outcome2#link.link_credit).
|
||||
% ?INITIAL_DELIVERY_COUNT + LinkCreditRcv - DeliveryCountSnd = -18
|
||||
% but we maintain a floor of zero
|
||||
?assertEqual(0, Outcome2#link.link_credit).
|
||||
|
||||
handle_link_flow_sender_drain_test() ->
|
||||
Handle = 45,
|
||||
|
|
|
@ -13,6 +13,7 @@ def all_beam_files(name = "all_beam_files"):
|
|||
"src/amqp10_binary_parser.erl",
|
||||
"src/amqp10_framing.erl",
|
||||
"src/amqp10_framing0.erl",
|
||||
"src/amqp10_util.erl",
|
||||
"src/serial_number.erl",
|
||||
],
|
||||
hdrs = [":public_and_private_hdrs"],
|
||||
|
@ -35,6 +36,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
|
|||
"src/amqp10_binary_parser.erl",
|
||||
"src/amqp10_framing.erl",
|
||||
"src/amqp10_framing0.erl",
|
||||
"src/amqp10_util.erl",
|
||||
"src/serial_number.erl",
|
||||
],
|
||||
hdrs = [":public_and_private_hdrs"],
|
||||
|
@ -64,6 +66,7 @@ def all_srcs(name = "all_srcs"):
|
|||
"src/amqp10_binary_parser.erl",
|
||||
"src/amqp10_framing.erl",
|
||||
"src/amqp10_framing0.erl",
|
||||
"src/amqp10_util.erl",
|
||||
"src/serial_number.erl",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -87,8 +87,7 @@ def print_hrl(types, defines):
|
|||
for opt in d.options:
|
||||
print_define(opt, d.source)
|
||||
print("""
|
||||
-define(DESCRIBED, 0:8).
|
||||
-define(DESCRIBED_BIN, <<?DESCRIBED>>).
|
||||
-define(DESCRIBED, 0).
|
||||
""")
|
||||
|
||||
|
||||
|
|
|
@ -61,153 +61,165 @@
|
|||
-define(DOFF, 2).
|
||||
-define(VAR_1_LIMIT, 16#FF).
|
||||
|
||||
-spec build_frame(integer(), iolist()) -> iolist().
|
||||
build_frame(Channel, Payload) ->
|
||||
build_frame(Channel, ?AMQP_FRAME_TYPE, Payload).
|
||||
-spec build_frame(non_neg_integer(), iolist()) -> iolist().
|
||||
build_frame(Channel, Body) ->
|
||||
build_frame(Channel, ?AMQP_FRAME_TYPE, Body).
|
||||
|
||||
build_frame(Channel, FrameType, Payload) ->
|
||||
Size = iolist_size(Payload) + 8, % frame header and no extension
|
||||
[ <<Size:32/unsigned, 2:8, FrameType:8, Channel:16/unsigned>>, Payload ].
|
||||
-spec build_frame(non_neg_integer(), non_neg_integer(), iolist()) -> iolist().
|
||||
build_frame(Channel, FrameType, Body) ->
|
||||
Size = iolist_size(Body) + 8, % frame header and no extension
|
||||
[<<Size:32, 2:8, FrameType:8, Channel:16>>, Body].
|
||||
|
||||
build_heartbeat_frame() ->
|
||||
%% length is inclusive
|
||||
<<8:32, ?DOFF:8, ?AMQP_FRAME_TYPE:8, 0:16>>.
|
||||
|
||||
-spec generate(amqp10_type()) -> iolist().
|
||||
generate({described, Descriptor, Value}) ->
|
||||
DescBin = generate(Descriptor),
|
||||
ValueBin = generate(Value),
|
||||
[ ?DESCRIBED_BIN, DescBin, ValueBin ];
|
||||
-spec generate(amqp10_type()) -> iodata().
|
||||
generate(Type) ->
|
||||
case generate1(Type) of
|
||||
Byte when is_integer(Byte) ->
|
||||
[Byte];
|
||||
IoData ->
|
||||
IoData
|
||||
end.
|
||||
|
||||
generate(null) -> <<16#40>>;
|
||||
generate(true) -> <<16#41>>;
|
||||
generate(false) -> <<16#42>>;
|
||||
generate({boolean, true}) -> <<16#56, 16#01>>;
|
||||
generate({boolean, false}) -> <<16#56, 16#00>>;
|
||||
generate1({described, Descriptor, Value}) ->
|
||||
DescBin = generate1(Descriptor),
|
||||
ValueBin = generate1(Value),
|
||||
[?DESCRIBED, DescBin, ValueBin];
|
||||
|
||||
generate1(null) -> 16#40;
|
||||
generate1(true) -> 16#41;
|
||||
generate1(false) -> 16#42;
|
||||
generate1({boolean, true}) -> [16#56, 16#01];
|
||||
generate1({boolean, false}) -> [16#56, 16#00];
|
||||
|
||||
%% some integral types have a compact encoding as a byte; this is in
|
||||
%% particular for the descriptors of AMQP types, which have the domain
|
||||
%% bits set to zero and values < 256.
|
||||
generate({ubyte, V}) -> <<16#50,V:8/unsigned>>;
|
||||
generate({ushort, V}) -> <<16#60,V:16/unsigned>>;
|
||||
generate({uint, V}) when V =:= 0 -> <<16#43>>;
|
||||
generate({uint, V}) when V < 256 -> <<16#52,V:8/unsigned>>;
|
||||
generate({uint, V}) -> <<16#70,V:32/unsigned>>;
|
||||
generate({ulong, V}) when V =:= 0 -> <<16#44>>;
|
||||
generate({ulong, V}) when V < 256 -> <<16#53,V:8/unsigned>>;
|
||||
generate({ulong, V}) -> <<16#80,V:64/unsigned>>;
|
||||
generate({byte, V}) -> <<16#51,V:8/signed>>;
|
||||
generate({short, V}) -> <<16#61,V:16/signed>>;
|
||||
generate({int, V}) when V<128 andalso V>-129 -> <<16#54,V:8/signed>>;
|
||||
generate({int, V}) -> <<16#71,V:32/signed>>;
|
||||
generate({long, V}) when V<128 andalso V>-129 -> <<16#55,V:8/signed>>;
|
||||
generate({long, V}) -> <<16#81,V:64/signed>>;
|
||||
generate({float, V}) -> <<16#72,V:32/float>>;
|
||||
generate({double, V}) -> <<16#82,V:64/float>>;
|
||||
generate({char, V}) -> <<16#73,V:4/binary>>;
|
||||
generate({timestamp,V}) -> <<16#83,V:64/signed>>;
|
||||
generate({uuid, V}) -> <<16#98,V:16/binary>>;
|
||||
generate1({ubyte, V}) -> [16#50, V];
|
||||
generate1({ushort, V}) -> <<16#60,V:16/unsigned>>;
|
||||
generate1({uint, V}) when V =:= 0 -> 16#43;
|
||||
generate1({uint, V}) when V < 256 -> [16#52, V];
|
||||
generate1({uint, V}) -> <<16#70,V:32/unsigned>>;
|
||||
generate1({ulong, V}) when V =:= 0 -> 16#44;
|
||||
generate1({ulong, V}) when V < 256 -> [16#53, V];
|
||||
generate1({ulong, V}) -> <<16#80,V:64/unsigned>>;
|
||||
generate1({byte, V}) -> <<16#51,V:8/signed>>;
|
||||
generate1({short, V}) -> <<16#61,V:16/signed>>;
|
||||
generate1({int, V}) when V<128 andalso V>-129 -> <<16#54,V:8/signed>>;
|
||||
generate1({int, V}) -> <<16#71,V:32/signed>>;
|
||||
generate1({long, V}) when V<128 andalso V>-129 -> <<16#55,V:8/signed>>;
|
||||
generate1({long, V}) -> <<16#81,V:64/signed>>;
|
||||
generate1({float, V}) -> <<16#72,V:32/float>>;
|
||||
generate1({double, V}) -> <<16#82,V:64/float>>;
|
||||
generate1({char, V}) -> <<16#73,V:4/binary>>;
|
||||
generate1({timestamp,V}) -> <<16#83,V:64/signed>>;
|
||||
generate1({uuid, V}) -> <<16#98,V:16/binary>>;
|
||||
|
||||
generate({utf8, V}) when size(V) < ?VAR_1_LIMIT -> [<<16#a1,(size(V)):8>>, V];
|
||||
generate({utf8, V}) -> [<<16#b1,(size(V)):32>>, V];
|
||||
generate({symbol, V}) -> [<<16#a3,(size(V)):8>>, V];
|
||||
generate({binary, V}) ->
|
||||
generate1({utf8, V}) when size(V) < ?VAR_1_LIMIT -> [16#a1, size(V), V];
|
||||
generate1({utf8, V}) -> [<<16#b1, (size(V)):32>>, V];
|
||||
generate1({symbol, V}) -> [16#a3, size(V), V];
|
||||
generate1({binary, V}) ->
|
||||
Size = iolist_size(V),
|
||||
if Size < ?VAR_1_LIMIT -> [<<16#a0,Size:8>>, V];
|
||||
true -> [<<16#b0,Size:32>>, V]
|
||||
case Size < ?VAR_1_LIMIT of
|
||||
true ->
|
||||
[16#a0, Size, V];
|
||||
false ->
|
||||
[<<16#b0, Size:32>>, V]
|
||||
end;
|
||||
|
||||
generate({list, []}) ->
|
||||
<<16#45>>;
|
||||
generate({list, List}) ->
|
||||
generate1({list, []}) ->
|
||||
16#45;
|
||||
generate1({list, List}) ->
|
||||
Count = length(List),
|
||||
Compound = lists:map(fun generate/1, List),
|
||||
Compound = lists:map(fun generate1/1, List),
|
||||
S = iolist_size(Compound),
|
||||
%% If the list contains less than (256 - 1) elements and if the
|
||||
%% encoded size (including the encoding of "Count", thus S + 1
|
||||
%% in the test) is less than 256 bytes, we use the short form.
|
||||
%% Otherwise, we use the large form.
|
||||
if Count >= (256 - 1) orelse (S + 1) >= 256 ->
|
||||
[<<16#d0, (S + 4):32/unsigned, Count:32/unsigned>>, Compound];
|
||||
true ->
|
||||
[<<16#c0, (S + 1):8/unsigned, Count:8/unsigned>>, Compound]
|
||||
[<<16#d0, (S + 4):32, Count:32>>, Compound];
|
||||
true ->
|
||||
[16#c0, S + 1, Count, Compound]
|
||||
end;
|
||||
|
||||
generate({map, ListOfPairs}) ->
|
||||
generate1({map, ListOfPairs}) ->
|
||||
Count = length(ListOfPairs) * 2,
|
||||
Compound = lists:map(fun ({Key, Val}) ->
|
||||
[(generate(Key)),
|
||||
(generate(Val))]
|
||||
[(generate1(Key)),
|
||||
(generate1(Val))]
|
||||
end, ListOfPairs),
|
||||
S = iolist_size(Compound),
|
||||
%% See generate({list, ...}) for an explanation of this test.
|
||||
%% See generate1({list, ...}) for an explanation of this test.
|
||||
if Count >= (256 - 1) orelse (S + 1) >= 256 ->
|
||||
[<<16#d1, (S + 4):32, Count:32>>, Compound];
|
||||
true ->
|
||||
[<<16#c1, (S + 1):8, Count:8>>, Compound]
|
||||
[<<16#d1, (S + 4):32, Count:32>>, Compound];
|
||||
true ->
|
||||
[16#c1, S + 1, Count, Compound]
|
||||
end;
|
||||
|
||||
generate({array, Type, List}) ->
|
||||
generate1({array, Type, List}) ->
|
||||
Count = length(List),
|
||||
Body = iolist_to_binary([constructor(Type),
|
||||
[generate(Type, I) || I <- List]]),
|
||||
S = size(Body),
|
||||
%% See generate({list, ...}) for an explanation of this test.
|
||||
Array = [constructor(Type),
|
||||
[generate2(Type, I) || I <- List]],
|
||||
S = iolist_size(Array),
|
||||
%% See generate1({list, ...}) for an explanation of this test.
|
||||
if Count >= (256 - 1) orelse (S + 1) >= 256 ->
|
||||
[<<16#f0, (S + 4):32/unsigned, Count:32/unsigned>>, Body];
|
||||
true ->
|
||||
[<<16#e0, (S + 1):8/unsigned, Count:8/unsigned>>, Body]
|
||||
[<<16#f0, (S + 4):32, Count:32>>, Array];
|
||||
true ->
|
||||
[16#e0, S + 1, Count, Array]
|
||||
end;
|
||||
|
||||
generate({as_is, TypeCode, Bin}) ->
|
||||
generate1({as_is, TypeCode, Bin}) ->
|
||||
<<TypeCode, Bin>>.
|
||||
|
||||
%% TODO again these are a stub to get SASL working. New codec? Will
|
||||
%% that ever happen? If not we really just need to split generate/1
|
||||
%% up into things like these...
|
||||
%% for these constructors map straight-forwardly
|
||||
constructor(symbol) -> <<16#b3>>;
|
||||
constructor(ubyte) -> <<16#50>>;
|
||||
constructor(ushort) -> <<16#60>>;
|
||||
constructor(short) -> <<16#61>>;
|
||||
constructor(uint) -> <<16#70>>;
|
||||
constructor(ulong) -> <<16#80>>;
|
||||
constructor(byte) -> <<16#51>>;
|
||||
constructor(int) -> <<16#71>>;
|
||||
constructor(long) -> <<16#81>>;
|
||||
constructor(float) -> <<16#72>>;
|
||||
constructor(double) -> <<16#82>>;
|
||||
constructor(char) -> <<16#73>>;
|
||||
constructor(timestamp) -> <<16#83>>;
|
||||
constructor(uuid) -> <<16#98>>;
|
||||
constructor(null) -> <<16#40>>;
|
||||
constructor(boolean) -> <<16#56>>;
|
||||
constructor(array) -> <<16#f0>>; % use large array type for all nested arrays
|
||||
constructor(utf8) -> <<16#b1>>;
|
||||
constructor(symbol) -> 16#b3;
|
||||
constructor(ubyte) -> 16#50;
|
||||
constructor(ushort) -> 16#60;
|
||||
constructor(short) -> 16#61;
|
||||
constructor(uint) -> 16#70;
|
||||
constructor(ulong) -> 16#80;
|
||||
constructor(byte) -> 16#51;
|
||||
constructor(int) -> 16#71;
|
||||
constructor(long) -> 16#81;
|
||||
constructor(float) -> 16#72;
|
||||
constructor(double) -> 16#82;
|
||||
constructor(char) -> 16#73;
|
||||
constructor(timestamp) -> 16#83;
|
||||
constructor(uuid) -> 16#98;
|
||||
constructor(null) -> 16#40;
|
||||
constructor(boolean) -> 16#56;
|
||||
constructor(array) -> 16#f0; % use large array type for all nested arrays
|
||||
constructor(utf8) -> 16#b1;
|
||||
constructor({described, Descriptor, Primitive}) ->
|
||||
[<<16#00>>, generate(Descriptor), constructor(Primitive)].
|
||||
[16#00, generate1(Descriptor), constructor(Primitive)].
|
||||
|
||||
% returns io_list
|
||||
generate(symbol, {symbol, V}) -> [<<(size(V)):32>>, V];
|
||||
generate(utf8, {utf8, V}) -> [<<(size(V)):32>>, V];
|
||||
generate(boolean, true) -> <<16#01>>;
|
||||
generate(boolean, false) -> <<16#00>>;
|
||||
generate(boolean, {boolean, true}) -> <<16#01>>;
|
||||
generate(boolean, {boolean, false}) -> <<16#00>>;
|
||||
generate(ubyte, {ubyte, V}) -> <<V:8/unsigned>>;
|
||||
generate(byte, {byte, V}) -> <<V:8/signed>>;
|
||||
generate(ushort, {ushort, V}) -> <<V:16/unsigned>>;
|
||||
generate(short, {short, V}) -> <<V:16/signed>>;
|
||||
generate(uint, {uint, V}) -> <<V:32/unsigned>>;
|
||||
generate(int, {int, V}) -> <<V:32/signed>>;
|
||||
generate(ulong, {ulong, V}) -> <<V:64/unsigned>>;
|
||||
generate(long, {long, V}) -> <<V:64/signed>>;
|
||||
generate({described, D, P}, {described, D, V}) ->
|
||||
generate(P, V);
|
||||
generate(array, {array, Type, List}) ->
|
||||
generate2(symbol, {symbol, V}) -> [<<(size(V)):32>>, V];
|
||||
generate2(utf8, {utf8, V}) -> [<<(size(V)):32>>, V];
|
||||
generate2(boolean, true) -> 16#01;
|
||||
generate2(boolean, false) -> 16#00;
|
||||
generate2(boolean, {boolean, true}) -> 16#01;
|
||||
generate2(boolean, {boolean, false}) -> 16#00;
|
||||
generate2(ubyte, {ubyte, V}) -> V;
|
||||
generate2(byte, {byte, V}) -> <<V:8/signed>>;
|
||||
generate2(ushort, {ushort, V}) -> <<V:16/unsigned>>;
|
||||
generate2(short, {short, V}) -> <<V:16/signed>>;
|
||||
generate2(uint, {uint, V}) -> <<V:32/unsigned>>;
|
||||
generate2(int, {int, V}) -> <<V:32/signed>>;
|
||||
generate2(ulong, {ulong, V}) -> <<V:64/unsigned>>;
|
||||
generate2(long, {long, V}) -> <<V:64/signed>>;
|
||||
generate2({described, D, P}, {described, D, V}) ->
|
||||
generate2(P, V);
|
||||
generate2(array, {array, Type, List}) ->
|
||||
Count = length(List),
|
||||
Body = iolist_to_binary([constructor(Type),
|
||||
[generate(Type, I) || I <- List]]),
|
||||
S = size(Body),
|
||||
%% See generate({list, ...}) for an explanation of this test.
|
||||
[<<(S + 4):32/unsigned, Count:32/unsigned>>, Body].
|
||||
Array = [constructor(Type),
|
||||
[generate2(Type, I) || I <- List]],
|
||||
S = iolist_size(Array),
|
||||
%% See generate1({list, ...}) for an explanation of this test.
|
||||
[<<(S + 4):32, Count:32>>, Array].
|
||||
|
|
|
@ -177,7 +177,7 @@ encode_bin(X) ->
|
|||
amqp10_binary_generator:generate(encode(X)).
|
||||
|
||||
decode_bin(X) ->
|
||||
[decode(PerfDesc) || PerfDesc <- amqp10_binary_parser:parse_all(X)].
|
||||
[decode(DescribedPerformative) || DescribedPerformative <- amqp10_binary_parser:parse_all(X)].
|
||||
|
||||
symbol_for(X) ->
|
||||
amqp10_framing0:symbol_for(X).
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(amqp10_util).
|
||||
-include_lib("amqp10_common/include/amqp10_types.hrl").
|
||||
-export([link_credit_snd/3]).
|
||||
|
||||
%% AMQP 1.0 §2.6.7
|
||||
-spec link_credit_snd(sequence_no(), uint(), sequence_no()) -> uint().
|
||||
link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd) ->
|
||||
LinkCreditSnd = serial_number:diff(
|
||||
serial_number:add(DeliveryCountRcv, LinkCreditRcv),
|
||||
DeliveryCountSnd),
|
||||
%% LinkCreditSnd can be negative when receiver decreases credits
|
||||
%% while messages are in flight. Maintain a floor of zero.
|
||||
max(0, LinkCreditSnd).
|
|
@ -147,10 +147,11 @@ recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
|
|||
throw({inet_error, Reason})
|
||||
end;
|
||||
recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
|
||||
{Data, Rest} = split_binary(case Buf of
|
||||
[B] -> B;
|
||||
_ -> list_to_binary(lists:reverse(Buf))
|
||||
end, RecvLen),
|
||||
Bin = case Buf of
|
||||
[B] -> B;
|
||||
_ -> list_to_binary(lists:reverse(Buf))
|
||||
end,
|
||||
{Data, Rest} = split_binary(Bin, RecvLen),
|
||||
recvloop(Deb, handle_input(State#v1.callback, Data,
|
||||
State#v1{buf = [Rest],
|
||||
buf_len = BufLen - RecvLen})).
|
||||
|
@ -324,12 +325,9 @@ is_connection_frame(#'v1_0.open'{}) -> true;
|
|||
is_connection_frame(#'v1_0.close'{}) -> true;
|
||||
is_connection_frame(_) -> false.
|
||||
|
||||
%% TODO Handle depending on connection state
|
||||
%% TODO It'd be nice to only decode up to the descriptor
|
||||
|
||||
handle_1_0_frame(Mode, Channel, Payload, State) ->
|
||||
handle_frame(Mode, Channel, Body, State) ->
|
||||
try
|
||||
handle_1_0_frame0(Mode, Channel, Payload, State)
|
||||
handle_frame0(Mode, Channel, Body, State)
|
||||
catch
|
||||
_:#'v1_0.error'{} = Reason ->
|
||||
handle_exception(State, 0, Reason);
|
||||
|
@ -346,41 +344,41 @@ handle_1_0_frame(Mode, Channel, Payload, State) ->
|
|||
[Reason, Trace]))
|
||||
end.
|
||||
|
||||
%% Nothing specifies that connection methods have to be on a
|
||||
%% particular channel.
|
||||
handle_1_0_frame0(_Mode, Channel, Payload,
|
||||
State = #v1{connection_state = CS})
|
||||
%% Nothing specifies that connection methods have to be on a particular channel.
|
||||
handle_frame0(_Mode, Channel, Body,
|
||||
State = #v1{connection_state = CS})
|
||||
when CS =:= closing orelse
|
||||
CS =:= closed ->
|
||||
Sections = parse_1_0_frame(Payload, Channel),
|
||||
case is_connection_frame(Sections) of
|
||||
true -> handle_1_0_connection_frame(Sections, State);
|
||||
Performative = parse_frame_body(Body, Channel),
|
||||
case is_connection_frame(Performative) of
|
||||
true -> handle_connection_frame(Performative, State);
|
||||
false -> State
|
||||
end;
|
||||
handle_1_0_frame0(Mode, Channel, Payload, State) ->
|
||||
Sections = parse_1_0_frame(Payload, Channel),
|
||||
case {Mode, is_connection_frame(Sections)} of
|
||||
{amqp, true} -> handle_1_0_connection_frame(Sections, State);
|
||||
{amqp, false} -> handle_1_0_session_frame(Channel, Sections, State);
|
||||
{sasl, false} -> handle_1_0_sasl_frame(Sections, State)
|
||||
handle_frame0(Mode, Channel, Body, State) ->
|
||||
Performative = parse_frame_body(Body, Channel),
|
||||
case {Mode, is_connection_frame(Performative)} of
|
||||
{amqp, true} -> handle_connection_frame(Performative, State);
|
||||
{amqp, false} -> handle_session_frame(Channel, Performative, State);
|
||||
{sasl, false} -> handle_sasl_frame(Performative, State)
|
||||
end.
|
||||
|
||||
parse_1_0_frame(Payload, _Channel) ->
|
||||
{PerfDesc, Rest} = amqp10_binary_parser:parse(Payload),
|
||||
Perf = amqp10_framing:decode(PerfDesc),
|
||||
%% "The frame body is defined as a performative followed by an opaque payload." [2.3.2]
|
||||
parse_frame_body(Body, _Channel) ->
|
||||
{DescribedPerformative, Payload} = amqp10_binary_parser:parse(Body),
|
||||
Performative = amqp10_framing:decode(DescribedPerformative),
|
||||
?DEBUG("~s Channel ~tp ->~n~tp~n~ts~n",
|
||||
[?MODULE, _Channel, amqp10_framing:pprint(Perf),
|
||||
case Rest of
|
||||
[?MODULE, _Channel, amqp10_framing:pprint(Performative),
|
||||
case Payload of
|
||||
<<>> -> <<>>;
|
||||
_ -> rabbit_misc:format(
|
||||
" followed by ~tp bytes of content", [size(Rest)])
|
||||
" followed by ~tb bytes of payload", [size(Payload)])
|
||||
end]),
|
||||
case Rest of
|
||||
<<>> -> Perf;
|
||||
_ -> {Perf, Rest}
|
||||
case Payload of
|
||||
<<>> -> Performative;
|
||||
_ -> {Performative, Payload}
|
||||
end.
|
||||
|
||||
handle_1_0_connection_frame(
|
||||
handle_connection_frame(
|
||||
#'v1_0.open'{max_frame_size = ClientMaxFrame,
|
||||
channel_max = ClientChannelMax,
|
||||
idle_time_out = IdleTimeout,
|
||||
|
@ -499,7 +497,7 @@ handle_1_0_connection_frame(
|
|||
properties = server_properties()},
|
||||
ok = send_on_channel0(Sock, Open),
|
||||
State;
|
||||
handle_1_0_connection_frame(#'v1_0.close'{}, State0) ->
|
||||
handle_connection_frame(#'v1_0.close'{}, State0) ->
|
||||
State = State0#v1{connection_state = closing},
|
||||
close(undefined, State).
|
||||
|
||||
|
@ -516,17 +514,17 @@ start_writer(#v1{helper_sup = SupPid,
|
|||
{ok, Pid} = supervisor:start_child(SupPid, ChildSpec),
|
||||
State#v1{writer = Pid}.
|
||||
|
||||
handle_1_0_session_frame(Channel, Frame, #v1{tracked_channels = Channels} = State) ->
|
||||
handle_session_frame(Channel, Body, #v1{tracked_channels = Channels} = State) ->
|
||||
case Channels of
|
||||
#{Channel := SessionPid} ->
|
||||
rabbit_amqp_session:process_frame(SessionPid, Frame),
|
||||
rabbit_amqp_session:process_frame(SessionPid, Body),
|
||||
State;
|
||||
_ ->
|
||||
case ?IS_RUNNING(State) of
|
||||
true ->
|
||||
case Frame of
|
||||
case Body of
|
||||
#'v1_0.begin'{} ->
|
||||
send_to_new_1_0_session(Channel, Frame, State);
|
||||
send_to_new_session(Channel, Body, State);
|
||||
_ ->
|
||||
State
|
||||
end;
|
||||
|
@ -534,16 +532,16 @@ handle_1_0_session_frame(Channel, Frame, #v1{tracked_channels = Channels} = Stat
|
|||
throw({channel_frame_while_connection_not_running,
|
||||
Channel,
|
||||
State#v1.connection_state,
|
||||
Frame})
|
||||
Body})
|
||||
end
|
||||
end.
|
||||
|
||||
%% TODO: write a proper ANONYMOUS plugin and unify with STOMP
|
||||
handle_1_0_sasl_frame(#'v1_0.sasl_init'{mechanism = {symbol, <<"ANONYMOUS">>},
|
||||
hostname = _Hostname},
|
||||
#v1{connection_state = starting,
|
||||
connection = Connection,
|
||||
sock = Sock} = State0) ->
|
||||
handle_sasl_frame(#'v1_0.sasl_init'{mechanism = {symbol, <<"ANONYMOUS">>},
|
||||
hostname = _Hostname},
|
||||
#v1{connection_state = starting,
|
||||
connection = Connection,
|
||||
sock = Sock} = State0) ->
|
||||
case default_user() of
|
||||
none ->
|
||||
silent_close_delay(),
|
||||
|
@ -559,12 +557,12 @@ handle_1_0_sasl_frame(#'v1_0.sasl_init'{mechanism = {symbol, <<"ANONYMOUS">>},
|
|||
connection = Connection#v1_connection{auth_mechanism = anonymous}},
|
||||
switch_callback(State, handshake, 8)
|
||||
end;
|
||||
handle_1_0_sasl_frame(#'v1_0.sasl_init'{mechanism = {symbol, Mechanism},
|
||||
initial_response = {binary, Response},
|
||||
hostname = _Hostname},
|
||||
State0 = #v1{connection_state = starting,
|
||||
connection = Connection,
|
||||
sock = Sock}) ->
|
||||
handle_sasl_frame(#'v1_0.sasl_init'{mechanism = {symbol, Mechanism},
|
||||
initial_response = {binary, Response},
|
||||
hostname = _Hostname},
|
||||
State0 = #v1{connection_state = starting,
|
||||
connection = Connection,
|
||||
sock = Sock}) ->
|
||||
AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
|
||||
State = State0#v1{connection =
|
||||
Connection#v1_connection{
|
||||
|
@ -572,15 +570,15 @@ handle_1_0_sasl_frame(#'v1_0.sasl_init'{mechanism = {symbol, Mechanism},
|
|||
auth_state = AuthMechanism:init(Sock)},
|
||||
connection_state = securing},
|
||||
auth_phase_1_0(Response, State);
|
||||
handle_1_0_sasl_frame(#'v1_0.sasl_response'{response = {binary, Response}},
|
||||
State = #v1{connection_state = securing}) ->
|
||||
handle_sasl_frame(#'v1_0.sasl_response'{response = {binary, Response}},
|
||||
State = #v1{connection_state = securing}) ->
|
||||
auth_phase_1_0(Response, State);
|
||||
handle_1_0_sasl_frame(Frame, State) ->
|
||||
throw({unexpected_1_0_sasl_frame, Frame, State}).
|
||||
handle_sasl_frame(Performative, State) ->
|
||||
throw({unexpected_1_0_sasl_frame, Performative, State}).
|
||||
|
||||
handle_input(handshake, <<"AMQP", 0, 1, 0, 0>>,
|
||||
#v1{connection_state = waiting_amqp0100} = State) ->
|
||||
start_1_0_connection(amqp, State);
|
||||
start_connection(amqp, State);
|
||||
|
||||
handle_input({frame_header_1_0, Mode},
|
||||
Header = <<Size:32, DOff:8, Type:8, Channel:16>>,
|
||||
|
@ -606,11 +604,10 @@ handle_input({frame_header_1_0, Mode},
|
|||
handle_input({frame_header_1_0, _Mode}, Malformed, _State) ->
|
||||
throw({bad_1_0_header, Malformed});
|
||||
handle_input({frame_payload_1_0, Mode, DOff, Channel},
|
||||
FrameBin, State) ->
|
||||
FrameBin, State) ->
|
||||
SkipBits = (DOff * 32 - 64), % DOff = 4-byte words, we've read 8 already
|
||||
<<Skip:SkipBits, FramePayload/binary>> = FrameBin,
|
||||
Skip = Skip, %% hide warning when debug is off
|
||||
handle_1_0_frame(Mode, Channel, FramePayload,
|
||||
<<_Skip:SkipBits, FrameBody/binary>> = FrameBin,
|
||||
handle_frame(Mode, Channel, FrameBody,
|
||||
switch_callback(State, {frame_header_1_0, Mode}, 8));
|
||||
|
||||
handle_input(Callback, Data, _State) ->
|
||||
|
@ -622,12 +619,12 @@ init(Mode, PackedState) ->
|
|||
{parent, Parent} = erlang:process_info(self(), parent),
|
||||
ok = rabbit_connection_sup:remove_connection_helper_sup(Parent, helper_sup_amqp_091),
|
||||
State0 = unpack_from_0_9_1(PackedState, Parent, HandshakeTimeout),
|
||||
State = start_1_0_connection(Mode, State0),
|
||||
State = start_connection(Mode, State0),
|
||||
%% By invoking recvloop here we become 1.0.
|
||||
recvloop(sys:debug_options([]), State).
|
||||
|
||||
start_1_0_connection(Mode = sasl, State = #v1{sock = Sock}) ->
|
||||
send_1_0_handshake(Sock, <<"AMQP",3,1,0,0>>),
|
||||
start_connection(Mode = sasl, State = #v1{sock = Sock}) ->
|
||||
send_handshake(Sock, <<"AMQP",3,1,0,0>>),
|
||||
%% "The server mechanisms are ordered in decreasing level of preference." [5.3.3.1]
|
||||
Ms0 = [{symbol, atom_to_binary(M)} || M <- auth_mechanisms(Sock)],
|
||||
Ms1 = case default_user() of
|
||||
|
@ -637,37 +634,37 @@ start_1_0_connection(Mode = sasl, State = #v1{sock = Sock}) ->
|
|||
Ms2 = {array, symbol, Ms1},
|
||||
Ms = #'v1_0.sasl_mechanisms'{sasl_server_mechanisms = Ms2},
|
||||
ok = send_on_channel0(Sock, Ms, rabbit_amqp_sasl),
|
||||
start_1_0_connection0(Mode, State);
|
||||
start_connection0(Mode, State);
|
||||
|
||||
start_1_0_connection(Mode = amqp,
|
||||
State = #v1{sock = Sock,
|
||||
connection = C = #v1_connection{user = User}}) ->
|
||||
start_connection(Mode = amqp,
|
||||
State = #v1{sock = Sock,
|
||||
connection = C = #v1_connection{user = User}}) ->
|
||||
case User of
|
||||
none ->
|
||||
%% Client either skipped SASL layer or used SASL mechansim ANONYMOUS.
|
||||
case default_user() of
|
||||
none ->
|
||||
send_1_0_handshake(Sock, <<"AMQP",3,1,0,0>>),
|
||||
send_handshake(Sock, <<"AMQP",3,1,0,0>>),
|
||||
throw(banned_unauthenticated_connection);
|
||||
NoAuthUsername ->
|
||||
case rabbit_access_control:check_user_login(NoAuthUsername, []) of
|
||||
{ok, NoAuthUser} ->
|
||||
State1 = State#v1{connection = C#v1_connection{user = NoAuthUser}},
|
||||
send_1_0_handshake(Sock, <<"AMQP",0,1,0,0>>),
|
||||
start_1_0_connection0(Mode, State1);
|
||||
send_handshake(Sock, <<"AMQP",0,1,0,0>>),
|
||||
start_connection0(Mode, State1);
|
||||
{refused, _, _, _} ->
|
||||
send_1_0_handshake(Sock, <<"AMQP",3,1,0,0>>),
|
||||
send_handshake(Sock, <<"AMQP",3,1,0,0>>),
|
||||
throw(amqp1_0_default_user_missing)
|
||||
end
|
||||
end;
|
||||
#user{} ->
|
||||
%% Client already got successfully authenticated by SASL.
|
||||
send_1_0_handshake(Sock, <<"AMQP",0,1,0,0>>),
|
||||
start_1_0_connection0(Mode, State)
|
||||
send_handshake(Sock, <<"AMQP",0,1,0,0>>),
|
||||
start_connection0(Mode, State)
|
||||
end.
|
||||
|
||||
start_1_0_connection0(Mode, State0 = #v1{connection = Connection,
|
||||
helper_sup = HelperSup}) ->
|
||||
start_connection0(Mode, State0 = #v1{connection = Connection,
|
||||
helper_sup = HelperSup}) ->
|
||||
SessionSup = case Mode of
|
||||
sasl ->
|
||||
undefined;
|
||||
|
@ -686,7 +683,7 @@ start_1_0_connection0(Mode, State0 = #v1{connection = Connection,
|
|||
connection = Connection#v1_connection{timeout = ?NORMAL_TIMEOUT}},
|
||||
switch_callback(State, {frame_header_1_0, Mode}, 8).
|
||||
|
||||
send_1_0_handshake(Sock, Handshake) ->
|
||||
send_handshake(Sock, Handshake) ->
|
||||
ok = inet_op(fun () -> rabbit_net:send(Sock, Handshake) end).
|
||||
|
||||
send_on_channel0(Sock, Method) ->
|
||||
|
@ -790,7 +787,7 @@ untrack_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels0} = Stat
|
|||
State
|
||||
end.
|
||||
|
||||
send_to_new_1_0_session(
|
||||
send_to_new_session(
|
||||
ChannelNum, BeginFrame,
|
||||
#v1{session_sup = SessionSup,
|
||||
connection = #v1_connection{outgoing_max_frame_size = MaxFrame,
|
||||
|
|
|
@ -293,8 +293,8 @@ start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, Be
|
|||
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
|
||||
gen_server:start_link(?MODULE, Args, Opts).
|
||||
|
||||
process_frame(Pid, Frame) ->
|
||||
gen_server:cast(Pid, {frame, Frame}).
|
||||
process_frame(Pid, FrameBody) ->
|
||||
gen_server:cast(Pid, {frame_body, FrameBody}).
|
||||
|
||||
init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
|
||||
#'v1_0.begin'{next_outgoing_id = ?UINT(RemoteNextOutgoingId),
|
||||
|
@ -385,10 +385,10 @@ handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason},
|
|||
noreply(State)
|
||||
end.
|
||||
|
||||
handle_cast({frame, Frame},
|
||||
handle_cast({frame_body, FrameBody},
|
||||
#state{cfg = #cfg{writer_pid = WriterPid,
|
||||
channel_num = Ch}} = State0) ->
|
||||
try handle_control(Frame, State0) of
|
||||
try handle_control(FrameBody, State0) of
|
||||
{reply, Replies, State} when is_list(Replies) ->
|
||||
lists:foreach(fun (Reply) ->
|
||||
rabbit_amqp_writer:send_command(WriterPid, Ch, Reply)
|
||||
|
@ -1048,14 +1048,14 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
|
|||
end
|
||||
end;
|
||||
|
||||
handle_control({Txfr = #'v1_0.transfer'{handle = ?UINT(Handle)}, MsgPart},
|
||||
handle_control({Performative = #'v1_0.transfer'{handle = ?UINT(Handle)}, Paylaod},
|
||||
State0 = #state{incoming_links = IncomingLinks}) ->
|
||||
{Flows, State1} = session_flow_control_received_transfer(State0),
|
||||
|
||||
{Reply, State} =
|
||||
case IncomingLinks of
|
||||
#{Handle := Link0} ->
|
||||
case incoming_link_transfer(Txfr, MsgPart, Link0, State1) of
|
||||
case incoming_link_transfer(Performative, Paylaod, Link0, State1) of
|
||||
{ok, Reply0, Link, State2} ->
|
||||
{Reply0, State2#state{incoming_links = IncomingLinks#{Handle := Link}}};
|
||||
{error, Reply0} ->
|
||||
|
@ -1065,7 +1065,7 @@ handle_control({Txfr = #'v1_0.transfer'{handle = ?UINT(Handle)}, MsgPart},
|
|||
{Reply0, State1#state{incoming_links = maps:remove(Handle, IncomingLinks)}}
|
||||
end;
|
||||
_ ->
|
||||
incoming_mgmt_link_transfer(Txfr, MsgPart, State1)
|
||||
incoming_mgmt_link_transfer(Performative, Paylaod, State1)
|
||||
end,
|
||||
reply0(Reply ++ Flows, State);
|
||||
|
||||
|
@ -1073,8 +1073,8 @@ handle_control({Txfr = #'v1_0.transfer'{handle = ?UINT(Handle)}, MsgPart},
|
|||
%% Although the AMQP message format [3.2] requires a body, it is valid to send a transfer frame without payload.
|
||||
%% For example, when a large multi transfer message is streamed using the ProtonJ2 client, the client could send
|
||||
%% a final #'v1_0.transfer'{more=false} frame without a payload.
|
||||
handle_control(Txfr = #'v1_0.transfer'{}, State) ->
|
||||
handle_control({Txfr, <<>>}, State);
|
||||
handle_control(Performative = #'v1_0.transfer'{}, State) ->
|
||||
handle_control({Performative, <<>>}, State);
|
||||
|
||||
%% Flow control. These frames come with two pieces of information:
|
||||
%% the session window, and optionally, credit for a particular link.
|
||||
|
@ -1622,7 +1622,7 @@ handle_deliver(ConsumerTag, AckRequired,
|
|||
Mc = mc:set_annotation(redelivered, Redelivered, Mc1),
|
||||
Sections0 = mc:protocol_state(Mc),
|
||||
Sections = mc_amqp:serialize(Sections0),
|
||||
?DEBUG("~s Outbound content:~n ~tp~n",
|
||||
?DEBUG("~s Outbound payload:~n ~tp~n",
|
||||
[?MODULE, [amqp10_framing:pprint(Section) ||
|
||||
Section <- amqp10_framing:decode_bin(iolist_to_binary(Sections))]]),
|
||||
validate_message_size(Sections, MaxMessageSize),
|
||||
|
@ -1769,7 +1769,7 @@ incoming_mgmt_link_transfer(
|
|||
delivery_tag = {binary, <<>>},
|
||||
message_format = ?UINT(?MESSAGE_FORMAT),
|
||||
settled = true},
|
||||
?DEBUG("~s Outbound content:~n ~tp~n",
|
||||
?DEBUG("~s Outbound payload:~n ~tp~n",
|
||||
[?MODULE, [amqp10_framing:pprint(Section) ||
|
||||
Section <- amqp10_framing:decode_bin(iolist_to_binary(Response))]]),
|
||||
validate_message_size(Response, OutgoingMaxMessageSize),
|
||||
|
@ -1866,7 +1866,7 @@ incoming_link_transfer(
|
|||
#multi_transfer_msg{payload_fragments_rev = PFR,
|
||||
delivery_id = FirstDeliveryId,
|
||||
settled = FirstSettled} ->
|
||||
MsgBin0 = iolist_to_binary(lists:reverse([MsgPart | PFR])),
|
||||
MsgBin0 = list_to_binary(lists:reverse([MsgPart | PFR])),
|
||||
ok = validate_multi_transfer_delivery_id(MaybeDeliveryId, FirstDeliveryId),
|
||||
ok = validate_multi_transfer_settled(MaybeSettled, FirstSettled),
|
||||
{MsgBin0, FirstDeliveryId, FirstSettled}
|
||||
|
@ -1875,7 +1875,7 @@ incoming_link_transfer(
|
|||
validate_incoming_message_size(MsgBin),
|
||||
|
||||
Sections = amqp10_framing:decode_bin(MsgBin),
|
||||
?DEBUG("~s Inbound content:~n ~tp",
|
||||
?DEBUG("~s Inbound payload:~n ~tp",
|
||||
[?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]),
|
||||
Mc0 = mc:init(mc_amqp, Sections, #{}),
|
||||
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
|
||||
|
@ -2251,7 +2251,7 @@ handle_outgoing_mgmt_link_flow_control(
|
|||
Drain = default(Drain0, false),
|
||||
Echo = default(Echo0, false),
|
||||
DeliveryCountRcv = delivery_count_rcv(MaybeDeliveryCountRcv),
|
||||
LinkCreditSnd = link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd),
|
||||
LinkCreditSnd = amqp10_util:link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd),
|
||||
{Count, Credit} = case Drain of
|
||||
true -> {add(DeliveryCountSnd, LinkCreditSnd), 0};
|
||||
false -> {DeliveryCountSnd, LinkCreditSnd}
|
||||
|
@ -2298,7 +2298,7 @@ handle_outgoing_link_flow_control(
|
|||
%% thanks to the queue event containing the consumer tag.
|
||||
State;
|
||||
{credit_api_v1, DeliveryCountSnd} ->
|
||||
LinkCreditSnd = link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd),
|
||||
LinkCreditSnd = amqp10_util:link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd),
|
||||
{ok, QStates, Actions} = rabbit_queue_type:credit_v1(QName, Ctag, LinkCreditSnd, Drain, QStates0),
|
||||
State1 = State0#state{queue_states = QStates},
|
||||
State = handle_queue_actions(Actions, State1),
|
||||
|
@ -2315,9 +2315,6 @@ delivery_count_rcv(undefined) ->
|
|||
%% by the initial attach frame from the sender to the receiver." [2.6.7]
|
||||
?INITIAL_DELIVERY_COUNT.
|
||||
|
||||
link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd) ->
|
||||
diff(add(DeliveryCountRcv, LinkCreditRcv), DeliveryCountSnd).
|
||||
|
||||
%% The AMQP 0.9.1 credit extension was poorly designed because a consumer granting
|
||||
%% credits to a queue has to synchronously wait for a credit reply from the queue:
|
||||
%% https://github.com/rabbitmq/rabbitmq-server/blob/b9566f4d02f7ceddd2f267a92d46affd30fb16c8/deps/rabbitmq_codegen/credit_extension.json#L43
|
||||
|
@ -2398,20 +2395,20 @@ default(Thing, _Default) -> Thing.
|
|||
transfer_frames(Transfer, Sections, unlimited) ->
|
||||
[[Transfer, Sections]];
|
||||
transfer_frames(Transfer, Sections, MaxFrameSize) ->
|
||||
%% TODO Ugh
|
||||
TLen = iolist_size(amqp10_framing:encode_bin(Transfer)),
|
||||
encode_frames(Transfer, Sections, MaxFrameSize - TLen, []).
|
||||
PerformativeSize = iolist_size(amqp10_framing:encode_bin(Transfer)),
|
||||
encode_frames(Transfer, Sections, MaxFrameSize - PerformativeSize, []).
|
||||
|
||||
encode_frames(_T, _Msg, MaxContentLen, _Transfers) when MaxContentLen =< 0 ->
|
||||
encode_frames(_T, _Msg, MaxPayloadSize, _Transfers) when MaxPayloadSize =< 0 ->
|
||||
protocol_error(?V_1_0_AMQP_ERROR_FRAME_SIZE_TOO_SMALL,
|
||||
"Frame size is too small by ~tp bytes",
|
||||
[-MaxContentLen]);
|
||||
encode_frames(T, Msg, MaxContentLen, Transfers) ->
|
||||
case iolist_size(Msg) > MaxContentLen of
|
||||
"Frame size is too small by ~b bytes",
|
||||
[-MaxPayloadSize]);
|
||||
encode_frames(T, Msg, MaxPayloadSize, Transfers) ->
|
||||
case iolist_size(Msg) > MaxPayloadSize of
|
||||
true ->
|
||||
<<Chunk:MaxContentLen/binary, Rest/binary>> = iolist_to_binary(Msg),
|
||||
MsgBin = iolist_to_binary(Msg),
|
||||
{Chunk, Rest} = split_binary(MsgBin, MaxPayloadSize),
|
||||
T1 = T#'v1_0.transfer'{more = true},
|
||||
encode_frames(T, Rest, MaxContentLen, [[T1, Chunk] | Transfers]);
|
||||
encode_frames(T, Rest, MaxPayloadSize, [[T1, Chunk] | Transfers]);
|
||||
false ->
|
||||
lists:reverse([[T, Msg] | Transfers])
|
||||
end.
|
||||
|
|
|
@ -38,6 +38,9 @@
|
|||
-define(CALL_TIMEOUT, 300_000).
|
||||
-define(AMQP_SASL_FRAME_TYPE, 1).
|
||||
|
||||
-type performative() :: tuple().
|
||||
-type payload() :: iodata().
|
||||
|
||||
%%%%%%%%%%%%%%%%%%
|
||||
%%% client API %%%
|
||||
%%%%%%%%%%%%%%%%%%
|
||||
|
@ -51,24 +54,24 @@ start_link(Sock, MaxFrame, ReaderPid) ->
|
|||
|
||||
-spec send_command(pid(),
|
||||
rabbit_types:channel_number(),
|
||||
rabbit_framing:amqp_method_record()) -> ok.
|
||||
send_command(Writer, ChannelNum, MethodRecord) ->
|
||||
Request = {send_command, ChannelNum, MethodRecord},
|
||||
performative()) -> ok.
|
||||
send_command(Writer, ChannelNum, Performative) ->
|
||||
Request = {send_command, ChannelNum, Performative},
|
||||
gen_server:cast(Writer, Request).
|
||||
|
||||
-spec send_command(pid(),
|
||||
rabbit_types:channel_number(),
|
||||
rabbit_framing:amqp_method_record(),
|
||||
rabbit_types:content()) -> ok.
|
||||
send_command(Writer, ChannelNum, MethodRecord, Content) ->
|
||||
Request = {send_command, ChannelNum, MethodRecord, Content},
|
||||
performative(),
|
||||
payload()) -> ok.
|
||||
send_command(Writer, ChannelNum, Performative, Payload) ->
|
||||
Request = {send_command, ChannelNum, Performative, Payload},
|
||||
gen_server:cast(Writer, Request).
|
||||
|
||||
-spec send_command_sync(pid(),
|
||||
rabbit_types:channel_number(),
|
||||
rabbit_framing:amqp_method_record()) -> ok.
|
||||
send_command_sync(Writer, ChannelNum, MethodRecord) ->
|
||||
Request = {send_command, ChannelNum, MethodRecord},
|
||||
performative()) -> ok.
|
||||
send_command_sync(Writer, ChannelNum, Performative) ->
|
||||
Request = {send_command, ChannelNum, Performative},
|
||||
gen_server:call(Writer, Request, ?CALL_TIMEOUT).
|
||||
|
||||
%% Delete this function when feature flag credit_api_v2 becomes required.
|
||||
|
@ -76,17 +79,17 @@ send_command_sync(Writer, ChannelNum, MethodRecord) ->
|
|||
rabbit_types:channel_number(),
|
||||
pid(),
|
||||
pid(),
|
||||
rabbit_framing:amqp_method_record(),
|
||||
rabbit_types:content()) -> ok.
|
||||
send_command_and_notify(Writer, ChannelNum, QueuePid, SessionPid, MethodRecord, Content) ->
|
||||
Request = {send_command_and_notify, ChannelNum, QueuePid, SessionPid, MethodRecord, Content},
|
||||
performative(),
|
||||
payload()) -> ok.
|
||||
send_command_and_notify(Writer, ChannelNum, QueuePid, SessionPid, Performative, Payload) ->
|
||||
Request = {send_command_and_notify, ChannelNum, QueuePid, SessionPid, Performative, Payload},
|
||||
gen_server:cast(Writer, Request).
|
||||
|
||||
-spec internal_send_command(rabbit_net:socket(),
|
||||
rabbit_framing:amqp_method_record(),
|
||||
performative(),
|
||||
amqp10_framing | rabbit_amqp_sasl) -> ok.
|
||||
internal_send_command(Sock, MethodRecord, Protocol) ->
|
||||
Data = assemble_frame(0, MethodRecord, Protocol),
|
||||
internal_send_command(Sock, Performative, Protocol) ->
|
||||
Data = assemble_frame(0, Performative, Protocol),
|
||||
ok = tcp_send(Sock, Data).
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
|
@ -102,20 +105,20 @@ init({Sock, MaxFrame, ReaderPid}) ->
|
|||
process_flag(message_queue_data, off_heap),
|
||||
{ok, State}.
|
||||
|
||||
handle_cast({send_command, ChannelNum, MethodRecord}, State0) ->
|
||||
State = internal_send_command_async(ChannelNum, MethodRecord, State0),
|
||||
handle_cast({send_command, ChannelNum, Performative}, State0) ->
|
||||
State = internal_send_command_async(ChannelNum, Performative, State0),
|
||||
no_reply(State);
|
||||
handle_cast({send_command, ChannelNum, MethodRecord, Content}, State0) ->
|
||||
State = internal_send_command_async(ChannelNum, MethodRecord, Content, State0),
|
||||
handle_cast({send_command, ChannelNum, Performative, Payload}, State0) ->
|
||||
State = internal_send_command_async(ChannelNum, Performative, Payload, State0),
|
||||
no_reply(State);
|
||||
%% Delete below function clause when feature flag credit_api_v2 becomes required.
|
||||
handle_cast({send_command_and_notify, ChannelNum, QueuePid, SessionPid, MethodRecord, Content}, State0) ->
|
||||
State = internal_send_command_async(ChannelNum, MethodRecord, Content, State0),
|
||||
handle_cast({send_command_and_notify, ChannelNum, QueuePid, SessionPid, Performative, Payload}, State0) ->
|
||||
State = internal_send_command_async(ChannelNum, Performative, Payload, State0),
|
||||
rabbit_amqqueue:notify_sent(QueuePid, SessionPid),
|
||||
no_reply(State).
|
||||
|
||||
handle_call({send_command, ChannelNum, MethodRecord}, _From, State0) ->
|
||||
State1 = internal_send_command_async(ChannelNum, MethodRecord, State0),
|
||||
handle_call({send_command, ChannelNum, Performative}, _From, State0) ->
|
||||
State1 = internal_send_command_async(ChannelNum, Performative, State0),
|
||||
State = flush(State1),
|
||||
{reply, ok, State}.
|
||||
|
||||
|
@ -151,34 +154,21 @@ format_status(Status) ->
|
|||
no_reply(State) ->
|
||||
{noreply, State, 0}.
|
||||
|
||||
internal_send_command_async(Channel, MethodRecord,
|
||||
internal_send_command_async(Channel, Performative,
|
||||
State = #state{pending = Pending,
|
||||
pending_size = PendingSize}) ->
|
||||
Frame = assemble_frame(Channel, MethodRecord),
|
||||
Frame = assemble_frame(Channel, Performative),
|
||||
maybe_flush(State#state{pending = [Frame | Pending],
|
||||
pending_size = PendingSize + iolist_size(Frame)}).
|
||||
|
||||
internal_send_command_async(Channel, MethodRecord, Content,
|
||||
internal_send_command_async(Channel, Performative, Payload,
|
||||
State = #state{max_frame_size = MaxFrame,
|
||||
pending = Pending,
|
||||
pending_size = PendingSize}) ->
|
||||
Frames = assemble_frames(Channel, MethodRecord, Content, MaxFrame),
|
||||
Frames = assemble_frame(Channel, Performative, Payload, MaxFrame),
|
||||
maybe_flush(State#state{pending = [Frames | Pending],
|
||||
pending_size = PendingSize + iolist_size(Frames)}).
|
||||
|
||||
%% Note: a transfer record can be followed by a number of other
|
||||
%% records to make a complete frame but unlike 0-9-1 we may have many
|
||||
%% content records. However, that's already been handled for us, we're
|
||||
%% just sending a chunk, so from this perspective it's just a binary.
|
||||
|
||||
%%TODO respect MaxFrame
|
||||
assemble_frames(Channel, Performative, Content, _MaxFrame) ->
|
||||
?DEBUG("~s Channel ~tp <-~n~tp~n followed by ~tp bytes of content~n",
|
||||
[?MODULE, Channel, amqp10_framing:pprint(Performative),
|
||||
iolist_size(Content)]),
|
||||
PerfBin = amqp10_framing:encode_bin(Performative),
|
||||
amqp10_binary_generator:build_frame(Channel, [PerfBin, Content]).
|
||||
|
||||
assemble_frame(Channel, Performative) ->
|
||||
assemble_frame(Channel, Performative, amqp10_framing).
|
||||
|
||||
|
@ -193,6 +183,14 @@ assemble_frame(Channel, Performative, rabbit_amqp_sasl) ->
|
|||
PerfBin = amqp10_framing:encode_bin(Performative),
|
||||
amqp10_binary_generator:build_frame(Channel, ?AMQP_SASL_FRAME_TYPE, PerfBin).
|
||||
|
||||
%%TODO respect MaxFrame
|
||||
assemble_frame(Channel, Performative, Payload, _MaxFrame) ->
|
||||
?DEBUG("~s Channel ~tp <-~n~tp~n followed by ~tp bytes of payload~n",
|
||||
[?MODULE, Channel, amqp10_framing:pprint(Performative),
|
||||
iolist_size(Payload)]),
|
||||
PerfIoData = amqp10_framing:encode_bin(Performative),
|
||||
amqp10_binary_generator:build_frame(Channel, [PerfIoData, Payload]).
|
||||
|
||||
tcp_send(Sock, Data) ->
|
||||
rabbit_misc:throw_on_error(
|
||||
inet_error,
|
||||
|
|
|
@ -2607,11 +2607,12 @@ initial_delivery_count(_) ->
|
|||
credit_api_v2(#consumer_cfg{meta = ConsumerMeta}) ->
|
||||
maps:is_key(initial_delivery_count, ConsumerMeta).
|
||||
|
||||
%% AMQP 1.0 §2.6.7
|
||||
link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd, ConsumerCfg) ->
|
||||
C = case credit_api_v2(ConsumerCfg) of
|
||||
true -> diff(add(DeliveryCountRcv, LinkCreditRcv), DeliveryCountSnd);
|
||||
false -> DeliveryCountRcv + LinkCreditRcv - DeliveryCountSnd
|
||||
end,
|
||||
%% C can be negative when receiver decreases credits while messages are in flight.
|
||||
max(0, C).
|
||||
case credit_api_v2(ConsumerCfg) of
|
||||
true ->
|
||||
amqp10_util:link_credit_snd(DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd);
|
||||
false ->
|
||||
C = DeliveryCountRcv + LinkCreditRcv - DeliveryCountSnd,
|
||||
%% C can be negative when receiver decreases credits while messages are in flight.
|
||||
max(0, C)
|
||||
end.
|
||||
|
|
|
@ -485,10 +485,8 @@ process_credit(DeliveryCountRcv, LinkCredit, ChPid, CTag, State) ->
|
|||
_ ->
|
||||
%% credit API v2
|
||||
%% LinkCredit refers to LinkCreditRcv
|
||||
%% See AMQP §2.6.7
|
||||
serial_number:diff(
|
||||
serial_number:add(DeliveryCountRcv, LinkCredit),
|
||||
DeliveryCountSnd)
|
||||
amqp10_util:link_credit_snd(
|
||||
DeliveryCountRcv, LinkCredit, DeliveryCountSnd)
|
||||
end,
|
||||
C = C0#cr{link_states = maps:update(CTag, LinkState#link_state{credit = LinkCreditSnd}, LinkStates)},
|
||||
case OldLinkCreditSnd > 0 orelse
|
||||
|
|
|
@ -72,8 +72,7 @@
|
|||
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue.
|
||||
%% see AMQP 1.0 §2.6.7
|
||||
-type delivery_count() :: sequence_no().
|
||||
%% Link credit can be negative, see AMQP 1.0 §2.6.7
|
||||
-type credit() :: integer().
|
||||
-type credit() :: uint().
|
||||
|
||||
-define(STATE, ?MODULE).
|
||||
|
||||
|
|
|
@ -473,9 +473,8 @@ credit(QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, Echo,
|
|||
local_pid = LocalPid} = State0) ->
|
||||
case Readers of
|
||||
#{CTag := Str0 = #stream{delivery_count = DeliveryCountSnd}} ->
|
||||
LinkCreditSnd = serial_number:diff(
|
||||
serial_number:add(DeliveryCountRcv, LinkCreditRcv),
|
||||
DeliveryCountSnd),
|
||||
LinkCreditSnd = amqp10_util:link_credit_snd(
|
||||
DeliveryCountRcv, LinkCreditRcv, DeliveryCountSnd),
|
||||
Str1 = Str0#stream{credit = LinkCreditSnd},
|
||||
{Str2 = #stream{delivery_count = DeliveryCount,
|
||||
credit = Credit,
|
||||
|
|
|
@ -44,6 +44,7 @@ amqp10_common:
|
|||
- amqp10_binary_parser
|
||||
- amqp10_framing
|
||||
- amqp10_framing0
|
||||
- amqp10_util
|
||||
- serial_number
|
||||
aten:
|
||||
- aten
|
||||
|
|
Loading…
Reference in New Issue