diff --git a/pkg/registry/apis/dashboard/legacy/sql_dashboards.go b/pkg/registry/apis/dashboard/legacy/sql_dashboards.go index 80adcf19d82..8cd1bb90226 100644 --- a/pkg/registry/apis/dashboard/legacy/sql_dashboards.go +++ b/pkg/registry/apis/dashboard/legacy/sql_dashboards.go @@ -131,6 +131,10 @@ type rowsWrapper struct { err error } +func (a *dashboardSqlAccess) Namespaces(ctx context.Context) ([]string, error) { + return nil, fmt.Errorf("not implemented") +} + func (r *rowsWrapper) Close() error { if r.rows == nil { return nil diff --git a/pkg/storage/unified/resource/cdk_backend.go b/pkg/storage/unified/resource/cdk_backend.go index 910a36c7096..6d45f153c0d 100644 --- a/pkg/storage/unified/resource/cdk_backend.go +++ b/pkg/storage/unified/resource/cdk_backend.go @@ -108,6 +108,10 @@ func (s *cdkBackend) getPath(key *ResourceKey, rv int64) string { return buffer.String() } +func (s *cdkBackend) Namespaces(ctx context.Context) ([]string, error) { + return nil, fmt.Errorf("not implemented") +} + func (s *cdkBackend) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) { // Scope the lock { diff --git a/pkg/storage/unified/resource/index.go b/pkg/storage/unified/resource/index.go index 16f61e97a59..93e9e94e380 100644 --- a/pkg/storage/unified/resource/index.go +++ b/pkg/storage/unified/resource/index.go @@ -4,6 +4,7 @@ import ( "context" golog "log" "path/filepath" + "sync" "time" "github.com/blevesearch/bleve/v2" @@ -31,20 +32,22 @@ type Opts struct { } type Index struct { - shards map[string]Shard - opts Opts - s *server - log log.Logger - tracer tracing.Tracer + shardMutex sync.RWMutex + shards map[string]*Shard + opts Opts + s *server + log log.Logger + tracer tracing.Tracer } func NewIndex(s *server, opts Opts, tracer tracing.Tracer) *Index { idx := &Index{ - s: s, - opts: opts, - shards: make(map[string]Shard), - log: log.New("unifiedstorage.search.index"), - tracer: tracer, + shardMutex: sync.RWMutex{}, + s: s, + opts: opts, + shards: make(map[string]*Shard), + log: log.New("unifiedstorage.search.index"), + tracer: tracer, } return idx @@ -128,32 +131,86 @@ func (i *Index) AddToBatches(ctx context.Context, list *ListResponse) ([]string, } func (i *Index) Init(ctx context.Context) error { + logger := i.log.FromContext(ctx) ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"Init") defer span.End() + + start := time.Now().Unix() + group := errgroup.Group{} + group.SetLimit(i.opts.Workers) + + totalObjects := 0 + // Get all tenants currently in Unified Storage + tenants, err := i.s.backend.Namespaces(ctx) + if err != nil { + return err + } + for _, tenant := range tenants { + group.Go(func() error { + logger.Info("initializing index for tenant", "tenant", tenant) + objs, err := i.InitForTenant(ctx, tenant) + if err != nil { + return err + } + totalObjects += objs + return nil + }) + } + + err = group.Wait() + if err != nil { + return err + } + + //index all remaining batches for all tenants + logger.Info("indexing remaining batches", "shards", len(i.shards)) + err = i.IndexBatches(ctx, 1, i.allTenants()) + if err != nil { + return err + } + + end := time.Now().Unix() + totalDocCount := getTotalDocCount(i) + logger.Info("Initial indexing finished", "seconds", float64(end-start), "objs_fetched", totalObjects, "objs_indexed", totalDocCount) + span.AddEvent( + "indexing finished", + trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalDocCount))), + trace.WithAttributes(attribute.Int64("objects_fetched", int64(totalObjects))), + ) + if IndexServerMetrics != nil { + IndexServerMetrics.IndexCreationTime.WithLabelValues().Observe(float64(end - start)) + } + + return nil +} + +func (i *Index) InitForTenant(ctx context.Context, namespace string) (int, error) { + ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"InitForTenant") + defer span.End() logger := i.log.FromContext(ctx) - start := time.Now().Unix() resourceTypes := fetchResourceTypes() totalObjectsFetched := 0 for _, rt := range resourceTypes { - logger.Info("indexing resource", "kind", rt.Key.Resource, "list_limit", i.opts.ListLimit, "batch_size", i.opts.BatchSize, "workers", i.opts.Workers) + logger.Debug("indexing resource", "kind", rt.Key.Resource, "list_limit", i.opts.ListLimit, "batch_size", i.opts.BatchSize, "workers", i.opts.Workers, "namespace", namespace) r := &ListRequest{Options: rt, Limit: int64(i.opts.ListLimit)} + r.Options.Key.Namespace = namespace // scope the list to a tenant or this will take forever when US has 1M+ resources // Paginate through the list of resources and index each page for { - logger.Info("fetching resource list", "kind", rt.Key.Resource) + logger.Debug("fetching resource list", "kind", rt.Key.Resource, "namespace", namespace) list, err := i.s.List(ctx, r) if err != nil { - return err + return totalObjectsFetched, err } totalObjectsFetched += len(list.Items) - logger.Info("indexing batch", "kind", rt.Key.Resource, "count", len(list.Items)) + logger.Debug("indexing batch", "kind", rt.Key.Resource, "count", len(list.Items), "namespace", namespace) //add changes to batches for shards with changes in the List err = i.writeBatch(ctx, list) if err != nil { - return err + return totalObjectsFetched, err } if list.NextPageToken == "" { @@ -164,21 +221,13 @@ func (i *Index) Init(ctx context.Context) error { } } - //index all remaining batches - logger.Info("indexing remaining batches", "shards", len(i.shards)) - err := i.IndexBatches(ctx, 1, i.allTenants()) - if err != nil { - return err - } + span.AddEvent( + "indexing finished for tenant", + trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalObjectsFetched))), + trace.WithAttributes(attribute.String("tenant", namespace)), + ) - span.AddEvent("indexing finished", trace.WithAttributes(attribute.Int64("objects_indexed", int64(totalObjectsFetched)))) - end := time.Now().Unix() - logger.Info("Initial indexing finished", "seconds", float64(end-start)) - if IndexServerMetrics != nil { - IndexServerMetrics.IndexCreationTime.WithLabelValues().Observe(float64(end - start)) - } - - return nil + return totalObjectsFetched, nil } func (i *Index) writeBatch(ctx context.Context, list *ListResponse) error { @@ -207,6 +256,17 @@ func (i *Index) Index(ctx context.Context, data *Data) error { } tenant := res.Namespace logger.Debug("indexing resource for tenant", "res", string(data.Value.Value), "tenant", tenant) + + // if tenant doesn't exist, they may have been created during initial indexing + _, ok := i.shards[tenant] + if !ok { + i.log.Info("tenant not found, initializing their index", "tenant", tenant) + _, err = i.InitForTenant(ctx, tenant) + if err != nil { + return err + } + } + shard, err := i.getShard(tenant) if err != nil { return err @@ -320,18 +380,20 @@ func (i *Index) Search(ctx context.Context, request *SearchRequest) (*IndexResul return &IndexResults{Values: results, Groups: groups}, nil } -func (i *Index) Count() (uint64, error) { - var total uint64 +// Count returns the total doc count +func (i *Index) Count() (int, error) { + total := 0 for _, shard := range i.shards { count, err := shard.index.DocCount() if err != nil { i.log.Error("failed to get doc count", "error", err) } - total += count + total += int(count) } return total, nil } +// allTenants returns a list of all tenants in the index func (i *Index) allTenants() []string { tenants := make([]string, 0, len(i.shards)) for tenant := range i.shards { @@ -340,7 +402,10 @@ func (i *Index) allTenants() []string { return tenants } -func (i *Index) getShard(tenant string) (Shard, error) { +func (i *Index) getShard(tenant string) (*Shard, error) { + i.shardMutex.Lock() + defer i.shardMutex.Unlock() + shard, ok := i.shards[tenant] if ok { return shard, nil @@ -348,16 +413,16 @@ func (i *Index) getShard(tenant string) (Shard, error) { index, path, err := i.createIndex() if err != nil { - return Shard{}, err + return &Shard{}, err } - shard = Shard{ + shard = &Shard{ index: index, path: path, batch: index.NewBatch(), } - // TODO: do we need to lock this? i.shards[tenant] = shard + return shard, nil } @@ -389,17 +454,19 @@ func createInMemoryIndex() (bleve.Index, string, error) { // TODO - fetch from api func fetchResourceTypes() []*ListOptions { items := []*ListOptions{} - items = append(items, &ListOptions{ - Key: &ResourceKey{ - Group: "playlist.grafana.app", - Resource: "playlists", + items = append(items, + &ListOptions{ + Key: &ResourceKey{ + Group: "playlist.grafana.app", + Resource: "playlists", + }, }, - }, &ListOptions{ - Key: &ResourceKey{ - Group: "folder.grafana.app", - Resource: "folders", + &ListOptions{ + Key: &ResourceKey{ + Group: "folder.grafana.app", + Resource: "folders", + }, }, - }, &ListOptions{ Key: &ResourceKey{ Group: "dashboard.grafana.app", diff --git a/pkg/storage/unified/resource/index_metrics.go b/pkg/storage/unified/resource/index_metrics.go index 103c2bddb7b..570aa605d47 100644 --- a/pkg/storage/unified/resource/index_metrics.go +++ b/pkg/storage/unified/resource/index_metrics.go @@ -87,23 +87,23 @@ func (s *IndexMetrics) Describe(ch chan<- *prometheus.Desc) { s.IndexLatency.Describe(ch) } +// getTotalDocCount returns the total number of documents in the index func getTotalDocCount(index *Index) float64 { var totalCount float64 totalCount = 0 if index == nil { return totalCount } + for _, shard := range index.shards { - docCount, err := shard.index.DocCount() - if err != nil { - continue - } - totalCount += float64(docCount) + count, _ := shard.index.DocCount() + totalCount += float64(count) } return totalCount } +// getTotalIndexSize returns the total size of the index directory when using a file-based index func getTotalIndexSize(dir string) (int64, error) { var totalSize int64 diff --git a/pkg/storage/unified/resource/index_test.go b/pkg/storage/unified/resource/index_test.go index 4787be45a39..14d9d299009 100644 --- a/pkg/storage/unified/resource/index_test.go +++ b/pkg/storage/unified/resource/index_test.go @@ -29,6 +29,7 @@ func TestIndexDashboard(t *testing.T) { require.NoError(t, err) assertCountEquals(t, index, 1) + require.Equal(t, 1, len(index.allTenants())) assertSearchCountEquals(t, index, "*", nil, 1) } @@ -83,7 +84,7 @@ func TestLookupNames(t *testing.T) { err := index.writeBatch(testContext, list) require.NoError(t, err) - assertCountEquals(t, index, uint64(records)) + assertCountEquals(t, index, records) query := "" chunk := ids[:100] // query for n folders by id for _, id := range chunk { @@ -185,7 +186,7 @@ func newTestIndex(t *testing.T, batchSize int) *Index { return &Index{ tracer: trace, - shards: make(map[string]Shard), + shards: make(map[string]*Shard), log: log.New("unifiedstorage.search.index"), opts: Opts{ ListLimit: 5000, @@ -195,7 +196,7 @@ func newTestIndex(t *testing.T, batchSize int) *Index { } } -func assertCountEquals(t *testing.T, index *Index, expected uint64) { +func assertCountEquals(t *testing.T, index *Index, expected int) { total, err := index.Count() require.NoError(t, err) assert.Equal(t, expected, total) diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index f32b5a32c8d..53d33eb4b19 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -75,6 +75,8 @@ type StorageBackend interface { // Get all events from the store // For HA setups, this will be more events than the local WriteEvent above! WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) + + Namespaces(ctx context.Context) ([]string, error) } // This interface is not exposed to end users directly @@ -182,7 +184,7 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { UserID: 1, IsGrafanaAdmin: true, })) - return &server{ + s := &server{ tracer: opts.Tracer, log: logger, backend: opts.Backend, @@ -194,7 +196,9 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { now: opts.Now, ctx: ctx, cancel: cancel, - }, nil + } + + return s, nil } var _ ResourceServer = &server{} @@ -765,6 +769,10 @@ func (s *server) Origin(ctx context.Context, req *OriginRequest) (*OriginRespons // Index returns the search index. If the index is not initialized, it will be initialized. func (s *server) Index(ctx context.Context) (*Index, error) { + if err := s.Init(ctx); err != nil { + return nil, err + } + index := s.index.(*IndexServer) if index.index == nil { err := index.Init(ctx, s) diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index 1e9459336d4..a4c13e5f508 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -137,6 +137,31 @@ func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (in } } +// Namespaces returns the list of unique namespaces in storage. +func (b *backend) Namespaces(ctx context.Context) ([]string, error) { + var namespaces []string + + err := b.db.WithTx(ctx, RepeatableRead, func(ctx context.Context, tx db.Tx) error { + rows, err := tx.QueryContext(ctx, "SELECT DISTINCT(namespace) FROM resource ORDER BY namespace;") + if err != nil { + return err + } + for rows.Next() { + var ns string + err = rows.Scan(&ns) + if err != nil { + return err + } + namespaces = append(namespaces, ns) + } + + err = rows.Close() + return err + }) + + return namespaces, err +} + func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) { ctx, span := b.tracer.Start(ctx, tracePrefix+"Create") defer span.End() diff --git a/pkg/storage/unified/sql/indexer_seeders/README.md b/pkg/storage/unified/sql/indexer_seeders/README.md new file mode 100644 index 00000000000..1756f6dd0e7 --- /dev/null +++ b/pkg/storage/unified/sql/indexer_seeders/README.md @@ -0,0 +1,5 @@ +

Indexer Seeders

+ +These seeders are used for performance testing in the indexer in your local dev environment. They are not used in production. + +By default, they each create 1 million resources. You can adjust this in the script if needed. Creating 1M objects takes about 5-10 minutes. diff --git a/pkg/storage/unified/sql/indexer_seeders/dashboards.sql b/pkg/storage/unified/sql/indexer_seeders/dashboards.sql new file mode 100644 index 00000000000..9016cffd9ec --- /dev/null +++ b/pkg/storage/unified/sql/indexer_seeders/dashboards.sql @@ -0,0 +1,81 @@ +DROP PROCEDURE IF EXISTS InsertMillionDashboards; + +DELIMITER // + +CREATE PROCEDURE InsertMillionDashboards() +BEGIN + DECLARE i INT DEFAULT 0; + DECLARE new_guid CHAR(36); + DECLARE unique_name VARCHAR(20); + DECLARE batch_size INT DEFAULT 10; + DECLARE stmt_resource TEXT DEFAULT ''; + DECLARE stmt_resource_history TEXT DEFAULT ''; + DECLARE random_number INT; + + WHILE i < 1000000 DO + -- Generate a unique GUID and unique name + SET new_guid = UUID(); + SET unique_name = CONCAT('ad5wkqk', i); + SET @new_uid = CONCAT('dashboard', i); + + -- Generate a random number between 1 and 1000 + SET random_number = FLOOR(1 + (RAND() * 1000)); + SET @stack_namespace = CONCAT('stacks-', random_number); -- Store stack namespace in a variable + + -- Append the value part of the SQL insert statement to both resource and history inserts + SET stmt_resource = CONCAT(stmt_resource, + '(', QUOTE(new_guid), ', ', QUOTE('1730396628210501'), ', ', QUOTE('dashboard.grafana.app'), ', ', QUOTE('dashboards'), ', ', + QUOTE(@stack_namespace), ', ', QUOTE(unique_name), ', ', QUOTE(CONCAT('{\"kind\":\"Dashboard\",\"apiVersion\":\"dashboard.grafana.app/v0alpha1\",\"metadata\":{\"name\":\"ad5wkqk\",\"namespace\":\"', @stack_namespace, '\",\"uid\":\"', @new_uid, '\",\"creationTimestamp\":\"2024-10-31T17:43:48Z\",\"annotations\":{\"grafana.app/createdBy\":\"user:u000000001\",\"grafana.app/originHash\":\"Grafana v11.4.0-pre (d2d7ae2e86)\",\"grafana.app/originName\":\"UI\",\"grafana.app/originPath\":\"/dashboard/new\"},\"managedFields\":[{\"manager\":\"Mozilla\",\"operation\":\"Update\",\"apiVersion\":\"dashboard.grafana.app/v0alpha1\",\"time\":\"2024-10-31T17:43:48Z\",\"fieldsType\":\"FieldsV1\",\"fieldsV1\":{\"f:metadata\":{\"f:annotations\":{\".\":{},\"f:grafana.app/originHash\":{},\"f:grafana.app/originName\":{},\"f:grafana.app/originPath\":{}},\"f:generateName\":{}},\"f:spec\":{\"f:annotations\":{\".\":{},\"f:list\":{}},\"f:description\":{},\"f:editable\":{},\"f:fiscalYearStartMonth\":{},\"f:graphTooltip\":{},\"f:id\":{},\"f:links\":{},\"f:panels\":{},\"f:preload\":{},\"f:schemaVersion\":{},\"f:tags\":{},\"f:templating\":{\".\":{},\"f:list\":{}},\"f:timepicker\":{},\"f:timezone\":{},\"f:title\":{},\"f:uid\":{},\"f:version\":{},\"f:weekStart\":{}}}}]},\"spec\":{\"annotations\":{\"list\":[{\"builtIn\":1,\"datasource\":{\"type\":\"grafana\",\"uid\":\"-- Grafana --\"},\"enable\":true,\"hide\":true,\"iconColor\":\"rgba(0, 211, 255, 1)\",\"name\":\"Annotations \\u0026 Alerts\",\"type\":\"dashboard\"}]},\"description\":\"\",\"editable\":true,\"fiscalYearStartMonth\":0,\"graphTooltip\":0,\"id\":null,\"links\":[],\"panels\":[{\"datasource\":{\"type\":\"grafana-testdata-datasource\",\"uid\":\"PD8C576611E62080A\"},\"fieldConfig\":{\"defaults\":{\"color\":{\"mode\":\"palette-classic\"},\"custom\":{\"axisBorderShow\":false,\"axisCenteredZero\":false,\"axisColorMode\":\"text\",\"axisLabel\":\"\",\"axisPlacement\":\"auto\",\"barAlignment\":0,\"barWidthFactor\":0.6,\"drawStyle\":\"line\",\"fillOpacity\":0,\"gradientMode\":\"none\",\"hideFrom\":{\"legend\":false,\"tooltip\":false,\"viz\":false},\"insertNulls\":false,\"lineInterpolation\":\"linear\",\"lineWidth\":1,\"pointSize\":5,\"scaleDistribution\":{\"type\":\"linear\"},\"showPoints\":\"auto\",\"spanNulls\":false,\"stacking\":{\"group\":\"A\",\"mode\":\"none\"},\"thresholdsStyle\":{\"mode\":\"off\"}},\"mappings\":[],\"thresholds\":{\"mode\":\"absolute\",\"steps\":[{\"color\":\"green\",\"value\":null},{\"color\":\"red\",\"value\":80}]}},\"overrides\":[]},\"gridPos\":{\"h\":8,\"w\":12,\"x\":0,\"y\":0},\"id\":1,\"options\":{\"legend\":{\"calcs\":[],\"displayMode\":\"list\",\"placement\":\"bottom\",\"showLegend\":true},\"tooltip\":{\"mode\":\"single\",\"sort\":\"none\"}},\"pluginVersion\":\"11.4.0-pre\",\"targets\":[{\"datasource\":{\"type\":\"grafana-testdata-datasource\",\"uid\":\"PD8C576611E62080A\"},\"refId\":\"A\"}],\"title\":\"Panel Title\",\"type\":\"timeseries\"}],\"preload\":false,\"schemaVersion\":40,\"tags\":[],\"templating\":{\"list\":[]},\"timepicker\":{},\"timezone\":\"browser\",\"title\":\"dashboard1\",\"uid\":\"\",\"version\":0,\"weekStart\":\"\"}}\n')), ', ', QUOTE('1'), ', NULL, ', QUOTE('0'), '), '); + + SET stmt_resource_history = CONCAT(stmt_resource_history, + '(', QUOTE(new_guid), ', ', QUOTE('1730396628210501'), ', ', QUOTE('dashboard.grafana.app'), ', ', QUOTE('dashboards'), ', ', + QUOTE(@stack_namespace), ', ', QUOTE(unique_name), ', ', QUOTE(CONCAT('{\"kind\":\"Dashboard\",\"apiVersion\":\"dashboard.grafana.app/v0alpha1\",\"metadata\":{\"name\":\"ad5wkqk\",\"namespace\":\"', @stack_namespace, '\",\"uid\":\"', @new_uid ,'\",\"creationTimestamp\":\"2024-10-31T17:43:48Z\",\"annotations\":{\"grafana.app/createdBy\":\"user:u000000001\",\"grafana.app/originHash\":\"Grafana v11.4.0-pre (d2d7ae2e86)\",\"grafana.app/originName\":\"UI\",\"grafana.app/originPath\":\"/dashboard/new\"},\"managedFields\":[{\"manager\":\"Mozilla\",\"operation\":\"Update\",\"apiVersion\":\"dashboard.grafana.app/v0alpha1\",\"time\":\"2024-10-31T17:43:48Z\",\"fieldsType\":\"FieldsV1\",\"fieldsV1\":{\"f:metadata\":{\"f:annotations\":{\".\":{},\"f:grafana.app/originHash\":{},\"f:grafana.app/originName\":{},\"f:grafana.app/originPath\":{}},\"f:generateName\":{}},\"f:spec\":{\"f:annotations\":{\".\":{},\"f:list\":{}},\"f:description\":{},\"f:editable\":{},\"f:fiscalYearStartMonth\":{},\"f:graphTooltip\":{},\"f:id\":{},\"f:links\":{},\"f:panels\":{},\"f:preload\":{},\"f:schemaVersion\":{},\"f:tags\":{},\"f:templating\":{\".\":{},\"f:list\":{}},\"f:timepicker\":{},\"f:timezone\":{},\"f:title\":{},\"f:uid\":{},\"f:version\":{},\"f:weekStart\":{}}}}]},\"spec\":{\"annotations\":{\"list\":[{\"builtIn\":1,\"datasource\":{\"type\":\"grafana\",\"uid\":\"-- Grafana --\"},\"enable\":true,\"hide\":true,\"iconColor\":\"rgba(0, 211, 255, 1)\",\"name\":\"Annotations \\u0026 Alerts\",\"type\":\"dashboard\"}]},\"description\":\"\",\"editable\":true,\"fiscalYearStartMonth\":0,\"graphTooltip\":0,\"id\":null,\"links\":[],\"panels\":[{\"datasource\":{\"type\":\"grafana-testdata-datasource\",\"uid\":\"PD8C576611E62080A\"},\"fieldConfig\":{\"defaults\":{\"color\":{\"mode\":\"palette-classic\"},\"custom\":{\"axisBorderShow\":false,\"axisCenteredZero\":false,\"axisColorMode\":\"text\",\"axisLabel\":\"\",\"axisPlacement\":\"auto\",\"barAlignment\":0,\"barWidthFactor\":0.6,\"drawStyle\":\"line\",\"fillOpacity\":0,\"gradientMode\":\"none\",\"hideFrom\":{\"legend\":false,\"tooltip\":false,\"viz\":false},\"insertNulls\":false,\"lineInterpolation\":\"linear\",\"lineWidth\":1,\"pointSize\":5,\"scaleDistribution\":{\"type\":\"linear\"},\"showPoints\":\"auto\",\"spanNulls\":false,\"stacking\":{\"group\":\"A\",\"mode\":\"none\"},\"thresholdsStyle\":{\"mode\":\"off\"}},\"mappings\":[],\"thresholds\":{\"mode\":\"absolute\",\"steps\":[{\"color\":\"green\",\"value\":null},{\"color\":\"red\",\"value\":80}]}},\"overrides\":[]},\"gridPos\":{\"h\":8,\"w\":12,\"x\":0,\"y\":0},\"id\":1,\"options\":{\"legend\":{\"calcs\":[],\"displayMode\":\"list\",\"placement\":\"bottom\",\"showLegend\":true},\"tooltip\":{\"mode\":\"single\",\"sort\":\"none\"}},\"pluginVersion\":\"11.4.0-pre\",\"targets\":[{\"datasource\":{\"type\":\"grafana-testdata-datasource\",\"uid\":\"PD8C576611E62080A\"},\"refId\":\"A\"}],\"title\":\"Panel Title\",\"type\":\"timeseries\"}],\"preload\":false,\"schemaVersion\":40,\"tags\":[],\"templating\":{\"list\":[]},\"timepicker\":{},\"timezone\":\"browser\",\"title\":\"dashboard1\",\"uid\":\"\",\"version\":0,\"weekStart\":\"\"}}\n')), ', ', QUOTE('1'), ', NULL, ', QUOTE('0'), '), '); + + SET i = i + 1; + + -- Execute statements in batches to avoid reaching the TEXT limit + IF i % batch_size = 0 THEN + -- Remove the last comma and space + SET stmt_resource = LEFT(stmt_resource, LENGTH(stmt_resource) - 2); + SET stmt_resource_history = LEFT(stmt_resource_history, LENGTH(stmt_resource_history) - 2); + + -- Insert current batch into `resource` + SET @stmt_resource = CONCAT('INSERT INTO `resource` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource); +PREPARE stmt_resource_stmt FROM @stmt_resource; +EXECUTE stmt_resource_stmt; +DEALLOCATE PREPARE stmt_resource_stmt; + +-- Insert current batch into `resource_history` +SET @stmt_resource_history = CONCAT('INSERT INTO `resource_history` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource_history); +PREPARE stmt_resource_history_stmt FROM @stmt_resource_history; +EXECUTE stmt_resource_history_stmt; +DEALLOCATE PREPARE stmt_resource_history_stmt; + +-- Reset the batch for the next iteration +SET stmt_resource = ''; + SET stmt_resource_history = ''; +END IF; +END WHILE; + + -- Insert any remaining records if they don't fill a full batch + IF stmt_resource != '' THEN + SET stmt_resource = LEFT(stmt_resource, LENGTH(stmt_resource) - 2); + SET stmt_resource_history = LEFT(stmt_resource_history, LENGTH(stmt_resource_history) - 2); + + SET @stmt_resource = CONCAT('INSERT INTO `resource` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource); +PREPARE stmt_resource_stmt FROM @stmt_resource; +EXECUTE stmt_resource_stmt; +DEALLOCATE PREPARE stmt_resource_stmt; + +SET @stmt_resource_history = CONCAT('INSERT INTO `resource_history` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource_history); +PREPARE stmt_resource_history_stmt FROM @stmt_resource_history; +EXECUTE stmt_resource_history_stmt; +DEALLOCATE PREPARE stmt_resource_history_stmt; +END IF; +END // + +DELIMITER ; + +call InsertMillionDashboards(); + +insert into resource_version values ('dashboard.grafana.app', 'dashboards', 1730396628210501) ON DUPLICATE KEY UPDATE resource_version = 1730396628210501; diff --git a/pkg/storage/unified/sql/indexer_seeders/playlists.sql b/pkg/storage/unified/sql/indexer_seeders/playlists.sql new file mode 100644 index 00000000000..05680c09366 --- /dev/null +++ b/pkg/storage/unified/sql/indexer_seeders/playlists.sql @@ -0,0 +1,104 @@ +DROP PROCEDURE IF EXISTS InsertMillionPlaylists; + +DELIMITER // + +CREATE PROCEDURE InsertMillionPlaylists() +BEGIN + DECLARE i INT DEFAULT 0; + DECLARE new_guid CHAR(36); + DECLARE unique_name VARCHAR(20); + DECLARE batch_size INT DEFAULT 25; + DECLARE stmt_resource TEXT DEFAULT ''; + DECLARE stmt_resource_history TEXT DEFAULT ''; + DECLARE random_number INT; + + WHILE i < 1000000 DO + -- Generate a unique GUID and unique name + SET new_guid = UUID(); + SET unique_name = CONCAT('playlist', i); + SET @new_uid = CONCAT('playlist', i); + + -- Generate a random number between 1 and 1000 + SET random_number = FLOOR(1 + (RAND() * 1000)); + SET @stack_namespace = CONCAT('stacks-', random_number); -- Store stack namespace in a variable + + -- Append the value part of the SQL insert statement to both resource and history inserts + SET stmt_resource = CONCAT(stmt_resource, + '(', QUOTE(new_guid), ', ', QUOTE('1729715497301945'), ', ', QUOTE('playlist.grafana.app'), ', ', QUOTE('playlists'), ', ', + QUOTE(@stack_namespace), ', ', QUOTE(unique_name), ', ', + QUOTE(CONCAT( + '{\"kind\":\"Playlist\",\"apiVersion\":\"playlist.grafana.app/v0alpha1\",\"metadata\":{', + '\"name\":\"', unique_name, '\",\"namespace\":\"', @stack_namespace, '\",\"uid\":\"', @new_uid, '\",', + '\"resourceVersion\":\"1729715497301945\",\"creationTimestamp\":\"2024-10-05T02:17:49Z\",', + '\"annotations\":{\"grafana.app/createdBy\":\"user:u000000002\",\"grafana.app/originName\":\"SQL\",', + '\"grafana.app/originPath\":\"10182\",\"grafana.app/originTimestamp\":\"2024-10-05T02:17:49Z\",', + '\"grafana.app/updatedBy\":\"service-account:\",\"grafana.app/updatedTimestamp\":\"2024-10-23T21:00:21Z\"}},', + '\"spec\":{\"interval\":\"5m\",\"items\":[{\"type\":\"dashboard_by_uid\",\"value\":\"a6232629-98b3-42fa-91a4-579a43fbcda0\"},', + '{\"type\":\"dashboard_by_tag\",\"value\":\"tag1\"},{\"type\":\"dashboard_by_tag\",\"value\":\"tag2\"}],', + '\"title\":\"k6 test playlist create cp3f14j11tthck1\"},\"status\":{}}' + )), + ', ', QUOTE('1'), ', NULL, ', QUOTE('0'), '), '); + + SET stmt_resource_history = CONCAT(stmt_resource_history, + '(', QUOTE(new_guid), ', ', QUOTE('1729715497301945'), ', ', QUOTE('playlist.grafana.app'), ', ', QUOTE('playlists'), ', ', + QUOTE(@stack_namespace), ', ', QUOTE(unique_name), ', ', + QUOTE(CONCAT( + '{\"kind\":\"Playlist\",\"apiVersion\":\"playlist.grafana.app/v0alpha1\",\"metadata\":{', + '\"name\":\"', unique_name, '\",\"namespace\":\"', @stack_namespace, '\",\"uid\":\"', @new_uid, '\",', + '\"resourceVersion\":\"1729715497301945\",\"creationTimestamp\":\"2024-10-05T02:17:49Z\",', + '\"annotations\":{\"grafana.app/createdBy\":\"user:u000000002\",\"grafana.app/originName\":\"SQL\",', + '\"grafana.app/originPath\":\"10182\",\"grafana.app/originTimestamp\":\"2024-10-05T02:17:49Z\",', + '\"grafana.app/updatedBy\":\"service-account:\",\"grafana.app/updatedTimestamp\":\"2024-10-23T21:00:21Z\"}},', + '\"spec\":{\"interval\":\"5m\",\"items\":[{\"type\":\"dashboard_by_uid\",\"value\":\"a6232629-98b3-42fa-91a4-579a43fbcda0\"},', + '{\"type\":\"dashboard_by_tag\",\"value\":\"tag1\"},{\"type\":\"dashboard_by_tag\",\"value\":\"tag2\"}],', + '\"title\":\"k6 test playlist create cp3f14j11tthck1\"},\"status\":{}}' + )), ', ', QUOTE('1'), ', NULL, ', QUOTE('0'), '), '); + + SET i = i + 1; + + -- Execute statements in batches to avoid reaching the TEXT limit + IF i % batch_size = 0 THEN + -- Remove the last comma and space + SET stmt_resource = LEFT(stmt_resource, LENGTH(stmt_resource) - 2); + SET stmt_resource_history = LEFT(stmt_resource_history, LENGTH(stmt_resource_history) - 2); + + -- Insert current batch into `resource` + SET @stmt_resource = CONCAT('INSERT INTO `resource` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource); + PREPARE stmt_resource_stmt FROM @stmt_resource; + EXECUTE stmt_resource_stmt; + DEALLOCATE PREPARE stmt_resource_stmt; + +-- Insert current batch into `resource_history` + SET @stmt_resource_history = CONCAT('INSERT INTO `resource_history` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource_history); + PREPARE stmt_resource_history_stmt FROM @stmt_resource_history; + EXECUTE stmt_resource_history_stmt; + DEALLOCATE PREPARE stmt_resource_history_stmt; + +-- Reset the batch for the next iteration + SET stmt_resource = ''; + SET stmt_resource_history = ''; + END IF; + END WHILE; + + -- Insert any remaining records if they don't fill a full batch + IF stmt_resource != '' THEN + SET stmt_resource = LEFT(stmt_resource, LENGTH(stmt_resource) - 2); + SET stmt_resource_history = LEFT(stmt_resource_history, LENGTH(stmt_resource_history) - 2); + + SET @stmt_resource = CONCAT('INSERT INTO `resource` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource); + PREPARE stmt_resource_stmt FROM @stmt_resource; + EXECUTE stmt_resource_stmt; + DEALLOCATE PREPARE stmt_resource_stmt; + + SET @stmt_resource_history = CONCAT('INSERT INTO `resource_history` (`guid`, `resource_version`, `group`, `resource`, `namespace`, `name`, `value`, `action`, `label_set`, `previous_resource_version`) VALUES ', stmt_resource_history); + PREPARE stmt_resource_history_stmt FROM @stmt_resource_history; + EXECUTE stmt_resource_history_stmt; + DEALLOCATE PREPARE stmt_resource_history_stmt; + END IF; +END // + +DELIMITER ; + +call InsertMillionPlaylists(); + +insert into resource_version values ('playlist.grafana.app', 'playlists', 1729715497301945) ON DUPLICATE KEY UPDATE resource_version = 1729715497301945; diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index 999d001a7e9..5c4c1d91e89 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -55,6 +55,9 @@ var ( Isolation: sql.LevelReadCommitted, ReadOnly: true, } + RepeatableRead = &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + } ) type sqlResourceRequest struct { diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index 4b524c97eb4..b2ca5b493af 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -2,7 +2,6 @@ package sql import ( "context" - "errors" "os" "strings" @@ -51,17 +50,6 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg, fea if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) { opts.Index = resource.NewResourceIndexServer(cfg, tracer) - server, err := resource.NewResourceServer(opts) - if err != nil { - return nil, err - } - // initialze the search index - indexer, ok := server.(resource.ResourceIndexer) - if !ok { - return nil, errors.New("index server does not implement ResourceIndexer") - } - _, err = indexer.Index(ctx) - return server, err } if features.IsEnabledGlobally(featuremgmt.FlagKubernetesFolders) { @@ -75,5 +63,18 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg, fea } } - return resource.NewResourceServer(opts) + rs, err := resource.NewResourceServer(opts) + if err != nil { + return nil, err + } + + // Initialize the indexer if one is configured + if opts.Index != nil { + _, err = rs.(resource.ResourceIndexer).Index(ctx) + if err != nil { + return nil, err + } + } + + return rs, nil } diff --git a/pkg/storage/unified/sql/test/indexer_integration_test.go b/pkg/storage/unified/sql/test/indexer_integration_test.go index 5ea5e68bc70..62f6618a4ce 100644 --- a/pkg/storage/unified/sql/test/indexer_integration_test.go +++ b/pkg/storage/unified/sql/test/indexer_integration_test.go @@ -77,7 +77,7 @@ func TestIntegrationIndexerSearch(t *testing.T) { addResource(t, ctx, backend, "playlists", playlist1) addResource(t, ctx, backend, "playlists", playlist2) - // initialze and build the search index + // initialize and build the search index indexer, ok := server.(resource.ResourceIndexer) if !ok { t.Fatal("server does not implement ResourceIndexer")