113 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			113 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Ruby
		
	
	
	
| # frozen_string_literal: true
 | |
| 
 | |
| module Gitlab
 | |
|   ##
 | |
|   # This class is a queuing system for processing expensive tasks in an atomic manner
 | |
|   # with batch poping to let you optimize the total processing time.
 | |
|   #
 | |
|   # In usual queuing system, the first item started being processed immediately
 | |
|   # and the following items wait until the next items have been popped from the queue.
 | |
|   # On the other hand, this queueing system, the former part is same, however,
 | |
|   # it pops the enqueued items as batch. This is especially useful when you want to
 | |
|   # drop redandant items from the queue in order to process important items only,
 | |
|   # thus it's more efficient than the traditional queueing system.
 | |
|   #
 | |
|   # Caveats:
 | |
|   # - The order of the items are not guaranteed because of `sadd` (Redis Sets).
 | |
|   #
 | |
|   # Example:
 | |
|   # ```
 | |
|   # class TheWorker
 | |
|   #   def perform
 | |
|   #     result = Gitlab::BatchPopQueueing.new('feature', 'queue').safe_execute([item]) do |items_in_queue|
 | |
|   #       item = extract_the_most_important_item_from(items_in_queue)
 | |
|   #       expensive_process(item)
 | |
|   #     end
 | |
|   #
 | |
|   #     if result[:status] == :finished && result[:new_items].present?
 | |
|   #       item = extract_the_most_important_item_from(items_in_queue)
 | |
|   #       TheWorker.perform_async(item.id)
 | |
|   #     end
 | |
|   #   end
 | |
|   # end
 | |
|   # ```
 | |
|   #
 | |
|   class BatchPopQueueing
 | |
|     attr_reader :namespace, :queue_id
 | |
| 
 | |
|     EXTRA_QUEUE_EXPIRE_WINDOW = 1.hour
 | |
|     MAX_COUNTS_OF_POP_ALL = 1000
 | |
| 
 | |
|     # Initialize queue
 | |
|     #
 | |
|     # @param [String] namespace The namespace of the exclusive lock and queue key. Typically, it's a feature name.
 | |
|     # @param [String] queue_id The identifier of the queue.
 | |
|     # @return [Boolean]
 | |
|     def initialize(namespace, queue_id)
 | |
|       raise ArgumentError if namespace.empty? || queue_id.empty?
 | |
| 
 | |
|       @namespace, @queue_id = namespace, queue_id
 | |
|     end
 | |
| 
 | |
|     ##
 | |
|     # Execute the given block in an exclusive lock.
 | |
|     # If there is the other thread has already working on the block,
 | |
|     # it enqueues the items without processing the block.
 | |
|     #
 | |
|     # @param [Array<String>] new_items New items to be added to the queue.
 | |
|     # @param [Time] lock_timeout The timeout of the exclusive lock. Generally, this value should be longer than the maximum prosess timing of the given block.
 | |
|     # @return [Hash]
 | |
|     #   - status => One of the `:enqueued` or `:finished`.
 | |
|     #   - new_items => Newly enqueued items during the given block had been processed.
 | |
|     #
 | |
|     # NOTE: If an exception is raised in the block, the poppped items will not be recovered.
 | |
|     #       We should NOT re-enqueue the items in this case because it could end up in an infinite loop.
 | |
|     def safe_execute(new_items, lock_timeout: 10.minutes, &block)
 | |
|       enqueue(new_items, lock_timeout + EXTRA_QUEUE_EXPIRE_WINDOW)
 | |
| 
 | |
|       lease = Gitlab::ExclusiveLease.new(lock_key, timeout: lock_timeout)
 | |
| 
 | |
|       return { status: :enqueued } unless uuid = lease.try_obtain
 | |
| 
 | |
|       begin
 | |
|         all_args = pop_all
 | |
| 
 | |
|         yield all_args if block_given?
 | |
| 
 | |
|         { status: :finished, new_items: peek_all }
 | |
|       ensure
 | |
|         Gitlab::ExclusiveLease.cancel(lock_key, uuid)
 | |
|       end
 | |
|     end
 | |
| 
 | |
|     private
 | |
| 
 | |
|     def lock_key
 | |
|       @lock_key ||= "batch_pop_queueing:lock:#{namespace}:#{queue_id}"
 | |
|     end
 | |
| 
 | |
|     def queue_key
 | |
|       @queue_key ||= "batch_pop_queueing:queue:#{namespace}:#{queue_id}"
 | |
|     end
 | |
| 
 | |
|     def enqueue(items, expire_time)
 | |
|       Gitlab::Redis::Queues.with do |redis|
 | |
|         redis.sadd(queue_key, items)
 | |
|         redis.expire(queue_key, expire_time.to_i)
 | |
|       end
 | |
|     end
 | |
| 
 | |
|     def pop_all
 | |
|       Gitlab::Redis::Queues.with do |redis|
 | |
|         redis.spop(queue_key, MAX_COUNTS_OF_POP_ALL)
 | |
|       end
 | |
|     end
 | |
| 
 | |
|     def peek_all
 | |
|       Gitlab::Redis::Queues.with do |redis|
 | |
|         redis.smembers(queue_key)
 | |
|       end
 | |
|     end
 | |
|   end
 | |
| end
 |