diff --git a/deps/amqp_client/include/amqp_client_internal.hrl b/deps/amqp_client/include/amqp_client_internal.hrl index 01e099097e..075f5f4a12 100644 --- a/deps/amqp_client/include/amqp_client_internal.hrl +++ b/deps/amqp_client/include/amqp_client_internal.hrl @@ -28,3 +28,6 @@ {<<"authentication_failure_close">>, bool, true}]). -define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}). + +-define(DIRECT_OPERATION_TIMEOUT, 120000). +-define(CALL_TIMEOUT_DEVIATION, 10000). diff --git a/deps/amqp_client/src/amqp_connection.erl b/deps/amqp_client/src/amqp_connection.erl index 6800a44a3e..7aecda9108 100644 --- a/deps/amqp_client/src/amqp_connection.erl +++ b/deps/amqp_client/src/amqp_connection.erl @@ -170,6 +170,7 @@ start(AmqpParams, ConnName) when ConnName == undefined; is_binary(ConnName) -> end, AmqpParams2 = set_connection_name(ConnName, AmqpParams1), AmqpParams3 = amqp_ssl:maybe_enhance_ssl_options(AmqpParams2), + ok = ensure_safe_call_timeout(AmqpParams3, amqp_util:call_timeout()), {ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams3), amqp_gen_connection:connect(Connection). @@ -393,3 +394,30 @@ connection_name(ConnectionPid) -> {<<"connection_name">>, _, ConnName} -> ConnName; false -> undefined end. + +ensure_safe_call_timeout(#amqp_params_network{connection_timeout = ConnTimeout}, CallTimeout) -> + maybe_update_call_timeout(ConnTimeout, CallTimeout); +ensure_safe_call_timeout(#amqp_params_direct{}, CallTimeout) -> + case net_kernel:get_net_ticktime() of + NetTicktime when is_integer(NetTicktime) -> + maybe_update_call_timeout(tick_or_direct_timeout(NetTicktime * 1000), + CallTimeout); + {ongoing_change_to, NetTicktime} -> + maybe_update_call_timeout(tick_or_direct_timeout(NetTicktime * 1000), + CallTimeout); + ignore -> + maybe_update_call_timeout(?DIRECT_OPERATION_TIMEOUT, CallTimeout) + end. + +maybe_update_call_timeout(BaseTimeout, CallTimeout) + when is_integer(BaseTimeout), CallTimeout > BaseTimeout -> + ok; +maybe_update_call_timeout(BaseTimeout, CallTimeout) -> + EffectiveSafeCallTimeout = amqp_util:safe_call_timeout(BaseTimeout), + ?LOG_WARN("AMQP client call timeout was ~p millseconds, is updated to a safe effective " + "value of ~p millseconds", [CallTimeout, EffectiveSafeCallTimeout]), + amqp_util:update_call_timeout(EffectiveSafeCallTimeout), + ok. + +tick_or_direct_timeout(Timeout) when Timeout >= ?DIRECT_OPERATION_TIMEOUT -> Timeout; +tick_or_direct_timeout(_Timeout) -> ?DIRECT_OPERATION_TIMEOUT. diff --git a/deps/amqp_client/src/amqp_util.erl b/deps/amqp_client/src/amqp_util.erl index df7ce30662..0324d4a171 100644 --- a/deps/amqp_client/src/amqp_util.erl +++ b/deps/amqp_client/src/amqp_util.erl @@ -2,7 +2,7 @@ -include("amqp_client_internal.hrl"). --export([call_timeout/0]). +-export([call_timeout/0, update_call_timeout/1, safe_call_timeout/1]). call_timeout() -> case get(gen_server_call_timeout) of @@ -15,3 +15,11 @@ call_timeout() -> Timeout -> Timeout end. + +update_call_timeout(Timeout) -> + application:set_env(amqp_client, gen_server_call_timeout, Timeout), + put(gen_server_call_timeout, Timeout), + ok. + +safe_call_timeout(Threshold) -> + Threshold + ?CALL_TIMEOUT_DEVIATION.