mirror of https://github.com/grafana/grafana.git
[unified-storage/search] Don't expire file-based indexes, check for resource stats when building index on-demand (#107886)
* Get ResourceStats before indexing.
* Replaced localcache.CacheService to handle expiration faster (localcache.CacheService / gocache.Cache only expires values at a specific interval, but we need to close the index faster).
* Singleflight getOrBuildIndex for the same key.
* Expire only in-memory indexes.
* File-based indexes have a new name on each rebuild.
* Sanitize file path segments, and verify that the generated path is within the root dir.
* Add comment and test for cleanOldIndexes.
This commit is contained in:
parent
2e568ef672
commit
8fd5739576
|
@ -14,6 +14,7 @@ import (
|
|||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
||||
|
@ -80,28 +81,16 @@ type ResourceIndex interface {
|
|||
|
||||
// SearchBackend contains the technology specific logic to support search
|
||||
type SearchBackend interface {
|
||||
// This will return nil if the key does not exist
|
||||
// GetIndex returns existing index, or nil.
|
||||
GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error)
|
||||
|
||||
// Build an index from scratch
|
||||
BuildIndex(ctx context.Context,
|
||||
key NamespacedResource,
|
||||
// BuildIndex builds an index from scratch.
|
||||
// Depending on the size, the backend may choose different options (eg: memory vs disk).
|
||||
// The last known resource version can be used to detect that nothing has changed, and existing on-disk index can be reused.
|
||||
// The builder will write all documents before returning.
|
||||
BuildIndex(ctx context.Context, key NamespacedResource, size int64, resourceVersion int64, nonStandardFields SearchableDocumentFields, builder func(index ResourceIndex) (int64, error)) (ResourceIndex, error)
|
||||
|
||||
// When the size is known, it will be passed along here
|
||||
// Depending on the size, the backend may choose different options (eg: memory vs disk)
|
||||
size int64,
|
||||
|
||||
// The last known resource version (can be used to know that nothing has changed)
|
||||
resourceVersion int64,
|
||||
|
||||
// The non-standard index fields
|
||||
fields SearchableDocumentFields,
|
||||
|
||||
// The builder will write all documents before returning
|
||||
builder func(index ResourceIndex) (int64, error),
|
||||
) (ResourceIndex, error)
|
||||
|
||||
// Gets the total number of documents across all indexes
|
||||
// TotalDocs returns the total number of documents across all indexes.
|
||||
TotalDocs() int64
|
||||
}
|
||||
|
||||
|
@ -120,6 +109,8 @@ type searchSupport struct {
|
|||
initMinSize int
|
||||
initMaxSize int
|
||||
|
||||
buildIndex singleflight.Group
|
||||
|
||||
// Index queue processors
|
||||
indexQueueProcessorsMutex sync.Mutex
|
||||
indexQueueProcessors map[string]*indexQueueProcessor
|
||||
|
@ -608,24 +599,53 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso
|
|||
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"GetOrCreateIndex")
|
||||
defer span.End()
|
||||
|
||||
// TODO???
|
||||
// We want to block while building the index and return the same index for the key
|
||||
// simple mutex not great... we don't want to block while anything in building, just the same key
|
||||
idx, err := s.search.GetIndex(ctx, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if idx == nil {
|
||||
idx, _, err = s.build(ctx, key, 10, 0) // unknown size and RV
|
||||
if idx != nil {
|
||||
return idx, nil
|
||||
}
|
||||
|
||||
idxInt, err, _ := s.buildIndex.Do(key.String(), func() (interface{}, error) {
|
||||
// Recheck if some other goroutine managed to build an index in the meantime.
|
||||
// (That is, it finished running this function and stored the index into the cache)
|
||||
idx, err := s.search.GetIndex(ctx, key)
|
||||
if err == nil && idx != nil {
|
||||
return idx, nil
|
||||
}
|
||||
|
||||
// Get correct value of size + RV for building the index. This is important for our Bleve
|
||||
// backend to decide whether to build index in-memory or as file-based.
|
||||
stats, err := s.storage.GetResourceStats(ctx, key.Namespace, 0)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get resource stats: %w", err)
|
||||
}
|
||||
|
||||
size := int64(0)
|
||||
rv := int64(0)
|
||||
for _, stat := range stats {
|
||||
if stat.Namespace == key.Namespace && stat.Group == key.Group && stat.Resource == key.Resource {
|
||||
size = stat.Count
|
||||
rv = stat.ResourceVersion
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
idx, _, err = s.build(ctx, key, size, rv)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building search index, %w", err)
|
||||
}
|
||||
if idx == nil {
|
||||
return nil, fmt.Errorf("nil index after build")
|
||||
}
|
||||
return idx, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return idx, nil
|
||||
return idxInt.(ResourceIndex), nil
|
||||
}
|
||||
|
||||
func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size int64, rv int64) (ResourceIndex, int64, error) {
|
||||
|
@ -640,8 +660,6 @@ func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size
|
|||
}
|
||||
fields := s.builders.GetFields(nsr)
|
||||
|
||||
logger.Debug("Building index", "resource", nsr.Resource, "size", size, "rv", rv)
|
||||
|
||||
index, err := s.search.BuildIndex(ctx, nsr, size, rv, fields, func(index ResourceIndex) (int64, error) {
|
||||
rv, err = s.storage.ListIterator(ctx, &resourcepb.ListRequest{
|
||||
Limit: 1000000000000, // big number
|
||||
|
|
|
@ -2,7 +2,9 @@ package resource
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/authlib/types"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
@ -96,6 +98,7 @@ func (m *mockStorageBackend) ListHistory(ctx context.Context, req *resourcepb.Li
|
|||
|
||||
// mockSearchBackend implements SearchBackend for testing with tracking capabilities.
// It records the build requests it receives so tests can inspect them afterwards.
|
||||
type mockSearchBackend struct {
|
||||
// mu is locked by BuildIndex before it classifies and records a call;
// presumably it guards the two call slices below against concurrent builds — confirm.
mu sync.Mutex
|
||||
buildIndexCalls []buildIndexCall
|
||||
buildEmptyIndexCalls []buildEmptyIndexCall
|
||||
}
|
||||
|
@ -129,6 +132,9 @@ func (m *mockSearchBackend) BuildIndex(ctx context.Context, key NamespacedResour
|
|||
return nil, err
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
// Determine if this is an empty index based on size
|
||||
// Empty indexes are characterized by size == 0
|
||||
if size == 0 {
|
||||
|
@ -271,3 +277,58 @@ func TestBuildIndexes_MaxCountThreshold(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSearchGetOrCreateIndex(t *testing.T) {
|
||||
// Setup mock implementations
|
||||
storage := &mockStorageBackend{
|
||||
resourceStats: []ResourceStats{
|
||||
{NamespacedResource: NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, Count: 50, ResourceVersion: 11111111},
|
||||
},
|
||||
}
|
||||
search := &mockSearchBackend{
|
||||
buildIndexCalls: []buildIndexCall{},
|
||||
buildEmptyIndexCalls: []buildEmptyIndexCall{},
|
||||
}
|
||||
supplier := &TestDocumentBuilderSupplier{
|
||||
GroupsResources: map[string]string{
|
||||
"group": "resource",
|
||||
},
|
||||
}
|
||||
|
||||
// Create search support with the specified initMaxSize
|
||||
opts := SearchOptions{
|
||||
Backend: search,
|
||||
Resources: supplier,
|
||||
WorkerThreads: 1,
|
||||
InitMinCount: 1, // set min count to default for this test
|
||||
InitMaxCount: 0,
|
||||
}
|
||||
|
||||
support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, support)
|
||||
|
||||
start := make(chan struct{})
|
||||
|
||||
const concurrency = 100
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < concurrency; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-start
|
||||
_, _ = support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"})
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait a bit for goroutines to start (hopefully)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
// Unblock all goroutines.
|
||||
close(start)
|
||||
wg.Wait()
|
||||
|
||||
require.NotEmpty(t, search.buildIndexCalls)
|
||||
require.Less(t, len(search.buildIndexCalls), concurrency, "Should not have built index more than a few times (ideally once)")
|
||||
require.Equal(t, int64(50), search.buildIndexCalls[0].size)
|
||||
require.Equal(t, int64(11111111), search.buildIndexCalls[0].resourceVersion)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package search
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"math"
|
||||
|
@ -11,6 +12,7 @@ import (
|
|||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/blevesearch/bleve/v2"
|
||||
|
@ -31,7 +33,6 @@ import (
|
|||
authlib "github.com/grafana/authlib/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
"github.com/grafana/grafana/pkg/infra/localcache"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
)
|
||||
|
@ -39,8 +40,6 @@ import (
|
|||
const (
|
||||
// tracingPrexfixBleve is the prefix used for tracing spans in the Bleve backend
|
||||
tracingPrexfixBleve = "unified_search.bleve."
|
||||
// Default index cache cleanup TTL is 1 minute
|
||||
indexCacheCleanupInterval = time.Minute
|
||||
)
|
||||
|
||||
var _ resource.SearchBackend = &bleveBackend{}
|
||||
|
@ -57,7 +56,7 @@ type BleveOptions struct {
|
|||
// ?? not totally sure the units
|
||||
BatchSize int
|
||||
|
||||
// Index cache TTL for bleve indices
|
||||
// Index cache TTL for bleve indices. 0 disables expiration for in-memory indexes.
|
||||
IndexCacheTTL time.Duration
|
||||
}
|
||||
|
||||
|
@ -65,9 +64,9 @@ type bleveBackend struct {
|
|||
tracer trace.Tracer
|
||||
log *slog.Logger
|
||||
opts BleveOptions
|
||||
start time.Time
|
||||
|
||||
cache *localcache.CacheService
|
||||
cacheMx sync.RWMutex
|
||||
cache map[resource.NamespacedResource]*bleveIndex
|
||||
|
||||
features featuremgmt.FeatureToggles
|
||||
indexMetrics *resource.BleveIndexMetrics
|
||||
|
@ -77,6 +76,12 @@ func NewBleveBackend(opts BleveOptions, tracer trace.Tracer, features featuremgm
|
|||
if opts.Root == "" {
|
||||
return nil, fmt.Errorf("bleve backend missing root folder configuration")
|
||||
}
|
||||
absRoot, err := filepath.Abs(opts.Root)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting absolute path for bleve root folder %w", err)
|
||||
}
|
||||
opts.Root = absRoot
|
||||
|
||||
root, err := os.Stat(opts.Root)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening bleve root folder %w", err)
|
||||
|
@ -85,35 +90,64 @@ func NewBleveBackend(opts BleveOptions, tracer trace.Tracer, features featuremgm
|
|||
return nil, fmt.Errorf("bleve root is configured against a file (not folder)")
|
||||
}
|
||||
|
||||
bleveBackend := &bleveBackend{
|
||||
be := &bleveBackend{
|
||||
log: slog.Default().With("logger", "bleve-backend"),
|
||||
tracer: tracer,
|
||||
cache: localcache.New(opts.IndexCacheTTL, indexCacheCleanupInterval),
|
||||
cache: map[resource.NamespacedResource]*bleveIndex{},
|
||||
opts: opts,
|
||||
start: time.Now(),
|
||||
features: features,
|
||||
indexMetrics: indexMetrics,
|
||||
}
|
||||
|
||||
go bleveBackend.updateIndexSizeMetric(opts.Root)
|
||||
go be.updateIndexSizeMetric(opts.Root)
|
||||
|
||||
return bleveBackend, nil
|
||||
return be, nil
|
||||
}
|
||||
|
||||
// This will return nil if the key does not exist
|
||||
func (b *bleveBackend) GetIndex(ctx context.Context, key resource.NamespacedResource) (resource.ResourceIndex, error) {
|
||||
val, ok := b.cache.Get(key.String())
|
||||
if !ok {
|
||||
// GetIndex will return nil if the key does not exist
|
||||
func (b *bleveBackend) GetIndex(_ context.Context, key resource.NamespacedResource) (resource.ResourceIndex, error) {
|
||||
idx := b.getCachedIndex(key)
|
||||
// Avoid returning typed nils.
|
||||
if idx == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
idx, ok := val.(*bleveIndex)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("cache item is not a bleve index: %s", key.String())
|
||||
}
|
||||
return idx, nil
|
||||
}
|
||||
|
||||
func (b *bleveBackend) getCachedIndex(key resource.NamespacedResource) *bleveIndex {
|
||||
// Check index with read-lock first.
|
||||
b.cacheMx.RLock()
|
||||
val := b.cache[key]
|
||||
b.cacheMx.RUnlock()
|
||||
|
||||
if val == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if val.expiration.IsZero() || val.expiration.After(time.Now()) {
|
||||
// Not expired yet.
|
||||
return val
|
||||
}
|
||||
|
||||
// We're dealing with expired index. We need to remove it from the cache and close it.
|
||||
b.cacheMx.Lock()
|
||||
val = b.cache[key]
|
||||
delete(b.cache, key)
|
||||
b.cacheMx.Unlock()
|
||||
|
||||
if val == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Index is no longer in the cache, but we need to close it.
|
||||
err := val.index.Close()
|
||||
if err != nil {
|
||||
b.log.Error("failed to close index", "key", key, "err", err)
|
||||
}
|
||||
b.log.Info("index evicted from cache", "key", key)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateIndexSizeMetric sets the total size of all file-based indices metric.
|
||||
func (b *bleveBackend) updateIndexSizeMetric(indexPath string) {
|
||||
if b.indexMetrics == nil {
|
||||
|
@ -147,28 +181,21 @@ func (b *bleveBackend) updateIndexSizeMetric(indexPath string) {
|
|||
}
|
||||
}
|
||||
|
||||
// Build an index from scratch
|
||||
func (b *bleveBackend) BuildIndex(ctx context.Context,
|
||||
// BuildIndex builds an index from scratch.
|
||||
// If built successfully, the new index replaces the old index in the cache (if there was any).
|
||||
func (b *bleveBackend) BuildIndex(
|
||||
ctx context.Context,
|
||||
key resource.NamespacedResource,
|
||||
|
||||
// When the size is known, it will be passed along here
|
||||
// Depending on the size, the backend may choose different options (eg: memory vs disk)
|
||||
size int64,
|
||||
|
||||
// The last known resource version can be used to know that we can skip calling the builder
|
||||
resourceVersion int64,
|
||||
|
||||
// the non-standard searchable fields
|
||||
fields resource.SearchableDocumentFields,
|
||||
|
||||
// The builder will write all documents before returning
|
||||
builder func(index resource.ResourceIndex) (int64, error),
|
||||
) (resource.ResourceIndex, error) {
|
||||
_, span := b.tracer.Start(ctx, tracingPrexfixBleve+"BuildIndex")
|
||||
defer span.End()
|
||||
|
||||
var err error
|
||||
var index bleve.Index
|
||||
fileIndexName := "" // Name of the file-based index, or empty for in-memory indexes.
|
||||
|
||||
build := true
|
||||
mapper, err := GetBleveMappings(fields)
|
||||
|
@ -176,61 +203,62 @@ func (b *bleveBackend) BuildIndex(ctx context.Context,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if size > b.opts.FileThreshold {
|
||||
resourceDir := filepath.Join(b.opts.Root, key.Namespace,
|
||||
fmt.Sprintf("%s.%s", key.Resource, key.Group),
|
||||
)
|
||||
fname := fmt.Sprintf("rv%d", resourceVersion)
|
||||
if resourceVersion == 0 {
|
||||
fname = b.start.Format("tmp-20060102-150405")
|
||||
}
|
||||
dir := filepath.Join(resourceDir, fname)
|
||||
if !isValidPath(dir, b.opts.Root) {
|
||||
b.log.Error("Directory is not valid", "directory", dir)
|
||||
}
|
||||
if resourceVersion > 0 {
|
||||
info, _ := os.Stat(dir)
|
||||
if info != nil && info.IsDir() {
|
||||
index, err = bleve.Open(dir) // NOTE, will use the same mappings!!!
|
||||
if err == nil {
|
||||
found, err := index.DocCount()
|
||||
if err != nil || int64(found) != size {
|
||||
b.log.Info("this size changed since the last time the index opened")
|
||||
_ = index.Close()
|
||||
cachedIndex := b.getCachedIndex(key)
|
||||
|
||||
// Pick a new file name
|
||||
fname = b.start.Format("tmp-20060102-150405-changed")
|
||||
dir = filepath.Join(resourceDir, fname)
|
||||
index = nil
|
||||
} else {
|
||||
build = false // no need to build the index
|
||||
}
|
||||
logWithDetails := b.log.With("namespace", key.Namespace, "group", key.Group, "resource", key.Resource, "size", size, "rv", resourceVersion)
|
||||
|
||||
resourceDir := filepath.Join(b.opts.Root, cleanFileSegment(key.Namespace), cleanFileSegment(fmt.Sprintf("%s.%s", key.Resource, key.Group)))
|
||||
|
||||
if size > b.opts.FileThreshold {
|
||||
// We only check for the existing file-based index if we don't already have an open index for this key.
|
||||
// This happens on startup, or when memory-based index has expired. (We don't expire file-based indexes)
|
||||
// If we do have an unexpired cached index already, we always build a new index from scratch.
|
||||
if cachedIndex == nil && resourceVersion > 0 {
|
||||
index, fileIndexName = b.findPreviousFileBasedIndex(resourceDir, resourceVersion, size)
|
||||
}
|
||||
|
||||
if index != nil {
|
||||
build = false
|
||||
logWithDetails.Debug("Existing index found on filesystem", "directory", filepath.Join(resourceDir, fileIndexName))
|
||||
} else {
|
||||
// Building index from scratch. Index name has a time component in it to be unique, but if
|
||||
// we happen to create non-unique name, we bump the time and try again.
|
||||
|
||||
indexDir := ""
|
||||
now := time.Now()
|
||||
for index == nil {
|
||||
fileIndexName = formatIndexName(time.Now(), resourceVersion)
|
||||
indexDir = filepath.Join(resourceDir, fileIndexName)
|
||||
if !isPathWithinRoot(indexDir, b.opts.Root) {
|
||||
return nil, fmt.Errorf("invalid path %s", indexDir)
|
||||
}
|
||||
|
||||
index, err = bleve.New(indexDir, mapper)
|
||||
if errors.Is(err, bleve.ErrorIndexPathExists) {
|
||||
now = now.Add(time.Second) // Bump time for next try
|
||||
index = nil // Bleve actually returns non-nil value with ErrorIndexPathExists
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating new bleve index: %s %w", indexDir, err)
|
||||
}
|
||||
}
|
||||
|
||||
logWithDetails.Info("Building index using filesystem", "directory", indexDir)
|
||||
}
|
||||
|
||||
if index == nil {
|
||||
index, err = bleve.New(dir, mapper)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error creating new bleve index: %s %w", dir, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start a background task to cleanup the old index directories
|
||||
if index != nil && err == nil {
|
||||
go b.cleanOldIndexes(resourceDir, fname)
|
||||
}
|
||||
if b.indexMetrics != nil {
|
||||
b.indexMetrics.IndexTenants.WithLabelValues("file").Inc()
|
||||
}
|
||||
} else {
|
||||
index, err = bleve.NewMemOnly(mapper)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating new in-memory bleve index: %w", err)
|
||||
}
|
||||
if b.indexMetrics != nil {
|
||||
b.indexMetrics.IndexTenants.WithLabelValues("memory").Inc()
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
logWithDetails.Info("Building index using memory")
|
||||
}
|
||||
|
||||
// Batch all the changes
|
||||
|
@ -249,28 +277,72 @@ func (b *bleveBackend) BuildIndex(ctx context.Context,
|
|||
}
|
||||
|
||||
if build {
|
||||
start := time.Now()
|
||||
_, err = builder(idx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
elapsed := time.Since(start)
|
||||
logWithDetails.Info("Finished building index", "elapsed", elapsed)
|
||||
}
|
||||
|
||||
b.cache.SetDefault(key.String(), idx)
|
||||
// Set expiration after building the index. Only expire in-memory indexes.
|
||||
if fileIndexName == "" && b.opts.IndexCacheTTL > 0 {
|
||||
idx.expiration = time.Now().Add(b.opts.IndexCacheTTL)
|
||||
}
|
||||
|
||||
// Store the index in the cache.
|
||||
if idx.expiration.IsZero() {
|
||||
logWithDetails.Info("Storing index in cache, with no expiration", "key", key)
|
||||
} else {
|
||||
logWithDetails.Info("Storing index in cache", "key", key, "expiration", idx.expiration)
|
||||
}
|
||||
|
||||
b.cacheMx.Lock()
|
||||
prev := b.cache[key]
|
||||
b.cache[key] = idx
|
||||
b.cacheMx.Unlock()
|
||||
|
||||
// If there was a previous index in the cache, close it.
|
||||
if prev != nil {
|
||||
err := prev.index.Close()
|
||||
if err != nil {
|
||||
logWithDetails.Error("failed to close previous index", "key", key, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start a background task to cleanup the old index directories. If we have built a new file-based index,
|
||||
// the new name is ignored. If we have created in-memory index and fileIndexName is empty, all old directories can be removed.
|
||||
go b.cleanOldIndexes(resourceDir, fileIndexName)
|
||||
|
||||
return idx, nil
|
||||
}
|
||||
|
||||
func (b *bleveBackend) cleanOldIndexes(dir string, skip string) {
|
||||
// cleanFileSegment rewrites input so it can be used as a single path segment:
// every OS path separator is replaced with "_", and then every ".." sequence
// is replaced with "_".
func cleanFileSegment(input string) string {
	withoutSeparators := strings.ReplaceAll(input, string(filepath.Separator), "_")
	return strings.ReplaceAll(withoutSeparators, "..", "_")
}
|
||||
|
||||
// cleanOldIndexes deletes all subdirectories inside dir, skipping directory with "skipName".
|
||||
// "skipName" can be empty.
|
||||
func (b *bleveBackend) cleanOldIndexes(dir string, skipName string) {
|
||||
files, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return
|
||||
}
|
||||
b.log.Warn("error cleaning folders from", "directory", dir, "error", err)
|
||||
return
|
||||
}
|
||||
for _, file := range files {
|
||||
if file.IsDir() && file.Name() != skip {
|
||||
if file.IsDir() && file.Name() != skipName {
|
||||
fpath := filepath.Join(dir, file.Name())
|
||||
if !isValidPath(dir, b.opts.Root) {
|
||||
b.log.Error("Path is not valid", "directory", fpath, "error", err)
|
||||
if !isPathWithinRoot(fpath, b.opts.Root) {
|
||||
b.log.Warn("Skipping cleanup of directory", "directory", fpath)
|
||||
continue
|
||||
}
|
||||
|
||||
err = os.RemoveAll(fpath)
|
||||
if err != nil {
|
||||
b.log.Error("Unable to remove old index folder", "directory", fpath, "error", err)
|
||||
|
@ -281,31 +353,45 @@ func (b *bleveBackend) cleanOldIndexes(dir string, skip string) {
|
|||
}
|
||||
}
|
||||
|
||||
// isPathWithinRoot verifies that path is within given absoluteRoot.
//
// path is made absolute with filepath.Abs (so a relative path is resolved against
// the current working directory) and is then compared to absoluteRoot component
// by component. A plain string-prefix comparison is not enough: it would accept
// sibling directories that merely share a name prefix with the root, e.g.
// "/data/index-old" for root "/data/index".
//
// It returns false if either argument is empty, if path cannot be made absolute,
// or if path cannot be expressed relative to absoluteRoot. A path equal to the
// root itself is reported as within the root.
func isPathWithinRoot(path, absoluteRoot string) bool {
	if path == "" || absoluteRoot == "" {
		return false
	}

	absPath, err := filepath.Abs(path)
	if err != nil {
		return false
	}

	rel, err := filepath.Rel(filepath.Clean(absoluteRoot), absPath)
	if err != nil {
		return false
	}

	// The path escapes the root only if the relative path is ".." or starts with
	// a ".." component. A name such as "..foo" is an ordinary entry inside the root.
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return false
	}
	return true
}
|
||||
|
||||
// cacheKeys returns list of keys for indexes in the cache (including possibly expired ones).
|
||||
func (b *bleveBackend) cacheKeys() []resource.NamespacedResource {
|
||||
b.cacheMx.RLock()
|
||||
defer b.cacheMx.RUnlock()
|
||||
|
||||
keys := make([]resource.NamespacedResource, 0, len(b.cache))
|
||||
for k := range b.cache {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// TotalDocs returns the total number of documents across all indices
|
||||
func (b *bleveBackend) TotalDocs() int64 {
|
||||
var totalDocs int64
|
||||
for _, v := range b.cache.Items() {
|
||||
idx, ok := v.Object.(*bleveIndex)
|
||||
if !ok {
|
||||
b.log.Warn("cache item is not a bleve index", "key", v.Object)
|
||||
// We iterate over keys and call getCachedIndex for each index individually.
|
||||
// We do this to avoid keeping a lock for the entire TotalDocs function, since DocCount may be slow (due to disk access).
|
||||
// Calling getCachedIndex also handles index expiration.
|
||||
for _, key := range b.cacheKeys() {
|
||||
idx := b.getCachedIndex(key)
|
||||
if idx == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
c, err := idx.index.DocCount()
|
||||
if err != nil {
|
||||
continue
|
||||
|
@ -315,6 +401,74 @@ func (b *bleveBackend) TotalDocs() int64 {
|
|||
return totalDocs
|
||||
}
|
||||
|
||||
// formatIndexName builds the directory name for a file-based index in the form
// "<YYYYMMDD>-<HHMMSS>-<resourceVersion>", with the timestamp taken from now.
func formatIndexName(now time.Time, resourceVersion int64) string {
	const timestampLayout = "20060102-150405"
	return fmt.Sprintf("%s-%d", now.Format(timestampLayout), resourceVersion)
}
|
||||
|
||||
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string, resourceVersion int64, size int64) (bleve.Index, string) {
|
||||
entries, err := os.ReadDir(resourceDir)
|
||||
if err != nil {
|
||||
return nil, ""
|
||||
}
|
||||
|
||||
indexName := ""
|
||||
for _, ent := range entries {
|
||||
if !ent.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
parts := strings.Split(ent.Name(), "-")
|
||||
if len(parts) != 3 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Last part is resourceVersion
|
||||
indexRv, err := strconv.ParseInt(parts[2], 10, 64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if indexRv != resourceVersion {
|
||||
continue
|
||||
}
|
||||
indexName = ent.Name()
|
||||
break
|
||||
}
|
||||
|
||||
if indexName == "" {
|
||||
return nil, ""
|
||||
}
|
||||
|
||||
indexDir := filepath.Join(resourceDir, indexName)
|
||||
idx, err := bleve.Open(indexDir)
|
||||
if err != nil {
|
||||
return nil, ""
|
||||
}
|
||||
|
||||
cnt, err := idx.DocCount()
|
||||
if err != nil {
|
||||
_ = idx.Close()
|
||||
return nil, ""
|
||||
}
|
||||
|
||||
if uint64(size) != cnt {
|
||||
_ = idx.Close()
|
||||
return nil, ""
|
||||
}
|
||||
|
||||
return idx, indexName
|
||||
}
|
||||
|
||||
func (b *bleveBackend) closeAllIndexes() {
|
||||
b.cacheMx.Lock()
|
||||
defer b.cacheMx.Unlock()
|
||||
|
||||
for key, idx := range b.cache {
|
||||
_ = idx.index.Close()
|
||||
delete(b.cache, key)
|
||||
}
|
||||
}
|
||||
|
||||
type bleveIndex struct {
|
||||
key resource.NamespacedResource
|
||||
index bleve.Index
|
||||
|
@ -322,6 +476,10 @@ type bleveIndex struct {
|
|||
standard resource.SearchableDocumentFields
|
||||
fields resource.SearchableDocumentFields
|
||||
|
||||
// When to expire and close the index. Zero value = no expiration.
|
||||
// We only expire in-memory indexes.
|
||||
expiration time.Time
|
||||
|
||||
// The values returned with all
|
||||
allFields []*resourcepb.ResourceTableColumnDefinition
|
||||
features featuremgmt.FeatureToggles
|
||||
|
|
|
@ -8,7 +8,9 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/blevesearch/bleve/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
|
@ -671,65 +673,301 @@ func TestSafeInt64ToInt(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func Test_isValidPath(t *testing.T) {
|
||||
func Test_isPathWithinRoot(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
dir string
|
||||
safeDir string
|
||||
want bool
|
||||
name string
|
||||
dir string
|
||||
root string
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "valid path",
|
||||
dir: "/path/to/my-file/",
|
||||
safeDir: "/path/to/",
|
||||
want: true,
|
||||
name: "valid path",
|
||||
dir: "/path/to/my-file/",
|
||||
root: "/path/to/",
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "valid path without trailing slash",
|
||||
dir: "/path/to/my-file",
|
||||
safeDir: "/path/to",
|
||||
want: true,
|
||||
name: "valid path without trailing slash",
|
||||
dir: "/path/to/my-file",
|
||||
root: "/path/to",
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "path with double slashes",
|
||||
dir: "/path//to//my-file/",
|
||||
safeDir: "/path/to/",
|
||||
want: true,
|
||||
name: "path with double slashes",
|
||||
dir: "/path//to//my-file/",
|
||||
root: "/path/to/",
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "invalid path: ..",
|
||||
dir: "/path/../above/",
|
||||
safeDir: "/path/to/",
|
||||
name: "invalid path: ..",
|
||||
dir: "/path/../above/",
|
||||
root: "/path/to/",
|
||||
},
|
||||
{
|
||||
name: "invalid path: \\",
|
||||
dir: "\\path/to",
|
||||
safeDir: "/path/to/",
|
||||
name: "invalid path: \\",
|
||||
dir: "\\path/to",
|
||||
root: "/path/to/",
|
||||
},
|
||||
{
|
||||
name: "invalid path: not under safe dir",
|
||||
dir: "/path/to.txt",
|
||||
safeDir: "/path/to/",
|
||||
name: "invalid path: not under safe dir",
|
||||
dir: "/path/to.txt",
|
||||
root: "/path/to/",
|
||||
},
|
||||
{
|
||||
name: "invalid path: empty paths",
|
||||
dir: "",
|
||||
safeDir: "/path/to/",
|
||||
name: "invalid path: empty paths",
|
||||
dir: "",
|
||||
root: "/path/to/",
|
||||
},
|
||||
{
|
||||
name: "invalid path: different path",
|
||||
dir: "/other/path/to/my-file/",
|
||||
safeDir: "/Some/other/path",
|
||||
name: "invalid path: different path",
|
||||
dir: "/other/path/to/my-file/",
|
||||
root: "/Some/other/path",
|
||||
},
|
||||
{
|
||||
name: "invalid path: empty safe path",
|
||||
dir: "/path/to/",
|
||||
safeDir: "",
|
||||
name: "invalid path: empty safe path",
|
||||
dir: "/path/to/",
|
||||
root: "",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
require.Equal(t, tt.want, isValidPath(tt.dir, tt.safeDir))
|
||||
require.Equal(t, tt.want, isPathWithinRoot(tt.dir, tt.root))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func setupBleveBackend(t *testing.T, fileThreshold int, cacheTTL time.Duration, dir string) *bleveBackend {
|
||||
if dir == "" {
|
||||
dir = t.TempDir()
|
||||
}
|
||||
backend, err := NewBleveBackend(BleveOptions{
|
||||
Root: dir,
|
||||
FileThreshold: int64(fileThreshold),
|
||||
IndexCacheTTL: cacheTTL,
|
||||
}, tracing.NewNoopTracerService(), featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchPermissionFiltering), nil)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, backend)
|
||||
t.Cleanup(backend.closeAllIndexes)
|
||||
return backend
|
||||
}
|
||||
|
||||
func TestBleveInMemoryIndexExpiration(t *testing.T) {
|
||||
backend := setupBleveBackend(t, 5, time.Nanosecond, "")
|
||||
|
||||
ns := resource.NamespacedResource{
|
||||
Namespace: "test",
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
}
|
||||
|
||||
builtIndex, err := backend.BuildIndex(context.Background(), ns, 1 /* below FileThreshold */, 100, nil, indexTestDocs(ns, 1))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for index expiration, which is 1ns
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
idx, err := backend.GetIndex(context.Background(), ns)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, idx)
|
||||
|
||||
// Verify that builtIndex is now closed.
|
||||
_, err = builtIndex.DocCount(context.Background(), "")
|
||||
require.ErrorIs(t, err, bleve.ErrorIndexClosed)
|
||||
}
|
||||
|
||||
func TestBleveFileIndexExpiration(t *testing.T) {
|
||||
backend := setupBleveBackend(t, 5, time.Nanosecond, "")
|
||||
|
||||
ns := resource.NamespacedResource{
|
||||
Namespace: "test",
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
}
|
||||
|
||||
// size=100 is above FileThreshold, this will be file-based index
|
||||
builtIndex, err := backend.BuildIndex(context.Background(), ns, 100, 100, nil, indexTestDocs(ns, 1))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for index expiration, which is 1ns
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
idx, err := backend.GetIndex(context.Background(), ns)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, idx)
|
||||
|
||||
// Verify that builtIndex is still open.
|
||||
cnt, err := builtIndex.DocCount(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), cnt)
|
||||
}
|
||||
|
||||
func TestFileIndexIsReusedOnSameSizeAndRV(t *testing.T) {
|
||||
ns := resource.NamespacedResource{
|
||||
Namespace: "test",
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
}
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
backend1 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
|
||||
_, err := backend1.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, indexTestDocs(ns, 10))
|
||||
require.NoError(t, err)
|
||||
backend1.closeAllIndexes()
|
||||
|
||||
// We open new backend using same directory, and run indexing with same size (10) and RV (100). This should reuse existing index, and skip indexing.
|
||||
backend2 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
|
||||
idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, indexTestDocs(ns, 1000))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify that we're reusing existing index and there is only 10 documents in it, not 1000.
|
||||
cnt, err := idx.DocCount(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(10), cnt)
|
||||
}
|
||||
|
||||
func TestFileIndexIsNotReusedOnDifferentSize(t *testing.T) {
|
||||
ns := resource.NamespacedResource{
|
||||
Namespace: "test",
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
}
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
backend1 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
|
||||
_, err := backend1.BuildIndex(context.Background(), ns, 10, 100, nil, indexTestDocs(ns, 10))
|
||||
require.NoError(t, err)
|
||||
backend1.closeAllIndexes()
|
||||
|
||||
// We open new backend using same directory, but with different size. Index should be rebuilt.
|
||||
backend2 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
|
||||
idx, err := backend2.BuildIndex(context.Background(), ns, 100, 100, nil, indexTestDocs(ns, 100))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify that index has updated number of documents.
|
||||
cnt, err := idx.DocCount(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(100), cnt)
|
||||
}
|
||||
|
||||
func TestFileIndexIsNotReusedOnDifferentRV(t *testing.T) {
|
||||
ns := resource.NamespacedResource{
|
||||
Namespace: "test",
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
}
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
backend1 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
|
||||
_, err := backend1.BuildIndex(context.Background(), ns, 10, 100, nil, indexTestDocs(ns, 10))
|
||||
require.NoError(t, err)
|
||||
backend1.closeAllIndexes()
|
||||
|
||||
// We open new backend using same directory, but with different RV. Index should be rebuilt.
|
||||
backend2 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
|
||||
idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 999999, nil, indexTestDocs(ns, 100))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify that index has updated number of documents.
|
||||
cnt, err := idx.DocCount(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(100), cnt)
|
||||
}
|
||||
|
||||
func TestRebuildingIndexClosesPreviousCachedIndex(t *testing.T) {
|
||||
ns := resource.NamespacedResource{
|
||||
Namespace: "test",
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
}
|
||||
|
||||
for name, testCase := range map[string]struct {
|
||||
firstInMemory bool
|
||||
secondInMemory bool
|
||||
}{
|
||||
"in-memory, in-memory": {true, true},
|
||||
"in-memory, file": {true, false},
|
||||
"file, in-memory": {false, true},
|
||||
"file, file": {false, false},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
backend := setupBleveBackend(t, 5, time.Nanosecond, "")
|
||||
|
||||
firstSize := 100
|
||||
if testCase.firstInMemory {
|
||||
firstSize = 1
|
||||
}
|
||||
firstIndex, err := backend.BuildIndex(context.Background(), ns, int64(firstSize), 100, nil, indexTestDocs(ns, firstSize))
|
||||
require.NoError(t, err)
|
||||
|
||||
secondSize := 100
|
||||
if testCase.firstInMemory {
|
||||
secondSize = 1
|
||||
}
|
||||
secondIndex, err := backend.BuildIndex(context.Background(), ns, int64(secondSize), 100, nil, indexTestDocs(ns, secondSize))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify that first and second index are different, and first one is now closed.
|
||||
require.NotEqual(t, firstIndex, secondIndex)
|
||||
|
||||
_, err = firstIndex.DocCount(context.Background(), "")
|
||||
require.ErrorIs(t, err, bleve.ErrorIndexClosed)
|
||||
|
||||
cnt, err := secondIndex.DocCount(context.Background(), "")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(secondSize), cnt)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func indexTestDocs(ns resource.NamespacedResource, docs int) func(index resource.ResourceIndex) (int64, error) {
|
||||
return func(index resource.ResourceIndex) (int64, error) {
|
||||
var items []*resource.BulkIndexItem
|
||||
for i := 0; i < docs; i++ {
|
||||
items = append(items, &resource.BulkIndexItem{
|
||||
Action: resource.ActionIndex,
|
||||
Doc: &resource.IndexableDocument{
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Namespace: ns.Namespace,
|
||||
Group: ns.Group,
|
||||
Resource: ns.Resource,
|
||||
Name: fmt.Sprintf("doc%d", i),
|
||||
},
|
||||
Title: fmt.Sprintf("Document %d", i),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
err := index.BulkIndex(&resource.BulkIndexRequest{Items: items})
|
||||
return int64(docs), err
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanOldIndexes(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
b := setupBleveBackend(t, 5, time.Nanosecond, dir)
|
||||
|
||||
t.Run("with skip", func(t *testing.T) {
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "index-1/a"), 0750))
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "index-2/b"), 0750))
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "index-3/c"), 0750))
|
||||
|
||||
b.cleanOldIndexes(dir, "index-2")
|
||||
files, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, files, 1)
|
||||
require.Equal(t, "index-2", files[0].Name())
|
||||
})
|
||||
|
||||
t.Run("without skip", func(t *testing.T) {
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "index-1/a"), 0750))
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "index-2/b"), 0750))
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "index-3/c"), 0750))
|
||||
|
||||
b.cleanOldIndexes(dir, "")
|
||||
files, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, files, 0)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue