139 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			139 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Ruby
		
	
	
	
# frozen_string_literal: true
 | 
						|
 | 
						|
module Gitlab
  module BitbucketImport
    # Shared scaffolding for Bitbucket importers that traverse a paginated
    # collection and schedule work per object. Including classes must
    # implement `collection_method`, `representation_type`,
    # `sidekiq_worker_class` and `id_for_already_enqueued_cache`.
    module ParallelScheduling
      include Loggable
      include ErrorTracking

      attr_reader :project, :already_enqueued_cache_key, :job_waiter_cache_key, :job_waiter_remaining_cache_key,
        :page_keyset

      # The base cache key to use for tracking already enqueued objects.
      ALREADY_ENQUEUED_CACHE_KEY =
        'bitbucket-importer/already-enqueued/%{project}/%{collection}'

      # The base cache key to use for storing job waiter key
      JOB_WAITER_CACHE_KEY =
        'bitbucket-importer/job-waiter/%{project}/%{collection}'

      # The base cache key to use for storing job waiter remaining jobs
      JOB_WAITER_REMAINING_CACHE_KEY =
        'bitbucket-importer/job-waiter-remaining/%{project}/%{collection}'

      # project - An instance of `Project`.
      def initialize(project)
        @project = project

        @already_enqueued_cache_key =
          format(ALREADY_ENQUEUED_CACHE_KEY, project: project.id, collection: collection_method)
        @job_waiter_cache_key =
          format(JOB_WAITER_CACHE_KEY, project: project.id, collection: collection_method)
        @job_waiter_remaining_cache_key = format(JOB_WAITER_REMAINING_CACHE_KEY, project: project.id,
          collection: collection_method)
        @page_keyset = Gitlab::Import::PageKeyset.new(project, collection_method, 'bitbucket-importer')
      end

      # The method that will be called for traversing through all the objects to
      # import, yielding them to the supplied block.
      def each_object_to_import
        repo = project.import_source

        # Resume from the persisted keyset position so a retried import does
        # not re-fetch pages that were already walked.
        options = collection_options.merge(next_url: page_keyset.current)

        client.each_page(collection_method, representation_type, repo, options) do |page|
          page.items.each do |object|
            job_waiter.jobs_remaining = Gitlab::Cache::Import::Caching.increment(job_waiter_remaining_cache_key)

            object = object.to_hash

            next if already_enqueued?(object)

            yield object

            # We mark the object as imported immediately so we don't end up
            # scheduling it multiple times.
            mark_as_enqueued(object)
          end

          page_keyset.set(page.next) if page.next?
        end
      end

      private

      # Any options to be passed to the method used for retrieving the data to
      # import.
      def collection_options
        {}
      end

      def client
        @client ||= Bitbucket::Client.new(project.import_data.credentials)
      end

      # Returns the ID to use for the cache used for checking if an object has
      # already been enqueued or not.
      #
      # object - The object we may want to import.
      def id_for_already_enqueued_cache(object)
        raise NotImplementedError
      end

      # The Sidekiq worker class used for scheduling the importing of objects in
      # parallel.
      def sidekiq_worker_class
        raise NotImplementedError
      end

      # The name of the method to call to retrieve the data to import.
      def collection_method
        raise NotImplementedError
      end

      # The name of the method to call to retrieve the representation object
      def representation_type
        raise NotImplementedError
      end

      # Lazily builds the JobWaiter tracking scheduled jobs. The waiter key and
      # the remaining-jobs counter are persisted in the import cache so the
      # waiter can be reconstructed across process restarts.
      def job_waiter
        @job_waiter ||= begin
          key = Gitlab::Cache::Import::Caching.read(job_waiter_cache_key)
          key ||= Gitlab::Cache::Import::Caching.write(job_waiter_cache_key, JobWaiter.generate_key)
          # `to_i` already maps a cache miss (nil) to 0, so no extra default
          # is needed.
          jobs_remaining = Gitlab::Cache::Import::Caching.read(job_waiter_remaining_cache_key).to_i

          JobWaiter.new(jobs_remaining, key)
        end
      end

      # Returns true if the object was already scheduled for import.
      def already_enqueued?(object)
        id = id_for_already_enqueued_cache(object)

        Gitlab::Cache::Import::Caching.set_includes?(already_enqueued_cache_key, id)
      end

      # Marks the given object as "already enqueued".
      def mark_as_enqueued(object)
        id = id_for_already_enqueued_cache(object)

        Gitlab::Cache::Import::Caching.set_add(already_enqueued_cache_key, id)
      end

      # Spreads scheduled jobs over time: each batch of
      # `concurrent_import_jobs_limit` jobs is pushed back by roughly one
      # extra minute, minus the wall-clock time already spent scheduling.
      def calculate_job_delay(job_index)
        runtime = Time.current - job_started_at
        multiplier = (job_index / concurrent_import_jobs_limit.to_f)

        (multiplier * 1.minute) + 1.second - runtime
      end

      def job_started_at
        @job_started_at ||= Time.current
      end

      def concurrent_import_jobs_limit
        Gitlab::CurrentSettings.concurrent_bitbucket_import_jobs_limit
      end
    end
  end
end
 |