199 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			199 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Ruby
		
	
	
	
# frozen_string_literal: true
 | 
						|
 | 
						|
module Gitlab
 | 
						|
  module SidekiqDaemon
 | 
						|
    class Monitor < Daemon
 | 
						|
      include ::Gitlab::Utils::StrongMemoize
 | 
						|
      extend ::Gitlab::Utils::Override
 | 
						|
 | 
						|
      NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'
 | 
						|
      CANCEL_DEADLINE = 24.hours.seconds
 | 
						|
      RECONNECT_TIME = 3.seconds
 | 
						|
 | 
						|
      # We derive this error from `Exception` (rather than `StandardError`)
      # so that it is treated as a very low-level exception
      # that should not be caught by the application.
 | 
						|
      CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
 | 
						|
 | 
						|
      attr_reader :jobs
 | 
						|
      attr_reader :jobs_mutex
 | 
						|
 | 
						|
      # Sets up the registry of in-flight jobs and the mutex that guards it.
      #
      # `@jobs` is keyed by jid and filled in by #within_job; `@jobs_mutex` is the
      # lock #within_job and #find_thread_with_lock synchronize on.
      def initialize
        super

        @jobs_mutex = Mutex.new
        @jobs = {}
      end
 | 
						|
 | 
						|
      override :thread_name
 | 
						|
      # Returns the name used for this daemon's thread; declared above as an
      # override of the inherited method.
      def thread_name
        'job_monitor'
      end
 | 
						|
 | 
						|
      # Runs the given block as job `jid`, keeping it registered while it runs.
      #
      # The current thread is stored in `jobs` under `jid` (together with the
      # worker class and a monotonic start time) and is always removed again when
      # the method exits. If a cancellation marker already exists for `jid`, the
      # block is never run: a warning is logged and CancelledError is raised.
      #
      # @param worker_class recorded in the registry entry
      # @param jid job id, used as the registry key
      # @param queue only used in the log entry
      # @return the value of the block
      # @raise [CancelledError] when the job was already cancelled
      def within_job(worker_class, jid, queue)
        jobs_mutex.synchronize do
          jobs[jid] = {
            worker_class: worker_class,
            thread: Thread.current,
            started_at: Gitlab::Metrics::System.monotonic_time
          }
        end

        # Registration happens first, so the `ensure` below is what cleans up
        # on both the cancelled and the normal path.
        return yield unless cancelled?(jid)

        Sidekiq.logger.warn(
          class: self.class.to_s,
          action: 'run',
          queue: queue,
          jid: jid,
          canceled: true
        )
        raise CancelledError
      ensure
        jobs_mutex.synchronize { jobs.delete(jid) }
      end
 | 
						|
 | 
						|
      # Requests cancellation of the job identified by `jid`.
      #
      # Two writes go to the shared-state Redis: a marker key that expires after
      # CANCEL_DEADLINE (the key #cancelled? looks for), and a JSON `cancel`
      # message published on NOTIFICATION_CHANNEL (the format #process_message
      # understands).
      def self.cancel_job(jid)
        notification = { action: 'cancel', jid: jid }.to_json

        ::Gitlab::Redis::SharedState.with do |redis|
          redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
          redis.publish(NOTIFICATION_CHANNEL, notification)
        end
      end
 | 
						|
 | 
						|
      private
 | 
						|
 | 
						|
      # Main loop of the daemon: listens for notifications for as long as the
      # daemon is enabled.
      #
      # Returns immediately, without logging, when
      # #notification_channel_enabled? is falsy. Otherwise a start entry is
      # logged, and a stop entry is logged on the way out no matter how the loop
      # ends.
      def run_thread
        return unless notification_channel_enabled?

        begin
          Sidekiq.logger.info(
            class: self.class.to_s,
            action: 'start',
            message: 'Starting Monitor Daemon'
          )

          # Each time #process_messages returns, wait RECONNECT_TIME and then
          # call it again.
          while enabled?
            process_messages
            sleep(RECONNECT_TIME)
          end
        ensure
          Sidekiq.logger.warn(
            class: self.class.to_s,
            action: 'stop',
            message: 'Stopping Monitor Daemon'
          )
        end
      end
 | 
						|
 | 
						|
      # Stops the daemon by raising Interrupt inside its thread, provided that
      # thread is still alive.
      def stop_working
        return unless thread.alive?

        thread.raise(Interrupt)
      end
 | 
						|
 | 
						|
      # Subscribes to NOTIFICATION_CHANNEL on the shared-state Redis and passes
      # every received message to #process_message.
      #
      # Every exception is logged. StandardError and its subclasses are then
      # swallowed, so the method simply returns; anything else (for instance the
      # Interrupt raised by #stop_working) is re-raised.
      def process_messages
        ::Gitlab::Redis::SharedState.with do |redis|
          redis.subscribe(NOTIFICATION_CHANNEL) do |subscription|
            subscription.message do |_channel, payload|
              process_message(payload)
            end
          end
        end
      rescue Exception => e # rubocop:disable Lint/RescueException
        Sidekiq.logger.warn(
          class: self.class.to_s,
          action: 'exception',
          message: e.message
        )

        # we re-raise system exceptions
        raise unless e.is_a?(StandardError)
      end
 | 
						|
 | 
						|
      # Handles one raw payload received on NOTIFICATION_CHANNEL.
      #
      # The payload is logged and parsed. When the result is a JSON object whose
      # `action` is `cancel`, its `jid` is forwarded to #process_job_cancel.
      # Everything else (unparseable input, `null`, arrays, numbers, unknown
      # actions) is ignored.
      #
      # @param message raw payload as received from the channel
      def process_message(message)
        Sidekiq.logger.info(
          class: self.class.to_s,
          channel: NOTIFICATION_CHANNEL,
          message: 'Received payload on channel',
          payload: message
        )

        parsed = safe_parse(message)

        # Valid JSON is not necessarily an object. Indexing e.g. an Array or an
        # Integer with 'action' raises TypeError, which would escape the
        # subscription block and drop the subscription until the next reconnect.
        return unless parsed.is_a?(Hash)

        case parsed['action']
        when 'cancel'
          process_job_cancel(parsed['jid'])
        end
        # any other action is an unknown message and is ignored
      end
 | 
						|
 | 
						|
      # Parses `message` with Gitlab::Json.
      #
      # @return the parsed value, or nil when parsing raises JSON::ParserError
      def safe_parse(message)
        Gitlab::Json.parse(message)
      rescue JSON::ParserError
        nil
      end
 | 
						|
 | 
						|
      # Cancels job `jid` if it is running in this process.
      #
      # A lock-free lookup first filters out jobs that are not registered here.
      # The cancellation itself runs on a new thread (presumably so the caller
      # never waits on jobs_mutex): under the lock it logs a warning and raises
      # CancelledError inside the thread running the job.
      #
      # @return [Thread, nil] the spawned thread, or nil when nothing was done
      def process_job_cancel(jid)
        return unless jid && find_thread_unsafe(jid)

        Thread.new do
          # The locked lookup guarantees the yielded thread is still the one
          # registered for this jid while we raise into it.
          find_thread_with_lock(jid) do |job_thread|
            Sidekiq.logger.warn(
              class: self.class.to_s,
              action: 'cancel',
              message: 'Canceling thread with CancelledError',
              jid: jid,
              thread_id: job_thread.object_id
            )

            job_thread&.raise(CancelledError)
          end
        end
      end
 | 
						|
 | 
						|
      # Looks up the thread registered for `jid` WITHOUT taking jobs_mutex.
      #
      # The result is only a hint: the job may be unregistered right after the
      # lookup. Use #find_thread_with_lock, which yields the thread while holding
      # the mutex, when the thread must still belong to the job while it is
      # being acted on.
      #
      # @return the registered thread, or nil when `jid` is not registered
      def find_thread_unsafe(jid)
        job = jobs[jid]
        job && job[:thread]
      end
 | 
						|
 | 
						|
      # Yields the thread registered for `jid` while holding jobs_mutex.
      #
      # #within_job unregisters a job under the same mutex, so the yielded thread
      # stays registered for `jid` for as long as the block runs. Nothing is
      # yielded when the job is not (or no longer) registered.
      #
      # @yield [thread] the thread registered for `jid`
      # @return the thread that was found under the lock, or nil
      def find_thread_with_lock(jid)
        # Cheap pre-check: skip the lock entirely for jobs we do not know.
        return unless find_thread_unsafe(jid)

        jobs_mutex.synchronize do
          # Look again now that the lock is held; the first lookup was
          # unsynchronized.
          current = find_thread_unsafe(jid)
          yield(current) if current
          current
        end
      end
 | 
						|
 | 
						|
      # Whether a cancellation marker for `jid` exists in the shared-state Redis
      # (the key written by .cancel_job).
      def cancelled?(jid)
        marker_key = self.class.cancel_job_key(jid)

        ::Gitlab::Redis::SharedState.with do |redis|
          redis.exists?(marker_key) # rubocop:disable CodeReuse/ActiveRecord
        end
      end
 | 
						|
 | 
						|
      # Builds the Redis key that marks job `jid` as cancelled.
      #
      # NOTE: although this sits below `private`, that modifier does not apply to
      # `def self.` methods, so this stays public; #cancelled? relies on that when
      # it calls `self.class.cancel_job_key`.
      def self.cancel_job_key(jid)
        format('sidekiq:cancel:%s', jid)
      end
 | 
						|
 | 
						|
      # Whether the cancellation notification channel should be monitored.
      #
      # Enabled when the SIDEKIQ_MONITOR_WORKER environment variable converts
      # (via `to_i`) to a non-zero integer; unset, "0" and non-numeric values all
      # disable it.
      #
      # @return [Boolean] always true or false, as expected of a predicate
      def notification_channel_enabled?
        !ENV.fetch('SIDEKIQ_MONITOR_WORKER', '0').to_i.zero?
      end
 | 
						|
    end
 | 
						|
  end
 | 
						|
end
 |