82 lines
2.4 KiB
Ruby
82 lines
2.4 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
module Gitlab
|
|
module SidekiqMiddleware
|
|
module ConcurrencyLimit
|
|
class ConcurrencyLimitService
|
|
REDIS_KEY_PREFIX = 'sidekiq:concurrency_limit'
|
|
|
|
delegate :add_to_queue!, :queue_size, :metadata_key, :has_jobs_in_queue?, :resume_processing!,
|
|
to: :@queue_manager
|
|
|
|
delegate :track_execution_start, :track_execution_end, :cleanup_stale_trackers,
|
|
:concurrent_worker_count, to: :@worker_execution_tracker
|
|
|
|
delegate :current_limit, :set_current_limit!, to: :@limit_manager
|
|
|
|
def initialize(worker_name)
|
|
@worker_name = worker_name
|
|
@queue_manager = QueueManager.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX)
|
|
@worker_execution_tracker = WorkerExecutionTracker.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX)
|
|
@limit_manager = LimitManager.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX)
|
|
end
|
|
|
|
class << self
|
|
def add_to_queue!(job, context)
|
|
new(job['class']).add_to_queue!(job, context)
|
|
end
|
|
|
|
def has_jobs_in_queue?(worker_name)
|
|
new(worker_name).has_jobs_in_queue?
|
|
end
|
|
|
|
def resume_processing!(worker_name, limit:)
|
|
new(worker_name).resume_processing!(limit: limit)
|
|
end
|
|
|
|
def queue_size(worker_name)
|
|
new(worker_name).queue_size
|
|
end
|
|
|
|
def metadata_key(worker_name)
|
|
new(worker_name).metadata_key
|
|
end
|
|
|
|
def cleanup_stale_trackers(worker_name)
|
|
new(worker_name).cleanup_stale_trackers
|
|
end
|
|
|
|
def track_execution_start(worker_name)
|
|
new(worker_name).track_execution_start
|
|
end
|
|
|
|
def track_execution_end(worker_name)
|
|
new(worker_name).track_execution_end
|
|
end
|
|
|
|
def concurrent_worker_count(worker_name)
|
|
new(worker_name).concurrent_worker_count
|
|
end
|
|
|
|
def current_limit(worker_name)
|
|
new(worker_name).current_limit
|
|
end
|
|
|
|
def set_current_limit!(worker_name, limit:)
|
|
new(worker_name).set_current_limit!(limit)
|
|
end
|
|
|
|
def over_the_limit?(worker_name)
|
|
service = new(worker_name)
|
|
limit = service.current_limit
|
|
|
|
return false if limit == 0
|
|
|
|
service.concurrent_worker_count >= limit
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|