Merge pull request #14593 from rabbitmq/shovel-raw
Shovels: Optimise amqp10 client messages for shovel usage
This commit is contained in:
commit
351ec1b4c7
|
@ -28,6 +28,7 @@
|
|||
attach_receiver_link/5,
|
||||
attach_receiver_link/6,
|
||||
attach_receiver_link/7,
|
||||
attach_receiver_link/8,
|
||||
attach_link/2,
|
||||
detach_link/1,
|
||||
send_msg/2,
|
||||
|
@ -277,7 +278,17 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
|
|||
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
|
||||
terminus_durability(), filter(), properties()) ->
|
||||
{ok, link_ref()}.
|
||||
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
|
||||
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
|
||||
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, false).
|
||||
|
||||
%% @doc Attaches a receiver link to a source.
|
||||
%% This is asynchronous and will notify completion of the attach request to the
|
||||
%% caller using an amqp10_event of the following format:
|
||||
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
|
||||
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
|
||||
terminus_durability(), filter(), properties(), boolean()) ->
|
||||
{ok, link_ref()}.
|
||||
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, RawMode)
|
||||
when is_pid(Session) andalso
|
||||
is_binary(Name) andalso
|
||||
is_binary(Source) andalso
|
||||
|
@ -286,14 +297,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop
|
|||
SettleMode == mixed) andalso
|
||||
is_atom(Durability) andalso
|
||||
is_map(Filter) andalso
|
||||
is_map(Properties) ->
|
||||
is_map(Properties) andalso
|
||||
is_boolean(RawMode) ->
|
||||
AttachArgs = #{name => Name,
|
||||
role => {receiver, #{address => Source,
|
||||
durable => Durability}, self()},
|
||||
snd_settle_mode => SettleMode,
|
||||
rcv_settle_mode => first,
|
||||
filter => Filter,
|
||||
properties => Properties},
|
||||
properties => Properties,
|
||||
raw_mode => RawMode},
|
||||
amqp10_client_session:attach(Session, AttachArgs).
|
||||
|
||||
-spec attach_link(pid(), attach_args()) -> {ok, link_ref()}.
|
||||
|
@ -355,11 +368,16 @@ stop_receiver_link(#link_ref{role = receiver,
|
|||
%%% messages
|
||||
|
||||
%% @doc Send a message on a the link referred to be the 'LinkRef'.
|
||||
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
|
||||
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg()) ->
|
||||
ok | amqp10_client_session:transfer_error().
|
||||
send_msg(#link_ref{role = sender, session = Session,
|
||||
link_handle = Handle}, Msg0) ->
|
||||
Msg = amqp10_msg:set_handle(Handle, Msg0),
|
||||
Msg = case amqp10_raw_msg:is(Msg0) of
|
||||
true ->
|
||||
amqp10_raw_msg:set_handle(Handle, Msg0);
|
||||
false ->
|
||||
amqp10_msg:set_handle(Handle, Msg0)
|
||||
end,
|
||||
amqp10_client_session:transfer(Session, Msg, ?TIMEOUT).
|
||||
|
||||
%% @doc Accept a message on a the link referred to be the 'LinkRef'.
|
||||
|
|
|
@ -100,7 +100,8 @@
|
|||
properties => amqp10_client_types:properties(),
|
||||
max_message_size => max_message_size(),
|
||||
handle => output_handle(),
|
||||
footer_opt => footer_opt()
|
||||
footer_opt => footer_opt(),
|
||||
raw_mode => boolean()
|
||||
}.
|
||||
|
||||
-type transfer_error() :: {error,
|
||||
|
@ -142,7 +143,8 @@
|
|||
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
|
||||
Credit :: pos_integer()},
|
||||
incoming_unsettled = #{} :: #{delivery_number() => ok},
|
||||
footer_opt :: footer_opt() | undefined
|
||||
footer_opt :: footer_opt() | undefined,
|
||||
raw_mode = false :: boolean()
|
||||
}).
|
||||
|
||||
-record(state,
|
||||
|
@ -208,11 +210,18 @@ attach(Session, Args) ->
|
|||
detach(Session, Handle) ->
|
||||
gen_statem:call(Session, {detach, Handle}, ?TIMEOUT).
|
||||
|
||||
-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
|
||||
-spec transfer(pid(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg(), timeout()) ->
|
||||
ok | transfer_error().
|
||||
transfer(Session, Amqp10Msg, Timeout) ->
|
||||
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
|
||||
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).
|
||||
case amqp10_raw_msg:is(Amqp10Msg) of
|
||||
true ->
|
||||
Transfer = amqp10_raw_msg:transfer(Amqp10Msg),
|
||||
Payload = amqp10_raw_msg:payload(Amqp10Msg),
|
||||
gen_statem:call(Session, {transfer, Transfer, {raw, Payload}}, Timeout);
|
||||
false ->
|
||||
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
|
||||
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout)
|
||||
end.
|
||||
|
||||
-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
|
||||
flow(Session, IncomingWindow, RenewWhenBelow) when
|
||||
|
@ -413,7 +422,8 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
|
|||
{ok, #link{target = {pid, TargetPid},
|
||||
ref = LinkRef,
|
||||
incoming_unsettled = Unsettled,
|
||||
footer_opt = FooterOpt
|
||||
footer_opt = FooterOpt,
|
||||
raw_mode = RawMode
|
||||
} = Link0} = find_link_by_input_handle(InHandle, State0),
|
||||
|
||||
{Transfer = #'v1_0.transfer'{settled = Settled,
|
||||
|
@ -428,7 +438,7 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
|
|||
%% then the settled flag MUST be interpreted as being false." [2.7.5]
|
||||
Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}}
|
||||
end,
|
||||
case decode_as_msg(Transfer, Payload, FooterOpt) of
|
||||
case decode_as_msg(Transfer, Payload, FooterOpt, RawMode) of
|
||||
{ok, Msg} ->
|
||||
% link bookkeeping
|
||||
% notify when credit is exhausted (link_credit = 0)
|
||||
|
@ -619,6 +629,25 @@ send(Record, #state{socket = Socket} = State) ->
|
|||
Frame = encode_frame(Record, State),
|
||||
socket_send(Socket, Frame).
|
||||
|
||||
send_transfer(Transfer0, {raw, Payload}, _FooterOpt, MaxMessageSize,
|
||||
#state{socket = Socket,
|
||||
channel = Channel,
|
||||
connection_config = Config}) ->
|
||||
OutMaxFrameSize = maps:get(outgoing_max_frame_size, Config),
|
||||
Transfer = Transfer0#'v1_0.transfer'{more = false},
|
||||
TransferSize = iolist_size(amqp10_framing:encode_bin(Transfer)),
|
||||
if is_integer(MaxMessageSize) andalso
|
||||
MaxMessageSize > 0 andalso
|
||||
byte_size(Payload) > MaxMessageSize ->
|
||||
{error, message_size_exceeded};
|
||||
true ->
|
||||
% TODO: this does not take the extended header into account
|
||||
% see: 2.3
|
||||
MaxPayloadSize = OutMaxFrameSize - ?FRAME_HEADER_SIZE - TransferSize,
|
||||
Frames = build_frames(Channel, Transfer, Payload, MaxPayloadSize, []),
|
||||
ok = socket_send(Socket, Frames),
|
||||
{ok, length(Frames)}
|
||||
end;
|
||||
send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize,
|
||||
#state{socket = Socket,
|
||||
channel = Channel,
|
||||
|
@ -918,7 +947,8 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
|
|||
target = LinkTarget,
|
||||
delivery_count = unpack(InitialDeliveryCount),
|
||||
max_message_size = unpack(MaxMessageSize),
|
||||
footer_opt = maps:get(footer_opt, Args, undefined)},
|
||||
footer_opt = maps:get(footer_opt, Args, undefined),
|
||||
raw_mode = maps:get(raw_mode, Args, false)},
|
||||
|
||||
{State#state{links = Links#{OutHandle => Link},
|
||||
next_link_handle = NextLinkHandle,
|
||||
|
@ -1199,10 +1229,14 @@ complete_partial_transfer(_Transfer, Payload,
|
|||
{T, iolist_to_binary(lists:reverse([Payload | Payloads])),
|
||||
Link#link{partial_transfers = undefined}}.
|
||||
|
||||
decode_as_msg(Transfer, Payload, undefined) ->
|
||||
decode_as_msg(#'v1_0.transfer'{settled = Settled,
|
||||
delivery_id = {uint, DeliveryId}},
|
||||
Payload, _, true) ->
|
||||
{ok, amqp10_raw_msg:new(Settled, DeliveryId, Payload)};
|
||||
decode_as_msg(Transfer, Payload, undefined, _) ->
|
||||
Sections = amqp10_framing:decode_bin(Payload),
|
||||
{ok, amqp10_msg:from_amqp_records([Transfer | Sections])};
|
||||
decode_as_msg(Transfer, Payload, FooterOpt) ->
|
||||
decode_as_msg(Transfer, Payload, FooterOpt, _) ->
|
||||
PosSections = decode_sections([], Payload, byte_size(Payload), 0),
|
||||
Sections = lists:map(fun({_Pos, S}) -> S end, PosSections),
|
||||
Msg = amqp10_msg:from_amqp_records([Transfer | Sections]),
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
%%
|
||||
-module(amqp10_raw_msg).
|
||||
|
||||
-include_lib("amqp10_common/include/amqp10_framing.hrl").
|
||||
-include_lib("amqp10_common/include/amqp10_types.hrl").
|
||||
|
||||
%% Just for AMQP10 shovel usage. It avoids the binary <-> amqp10 <-> mc
|
||||
%% conversion, with all the unnecessary encoding/decoding steps.
|
||||
%% It allows for just binary <-> mc conversion, as the payload is stored as is
|
||||
|
||||
-export([new/3,
|
||||
settled/1,
|
||||
delivery_tag/1,
|
||||
payload/1,
|
||||
handle/1,
|
||||
set_handle/2,
|
||||
transfer/1,
|
||||
is/1]).
|
||||
|
||||
-record(amqp10_raw_msg,
|
||||
{settled :: boolean(),
|
||||
delivery_tag :: non_neg_integer(),
|
||||
payload :: binary(),
|
||||
handle :: non_neg_integer() | undefined
|
||||
}).
|
||||
|
||||
-opaque amqp10_raw_msg() :: #amqp10_raw_msg{}.
|
||||
|
||||
-export_type([amqp10_raw_msg/0]).
|
||||
|
||||
-spec new(boolean(), non_neg_integer(), binary()) ->
|
||||
amqp10_raw_msg().
|
||||
new(Settled, DeliveryTag, Payload) ->
|
||||
#amqp10_raw_msg{settled = Settled,
|
||||
delivery_tag = DeliveryTag,
|
||||
payload = Payload}.
|
||||
|
||||
-spec settled(amqp10_raw_msg()) -> boolean().
|
||||
settled(#amqp10_raw_msg{settled = Settled}) ->
|
||||
Settled.
|
||||
|
||||
-spec delivery_tag(amqp10_raw_msg()) -> non_neg_integer().
|
||||
delivery_tag(#amqp10_raw_msg{delivery_tag = DeliveryTag}) ->
|
||||
DeliveryTag.
|
||||
|
||||
-spec payload(amqp10_raw_msg()) -> binary().
|
||||
payload(#amqp10_raw_msg{payload = Payload}) ->
|
||||
Payload.
|
||||
|
||||
-spec handle(amqp10_raw_msg()) -> non_neg_integer().
|
||||
handle(#amqp10_raw_msg{handle = Handle}) ->
|
||||
Handle.
|
||||
|
||||
-spec set_handle(non_neg_integer(), amqp10_raw_msg()) ->
|
||||
amqp10_raw_msg().
|
||||
set_handle(Handle, #amqp10_raw_msg{} = Msg) ->
|
||||
Msg#amqp10_raw_msg{handle = Handle}.
|
||||
|
||||
-spec transfer(amqp10_raw_msg()) -> #'v1_0.transfer'{}.
|
||||
transfer(#amqp10_raw_msg{settled = Settled,
|
||||
delivery_tag = DeliveryTag,
|
||||
handle = Handle}) ->
|
||||
#'v1_0.transfer'{
|
||||
delivery_tag = {binary, rabbit_data_coercion:to_binary(DeliveryTag)},
|
||||
settled = Settled,
|
||||
handle = {uint, Handle},
|
||||
message_format = {uint, ?MESSAGE_FORMAT}}.
|
||||
|
||||
-spec is(term()) -> boolean().
|
||||
is(Record) ->
|
||||
is_record(Record, amqp10_raw_msg).
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
-include_lib("rabbit/include/mc.hrl").
|
||||
-include("rabbit_shovel.hrl").
|
||||
-include_lib("amqp10_common/include/amqp10_framing.hrl").
|
||||
|
||||
-export([
|
||||
parse/2,
|
||||
|
@ -81,7 +82,9 @@ connect_source(State = #{name := Name,
|
|||
on_publish -> unsettled;
|
||||
on_confirm -> unsettled
|
||||
end,
|
||||
AttachFun = fun amqp10_client:attach_receiver_link/5,
|
||||
AttachFun = fun(S, L, A, SSM, D) ->
|
||||
amqp10_client:attach_receiver_link(S, L, A, SSM, D, #{}, #{}, true)
|
||||
end,
|
||||
{Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "receiver", Addr, Src,
|
||||
AttachFun),
|
||||
State#{source => Src#{current => #{conn => Conn,
|
||||
|
@ -128,6 +131,7 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
|
|||
% needs to be sync, i.e. awaits the 'attach' event as
|
||||
% else we may try to use the link before it is ready
|
||||
Durability = maps:get(durability, Map, unsettled_state),
|
||||
%% Attach in raw mode
|
||||
{ok, LinkRef} = AttachFun(Sess, LinkName, Addr,
|
||||
SndSettleMode,
|
||||
Durability),
|
||||
|
@ -181,11 +185,10 @@ dest_endpoint(#{shovel_type := dynamic,
|
|||
|
||||
-spec handle_source(Msg :: any(), state()) ->
|
||||
not_handled | state() | {stop, any()}.
|
||||
handle_source({amqp10_msg, _LinkRef, Msg0}, State) ->
|
||||
Tag = amqp10_msg:delivery_id(Msg0),
|
||||
[_ | Rest] = amqp10_msg:to_amqp_records(Msg0),
|
||||
Bin = iolist_to_binary([amqp10_framing:encode_bin(D) || D <- Rest]),
|
||||
Msg = mc:init(mc_amqp, Bin, #{}),
|
||||
handle_source({amqp10_msg, _LinkRef, RawMsg}, State) ->
|
||||
Tag = amqp10_raw_msg:delivery_tag(RawMsg),
|
||||
Payload = amqp10_raw_msg:payload(RawMsg),
|
||||
Msg = mc:init(mc_amqp, Payload, #{}),
|
||||
rabbit_shovel_behaviour:forward(Tag, Msg, State);
|
||||
handle_source({amqp10_event, {connection, Conn, opened}},
|
||||
State = #{source := #{current := #{conn := Conn}}}) ->
|
||||
|
@ -333,11 +336,10 @@ forward(Tag, Msg0,
|
|||
unacked := Unacked} = Dst,
|
||||
ack_mode := AckMode} = State) ->
|
||||
OutTag = rabbit_data_coercion:to_binary(Tag),
|
||||
Msg1 = mc:protocol_state(mc:convert(mc_amqp, Msg0)),
|
||||
Records = lists:flatten([amqp10_framing:decode_bin(iolist_to_binary(S)) || S <- Msg1]),
|
||||
Msg2 = amqp10_msg:new(OutTag, Records, AckMode =/= on_confirm),
|
||||
Msg = add_timestamp_header(State, add_forward_headers(State, Msg2)),
|
||||
case send_msg(Link, Msg) of
|
||||
Msg1 = add_timestamp_header(State, add_forward_headers(State, Msg0)),
|
||||
Msg2 = mc:protocol_state(mc:convert(mc_amqp, Msg1)),
|
||||
Msg3 = amqp10_raw_msg:new(AckMode =/= on_confirm, Tag, iolist_to_binary(Msg2)),
|
||||
case send_msg(Link, Msg3) of
|
||||
ok ->
|
||||
rabbit_shovel_behaviour:decr_remaining_unacked(
|
||||
case AckMode of
|
||||
|
@ -366,10 +368,13 @@ send_msg(Link, Msg) ->
|
|||
end.
|
||||
|
||||
add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->
|
||||
Anns = #{<<"x-opt-shovelled-timestamp">> => os:system_time(milli_seconds)},
|
||||
amqp10_msg:set_message_annotations(Anns, Msg);
|
||||
mc:set_annotation(
|
||||
<<"x-opt-shovelled-timestamp">>, os:system_time(milli_seconds),
|
||||
Msg);
|
||||
add_timestamp_header(_, Msg) -> Msg.
|
||||
|
||||
add_forward_headers(#{dest := #{cached_forward_headers := Anns}}, Msg) ->
|
||||
amqp10_msg:set_message_annotations(Anns, Msg);
|
||||
maps:fold(fun(K, V, Acc) ->
|
||||
mc:set_annotation(K, V, Acc)
|
||||
end, Msg, Anns);
|
||||
add_forward_headers(_, Msg) -> Msg.
|
||||
|
|
|
@ -74,7 +74,9 @@ amqp_encoded_data_list(_Config) ->
|
|||
#'v1_0.data'{content = <<"one">>},
|
||||
#'v1_0.data'{content = <<"two">>}
|
||||
],
|
||||
Msg = amqp10_msg:new(55, Body),
|
||||
[_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)),
|
||||
Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
|
||||
Msg = amqp10_raw_msg:new(true, 55, Bin),
|
||||
rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State),
|
||||
|
||||
?assert(meck:validate(rabbit_shovel_behaviour)),
|
||||
|
@ -91,8 +93,11 @@ amqp_encoded_amqp_value(_Config) ->
|
|||
State = #{source => #{},
|
||||
dest => #{module => rabbit_amqp10_shovel},
|
||||
ack_mode => no_ack},
|
||||
|
||||
Body = #'v1_0.amqp_value'{content = {utf8, <<"hi">>}},
|
||||
Msg = amqp10_msg:new(55, Body),
|
||||
[_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)),
|
||||
Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
|
||||
Msg = amqp10_raw_msg:new(true, 55, Bin),
|
||||
rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State),
|
||||
|
||||
?assert(meck:validate(rabbit_shovel_behaviour)),
|
||||
|
|
Loading…
Reference in New Issue