2024-11-21 13:53:25 +08:00
package resource
import (
2025-01-13 23:05:04 +08:00
"cmp"
2024-11-21 13:53:25 +08:00
"context"
"fmt"
2025-07-24 03:59:24 +08:00
"hash/fnv"
2024-11-21 13:53:25 +08:00
"log/slog"
2024-12-05 05:02:40 +08:00
"slices"
2024-12-11 02:37:37 +08:00
"strings"
2024-11-21 13:53:25 +08:00
"sync"
"time"
"github.com/hashicorp/golang-lru/v2/expirable"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
2025-07-10 19:54:10 +08:00
"golang.org/x/sync/singleflight"
2025-06-27 20:00:39 +08:00
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2024-11-21 13:53:25 +08:00
"k8s.io/apimachinery/pkg/runtime/schema"
2024-11-22 21:44:06 +08:00
2025-01-21 17:06:55 +08:00
"github.com/grafana/authlib/types"
2025-07-24 03:59:24 +08:00
"github.com/grafana/dskit/ring"
2025-05-16 03:36:52 +08:00
2025-04-24 01:54:35 +08:00
dashboardv1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1beta1"
2025-04-15 04:20:10 +08:00
folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
2025-05-16 03:36:52 +08:00
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
2024-11-21 13:53:25 +08:00
)
2025-05-09 21:36:21 +08:00
// maxBatchSize is the number of bulk index items collected before they are
// flushed to the index in a single BulkIndex call while building an index.
const maxBatchSize = 1000
2024-11-21 13:53:25 +08:00
// NamespacedResource identifies one resource kind (group + resource) within a
// single namespace. It is the key used to look up and build search indexes.
type NamespacedResource struct {
	Namespace, Group, Resource string
}
2024-11-22 21:44:06 +08:00
// Valid reports whether Namespace, Group and Resource are all non-empty.
func (s *NamespacedResource) Valid() bool {
	if s.Namespace == "" || s.Group == "" {
		return false
	}
	return s.Resource != ""
}
2025-06-17 22:28:51 +08:00
// String renders the key as "namespace/group/resource".
func (s *NamespacedResource) String() string {
	return s.Namespace + "/" + s.Group + "/" + s.Resource
}
2025-05-09 21:36:21 +08:00
// IndexAction selects the operation a BulkIndexItem performs.
type IndexAction int
2024-11-22 21:44:06 +08:00
2025-05-09 21:36:21 +08:00
const (
	// ActionIndex marks an item whose Doc should be written to the index.
	ActionIndex IndexAction = iota
	// ActionDelete marks an item whose Key should be removed from the index.
	ActionDelete
)
2024-11-22 21:44:06 +08:00
2025-05-09 21:36:21 +08:00
type BulkIndexItem struct {
Action IndexAction
2025-05-16 03:36:52 +08:00
Key * resourcepb . ResourceKey // Only used for delete actions
Doc * IndexableDocument // Only used for index actions
2025-05-09 21:36:21 +08:00
}
// BulkIndexRequest groups several index operations so they can be submitted
// to a ResourceIndex in one BulkIndex call.
type BulkIndexRequest struct {
	// Items are the operations to perform.
	Items []*BulkIndexItem

	// ResourceVersion accompanies the batch.
	// NOTE(review): presumably the resource version the batch brings the index up to — confirm with the backend.
	ResourceVersion int64
}
type ResourceIndex interface {
// BulkIndex allows for multiple index actions to be performed in a single call.
// The order of the items is guaranteed to be the same as the input
BulkIndex ( req * BulkIndexRequest ) error
2024-11-22 21:44:06 +08:00
// Search within a namespaced resource
// When working with federated queries, the additional indexes will be passed in explicitly
2025-05-16 03:36:52 +08:00
Search ( ctx context . Context , access types . AccessClient , req * resourcepb . ResourceSearchRequest , federate [ ] ResourceIndex ) ( * resourcepb . ResourceSearchResponse , error )
2024-11-22 21:44:06 +08:00
2025-01-11 02:27:10 +08:00
// List within an response
2025-05-16 03:36:52 +08:00
ListManagedObjects ( ctx context . Context , req * resourcepb . ListManagedObjectsRequest ) ( * resourcepb . ListManagedObjectsResponse , error )
2025-01-11 02:27:10 +08:00
// Counts the values in a repo
2025-05-16 03:36:52 +08:00
CountManagedObjects ( ctx context . Context ) ( [ ] * resourcepb . CountManagedObjectsResponse_ResourceCount , error )
2024-12-05 05:02:40 +08:00
// Get the number of documents in the index
2024-12-11 02:37:37 +08:00
DocCount ( ctx context . Context , folder string ) ( int64 , error )
2024-11-22 21:44:06 +08:00
}
// SearchBackend contains the technology specific logic to support search
2024-11-21 13:53:25 +08:00
type SearchBackend interface {
2025-07-10 19:54:10 +08:00
// GetIndex returns existing index, or nil.
2024-11-22 21:44:06 +08:00
GetIndex ( ctx context . Context , key NamespacedResource ) ( ResourceIndex , error )
2025-07-10 19:54:10 +08:00
// BuildIndex builds an index from scratch.
// Depending on the size, the backend may choose different options (eg: memory vs disk).
// The last known resource version can be used to detect that nothing has changed, and existing on-disk index can be reused.
// The builder will write all documents before returning.
BuildIndex ( ctx context . Context , key NamespacedResource , size int64 , resourceVersion int64 , nonStandardFields SearchableDocumentFields , builder func ( index ResourceIndex ) ( int64 , error ) ) ( ResourceIndex , error )
2024-11-22 21:44:06 +08:00
2025-07-10 19:54:10 +08:00
// TotalDocs returns the total number of documents across all indexes.
2024-12-05 05:02:40 +08:00
TotalDocs ( ) int64
2024-11-21 13:53:25 +08:00
}
// tracingPrexfixSearch is prepended to the names of the tracing spans started
// by searchSupport.
const tracingPrexfixSearch = "unified_search."
// This supports indexing+search regardless of implementation
type searchSupport struct {
2025-03-13 22:09:38 +08:00
tracer trace . Tracer
log * slog . Logger
storage StorageBackend
search SearchBackend
indexMetrics * BleveIndexMetrics
access types . AccessClient
builders * builderCache
initWorkers int
initMinSize int
2025-06-27 20:00:39 +08:00
initMaxSize int
2025-05-09 21:36:21 +08:00
2025-07-24 03:59:24 +08:00
ring * ring . Ring
ringLifecycler * ring . BasicLifecycler
2025-07-10 19:54:10 +08:00
buildIndex singleflight . Group
2025-05-09 21:36:21 +08:00
// Index queue processors
indexQueueProcessorsMutex sync . Mutex
indexQueueProcessors map [ string ] * indexQueueProcessor
indexEventsChan chan * IndexEvent
// testing
clientIndexEventsChan chan * IndexEvent
2025-06-13 03:34:48 +08:00
// periodic rebuilding of the indexes to keep usage insights up to date
rebuildInterval time . Duration
2024-11-21 13:53:25 +08:00
}
2024-11-27 13:57:53 +08:00
var (
2025-05-16 03:36:52 +08:00
_ resourcepb . ResourceIndexServer = ( * searchSupport ) ( nil )
_ resourcepb . ManagedObjectIndexServer = ( * searchSupport ) ( nil )
2024-11-27 13:57:53 +08:00
)
2025-07-24 03:59:24 +08:00
func newSearchSupport ( opts SearchOptions , storage StorageBackend , access types . AccessClient , blob BlobSupport , tracer trace . Tracer , indexMetrics * BleveIndexMetrics , ring * ring . Ring , ringLifecycler * ring . BasicLifecycler ) ( support * searchSupport , err error ) {
2024-11-21 13:53:25 +08:00
// No backend search support
if opts . Backend == nil {
return nil , nil
}
2025-02-12 01:57:46 +08:00
if tracer == nil {
return nil , fmt . Errorf ( "missing tracer" )
}
2024-11-21 13:53:25 +08:00
if opts . WorkerThreads < 1 {
opts . WorkerThreads = 1
}
support = & searchSupport {
2025-05-09 21:36:21 +08:00
access : access ,
tracer : tracer ,
storage : storage ,
search : opts . Backend ,
log : slog . Default ( ) . With ( "logger" , "resource-search" ) ,
initWorkers : opts . WorkerThreads ,
initMinSize : opts . InitMinCount ,
2025-06-27 20:00:39 +08:00
initMaxSize : opts . InitMaxCount ,
2025-05-09 21:36:21 +08:00
indexMetrics : indexMetrics ,
clientIndexEventsChan : opts . IndexEventsChan ,
indexEventsChan : make ( chan * IndexEvent ) ,
indexQueueProcessors : make ( map [ string ] * indexQueueProcessor ) ,
2025-06-13 03:34:48 +08:00
rebuildInterval : opts . RebuildInterval ,
2025-07-24 03:59:24 +08:00
ring : ring ,
ringLifecycler : ringLifecycler ,
2024-11-21 13:53:25 +08:00
}
info , err := opts . Resources . GetDocumentBuilders ( )
if err != nil {
return nil , err
}
support . builders , err = newBuilderCache ( info , 100 , time . Minute * 2 ) // TODO? opts
if support . builders != nil {
support . builders . blob = blob
}
return support , err
}
2025-05-16 03:36:52 +08:00
func ( s * searchSupport ) ListManagedObjects ( ctx context . Context , req * resourcepb . ListManagedObjectsRequest ) ( * resourcepb . ListManagedObjectsResponse , error ) {
2025-01-13 23:05:04 +08:00
if req . NextPageToken != "" {
2025-05-16 03:36:52 +08:00
return & resourcepb . ListManagedObjectsResponse {
2025-01-13 23:05:04 +08:00
Error : NewBadRequestError ( "multiple pages not yet supported" ) ,
2025-01-11 02:27:10 +08:00
} , nil
}
2025-01-13 23:05:04 +08:00
2025-05-16 03:36:52 +08:00
rsp := & resourcepb . ListManagedObjectsResponse { }
2025-01-13 23:05:04 +08:00
stats , err := s . storage . GetResourceStats ( ctx , req . Namespace , 0 )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
for _ , info := range stats {
idx , err := s . getOrCreateIndex ( ctx , NamespacedResource {
Namespace : req . Namespace ,
Group : info . Group ,
Resource : info . Resource ,
} )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
2025-03-11 00:48:53 +08:00
kind , err := idx . ListManagedObjects ( ctx , req )
2025-01-13 23:05:04 +08:00
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
if kind . NextPageToken != "" {
2025-05-16 03:36:52 +08:00
rsp . Error = & resourcepb . ErrorResult {
2025-01-13 23:05:04 +08:00
Message : "Multiple pages are not yet supported" ,
}
return rsp , nil
}
rsp . Items = append ( rsp . Items , kind . Items ... )
}
// Sort based on path
2025-05-16 03:36:52 +08:00
slices . SortFunc ( rsp . Items , func ( a , b * resourcepb . ListManagedObjectsResponse_Item ) int {
2025-01-13 23:05:04 +08:00
return cmp . Compare ( a . Path , b . Path )
} )
return rsp , nil
2024-11-27 13:57:53 +08:00
}
2025-05-16 03:36:52 +08:00
func ( s * searchSupport ) CountManagedObjects ( ctx context . Context , req * resourcepb . CountManagedObjectsRequest ) ( * resourcepb . CountManagedObjectsResponse , error ) {
rsp := & resourcepb . CountManagedObjectsResponse { }
2025-01-13 23:05:04 +08:00
stats , err := s . storage . GetResourceStats ( ctx , req . Namespace , 0 )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
for _ , info := range stats {
idx , err := s . getOrCreateIndex ( ctx , NamespacedResource {
Namespace : req . Namespace ,
Group : info . Group ,
Resource : info . Resource ,
} )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
2025-03-11 00:48:53 +08:00
counts , err := idx . CountManagedObjects ( ctx )
2025-01-13 23:05:04 +08:00
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
2025-03-11 00:48:53 +08:00
if req . Id == "" {
2025-01-13 23:05:04 +08:00
rsp . Items = append ( rsp . Items , counts ... )
} else {
for _ , k := range counts {
2025-03-11 00:48:53 +08:00
if k . Id == req . Id {
2025-01-13 23:05:04 +08:00
rsp . Items = append ( rsp . Items , k )
}
}
}
}
2025-03-11 00:48:53 +08:00
// Sort based on manager/group/resource
2025-05-16 03:36:52 +08:00
slices . SortFunc ( rsp . Items , func ( a , b * resourcepb . CountManagedObjectsResponse_ResourceCount ) int {
2025-01-13 23:05:04 +08:00
return cmp . Or (
2025-03-11 00:48:53 +08:00
cmp . Compare ( a . Kind , b . Kind ) ,
cmp . Compare ( a . Id , b . Id ) ,
2025-01-13 23:05:04 +08:00
cmp . Compare ( a . Group , b . Group ) ,
cmp . Compare ( a . Resource , b . Resource ) ,
)
} )
return rsp , nil
2024-11-27 13:57:53 +08:00
}
// Search implements ResourceIndexServer.
2025-05-16 03:36:52 +08:00
func ( s * searchSupport ) Search ( ctx context . Context , req * resourcepb . ResourceSearchRequest ) ( * resourcepb . ResourceSearchResponse , error ) {
2025-01-29 03:27:01 +08:00
ctx , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "Search" )
defer span . End ( )
2024-11-27 13:57:53 +08:00
nsr := NamespacedResource {
Group : req . Options . Key . Group ,
Namespace : req . Options . Key . Namespace ,
Resource : req . Options . Key . Resource ,
}
idx , err := s . getOrCreateIndex ( ctx , nsr )
if err != nil {
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceSearchResponse {
2024-11-27 13:57:53 +08:00
Error : AsErrorResult ( err ) ,
} , nil
}
// Get the federated indexes
federate := make ( [ ] ResourceIndex , len ( req . Federated ) )
for i , f := range req . Federated {
nsr . Group = f . Group
nsr . Resource = f . Resource
federate [ i ] , err = s . getOrCreateIndex ( ctx , nsr )
if err != nil {
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceSearchResponse {
2024-11-27 13:57:53 +08:00
Error : AsErrorResult ( err ) ,
} , nil
}
}
return idx . Search ( ctx , s . access , req , federate )
}
2024-12-11 02:37:37 +08:00
// GetStats implements ResourceServer.
2025-05-16 03:36:52 +08:00
func ( s * searchSupport ) GetStats ( ctx context . Context , req * resourcepb . ResourceStatsRequest ) ( * resourcepb . ResourceStatsResponse , error ) {
2024-12-11 02:37:37 +08:00
if req . Namespace == "" {
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceStatsResponse {
2024-12-11 02:37:37 +08:00
Error : NewBadRequestError ( "missing namespace" ) ,
} , nil
}
2025-05-16 03:36:52 +08:00
rsp := & resourcepb . ResourceStatsResponse { }
2024-12-11 02:37:37 +08:00
// Explicit list of kinds
if len ( req . Kinds ) > 0 {
2025-05-16 03:36:52 +08:00
rsp . Stats = make ( [ ] * resourcepb . ResourceStatsResponse_Stats , len ( req . Kinds ) )
2024-12-11 02:37:37 +08:00
for i , k := range req . Kinds {
parts := strings . SplitN ( k , "/" , 2 )
index , err := s . getOrCreateIndex ( ctx , NamespacedResource {
Namespace : req . Namespace ,
Group : parts [ 0 ] ,
Resource : parts [ 1 ] ,
} )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
count , err := index . DocCount ( ctx , req . Folder )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
2025-05-16 03:36:52 +08:00
rsp . Stats [ i ] = & resourcepb . ResourceStatsResponse_Stats {
2024-12-11 02:37:37 +08:00
Group : parts [ 0 ] ,
Resource : parts [ 1 ] ,
Count : count ,
}
}
return rsp , nil
}
stats , err := s . storage . GetResourceStats ( ctx , req . Namespace , 0 )
if err != nil {
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceStatsResponse {
2024-12-11 02:37:37 +08:00
Error : AsErrorResult ( err ) ,
} , nil
}
2025-05-16 03:36:52 +08:00
rsp . Stats = make ( [ ] * resourcepb . ResourceStatsResponse_Stats , len ( stats ) )
2024-12-11 02:37:37 +08:00
// When not filtered by folder or repository, we can use the results directly
if req . Folder == "" {
for i , stat := range stats {
2025-05-16 03:36:52 +08:00
rsp . Stats [ i ] = & resourcepb . ResourceStatsResponse_Stats {
2024-12-11 02:37:37 +08:00
Group : stat . Group ,
Resource : stat . Resource ,
Count : stat . Count ,
}
}
return rsp , nil
}
for i , stat := range stats {
index , err := s . getOrCreateIndex ( ctx , NamespacedResource {
Namespace : req . Namespace ,
Group : stat . Group ,
Resource : stat . Resource ,
} )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
count , err := index . DocCount ( ctx , req . Folder )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
2025-05-16 03:36:52 +08:00
rsp . Stats [ i ] = & resourcepb . ResourceStatsResponse_Stats {
2024-12-11 02:37:37 +08:00
Group : stat . Group ,
Resource : stat . Resource ,
Count : count ,
}
}
return rsp , nil
}
2025-07-24 03:59:24 +08:00
// shouldBuildIndex reports whether this instance should build the index for
// the namespace of info.
//
// Without a ring every index is built. With a ring, the namespace is hashed
// (FNV-1a, 32 bit) and the index is built only when this instance's address is
// part of the replica set the ring returns for that hash. Any problem while
// deciding (missing lifecycler, hashing or ring error) is logged and answered
// with true.
func (s *searchSupport) shouldBuildIndex(info ResourceStats) bool {
	switch {
	case s.ring == nil:
		s.log.Debug("ring is not setup. Will proceed to build index")
		return true
	case s.ringLifecycler == nil:
		s.log.Error("missing ring lifecycler")
		return true
	}

	hasher := fnv.New32a()
	if _, err := hasher.Write([]byte(info.Namespace)); err != nil {
		s.log.Error("error hashing namespace", "namespace", info.Namespace, "err", err)
		return true
	}

	token := hasher.Sum32()
	replicas, err := s.ring.GetWithOptions(token, searchOwnerRead, ring.WithReplicationFactor(s.ring.ReplicationFactor()))
	if err != nil {
		s.log.Error("error getting replicaset from ring", "namespace", info.Namespace, "err", err)
		return true
	}

	self := s.ringLifecycler.GetInstanceAddr()
	return replicas.Includes(self)
}
2025-06-13 03:34:48 +08:00
func ( s * searchSupport ) buildIndexes ( ctx context . Context , rebuild bool ) ( int , error ) {
2024-11-21 13:53:25 +08:00
totalBatchesIndexed := 0
group := errgroup . Group { }
group . SetLimit ( s . initWorkers )
2024-12-05 18:58:13 +08:00
stats , err := s . storage . GetResourceStats ( ctx , "" , s . initMinSize )
2024-12-04 01:20:27 +08:00
if err != nil {
2025-06-13 03:34:48 +08:00
return 0 , err
2024-12-04 01:20:27 +08:00
}
for _ , info := range stats {
2025-06-13 03:34:48 +08:00
// only periodically rebuild the dashboard index, specifically to update the usage insights data
if rebuild && info . Resource != dashboardv1 . DASHBOARD_RESOURCE {
continue
}
2025-07-24 03:59:24 +08:00
if ! s . shouldBuildIndex ( info ) {
s . log . Debug ( "skip building index" , "namespace" , info . Namespace , "group" , info . Group , "resource" , info . Resource )
continue
}
2024-12-04 01:20:27 +08:00
group . Go ( func ( ) error {
2025-06-13 03:34:48 +08:00
if rebuild {
// we need to clear the cache to make sure we get the latest usage insights data
s . builders . clearNamespacedCache ( info . NamespacedResource )
}
2024-12-04 01:20:27 +08:00
totalBatchesIndexed ++
2025-06-27 20:00:39 +08:00
// If the count is too large, we need to set the index to empty.
// Only do this if the max size is set to a non-zero (default) value.
if s . initMaxSize > 0 && ( info . Count > int64 ( s . initMaxSize ) ) {
s . log . Info ( "setting empty index for resource with count greater than max size" , "namespace" , info . Namespace , "group" , info . Group , "resource" , info . Resource , "count" , info . Count , "maxSize" , s . initMaxSize )
_ , err := s . buildEmptyIndex ( ctx , info . NamespacedResource , info . ResourceVersion )
return err
}
s . log . Debug ( "building index" , "namespace" , info . Namespace , "group" , info . Group , "resource" , info . Resource )
2025-06-13 03:34:48 +08:00
_ , _ , err := s . build ( ctx , info . NamespacedResource , info . Count , info . ResourceVersion )
2024-12-04 01:20:27 +08:00
return err
} )
2024-11-21 13:53:25 +08:00
}
err = group . Wait ( )
2025-06-13 03:34:48 +08:00
if err != nil {
return totalBatchesIndexed , err
}
return totalBatchesIndexed , nil
}
func ( s * searchSupport ) init ( ctx context . Context ) error {
ctx , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "Init" )
defer span . End ( )
start := time . Now ( ) . Unix ( )
totalBatchesIndexed , err := s . buildIndexes ( ctx , false )
2024-11-21 13:53:25 +08:00
if err != nil {
return err
}
2025-06-13 03:34:48 +08:00
2024-11-21 13:53:25 +08:00
span . AddEvent ( "namespaces indexed" , trace . WithAttributes ( attribute . Int ( "namespaced_indexed" , totalBatchesIndexed ) ) )
2024-11-27 13:57:53 +08:00
// Now start listening for new events
watchctx := context . Background ( ) // new context?
events , err := s . storage . WatchWriteEvents ( watchctx )
if err != nil {
return err
}
go func ( ) {
for {
v := <- events
2025-02-12 01:57:46 +08:00
// Skip events during batch updates
if v . PreviousRV < 0 {
continue
}
2025-05-09 21:36:21 +08:00
s . dispatchEvent ( watchctx , v )
2024-11-27 13:57:53 +08:00
}
} ( )
2024-11-21 13:53:25 +08:00
2025-05-09 21:36:21 +08:00
go s . monitorIndexEvents ( ctx )
2025-06-13 03:34:48 +08:00
// since usage insights is not in unified storage, we need to periodically rebuild the index
// to make sure these data points are up to date.
if s . rebuildInterval > 0 {
go s . startPeriodicRebuild ( watchctx )
}
2024-12-05 05:02:40 +08:00
end := time . Now ( ) . Unix ( )
2024-12-10 12:32:19 +08:00
s . log . Info ( "search index initialized" , "duration_secs" , end - start , "total_docs" , s . search . TotalDocs ( ) )
2025-03-13 22:09:38 +08:00
if s . indexMetrics != nil {
s . indexMetrics . IndexCreationTime . WithLabelValues ( ) . Observe ( float64 ( end - start ) )
2024-12-05 05:02:40 +08:00
}
2024-11-21 13:53:25 +08:00
return nil
}
2025-05-09 21:36:21 +08:00
// Async event dispatching
// This is called from the watch event loop
// It will dispatch the event to the appropriate index queue processor
func ( s * searchSupport ) dispatchEvent ( ctx context . Context , evt * WrittenEvent ) {
ctx , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "dispatchEvent" )
2025-01-29 02:30:20 +08:00
defer span . End ( )
span . SetAttributes (
attribute . String ( "event_type" , evt . Type . String ( ) ) ,
attribute . String ( "namespace" , evt . Key . Namespace ) ,
attribute . String ( "group" , evt . Key . Group ) ,
attribute . String ( "resource" , evt . Key . Resource ) ,
attribute . String ( "name" , evt . Key . Name ) ,
)
2024-12-05 05:02:40 +08:00
2025-05-09 21:36:21 +08:00
switch evt . Type {
2025-05-16 03:36:52 +08:00
case resourcepb . WatchEvent_ADDED , resourcepb . WatchEvent_MODIFIED , resourcepb . WatchEvent_DELETED : // OK
2025-05-09 21:36:21 +08:00
default :
s . log . Info ( "ignoring watch event" , "type" , evt . Type )
span . AddEvent ( "ignoring watch event" , trace . WithAttributes ( attribute . String ( "type" , evt . Type . String ( ) ) ) )
}
2024-11-27 13:57:53 +08:00
nsr := NamespacedResource {
Namespace : evt . Key . Namespace ,
Group : evt . Key . Group ,
Resource : evt . Key . Resource ,
}
index , err := s . getOrCreateIndex ( ctx , nsr )
if err != nil {
s . log . Warn ( "error getting index for watch event" , "error" , err )
2025-05-09 21:36:21 +08:00
span . RecordError ( err )
2024-11-27 13:57:53 +08:00
return
}
2025-05-09 21:36:21 +08:00
// Get or create index queue processor for this index
indexQueueProcessor , err := s . getOrCreateIndexQueueProcessor ( index , nsr )
2024-11-27 13:57:53 +08:00
if err != nil {
2025-05-09 21:36:21 +08:00
s . log . Error ( "error getting index queue processor for watch event" , "error" , err )
span . RecordError ( err )
2024-11-27 13:57:53 +08:00
return
}
2025-05-09 21:36:21 +08:00
indexQueueProcessor . Add ( evt )
}
2024-11-27 13:57:53 +08:00
2025-05-09 21:36:21 +08:00
func ( s * searchSupport ) monitorIndexEvents ( ctx context . Context ) {
var evt * IndexEvent
for {
select {
case <- ctx . Done ( ) :
2024-12-05 05:02:40 +08:00
return
2025-05-09 21:36:21 +08:00
case evt = <- s . indexEventsChan :
2024-12-05 05:02:40 +08:00
}
2025-05-09 21:36:21 +08:00
if evt . Err != nil {
s . log . Error ( "error indexing watch event" , "error" , evt . Err )
continue
2024-12-05 05:02:40 +08:00
}
2025-05-09 21:36:21 +08:00
_ , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "monitorIndexEvents" )
defer span . End ( )
// record latency from when event was created to when it was indexed
span . AddEvent ( "index latency" , trace . WithAttributes ( attribute . Float64 ( "latency_seconds" , evt . Latency . Seconds ( ) ) ) )
s . log . Debug ( "indexed new object" , "resource" , evt . WrittenEvent . Key . Resource , "latency_seconds" , evt . Latency . Seconds ( ) , "name" , evt . WrittenEvent . Key . Name , "namespace" , evt . WrittenEvent . Key . Namespace , "rv" , evt . WrittenEvent . ResourceVersion )
if evt . Latency . Seconds ( ) > 1 {
s . log . Warn ( "high index latency object details" , "resource" , evt . WrittenEvent . Key . Resource , "latency_seconds" , evt . Latency . Seconds ( ) , "name" , evt . WrittenEvent . Key . Name , "namespace" , evt . WrittenEvent . Key . Namespace , "rv" , evt . WrittenEvent . ResourceVersion )
2024-12-05 05:02:40 +08:00
}
2025-03-13 22:09:38 +08:00
if s . indexMetrics != nil {
2025-05-09 21:36:21 +08:00
s . indexMetrics . IndexLatency . WithLabelValues ( evt . WrittenEvent . Key . Resource ) . Observe ( evt . Latency . Seconds ( ) )
}
if s . clientIndexEventsChan != nil {
s . clientIndexEventsChan <- evt
2025-03-13 22:09:38 +08:00
}
2024-11-27 13:57:53 +08:00
}
}
2025-06-13 03:34:48 +08:00
// startPeriodicRebuild rebuilds the dashboard indexes every s.rebuildInterval
// until ctx is cancelled. Rebuild failures are logged and do not stop the loop.
func (s *searchSupport) startPeriodicRebuild(ctx context.Context) {
	tick := time.NewTicker(s.rebuildInterval)
	defer tick.Stop()

	s.log.Info("starting periodic index rebuild", "interval", s.rebuildInterval)

	for {
		select {
		case <-ctx.Done():
			s.log.Info("stopping periodic index rebuild due to context cancellation")
			return
		case <-tick.C:
		}

		s.log.Info("starting periodic index rebuild")
		rebuildErr := s.rebuildDashboardIndexes(ctx)
		if rebuildErr == nil {
			s.log.Info("periodic index rebuild completed successfully")
			continue
		}
		s.log.Error("error during periodic index rebuild", "error", rebuildErr)
	}
}
// rebuildDashboardIndexes runs buildIndexes in rebuild mode, then logs the
// elapsed time, the number of rebuilt indexes and the backend's total document
// count, and records the elapsed time in the index creation metric when
// metrics are configured.
func (s *searchSupport) rebuildDashboardIndexes(ctx context.Context) error {
	ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"RebuildDashboardIndexes")
	defer span.End()

	began := time.Now()
	s.log.Info("rebuilding all search indexes")

	rebuilt, err := s.buildIndexes(ctx, true)
	if err != nil {
		return fmt.Errorf("failed to rebuild dashboard indexes: %w", err)
	}

	elapsed := time.Since(began)
	s.log.Info("completed rebuilding all dashboard search indexes",
		"duration", elapsed,
		"rebuilt_indexes", rebuilt,
		"total_docs", s.search.TotalDocs())

	if metrics := s.indexMetrics; metrics != nil {
		metrics.IndexCreationTime.WithLabelValues().Observe(elapsed.Seconds())
	}
	return nil
}
2024-11-27 13:57:53 +08:00
func ( s * searchSupport ) getOrCreateIndex ( ctx context . Context , key NamespacedResource ) ( ResourceIndex , error ) {
2025-01-13 23:05:04 +08:00
if s == nil || s . search == nil {
return nil , fmt . Errorf ( "search is not configured properly (missing unifiedStorageSearch feature toggle?)" )
}
2025-01-29 03:27:01 +08:00
ctx , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "GetOrCreateIndex" )
defer span . End ( )
2024-11-27 13:57:53 +08:00
idx , err := s . search . GetIndex ( ctx , key )
if err != nil {
return nil , err
}
2025-07-10 19:54:10 +08:00
if idx != nil {
return idx , nil
}
2025-07-29 23:40:16 +08:00
ch := s . buildIndex . DoChan ( key . String ( ) , func ( ) ( interface { } , error ) {
2025-07-10 19:54:10 +08:00
// Recheck if some other goroutine managed to build an index in the meantime.
// (That is, it finished running this function and stored the index into the cache)
idx , err := s . search . GetIndex ( ctx , key )
if err == nil && idx != nil {
return idx , nil
}
// Get correct value of size + RV for building the index. This is important for our Bleve
// backend to decide whether to build index in-memory or as file-based.
stats , err := s . storage . GetResourceStats ( ctx , key . Namespace , 0 )
if err != nil {
return nil , fmt . Errorf ( "failed to get resource stats: %w" , err )
}
size := int64 ( 0 )
rv := int64 ( 0 )
for _ , stat := range stats {
if stat . Namespace == key . Namespace && stat . Group == key . Group && stat . Resource == key . Resource {
size = stat . Count
rv = stat . ResourceVersion
break
}
}
idx , _ , err = s . build ( ctx , key , size , rv )
2024-11-27 13:57:53 +08:00
if err != nil {
2025-02-12 01:57:46 +08:00
return nil , fmt . Errorf ( "error building search index, %w" , err )
2024-11-27 13:57:53 +08:00
}
if idx == nil {
return nil , fmt . Errorf ( "nil index after build" )
}
2025-07-10 19:54:10 +08:00
return idx , nil
} )
2025-07-29 23:40:16 +08:00
select {
case res := <- ch :
if res . Err != nil {
return nil , res . Err
}
return res . Val . ( ResourceIndex ) , nil
case <- ctx . Done ( ) :
return nil , fmt . Errorf ( "failed to get index: %w" , ctx . Err ( ) )
2024-11-27 13:57:53 +08:00
}
}
2024-11-22 21:44:06 +08:00
func ( s * searchSupport ) build ( ctx context . Context , nsr NamespacedResource , size int64 , rv int64 ) ( ResourceIndex , int64 , error ) {
2024-12-10 12:32:19 +08:00
ctx , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "Build" )
2024-11-21 13:53:25 +08:00
defer span . End ( )
2025-05-27 03:33:51 +08:00
logger := s . log . With ( "namespace" , nsr . Namespace , "group" , nsr . Group , "resource" , nsr . Resource )
2024-11-21 13:53:25 +08:00
builder , err := s . builders . get ( ctx , nsr )
if err != nil {
return nil , 0 , err
}
2024-11-22 21:44:06 +08:00
fields := s . builders . GetFields ( nsr )
2024-11-21 13:53:25 +08:00
2024-11-22 21:44:06 +08:00
index , err := s . search . BuildIndex ( ctx , nsr , size , rv , fields , func ( index ResourceIndex ) ( int64 , error ) {
2025-05-16 03:36:52 +08:00
rv , err = s . storage . ListIterator ( ctx , & resourcepb . ListRequest {
2024-11-22 21:44:06 +08:00
Limit : 1000000000000 , // big number
2025-05-16 03:36:52 +08:00
Options : & resourcepb . ListOptions {
Key : & resourcepb . ResourceKey {
2025-05-12 13:56:25 +08:00
Group : nsr . Group ,
Resource : nsr . Resource ,
Namespace : nsr . Namespace ,
} ,
2024-11-22 21:44:06 +08:00
} ,
} , func ( iter ListIterator ) error {
2025-05-27 15:48:39 +08:00
// Process documents in batches to avoid memory issues
// When dealing with large collections (e.g., 100k+ documents),
// loading all documents into memory at once can cause OOM errors.
items := make ( [ ] * BulkIndexItem , 0 , maxBatchSize )
2025-05-09 21:36:21 +08:00
2024-11-22 21:44:06 +08:00
for iter . Next ( ) {
if err = iter . Error ( ) ; err != nil {
return err
}
// Update the key name
2025-05-16 03:36:52 +08:00
key := & resourcepb . ResourceKey {
2025-05-12 13:56:25 +08:00
Group : nsr . Group ,
Resource : nsr . Resource ,
Namespace : nsr . Namespace ,
Name : iter . Name ( ) ,
}
2024-11-22 21:44:06 +08:00
// Convert it to an indexable document
doc , err := builder . BuildDocument ( ctx , key , iter . ResourceVersion ( ) , iter . Value ( ) )
if err != nil {
2025-05-27 03:33:51 +08:00
logger . Error ( "error building search document" , "key" , SearchID ( key ) , "err" , err )
2025-02-12 01:57:46 +08:00
continue
2024-11-22 21:44:06 +08:00
}
2025-05-09 21:36:21 +08:00
// Add to bulk items
items = append ( items , & BulkIndexItem {
Action : ActionIndex ,
Doc : doc ,
} )
2025-05-27 15:48:39 +08:00
// When we reach the batch size, perform bulk index and reset the batch.
if len ( items ) >= maxBatchSize {
if err = index . BulkIndex ( & BulkIndexRequest {
Items : items ,
} ) ; err != nil {
return err
}
// Reset the slice for the next batch while preserving capacity.
items = items [ : 0 ]
}
2025-05-09 21:36:21 +08:00
}
2025-05-27 15:48:39 +08:00
// Index any remaining items in the final batch.
2025-05-09 21:36:21 +08:00
if len ( items ) > 0 {
if err = index . BulkIndex ( & BulkIndexRequest {
Items : items ,
} ) ; err != nil {
2024-11-22 21:44:06 +08:00
return err
}
}
2025-04-11 22:25:40 +08:00
return iter . Error ( )
2024-11-22 21:44:06 +08:00
} )
return rv , err
} )
2024-12-06 03:14:04 +08:00
if err != nil {
return nil , 0 , err
}
2024-12-05 05:02:40 +08:00
// Record the number of objects indexed for the kind/resource
2024-12-11 02:37:37 +08:00
docCount , err := index . DocCount ( ctx , "" )
2024-12-05 05:02:40 +08:00
if err != nil {
2025-05-27 03:33:51 +08:00
logger . Warn ( "error getting doc count" , "error" , err )
2024-12-05 05:02:40 +08:00
}
2025-03-13 22:09:38 +08:00
if s . indexMetrics != nil {
2025-05-12 13:56:25 +08:00
s . indexMetrics . IndexedKinds . WithLabelValues ( nsr . Resource ) . Add ( float64 ( docCount ) )
2024-12-05 05:02:40 +08:00
}
2024-11-22 21:44:06 +08:00
// rv is the last RV we read. when watching, we must add all events since that time
return index , rv , err
2024-11-21 13:53:25 +08:00
}
2025-06-27 20:00:39 +08:00
// buildEmptyIndex asks the search backend to build an index for nsr with a
// size of zero and a builder callback that writes no documents and simply
// reports rv as the resource version.
func (s *searchSupport) buildEmptyIndex(ctx context.Context, nsr NamespacedResource, rv int64) (ResourceIndex, error) {
	ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"BuildEmptyIndex")
	defer span.End()

	customFields := s.builders.GetFields(nsr)
	s.log.Debug("Building empty index", "namespace", nsr.Namespace, "group", nsr.Group, "resource", nsr.Resource, "rv", rv)

	// The callback leaves the index untouched and hands back the RV it was given.
	writeNothing := func(ResourceIndex) (int64, error) {
		return rv, nil
	}
	return s.search.BuildIndex(ctx, nsr, 0, rv, customFields, writeNothing)
}
2024-11-21 13:53:25 +08:00
type builderCache struct {
// The default builder
defaultBuilder DocumentBuilder
// Possible blob support
blob BlobSupport
2024-11-22 21:44:06 +08:00
// searchable fields initialized once on startup
fields map [ schema . GroupResource ] SearchableDocumentFields
2024-11-21 13:53:25 +08:00
// lookup by group, then resource (namespace)
// This is only modified at startup, so we do not need mutex for access
lookup map [ string ] map [ string ] DocumentBuilderInfo
// For namespaced based resources that require a cache
ns * expirable . LRU [ NamespacedResource , DocumentBuilder ]
mu sync . Mutex // only locked for a cache miss
}
func newBuilderCache ( cfg [ ] DocumentBuilderInfo , nsCacheSize int , ttl time . Duration ) ( * builderCache , error ) {
cache := & builderCache {
2024-11-22 21:44:06 +08:00
fields : make ( map [ schema . GroupResource ] SearchableDocumentFields ) ,
2024-11-21 13:53:25 +08:00
lookup : make ( map [ string ] map [ string ] DocumentBuilderInfo ) ,
ns : expirable . NewLRU [ NamespacedResource , DocumentBuilder ] ( nsCacheSize , nil , ttl ) ,
}
if len ( cfg ) == 0 {
return cache , fmt . Errorf ( "no builders configured" )
}
for _ , b := range cfg {
// the default
if b . GroupResource . Group == "" && b . GroupResource . Resource == "" {
if b . Builder == nil {
return cache , fmt . Errorf ( "default document builder is missing" )
}
cache . defaultBuilder = b . Builder
continue
}
g , ok := cache . lookup [ b . GroupResource . Group ]
if ! ok {
g = make ( map [ string ] DocumentBuilderInfo )
cache . lookup [ b . GroupResource . Group ] = g
}
g [ b . GroupResource . Resource ] = b
2024-11-22 21:44:06 +08:00
// Any custom fields
cache . fields [ b . GroupResource ] = b . Fields
2024-11-21 13:53:25 +08:00
}
return cache , nil
}
2024-11-22 21:44:06 +08:00
// GetFields returns the searchable fields registered for the group/resource of
// key. The namespace is not part of the lookup. The zero value is returned
// when nothing was registered.
func (s *builderCache) GetFields(key NamespacedResource) SearchableDocumentFields {
	gr := schema.GroupResource{
		Group:    key.Group,
		Resource: key.Resource,
	}
	return s.fields[gr]
}
2024-11-21 13:53:25 +08:00
// get returns the DocumentBuilder to use for key.
//
// Resolution order:
//  1. the static Builder registered for the group/resource, if any;
//  2. otherwise a namespace-specific builder, taken from the LRU cache or
//     created through the registered Namespaced factory on a cache miss;
//  3. the default builder when nothing is registered for the group/resource.
//
// ctx is only used when a namespaced builder has to be created; callers
// typically pass a background context.
func (s *builderCache) get(ctx context.Context, key NamespacedResource) (DocumentBuilder, error) {
	// Indexing a missing group yields a nil inner map, which is safe to read.
	info, registered := s.lookup[key.Group][key.Resource]
	if !registered {
		return s.defaultBuilder, nil
	}
	if info.Builder != nil {
		return info.Builder, nil
	}

	// The builder needs context: try the cache first without taking the mutex.
	if cached, hit := s.ns.Get(key); hit {
		return cached, nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Another goroutine may have created the builder while we were waiting
	// for the lock; re-check so the factory runs at most once per miss.
	if cached, hit := s.ns.Get(key); hit {
		return cached, nil
	}

	created, err := info.Namespaced(ctx, key.Namespace, s.blob)
	if err != nil {
		// Hand back whatever the factory returned, together with its error.
		return created, err
	}
	// Add only reports whether an older entry was evicted; nothing to act on.
	_ = s.ns.Add(key, created)
	return created, nil
}
2025-02-07 00:30:47 +08:00
// AsResourceKey converts the given namespace and type to a search key
2025-05-16 03:36:52 +08:00
func AsResourceKey ( ns string , t string ) ( * resourcepb . ResourceKey , error ) {
2025-02-07 00:30:47 +08:00
if ns == "" {
return nil , fmt . Errorf ( "missing namespace" )
}
switch t {
case "folders" , "folder" :
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceKey {
2025-02-07 00:30:47 +08:00
Namespace : ns ,
2025-04-11 20:09:52 +08:00
Group : folders . GROUP ,
Resource : folders . RESOURCE ,
2025-02-07 00:30:47 +08:00
} , nil
case "dashboards" , "dashboard" :
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceKey {
2025-02-07 00:30:47 +08:00
Namespace : ns ,
2025-04-11 20:09:52 +08:00
Group : dashboardv1 . GROUP ,
Resource : dashboardv1 . DASHBOARD_RESOURCE ,
2025-02-07 00:30:47 +08:00
} , nil
// NOT really supported in the dashboard search UI, but useful for manual testing
case "playlist" , "playlists" :
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceKey {
2025-02-07 00:30:47 +08:00
Namespace : ns ,
Group : "playlist.grafana.app" ,
Resource : "playlists" ,
} , nil
}
return nil , fmt . Errorf ( "unknown resource type" )
}
2025-05-09 21:36:21 +08:00
// getOrCreateIndexQueueProcessor returns the queue processor registered for
// nsr, creating and registering one bound to index when none exists yet.
//
// The processor map is guarded by indexQueueProcessorsMutex for the whole
// call. An error is returned (and logged) when no document builder can be
// obtained for nsr.
func (s *searchSupport) getOrCreateIndexQueueProcessor(index ResourceIndex, nsr NamespacedResource) (*indexQueueProcessor, error) {
	s.indexQueueProcessorsMutex.Lock()
	defer s.indexQueueProcessorsMutex.Unlock()

	// Processors are keyed by "namespace/group/resource".
	key := nsr.String()
	if existing, found := s.indexQueueProcessors[key]; found {
		return existing, nil
	}

	docBuilder, err := s.builders.get(context.Background(), nsr)
	if err != nil {
		s.log.Error("error getting document builder", "error", err)
		return nil, err
	}

	created := newIndexQueueProcessor(index, nsr, maxBatchSize, docBuilder, s.indexEventsChan)
	s.indexQueueProcessors[key] = created
	return created, nil
}
2025-06-13 03:34:48 +08:00
// clearNamespacedCache removes the cached namespace-specific builder for key,
// if one is present. It takes the same mutex that is held while a builder is
// created on a cache miss.
func (s *builderCache) clearNamespacedCache(key NamespacedResource) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Whether an entry was actually present is not interesting to callers.
	_ = s.ns.Remove(key)
}
2025-06-27 20:00:39 +08:00
// Test utilities for document building
// testDocumentBuilder implements DocumentBuilder for testing
type testDocumentBuilder struct{}

// BuildDocument decodes value as an unstructured JSON object and turns it into
// an IndexableDocument.
//
// The document key takes namespace, group and resource from key and the name
// from the decoded object. The following optional entries of the object's
// "spec" are read:
//   - "title" (string) -> Title
//   - "tags" (list)    -> Tags; non-string entries are left as empty strings
//   - "value" (string) -> Fields["value"]
//
// Entries that are missing or have an unexpected type are left at their empty
// defaults instead of panicking. ctx and rv are not used. An error is returned
// only when value cannot be unmarshalled.
func (b *testDocumentBuilder) BuildDocument(ctx context.Context, key *resourcepb.ResourceKey, rv int64, value []byte) (*IndexableDocument, error) {
	// convert value to unstructured.Unstructured
	var u unstructured.Unstructured
	if err := u.UnmarshalJSON(value); err != nil {
		return nil, fmt.Errorf("failed to unmarshal value: %w", err)
	}

	title := ""
	tags := []string{}
	val := ""

	// The error from NestedMap is deliberately ignored: only the found flag
	// decides whether spec is inspected.
	spec, ok, _ := unstructured.NestedMap(u.Object, "spec")
	if ok {
		// Comma-ok assertions: a missing key or a non-string value keeps the default.
		if str, isString := spec["title"].(string); isString {
			title = str
		}
		if tagSlice, isSlice := spec["tags"].([]any); isSlice {
			tags = make([]string, len(tagSlice))
			for i, tag := range tagSlice {
				if strTag, isString := tag.(string); isString {
					tags[i] = strTag
				}
			}
		}
		if str, isString := spec["value"].(string); isString {
			val = str
		}
	}

	return &IndexableDocument{
		Key: &resourcepb.ResourceKey{
			Namespace: key.Namespace,
			Group:     key.Group,
			Resource:  key.Resource,
			Name:      u.GetName(),
		},
		Title: title,
		Tags:  tags,
		Fields: map[string]any{
			"value": val,
		},
	}, nil
}
// TestDocumentBuilderSupplier implements DocumentBuilderSupplier for testing.
type TestDocumentBuilderSupplier struct {
	// GroupsResources maps a group to the resource registered for that group.
	GroupsResources map[string]string
}

// GetDocumentBuilders returns one DocumentBuilderInfo per entry of
// GroupsResources, each backed by its own testDocumentBuilder. The order of
// the result follows map iteration and is therefore unspecified. The returned
// error is always nil.
func (s *TestDocumentBuilderSupplier) GetDocumentBuilders() ([]DocumentBuilderInfo, error) {
	infos := make([]DocumentBuilderInfo, 0, len(s.GroupsResources))

	for grp, res := range s.GroupsResources {
		info := DocumentBuilderInfo{
			GroupResource: schema.GroupResource{Group: grp, Resource: res},
			Builder:       &testDocumentBuilder{},
		}
		infos = append(infos, info)
	}

	return infos, nil
}