diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index d587d9a417..1c4674d995 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -28,6 +28,7 @@ attach_receiver_link/5, attach_receiver_link/6, attach_receiver_link/7, + attach_receiver_link/8, attach_link/2, detach_link/1, send_msg/2, @@ -277,7 +278,17 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) -> -spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(), terminus_durability(), filter(), properties()) -> {ok, link_ref()}. -attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) +attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) -> + attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, false). + +%% @doc Attaches a receiver link to a source. +%% This is asynchronous and will notify completion of the attach request to the +%% caller using an amqp10_event of the following format: +%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}} +-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(), + terminus_durability(), filter(), properties(), boolean()) -> + {ok, link_ref()}. +attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, RawMode) when is_pid(Session) andalso is_binary(Name) andalso is_binary(Source) andalso @@ -286,14 +297,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop SettleMode == mixed) andalso is_atom(Durability) andalso is_map(Filter) andalso - is_map(Properties) -> + is_map(Properties) andalso + is_boolean(RawMode) -> AttachArgs = #{name => Name, role => {receiver, #{address => Source, durable => Durability}, self()}, snd_settle_mode => SettleMode, rcv_settle_mode => first, filter => Filter, - properties => Properties}, + properties => Properties, + raw_mode => RawMode}, amqp10_client_session:attach(Session, AttachArgs). -spec attach_link(pid(), attach_args()) -> {ok, link_ref()}. @@ -355,11 +368,16 @@ stop_receiver_link(#link_ref{role = receiver, %%% messages %% @doc Send a message on a the link referred to be the 'LinkRef'. --spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) -> +-spec send_msg(link_ref(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg()) -> ok | amqp10_client_session:transfer_error(). send_msg(#link_ref{role = sender, session = Session, link_handle = Handle}, Msg0) -> - Msg = amqp10_msg:set_handle(Handle, Msg0), + Msg = case amqp10_raw_msg:is(Msg0) of + true -> + amqp10_raw_msg:set_handle(Handle, Msg0); + false -> + amqp10_msg:set_handle(Handle, Msg0) + end, amqp10_client_session:transfer(Session, Msg, ?TIMEOUT). %% @doc Accept a message on a the link referred to be the 'LinkRef'. diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index fdb75d7d93..68d01ee394 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -100,7 +100,8 @@ properties => amqp10_client_types:properties(), max_message_size => max_message_size(), handle => output_handle(), - footer_opt => footer_opt() + footer_opt => footer_opt(), + raw_mode => boolean() }. -type transfer_error() :: {error, @@ -142,7 +143,8 @@ auto_flow :: never | {RenewWhenBelow :: pos_integer(), Credit :: pos_integer()}, incoming_unsettled = #{} :: #{delivery_number() => ok}, - footer_opt :: footer_opt() | undefined + footer_opt :: footer_opt() | undefined, + raw_mode = false :: boolean() }). -record(state, @@ -208,11 +210,18 @@ attach(Session, Args) -> detach(Session, Handle) -> gen_statem:call(Session, {detach, Handle}, ?TIMEOUT). --spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) -> +-spec transfer(pid(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg(), timeout()) -> ok | transfer_error(). transfer(Session, Amqp10Msg, Timeout) -> - [Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg), - gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout). + case amqp10_raw_msg:is(Amqp10Msg) of + true -> + Transfer = amqp10_raw_msg:transfer(Amqp10Msg), + Payload = amqp10_raw_msg:payload(Amqp10Msg), + gen_statem:call(Session, {transfer, Transfer, {raw, Payload}}, Timeout); + false -> + [Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg), + gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout) + end. -spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok. flow(Session, IncomingWindow, RenewWhenBelow) when @@ -413,7 +422,8 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}}, {ok, #link{target = {pid, TargetPid}, ref = LinkRef, incoming_unsettled = Unsettled, - footer_opt = FooterOpt + footer_opt = FooterOpt, + raw_mode = RawMode } = Link0} = find_link_by_input_handle(InHandle, State0), {Transfer = #'v1_0.transfer'{settled = Settled, @@ -428,7 +438,7 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}}, %% then the settled flag MUST be interpreted as being false." [2.7.5] Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}} end, - case decode_as_msg(Transfer, Payload, FooterOpt) of + case decode_as_msg(Transfer, Payload, FooterOpt, RawMode) of {ok, Msg} -> % link bookkeeping % notify when credit is exhausted (link_credit = 0) @@ -619,6 +629,25 @@ send(Record, #state{socket = Socket} = State) -> Frame = encode_frame(Record, State), socket_send(Socket, Frame). +send_transfer(Transfer0, {raw, Payload}, _FooterOpt, MaxMessageSize, + #state{socket = Socket, + channel = Channel, + connection_config = Config}) -> + OutMaxFrameSize = maps:get(outgoing_max_frame_size, Config), + Transfer = Transfer0#'v1_0.transfer'{more = false}, + TransferSize = iolist_size(amqp10_framing:encode_bin(Transfer)), + if is_integer(MaxMessageSize) andalso + MaxMessageSize > 0 andalso + byte_size(Payload) > MaxMessageSize -> + {error, message_size_exceeded}; + true -> + % TODO: this does not take the extended header into account + % see: 2.3 + MaxPayloadSize = OutMaxFrameSize - ?FRAME_HEADER_SIZE - TransferSize, + Frames = build_frames(Channel, Transfer, Payload, MaxPayloadSize, []), + ok = socket_send(Socket, Frames), + {ok, length(Frames)} + end; send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize, #state{socket = Socket, channel = Channel, @@ -918,7 +947,8 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _}, target = LinkTarget, delivery_count = unpack(InitialDeliveryCount), max_message_size = unpack(MaxMessageSize), - footer_opt = maps:get(footer_opt, Args, undefined)}, + footer_opt = maps:get(footer_opt, Args, undefined), + raw_mode = maps:get(raw_mode, Args, false)}, {State#state{links = Links#{OutHandle => Link}, next_link_handle = NextLinkHandle, @@ -1199,10 +1229,14 @@ complete_partial_transfer(_Transfer, Payload, {T, iolist_to_binary(lists:reverse([Payload | Payloads])), Link#link{partial_transfers = undefined}}. -decode_as_msg(Transfer, Payload, undefined) -> +decode_as_msg(#'v1_0.transfer'{settled = Settled, + delivery_id = {uint, DeliveryId}}, + Payload, _, true) -> + {ok, amqp10_raw_msg:new(Settled, DeliveryId, Payload)}; +decode_as_msg(Transfer, Payload, undefined, _) -> Sections = amqp10_framing:decode_bin(Payload), {ok, amqp10_msg:from_amqp_records([Transfer | Sections])}; -decode_as_msg(Transfer, Payload, FooterOpt) -> +decode_as_msg(Transfer, Payload, FooterOpt, _) -> PosSections = decode_sections([], Payload, byte_size(Payload), 0), Sections = lists:map(fun({_Pos, S}) -> S end, PosSections), Msg = amqp10_msg:from_amqp_records([Transfer | Sections]), diff --git a/deps/amqp10_client/src/amqp10_raw_msg.erl b/deps/amqp10_client/src/amqp10_raw_msg.erl new file mode 100644 index 0000000000..0d1f66f44a --- /dev/null +++ b/deps/amqp10_client/src/amqp10_raw_msg.erl @@ -0,0 +1,76 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% +-module(amqp10_raw_msg). + +-include_lib("amqp10_common/include/amqp10_framing.hrl"). +-include_lib("amqp10_common/include/amqp10_types.hrl"). + +%% Just for AMQP10 shovel usage. It avoids the binary <-> amqp10 <-> mc +%% conversion, with all the unnecessary encoding/decoding steps. +%% It allows for just binary <-> mc conversion, as the payload is stored as is + +-export([new/3, + settled/1, + delivery_tag/1, + payload/1, + handle/1, + set_handle/2, + transfer/1, + is/1]). + +-record(amqp10_raw_msg, + {settled :: boolean(), + delivery_tag :: non_neg_integer(), + payload :: binary(), + handle :: non_neg_integer() | undefined + }). + +-opaque amqp10_raw_msg() :: #amqp10_raw_msg{}. + +-export_type([amqp10_raw_msg/0]). + +-spec new(boolean(), non_neg_integer(), binary()) -> + amqp10_raw_msg(). +new(Settled, DeliveryTag, Payload) -> + #amqp10_raw_msg{settled = Settled, + delivery_tag = DeliveryTag, + payload = Payload}. + +-spec settled(amqp10_raw_msg()) -> boolean(). +settled(#amqp10_raw_msg{settled = Settled}) -> + Settled. + +-spec delivery_tag(amqp10_raw_msg()) -> non_neg_integer(). +delivery_tag(#amqp10_raw_msg{delivery_tag = DeliveryTag}) -> + DeliveryTag. + +-spec payload(amqp10_raw_msg()) -> binary(). +payload(#amqp10_raw_msg{payload = Payload}) -> + Payload. + +-spec handle(amqp10_raw_msg()) -> non_neg_integer(). +handle(#amqp10_raw_msg{handle = Handle}) -> + Handle. + +-spec set_handle(non_neg_integer(), amqp10_raw_msg()) -> + amqp10_raw_msg(). +set_handle(Handle, #amqp10_raw_msg{} = Msg) -> + Msg#amqp10_raw_msg{handle = Handle}. + +-spec transfer(amqp10_raw_msg()) -> #'v1_0.transfer'{}. +transfer(#amqp10_raw_msg{settled = Settled, + delivery_tag = DeliveryTag, + handle = Handle}) -> + #'v1_0.transfer'{ + delivery_tag = {binary, rabbit_data_coercion:to_binary(DeliveryTag)}, + settled = Settled, + handle = {uint, Handle}, + message_format = {uint, ?MESSAGE_FORMAT}}. + +-spec is(term()) -> boolean(). +is(Record) -> + is_record(Record, amqp10_raw_msg). diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index 807571b0ae..229c475d7f 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -11,6 +11,7 @@ -include_lib("rabbit/include/mc.hrl"). -include("rabbit_shovel.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). -export([ parse/2, @@ -81,7 +82,9 @@ connect_source(State = #{name := Name, on_publish -> unsettled; on_confirm -> unsettled end, - AttachFun = fun amqp10_client:attach_receiver_link/5, + AttachFun = fun(S, L, A, SSM, D) -> + amqp10_client:attach_receiver_link(S, L, A, SSM, D, #{}, #{}, true) + end, {Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "receiver", Addr, Src, AttachFun), State#{source => Src#{current => #{conn => Conn, @@ -128,6 +131,7 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) -> % needs to be sync, i.e. awaits the 'attach' event as % else we may try to use the link before it is ready Durability = maps:get(durability, Map, unsettled_state), + %% Attach in raw mode {ok, LinkRef} = AttachFun(Sess, LinkName, Addr, SndSettleMode, Durability), @@ -181,11 +185,10 @@ dest_endpoint(#{shovel_type := dynamic, -spec handle_source(Msg :: any(), state()) -> not_handled | state() | {stop, any()}. -handle_source({amqp10_msg, _LinkRef, Msg0}, State) -> - Tag = amqp10_msg:delivery_id(Msg0), - [_ | Rest] = amqp10_msg:to_amqp_records(Msg0), - Bin = iolist_to_binary([amqp10_framing:encode_bin(D) || D <- Rest]), - Msg = mc:init(mc_amqp, Bin, #{}), +handle_source({amqp10_msg, _LinkRef, RawMsg}, State) -> + Tag = amqp10_raw_msg:delivery_tag(RawMsg), + Payload = amqp10_raw_msg:payload(RawMsg), + Msg = mc:init(mc_amqp, Payload, #{}), rabbit_shovel_behaviour:forward(Tag, Msg, State); handle_source({amqp10_event, {connection, Conn, opened}}, State = #{source := #{current := #{conn := Conn}}}) -> @@ -333,11 +336,10 @@ forward(Tag, Msg0, unacked := Unacked} = Dst, ack_mode := AckMode} = State) -> OutTag = rabbit_data_coercion:to_binary(Tag), - Msg1 = mc:protocol_state(mc:convert(mc_amqp, Msg0)), - Records = lists:flatten([amqp10_framing:decode_bin(iolist_to_binary(S)) || S <- Msg1]), - Msg2 = amqp10_msg:new(OutTag, Records, AckMode =/= on_confirm), - Msg = add_timestamp_header(State, add_forward_headers(State, Msg2)), - case send_msg(Link, Msg) of + Msg1 = add_timestamp_header(State, add_forward_headers(State, Msg0)), + Msg2 = mc:protocol_state(mc:convert(mc_amqp, Msg1)), + Msg3 = amqp10_raw_msg:new(AckMode =/= on_confirm, Tag, iolist_to_binary(Msg2)), + case send_msg(Link, Msg3) of ok -> rabbit_shovel_behaviour:decr_remaining_unacked( case AckMode of @@ -366,10 +368,13 @@ send_msg(Link, Msg) -> end. add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) -> - Anns = #{<<"x-opt-shovelled-timestamp">> => os:system_time(milli_seconds)}, - amqp10_msg:set_message_annotations(Anns, Msg); + mc:set_annotation( + <<"x-opt-shovelled-timestamp">>, os:system_time(milli_seconds), + Msg); add_timestamp_header(_, Msg) -> Msg. add_forward_headers(#{dest := #{cached_forward_headers := Anns}}, Msg) -> - amqp10_msg:set_message_annotations(Anns, Msg); + maps:fold(fun(K, V, Acc) -> + mc:set_annotation(K, V, Acc) + end, Msg, Anns); add_forward_headers(_, Msg) -> Msg. diff --git a/deps/rabbitmq_shovel/test/amqp10_shovel_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_shovel_SUITE.erl index 834813fe6a..ca2eb33120 100644 --- a/deps/rabbitmq_shovel/test/amqp10_shovel_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_shovel_SUITE.erl @@ -74,7 +74,9 @@ amqp_encoded_data_list(_Config) -> #'v1_0.data'{content = <<"one">>}, #'v1_0.data'{content = <<"two">>} ], - Msg = amqp10_msg:new(55, Body), + [_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)), + Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]), + Msg = amqp10_raw_msg:new(true, 55, Bin), rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State), ?assert(meck:validate(rabbit_shovel_behaviour)), @@ -91,8 +93,11 @@ amqp_encoded_amqp_value(_Config) -> State = #{source => #{}, dest => #{module => rabbit_amqp10_shovel}, ack_mode => no_ack}, + Body = #'v1_0.amqp_value'{content = {utf8, <<"hi">>}}, - Msg = amqp10_msg:new(55, Body), + [_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)), + Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]), + Msg = amqp10_raw_msg:new(true, 55, Bin), rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State), ?assert(meck:validate(rabbit_shovel_behaviour)),