102 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			102 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Ruby
		
	
	
	
# frozen_string_literal: true
 | 
						|
 | 
						|
module Gitlab
 | 
						|
  module BitbucketServerImport
 | 
						|
    module ParallelScheduling
 | 
						|
      include Loggable
 | 
						|
 | 
						|
      attr_reader :project, :already_processed_cache_key, :job_waiter_cache_key
 | 
						|
 | 
						|
      # The base cache key to use for tracking already processed objects.
 | 
						|
      ALREADY_PROCESSED_CACHE_KEY =
 | 
						|
        'bitbucket-server-importer/already-processed/%{project}/%{collection}'
 | 
						|
 | 
						|
      # The base cache key to use for storing job waiter key
 | 
						|
      JOB_WAITER_CACHE_KEY =
 | 
						|
        'bitbucket-server-importer/job-waiter/%{project}/%{collection}'
 | 
						|
 | 
						|
      BATCH_SIZE = 100
 | 
						|
 | 
						|
      # project - An instance of `Project`.
 | 
						|
      def initialize(project)
 | 
						|
        @project = project
 | 
						|
 | 
						|
        @already_processed_cache_key =
 | 
						|
          format(ALREADY_PROCESSED_CACHE_KEY, project: project.id, collection: collection_method)
 | 
						|
        @job_waiter_cache_key =
 | 
						|
          format(JOB_WAITER_CACHE_KEY, project: project.id, collection: collection_method)
 | 
						|
      end
 | 
						|
 | 
						|
      private
 | 
						|
 | 
						|
      def client
 | 
						|
        @client ||= BitbucketServer::Client.new(project.import_data.credentials)
 | 
						|
      end
 | 
						|
 | 
						|
      def project_key
 | 
						|
        @project_key ||= project.import_data.data['project_key']
 | 
						|
      end
 | 
						|
 | 
						|
      def repository_slug
 | 
						|
        @repository_slug ||= project.import_data.data['repo_slug']
 | 
						|
      end
 | 
						|
 | 
						|
      # Returns the ID to use for the cache used for checking if an object has
 | 
						|
      # already been processed or not.
 | 
						|
      #
 | 
						|
      # object - The object we may want to import.
 | 
						|
      def id_for_already_processed_cache(object)
 | 
						|
        raise NotImplementedError
 | 
						|
      end
 | 
						|
 | 
						|
      # The Sidekiq worker class used for scheduling the importing of objects in
 | 
						|
      # parallel.
 | 
						|
      def sidekiq_worker_class
 | 
						|
        raise NotImplementedError
 | 
						|
      end
 | 
						|
 | 
						|
      # The name of the method to call to retrieve the data to import.
 | 
						|
      def collection_method
 | 
						|
        raise NotImplementedError
 | 
						|
      end
 | 
						|
 | 
						|
      def job_waiter
 | 
						|
        @job_waiter ||= begin
 | 
						|
          key = Gitlab::Cache::Import::Caching.read(job_waiter_cache_key)
 | 
						|
          key ||= Gitlab::Cache::Import::Caching.write(job_waiter_cache_key, JobWaiter.generate_key)
 | 
						|
 | 
						|
          JobWaiter.new(0, key)
 | 
						|
        end
 | 
						|
      end
 | 
						|
 | 
						|
      def already_processed?(object)
 | 
						|
        id = id_for_already_processed_cache(object)
 | 
						|
 | 
						|
        Gitlab::Cache::Import::Caching.set_includes?(already_processed_cache_key, id)
 | 
						|
      end
 | 
						|
 | 
						|
      # Marks the given object as "already processed".
 | 
						|
      def mark_as_processed(object)
 | 
						|
        id = id_for_already_processed_cache(object)
 | 
						|
 | 
						|
        Gitlab::Cache::Import::Caching.set_add(already_processed_cache_key, id)
 | 
						|
      end
 | 
						|
 | 
						|
      def calculate_job_delay(job_index)
 | 
						|
        multiplier = (job_index / BATCH_SIZE)
 | 
						|
 | 
						|
        (multiplier * 1.minute) + 1.second
 | 
						|
      end
 | 
						|
 | 
						|
      def track_import_failure!(project, exception:, **args)
 | 
						|
        Gitlab::Import::ImportFailureService.track(
 | 
						|
          project_id: project.id,
 | 
						|
          error_source: self.class.name,
 | 
						|
          exception: exception,
 | 
						|
          **args
 | 
						|
        )
 | 
						|
      end
 | 
						|
    end
 | 
						|
  end
 | 
						|
end
 |