Rebuild search indexes asynchronously (#111829)

* Add "debouncer" queue, which can combine incoming elements.

* Rebuild indexes asynchronously.

* Remove duplicate method.

* Fix bleve tests.

* Extracted combineRebuildRequests and added test for it.

* Add TestShouldRebuildIndex

* Added TestFindIndexesForRebuild

* Introduce index_rebuild_workers option.

* Add metric for rebuild queue length.

* Add TestRebuildIndexes.

* Fix import.

* Linter, review feedback.
Peter Štibraný 2025-10-01 11:52:09 +02:00, committed by GitHub
parent 91e8eb0e45
commit 707c486a46
11 changed files with 1009 additions and 285 deletions

View File

@ -575,13 +575,14 @@ type Cfg struct {
MaxPageSizeBytes int
IndexPath string
IndexWorkers int
IndexRebuildWorkers int
IndexMaxBatchSize int
IndexFileThreshold int
IndexMinCount int
IndexRebuildInterval time.Duration
IndexCacheTTL time.Duration
MaxFileIndexAge time.Duration // Max age of file-based indexes. Index older than this will not be reused between restarts.
MinFileIndexBuildVersion string // Minimum version of Grafana that built the file-based index. If index was built with older Grafana, it will not be reused between restarts.
MaxFileIndexAge time.Duration // Max age of file-based indexes. Indexes older than this will be rebuilt asynchronously.
MinFileIndexBuildVersion string // Minimum version of Grafana that built the file-based index. If the index was built with an older Grafana, it will be rebuilt asynchronously.
EnableSharding bool
QOSEnabled bool
QOSNumberWorker int

View File

@ -54,6 +54,7 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
cfg.MaxPageSizeBytes = section.Key("max_page_size_bytes").MustInt(0)
cfg.IndexPath = section.Key("index_path").String()
cfg.IndexWorkers = section.Key("index_workers").MustInt(10)
cfg.IndexRebuildWorkers = section.Key("index_rebuild_workers").MustInt(5)
cfg.IndexMaxBatchSize = section.Key("index_max_batch_size").MustInt(100)
cfg.EnableSharding = section.Key("enable_sharding").MustBool(false)
cfg.QOSEnabled = section.Key("qos_enabled").MustBool(false)

View File

@ -20,6 +20,7 @@ type BleveIndexMetrics struct {
UpdateLatency prometheus.Histogram
UpdatedDocuments prometheus.Summary
SearchUpdateWaitTime *prometheus.HistogramVec
RebuildQueueLength prometheus.Gauge
}
var IndexCreationBuckets = []float64{1, 5, 10, 25, 50, 75, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000}
@ -84,6 +85,10 @@ func ProvideIndexMetrics(reg prometheus.Registerer) *BleveIndexMetrics {
NativeHistogramMaxBucketNumber: 160,
NativeHistogramMinResetDuration: time.Hour,
}, []string{"reason"}),
RebuildQueueLength: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "index_server_rebuild_queue_length",
Help: "Number of indexes waiting for rebuild",
}),
}
// Initialize labels.

View File

@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/Masterminds/semver"
"github.com/hashicorp/golang-lru/v2/expirable"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -24,6 +25,7 @@ import (
folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/debouncer"
)
const maxBatchSize = 1000
@ -61,6 +63,11 @@ type BulkIndexRequest struct {
ResourceVersion int64
}
type IndexBuildInfo struct {
BuildTime time.Time // Timestamp when the index was built. This value doesn't change on subsequent index updates.
BuildVersion *semver.Version // Grafana version used when originally building the index. This value doesn't change on subsequent index updates.
}
type ResourceIndex interface {
// BulkIndex allows for multiple index actions to be performed in a single call.
// The order of the items is guaranteed to be the same as the input
@ -82,6 +89,9 @@ type ResourceIndex interface {
// UpdateIndex updates the index with the latest data (using update function provided when index was built) to guarantee strong consistency during the search.
// Returns RV to which index was updated.
UpdateIndex(ctx context.Context, reason string) (int64, error)
// BuildInfo returns build information about the index.
BuildInfo() (IndexBuildInfo, error)
}
type BuildFn func(index ResourceIndex) (int64, error)
@ -112,6 +122,9 @@ type SearchBackend interface {
// TotalDocs returns the total number of documents across all indexes.
TotalDocs() int64
// GetOpenIndexes returns the list of indexes that are currently open.
GetOpenIndexes() []NamespacedResource
}
const tracingPrexfixSearch = "unified_search."
@ -132,8 +145,17 @@ type searchSupport struct {
buildIndex singleflight.Group
// periodic rebuilding of the indexes to keep usage insights up to date
rebuildInterval time.Duration
// since usage insights is not in unified storage, we need to periodically rebuild the index
// to make sure these data points are up to date.
dashboardIndexMaxAge time.Duration
maxIndexAge time.Duration
minBuildVersion *semver.Version
bgTaskWg sync.WaitGroup
bgTaskCancel func()
rebuildQueue *debouncer.Queue[rebuildRequest]
rebuildWorkers int
}
var (
@ -150,8 +172,12 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A
return nil, fmt.Errorf("missing tracer")
}
if opts.WorkerThreads < 1 {
opts.WorkerThreads = 1
if opts.InitWorkerThreads < 1 {
opts.InitWorkerThreads = 1
}
if opts.IndexRebuildWorkers < 1 {
opts.IndexRebuildWorkers = 1
}
if ownsIndexFn == nil {
@ -166,13 +192,19 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A
storage: storage,
search: opts.Backend,
log: slog.Default().With("logger", "resource-search"),
initWorkers: opts.WorkerThreads,
initWorkers: opts.InitWorkerThreads,
rebuildWorkers: opts.IndexRebuildWorkers,
initMinSize: opts.InitMinCount,
indexMetrics: indexMetrics,
rebuildInterval: opts.RebuildInterval,
ownsIndexFn: ownsIndexFn,
dashboardIndexMaxAge: opts.DashboardIndexMaxAge,
maxIndexAge: opts.MaxIndexAge,
minBuildVersion: opts.MinBuildVersion,
}
support.rebuildQueue = debouncer.NewQueue(combineRebuildRequests)
info, err := opts.Resources.GetDocumentBuilders()
if err != nil {
return nil, err
@ -186,6 +218,27 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A
return support, err
}
func combineRebuildRequests(a, b rebuildRequest) (c rebuildRequest, ok bool) {
if a.NamespacedResource != b.NamespacedResource {
// We can only combine requests for the same key.
return rebuildRequest{}, false
}
ret := a
// Using the higher "min build version" is the stricter condition, and causes more indexes to be rebuilt.
if a.minBuildVersion == nil || (b.minBuildVersion != nil && b.minBuildVersion.GreaterThan(a.minBuildVersion)) {
ret.minBuildVersion = b.minBuildVersion
}
// Using the higher "min build time" is the stricter condition, and causes more indexes to be rebuilt.
if a.minBuildTime.IsZero() || (!b.minBuildTime.IsZero() && b.minBuildTime.After(a.minBuildTime)) {
ret.minBuildTime = b.minBuildTime
}
return ret, true
}
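A hypothetical sketch of the merge semantics (assuming the surrounding package's rebuildRequest and NamespacedResource types, plus the time and semver imports):
key := NamespacedResource{Namespace: "ns", Group: "g", Resource: "r"}
a := rebuildRequest{NamespacedResource: key, minBuildTime: time.Now().Add(-time.Hour)}
b := rebuildRequest{NamespacedResource: key, minBuildVersion: semver.MustParse("12.0.0")}
c, ok := combineRebuildRequests(a, b)
// ok == true; c keeps a's non-zero minBuildTime and b's non-nil minBuildVersion,
// so the combined request is at least as strict as both inputs.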
func (s *searchSupport) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
if req.NextPageToken != "" {
return &resourcepb.ListManagedObjectsResponse{
@ -401,7 +454,7 @@ func (s *searchSupport) GetStats(ctx context.Context, req *resourcepb.ResourceSt
return rsp, nil
}
func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, error) {
func (s *searchSupport) buildIndexes(ctx context.Context) (int, error) {
totalBatchesIndexed := 0
group := errgroup.Group{}
group.SetLimit(s.initWorkers)
@ -412,11 +465,6 @@ func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, er
}
for _, info := range stats {
// only periodically rebuild the dashboard index, specifically to update the usage insights data
if rebuild && info.Resource != dashboardv1.DASHBOARD_RESOURCE {
continue
}
own, err := s.ownsIndexFn(info.NamespacedResource)
if err != nil {
s.log.Warn("failed to check index ownership, building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource, "error", err)
@ -426,18 +474,11 @@ func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, er
}
group.Go(func() error {
if rebuild {
// we need to clear the cache to make sure we get the latest usage insights data
s.builders.clearNamespacedCache(info.NamespacedResource)
}
totalBatchesIndexed++
s.log.Debug("building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource, "rebuild", rebuild)
s.log.Debug("building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource)
reason := "init"
if rebuild {
reason = "rebuild"
}
_, err := s.build(ctx, info.NamespacedResource, info.Count, reason, rebuild)
_, err := s.build(ctx, info.NamespacedResource, info.Count, reason, false)
return err
})
}
@ -457,30 +498,41 @@ func (s *searchSupport) init(ctx context.Context) error {
defer span.End()
start := time.Now().Unix()
totalBatchesIndexed, err := s.buildIndexes(ctx, false)
totalBatchesIndexed, err := s.buildIndexes(ctx)
if err != nil {
return err
}
span.AddEvent("namespaces indexed", trace.WithAttributes(attribute.Int("namespaced_indexed", totalBatchesIndexed)))
// since usage insights is not in unified storage, we need to periodically rebuild the index
// to make sure these data points are up to date.
if s.rebuildInterval > 0 {
go s.startPeriodicRebuild(origCtx)
subctx, cancel := context.WithCancel(origCtx)
s.bgTaskCancel = cancel
for i := 0; i < s.rebuildWorkers; i++ {
s.bgTaskWg.Add(1)
go s.runIndexRebuilder(subctx)
}
s.bgTaskWg.Add(1)
go s.runPeriodicScanForIndexesToRebuild(subctx)
end := time.Now().Unix()
s.log.Info("search index initialized", "duration_secs", end-start, "total_docs", s.search.TotalDocs())
return nil
}
func (s *searchSupport) startPeriodicRebuild(ctx context.Context) {
ticker := time.NewTicker(s.rebuildInterval)
defer ticker.Stop()
func (s *searchSupport) stop() {
// Stop background tasks.
s.bgTaskCancel()
s.bgTaskWg.Wait()
}
s.log.Info("starting periodic index rebuild", "interval", s.rebuildInterval)
func (s *searchSupport) runPeriodicScanForIndexesToRebuild(ctx context.Context) {
defer s.bgTaskWg.Done()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
@ -488,35 +540,166 @@ func (s *searchSupport) startPeriodicRebuild(ctx context.Context) {
s.log.Info("stopping periodic index rebuild due to context cancellation")
return
case <-ticker.C:
s.log.Info("starting periodic index rebuild")
if err := s.rebuildDashboardIndexes(ctx); err != nil {
s.log.Error("error during periodic index rebuild", "error", err)
} else {
s.log.Info("periodic index rebuild completed successfully")
s.findIndexesToRebuild(ctx, time.Now())
}
}
}
func (s *searchSupport) findIndexesToRebuild(ctx context.Context, now time.Time) {
// Check all open indexes and see if any of them need to be rebuilt.
// This is done periodically to make sure that the indexes are up to date.
keys := s.search.GetOpenIndexes()
for _, key := range keys {
idx, err := s.search.GetIndex(ctx, key)
if err != nil {
s.log.Error("failed to check index to rebuild", "key", key, "error", err)
continue
}
if idx == nil {
// This can happen if the index was closed in the meantime.
continue
}
maxAge := s.maxIndexAge
if key.Resource == dashboardv1.DASHBOARD_RESOURCE {
maxAge = s.dashboardIndexMaxAge
}
var minBuildTime time.Time
if maxAge > 0 {
minBuildTime = now.Add(-maxAge)
}
bi, err := idx.BuildInfo()
if err != nil {
s.log.Error("failed to get build info for index to rebuild", "key", key, "error", err)
continue
}
if shouldRebuildIndex(s.minBuildVersion, bi, minBuildTime, nil) {
s.rebuildQueue.Add(rebuildRequest{
NamespacedResource: key,
minBuildTime: minBuildTime,
minBuildVersion: s.minBuildVersion,
})
if s.indexMetrics != nil {
s.indexMetrics.RebuildQueueLength.Set(float64(s.rebuildQueue.Len()))
}
}
}
}
func (s *searchSupport) rebuildDashboardIndexes(ctx context.Context) error {
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"RebuildDashboardIndexes")
// runIndexRebuilder is a goroutine waiting for rebuild requests, and rebuilds indexes specified in those requests.
// Rebuild requests can be generated periodically (if configured), or after new documents have been imported into the storage with old RVs.
func (s *searchSupport) runIndexRebuilder(ctx context.Context) {
defer s.bgTaskWg.Done()
for {
req, err := s.rebuildQueue.Next(ctx)
if err != nil {
s.log.Info("index rebuilder stopped", "error", err)
return
}
if s.indexMetrics != nil {
s.indexMetrics.RebuildQueueLength.Set(float64(s.rebuildQueue.Len()))
}
s.rebuildIndex(ctx, req)
}
}
func (s *searchSupport) rebuildIndex(ctx context.Context, req rebuildRequest) {
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"RebuildIndex")
defer span.End()
start := time.Now()
s.log.Info("rebuilding all search indexes")
l := s.log.With("namespace", req.Namespace, "group", req.Group, "resource", req.Resource)
totalBatchesIndexed, err := s.buildIndexes(ctx, true)
idx, err := s.search.GetIndex(ctx, req.NamespacedResource)
if err != nil {
return fmt.Errorf("failed to rebuild dashboard indexes: %w", err)
span.RecordError(err)
l.Error("failed to get index to rebuild", "error", err)
return
}
end := time.Now()
duration := end.Sub(start)
s.log.Info("completed rebuilding all dashboard search indexes",
"duration", duration,
"rebuilt_indexes", totalBatchesIndexed,
"total_docs", s.search.TotalDocs())
return nil
if idx == nil {
span.AddEvent("index not found")
l.Error("index not found")
return
}
bi, err := idx.BuildInfo()
if err != nil {
span.RecordError(err)
l.Error("failed to get build info for index to rebuild", "error", err)
}
rebuild := shouldRebuildIndex(req.minBuildVersion, bi, req.minBuildTime, l)
if !rebuild {
span.AddEvent("index not rebuilt")
l.Info("index doesn't need to be rebuilt")
return
}
if req.Resource == dashboardv1.DASHBOARD_RESOURCE {
// we need to clear the cache to make sure we get the latest usage insights data
s.builders.clearNamespacedCache(req.NamespacedResource)
}
// Get the correct value of size + RV for building the index. This is important for our Bleve
// backend to decide whether to build the index in-memory or as a file-based index.
stats, err := s.storage.GetResourceStats(ctx, req.Namespace, 0)
if err != nil {
span.RecordError(fmt.Errorf("failed to get resource stats: %w", err))
l.Error("failed to get resource stats", "error", err)
return
}
size := int64(0)
for _, stat := range stats {
if stat.Namespace == req.Namespace && stat.Group == req.Group && stat.Resource == req.Resource {
size = stat.Count
break
}
}
_, err = s.build(ctx, req.NamespacedResource, size, "rebuild", true)
if err != nil {
span.RecordError(err)
l.Error("failed to rebuild index", "error", err)
}
}
func shouldRebuildIndex(minBuildVersion *semver.Version, buildInfo IndexBuildInfo, minBuildTime time.Time, rebuildLogger *slog.Logger) bool {
if !minBuildTime.IsZero() {
if buildInfo.BuildTime.IsZero() || buildInfo.BuildTime.Before(minBuildTime) {
if rebuildLogger != nil {
rebuildLogger.Info("index build time is before minBuildTime, rebuilding the index", "indexBuildTime", buildInfo.BuildTime, "minBuildTime", minBuildTime)
}
return true
}
}
if minBuildVersion != nil {
if buildInfo.BuildVersion == nil || buildInfo.BuildVersion.Compare(minBuildVersion) < 0 {
if rebuildLogger != nil {
rebuildLogger.Info("index build version is before minBuildVersion, rebuilding the index", "indexBuildVersion", buildInfo.BuildVersion, "minBuildVersion", minBuildVersion)
}
return true
}
}
return false
}
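For example (hypothetical values), an index built by Grafana 11.5.0 must be rebuilt when the minimum build version is 12.0.0; with a zero minBuildTime, only the version check applies:
rebuild := shouldRebuildIndex(semver.MustParse("12.0.0"),
	IndexBuildInfo{BuildVersion: semver.MustParse("11.5.0")}, time.Time{}, nil)
// rebuild == true (11.5.0 < 12.0.0); a nil logger suppresses the log lines.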
type rebuildRequest struct {
NamespacedResource
minBuildTime time.Time // if not zero, only rebuild the index if it was built before this timestamp
minBuildVersion *semver.Version // if not nil, only rebuild the index if its build version is older than this.
}
func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedResource, reason string) (ResourceIndex, error) {

View File

@ -9,11 +9,16 @@ import (
"testing"
"time"
"log/slog"
"github.com/Masterminds/semver"
"github.com/grafana/authlib/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
dashboardv1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1beta1"
"github.com/grafana/grafana/pkg/infra/log/logtest"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
@ -27,6 +32,12 @@ type MockResourceIndex struct {
updateIndexMu sync.Mutex
updateIndexCalls []string
buildInfo IndexBuildInfo
}
func (m *MockResourceIndex) BuildInfo() (IndexBuildInfo, error) {
return m.buildInfo, nil
}
func (m *MockResourceIndex) BulkIndex(req *BulkIndexRequest) error {
@ -120,9 +131,10 @@ func (m *mockStorageBackend) ListModifiedSince(ctx context.Context, key Namespac
// mockSearchBackend implements SearchBackend for testing with tracking capabilities
type mockSearchBackend struct {
openIndexes []NamespacedResource
mu sync.Mutex
buildIndexCalls []buildIndexCall
buildEmptyIndexCalls []buildEmptyIndexCall
cache map[NamespacedResource]ResourceIndex
}
@ -132,12 +144,6 @@ type buildIndexCall struct {
fields SearchableDocumentFields
}
type buildEmptyIndexCall struct {
key NamespacedResource
size int64 // should be 0 for empty indexes
fields SearchableDocumentFields
}
func (m *mockSearchBackend) GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error) {
m.mu.Lock()
defer m.mu.Unlock()
@ -165,21 +171,11 @@ func (m *mockSearchBackend) BuildIndex(ctx context.Context, key NamespacedResour
// Determine if this is an empty index based on size
// Empty indexes are characterized by size == 0
if size == 0 {
// This is an empty index (buildEmptyIndex was called)
m.buildEmptyIndexCalls = append(m.buildEmptyIndexCalls, buildEmptyIndexCall{
key: key,
size: size,
fields: fields,
})
} else {
// This is a normal index (build was called)
m.buildIndexCalls = append(m.buildIndexCalls, buildIndexCall{
key: key,
size: size,
fields: fields,
})
}
return index, nil
}
@ -188,6 +184,10 @@ func (m *mockSearchBackend) TotalDocs() int64 {
return 0
}
func (m *mockSearchBackend) GetOpenIndexes() []NamespacedResource {
return m.openIndexes
}
func TestSearchGetOrCreateIndex(t *testing.T) {
// Setup mock implementations
storage := &mockStorageBackend{
@ -195,21 +195,16 @@ func TestSearchGetOrCreateIndex(t *testing.T) {
{NamespacedResource: NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, Count: 50, ResourceVersion: 11111111},
},
}
search := &mockSearchBackend{
buildIndexCalls: []buildIndexCall{},
buildEmptyIndexCalls: []buildEmptyIndexCall{},
}
search := &mockSearchBackend{}
supplier := &TestDocumentBuilderSupplier{
GroupsResources: map[string]string{
"group": "resource",
},
}
// Create search support with the specified initMaxSize
opts := SearchOptions{
Backend: search,
Resources: supplier,
WorkerThreads: 1,
InitMinCount: 1, // set min count to default for this test
}
@ -250,9 +245,6 @@ func TestSearchGetOrCreateIndexWithIndexUpdate(t *testing.T) {
}
failedErr := fmt.Errorf("failed to update index")
search := &mockSearchBackend{
buildIndexCalls: []buildIndexCall{},
buildEmptyIndexCalls: []buildEmptyIndexCall{},
cache: map[NamespacedResource]ResourceIndex{
{Namespace: "ns", Group: "group", Resource: "bad"}: &MockResourceIndex{
updateIndexError: failedErr,
@ -265,11 +257,9 @@ func TestSearchGetOrCreateIndexWithIndexUpdate(t *testing.T) {
},
}
// Create search support with the specified initMaxSize
opts := SearchOptions{
Backend: search,
Resources: supplier,
WorkerThreads: 1,
InitMinCount: 1, // set min count to default for this test
}
@ -317,11 +307,9 @@ func TestSearchGetOrCreateIndexWithCancellation(t *testing.T) {
},
}
// Create search support with the specified initMaxSize
opts := SearchOptions{
Backend: search,
Resources: supplier,
WorkerThreads: 1,
InitMinCount: 1, // set min count to default for this test
}
@ -381,3 +369,350 @@ func (m *slowSearchBackendWithCache) BuildIndex(ctx context.Context, key Namespa
}
return idx, nil
}
func TestCombineBuildRequests(t *testing.T) {
type testcase struct {
a, b rebuildRequest
exp rebuildRequest
expOK bool
}
now := time.Now()
for name, tc := range map[string]testcase{
"mismatched resource": {
a: rebuildRequest{NamespacedResource: NamespacedResource{Namespace: "a", Group: "a", Resource: "a"}},
b: rebuildRequest{NamespacedResource: NamespacedResource{Namespace: "b", Group: "b", Resource: "b"}},
expOK: false,
},
"equal values": {
a: rebuildRequest{minBuildTime: now, minBuildVersion: semver.MustParse("10.15.20")},
b: rebuildRequest{minBuildTime: now, minBuildVersion: semver.MustParse("10.15.20")},
expOK: true,
exp: rebuildRequest{minBuildTime: now, minBuildVersion: semver.MustParse("10.15.20")},
},
"empty field": {
a: rebuildRequest{minBuildTime: now},
b: rebuildRequest{minBuildVersion: semver.MustParse("10.15.20")},
expOK: true,
exp: rebuildRequest{minBuildTime: now, minBuildVersion: semver.MustParse("10.15.20")},
},
"use max build time": {
a: rebuildRequest{minBuildTime: now.Add(2 * time.Hour)},
b: rebuildRequest{minBuildTime: now.Add(-time.Hour)},
expOK: true,
exp: rebuildRequest{minBuildTime: now.Add(2 * time.Hour)},
},
"use max version": {
a: rebuildRequest{minBuildVersion: semver.MustParse("12.10.99")},
b: rebuildRequest{minBuildVersion: semver.MustParse("10.15.20")},
expOK: true,
exp: rebuildRequest{minBuildVersion: semver.MustParse("12.10.99")},
},
"both fields": {
a: rebuildRequest{minBuildTime: now.Add(2 * time.Hour), minBuildVersion: semver.MustParse("12.10.99")},
b: rebuildRequest{minBuildTime: now.Add(-time.Hour), minBuildVersion: semver.MustParse("10.15.20")},
expOK: true,
exp: rebuildRequest{minBuildTime: now.Add(2 * time.Hour), minBuildVersion: semver.MustParse("12.10.99")},
},
} {
t.Run(name, func(t *testing.T) {
res1, ok := combineRebuildRequests(tc.a, tc.b)
require.Equal(t, tc.expOK, ok)
if ok {
require.Equal(t, tc.exp, res1)
}
// commutativity
res2, ok := combineRebuildRequests(tc.b, tc.a)
require.Equal(t, tc.expOK, ok)
if ok {
require.Equal(t, tc.exp, res2)
}
})
}
}
func TestShouldRebuildIndex(t *testing.T) {
type testcase struct {
buildInfo IndexBuildInfo
minTime time.Time
minBuildVersion *semver.Version
expected bool
}
now := time.Now()
for name, tc := range map[string]testcase{
"empty build info, with no rebuild conditions": {
buildInfo: IndexBuildInfo{},
expected: false,
},
"empty build info, with minTime": {
buildInfo: IndexBuildInfo{},
minTime: now,
expected: true,
},
"empty build info, with minVersion": {
buildInfo: IndexBuildInfo{},
minBuildVersion: semver.MustParse("10.15.20"),
expected: true,
},
"build time before min time": {
buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour)},
minTime: now,
expected: true,
},
"build time after min time": {
buildInfo: IndexBuildInfo{BuildTime: now.Add(2 * time.Hour)},
minTime: now,
expected: false,
},
"build version before min version": {
buildInfo: IndexBuildInfo{BuildVersion: semver.MustParse("10.15.19")},
minBuildVersion: semver.MustParse("10.15.20"),
expected: true,
},
"build version after min version": {
buildInfo: IndexBuildInfo{BuildVersion: semver.MustParse("11.0.0")},
minBuildVersion: semver.MustParse("10.15.20"),
expected: false,
},
} {
t.Run(name, func(t *testing.T) {
res := shouldRebuildIndex(tc.minBuildVersion, tc.buildInfo, tc.minTime, slog.New(&logtest.NopHandler{}))
require.Equal(t, tc.expected, res)
})
}
}
func TestFindIndexesForRebuild(t *testing.T) {
storage := &mockStorageBackend{
resourceStats: []ResourceStats{
{NamespacedResource: NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, Count: 50, ResourceVersion: 11111111},
},
}
now := time.Now()
search := &mockSearchBackend{
openIndexes: []NamespacedResource{
{Namespace: "resource-2h-v5", Group: "group", Resource: "folder"},
{Namespace: "resource-2h-v6", Group: "group", Resource: "folder"},
{Namespace: "resource-10h-v5", Group: "group", Resource: "folder"},
{Namespace: "resource-10h-v6", Group: "group", Resource: "folder"},
{Namespace: "resource-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE},
{Namespace: "resource-v6", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE},
{Namespace: "resource-2h-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE},
{Namespace: "resource-2h-v6", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE},
// We report this index as open, but it's really not. This can happen if the index expires between the
// call to GetOpenIndexes and the call to GetIndex.
{Namespace: "ns", Group: "group", Resource: "missing"},
},
cache: map[NamespacedResource]ResourceIndex{
// To be rebuilt because of minVersion
{Namespace: "resource-2h-v5", Group: "group", Resource: "folder"}: &MockResourceIndex{
buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour), BuildVersion: semver.MustParse("5.0.0")},
},
// Not rebuilt
{Namespace: "resource-2h-v6", Group: "group", Resource: "folder"}: &MockResourceIndex{
buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour), BuildVersion: semver.MustParse("6.0.0")},
},
// To be rebuilt because of minTime
{Namespace: "resource-10h-v5", Group: "group", Resource: "folder"}: &MockResourceIndex{
buildInfo: IndexBuildInfo{BuildTime: now.Add(-10 * time.Hour), BuildVersion: semver.MustParse("5.0.0")},
},
// To be rebuilt because of minTime
{Namespace: "resource-10h-v6", Group: "group", Resource: "folder"}: &MockResourceIndex{
buildInfo: IndexBuildInfo{BuildTime: now.Add(-10 * time.Hour), BuildVersion: semver.MustParse("6.0.0")},
},
// To be rebuilt because of minVersion
{Namespace: "resource-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}: &MockResourceIndex{
buildInfo: IndexBuildInfo{BuildTime: now, BuildVersion: semver.MustParse("5.0.0")},
},
// Not rebuilt
{Namespace: "resource-v6", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}: &MockResourceIndex{
buildInfo: IndexBuildInfo{BuildTime: now, BuildVersion: semver.MustParse("6.0.0")},
},
// To be rebuilt because of minTime (1h for dashboards)
{Namespace: "resource-2h-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}: &MockResourceIndex{
buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour), BuildVersion: semver.MustParse("5.0.0")},
},
// To be rebuilt because of minTime (1h for dashboards)
{Namespace: "resource-2h-v6", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}: &MockResourceIndex{
buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour), BuildVersion: semver.MustParse("6.0.0")},
},
},
}
supplier := &TestDocumentBuilderSupplier{
GroupsResources: map[string]string{
"group": "resource",
},
}
opts := SearchOptions{
Backend: search,
Resources: supplier,
DashboardIndexMaxAge: 1 * time.Hour,
MaxIndexAge: 5 * time.Hour,
MinBuildVersion: semver.MustParse("5.5.5"),
}
support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil)
require.NoError(t, err)
require.NotNil(t, support)
support.findIndexesToRebuild(context.Background(), now)
require.Equal(t, 6, support.rebuildQueue.Len())
now5m := now.Add(5 * time.Minute)
// Running findIndexesToRebuild again should not add any new indexes to the rebuild queue, and all existing
// ones should be "combined" with new ones (this will "bump" minBuildTime)
support.findIndexesToRebuild(context.Background(), now5m)
require.Equal(t, 6, support.rebuildQueue.Len())
// Values that we expect to find in rebuild requests.
minBuildVersion := semver.MustParse("5.5.5")
minBuildTime := now5m.Add(-5 * time.Hour)
minBuildTimeDashboard := now5m.Add(-1 * time.Hour)
vals := support.rebuildQueue.Elements()
require.ElementsMatch(t, vals, []rebuildRequest{
{NamespacedResource: NamespacedResource{Namespace: "resource-2h-v5", Group: "group", Resource: "folder"}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTime},
{NamespacedResource: NamespacedResource{Namespace: "resource-10h-v5", Group: "group", Resource: "folder"}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTime},
{NamespacedResource: NamespacedResource{Namespace: "resource-10h-v6", Group: "group", Resource: "folder"}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTime},
{NamespacedResource: NamespacedResource{Namespace: "resource-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTimeDashboard},
{NamespacedResource: NamespacedResource{Namespace: "resource-2h-v5", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTimeDashboard},
{NamespacedResource: NamespacedResource{Namespace: "resource-2h-v6", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}, minBuildVersion: minBuildVersion, minBuildTime: minBuildTimeDashboard},
})
}
func TestRebuildIndexes(t *testing.T) {
storage := &mockStorageBackend{}
now := time.Now()
search := &mockSearchBackend{
cache: map[NamespacedResource]ResourceIndex{
{Namespace: "idx1", Group: "group", Resource: "res"}: &MockResourceIndex{
buildInfo: IndexBuildInfo{BuildVersion: semver.MustParse("5.0.0")},
},
{Namespace: "idx2", Group: "group", Resource: "res"}: &MockResourceIndex{
buildInfo: IndexBuildInfo{BuildTime: now.Add(-2 * time.Hour)},
},
{Namespace: "idx3", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}: &MockResourceIndex{},
},
}
supplier := &TestDocumentBuilderSupplier{
GroupsResources: map[string]string{
"group": "resource",
},
}
opts := SearchOptions{
Backend: search,
Resources: supplier,
}
support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil)
require.NoError(t, err)
require.NotNil(t, support)
// Note: we can only rebuild each index once; after that, it "loses" its build info.
t.Run("Don't rebuild if min build version is old", func(t *testing.T) {
checkRebuildIndex(t, support, rebuildRequest{
NamespacedResource: NamespacedResource{Namespace: "idx1", Group: "group", Resource: "res"},
minBuildVersion: semver.MustParse("4.5"),
}, true, false)
})
t.Run("Rebuild if min build version is more recent", func(t *testing.T) {
checkRebuildIndex(t, support, rebuildRequest{
NamespacedResource: NamespacedResource{Namespace: "idx1", Group: "group", Resource: "res"},
minBuildVersion: semver.MustParse("5.5.5"),
}, true, true)
})
t.Run("Don't rebuild if min build time is very old", func(t *testing.T) {
checkRebuildIndex(t, support, rebuildRequest{
NamespacedResource: NamespacedResource{Namespace: "idx2", Group: "group", Resource: "res"},
minBuildTime: now.Add(-5 * time.Hour),
}, true, false)
})
t.Run("Rebuild if min build time is more recent", func(t *testing.T) {
checkRebuildIndex(t, support, rebuildRequest{
NamespacedResource: NamespacedResource{Namespace: "idx2", Group: "group", Resource: "res"},
minBuildTime: now.Add(-1 * time.Hour),
}, true, true)
})
t.Run("Don't rebuild if index doesn't exist.", func(t *testing.T) {
checkRebuildIndex(t, support, rebuildRequest{
NamespacedResource: NamespacedResource{Namespace: "unknown", Group: "group", Resource: "res"},
minBuildTime: now.Add(-5 * time.Hour),
}, false, true)
})
t.Run("Rebuild dashboard index (it has no build info), verify that builders cache was emptied.", func(t *testing.T) {
dashKey := NamespacedResource{Namespace: "idx3", Group: "group", Resource: dashboardv1.DASHBOARD_RESOURCE}
support.builders.ns.Add(dashKey, &MockDocumentBuilder{})
_, ok := support.builders.ns.Get(dashKey)
require.True(t, ok)
checkRebuildIndex(t, support, rebuildRequest{
NamespacedResource: dashKey,
minBuildTime: now,
}, true, true)
// Verify that builders cache was emptied.
_, ok = support.builders.ns.Get(dashKey)
require.False(t, ok)
})
}
func checkRebuildIndex(t *testing.T, support *searchSupport, req rebuildRequest, indexExists, expectedRebuild bool) {
ctx := context.Background()
idxBefore, err := support.search.GetIndex(ctx, req.NamespacedResource)
require.NoError(t, err)
if indexExists {
require.NotNil(t, idxBefore, "index should exist before rebuildIndex")
} else {
require.Nil(t, idxBefore, "index should not exist before rebuildIndex")
}
support.rebuildIndex(ctx, req)
idxAfter, err := support.search.GetIndex(ctx, req.NamespacedResource)
require.NoError(t, err)
if indexExists {
require.NotNil(t, idxAfter, "index should exist after rebuildIndex")
if expectedRebuild {
require.NotSame(t, idxBefore, idxAfter, "index should be rebuilt")
} else {
require.Same(t, idxBefore, idxAfter, "index should not be rebuilt")
}
} else {
require.Nil(t, idxAfter, "index should not exist after rebuildIndex")
}
}

View File

@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"
"github.com/Masterminds/semver"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
@ -21,7 +22,6 @@ import (
claims "github.com/grafana/authlib/types"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/apimachinery/validation"
@ -179,15 +179,22 @@ type SearchOptions struct {
Resources DocumentBuilderSupplier
// How many threads should build indexes
WorkerThreads int
InitWorkerThreads int
// Skip building index on startup for small indexes
InitMinCount int
// Interval for periodic index rebuilds (0 disables periodic rebuilds)
RebuildInterval time.Duration
// How often to rebuild dashboard index. 0 disables periodic rebuilds.
DashboardIndexMaxAge time.Duration
Ring *ring.Ring
// Maximum age of file-based index that can be reused. Ignored if zero.
MaxIndexAge time.Duration
// Minimum build version for reusing file-based indexes. Ignored if nil.
MinBuildVersion *semver.Version
// Number of workers to use for index rebuilds.
IndexRebuildWorkers int
}
type ResourceServerOptions struct {
@ -422,6 +429,10 @@ func (s *server) Stop(ctx context.Context) error {
}
}
if s.search != nil {
s.search.stop()
}
// Stops the streaming
s.cancel()

View File

@ -52,7 +52,7 @@ const (
// Keys used to store internal data in index.
const (
internalRVKey = "rv" // Encoded as big-endian int64
internalBuildInfoKey = "build_info" // Encoded as JSON of IndexBuildInfo struct
internalBuildInfoKey = "build_info" // Encoded as JSON of buildInfo struct
)
var _ resource.SearchBackend = &bleveBackend{}
@ -75,9 +75,6 @@ type BleveOptions struct {
BuildVersion string
MaxFileIndexAge time.Duration // Maximum age of file-based index that can be reused. Ignored if zero.
MinBuildVersion *semver.Version // Minimum build version for reusing file-based indexes. Ignored if nil.
Logger *slog.Logger
UseFullNgram bool
@ -179,6 +176,17 @@ func (b *bleveBackend) GetIndex(_ context.Context, key resource.NamespacedResour
return idx, nil
}
func (b *bleveBackend) GetOpenIndexes() []resource.NamespacedResource {
b.cacheMx.RLock()
defer b.cacheMx.RUnlock()
result := make([]resource.NamespacedResource, 0, len(b.cache))
for key := range b.cache {
result = append(result, key)
}
return result
}
func (b *bleveBackend) getCachedIndex(key resource.NamespacedResource, now time.Time) *bleveIndex {
// Check index with read-lock first.
b.cacheMx.RLock()
@ -318,7 +326,7 @@ func newBleveIndex(path string, mapper mapping.IndexMapping, buildTime time.Time
return nil, err
}
bi := IndexBuildInfo{
bi := buildInfo{
BuildTime: buildTime.Unix(),
BuildVersion: buildVersion,
}
@ -336,29 +344,11 @@ func newBleveIndex(path string, mapper mapping.IndexMapping, buildTime time.Time
return ix, nil
}
type IndexBuildInfo struct {
type buildInfo struct {
BuildTime int64 `json:"build_time"` // Unix seconds timestamp of time when the index was built
BuildVersion string `json:"build_version"` // Grafana version used when building the index
}
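For reference, the value stored under internalBuildInfoKey is simply the JSON encoding of this struct; an illustrative round-trip (hypothetical values):
raw, _ := json.Marshal(buildInfo{BuildTime: 1759309929, BuildVersion: "12.1.0"})
// raw == `{"build_time":1759309929,"build_version":"12.1.0"}`
// newBleveIndex stores this as an index-internal key/value, and getBuildInfo
// (below) reads it back via index.GetInternal.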
func (bi IndexBuildInfo) GetBuildTime() time.Time {
if bi.BuildTime == 0 {
return time.Time{}
}
return time.Unix(bi.BuildTime, 0)
}
func (bi IndexBuildInfo) GetBuildVersion() *semver.Version {
if bi.BuildVersion == "" {
return nil
}
v, err := semver.NewVersion(bi.BuildVersion)
if err != nil {
return nil
}
return v
}
// BuildIndex builds an index from scratch or retrieves it from the filesystem.
// If built successfully, the new index replaces the old index in the cache (if there was any).
// An existing index on the filesystem is reused if it exists, if its size indicates that we should use a file-based index, and if rebuild is not true.
@ -435,11 +425,7 @@ func (b *bleveBackend) BuildIndex(
// This happens on startup, or when memory-based index has expired. (We don't expire file-based indexes)
// If we do have an unexpired cached index already, we always build a new index from scratch.
if cachedIndex == nil && !rebuild {
minBuildTime := time.Time{}
if b.opts.MaxFileIndexAge > 0 {
minBuildTime = time.Now().Add(-b.opts.MaxFileIndexAge)
}
index, fileIndexName, indexRV = b.findPreviousFileBasedIndex(resourceDir, minBuildTime, b.opts.MinBuildVersion)
index, fileIndexName, indexRV = b.findPreviousFileBasedIndex(resourceDir)
}
if index != nil {
@ -621,18 +607,6 @@ func isPathWithinRoot(path, absoluteRoot string) bool {
return true
}
// cacheKeys returns list of keys for indexes in the cache (including possibly expired ones).
func (b *bleveBackend) cacheKeys() []resource.NamespacedResource {
b.cacheMx.RLock()
defer b.cacheMx.RUnlock()
keys := make([]resource.NamespacedResource, 0, len(b.cache))
for k := range b.cache {
keys = append(keys, k)
}
return keys
}
// TotalDocs returns the total number of documents across all indices
func (b *bleveBackend) TotalDocs() int64 {
var totalDocs int64
@ -640,7 +614,7 @@ func (b *bleveBackend) TotalDocs() int64 {
// We do this to avoid keeping a lock for the entire TotalDocs function, since DocCount may be slow (due to disk access).
now := time.Now()
for _, key := range b.cacheKeys() {
for _, key := range b.GetOpenIndexes() {
idx := b.getCachedIndex(key, now)
if idx == nil {
continue
@ -658,7 +632,7 @@ func formatIndexName(now time.Time) string {
return now.Format("20060102-150405")
}
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string, minBuildTime time.Time, minBuildVersion *semver.Version) (bleve.Index, string, int64) {
func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string) (bleve.Index, string, int64) {
entries, err := os.ReadDir(resourceDir)
if err != nil {
return nil, "", 0
@ -684,31 +658,6 @@ func (b *bleveBackend) findPreviousFileBasedIndex(resourceDir string, minBuildTi
continue
}
buildInfo, err := getBuildInfo(idx)
if err != nil {
b.log.Error("error getting build info from index", "indexDir", indexDir, "err", err)
_ = idx.Close()
continue
}
if !minBuildTime.IsZero() {
bt := buildInfo.GetBuildTime()
if bt.IsZero() || bt.Before(minBuildTime) {
b.log.Debug("index build time is before minBuildTime, not reusing the index", "indexDir", indexDir, "indexBuildTime", bt, "minBuildTime", minBuildTime)
_ = idx.Close()
continue
}
}
if minBuildVersion != nil {
bv := buildInfo.GetBuildVersion()
if bv == nil || bv.Compare(minBuildVersion) < 0 {
b.log.Debug("index build version is before minBuildVersion, not reusing the index", "indexDir", indexDir, "indexBuildVersion", bv, "minBuildVersion", minBuildVersion)
_ = idx.Close()
continue
}
}
return idx, indexName, indexRV
}
@ -878,21 +827,46 @@ func getRV(index bleve.Index) (int64, error) {
return int64(binary.BigEndian.Uint64(raw)), nil
}
func getBuildInfo(index bleve.Index) (IndexBuildInfo, error) {
func getBuildInfo(index bleve.Index) (buildInfo, error) {
raw, err := index.GetInternal([]byte(internalBuildInfoKey))
if err != nil {
return IndexBuildInfo{}, err
return buildInfo{}, err
}
if len(raw) == 0 {
return IndexBuildInfo{}, nil
return buildInfo{}, nil
}
res := IndexBuildInfo{}
res := buildInfo{}
err = json.Unmarshal(raw, &res)
return res, err
}
func (b *bleveIndex) BuildInfo() (resource.IndexBuildInfo, error) {
bi, err := getBuildInfo(b.index)
if err != nil {
return resource.IndexBuildInfo{}, err
}
bt := time.Time{}
if bi.BuildTime > 0 {
bt = time.Unix(bi.BuildTime, 0)
}
var bv *semver.Version
if bi.BuildVersion != "" {
v, err := semver.NewVersion(bi.BuildVersion)
if err == nil {
bv = v
}
}
return resource.IndexBuildInfo{
BuildTime: bt,
BuildVersion: bv,
}, nil
}
func (b *bleveIndex) ListManagedObjects(ctx context.Context, req *resourcepb.ListManagedObjectsRequest) (*resourcepb.ListManagedObjectsResponse, error) {
if req.NextPageToken != "" {
return nil, fmt.Errorf("next page not implemented yet")

View File

@ -14,7 +14,6 @@ import (
"testing"
"time"
"github.com/Masterminds/semver"
"github.com/blevesearch/bleve/v2"
authlib "github.com/grafana/authlib/types"
"github.com/prometheus/client_golang/prometheus"
@ -827,24 +826,6 @@ func withRootDir(root string) setupOption {
}
}
func withBuildVersion(version string) setupOption {
return func(options *BleveOptions) {
options.BuildVersion = version
}
}
func withMinBuildVersion(version *semver.Version) setupOption {
return func(options *BleveOptions) {
options.MinBuildVersion = version
}
}
func withMaxFileIndexAge(maxAge time.Duration) setupOption {
return func(options *BleveOptions) {
options.MaxFileIndexAge = maxAge
}
}
func withOwnsIndexFn(fn func(key resource.NamespacedResource) (bool, error)) setupOption {
return func(options *BleveOptions) {
options.OwnsIndex = fn
@ -978,47 +959,8 @@ func TestBuildIndex(t *testing.T) {
Resource: "resource",
}
const alwaysRebuildDueToAge = 1 * time.Nanosecond
const neverRebuildDueToAge = 1 * time.Hour
for _, rebuild := range []bool{false, true} {
for _, version := range []string{"", "12.5.123"} {
for _, minBuildVersion := range []*semver.Version{nil, semver.MustParse("12.0.0"), semver.MustParse("13.0.0")} {
for _, maxIndexAge := range []time.Duration{0, alwaysRebuildDueToAge, neverRebuildDueToAge} {
shouldRebuild := rebuild
if minBuildVersion != nil {
shouldRebuild = shouldRebuild || version == "" || minBuildVersion.GreaterThan(semver.MustParse(version))
}
if maxIndexAge > 0 {
shouldRebuild = shouldRebuild || maxIndexAge == alwaysRebuildDueToAge
}
testName := ""
if shouldRebuild {
testName += "should REBUILD index"
} else {
testName += "should REUSE index"
}
if rebuild {
testName += " when rebuild is true"
} else {
testName += " when rebuild is false"
}
if version != "" {
testName += " build version is " + version
} else {
testName += " build version is empty"
}
if minBuildVersion != nil {
testName += " min build version is " + minBuildVersion.String()
} else {
testName += " min build version is nil"
}
testName += " max index age is " + maxIndexAge.String()
testName := fmt.Sprintf("rebuild=%t", rebuild)
t.Run(testName, func(t *testing.T) {
tmpDir := t.TempDir()
@ -1029,7 +971,7 @@ func TestBuildIndex(t *testing.T) {
)
{
backend, _ := setupBleveBackend(t, withFileThreshold(5), withRootDir(tmpDir), withBuildVersion(version))
backend, _ := setupBleveBackend(t, withFileThreshold(5), withRootDir(tmpDir))
_, err := backend.BuildIndex(context.Background(), ns, firstIndexDocsCount, nil, "test", indexTestDocs(ns, firstIndexDocsCount, 100), nil, rebuild)
require.NoError(t, err)
backend.Stop()
@ -1038,13 +980,13 @@ func TestBuildIndex(t *testing.T) {
// Make sure we pass at least 1 nanosecond (alwaysRebuildDueToAge) to ensure that the index needs to be rebuilt.
time.Sleep(1 * time.Millisecond)
newBackend, _ := setupBleveBackend(t, withFileThreshold(5), withRootDir(tmpDir), withBuildVersion(version), withMinBuildVersion(minBuildVersion), withMaxFileIndexAge(maxIndexAge))
newBackend, _ := setupBleveBackend(t, withFileThreshold(5), withRootDir(tmpDir))
idx, err := newBackend.BuildIndex(context.Background(), ns, secondIndexDocsCount, nil, "test", indexTestDocs(ns, secondIndexDocsCount, 100), nil, rebuild)
require.NoError(t, err)
cnt, err := idx.DocCount(context.Background(), "")
require.NoError(t, err)
if shouldRebuild {
if rebuild {
require.Equal(t, int64(secondIndexDocsCount), cnt, "Index has been not rebuilt")
} else {
require.Equal(t, int64(firstIndexDocsCount), cnt, "Index has not been reused")
@ -1052,9 +994,6 @@ func TestBuildIndex(t *testing.T) {
})
}
}
}
}
}
func TestRebuildingIndexClosesPreviousCachedIndex(t *testing.T) {
ns := resource.NamespacedResource{

View File

@ -46,8 +46,6 @@ func NewSearchOptions(
BatchSize: cfg.IndexMaxBatchSize, // This is the batch size for how many objects to add to the index at once
IndexCacheTTL: cfg.IndexCacheTTL, // How long to keep the index cache in memory
BuildVersion: cfg.BuildVersion,
MaxFileIndexAge: cfg.MaxFileIndexAge,
MinBuildVersion: minVersion,
UseFullNgram: features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageUseFullNgram),
OwnsIndex: ownsIndexFn,
}, tracer, indexMetrics)
@ -59,9 +57,12 @@ func NewSearchOptions(
return resource.SearchOptions{
Backend: bleve,
Resources: docs,
WorkerThreads: cfg.IndexWorkers,
InitWorkerThreads: cfg.IndexWorkers,
IndexRebuildWorkers: cfg.IndexRebuildWorkers,
InitMinCount: cfg.IndexMinCount,
RebuildInterval: cfg.IndexRebuildInterval,
DashboardIndexMaxAge: cfg.IndexRebuildInterval,
MaxIndexAge: cfg.MaxFileIndexAge,
MinBuildVersion: minVersion,
}, nil
}
return resource.SearchOptions{}, nil

pkg/util/debouncer/queue.go (new file, 119 lines)
View File

@ -0,0 +1,119 @@
package debouncer
import (
"context"
"errors"
"slices"
"sync"
)
type CombineFn[T any] func(a, b T) (c T, ok bool)
// Queue is a queue of elements. Elements added to the queue can be combined together by the provided combiner function.
// Once the queue is closed, no more elements can be added, but Next() will still return remaining elements.
type Queue[T any] struct {
combineFn CombineFn[T]
mu sync.Mutex
elements []T
closed bool
waitChan chan struct{} // if not nil, will be closed when new element is added
}
func NewQueue[T any](combineFn CombineFn[T]) *Queue[T] {
return &Queue[T]{
combineFn: combineFn,
}
}
func (q *Queue[T]) Len() int {
q.mu.Lock()
defer q.mu.Unlock()
return len(q.elements)
}
// Elements returns a copy of the queue's elements.
func (q *Queue[T]) Elements() []T {
q.mu.Lock()
defer q.mu.Unlock()
return slices.Clone(q.elements)
}
func (q *Queue[T]) Add(n T) {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
panic("queue already closed")
}
for i, e := range q.elements {
if c, ok := q.combineFn(e, n); ok {
// No need to signal, since we are not adding new element.
q.elements[i] = c
return
}
}
q.elements = append(q.elements, n)
q.notifyWaiters()
}
// Must be called with lock held.
func (q *Queue[T]) notifyWaiters() {
if q.waitChan != nil {
// Wakes up all waiting goroutines (possibly none, if they have already stopped waiting).
close(q.waitChan)
q.waitChan = nil
}
}
var ErrClosed = errors.New("queue closed")
// Next returns the next element in the queue. If no element is available, Next will block until
// an element is added to the queue, or the provided context is done.
// If the queue is closed and empty, ErrClosed is returned.
func (q *Queue[T]) Next(ctx context.Context) (T, error) {
var zero T
q.mu.Lock()
unlockInDefer := true
defer func() {
if unlockInDefer {
q.mu.Unlock()
}
}()
for len(q.elements) == 0 {
if q.closed {
return zero, ErrClosed
}
// Wait for an element. Make sure there's a wait channel that we can use.
wch := q.waitChan
if wch == nil {
wch = make(chan struct{})
q.waitChan = wch
}
// Unlock before waiting
q.mu.Unlock()
select {
case <-ctx.Done():
unlockInDefer = false
return zero, ctx.Err()
case <-wch:
q.mu.Lock()
}
}
first := q.elements[0]
q.elements = q.elements[1:]
return first, nil
}
func (q *Queue[T]) Close() {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
q.notifyWaiters()
}
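A minimal usage sketch of the queue (hypothetical main wrapper; this combine function merges any two elements by keeping the larger one, so the queue holds at most one element):
package main
import (
	"context"
	"fmt"
	"github.com/grafana/grafana/pkg/util/debouncer"
)
func main() {
	q := debouncer.NewQueue(func(a, b int) (int, bool) {
		if a > b {
			return a, true
		}
		return b, true
	})
	q.Add(5)
	q.Add(9) // combined with 5; q.Len() stays 1
	v, _ := q.Next(context.Background())
	fmt.Println(v) // 9
	q.Close()
	_, err := q.Next(context.Background())
	fmt.Println(err) // queue closed
}
The search support above instead combines only requests for the same NamespacedResource (returning ok == false otherwise), so requests for distinct indexes stay queued separately.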

View File

@ -0,0 +1,155 @@
package debouncer
import (
"context"
"errors"
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
)
// This verifies that all goroutines spawned from tests are finished at the end of tests.
// Applies to all tests in the package.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestQueueBasic(t *testing.T) {
q := NewQueue(func(a, b int) (c int, ok bool) {
return a + b, true
})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
// Empty queue will time out.
require.Equal(t, context.DeadlineExceeded, nextErr(t, q, ctx))
q.Add(10)
require.Equal(t, 10, next(t, q))
require.Equal(t, 0, q.Len())
q.Add(20)
require.Equal(t, 20, next(t, q))
require.Equal(t, 0, q.Len())
q.Add(10)
require.Equal(t, 1, q.Len())
q.Add(20)
require.Equal(t, 1, q.Len())
require.Equal(t, 30, next(t, q))
require.Equal(t, 0, q.Len())
q.Add(100)
require.Equal(t, 1, q.Len())
q.Close()
require.Equal(t, 1, q.Len())
require.Equal(t, 100, next(t, q))
require.Equal(t, ErrClosed, nextErr(t, q, context.Background()))
require.Equal(t, 0, q.Len())
// We can call Next repeatedly, but will always get error.
require.Equal(t, ErrClosed, nextErr(t, q, context.Background()))
}
func TestQueueConcurrency(t *testing.T) {
q := NewQueue(func(a, b int64) (c int64, ok bool) {
// Combine the same numbers together.
if a == b {
return a + b, true
}
return 0, false
})
const numbers = 10000
const writeConcurrency = 50
const readConcurrency = 25
r := rand.New(rand.NewSource(time.Now().UnixNano()))
totalWrittenSum := atomic.NewInt64(0)
totalReadSum := atomic.NewInt64(0)
addCalls := atomic.NewInt64(0)
nextCalls := atomic.NewInt64(0)
// We will add some numbers to the queue.
writesWG := sync.WaitGroup{}
for i := 0; i < writeConcurrency; i++ {
writesWG.Add(1)
go func() {
defer writesWG.Done()
for j := 0; j < numbers; j++ {
v := r.Int63n(100) // Generate a small number, so that we have a chance of combining some numbers.
q.Add(v)
addCalls.Inc()
totalWrittenSum.Add(v)
}
}()
}
readsWG := sync.WaitGroup{}
for i := 0; i < readConcurrency; i++ {
readsWG.Add(1)
go func() {
defer readsWG.Done()
for {
v, err := q.Next(context.Background())
if errors.Is(err, ErrClosed) {
return
}
require.NoError(t, err)
nextCalls.Inc()
totalReadSum.Add(v)
}
}()
}
writesWG.Wait()
// Close queue after sending all numbers. This signals readers that they can stop.
q.Close()
// Wait until all readers finish too.
readsWG.Wait()
// Verify that all numbers were sent, combined and received.
require.Equal(t, int64(writeConcurrency*numbers), addCalls.Load())
require.Equal(t, totalWrittenSum.Load(), totalReadSum.Load())
require.LessOrEqual(t, nextCalls.Load(), addCalls.Load())
t.Log("add calls:", addCalls.Load(), "next calls:", nextCalls.Load(), "total written sum:", totalWrittenSum.Load(), "total read sum:", totalReadSum.Load())
}
func TestQueueCloseUnblocksReaders(t *testing.T) {
q := NewQueue(func(a, b int) (c int, ok bool) {
return a + b, true
})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(50 * time.Millisecond)
q.Close()
}()
_, err := q.Next(context.Background())
require.ErrorIs(t, err, ErrClosed)
wg.Wait()
}
func next[T any](t *testing.T, q *Queue[T]) T {
v, err := q.Next(context.Background())
require.NoError(t, err)
return v
}
func nextErr[T any](t *testing.T, q *Queue[T], ctx context.Context) error {
_, err := q.Next(ctx)
require.Error(t, err)
return err
}