126 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			126 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Ruby
		
	
	
	
# frozen_string_literal: true
 | 
						|
 | 
						|
require 'yaml'
 | 
						|
require 'set'
 | 
						|
 | 
						|
# These methods are called by `sidekiq-cluster`, which runs outside of
 | 
						|
# the bundler/Rails context, so we cannot use any gem or Rails methods.
 | 
						|
module Gitlab
 | 
						|
  module SidekiqConfig
 | 
						|
    module CliMethods
 | 
						|
      # The methods in this module are used as module methods
 | 
						|
      # rubocop:disable Gitlab/ModuleWithInstanceVariables
 | 
						|
      extend self
 | 
						|
 | 
						|
      QUEUE_CONFIG_PATHS = begin
 | 
						|
        result = %w[app/workers/all_queues.yml]
 | 
						|
        result << 'ee/app/workers/all_queues.yml' if Gitlab.ee?
 | 
						|
        result
 | 
						|
      end.freeze
 | 
						|
 | 
						|
      QUERY_OR_OPERATOR = '|'
 | 
						|
      QUERY_AND_OPERATOR = '&'
 | 
						|
      QUERY_CONCATENATE_OPERATOR = ','
 | 
						|
      QUERY_TERM_REGEX = %r{^(\w+)(!?=)([\w:#{QUERY_CONCATENATE_OPERATOR}]+)}.freeze
 | 
						|
 | 
						|
      QUERY_PREDICATES = {
 | 
						|
        feature_category: :to_sym,
 | 
						|
        has_external_dependencies: lambda { |value| value == 'true' },
 | 
						|
        name: :to_s,
 | 
						|
        resource_boundary: :to_sym,
 | 
						|
        urgency: :to_sym
 | 
						|
      }.freeze
 | 
						|
 | 
						|
      QueryError = Class.new(StandardError)
 | 
						|
      InvalidTerm = Class.new(QueryError)
 | 
						|
      UnknownOperator = Class.new(QueryError)
 | 
						|
      UnknownPredicate = Class.new(QueryError)
 | 
						|
 | 
						|
      def all_queues(rails_path = Rails.root.to_s)
 | 
						|
        @worker_queues ||= {}
 | 
						|
 | 
						|
        @worker_queues[rails_path] ||= QUEUE_CONFIG_PATHS.flat_map do |path|
 | 
						|
          full_path = File.join(rails_path, path)
 | 
						|
 | 
						|
          File.exist?(full_path) ? YAML.load_file(full_path) : []
 | 
						|
        end
 | 
						|
      end
 | 
						|
      # rubocop:enable Gitlab/ModuleWithInstanceVariables
 | 
						|
 | 
						|
      def worker_queues(rails_path = Rails.root.to_s)
 | 
						|
        # https://gitlab.com/gitlab-org/gitlab/issues/199230
 | 
						|
        worker_names(all_queues(rails_path))
 | 
						|
      end
 | 
						|
 | 
						|
      def expand_queues(queues, all_queues = self.worker_queues)
 | 
						|
        return [] if queues.empty?
 | 
						|
 | 
						|
        queues_set = all_queues.to_set
 | 
						|
 | 
						|
        queues.flat_map do |queue|
 | 
						|
          [queue, *queues_set.grep(/\A#{queue}:/)]
 | 
						|
        end
 | 
						|
      end
 | 
						|
 | 
						|
      def query_workers(query_string, queues)
 | 
						|
        worker_names(queues.select(&query_string_to_lambda(query_string)))
 | 
						|
      end
 | 
						|
 | 
						|
      def clear_memoization!
 | 
						|
        if instance_variable_defined?('@worker_queues')
 | 
						|
          remove_instance_variable('@worker_queues')
 | 
						|
        end
 | 
						|
      end
 | 
						|
 | 
						|
      private
 | 
						|
 | 
						|
      def worker_names(workers)
 | 
						|
        workers.map { |queue| queue.is_a?(Hash) ? queue[:name] : queue }
 | 
						|
      end
 | 
						|
 | 
						|
      def query_string_to_lambda(query_string)
 | 
						|
        or_clauses = query_string.split(QUERY_OR_OPERATOR).map do |and_clauses_string|
 | 
						|
          and_clauses_predicates = and_clauses_string.split(QUERY_AND_OPERATOR).map do |term|
 | 
						|
            predicate_for_term(term)
 | 
						|
          end
 | 
						|
 | 
						|
          lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
 | 
						|
        end
 | 
						|
 | 
						|
        lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
 | 
						|
      end
 | 
						|
 | 
						|
      def predicate_for_term(term)
 | 
						|
        match = term.match(QUERY_TERM_REGEX)
 | 
						|
 | 
						|
        raise InvalidTerm.new("Invalid term: #{term}") unless match
 | 
						|
 | 
						|
        _, lhs, op, rhs = *match
 | 
						|
 | 
						|
        predicate_for_op(op, predicate_factory(lhs, rhs.split(QUERY_CONCATENATE_OPERATOR)))
 | 
						|
      end
 | 
						|
 | 
						|
      def predicate_for_op(op, predicate)
 | 
						|
        case op
 | 
						|
        when '='
 | 
						|
          predicate
 | 
						|
        when '!='
 | 
						|
          lambda { |worker| !predicate.call(worker) }
 | 
						|
        else
 | 
						|
          # This is unreachable because InvalidTerm will be raised instead, but
 | 
						|
          # keeping it allows to guard against that changing in future.
 | 
						|
          raise UnknownOperator.new("Unknown operator: #{op}")
 | 
						|
        end
 | 
						|
      end
 | 
						|
 | 
						|
      def predicate_factory(lhs, values)
 | 
						|
        values_block = QUERY_PREDICATES[lhs.to_sym]
 | 
						|
 | 
						|
        raise UnknownPredicate.new("Unknown predicate: #{lhs}") unless values_block
 | 
						|
 | 
						|
        lambda { |queue| values.map(&values_block).include?(queue[lhs.to_sym]) }
 | 
						|
      end
 | 
						|
    end
 | 
						|
  end
 | 
						|
end
 |