Shovels: Optimise amqp10 client messages for shovel usage

AMQP10 shovels don't need the amqp10 message format, the binary
can be translated directly into a message container and also
the other way around. The new amqp10_raw_msg just stores the payload
and information required to create the transfer frame, skipping
a few unnecessary encoding/decoding operations of the AMQP10 sections.
This commit is contained in:
Diana Parra Corbacho 2025-09-22 16:27:53 +02:00 committed by Michael Klishin
parent b84615e215
commit 31e5a722c5
No known key found for this signature in database
GPG Key ID: 16AB14D00D613900
5 changed files with 169 additions and 31 deletions

View File

@ -28,6 +28,7 @@
attach_receiver_link/5,
attach_receiver_link/6,
attach_receiver_link/7,
attach_receiver_link/8,
attach_link/2,
detach_link/1,
send_msg/2,
@ -277,7 +278,17 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
terminus_durability(), filter(), properties()) ->
{ok, link_ref()}.
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, false).
%% @doc Attaches a receiver link to a source.
%% This is asynchronous and will notify completion of the attach request to the
%% caller using an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
terminus_durability(), filter(), properties(), boolean()) ->
{ok, link_ref()}.
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, RawMode)
when is_pid(Session) andalso
is_binary(Name) andalso
is_binary(Source) andalso
@ -286,14 +297,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop
SettleMode == mixed) andalso
is_atom(Durability) andalso
is_map(Filter) andalso
is_map(Properties) ->
is_map(Properties) andalso
is_boolean(RawMode) ->
AttachArgs = #{name => Name,
role => {receiver, #{address => Source,
durable => Durability}, self()},
snd_settle_mode => SettleMode,
rcv_settle_mode => first,
filter => Filter,
properties => Properties},
properties => Properties,
raw_mode => RawMode},
amqp10_client_session:attach(Session, AttachArgs).
-spec attach_link(pid(), attach_args()) -> {ok, link_ref()}.
@ -355,11 +368,16 @@ stop_receiver_link(#link_ref{role = receiver,
%%% messages
%% @doc Send a message on a the link referred to be the 'LinkRef'.
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg()) ->
ok | amqp10_client_session:transfer_error().
send_msg(#link_ref{role = sender, session = Session,
link_handle = Handle}, Msg0) ->
Msg = amqp10_msg:set_handle(Handle, Msg0),
Msg = case amqp10_raw_msg:is(Msg0) of
true ->
amqp10_raw_msg:set_handle(Handle, Msg0);
false ->
amqp10_msg:set_handle(Handle, Msg0)
end,
amqp10_client_session:transfer(Session, Msg, ?TIMEOUT).
%% @doc Accept a message on a the link referred to be the 'LinkRef'.

View File

@ -100,7 +100,8 @@
properties => amqp10_client_types:properties(),
max_message_size => max_message_size(),
handle => output_handle(),
footer_opt => footer_opt()
footer_opt => footer_opt(),
raw_mode => boolean()
}.
-type transfer_error() :: {error,
@ -142,7 +143,8 @@
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
Credit :: pos_integer()},
incoming_unsettled = #{} :: #{delivery_number() => ok},
footer_opt :: footer_opt() | undefined
footer_opt :: footer_opt() | undefined,
raw_mode = false :: boolean()
}).
-record(state,
@ -208,11 +210,18 @@ attach(Session, Args) ->
detach(Session, Handle) ->
gen_statem:call(Session, {detach, Handle}, ?TIMEOUT).
-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
-spec transfer(pid(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg(), timeout()) ->
ok | transfer_error().
transfer(Session, Amqp10Msg, Timeout) ->
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).
case amqp10_raw_msg:is(Amqp10Msg) of
true ->
Transfer = amqp10_raw_msg:transfer(Amqp10Msg),
Payload = amqp10_raw_msg:payload(Amqp10Msg),
gen_statem:call(Session, {transfer, Transfer, {raw, Payload}}, Timeout);
false ->
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout)
end.
-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
flow(Session, IncomingWindow, RenewWhenBelow) when
@ -413,7 +422,8 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
{ok, #link{target = {pid, TargetPid},
ref = LinkRef,
incoming_unsettled = Unsettled,
footer_opt = FooterOpt
footer_opt = FooterOpt,
raw_mode = RawMode
} = Link0} = find_link_by_input_handle(InHandle, State0),
{Transfer = #'v1_0.transfer'{settled = Settled,
@ -428,7 +438,7 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
%% then the settled flag MUST be interpreted as being false." [2.7.5]
Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}}
end,
case decode_as_msg(Transfer, Payload, FooterOpt) of
case decode_as_msg(Transfer, Payload, FooterOpt, RawMode) of
{ok, Msg} ->
% link bookkeeping
% notify when credit is exhausted (link_credit = 0)
@ -619,6 +629,25 @@ send(Record, #state{socket = Socket} = State) ->
Frame = encode_frame(Record, State),
socket_send(Socket, Frame).
send_transfer(Transfer0, {raw, Payload}, _FooterOpt, MaxMessageSize,
#state{socket = Socket,
channel = Channel,
connection_config = Config}) ->
OutMaxFrameSize = maps:get(outgoing_max_frame_size, Config),
Transfer = Transfer0#'v1_0.transfer'{more = false},
TransferSize = iolist_size(amqp10_framing:encode_bin(Transfer)),
if is_integer(MaxMessageSize) andalso
MaxMessageSize > 0 andalso
byte_size(Payload) > MaxMessageSize ->
{error, message_size_exceeded};
true ->
% TODO: this does not take the extended header into account
% see: 2.3
MaxPayloadSize = OutMaxFrameSize - ?FRAME_HEADER_SIZE - TransferSize,
Frames = build_frames(Channel, Transfer, Payload, MaxPayloadSize, []),
ok = socket_send(Socket, Frames),
{ok, length(Frames)}
end;
send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize,
#state{socket = Socket,
channel = Channel,
@ -918,7 +947,8 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
target = LinkTarget,
delivery_count = unpack(InitialDeliveryCount),
max_message_size = unpack(MaxMessageSize),
footer_opt = maps:get(footer_opt, Args, undefined)},
footer_opt = maps:get(footer_opt, Args, undefined),
raw_mode = maps:get(raw_mode, Args, false)},
{State#state{links = Links#{OutHandle => Link},
next_link_handle = NextLinkHandle,
@ -1199,10 +1229,14 @@ complete_partial_transfer(_Transfer, Payload,
{T, iolist_to_binary(lists:reverse([Payload | Payloads])),
Link#link{partial_transfers = undefined}}.
decode_as_msg(Transfer, Payload, undefined) ->
decode_as_msg(#'v1_0.transfer'{settled = Settled,
delivery_id = {uint, DeliveryId}},
Payload, _, true) ->
{ok, amqp10_raw_msg:new(Settled, DeliveryId, Payload)};
decode_as_msg(Transfer, Payload, undefined, _) ->
Sections = amqp10_framing:decode_bin(Payload),
{ok, amqp10_msg:from_amqp_records([Transfer | Sections])};
decode_as_msg(Transfer, Payload, FooterOpt) ->
decode_as_msg(Transfer, Payload, FooterOpt, _) ->
PosSections = decode_sections([], Payload, byte_size(Payload), 0),
Sections = lists:map(fun({_Pos, S}) -> S end, PosSections),
Msg = amqp10_msg:from_amqp_records([Transfer | Sections]),

View File

@ -0,0 +1,76 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(amqp10_raw_msg).
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
%% Just for AMQP10 shovel usage. It avoids the binary <-> amqp10 <-> mc
%% conversion, with all the unnecessary encoding/decoding steps.
%% It allows for just binary <-> mc conversion, as the payload is stored as is
-export([new/3,
settled/1,
delivery_tag/1,
payload/1,
handle/1,
set_handle/2,
transfer/1,
is/1]).
-record(amqp10_raw_msg,
{settled :: boolean(),
delivery_tag :: non_neg_integer(),
payload :: binary(),
handle :: non_neg_integer() | undefined
}).
-opaque amqp10_raw_msg() :: #amqp10_raw_msg{}.
-export_type([amqp10_raw_msg/0]).
-spec new(boolean(), non_neg_integer(), binary()) ->
amqp10_raw_msg().
new(Settled, DeliveryTag, Payload) ->
#amqp10_raw_msg{settled = Settled,
delivery_tag = DeliveryTag,
payload = Payload}.
-spec settled(amqp10_raw_msg()) -> boolean().
settled(#amqp10_raw_msg{settled = Settled}) ->
Settled.
-spec delivery_tag(amqp10_raw_msg()) -> non_neg_integer().
delivery_tag(#amqp10_raw_msg{delivery_tag = DeliveryTag}) ->
DeliveryTag.
-spec payload(amqp10_raw_msg()) -> binary().
payload(#amqp10_raw_msg{payload = Payload}) ->
Payload.
-spec handle(amqp10_raw_msg()) -> non_neg_integer().
handle(#amqp10_raw_msg{handle = Handle}) ->
Handle.
-spec set_handle(non_neg_integer(), amqp10_raw_msg()) ->
amqp10_raw_msg().
set_handle(Handle, #amqp10_raw_msg{} = Msg) ->
Msg#amqp10_raw_msg{handle = Handle}.
-spec transfer(amqp10_raw_msg()) -> #'v1_0.transfer'{}.
transfer(#amqp10_raw_msg{settled = Settled,
delivery_tag = DeliveryTag,
handle = Handle}) ->
#'v1_0.transfer'{
delivery_tag = {binary, rabbit_data_coercion:to_binary(DeliveryTag)},
settled = Settled,
handle = {uint, Handle},
message_format = {uint, ?MESSAGE_FORMAT}}.
-spec is(term()) -> boolean().
is(Record) ->
is_record(Record, amqp10_raw_msg).

View File

@ -11,6 +11,7 @@
-include_lib("rabbit/include/mc.hrl").
-include("rabbit_shovel.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-export([
parse/2,
@ -81,7 +82,9 @@ connect_source(State = #{name := Name,
on_publish -> unsettled;
on_confirm -> unsettled
end,
AttachFun = fun amqp10_client:attach_receiver_link/5,
AttachFun = fun(S, L, A, SSM, D) ->
amqp10_client:attach_receiver_link(S, L, A, SSM, D, #{}, #{}, true)
end,
{Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "receiver", Addr, Src,
AttachFun),
State#{source => Src#{current => #{conn => Conn,
@ -128,6 +131,7 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
% needs to be sync, i.e. awaits the 'attach' event as
% else we may try to use the link before it is ready
Durability = maps:get(durability, Map, unsettled_state),
%% Attach in raw mode
{ok, LinkRef} = AttachFun(Sess, LinkName, Addr,
SndSettleMode,
Durability),
@ -181,11 +185,10 @@ dest_endpoint(#{shovel_type := dynamic,
-spec handle_source(Msg :: any(), state()) ->
not_handled | state() | {stop, any()}.
handle_source({amqp10_msg, _LinkRef, Msg0}, State) ->
Tag = amqp10_msg:delivery_id(Msg0),
[_ | Rest] = amqp10_msg:to_amqp_records(Msg0),
Bin = iolist_to_binary([amqp10_framing:encode_bin(D) || D <- Rest]),
Msg = mc:init(mc_amqp, Bin, #{}),
handle_source({amqp10_msg, _LinkRef, RawMsg}, State) ->
Tag = amqp10_raw_msg:delivery_tag(RawMsg),
Payload = amqp10_raw_msg:payload(RawMsg),
Msg = mc:init(mc_amqp, Payload, #{}),
rabbit_shovel_behaviour:forward(Tag, Msg, State);
handle_source({amqp10_event, {connection, Conn, opened}},
State = #{source := #{current := #{conn := Conn}}}) ->
@ -333,11 +336,10 @@ forward(Tag, Msg0,
unacked := Unacked} = Dst,
ack_mode := AckMode} = State) ->
OutTag = rabbit_data_coercion:to_binary(Tag),
Msg1 = mc:protocol_state(mc:convert(mc_amqp, Msg0)),
Records = lists:flatten([amqp10_framing:decode_bin(iolist_to_binary(S)) || S <- Msg1]),
Msg2 = amqp10_msg:new(OutTag, Records, AckMode =/= on_confirm),
Msg = add_timestamp_header(State, add_forward_headers(State, Msg2)),
case send_msg(Link, Msg) of
Msg1 = add_timestamp_header(State, add_forward_headers(State, Msg0)),
Msg2 = mc:protocol_state(mc:convert(mc_amqp, Msg1)),
Msg3 = amqp10_raw_msg:new(AckMode =/= on_confirm, Tag, iolist_to_binary(Msg2)),
case send_msg(Link, Msg3) of
ok ->
rabbit_shovel_behaviour:decr_remaining_unacked(
case AckMode of
@ -366,10 +368,13 @@ send_msg(Link, Msg) ->
end.
add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->
Anns = #{<<"x-opt-shovelled-timestamp">> => os:system_time(milli_seconds)},
amqp10_msg:set_message_annotations(Anns, Msg);
mc:set_annotation(
<<"x-opt-shovelled-timestamp">>, os:system_time(milli_seconds),
Msg);
add_timestamp_header(_, Msg) -> Msg.
add_forward_headers(#{dest := #{cached_forward_headers := Anns}}, Msg) ->
amqp10_msg:set_message_annotations(Anns, Msg);
maps:fold(fun(K, V, Acc) ->
mc:set_annotation(K, V, Acc)
end, Msg, Anns);
add_forward_headers(_, Msg) -> Msg.

View File

@ -74,7 +74,9 @@ amqp_encoded_data_list(_Config) ->
#'v1_0.data'{content = <<"one">>},
#'v1_0.data'{content = <<"two">>}
],
Msg = amqp10_msg:new(55, Body),
[_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)),
Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
Msg = amqp10_raw_msg:new(true, 55, Bin),
rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State),
?assert(meck:validate(rabbit_shovel_behaviour)),
@ -91,8 +93,11 @@ amqp_encoded_amqp_value(_Config) ->
State = #{source => #{},
dest => #{module => rabbit_amqp10_shovel},
ack_mode => no_ack},
Body = #'v1_0.amqp_value'{content = {utf8, <<"hi">>}},
Msg = amqp10_msg:new(55, Body),
[_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)),
Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
Msg = amqp10_raw_msg:new(true, 55, Bin),
rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State),
?assert(meck:validate(rabbit_shovel_behaviour)),