113 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			113 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Ruby
		
	
	
	
# frozen_string_literal: true
 | 
						|
 | 
						|
module Gitlab
 | 
						|
  ##
  # A Redis-backed queue for running an expensive task under an exclusive lock,
  # with batch popping so that the total processing time can be optimized.
  #
  # In a conventional queueing system, the first item starts being processed
  # immediately and each following item waits until it is popped from the queue
  # in turn. In this queueing system the first part is the same, but the
  # enqueued items are popped as a batch. This is especially useful when you
  # want to drop redundant items from the queue and process only the important
  # ones, which makes it more efficient than a conventional queueing system.
  #
  # Caveats:
  # - The order of the items is not guaranteed because of `sadd` (Redis Sets).
  # - A single batch contains at most MAX_COUNTS_OF_POP_ALL items; anything
  #   beyond that stays in the queue and is reported back as `new_items`.
  #
  # Example:
  # ```
  # class TheWorker
  #   def perform
  #     result = Gitlab::BatchPopQueueing.new('feature', 'queue').safe_execute([item]) do |items_in_queue|
  #       item = extract_the_most_important_item_from(items_in_queue)
  #       expensive_process(item)
  #     end
  #
  #     if result[:status] == :finished && result[:new_items].present?
  #       item = extract_the_most_important_item_from(result[:new_items])
  #       TheWorker.perform_async(item.id)
  #     end
  #   end
  # end
  # ```
  #
  class BatchPopQueueing
    attr_reader :namespace, :queue_id

    # Added to the lock timeout to get the TTL of the queue key, so the queue
    # outlives the lock that protects the batch being processed.
    EXTRA_QUEUE_EXPIRE_WINDOW = 1.hour

    # Upper bound on the number of items popped from the queue in one batch.
    MAX_COUNTS_OF_POP_ALL = 1000

    # Initialize queue
    #
    # @param [String] namespace The namespace of the exclusive lock and queue key. Typically, it's a feature name.
    # @param [String] queue_id The identifier of the queue.
    # @raise [ArgumentError] If `namespace` or `queue_id` is nil or empty.
    def initialize(namespace, queue_id)
      # `to_s` makes nil fail with the intended ArgumentError instead of a NoMethodError.
      if namespace.to_s.empty? || queue_id.to_s.empty?
        raise ArgumentError, 'namespace and queue_id must not be empty'
      end

      @namespace = namespace
      @queue_id = queue_id
    end

    ##
    # Execute the given block in an exclusive lock.
    # If another thread is already working on the block,
    # it enqueues the items without processing the block.
    #
    # @param [Array<String>] new_items New items to be added to the queue. May be empty,
    #   in which case nothing is added and the call only tries to process what is already queued.
    # @param [ActiveSupport::Duration] lock_timeout The timeout of the exclusive lock (e.g. `10.minutes`).
    #   Generally, this value should be longer than the maximum processing time of the given block.
    # @yieldparam [Array<String>] items The batch of items popped from the queue.
    # @return [Hash]
    #   - status => One of the `:enqueued` or `:finished`.
    #   - new_items => Items left in the queue after the given block has been processed.
    #                  Only present when the status is `:finished`.
    #
    # NOTE: If an exception is raised in the block, the popped items will not be recovered.
    #       We should NOT re-enqueue the items in this case because it could end up in an infinite loop.
    def safe_execute(new_items, lock_timeout: 10.minutes, &block)
      enqueue(new_items, lock_timeout + EXTRA_QUEUE_EXPIRE_WINDOW)

      lease = Gitlab::ExclusiveLease.new(lock_key, timeout: lock_timeout)
      uuid = lease.try_obtain

      # Someone else holds the lock; the items stay queued for the lock holder
      # (or a later caller) to pick up.
      return { status: :enqueued } unless uuid

      begin
        popped_items = pop_all

        yield popped_items if block_given?

        { status: :finished, new_items: peek_all }
      ensure
        # Release the lock even when the block raises.
        Gitlab::ExclusiveLease.cancel(lock_key, uuid)
      end
    end

    private

    # Redis key of the exclusive lease guarding this queue.
    def lock_key
      @lock_key ||= "batch_pop_queueing:lock:#{namespace}:#{queue_id}"
    end

    # Redis key of the set holding the queued items.
    def queue_key
      @queue_key ||= "batch_pop_queueing:queue:#{namespace}:#{queue_id}"
    end

    # Adds the items to the queue set and (re)sets the TTL of the queue key.
    #
    # @param [Array<String>] items Items to add; nil or empty adds nothing.
    # @param [#to_i] expire_time TTL of the queue key, in seconds once converted.
    def enqueue(items, expire_time)
      members = Array(items)

      Gitlab::Redis::Queues.with do |redis|
        # SADD needs at least one member; sending none is rejected by Redis.
        redis.sadd(queue_key, members) unless members.empty?
        redis.expire(queue_key, expire_time.to_i)
      end
    end

    # Removes and returns up to MAX_COUNTS_OF_POP_ALL items from the queue.
    def pop_all
      Gitlab::Redis::Queues.with do |redis|
        redis.spop(queue_key, MAX_COUNTS_OF_POP_ALL)
      end
    end

    # Returns the items currently in the queue without removing them.
    def peek_all
      Gitlab::Redis::Queues.with do |redis|
        redis.smembers(queue_key)
      end
    end
  end
 | 
						|
end
 |