From a11f98ccd8d3d71a9ad215820c4152151c1b28b1 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 24 Feb 2021 20:03:02 +0300 Subject: [PATCH] Fall back to v1 direct reply-to encoding --- deps/rabbit/src/rabbit_channel.erl | 41 ++++++++++++++++------ deps/rabbit/src/rabbit_direct_reply_to.erl | 8 ++--- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 8c4f31dbe5..26932753d0 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -298,12 +298,21 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> -spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'. -deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) -> - case rabbit_direct_reply_to:decode_reply_to_v2(Rest, rabbit_nodes:all_running_with_hashes()) of +deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Delivery) -> + case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin, rabbit_nodes:all_running_with_hashes()) of {ok, Pid, Key} -> - delegate:invoke_no_result( - Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}); - error -> + delegate:invoke_no_result(Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}); + {error, _} -> + deliver_reply_v1(EncodedBin, Delivery) + end. + +-spec deliver_reply_v1(binary(), rabbit_types:delivery()) -> 'ok'. +deliver_reply_v1(EncodedBin, Delivery) -> + %% the the original encoding function + case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of + {ok, V1Pid, V1Key} -> + delegate:invoke_no_result(V1Pid, {?MODULE, deliver_reply_local, [V1Key, Delivery]}); + {error, _} -> ok end. @@ -320,19 +329,31 @@ deliver_reply_local(Pid, Key, Delivery) -> declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) -> exists; -declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) -> - case rabbit_direct_reply_to:decode_reply_to_v2(Rest, rabbit_nodes:all_running_with_hashes()) of +declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) -> + case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin, rabbit_nodes:all_running_with_hashes()) of + {error, _} -> + declare_fast_reply_to_v1(EncodedBin); {ok, Pid, Key} -> Msg = {declare_fast_reply_to, Key}, rabbit_misc:with_exit_handler( rabbit_misc:const(not_found), - fun() -> gen_server2:call(Pid, Msg, infinity) end); - error -> - not_found + fun() -> gen_server2:call(Pid, Msg, infinity) end) end; declare_fast_reply_to(_) -> not_found. +declare_fast_reply_to_v1(EncodedBin) -> + %% the the original encoding function + case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of + {error, _} -> + not_found; + {ok, V1Pid, V1Key} -> + Msg = {declare_fast_reply_to, V1Key}, + rabbit_misc:with_exit_handler( + rabbit_misc:const(not_found), + fun() -> gen_server2:call(V1Pid, Msg, infinity) end) + end. + -spec send_credit_reply(pid(), non_neg_integer()) -> 'ok'. send_credit_reply(Pid, Len) -> diff --git a/deps/rabbit/src/rabbit_direct_reply_to.erl b/deps/rabbit/src/rabbit_direct_reply_to.erl index fee516e5cd..6d8fc2bb50 100644 --- a/deps/rabbit/src/rabbit_direct_reply_to.erl +++ b/deps/rabbit/src/rabbit_direct_reply_to.erl @@ -22,7 +22,7 @@ %% API %% --type decoded_pid_and_key() :: {ok, pid(), binary()} | error. +-type decoded_pid_and_key() :: {ok, pid(), binary()} | {error, any()}. -spec compute_key_and_suffix_v1(pid()) -> {binary(), binary()}. %% This original pid encoding function produces values that exceed routing key length limit @@ -38,7 +38,7 @@ decode_reply_to_v1(Bin) -> case string:tokens(binary_to_list(Bin), ".") of [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)), {ok, Pid, Key}; - _ -> error + _ -> {error, encoding_failed} end. @@ -55,7 +55,7 @@ compute_key_and_suffix_v2(Pid) -> %% %% We also use a synthetic node prefix because the hash alone will be sufficient to NodeHash = erlang:phash2(Node), - PidParts = maps:update(node, rabbit_nodes_common:make("generated", NodeHash), PidParts0), + PidParts = maps:update(node, rabbit_nodes_common:make("reply", integer_to_list(NodeHash)), PidParts0), RecomposedEncoded = base64:encode(pid_recomposition:to_binary(PidParts)), Suffix = <>, @@ -74,5 +74,5 @@ decode_reply_to_v2(Bin, CandidateNodes) -> PidParts = maps:update(node, Candidate, PidParts0), {ok, pid_recomposition:recompose(PidParts), Key} end; - _ -> error + _ -> {error, encoding_failed} end.