Ensure safe amqp client call timeouts on connection

establishment. This guarantees that the effective
call timeouts are always safe, i.e. granting enough
time to the underlying network operations, which must
always timeout first in case of any unexpected
lingering operations leading to timeouts. This eliminates
the chance of leaking connection processes when call
timeouts elapse, while underlying remote cconnection
establishment call was still taking place.
This commit is contained in:
Ayanda-D 2020-11-26 17:02:43 +00:00
parent 482309718b
commit 5c469ed519
3 changed files with 40 additions and 1 deletions

View File

@ -28,3 +28,6 @@
{<<"authentication_failure_close">>, bool, true}]). {<<"authentication_failure_close">>, bool, true}]).
-define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}). -define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}).
-define(DIRECT_OPERATION_TIMEOUT, 120000).
-define(CALL_TIMEOUT_DEVIATION, 10000).

View File

@ -170,6 +170,7 @@ start(AmqpParams, ConnName) when ConnName == undefined; is_binary(ConnName) ->
end, end,
AmqpParams2 = set_connection_name(ConnName, AmqpParams1), AmqpParams2 = set_connection_name(ConnName, AmqpParams1),
AmqpParams3 = amqp_ssl:maybe_enhance_ssl_options(AmqpParams2), AmqpParams3 = amqp_ssl:maybe_enhance_ssl_options(AmqpParams2),
ok = ensure_safe_call_timeout(AmqpParams3, amqp_util:call_timeout()),
{ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams3), {ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams3),
amqp_gen_connection:connect(Connection). amqp_gen_connection:connect(Connection).
@ -393,3 +394,30 @@ connection_name(ConnectionPid) ->
{<<"connection_name">>, _, ConnName} -> ConnName; {<<"connection_name">>, _, ConnName} -> ConnName;
false -> undefined false -> undefined
end. end.
ensure_safe_call_timeout(#amqp_params_network{connection_timeout = ConnTimeout}, CallTimeout) ->
maybe_update_call_timeout(ConnTimeout, CallTimeout);
ensure_safe_call_timeout(#amqp_params_direct{}, CallTimeout) ->
case net_kernel:get_net_ticktime() of
NetTicktime when is_integer(NetTicktime) ->
maybe_update_call_timeout(tick_or_direct_timeout(NetTicktime * 1000),
CallTimeout);
{ongoing_change_to, NetTicktime} ->
maybe_update_call_timeout(tick_or_direct_timeout(NetTicktime * 1000),
CallTimeout);
ignore ->
maybe_update_call_timeout(?DIRECT_OPERATION_TIMEOUT, CallTimeout)
end.
maybe_update_call_timeout(BaseTimeout, CallTimeout)
when is_integer(BaseTimeout), CallTimeout > BaseTimeout ->
ok;
maybe_update_call_timeout(BaseTimeout, CallTimeout) ->
EffectiveSafeCallTimeout = amqp_util:safe_call_timeout(BaseTimeout),
?LOG_WARN("AMQP client call timeout was ~p millseconds, is updated to a safe effective "
"value of ~p millseconds", [CallTimeout, EffectiveSafeCallTimeout]),
amqp_util:update_call_timeout(EffectiveSafeCallTimeout),
ok.
tick_or_direct_timeout(Timeout) when Timeout >= ?DIRECT_OPERATION_TIMEOUT -> Timeout;
tick_or_direct_timeout(_Timeout) -> ?DIRECT_OPERATION_TIMEOUT.

View File

@ -2,7 +2,7 @@
-include("amqp_client_internal.hrl"). -include("amqp_client_internal.hrl").
-export([call_timeout/0]). -export([call_timeout/0, update_call_timeout/1, safe_call_timeout/1]).
call_timeout() -> call_timeout() ->
case get(gen_server_call_timeout) of case get(gen_server_call_timeout) of
@ -15,3 +15,11 @@ call_timeout() ->
Timeout -> Timeout ->
Timeout Timeout
end. end.
update_call_timeout(Timeout) ->
application:set_env(amqp_client, gen_server_call_timeout, Timeout),
put(gen_server_call_timeout, Timeout),
ok.
safe_call_timeout(Threshold) ->
Threshold + ?CALL_TIMEOUT_DEVIATION.