Limit direct reply-to identifier length growth

as node names grow.

Prior to this change, direct reply-to consumer channels
were encoded using term_to_binary/1, which means the result
would grow together with node name (since node name
is one of the components of an Erlang pid type).

This means that with long enough hostnames, reply-to
identifiers could overflow the 255 character limit of
message property field type, longstr.

With this change, the encoded value uses a hash of the node name
and then locates the actual node name from a map of
hashes to current cluster members.

In addition, instead of generating non-predictable "secure"
GUIDs the feature now generates "regular" predictable GUIDs
which compensates some of the additional PID pre- and post-processing
outlined above.
This commit is contained in:
Michael Klishin 2021-02-24 17:36:53 +03:00
parent 129a57dcef
commit 00b7a84191
No known key found for this signature in database
GPG Key ID: E80EDCFA0CDB21EE
5 changed files with 120 additions and 9 deletions

58
deps/rabbit/src/pid_recomposition.erl vendored Normal file
View File

@ -0,0 +1,58 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(pid_recomposition).
%% API
-export([
to_binary/1,
from_binary/1,
decompose/1,
recompose/1
]).
-define(TTB_PREFIX, 131).
-define(NEW_PID_EXT, 88).
-define(ATOM_UTF8_EXT, 118).
-define(SMALL_ATOM_UTF8_EXT, 119).
%%
%% API
%%
-spec decompose(pid()) -> #{atom() => any()}.
decompose(Pid) ->
from_binary(term_to_binary(Pid, [{minor_version, 2}])).
-spec from_binary(binary()) -> #{atom() => any()}.
from_binary(Bin) ->
<<?TTB_PREFIX, ?NEW_PID_EXT, PidData/binary>> = Bin,
{Node, Rest2} = case PidData of
<<?ATOM_UTF8_EXT, AtomLen:16/integer, Node0:AtomLen/binary, Rest1/binary>> ->
{Node0, Rest1};
<<?SMALL_ATOM_UTF8_EXT, AtomLen/integer, Node0:AtomLen/binary, Rest1/binary>> ->
{Node0, Rest1}
end,
<<ID:32/integer, Serial:32/integer, Creation:32/integer>> = Rest2,
#{
node => binary_to_atom(Node),
id => ID,
serial => Serial,
creation => Creation
}.
-spec to_binary(#{atom() => any()}) -> binary().
to_binary(#{node := Node, id := ID, serial := Serial, creation := Creation}) ->
BinNode = atom_to_binary(Node),
NodeLen = byte_size(BinNode),
<<?TTB_PREFIX:8/unsigned, ?NEW_PID_EXT:8/unsigned, ?ATOM_UTF8_EXT:8/unsigned, NodeLen:16/unsigned, BinNode/binary, ID:32, Serial:32, Creation:32>>.
-spec recompose(#{atom() => any()}) -> pid().
recompose(M) ->
binary_to_term(to_binary(M)).

View File

@ -299,7 +299,7 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
-spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'.
deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) ->
case rabbit_direct_reply_to:decode_reply_to_v1(Rest) of
case rabbit_direct_reply_to:decode_reply_to_v2(Rest, rabbit_nodes:all_running_with_hashes()) of
{ok, Pid, Key} ->
delegate:invoke_no_result(
Pid, {?MODULE, deliver_reply_local, [Key, Delivery]});
@ -321,7 +321,7 @@ deliver_reply_local(Pid, Key, Delivery) ->
declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
exists;
declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) ->
case rabbit_direct_reply_to:decode_reply_to_v1(Rest) of
case rabbit_direct_reply_to:decode_reply_to_v2(Rest, rabbit_nodes:all_running_with_hashes()) of
{ok, Pid, Key} ->
Msg = {declare_fast_reply_to, Key},
rabbit_misc:with_exit_handler(
@ -1378,7 +1378,7 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
Other -> Other
end,
%% Precalculate both suffix and key
{Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v1(self()),
{Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v2(self()),
rabbit_log:debug("amq.rabbitmq.reply-to key: ~p, suffix: ~p", [Key, Suffix]),
Consumer = {CTag, Suffix, binary_to_list(Key)},
State1 = State#ch{reply_consumer = Consumer},

View File

@ -9,25 +9,70 @@
%% API
-export([
%% Original amq.rabbitmq.reply-to target channel encoding
compute_key_and_suffix_v1/1,
decode_reply_to_v1/1
decode_reply_to_v1/1,
%% v2 amq.rabbitmq.reply-to target channel encoding
compute_key_and_suffix_v2/1,
decode_reply_to_v2/2
]).
%%
%% API
%%
-type decoded_pid_and_key() :: {ok, pid(), binary()} | error.
-spec compute_key_and_suffix_v1(pid()) -> {binary(), binary()}.
%% This original pid encoding function produces values that exceed routing key length limit
%% on nodes with long (say, 130+ characters) node names.
compute_key_and_suffix_v1(Pid) ->
Key = base64:encode(rabbit_guid:gen_secure()),
Key = base64:encode(rabbit_guid:gen()),
PidEnc = base64:encode(term_to_binary(Pid)),
Suffix = <<PidEnc/binary, ".", Key/binary>>,
{Key, Suffix}.
-spec decode_reply_to_v1(binary()) -> {ok, pid(), binary()} | error.
decode_reply_to_v1(Rest) ->
case string:tokens(binary_to_list(Rest), ".") of
-spec decode_reply_to_v1(binary()) -> decoded_pid_and_key().
decode_reply_to_v1(Bin) ->
case string:tokens(binary_to_list(Bin), ".") of
[PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)),
{ok, Pid, Key};
_ -> error
end.
-spec compute_key_and_suffix_v2(pid()) -> {binary(), binary()}.
%% This pid encoding function produces values that are of mostly fixed size
%% regardless of the node name length.
compute_key_and_suffix_v2(Pid) ->
Key = base64:encode(rabbit_guid:gen()),
PidParts0 = #{node := Node} = pid_recomposition:decompose(Pid),
%% Note: we hash the entire node name. This is sufficient for our needs of shortening node name
%% in the TTB-encoded pid, and helps avoid doing the node name split for every single cluster member
%% in rabbit_nodes:all_running_with_hashes/0.
%%
%% We also use a synthetic node prefix because the hash alone will be sufficient to
NodeHash = erlang:phash2(Node),
PidParts = maps:update(node, rabbit_nodes_common:make("generated", NodeHash), PidParts0),
RecomposedEncoded = base64:encode(pid_recomposition:to_binary(PidParts)),
Suffix = <<RecomposedEncoded/binary, ".", Key/binary>>,
{Key, Suffix}.
-spec decode_reply_to_v2(binary(), #{non_neg_integer() => node()}) -> decoded_pid_and_key().
decode_reply_to_v2(Bin, CandidateNodes) ->
case string:tokens(binary_to_list(Bin), ".") of
[PidEnc, Key] ->
RawPidBin = base64:decode(PidEnc),
PidParts0 = #{node := ShortenedNodename} = pid_recomposition:from_binary(RawPidBin),
{_, NodeHash} = rabbit_nodes_common:parts(ShortenedNodename),
case maps:get(list_to_integer(NodeHash), CandidateNodes, undefined) of
undefined -> error;
Candidate ->
PidParts = maps:update(node, Candidate, PidParts0),
{ok, pid_recomposition:recompose(PidParts), Key}
end;
_ -> error
end.

View File

@ -14,6 +14,7 @@
await_running_count/2, is_single_node_cluster/0,
boot/0]).
-export([persistent_cluster_id/0, seed_internal_cluster_id/0, seed_user_provided_cluster_name/0]).
-export([all_running_with_hashes/0]).
-include_lib("kernel/include/inet.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
@ -155,3 +156,7 @@ await_running_count_with_retries(TargetCount, Retries) ->
timer:sleep(?SAMPLING_INTERVAL),
await_running_count_with_retries(TargetCount, Retries - 1)
end.
-spec all_running_with_hashes() -> #{non_neg_integer() => node()}.
all_running_with_hashes() ->
maps:from_list([{erlang:phash2(Node), Node} || Node <- all_running()]).

View File

@ -18,10 +18,11 @@
%% API
%%
-export([make/1, parts/1, names/1, name_type/1, ensure_epmd/0, is_running/2, is_process_running/2]).
-export([make/1, make/2, parts/1, names/1, name_type/1, ensure_epmd/0, is_running/2, is_process_running/2]).
-export([cookie_hash/0, epmd_port/0, diagnostics/1]).
-spec make({string(), string()} | string()) -> node().
-spec make(string(), string()) -> node().
-spec parts(node() | string()) -> {string(), string()}.
-spec ensure_epmd() -> 'ok'.
-spec epmd_port() -> string().
@ -77,6 +78,8 @@ make({Prefix, Suffix}) -> rabbit_data_coercion:to_atom(
rabbit_data_coercion:to_list(Suffix)]));
make(NodeStr) -> make(parts(NodeStr)).
make(Prefix, Suffix) -> make({Prefix, Suffix}).
parts(Node) when is_atom(Node) ->
parts(atom_to_list(Node));
parts(NodeStr) ->