Merge pull request #14601 from rabbitmq/mergify/bp/v4.2.x/pr-14593
Shovels: Optimise amqp10 client messages for shovel usage (backport #14593)
This commit is contained in:
		
						commit
						d6755e49a9
					
				| 
						 | 
				
			
			@ -28,6 +28,7 @@
 | 
			
		|||
         attach_receiver_link/5,
 | 
			
		||||
         attach_receiver_link/6,
 | 
			
		||||
         attach_receiver_link/7,
 | 
			
		||||
         attach_receiver_link/8,
 | 
			
		||||
         attach_link/2,
 | 
			
		||||
         detach_link/1,
 | 
			
		||||
         send_msg/2,
 | 
			
		||||
| 
						 | 
				
			
			@ -277,7 +278,17 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
 | 
			
		|||
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
 | 
			
		||||
                           terminus_durability(), filter(), properties()) ->
 | 
			
		||||
    {ok, link_ref()}.
 | 
			
		||||
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
 | 
			
		||||
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
 | 
			
		||||
    attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, false).
 | 
			
		||||
 | 
			
		||||
%% @doc Attaches a receiver link to a source.
 | 
			
		||||
%% This is asynchronous and will notify completion of the attach request to the
 | 
			
		||||
%% caller using an amqp10_event of the following format:
 | 
			
		||||
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
 | 
			
		||||
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
 | 
			
		||||
                           terminus_durability(), filter(), properties(), boolean()) ->
 | 
			
		||||
    {ok, link_ref()}.
 | 
			
		||||
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, RawMode)
 | 
			
		||||
  when is_pid(Session) andalso
 | 
			
		||||
       is_binary(Name) andalso
 | 
			
		||||
       is_binary(Source) andalso
 | 
			
		||||
| 
						 | 
				
			
			@ -286,14 +297,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop
 | 
			
		|||
        SettleMode == mixed) andalso
 | 
			
		||||
       is_atom(Durability) andalso
 | 
			
		||||
       is_map(Filter) andalso
 | 
			
		||||
       is_map(Properties) ->
 | 
			
		||||
       is_map(Properties) andalso
 | 
			
		||||
       is_boolean(RawMode) ->
 | 
			
		||||
    AttachArgs = #{name => Name,
 | 
			
		||||
                   role => {receiver, #{address => Source,
 | 
			
		||||
                                        durable => Durability}, self()},
 | 
			
		||||
                   snd_settle_mode => SettleMode,
 | 
			
		||||
                   rcv_settle_mode => first,
 | 
			
		||||
                   filter => Filter,
 | 
			
		||||
                   properties => Properties},
 | 
			
		||||
                   properties => Properties,
 | 
			
		||||
                   raw_mode => RawMode},
 | 
			
		||||
    amqp10_client_session:attach(Session, AttachArgs).
 | 
			
		||||
 | 
			
		||||
-spec attach_link(pid(), attach_args()) -> {ok, link_ref()}.
 | 
			
		||||
| 
						 | 
				
			
			@ -355,11 +368,16 @@ stop_receiver_link(#link_ref{role = receiver,
 | 
			
		|||
%%% messages
 | 
			
		||||
 | 
			
		||||
%% @doc Send a message on a the link referred to be the 'LinkRef'.
 | 
			
		||||
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
 | 
			
		||||
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg()) ->
 | 
			
		||||
    ok | amqp10_client_session:transfer_error().
 | 
			
		||||
send_msg(#link_ref{role = sender, session = Session,
 | 
			
		||||
                   link_handle = Handle}, Msg0) ->
 | 
			
		||||
    Msg = amqp10_msg:set_handle(Handle, Msg0),
 | 
			
		||||
    Msg = case amqp10_raw_msg:is(Msg0) of
 | 
			
		||||
              true ->
 | 
			
		||||
                  amqp10_raw_msg:set_handle(Handle, Msg0);
 | 
			
		||||
              false ->
 | 
			
		||||
                  amqp10_msg:set_handle(Handle, Msg0)
 | 
			
		||||
          end,
 | 
			
		||||
    amqp10_client_session:transfer(Session, Msg, ?TIMEOUT).
 | 
			
		||||
 | 
			
		||||
%% @doc Accept a message on a the link referred to be the 'LinkRef'.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -100,7 +100,8 @@
 | 
			
		|||
                         properties => amqp10_client_types:properties(),
 | 
			
		||||
                         max_message_size => max_message_size(),
 | 
			
		||||
                         handle => output_handle(),
 | 
			
		||||
                         footer_opt => footer_opt()
 | 
			
		||||
                         footer_opt => footer_opt(),
 | 
			
		||||
                         raw_mode => boolean()
 | 
			
		||||
                        }.
 | 
			
		||||
 | 
			
		||||
-type transfer_error() :: {error,
 | 
			
		||||
| 
						 | 
				
			
			@ -142,7 +143,8 @@
 | 
			
		|||
         auto_flow :: never | {RenewWhenBelow :: pos_integer(),
 | 
			
		||||
                               Credit :: pos_integer()},
 | 
			
		||||
         incoming_unsettled = #{} :: #{delivery_number() => ok},
 | 
			
		||||
         footer_opt :: footer_opt() | undefined
 | 
			
		||||
         footer_opt :: footer_opt() | undefined,
 | 
			
		||||
         raw_mode = false :: boolean()
 | 
			
		||||
        }).
 | 
			
		||||
 | 
			
		||||
-record(state,
 | 
			
		||||
| 
						 | 
				
			
			@ -208,11 +210,18 @@ attach(Session, Args) ->
 | 
			
		|||
detach(Session, Handle) ->
 | 
			
		||||
    gen_statem:call(Session, {detach, Handle}, ?TIMEOUT).
 | 
			
		||||
 | 
			
		||||
-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
 | 
			
		||||
-spec transfer(pid(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg(), timeout()) ->
 | 
			
		||||
    ok | transfer_error().
 | 
			
		||||
transfer(Session, Amqp10Msg, Timeout) ->
 | 
			
		||||
    [Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
 | 
			
		||||
    gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).
 | 
			
		||||
    case amqp10_raw_msg:is(Amqp10Msg) of
 | 
			
		||||
        true ->
 | 
			
		||||
            Transfer = amqp10_raw_msg:transfer(Amqp10Msg),
 | 
			
		||||
            Payload = amqp10_raw_msg:payload(Amqp10Msg),
 | 
			
		||||
            gen_statem:call(Session, {transfer, Transfer, {raw, Payload}}, Timeout);
 | 
			
		||||
        false ->
 | 
			
		||||
            [Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
 | 
			
		||||
            gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout)
 | 
			
		||||
    end.
 | 
			
		||||
 | 
			
		||||
-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
 | 
			
		||||
flow(Session, IncomingWindow, RenewWhenBelow) when
 | 
			
		||||
| 
						 | 
				
			
			@ -413,7 +422,8 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
 | 
			
		|||
    {ok, #link{target = {pid, TargetPid},
 | 
			
		||||
               ref = LinkRef,
 | 
			
		||||
               incoming_unsettled = Unsettled,
 | 
			
		||||
               footer_opt = FooterOpt
 | 
			
		||||
               footer_opt = FooterOpt,
 | 
			
		||||
               raw_mode = RawMode
 | 
			
		||||
              } = Link0} = find_link_by_input_handle(InHandle, State0),
 | 
			
		||||
 | 
			
		||||
    {Transfer = #'v1_0.transfer'{settled = Settled,
 | 
			
		||||
| 
						 | 
				
			
			@ -428,7 +438,7 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
 | 
			
		|||
                    %% then the settled flag MUST be interpreted as being false." [2.7.5]
 | 
			
		||||
                    Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}}
 | 
			
		||||
            end,
 | 
			
		||||
    case decode_as_msg(Transfer, Payload, FooterOpt) of
 | 
			
		||||
    case decode_as_msg(Transfer, Payload, FooterOpt, RawMode) of
 | 
			
		||||
        {ok, Msg} ->
 | 
			
		||||
            % link bookkeeping
 | 
			
		||||
            % notify when credit is exhausted (link_credit = 0)
 | 
			
		||||
| 
						 | 
				
			
			@ -619,6 +629,25 @@ send(Record, #state{socket = Socket} = State) ->
 | 
			
		|||
    Frame = encode_frame(Record, State),
 | 
			
		||||
    socket_send(Socket, Frame).
 | 
			
		||||
 | 
			
		||||
send_transfer(Transfer0, {raw, Payload}, _FooterOpt, MaxMessageSize,
 | 
			
		||||
              #state{socket = Socket,
 | 
			
		||||
                     channel = Channel,
 | 
			
		||||
                     connection_config = Config}) ->
 | 
			
		||||
    OutMaxFrameSize = maps:get(outgoing_max_frame_size, Config),
 | 
			
		||||
    Transfer = Transfer0#'v1_0.transfer'{more = false},
 | 
			
		||||
    TransferSize = iolist_size(amqp10_framing:encode_bin(Transfer)),
 | 
			
		||||
    if is_integer(MaxMessageSize) andalso
 | 
			
		||||
       MaxMessageSize > 0 andalso
 | 
			
		||||
       byte_size(Payload) > MaxMessageSize ->
 | 
			
		||||
           {error, message_size_exceeded};
 | 
			
		||||
       true ->
 | 
			
		||||
           % TODO: this does not take the extended header into account
 | 
			
		||||
           % see: 2.3
 | 
			
		||||
           MaxPayloadSize = OutMaxFrameSize - ?FRAME_HEADER_SIZE - TransferSize,
 | 
			
		||||
           Frames = build_frames(Channel, Transfer, Payload, MaxPayloadSize, []),
 | 
			
		||||
           ok = socket_send(Socket, Frames),
 | 
			
		||||
           {ok, length(Frames)}
 | 
			
		||||
    end;
 | 
			
		||||
send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize,
 | 
			
		||||
              #state{socket = Socket,
 | 
			
		||||
                     channel = Channel,
 | 
			
		||||
| 
						 | 
				
			
			@ -918,7 +947,8 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
 | 
			
		|||
                 target = LinkTarget,
 | 
			
		||||
                 delivery_count = unpack(InitialDeliveryCount),
 | 
			
		||||
                 max_message_size = unpack(MaxMessageSize),
 | 
			
		||||
                 footer_opt = maps:get(footer_opt, Args, undefined)},
 | 
			
		||||
                 footer_opt = maps:get(footer_opt, Args, undefined),
 | 
			
		||||
                 raw_mode = maps:get(raw_mode, Args, false)},
 | 
			
		||||
 | 
			
		||||
    {State#state{links = Links#{OutHandle => Link},
 | 
			
		||||
                 next_link_handle = NextLinkHandle,
 | 
			
		||||
| 
						 | 
				
			
			@ -1199,10 +1229,14 @@ complete_partial_transfer(_Transfer, Payload,
 | 
			
		|||
    {T, iolist_to_binary(lists:reverse([Payload | Payloads])),
 | 
			
		||||
     Link#link{partial_transfers = undefined}}.
 | 
			
		||||
 | 
			
		||||
decode_as_msg(Transfer, Payload, undefined) ->
 | 
			
		||||
decode_as_msg(#'v1_0.transfer'{settled = Settled,
 | 
			
		||||
                               delivery_id = {uint, DeliveryId}},
 | 
			
		||||
              Payload, _, true) ->
 | 
			
		||||
    {ok, amqp10_raw_msg:new(Settled, DeliveryId, Payload)};
 | 
			
		||||
decode_as_msg(Transfer, Payload, undefined, _) ->
 | 
			
		||||
    Sections = amqp10_framing:decode_bin(Payload),
 | 
			
		||||
    {ok, amqp10_msg:from_amqp_records([Transfer | Sections])};
 | 
			
		||||
decode_as_msg(Transfer, Payload, FooterOpt) ->
 | 
			
		||||
decode_as_msg(Transfer, Payload, FooterOpt, _) ->
 | 
			
		||||
    PosSections = decode_sections([], Payload, byte_size(Payload), 0),
 | 
			
		||||
    Sections = lists:map(fun({_Pos, S}) -> S end, PosSections),
 | 
			
		||||
    Msg = amqp10_msg:from_amqp_records([Transfer | Sections]),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,76 @@
 | 
			
		|||
%% This Source Code Form is subject to the terms of the Mozilla Public
 | 
			
		||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
			
		||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
			
		||||
%%
 | 
			
		||||
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
 | 
			
		||||
%%
 | 
			
		||||
-module(amqp10_raw_msg).
 | 
			
		||||
 | 
			
		||||
-include_lib("amqp10_common/include/amqp10_framing.hrl").
 | 
			
		||||
-include_lib("amqp10_common/include/amqp10_types.hrl").
 | 
			
		||||
 | 
			
		||||
%% Just for AMQP10 shovel usage. It avoids the binary <-> amqp10 <-> mc
 | 
			
		||||
%% conversion, with all the unnecessary encoding/decoding steps.
 | 
			
		||||
%% It allows for just binary <-> mc conversion, as the payload is stored as is
 | 
			
		||||
 | 
			
		||||
-export([new/3,
 | 
			
		||||
         settled/1,
 | 
			
		||||
         delivery_tag/1,
 | 
			
		||||
         payload/1,
 | 
			
		||||
         handle/1,
 | 
			
		||||
         set_handle/2,
 | 
			
		||||
         transfer/1,
 | 
			
		||||
         is/1]).
 | 
			
		||||
 | 
			
		||||
-record(amqp10_raw_msg,
 | 
			
		||||
        {settled :: boolean(),
 | 
			
		||||
         delivery_tag :: non_neg_integer(),
 | 
			
		||||
         payload :: binary(),
 | 
			
		||||
         handle :: non_neg_integer() | undefined
 | 
			
		||||
        }).
 | 
			
		||||
 | 
			
		||||
-opaque amqp10_raw_msg() :: #amqp10_raw_msg{}.
 | 
			
		||||
 | 
			
		||||
-export_type([amqp10_raw_msg/0]).
 | 
			
		||||
 | 
			
		||||
-spec new(boolean(), non_neg_integer(), binary()) ->
 | 
			
		||||
          amqp10_raw_msg().
 | 
			
		||||
new(Settled, DeliveryTag, Payload) ->
 | 
			
		||||
    #amqp10_raw_msg{settled = Settled,
 | 
			
		||||
                    delivery_tag = DeliveryTag,
 | 
			
		||||
                    payload = Payload}.
 | 
			
		||||
 | 
			
		||||
-spec settled(amqp10_raw_msg()) -> boolean().
 | 
			
		||||
settled(#amqp10_raw_msg{settled = Settled}) ->
 | 
			
		||||
    Settled.
 | 
			
		||||
 | 
			
		||||
-spec delivery_tag(amqp10_raw_msg()) -> non_neg_integer().
 | 
			
		||||
delivery_tag(#amqp10_raw_msg{delivery_tag = DeliveryTag}) ->
 | 
			
		||||
    DeliveryTag.
 | 
			
		||||
 | 
			
		||||
-spec payload(amqp10_raw_msg()) -> binary().
 | 
			
		||||
payload(#amqp10_raw_msg{payload = Payload}) ->
 | 
			
		||||
    Payload.
 | 
			
		||||
 | 
			
		||||
-spec handle(amqp10_raw_msg()) -> non_neg_integer().
 | 
			
		||||
handle(#amqp10_raw_msg{handle = Handle}) ->
 | 
			
		||||
    Handle.
 | 
			
		||||
 | 
			
		||||
-spec set_handle(non_neg_integer(), amqp10_raw_msg()) ->
 | 
			
		||||
    amqp10_raw_msg().
 | 
			
		||||
set_handle(Handle, #amqp10_raw_msg{} = Msg) ->
 | 
			
		||||
    Msg#amqp10_raw_msg{handle = Handle}.
 | 
			
		||||
 | 
			
		||||
-spec transfer(amqp10_raw_msg()) -> #'v1_0.transfer'{}.
 | 
			
		||||
transfer(#amqp10_raw_msg{settled = Settled,
 | 
			
		||||
                         delivery_tag = DeliveryTag,
 | 
			
		||||
                         handle = Handle}) ->
 | 
			
		||||
    #'v1_0.transfer'{
 | 
			
		||||
       delivery_tag = {binary, rabbit_data_coercion:to_binary(DeliveryTag)},
 | 
			
		||||
       settled = Settled,
 | 
			
		||||
       handle = {uint, Handle},
 | 
			
		||||
       message_format = {uint, ?MESSAGE_FORMAT}}.
 | 
			
		||||
 | 
			
		||||
-spec is(term()) -> boolean().
 | 
			
		||||
is(Record) ->
 | 
			
		||||
    is_record(Record, amqp10_raw_msg).
 | 
			
		||||
| 
						 | 
				
			
			@ -11,6 +11,7 @@
 | 
			
		|||
 | 
			
		||||
-include_lib("rabbit/include/mc.hrl").
 | 
			
		||||
-include("rabbit_shovel.hrl").
 | 
			
		||||
-include_lib("amqp10_common/include/amqp10_framing.hrl").
 | 
			
		||||
 | 
			
		||||
-export([
 | 
			
		||||
         parse/2,
 | 
			
		||||
| 
						 | 
				
			
			@ -81,7 +82,9 @@ connect_source(State = #{name := Name,
 | 
			
		|||
                        on_publish -> unsettled;
 | 
			
		||||
                        on_confirm -> unsettled
 | 
			
		||||
                    end,
 | 
			
		||||
    AttachFun = fun amqp10_client:attach_receiver_link/5,
 | 
			
		||||
    AttachFun = fun(S, L, A, SSM, D) ->
 | 
			
		||||
                        amqp10_client:attach_receiver_link(S, L, A, SSM, D, #{}, #{}, true)
 | 
			
		||||
                end,
 | 
			
		||||
    {Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "receiver", Addr, Src,
 | 
			
		||||
                                    AttachFun),
 | 
			
		||||
    State#{source => Src#{current => #{conn => Conn,
 | 
			
		||||
| 
						 | 
				
			
			@ -128,6 +131,7 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
 | 
			
		|||
    % needs to be sync, i.e. awaits the 'attach' event as
 | 
			
		||||
    % else we may try to use the link before it is ready
 | 
			
		||||
    Durability = maps:get(durability, Map, unsettled_state),
 | 
			
		||||
    %% Attach in raw mode
 | 
			
		||||
    {ok, LinkRef} = AttachFun(Sess, LinkName, Addr,
 | 
			
		||||
                              SndSettleMode,
 | 
			
		||||
                              Durability),
 | 
			
		||||
| 
						 | 
				
			
			@ -181,11 +185,10 @@ dest_endpoint(#{shovel_type := dynamic,
 | 
			
		|||
 | 
			
		||||
-spec handle_source(Msg :: any(), state()) ->
 | 
			
		||||
    not_handled | state() | {stop, any()}.
 | 
			
		||||
handle_source({amqp10_msg, _LinkRef, Msg0}, State) ->
 | 
			
		||||
    Tag = amqp10_msg:delivery_id(Msg0),
 | 
			
		||||
    [_ | Rest] = amqp10_msg:to_amqp_records(Msg0),
 | 
			
		||||
    Bin = iolist_to_binary([amqp10_framing:encode_bin(D) || D <- Rest]),
 | 
			
		||||
    Msg = mc:init(mc_amqp, Bin, #{}),
 | 
			
		||||
handle_source({amqp10_msg, _LinkRef, RawMsg}, State) ->
 | 
			
		||||
    Tag = amqp10_raw_msg:delivery_tag(RawMsg),
 | 
			
		||||
    Payload = amqp10_raw_msg:payload(RawMsg),
 | 
			
		||||
    Msg = mc:init(mc_amqp, Payload, #{}),
 | 
			
		||||
    rabbit_shovel_behaviour:forward(Tag, Msg, State);
 | 
			
		||||
handle_source({amqp10_event, {connection, Conn, opened}},
 | 
			
		||||
              State = #{source := #{current := #{conn := Conn}}}) ->
 | 
			
		||||
| 
						 | 
				
			
			@ -333,11 +336,10 @@ forward(Tag, Msg0,
 | 
			
		|||
                    unacked := Unacked} = Dst,
 | 
			
		||||
          ack_mode := AckMode} = State) ->
 | 
			
		||||
    OutTag = rabbit_data_coercion:to_binary(Tag),
 | 
			
		||||
    Msg1 = mc:protocol_state(mc:convert(mc_amqp, Msg0)),
 | 
			
		||||
    Records = lists:flatten([amqp10_framing:decode_bin(iolist_to_binary(S)) || S <- Msg1]),
 | 
			
		||||
    Msg2 = amqp10_msg:new(OutTag, Records, AckMode =/= on_confirm),
 | 
			
		||||
    Msg = add_timestamp_header(State, add_forward_headers(State, Msg2)),
 | 
			
		||||
    case send_msg(Link, Msg) of
 | 
			
		||||
    Msg1 = add_timestamp_header(State, add_forward_headers(State, Msg0)),
 | 
			
		||||
    Msg2 = mc:protocol_state(mc:convert(mc_amqp, Msg1)),
 | 
			
		||||
    Msg3 = amqp10_raw_msg:new(AckMode =/= on_confirm, Tag, iolist_to_binary(Msg2)),
 | 
			
		||||
    case send_msg(Link, Msg3) of
 | 
			
		||||
        ok ->
 | 
			
		||||
            rabbit_shovel_behaviour:decr_remaining_unacked(
 | 
			
		||||
              case AckMode of
 | 
			
		||||
| 
						 | 
				
			
			@ -366,10 +368,13 @@ send_msg(Link, Msg) ->
 | 
			
		|||
    end.
 | 
			
		||||
 | 
			
		||||
add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->
 | 
			
		||||
    Anns = #{<<"x-opt-shovelled-timestamp">> => os:system_time(milli_seconds)},
 | 
			
		||||
    amqp10_msg:set_message_annotations(Anns, Msg);
 | 
			
		||||
    mc:set_annotation(
 | 
			
		||||
      <<"x-opt-shovelled-timestamp">>, os:system_time(milli_seconds),
 | 
			
		||||
      Msg);
 | 
			
		||||
add_timestamp_header(_, Msg) -> Msg.
 | 
			
		||||
 | 
			
		||||
add_forward_headers(#{dest := #{cached_forward_headers := Anns}}, Msg) ->
 | 
			
		||||
    amqp10_msg:set_message_annotations(Anns, Msg);
 | 
			
		||||
    maps:fold(fun(K, V, Acc) ->
 | 
			
		||||
                      mc:set_annotation(K, V, Acc)
 | 
			
		||||
              end, Msg, Anns);
 | 
			
		||||
add_forward_headers(_, Msg) -> Msg.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -74,7 +74,9 @@ amqp_encoded_data_list(_Config) ->
 | 
			
		|||
            #'v1_0.data'{content = <<"one">>},
 | 
			
		||||
            #'v1_0.data'{content = <<"two">>}
 | 
			
		||||
           ],
 | 
			
		||||
    Msg = amqp10_msg:new(55, Body),
 | 
			
		||||
    [_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)),
 | 
			
		||||
    Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
 | 
			
		||||
    Msg = amqp10_raw_msg:new(true, 55, Bin),
 | 
			
		||||
    rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State),
 | 
			
		||||
 | 
			
		||||
    ?assert(meck:validate(rabbit_shovel_behaviour)),
 | 
			
		||||
| 
						 | 
				
			
			@ -91,8 +93,11 @@ amqp_encoded_amqp_value(_Config) ->
 | 
			
		|||
    State = #{source => #{},
 | 
			
		||||
              dest => #{module => rabbit_amqp10_shovel},
 | 
			
		||||
              ack_mode => no_ack},
 | 
			
		||||
 | 
			
		||||
    Body = #'v1_0.amqp_value'{content = {utf8, <<"hi">>}},
 | 
			
		||||
    Msg = amqp10_msg:new(55, Body),
 | 
			
		||||
    [_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)),
 | 
			
		||||
    Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
 | 
			
		||||
    Msg = amqp10_raw_msg:new(true, 55, Bin),
 | 
			
		||||
    rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State),
 | 
			
		||||
 | 
			
		||||
    ?assert(meck:validate(rabbit_shovel_behaviour)),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue