Merge remote-tracking branch 'origin/main' into query-history-stars

This commit is contained in:
Ryan McKinley 2025-10-06 13:53:57 +03:00
commit 60c78febb1
32 changed files with 803 additions and 175 deletions

1
.github/CODEOWNERS vendored
View File

@ -1281,6 +1281,7 @@ embed.go @grafana/grafana-as-code
/.github/license_finder.yaml @bergquist
/.github/actionlint.yaml @grafana/grafana-developer-enablement-squad
/.github/workflows/pr-test-docker.yml @grafana/grafana-developer-enablement-squad
/.github/workflows/update-schema-types.yml @grafana/plugins-platform-frontend
# Generated files not requiring owner approval
/packages/grafana-data/src/types/featureToggles.gen.ts @grafanabot

View File

@ -0,0 +1,22 @@
name: Update Schema Types
on:
push:
branches:
- main
paths:
- docs/sources/developers/plugins/plugin.schema.json
workflow_dispatch:
# These permissions are needed to assume roles from Github's OIDC.
permissions:
contents: read
id-token: write
jobs:
bundle-schema-types:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: grafana/plugin-actions/bundle-schema-types@main

View File

@ -688,7 +688,7 @@
},
"languages": {
"type": "array",
"description": "The list of languages supported by the plugin. Each entry should be a locale identifier in the format `language-COUNTRY` (for example `en-US`, `fr-FR`, `es-ES`).",
"description": "The list of languages supported by the plugin. Each entry should be a locale identifier in the format `language-COUNTRY` (for example `en-US`, `es-ES`, `de-DE`).",
"items": {
"type": "string"
}

View File

@ -970,10 +970,6 @@ export interface FeatureToggles {
*/
multiTenantTempCredentials?: boolean;
/**
* Enables localization for plugins
*/
localizationForPlugins?: boolean;
/**
* Enables unified navbars
* @default false
*/

View File

@ -2,6 +2,7 @@ package ofrep
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
@ -14,14 +15,21 @@ import (
"strconv"
"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/util/proxyutil"
goffmodel "github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/model"
)
func (b *APIBuilder) proxyAllFlagReq(isAuthedUser bool, w http.ResponseWriter, r *http.Request) {
func (b *APIBuilder) proxyAllFlagReq(ctx context.Context, isAuthedUser bool, w http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(ctx, "ofrep.proxy.evalAllFlags")
defer span.End()
r = r.WithContext(ctx)
proxy, err := b.newProxy(ofrepPath)
if err != nil {
err = tracing.Error(span, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@ -61,9 +69,15 @@ func (b *APIBuilder) proxyAllFlagReq(isAuthedUser bool, w http.ResponseWriter, r
proxy.ServeHTTP(w, r)
}
func (b *APIBuilder) proxyFlagReq(flagKey string, isAuthedUser bool, w http.ResponseWriter, r *http.Request) {
func (b *APIBuilder) proxyFlagReq(ctx context.Context, flagKey string, isAuthedUser bool, w http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(ctx, "ofrep.proxy.evalFlag")
defer span.End()
r = r.WithContext(ctx)
proxy, err := b.newProxy(path.Join(ofrepPath, flagKey))
if err != nil {
err = tracing.Error(span, err)
b.logger.Error("Failed to create proxy", "key", flagKey, "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return

View File

@ -10,6 +10,9 @@ import (
"net/url"
"github.com/gorilla/mux"
"github.com/grafana/grafana/pkg/infra/tracing"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -31,6 +34,8 @@ var _ builder.APIGroupBuilder = (*APIBuilder)(nil)
var _ builder.APIGroupRouteProvider = (*APIBuilder)(nil)
var _ builder.APIGroupVersionProvider = (*APIBuilder)(nil)
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/registry/apis/ofrep")
const ofrepPath = "/ofrep/v1/evaluate/flags"
const namespaceMismatchMsg = "rejecting request with namespace mismatch"
@ -240,7 +245,13 @@ func (b *APIBuilder) GetAPIRoutes(gv schema.GroupVersion) *builder.APIRoutes {
}
func (b *APIBuilder) oneFlagHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(r.Context(), "ofrep.handler.evalFlag")
defer span.End()
r = r.WithContext(ctx)
if !b.validateNamespace(r) {
_ = tracing.Errorf(span, namespaceMismatchMsg)
b.logger.Error(namespaceMismatchMsg)
http.Error(w, namespaceMismatchMsg, http.StatusUnauthorized)
return
@ -248,42 +259,54 @@ func (b *APIBuilder) oneFlagHandler(w http.ResponseWriter, r *http.Request) {
flagKey := mux.Vars(r)["flagKey"]
if flagKey == "" {
_ = tracing.Errorf(span, "flagKey parameter is required")
http.Error(w, "flagKey parameter is required", http.StatusBadRequest)
return
}
span.SetAttributes(attribute.String("flag_key", flagKey))
isAuthedReq := b.isAuthenticatedRequest(r)
span.SetAttributes(attribute.Bool("authenticated", isAuthedReq))
// Unless the request is authenticated, we only allow public flags evaluations
if !isAuthedReq && !isPublicFlag(flagKey) {
_ = tracing.Errorf(span, "unauthorized to evaluate flag: %s", flagKey)
b.logger.Error("Unauthorized to evaluate flag", "flagKey", flagKey)
http.Error(w, "unauthorized to evaluate flag", http.StatusUnauthorized)
return
}
if b.providerType == setting.GOFFProviderType {
b.proxyFlagReq(flagKey, isAuthedReq, w, r)
b.proxyFlagReq(ctx, flagKey, isAuthedReq, w, r)
return
}
b.evalFlagStatic(flagKey, w, r)
b.evalFlagStatic(ctx, flagKey, w)
}
func (b *APIBuilder) allFlagsHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(r.Context(), "ofrep.handler.evalAllFlags")
defer span.End()
r = r.WithContext(ctx)
if !b.validateNamespace(r) {
_ = tracing.Errorf(span, namespaceMismatchMsg)
b.logger.Error(namespaceMismatchMsg)
http.Error(w, namespaceMismatchMsg, http.StatusUnauthorized)
return
}
isAuthedReq := b.isAuthenticatedRequest(r)
span.SetAttributes(attribute.Bool("authenticated", isAuthedReq))
if b.providerType == setting.GOFFProviderType {
b.proxyAllFlagReq(isAuthedReq, w, r)
b.proxyAllFlagReq(ctx, isAuthedReq, w, r)
return
}
b.evalAllFlagsStatic(isAuthedReq, w, r)
b.evalAllFlagsStatic(ctx, isAuthedReq, w)
}
func writeResponse(statusCode int, result any, logger log.Logger, w http.ResponseWriter) {

View File

@ -1,19 +1,28 @@
package ofrep
import (
"context"
"net/http"
"github.com/grafana/grafana/pkg/infra/tracing"
goffmodel "github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/model"
"go.opentelemetry.io/otel/attribute"
)
func (b *APIBuilder) evalAllFlagsStatic(isAuthedUser bool, w http.ResponseWriter, r *http.Request) {
result, err := b.staticEvaluator.EvalAllFlags(r.Context())
func (b *APIBuilder) evalAllFlagsStatic(ctx context.Context, isAuthedUser bool, w http.ResponseWriter) {
_, span := tracer.Start(ctx, "ofrep.static.evalAllFlags")
defer span.End()
result, err := b.staticEvaluator.EvalAllFlags(ctx)
if err != nil {
err = tracing.Error(span, err)
b.logger.Error("Failed to evaluate all static flags", "error", err)
http.Error(w, "failed to evaluate flags", http.StatusInternalServerError)
return
}
span.SetAttributes(attribute.Int("total_flags_count", len(result.Flags)))
if !isAuthedUser {
var publicOnly []goffmodel.OFREPFlagBulkEvaluateSuccessResponse
@ -24,14 +33,21 @@ func (b *APIBuilder) evalAllFlagsStatic(isAuthedUser bool, w http.ResponseWriter
}
result.Flags = publicOnly
span.SetAttributes(attribute.Int("public_flags_count", len(publicOnly)))
}
writeResponse(http.StatusOK, result, b.logger, w)
}
func (b *APIBuilder) evalFlagStatic(flagKey string, w http.ResponseWriter, r *http.Request) {
result, err := b.staticEvaluator.EvalFlag(r.Context(), flagKey)
func (b *APIBuilder) evalFlagStatic(ctx context.Context, flagKey string, w http.ResponseWriter) {
_, span := tracer.Start(ctx, "ofrep.static.evalFlag")
defer span.End()
span.SetAttributes(attribute.String("flag_key", flagKey))
result, err := b.staticEvaluator.EvalFlag(ctx, flagKey)
if err != nil {
err = tracing.Error(span, err)
b.logger.Error("Failed to evaluate static flag", "key", flagKey, "error", err)
http.Error(w, "failed to evaluate flag", http.StatusInternalServerError)
return

View File

@ -0,0 +1,20 @@
package server
import (
"github.com/grafana/grafana/pkg/modules"
)
// ModuleRegisterer is used to inject enterprise dskit modules into
// the module manager. This abstraction allows other builds (e.g. enterprise) to register
// additional modules while keeping the core server decoupled from build-specific dependencies.
type ModuleRegisterer interface {
RegisterModules(manager modules.Registry)
}
type noopModuleRegisterer struct{}
func (noopModuleRegisterer) RegisterModules(manager modules.Registry) {}
func ProvideNoopModuleRegisterer() ModuleRegisterer {
return &noopModuleRegisterer{}
}

View File

@ -44,8 +44,9 @@ func NewModule(opts Options,
promGatherer prometheus.Gatherer,
tracer tracing.Tracer, // Ensures tracing is initialized
license licensing.Licensing,
moduleRegisterer ModuleRegisterer,
) (*ModuleServer, error) {
s, err := newModuleServer(opts, apiOpts, features, cfg, storageMetrics, indexMetrics, reg, promGatherer, license)
s, err := newModuleServer(opts, apiOpts, features, cfg, storageMetrics, indexMetrics, reg, promGatherer, license, moduleRegisterer)
if err != nil {
return nil, err
}
@ -66,6 +67,7 @@ func newModuleServer(opts Options,
reg prometheus.Registerer,
promGatherer prometheus.Gatherer,
license licensing.Licensing,
moduleRegisterer ModuleRegisterer,
) (*ModuleServer, error) {
rootCtx, shutdownFn := context.WithCancel(context.Background())
@ -87,6 +89,7 @@ func newModuleServer(opts Options,
promGatherer: promGatherer,
registerer: reg,
license: license,
moduleRegisterer: moduleRegisterer,
}
return s, nil
@ -124,6 +127,9 @@ type ModuleServer struct {
httpServerRouter *mux.Router
searchServerRing *ring.Ring
searchServerRingClientPool *ringclient.Pool
// moduleRegisterer allows registration of modules provided by other builds (e.g. enterprise).
moduleRegisterer ModuleRegisterer
}
// init initializes the server and its services.
@ -202,6 +208,9 @@ func (s *ModuleServer) Run() error {
m.RegisterModule(modules.All, nil)
// Register modules provided by other builds (e.g. enterprise).
s.moduleRegisterer.RegisterModules(m)
return m.Run(s.context)
}

View File

@ -326,7 +326,7 @@ func initModuleServerForTest(
) testModuleServer {
tracer := tracing.InitializeTracerForTest()
ms, err := NewModule(opts, apiOpts, featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearch), cfg, nil, nil, prometheus.NewRegistry(), prometheus.DefaultGatherer, tracer, nil)
ms, err := NewModule(opts, apiOpts, featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearch), cfg, nil, nil, prometheus.NewRegistry(), prometheus.DefaultGatherer, tracer, nil, ProvideNoopModuleRegisterer())
require.NoError(t, err)
conn, err := grpc.NewClient(cfg.GRPCServer.Address,

View File

@ -1621,7 +1621,8 @@ func InitializeModuleServer(cfg *setting.Cfg, opts Options, apiOpts api.ServerOp
}
hooksService := hooks.ProvideService()
ossLicensingService := licensing.ProvideService(cfg, hooksService)
moduleServer, err := NewModule(opts, apiOpts, featureToggles, cfg, storageMetrics, bleveIndexMetrics, registerer, gatherer, tracingService, ossLicensingService)
moduleRegisterer := ProvideNoopModuleRegisterer()
moduleServer, err := NewModule(opts, apiOpts, featureToggles, cfg, storageMetrics, bleveIndexMetrics, registerer, gatherer, tracingService, ossLicensingService, moduleRegisterer)
if err != nil {
return nil, err
}

View File

@ -191,6 +191,8 @@ var wireExtsModuleServerSet = wire.NewSet(
// Unified storage
resource.ProvideStorageMetrics,
resource.ProvideIndexMetrics,
// Overriden by enterprise
ProvideNoopModuleRegisterer,
)
var wireExtsStandaloneAPIServerSet = wire.NewSet(

View File

@ -14,12 +14,10 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/tracing"
@ -63,7 +61,7 @@ func Test_NoopServiceDoesNothing(t *testing.T) {
func Test_CreateGetAndDeleteToken(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false)
s := setUpServiceTest(t)
createResp, err := s.CreateToken(context.Background())
assert.NoError(t, err)
@ -88,7 +86,7 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
t.Parallel()
setupTest := func(ctx context.Context) (service *Service, snapshotUID string, sessionUID string) {
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
gmsClientFake := &gmsClientMock{}
s.gmsClient = gmsClientFake
@ -365,7 +363,7 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
gmsClientMock := &gmsClientMock{
getSnapshotResponse: &cloudmigration.GetSnapshotStatusResponse{
@ -427,14 +425,29 @@ func Test_OnlyQueriesStatusFromGMSWhenRequired(t *testing.T) {
Status: status,
})
assert.NoError(t, err)
_, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
snapshot, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{
SnapshotUID: uid,
SessionUID: sess.UID,
})
assert.NoError(t, err)
require.Eventually(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() == i+1 }, time.Second, 10*time.Millisecond)
assert.Equal(t, status, snapshot.Status)
require.Eventually(
t,
func() bool { return gmsClientMock.GetSnapshotStatusCallCount() == i+1 },
2*time.Second,
100*time.Millisecond,
"GMS client mock GetSnapshotStatus count: %d", gmsClientMock.GetSnapshotStatusCallCount(),
)
}
assert.Never(t, func() bool { return gmsClientMock.GetSnapshotStatusCallCount() > 2 }, time.Second, 10*time.Millisecond)
assert.Never(
t,
func() bool { return gmsClientMock.GetSnapshotStatusCallCount() > 2 },
2*time.Second,
100*time.Millisecond,
"GMS client mock GetSnapshotStatus called more than expected: %d times", gmsClientMock.GetSnapshotStatusCallCount(),
)
}
// Implementation inspired by ChatGPT, OpenAI's language model.
@ -463,7 +476,7 @@ func Test_SortFolders(t *testing.T) {
func TestDeleteSession(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
user := &user.SignedInUser{UserUID: "user123"}
t.Run("when deleting a session that does not exist in the database, it returns an error", func(t *testing.T) {
@ -515,7 +528,7 @@ func TestReportEvent(t *testing.T) {
gmsMock := &gmsClientMock{}
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
s.gmsClient = gmsMock
require.NotPanics(t, func() {
@ -533,7 +546,7 @@ func TestReportEvent(t *testing.T) {
gmsMock := &gmsClientMock{}
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
s.gmsClient = gmsMock
require.NotPanics(t, func() {
@ -547,7 +560,7 @@ func TestReportEvent(t *testing.T) {
func TestGetFolderNamesForFolderUIDs(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
@ -616,7 +629,7 @@ func TestGetParentNames(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
user := &user.SignedInUser{OrgID: 1}
@ -705,7 +718,7 @@ func TestGetParentNames(t *testing.T) {
func TestGetLibraryElementsCommands(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
@ -771,7 +784,7 @@ func TestIsPublicSignatureType(t *testing.T) {
func TestGetPlugins(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
@ -869,7 +882,7 @@ func TestGetPlugins(t *testing.T) {
type configOverrides func(c *setting.Cfg)
func setUpServiceTest(t *testing.T, withDashboardMock bool, cfgOverrides ...configOverrides) cloudmigration.Service {
func setUpServiceTest(t *testing.T, cfgOverrides ...configOverrides) cloudmigration.Service {
secretsService := secretsfakes.NewFakeSecretsService()
rr := routing.NewRouteRegister()
tracer := tracing.InitializeTracerForTest()
@ -888,17 +901,6 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool, cfgOverrides ...conf
cfg.CloudMigration.SnapshotFolder = filepath.Join(os.TempDir(), uuid.NewString())
dashboardService := dashboards.NewFakeDashboardService(t)
if withDashboardMock {
dashboardService.On("GetAllDashboards", mock.Anything).Return(
[]*dashboards.Dashboard{
{
UID: "1",
Data: simplejson.New(),
},
},
nil,
)
}
dsService := &datafakes.FakeDataSourceService{
DataSources: []*datasources.DataSource{

View File

@ -45,7 +45,7 @@ func TestGetAlertMuteTimings(t *testing.T) {
t.Run("it returns the mute timings", func(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
s.features = featuremgmt.WithFeatures(featuremgmt.FlagOnPremToCloudMigrations)
user := &user.SignedInUser{OrgID: 1}
@ -69,7 +69,7 @@ func TestGetNotificationTemplates(t *testing.T) {
t.Run("it returns the notification templates", func(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
user := &user.SignedInUser{OrgID: 1}
@ -92,7 +92,7 @@ func TestGetContactPoints(t *testing.T) {
t.Run("it returns the contact points", func(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
user := &user.SignedInUser{
OrgID: 1,
@ -115,7 +115,7 @@ func TestGetContactPoints(t *testing.T) {
t.Run("it returns an error when user lacks permission to read contact point secrets", func(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
user := &user.SignedInUser{
OrgID: 1,
@ -144,7 +144,7 @@ func TestGetNotificationPolicies(t *testing.T) {
t.Run("it returns the contact points", func(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
user := &user.SignedInUser{OrgID: 1}
@ -172,7 +172,7 @@ func TestGetAlertRules(t *testing.T) {
t.Run("it returns the alert rules", func(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
user := &user.SignedInUser{OrgID: 1, Permissions: map[int64]map[string][]string{1: alertRulesPermissions}}
@ -191,7 +191,7 @@ func TestGetAlertRules(t *testing.T) {
c.CloudMigration.AlertRulesState = setting.GMSAlertRulesPaused
}
s := setUpServiceTest(t, false, alertRulesState).(*Service)
s := setUpServiceTest(t, alertRulesState).(*Service)
user := &user.SignedInUser{OrgID: 1, Permissions: map[int64]map[string][]string{1: alertRulesPermissions}}
@ -218,7 +218,7 @@ func TestGetAlertRuleGroups(t *testing.T) {
t.Run("it returns the alert rule groups", func(t *testing.T) {
t.Parallel()
s := setUpServiceTest(t, false).(*Service)
s := setUpServiceTest(t).(*Service)
user := &user.SignedInUser{OrgID: 1, Permissions: map[int64]map[string][]string{1: alertRulesPermissions}}
@ -257,7 +257,7 @@ func TestGetAlertRuleGroups(t *testing.T) {
c.CloudMigration.AlertRulesState = setting.GMSAlertRulesPaused
}
s := setUpServiceTest(t, false, alertRulesState).(*Service)
s := setUpServiceTest(t, alertRulesState).(*Service)
user := &user.SignedInUser{OrgID: 1, Permissions: map[int64]map[string][]string{1: alertRulesPermissions}}

View File

@ -1677,13 +1677,6 @@ var (
HideFromDocs: true,
Owner: awsDatasourcesSquad,
},
{
Name: "localizationForPlugins",
Description: "Enables localization for plugins",
Stage: FeatureStageExperimental,
Owner: grafanaPluginsPlatformSquad,
FrontendOnly: false,
},
{
Name: "unifiedNavbars",
Description: "Enables unified navbars",

View File

@ -217,7 +217,6 @@ unifiedStorageGrpcConnectionPool,experimental,@grafana/search-and-storage,false,
alertingRulePermanentlyDelete,GA,@grafana/alerting-squad,false,false,true
alertingRuleRecoverDeleted,GA,@grafana/alerting-squad,false,false,true
multiTenantTempCredentials,experimental,@grafana/aws-datasources,false,false,false
localizationForPlugins,experimental,@grafana/plugins-platform-backend,false,false,false
unifiedNavbars,GA,@grafana/plugins-platform-backend,false,false,true
logsPanelControls,preview,@grafana/observability-logs,false,false,true
metricsFromProfiles,experimental,@grafana/observability-traces-and-profiling,false,false,true

1 Name Stage Owner requiresDevMode RequiresRestart FrontendOnly
217 alertingRulePermanentlyDelete GA @grafana/alerting-squad false false true
218 alertingRuleRecoverDeleted GA @grafana/alerting-squad false false true
219 multiTenantTempCredentials experimental @grafana/aws-datasources false false false
localizationForPlugins experimental @grafana/plugins-platform-backend false false false
220 unifiedNavbars GA @grafana/plugins-platform-backend false false true
221 logsPanelControls preview @grafana/observability-logs false false true
222 metricsFromProfiles experimental @grafana/observability-traces-and-profiling false false true

View File

@ -878,10 +878,6 @@ const (
// use multi-tenant path for awsTempCredentials
FlagMultiTenantTempCredentials = "multiTenantTempCredentials"
// FlagLocalizationForPlugins
// Enables localization for plugins
FlagLocalizationForPlugins = "localizationForPlugins"
// FlagUnifiedNavbars
// Enables unified navbars
FlagUnifiedNavbars = "unifiedNavbars"

View File

@ -2270,7 +2270,8 @@
"metadata": {
"name": "localizationForPlugins",
"resourceVersion": "1753448760331",
"creationTimestamp": "2025-03-31T04:38:38Z"
"creationTimestamp": "2025-03-31T04:38:38Z",
"deletionTimestamp": "2025-09-29T07:10:59Z"
},
"spec": {
"description": "Enables localization for plugins",

View File

@ -324,6 +324,15 @@ func (moa *MultiOrgAlertmanager) SaveAndApplyAlertmanagerConfiguration(ctx conte
}
cleanPermissionsErr := err
if previousConfig != nil {
// If there is a previous configuration, we need to copy its extra configs to the new one.
extraConfigs, err := extractExtraConfigs(previousConfig.AlertmanagerConfiguration)
if err != nil {
return fmt.Errorf("failed to extract extra configs from previous configuration: %w", err)
}
config.ExtraConfigs = extraConfigs
}
if err := moa.Crypto.ProcessSecureSettings(ctx, org, config.AlertmanagerConfig.Receivers); err != nil {
return fmt.Errorf("failed to post process Alertmanager configuration: %w", err)
}
@ -572,3 +581,18 @@ func extractReceiverNames(rawConfig string) (sets.Set[string], error) {
return receiverNames, nil
}
// extractExtraConfigs extracts encrypted (does not decrypt) extra configurations from the raw Alertmanager config.
func extractExtraConfigs(rawConfig string) ([]definitions.ExtraConfiguration, error) {
// Slimmed down version of the Alertmanager configuration to extract extra configs.
type extraConfigUserConfig struct {
ExtraConfigs []definitions.ExtraConfiguration `yaml:"extra_config,omitempty" json:"extra_config,omitempty"`
}
cfg := &extraConfigUserConfig{}
if err := json.Unmarshal([]byte(rawConfig), cfg); err != nil {
return nil, fmt.Errorf("unable to parse Alertmanager configuration: %w", err)
}
return cfg.ExtraConfigs, nil
}

View File

@ -150,6 +150,264 @@ receivers:
})
}
func TestMultiOrgAlertmanager_SaveAndApplyAlertmanagerConfiguration(t *testing.T) {
orgID := int64(1)
ctx := context.Background()
t.Run("SaveAndApplyAlertmanagerConfiguration preserves existing extra configs", func(t *testing.T) {
mam := setupMam(t, nil)
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
extraConfig := definitions.ExtraConfiguration{
Identifier: "test-extra-config",
MergeMatchers: amconfig.Matchers{&labels.Matcher{Type: labels.MatchEqual, Name: "env", Value: "test"}},
TemplateFiles: map[string]string{"test.tmpl": "{{ define \"test\" }}Test{{ end }}"},
AlertmanagerConfig: `route:
receiver: extra-receiver
receivers:
- name: extra-receiver`,
}
err := mam.SaveAndApplyExtraConfiguration(ctx, orgID, extraConfig)
require.NoError(t, err)
// Verify extra config was saved
gettableConfig, err := mam.GetAlertmanagerConfiguration(ctx, orgID, false, false)
require.NoError(t, err)
require.Len(t, gettableConfig.ExtraConfigs, 1)
require.Equal(t, extraConfig.Identifier, gettableConfig.ExtraConfigs[0].Identifier)
// Apply a new main configuration
newMainConfig := definitions.PostableUserConfig{
AlertmanagerConfig: definitions.PostableApiAlertingConfig{
Config: definitions.Config{
Route: &definitions.Route{
Receiver: "main-receiver",
},
},
Receivers: []*definitions.PostableApiReceiver{
{
Receiver: amconfig.Receiver{
Name: "main-receiver",
},
PostableGrafanaReceivers: definitions.PostableGrafanaReceivers{
GrafanaManagedReceivers: []*definitions.PostableGrafanaReceiver{
{
Name: "main-receiver",
Type: "email",
Settings: definitions.RawMessage(`{"addresses": "me@grafana.com"}`),
},
},
},
},
},
},
}
err = mam.SaveAndApplyAlertmanagerConfiguration(ctx, orgID, newMainConfig)
require.NoError(t, err)
// Verify that the extra config is still present after applying the new main config
updatedConfig, err := mam.GetAlertmanagerConfiguration(ctx, orgID, false, false)
require.NoError(t, err)
require.Len(t, updatedConfig.ExtraConfigs, 1)
require.Equal(t, extraConfig.Identifier, updatedConfig.ExtraConfigs[0].Identifier)
require.Equal(t, extraConfig.TemplateFiles, updatedConfig.ExtraConfigs[0].TemplateFiles)
// Verify the main config was updated
require.Equal(t, "main-receiver", updatedConfig.AlertmanagerConfig.Route.Receiver)
require.Len(t, updatedConfig.AlertmanagerConfig.Receivers, 1)
require.Equal(t, "main-receiver", updatedConfig.AlertmanagerConfig.Receivers[0].Name)
})
t.Run("SaveAndApplyAlertmanagerConfiguration handles missing extra_configs field", func(t *testing.T) {
mam := setupMam(t, nil)
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
// Apply initial config without extra_configs field
initialConfig := definitions.PostableUserConfig{
AlertmanagerConfig: definitions.PostableApiAlertingConfig{
Config: definitions.Config{
Route: &definitions.Route{
Receiver: "initial-receiver",
},
},
Receivers: []*definitions.PostableApiReceiver{
{
Receiver: amconfig.Receiver{
Name: "initial-receiver",
},
PostableGrafanaReceivers: definitions.PostableGrafanaReceivers{
GrafanaManagedReceivers: []*definitions.PostableGrafanaReceiver{
{
Name: "initial-receiver",
Type: "email",
Settings: definitions.RawMessage(`{"addresses": "initial@grafana.com"}`),
},
},
},
},
},
},
}
err := mam.SaveAndApplyAlertmanagerConfiguration(ctx, orgID, initialConfig)
require.NoError(t, err)
// Apply a new main configuration
newMainConfig := definitions.PostableUserConfig{
AlertmanagerConfig: definitions.PostableApiAlertingConfig{
Config: definitions.Config{
Route: &definitions.Route{
Receiver: "main-receiver",
},
},
Receivers: []*definitions.PostableApiReceiver{
{
Receiver: amconfig.Receiver{
Name: "main-receiver",
},
PostableGrafanaReceivers: definitions.PostableGrafanaReceivers{
GrafanaManagedReceivers: []*definitions.PostableGrafanaReceiver{
{
Name: "main-receiver",
Type: "email",
Settings: definitions.RawMessage(`{"addresses": "me@grafana.com"}`),
},
},
},
},
},
},
}
err = mam.SaveAndApplyAlertmanagerConfiguration(ctx, orgID, newMainConfig)
require.NoError(t, err)
// Verify that no extra configs are present and main config was updated
updatedConfig, err := mam.GetAlertmanagerConfiguration(ctx, orgID, false, false)
require.NoError(t, err)
require.Len(t, updatedConfig.ExtraConfigs, 0)
require.Equal(t, "main-receiver", updatedConfig.AlertmanagerConfig.Route.Receiver)
})
t.Run("SaveAndApplyAlertmanagerConfiguration handles empty extra_configs array", func(t *testing.T) {
mam := setupMam(t, nil)
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
// Apply initial config with empty extra_configs
initialConfig := definitions.PostableUserConfig{
AlertmanagerConfig: definitions.PostableApiAlertingConfig{
Config: definitions.Config{
Route: &definitions.Route{
Receiver: "initial-receiver",
},
},
Receivers: []*definitions.PostableApiReceiver{
{
Receiver: amconfig.Receiver{
Name: "initial-receiver",
},
PostableGrafanaReceivers: definitions.PostableGrafanaReceivers{
GrafanaManagedReceivers: []*definitions.PostableGrafanaReceiver{
{
Name: "initial-receiver",
Type: "email",
Settings: definitions.RawMessage(`{"addresses": "initial@grafana.com"}`),
},
},
},
},
},
},
ExtraConfigs: []definitions.ExtraConfiguration{}, // Empty array
}
err := mam.SaveAndApplyAlertmanagerConfiguration(ctx, orgID, initialConfig)
require.NoError(t, err)
// Apply a new main configuration
newMainConfig := definitions.PostableUserConfig{
AlertmanagerConfig: definitions.PostableApiAlertingConfig{
Config: definitions.Config{
Route: &definitions.Route{
Receiver: "main-receiver",
},
},
Receivers: []*definitions.PostableApiReceiver{
{
Receiver: amconfig.Receiver{
Name: "main-receiver",
},
PostableGrafanaReceivers: definitions.PostableGrafanaReceivers{
GrafanaManagedReceivers: []*definitions.PostableGrafanaReceiver{
{
Name: "main-receiver",
Type: "email",
Settings: definitions.RawMessage(`{"addresses": "me@grafana.com"}`),
},
},
},
},
},
},
}
err = mam.SaveAndApplyAlertmanagerConfiguration(ctx, orgID, newMainConfig)
require.NoError(t, err)
// Verify that no extra configs are present and main config was updated
updatedConfig, err := mam.GetAlertmanagerConfiguration(ctx, orgID, false, false)
require.NoError(t, err)
require.Len(t, updatedConfig.ExtraConfigs, 0)
require.Equal(t, "main-receiver", updatedConfig.AlertmanagerConfig.Route.Receiver)
})
}
func TestExtractExtraConfigs(t *testing.T) {
t.Run("extracts extra configs from JSON", func(t *testing.T) {
jsonConfig := `{
"extra_config": [
{
"identifier": "test-config",
"merge_matchers": [],
"template_files": {"test.tmpl": "test"},
"alertmanager_config": "route:\n receiver: test"
}
]
}`
extraConfigs, err := extractExtraConfigs(jsonConfig)
require.NoError(t, err)
require.Len(t, extraConfigs, 1)
require.Equal(t, "test-config", extraConfigs[0].Identifier)
})
t.Run("handles missing extra_config field", func(t *testing.T) {
jsonConfig := `{"alertmanager_config": {"route": {"receiver": "test"}}}`
extraConfigs, err := extractExtraConfigs(jsonConfig)
require.NoError(t, err)
require.Len(t, extraConfigs, 0)
})
t.Run("handles empty extra_config array", func(t *testing.T) {
jsonConfig := `{"extra_config": []}`
extraConfigs, err := extractExtraConfigs(jsonConfig)
require.NoError(t, err)
require.Len(t, extraConfigs, 0)
})
t.Run("handles null extra_config", func(t *testing.T) {
jsonConfig := `{"extra_config": null}`
extraConfigs, err := extractExtraConfigs(jsonConfig)
require.NoError(t, err)
require.Len(t, extraConfigs, 0)
})
}
func TestMultiOrgAlertmanager_DeleteExtraConfiguration(t *testing.T) {
orgID := int64(1)

View File

@ -582,6 +582,7 @@ type Cfg struct {
IndexMinCount int
IndexRebuildInterval time.Duration
IndexCacheTTL time.Duration
IndexMinUpdateInterval time.Duration // Don't update index if it was updated less than this interval ago.
MaxFileIndexAge time.Duration // Max age of file-based indexes. Index older than this will be rebuilt asynchronously.
MinFileIndexBuildVersion string // Minimum version of Grafana that built the file-based index. If index was built with older Grafana, it will be rebuilt asynchronously.
EnableSharding bool

View File

@ -73,6 +73,7 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
// default to 24 hours because usage insights summarizes the data every 24 hours
cfg.IndexRebuildInterval = section.Key("index_rebuild_interval").MustDuration(24 * time.Hour)
cfg.IndexCacheTTL = section.Key("index_cache_ttl").MustDuration(10 * time.Minute)
cfg.IndexMinUpdateInterval = section.Key("index_min_update_interval").MustDuration(0)
cfg.SprinklesApiServer = section.Key("sprinkles_api_server").String()
cfg.SprinklesApiServerPageLimit = section.Key("sprinkles_api_server_page_limit").MustInt(10000)
cfg.CACertPath = section.Key("ca_cert_path").String()

View File

@ -88,7 +88,7 @@ type ResourceIndex interface {
// UpdateIndex updates the index with the latest data (using update function provided when index was built) to guarantee strong consistency during the search.
// Returns RV to which index was updated.
UpdateIndex(ctx context.Context, reason string) (int64, error)
UpdateIndex(ctx context.Context) (int64, error)
// BuildInfo returns build information about the index.
BuildInfo() (IndexBuildInfo, error)
@ -102,7 +102,7 @@ type UpdateFn func(context context.Context, index ResourceIndex, sinceRV int64)
// SearchBackend contains the technology specific logic to support search
type SearchBackend interface {
// GetIndex returns existing index, or nil.
GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error)
GetIndex(key NamespacedResource) ResourceIndex
// BuildIndex builds an index from scratch.
// Depending on the size, the backend may choose different options (eg: memory vs disk).
@ -540,23 +540,18 @@ func (s *searchSupport) runPeriodicScanForIndexesToRebuild(ctx context.Context)
s.log.Info("stopping periodic index rebuild due to context cancellation")
return
case <-ticker.C:
s.findIndexesToRebuild(ctx, time.Now())
s.findIndexesToRebuild(time.Now())
}
}
}
func (s *searchSupport) findIndexesToRebuild(ctx context.Context, now time.Time) {
func (s *searchSupport) findIndexesToRebuild(now time.Time) {
// Check all open indexes and see if any of them need to be rebuilt.
// This is done periodically to make sure that the indexes are up to date.
keys := s.search.GetOpenIndexes()
for _, key := range keys {
idx, err := s.search.GetIndex(ctx, key)
if err != nil {
s.log.Error("failed to check index to rebuild", "key", key, "error", err)
continue
}
idx := s.search.GetIndex(key)
if idx == nil {
// This can happen if index was closed in the meantime.
continue
@ -618,13 +613,7 @@ func (s *searchSupport) rebuildIndex(ctx context.Context, req rebuildRequest) {
l := s.log.With("namespace", req.Namespace, "group", req.Group, "resource", req.Resource)
idx, err := s.search.GetIndex(ctx, req.NamespacedResource)
if err != nil {
span.RecordError(err)
l.Error("failed to get index to rebuild", "error", err)
return
}
idx := s.search.GetIndex(req.NamespacedResource)
if idx == nil {
span.AddEvent("index not found")
l.Error("index not found")
@ -716,11 +705,7 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso
attribute.String("namespace", key.Namespace),
)
idx, err := s.search.GetIndex(ctx, key)
if err != nil {
return nil, tracing.Error(span, err)
}
idx := s.search.GetIndex(key)
if idx == nil {
span.AddEvent("Building index")
ch := s.buildIndex.DoChan(key.String(), func() (interface{}, error) {
@ -730,8 +715,8 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso
// Recheck if some other goroutine managed to build an index in the meantime.
// (That is, it finished running this function and stored the index into the cache)
idx, err := s.search.GetIndex(ctx, key)
if err == nil && idx != nil {
idx := s.search.GetIndex(key)
if idx != nil {
return idx, nil
}
@ -773,7 +758,7 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso
span.AddEvent("Updating index")
start := time.Now()
rv, err := idx.UpdateIndex(ctx, reason)
rv, err := idx.UpdateIndex(ctx)
if err != nil {
return nil, tracing.Error(span, fmt.Errorf("failed to update index to guarantee strong consistency: %w", err))
}

View File

@ -31,7 +31,7 @@ type MockResourceIndex struct {
updateIndexError error
updateIndexMu sync.Mutex
updateIndexCalls []string
updateIndexCalls int
buildInfo IndexBuildInfo
}
@ -65,11 +65,11 @@ func (m *MockResourceIndex) ListManagedObjects(ctx context.Context, req *resourc
return args.Get(0).(*resourcepb.ListManagedObjectsResponse), args.Error(1)
}
func (m *MockResourceIndex) UpdateIndex(ctx context.Context, reason string) (int64, error) {
func (m *MockResourceIndex) UpdateIndex(_ context.Context) (int64, error) {
m.updateIndexMu.Lock()
defer m.updateIndexMu.Unlock()
m.updateIndexCalls = append(m.updateIndexCalls, reason)
m.updateIndexCalls++
return 0, m.updateIndexError
}
@ -144,10 +144,10 @@ type buildIndexCall struct {
fields SearchableDocumentFields
}
func (m *mockSearchBackend) GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error) {
func (m *mockSearchBackend) GetIndex(key NamespacedResource) ResourceIndex {
m.mu.Lock()
defer m.mu.Unlock()
return m.cache[key], nil
return m.cache[key]
}
func (m *mockSearchBackend) BuildIndex(ctx context.Context, key NamespacedResource, size int64, fields SearchableDocumentFields, reason string, builder BuildFn, updater UpdateFn, rebuild bool) (ResourceIndex, error) {
@ -271,24 +271,24 @@ func TestSearchGetOrCreateIndexWithIndexUpdate(t *testing.T) {
idx, err := support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, "initial call")
require.NoError(t, err)
require.NotNil(t, idx)
checkMockIndexUpdateCalls(t, idx, []string{"initial call"})
checkMockIndexUpdateCalls(t, idx, 1)
idx, err = support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "resource"}, "second call")
require.NoError(t, err)
require.NotNil(t, idx)
checkMockIndexUpdateCalls(t, idx, []string{"initial call", "second call"})
checkMockIndexUpdateCalls(t, idx, 2)
idx, err = support.getOrCreateIndex(context.Background(), NamespacedResource{Namespace: "ns", Group: "group", Resource: "bad"}, "call to bad index")
require.ErrorIs(t, err, failedErr)
require.Nil(t, idx)
}
func checkMockIndexUpdateCalls(t *testing.T, idx ResourceIndex, strings []string) {
func checkMockIndexUpdateCalls(t *testing.T, idx ResourceIndex, calls int) {
mi, ok := idx.(*MockResourceIndex)
require.True(t, ok)
mi.updateIndexMu.Lock()
defer mi.updateIndexMu.Unlock()
require.Equal(t, strings, mi.updateIndexCalls)
require.Equal(t, calls, mi.updateIndexCalls)
}
func TestSearchGetOrCreateIndexWithCancellation(t *testing.T) {
@ -333,8 +333,8 @@ func TestSearchGetOrCreateIndexWithCancellation(t *testing.T) {
// Wait until new index is put into cache.
require.Eventually(t, func() bool {
idx, err := support.search.GetIndex(ctx, key)
return err == nil && idx != nil
idx := support.search.GetIndex(key)
return idx != nil
}, 1*time.Second, 100*time.Millisecond, "Indexing finishes despite context cancellation")
// Second call to getOrCreateIndex returns index immediately, even if context is canceled, as the index is now ready and cached.
@ -347,10 +347,10 @@ type slowSearchBackendWithCache struct {
wg sync.WaitGroup
}
func (m *slowSearchBackendWithCache) GetIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error) {
func (m *slowSearchBackendWithCache) GetIndex(key NamespacedResource) ResourceIndex {
m.mu.Lock()
defer m.mu.Unlock()
return m.cache[key], nil
return m.cache[key]
}
func (m *slowSearchBackendWithCache) BuildIndex(ctx context.Context, key NamespacedResource, size int64, fields SearchableDocumentFields, reason string, builder BuildFn, updater UpdateFn, rebuild bool) (ResourceIndex, error) {
@ -573,14 +573,14 @@ func TestFindIndexesForRebuild(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, support)
support.findIndexesToRebuild(context.Background(), now)
support.findIndexesToRebuild(now)
require.Equal(t, 6, support.rebuildQueue.Len())
now5m := now.Add(5 * time.Minute)
// Running findIndexesToRebuild again should not add any new indexes to the rebuild queue, and all existing
// ones should be "combined" with new ones (this will "bump" minBuildTime)
support.findIndexesToRebuild(context.Background(), now5m)
support.findIndexesToRebuild(now5m)
require.Equal(t, 6, support.rebuildQueue.Len())
// Values that we expect to find in rebuild requests.
@ -692,8 +692,7 @@ func TestRebuildIndexes(t *testing.T) {
func checkRebuildIndex(t *testing.T, support *searchSupport, req rebuildRequest, indexExists, expectedRebuild bool) {
ctx := context.Background()
idxBefore, err := support.search.GetIndex(ctx, req.NamespacedResource)
require.NoError(t, err)
idxBefore := support.search.GetIndex(req.NamespacedResource)
if indexExists {
require.NotNil(t, idxBefore, "index should exist before rebuildIndex")
} else {
@ -702,8 +701,7 @@ func checkRebuildIndex(t *testing.T, support *searchSupport, req rebuildRequest,
support.rebuildIndex(ctx, req)
idxAfter, err := support.search.GetIndex(ctx, req.NamespacedResource)
require.NoError(t, err)
idxAfter := support.search.GetIndex(req.NamespacedResource)
if indexExists {
require.NotNil(t, idxAfter, "index should exist after rebuildIndex")

View File

@ -22,6 +22,7 @@ import (
claims "github.com/grafana/authlib/types"
"github.com/grafana/dskit/backoff"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/apimachinery/validation"
secrets "github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
@ -194,6 +195,10 @@ type SearchOptions struct {
// Number of workers to use for index rebuilds.
IndexRebuildWorkers int
// Minimum time between index updates. This is also used as a delay after a successful write operation, to guarantee
// that subsequent search will observe the effect of the writing.
IndexMinUpdateInterval time.Duration
}
type ResourceServerOptions struct {
@ -336,6 +341,8 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
reg: opts.Reg,
queue: opts.QOSQueue,
queueConfig: opts.QOSConfig,
artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval,
}
if opts.Search.Resources != nil {
@ -386,6 +393,11 @@ type server struct {
reg prometheus.Registerer
queue QOSEnqueuer
queueConfig QueueConfig
// This value is used by storage server to artificially delay returning response after successful
// write operations to make sure that subsequent search by the same client will return up-to-date results.
// Set from SearchOptions.IndexMinUpdateInterval.
artificialSuccessfulWriteDelay time.Duration
}
// Init implements ResourceServer.
@ -661,6 +673,8 @@ func (s *server) Create(ctx context.Context, req *resourcepb.CreateRequest) (*re
})
}
s.sleepAfterSuccessfulWriteOperation(res, err)
return res, err
}
@ -684,6 +698,37 @@ func (s *server) create(ctx context.Context, user claims.AuthInfo, req *resource
return rsp, nil
}
type responseWithErrorResult interface {
GetError() *resourcepb.ErrorResult
}
// sleepAfterSuccessfulWriteOperation will sleep for a specified time if the operation was successful.
// Returns boolean indicating whether the sleep was performed or not (used in testing).
//
// This sleep is performed to guarantee search-after-write consistency, when rate-limiting updates to search index.
func (s *server) sleepAfterSuccessfulWriteOperation(res responseWithErrorResult, err error) bool {
if s.artificialSuccessfulWriteDelay <= 0 {
return false
}
if err != nil {
// No sleep necessary if operation failed.
return false
}
// We expect that non-nil interface values with typed nils can still handle GetError() call.
if res != nil {
errRes := res.GetError()
if errRes != nil {
// No sleep necessary if operation failed.
return false
}
}
time.Sleep(s.artificialSuccessfulWriteDelay)
return true
}
func (s *server) Update(ctx context.Context, req *resourcepb.UpdateRequest) (*resourcepb.UpdateResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
defer span.End()
@ -715,6 +760,8 @@ func (s *server) Update(ctx context.Context, req *resourcepb.UpdateRequest) (*re
})
}
s.sleepAfterSuccessfulWriteOperation(res, err)
return res, err
}
@ -787,6 +834,8 @@ func (s *server) Delete(ctx context.Context, req *resourcepb.DeleteRequest) (*re
})
}
s.sleepAfterSuccessfulWriteOperation(res, err)
return res, err
}

View File

@ -3,6 +3,7 @@ package resource
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
@ -21,6 +22,7 @@ import (
authlib "github.com/grafana/authlib/types"
"github.com/grafana/dskit/services"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/log"
@ -587,3 +589,30 @@ func newTestServerWithQueue(t *testing.T, maxSizePerTenant int, numWorkers int)
}
return s, q
}
func TestArtificialDelayAfterSuccessfulOperation(t *testing.T) {
s := &server{artificialSuccessfulWriteDelay: 1 * time.Millisecond}
check := func(t *testing.T, expectedSleep bool, res responseWithErrorResult, err error) {
slept := s.sleepAfterSuccessfulWriteOperation(res, err)
require.Equal(t, expectedSleep, slept)
}
// Successful responses should sleep
check(t, true, nil, nil)
check(t, true, (responseWithErrorResult)((*resourcepb.CreateResponse)(nil)), nil)
check(t, true, &resourcepb.CreateResponse{}, nil)
check(t, true, (responseWithErrorResult)((*resourcepb.UpdateResponse)(nil)), nil)
check(t, true, &resourcepb.UpdateResponse{}, nil)
check(t, true, (responseWithErrorResult)((*resourcepb.DeleteResponse)(nil)), nil)
check(t, true, &resourcepb.DeleteResponse{}, nil)
// Failed responses should return without sleeping
check(t, false, nil, errors.New("some error"))
check(t, false, &resourcepb.CreateResponse{Error: AsErrorResult(errors.New("some error"))}, nil)
check(t, false, &resourcepb.UpdateResponse{Error: AsErrorResult(errors.New("some error"))}, nil)
check(t, false, &resourcepb.DeleteResponse{Error: AsErrorResult(errors.New("some error"))}, nil)
}

View File

@ -79,6 +79,9 @@ type BleveOptions struct {
UseFullNgram bool
// Minimum time between index updates.
IndexMinUpdateInterval time.Duration
// This function is called to check whether the index is owned by the current instance.
// Indexes that are not owned by current instance are eligible for cleanup.
// If nil, all indexes are owned by the current instance.
@ -167,13 +170,13 @@ func NewBleveBackend(opts BleveOptions, tracer trace.Tracer, indexMetrics *resou
}
// GetIndex will return nil if the key does not exist
func (b *bleveBackend) GetIndex(_ context.Context, key resource.NamespacedResource) (resource.ResourceIndex, error) {
func (b *bleveBackend) GetIndex(key resource.NamespacedResource) resource.ResourceIndex {
idx := b.getCachedIndex(key, time.Now())
// Avoid returning typed nils.
if idx == nil {
return nil, nil
return nil
}
return idx, nil
return idx
}
func (b *bleveBackend) GetOpenIndexes() []resource.NamespacedResource {
@ -689,8 +692,8 @@ func (b *bleveBackend) closeAllIndexes() {
}
type updateRequest struct {
reason string
callback chan updateResult
requestTime time.Time
callback chan updateResult
}
type updateResult struct {
@ -705,6 +708,10 @@ type bleveIndex struct {
// RV returned by last List/ListModifiedSince operation. Updated when updating index.
resourceVersion int64
// Timestamp when the last update to the index was done (started).
// Subsequent update requests only trigger new update if minUpdateInterval has elapsed.
nextUpdateTime time.Time
standard resource.SearchableDocumentFields
fields resource.SearchableDocumentFields
@ -719,7 +726,8 @@ type bleveIndex struct {
tracing trace.Tracer
logger *slog.Logger
updaterFn resource.UpdateFn
updaterFn resource.UpdateFn
minUpdateInterval time.Duration
updaterMu sync.Mutex
updaterCond *sync.Cond // Used to signal the updater goroutine that there is work to do, or updater is no longer enabled and should stop. Also used by updater itself to stop early if there's no work to be done.
@ -746,15 +754,16 @@ func (b *bleveBackend) newBleveIndex(
logger *slog.Logger,
) *bleveIndex {
bi := &bleveIndex{
key: key,
index: index,
indexStorage: newIndexType,
fields: fields,
allFields: allFields,
standard: standardSearchFields,
tracing: b.tracer,
logger: logger,
updaterFn: updaterFn,
key: key,
index: index,
indexStorage: newIndexType,
fields: fields,
allFields: allFields,
standard: standardSearchFields,
tracing: b.tracer,
logger: logger,
updaterFn: updaterFn,
minUpdateInterval: b.opts.IndexMinUpdateInterval,
}
bi.updaterCond = sync.NewCond(&bi.updaterMu)
if b.indexMetrics != nil {
@ -1349,14 +1358,14 @@ func (b *bleveIndex) stopUpdaterAndCloseIndex() error {
return b.index.Close()
}
func (b *bleveIndex) UpdateIndex(ctx context.Context, reason string) (int64, error) {
func (b *bleveIndex) UpdateIndex(ctx context.Context) (int64, error) {
// We don't have to do anything if the index cannot be updated (typically in tests).
if b.updaterFn == nil {
return 0, nil
}
// Use chan with buffer size 1 to ensure that we can always send the result back, even if there's no reader anymore.
req := updateRequest{reason: reason, callback: make(chan updateResult, 1)}
req := updateRequest{requestTime: time.Now(), callback: make(chan updateResult, 1)}
// Make sure that the updater goroutine is running.
b.updaterMu.Lock()
@ -1413,7 +1422,7 @@ func (b *bleveIndex) runUpdater(ctx context.Context) {
b.updaterMu.Lock()
for !b.updaterShutdown && ctx.Err() == nil && len(b.updaterQueue) == 0 && time.Since(start) < maxWait {
// Cond is signalled when updaterShutdown changes, updaterQueue gets new element or when timeout occurs.
// Cond is signaled when updaterShutdown changes, updaterQueue gets new element or when timeout occurs.
b.updaterCond.Wait()
}
@ -1436,6 +1445,26 @@ func (b *bleveIndex) runUpdater(ctx context.Context) {
return
}
// Check if requests arrived before minUpdateInterval since the last update has elapsed, and remove such requests.
for ix := 0; ix < len(batch); {
req := batch[ix]
if req.requestTime.Before(b.nextUpdateTime) {
req.callback <- updateResult{rv: b.resourceVersion}
batch = append(batch[:ix], batch[ix+1:]...)
} else {
// Keep in the batch
ix++
}
}
// If all requests are now handled, don't perform update.
if len(batch) == 0 {
continue
}
// Bump next update time
b.nextUpdateTime = time.Now().Add(b.minUpdateInterval)
var rv int64
var err = ctx.Err()
if err == nil {

View File

@ -833,6 +833,12 @@ func withOwnsIndexFn(fn func(key resource.NamespacedResource) (bool, error)) set
}
}
func withIndexMinUpdateInterval(d time.Duration) setupOption {
return func(options *BleveOptions) {
options.IndexMinUpdateInterval = d
}
}
func TestBuildIndexExpiration(t *testing.T) {
ns := resource.NamespacedResource{
Namespace: "test",
@ -897,8 +903,7 @@ func TestBuildIndexExpiration(t *testing.T) {
backend.runEvictExpiredOrUnownedIndexes(time.Now().Add(5 * time.Minute))
if tc.expectedEviction {
idx, err := backend.GetIndex(context.Background(), ns)
require.NoError(t, err)
idx := backend.GetIndex(ns)
require.Nil(t, idx)
_, err = builtIndex.DocCount(context.Background(), "")
@ -907,8 +912,7 @@ func TestBuildIndexExpiration(t *testing.T) {
// Verify that there are no open indexes.
checkOpenIndexes(t, reg, 0, 0)
} else {
idx, err := backend.GetIndex(context.Background(), ns)
require.NoError(t, err)
idx := backend.GetIndex(ns)
require.NotNil(t, idx)
cnt, err := builtIndex.DocCount(context.Background(), "")
@ -1132,6 +1136,37 @@ func updateTestDocs(ns resource.NamespacedResource, docs int) resource.UpdateFn
}
}
func updateTestDocsReturningMillisTimestamp(ns resource.NamespacedResource, docs int) (resource.UpdateFn, *atomic.Int64) {
cnt := 0
updateCalls := atomic.NewInt64(0)
return func(context context.Context, index resource.ResourceIndex, sinceRV int64) (newRV int64, updatedDocs int, _ error) {
now := time.Now()
updateCalls.Inc()
cnt++
var items []*resource.BulkIndexItem
for i := 0; i < docs; i++ {
items = append(items, &resource.BulkIndexItem{
Action: resource.ActionIndex,
Doc: &resource.IndexableDocument{
Key: &resourcepb.ResourceKey{
Namespace: ns.Namespace,
Group: ns.Group,
Resource: ns.Resource,
Name: fmt.Sprintf("doc%d", i),
},
Title: fmt.Sprintf("Document %d (gen_%d)", i, cnt),
},
})
}
err := index.BulkIndex(&resource.BulkIndexRequest{Items: items})
return now.UnixMilli(), docs, err
}, updateCalls
}
func TestCleanOldIndexes(t *testing.T) {
dir := t.TempDir()
@ -1209,7 +1244,7 @@ func TestIndexUpdate(t *testing.T) {
require.Equal(t, int64(0), resp.TotalHits)
// Update index.
_, err = idx.UpdateIndex(context.Background(), "test")
_, err = idx.UpdateIndex(context.Background())
require.NoError(t, err)
// Verify that index was updated -- number of docs didn't change, but we can search "gen_1" documents now.
@ -1217,7 +1252,7 @@ func TestIndexUpdate(t *testing.T) {
require.Equal(t, int64(5), searchTitle(t, idx, "gen_1", 10, ns).TotalHits)
// Update index again.
_, err = idx.UpdateIndex(context.Background(), "test")
_, err = idx.UpdateIndex(context.Background())
require.NoError(t, err)
// Verify that index was updated again -- we can search "gen_2" now. "gen_1" documents are gone.
require.Equal(t, 10, docCount(t, idx))
@ -1261,13 +1296,13 @@ func TestConcurrentIndexUpdateAndBuildIndex(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err = idx.UpdateIndex(ctx, "test")
_, err = idx.UpdateIndex(ctx)
require.NoError(t, err)
_, err = be.BuildIndex(t.Context(), ns, 10 /* file based */, nil, "test", indexTestDocs(ns, 10, 100), updaterFn, false)
require.NoError(t, err)
_, err = idx.UpdateIndex(ctx, "test")
_, err = idx.UpdateIndex(ctx)
require.Contains(t, err.Error(), bleve.ErrorIndexClosed.Error())
}
@ -1303,10 +1338,8 @@ func TestConcurrentIndexUpdateSearchAndRebuild(t *testing.T) {
case <-time.After(time.Duration(i) * time.Millisecond): // introduce small jitter
}
idx, err := be.GetIndex(ctx, ns)
require.NoError(t, err) // GetIndex doesn't really return error.
_, err = idx.UpdateIndex(ctx, "test")
idx := be.GetIndex(ns)
_, err = idx.UpdateIndex(ctx)
if err != nil {
if errors.Is(err, bleve.ErrorIndexClosed) || errors.Is(err, context.Canceled) {
continue
@ -1353,7 +1386,7 @@ func TestConcurrentIndexUpdateSearchAndRebuild(t *testing.T) {
cancel()
wg.Wait()
fmt.Println("Updates:", updates.Load(), "searches:", searches.Load(), "rebuilds:", rebuilds.Load())
t.Log("Updates:", updates.Load(), "searches:", searches.Load(), "rebuilds:", rebuilds.Load())
}
// Verify concurrent updates and searches work as expected.
@ -1387,7 +1420,7 @@ func TestConcurrentIndexUpdateAndSearch(t *testing.T) {
prevRV := int64(0)
for ctx.Err() == nil {
// We use t.Context() here to avoid getting errors from context cancellation.
rv, err := idx.UpdateIndex(t.Context(), "test")
rv, err := idx.UpdateIndex(t.Context())
require.NoError(t, err)
require.Greater(t, rv, prevRV) // Each update should return new RV (that's how our update function works)
require.Equal(t, int64(10), searchTitle(t, idx, "Document", 10, ns).TotalHits)
@ -1415,7 +1448,72 @@ func TestConcurrentIndexUpdateAndSearch(t *testing.T) {
require.Greater(t, rvUpdatedByMultipleGoroutines, int64(0))
}
// Verify concurrent updates and searches work as expected.
func TestConcurrentIndexUpdateAndSearchWithIndexMinUpdateInterval(t *testing.T) {
ns := resource.NamespacedResource{
Namespace: "test",
Group: "group",
Resource: "resource",
}
const minInterval = 100 * time.Millisecond
be, _ := setupBleveBackend(t, withIndexMinUpdateInterval(minInterval))
updateFn, updateCalls := updateTestDocsReturningMillisTimestamp(ns, 5)
idx, err := be.BuildIndex(t.Context(), ns, 10 /* file based */, nil, "test", indexTestDocs(ns, 10, 100), updateFn, false)
require.NoError(t, err)
wg := sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
attemptedUpdates := atomic.NewInt64(0)
// Verify that each returned RV (unix timestamp in millis) is either the same as before, or at least minInterval later.
const searchConcurrency = 25
for i := 0; i < searchConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
prevRV := int64(0)
for ctx.Err() == nil {
attemptedUpdates.Inc()
// We use t.Context() here to avoid getting errors from context cancellation.
rv, err := idx.UpdateIndex(t.Context())
require.NoError(t, err)
// Our update function returns unix timestamp in millis. We expect it to not change at all, or change by minInterval.
if prevRV > 0 {
rvDiff := rv - prevRV
if rvDiff == 0 {
// OK
} else {
// Allow returned RV to be within 10% of minInterval.
require.InDelta(t, minInterval.Milliseconds(), rvDiff, float64(minInterval.Milliseconds())*0.10)
}
}
prevRV = rv
require.Equal(t, int64(10), searchTitle(t, idx, "Document", 10, ns).TotalHits)
}
}()
}
// Run updates and searches for this time.
testTime := 1 * time.Second
time.Sleep(testTime)
cancel()
wg.Wait()
expectedUpdateCalls := int64(testTime / minInterval)
require.InDelta(t, expectedUpdateCalls, updateCalls.Load(), float64(expectedUpdateCalls/2))
require.Greater(t, attemptedUpdates.Load(), updateCalls.Load())
t.Log("Attempted updates:", attemptedUpdates.Load(), "update calls:", updateCalls.Load())
}
func TestIndexUpdateWithErrors(t *testing.T) {
ns := resource.NamespacedResource{
Namespace: "test",
@ -1434,7 +1532,7 @@ func TestIndexUpdateWithErrors(t *testing.T) {
require.NoError(t, err)
t.Run("update fail", func(t *testing.T) {
_, err = idx.UpdateIndex(t.Context(), "test")
_, err = idx.UpdateIndex(t.Context())
require.ErrorIs(t, err, updateErr)
})
@ -1442,7 +1540,7 @@ func TestIndexUpdateWithErrors(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
_, err = idx.UpdateIndex(ctx, "test")
_, err = idx.UpdateIndex(ctx)
require.ErrorIs(t, err, context.DeadlineExceeded)
})
@ -1451,7 +1549,7 @@ func TestIndexUpdateWithErrors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err = idx.UpdateIndex(ctx, "test")
_, err = idx.UpdateIndex(ctx)
require.ErrorIs(t, err, context.Canceled)
})
}

View File

@ -41,13 +41,14 @@ func NewSearchOptions(
}
bleve, err := NewBleveBackend(BleveOptions{
Root: root,
FileThreshold: int64(cfg.IndexFileThreshold), // fewer than X items will use a memory index
BatchSize: cfg.IndexMaxBatchSize, // This is the batch size for how many objects to add to the index at once
IndexCacheTTL: cfg.IndexCacheTTL, // How long to keep the index cache in memory
BuildVersion: cfg.BuildVersion,
UseFullNgram: features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageUseFullNgram),
OwnsIndex: ownsIndexFn,
Root: root,
FileThreshold: int64(cfg.IndexFileThreshold), // fewer than X items will use a memory index
BatchSize: cfg.IndexMaxBatchSize, // This is the batch size for how many objects to add to the index at once
IndexCacheTTL: cfg.IndexCacheTTL, // How long to keep the index cache in memory
BuildVersion: cfg.BuildVersion,
UseFullNgram: features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageUseFullNgram),
OwnsIndex: ownsIndexFn,
IndexMinUpdateInterval: cfg.IndexMinUpdateInterval,
}, tracer, indexMetrics)
if err != nil {
@ -55,14 +56,15 @@ func NewSearchOptions(
}
return resource.SearchOptions{
Backend: bleve,
Resources: docs,
InitWorkerThreads: cfg.IndexWorkers,
IndexRebuildWorkers: cfg.IndexRebuildWorkers,
InitMinCount: cfg.IndexMinCount,
DashboardIndexMaxAge: cfg.IndexRebuildInterval,
MaxIndexAge: cfg.MaxFileIndexAge,
MinBuildVersion: minVersion,
Backend: bleve,
Resources: docs,
InitWorkerThreads: cfg.IndexWorkers,
IndexRebuildWorkers: cfg.IndexRebuildWorkers,
InitMinCount: cfg.IndexMinCount,
DashboardIndexMaxAge: cfg.IndexRebuildInterval,
MaxIndexAge: cfg.MaxFileIndexAge,
MinBuildVersion: minVersion,
IndexMinUpdateInterval: cfg.IndexMinUpdateInterval,
}, nil
}
return resource.SearchOptions{}, nil

View File

@ -59,12 +59,11 @@ func runTestSearchBackendBuildIndex(t *testing.T, backend resource.SearchBackend
}
// Get the index should return nil if the index does not exist
index, err := backend.GetIndex(ctx, ns)
require.NoError(t, err)
index := backend.GetIndex(ns)
require.Nil(t, index)
// Build the index
index, err = backend.BuildIndex(ctx, ns, 0, nil, "test", func(index resource.ResourceIndex) (int64, error) {
index, err := backend.BuildIndex(ctx, ns, 0, nil, "test", func(index resource.ResourceIndex) (int64, error) {
// Write a test document
err := index.BulkIndex(&resource.BulkIndexRequest{
Items: []*resource.BulkIndexItem{
@ -91,8 +90,7 @@ func runTestSearchBackendBuildIndex(t *testing.T, backend resource.SearchBackend
require.NotNil(t, index)
// Get the index should now return the index
index, err = backend.GetIndex(ctx, ns)
require.NoError(t, err)
index = backend.GetIndex(ns)
require.NotNil(t, index)
}

View File

@ -67,3 +67,62 @@ describe('FilterByValueTransformerEditor', () => {
});
});
});
it('hides conditions field when there is 0 or 1 filter', () => {
const onChangeMock = jest.fn();
const input: DataFrame[] = [
{
fields: [{ name: 'field1', type: FieldType.string, config: {}, values: [] }],
length: 0,
},
];
// Test with 0 filters
const { queryByText, rerender } = render(
<FilterByValueTransformerEditor
input={input}
options={{ type: FilterByValueType.include, match: FilterByValueMatch.all, filters: [] }}
onChange={onChangeMock}
/>
);
expect(queryByText('Conditions')).not.toBeInTheDocument();
// Test with 1 filter
rerender(
<FilterByValueTransformerEditor
input={input}
options={{
type: FilterByValueType.include,
match: FilterByValueMatch.all,
filters: [{ fieldName: 'test', config: { id: ValueMatcherID.isNull, options: {} } }],
}}
onChange={onChangeMock}
/>
);
expect(queryByText('Conditions')).not.toBeInTheDocument();
});
it('shows conditions field when there are more than 1 filter', () => {
const onChangeMock = jest.fn();
const input: DataFrame[] = [
{
fields: [{ name: 'field1', type: FieldType.string, config: {}, values: [] }],
length: 0,
},
];
const { getByText } = render(
<FilterByValueTransformerEditor
input={input}
options={{
type: FilterByValueType.include,
match: FilterByValueMatch.all,
filters: [
{ fieldName: 'test1', config: { id: ValueMatcherID.isNull, options: {} } },
{ fieldName: 'test2', config: { id: ValueMatcherID.isNull, options: {} } },
],
}}
onChange={onChangeMock}
/>
);
expect(getByText('Conditions')).toBeInTheDocument();
});

View File

@ -124,14 +124,16 @@ export const FilterByValueTransformerEditor = (props: TransformerUIProps<FilterB
<RadioButtonGroup options={filterTypes} value={options.type} onChange={onChangeType} fullWidth />
</div>
</InlineField>
<InlineField
label={t('transformers.filter-by-value-transformer-editor.label-conditions', 'Conditions')}
labelWidth={16}
>
<div className="width-15">
<RadioButtonGroup options={filterMatch} value={options.match} onChange={onChangeMatch} fullWidth />
</div>
</InlineField>
{options.filters.length > 1 && (
<InlineField
label={t('transformers.filter-by-value-transformer-editor.label-conditions', 'Conditions')}
labelWidth={16}
>
<div className="width-15">
<RadioButtonGroup options={filterMatch} value={options.match} onChange={onChangeMatch} fullWidth />
</div>
</InlineField>
)}
<Box paddingLeft={2}>
{options.filters.map((filter, idx) => (
<FilterByValueFilterEditor