181 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Ruby
		
	
	
	
			
		
		
	
	
			181 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Ruby
		
	
	
	
# frozen_string_literal: true
 | 
						|
 | 
						|
module Gitlab
 | 
						|
  module Database
 | 
						|
    module PartitioningMigrationHelpers
 | 
						|
      module IndexHelpers
 | 
						|
        include Gitlab::Database::MigrationHelpers
 | 
						|
        include Gitlab::Database::SchemaHelpers
 | 
						|
 | 
						|
        DuplicatedIndexesError = Class.new(StandardError)
 | 
						|
 | 
						|
        ERROR_SCOPE = 'index'
 | 
						|
 | 
						|
        # Concurrently creates a new index on a partitioned table. In concept this works similarly to
        # `add_concurrent_index`, and won't block reads or writes on the table while the index is being built.
        #
        # A special helper is required for partitioning because Postgres does not support concurrently building indexes
        # on partitioned tables. This helper concurrently adds the same index to each partition (in partition-name
        # order, passing `allow_partition: true`), and creates the final index on the parent table once all of the
        # partitions are indexed. This is the recommended safe way to add indexes to partitioned tables.
        #
        # The `:name` option is mandatory: it names the index on the parent table, and the per-partition index names
        # are generated from it. If an index with that name already exists on the table, a warning is logged and
        # nothing is created.
        #
        # Must be called outside of a transaction block. Raises ArgumentError when no `:name` is given or when
        # `table_name` is not a partitioned table.
        #
        # Example:
        #
        #     add_concurrent_partitioned_index :users, :some_column, name: 'index_name_goes_here'
        #
        # See Rails' `add_index` for more info on the available arguments.
        def add_concurrent_partitioned_index(table_name, column_names, options = {})
          assert_not_in_transaction_block(scope: ERROR_SCOPE)

          index_name = options[:name]
          raise ArgumentError, 'A name is required for indexes added to partitioned tables' unless index_name

          partitioned_table = find_partitioned_table(table_name)

          if index_name_exists?(table_name, index_name)
            Gitlab::AppLogger.warn "Index not created because it already exists (this may be due to an aborted" \
              " migration or similar): table_name: #{table_name}, index_name: #{index_name}"

            return
          end

          # Index every partition first; each one gets its own generated name derived from the parent index name.
          partitioned_table.postgres_partitions.order(:name).each do |partition|
            partition_options = options.merge(
              name: generated_index_name(partition.identifier, index_name),
              allow_partition: true
            )

            add_concurrent_index(partition.identifier, column_names, partition_options)
          end

          # Finally create the index on the parent table, using the caller's original options.
          with_lock_retries do
            add_index(table_name, column_names, **options)
          end
        end
 | 
						|
 | 
						|
        # Safely removes an existing index from a partitioned table. The method name is a bit inaccurate as it does not
        # drop the index concurrently, but it's named as such to maintain consistency with other similar helpers, and
        # indicate that this should be safe to use in a production environment.
        #
        # In current versions of Postgres it's impossible to drop an index concurrently, or drop an index from an
        # individual partition that exists across the entire partitioned table. As a result this helper drops the index
        # from the parent table, which automatically cascades to all partitions. While this does require an exclusive
        # lock, dropping an index is a fast operation that won't block the table for a significant period of time.
        #
        # If no index with the given name exists on the table, a warning is logged and nothing is removed.
        #
        # Must be called outside of a transaction block. Raises ArgumentError when `table_name` is not a partitioned
        # table.
        #
        # Example:
        #
        #     remove_concurrent_partitioned_index_by_name :users, 'index_name_goes_here'
        def remove_concurrent_partitioned_index_by_name(table_name, index_name)
          assert_not_in_transaction_block(scope: ERROR_SCOPE)

          # Called only for validation: raises ArgumentError unless the table is partitioned.
          find_partitioned_table(table_name)

          unless index_name_exists?(table_name, index_name)
            Gitlab::AppLogger.warn "Index not removed because it does not exist (this may be due to an aborted " \
              "migration or similar): table_name: #{table_name}, index_name: #{index_name}"

            return
          end

          with_lock_retries do
            remove_index(table_name, name: index_name)
          end
        end
 | 
						|
 | 
						|
        # Finds duplicate indexes for a given schema and table. This finds
        # indexes where the index definition is identical but the names are
        # different. Returns an array of arrays, each inner array holding the
        # names of all indexes that share one definition (two or more names
        # per group). Returns an empty array when there are no duplicates.
        #
        # `schema_name` defaults to the connection's current schema.
        #
        # Example:
        #
        #     find_duplicate_indexes('table_name_goes_here')
        def find_duplicate_indexes(table_name, schema_name: connection.current_schema)
          rows_by_definition = find_indexes(table_name, schema_name: schema_name)
            .group_by { |row| row['index_id'] }

          rows_by_definition.values
            .select { |rows| rows.size > 1 }
            .map { |rows| rows.map { |row| row['index_name'] } }
        end
 | 
						|
 | 
						|
        # Retrieves a hash of index names for a given table and schema, keyed by
        # index definition (the definition with the index name and table
        # replaced by `_`, so it can be compared across tables).
        #
        # `schema_name` defaults to the connection's current schema.
        #
        # Raises DuplicatedIndexesError when two or more indexes on the table
        # share the same definition, because a definition could then not be
        # mapped to a single name.
        #
        # Example:
        #
        #     indexes_by_definition_for_table('table_name_goes_here')
        #
        # Returns:
        #
        #     {
        #       "CREATE _ btree (created_at)" => "index_on_created_at"
        #     }
        def indexes_by_definition_for_table(table_name, schema_name: connection.current_schema)
          duplicate_indexes = find_duplicate_indexes(table_name, schema_name: schema_name)

          unless duplicate_indexes.empty?
            raise DuplicatedIndexesError, "#{table_name} has duplicate indexes: #{duplicate_indexes}"
          end

          find_indexes(table_name, schema_name: schema_name).each_with_object({}) do |row, names_by_definition|
            names_by_definition[row['index_id']] = row['index_name']
          end
        end
 | 
						|
 | 
						|
        # Renames indexes for a given table and schema, mapping by index
        # definition, to a hash of new index names.
        #
        # `new_index_names` has the same shape as the result of
        # `indexes_by_definition_for_table`: index definition => index name.
        # Only indexes whose definition is present in both the table's current
        # indexes and `new_index_names` are renamed.
        #
        # `schema_name` defaults to the connection's current schema.
        #
        # Raises DuplicatedIndexesError when the table has indexes with
        # identical definitions.
        #
        # Example:
        #
        #     index_names = indexes_by_definition_for_table('source_table_name_goes_here')
        #     drop_table('source_table_name_goes_here')
        #     rename_indexes_for_table('destination_table_name_goes_here', index_names)
        def rename_indexes_for_table(table_name, new_index_names, schema_name: connection.current_schema)
          current_index_names = indexes_by_definition_for_table(table_name, schema_name: schema_name)

          rename_indexes(current_index_names, new_index_names, schema_name: schema_name)
        end
 | 
						|
 | 
						|
        private
 | 
						|
 | 
						|
        # Loads the indexes defined on `table_name` in `schema_name` (defaults
        # to the connection's current schema) from the Postgres catalogs.
        #
        # Returns an array with one row per index; each row exposes
        # 'schema_name', 'table_name', 'index_name' and 'index_id'.
        #
        # 'index_id' is the output of `pg_get_indexdef` with the
        # "INDEX <name> ON <table> USING" part replaced by `_`, so indexes that
        # differ only by name (or by owning table) yield the same identifier.
        #
        # The relkind filters follow the pg_class catalog codes: the owning
        # relation may be an ordinary table ('r'), materialized view ('m') or
        # partitioned table ('p'); the index may be a regular ('i') or
        # partitioned ('I') index.
        def find_indexes(table_name, schema_name: connection.current_schema)
          indexes = connection.select_all(<<~SQL, 'SQL', [schema_name, table_name])
            SELECT n.nspname AS schema_name,
                   c.relname AS table_name,
                   i.relname AS index_name,
                   regexp_replace(pg_get_indexdef(i.oid), 'INDEX .*? USING', '_') AS index_id
            FROM pg_index x
              JOIN pg_class c ON c.oid = x.indrelid
              JOIN pg_class i ON i.oid = x.indexrelid
              LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE (c.relkind = ANY (ARRAY['r'::"char", 'm'::"char", 'p'::"char"]))
              AND (i.relkind = ANY (ARRAY['i'::"char", 'I'::"char"]))
              AND n.nspname = $1
              AND c.relname = $2;
          SQL

          indexes.to_a
        end
 | 
						|
 | 
						|
        # Looks up the partitioned table named `table_name` in the current
        # schema via Gitlab::Database::PostgresPartitionedTable and returns
        # it. Raises ArgumentError when the lookup finds nothing.
        def find_partitioned_table(table_name)
          partitioned_table = Gitlab::Database::PostgresPartitionedTable.find_by_name_in_current_schema(table_name)
          raise ArgumentError, "#{table_name} is not a partitioned table" unless partitioned_table

          partitioned_table
        end
 | 
						|
 | 
						|
        # Builds the name used for the index on a single partition by passing
        # "<partition_name>_<index_name>" and the 'index' type to
        # `object_name`.
        #
        # NOTE(review): `object_name` is defined outside this file; presumably
        # it derives a deterministic, length-safe identifier - confirm there.
        def generated_index_name(partition_name, index_name)
          object_name("#{partition_name}_#{index_name}", 'index')
        end
 | 
						|
 | 
						|
        # Renames every index whose definition appears in both hashes.
        #
        # `from` maps index definition => current index name and `to` maps
        # index definition => desired index name. Definitions that only
        # appear in one of the two hashes are ignored. All renames are sent
        # to the database as a single `;`-separated statement batch.
        #
        # `schema_name` (defaults to the connection's current schema)
        # qualifies the current index name in each ALTER INDEX statement.
        #
        # Returns the result of `connection.execute`, or nil when there is
        # nothing to rename (no query is issued in that case).
        def rename_indexes(from, to, schema_name: connection.current_schema)
          indexes_to_rename = from.select { |index_id, _| to.key?(index_id) }

          # Avoid a pointless round-trip with an empty query string.
          return if indexes_to_rename.empty?

          statements = indexes_to_rename.map do |index_id, current_name|
            quoted_current_name = connection.quote_column_name(current_name)
            qualified_current_name = connection.quote_table_name("#{schema_name}.#{quoted_current_name}")
            quoted_new_name = connection.quote_column_name(to[index_id])

            <<~SQL
              ALTER INDEX #{qualified_current_name}
                          RENAME TO #{quoted_new_name}
            SQL
          end

          connection.execute(statements.join(';'))
        end
 | 
						|
      end
 | 
						|
    end
 | 
						|
  end
 | 
						|
end
 |