100 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			100 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Ruby
		
	
	
	
| # frozen_string_literal: true
 | |
| 
 | |
| module Gitlab
 | |
|   # Returns an ID range within a table so it can be iterated over. Repeats from
 | |
|   # the beginning after it reaches the end.
 | |
|   #
 | |
|   # Used by Geo in particular to iterate over a replicable and its registry
 | |
|   # table.
 | |
|   #
 | |
|   # Tracks a cursor for each table, by "key". If the table is smaller than
 | |
|   # batch_size, then a range for the whole table is returned on every call.
 | |
|   class LoopingBatcher
 | |
|     # @param [Class] model_class the class of the table to iterate on
 | |
|     # @param [String] key to identify the cursor. Note, cursor is already unique
 | |
|     #   per table.
 | |
|     # @param [Integer] batch_size to limit the number of records in a batch
 | |
|     def initialize(model_class, key:, batch_size: 1000)
 | |
|       @model_class = model_class
 | |
|       @key = key
 | |
|       @batch_size = batch_size
 | |
|     end
 | |
| 
 | |
|     # @return [Range] a range of IDs. `nil` if 0 records at or after the cursor.
 | |
|     def next_range!
 | |
|       return unless @model_class.any?
 | |
| 
 | |
|       batch_first_id = cursor_id
 | |
| 
 | |
|       batch_last_id = get_batch_last_id(batch_first_id)
 | |
|       return unless batch_last_id
 | |
| 
 | |
|       batch_first_id..batch_last_id
 | |
|     end
 | |
| 
 | |
|     private
 | |
| 
 | |
|     # @private
 | |
|     #
 | |
|     # Get the last ID of the batch. Increment the cursor or reset it if at end.
 | |
|     #
 | |
|     # @param [Integer] batch_first_id the first ID of the batch
 | |
|     # @return [Integer] batch_last_id the last ID of the batch (not the table)
 | |
|     def get_batch_last_id(batch_first_id)
 | |
|       batch_last_id, more_rows = run_query(@model_class.table_name, @model_class.primary_key, batch_first_id, @batch_size)
 | |
| 
 | |
|       if more_rows
 | |
|         increment_batch(batch_last_id)
 | |
|       else
 | |
|         reset if batch_first_id > 1
 | |
|       end
 | |
| 
 | |
|       batch_last_id
 | |
|     end
 | |
| 
 | |
|     def run_query(table, primary_key, batch_first_id, batch_size)
 | |
|       sql = <<~SQL
 | |
|         SELECT MAX(batch.id) AS batch_last_id,
 | |
|         EXISTS (
 | |
|           SELECT #{primary_key}
 | |
|           FROM #{table}
 | |
|           WHERE #{primary_key} > MAX(batch.id)
 | |
|         ) AS more_rows
 | |
|         FROM (
 | |
|           SELECT #{primary_key}
 | |
|           FROM #{table}
 | |
|           WHERE #{primary_key} >= #{batch_first_id}
 | |
|           ORDER BY #{primary_key}
 | |
|           LIMIT #{batch_size}) AS batch;
 | |
|       SQL
 | |
| 
 | |
|       result = ActiveRecord::Base.connection.exec_query(sql).first
 | |
| 
 | |
|       [result["batch_last_id"], result["more_rows"]]
 | |
|     end
 | |
| 
 | |
|     def reset
 | |
|       set_cursor_id(1)
 | |
|     end
 | |
| 
 | |
|     def increment_batch(batch_last_id)
 | |
|       set_cursor_id(batch_last_id + 1)
 | |
|     end
 | |
| 
 | |
|     # @private
 | |
|     #
 | |
|     # @return [Integer] the cursor ID, or 1 if it is not set
 | |
|     def cursor_id
 | |
|       Rails.cache.fetch("#{cache_key}:cursor_id") || 1
 | |
|     end
 | |
| 
 | |
|     def set_cursor_id(id)
 | |
|       Rails.cache.write("#{cache_key}:cursor_id", id)
 | |
|     end
 | |
| 
 | |
|     def cache_key
 | |
|       @cache_key ||= "#{self.class.name.parameterize}:#{@model_class.name.parameterize}:#{@key}:cursor_id"
 | |
|     end
 | |
|   end
 | |
| end
 |