From 00b7a841911d095912011810d1e0a1c6d4ee63b7 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 24 Feb 2021 17:36:53 +0300 Subject: [PATCH] Limit direct reply-to identifier length growth as node names grow. Prior to this change, direct reply-to consumer channels were encoded using term_to_binary/1, which means the result would grow together with node name (since node name is one of the components of an Erlang pid type). This means that with long enough hostnames, reply-to identifiers could overflow the 255 character limit of message property field type, longstr. With this change, the encoded value uses a hash of the node name and then locates the actual node name from a map of hashes to current cluster members. In addition, instead of generating non-predictable "secure" GUIDs the feature now generates "regular" predictable GUIDs which compensates some of the additional PID pre- and post-processing outlined above. --- deps/rabbit/src/pid_recomposition.erl | 58 +++++++++++++++++++ deps/rabbit/src/rabbit_channel.erl | 6 +- deps/rabbit/src/rabbit_direct_reply_to.erl | 55 ++++++++++++++++-- deps/rabbit/src/rabbit_nodes.erl | 5 ++ .../rabbit_common/src/rabbit_nodes_common.erl | 5 +- 5 files changed, 120 insertions(+), 9 deletions(-) create mode 100644 deps/rabbit/src/pid_recomposition.erl diff --git a/deps/rabbit/src/pid_recomposition.erl b/deps/rabbit/src/pid_recomposition.erl new file mode 100644 index 0000000000..e13c66a6af --- /dev/null +++ b/deps/rabbit/src/pid_recomposition.erl @@ -0,0 +1,58 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(pid_recomposition). + + +%% API +-export([ + to_binary/1, + from_binary/1, + decompose/1, + recompose/1 +]). + +-define(TTB_PREFIX, 131). + +-define(NEW_PID_EXT, 88). +-define(ATOM_UTF8_EXT, 118). +-define(SMALL_ATOM_UTF8_EXT, 119). + +%% +%% API +%% + +-spec decompose(pid()) -> #{atom() => any()}. +decompose(Pid) -> + from_binary(term_to_binary(Pid, [{minor_version, 2}])). + +-spec from_binary(binary()) -> #{atom() => any()}. +from_binary(Bin) -> + <> = Bin, + {Node, Rest2} = case PidData of + <> -> + {Node0, Rest1}; + <> -> + {Node0, Rest1} + end, + <> = Rest2, + #{ + node => binary_to_atom(Node), + id => ID, + serial => Serial, + creation => Creation + }. + +-spec to_binary(#{atom() => any()}) -> binary(). +to_binary(#{node := Node, id := ID, serial := Serial, creation := Creation}) -> + BinNode = atom_to_binary(Node), + NodeLen = byte_size(BinNode), + <>. + +-spec recompose(#{atom() => any()}) -> pid(). +recompose(M) -> + binary_to_term(to_binary(M)). diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index b80c790901..8c4f31dbe5 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -299,7 +299,7 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> -spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'. deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) -> - case rabbit_direct_reply_to:decode_reply_to_v1(Rest) of + case rabbit_direct_reply_to:decode_reply_to_v2(Rest, rabbit_nodes:all_running_with_hashes()) of {ok, Pid, Key} -> delegate:invoke_no_result( Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}); @@ -321,7 +321,7 @@ deliver_reply_local(Pid, Key, Delivery) -> declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) -> exists; declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) -> - case rabbit_direct_reply_to:decode_reply_to_v1(Rest) of + case rabbit_direct_reply_to:decode_reply_to_v2(Rest, rabbit_nodes:all_running_with_hashes()) of {ok, Pid, Key} -> Msg = {declare_fast_reply_to, Key}, rabbit_misc:with_exit_handler( @@ -1378,7 +1378,7 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, Other -> Other end, %% Precalculate both suffix and key - {Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v1(self()), + {Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v2(self()), rabbit_log:debug("amq.rabbitmq.reply-to key: ~p, suffix: ~p", [Key, Suffix]), Consumer = {CTag, Suffix, binary_to_list(Key)}, State1 = State#ch{reply_consumer = Consumer}, diff --git a/deps/rabbit/src/rabbit_direct_reply_to.erl b/deps/rabbit/src/rabbit_direct_reply_to.erl index e59127ecde..fee516e5cd 100644 --- a/deps/rabbit/src/rabbit_direct_reply_to.erl +++ b/deps/rabbit/src/rabbit_direct_reply_to.erl @@ -9,25 +9,70 @@ %% API -export([ + %% Original amq.rabbitmq.reply-to target channel encoding compute_key_and_suffix_v1/1, - decode_reply_to_v1/1 + decode_reply_to_v1/1, + + %% v2 amq.rabbitmq.reply-to target channel encoding + compute_key_and_suffix_v2/1, + decode_reply_to_v2/2 ]). %% %% API %% +-type decoded_pid_and_key() :: {ok, pid(), binary()} | error. + -spec compute_key_and_suffix_v1(pid()) -> {binary(), binary()}. +%% This original pid encoding function produces values that exceed routing key length limit +%% on nodes with long (say, 130+ characters) node names. compute_key_and_suffix_v1(Pid) -> - Key = base64:encode(rabbit_guid:gen_secure()), + Key = base64:encode(rabbit_guid:gen()), PidEnc = base64:encode(term_to_binary(Pid)), Suffix = <>, {Key, Suffix}. --spec decode_reply_to_v1(binary()) -> {ok, pid(), binary()} | error. -decode_reply_to_v1(Rest) -> - case string:tokens(binary_to_list(Rest), ".") of +-spec decode_reply_to_v1(binary()) -> decoded_pid_and_key(). +decode_reply_to_v1(Bin) -> + case string:tokens(binary_to_list(Bin), ".") of [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)), {ok, Pid, Key}; _ -> error end. + + +-spec compute_key_and_suffix_v2(pid()) -> {binary(), binary()}. +%% This pid encoding function produces values that are of mostly fixed size +%% regardless of the node name length. +compute_key_and_suffix_v2(Pid) -> + Key = base64:encode(rabbit_guid:gen()), + + PidParts0 = #{node := Node} = pid_recomposition:decompose(Pid), + %% Note: we hash the entire node name. This is sufficient for our needs of shortening node name + %% in the TTB-encoded pid, and helps avoid doing the node name split for every single cluster member + %% in rabbit_nodes:all_running_with_hashes/0. + %% + %% We also use a synthetic node prefix because the hash alone will be sufficient to + NodeHash = erlang:phash2(Node), + PidParts = maps:update(node, rabbit_nodes_common:make("generated", NodeHash), PidParts0), + RecomposedEncoded = base64:encode(pid_recomposition:to_binary(PidParts)), + + Suffix = <>, + {Key, Suffix}. + +-spec decode_reply_to_v2(binary(), #{non_neg_integer() => node()}) -> decoded_pid_and_key(). +decode_reply_to_v2(Bin, CandidateNodes) -> + case string:tokens(binary_to_list(Bin), ".") of + [PidEnc, Key] -> + RawPidBin = base64:decode(PidEnc), + PidParts0 = #{node := ShortenedNodename} = pid_recomposition:from_binary(RawPidBin), + {_, NodeHash} = rabbit_nodes_common:parts(ShortenedNodename), + case maps:get(list_to_integer(NodeHash), CandidateNodes, undefined) of + undefined -> error; + Candidate -> + PidParts = maps:update(node, Candidate, PidParts0), + {ok, pid_recomposition:recompose(PidParts), Key} + end; + _ -> error + end. diff --git a/deps/rabbit/src/rabbit_nodes.erl b/deps/rabbit/src/rabbit_nodes.erl index 3678a02338..39ba1e6e57 100644 --- a/deps/rabbit/src/rabbit_nodes.erl +++ b/deps/rabbit/src/rabbit_nodes.erl @@ -14,6 +14,7 @@ await_running_count/2, is_single_node_cluster/0, boot/0]). -export([persistent_cluster_id/0, seed_internal_cluster_id/0, seed_user_provided_cluster_name/0]). +-export([all_running_with_hashes/0]). -include_lib("kernel/include/inet.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -155,3 +156,7 @@ await_running_count_with_retries(TargetCount, Retries) -> timer:sleep(?SAMPLING_INTERVAL), await_running_count_with_retries(TargetCount, Retries - 1) end. + +-spec all_running_with_hashes() -> #{non_neg_integer() => node()}. +all_running_with_hashes() -> + maps:from_list([{erlang:phash2(Node), Node} || Node <- all_running()]). diff --git a/deps/rabbit_common/src/rabbit_nodes_common.erl b/deps/rabbit_common/src/rabbit_nodes_common.erl index 2af539bc47..ed4d14e013 100644 --- a/deps/rabbit_common/src/rabbit_nodes_common.erl +++ b/deps/rabbit_common/src/rabbit_nodes_common.erl @@ -18,10 +18,11 @@ %% API %% --export([make/1, parts/1, names/1, name_type/1, ensure_epmd/0, is_running/2, is_process_running/2]). +-export([make/1, make/2, parts/1, names/1, name_type/1, ensure_epmd/0, is_running/2, is_process_running/2]). -export([cookie_hash/0, epmd_port/0, diagnostics/1]). -spec make({string(), string()} | string()) -> node(). +-spec make(string(), string()) -> node(). -spec parts(node() | string()) -> {string(), string()}. -spec ensure_epmd() -> 'ok'. -spec epmd_port() -> string(). @@ -77,6 +78,8 @@ make({Prefix, Suffix}) -> rabbit_data_coercion:to_atom( rabbit_data_coercion:to_list(Suffix)])); make(NodeStr) -> make(parts(NodeStr)). +make(Prefix, Suffix) -> make({Prefix, Suffix}). + parts(Node) when is_atom(Node) -> parts(atom_to_list(Node)); parts(NodeStr) ->