Fall back to v1 direct reply-to encoding

This commit is contained in:
Michael Klishin 2021-02-24 20:03:02 +03:00
parent 00b7a84191
commit a11f98ccd8
No known key found for this signature in database
GPG Key ID: E80EDCFA0CDB21EE
2 changed files with 35 additions and 14 deletions

View File

@ -298,12 +298,21 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
-spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'.
deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) ->
case rabbit_direct_reply_to:decode_reply_to_v2(Rest, rabbit_nodes:all_running_with_hashes()) of
deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Delivery) ->
case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin, rabbit_nodes:all_running_with_hashes()) of
{ok, Pid, Key} ->
delegate:invoke_no_result(
Pid, {?MODULE, deliver_reply_local, [Key, Delivery]});
error ->
delegate:invoke_no_result(Pid, {?MODULE, deliver_reply_local, [Key, Delivery]});
{error, _} ->
deliver_reply_v1(EncodedBin, Delivery)
end.
-spec deliver_reply_v1(binary(), rabbit_types:delivery()) -> 'ok'.
deliver_reply_v1(EncodedBin, Delivery) ->
%% the the original encoding function
case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of
{ok, V1Pid, V1Key} ->
delegate:invoke_no_result(V1Pid, {?MODULE, deliver_reply_local, [V1Key, Delivery]});
{error, _} ->
ok
end.
@ -320,19 +329,31 @@ deliver_reply_local(Pid, Key, Delivery) ->
declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
exists;
declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) ->
case rabbit_direct_reply_to:decode_reply_to_v2(Rest, rabbit_nodes:all_running_with_hashes()) of
declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin, rabbit_nodes:all_running_with_hashes()) of
{error, _} ->
declare_fast_reply_to_v1(EncodedBin);
{ok, Pid, Key} ->
Msg = {declare_fast_reply_to, Key},
rabbit_misc:with_exit_handler(
rabbit_misc:const(not_found),
fun() -> gen_server2:call(Pid, Msg, infinity) end);
error ->
not_found
fun() -> gen_server2:call(Pid, Msg, infinity) end)
end;
declare_fast_reply_to(_) ->
not_found.
declare_fast_reply_to_v1(EncodedBin) ->
%% the the original encoding function
case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of
{error, _} ->
not_found;
{ok, V1Pid, V1Key} ->
Msg = {declare_fast_reply_to, V1Key},
rabbit_misc:with_exit_handler(
rabbit_misc:const(not_found),
fun() -> gen_server2:call(V1Pid, Msg, infinity) end)
end.
-spec send_credit_reply(pid(), non_neg_integer()) -> 'ok'.
send_credit_reply(Pid, Len) ->

View File

@ -22,7 +22,7 @@
%% API
%%
-type decoded_pid_and_key() :: {ok, pid(), binary()} | error.
-type decoded_pid_and_key() :: {ok, pid(), binary()} | {error, any()}.
-spec compute_key_and_suffix_v1(pid()) -> {binary(), binary()}.
%% This original pid encoding function produces values that exceed routing key length limit
@ -38,7 +38,7 @@ decode_reply_to_v1(Bin) ->
case string:tokens(binary_to_list(Bin), ".") of
[PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)),
{ok, Pid, Key};
_ -> error
_ -> {error, encoding_failed}
end.
@ -55,7 +55,7 @@ compute_key_and_suffix_v2(Pid) ->
%%
%% We also use a synthetic node prefix because the hash alone will be sufficient to
NodeHash = erlang:phash2(Node),
PidParts = maps:update(node, rabbit_nodes_common:make("generated", NodeHash), PidParts0),
PidParts = maps:update(node, rabbit_nodes_common:make("reply", integer_to_list(NodeHash)), PidParts0),
RecomposedEncoded = base64:encode(pid_recomposition:to_binary(PidParts)),
Suffix = <<RecomposedEncoded/binary, ".", Key/binary>>,
@ -74,5 +74,5 @@ decode_reply_to_v2(Bin, CandidateNodes) ->
PidParts = maps:update(node, Candidate, PidParts0),
{ok, pid_recomposition:recompose(PidParts), Key}
end;
_ -> error
_ -> {error, encoding_failed}
end.