gitlab-ce/lib/gitlab/sidekiq_middleware/concurrency_limit/middleware.rb

112 lines
3.2 KiB
Ruby

# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module ConcurrencyLimit
class Middleware
def initialize(worker, job)
@worker = worker
@job = job
worker_class = worker.is_a?(Class) ? worker : worker.class
@worker_name = worker_class.name
end
# This will continue the middleware chain if the job should be scheduled
# It will return false if the job needs to be cancelled
def schedule
if should_defer_schedule?
defer_job!
return
end
yield
end
# This will continue the server middleware chain if the job should be
# executed.
# It will return false if the job should not be executed.
def perform
if should_defer_perform?
defer_job!
return
end
track_execution_start
yield
ensure
track_execution_end
end
private
attr_reader :job, :worker, :worker_name
def should_defer_schedule?
return false if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
return false if job['at'] # scheduled jobs can be later assessed on enqueue
return false if resumed?
return false if worker_limit == 0
has_jobs_in_queue?
end
def should_defer_perform?
return false if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
return false if resumed?
return true if has_jobs_in_queue?
if Feature.enabled?(:concurrency_limit_current_limit_from_redis, Feature.current_request)
concurrency_service.over_the_limit?(worker_name)
else
::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.over_the_limit?(worker: worker)
end
end
def concurrency_service
::Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService
end
def track_execution_start
return if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
concurrency_service.track_execution_start(worker_name)
end
def track_execution_end
return if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops)
concurrency_service.track_execution_end(worker_name)
end
def worker_limit
@worker_limit ||= ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)
end
def resumed?
job['concurrency_limit_resume'] == true
end
def has_jobs_in_queue?
concurrency_service.has_jobs_in_queue?(worker_name)
end
def defer_job!
::Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance.deferred_log(job)
concurrency_service.add_to_queue!(
job,
current_context
)
end
def current_context
::Gitlab::ApplicationContext.current
end
end
end
end
end