No longer use amqp_util:call_timeout/0 for wait_for_confirms. Allow specifying integer dimension via a tuple

This commit is contained in:
Luke Bakken 2020-11-03 09:24:01 -08:00
parent 78b1f67e75
commit 482907e1fa
2 changed files with 20 additions and 9 deletions

View File

@ -26,3 +26,5 @@
{<<"consumer_cancel_notify">>, bool, true},
{<<"connection.blocked">>, bool, true},
{<<"authentication_failure_close">>, bool, true}]).
-define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}).

View File

@ -218,12 +218,12 @@ next_publish_seqno(Channel) ->
%% @param Channel: the channel on which to wait.
%% @end
wait_for_confirms(Channel) ->
wait_for_confirms(Channel, amqp_util:call_timeout()).
wait_for_confirms(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT).
%% @spec (Channel, Timeout) -> boolean() | 'timeout'
%% where
%% Channel = pid()
%% Timeout = non_neg_integer() | 'infinity'
%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity'
%% @doc Wait until all messages published since the last call have
%% been either ack'd or nack'd by the broker or the timeout expires.
%% Note, when called on a non-Confirm channel, waitForConfirms throws
@ -231,11 +231,12 @@ wait_for_confirms(Channel) ->
%% @param Channel: the channel on which to wait.
%% @param Timeout: the wait timeout in seconds.
%% @end
wait_for_confirms(Channel, {Timeout, second}) ->
do_wait_for_confirms(Channel, second_to_millisecond(Timeout));
wait_for_confirms(Channel, {Timeout, millisecond}) ->
do_wait_for_confirms(Channel, Timeout);
wait_for_confirms(Channel, Timeout) ->
case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of
{error, Reason} -> throw(Reason);
Other -> Other
end.
do_wait_for_confirms(Channel, second_to_millisecond(Timeout)).
%% @spec (Channel) -> true
%% where
@ -246,12 +247,12 @@ wait_for_confirms(Channel, Timeout) ->
%% @param Channel: the channel on which to wait.
%% @end
wait_for_confirms_or_die(Channel) ->
wait_for_confirms_or_die(Channel, amqp_util:call_timeout()).
wait_for_confirms_or_die(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT).
%% @spec (Channel, Timeout) -> true
%% where
%% Channel = pid()
%% Timeout = non_neg_integer() | 'infinity'
%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity'
%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
%% received, the calling process is immediately sent an
%% exit(nack_received). If the timeout expires, the calling process is
@ -973,6 +974,12 @@ notify_confirm_waiters(State = #state{waiting_set = WSet,
State#state{waiting_set = gb_trees:empty(),
only_acks_received = true}.
do_wait_for_confirms(Channel, Timeout) when is_integer(Timeout) ->
case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of
{error, Reason} -> throw(Reason);
Other -> Other
end.
handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) ->
{reply, {error, not_in_confirm_mode}, State};
handle_wait_for_confirms(From, Timeout,
@ -983,7 +990,7 @@ handle_wait_for_confirms(From, Timeout,
false -> TRef = case Timeout of
infinity -> undefined;
_ -> erlang:send_after(
Timeout * 1000, self(),
Timeout, self(),
{confirm_timeout, From})
end,
{noreply,
@ -999,3 +1006,5 @@ call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) ->
safe_cancel_timer(undefined) -> ok;
safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef).
second_to_millisecond(Timeout) ->
Timeout * 1000.