Dead queue informs limiter
This commit is contained in:
		
							parent
							
								
									e7359063fa
								
							
						
					
					
						commit
						5477c63fa7
					
				| 
						 | 
				
			
			@ -81,6 +81,9 @@ init(Q) ->
 | 
			
		|||
            round_robin = queue:new()}, ?HIBERNATE_AFTER}.
 | 
			
		||||
 | 
			
		||||
terminate(_Reason, State) ->
 | 
			
		||||
    %% Inform all limiters that we're dying
 | 
			
		||||
    [ rabbit_limiter:unregister_queue(LimiterPid, self()) 
 | 
			
		||||
      || #cr{limiter_pid = LimiterPid} <- all_ch_record()],
 | 
			
		||||
    %% FIXME: How do we cancel active subscriptions?
 | 
			
		||||
    QName = qname(State),
 | 
			
		||||
    lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end,
 | 
			
		||||
| 
						 | 
				
			
			@ -665,7 +668,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
 | 
			
		|||
    case lookup_ch(ChPid) of
 | 
			
		||||
        not_found ->
 | 
			
		||||
            noreply(State);
 | 
			
		||||
        C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} ->
 | 
			
		||||
        C = #cr{unacked_messages = UAM} ->
 | 
			
		||||
            {Acked, Remaining} = collect_messages(MsgIds, UAM),
 | 
			
		||||
            persist_acks(Txn, qname(State), Acked),
 | 
			
		||||
            case Txn of
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -293,6 +293,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
 | 
			
		|||
                _    -> true
 | 
			
		||||
            end
 | 
			
		||||
        end, Acked),
 | 
			
		||||
    % TODO Optimization: Probably don't need to send this if len = 0
 | 
			
		||||
    rabbit_limiter:decrement_capacity(Limiter, queue:len(NotBasicGet)),
 | 
			
		||||
    Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
 | 
			
		||||
    {noreply, case TxnKey of
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,6 +11,7 @@
 | 
			
		|||
         handle_info/2]).
 | 
			
		||||
-export([start_link/1]).
 | 
			
		||||
-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]).
 | 
			
		||||
-export([unregister_queue/2]).
 | 
			
		||||
 | 
			
		||||
-record(lim, {prefetch_count = 0,
 | 
			
		||||
              ch_pid,
 | 
			
		||||
| 
						 | 
				
			
			@ -38,6 +39,11 @@ can_send(LimiterPid, QPid) ->
 | 
			
		|||
% and hence can reduce the in-use-by-that queue capcity information
 | 
			
		||||
decrement_capacity(LimiterPid, Magnitude) ->
 | 
			
		||||
    gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}).
 | 
			
		||||
    
 | 
			
		||||
% This is called to tell the limiter that the queue is probably dead and
 | 
			
		||||
% it should be forgotten about
 | 
			
		||||
unregister_queue(LimiterPid, QPid) ->
 | 
			
		||||
    gen_server:cast(LimiterPid, {unregister_queue, QPid}).
 | 
			
		||||
 | 
			
		||||
%---------------------------------------------------------------------------
 | 
			
		||||
% gen_server callbacks
 | 
			
		||||
| 
						 | 
				
			
			@ -68,6 +74,11 @@ handle_cast({prefetch_count, PrefetchCount},
 | 
			
		|||
                        queues = sets:new(),
 | 
			
		||||
                        in_use = 0}};
 | 
			
		||||
 | 
			
		||||
% Removes the queue process from the set of monitored queues
 | 
			
		||||
handle_cast({unregister_queue, QPid}, State= #lim{queues = Queues}) ->
 | 
			
		||||
    NewState = decrement_in_use(1, State),
 | 
			
		||||
    {noreply, NewState#lim{queues = sets:del_element(QPid, Queues)}};
 | 
			
		||||
 | 
			
		||||
% Default setter of the prefetch count
 | 
			
		||||
handle_cast({prefetch_count, PrefetchCount}, State) ->
 | 
			
		||||
    {noreply, State#lim{prefetch_count = PrefetchCount}};
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue