Made can_send/2 into can_send/3.
This commit is contained in:
parent
2f4f19b389
commit
a05bed3092
|
|
@ -175,8 +175,7 @@ deliver_immediately(Message, Delivered,
|
|||
C = #cr{limiter_pid = LimiterPid,
|
||||
unsent_message_count = Count,
|
||||
unacked_messages = UAM} = ch_record(ChPid),
|
||||
case not(AckRequired) orelse rabbit_limiter:can_send(
|
||||
LimiterPid, self()) of
|
||||
case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
|
||||
true ->
|
||||
rabbit_channel:deliver(
|
||||
ChPid, ConsumerTag, AckRequired,
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@
|
|||
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
|
||||
handle_info/2]).
|
||||
-export([start_link/1, shutdown/1]).
|
||||
-export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
|
||||
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
|
|
@ -47,7 +47,7 @@
|
|||
-spec(start_link/1 :: (pid()) -> pid()).
|
||||
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
|
||||
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
|
||||
-spec(can_send/2 :: (maybe_pid(), pid()) -> bool()).
|
||||
-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()).
|
||||
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
|
||||
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
|
||||
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
|
||||
|
|
@ -85,12 +85,12 @@ limit(LimiterPid, PrefetchCount) ->
|
|||
|
||||
%% Ask the limiter whether the queue can deliver a message without
|
||||
%% breaching a limit
|
||||
can_send(undefined, _QPid) ->
|
||||
can_send(undefined, _QPid, _AckRequired) ->
|
||||
true;
|
||||
can_send(LimiterPid, QPid) ->
|
||||
can_send(LimiterPid, QPid, AckRequired) ->
|
||||
rabbit_misc:with_exit_handler(
|
||||
fun () -> true end,
|
||||
fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
|
||||
fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, infinity) end).
|
||||
|
||||
%% Let the limiter know that the channel has received some acks from a
|
||||
%% consumer
|
||||
|
|
@ -110,10 +110,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid})
|
|||
init([ChPid]) ->
|
||||
{ok, #lim{ch_pid = ChPid} }.
|
||||
|
||||
handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) ->
|
||||
handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) ->
|
||||
Volume1 = if AckRequired -> Volume + 1;
|
||||
true -> Volume
|
||||
end,
|
||||
case limit_reached(State) of
|
||||
true -> {reply, false, limit_queue(QPid, State)};
|
||||
false -> {reply, true, State#lim{volume = Volume + 1}}
|
||||
false -> {reply, true, State#lim{volume = Volume1}}
|
||||
end.
|
||||
|
||||
handle_cast(shutdown, State) ->
|
||||
|
|
|
|||
Loading…
Reference in New Issue