233 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			233 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Ruby
		
	
	
	
# frozen_string_literal: true

module Gitlab
  module Database
    module BackgroundMigration
      # Raised by BatchedJob#split_and_retry! when the job is not in the failed state.
      SplitAndRetryError = Class.new(StandardError)
      # Raised by BatchedJob#reduce_sub_batch_size! when the job is not in the failed state.
      ReduceSubBatchSizeError = Class.new(StandardError)

      # One job of a batched background migration, covering the range
      # [min_value, max_value] of the migration's batching column.
      #
      # Jobs move through the +status+ state machine (pending/running/failed/succeeded).
      # After a failure, a job may have its sub_batch_size reduced and/or be split in
      # half so that it can be retried on a smaller range.
      class BatchedJob < SharedModel
        include EachBatch
        include FromUnion

        self.table_name = :batched_background_migration_jobs

        # Failed jobs with fewer attempts than this are still retriable.
        MAX_ATTEMPTS = 3
        # Jobs whose batch_size is not greater than this are neither split nor have
        # their sub-batch reduced (see #within_batch_size_boundaries?).
        MIN_BATCH_SIZE = 1
        # Each reduction shrinks sub_batch_size to 75% of its current value.
        SUB_BATCH_SIZE_REDUCE_FACTOR = 0.75
        # Largest allowed total reduction, in percent of the migration's original
        # sub_batch_size (see #sub_batch_exceeds_threshold?).
        SUB_BATCH_SIZE_THRESHOLD = 65
        # Active jobs not updated for at least this long are considered stuck.
        STUCK_JOBS_TIMEOUT = 1.hour.freeze
        # Failures caused by these exceptions make a job eligible for splitting.
        TIMEOUT_EXCEPTIONS = [ActiveRecord::StatementTimeout, ActiveRecord::ConnectionTimeoutError,
                              ActiveRecord::AdapterTimeout, ActiveRecord::LockWaitTimeout,
                              ActiveRecord::QueryCanceled].freeze

        belongs_to :batched_migration, foreign_key: :batched_background_migration_id
        has_many :batched_job_transition_logs, foreign_key: :batched_background_migration_job_id

        scope :active, -> { with_statuses(:pending, :running) }
        # Pending/running jobs whose row has not been touched within STUCK_JOBS_TIMEOUT.
        scope :stuck, -> { active.where('updated_at <= ?', STUCK_JOBS_TIMEOUT.ago) }
        # Failed jobs with attempts left, plus stuck jobs.
        scope :retriable, -> { from_union([with_status(:failed).where('attempts < ?', MAX_ATTEMPTS), stuck]) }
        scope :except_succeeded, -> { without_status(:succeeded) }
        scope :successful_in_execution_order, -> { where.not(finished_at: nil).with_status(:succeeded).order(:finished_at) }
        scope :with_preloads, -> { preload(:batched_migration) }
        scope :created_since, ->(date_time) { where('created_at >= ?', date_time) }
        scope :blocked_by_max_attempts, -> { where('attempts >= ?', MAX_ATTEMPTS) }

        state_machine :status, initial: :pending do
          state :pending, value: 0
          state :running, value: 1
          state :failed, value: 2
          state :succeeded, value: 3

          event :succeed do
            transition any => :succeeded
          end

          event :failure do
            transition any => :failed
          end

          event :run do
            transition any => :running
          end

          # Terminal states record when the job stopped.
          before_transition any => [:failed, :succeeded] do |job|
            job.finished_at = Time.current
          end

          # Every (re)start counts as a new attempt and clears the previous run's results.
          before_transition any => :running do |job|
            job.attempts += 1
            job.started_at = Time.current
            job.finished_at = nil
            job.metrics = {}
          end

          # After a failure, first try to shrink the sub-batch (only when the error came
          # from a sub-batch), then try to split the job. Errors raised by either step are
          # logged rather than propagated.
          after_transition any => :failed do |job, transition|
            exception, from_sub_batch = job.class.extract_transition_options(transition.args)

            job.reduce_sub_batch_size! if from_sub_batch && job.can_reduce_sub_batch_size?

            job.split_and_retry! if job.can_split?(exception)
          rescue SplitAndRetryError, ReduceSubBatchSizeError => error
            Gitlab::AppLogger.error(
              message: error.message,
              batched_job_id: job.id,
              batched_migration_id: job.batched_migration.id,
              job_class_name: job.migration_job_class_name,
              job_arguments: job.migration_job_arguments
            )
          end

          # For every transition: persist a transition log row, report any attached
          # exception to error tracking, and write an info log entry.
          after_transition do |job, transition|
            exception, _ = job.class.extract_transition_options(transition.args)

            job.batched_job_transition_logs.create(previous_status: transition.from, next_status: transition.to, exception_class: exception&.class, exception_message: exception&.message)

            Gitlab::ErrorTracking.track_exception(exception, batched_job_id: job.id, job_class_name: job.migration_job_class_name, job_arguments: job.migration_job_arguments) if exception

            Gitlab::AppLogger.info(
              message: 'BatchedJob transition',
              batched_job_id: job.id,
              previous_state: transition.from_name,
              new_state: transition.to_name,
              batched_migration_id: job.batched_migration.id,
              job_class_name: job.migration_job_class_name,
              job_arguments: job.migration_job_arguments,
              exception_class: exception&.class,
              exception_message: exception&.message
            )
          end
        end

        delegate :job_class, :table_name, :column_name, :job_arguments, :job_class_name,
          to: :batched_migration, prefix: :migration

        # Pulls the error details out of the arguments passed to a state-machine event.
        #
        # @param args [Array] the transition's event arguments; each element is indexed
        #   with +[:error]+ (NOTE(review): assumes every element is a Hash — confirm callers)
        # @return [Array] +[]+ when no argument carries a present +:error+, otherwise
        #   +[exception, from_sub_batch]+ taken from the first such argument
        def self.extract_transition_options(args)
          error_hash = args.find { |arg| arg[:error].present? }

          return [] unless error_hash

          exception = error_hash.fetch(:error)
          from_sub_batch = error_hash[:from_sub_batch]

          [exception, from_sub_batch]
        end

        # Ratio of this job's run time to the migration's interval.
        #
        # @return [Float, nil] nil unless the job succeeded and has both timestamps
        def time_efficiency
          return unless succeeded?
          return unless finished_at && started_at

          duration = finished_at - started_at

          # TODO: Switch to individual job interval (prereq: https://gitlab.com/gitlab-org/gitlab/-/issues/328801)
          duration.to_f / batched_migration.interval
        end

        # Whether a failure caused by +exception+ should split this job in half.
        # Splitting only happens once the job has used up its retries, the failure
        # was a timeout, and the batch is still large enough to divide.
        #
        # @param exception [Exception, nil] the error that failed the job
        # @return [Boolean]
        def can_split?(exception)
          return false if still_retryable?

          exception.class.in?(TIMEOUT_EXCEPTIONS) && within_batch_size_boundaries?
        end

        # Whether the sub-batch size may be reduced: requires the
        # +reduce_sub_batch_size_on_timeouts+ feature flag, retries left, and a batch
        # still within the size boundaries.
        #
        # @return [Boolean]
        def can_reduce_sub_batch_size?
          return false unless Feature.enabled?(:reduce_sub_batch_size_on_timeouts)

          still_retryable? && within_batch_size_boundaries?
        end

        # Halves this failed job's batch and retries it, under a row lock.
        #
        # - If halving yields a size below 1, only +attempts+ is reset.
        # - Otherwise the migration's batching strategy computes the bounds of a
        #   half-size batch starting at +min_value+; its last value is the midpoint.
        #   When the midpoint reaches +max_value+, only +batch_size+ and +attempts+ are
        #   updated. Otherwise this job is shrunk to [min_value, midpoint] and a copy
        #   covering [midpoint.next, old max_value] is saved.
        #
        # @raise [SplitAndRetryError] if the job is not failed
        def split_and_retry!
          with_lock do
            raise SplitAndRetryError, 'Only failed jobs can be split' unless failed?

            new_batch_size = batch_size / 2

            # `break` leaves the with_lock block early.
            break update!(attempts: 0) if new_batch_size < 1

            batching_strategy = batched_migration.batch_class.new(connection: self.class.connection)
            next_batch_bounds = batching_strategy.next_batch(
              batched_migration.table_name,
              batched_migration.column_name,
              batch_min_value: min_value,
              batch_size: new_batch_size,
              job_arguments: batched_migration.job_arguments,
              job_class: batched_migration.job_class
            )
            midpoint = next_batch_bounds.last

            # We don't want the midpoint to go over the existing max_value because
            # those IDs would already be in the next batched migration job.
            # This could happen when a lot of records in the current batch are deleted.
            #
            # In this case, we just lower the batch size so that future calls to this
            # method could eventually split the job if it continues to fail.
            if midpoint >= max_value
              update!(batch_size: new_batch_size, attempts: 0)
            else
              old_max_value = max_value

              update!(
                batch_size: new_batch_size,
                max_value: midpoint,
                attempts: 0,
                started_at: nil,
                finished_at: nil,
                metrics: {}
              )

              new_record = dup
              new_record.min_value = midpoint.next
              new_record.max_value = old_max_value
              new_record.save!
            end
          end
        end

        # Shrinks +sub_batch_size+ to 75% of its current value, clamped to
        # [1, batch_size], under a row lock, and logs the change. Does nothing when the
        # reduction would pass the limit checked by #sub_batch_exceeds_threshold?.
        #
        # @raise [ReduceSubBatchSizeError] if the job is not failed
        def reduce_sub_batch_size!
          raise ReduceSubBatchSizeError, 'Only sub_batch_size of failed jobs can be reduced' unless failed?

          return if sub_batch_exceeds_threshold?

          with_lock do
            previous_sub_batch_size = sub_batch_size
            reduced_sub_batch_size = (sub_batch_size * SUB_BATCH_SIZE_REDUCE_FACTOR).to_i.clamp(1, batch_size)

            update!(sub_batch_size: reduced_sub_batch_size)

            Gitlab::AppLogger.warn(
              message: 'Sub batch size reduced due to timeout',
              batched_job_id: id,
              sub_batch_size: previous_sub_batch_size,
              reduced_sub_batch_size: reduced_sub_batch_size,
              attempts: attempts,
              batched_migration_id: batched_migration.id,
              job_class_name: migration_job_class_name,
              job_arguments: migration_job_arguments
            )
          end
        end

        # @return [Boolean] true while attempts < MAX_ATTEMPTS
        def still_retryable?
          attempts < MAX_ATTEMPTS
        end

        # @return [Boolean] true when batch_size exceeds both MIN_BATCH_SIZE and sub_batch_size
        def within_batch_size_boundaries?
          batch_size > MIN_BATCH_SIZE && batch_size > sub_batch_size
        end

        # Whether the next reduction would take +sub_batch_size+ too far below the
        # migration's original +sub_batch_size+.
        #
        # The next reduced value is compared against the migration's sub_batch_size.
        # If the drop is more than SUB_BATCH_SIZE_THRESHOLD percent (65%), the result
        # would be below ~35% of the original, and the reduction is refused.
        #
        # @return [Boolean]
        def sub_batch_exceeds_threshold?
          initial_sub_batch_size = batched_migration.sub_batch_size
          reduced_sub_batch_size = (sub_batch_size * SUB_BATCH_SIZE_REDUCE_FACTOR).to_i
          diff = initial_sub_batch_size - reduced_sub_batch_size

          (1.0 * diff / initial_sub_batch_size * 100).round(2) > SUB_BATCH_SIZE_THRESHOLD
        end
      end
    end
  end
end