diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index 21f43269fc9..e31ceae24aa 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -575,13 +575,14 @@ type Cfg struct { MaxPageSizeBytes int IndexPath string IndexWorkers int + IndexRebuildWorkers int IndexMaxBatchSize int IndexFileThreshold int IndexMinCount int IndexRebuildInterval time.Duration IndexCacheTTL time.Duration - MaxFileIndexAge time.Duration // Max age of file-based indexes. Index older than this will not be reused between restarts. - MinFileIndexBuildVersion string // Minimum version of Grafana that built the file-based index. If index was built with older Grafana, it will not be reused between restarts. + MaxFileIndexAge time.Duration // Max age of file-based indexes. Index older than this will be rebuilt asynchronously. + MinFileIndexBuildVersion string // Minimum version of Grafana that built the file-based index. If index was built with older Grafana, it will be rebuilt asynchronously. EnableSharding bool QOSEnabled bool QOSNumberWorker int diff --git a/pkg/setting/setting_unified_storage.go b/pkg/setting/setting_unified_storage.go index 921fb2f24a5..0ebb6c93830 100644 --- a/pkg/setting/setting_unified_storage.go +++ b/pkg/setting/setting_unified_storage.go @@ -54,6 +54,7 @@ func (cfg *Cfg) setUnifiedStorageConfig() { cfg.MaxPageSizeBytes = section.Key("max_page_size_bytes").MustInt(0) cfg.IndexPath = section.Key("index_path").String() cfg.IndexWorkers = section.Key("index_workers").MustInt(10) + cfg.IndexRebuildWorkers = section.Key("index_rebuild_workers").MustInt(5) cfg.IndexMaxBatchSize = section.Key("index_max_batch_size").MustInt(100) cfg.EnableSharding = section.Key("enable_sharding").MustBool(false) cfg.QOSEnabled = section.Key("qos_enabled").MustBool(false) diff --git a/pkg/storage/unified/resource/bleve_index_metrics.go b/pkg/storage/unified/resource/bleve_index_metrics.go index bdd3f071232..00e08c62389 100644 --- a/pkg/storage/unified/resource/bleve_index_metrics.go +++ b/pkg/storage/unified/resource/bleve_index_metrics.go @@ -20,6 +20,7 @@ type BleveIndexMetrics struct { UpdateLatency prometheus.Histogram UpdatedDocuments prometheus.Summary SearchUpdateWaitTime *prometheus.HistogramVec + RebuildQueueLength prometheus.Gauge } var IndexCreationBuckets = []float64{1, 5, 10, 25, 50, 75, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000} @@ -84,6 +85,10 @@ func ProvideIndexMetrics(reg prometheus.Registerer) *BleveIndexMetrics { NativeHistogramMaxBucketNumber: 160, NativeHistogramMinResetDuration: time.Hour, }, []string{"reason"}), + RebuildQueueLength: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "index_server_rebuild_queue_length", + Help: "Number of indexes waiting for rebuild", + }), } // Initialize labels. 
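The two hunks above wire up the new index_rebuild_workers option (default 5) next to the existing index_workers option (default 10). A minimal configuration sketch — the ini section name is not visible in this diff, so [unified_storage] below is an assumption, and the values shown are just the MustInt fallbacks:

    [unified_storage]
    ; threads used to build indexes at startup (default 10)
    index_workers = 10
    ; workers that consume the asynchronous index rebuild queue (default 5)
    index_rebuild_workers = 5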
diff --git a/pkg/storage/unified/resource/search.go b/pkg/storage/unified/resource/search.go index fbf70eebe4f..bc85a0a5a8a 100644 --- a/pkg/storage/unified/resource/search.go +++ b/pkg/storage/unified/resource/search.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/Masterminds/semver" "github.com/hashicorp/golang-lru/v2/expirable" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -24,6 +25,7 @@ import ( folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" + "github.com/grafana/grafana/pkg/util/debouncer" ) const maxBatchSize = 1000 @@ -61,6 +63,11 @@ type BulkIndexRequest struct { ResourceVersion int64 } +type IndexBuildInfo struct { + BuildTime time.Time // Timestamp when the index was built. This value doesn't change on subsequent index updates. + BuildVersion *semver.Version // Grafana version used when originally building the index. This value doesn't change on subsequent index updates. +} + type ResourceIndex interface { // BulkIndex allows for multiple index actions to be performed in a single call. // The order of the items is guaranteed to be the same as the input @@ -82,6 +89,9 @@ type ResourceIndex interface { // UpdateIndex updates the index with the latest data (using update function provided when index was built) to guarantee strong consistency during the search. // Returns RV to which index was updated. UpdateIndex(ctx context.Context, reason string) (int64, error) + + // BuildInfo returns build information about the index. + BuildInfo() (IndexBuildInfo, error) } type BuildFn func(index ResourceIndex) (int64, error) @@ -112,6 +122,9 @@ type SearchBackend interface { // TotalDocs returns the total number of documents across all indexes. TotalDocs() int64 + + // GetOpenIndexes returns the list of indexes that are currently open. + GetOpenIndexes() []NamespacedResource } const tracingPrexfixSearch = "unified_search." @@ -132,8 +145,17 @@ type searchSupport struct { buildIndex singleflight.Group - // periodic rebuilding of the indexes to keep usage insights up to date - rebuildInterval time.Duration + // since usage insights is not in unified storage, we need to periodically rebuild the index + // to make sure these data points are up to date. 
+ dashboardIndexMaxAge time.Duration + maxIndexAge time.Duration + minBuildVersion *semver.Version + + bgTaskWg sync.WaitGroup + bgTaskCancel func() + + rebuildQueue *debouncer.Queue[rebuildRequest] + rebuildWorkers int } var ( @@ -150,8 +172,12 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A return nil, fmt.Errorf("missing tracer") } - if opts.WorkerThreads < 1 { - opts.WorkerThreads = 1 + if opts.InitWorkerThreads < 1 { + opts.InitWorkerThreads = 1 + } + + if opts.IndexRebuildWorkers < 1 { + opts.IndexRebuildWorkers = 1 } if ownsIndexFn == nil { @@ -161,18 +187,24 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A } support = &searchSupport{ - access: access, - tracer: tracer, - storage: storage, - search: opts.Backend, - log: slog.Default().With("logger", "resource-search"), - initWorkers: opts.WorkerThreads, - initMinSize: opts.InitMinCount, - indexMetrics: indexMetrics, - rebuildInterval: opts.RebuildInterval, - ownsIndexFn: ownsIndexFn, + access: access, + tracer: tracer, + storage: storage, + search: opts.Backend, + log: slog.Default().With("logger", "resource-search"), + initWorkers: opts.InitWorkerThreads, + rebuildWorkers: opts.IndexRebuildWorkers, + initMinSize: opts.InitMinCount, + indexMetrics: indexMetrics, + ownsIndexFn: ownsIndexFn, + + dashboardIndexMaxAge: opts.DashboardIndexMaxAge, + maxIndexAge: opts.MaxIndexAge, + minBuildVersion: opts.MinBuildVersion, } + support.rebuildQueue = debouncer.NewQueue(combineRebuildRequests) + info, err := opts.Resources.GetDocumentBuilders() if err != nil { return nil, err @@ -186,6 +218,27 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A return support, err } +func combineRebuildRequests(a, b rebuildRequest) (c rebuildRequest, ok bool) { + if a.NamespacedResource != b.NamespacedResource { + // We can only combine requests for the same key. + return rebuildRequest{}, false + } + + ret := a + + // Using the higher "min build version" is a stricter condition, and causes more indexes to be rebuilt. + if a.minBuildVersion == nil || (b.minBuildVersion != nil && b.minBuildVersion.GreaterThan(a.minBuildVersion)) { + ret.minBuildVersion = b.minBuildVersion + } + + // Using the higher "min build time" is a stricter condition, and causes more indexes to be rebuilt.
+ if a.minBuildTime.IsZero() || (!b.minBuildTime.IsZero() && b.minBuildTime.After(a.minBuildTime)) { + ret.minBuildTime = b.minBuildTime + } + + return ret, true +} + func (s *searchSupport) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) { if req.NextPageToken != "" { return &resourcepb.ListManagedObjectsResponse{ @@ -401,7 +454,7 @@ func (s *searchSupport) GetStats(ctx context.Context, req *resourcepb.ResourceSt return rsp, nil } -func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, error) { +func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) { totalBatchesIndexed := 0 group := errgroup.Group{} group.SetLimit(s.initWorkers) @@ -412,11 +465,6 @@ func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, er } for _, info := range stats { - // only periodically rebuild the dashboard index, specifically to update the usage insights data - if rebuild && info.Resource != dashboardv1.DASHBOARD_RESOURCE { - continue - } - own, err := s.ownsIndexFn(info.NamespacedResource) if err != nil { s.log.Warn("failed to check index ownership, building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource, "error", err) @@ -426,18 +474,11 @@ func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, er } group.Go(func() error { - if rebuild { - // we need to clear the cache to make sure we get the latest usage insights data - s.builders.clearNamespacedCache(info.NamespacedResource) - } totalBatchesIndexed++ - s.log.Debug("building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource, "rebuild", rebuild) + s.log.Debug("building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource) reason := "init" - if rebuild { - reason = "rebuild" - } - _, err := s.build(ctx, info.NamespacedResource, info.Count, reason, rebuild) + _, err := s.build(ctx, info.NamespacedResource, info.Count, reason, false) return err }) } @@ -457,30 +498,41 @@ func (s *searchSupport) init(ctx context.Context) error { defer span.End() start := time.Now().Unix() - totalBatchesIndexed, err := s.buildIndexes(ctx, false) + totalBatchesIndexed, err := s.buildIndexes(ctx) if err != nil { return err } span.AddEvent("namespaces indexed", trace.WithAttributes(attribute.Int("namespaced_indexed", totalBatchesIndexed))) - // since usage insights is not in unified storage, we need to periodically rebuild the index - // to make sure these data points are up to date. - if s.rebuildInterval > 0 { - go s.startPeriodicRebuild(origCtx) + subctx, cancel := context.WithCancel(origCtx) + + s.bgTaskCancel = cancel + for i := 0; i < s.rebuildWorkers; i++ { + s.bgTaskWg.Add(1) + go s.runIndexRebuilder(subctx) } + s.bgTaskWg.Add(1) + go s.runPeriodicScanForIndexesToRebuild(subctx) + end := time.Now().Unix() s.log.Info("search index initialized", "duration_secs", end-start, "total_docs", s.search.TotalDocs()) return nil } -func (s *searchSupport) startPeriodicRebuild(ctx context.Context) { - ticker := time.NewTicker(s.rebuildInterval) - defer ticker.Stop() +func (s *searchSupport) stop() { + // Stop background tasks. 
+ s.bgTaskCancel() + s.bgTaskWg.Wait() +} - s.log.Info("starting periodic index rebuild", "interval", s.rebuildInterval) +func (s *searchSupport) runPeriodicScanForIndexesToRebuild(ctx context.Context) { + defer s.bgTaskWg.Done() + + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() for { select { @@ -488,35 +540,166 @@ func (s *searchSupport) startPeriodicRebuild(ctx context.Context) { s.log.Info("stopping periodic index rebuild due to context cancellation") return case <-ticker.C: - s.log.Info("starting periodic index rebuild") - if err := s.rebuildDashboardIndexes(ctx); err != nil { - s.log.Error("error during periodic index rebuild", "error", err) - } else { - s.log.Info("periodic index rebuild completed successfully") + s.findIndexesToRebuild(ctx, time.Now()) + } + } +} + +func (s *searchSupport) findIndexesToRebuild(ctx context.Context, now time.Time) { + // Check all open indexes and see if any of them need to be rebuilt. + // This is done periodically to make sure that the indexes are up to date. + + keys := s.search.GetOpenIndexes() + for _, key := range keys { + idx, err := s.search.GetIndex(ctx, key) + if err != nil { + s.log.Error("failed to check index to rebuild", "key", key, "error", err) + continue + } + + if idx == nil { + // This can happen if index was closed in the meantime. + continue + } + + maxAge := s.maxIndexAge + if key.Resource == dashboardv1.DASHBOARD_RESOURCE { + maxAge = s.dashboardIndexMaxAge + } + + var minBuildTime time.Time + if maxAge > 0 { + minBuildTime = now.Add(-maxAge) + } + + bi, err := idx.BuildInfo() + if err != nil { + s.log.Error("failed to get build info for index to rebuild", "key", key, "error", err) + continue + } + + if shouldRebuildIndex(s.minBuildVersion, bi, minBuildTime, nil) { + s.rebuildQueue.Add(rebuildRequest{ + NamespacedResource: key, + minBuildTime: minBuildTime, + minBuildVersion: s.minBuildVersion, + }) + + if s.indexMetrics != nil { + s.indexMetrics.RebuildQueueLength.Set(float64(s.rebuildQueue.Len())) } } } } -func (s *searchSupport) rebuildDashboardIndexes(ctx context.Context) error { - ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"RebuildDashboardIndexes") +// runIndexRebuilder is a goroutine waiting for rebuild requests, and rebuilds indexes specified in those requests. +// Rebuild requests can be generated periodically (if configured), or after new documents have been imported into the storage with old RVs. 
+func (s *searchSupport) runIndexRebuilder(ctx context.Context) { + defer s.bgTaskWg.Done() + + for { + req, err := s.rebuildQueue.Next(ctx) + if err != nil { + s.log.Info("index rebuilder stopped", "error", err) + return + } + + if s.indexMetrics != nil { + s.indexMetrics.RebuildQueueLength.Set(float64(s.rebuildQueue.Len())) + } + + s.rebuildIndex(ctx, req) + } +} + +func (s *searchSupport) rebuildIndex(ctx context.Context, req rebuildRequest) { + ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"RebuildIndex") defer span.End() - start := time.Now() - s.log.Info("rebuilding all search indexes") + l := s.log.With("namespace", req.Namespace, "group", req.Group, "resource", req.Resource) - totalBatchesIndexed, err := s.buildIndexes(ctx, true) + idx, err := s.search.GetIndex(ctx, req.NamespacedResource) if err != nil { - return fmt.Errorf("failed to rebuild dashboard indexes: %w", err) + span.RecordError(err) + l.Error("failed to get index to rebuild", "error", err) + return } - end := time.Now() - duration := end.Sub(start) - s.log.Info("completed rebuilding all dashboard search indexes", - "duration", duration, - "rebuilt_indexes", totalBatchesIndexed, - "total_docs", s.search.TotalDocs()) - return nil + if idx == nil { + span.AddEvent("index not found") + l.Error("index not found") + return + } + + bi, err := idx.BuildInfo() + if err != nil { + span.RecordError(err) + l.Error("failed to get build info for index to rebuild", "error", err) + } + + rebuild := shouldRebuildIndex(req.minBuildVersion, bi, req.minBuildTime, l) + if !rebuild { + span.AddEvent("index not rebuilt") + l.Info("index doesn't need to be rebuilt") + return + } + + if req.Resource == dashboardv1.DASHBOARD_RESOURCE { + // we need to clear the cache to make sure we get the latest usage insights data + s.builders.clearNamespacedCache(req.NamespacedResource) + } + + // Get the correct size + RV values for building the index. This is important for our Bleve + // backend to decide whether to build the index in memory or as a file-based index.
+ stats, err := s.storage.GetResourceStats(ctx, req.Namespace, 0) + if err != nil { + span.RecordError(fmt.Errorf("failed to get resource stats: %w", err)) + l.Error("failed to get resource stats", "error", err) + return + } + + size := int64(0) + for _, stat := range stats { + if stat.Namespace == req.Namespace && stat.Group == req.Group && stat.Resource == req.Resource { + size = stat.Count + break + } + } + + _, err = s.build(ctx, req.NamespacedResource, size, "rebuild", true) + if err != nil { + span.RecordError(err) + l.Error("failed to rebuild index", "error", err) + } +} + +func shouldRebuildIndex(minBuildVersion *semver.Version, buildInfo IndexBuildInfo, minBuildTime time.Time, rebuildLogger *slog.Logger) bool { + if !minBuildTime.IsZero() { + if buildInfo.BuildTime.IsZero() || buildInfo.BuildTime.Before(minBuildTime) { + if rebuildLogger != nil { + rebuildLogger.Info("index build time is before minBuildTime, rebuilding the index", "indexBuildTime", buildInfo.BuildTime, "minBuildTime", minBuildTime) + } + return true + } + } + + if minBuildVersion != nil { + if buildInfo.BuildVersion == nil || buildInfo.BuildVersion.Compare(minBuildVersion) < 0 { + if rebuildLogger != nil { + rebuildLogger.Info("index build version is before minBuildVersion, rebuilding the index", "indexBuildVersion", buildInfo.BuildVersion, "minBuildVersion", minBuildVersion) + } + return true + } + } + + return false +} + +type rebuildRequest struct { + NamespacedResource + + minBuildTime time.Time // if not zero, only rebuild index if it has been built before this timestamp + minBuildVersion *semver.Version // if not nil, only rebuild index with build version older than this. } func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedResource, reason string) (ResourceIndex, error) { diff --git a/pkg/storage/unified/resource/search_test.go b/pkg/storage/unified/resource/search_test.go index be34e3a5724..270719af2a7 100644 --- a/pkg/storage/unified/resource/search_test.go +++ b/pkg/storage/unified/resource/search_test.go @@ -9,11 +9,16 @@ import ( "testing" "time" + "log/slog" + + "github.com/Masterminds/semver" "github.com/grafana/authlib/types" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace/noop" + dashboardv1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1beta1" + "github.com/grafana/grafana/pkg/infra/log/logtest" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" ) @@ -27,6 +32,12 @@ type MockResourceIndex struct { updateIndexMu sync.Mutex updateIndexCalls []string + + buildInfo IndexBuildInfo +} + +func (m *MockResourceIndex) BuildInfo() (IndexBuildInfo, error) { + return m.buildInfo, nil } func (m *MockResourceIndex) BulkIndex(req *BulkIndexRequest) error { @@ -120,10 +131,11 @@ func (m *mockStorageBackend) ListModifiedSince(ctx context.Context, key Namespac // mockSearchBackend implements SearchBackend for testing with tracking capabilities type mockSearchBackend struct { - mu sync.Mutex - buildIndexCalls []buildIndexCall - buildEmptyIndexCalls []buildEmptyIndexCall - cache map[NamespacedResource]ResourceIndex + openIndexes []NamespacedResource + + mu sync.Mutex + buildIndexCalls []buildIndexCall + cache map[NamespacedResource]ResourceIndex } type buildIndexCall struct { @@ -132,12 +144,6 @@ type buildIndexCall struct { fields SearchableDocumentFields } -type buildEmptyIndexCall struct { - key NamespacedResource - size int64 // should be 0 for empty indexes - fields SearchableDocumentFields -} - func (m 
*mockSearchBackend) GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error) { m.mu.Lock() defer m.mu.Unlock() @@ -165,21 +171,11 @@ func (m *mockSearchBackend) BuildIndex(ctx context.Context, key NamespacedResour - // Determine if this is an empty index based on size - // Empty indexes are characterized by size == 0 - if size == 0 { - // This is an empty index (buildEmptyIndex was called) - m.buildEmptyIndexCalls = append(m.buildEmptyIndexCalls, buildEmptyIndexCall{ - key: key, - size: size, - fields: fields, - }) - } else { - // This is a normal index (build was called) - m.buildIndexCalls = append(m.buildIndexCalls, buildIndexCall{ - key: key, - size: size, - fields: fields, - }) - } + m.buildIndexCalls = append(m.buildIndexCalls, buildIndexCall{ + key: key, + size: size, + fields: fields, + }) return index, nil } @@ -188,6 +184,10 @@ func (m *mockSearchBackend) TotalDocs() int64 { return 0 } +func (m *mockSearchBackend) GetOpenIndexes() []NamespacedResource { + return m.openIndexes +} + func TestSearchGetOrCreateIndex(t *testing.T) { // Setup mock implementations storage := &mockStorageBackend{ @@ -195,22 +195,17 @@ func TestSearchGetOrCreateIndex(t *testing.T) { {NamespacedResource: NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, Count: 50, ResourceVersion: 11111111}, }, } - search := &mockSearchBackend{ - buildIndexCalls: []buildIndexCall{}, - buildEmptyIndexCalls: []buildEmptyIndexCall{}, - } + search := &mockSearchBackend{} supplier := &TestDocumentBuilderSupplier{ GroupsResources: map[string]string{ "group": "resource", }, } - // Create search support with the specified initMaxSize opts := SearchOptions{ - Backend: search, - Resources: supplier, - WorkerThreads: 1, - InitMinCount: 1, // set min count to default for this test + Backend: search, + Resources: supplier, + InitMinCount: 1, // set min count to default for this test } support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil) @@ -250,9 +245,6 @@ func TestSearchGetOrCreateIndexWithIndexUpdate(t *testing.T) { } failedErr := fmt.Errorf("failed to update index") search := &mockSearchBackend{ - buildIndexCalls: []buildIndexCall{}, - buildEmptyIndexCalls: []buildEmptyIndexCall{}, - cache: map[NamespacedResource]ResourceIndex{ {Namespace: "ns", Group: "group", Resource: "bad"}: &MockResourceIndex{ updateIndexError: failedErr, @@ -265,12 +257,10 @@ }, } - // Create search support with the specified initMaxSize opts := SearchOptions{ - Backend: search, - Resources: supplier, - WorkerThreads: 1, - InitMinCount: 1, // set min count to default for this test + Backend: search, + Resources: supplier, + InitMinCount: 1, // set min count to default for this test } // Enable searchAfterWrite @@ -317,12 +307,10 @@ func TestSearchGetOrCreateIndexWithCancellation(t *testing.T) { }, } - // Create search support with the specified initMaxSize opts := SearchOptions{ - Backend: search, - Resources: supplier, - WorkerThreads: 1, - InitMinCount: 1, // set min count to default for this test + Backend: search, + Resources: supplier, + InitMinCount: 1, // set min count to default for this test } support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil) @@ -381,3 +369,350 @@ func (m *slowSearchBackendWithCache) BuildIndex(ctx context.Context, key Namespa } return idx, nil } + +func TestCombineBuildRequests(t *testing.T) { + type testcase struct {
+ a, b rebuildRequest + exp rebuildRequest + expOK bool + } + + now := time.Now() + for name, tc := range map[string]testcase{ + "mismatched resource": { + a: rebuildRequest{NamespacedResource: NamespacedResource{Namespace: "a", Group: "a", Resource: "a"}}, + b: rebuildRequest{NamespacedResource: NamespacedResource{Namespace: "b", Group: "b", Resource: "b"}}, + expOK: false, + }, + "equal values": { + a: rebuildRequest{minBuildTime: now, minBuildVersion: semver.MustParse("10.15.20")}, + b: rebuildRequest{minBuildTime: now, minBuildVersion: semver.MustParse("10.15.20")}, + expOK: true, + exp: rebuildRequest{minBuildTime: now, minBuildVersion: semver.MustParse("10.15.20")}, + }, + "empty field": { + a: rebuildRequest{minBuildTime: now}, + b: rebuildRequest{minBuildVersion: semver.MustParse("10.15.20")}, + expOK: true, + exp: rebuildRequest{minBuildTime: now, minBuildVersion: semver.MustParse("10.15.20")}, + }, + "use max build time": { + a: rebuildRequest{minBuildTime: now.Add(2 * time.Hour)}, + b: rebuildRequest{minBuildTime: now.Add(-time.Hour)}, + expOK: true, + exp: rebuildRequest{minBuildTime: now.Add(2 * time.Hour)}, + }, + "use max version": { + a: rebuildRequest{minBuildVersion: semver.MustParse("12.10.99")}, + b: rebuildRequest{minBuildVersion: semver.MustParse("10.15.20")}, + expOK: true, + exp: rebuildRequest{minBuildVersion: semver.MustParse("12.10.99")}, + }, + "both fields": { + a: rebuildRequest{minBuildTime: now.Add(2 * time.Hour), minBuildVersion: semver.MustParse("12.10.99")}, + b: rebuildRequest{minBuildTime: now.Add(-time.Hour), minBuildVersion: semver.MustParse("10.15.20")}, + expOK: true, + exp: rebuildRequest{minBuildTime: now.Add(2 * time.Hour), minBuildVersion: semver.MustParse("12.10.99")}, + }, + } { + t.Run(name, func(t *testing.T) { + res1, ok := combineRebuildRequests(tc.a, tc.b) + require.Equal(t, tc.expOK, ok) + if ok { + require.Equal(t, tc.exp, res1) + } + + // commutativity + res2, ok := combineRebuildRequests(tc.b, tc.a) + require.Equal(t, tc.expOK, ok) + if ok { + require.Equal(t, tc.exp, res2) + } + }) + } +} + +func TestShouldRebuildIndex(t *testing.T) { + type testcase struct { + buildInfo IndexBuildInfo + minTime time.Time + minBuildVersion *semver.Version + + expected bool + } + + now := time.Now() + + for name, tc := range map[string]testcase{ + "empty build info, with no rebuild conditions": { + buildInfo: IndexBuildInfo{}, + expected: false, + }, + "empty build info, with minTime": { + buildInfo: IndexBuildInfo{}, + minTime: now, + expected: true, + }, + "empty build info, with minVersion": { + buildInfo: IndexBuildInfo{}, + minBuildVersion: semver.MustParse("10.15.20"), + expected: true, + }, + "build time before min time": { + buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour)}, + minTime: now, + expected: true, + }, + "build time after min time": { + buildInfo: IndexBuildInfo{BuildTime: now.Add(2 * time.Hour)}, + minTime: now, + expected: false, + }, + "build version before min version": { + buildInfo: IndexBuildInfo{BuildVersion: semver.MustParse("10.15.19")}, + minBuildVersion: semver.MustParse("10.15.20"), + expected: true, + }, + "build version after min version": { + buildInfo: IndexBuildInfo{BuildVersion: semver.MustParse("11.0.0")}, + minBuildVersion: semver.MustParse("10.15.20"), + expected: false, + }, + } { + t.Run(name, func(t *testing.T) { + res := shouldRebuildIndex(tc.minBuildVersion, tc.buildInfo, tc.minTime, slog.New(&logtest.NopHandler{})) + require.Equal(t, tc.expected, res) + }) + } +} + +func 
TestFindIndexesForRebuild(t *testing.T) { + storage := &mockStorageBackend{ + resourceStats: []ResourceStats{ + {NamespacedResource: NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, Count: 50, ResourceVersion: 11111111}, + }, + } + + now := time.Now() + + search := &mockSearchBackend{ + openIndexes: []NamespacedResource{ + {Namespace: "resource-2h-v5", Group: "group", Resource: "folder"}, + {Namespace: "resource-2h-v6", Group: "group", Resource: "folder"}, + {Namespace: "resource-10h-v5", Group: "group", Resource: "folder"}, + {Namespace: "resource-10h-v6", Group: "group", Resource: "folder"}, + {Namespace: "resource-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}, + {Namespace: "resource-v6", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}, + {Namespace: "resource-2h-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}, + {Namespace: "resource-2h-v6", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}, + + // We report this index as open, but it's really not. This can happen if index expires between the call + // to GetOpenIndexes and the call to GetIndex. + {Namespace: "ns", Group: "group", Resource: "missing"}, + }, + + cache: map[NamespacedResource]ResourceIndex{ + // To be rebuilt because of minVersion + {Namespace: "resource-2h-v5", Group: "group", Resource: "folder"}: &MockResourceIndex{ + buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour), BuildVersion: semver.MustParse("5.0.0")}, + }, + + // Not rebuilt + {Namespace: "resource-2h-v6", Group: "group", Resource: "folder"}: &MockResourceIndex{ + buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour), BuildVersion: semver.MustParse("6.0.0")}, + }, + + // To be rebuilt because of minTime + {Namespace: "resource-10h-v5", Group: "group", Resource: "folder"}: &MockResourceIndex{ + buildInfo: IndexBuildInfo{BuildTime: now.Add(-10 * time.Hour), BuildVersion: semver.MustParse("5.0.0")}, + }, + + // To be rebuilt because of minTime + {Namespace: "resource-10h-v6", Group: "group", Resource: "folder"}: &MockResourceIndex{ + buildInfo: IndexBuildInfo{BuildTime: now.Add(-10 * time.Hour), BuildVersion: semver.MustParse("6.0.0")}, + }, + + // To be rebuilt because of minVersion + {Namespace: "resource-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}: &MockResourceIndex{ + buildInfo: IndexBuildInfo{BuildTime: now, BuildVersion: semver.MustParse("5.0.0")}, + }, + + // Not rebuilt + {Namespace: "resource-v6", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}: &MockResourceIndex{ + buildInfo: IndexBuildInfo{BuildTime: now, BuildVersion: semver.MustParse("6.0.0")}, + }, + + // To be rebuilt because of minTime (1h for dashboards) + {Namespace: "resource-2h-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}: &MockResourceIndex{ + buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour), BuildVersion: semver.MustParse("5.0.0")}, + }, + + // To be rebuilt because of minTime (1h for dashboards) + {Namespace: "resource-2h-v6", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}: &MockResourceIndex{ + buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour), BuildVersion: semver.MustParse("6.0.0")}, + }, + }, + } + + supplier := &TestDocumentBuilderSupplier{ + GroupsResources: map[string]string{ + "group": "resource", + }, + } + + opts := SearchOptions{ + Backend: search, + Resources: supplier, + + DashboardIndexMaxAge: 1 * time.Hour, + MaxIndexAge: 5 * time.Hour, + MinBuildVersion: semver.MustParse("5.5.5"), + } + + support, err 
:= newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil) + require.NoError(t, err) + require.NotNil(t, support) + + support.findIndexesToRebuild(context.Background(), now) + require.Equal(t, 6, support.rebuildQueue.Len()) + + now5m := now.Add(5 * time.Minute) + + // Running findIndexesToRebuild again should not add any new indexes to the rebuild queue, and all existing + // ones should be "combined" with new ones (this will "bump" minBuildTime) + support.findIndexesToRebuild(context.Background(), now5m) + require.Equal(t, 6, support.rebuildQueue.Len()) + + // Values that we expect to find in rebuild requests. + minBuildVersion := semver.MustParse("5.5.5") + minBuildTime := now5m.Add(-5 * time.Hour) + minBuildTimeDashboard := now5m.Add(-1 * time.Hour) + + vals := support.rebuildQueue.Elements() + require.ElementsMatch(t, vals, []rebuildRequest{ + {NamespacedResource: NamespacedResource{Namespace: "resource-2h-v5", Group: "group", Resource: "folder"}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTime}, + {NamespacedResource: NamespacedResource{Namespace: "resource-10h-v5", Group: "group", Resource: "folder"}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTime}, + {NamespacedResource: NamespacedResource{Namespace: "resource-10h-v6", Group: "group", Resource: "folder"}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTime}, + + {NamespacedResource: NamespacedResource{Namespace: "resource-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTimeDashboard}, + {NamespacedResource: NamespacedResource{Namespace: "resource-2h-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTimeDashboard}, + {NamespacedResource: NamespacedResource{Namespace: "resource-2h-v6", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTimeDashboard}, + }) +} + +func TestRebuildIndexes(t *testing.T) { + storage := &mockStorageBackend{} + + now := time.Now() + + search := &mockSearchBackend{ + cache: map[NamespacedResource]ResourceIndex{ + {Namespace: "idx1", Group: "group", Resource: "res"}: &MockResourceIndex{ + buildInfo: IndexBuildInfo{BuildVersion: semver.MustParse("5.0.0")}, + }, + + {Namespace: "idx2", Group: "group", Resource: "res"}: &MockResourceIndex{ + buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour)}, + }, + + {Namespace: "idx3", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}: &MockResourceIndex{}, + }, + } + + supplier := &TestDocumentBuilderSupplier{ + GroupsResources: map[string]string{ + "group": "resource", + }, + } + + opts := SearchOptions{ + Backend: search, + Resources: supplier, + } + + support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil) + require.NoError(t, err) + require.NotNil(t, support) + + // Note: we can only rebuild each index once; after that it "loses" its build info.
+ + t.Run("Don't rebuild if min build version is old", func(t *testing.T) { + checkRebuildIndex(t, support, rebuildRequest{ + NamespacedResource: NamespacedResource{Namespace: "idx1", Group: "group", Resource: "res"}, + minBuildVersion: semver.MustParse("4.5"), + }, true, false) + }) + + t.Run("Rebuild if min build version is more recent", func(t *testing.T) { + checkRebuildIndex(t, support, rebuildRequest{ + NamespacedResource: NamespacedResource{Namespace: "idx1", Group: "group", Resource: "res"}, + minBuildVersion: semver.MustParse("5.5.5"), + }, true, true) + }) + + t.Run("Don't rebuild if min build time is very old", func(t *testing.T) { + checkRebuildIndex(t, support, rebuildRequest{ + NamespacedResource: NamespacedResource{Namespace: "idx2", Group: "group", Resource: "res"}, + minBuildTime: now.Add(-5 * time.Hour), + }, true, false) + }) + + t.Run("Rebuild if min build time is more recent", func(t *testing.T) { + checkRebuildIndex(t, support, rebuildRequest{ + NamespacedResource: NamespacedResource{Namespace: "idx2", Group: "group", Resource: "res"}, + minBuildTime: now.Add(-1 * time.Hour), + }, true, true) + }) + + t.Run("Don't rebuild if index doesn't exist.", func(t *testing.T) { + checkRebuildIndex(t, support, rebuildRequest{ + NamespacedResource: NamespacedResource{Namespace: "unknown", Group: "group", Resource: "res"}, + minBuildTime: now.Add(-5 * time.Hour), + }, false, true) + }) + + t.Run("Rebuild dashboard index (it has no build info), verify that builders cache was emptied.", func(t *testing.T) { + dashKey := NamespacedResource{Namespace: "idx3", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE} + + support.builders.ns.Add(dashKey, &MockDocumentBuilder{}) + _, ok := support.builders.ns.Get(dashKey) + require.True(t, ok) + + checkRebuildIndex(t, support, rebuildRequest{ + NamespacedResource: dashKey, + minBuildTime: now, + }, true, true) + + // Verify that builders cache was emptied. 
+ _, ok = support.builders.ns.Get(dashKey) + require.False(t, ok) + }) +} + +func checkRebuildIndex(t *testing.T, support *searchSupport, req rebuildRequest, indexExists, expectedRebuild bool) { + ctx := context.Background() + + idxBefore, err := support.search.GetIndex(ctx, req.NamespacedResource) + require.NoError(t, err) + if indexExists { + require.NotNil(t, idxBefore, "index should exist before rebuildIndex") + } else { + require.Nil(t, idxBefore, "index should not exist before rebuildIndex") + } + + support.rebuildIndex(ctx, req) + + idxAfter, err := support.search.GetIndex(ctx, req.NamespacedResource) + require.NoError(t, err) + + if indexExists { + require.NotNil(t, idxAfter, "index should exist after rebuildIndex") + if expectedRebuild { + require.NotSame(t, idxBefore, idxAfter, "index should be rebuilt") + } else { + require.Same(t, idxBefore, idxAfter, "index should not be rebuilt") + } + } else { + require.Nil(t, idxAfter, "index should not exist after rebuildIndex") + } +} diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 575d1dd608b..f2613d6a3cf 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "github.com/Masterminds/semver" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" @@ -21,7 +22,6 @@ import ( claims "github.com/grafana/authlib/types" "github.com/grafana/dskit/backoff" - "github.com/grafana/dskit/ring" "github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/apimachinery/validation" @@ -179,15 +179,22 @@ type SearchOptions struct { Resources DocumentBuilderSupplier // How many threads should build indexes - WorkerThreads int + InitWorkerThreads int // Skip building index on startup for small indexes InitMinCount int - // Interval for periodic index rebuilds (0 disables periodic rebuilds) - RebuildInterval time.Duration + // Maximum age of the dashboard index. Older dashboard indexes are rebuilt to keep usage insights data up to date. 0 disables periodic rebuilds. + DashboardIndexMaxAge time.Duration - Ring *ring.Ring + // Maximum age of an index. Indexes older than this are rebuilt asynchronously. Ignored if zero. + MaxIndexAge time.Duration + + // Minimum Grafana version that must have built the index. Indexes built by older versions are rebuilt asynchronously. Ignored if nil. + MinBuildVersion *semver.Version + + // Number of workers to use for index rebuilds. + IndexRebuildWorkers int } type ResourceServerOptions struct { @@ -422,6 +429,10 @@ func (s *server) Stop(ctx context.Context) error { } } + if s.search != nil { + s.search.stop() + } + // Stops the streaming s.cancel() diff --git a/pkg/storage/unified/search/bleve.go b/pkg/storage/unified/search/bleve.go index ef06f585363..7e96a10da26 100644 --- a/pkg/storage/unified/search/bleve.go +++ b/pkg/storage/unified/search/bleve.go @@ -52,7 +52,7 @@ const ( // Keys used to store internal data in index. const ( internalRVKey = "rv" // Encoded as big-endian int64 - internalBuildInfoKey = "build_info" // Encoded as JSON of IndexBuildInfo struct + internalBuildInfoKey = "build_info" // Encoded as JSON of buildInfo struct ) var _ resource.SearchBackend = &bleveBackend{} @@ -75,9 +75,6 @@ type BleveOptions struct { BuildVersion string - MaxFileIndexAge time.Duration // Maximum age of file-based index that can be reused. Ignored if zero. - MinBuildVersion *semver.Version // Minimum build version for reusing file-based indexes. Ignored if nil.
- Logger *slog.Logger UseFullNgram bool @@ -179,6 +176,17 @@ func (b *bleveBackend) GetIndex(_ context.Context, key resource.NamespacedResour return idx, nil } +func (b *bleveBackend) GetOpenIndexes() []resource.NamespacedResource { + b.cacheMx.RLock() + defer b.cacheMx.RUnlock() + + result := make([]resource.NamespacedResource, 0, len(b.cache)) + for key := range b.cache { + result = append(result, key) + } + return result +} + func (b *bleveBackend) getCachedIndex(key resource.NamespacedResource, now time.Time) *bleveIndex { // Check index with read-lock first. b.cacheMx.RLock() @@ -318,7 +326,7 @@ func newBleveIndex(path string, mapper mapping.IndexMapping, buildTime time.Time return nil, err } - bi := IndexBuildInfo{ + bi := buildInfo{ BuildTime: buildTime.Unix(), BuildVersion: buildVersion, } @@ -336,29 +344,11 @@ func newBleveIndex(path string, mapper mapping.IndexMapping, buildTime time.Time return ix, nil } -type IndexBuildInfo struct { +type buildInfo struct { BuildTime int64 `json:"build_time"` // Unix seconds timestamp of time when the index was built BuildVersion string `json:"build_version"` // Grafana version used when building the index } -func (bi IndexBuildInfo) GetBuildTime() time.Time { - if bi.BuildTime == 0 { - return time.Time{} - } - return time.Unix(bi.BuildTime, 0) -} - -func (bi IndexBuildInfo) GetBuildVersion() *semver.Version { - if bi.BuildVersion == "" { - return nil - } - v, err := semver.NewVersion(bi.BuildVersion) - if err != nil { - return nil - } - return v -} - // BuildIndex builds an index from scratch or retrieves it from the filesystem. // If built successfully, the new index replaces the old index in the cache (if there was any). // Existing index in the file system is reused, if it exists, and if size indicates that we should use file-based index, and rebuild is not true. @@ -435,11 +425,7 @@ func (b *bleveBackend) BuildIndex( // This happens on startup, or when memory-based index has expired. (We don't expire file-based indexes) // If we do have an unexpired cached index already, we always build a new index from scratch. if cachedIndex == nil && !rebuild { - minBuildTime := time.Time{} - if b.opts.MaxFileIndexAge > 0 { - minBuildTime = time.Now().Add(-b.opts.MaxFileIndexAge) - } - index, fileIndexName, indexRV = b.findPreviousFileBasedIndex(resourceDir, minBuildTime, b.opts.MinBuildVersion) + index, fileIndexName, indexRV = b.findPreviousFileBasedIndex(resourceDir) } if index != nil { @@ -621,18 +607,6 @@ func isPathWithinRoot(path, absoluteRoot string) bool { return true } -// cacheKeys returns list of keys for indexes in the cache (including possibly expired ones). -func (b *bleveBackend) cacheKeys() []resource.NamespacedResource { - b.cacheMx.RLock() - defer b.cacheMx.RUnlock() - - keys := make([]resource.NamespacedResource, 0, len(b.cache)) - for k := range b.cache { - keys = append(keys, k) - } - return keys -} - // TotalDocs returns the total number of documents across all indices func (b *bleveBackend) TotalDocs() int64 { var totalDocs int64 @@ -640,7 +614,7 @@ func (b *bleveBackend) TotalDocs() int64 { // We do this to avoid keeping a lock for the entire TotalDocs function, since DocCount may be slow (due to disk access). 
now := time.Now() - for _, key := range b.cacheKeys() { + for _, key := range b.GetOpenIndexes() { idx := b.getCachedIndex(key, now) if idx == nil { continue @@ -658,7 +632,7 @@ func formatIndexName(now time.Time) string { return now.Format("20060102-150405") } -func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string, minBuildTime time.Time, minBuildVersion *semver.Version) (bleve.Index, string, int64) { +func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) (bleve.Index, string, int64) { entries, err := os.ReadDir(resourceDir) if err != nil { return nil, "", 0 @@ -684,31 +658,6 @@ func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string, minBuildTi continue } - buildInfo, err := getBuildInfo(idx) - if err != nil { - b.log.Error("error getting build info from index", "indexDir", indexDir, "err", err) - _ = idx.Close() - continue - } - - if !minBuildTime.IsZero() { - bt := buildInfo.GetBuildTime() - if bt.IsZero() || bt.Before(minBuildTime) { - b.log.Debug("index build time is before minBuildTime, not reusing the index", "indexDir", indexDir, "indexBuildTime", bt, "minBuildTime", minBuildTime) - _ = idx.Close() - continue - } - } - - if minBuildVersion != nil { - bv := buildInfo.GetBuildVersion() - if bv == nil || bv.Compare(minBuildVersion) < 0 { - b.log.Debug("index build version is before minBuildVersion, not reusing the index", "indexDir", indexDir, "indexBuildVersion", bv, "minBuildVersion", minBuildVersion) - _ = idx.Close() - continue - } - } - return idx, indexName, indexRV } @@ -878,21 +827,46 @@ func getRV(index bleve.Index) (int64, error) { return int64(binary.BigEndian.Uint64(raw)), nil } -func getBuildInfo(index bleve.Index) (IndexBuildInfo, error) { +func getBuildInfo(index bleve.Index) (buildInfo, error) { raw, err := index.GetInternal([]byte(internalBuildInfoKey)) if err != nil { - return IndexBuildInfo{}, err + return buildInfo{}, err } if len(raw) == 0 { - return IndexBuildInfo{}, nil + return buildInfo{}, nil } - res := IndexBuildInfo{} + res := buildInfo{} err = json.Unmarshal(raw, &res) return res, err } +func (b *bleveIndex) BuildInfo() (resource.IndexBuildInfo, error) { + bi, err := getBuildInfo(b.index) + if err != nil { + return resource.IndexBuildInfo{}, err + } + + bt := time.Time{} + if bi.BuildTime > 0 { + bt = time.Unix(bi.BuildTime, 0) + } + + var bv *semver.Version + if bi.BuildVersion != "" { + v, err := semver.NewVersion(bi.BuildVersion) + if err == nil { + bv = v + } + } + + return resource.IndexBuildInfo{ + BuildTime: bt, + BuildVersion: bv, + }, nil +} + func (b *bleveIndex) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) { if req.NextPageToken != "" { return nil, fmt.Errorf("next page not implemented yet") diff --git a/pkg/storage/unified/search/bleve_test.go b/pkg/storage/unified/search/bleve_test.go index 347195786a5..62725adda5d 100644 --- a/pkg/storage/unified/search/bleve_test.go +++ b/pkg/storage/unified/search/bleve_test.go @@ -14,7 +14,6 @@ import ( "testing" "time" - "github.com/Masterminds/semver" "github.com/blevesearch/bleve/v2" authlib "github.com/grafana/authlib/types" "github.com/prometheus/client_golang/prometheus" @@ -827,24 +826,6 @@ func withRootDir(root string) setupOption { } } -func withBuildVersion(version string) setupOption { - return func(options *BleveOptions) { - options.BuildVersion = version - } -} - -func withMinBuildVersion(version *semver.Version) setupOption { - return func(options *BleveOptions) { 
- options.MinBuildVersion = version - } -} - -func withMaxFileIndexAge(maxAge time.Duration) setupOption { - return func(options *BleveOptions) { - options.MaxFileIndexAge = maxAge - } -} - func withOwnsIndexFn(fn func(key resource.NamespacedResource) (bool, error)) setupOption { return func(options *BleveOptions) { options.OwnsIndex = fn @@ -978,81 +959,39 @@ func TestBuildIndex(t *testing.T) { Resource: "resource", } - const alwaysRebuildDueToAge = 1 * time.Nanosecond - const neverRebuildDueToAge = 1 * time.Hour - for _, rebuild := range []bool{false, true} { - for _, version := range []string{"", "12.5.123"} { - for _, minBuildVersion := range []*semver.Version{nil, semver.MustParse("12.0.0"), semver.MustParse("13.0.0")} { - for _, maxIndexAge := range []time.Duration{0, alwaysRebuildDueToAge, neverRebuildDueToAge} { - shouldRebuild := rebuild - if minBuildVersion != nil { - shouldRebuild = shouldRebuild || version == "" || minBuildVersion.GreaterThan(semver.MustParse(version)) - } - if maxIndexAge > 0 { - shouldRebuild = shouldRebuild || maxIndexAge == alwaysRebuildDueToAge - } + testName := fmt.Sprintf("rebuild=%t", rebuild) - testName := "" - if shouldRebuild { - testName += "should REBUILD index" - } else { - testName += "should REUSE index" - } + t.Run(testName, func(t *testing.T) { + tmpDir := t.TempDir() - if rebuild { - testName += " when rebuild is true" - } else { - testName += " when rebuild is false" - } + const ( + firstIndexDocsCount = 10 + secondIndexDocsCount = 1000 + ) - if version != "" { - testName += " build version is " + version - } else { - testName += " build version is empty" - } - - if minBuildVersion != nil { - testName += " min build version is " + minBuildVersion.String() - } else { - testName += " min build version is nil" - } - - testName += " max index age is " + maxIndexAge.String() - - t.Run(testName, func(t *testing.T) { - tmpDir := t.TempDir() - - const ( - firstIndexDocsCount = 10 - secondIndexDocsCount = 1000 - ) - - { - backend, _ := setupBleveBackend(t, withFileThreshold(5), withRootDir(tmpDir), withBuildVersion(version)) - _, err := backend.BuildIndex(context.Background(), ns, firstIndexDocsCount, nil, "test", indexTestDocs(ns, firstIndexDocsCount, 100), nil, rebuild) - require.NoError(t, err) - backend.Stop() - } - - // Make sure we pass at least 1 nanosecond (alwaysRebuildDueToAge) to ensure that the index needs to be rebuild. - time.Sleep(1 * time.Millisecond) - - newBackend, _ := setupBleveBackend(t, withFileThreshold(5), withRootDir(tmpDir), withBuildVersion(version), withMinBuildVersion(minBuildVersion), withMaxFileIndexAge(maxIndexAge)) - idx, err := newBackend.BuildIndex(context.Background(), ns, secondIndexDocsCount, nil, "test", indexTestDocs(ns, secondIndexDocsCount, 100), nil, rebuild) - require.NoError(t, err) - - cnt, err := idx.DocCount(context.Background(), "") - require.NoError(t, err) - if shouldRebuild { - require.Equal(t, int64(secondIndexDocsCount), cnt, "Index has been not rebuilt") - } else { - require.Equal(t, int64(firstIndexDocsCount), cnt, "Index has not been reused") - } - }) - } } + { + backend, _ := setupBleveBackend(t, withFileThreshold(5), withRootDir(tmpDir)) + _, err := backend.BuildIndex(context.Background(), ns, firstIndexDocsCount, nil, "test", indexTestDocs(ns, firstIndexDocsCount, 100), nil, rebuild) + require.NoError(t, err) + backend.Stop() } + + // Sleep briefly to make sure the second index gets a newer build time than the first.
+ time.Sleep(1 * time.Millisecond) + + newBackend, _ := setupBleveBackend(t, withFileThreshold(5), withRootDir(tmpDir)) + idx, err := newBackend.BuildIndex(context.Background(), ns, secondIndexDocsCount, nil, "test", indexTestDocs(ns, secondIndexDocsCount, 100), nil, rebuild) + require.NoError(t, err) + + cnt, err := idx.DocCount(context.Background(), "") + require.NoError(t, err) + if rebuild { + require.Equal(t, int64(secondIndexDocsCount), cnt, "Index has not been rebuilt") + } else { + require.Equal(t, int64(firstIndexDocsCount), cnt, "Index has not been reused") + } + }) } } diff --git a/pkg/storage/unified/search/options.go b/pkg/storage/unified/search/options.go index a5bb1d52a8c..a719bebf32c 100644 --- a/pkg/storage/unified/search/options.go +++ b/pkg/storage/unified/search/options.go @@ -41,15 +41,13 @@ func NewSearchOptions( } bleve, err := NewBleveBackend(BleveOptions{ - Root: root, - FileThreshold: int64(cfg.IndexFileThreshold), // fewer than X items will use a memory index - BatchSize: cfg.IndexMaxBatchSize, // This is the batch size for how many objects to add to the index at once - IndexCacheTTL: cfg.IndexCacheTTL, // How long to keep the index cache in memory - BuildVersion: cfg.BuildVersion, - MaxFileIndexAge: cfg.MaxFileIndexAge, - MinBuildVersion: minVersion, - UseFullNgram: features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageUseFullNgram), - OwnsIndex: ownsIndexFn, + Root: root, + FileThreshold: int64(cfg.IndexFileThreshold), // fewer than X items will use a memory index + BatchSize: cfg.IndexMaxBatchSize, // This is the batch size for how many objects to add to the index at once + IndexCacheTTL: cfg.IndexCacheTTL, // How long to keep the index cache in memory + BuildVersion: cfg.BuildVersion, + UseFullNgram: features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageUseFullNgram), + OwnsIndex: ownsIndexFn, }, tracer, indexMetrics) if err != nil { @@ -57,11 +55,14 @@ func NewSearchOptions( } return resource.SearchOptions{ - Backend: bleve, - Resources: docs, - WorkerThreads: cfg.IndexWorkers, - InitMinCount: cfg.IndexMinCount, - RebuildInterval: cfg.IndexRebuildInterval, + Backend: bleve, + Resources: docs, + InitWorkerThreads: cfg.IndexWorkers, + IndexRebuildWorkers: cfg.IndexRebuildWorkers, + InitMinCount: cfg.IndexMinCount, + DashboardIndexMaxAge: cfg.IndexRebuildInterval, + MaxIndexAge: cfg.MaxFileIndexAge, + MinBuildVersion: minVersion, }, nil } return resource.SearchOptions{}, nil diff --git a/pkg/util/debouncer/queue.go b/pkg/util/debouncer/queue.go new file mode 100644 index 00000000000..a6dd404da3a --- /dev/null +++ b/pkg/util/debouncer/queue.go @@ -0,0 +1,119 @@ +package debouncer + +import ( + "context" + "errors" + "slices" + "sync" +) + +type CombineFn[T any] func(a, b T) (c T, ok bool) + +// Queue is a queue of elements. Elements added to the queue can be combined by the provided combine function. +// Once the queue is closed, no more elements can be added, but Next() will still return remaining elements. +type Queue[T any] struct { + combineFn CombineFn[T] + + mu sync.Mutex + elements []T + closed bool + waitChan chan struct{} // if not nil, will be closed when a new element is added +} + +func NewQueue[T any](combineFn CombineFn[T]) *Queue[T] { + return &Queue[T]{ + combineFn: combineFn, + } +} + +func (q *Queue[T]) Len() int { + q.mu.Lock() + defer q.mu.Unlock() + return len(q.elements) +} + +// Elements returns a copy of the elements currently in the queue.
+func (q *Queue[T]) Elements() []T { + q.mu.Lock() + defer q.mu.Unlock() + return slices.Clone(q.elements) +} + +func (q *Queue[T]) Add(n T) { + q.mu.Lock() + defer q.mu.Unlock() + if q.closed { + panic("queue already closed") + } + + for i, e := range q.elements { + if c, ok := q.combineFn(e, n); ok { + // No need to signal, since we are not adding a new element. + q.elements[i] = c + return + } + } + + q.elements = append(q.elements, n) + q.notifyWaiters() +} + +// Must be called with lock held. +func (q *Queue[T]) notifyWaiters() { + if q.waitChan != nil { + // Wakes up all waiting goroutines (but also possibly zero, if they stopped waiting already). + close(q.waitChan) + q.waitChan = nil + } +} + +var ErrClosed = errors.New("queue closed") + +// Next returns the next element in the queue. If no element is available, Next will block until +// an element is added to the queue, or the provided context is done. +// If the queue is closed, ErrClosed is returned. +func (q *Queue[T]) Next(ctx context.Context) (T, error) { + var zero T + + q.mu.Lock() + unlockInDefer := true + defer func() { + if unlockInDefer { + q.mu.Unlock() + } + }() + + for len(q.elements) == 0 { + if q.closed { + return zero, ErrClosed + } + + // Wait for an element. Make sure there's a wait channel that we can use. + wch := q.waitChan + if wch == nil { + wch = make(chan struct{}) + q.waitChan = wch + } + // Unlock before waiting + q.mu.Unlock() + + select { + case <-ctx.Done(): + unlockInDefer = false + return zero, ctx.Err() + case <-wch: + q.mu.Lock() + } + } + + first := q.elements[0] + q.elements = q.elements[1:] + return first, nil +} + +func (q *Queue[T]) Close() { + q.mu.Lock() + defer q.mu.Unlock() + q.closed = true + q.notifyWaiters() +} diff --git a/pkg/util/debouncer/queue_test.go b/pkg/util/debouncer/queue_test.go new file mode 100644 index 00000000000..76106f8f99a --- /dev/null +++ b/pkg/util/debouncer/queue_test.go @@ -0,0 +1,155 @@ +package debouncer + +import ( + "context" + "errors" + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "go.uber.org/goleak" ) + +// This verifies that all goroutines spawned from tests are finished at the end of tests. +// Applies to all tests in the package. +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + +func TestQueueBasic(t *testing.T) { + q := NewQueue(func(a, b int) (c int, ok bool) { + return a + b, true + }) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) + defer cancel() + // Empty queue will time out. + require.Equal(t, context.DeadlineExceeded, nextErr(t, q, ctx)) + + q.Add(10) + require.Equal(t, 10, next(t, q)) + require.Equal(t, 0, q.Len()) + + q.Add(20) + require.Equal(t, 20, next(t, q)) + require.Equal(t, 0, q.Len()) + + q.Add(10) + require.Equal(t, 1, q.Len()) + q.Add(20) + require.Equal(t, 1, q.Len()) + require.Equal(t, 30, next(t, q)) + require.Equal(t, 0, q.Len()) + + q.Add(100) + require.Equal(t, 1, q.Len()) + q.Close() + require.Equal(t, 1, q.Len()) + require.Equal(t, 100, next(t, q)) + require.Equal(t, ErrClosed, nextErr(t, q, context.Background())) + require.Equal(t, 0, q.Len()) + + // We can call Next repeatedly, but will always get an error. + require.Equal(t, ErrClosed, nextErr(t, q, context.Background())) +} + +func TestQueueConcurrency(t *testing.T) { + q := NewQueue(func(a, b int64) (c int64, ok bool) { + // Combine the same numbers together.
+ if a == b { + return a + b, true + } + return 0, false + }) + + const numbers = 10000 + const writeConcurrency = 50 + const readConcurrency = 25 + + totalWrittenSum := atomic.NewInt64(0) + totalReadSum := atomic.NewInt64(0) + addCalls := atomic.NewInt64(0) + nextCalls := atomic.NewInt64(0) + + // We will add some numbers to the queue. + writesWG := sync.WaitGroup{} + for i := 0; i < writeConcurrency; i++ { + writesWG.Add(1) + go func() { + defer writesWG.Done() + for j := 0; j < numbers; j++ { + // Use the goroutine-safe top-level rand; a shared rand.Rand is not safe for concurrent use. + v := rand.Int63n(100) // Generate a small number, so that we have a chance of combining some numbers. + q.Add(v) + addCalls.Inc() + totalWrittenSum.Add(v) + } + }() + } + + readsWG := sync.WaitGroup{} + for i := 0; i < readConcurrency; i++ { + readsWG.Add(1) + go func() { + defer readsWG.Done() + + for { + v, err := q.Next(context.Background()) + if errors.Is(err, ErrClosed) { + return + } + require.NoError(t, err) + + nextCalls.Inc() + totalReadSum.Add(v) + } + }() + } + + writesWG.Wait() + // Close queue after sending all numbers. This signals readers that they can stop. + q.Close() + + // Wait until all readers finish too. + readsWG.Wait() + + // Verify that all numbers were sent, combined and received. + require.Equal(t, int64(writeConcurrency*numbers), addCalls.Load()) + require.Equal(t, totalWrittenSum.Load(), totalReadSum.Load()) + require.LessOrEqual(t, nextCalls.Load(), addCalls.Load()) + t.Log("add calls:", addCalls.Load(), "next calls:", nextCalls.Load(), "total written sum:", totalWrittenSum.Load(), "total read sum:", totalReadSum.Load()) +} + +func TestQueueCloseUnblocksReaders(t *testing.T) { + q := NewQueue(func(a, b int) (c int, ok bool) { + return a + b, true + }) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(50 * time.Millisecond) + q.Close() + }() + + _, err := q.Next(context.Background()) + require.ErrorIs(t, err, ErrClosed) + + wg.Wait() +} + +func next[T any](t *testing.T, q *Queue[T]) T { + v, err := q.Next(context.Background()) + require.NoError(t, err) + return v +} + +func nextErr[T any](t *testing.T, q *Queue[T], ctx context.Context) error { + _, err := q.Next(ctx) + require.Error(t, err) + return err +}
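To make the debouncer.Queue semantics concrete, here is a minimal, self-contained usage sketch (illustrative only, not part of the change; the work-item strings are made up). The combine function collapses equal items into one queued element — the same mechanism searchSupport uses to coalesce rebuild requests for the same index, except that combineRebuildRequests merges by keeping the stricter minBuildTime/minBuildVersion rather than requiring equality:

	package main

	import (
		"context"
		"fmt"

		"github.com/grafana/grafana/pkg/util/debouncer"
	)

	func main() {
		// Equal items are combined into a single queue element; different items stay separate.
		queue := debouncer.NewQueue(func(a, b string) (string, bool) {
			if a == b {
				return a, true
			}
			return "", false
		})

		queue.Add("rebuild ns1/dashboards")
		queue.Add("rebuild ns1/dashboards") // combined with the identical element already queued
		queue.Add("rebuild ns2/folders")
		fmt.Println(queue.Len()) // 2

		// After Close, Add would panic, but Next still drains the remaining elements.
		queue.Close()
		for {
			item, err := queue.Next(context.Background())
			if err != nil {
				fmt.Println(err) // debouncer.ErrClosed once the queue is empty
				return
			}
			fmt.Println(item)
		}
	}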