219 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			219 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Ruby
		
	
	
	
# frozen_string_literal: true
 | 
						|
 | 
						|
require 'spec_helper'
 | 
						|
 | 
						|
RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
 | 
						|
  def clear_queues
 | 
						|
    Sidekiq::Queue.new('authorized_projects').clear
 | 
						|
    Sidekiq::Queue.new('post_receive').clear
 | 
						|
    Sidekiq::RetrySet.new.clear
 | 
						|
    Sidekiq::ScheduledSet.new.clear
 | 
						|
  end
 | 
						|
 | 
						|
  around do |example|
 | 
						|
    clear_queues
 | 
						|
    Sidekiq::Testing.disable!(&example)
 | 
						|
    clear_queues
 | 
						|
  end
 | 
						|
 | 
						|
  describe '#execute', :aggregate_failures do
 | 
						|
    shared_examples 'processing a set' do
 | 
						|
      let(:migrator) { described_class.new(set_name) }
 | 
						|
 | 
						|
      let(:set_after) do
 | 
						|
        Sidekiq.redis { |c| c.zrange(set_name, 0, -1, with_scores: true) }
 | 
						|
          .map { |item, score| [Sidekiq.load_json(item), score] }
 | 
						|
      end
 | 
						|
 | 
						|
      context 'when the set is empty' do
 | 
						|
        it 'returns the number of scanned and migrated jobs' do
 | 
						|
          expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue')).to eq(scanned: 0, migrated: 0)
 | 
						|
        end
 | 
						|
      end
 | 
						|
 | 
						|
      context 'when the set is not empty' do
 | 
						|
        it 'returns the number of scanned and migrated jobs' do
 | 
						|
          create_jobs
 | 
						|
 | 
						|
          expect(migrator.execute({})).to eq(scanned: 4, migrated: 0)
 | 
						|
        end
 | 
						|
      end
 | 
						|
 | 
						|
      context 'when there are no matching jobs' do
 | 
						|
        it 'does not change any queue names' do
 | 
						|
          create_jobs(include_post_receive: false)
 | 
						|
 | 
						|
          expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 3, migrated: 0)
 | 
						|
 | 
						|
          expect(set_after.length).to eq(3)
 | 
						|
          expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects',
 | 
						|
                                                        'class' => 'AuthorizedProjectsWorker'))
 | 
						|
        end
 | 
						|
      end
 | 
						|
 | 
						|
      context 'when there are matching jobs' do
 | 
						|
        it 'migrates only the workers matching the given worker from the set' do
 | 
						|
          freeze_time do
 | 
						|
            create_jobs
 | 
						|
 | 
						|
            expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue')).to eq(scanned: 4, migrated: 3)
 | 
						|
 | 
						|
            set_after.each.with_index do |(item, score), i|
 | 
						|
              if item['class'] == 'AuthorizedProjectsWorker'
 | 
						|
                expect(item).to include('queue' => 'new_queue', 'args' => [i])
 | 
						|
              else
 | 
						|
                expect(item).to include('queue' => 'post_receive', 'args' => [i])
 | 
						|
              end
 | 
						|
 | 
						|
              expect(score).to be_within(schedule_jitter).of(i.succ.hours.from_now.to_i)
 | 
						|
            end
 | 
						|
          end
 | 
						|
        end
 | 
						|
 | 
						|
        it 'allows migrating multiple workers at once' do
 | 
						|
          freeze_time do
 | 
						|
            create_jobs
 | 
						|
 | 
						|
            expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'another_queue'))
 | 
						|
              .to eq(scanned: 4, migrated: 4)
 | 
						|
 | 
						|
            set_after.each.with_index do |(item, score), i|
 | 
						|
              if item['class'] == 'AuthorizedProjectsWorker'
 | 
						|
                expect(item).to include('queue' => 'new_queue', 'args' => [i])
 | 
						|
              else
 | 
						|
                expect(item).to include('queue' => 'another_queue', 'args' => [i])
 | 
						|
              end
 | 
						|
 | 
						|
              expect(score).to be_within(schedule_jitter).of(i.succ.hours.from_now.to_i)
 | 
						|
            end
 | 
						|
          end
 | 
						|
        end
 | 
						|
 | 
						|
        it 'allows migrating multiple workers to the same queue' do
 | 
						|
          freeze_time do
 | 
						|
            create_jobs
 | 
						|
 | 
						|
            expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'new_queue'))
 | 
						|
              .to eq(scanned: 4, migrated: 4)
 | 
						|
 | 
						|
            set_after.each.with_index do |(item, score), i|
 | 
						|
              expect(item).to include('queue' => 'new_queue', 'args' => [i])
 | 
						|
              expect(score).to be_within(schedule_jitter).of(i.succ.hours.from_now.to_i)
 | 
						|
            end
 | 
						|
          end
 | 
						|
        end
 | 
						|
 | 
						|
        it 'does not try to migrate jobs that are removed from the set during the migration' do
 | 
						|
          freeze_time do
 | 
						|
            create_jobs
 | 
						|
 | 
						|
            allow(migrator).to receive(:migrate_job).and_wrap_original do |meth, *args|
 | 
						|
              Sidekiq.redis { |c| c.zrem(set_name, args.first) }
 | 
						|
 | 
						|
              meth.call(*args)
 | 
						|
            end
 | 
						|
 | 
						|
            expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 0)
 | 
						|
 | 
						|
            expect(set_after.length).to eq(3)
 | 
						|
            expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects'))
 | 
						|
          end
 | 
						|
        end
 | 
						|
 | 
						|
        it 'does not try to migrate unmatched jobs that are added to the set during the migration' do
 | 
						|
          create_jobs
 | 
						|
 | 
						|
          calls = 0
 | 
						|
 | 
						|
          allow(migrator).to receive(:migrate_job).and_wrap_original do |meth, *args|
 | 
						|
            if calls == 0
 | 
						|
              travel_to(5.hours.from_now) { create_jobs(include_post_receive: false) }
 | 
						|
            end
 | 
						|
 | 
						|
            calls += 1
 | 
						|
 | 
						|
            meth.call(*args)
 | 
						|
          end
 | 
						|
 | 
						|
          expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 1)
 | 
						|
 | 
						|
          expect(set_after.group_by { |job| job.first['queue'] }.transform_values(&:count))
 | 
						|
            .to eq('authorized_projects' => 6, 'new_queue' => 1)
 | 
						|
        end
 | 
						|
 | 
						|
        it 'iterates through the entire set of jobs' do
 | 
						|
          50.times do |i|
 | 
						|
            travel_to(i.hours.from_now) { create_jobs }
 | 
						|
          end
 | 
						|
 | 
						|
          expect(migrator.execute('NonExistentWorker' => 'new_queue')).to eq(scanned: 200, migrated: 0)
 | 
						|
 | 
						|
          expect(set_after.length).to eq(200)
 | 
						|
        end
 | 
						|
 | 
						|
        it 'logs output at the start, finish, and every LOG_FREQUENCY jobs' do
 | 
						|
          freeze_time do
 | 
						|
            create_jobs
 | 
						|
 | 
						|
            stub_const("#{described_class}::LOG_FREQUENCY", 2)
 | 
						|
 | 
						|
            logger = Logger.new(StringIO.new)
 | 
						|
            migrator = described_class.new(set_name, logger: logger)
 | 
						|
 | 
						|
            expect(logger).to receive(:info).with(a_string_matching('Processing')).once.ordered
 | 
						|
            expect(logger).to receive(:info).with(a_string_matching('In progress')).once.ordered
 | 
						|
            expect(logger).to receive(:info).with(a_string_matching('Done')).once.ordered
 | 
						|
 | 
						|
            expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'new_queue'))
 | 
						|
              .to eq(scanned: 4, migrated: 4)
 | 
						|
          end
 | 
						|
        end
 | 
						|
      end
 | 
						|
    end
 | 
						|
 | 
						|
    context 'scheduled jobs' do
 | 
						|
      let(:set_name) { 'schedule' }
 | 
						|
      let(:schedule_jitter) { 0 }
 | 
						|
 | 
						|
      def create_jobs(include_post_receive: true)
 | 
						|
        AuthorizedProjectsWorker.perform_in(1.hour, 0)
 | 
						|
        AuthorizedProjectsWorker.perform_in(2.hours, 1)
 | 
						|
        PostReceive.perform_in(3.hours, 2) if include_post_receive
 | 
						|
        AuthorizedProjectsWorker.perform_in(4.hours, 3)
 | 
						|
      end
 | 
						|
 | 
						|
      it_behaves_like 'processing a set'
 | 
						|
    end
 | 
						|
 | 
						|
    context 'retried jobs' do
 | 
						|
      let(:set_name) { 'retry' }
 | 
						|
      # Account for Sidekiq retry jitter
 | 
						|
      # https://github.com/mperham/sidekiq/blob/3575ccb44c688dd08bfbfd937696260b12c622fb/lib/sidekiq/job_retry.rb#L217
 | 
						|
      let(:schedule_jitter) { 10 }
 | 
						|
 | 
						|
      # Try to mimic as closely as possible what Sidekiq will actually
 | 
						|
      # do to retry a job.
 | 
						|
      def retry_in(klass, time, args)
 | 
						|
        message = { 'class' => klass.name, 'args' => [args], 'retry' => true }.to_json
 | 
						|
 | 
						|
        allow(klass).to receive(:sidekiq_retry_in_block).and_return(proc { time })
 | 
						|
 | 
						|
        begin
 | 
						|
          Sidekiq::JobRetry.new.local(klass, message, klass.queue) { raise 'boom' }
 | 
						|
        rescue Sidekiq::JobRetry::Skip
 | 
						|
          # Sidekiq scheduled the retry
 | 
						|
        end
 | 
						|
      end
 | 
						|
 | 
						|
      def create_jobs(include_post_receive: true)
 | 
						|
        retry_in(AuthorizedProjectsWorker, 1.hour, 0)
 | 
						|
        retry_in(AuthorizedProjectsWorker, 2.hours, 1)
 | 
						|
        retry_in(PostReceive, 3.hours, 2) if include_post_receive
 | 
						|
        retry_in(AuthorizedProjectsWorker, 4.hours, 3)
 | 
						|
      end
 | 
						|
 | 
						|
      it_behaves_like 'processing a set'
 | 
						|
    end
 | 
						|
  end
 | 
						|
end
 |