diff --git a/pkg/storage/unified/resource/search.go b/pkg/storage/unified/resource/search.go index bc85a0a5a8a..50541be922f 100644 --- a/pkg/storage/unified/resource/search.go +++ b/pkg/storage/unified/resource/search.go @@ -88,7 +88,7 @@ type ResourceIndex interface { // UpdateIndex updates the index with the latest data (using update function provided when index was built) to guarantee strong consistency during the search. // Returns RV to which index was updated. - UpdateIndex(ctx context.Context, reason string) (int64, error) + UpdateIndex(ctx context.Context) (int64, error) // BuildInfo returns build information about the index. BuildInfo() (IndexBuildInfo, error) @@ -102,7 +102,7 @@ type UpdateFn func(context context.Context, index ResourceIndex, sinceRV int64) // SearchBackend contains the technology specific logic to support search type SearchBackend interface { // GetIndex returns existing index, or nil. - GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error) + GetIndex(key NamespacedResource) ResourceIndex // BuildIndex builds an index from scratch. // Depending on the size, the backend may choose different options (eg: memory vs disk). @@ -540,23 +540,18 @@ func (s *searchSupport) runPeriodicScanForIndexesToRebuild(ctx context.Context) s.log.Info("stopping periodic index rebuild due to context cancellation") return case <-ticker.C: - s.findIndexesToRebuild(ctx, time.Now()) + s.findIndexesToRebuild(time.Now()) } } } -func (s *searchSupport) findIndexesToRebuild(ctx context.Context, now time.Time) { +func (s *searchSupport) findIndexesToRebuild(now time.Time) { // Check all open indexes and see if any of them need to be rebuilt. // This is done periodically to make sure that the indexes are up to date. keys := s.search.GetOpenIndexes() for _, key := range keys { - idx, err := s.search.GetIndex(ctx, key) - if err != nil { - s.log.Error("failed to check index to rebuild", "key", key, "error", err) - continue - } - + idx := s.search.GetIndex(key) if idx == nil { // This can happen if index was closed in the meantime. continue @@ -618,13 +613,7 @@ func (s *searchSupport) rebuildIndex(ctx context.Context, req rebuildRequest) { l := s.log.With("namespace", req.Namespace, "group", req.Group, "resource", req.Resource) - idx, err := s.search.GetIndex(ctx, req.NamespacedResource) - if err != nil { - span.RecordError(err) - l.Error("failed to get index to rebuild", "error", err) - return - } - + idx := s.search.GetIndex(req.NamespacedResource) if idx == nil { span.AddEvent("index not found") l.Error("index not found") @@ -716,11 +705,7 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso attribute.String("namespace", key.Namespace), ) - idx, err := s.search.GetIndex(ctx, key) - if err != nil { - return nil, tracing.Error(span, err) - } - + idx := s.search.GetIndex(key) if idx == nil { span.AddEvent("Building index") ch := s.buildIndex.DoChan(key.String(), func() (interface{}, error) { @@ -730,8 +715,8 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso // Recheck if some other goroutine managed to build an index in the meantime. // (That is, it finished running this function and stored the index into the cache) - idx, err := s.search.GetIndex(ctx, key) - if err == nil && idx != nil { + idx := s.search.GetIndex(key) + if idx != nil { return idx, nil } @@ -773,7 +758,7 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso span.AddEvent("Updating index") start := time.Now() - rv, err := idx.UpdateIndex(ctx, reason) + rv, err := idx.UpdateIndex(ctx) if err != nil { return nil, tracing.Error(span, fmt.Errorf("failed to update index to guarantee strong consistency: %w", err)) } diff --git a/pkg/storage/unified/resource/search_test.go b/pkg/storage/unified/resource/search_test.go index 270719af2a7..34c9ee6927c 100644 --- a/pkg/storage/unified/resource/search_test.go +++ b/pkg/storage/unified/resource/search_test.go @@ -31,7 +31,7 @@ type MockResourceIndex struct { updateIndexError error updateIndexMu sync.Mutex - updateIndexCalls []string + updateIndexCalls int buildInfo IndexBuildInfo } @@ -65,11 +65,11 @@ func (m *MockResourceIndex) ListManagedObjects(ctx context.Context, req *resourc return args.Get(0).(*resourcepb.ListManagedObjectsResponse), args.Error(1) } -func (m *MockResourceIndex) UpdateIndex(ctx context.Context, reason string) (int64, error) { +func (m *MockResourceIndex) UpdateIndex(_ context.Context) (int64, error) { m.updateIndexMu.Lock() defer m.updateIndexMu.Unlock() - m.updateIndexCalls = append(m.updateIndexCalls, reason) + m.updateIndexCalls++ return 0, m.updateIndexError } @@ -144,10 +144,10 @@ type buildIndexCall struct { fields SearchableDocumentFields } -func (m *mockSearchBackend) GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error) { +func (m *mockSearchBackend) GetIndex(key NamespacedResource) ResourceIndex { m.mu.Lock() defer m.mu.Unlock() - return m.cache[key], nil + return m.cache[key] } func (m *mockSearchBackend) BuildIndex(ctx context.Context, key NamespacedResource, size int64, fields SearchableDocumentFields, reason string, builder BuildFn, updater UpdateFn, rebuild bool) (ResourceIndex, error) { @@ -271,24 +271,24 @@ func TestSearchGetOrCreateIndexWithIndexUpdate(t *testing.T) { idx, err := support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, "initial call") require.NoError(t, err) require.NotNil(t, idx) - checkMockIndexUpdateCalls(t, idx, []string{"initial call"}) + checkMockIndexUpdateCalls(t, idx, 1) idx, err = support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, "second call") require.NoError(t, err) require.NotNil(t, idx) - checkMockIndexUpdateCalls(t, idx, []string{"initial call", "second call"}) + checkMockIndexUpdateCalls(t, idx, 2) idx, err = support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "bad"}, "call to bad index") require.ErrorIs(t, err, failedErr) require.Nil(t, idx) } -func checkMockIndexUpdateCalls(t *testing.T, idx ResourceIndex, strings []string) { +func checkMockIndexUpdateCalls(t *testing.T, idx ResourceIndex, calls int) { mi, ok := idx.(*MockResourceIndex) require.True(t, ok) mi.updateIndexMu.Lock() defer mi.updateIndexMu.Unlock() - require.Equal(t, strings, mi.updateIndexCalls) + require.Equal(t, calls, mi.updateIndexCalls) } func TestSearchGetOrCreateIndexWithCancellation(t *testing.T) { @@ -333,8 +333,8 @@ func TestSearchGetOrCreateIndexWithCancellation(t *testing.T) { // Wait until new index is put into cache. require.Eventually(t, func() bool { - idx, err := support.search.GetIndex(ctx, key) - return err == nil && idx != nil + idx := support.search.GetIndex(key) + return idx != nil }, 1*time.Second, 100*time.Millisecond, "Indexing finishes despite context cancellation") // Second call to getOrCreateIndex returns index immediately, even if context is canceled, as the index is now ready and cached. @@ -347,10 +347,10 @@ type slowSearchBackendWithCache struct { wg sync.WaitGroup } -func (m *slowSearchBackendWithCache) GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error) { +func (m *slowSearchBackendWithCache) GetIndex(key NamespacedResource) ResourceIndex { m.mu.Lock() defer m.mu.Unlock() - return m.cache[key], nil + return m.cache[key] } func (m *slowSearchBackendWithCache) BuildIndex(ctx context.Context, key NamespacedResource, size int64, fields SearchableDocumentFields, reason string, builder BuildFn, updater UpdateFn, rebuild bool) (ResourceIndex, error) { @@ -573,14 +573,14 @@ func TestFindIndexesForRebuild(t *testing.T) { require.NoError(t, err) require.NotNil(t, support) - support.findIndexesToRebuild(context.Background(), now) + support.findIndexesToRebuild(now) require.Equal(t, 6, support.rebuildQueue.Len()) now5m := now.Add(5 * time.Minute) // Running findIndexesToRebuild again should not add any new indexes to the rebuild queue, and all existing // ones should be "combined" with new ones (this will "bump" minBuildTime) - support.findIndexesToRebuild(context.Background(), now5m) + support.findIndexesToRebuild(now5m) require.Equal(t, 6, support.rebuildQueue.Len()) // Values that we expect to find in rebuild requests. @@ -692,8 +692,7 @@ func TestRebuildIndexes(t *testing.T) { func checkRebuildIndex(t *testing.T, support *searchSupport, req rebuildRequest, indexExists, expectedRebuild bool) { ctx := context.Background() - idxBefore, err := support.search.GetIndex(ctx, req.NamespacedResource) - require.NoError(t, err) + idxBefore := support.search.GetIndex(req.NamespacedResource) if indexExists { require.NotNil(t, idxBefore, "index should exist before rebuildIndex") } else { @@ -702,8 +701,7 @@ func checkRebuildIndex(t *testing.T, support *searchSupport, req rebuildRequest, support.rebuildIndex(ctx, req) - idxAfter, err := support.search.GetIndex(ctx, req.NamespacedResource) - require.NoError(t, err) + idxAfter := support.search.GetIndex(req.NamespacedResource) if indexExists { require.NotNil(t, idxAfter, "index should exist after rebuildIndex") diff --git a/pkg/storage/unified/search/bleve.go b/pkg/storage/unified/search/bleve.go index d8ac41e1332..190322bab7c 100644 --- a/pkg/storage/unified/search/bleve.go +++ b/pkg/storage/unified/search/bleve.go @@ -170,13 +170,13 @@ func NewBleveBackend(opts BleveOptions, tracer trace.Tracer, indexMetrics *resou } // GetIndex will return nil if the key does not exist -func (b *bleveBackend) GetIndex(_ context.Context, key resource.NamespacedResource) (resource.ResourceIndex, error) { +func (b *bleveBackend) GetIndex(key resource.NamespacedResource) resource.ResourceIndex { idx := b.getCachedIndex(key, time.Now()) // Avoid returning typed nils. if idx == nil { - return nil, nil + return nil } - return idx, nil + return idx } func (b *bleveBackend) GetOpenIndexes() []resource.NamespacedResource { @@ -1358,7 +1358,7 @@ func (b *bleveIndex) stopUpdaterAndCloseIndex() error { return b.index.Close() } -func (b *bleveIndex) UpdateIndex(ctx context.Context, reason string) (int64, error) { +func (b *bleveIndex) UpdateIndex(ctx context.Context) (int64, error) { // We don't have to do anything if the index cannot be updated (typically in tests). if b.updaterFn == nil { return 0, nil diff --git a/pkg/storage/unified/search/bleve_test.go b/pkg/storage/unified/search/bleve_test.go index 2c767e079df..c0ad9c68fc0 100644 --- a/pkg/storage/unified/search/bleve_test.go +++ b/pkg/storage/unified/search/bleve_test.go @@ -903,8 +903,7 @@ func TestBuildIndexExpiration(t *testing.T) { backend.runEvictExpiredOrUnownedIndexes(time.Now().Add(5 * time.Minute)) if tc.expectedEviction { - idx, err := backend.GetIndex(context.Background(), ns) - require.NoError(t, err) + idx := backend.GetIndex(ns) require.Nil(t, idx) _, err = builtIndex.DocCount(context.Background(), "") @@ -913,8 +912,7 @@ func TestBuildIndexExpiration(t *testing.T) { // Verify that there are no open indexes. checkOpenIndexes(t, reg, 0, 0) } else { - idx, err := backend.GetIndex(context.Background(), ns) - require.NoError(t, err) + idx := backend.GetIndex(ns) require.NotNil(t, idx) cnt, err := builtIndex.DocCount(context.Background(), "") @@ -1246,7 +1244,7 @@ func TestIndexUpdate(t *testing.T) { require.Equal(t, int64(0), resp.TotalHits) // Update index. - _, err = idx.UpdateIndex(context.Background(), "test") + _, err = idx.UpdateIndex(context.Background()) require.NoError(t, err) // Verify that index was updated -- number of docs didn't change, but we can search "gen_1" documents now. @@ -1254,7 +1252,7 @@ func TestIndexUpdate(t *testing.T) { require.Equal(t, int64(5), searchTitle(t, idx, "gen_1", 10, ns).TotalHits) // Update index again. - _, err = idx.UpdateIndex(context.Background(), "test") + _, err = idx.UpdateIndex(context.Background()) require.NoError(t, err) // Verify that index was updated again -- we can search "gen_2" now. "gen_1" documents are gone. require.Equal(t, 10, docCount(t, idx)) @@ -1298,13 +1296,13 @@ func TestConcurrentIndexUpdateAndBuildIndex(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, err = idx.UpdateIndex(ctx, "test") + _, err = idx.UpdateIndex(ctx) require.NoError(t, err) _, err = be.BuildIndex(t.Context(), ns, 10 /* file based */, nil, "test", indexTestDocs(ns, 10, 100), updaterFn, false) require.NoError(t, err) - _, err = idx.UpdateIndex(ctx, "test") + _, err = idx.UpdateIndex(ctx) require.Contains(t, err.Error(), bleve.ErrorIndexClosed.Error()) } @@ -1340,10 +1338,8 @@ func TestConcurrentIndexUpdateSearchAndRebuild(t *testing.T) { case <-time.After(time.Duration(i) * time.Millisecond): // introduce small jitter } - idx, err := be.GetIndex(ctx, ns) - require.NoError(t, err) // GetIndex doesn't really return error. - - _, err = idx.UpdateIndex(ctx, "test") + idx := be.GetIndex(ns) + _, err = idx.UpdateIndex(ctx) if err != nil { if errors.Is(err, bleve.ErrorIndexClosed) || errors.Is(err, context.Canceled) { continue @@ -1424,7 +1420,7 @@ func TestConcurrentIndexUpdateAndSearch(t *testing.T) { prevRV := int64(0) for ctx.Err() == nil { // We use t.Context() here to avoid getting errors from context cancellation. - rv, err := idx.UpdateIndex(t.Context(), "test") + rv, err := idx.UpdateIndex(t.Context()) require.NoError(t, err) require.Greater(t, rv, prevRV) // Each update should return new RV (that's how our update function works) require.Equal(t, int64(10), searchTitle(t, idx, "Document", 10, ns).TotalHits) @@ -1484,7 +1480,7 @@ func TestConcurrentIndexUpdateAndSearchWithIndexMinUpdateInterval(t *testing.T) attemptedUpdates.Inc() // We use t.Context() here to avoid getting errors from context cancellation. - rv, err := idx.UpdateIndex(t.Context(), "test") + rv, err := idx.UpdateIndex(t.Context()) require.NoError(t, err) // Our update function returns unix timestamp in millis. We expect it to not change at all, or change by minInterval. @@ -1536,7 +1532,7 @@ func TestIndexUpdateWithErrors(t *testing.T) { require.NoError(t, err) t.Run("update fail", func(t *testing.T) { - _, err = idx.UpdateIndex(t.Context(), "test") + _, err = idx.UpdateIndex(t.Context()) require.ErrorIs(t, err, updateErr) }) @@ -1544,7 +1540,7 @@ func TestIndexUpdateWithErrors(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) defer cancel() - _, err = idx.UpdateIndex(ctx, "test") + _, err = idx.UpdateIndex(ctx) require.ErrorIs(t, err, context.DeadlineExceeded) }) @@ -1553,7 +1549,7 @@ func TestIndexUpdateWithErrors(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - _, err = idx.UpdateIndex(ctx, "test") + _, err = idx.UpdateIndex(ctx) require.ErrorIs(t, err, context.Canceled) }) } diff --git a/pkg/storage/unified/testing/search_backend.go b/pkg/storage/unified/testing/search_backend.go index ed1cffd5194..4ef4b6d2753 100644 --- a/pkg/storage/unified/testing/search_backend.go +++ b/pkg/storage/unified/testing/search_backend.go @@ -59,12 +59,11 @@ func runTestSearchBackendBuildIndex(t *testing.T, backend resource.SearchBackend } // Get the index should return nil if the index does not exist - index, err := backend.GetIndex(ctx, ns) - require.NoError(t, err) + index := backend.GetIndex(ns) require.Nil(t, index) // Build the index - index, err = backend.BuildIndex(ctx, ns, 0, nil, "test", func(index resource.ResourceIndex) (int64, error) { + index, err := backend.BuildIndex(ctx, ns, 0, nil, "test", func(index resource.ResourceIndex) (int64, error) { // Write a test document err := index.BulkIndex(&resource.BulkIndexRequest{ Items: []*resource.BulkIndexItem{ @@ -91,8 +90,7 @@ func runTestSearchBackendBuildIndex(t *testing.T, backend resource.SearchBackend require.NotNil(t, index) // Get the index should now return the index - index, err = backend.GetIndex(ctx, ns) - require.NoError(t, err) + index = backend.GetIndex(ns) require.NotNil(t, index) }