unified-storage: reuse index if RV is set even if it is outdated (#110184)

* unified-storage: reuse index if RV is set even if it is outdated and searchAfterWrite FF is enabled
This commit is contained in:
Will Assis 2025-08-27 11:10:54 -04:00 committed by GitHub
parent edd59d634e
commit 7921c7da23
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 361 additions and 40 deletions

View File

@ -244,7 +244,7 @@ func (s *server) BulkProcess(stream resourcepb.BulkStore_BulkProcessServer) erro
Namespace: summary.Namespace,
Group: summary.Group,
Resource: summary.Resource,
}, summary.Count, summary.ResourceVersion, "rebuildAfterBatchLoad")
}, summary.Count, summary.ResourceVersion, "rebuildAfterBatchLoad", true)
if err != nil {
s.log.Warn("error building search index after batch load", "err", err)
rsp.Error = &resourcepb.ErrorResult{

View File

@ -109,6 +109,8 @@ type SearchBackend interface {
indexBuildReason string,
builder BuildFn,
updater UpdateFn,
rebuild bool,
searchAfterWrite bool,
) (ResourceIndex, error)
// TotalDocs returns the total number of documents across all indexes.
@ -475,7 +477,7 @@ func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, er
if rebuild {
reason = "rebuild"
}
_, _, err := s.build(ctx, info.NamespacedResource, info.Count, info.ResourceVersion, reason)
_, _, err := s.build(ctx, info.NamespacedResource, info.Count, info.ResourceVersion, reason, rebuild)
return err
})
}
@ -693,7 +695,7 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso
}
}
idx, _, err = s.build(ctx, key, size, rv, reason)
idx, _, err = s.build(ctx, key, size, rv, reason, false)
if err != nil {
return nil, fmt.Errorf("error building search index, %w", err)
}
@ -729,7 +731,7 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso
return idx, nil
}
func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size int64, documentStatsRV int64, indexBuildReason string) (ResourceIndex, int64, error) {
func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size int64, documentStatsRV int64, indexBuildReason string, rebuild bool) (ResourceIndex, int64, error) {
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Build")
defer span.End()
@ -897,7 +899,7 @@ func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size
return rv, docs, nil
}
index, err := s.search.BuildIndex(ctx, nsr, size, documentStatsRV, fields, indexBuildReason, builderFn, updaterFn)
index, err := s.search.BuildIndex(ctx, nsr, size, documentStatsRV, fields, indexBuildReason, builderFn, updaterFn, rebuild, s.searchAfterWrite)
if err != nil {
return nil, 0, err
@ -931,7 +933,7 @@ func (s *searchSupport) buildEmptyIndex(ctx context.Context, nsr NamespacedResou
}, func(context context.Context, index ResourceIndex, sinceRV int64) (int64, int, error) {
// No update is performed.
return 0, 0, nil
})
}, false, s.searchAfterWrite)
}
type builderCache struct {

View File

@ -147,7 +147,7 @@ func (m *mockSearchBackend) GetIndex(ctx context.Context, key NamespacedResource
return m.cache[key], nil
}
func (m *mockSearchBackend) BuildIndex(ctx context.Context, key NamespacedResource, size int64, resourceVersion int64, fields SearchableDocumentFields, reason string, builder BuildFn, updater UpdateFn) (ResourceIndex, error) {
func (m *mockSearchBackend) BuildIndex(ctx context.Context, key NamespacedResource, size int64, resourceVersion int64, fields SearchableDocumentFields, reason string, builder BuildFn, updater UpdateFn, rebuild bool, searchAfterWrite bool) (ResourceIndex, error) {
index := &MockResourceIndex{}
index.On("BulkIndex", mock.Anything).Return(nil).Maybe()
index.On("DocCount", mock.Anything, mock.Anything).Return(int64(0), nil).Maybe()
@ -538,7 +538,7 @@ func (m *slowSearchBackendWithCache) GetIndex(ctx context.Context, key Namespace
return m.cache[key], nil
}
func (m *slowSearchBackendWithCache) BuildIndex(ctx context.Context, key NamespacedResource, size int64, resourceVersion int64, fields SearchableDocumentFields, reason string, builder BuildFn, updater UpdateFn) (ResourceIndex, error) {
func (m *slowSearchBackendWithCache) BuildIndex(ctx context.Context, key NamespacedResource, size int64, resourceVersion int64, fields SearchableDocumentFields, reason string, builder BuildFn, updater UpdateFn, rebuild bool, searchAfterWrite bool) (ResourceIndex, error) {
m.wg.Add(1)
defer m.wg.Done()
@ -548,7 +548,7 @@ func (m *slowSearchBackendWithCache) BuildIndex(ctx context.Context, key Namespa
if ctx.Err() != nil {
return nil, ctx.Err()
}
idx, err := m.mockSearchBackend.BuildIndex(ctx, key, size, resourceVersion, fields, reason, builder, updater)
idx, err := m.mockSearchBackend.BuildIndex(ctx, key, size, resourceVersion, fields, reason, builder, updater, rebuild, searchAfterWrite)
if err != nil {
return nil, err
}

View File

@ -208,6 +208,8 @@ func (b *bleveBackend) BuildIndex(
indexBuildReason string,
builder resource.BuildFn,
updater resource.UpdateFn,
rebuild bool,
searchAfterWrite bool,
) (resource.ResourceIndex, error) {
_, span := b.tracer.Start(ctx, tracingPrexfixBleve+"BuildIndex")
defer span.End()
@ -269,8 +271,8 @@ func (b *bleveBackend) BuildIndex(
// We only check for the existing file-based index if we don't already have an open index for this key.
// This happens on startup, or when memory-based index has expired. (We don't expire file-based indexes)
// If we do have an unexpired cached index already, we always build a new index from scratch.
if cachedIndex == nil && resourceVersion > 0 {
index, fileIndexName, indexRV = b.findPreviousFileBasedIndex(resourceDir, resourceVersion, size)
if cachedIndex == nil && resourceVersion > 0 && !rebuild {
index, fileIndexName, indexRV = b.findPreviousFileBasedIndex(resourceDir, resourceVersion, size, searchAfterWrite)
}
if index != nil {
@ -488,7 +490,7 @@ func formatIndexName(now time.Time) string {
return now.Format("20060102-150405")
}
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string, resourceVersion int64, size int64) (bleve.Index, string, int64) {
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string, resourceVersion int64, size int64, searchAfterWrite bool) (bleve.Index, string, int64) {
entries, err := os.ReadDir(resourceDir)
if err != nil {
return nil, "", 0
@ -529,7 +531,8 @@ func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string, resourceVe
continue
}
if indexRV < resourceVersion {
// if searchAfterWrite is enabled, we don't need to re-build the index, as it will be updated at request time
if !searchAfterWrite && indexRV < resourceVersion {
b.log.Debug("indexRV is less than requested resourceVersion. ignoring index", "indexDir", indexDir, "rv", indexRV, "resourceVersion", resourceVersion)
_ = idx.Close()
continue

View File

@ -551,7 +551,7 @@ func newTestDashboardsIndex(t testing.TB, threshold int64, size int64, batchSize
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
}, size, rv, info.Fields, "test", writer, nil)
}, size, rv, info.Fields, "test", writer, nil, false, false)
require.NoError(t, err)
return index

View File

@ -190,7 +190,7 @@ func TestBleveBackend(t *testing.T) {
return 0, err
}
return rv, nil
}, nil)
}, nil, false, false)
require.NoError(t, err)
require.NotNil(t, index)
dashboardsIndex = index
@ -419,7 +419,7 @@ func TestBleveBackend(t *testing.T) {
return 0, err
}
return rv, nil
}, nil)
}, nil, false, false)
require.NoError(t, err)
require.NotNil(t, index)
foldersIndex = index
@ -782,7 +782,7 @@ func TestBleveInMemoryIndexExpiration(t *testing.T) {
Resource: "resource",
}
builtIndex, err := backend.BuildIndex(context.Background(), ns, 1 /* below FileThreshold */, 100, nil, "test", indexTestDocs(ns, 1, 100), nil)
builtIndex, err := backend.BuildIndex(context.Background(), ns, 1 /* below FileThreshold */, 100, nil, "test", indexTestDocs(ns, 1, 100), nil, false, false)
require.NoError(t, err)
// Wait for index expiration, which is 1ns
@ -814,7 +814,7 @@ func TestBleveFileIndexExpiration(t *testing.T) {
}
// size=100 is above FileThreshold, this will be file-based index
builtIndex, err := backend.BuildIndex(context.Background(), ns, 100, 100, nil, "test", indexTestDocs(ns, 1, 100), nil)
builtIndex, err := backend.BuildIndex(context.Background(), ns, 100, 100, nil, "test", indexTestDocs(ns, 1, 100), nil, false, false)
require.NoError(t, err)
// Wait for index expiration, which is 1ns
@ -846,7 +846,7 @@ func TestFileIndexIsReusedOnSameSizeAndRVLessThanIndexRV(t *testing.T) {
tmpDir := t.TempDir()
backend1, reg1 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
_, err := backend1.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), nil)
_, err := backend1.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), nil, false, false)
require.NoError(t, err)
// Verify one open index.
@ -869,7 +869,7 @@ func TestFileIndexIsReusedOnSameSizeAndRVLessThanIndexRV(t *testing.T) {
// We open new backend using same directory, and run indexing with same size (10) and RV (100). This should reuse existing index, and skip indexing.
backend2, reg2 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 1000, 100), nil)
idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 1000, 100), nil, false, false)
require.NoError(t, err)
// Verify that we're reusing existing index and there is only 10 documents in it, not 1000.
@ -895,7 +895,7 @@ func TestFileIndexIsReusedOnSameSizeAndRVLessThanIndexRV(t *testing.T) {
// We repeat with backend3 and RV 99. This should also reuse existing index and skip indexing
backend3, reg3 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err = backend3.BuildIndex(context.Background(), ns, 10 /* file based */, 99, nil, "test", indexTestDocs(ns, 1000, 99), nil)
idx, err = backend3.BuildIndex(context.Background(), ns, 10 /* file based */, 99, nil, "test", indexTestDocs(ns, 1000, 99), nil, false, false)
require.NoError(t, err)
// Verify that we're reusing existing index and there is only 10 documents in it, not 1000.
@ -911,6 +911,322 @@ func TestFileIndexIsReusedOnSameSizeAndRVLessThanIndexRV(t *testing.T) {
`), "index_server_open_indexes"))
backend3.CloseAllIndexes()
require.NoError(t, testutil.GatherAndCompare(reg3, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 0
`), "index_server_open_indexes"))
// again now RV > 100. Should NOT reuse index
backend4, reg4 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err = backend4.BuildIndex(context.Background(), ns, 10 /* file based */, 101, nil, "test", indexTestDocs(ns, 1000, 100), nil, false, false)
require.NoError(t, err)
// Verify that we're NOT existing index and there is only 1000 documents in it, not 10.
cnt, err = idx.DocCount(context.Background(), "")
require.NoError(t, err)
require.Equal(t, int64(1000), cnt)
require.NoError(t, testutil.GatherAndCompare(reg4, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend4.CloseAllIndexes()
}
func TestFileIndexIsIgnoredIfRebuildFlagIsTrueWithoutSearchAfterWrite(t *testing.T) {
ns := resource.NamespacedResource{
Namespace: "test",
Group: "group",
Resource: "resource",
}
tmpDir := t.TempDir()
backend1, reg1 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
_, err := backend1.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), nil, true, false)
require.NoError(t, err)
// Verify one open index.
require.NoError(t, testutil.GatherAndCompare(reg1, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend1.CloseAllIndexes()
// Verify that there are no open indexes after CloseAllIndexes call.
require.NoError(t, testutil.GatherAndCompare(reg1, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 0
`), "index_server_open_indexes"))
// We open new backend using same directory, and run indexing with same size (10) and RV (100). This should NOT
// reuse existing index, due to the rebuild flag being true
backend2, reg2 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 1000, 100), nil, true, false)
require.NoError(t, err)
// Verify that we've re-built the index. There should be 1000 documents, not 10.
cnt, err := idx.DocCount(context.Background(), "")
require.NoError(t, err)
require.Equal(t, int64(1000), cnt)
require.NoError(t, testutil.GatherAndCompare(reg2, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend2.CloseAllIndexes()
// Verify that there are no open indexes after closeAllIndexes call.
require.NoError(t, testutil.GatherAndCompare(reg2, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 0
`), "index_server_open_indexes"))
// We repeat with backend3 and RV 99. This should also NOT reuse existing index
backend3, reg3 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err = backend3.BuildIndex(context.Background(), ns, 10 /* file based */, 99, nil, "test", indexTestDocs(ns, 1001, 99), nil, true, false)
require.NoError(t, err)
// Verify that we've re-built the index. There should be 1001 documents
cnt, err = idx.DocCount(context.Background(), "")
require.NoError(t, err)
require.Equal(t, int64(1001), cnt)
require.NoError(t, testutil.GatherAndCompare(reg3, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend3.CloseAllIndexes()
// again now RV > 100. Should still rebuild the index due to the rebuild flag
backend4, reg4 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err = backend4.BuildIndex(context.Background(), ns, 10 /* file based */, 101, nil, "test", indexTestDocs(ns, 1002, 100), nil, true, false)
require.NoError(t, err)
// Verify that we've re-built the index. There should be 1002 documents
cnt, err = idx.DocCount(context.Background(), "")
require.NoError(t, err)
require.Equal(t, int64(1002), cnt)
require.NoError(t, testutil.GatherAndCompare(reg4, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend4.CloseAllIndexes()
}
func TestFileIndexIsIgnoredIfRebuildFlagIsTrueWithSearchAfterWrite(t *testing.T) {
ns := resource.NamespacedResource{
Namespace: "test",
Group: "group",
Resource: "resource",
}
tmpDir := t.TempDir()
backend1, reg1 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
_, err := backend1.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), nil, true, true)
require.NoError(t, err)
// Verify one open index.
require.NoError(t, testutil.GatherAndCompare(reg1, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend1.CloseAllIndexes()
// Verify that there are no open indexes after CloseAllIndexes call.
require.NoError(t, testutil.GatherAndCompare(reg1, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 0
`), "index_server_open_indexes"))
// We open new backend using same directory, and run indexing with same size (10) and RV (100). This should NOT
// reuse existing index, due to the rebuild flag being true
backend2, reg2 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 1000, 100), nil, true, true)
require.NoError(t, err)
// Verify that we've re-built the index. There should be 1000 documents, not 10.
cnt, err := idx.DocCount(context.Background(), "")
require.NoError(t, err)
require.Equal(t, int64(1000), cnt)
require.NoError(t, testutil.GatherAndCompare(reg2, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend2.CloseAllIndexes()
// Verify that there are no open indexes after closeAllIndexes call.
require.NoError(t, testutil.GatherAndCompare(reg2, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 0
`), "index_server_open_indexes"))
// We repeat with backend3 and RV 99. This should also NOT reuse existing index
backend3, reg3 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err = backend3.BuildIndex(context.Background(), ns, 10 /* file based */, 99, nil, "test", indexTestDocs(ns, 1001, 99), nil, true, true)
require.NoError(t, err)
// Verify that we've re-built the index. There should be 1001 documents
cnt, err = idx.DocCount(context.Background(), "")
require.NoError(t, err)
require.Equal(t, int64(1001), cnt)
require.NoError(t, testutil.GatherAndCompare(reg3, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend3.CloseAllIndexes()
// again now RV > 100. Should still rebuild the index due to the rebuild flag
backend4, reg4 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err = backend4.BuildIndex(context.Background(), ns, 10 /* file based */, 101, nil, "test", indexTestDocs(ns, 1002, 100), nil, true, true)
require.NoError(t, err)
// Verify that we've re-built the index. There should be 1002 documents
cnt, err = idx.DocCount(context.Background(), "")
require.NoError(t, err)
require.Equal(t, int64(1002), cnt)
require.NoError(t, testutil.GatherAndCompare(reg4, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend4.CloseAllIndexes()
}
func TestFileIndexIsReusedIfRVisPresentAndSearhAfterWriteIsEnabled(t *testing.T) {
ns := resource.NamespacedResource{
Namespace: "test",
Group: "group",
Resource: "resource",
}
tmpDir := t.TempDir()
backend1, reg1 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
_, err := backend1.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), nil, false, true)
require.NoError(t, err)
// Verify one open index.
require.NoError(t, testutil.GatherAndCompare(reg1, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend1.CloseAllIndexes()
// Verify that there are no open indexes after CloseAllIndexes call.
require.NoError(t, testutil.GatherAndCompare(reg1, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 0
`), "index_server_open_indexes"))
// We open new backend using same directory, and run indexing with same size (10) and RV (100). This should reuse existing index, and skip indexing.
backend2, reg2 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 1000, 100), nil, false, true)
require.NoError(t, err)
// Verify that we're reusing existing index and there is only 10 documents in it, not 1000.
cnt, err := idx.DocCount(context.Background(), "")
require.NoError(t, err)
require.Equal(t, int64(10), cnt)
require.NoError(t, testutil.GatherAndCompare(reg2, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend2.CloseAllIndexes()
// Verify that there are no open indexes after closeAllIndexes call.
require.NoError(t, testutil.GatherAndCompare(reg2, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 0
`), "index_server_open_indexes"))
// We repeat with backend3 and RV 99. This should also reuse existing index and skip indexing
backend3, reg3 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err = backend3.BuildIndex(context.Background(), ns, 10 /* file based */, 99, nil, "test", indexTestDocs(ns, 1000, 99), nil, false, true)
require.NoError(t, err)
// Verify that we're reusing existing index and there is only 10 documents in it, not 1000.
cnt, err = idx.DocCount(context.Background(), "")
require.NoError(t, err)
require.Equal(t, int64(10), cnt)
require.NoError(t, testutil.GatherAndCompare(reg3, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend3.CloseAllIndexes()
// again now RV > 100. Should still reuse index
backend4, reg4 := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err = backend4.BuildIndex(context.Background(), ns, 10 /* file based */, 101, nil, "test", indexTestDocs(ns, 1000, 100), nil, false, true)
require.NoError(t, err)
// Verify that we're reusing existing index and there is only 10 documents in it, not 1000.
cnt, err = idx.DocCount(context.Background(), "")
require.NoError(t, err)
require.Equal(t, int64(10), cnt)
require.NoError(t, testutil.GatherAndCompare(reg4, bytes.NewBufferString(`
# HELP index_server_open_indexes Number of open indexes per storage type. An open index corresponds to single resource group.
# TYPE index_server_open_indexes gauge
index_server_open_indexes{index_storage="memory"} 0
index_server_open_indexes{index_storage="file"} 1
`), "index_server_open_indexes"))
backend4.CloseAllIndexes()
}
func TestFileIndexIsNotReusedOnDifferentSize(t *testing.T) {
@ -923,13 +1239,13 @@ func TestFileIndexIsNotReusedOnDifferentSize(t *testing.T) {
tmpDir := t.TempDir()
backend1, _ := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
_, err := backend1.BuildIndex(context.Background(), ns, 10, 100, nil, "test", indexTestDocs(ns, 10, 100), nil)
_, err := backend1.BuildIndex(context.Background(), ns, 10, 100, nil, "test", indexTestDocs(ns, 10, 100), nil, false, false)
require.NoError(t, err)
backend1.CloseAllIndexes()
// We open new backend using same directory, but with different size. Index should be rebuilt.
backend2, _ := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err := backend2.BuildIndex(context.Background(), ns, 100, 100, nil, "test", indexTestDocs(ns, 100, 100), nil)
idx, err := backend2.BuildIndex(context.Background(), ns, 100, 100, nil, "test", indexTestDocs(ns, 100, 100), nil, false, false)
require.NoError(t, err)
// Verify that index has updated number of documents.
@ -948,13 +1264,13 @@ func TestFileIndexIsNotReusedOnDifferentRV(t *testing.T) {
tmpDir := t.TempDir()
backend1, _ := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
_, err := backend1.BuildIndex(context.Background(), ns, 10, 100, nil, "test", indexTestDocs(ns, 10, 100), nil)
_, err := backend1.BuildIndex(context.Background(), ns, 10, 100, nil, "test", indexTestDocs(ns, 10, 100), nil, false, false)
require.NoError(t, err)
backend1.CloseAllIndexes()
// We open new backend using same directory, but with different RV. Index should be rebuilt.
backend2, _ := setupBleveBackend(t, 5, time.Nanosecond, tmpDir)
idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 999999, nil, "test", indexTestDocs(ns, 100, 999999), nil)
idx, err := backend2.BuildIndex(context.Background(), ns, 10 /* file based */, 999999, nil, "test", indexTestDocs(ns, 100, 999999), nil, false, false)
require.NoError(t, err)
// Verify that index has updated number of documents.
@ -986,7 +1302,7 @@ func TestRebuildingIndexClosesPreviousCachedIndex(t *testing.T) {
if testCase.firstInMemory {
firstSize = 1
}
firstIndex, err := backend.BuildIndex(context.Background(), ns, int64(firstSize), 100, nil, "test", indexTestDocs(ns, firstSize, 100), nil)
firstIndex, err := backend.BuildIndex(context.Background(), ns, int64(firstSize), 100, nil, "test", indexTestDocs(ns, firstSize, 100), nil, false, false)
require.NoError(t, err)
if testCase.firstInMemory {
@ -1002,7 +1318,7 @@ func TestRebuildingIndexClosesPreviousCachedIndex(t *testing.T) {
secondSize = 1
openInMemoryIndexes = 1
}
secondIndex, err := backend.BuildIndex(context.Background(), ns, int64(secondSize), 100, nil, "test", indexTestDocs(ns, secondSize, 100), nil)
secondIndex, err := backend.BuildIndex(context.Background(), ns, int64(secondSize), 100, nil, "test", indexTestDocs(ns, secondSize, 100), nil, false, false)
require.NoError(t, err)
if testCase.secondInMemory {
@ -1149,11 +1465,11 @@ func testBleveIndexWithFailures(t *testing.T, fileBased bool) {
}
_, err := backend.BuildIndex(context.Background(), ns, size, 100, nil, "test", func(index resource.ResourceIndex) (int64, error) {
return 0, fmt.Errorf("fail")
}, nil)
}, nil, false, false)
require.Error(t, err)
// Even though previous build of the index failed, new building of the index should work.
_, err = backend.BuildIndex(context.Background(), ns, size, 100, nil, "test", indexTestDocs(ns, int(size), 100), nil)
_, err = backend.BuildIndex(context.Background(), ns, size, 100, nil, "test", indexTestDocs(ns, int(size), 100), nil, false, false)
require.NoError(t, err)
}
@ -1165,7 +1481,7 @@ func TestIndexUpdate(t *testing.T) {
}
be, _ := setupBleveBackend(t, 5, 1*time.Minute, "")
idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5))
idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5), false, false)
require.NoError(t, err)
resp := searchTitle(t, idx, "gen", 10, ns)
@ -1219,7 +1535,7 @@ func TestConcurrentIndexUpdateAndBuildIndex(t *testing.T) {
return sinceRV + int64(5), 5, err
}
idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updaterFn)
idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updaterFn, false, false)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
@ -1227,7 +1543,7 @@ func TestConcurrentIndexUpdateAndBuildIndex(t *testing.T) {
_, err = idx.UpdateIndex(ctx, "test")
require.NoError(t, err)
_, err = be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updaterFn)
_, err = be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updaterFn, false, false)
require.NoError(t, err)
_, err = idx.UpdateIndex(ctx, "test")
@ -1243,7 +1559,7 @@ func TestConcurrentIndexUpdateSearchAndRebuild(t *testing.T) {
be, _ := setupBleveBackend(t, 5, 1*time.Minute, "")
_, err := be.BuildIndex(t.Context(), ns, 10, 0, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5))
_, err := be.BuildIndex(t.Context(), ns, 10, 0, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5), false, false)
require.NoError(t, err)
wg := sync.WaitGroup{}
@ -1306,7 +1622,7 @@ func TestConcurrentIndexUpdateSearchAndRebuild(t *testing.T) {
go func() {
defer wg.Done()
for ctx.Err() == nil {
_, err := be.BuildIndex(t.Context(), ns, 10, 0, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5))
_, err := be.BuildIndex(t.Context(), ns, 10, 0, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5), false, false)
require.NoError(t, err)
rebuilds.Inc()
}
@ -1329,7 +1645,7 @@ func TestConcurrentIndexUpdateAndSearch(t *testing.T) {
be, _ := setupBleveBackend(t, 5, 1*time.Minute, "")
idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5))
idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updateTestDocs(ns, 5), false, false)
require.NoError(t, err)
wg := sync.WaitGroup{}
@ -1393,7 +1709,7 @@ func TestIndexUpdateWithErrors(t *testing.T) {
time.Sleep(100 * time.Millisecond)
return 0, 0, updateErr
}
idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updaterFn)
idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, 100, nil, "test", indexTestDocs(ns, 10, 100), updaterFn, false, false)
require.NoError(t, err)
t.Run("update fail", func(t *testing.T) {

View File

@ -217,7 +217,7 @@ func runSearchBackendBenchmarkWriteThroughput(ctx context.Context, backend resou
size := int64(10000) // force the index to be on disk
index, err := backend.BuildIndex(ctx, nr, size, 0, nil, "benchmark", func(index resource.ResourceIndex) (int64, error) {
return 0, nil
}, nil)
}, nil, false, false)
if err != nil {
return nil, fmt.Errorf("failed to initialize backend: %w", err)
}

View File

@ -86,7 +86,7 @@ func runTestSearchBackendBuildIndex(t *testing.T, backend resource.SearchBackend
return 0, err
}
return 1, nil
}, nil)
}, nil, false, false)
require.NoError(t, err)
require.NotNil(t, index)
@ -152,7 +152,7 @@ func runTestResourceIndex(t *testing.T, backend resource.SearchBackend, nsPrefix
})
require.NoError(t, err)
return int64(2), nil
}, nil)
}, nil, false, false)
require.NoError(t, err)
require.NotNil(t, index)
@ -294,7 +294,7 @@ func runTestResourceIndex(t *testing.T, backend resource.SearchBackend, nsPrefix
})
require.NoError(t, err)
return int64(3), nil
}, nil)
}, nil, false, false)
require.NoError(t, err)
require.NotNil(t, index)