209 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			209 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Ruby
		
	
	
	
# frozen_string_literal: true
 | 
						|
 | 
						|
require 'optparse'
 | 
						|
require 'logger'
 | 
						|
require 'time'
 | 
						|
 | 
						|
module Gitlab
 | 
						|
  module SidekiqCluster
 | 
						|
    class CLI
 | 
						|
      CHECK_TERMINATE_INTERVAL_SECONDS = 1
 | 
						|
 | 
						|
      # How long to wait when asking for a clean termination.
 | 
						|
      # It maps the Sidekiq default timeout:
 | 
						|
      # https://github.com/mperham/sidekiq/wiki/Signals#term
 | 
						|
      #
 | 
						|
      # This value is passed to Sidekiq's `-t` if none
 | 
						|
      # is given through arguments.
 | 
						|
      DEFAULT_SOFT_TIMEOUT_SECONDS = 25
 | 
						|
 | 
						|
      # After surpassing the soft timeout.
 | 
						|
      DEFAULT_HARD_TIMEOUT_SECONDS = 5
 | 
						|
 | 
						|
      CommandError = Class.new(StandardError)
 | 
						|
 | 
						|
      def initialize(log_output = STDERR)
 | 
						|
        require_relative '../../../lib/gitlab/sidekiq_logging/json_formatter'
 | 
						|
 | 
						|
        # As recommended by https://github.com/mperham/sidekiq/wiki/Advanced-Options#concurrency
 | 
						|
        @max_concurrency = 50
 | 
						|
        @min_concurrency = 0
 | 
						|
        @environment = ENV['RAILS_ENV'] || 'development'
 | 
						|
        @pid = nil
 | 
						|
        @interval = 5
 | 
						|
        @alive = true
 | 
						|
        @processes = []
 | 
						|
        @logger = Logger.new(log_output)
 | 
						|
        @logger.formatter = ::Gitlab::SidekiqLogging::JSONFormatter.new
 | 
						|
        @rails_path = Dir.pwd
 | 
						|
        @dryrun = false
 | 
						|
      end
 | 
						|
 | 
						|
      def run(argv = ARGV)
 | 
						|
        if argv.empty?
 | 
						|
          raise CommandError,
 | 
						|
            'You must specify at least one queue to start a worker for'
 | 
						|
        end
 | 
						|
 | 
						|
        option_parser.parse!(argv)
 | 
						|
 | 
						|
        all_queues = SidekiqConfig::CliMethods.all_queues(@rails_path)
 | 
						|
        queue_names = SidekiqConfig::CliMethods.worker_queues(@rails_path)
 | 
						|
 | 
						|
        queue_groups = argv.map do |queues|
 | 
						|
          next queue_names if queues == '*'
 | 
						|
 | 
						|
          # When using the experimental queue query syntax, we treat
 | 
						|
          # each queue group as a worker attribute query, and resolve
 | 
						|
          # the queues for the queue group using this query.
 | 
						|
          if @experimental_queue_selector
 | 
						|
            SidekiqConfig::CliMethods.query_workers(queues, all_queues)
 | 
						|
          else
 | 
						|
            SidekiqConfig::CliMethods.expand_queues(queues.split(','), queue_names)
 | 
						|
          end
 | 
						|
        end
 | 
						|
 | 
						|
        if @negate_queues
 | 
						|
          queue_groups.map! { |queues| queue_names - queues }
 | 
						|
        end
 | 
						|
 | 
						|
        if queue_groups.all?(&:empty?)
 | 
						|
          raise CommandError,
 | 
						|
            'No queues found, you must select at least one queue'
 | 
						|
        end
 | 
						|
 | 
						|
        unless @dryrun
 | 
						|
          @logger.info("Starting cluster with #{queue_groups.length} processes")
 | 
						|
        end
 | 
						|
 | 
						|
        @processes = SidekiqCluster.start(
 | 
						|
          queue_groups,
 | 
						|
          env: @environment,
 | 
						|
          directory: @rails_path,
 | 
						|
          max_concurrency: @max_concurrency,
 | 
						|
          min_concurrency: @min_concurrency,
 | 
						|
          dryrun: @dryrun,
 | 
						|
          timeout: soft_timeout_seconds
 | 
						|
        )
 | 
						|
 | 
						|
        return if @dryrun
 | 
						|
 | 
						|
        write_pid
 | 
						|
        trap_signals
 | 
						|
        start_loop
 | 
						|
      end
 | 
						|
 | 
						|
      def write_pid
 | 
						|
        SidekiqCluster.write_pid(@pid) if @pid
 | 
						|
      end
 | 
						|
 | 
						|
      def soft_timeout_seconds
 | 
						|
        @soft_timeout_seconds || DEFAULT_SOFT_TIMEOUT_SECONDS
 | 
						|
      end
 | 
						|
 | 
						|
      # The amount of time it'll wait for killing the alive Sidekiq processes.
 | 
						|
      def hard_timeout_seconds
 | 
						|
        soft_timeout_seconds + DEFAULT_HARD_TIMEOUT_SECONDS
 | 
						|
      end
 | 
						|
 | 
						|
      def monotonic_time
 | 
						|
        Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)
 | 
						|
      end
 | 
						|
 | 
						|
      def continue_waiting?(deadline)
 | 
						|
        SidekiqCluster.any_alive?(@processes) && monotonic_time < deadline
 | 
						|
      end
 | 
						|
 | 
						|
      def hard_stop_stuck_pids
 | 
						|
        SidekiqCluster.signal_processes(SidekiqCluster.pids_alive(@processes), "-KILL")
 | 
						|
      end
 | 
						|
 | 
						|
      def wait_for_termination
 | 
						|
        deadline = monotonic_time + hard_timeout_seconds
 | 
						|
        sleep(CHECK_TERMINATE_INTERVAL_SECONDS) while continue_waiting?(deadline)
 | 
						|
 | 
						|
        hard_stop_stuck_pids
 | 
						|
      end
 | 
						|
 | 
						|
      def trap_signals
 | 
						|
        SidekiqCluster.trap_terminate do |signal|
 | 
						|
          @alive = false
 | 
						|
          SidekiqCluster.signal_processes(@processes, signal)
 | 
						|
          wait_for_termination
 | 
						|
        end
 | 
						|
 | 
						|
        SidekiqCluster.trap_forward do |signal|
 | 
						|
          SidekiqCluster.signal_processes(@processes, signal)
 | 
						|
        end
 | 
						|
      end
 | 
						|
 | 
						|
      def start_loop
 | 
						|
        while @alive
 | 
						|
          sleep(@interval)
 | 
						|
 | 
						|
          unless SidekiqCluster.all_alive?(@processes)
 | 
						|
            # If a child process died we'll just terminate the whole cluster. It's up to
 | 
						|
            # runit and such to then restart the cluster.
 | 
						|
            @logger.info('A worker terminated, shutting down the cluster')
 | 
						|
 | 
						|
            SidekiqCluster.signal_processes(@processes, :TERM)
 | 
						|
            break
 | 
						|
          end
 | 
						|
        end
 | 
						|
      end
 | 
						|
 | 
						|
      def option_parser
 | 
						|
        OptionParser.new do |opt|
 | 
						|
          opt.banner = "#{File.basename(__FILE__)} [QUEUE,QUEUE] [QUEUE] ... [OPTIONS]"
 | 
						|
 | 
						|
          opt.separator "\nOptions:\n"
 | 
						|
 | 
						|
          opt.on('-h', '--help', 'Shows this help message') do
 | 
						|
            abort opt.to_s
 | 
						|
          end
 | 
						|
 | 
						|
          opt.on('-m', '--max-concurrency INT', 'Maximum threads to use with Sidekiq (default: 50, 0 to disable)') do |int|
 | 
						|
            @max_concurrency = int.to_i
 | 
						|
          end
 | 
						|
 | 
						|
          opt.on('--min-concurrency INT', 'Minimum threads to use with Sidekiq (default: 0)') do |int|
 | 
						|
            @min_concurrency = int.to_i
 | 
						|
          end
 | 
						|
 | 
						|
          opt.on('-e', '--environment ENV', 'The application environment') do |env|
 | 
						|
            @environment = env
 | 
						|
          end
 | 
						|
 | 
						|
          opt.on('-P', '--pidfile PATH', 'Path to the PID file') do |pid|
 | 
						|
            @pid = pid
 | 
						|
          end
 | 
						|
 | 
						|
          opt.on('-r', '--require PATH', 'Location of the Rails application') do |path|
 | 
						|
            @rails_path = path
 | 
						|
          end
 | 
						|
 | 
						|
          opt.on('--experimental-queue-selector', 'EXPERIMENTAL: Run workers based on the provided selector') do |experimental_queue_selector|
 | 
						|
            @experimental_queue_selector = experimental_queue_selector
 | 
						|
          end
 | 
						|
 | 
						|
          opt.on('-n', '--negate', 'Run workers for all queues in sidekiq_queues.yml except the given ones') do
 | 
						|
            @negate_queues = true
 | 
						|
          end
 | 
						|
 | 
						|
          opt.on('-i', '--interval INT', 'The number of seconds to wait between worker checks') do |int|
 | 
						|
            @interval = int.to_i
 | 
						|
          end
 | 
						|
 | 
						|
          opt.on('-t', '--timeout INT', 'Graceful timeout for all running processes') do |timeout|
 | 
						|
            @soft_timeout_seconds = timeout.to_i
 | 
						|
          end
 | 
						|
 | 
						|
          opt.on('-d', '--dryrun', 'Print commands that would be run without this flag, and quit') do |int|
 | 
						|
            @dryrun = true
 | 
						|
          end
 | 
						|
        end
 | 
						|
      end
 | 
						|
    end
 | 
						|
  end
 | 
						|
end
 |