355 lines
13 KiB
Erlang
355 lines
13 KiB
Erlang
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
%%
|
|
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
|
|
%%
|
|
|
|
-module(rabbit_basic).
|
|
-include_lib("rabbit_common/include/rabbit.hrl").
|
|
-include_lib("rabbit_common/include/rabbit_framing.hrl").
|
|
|
|
-export([publish/4, publish/5, publish/1,
|
|
message/3, message/4, properties/1, prepend_table_header/3,
|
|
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
|
|
header_routes/1, parse_expiration/1, header/2, header/3]).
|
|
-export([build_content/2, from_content/1, msg_size/1,
|
|
maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
|
|
-export([add_header/4,
|
|
peek_fmt_message/1]).
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
-type properties_input() ::
|
|
rabbit_framing:amqp_property_record() | [{atom(), any()}].
|
|
-type publish_result() ::
|
|
ok | rabbit_types:error('not_found').
|
|
-type header() :: any().
|
|
-type headers() :: rabbit_framing:amqp_table() | 'undefined'.
|
|
|
|
-type exchange_input() :: rabbit_types:exchange() | rabbit_exchange:name().
|
|
-type body_input() :: binary() | [binary()].
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
%% Convenience function, for avoiding round-trips in calls across the
|
|
%% erlang distributed network.
|
|
|
|
-spec publish
|
|
(exchange_input(), rabbit_router:routing_key(), properties_input(),
|
|
body_input()) ->
|
|
publish_result().
|
|
|
|
publish(Exchange, RoutingKeyBin, Properties, Body) ->
|
|
publish(Exchange, RoutingKeyBin, false, Properties, Body).
|
|
|
|
%% Convenience function, for avoiding round-trips in calls across the
|
|
%% erlang distributed network.
|
|
|
|
-spec publish
|
|
(exchange_input(), rabbit_router:routing_key(), boolean(),
|
|
properties_input(), body_input()) ->
|
|
publish_result().
|
|
|
|
publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
|
|
Message = message(XName, RKey, properties(Props), Body),
|
|
publish(X, delivery(Mandatory, false, Message, undefined));
|
|
publish(XName, RKey, Mandatory, Props, Body) ->
|
|
Message = message(XName, RKey, properties(Props), Body),
|
|
publish(delivery(Mandatory, false, Message, undefined)).
|
|
|
|
-spec publish(rabbit_types:delivery()) -> publish_result().
|
|
|
|
publish(Delivery = #delivery{
|
|
message = #basic_message{exchange_name = XName}}) ->
|
|
case rabbit_exchange:lookup(XName) of
|
|
{ok, X} -> publish(X, Delivery);
|
|
Err -> Err
|
|
end.
|
|
|
|
publish(X, Delivery) ->
|
|
Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
|
|
_ = rabbit_queue_type:deliver(Qs, Delivery, stateless),
|
|
ok.
|
|
|
|
-spec delivery
|
|
(boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
|
|
rabbit_types:delivery().
|
|
|
|
delivery(Mandatory, Confirm, Message, MsgSeqNo) ->
|
|
#delivery{mandatory = Mandatory, confirm = Confirm, sender = self(),
|
|
message = Message, msg_seq_no = MsgSeqNo, flow = noflow}.
|
|
|
|
-spec build_content
|
|
(rabbit_framing:amqp_property_record(), binary() | [binary()]) ->
|
|
rabbit_types:content().
|
|
|
|
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
|
|
build_content(Properties, [BodyBin]);
|
|
|
|
build_content(Properties, PFR) ->
|
|
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
|
|
{ClassId, _MethodId} =
|
|
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
|
|
#content{class_id = ClassId,
|
|
properties = Properties,
|
|
properties_bin = none,
|
|
protocol = none,
|
|
payload_fragments_rev = PFR}.
|
|
|
|
-spec from_content
|
|
(rabbit_types:content()) ->
|
|
{rabbit_framing:amqp_property_record(), binary()}.
|
|
|
|
from_content(Content) ->
|
|
#content{class_id = ClassId,
|
|
properties = Props,
|
|
payload_fragments_rev = FragmentsRev} =
|
|
rabbit_binary_parser:ensure_content_decoded(Content),
|
|
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
|
|
{ClassId, _MethodId} =
|
|
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
|
|
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
|
|
|
|
%% This breaks the spec rule forbidding message modification
|
|
strip_header(#content{properties = #'P_basic'{headers = undefined}}
|
|
= DecodedContent, _Key) ->
|
|
DecodedContent;
|
|
strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
|
|
= DecodedContent, Key) ->
|
|
case lists:keysearch(Key, 1, Headers) of
|
|
false -> DecodedContent;
|
|
{value, Found} -> Headers0 = lists:delete(Found, Headers),
|
|
rabbit_binary_generator:clear_encoded_content(
|
|
DecodedContent#content{
|
|
properties = Props#'P_basic'{
|
|
headers = Headers0}})
|
|
end.
|
|
|
|
-spec message
|
|
(rabbit_exchange:name(), rabbit_router:routing_key(),
|
|
rabbit_types:decoded_content()) ->
|
|
rabbit_types:ok_or_error2(rabbit_types:message(), any()).
|
|
|
|
message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
|
|
try
|
|
{ok, #basic_message{
|
|
exchange_name = XName,
|
|
content = strip_header(DecodedContent, ?DELETED_HEADER),
|
|
id = rabbit_guid:gen(),
|
|
is_persistent = is_message_persistent(DecodedContent),
|
|
routing_keys = [RoutingKey |
|
|
header_routes(Props#'P_basic'.headers)]}}
|
|
catch
|
|
{error, _Reason} = Error -> Error
|
|
end.
|
|
|
|
-spec message
|
|
(rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(),
|
|
binary()) ->
|
|
rabbit_types:message().
|
|
|
|
message(XName, RoutingKey, RawProperties, Body) ->
|
|
Properties = properties(RawProperties),
|
|
Content = build_content(Properties, Body),
|
|
{ok, Msg} = message(XName, RoutingKey, Content),
|
|
Msg.
|
|
|
|
-spec properties
|
|
(properties_input()) -> rabbit_framing:amqp_property_record().
|
|
|
|
properties(P = #'P_basic'{}) ->
|
|
P;
|
|
properties(P) when is_list(P) ->
|
|
%% Yes, this is O(length(P) * record_info(size, 'P_basic') / 2),
|
|
%% i.e. slow. Use the definition of 'P_basic' directly if
|
|
%% possible!
|
|
lists:foldl(fun ({Key, Value}, Acc) ->
|
|
case indexof(record_info(fields, 'P_basic'), Key) of
|
|
0 -> throw({unknown_basic_property, Key});
|
|
N -> setelement(N + 1, Acc, Value)
|
|
end
|
|
end, #'P_basic'{}, P).
|
|
|
|
-spec prepend_table_header
|
|
(binary(), rabbit_framing:amqp_table(), headers()) -> headers().
|
|
|
|
prepend_table_header(Name, Info, undefined) ->
|
|
prepend_table_header(Name, Info, []);
|
|
prepend_table_header(Name, Info, Headers) ->
|
|
case rabbit_misc:table_lookup(Headers, Name) of
|
|
{array, Existing} ->
|
|
prepend_table(Name, Info, Existing, Headers);
|
|
undefined ->
|
|
prepend_table(Name, Info, [], Headers);
|
|
Other ->
|
|
Headers2 = prepend_table(Name, Info, [], Headers),
|
|
set_invalid_header(Name, Other, Headers2)
|
|
end.
|
|
|
|
prepend_table(Name, Info, Prior, Headers) ->
|
|
rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
|
|
|
|
set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) ->
|
|
case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of
|
|
undefined ->
|
|
set_invalid([{Name, array, [Value]}], Headers);
|
|
{table, ExistingHdr} ->
|
|
update_invalid(Name, Value, ExistingHdr, Headers);
|
|
Other ->
|
|
%% somehow the x-invalid-headers header is corrupt
|
|
Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}],
|
|
set_invalid_header(Name, Value, set_invalid(Invalid, Headers))
|
|
end.
|
|
|
|
set_invalid(NewHdr, Headers) ->
|
|
rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr).
|
|
|
|
update_invalid(Name, Value, ExistingHdr, Header) ->
|
|
Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of
|
|
undefined -> [Value];
|
|
{array, Prior} -> [Value | Prior]
|
|
end,
|
|
NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values),
|
|
set_invalid(NewHdr, Header).
|
|
|
|
-spec header(header(), headers()) -> 'undefined' | any().
|
|
|
|
header(_Header, undefined) ->
|
|
undefined;
|
|
header(_Header, []) ->
|
|
undefined;
|
|
header(Header, Headers) ->
|
|
header(Header, Headers, undefined).
|
|
|
|
-spec header(header(), headers(), any()) -> 'undefined' | any().
|
|
|
|
header(Header, Headers, Default) ->
|
|
case lists:keysearch(Header, 1, Headers) of
|
|
false -> Default;
|
|
{value, Val} -> Val
|
|
end.
|
|
|
|
-spec extract_headers(rabbit_types:content()) -> headers().
|
|
|
|
extract_headers(Content) ->
|
|
#content{properties = #'P_basic'{headers = Headers}} =
|
|
rabbit_binary_parser:ensure_content_decoded(Content),
|
|
Headers.
|
|
|
|
extract_timestamp(Content) ->
|
|
#content{properties = #'P_basic'{timestamp = Timestamp}} =
|
|
rabbit_binary_parser:ensure_content_decoded(Content),
|
|
Timestamp.
|
|
|
|
-spec map_headers
|
|
(fun((headers()) -> headers()), rabbit_types:content()) ->
|
|
rabbit_types:content().
|
|
|
|
map_headers(F, Content) ->
|
|
Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
|
|
#content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
|
|
Headers1 = F(Headers),
|
|
rabbit_binary_generator:clear_encoded_content(
|
|
Content1#content{properties = Props#'P_basic'{headers = Headers1}}).
|
|
|
|
indexof(L, Element) -> indexof(L, Element, 1).
|
|
|
|
indexof([], _Element, _N) -> 0;
|
|
indexof([Element | _Rest], Element, N) -> N;
|
|
indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
|
|
|
|
is_message_persistent(#content{properties = #'P_basic'{
|
|
delivery_mode = Mode}}) ->
|
|
case Mode of
|
|
1 -> false;
|
|
2 -> true;
|
|
undefined -> false;
|
|
Other -> throw({error, {delivery_mode_unknown, Other}})
|
|
end.
|
|
|
|
%% Extract CC routes from headers
|
|
|
|
-spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()].
|
|
|
|
header_routes(undefined) ->
|
|
[];
|
|
header_routes(HeadersTable) ->
|
|
lists:append(
|
|
[case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
|
|
{array, Routes} -> [Route || {longstr, Route} <- Routes];
|
|
undefined -> [];
|
|
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
|
|
binary_to_list(HeaderKey), Type}})
|
|
end || HeaderKey <- ?ROUTING_HEADERS]).
|
|
|
|
-spec parse_expiration
|
|
(rabbit_framing:amqp_property_record()) ->
|
|
rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()).
|
|
|
|
parse_expiration(#'P_basic'{expiration = undefined}) ->
|
|
{ok, undefined};
|
|
parse_expiration(#'P_basic'{expiration = Expiration}) ->
|
|
case string:to_integer(binary_to_list(Expiration)) of
|
|
{error, no_integer} = E ->
|
|
E;
|
|
{N, ""} ->
|
|
case rabbit_misc:check_expiry(N) of
|
|
ok -> {ok, N};
|
|
E = {error, _} -> E
|
|
end;
|
|
{_, S} ->
|
|
{error, {leftover_string, S}}
|
|
end.
|
|
|
|
maybe_gc_large_msg(Content) ->
|
|
rabbit_writer:maybe_gc_large_msg(Content).
|
|
|
|
maybe_gc_large_msg(Content, undefined) ->
|
|
rabbit_writer:msg_size(Content);
|
|
maybe_gc_large_msg(Content, GCThreshold) ->
|
|
rabbit_writer:maybe_gc_large_msg(Content, GCThreshold).
|
|
|
|
msg_size(Content) ->
|
|
rabbit_writer:msg_size(Content).
|
|
|
|
add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) ->
|
|
Content = rabbit_basic:map_headers(
|
|
fun(undefined) ->
|
|
rabbit_misc:set_table_value([], Name, Type, Value);
|
|
(Headers) ->
|
|
rabbit_misc:set_table_value(Headers, Name, Type, Value)
|
|
end, Content0),
|
|
Msg#basic_message{content = Content}.
|
|
|
|
peek_fmt_message(#basic_message{exchange_name = Ex,
|
|
routing_keys = RKeys,
|
|
content =
|
|
#content{payload_fragments_rev = Payl0,
|
|
properties = Props}}) ->
|
|
Fields = [atom_to_binary(F, utf8) || F <- record_info(fields, 'P_basic')],
|
|
T = lists:zip(Fields, tl(tuple_to_list(Props))),
|
|
lists:foldl(
|
|
fun ({<<"headers">>, Hdrs}, Acc) ->
|
|
case Hdrs of
|
|
[] ->
|
|
Acc;
|
|
_ ->
|
|
Acc ++ [{header_key(H), V} || {H, _T, V} <- Hdrs]
|
|
end;
|
|
({_, undefined}, Acc) ->
|
|
Acc;
|
|
(KV, Acc) ->
|
|
[KV | Acc]
|
|
end, [], [{<<"payload (max 64 bytes)">>,
|
|
%% restric payload to 64 bytes
|
|
binary_prefix_64(iolist_to_binary(lists:reverse(Payl0)), 64)},
|
|
{<<"exchange">>, Ex#resource.name},
|
|
{<<"routing_keys">>, RKeys} | T]).
|
|
|
|
header_key(A) ->
|
|
<<"header.", A/binary>>.
|
|
|
|
binary_prefix_64(Bin, Len) ->
|
|
binary:part(Bin, 0, min(byte_size(Bin), Len)).
|