2024-11-21 13:53:25 +08:00
package resource
import (
2025-01-13 23:05:04 +08:00
"cmp"
2024-11-21 13:53:25 +08:00
"context"
"fmt"
"log/slog"
2024-12-05 05:02:40 +08:00
"slices"
2024-12-11 02:37:37 +08:00
"strings"
2024-11-21 13:53:25 +08:00
"sync"
"time"
"github.com/hashicorp/golang-lru/v2/expirable"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/runtime/schema"
2024-11-22 21:44:06 +08:00
2025-01-21 17:06:55 +08:00
"github.com/grafana/authlib/types"
2025-05-16 03:36:52 +08:00
2025-04-24 01:54:35 +08:00
dashboardv1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1beta1"
2025-04-15 04:20:10 +08:00
folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
2025-05-16 03:36:52 +08:00
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
2024-11-21 13:53:25 +08:00
)
2025-05-09 21:36:21 +08:00
// maxBatchSize is the batch size handed to every index queue processor
// created by getOrCreateIndexQueueProcessor.
// NOTE(review): presumably the maximum number of events applied per bulk
// index call — confirm against indexQueueProcessor.
const maxBatchSize = 1000
2024-11-21 13:53:25 +08:00
// NamespacedResource identifies one resource kind (group + resource) inside a
// single namespace. It is the key used to look up and build search indexes.
type NamespacedResource struct {
Namespace string
Group string
Resource string
}
2024-11-22 21:44:06 +08:00
// Valid reports whether the namespace, group, and resource are all non-empty.
func (s *NamespacedResource) Valid() bool {
	if s.Namespace == "" || s.Group == "" || s.Resource == "" {
		return false
	}
	return true
}
2025-05-09 21:36:21 +08:00
// IndexAction selects the operation a BulkIndexItem performs on an index.
type IndexAction int
2024-11-22 21:44:06 +08:00
2025-05-09 21:36:21 +08:00
const (
// ActionIndex marks an item that carries a document to be indexed.
ActionIndex IndexAction = iota
// ActionDelete marks an item that carries the key of a document to remove.
ActionDelete
)
2024-11-22 21:44:06 +08:00
2025-05-09 21:36:21 +08:00
type BulkIndexItem struct {
Action IndexAction
2025-05-16 03:36:52 +08:00
Key * resourcepb . ResourceKey // Only used for delete actions
Doc * IndexableDocument // Only used for index actions
2025-05-09 21:36:21 +08:00
}
// BulkIndexRequest groups several index/delete operations so they can be
// submitted to ResourceIndex.BulkIndex in one call.
type BulkIndexRequest struct {
// Items are the operations to apply.
Items [ ] * BulkIndexItem
// NOTE(review): presumably the resource version the index reflects once the
// items are applied — confirm against the index implementation.
ResourceVersion int64
}
type ResourceIndex interface {
// BulkIndex allows for multiple index actions to be performed in a single call.
// The order of the items is guaranteed to be the same as the input
BulkIndex ( req * BulkIndexRequest ) error
2024-11-22 21:44:06 +08:00
// Search within a namespaced resource
// When working with federated queries, the additional indexes will be passed in explicitly
2025-05-16 03:36:52 +08:00
Search ( ctx context . Context , access types . AccessClient , req * resourcepb . ResourceSearchRequest , federate [ ] ResourceIndex ) ( * resourcepb . ResourceSearchResponse , error )
2024-11-22 21:44:06 +08:00
2025-01-11 02:27:10 +08:00
// List within an response
2025-05-16 03:36:52 +08:00
ListManagedObjects ( ctx context . Context , req * resourcepb . ListManagedObjectsRequest ) ( * resourcepb . ListManagedObjectsResponse , error )
2025-01-11 02:27:10 +08:00
// Counts the values in a repo
2025-05-16 03:36:52 +08:00
CountManagedObjects ( ctx context . Context ) ( [ ] * resourcepb . CountManagedObjectsResponse_ResourceCount , error )
2024-12-05 05:02:40 +08:00
// Get the number of documents in the index
2024-12-11 02:37:37 +08:00
DocCount ( ctx context . Context , folder string ) ( int64 , error )
2024-11-22 21:44:06 +08:00
}
// SearchBackend contains the technology specific logic to support search
2024-11-21 13:53:25 +08:00
type SearchBackend interface {
2024-11-22 21:44:06 +08:00
// This will return nil if the key does not exist
GetIndex ( ctx context . Context , key NamespacedResource ) ( ResourceIndex , error )
// Build an index from scratch
BuildIndex ( ctx context . Context ,
key NamespacedResource ,
// When the size is known, it will be passed along here
// Depending on the size, the backend may choose different options (eg: memory vs disk)
size int64 ,
// The last known resource version (can be used to know that nothing has changed)
resourceVersion int64 ,
// The non-standard index fields
fields SearchableDocumentFields ,
// The builder will write all documents before returning
builder func ( index ResourceIndex ) ( int64 , error ) ,
) ( ResourceIndex , error )
2024-12-05 05:02:40 +08:00
// Gets the total number of documents across all indexes
TotalDocs ( ) int64
2024-11-21 13:53:25 +08:00
}
// tracingPrexfixSearch is prepended to the name of every tracing span started
// by searchSupport.
const tracingPrexfixSearch = "unified_search."
// This supports indexing+search regardless of implementation
type searchSupport struct {
2025-03-13 22:09:38 +08:00
tracer trace . Tracer
log * slog . Logger
storage StorageBackend
search SearchBackend
indexMetrics * BleveIndexMetrics
access types . AccessClient
builders * builderCache
initWorkers int
initMinSize int
2025-05-09 21:36:21 +08:00
// Index queue processors
indexQueueProcessorsMutex sync . Mutex
indexQueueProcessors map [ string ] * indexQueueProcessor
indexEventsChan chan * IndexEvent
// testing
clientIndexEventsChan chan * IndexEvent
2024-11-21 13:53:25 +08:00
}
2024-11-27 13:57:53 +08:00
var (
2025-05-16 03:36:52 +08:00
_ resourcepb . ResourceIndexServer = ( * searchSupport ) ( nil )
_ resourcepb . ManagedObjectIndexServer = ( * searchSupport ) ( nil )
2024-11-27 13:57:53 +08:00
)
2025-03-13 22:09:38 +08:00
func newSearchSupport ( opts SearchOptions , storage StorageBackend , access types . AccessClient , blob BlobSupport , tracer trace . Tracer , indexMetrics * BleveIndexMetrics ) ( support * searchSupport , err error ) {
2024-11-21 13:53:25 +08:00
// No backend search support
if opts . Backend == nil {
return nil , nil
}
2025-02-12 01:57:46 +08:00
if tracer == nil {
return nil , fmt . Errorf ( "missing tracer" )
}
2024-11-21 13:53:25 +08:00
if opts . WorkerThreads < 1 {
opts . WorkerThreads = 1
}
support = & searchSupport {
2025-05-09 21:36:21 +08:00
access : access ,
tracer : tracer ,
storage : storage ,
search : opts . Backend ,
log : slog . Default ( ) . With ( "logger" , "resource-search" ) ,
initWorkers : opts . WorkerThreads ,
initMinSize : opts . InitMinCount ,
indexMetrics : indexMetrics ,
clientIndexEventsChan : opts . IndexEventsChan ,
indexEventsChan : make ( chan * IndexEvent ) ,
indexQueueProcessors : make ( map [ string ] * indexQueueProcessor ) ,
2024-11-21 13:53:25 +08:00
}
info , err := opts . Resources . GetDocumentBuilders ( )
if err != nil {
return nil , err
}
support . builders , err = newBuilderCache ( info , 100 , time . Minute * 2 ) // TODO? opts
if support . builders != nil {
support . builders . blob = blob
}
return support , err
}
2025-05-16 03:36:52 +08:00
func ( s * searchSupport ) ListManagedObjects ( ctx context . Context , req * resourcepb . ListManagedObjectsRequest ) ( * resourcepb . ListManagedObjectsResponse , error ) {
2025-01-13 23:05:04 +08:00
if req . NextPageToken != "" {
2025-05-16 03:36:52 +08:00
return & resourcepb . ListManagedObjectsResponse {
2025-01-13 23:05:04 +08:00
Error : NewBadRequestError ( "multiple pages not yet supported" ) ,
2025-01-11 02:27:10 +08:00
} , nil
}
2025-01-13 23:05:04 +08:00
2025-05-16 03:36:52 +08:00
rsp := & resourcepb . ListManagedObjectsResponse { }
2025-01-13 23:05:04 +08:00
stats , err := s . storage . GetResourceStats ( ctx , req . Namespace , 0 )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
for _ , info := range stats {
idx , err := s . getOrCreateIndex ( ctx , NamespacedResource {
Namespace : req . Namespace ,
Group : info . Group ,
Resource : info . Resource ,
} )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
2025-03-11 00:48:53 +08:00
kind , err := idx . ListManagedObjects ( ctx , req )
2025-01-13 23:05:04 +08:00
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
if kind . NextPageToken != "" {
2025-05-16 03:36:52 +08:00
rsp . Error = & resourcepb . ErrorResult {
2025-01-13 23:05:04 +08:00
Message : "Multiple pages are not yet supported" ,
}
return rsp , nil
}
rsp . Items = append ( rsp . Items , kind . Items ... )
}
// Sort based on path
2025-05-16 03:36:52 +08:00
slices . SortFunc ( rsp . Items , func ( a , b * resourcepb . ListManagedObjectsResponse_Item ) int {
2025-01-13 23:05:04 +08:00
return cmp . Compare ( a . Path , b . Path )
} )
return rsp , nil
2024-11-27 13:57:53 +08:00
}
2025-05-16 03:36:52 +08:00
func ( s * searchSupport ) CountManagedObjects ( ctx context . Context , req * resourcepb . CountManagedObjectsRequest ) ( * resourcepb . CountManagedObjectsResponse , error ) {
rsp := & resourcepb . CountManagedObjectsResponse { }
2025-01-13 23:05:04 +08:00
stats , err := s . storage . GetResourceStats ( ctx , req . Namespace , 0 )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
for _ , info := range stats {
idx , err := s . getOrCreateIndex ( ctx , NamespacedResource {
Namespace : req . Namespace ,
Group : info . Group ,
Resource : info . Resource ,
} )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
2025-03-11 00:48:53 +08:00
counts , err := idx . CountManagedObjects ( ctx )
2025-01-13 23:05:04 +08:00
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
2025-03-11 00:48:53 +08:00
if req . Id == "" {
2025-01-13 23:05:04 +08:00
rsp . Items = append ( rsp . Items , counts ... )
} else {
for _ , k := range counts {
2025-03-11 00:48:53 +08:00
if k . Id == req . Id {
2025-01-13 23:05:04 +08:00
rsp . Items = append ( rsp . Items , k )
}
}
}
}
2025-03-11 00:48:53 +08:00
// Sort based on manager/group/resource
2025-05-16 03:36:52 +08:00
slices . SortFunc ( rsp . Items , func ( a , b * resourcepb . CountManagedObjectsResponse_ResourceCount ) int {
2025-01-13 23:05:04 +08:00
return cmp . Or (
2025-03-11 00:48:53 +08:00
cmp . Compare ( a . Kind , b . Kind ) ,
cmp . Compare ( a . Id , b . Id ) ,
2025-01-13 23:05:04 +08:00
cmp . Compare ( a . Group , b . Group ) ,
cmp . Compare ( a . Resource , b . Resource ) ,
)
} )
return rsp , nil
2024-11-27 13:57:53 +08:00
}
// Search implements ResourceIndexServer.
2025-05-16 03:36:52 +08:00
func ( s * searchSupport ) Search ( ctx context . Context , req * resourcepb . ResourceSearchRequest ) ( * resourcepb . ResourceSearchResponse , error ) {
2025-01-29 03:27:01 +08:00
ctx , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "Search" )
defer span . End ( )
2024-11-27 13:57:53 +08:00
nsr := NamespacedResource {
Group : req . Options . Key . Group ,
Namespace : req . Options . Key . Namespace ,
Resource : req . Options . Key . Resource ,
}
idx , err := s . getOrCreateIndex ( ctx , nsr )
if err != nil {
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceSearchResponse {
2024-11-27 13:57:53 +08:00
Error : AsErrorResult ( err ) ,
} , nil
}
// Get the federated indexes
federate := make ( [ ] ResourceIndex , len ( req . Federated ) )
for i , f := range req . Federated {
nsr . Group = f . Group
nsr . Resource = f . Resource
federate [ i ] , err = s . getOrCreateIndex ( ctx , nsr )
if err != nil {
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceSearchResponse {
2024-11-27 13:57:53 +08:00
Error : AsErrorResult ( err ) ,
} , nil
}
}
return idx . Search ( ctx , s . access , req , federate )
}
2024-12-11 02:37:37 +08:00
// GetStats implements ResourceServer.
2025-05-16 03:36:52 +08:00
func ( s * searchSupport ) GetStats ( ctx context . Context , req * resourcepb . ResourceStatsRequest ) ( * resourcepb . ResourceStatsResponse , error ) {
2024-12-11 02:37:37 +08:00
if req . Namespace == "" {
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceStatsResponse {
2024-12-11 02:37:37 +08:00
Error : NewBadRequestError ( "missing namespace" ) ,
} , nil
}
2025-05-16 03:36:52 +08:00
rsp := & resourcepb . ResourceStatsResponse { }
2024-12-11 02:37:37 +08:00
// Explicit list of kinds
if len ( req . Kinds ) > 0 {
2025-05-16 03:36:52 +08:00
rsp . Stats = make ( [ ] * resourcepb . ResourceStatsResponse_Stats , len ( req . Kinds ) )
2024-12-11 02:37:37 +08:00
for i , k := range req . Kinds {
parts := strings . SplitN ( k , "/" , 2 )
index , err := s . getOrCreateIndex ( ctx , NamespacedResource {
Namespace : req . Namespace ,
Group : parts [ 0 ] ,
Resource : parts [ 1 ] ,
} )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
count , err := index . DocCount ( ctx , req . Folder )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
2025-05-16 03:36:52 +08:00
rsp . Stats [ i ] = & resourcepb . ResourceStatsResponse_Stats {
2024-12-11 02:37:37 +08:00
Group : parts [ 0 ] ,
Resource : parts [ 1 ] ,
Count : count ,
}
}
return rsp , nil
}
stats , err := s . storage . GetResourceStats ( ctx , req . Namespace , 0 )
if err != nil {
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceStatsResponse {
2024-12-11 02:37:37 +08:00
Error : AsErrorResult ( err ) ,
} , nil
}
2025-05-16 03:36:52 +08:00
rsp . Stats = make ( [ ] * resourcepb . ResourceStatsResponse_Stats , len ( stats ) )
2024-12-11 02:37:37 +08:00
// When not filtered by folder or repository, we can use the results directly
if req . Folder == "" {
for i , stat := range stats {
2025-05-16 03:36:52 +08:00
rsp . Stats [ i ] = & resourcepb . ResourceStatsResponse_Stats {
2024-12-11 02:37:37 +08:00
Group : stat . Group ,
Resource : stat . Resource ,
Count : stat . Count ,
}
}
return rsp , nil
}
for i , stat := range stats {
index , err := s . getOrCreateIndex ( ctx , NamespacedResource {
Namespace : req . Namespace ,
Group : stat . Group ,
Resource : stat . Resource ,
} )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
count , err := index . DocCount ( ctx , req . Folder )
if err != nil {
rsp . Error = AsErrorResult ( err )
return rsp , nil
}
2025-05-16 03:36:52 +08:00
rsp . Stats [ i ] = & resourcepb . ResourceStatsResponse_Stats {
2024-12-11 02:37:37 +08:00
Group : stat . Group ,
Resource : stat . Resource ,
Count : count ,
}
}
return rsp , nil
}
2024-11-21 13:53:25 +08:00
// init is called during startup. any failure will block startup and continued execution
func ( s * searchSupport ) init ( ctx context . Context ) error {
2024-12-10 12:32:19 +08:00
ctx , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "Init" )
2024-11-21 13:53:25 +08:00
defer span . End ( )
2024-12-05 05:02:40 +08:00
start := time . Now ( ) . Unix ( )
2024-11-21 13:53:25 +08:00
totalBatchesIndexed := 0
group := errgroup . Group { }
group . SetLimit ( s . initWorkers )
2024-12-05 18:58:13 +08:00
stats , err := s . storage . GetResourceStats ( ctx , "" , s . initMinSize )
2024-12-04 01:20:27 +08:00
if err != nil {
return err
}
for _ , info := range stats {
group . Go ( func ( ) error {
s . log . Debug ( "initializing search index" , "namespace" , info . Namespace , "group" , info . Group , "resource" , info . Resource )
totalBatchesIndexed ++
_ , _ , err = s . build ( ctx , info . NamespacedResource , info . Count , info . ResourceVersion )
return err
} )
2024-11-21 13:53:25 +08:00
}
err = group . Wait ( )
if err != nil {
return err
}
span . AddEvent ( "namespaces indexed" , trace . WithAttributes ( attribute . Int ( "namespaced_indexed" , totalBatchesIndexed ) ) )
2024-11-27 13:57:53 +08:00
// Now start listening for new events
watchctx := context . Background ( ) // new context?
events , err := s . storage . WatchWriteEvents ( watchctx )
if err != nil {
return err
}
go func ( ) {
for {
v := <- events
2025-02-12 01:57:46 +08:00
// Skip events during batch updates
if v . PreviousRV < 0 {
continue
}
2025-05-09 21:36:21 +08:00
s . dispatchEvent ( watchctx , v )
2024-11-27 13:57:53 +08:00
}
} ( )
2024-11-21 13:53:25 +08:00
2025-05-09 21:36:21 +08:00
go s . monitorIndexEvents ( ctx )
2024-12-05 05:02:40 +08:00
end := time . Now ( ) . Unix ( )
2024-12-10 12:32:19 +08:00
s . log . Info ( "search index initialized" , "duration_secs" , end - start , "total_docs" , s . search . TotalDocs ( ) )
2025-03-13 22:09:38 +08:00
if s . indexMetrics != nil {
s . indexMetrics . IndexCreationTime . WithLabelValues ( ) . Observe ( float64 ( end - start ) )
2024-12-05 05:02:40 +08:00
}
2024-11-21 13:53:25 +08:00
return nil
}
2025-05-09 21:36:21 +08:00
// Async event dispatching
// This is called from the watch event loop
// It will dispatch the event to the appropriate index queue processor
func ( s * searchSupport ) dispatchEvent ( ctx context . Context , evt * WrittenEvent ) {
ctx , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "dispatchEvent" )
2025-01-29 02:30:20 +08:00
defer span . End ( )
span . SetAttributes (
attribute . String ( "event_type" , evt . Type . String ( ) ) ,
attribute . String ( "namespace" , evt . Key . Namespace ) ,
attribute . String ( "group" , evt . Key . Group ) ,
attribute . String ( "resource" , evt . Key . Resource ) ,
attribute . String ( "name" , evt . Key . Name ) ,
)
2024-12-05 05:02:40 +08:00
2025-05-09 21:36:21 +08:00
switch evt . Type {
2025-05-16 03:36:52 +08:00
case resourcepb . WatchEvent_ADDED , resourcepb . WatchEvent_MODIFIED , resourcepb . WatchEvent_DELETED : // OK
2025-05-09 21:36:21 +08:00
default :
s . log . Info ( "ignoring watch event" , "type" , evt . Type )
span . AddEvent ( "ignoring watch event" , trace . WithAttributes ( attribute . String ( "type" , evt . Type . String ( ) ) ) )
}
2024-11-27 13:57:53 +08:00
nsr := NamespacedResource {
Namespace : evt . Key . Namespace ,
Group : evt . Key . Group ,
Resource : evt . Key . Resource ,
}
index , err := s . getOrCreateIndex ( ctx , nsr )
if err != nil {
s . log . Warn ( "error getting index for watch event" , "error" , err )
2025-05-09 21:36:21 +08:00
span . RecordError ( err )
2024-11-27 13:57:53 +08:00
return
}
2025-05-09 21:36:21 +08:00
// Get or create index queue processor for this index
indexQueueProcessor , err := s . getOrCreateIndexQueueProcessor ( index , nsr )
2024-11-27 13:57:53 +08:00
if err != nil {
2025-05-09 21:36:21 +08:00
s . log . Error ( "error getting index queue processor for watch event" , "error" , err )
span . RecordError ( err )
2024-11-27 13:57:53 +08:00
return
}
2025-05-09 21:36:21 +08:00
indexQueueProcessor . Add ( evt )
}
2024-11-27 13:57:53 +08:00
2025-05-09 21:36:21 +08:00
func ( s * searchSupport ) monitorIndexEvents ( ctx context . Context ) {
var evt * IndexEvent
for {
select {
case <- ctx . Done ( ) :
2024-12-05 05:02:40 +08:00
return
2025-05-09 21:36:21 +08:00
case evt = <- s . indexEventsChan :
2024-12-05 05:02:40 +08:00
}
2025-05-09 21:36:21 +08:00
if evt . Err != nil {
s . log . Error ( "error indexing watch event" , "error" , evt . Err )
continue
2024-12-05 05:02:40 +08:00
}
2025-05-09 21:36:21 +08:00
_ , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "monitorIndexEvents" )
defer span . End ( )
// record latency from when event was created to when it was indexed
span . AddEvent ( "index latency" , trace . WithAttributes ( attribute . Float64 ( "latency_seconds" , evt . Latency . Seconds ( ) ) ) )
s . log . Debug ( "indexed new object" , "resource" , evt . WrittenEvent . Key . Resource , "latency_seconds" , evt . Latency . Seconds ( ) , "name" , evt . WrittenEvent . Key . Name , "namespace" , evt . WrittenEvent . Key . Namespace , "rv" , evt . WrittenEvent . ResourceVersion )
if evt . Latency . Seconds ( ) > 1 {
s . log . Warn ( "high index latency object details" , "resource" , evt . WrittenEvent . Key . Resource , "latency_seconds" , evt . Latency . Seconds ( ) , "name" , evt . WrittenEvent . Key . Name , "namespace" , evt . WrittenEvent . Key . Namespace , "rv" , evt . WrittenEvent . ResourceVersion )
2024-12-05 05:02:40 +08:00
}
2025-03-13 22:09:38 +08:00
if s . indexMetrics != nil {
2025-05-09 21:36:21 +08:00
s . indexMetrics . IndexLatency . WithLabelValues ( evt . WrittenEvent . Key . Resource ) . Observe ( evt . Latency . Seconds ( ) )
}
if s . clientIndexEventsChan != nil {
s . clientIndexEventsChan <- evt
2025-03-13 22:09:38 +08:00
}
2024-11-27 13:57:53 +08:00
}
}
func ( s * searchSupport ) getOrCreateIndex ( ctx context . Context , key NamespacedResource ) ( ResourceIndex , error ) {
2025-01-13 23:05:04 +08:00
if s == nil || s . search == nil {
return nil , fmt . Errorf ( "search is not configured properly (missing unifiedStorageSearch feature toggle?)" )
}
2025-01-29 03:27:01 +08:00
ctx , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "GetOrCreateIndex" )
defer span . End ( )
2024-11-27 13:57:53 +08:00
// TODO???
// We want to block while building the index and return the same index for the key
// simple mutex not great... we don't want to block while anything in building, just the same key
idx , err := s . search . GetIndex ( ctx , key )
if err != nil {
return nil , err
}
if idx == nil {
idx , _ , err = s . build ( ctx , key , 10 , 0 ) // unknown size and RV
if err != nil {
2025-02-12 01:57:46 +08:00
return nil , fmt . Errorf ( "error building search index, %w" , err )
2024-11-27 13:57:53 +08:00
}
if idx == nil {
return nil , fmt . Errorf ( "nil index after build" )
}
}
return idx , nil
}
2024-11-22 21:44:06 +08:00
func ( s * searchSupport ) build ( ctx context . Context , nsr NamespacedResource , size int64 , rv int64 ) ( ResourceIndex , int64 , error ) {
2024-12-10 12:32:19 +08:00
ctx , span := s . tracer . Start ( ctx , tracingPrexfixSearch + "Build" )
2024-11-21 13:53:25 +08:00
defer span . End ( )
builder , err := s . builders . get ( ctx , nsr )
if err != nil {
return nil , 0 , err
}
2024-11-22 21:44:06 +08:00
fields := s . builders . GetFields ( nsr )
2024-11-21 13:53:25 +08:00
2025-01-21 01:25:26 +08:00
s . log . Debug ( "Building index" , "resource" , nsr . Resource , "size" , size , "rv" , rv )
2024-11-21 13:53:25 +08:00
2024-11-22 21:44:06 +08:00
index , err := s . search . BuildIndex ( ctx , nsr , size , rv , fields , func ( index ResourceIndex ) ( int64 , error ) {
2025-05-16 03:36:52 +08:00
rv , err = s . storage . ListIterator ( ctx , & resourcepb . ListRequest {
2024-11-22 21:44:06 +08:00
Limit : 1000000000000 , // big number
2025-05-16 03:36:52 +08:00
Options : & resourcepb . ListOptions {
Key : & resourcepb . ResourceKey {
2025-05-12 13:56:25 +08:00
Group : nsr . Group ,
Resource : nsr . Resource ,
Namespace : nsr . Namespace ,
} ,
2024-11-22 21:44:06 +08:00
} ,
} , func ( iter ListIterator ) error {
2025-05-09 21:36:21 +08:00
// Collect all documents in a single bulk request
items := make ( [ ] * BulkIndexItem , 0 )
2024-11-22 21:44:06 +08:00
for iter . Next ( ) {
if err = iter . Error ( ) ; err != nil {
return err
}
// Update the key name
2025-05-16 03:36:52 +08:00
key := & resourcepb . ResourceKey {
2025-05-12 13:56:25 +08:00
Group : nsr . Group ,
Resource : nsr . Resource ,
Namespace : nsr . Namespace ,
Name : iter . Name ( ) ,
}
2024-11-22 21:44:06 +08:00
// Convert it to an indexable document
doc , err := builder . BuildDocument ( ctx , key , iter . ResourceVersion ( ) , iter . Value ( ) )
if err != nil {
2025-05-16 03:36:52 +08:00
s . log . Error ( "error building search document" , "key" , SearchID ( key ) , "err" , err )
2025-02-12 01:57:46 +08:00
continue
2024-11-22 21:44:06 +08:00
}
2025-05-09 21:36:21 +08:00
// Add to bulk items
items = append ( items , & BulkIndexItem {
Action : ActionIndex ,
Doc : doc ,
} )
}
// Perform single bulk index operation
if len ( items ) > 0 {
if err = index . BulkIndex ( & BulkIndexRequest {
Items : items ,
} ) ; err != nil {
2024-11-22 21:44:06 +08:00
return err
}
}
2025-04-11 22:25:40 +08:00
return iter . Error ( )
2024-11-22 21:44:06 +08:00
} )
return rv , err
} )
2024-12-06 03:14:04 +08:00
if err != nil {
return nil , 0 , err
}
2024-12-05 05:02:40 +08:00
// Record the number of objects indexed for the kind/resource
2024-12-11 02:37:37 +08:00
docCount , err := index . DocCount ( ctx , "" )
2024-12-05 05:02:40 +08:00
if err != nil {
s . log . Warn ( "error getting doc count" , "error" , err )
}
2025-03-13 22:09:38 +08:00
if s . indexMetrics != nil {
2025-05-12 13:56:25 +08:00
s . indexMetrics . IndexedKinds . WithLabelValues ( nsr . Resource ) . Add ( float64 ( docCount ) )
2024-12-05 05:02:40 +08:00
}
2024-11-22 21:44:06 +08:00
// rv is the last RV we read. when watching, we must add all events since that time
return index , rv , err
2024-11-21 13:53:25 +08:00
}
type builderCache struct {
// The default builder
defaultBuilder DocumentBuilder
// Possible blob support
blob BlobSupport
2024-11-22 21:44:06 +08:00
// searchable fields initialized once on startup
fields map [ schema . GroupResource ] SearchableDocumentFields
2024-11-21 13:53:25 +08:00
// lookup by group, then resource (namespace)
// This is only modified at startup, so we do not need mutex for access
lookup map [ string ] map [ string ] DocumentBuilderInfo
// For namespaced based resources that require a cache
ns * expirable . LRU [ NamespacedResource , DocumentBuilder ]
mu sync . Mutex // only locked for a cache miss
}
func newBuilderCache ( cfg [ ] DocumentBuilderInfo , nsCacheSize int , ttl time . Duration ) ( * builderCache , error ) {
cache := & builderCache {
2024-11-22 21:44:06 +08:00
fields : make ( map [ schema . GroupResource ] SearchableDocumentFields ) ,
2024-11-21 13:53:25 +08:00
lookup : make ( map [ string ] map [ string ] DocumentBuilderInfo ) ,
ns : expirable . NewLRU [ NamespacedResource , DocumentBuilder ] ( nsCacheSize , nil , ttl ) ,
}
if len ( cfg ) == 0 {
return cache , fmt . Errorf ( "no builders configured" )
}
for _ , b := range cfg {
// the default
if b . GroupResource . Group == "" && b . GroupResource . Resource == "" {
if b . Builder == nil {
return cache , fmt . Errorf ( "default document builder is missing" )
}
cache . defaultBuilder = b . Builder
continue
}
g , ok := cache . lookup [ b . GroupResource . Group ]
if ! ok {
g = make ( map [ string ] DocumentBuilderInfo )
cache . lookup [ b . GroupResource . Group ] = g
}
g [ b . GroupResource . Resource ] = b
2024-11-22 21:44:06 +08:00
// Any custom fields
cache . fields [ b . GroupResource ] = b . Fields
2024-11-21 13:53:25 +08:00
}
return cache , nil
}
2024-11-22 21:44:06 +08:00
// GetFields returns the custom searchable fields registered for the key's
// group/resource, or the zero value when none were registered.
func (s *builderCache) GetFields(key NamespacedResource) SearchableDocumentFields {
	gr := schema.GroupResource{
		Group:    key.Group,
		Resource: key.Resource,
	}
	return s.fields[gr]
}
2024-11-21 13:53:25 +08:00
// get returns the DocumentBuilder to use for key.
//
// Resolution order:
//  1. no entry registered for the key's group/resource: the default builder;
//  2. an entry with a fixed Builder: that builder;
//  3. otherwise a namespace-specific builder, created through the entry's
//     Namespaced factory and cached in an expiring LRU.
//
// ctx is only used when a namespaced builder has to be created (context is
// typically background).
func (s *builderCache) get(ctx context.Context, key NamespacedResource) (DocumentBuilder, error) {
	// Reading through a missing group yields ok == false.
	info, ok := s.lookup[key.Group][key.Resource]
	if !ok {
		return s.defaultBuilder, nil
	}
	if info.Builder != nil {
		return info.Builder, nil
	}

	// The builder needs context
	if builder, ok := s.ns.Get(key); ok {
		return builder, nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Another goroutine may have created the builder while we were waiting
	// for the lock; check again so each builder is only constructed once.
	if builder, ok := s.ns.Get(key); ok {
		return builder, nil
	}

	b, err := info.Namespaced(ctx, key.Namespace, s.blob)
	if err == nil {
		_ = s.ns.Add(key, b) // the return value only reports an eviction
	}
	return b, err
}
2025-02-07 00:30:47 +08:00
// AsResourceKey converts the given namespace and type to a search key
2025-05-16 03:36:52 +08:00
func AsResourceKey ( ns string , t string ) ( * resourcepb . ResourceKey , error ) {
2025-02-07 00:30:47 +08:00
if ns == "" {
return nil , fmt . Errorf ( "missing namespace" )
}
switch t {
case "folders" , "folder" :
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceKey {
2025-02-07 00:30:47 +08:00
Namespace : ns ,
2025-04-11 20:09:52 +08:00
Group : folders . GROUP ,
Resource : folders . RESOURCE ,
2025-02-07 00:30:47 +08:00
} , nil
case "dashboards" , "dashboard" :
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceKey {
2025-02-07 00:30:47 +08:00
Namespace : ns ,
2025-04-11 20:09:52 +08:00
Group : dashboardv1 . GROUP ,
Resource : dashboardv1 . DASHBOARD_RESOURCE ,
2025-02-07 00:30:47 +08:00
} , nil
// NOT really supported in the dashboard search UI, but useful for manual testing
case "playlist" , "playlists" :
2025-05-16 03:36:52 +08:00
return & resourcepb . ResourceKey {
2025-02-07 00:30:47 +08:00
Namespace : ns ,
Group : "playlist.grafana.app" ,
Resource : "playlists" ,
} , nil
}
return nil , fmt . Errorf ( "unknown resource type" )
}
2025-05-09 21:36:21 +08:00
// getOrCreateIndexQueueProcessor returns the queue processor for the given
// index, creating and registering one on first use. Processors are keyed by
// "namespace/group/resource" and the registry is guarded by
// indexQueueProcessorsMutex.
func (s *searchSupport) getOrCreateIndexQueueProcessor(index ResourceIndex, nsr NamespacedResource) (*indexQueueProcessor, error) {
	s.indexQueueProcessorsMutex.Lock()
	defer s.indexQueueProcessorsMutex.Unlock()

	id := nsr.Namespace + "/" + nsr.Group + "/" + nsr.Resource
	if existing, found := s.indexQueueProcessors[id]; found {
		return existing, nil
	}

	docBuilder, err := s.builders.get(context.Background(), nsr)
	if err != nil {
		s.log.Error("error getting document builder", "error", err)
		return nil, err
	}

	created := newIndexQueueProcessor(index, nsr, maxBatchSize, docBuilder, s.indexEventsChan)
	s.indexQueueProcessors[id] = created
	return created, nil
}