diff --git a/deps/amqp_client/include/amqp_client_internal.hrl b/deps/amqp_client/include/amqp_client_internal.hrl index 960c4a52ff..01e099097e 100644 --- a/deps/amqp_client/include/amqp_client_internal.hrl +++ b/deps/amqp_client/include/amqp_client_internal.hrl @@ -26,3 +26,5 @@ {<<"consumer_cancel_notify">>, bool, true}, {<<"connection.blocked">>, bool, true}, {<<"authentication_failure_close">>, bool, true}]). + +-define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}). diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl index c00e4f61f1..9e95df4fe3 100644 --- a/deps/amqp_client/src/amqp_channel.erl +++ b/deps/amqp_client/src/amqp_channel.erl @@ -218,12 +218,12 @@ next_publish_seqno(Channel) -> %% @param Channel: the channel on which to wait. %% @end wait_for_confirms(Channel) -> - wait_for_confirms(Channel, amqp_util:call_timeout()). + wait_for_confirms(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT). %% @spec (Channel, Timeout) -> boolean() | 'timeout' %% where %% Channel = pid() -%% Timeout = non_neg_integer() | 'infinity' +%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity' %% @doc Wait until all messages published since the last call have %% been either ack'd or nack'd by the broker or the timeout expires. %% Note, when called on a non-Confirm channel, waitForConfirms throws @@ -231,11 +231,12 @@ wait_for_confirms(Channel) -> %% @param Channel: the channel on which to wait. %% @param Timeout: the wait timeout in seconds. %% @end +wait_for_confirms(Channel, {Timeout, second}) -> + do_wait_for_confirms(Channel, second_to_millisecond(Timeout)); +wait_for_confirms(Channel, {Timeout, millisecond}) -> + do_wait_for_confirms(Channel, Timeout); wait_for_confirms(Channel, Timeout) -> - case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of - {error, Reason} -> throw(Reason); - Other -> Other - end. + do_wait_for_confirms(Channel, second_to_millisecond(Timeout)). %% @spec (Channel) -> true %% where @@ -246,12 +247,12 @@ wait_for_confirms(Channel, Timeout) -> %% @param Channel: the channel on which to wait. %% @end wait_for_confirms_or_die(Channel) -> - wait_for_confirms_or_die(Channel, amqp_util:call_timeout()). + wait_for_confirms_or_die(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT). %% @spec (Channel, Timeout) -> true %% where %% Channel = pid() -%% Timeout = non_neg_integer() | 'infinity' +%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity' %% @doc Behaves the same as wait_for_confirms/1, but if a nack is %% received, the calling process is immediately sent an %% exit(nack_received). If the timeout expires, the calling process is @@ -973,6 +974,12 @@ notify_confirm_waiters(State = #state{waiting_set = WSet, State#state{waiting_set = gb_trees:empty(), only_acks_received = true}. +do_wait_for_confirms(Channel, Timeout) when is_integer(Timeout) -> + case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of + {error, Reason} -> throw(Reason); + Other -> Other + end. + handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) -> {reply, {error, not_in_confirm_mode}, State}; handle_wait_for_confirms(From, Timeout, @@ -983,7 +990,7 @@ handle_wait_for_confirms(From, Timeout, false -> TRef = case Timeout of infinity -> undefined; _ -> erlang:send_after( - Timeout * 1000, self(), + Timeout, self(), {confirm_timeout, From}) end, {noreply, @@ -999,3 +1006,5 @@ call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) -> safe_cancel_timer(undefined) -> ok; safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef). +second_to_millisecond(Timeout) -> + Timeout * 1000.