Replace infinite timeouts with sensible defaults.
Change gen_server:call timeouts to use a configurable default that is cached inside each process' process dictionary. Also make supervisor shutdown timeouts use the SUPERVISOR_WAIT value. [#147178169]
This commit is contained in:
parent
af13d840e8
commit
c6e5a10035
|
@ -30,7 +30,7 @@
|
|||
channel_max = 0,
|
||||
frame_max = 0,
|
||||
heartbeat = 10,
|
||||
connection_timeout = infinity,
|
||||
connection_timeout = 30000,
|
||||
ssl_options = none,
|
||||
auth_mechanisms =
|
||||
[fun amqp_auth_mechanisms:plain/3,
|
||||
|
|
|
@ -35,3 +35,7 @@
|
|||
{<<"consumer_cancel_notify">>, bool, true},
|
||||
{<<"connection.blocked">>, bool, true},
|
||||
{<<"authentication_failure_close">>, bool, true}]).
|
||||
|
||||
-define(CALL_TIMEOUT, rabbit_misc:get_env(amqp_client, gen_server_call_timeout,
|
||||
30000)).
|
||||
|
||||
|
|
|
@ -140,7 +140,7 @@
|
|||
%% @spec (Channel, Method) -> Result
|
||||
%% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
|
||||
call(Channel, Method) ->
|
||||
gen_server:call(Channel, {call, Method, none, self()}, infinity).
|
||||
gen_server:call(Channel, {call, Method, none, self()}, amqp_util:call_timeout()).
|
||||
|
||||
%% @spec (Channel, Method, Content) -> Result
|
||||
%% where
|
||||
|
@ -163,7 +163,7 @@ call(Channel, Method) ->
|
|||
%% the broker. It does not necessarily imply that the broker has
|
||||
%% accepted responsibility for the message.
|
||||
call(Channel, Method, Content) ->
|
||||
gen_server:call(Channel, {call, Method, Content, self()}, infinity).
|
||||
gen_server:call(Channel, {call, Method, Content, self()}, amqp_util:call_timeout()).
|
||||
|
||||
%% @spec (Channel, Method) -> ok
|
||||
%% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
|
||||
|
@ -208,7 +208,7 @@ close(Channel) ->
|
|||
%% @doc Closes the channel, allowing the caller to supply a reply code and
|
||||
%% text. If the channel is already closing, the atom 'closing' is returned.
|
||||
close(Channel, Code, Text) ->
|
||||
gen_server:call(Channel, {close, Code, Text}, infinity).
|
||||
gen_server:call(Channel, {close, Code, Text}, amqp_util:call_timeout()).
|
||||
|
||||
%% @spec (Channel) -> integer()
|
||||
%% where
|
||||
|
@ -216,7 +216,7 @@ close(Channel, Code, Text) ->
|
|||
%% @doc When in confirm mode, returns the sequence number of the next
|
||||
%% message to be published.
|
||||
next_publish_seqno(Channel) ->
|
||||
gen_server:call(Channel, next_publish_seqno, infinity).
|
||||
gen_server:call(Channel, next_publish_seqno, amqp_util:call_timeout()).
|
||||
|
||||
%% @spec (Channel) -> boolean() | 'timeout'
|
||||
%% where
|
||||
|
@ -225,7 +225,7 @@ next_publish_seqno(Channel) ->
|
|||
%% been either ack'd or nack'd by the broker. Note, when called on a
|
||||
%% non-Confirm channel, waitForConfirms returns an error.
|
||||
wait_for_confirms(Channel) ->
|
||||
wait_for_confirms(Channel, infinity).
|
||||
wait_for_confirms(Channel, amqp_util:call_timeout()).
|
||||
|
||||
%% @spec (Channel, Timeout) -> boolean() | 'timeout'
|
||||
%% where
|
||||
|
@ -236,7 +236,7 @@ wait_for_confirms(Channel) ->
|
|||
%% Note, when called on a non-Confirm channel, waitForConfirms throws
|
||||
%% an exception.
|
||||
wait_for_confirms(Channel, Timeout) ->
|
||||
case gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity) of
|
||||
case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of
|
||||
{error, Reason} -> throw(Reason);
|
||||
Other -> Other
|
||||
end.
|
||||
|
@ -248,7 +248,7 @@ wait_for_confirms(Channel, Timeout) ->
|
|||
%% received, the calling process is immediately sent an
|
||||
%% exit(nack_received).
|
||||
wait_for_confirms_or_die(Channel) ->
|
||||
wait_for_confirms_or_die(Channel, infinity).
|
||||
wait_for_confirms_or_die(Channel, amqp_util:call_timeout()).
|
||||
|
||||
%% @spec (Channel, Timeout) -> true
|
||||
%% where
|
||||
|
@ -328,7 +328,7 @@ unregister_flow_handler(Channel) ->
|
|||
%% where Consumer is the amqp_gen_consumer implementation registered with
|
||||
%% the channel.
|
||||
call_consumer(Channel, Msg) ->
|
||||
gen_server:call(Channel, {call_consumer, Msg}, infinity).
|
||||
gen_server:call(Channel, {call_consumer, Msg}, amqp_util:call_timeout()).
|
||||
|
||||
%% @spec (Channel, BasicConsume, Subscriber) -> ok
|
||||
%% where
|
||||
|
@ -338,7 +338,7 @@ call_consumer(Channel, Msg) ->
|
|||
%% @doc Subscribe the given pid to a queue using the specified
|
||||
%% basic.consume method.
|
||||
subscribe(Channel, BasicConsume = #'basic.consume'{}, Subscriber) ->
|
||||
gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, infinity).
|
||||
gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, amqp_util:call_timeout()).
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
%% Internal interface
|
||||
|
@ -364,7 +364,7 @@ connection_closing(Pid, ChannelCloseType, Reason) ->
|
|||
|
||||
%% @private
|
||||
open(Pid) ->
|
||||
gen_server:call(Pid, open, infinity).
|
||||
gen_server:call(Pid, open, amqp_util:call_timeout()).
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
|
@ -993,3 +993,4 @@ call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) ->
|
|||
|
||||
safe_cancel_timer(undefined) -> ok;
|
||||
safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef).
|
||||
|
||||
|
|
|
@ -43,16 +43,16 @@ start_link(Connection, ConnName, ChSupSup) ->
|
|||
|
||||
open_channel(ChMgr, ProposedNumber, Consumer, InfraArgs) ->
|
||||
gen_server:call(ChMgr, {open_channel, ProposedNumber, Consumer, InfraArgs},
|
||||
infinity).
|
||||
amqp_util:call_timeout()).
|
||||
|
||||
set_channel_max(ChMgr, ChannelMax) ->
|
||||
gen_server:cast(ChMgr, {set_channel_max, ChannelMax}).
|
||||
|
||||
is_empty(ChMgr) ->
|
||||
gen_server:call(ChMgr, is_empty, infinity).
|
||||
gen_server:call(ChMgr, is_empty, amqp_util:call_timeout()).
|
||||
|
||||
num_channels(ChMgr) ->
|
||||
gen_server:call(ChMgr, num_channels, infinity).
|
||||
gen_server:call(ChMgr, num_channels, amqp_util:call_timeout()).
|
||||
|
||||
pass_frame(ChMgr, ChNumber, Frame) ->
|
||||
gen_server:cast(ChMgr, {pass_frame, ChNumber, Frame}).
|
||||
|
|
|
@ -121,7 +121,7 @@
|
|||
%% defaults to 0 (turned off) (network only)</li>
|
||||
%% <li>connection_timeout :: non_neg_integer() | 'infinity'
|
||||
%% - The connection timeout in milliseconds,
|
||||
%% defaults to 'infinity' (network only)</li>
|
||||
%% defaults to 30000 (network only)</li>
|
||||
%% <li>ssl_options :: term() - The second parameter to be used with the
|
||||
%% ssl:connect/2 function, defaults to 'none' (network only)</li>
|
||||
%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
|
||||
|
@ -278,7 +278,7 @@ close(ConnectionPid, Timeout) ->
|
|||
%% @doc Closes the AMQP connection, allowing the caller to set the reply
|
||||
%% code and text.
|
||||
close(ConnectionPid, Code, Text) ->
|
||||
close(ConnectionPid, Code, Text, infinity).
|
||||
close(ConnectionPid, Code, Text, amqp_util:call_timeout()).
|
||||
|
||||
%% @spec (ConnectionPid, Code, Text, Timeout) -> ok | closing
|
||||
%% where
|
||||
|
|
|
@ -33,7 +33,7 @@ start_link(AMQPParams) ->
|
|||
{ok, TypeSup} = supervisor2:start_child(
|
||||
Sup, {connection_type_sup,
|
||||
{amqp_connection_type_sup, start_link, []},
|
||||
transient, infinity, supervisor,
|
||||
transient, ?SUPERVISOR_WAIT, supervisor,
|
||||
[amqp_connection_type_sup]}),
|
||||
{ok, Connection} = supervisor2:start_child(
|
||||
Sup, {connection, {amqp_gen_connection, start_link,
|
||||
|
|
|
@ -42,7 +42,7 @@ start_channels_manager(Sup, Conn, ConnName, Type) ->
|
|||
Sup,
|
||||
{channel_sup_sup, {amqp_channel_sup_sup, start_link,
|
||||
[Type, Conn, ConnName]},
|
||||
intrinsic, infinity, supervisor,
|
||||
intrinsic, ?SUPERVISOR_WAIT, supervisor,
|
||||
[amqp_channel_sup_sup]}),
|
||||
{ok, _} = supervisor2:start_child(
|
||||
Sup,
|
||||
|
|
|
@ -55,12 +55,12 @@ start_link(TypeSup, AMQPParams) ->
|
|||
gen_server:start_link(?MODULE, {TypeSup, AMQPParams}, []).
|
||||
|
||||
connect(Pid) ->
|
||||
gen_server:call(Pid, connect, infinity).
|
||||
gen_server:call(Pid, connect, amqp_util:call_timeout()).
|
||||
|
||||
open_channel(Pid, ProposedNumber, Consumer) ->
|
||||
case gen_server:call(Pid,
|
||||
{command, {open_channel, ProposedNumber, Consumer}},
|
||||
infinity) of
|
||||
amqp_util:call_timeout()) of
|
||||
{ok, ChannelPid} -> ok = amqp_channel:open(ChannelPid),
|
||||
{ok, ChannelPid};
|
||||
Error -> Error
|
||||
|
@ -79,19 +79,19 @@ channels_terminated(Pid) ->
|
|||
gen_server:cast(Pid, channels_terminated).
|
||||
|
||||
close(Pid, Close, Timeout) ->
|
||||
gen_server:call(Pid, {command, {close, Close, Timeout}}, infinity).
|
||||
gen_server:call(Pid, {command, {close, Close, Timeout}}, amqp_util:call_timeout()).
|
||||
|
||||
server_close(Pid, Close) ->
|
||||
gen_server:cast(Pid, {server_close, Close}).
|
||||
|
||||
info(Pid, Items) ->
|
||||
gen_server:call(Pid, {info, Items}, infinity).
|
||||
gen_server:call(Pid, {info, Items}, amqp_util:call_timeout()).
|
||||
|
||||
info_keys() ->
|
||||
?INFO_KEYS.
|
||||
|
||||
info_keys(Pid) ->
|
||||
gen_server:call(Pid, info_keys, infinity).
|
||||
gen_server:call(Pid, info_keys, amqp_util:call_timeout()).
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
%% Behaviour
|
||||
|
|
|
@ -58,7 +58,7 @@ start_link(ConsumerModule, ExtraParams, Identity) ->
|
|||
%% @doc This function is used to perform arbitrary calls into the
|
||||
%% consumer module.
|
||||
call_consumer(Pid, Msg) ->
|
||||
gen_server2:call(Pid, {consumer_call, Msg}, infinity).
|
||||
gen_server2:call(Pid, {consumer_call, Msg}, amqp_util:call_timeout()).
|
||||
|
||||
%% @spec (Consumer, Method, Args) -> ok
|
||||
%% where
|
||||
|
@ -69,10 +69,10 @@ call_consumer(Pid, Msg) ->
|
|||
%% @doc This function is used by amqp_channel to forward received
|
||||
%% methods and deliveries to the consumer module.
|
||||
call_consumer(Pid, Method, Args) ->
|
||||
gen_server2:call(Pid, {consumer_call, Method, Args}, infinity).
|
||||
gen_server2:call(Pid, {consumer_call, Method, Args}, amqp_util:call_timeout()).
|
||||
|
||||
call_consumer(Pid, Method, Args, DeliveryCtx) ->
|
||||
gen_server2:call(Pid, {consumer_call, Method, Args, DeliveryCtx}, infinity).
|
||||
gen_server2:call(Pid, {consumer_call, Method, Args, DeliveryCtx}, amqp_util:call_timeout()).
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
%% Behaviour
|
||||
|
|
|
@ -51,7 +51,7 @@ init([Sock, Connection, ConnName, ChMgr, AState]) ->
|
|||
channels_manager = ChMgr,
|
||||
astate = AState,
|
||||
message = none},
|
||||
case rabbit_net:async_recv(Sock, 0, infinity) of
|
||||
case rabbit_net:async_recv(Sock, 0, amqp_util:call_timeout()) of
|
||||
{ok, _} -> {ok, State};
|
||||
{error, Reason} -> {stop, Reason, _} = handle_error(Reason, State),
|
||||
{stop, Reason}
|
||||
|
@ -72,7 +72,7 @@ handle_cast(Cast, State) ->
|
|||
handle_info({inet_async, Sock, _, {ok, Data}},
|
||||
State = #state {sock = Sock}) ->
|
||||
%% Latency hiding: Request next packet first, then process data
|
||||
case rabbit_net:async_recv(Sock, 0, infinity) of
|
||||
case rabbit_net:async_recv(Sock, 0, amqp_util:call_timeout()) of
|
||||
{ok, _} -> handle_data(Data, State);
|
||||
{error, Reason} -> handle_error(Reason, State)
|
||||
end;
|
||||
|
|
|
@ -70,7 +70,7 @@ start_link(Connection, Queue) ->
|
|||
%% RpcClient = pid()
|
||||
%% @doc Stops an exisiting RPC client.
|
||||
stop(Pid) ->
|
||||
gen_server:call(Pid, stop, infinity).
|
||||
gen_server:call(Pid, stop, amqp_util:call_timeout()).
|
||||
|
||||
%% @spec (RpcClient, Payload) -> ok
|
||||
%% where
|
||||
|
@ -79,7 +79,7 @@ stop(Pid) ->
|
|||
%% @doc Invokes an RPC. Note the caller of this function is responsible for
|
||||
%% encoding the request and decoding the response.
|
||||
call(RpcClient, Payload) ->
|
||||
gen_server:call(RpcClient, {call, Payload}, infinity).
|
||||
gen_server:call(RpcClient, {call, Payload}, amqp_util:call_timeout()).
|
||||
|
||||
%%--------------------------------------------------------------------------
|
||||
%% Plumbing
|
||||
|
|
|
@ -70,7 +70,7 @@ start_link(Connection, Queue, Fun) ->
|
|||
%% RpcServer = pid()
|
||||
%% @doc Stops an exisiting RPC server.
|
||||
stop(Pid) ->
|
||||
gen_server:call(Pid, stop, infinity).
|
||||
gen_server:call(Pid, stop, amqp_util:call_timeout()).
|
||||
|
||||
%%--------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
|
|
|
@ -44,4 +44,4 @@ start_connection_sup(AmqpParams) ->
|
|||
init([]) ->
|
||||
{ok, {{simple_one_for_one, 0, 1},
|
||||
[{connection_sup, {amqp_connection_sup, start_link, []},
|
||||
temporary, infinity, supervisor, [amqp_connection_sup]}]}}.
|
||||
temporary, ?SUPERVISOR_WAIT, supervisor, [amqp_connection_sup]}]}}.
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
-module(amqp_util).
|
||||
|
||||
-include("amqp_client_internal.hrl").
|
||||
|
||||
-export([call_timeout/0]).
|
||||
|
||||
call_timeout() ->
|
||||
case get(gen_server_call_timeout) of
|
||||
undefined ->
|
||||
Timeout = rabbit_misc:get_env(amqp_client,
|
||||
gen_server_call_timeout,
|
||||
?CALL_TIMEOUT),
|
||||
put(gen_server_call_timeout, Timeout),
|
||||
Timeout;
|
||||
Timeout ->
|
||||
Timeout
|
||||
end.
|
Loading…
Reference in New Issue